上周花了一点工夫从头到尾、从无到有地搭建了一套 RocketMQ 的环境,感觉还挺 easy 的,所以就写篇文章分享给大家。
整篇文章能够大抵分为三个局部,第一局部属于一些外围概念和工作流程的解说;第二局部就是纯手动搭建了一套环境;第三局部是基于环境进行测试和集成到 SpringBoot,因为整个过程讲的比拟细,所以我称之为“保姆级教程”。
好了,废话补多少,间接进入主题。
前言
RocketMQ 是阿里巴巴旗下一款开源的 MQ 框架,经验过双十一考验、Java 编程语言实现,有十分好残缺生态系统。RocketMQ 作为一款纯 java、分布式、队列模型的开源消息中间件,反对事务音讯、程序音讯、批量音讯、定时音讯、音讯回溯等,总之就是葛大爷的一句话
外围概念
NameServer:能够了解为是一个注册核心,次要是用来保留 topic 路由信息,治理 Broker。在 NameServer 的集群中,NameServer 与 NameServer 之间是没有任何通信的。
Broker:外围的一个角色,次要是用来保留 topic 的信息,承受生产者产生的音讯,长久化音讯。在一个 Broker 集群中,雷同的 BrokerName 能够称为一个 Broker 组,一个 Broker 组中,BrokerId 为 0 的为主节点,其它的为从节点。BrokerName 和 BrokerId 是能够在 Broker 启动时通过配置文件配置的。每个 Broker 组只寄存一部分音讯。
生产者:生产音讯的一方就是生产者
生产者组:一个生产者组能够有很多生产者,只须要在创立生产者的时候指定生产者组,那么这个生产者就在那个生产者组
消费者:用来生产生产者音讯的一方
消费者组:跟生产者一样,每个消费者都有所在的消费者组,一个消费者组能够有很多的消费者,不同的消费者组生产音讯是互不影响的。
topic(主题):能够了解为一个音讯的汇合的名字,生产者在发送音讯的时候须要指定发到哪个 topic 下,消费者生产音讯的时候也须要晓得本人生产的是哪些 topic 底下的音讯。
Tag(子主题):比 topic 低一级,能够用来辨别同一 topic 下的不同业务类型的音讯,发送音讯的时候也须要指定。
这里有组的概念是因为能够用来做到不同的生产者组或者消费者组有不同的配置,这样就能够使得生产者或者消费者更加灵便。
工作流程
说完外围概念,再来说一下外围的工作流程,这里我先画了一张图。
通过这张图就能够很分明的晓得,RocketMQ 大抵的工作流程:
Broker 启动的时候,会往每台 NameServer(因为 NameServer 之间不通信,所以每台都得注册)注册本人的信息,这些信息包含本人的 ip 和端口号,本人这台 Broker 有哪些 topic 等信息。
Producer 在启动之后会跟会 NameServer 建设连贯,定期从 NameServer 中获取 Broker 的信息,当发送音讯的时候,会依据音讯须要发送到哪个 topic 去找对应的 Broker 地址,如果有的话,就向这台 Broker 发送申请;没有找到的话,就看依据是否容许主动创立 topic 来决定是否发送音讯。
Broker 在接管到 Producer 的音讯之后,会将音讯存起来,长久化,如果有从节点的话,也会被动同步给从节点,实现数据的备份
Consumer 启动之后也会跟会 NameServer 建设连贯,定期从 NameServer 中获取 Broker 和对应 topic 的信息,而后依据本人须要订阅的 topic 信息找到对应的 Broker 的地址,而后跟 Broker 建设连贯,获取音讯,进行生产
就跟下面的图一样,整体的工作流程还是比较简单的,这里我简化了很多概念,次要是为了好了解。
环境搭建
终于讲完了一些简略的概念,接下来就来搭建一套 RocketMQ 的环境。
通过下面剖析,咱们晓得,在 RocketMQ 中有 NameServer、Broker、生产者、消费者四种角色。而生产者和消费者实际上就是业务零碎,所以这里不须要搭建,真正要搭建的就是 NameServer 和 Broker,然而为了不便 RocketMQ 数据的可视化,这里我多搭建一套可视化的服务。
搭建过程比较简单,依照步骤一步一步来就能够实现,如果提醒一些命令不存在,那么间接通过 yum 装置这些命令就行。
一、筹备
须要筹备一个 linux 服务器,须要先装置好 JDK
敞开防火墙
systemctl stop firewalld
systemctl disable firewalld
复制代码
下载并解压 RocketMQ
1、创立一个目录,用来寄存 rocketmq 相干的货色
mkdir /usr/rocketmq
cd /usr/rocketmq
复制代码
2、下载并解压 rocketmq
下载
wget https://archive.apache.org/di…
复制代码
解压
unzip rocketmq-all-4.7.1-bin-release.zip
复制代码
看到这一个文件夹就实现了
而后进入 rocketmq-all-4.7.1-bin-release 文件夹
cd rocketmq-all-4.7.1-bin-release
复制代码
RocketMQ 的货色都在这了
二、搭建 NameServer
批改 jvm 参数
在启动 NameServer 之前,强烈建议批改一下启动时的 jvm 参数,因为默认的参数都比拟大,为了防止内存不够,倡议批改小,当然,如果你的内存足够大,能够疏忽。
vi bin/runserver.sh
复制代码
批改画圈的这一行
这里你能够间接批改成跟我一样的
-server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=32m -XX:MaxMetaspaceSize=50m
复制代码
启动 NameServer
批改完之后,执行如下命令就能够启动 NameServer 了
nohup sh bin/mqnamesrv &
复制代码
查看 NameServer 日志
tail -f ~/logs/rocketmqlogs/namesrv.log
复制代码
如果看到如下的日志,就阐明启动胜利了
NameServer 日志
三、搭建 Broker
这里启动单机版的 Broker
批改 jvm 参数
跟启动 NameServer 一样,也倡议去批改 jvm 参数
vi bin/runbroker.sh
复制代码
将画圈的中央设置小点,当然也别太小啊
当然你还是能够跟我设置的一样
-server -Xms1g -Xmx1g -Xmn512m
复制代码
批改 Broker 配置文件 broker.conf
这里须要改一下 Broker 配置文件,须要指定 NameServer 的地址,因为须要 Broker 须要往 NameServer 注册
vi conf/broker.conf
复制代码
Broker 配置文件
Broker 配置文件
这里就能看出 Broker 的配置了,什么 Broker 集群的名称啊,Broker 的名称啊,Broker 的 id 啊,都跟后面说的对上了。
在文件开端追加地址
namesrvAddr = localhost:9876
复制代码
因为 NameServer 跟 Broker 在同一台机器,所以是 localhost,NameServer 端口默认的是 9876。
不过这里我还倡议再批改一处信息,因为 Broker 向 NameServer 进行注册的时候,带过来的 ip 如果不指定就会主动获取,然而主动获取的有个坑,就是有可能你的电脑无法访问到这个主动获取的 ip,所以我倡议手动指定你的电脑能够拜访到的服务器 ip。
我的虚拟机的 ip 是 192.168.200.143,所以就指定为 192.168.200.143,如下
brokerIP1 = 192.168.200.143
brokerIP2 = 192.168.200.143
复制代码
如果以上都配置的话,最终的配置文件应该如下,红圈的为新加的
启动 Broker
nohup sh bin/mqbroker -c conf/broker.conf &
复制代码
-c 参数就是指定配置文件
查看日志
tail -f ~/logs/rocketmqlogs/broker.log
复制代码
当看到如下日志就阐明启动胜利了
四、搭建可视化控制台
其实后面 NameServer 和 Broker 搭建实现之后,就能够用来收发音讯了,然而为了更加直观,能够搭一套可视化的服务。
可视化服务其实就是一个 jar 包,启动就行了。
jar 包能够从这获取
链接:pan.baidu.com/s/16s1qwY2q…
提取码:s0sd
将 jar 包上传到服务器,放到 /usr/rocketmq 的目录底下,当然放哪都无所谓,这里只是为了不便,因为 rocketmq 的货色都在这里
而后进入 /usr/rocketmq 下,执行如下命名
nohup java -jar -server -Xms256m -Xmx256m -Drocketmq.config.namesrvAddr=localhost:9876 -Dserver.port=8088 rocketmq-console-ng-1.0.1.jar &
复制代码
rocketmq.config.namesrvAddr 就是用来指定 NameServer 的地址的
查看日志
tail -f ~/logs/consolelogs/rocketmq-console.log
复制代码
当看到如下日志,就阐明启动胜利了
而后在浏览器中输出 http://linux 服务器的 ip:8088/ 就能够看到控制台了,如果无法访问,能够看看防火墙有没有敞开
右上角能够把语言切换成中文
Broker 集群信息
topic 信息
通过控制台能够查看生产者、消费者、Broker 集群等信息,十分直观。
性能很多,这里就不一一介绍了。
测试
环境搭好之后,就能够进行测试了。
引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
复制代码
生产者发送音讯
public class Producer {
public static void main(String[] args) throws Exception {
// 创立一个生产者,指定生产者组为 sanyouProducer
DefaultMQProducer producer = new DefaultMQProducer(“sanyouProducer”);
// 指定 NameServer 的地址
producer.setNamesrvAddr(“192.168.200.143:9876”);
// 第一次发送可能会超时,我设置的比拟大
producer.setSendMsgTimeout(60000);
// 启动生产者
producer.start();
// 创立一条音讯
// topic 为 sanyouTopic
// 音讯内容为 三友的 java 日记
// tags 为 TagA
Message msg = new Message(“sanyouTopic”, “TagA”, “ 三友的 java 日记 “.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送音讯并失去音讯的发送后果,而后打印
SendResult sendResult = producer.send(msg);
System.out.printf(“%s%n”, sendResult);
// 敞开生产者
producer.shutdown();
}
}
复制代码
构建一个音讯生产者 DefaultMQProducer 实例,而后指定生产者组为 sanyouProducer;
指定 NameServer 的地址:服务器的 ip:9876,因为须要从 NameServer 拉取 Broker 的信息
producer.start() 启动生产者
构建一个内容为三友的 java 日记的音讯,而后指定这个音讯往 sanyouTopic 这个 topic 发送
producer.send(msg):发送音讯,打印后果
敞开生产者
运行后果如下
SendResult [sendStatus=SEND_OK, msgId=C0A81FAF54F818B4AAC2475FD2010000, offsetMsgId=C0A8C88F00002A9F000000000009AE55, messageQueue=MessageQueue [topic=sanyouTopic, brokerName=broker-a, queueId=0], queueOffset=0]
复制代码
sendStatus=SEND_OK 阐明发送胜利了,此时就能后控制台看到未生产的音讯了。
到控制台看到音讯那块,而后选定发送的 topic,查问的工夫范畴手动再选一下,不选就查不进去 (我狐疑这是个 bug),而后查问就能看到了一条音讯。
而后点击一下 MESSAGE DETAIL 就可能看到详情。
这里就能看到发送音讯的详细信息。
左下角音讯的生产的生产,因为咱们还没有消费者订阅这个 topic,所以左下角没数据。
消费者生产音讯
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 通过 push 模式生产音讯,指定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“sanyouConsumer”);
// 指定 NameServer 的地址
consumer.setNamesrvAddr(“192.168.200.143:9876”);
// 订阅这个 topic 下的所有的音讯
consumer.subscribe(“sanyouTopic”, “*”);
// 注册一个生产的监听器,当有音讯的时候,会回调这个监听器来生产音讯
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf(“ 生产音讯:%s”, new String(msg.getBody()) + “\n”);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.printf(“Consumer Started.%n”);
}
}
复制代码
创立一个消费者实例对象,指定消费者组为 sanyouConsumer
指定 NameServer 的地址:服务器的 ip:9876
订阅 sanyouTopic 这个 topic 的所有信息
consumer.registerMessageListener,这个很重要,是注册一个监听器,这个监听器是当有音讯的时候就会回调这个监听器,解决音讯,所以须要用户实现这个接口,而后解决音讯。
启动消费者
启动之后,消费者就会生产方才生产者发送的音讯,于是控制台就打印出如下信息
Consumer Started.
生产音讯: 三友的 java 日记
复制代码
此时再去看控制台
发现被 sanyouConsumer 这个消费者组给生产了。
SpringBoot 环境下集成 RocketMQ
集成
在理论我的项目中必定不会像下面测试那样用,都是集成 SpringBoot 的。
1、引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
复制代码
2、yml 配置
rocketmq:
producer:
group: sanyouProducer
name-server: 192.168.200.143:9876
复制代码
3、创立消费者
SpringBoot 底下只须要实现 RocketMQListener 接口,而后加上 @RocketMQMessageListener 注解即可
@Component
@RocketMQMessageListener(consumerGroup = “sanyouConsumer”, topic = “sanyouTopic”)
public class SanYouTopicListener implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println(“ 解决音讯:” + msg);
}
}
复制代码
@RocketMQMessageListener 须要指定消费者属于哪个消费者组,生产哪个 topic,NameServer 的地址曾经通过 yml 配置文件配置类
4、测试
@SpringBootTest(classes = RocketMQApplication.class)
@RunWith(SpringRunner.class)
public class RocketMQTest {
@Autowired
private RocketMQTemplate template;
@Test
public void send() throws InterruptedException {
template.convertAndSend(“sanyouTopic”, “ 三友的 java 日记 ”);
Thread.sleep(60000);
}
}
复制代码
间接注入一个 RocketMQTemplate,而后通过 RocketMQTemplate 发送音讯。
运行后果如下:
解决音讯: 三友的 java 日记
复制代码
确实生产到音讯了。
原理
其实原理是一样的,只不过在 SpringBoot 中给封装了一层,让应用起来更加简略。
1、RocketMQTemplate 结构代码
所以从这能够看出,最终在结构 RocketMQTemplate 的时候,传入了一个 DefaultMQProducer,所以可想而知,最终 RocketMQTemplate 发送音讯也是通过 DefaultMQProducer 发送的。
2、@RocketMQMessageListener 注解解决
从这能够看出,会为每一个加了 @RocketMQMessageListener 注解的对象创立一个 DefaultMQPushConsumer,所以最终也是通过 DefaultMQPushConsumer 生产音讯的。
至于监听器,是在这
遍历每条音讯,而后调用 handleMessage,最终会调用实现了 RocketMQListener 的对象解决音讯。
最初
通过下面的实践介绍和理论的环境搭建再加上代码的测试,置信应该能够对 RocketMQ 有个入门,有趣味的小伙伴能够手动搭起来,整个过程顺利的话可能就十几二十分钟这样子。
最初我再说一句,从文章整体也能够看出本文没有波及太深刻的一些机制和原理的解说,比方音讯是如何存储的,事务和提早音讯是如何实现的,主从是如何同步的等等,甚至压根就没提到队列这个词,次要是因为我打算前面再写一篇文章,来独自分析这些机制和原理。
最初的最初,本文所有的代码地址:
github.com/sanyou3/roc…