共计 5900 个字符,预计需要花费 15 分钟才能阅读完成。
作者:绍舒
引言
Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里团体外部业务以及阿里云数以万计的企业客户。作为金融级牢靠的业务音讯计划,RocketMQ 从创立之初就始终专一于业务集成畛域的异步通信能力构建。本篇将持续业务音讯集成的场景,从性能原理、利用案例、最佳实际以及实战等角度介绍 RocketMQ 的程序音讯性能。
简介
程序音讯是音讯队列 RocketMQ 版提供的一种对音讯发送和生产程序有严格要求的音讯。对于一个指定的 Topic,同一 MessageGroup 的音讯依照严格的先进先出(FIFO)准则进行公布和生产,即先公布的音讯先生产,后公布的音讯后生产,服务端严格依照发送程序进行存储、生产。同一 MessageGroup 的音讯保障程序,不同 MessageGroup 之间的音讯程序不做要求,因而需做到两点,发送的程序性和生产的程序性。
性能原理
在这里首先抛出一个问题,在日常的接触中,许多 RocketMQ 使用者会认为,既然程序音讯能在一般音讯的根底上实现程序,看起来就是一般音讯的加强版,那么为什么不全副都应用程序音讯呢?接下来就会围绕这个问题,比照一般音讯和程序音讯进行论述。
程序发送
在分布式环境下,保障音讯的全局程序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有沟通的状况下各自向 RocketMQ 服务端发送音讯 a 和音讯 b,因为分布式系统的限度,咱们无奈保障 a 和 b 的程序。因而业界音讯零碎通常保障的是分区的程序性,即保障带有同一属性的音讯的程序,咱们将该属性称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 属性为 A 的两条音讯 A1,A2 和 MessageGroup 属性为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 属性为 C 的两条属性 C1,C2。
同时,对于同一 MessageGroup,为了保障其发送程序的先后性,比较简单的做法是结构一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 负责,并且对于每一个 Producer 而言,程序音讯是同步发送的。同步发送的益处是不言而喻的,在客户端失去上一条音讯的发送后果后再发送下一条,即能精确保障发送程序,若应用异步发送或多线程则很难保障这一点。
因而能够看到,尽管在底层原理上,程序音讯发送和一般音讯发送并无二异,然而为了保障程序音讯的发送程序性,同步发送的形式相比拟一般音讯,实际上升高了音讯的最大吞吐。
程序生产
与程序音讯不同的是,一般音讯的生产实际上没有任何限度,消费者拉取的音讯是被异步、并发生产的,而程序音讯,须要保障对于同一个 MessageGroup,同一时刻只有一个客户端在生产音讯,并且在该条音讯被确认生产实现之前(或者进入死信队列),消费者无奈生产同一 MessageGroup 的下一条音讯,否则生产的程序性将得不到保障。因而这里存在着一个生产瓶颈,该瓶颈取决于用户本身的业务解决逻辑。极其状况下当某一 MessageGroup 的音讯过多时,就可能导致生产沉积。当然也须要明确的是,这里的语境都指的是同一 MessageGroup,不同 MessageGroup 的音讯之间并不存在程序性的关联,是能够进行并发生产的。因而全文中提到的程序实际上是一种偏序。
小结
无论对于发送还是生产,咱们通过 MessageGroup 的形式将音讯分组,即并发的根本单元是 MessageGroup,不同的 MessageGroup 能够并发的发送和生产,从而肯定水平具备了可拓展性,反对多队列存储、程度拆分、并发生产,且不受影响。回顾一般音讯,站在程序音讯的视角,能够认为一般音讯的并发根本单元是单条音讯,即每条音讯均领有不同的 MessageGroup。
咱们回到结尾那个问题:
既然程序音讯能在一般音讯的根底上实现程序,看起来就是一般音讯的加强版,那么为什么不全副都应用程序音讯呢?
当初大家对于这个问题可能有一个根本的印象了,音讯的程序性当然很好,然而为了实现程序性也是有代价的。
下述是一个表格,简要比照了程序音讯和一般音讯。
最佳实际
正当设置 MessageGroup
MessageGroup 会有很多谬误的抉择,以某电商平台为例,某电商平台将商家 ID 作为 MessageGroup,因为局部规模较大的商家会产出较多订单,因为上游生产能力的限度,因而这部分商家所对应的订单就产生了重大的沉积。正确的做法该当是将订单号作为 MessageGroup,而且站在背地的业务逻辑上来说,同一订单才有程序性的要求。即抉择 MessageGroup 的最佳实际是:MessageGroup 生命周期最好较为短暂,且不同 MessageGroup 的数量该当尽量雷同且平均。
同步发送和发送重试
如之前章节所述,需应用同步发送和发送重试来保障发送的程序性。
生产幂等
音讯传输链路在异样场景下会有大量反复,业务生产是须要做生产幂等,防止反复解决带来的危险。
利用案例
- 用户注册须要发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的音讯都会依照公布的先后顺序来生产。
- 电商的订单创立,以订单 ID 作为 MessageGroup,那么同一个订单相干的创立订单音讯、订单领取音讯、订单退款音讯、订单物流音讯都会依照公布的先后顺序来生产。
实战
发送
能够看到,该发送案例设置了 MessageGroup 并且应用了同步发送,发送的代码如下:
public class ProducerFifoMessageExample {private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class);
private ProducerFifoMessageExample() {}
public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
String accessKey = "yourAccessKey";
String secretKey = "yourSecretKey";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
String endpoints = "foobar.com:8080";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String topic = "yourFifoTopic";
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the topic name(s), which is optional. It makes producer could prefetch the topic route before
// message publishing.
.setTopics(topic)
// May throw {@link ClientException} if the producer is not initialized.
.build();
// Define your message body.
byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
final Message message = provider.newMessageBuilder()
// Set topic for the current message.
.setTopic(topic)
// Message secondary classifier of message besides topic.
.setTag(tag)
// Key(s) of the message, another way to mark message besides message id.
.setKeys("yourMessageKey-1ff69ada8e0e")
// Message group decides the message delivery order.
.setMessageGroup("youMessageGroup0")
.setBody(body)
.build();
try {final SendReceipt sendReceipt = producer.send(message);
LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {LOGGER.error("Failed to send message", t);
}
// Close the producer when you don't need it anymore.
producer.close();}
}
生产
生产的代码如下:
public class SimpleConsumerExample {private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {}
public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
String accessKey = "yourAccessKey";
String secretKey = "yourSecretKey";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
String endpoints = "foobar.com:8080";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String consumerGroup = "yourConsumerGroup";
Duration awaitDuration = Duration.ofSeconds(30);
String tag = "yourMessageTagA";
String topic = "yourTopic";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group name.
.setConsumerGroup(consumerGroup)
// set await duration for long-polling.
.setAwaitDuration(awaitDuration)
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(5);
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
for (MessageView message : messages) {
try {consumer.ack(message);
} catch (Throwable t) {LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t);
}
}
// Close the simple consumer when you don't need it anymore.
consumer.close();}
}
明天通过对 RocketMQ 程序音讯的介绍,心愿可能帮大家对程序音讯的原理和利用有更深刻的理解,同时也冀望 RocketMQ 的程序音讯可能帮忙您更无效的解决业务问题。如果您对 RocktMQ 的业务音讯感兴趣,也欢迎您扫描下方二维码退出钉钉群一起沟通交流~
点击此处,进入官网理解更多详情~