关于云计算:使用Java客户端发送消息和消费的应用

31次阅读

共计 5668 个字符,预计需要花费 15 分钟才能阅读完成。

体验链接:https://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6

试验简介

本教程将 Demo 演示应用 java 客户端发送音讯和生产的利用场景

试验实操

第 1 节 如何发送和生产并发音讯

并发音讯,也叫一般音讯,是绝对程序音讯而言的,一般音讯的效率最高。本教程将简略演示如何应用纯 java client 发送和生产音讯。

1. 下载 java 代码 demo(已下载则疏忽操作)

cd /data/demos

git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,执行代码 demo

再执行命令,能够看到失常生产和生产输入

// 进入 demo 代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime

3. Demo 代码阐明

Demo 代码能够查看 github。并发音讯,意思是生产者能够并发的向 topic 中发送音讯,生产端不辨别程序的音讯,这种模式效率最好。生产者 demo 代码如下:

最初留一个思考题给大家:生产者实例和消费者实例,都是线程平安的吗?

第 2 节 如何发送和生产程序音讯

程序音讯分为分区有序和全局有序。生产生产代码都是一样的,区别在于分区有序的 topic 中 queue 个数能够是任意有效值,全局有序的 topic 要求 queue 的个数为 1。程序音讯的实现非常简单易懂,但就义了可用性,单节点故障会间接影响程序音讯。

什么是分区有序音讯,什么场景应该应用呢,又该如何发送分区有序音讯?分区有序示意在一个 queue 中的音讯是有序的,发送音讯时设置设置了雷同 key 的音讯会被发送到同一个 queue 中。

本教程将简略演示如何应用纯 java client 发送和生产程序音讯。

1. 下载 java 代码 demo(已下载则疏忽操作)

cd /data/demos

git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,执行代码 demo

再执行命令,能够看到失常生产和生产输入。生产输入留神看雷同 queue id 的音讯输入内容中的数字,依照从小到大就是正确的。

// 进入 demo 代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime

3. Demo 代码阐明

Demo 代码能够查看 github。

  • 生产者阐明

生产者会依据设置的 keys 做 hash,雷同 hash 值的音讯会发送到雷同的 queue 中。所以雷同 hash 值的音讯须要保障在同一个线程中程序的发送。

  • 消费者阐明

消费者应用绝对比较简单,音讯监听类实现 org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly 接口即可。雷同 queue 的音讯须要串行解决,这样救保障生产的程序性

第 3 节 如何发送和生产提早音讯

提早音讯,对于一些非凡场景比方订票后 30 分钟不领取主动勾销等相似场景比拟有用。本教程将简略演示如何应用纯 java client 发送和生产提早音讯。

1. 下载 java 代码 demo(已下载则疏忽操作)

cd /data/demos

git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,执行代码 demo

执行命令,能够看到失常生产和生产输入。目前 RocketMQ 反对多种提早级别, 不过每种提早级别都是基于 RocketMQ 本身,理论延迟时间会加上 Broker-Client 端的网络状况不同而略有差别。

// 进入 demo 代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime

3. Demo 代码阐明

Demo 代码能够查看 github。

  • 生产者阐明

生产者在发送音讯的时候须要设置提早级别,RocketMQ 反对多种提早级别。如果把延迟时间算作一个以空格宰割的数组,提早级别就是延迟时间数组的下标 index+1。RocketMQ 如何解析提早级别和延迟时间映射关系。

  • 消费者阐明: 消费者依照并发音讯生产即可

第 4 节 如何发送和生产事务音讯

事务音讯,是 RocketMQ 解决分布式事务的一种实现,极其简略好用。一个事物音讯大抵的生命周期如下图

概括为如下几个重要点:

  1. 生产者发送 half 音讯(事物音讯)
  2. Broker 存储 half 音讯
  3. 生产者解决本地事物,解决胜利后 commit 事物
  4. 消费者生产到事物音讯

本教程将简略演示如何应用纯 java client 发送和生产事物音讯。

1. 下载 java 代码 demo(已下载则疏忽操作)

cd /data/demos

git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,执行代码 demo

执行命令,能够看到事物音讯的全副过程。

// 进入 demo 代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime

3. Demo 代码阐明

Demo 代码能够查看 github。在事物音讯中,生产代码和一般音讯的生产一样,次要代码在生产者端。

生产者端的次要代码蕴含 3 个步骤:

  1. 初始化生产者,设置回调线程池、设置本地事物解决监听类。

这里留神事物音讯的生产者类是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是一般生产者类。

事物监听类须要实现 2 个办法,这里的逻辑都是 mock 的,理论应用的时候须要依据理论批改。

  1. 发送事物音讯。调用 sendMessageInTransaction() 办法发送事物音讯,而不是以前的 send() 办法。

第 5 节 生产者消费者如何同步发送、生产音讯(Request-Reply)

request-reply 模式,能够满足目前相似 RPC 同步调用的场景,本教程将简略演示如何应用该模式。

1. 下载 java 代码 demo(已下载则疏忽操作)

cd /data/demos

git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,执行代码 demo

执行命令,能够看到失常生产和生产输入。

// 进入 demo 代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime

通过代码后果和代码比拟,咱们得悉 request-reply 相似 RPC 同步调用的成果。

集体感觉:须要同步调用就用 RPC,不要走 MQ,毕竟两者是齐全不同的指标的产品,业余的事件交给业余的产品。

3. Demo 代码阐明

Demo 代码能够查看 github。

request-reply 模式,在生产者和消费者两端都和个别的生产生产有区别,上面别离介绍下 demo 代码。

生产者 demo 次要代码, 次要区别在于调用 request(),而不是 send() 办法。

消费者 demo 次要代码: 生产代码次要减少了“回复”逻辑。回复是利用音讯发送间接向生产者发送一条音讯。有点相似事物音讯中 broker 回查生产者。

一个小问题:事物音讯和 request-reply 音讯时,生产者的生产者组名有什么要求嘛?

第 6 节 如何有选择性的生产音讯

有时候咱们只想生产局部音讯,当然全副生产,在代码中过滤。如果音讯海量时,会有很多资源节约,比方节约不必要的带宽。咱们能够通过 tag,sql92 表达式来选择性的生产。

  • 进入 broker 目录
cd /usr/local/services/5-rocketmq/broker-01
  • 编辑配置文件,批改 broker 配置项 2 个
vim conf/broker.conf

配置项值:

// 是否反对重试音讯也过滤
filterSupportRetry=true

// 反对属性过滤
enablePropertyFilter=true

批改后:

  • 重启 broker
./restart.sh

1. 下载 java 代码 demo(已下载则疏忽操作)

cd /data/demos

git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,执行 tag 过滤代码 demo

执行命令,能够看到失常生产和生产输入。

// 进入 demo 代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

3. 执行 sql 过滤代码 demo

执行命令,能够看到失常生产和生产输入。

// 进入 demo 代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

4. Demo 代码阐明

Demo 代码能够查看 github。以下别离介绍生产者和消费者次要 demo 代码。

  • 生产者

在生产 tag 音讯的时候,音讯中须要加上发送 tag;sql92 过滤的时候,加上自定义 k -v。

  • 消费者

tag 过滤生产时,在订阅 topic 时,也增加上 tag 订阅

SQL92 过滤时,增加上 SQL 过滤订阅。至于 SQL92 除了等号,还是反对什么,大家能够自行自行查看或者到群里问。

第 7 节 如何应用 ACL 客户端生产生产音讯

ACL,全称是 Access Control List,是 RocketMQ 设计来做拜访和权限管制的。更多文档参见 github wiki:https://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL

0. 启动一个集群

  • 进入 broker 目录
cd /usr/local/services/5-rocketmq/broker-01
  • 编辑配置文件,批改 broker 配置项 1 个
vim conf/broker.conf

配置项值:

aclEnable=true

批改后:

  • 重启 broker
./restart.sh

1. 下载 java 代码 demo(已下载则疏忽操作)

cd /data/demos

git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git

2. 打包,执行代码 demo

执行命令,能够看到失常生产和生产输入。demo 代码应用的 admin 权限发送和生产,理论应用须要对于每个 topic,消费者组受权,能力失常生产生产。

// 进入 demo 代码目录
cd /data/demos/06-all-java-demos/

// 打包
mvn clean package

// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime

3. Demo 代码阐明

Demo 代码能够查看 github。带 ACL 的生产者和消费者在初始化的时候,都必须给一个 hook 实例,构建办法如下:

static RPCHook getAclRPCHook(String accessKey, String secretKey) {return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}

在 broker 端 secret key 用来校验信息的完整性,access key 用来校验用户权限。二者缺一不可。

正文完
 0