文章首发于公众号《程序员果果》
地址 : https://mp.weixin.qq.com/s/dY...
关注我
欢送扫码或微信搜寻公众号《程序员果果》关注我,关注有惊喜~
音讯发送示例
导入依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version></dependency>
application.yml:
rocketmq: name-server: 172.16.250.129:9876 producer: group: myGroup
一般音讯
同步发送
原理:
同步发送是指音讯发送方收回一条音讯后,会在收到服务端返回响应之后才发下一条音讯的通信形式。
利用场景:
这种可靠性同步地发送形式利用场景十分宽泛,例如重要告诉邮件、报名短信告诉、营销短信零碎等。
示例代码:
public void sendMsg() throws Exception { Message message = new Message( // 一般音讯所属的Topic "Topic-Normal", // Message Tag可了解为Gmail中的标签,对音讯进行再归类,不便Consumer指定过滤条件在音讯队列 RocketMQ 的服务器过滤。 "TagA", // Message Body能够是任何二进制模式的数据。 "Hello MQ".getBytes() ); rocketMQTemplate.getProducer().send( message ); // 等同于下面的形式(罕用) //rocketMQTemplate.convertAndSend("Topic-Normal:TagA","Hello MQ".getBytes());}
异步发送
原理:
异步发送是指发送方收回一条音讯后,不等服务端返回响应,接着发送下一条音讯的通信形式。RocketMQ异步发送,须要实现异步发送回调接口(SendCallback)。音讯发送方在发送了一条音讯后,不须要期待服务端响应即可发送第二条音讯。发送方通过回调接口接管服务端响应,并解决响应后果。
利用场景:
异步发送个别用于链路耗时较长,对响应工夫较为敏感的业务场景,例如,您视频上传后告诉启动转码服务,转码实现后告诉推送转码后果等。
示例代码:
public void sendAsyncMsg() { Map<String , Object> map = new HashMap<>(); map.put( "name" , "zs" ); map.put( "age" , 20); rocketMQTemplate.asyncSend( "Topic-Normal", map , new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 音讯发送胜利。 log.info( "async send success" ); } @Override public void onException(Throwable throwable) { // 音讯发送失败,须要进行重试解决,可从新发送这条音讯或长久化这条数据进行弥补解决。 log.info( "async send fail" ); } } );}
程序音讯
全局程序音讯
- 概念:对于指定的一个Topic,所有音讯依照严格的先入先出(FIFO)的程序来公布和生产。
- 实用场景:实用于性能要求不高,所有的音讯严格依照FIFO准则来公布和生产的场景。
- 示例:在证券解决中,以人民币兑换美元为Topic,在价格雷同的状况下,先出价者优先解决,则能够依照FIFO的形式公布和生产全局程序音讯。
分区程序音讯
- 概念:对于指定的一个Topic,所有音讯依据Sharding Key进行区块分区。同一个分区内的音讯依照严格的FIFO程序进行公布和生产。Sharding Key是程序音讯中用来辨别不同分区的关键字段,和一般音讯的Key是齐全不同的概念。
- 实用场景:实用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地依照FIFO准则进行音讯公布和生产的场景。
示例:
- 用户注册须要发送发验证码,以用户ID作为Sharding Key,那么同一个用户发送的音讯都会依照公布的先后顺序来生产。
- 电商的订单创立,以订单ID作为Sharding Key,那么同一个订单相干的创立订单音讯、订单领取音讯、订单退款音讯、订单物流音讯都会依照公布的先后顺序来生产。
无序音讯、全局程序音讯、分区程序音讯的比照
示例代码
public void sendOrderlyMsg() { //依据指定的hashKey按程序发送 for (int i = 0; i < 1000; i++) { String orderId = "biz_" + i % 10; // 分区程序音讯中辨别不同分区的关键字段,Sharding Key与一般音讯的key是齐全不同的概念。 // 全局程序音讯,该字段能够设置为任意非空字符串。 String shardingKey = String.valueOf(orderId); try { SendResult sendResult = rocketMQTemplate.syncSendOrderly( "Topic-Order", "send order msg".getBytes(), shardingKey ); // 发送音讯,只有不抛异样就是胜利。 if (sendResult != null) { System.out.println(new Date() + " Send mq message success . msgId is:" + sendResult.getMsgId()); } } catch (Exception e) { // 音讯发送失败,须要进行重试解决,可从新发送这条音讯或长久化这条数据进行弥补解决。 System.out.println(new Date() + " Send mq message failed"); e.printStackTrace(); } }}
延时音讯
概念:
Producer将音讯发送到音讯队列RocketMQ服务端,但并不冀望立马投递这条音讯,而是提早肯定工夫后才投递到Consumer进行生产,该音讯即延时音讯。
实用场景:
音讯生产和生产有工夫窗口要求,例如在电商交易中超时未领取敞开订单的场景,在订单创立时会发送一条延时音讯。这条音讯将会在30分钟当前投递给消费者,消费者收到此音讯后须要判断对应的订单是否已实现领取。如领取未实现,则敞开订单。如已实现领取则疏忽。
示例代码:
public void sendDelayMsg() { rocketMQTemplate.syncSend( "Topic-Delay", MessageBuilder.withPayload( "Hello MQ".getBytes() ).build(), 3000, //设置延时等级3,这个音讯将在10s之后发送(当初只反对固定的几个工夫,详看delayTimeLevel) //messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; 3 );}
事务音讯
概念:
- 事务音讯:音讯队列RocketMQ提供相似X/Open XA的分布式事务性能,通过音讯队列RocketMQ事务音讯能达到分布式事务的最终统一。
- 半事务音讯:暂不能投递的音讯,发送方曾经胜利地将音讯发送到了音讯队列RocketMQ服务端,然而服务端未收到生产者对该音讯的二次确认,此时该音讯被标记成“暂不能投递”状态,处于该种状态下的音讯即半事务音讯。
- 音讯回查:因为网络闪断、生产者利用重启等起因,导致某条事务音讯的二次确认失落,音讯队列RocketMQ服务端通过扫描发现某条音讯长期处于“半事务音讯”时,须要被动向音讯生产者询问该音讯的最终状态(Commit或是Rollback),该询问过程即音讯回查。
分布式事务音讯的劣势:
音讯队列RocketMQ分布式事务音讯不仅能够实现利用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务能够被拆分为小事务,不仅能晋升效率,还不会因为某一个关联利用的不可用导致整体回滚,从而最大限度保障外围零碎的可用性。在极其状况下,如果关联的某一个利用始终无奈解决胜利,也只需对以后利用进行弥补或数据勘误解决,而无需对整体业务进行回滚。
典型场景:
在电商购物车下单时,波及到购物车零碎和交易系统,这两个零碎之间的数据最终一致性能够通过分布式事务音讯的异步解决实现。在这种场景下,交易系统是最为外围的零碎,须要最大限度地保障下单胜利。而购物车零碎只须要订阅音讯队列RocketMQ的交易订单音讯,做相应的业务解决,即可保障最终的数据一致性。
事务音讯交互流程如下图所示:
事务音讯发送步骤如下:
- 发送方将半事务音讯发送至音讯队列RocketMQ服务端。
- 音讯队列RocketMQ服务端将音讯长久化胜利之后,向发送方返回Ack确认音讯曾经发送胜利,此时音讯为半事务音讯。
- 发送方开始执行本地事务逻辑。
- 发送方依据本地事务执行后果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务音讯标记为可投递,订阅方最终将收到该音讯;服务端收到Rollback状态则删除半事务音讯,订阅方将不会承受该音讯。
事务音讯回查步骤如下:
- 在断网或者是利用重启的非凡状况下,上述步骤4提交的二次确认最终未达到服务端,通过固定工夫后服务端将对该音讯发动音讯回查。
- 发送方收到音讯回查后,须要查看对应音讯的本地事务执行的最终后果。
- 发送方依据查看失去的本地事务的最终状态再次提交二次确认,服务端仍依照步骤4对半事务音讯进行操作。
示例代码:
发送事务音讯蕴含以下两个步骤:
- 发送半事务音讯(Half Message,示例代码如下
/** * 事务音讯 */public void sendTransactionMsg() { TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction( "Topic-Tx:TagA", MessageBuilder.withPayload( "Hello MQ transaction===".getBytes() ).build(), null ); SendStatus sendStatus = transactionSendResult.getSendStatus(); LocalTransactionState localTransactionState = transactionSendResult.getLocalTransactionState(); System.out.println( new Date() + " Send mq message status "+ sendStatus +" , localTransactionState "+ localTransactionState );}
- 发送方开始执行本地事务逻辑
@Component@RocketMQTransactionListenerpublic class TxProducerListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 执行本地事务 System.out.println("TX message listener execute local transaction"); RocketMQLocalTransactionState result; try { // 业务代码( 例如下订单 ) result = RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { System.out.println("execute local transaction error"); result = RocketMQLocalTransactionState.UNKNOWN; } return result; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 查看本地事务( 例如查看下订单是否胜利 ) System.out.println("TX message listener check local transaction"); RocketMQLocalTransactionState result; try { //业务代码( 依据查看后果,决定是COMMIT或ROLLBACK ) result = RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { // 异样就回滚 System.out.println("check local transaction error"); result = RocketMQLocalTransactionState.ROLLBACK; } return result; }}
- 发送方在本地事务执行后,若向服务端提交二次确认是Commit,RocketMQ服务端收到Commit状态则将半事务音讯标记为可投递,订阅方最终将收到该音讯;订阅方代码如下
@Component@Slf4j@RocketMQMessageListener(topic = "Topic-Tx",consumerGroup = "consumer-tx-group")public class TxConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("Receive message:{}" , message); }}
源码
https://github.com/gf-huanchupk/SpringBootLearning
系列文章
RocketMQ 简介<br/>
RocketMQ 装置