关于rocketmq:RocketMQ学习笔记一-初遇篇

39次阅读

共计 6786 个字符,预计需要花费 17 分钟才能阅读完成。

在 MQ 引论中,我曾经介绍了 MQ 的一些概念和用途,没看过的能够先看一下。这次我抉择介绍的是 RocketMQ

Rocket MQ 的概念

一图胜千言,都在图里了。

从下面的图,咱们能够看出主题是一类音讯的汇合,每条音讯必须属于一个主题。
RocketMQ 中的每个音讯领有惟一的 Message ID,且能够携带具备业务标识的 key,当然你也能够抉择不给。
标签 (tag) 是更细一级的划分,用于同一主题下辨别不同类型的音讯。

同一类生产者形成生产者小组,同一类消费者形成消费者小组。
消费者生产的形式也有两种:

  • 推动式生产

该模式下 Broker 收到数据后会被动推送给生产端,该生产模式个别实时性较高。

  • 拉取式生产

Consumer 生产的一种类型,利用通常被动调用 Consumer 的拉音讯办法从 Broker 服务器拉音讯、主动权由利用管制。一旦获取了批量音讯,利用就会启动生产过程。

咱们讲过 RocketMQ 是能够集群的,所以当多台 Broker 的时候,咱们就不能按 IP 和端口去找了,咱们用 broker 的名字去找,让 nameServer 去问,相似于注册核心。

架构图:

有了下面的根底概念,咱们来开始写个 Hello world 吧。

装置与配置

RocketMQ 对内存要求比拟高,默认是 8G 来着,就算你改了默认的配置,改成 2G, 基本上也跑不起来,晓得假相的我眼泪掉下来,我的阿里云服务器学生版只有 2G,过后改了配置,我还有疑难为啥跑不起来。
Rocket MQ 是阿里巴巴开源的,文档也比拟丰盛,咱们依照文档走就行。

咱们本次下载 4.40.release 版的

而后上传到 Linux 上

-- 上传
rz rocketmq-all-4.4.0-bin-release.zip
-- 解压
unzip rocketmq-all-4.4.0-bin-release.zip
-- 名字太长改下名
mv rocketmq-all-4.4.0-bin-release.zip rocketmq

次要配置的

配置的

  • nameServer 哪台主机是 nameServer
  • master: 谁是主节点
  • /etc/hosts 域名映射

在 hosts 文件中退出以下配置

-- 将来会搭建集群,mqmater 是主节点,mqnameserver1 是一个, 将来会有多个
47.101.136.147 mqmaster1
47.101.136.147 mqnameserver1
-- 长久化到哪里
mkdir mqstore
cd mqstore
-- 日志
mkdir commitlog
-- 生产队列
mkdir consumequeue
-- 索引
mkdir index
-- 进入配置的文件夹
cd conf 
 两主两从异步 
cd 2m-2s-async 
vi broker-a.properties 
namesrvAddr = mqnameserver1:9876 分号隔开
默认主题数量 
autoCreateTopicEnable =true
defaultTopicQueueNums = 4
-- 对外裸露的接口
listenPort = 10911
-- 无用文件保留工夫
deleteWhen = 04
-- 最多保留多长时间
fileReservedTime = 48
-- 存储门路
storePathRootDir=/usr/rocketmq/mqstore
-- 提交日志
storePathCommitLog=/usr/rocketmq/mqstore/commitlog
-- 生产门路
storePathConsuQueue=/usr/rocketmq/mqstore/commitlog/consumequeue
-- 索引门路
storePathIndex=/usr/rocketmq/mqstore/commitlog/consumequeue
-- 音讯的最大数量
maxMessageSize=65536
-- 主从同步, 这是指明以后音讯队列是异步,而且是主节点
brokerRole=ASYCN_MASTER
-- 同步到硬盘, 异步刷新
flushDiskType=ASYNC_FLUSH
-- 0 示意 mastter 大于 0 是从
brokerId = 0
-- 批量替换 这是配置 RocketMq 的日志门路, 这个请在 conf 文件夹下配置
sed -i 's#${user.home}#/usr/rocketmq/#g' *.xml

设置启动参数:

-- 把 8G 变成 1G
 vi runbroker.sh
 -- 把 8G 变成 1G
 vi runserver.sh


启动:

-- 可能不少视频在教这类入门的时候, 会倡议你把防火墙关掉, 我不倡议你这么做,你搞一台云服务器就懂了,很容易受到攻打。-- 所以这里咱们将 RocketMQ 须要的端口在端口中放行婴青睐。firewall-cmd --zone=public --add-port=9876/tcp --permanent
 firewall-cmd --zone=public --add-port=10911/tcp --permanent
 firewall-cmd --zone=public --add-port=10909/tcp --permanent
 firewall-cmd --reload
 -- 后盾启动 
 nohup sh mqnamesrv &
 -- 后盾启动
 nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties&
 -- jps 在 linux 上能够查出有启动了多少 java 利用, 看到上面这些,阐明启动失常。31972 BrokerStartup
 31930 NamesrvStartup
 3485 Application
 32031 Jps

控制台监控 MQ

通常对于 MQ 这类比拟重要的产品,咱们是须要监控一些的,RocketMQ 提供了监控服务:


地址:
https://github.com/apache/roc…

下载之后, 咱们引入到 IDEA 中,下载的是源码。
server.contextPath=
server.port=8080
改下配置文件

### SSL setting
#server.ssl.key-store=classpath:rmqcngkeystore.jks
#server.ssl.key-store-password=rocketmq
#server.ssl.keyStoreType=PKCS12
#server.ssl.keyAlias=rmqcngkey

#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr= 这里是你 RocketMQ 的地址
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
#set the message track trace topic if you don't want use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket
#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
rocketmq.config.loginRequired=false

这个控制台是用 Spring Boot 写的,间接启动即可。
失常启动,浏览器中的页面是这样的:

Hello World

接着咱们来大抵的写一个生产音讯,生产音讯的 demo 吧。学到肯定境地,有的时候能够不必看接口文档的,间接依据思维推断即可。

下面咱们讲到生产者和消费者之间的桥梁是 nameServer,在 java 中必定是要先 new 的,先 new 消费者,指明消费者在哪个组。
而后生产音讯应该怎么生产,一个一个生产是跳着生产,生产的主题是哪一个,主题的哪一个标签。
基于这种思维,咱们的 DEMO 如下:

public class MyProducer {
    private static  final String NAMESRV_ADDR = "47.101.136.147:9876";
    public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("myProducer");
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.start();
        for (int i = 0; i < 10 ; i++) {Message message = new Message("MyTopic4","MyTags1","key"+i,("mq"+i).getBytes());
            try {SendResult result = producer.send(message,1000000);
                System.out.println("发送胜利"+result);
            } catch (RemotingException e) {e.printStackTrace();
            } catch (MQBrokerException e) {e.printStackTrace();
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        producer.shutdown();}
}
public class MyConsumer {
    // 指明 nameServer 的地址
    private static  final String NAMESRV_ADDR = "47.101.136.147:9876";
    public static void main(String[] args) throws MQClientException {
       // 音讯队列向消费者推, 你也能够拉, 结构中,指明消费者所属的组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myConsumer");
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        // 设置为程序音讯
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 指明生产主题
        consumer.subscribe("MyTopic4","*");
        // 监听,
        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {MessageExt message = list.get(0);
                 System.out.println(message.getTopic());
                 System.out.println(message.getKeys());
                 System.out.println(message.getTags());
            try {System.out.println(new String(message.getBody(),RemotingHelper.DEFAULT_CHARSET));
            } catch (UnsupportedEncodingException e) {e.printStackTrace();
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();}
}

所须要的依赖:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-tools</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-namesrv</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-broker</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>${aspectj.version}</version>
        </dependency>

        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>${aspectj.version}</version>
        </dependency>
        <!-- Spring AOP + AspectJ -->
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.jooq</groupId>
            <artifactId>joor</artifactId>
            <version>0.9.6</version>
        </dependency>
    </dependencies>

正文完
 0