标签:RocketMq5.Dashboard;

一、简介

RocketMQ因其架构简略、业务功能丰富、具备极强可扩展性等特点被广泛应用,比方金融业务、互联网、大数据、物联网等畛域的业务场景;

二、环境部署

1、编译打包

1、下载5.0版本源码包rocketmq-all-5.0.0-source-release.zip2、解压后进入目录,编译打包mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U

2、批改配置

在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh

distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh

3、服务启动

1、该目录下distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/2、启动NameServersh mqnamesrv输入日志The Name Server boot success. serializeType=JSON3、启动Broker+Proxysh mqbroker -n localhost:9876 --enable-proxy输入日志rocketmq-proxy startup successfully4、敞开服务sh mqshutdown namesrvSend shutdown request to mqnamesrv(18636) OKsh mqshutdown brokerSend shutdown request to mqbroker with proxy enable OK(18647)

4、控制台装置

1、下载master源码包rocketmq-dashboard-master2、解压后进入目录,编译打包mvn clean package -Dmaven.test.skip=true3、启动服务java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar4、输入日志INFO main - Tomcat started on port(s): 8080 (http) with context path ''5、拜访服务:localhost:8080

三、工程搭建

1、工程构造

2、依赖治理

rocketmq-starter组件中,实际上依赖的是rocketmq-client组件的5.0版本,因为两个新版框架间的兼容问题,须要增加相干配置解决该问题;

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-spring-boot-starter</artifactId>    <version>${rocketmq-starter.version}</version></dependency>

3、配置文件

配置RocketMq服务地址,音讯生产者和消费者;

rocketmq:  name-server: 127.0.0.1:9876  # 生产者  producer:    group: boot_group_1    # 音讯发送超时工夫    send-message-timeout: 3000    # 音讯最大长度4M    max-message-size: 4096    # 音讯发送失败重试次数    retry-times-when-send-failed: 3    # 异步音讯发送失败重试次数    retry-times-when-send-async-failed: 2  # 消费者  consumer:    group: boot_group_1    # 每次提取的最大音讯数    pull-batch-size: 5

4、配置类

在配置类中次要定义两个Bean的加载,即RocketMQTemplateDefaultMQProducer,次要是提供音讯发送的能力,即生产音讯;

@Configurationpublic class RocketMqConfig {    @Value("${rocketmq.name-server}")    private String nameServer;    @Value("${rocketmq.producer.group}")    private String producerGroup;    @Value("${rocketmq.producer.send-message-timeout}")    private Integer sendMsgTimeout;    @Value("${rocketmq.producer.max-message-size}")    private Integer maxMessageSize;    @Value("${rocketmq.producer.retry-times-when-send-failed}")    private Integer retryTimesWhenSendFailed ;    @Value("${rocketmq.producer.retry-times-when-send-async-failed}")    private Integer retryTimesWhenSendAsyncFailed ;    @Bean    public RocketMQTemplate rocketMqTemplate(){        RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();        rocketMqTemplate.setProducer(defaultMqProducer());        return rocketMqTemplate;    }    @Bean    public DefaultMQProducer defaultMqProducer() {        DefaultMQProducer producer = new DefaultMQProducer();        producer.setNamesrvAddr(this.nameServer);        producer.setProducerGroup(this.producerGroup);        producer.setSendMsgTimeout(this.sendMsgTimeout);        producer.setMaxMessageSize(this.maxMessageSize);        producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);        producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);        return producer;    }}

四、根底用法

1、音讯生产

编写一个生产者接口类,别离应用RocketMQTemplateDefaultMQProducer实现音讯发送的性能,而后能够通过Dashboard控制面板查看音讯详情;

@RestControllerpublic class ProducerWeb {    private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class);    @Autowired    private RocketMQTemplate rocketMqTemplate;    @GetMapping("/send/msg1")    public String sendMsg1 (){        try {            // 构建音讯主体            JsonMapper jsonMapper = new JsonMapper();            String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg"));            // 发送音讯            rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody);        } catch (Exception e) {            e.printStackTrace();        }        return "OK" ;    }    @Autowired    private DefaultMQProducer defaultMqProducer ;    @GetMapping("/send/msg2")    public String sendMsg2 (){        try {            // 构建音讯主体            JsonMapper jsonMapper = new JsonMapper();            String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg"));            // 构建音讯对象            Message message = new Message();            message.setTopic("boot-mq-topic");            message.setTags("boot-mq-tag");            message.setKeys("boot-mq-key");            message.setBody(msgBody.getBytes());            // 发送音讯,打印日志            SendResult sendResult = defaultMqProducer.send(message);            log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus());        } catch (Exception e) {            e.printStackTrace();        }        return "OK" ;    }}

2、音讯生产

编写音讯监听类,实现RocketMQListener接口,通过RocketMQMessageListener注解管制监听的具体信息;

@Service@RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic")public class ConsumerListener implements RocketMQListener<String> {    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);    @Override    public void onMessage(String message) {        log.info("\n=====\n message:{} \n=====\n",message);    }}

五、参考源码

文档仓库:https://gitee.com/cicadasmile/butte-java-note源码仓库:https://gitee.com/cicadasmile/butte-spring-parent