共计 4392 个字符,预计需要花费 11 分钟才能阅读完成。
标签:RocketMq5.Dashboard;
一、简介
RocketMQ 因其架构简略、业务功能丰富、具备极强可扩展性等特点被广泛应用,比方金融业务、互联网、大数据、物联网等畛域的业务场景;
二、环境部署
1、编译打包
1、下载 5.0 版本源码包 | |
rocketmq-all-5.0.0-source-release.zip | |
2、解压后进入目录,编译打包 | |
mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U |
2、批改配置
在 distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh
distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh
3、服务启动
1、该目录下 | |
distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/ | |
2、启动 NameServer | |
sh mqnamesrv | |
输入日志 | |
The Name Server boot success. serializeType=JSON | |
3、启动 Broker+Proxy | |
sh mqbroker -n localhost:9876 --enable-proxy | |
输入日志 | |
rocketmq-proxy startup successfully | |
4、敞开服务 | |
sh mqshutdown namesrv | |
Send shutdown request to mqnamesrv(18636) OK | |
sh mqshutdown broker | |
Send shutdown request to mqbroker with proxy enable OK(18647) |
4、控制台装置
1、下载 master 源码包 | |
rocketmq-dashboard-master | |
2、解压后进入目录,编译打包 | |
mvn clean package -Dmaven.test.skip=true | |
3、启动服务 | |
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar | |
4、输入日志 | |
INFO main - Tomcat started on port(s): 8080 (http) with context path '' | |
5、拜访服务:localhost:8080 |
三、工程搭建
1、工程构造
2、依赖治理
在 rocketmq-starter
组件中,实际上依赖的是 rocketmq-client
组件的 5.0
版本,因为两个新版框架间的兼容问题,须要增加相干配置解决该问题;
<dependency> | |
<groupId>org.apache.rocketmq</groupId> | |
<artifactId>rocketmq-spring-boot-starter</artifactId> | |
<version>${rocketmq-starter.version}</version> | |
</dependency> |
3、配置文件
配置 RocketMq 服务地址,音讯生产者和消费者;
rocketmq: | |
name-server: 127.0.0.1:9876 | |
# 生产者 | |
producer: | |
group: boot_group_1 | |
# 音讯发送超时工夫 | |
send-message-timeout: 3000 | |
# 音讯最大长度 4M | |
max-message-size: 4096 | |
# 音讯发送失败重试次数 | |
retry-times-when-send-failed: 3 | |
# 异步音讯发送失败重试次数 | |
retry-times-when-send-async-failed: 2 | |
# 消费者 | |
consumer: | |
group: boot_group_1 | |
# 每次提取的最大音讯数 | |
pull-batch-size: 5 |
4、配置类
在配置类中次要定义两个 Bean 的加载,即 RocketMQTemplate
和DefaultMQProducer
,次要是提供音讯发送的能力,即生产音讯;
@Configuration | |
public class RocketMqConfig {@Value("${rocketmq.name-server}") | |
private String nameServer; | |
@Value("${rocketmq.producer.group}") | |
private String producerGroup; | |
@Value("${rocketmq.producer.send-message-timeout}") | |
private Integer sendMsgTimeout; | |
@Value("${rocketmq.producer.max-message-size}") | |
private Integer maxMessageSize; | |
@Value("${rocketmq.producer.retry-times-when-send-failed}") | |
private Integer retryTimesWhenSendFailed ; | |
@Value("${rocketmq.producer.retry-times-when-send-async-failed}") | |
private Integer retryTimesWhenSendAsyncFailed ; | |
@Bean | |
public RocketMQTemplate rocketMqTemplate(){RocketMQTemplate rocketMqTemplate = new RocketMQTemplate(); | |
rocketMqTemplate.setProducer(defaultMqProducer()); | |
return rocketMqTemplate; | |
} | |
@Bean | |
public DefaultMQProducer defaultMqProducer() {DefaultMQProducer producer = new DefaultMQProducer(); | |
producer.setNamesrvAddr(this.nameServer); | |
producer.setProducerGroup(this.producerGroup); | |
producer.setSendMsgTimeout(this.sendMsgTimeout); | |
producer.setMaxMessageSize(this.maxMessageSize); | |
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); | |
producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed); | |
return producer; | |
} | |
} |
四、根底用法
1、音讯生产
编写一个生产者接口类,别离应用 RocketMQTemplate
和DefaultMQProducer
实现音讯发送的性能,而后能够通过 Dashboard
控制面板查看音讯详情;
@RestController | |
public class ProducerWeb {private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class); | |
@Autowired | |
private RocketMQTemplate rocketMqTemplate; | |
@GetMapping("/send/msg1") | |
public String sendMsg1 (){ | |
try { | |
// 构建音讯主体 | |
JsonMapper jsonMapper = new JsonMapper(); | |
String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg")); | |
// 发送音讯 | |
rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody); | |
} catch (Exception e) {e.printStackTrace(); | |
} | |
return "OK" ; | |
} | |
@Autowired | |
private DefaultMQProducer defaultMqProducer ; | |
@GetMapping("/send/msg2") | |
public String sendMsg2 (){ | |
try { | |
// 构建音讯主体 | |
JsonMapper jsonMapper = new JsonMapper(); | |
String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg")); | |
// 构建音讯对象 | |
Message message = new Message(); | |
message.setTopic("boot-mq-topic"); | |
message.setTags("boot-mq-tag"); | |
message.setKeys("boot-mq-key"); | |
message.setBody(msgBody.getBytes()); | |
// 发送音讯,打印日志 | |
SendResult sendResult = defaultMqProducer.send(message); | |
log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus()); | |
} catch (Exception e) {e.printStackTrace(); | |
} | |
return "OK" ; | |
} | |
} |
2、音讯生产
编写音讯监听类,实现 RocketMQListener
接口,通过 RocketMQMessageListener
注解管制监听的具体信息;
@Service | |
@RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic") | |
public class ConsumerListener implements RocketMQListener<String> {private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class); | |
@Override | |
public void onMessage(String message) {log.info("\n=====\n message:{} \n=====\n",message); | |
} | |
} |
五、参考源码
文档仓库:https://gitee.com/cicadasmile/butte-java-note | |
源码仓库:https://gitee.com/cicadasmile/butte-spring-parent |
正文完