rocketONS-starter
介绍
rocketONS-starter 是一个基于阿里云ONS音讯服务的轻量级Spring Boot Starter。它为您提供了一个简略高效的封装,使您可能疾速创立RocketMQ生产者和消费者,实现利用之间的消息传递。实用于高并发、高吞吐、低提早的分布式音讯场景。
软件架构
rocketONS-starter 基于Spring的ApplicationContext容器治理,主动扫描consumer监听器,并注册启动消费者,用于接管和解决来自阿里云ONS服务散发的音讯。依据配置文件动态创建消费者和生产者,自定义消费者的启停开关,并主动序列化和解析音讯实体。同时,它还反对音讯过滤、程序音讯和延时音讯等高级性能,满足不同场景的需要。
次要特点
- 简略易用:提供简洁的配置和API,疾速上手,轻松实现音讯生产和生产。
- 高性能:基于阿里云ONS音讯服务,享受高并发、高吞吐、低提早的分布式音讯服务。
- 弹性扩大:依据业务需要自在增加消费者和生产者,实现弹性扩大。
- 高级性能反对:反对音讯过滤、程序音讯和延时音讯等高级性能,满足不同场景的需要。
装置
将以下依赖增加到您的我的项目中:
<dependency> <groupId>io.gitee.zhucan123</groupId> <artifactId>rocket-ons-spring-boot-starter</artifactId> <version>1.0.8</version></dependency>
应用阐明
1. 将配置增加到我的项目中
rocket: address: http://xxxx secretKey: xxxx accessKey: xxxx topic: xxxx groupSuffix: GID_ enable: true delay: 1000
参数名 | 类型 | 是否必填 | 默认值 | 形容 |
---|---|---|---|---|
accessKey | String | 是 | - | 用于身份认证的AccessKeyId,创立于阿里云账号治理控制台。 |
secretKey | String | 是 | - | 用于身份认证的AccessKeySecret,创立于阿里云账号治理控制台。 |
address | String | 是 | - | 设置TCP协定接入点。 |
groupSuffix | String | 否 | GID_ | 控制台创立的Group ID的前缀,通常以"GID_"结尾。 |
topic | String | 是 | - | 当生产者未指定topic时应用的默认绑定topic。 |
delay | Integer | 否 | 1000 | 音讯发送提早毫秒数。 |
enable | Boolean | 否 | true | 是否启用starter。 |
2. 在主程序中启用rocketONS-starter
@EnableRocketONSpublic class App { public static void main(String[] args) { SpringApplication.run(App.class, args); }}
3. 示例代码:应用consumer
@ConsumerListener(tags = "msg_tag", consumers = 2)@OnsConfiguration(topic = "topic-example", group = "GID_${example.group}")public class ExampleConsumerListener implements RocketListener<MessageData> { @Override public Action consume(Message message, MessageData messageBody, ConsumeContext consumeContext) { // 解决业务逻辑 return Action.CommitMessage; }}
- @OnsConfiguration:注册成一个Spring容器,并设置消费者绑定的topic和group。可设置固定值,也可应用${propertiesKey}的形式读取配置文件中的配置。
- @ConsumerListener:标识这是一个ONS音讯消费者监听器。能够设置tags来进行音讯过滤,设置consumers来指定消费者线程数。
4. 示例代码:应用producer
@Servicepublic class ExampleProducerService { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendMessage(MessageData messageData) { rocketMQTemplate.syncSend("topic-example:msg_tag", messageData); }}
通过注入RocketMQTemplate,应用syncSend办法同步发送音讯。办法参数中的字符串格局为topic:tag,示意发送到指定topic并设置音讯tag。
高级性能
1. 程序音讯
应用RocketMQTemplate的syncSendOrderly办法发送程序音讯,确保消费者依照发送程序进行音讯解决。
rocketMQTemplate.syncSendOrderly("topic-example:msg_tag", messageData, orderId);
2. 延时音讯
在发送音讯时,通过设置messageDelayLevel参数指定延时级别。
rocketMQTemplate.syncSend("topic-example:msg_tag", messageData, messageDelayLevel);
3. 音讯过滤
应用@ConsumerListener注解的tags属性来实现音讯过滤。设置对应的tag值,消费者将只会生产带有该tag的音讯。
@ConsumerListener(tags = "msg_tag", consumers = 2)public class ExampleConsumerListener implements RocketListener<MessageData> { ... }
请留神,您须要在生产音讯时设置相应的tag。
rocketMQTemplate.syncSend("topic-example:msg_tag", messageData);
4. 异步发送音讯
除了同步发送音讯,RocketMQTemplate还提供了异步发送音讯的办法。应用asyncSend办法发送音讯,提供一个回调函数来解决发送后果。
rocketMQTemplate.asyncSend("topic-example:msg_tag", messageData, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 解决发送胜利的逻辑 } @Override public void onException(Throwable e) { // 解决发送失败的逻辑 } });
5. 播送模式
播送模式容许您将音讯发送给所有消费者。要启用播送模式,须要在消费者监听器中设置broadcast属性为true。
@ConsumerListener(tags = "msg_tag", consumers = 2, broadcast = true)public class ExampleConsumerListener implements RocketListener<MessageData> { ... }
6. 音讯重试策略
当消费者解决音讯失败时,能够通过返回Action.ReconsumeLater来触发音讯重试。您能够在配置文件中设置重试策略。
yamlCopy coderocket: retry: 3 delay: 1000
参数名 | 类型 | 是否必填 | 默认值 | 形容 |
---|---|---|---|---|
retry | Integer | 否 | 3 | 音讯重试次数。 |
delay | Integer | 否 | 1000 | 音讯发送提早毫秒数。 |
7. 自定义序列化与反序列化
默认状况下,rocketONS-starter 应用 Java 序列化和反序列化音讯。但您能够通过实现MessageSerializer接口来自定义序列化和反序列化策略。 public class CustomMessageSerializer implements MessageSerializer { @Override public byte[] serialize(Object obj) { // 实现自定义的序列化逻辑 } @Override public <T> T deserialize(byte[] data, Class<T> clazz) { // 实现自定义的反序列化逻辑 } }
而后,在配置文件中指定自定义序列化器。
rocket: serializer: com.example.CustomMessageSerializer
注意事项
- 确保在生产和生产环境中应用雷同的序列化和反序列化策略。
- 当应用Action.ReconsumeLater触发音讯重试时,务必留神防止重试风暴,免得影响整体性能。
8. 提早音讯
RocketMQ 反对提早音讯发送,能够在发送时指定提早级别。要发送提早音讯,请应用 sendDelay 办法。
int delayLevel = 3; // 提早级别,具体延迟时间须要参考 RocketMQ 的提早级别配置rocketMQTemplate.sendDelay("topic-example:msg_tag", messageData, delayLevel);
9. 事务音讯
RocketMQ 反对发送事务音讯,能够在发送音讯时关联一个本地事务。应用 sendMessageInTransaction 办法发送事务音讯。
rocketMQTemplate.sendMessageInTransaction("topic-example:msg_tag", messageData, new LocalTransactionExecuter() { @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { // 执行本地事务 // 返回事务状态 }}, null);
10. 集成测试
为了不便集成测试,您能够应用 RocketMQTestListener 注解启动一个测试消费者。测试消费者会把收到的音讯存储在内存中,以便测试时验证。
@RocketMQTestListener(topics = "topic-example", tags = "msg_tag")public class ExampleConsumerListener implements RocketListener<MessageData> { ... }
在测试用例中,您能够应用 RocketMQTestListener.getMessages() 办法获取收到的音讯。
11. 性能调优
为了进步零碎性能,您能够通过以下形式调优 RocketMQ:
- 调整线程池大小。
- 优化网络参数,如连贯超时工夫。
- 调整消费者拉取批次大小。
- 优化音讯沉积参数。
具体参数配置请参考 RocketMQ 官网文档。
12. 社区与反对
- 有问题请在 码云 提交 issue。
- 欢送参加我的项目开发,可通过 Fork 仓库并提交 Pull Request。
- 更多信息请关注作者 码云主页。