在MQ引论中,我曾经介绍了MQ的一些概念和用途,没看过的能够先看一下。这次我抉择介绍的是RocketMQ
Rocket MQ的概念
一图胜千言,都在图里了。
从下面的图,咱们能够看出主题是一类音讯的汇合,每条音讯必须属于一个主题。
RocketMQ中的每个音讯领有惟一的Message ID,且能够携带具备业务标识的key,当然你也能够抉择不给。
标签(tag)是更细一级的划分,用于同一主题下辨别不同类型的音讯。
同一类生产者形成生产者小组,同一类消费者形成消费者小组。
消费者生产的形式也有两种:
- 推动式生产
该模式下Broker收到数据后会被动推送给生产端,该生产模式个别实时性较高。
- 拉取式生产
Consumer生产的一种类型,利用通常被动调用Consumer的拉音讯办法从Broker服务器拉音讯、主动权由利用管制。一旦获取了批量音讯,利用就会启动生产过程。
咱们讲过RocketMQ是能够集群的,所以当多台Broker的时候,咱们就不能按IP和端口去找了,咱们用broker的名字去找,让nameServer去问,相似于注册核心。
架构图:
有了下面的根底概念,咱们来开始写个Hello world吧。
装置与配置
RocketMQ对内存要求比拟高,默认是8G来着,就算你改了默认的配置,改成2G, 基本上也跑不起来,晓得假相的我眼泪掉下来,我的阿里云服务器学生版只有2G,过后改了配置,我还有疑难为啥跑不起来。
Rocket MQ 是阿里巴巴开源的,文档也比拟丰盛,咱们依照文档走就行。
咱们本次下载4.40.release版的
而后上传到Linux上
-- 上传rz rocketmq-all-4.4.0-bin-release.zip-- 解压unzip rocketmq-all-4.4.0-bin-release.zip-- 名字太长改下名mv rocketmq-all-4.4.0-bin-release.zip rocketmq
次要配置的
配置的
- nameServer 哪台主机是nameServer
- master: 谁是主节点
- /etc/hosts 域名映射
在hosts文件中退出以下配置
-- 将来会搭建集群,mqmater是主节点,mqnameserver1是一个,将来会有多个47.101.136.147 mqmaster147.101.136.147 mqnameserver1
-- 长久化到哪里mkdir mqstorecd mqstore-- 日志mkdir commitlog-- 生产队列mkdir consumequeue-- 索引mkdir index-- 进入配置的文件夹cd conf 两主两从异步 cd 2m-2s-async vi broker-a.properties namesrvAddr = mqnameserver1:9876 分号隔开默认主题数量 autoCreateTopicEnable =truedefaultTopicQueueNums = 4-- 对外裸露的接口listenPort = 10911-- 无用文件保留工夫deleteWhen = 04-- 最多保留多长时间fileReservedTime = 48-- 存储门路storePathRootDir=/usr/rocketmq/mqstore-- 提交日志storePathCommitLog=/usr/rocketmq/mqstore/commitlog-- 生产门路storePathConsuQueue=/usr/rocketmq/mqstore/commitlog/consumequeue-- 索引门路storePathIndex=/usr/rocketmq/mqstore/commitlog/consumequeue-- 音讯的最大数量maxMessageSize=65536-- 主从同步,这是指明以后音讯队列是异步,而且是主节点brokerRole=ASYCN_MASTER-- 同步到硬盘,异步刷新flushDiskType=ASYNC_FLUSH-- 0 示意mastter 大于0是从brokerId = 0-- 批量替换 这是配置RocketMq的日志门路,这个请在conf文件夹下配置sed -i 's#${user.home}#/usr/rocketmq/#g' *.xml
设置启动参数:
-- 把8G变成1G vi runbroker.sh -- 把8G变成1G vi runserver.sh
启动:
--可能不少视频在教这类入门的时候,会倡议你把防火墙关掉,我不倡议你这么做,你搞一台云服务器就懂了,很容易受到攻打。-- 所以这里咱们将RocketMQ须要的端口在端口中放行婴青睐。 firewall-cmd --zone=public --add-port=9876/tcp --permanent firewall-cmd --zone=public --add-port=10911/tcp --permanent firewall-cmd --zone=public --add-port=10909/tcp --permanent firewall-cmd --reload -- 后盾启动 nohup sh mqnamesrv & -- 后盾启动 nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties& -- jps 在linux上能够查出有启动了多少java利用,看到上面这些,阐明启动失常。 31972 BrokerStartup 31930 NamesrvStartup 3485 Application 32031 Jps
控制台监控MQ
通常对于MQ这类比拟重要的产品,咱们是须要监控一些的,RocketMQ提供了监控服务:
地址:
https://github.com/apache/roc...
下载之后,咱们引入到IDEA中,下载的是源码。
server.contextPath=
server.port=8080
改下配置文件
### SSL setting#server.ssl.key-store=classpath:rmqcngkeystore.jks#server.ssl.key-store-password=rocketmq#server.ssl.keyStoreType=PKCS12#server.ssl.keyAlias=rmqcngkey#spring.application.index=truespring.application.name=rocketmq-consolespring.http.encoding.charset=UTF-8spring.http.encoding.enabled=truespring.http.encoding.force=truelogging.config=classpath:logback.xml#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876rocketmq.config.namesrvAddr= 这里是你RocketMQ的地址#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default truerocketmq.config.isVIPChannel=#rocketmq-console's data path:dashboard/monitorrocketmq.config.dataPath=/tmp/rocketmq-console/data#set it false if you don't want use dashboard.default truerocketmq.config.enableDashBoardCollect=true#set the message track trace topic if you don't want use the default onerocketmq.config.msgTrackTopicName=rocketmq.config.ticketKey=ticket#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is requiredrocketmq.config.loginRequired=false
这个控制台是用Spring Boot写的,间接启动即可。
失常启动,浏览器中的页面是这样的:
Hello World
接着咱们来大抵的写一个生产音讯,生产音讯的demo吧。学到肯定境地,有的时候能够不必看接口文档的,间接依据思维推断即可。
下面咱们讲到生产者和消费者之间的桥梁是nameServer,在java中必定是要先new的,先new消费者,指明消费者在哪个组。
而后生产音讯应该怎么生产,一个一个生产是跳着生产,生产的主题是哪一个,主题的哪一个标签。
基于这种思维,咱们的DEMO如下:
public class MyProducer { private static final String NAMESRV_ADDR = "47.101.136.147:9876"; public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("myProducer"); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); for (int i = 0; i < 10 ; i++) { Message message = new Message("MyTopic4","MyTags1","key"+i,("mq"+i).getBytes()); try { SendResult result = producer.send(message,1000000); System.out.println("发送胜利"+result); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } producer.shutdown(); }}
public class MyConsumer { // 指明nameServer的地址 private static final String NAMESRV_ADDR = "47.101.136.147:9876"; public static void main(String[] args) throws MQClientException { // 音讯队列向消费者推,你也能够拉,结构中,指明消费者所属的组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myConsumer"); consumer.setNamesrvAddr(NAMESRV_ADDR); // 设置为程序音讯 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 指明生产主题 consumer.subscribe("MyTopic4","*"); // 监听, consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { MessageExt message = list.get(0); System.out.println(message.getTopic()); System.out.println(message.getKeys()); System.out.println(message.getTags()); try { System.out.println(new String(message.getBody(),RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); }}
所须要的依赖:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-tools</artifactId> <version>${rocketmq.version}</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-namesrv</artifactId> <version>${rocketmq.version}</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-broker</artifactId> <version>${rocketmq.version}</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjrt</artifactId> <version>${aspectj.version}</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>${aspectj.version}</version> </dependency> <!-- Spring AOP + AspectJ --> <dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>org.jooq</groupId> <artifactId>joor</artifactId> <version>0.9.6</version> </dependency> </dependencies>