上周花了一点工夫从头到尾、从无到有地搭建了一套RocketMQ的环境,感觉还挺easy的,所以就写篇文章分享给大家。
整篇文章能够大抵分为三个局部,第一局部属于一些外围概念和工作流程的解说;第二局部就是纯手动搭建了一套环境;第三局部是基于环境进行测试和集成到SpringBoot,因为整个过程讲的比拟细,所以我称之为“保姆级教程”。
好了,废话补多少,间接进入主题。
前言
RocketMQ是阿里巴巴旗下一款开源的MQ框架,经验过双十一考验、Java编程语言实现,有十分好残缺生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,反对事务音讯、程序音讯、批量音讯、定时音讯、音讯回溯等,总之就是葛大爷的一句话

外围概念

NameServer:能够了解为是一个注册核心,次要是用来保留topic路由信息,治理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。
Broker:外围的一个角色,次要是用来保留topic的信息,承受生产者产生的音讯,长久化音讯。在一个Broker集群中,雷同的BrokerName能够称为一个Broker组,一个Broker组中,BrokerId为0的为主节点,其它的为从节点。BrokerName和BrokerId是能够在Broker启动时通过配置文件配置的。每个Broker组只寄存一部分音讯。
生产者:生产音讯的一方就是生产者
生产者组:一个生产者组能够有很多生产者,只须要在创立生产者的时候指定生产者组,那么这个生产者就在那个生产者组
消费者:用来生产生产者音讯的一方
消费者组:跟生产者一样,每个消费者都有所在的消费者组,一个消费者组能够有很多的消费者,不同的消费者组生产音讯是互不影响的。
topic(主题) :能够了解为一个音讯的汇合的名字,生产者在发送音讯的时候须要指定发到哪个topic下,消费者生产音讯的时候也须要晓得本人生产的是哪些topic底下的音讯。
Tag(子主题) :比topic低一级,能够用来辨别同一topic下的不同业务类型的音讯,发送音讯的时候也须要指定。

这里有组的概念是因为能够用来做到不同的生产者组或者消费者组有不同的配置,这样就能够使得生产者或者消费者更加灵便。
工作流程
说完外围概念,再来说一下外围的工作流程,这里我先画了一张图。

通过这张图就能够很分明的晓得,RocketMQ大抵的工作流程:

Broker启动的时候,会往每台NameServer(因为NameServer之间不通信,所以每台都得注册)注册本人的信息,这些信息包含本人的ip和端口号,本人这台Broker有哪些topic等信息。
Producer在启动之后会跟会NameServer建设连贯,定期从NameServer中获取Broker的信息,当发送音讯的时候,会依据音讯须要发送到哪个topic去找对应的Broker地址,如果有的话,就向这台Broker发送申请;没有找到的话,就看依据是否容许主动创立topic来决定是否发送音讯。
Broker在接管到Producer的音讯之后,会将音讯存起来,长久化,如果有从节点的话,也会被动同步给从节点,实现数据的备份
Consumer启动之后也会跟会NameServer建设连贯,定期从NameServer中获取Broker和对应topic的信息,而后依据本人须要订阅的topic信息找到对应的Broker的地址,而后跟Broker建设连贯,获取音讯,进行生产

就跟下面的图一样,整体的工作流程还是比较简单的,这里我简化了很多概念,次要是为了好了解。
环境搭建
终于讲完了一些简略的概念,接下来就来搭建一套RocketMQ的环境。
通过下面剖析,咱们晓得,在RocketMQ中有NameServer、Broker、生产者、消费者四种角色。而生产者和消费者实际上就是业务零碎,所以这里不须要搭建,真正要搭建的就是NameServer和Broker,然而为了不便RocketMQ数据的可视化,这里我多搭建一套可视化的服务。
搭建过程比较简单,依照步骤一步一步来就能够实现,如果提醒一些命令不存在,那么间接通过yum装置这些命令就行。
一、筹备
须要筹备一个linux服务器,须要先装置好JDK
敞开防火墙
systemctl stop firewalld
systemctl disable firewalld
复制代码
下载并解压RocketMQ
1、创立一个目录,用来寄存rocketmq相干的货色
mkdir /usr/rocketmq
cd /usr/rocketmq
复制代码
2、下载并解压rocketmq
下载
wget https://archive.apache.org/di...
复制代码
解压
unzip rocketmq-all-4.7.1-bin-release.zip
复制代码
看到这一个文件夹就实现了

而后进入rocketmq-all-4.7.1-bin-release文件夹
cd rocketmq-all-4.7.1-bin-release
复制代码
RocketMQ的货色都在这了

二、搭建NameServer
批改jvm参数
在启动NameServer之前,强烈建议批改一下启动时的jvm参数,因为默认的参数都比拟大,为了防止内存不够,倡议批改小,当然,如果你的内存足够大,能够疏忽。
vi bin/runserver.sh
复制代码
批改画圈的这一行

这里你能够间接批改成跟我一样的
-server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=32m -XX:MaxMetaspaceSize=50m
复制代码
启动NameServer
批改完之后,执行如下命令就能够启动NameServer了
nohup sh bin/mqnamesrv &
复制代码
查看NameServer日志
tail -f ~/logs/rocketmqlogs/namesrv.log
复制代码
如果看到如下的日志,就阐明启动胜利了

NameServer日志
三、搭建Broker
这里启动单机版的Broker
批改jvm参数
跟启动NameServer一样,也倡议去批改jvm参数
vi bin/runbroker.sh
复制代码
将画圈的中央设置小点,当然也别太小啊

当然你还是能够跟我设置的一样
-server -Xms1g -Xmx1g -Xmn512m
复制代码
批改Broker配置文件broker.conf
这里须要改一下Broker配置文件,须要指定NameServer的地址,因为须要Broker须要往NameServer注册
vi conf/broker.conf
复制代码
Broker配置文件

Broker配置文件
这里就能看出Broker的配置了,什么Broker集群的名称啊,Broker的名称啊,Broker的id啊,都跟后面说的对上了。
在文件开端追加地址
namesrvAddr = localhost:9876
复制代码
因为NameServer跟Broker在同一台机器,所以是localhost,NameServer端口默认的是9876。
不过这里我还倡议再批改一处信息,因为Broker向NameServer进行注册的时候,带过来的ip如果不指定就会主动获取,然而主动获取的有个坑,就是有可能你的电脑无法访问到这个主动获取的ip,所以我倡议手动指定你的电脑能够拜访到的服务器ip。
我的虚拟机的ip是192.168.200.143,所以就指定为192.168.200.143,如下
brokerIP1 = 192.168.200.143
brokerIP2 = 192.168.200.143
复制代码
如果以上都配置的话,最终的配置文件应该如下,红圈的为新加的

启动Broker
nohup sh bin/mqbroker -c conf/broker.conf &
复制代码
-c 参数就是指定配置文件
查看日志
tail -f ~/logs/rocketmqlogs/broker.log
复制代码
当看到如下日志就阐明启动胜利了

四、搭建可视化控制台
其实后面NameServer和Broker搭建实现之后,就能够用来收发音讯了,然而为了更加直观,能够搭一套可视化的服务。
可视化服务其实就是一个jar包,启动就行了。
jar包能够从这获取

链接:pan.baidu.com/s/16s1qwY2q…
提取码:s0sd

将jar包上传到服务器,放到/usr/rocketmq的目录底下,当然放哪都无所谓,这里只是为了不便,因为rocketmq的货色都在这里
而后进入/usr/rocketmq下,执行如下命名
nohup java -jar -server -Xms256m -Xmx256m -Drocketmq.config.namesrvAddr=localhost:9876 -Dserver.port=8088 rocketmq-console-ng-1.0.1.jar &
复制代码
rocketmq.config.namesrvAddr就是用来指定NameServer的地址的
查看日志
tail -f ~/logs/consolelogs/rocketmq-console.log
复制代码
当看到如下日志,就阐明启动胜利了

而后在浏览器中输出http://linux服务器的ip:8088/就能够看到控制台了,如果无法访问,能够看看防火墙有没有敞开

右上角能够把语言切换成中文

Broker集群信息

topic信息
通过控制台能够查看生产者、消费者、Broker集群等信息,十分直观。
性能很多,这里就不一一介绍了。
测试
环境搭好之后,就能够进行测试了。
引入依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>
复制代码
生产者发送音讯
public class Producer {
    public static void main(String[] args) throws Exception {
        //创立一个生产者,指定生产者组为sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");

        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 第一次发送可能会超时,我设置的比拟大
        producer.setSendMsgTimeout(60000);

        // 启动生产者
        producer.start();

        // 创立一条音讯
        // topic为 sanyouTopic
        // 音讯内容为 三友的java日记
        // tags 为 TagA
        Message msg = new Message("sanyouTopic", "TagA", "三友的java日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 发送音讯并失去音讯的发送后果,而后打印
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        // 敞开生产者
        producer.shutdown();
    }

}
复制代码

构建一个音讯生产者DefaultMQProducer实例,而后指定生产者组为sanyouProducer;
指定NameServer的地址:服务器的ip:9876,因为须要从NameServer拉取Broker的信息
producer.start() 启动生产者
构建一个内容为三友的java日记的音讯,而后指定这个音讯往sanyouTopic这个topic发送
producer.send(msg):发送音讯,打印后果
敞开生产者

运行后果如下
SendResult [sendStatus=SEND_OK, msgId=C0A81FAF54F818B4AAC2475FD2010000, offsetMsgId=C0A8C88F00002A9F000000000009AE55, messageQueue=MessageQueue [topic=sanyouTopic, brokerName=broker-a, queueId=0], queueOffset=0]
复制代码
sendStatus=SEND_OK 阐明发送胜利了,此时就能后控制台看到未生产的音讯了。

到控制台看到音讯那块,而后选定发送的topic,查问的工夫范畴手动再选一下,不选就查不进去(我狐疑这是个bug),而后查问就能看到了一条音讯。
而后点击一下MESSAGE DETAIL就可能看到详情。

这里就能看到发送音讯的详细信息。
左下角音讯的生产的生产,因为咱们还没有消费者订阅这个topic,所以左下角没数据。
消费者生产音讯
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 通过push模式生产音讯,指定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");

        // 指定NameServer的地址
        consumer.setNamesrvAddr("192.168.200.143:9876");

        // 订阅这个topic下的所有的音讯
        consumer.subscribe("sanyouTopic", "*");

        // 注册一个生产的监听器,当有音讯的时候,会回调这个监听器来生产音讯
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("生产音讯:%s", new String(msg.getBody()) + "\n");
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}
复制代码

创立一个消费者实例对象,指定消费者组为sanyouConsumer
指定NameServer的地址:服务器的ip:9876
订阅 sanyouTopic 这个topic的所有信息
consumer.registerMessageListener ,这个很重要,是注册一个监听器,这个监听器是当有音讯的时候就会回调这个监听器,解决音讯,所以须要用户实现这个接口,而后解决音讯。
启动消费者

启动之后,消费者就会生产方才生产者发送的音讯,于是控制台就打印出如下信息
Consumer Started.
生产音讯:三友的java日记 
复制代码
此时再去看控制台

发现被sanyouConsumer这个消费者组给生产了。
SpringBoot环境下集成RocketMQ
集成
在理论我的项目中必定不会像下面测试那样用,都是集成SpringBoot的。
1、引入依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <version>2.1.1.RELEASE</version>
</dependency>
复制代码
2、yml配置
rocketmq:
  producer:
    group: sanyouProducer
  name-server: 192.168.200.143:9876
复制代码
3、创立消费者
SpringBoot底下只须要实现RocketMQListener接口,而后加上@RocketMQMessageListener注解即可
@Component
@RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouTopic")
public class SanYouTopicListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("解决音讯:" + msg);
    }

}
复制代码
@RocketMQMessageListener须要指定消费者属于哪个消费者组,生产哪个topic,NameServer的地址曾经通过yml配置文件配置类
4、测试
@SpringBootTest(classes = RocketMQApplication.class)
@RunWith(SpringRunner.class)
public class RocketMQTest {

    @Autowired
    private RocketMQTemplate template;

    @Test
    public void send() throws InterruptedException {
        template.convertAndSend("sanyouTopic", "三友的java日记");
        Thread.sleep(60000);
    }

}
复制代码
间接注入一个RocketMQTemplate,而后通过RocketMQTemplate发送音讯。
运行后果如下:
解决音讯:三友的java日记
复制代码
确实生产到音讯了。
原理
其实原理是一样的,只不过在SpringBoot中给封装了一层,让应用起来更加简略。
1、RocketMQTemplate结构代码

所以从这能够看出,最终在结构RocketMQTemplate的时候,传入了一个DefaultMQProducer,所以可想而知,最终RocketMQTemplate发送音讯也是通过DefaultMQProducer发送的。
2、@RocketMQMessageListener 注解解决

从这能够看出,会为每一个加了@RocketMQMessageListener注解的对象创立一个DefaultMQPushConsumer,所以最终也是通过DefaultMQPushConsumer生产音讯的。
至于监听器,是在这

遍历每条音讯,而后调用handleMessage,最终会调用实现了RocketMQListener的对象解决音讯。
最初
通过下面的实践介绍和理论的环境搭建再加上代码的测试,置信应该能够对RocketMQ有个入门,有趣味的小伙伴能够手动搭起来,整个过程顺利的话可能就十几二十分钟这样子。
最初我再说一句,从文章整体也能够看出本文没有波及太深刻的一些机制和原理的解说,比方音讯是如何存储的,事务和提早音讯是如何实现的,主从是如何同步的等等,甚至压根就没提到队列这个词,次要是因为我打算前面再写一篇文章,来独自分析这些机制和原理。
最初的最初,本文所有的代码地址:

github.com/sanyou3/roc…