关于java:rocketONSstarter阿里云ONS消息服务的轻量级Spring-Boot-Starter

3次阅读

共计 4547 个字符,预计需要花费 12 分钟才能阅读完成。

rocketONS-starter

介绍

rocketONS-starter 是一个基于阿里云 ONS 音讯服务的轻量级 Spring Boot Starter。它为您提供了一个简略高效的封装,使您可能疾速创立 RocketMQ 生产者和消费者,实现利用之间的消息传递。实用于高并发、高吞吐、低提早的分布式音讯场景。

软件架构

rocketONS-starter 基于 Spring 的 ApplicationContext 容器治理,主动扫描 consumer 监听器,并注册启动消费者,用于接管和解决来自阿里云 ONS 服务散发的音讯。依据配置文件动态创建消费者和生产者,自定义消费者的启停开关,并主动序列化和解析音讯实体。同时,它还反对音讯过滤、程序音讯和延时音讯等高级性能,满足不同场景的需要。

次要特点

  • 简略易用:提供简洁的配置和 API,疾速上手,轻松实现音讯生产和生产。
  • 高性能:基于阿里云 ONS 音讯服务,享受高并发、高吞吐、低提早的分布式音讯服务。
  • 弹性扩大:依据业务需要自在增加消费者和生产者,实现弹性扩大。
  • 高级性能反对:反对音讯过滤、程序音讯和延时音讯等高级性能,满足不同场景的需要。

装置

将以下依赖增加到您的我的项目中:


<dependency>
    <groupId>io.gitee.zhucan123</groupId>
    <artifactId>rocket-ons-spring-boot-starter</artifactId>
    <version>1.0.8</version>
</dependency>

应用阐明

1. 将配置增加到我的项目中

rocket:
  address: http://xxxx
  secretKey: xxxx
  accessKey: xxxx
  topic: xxxx
  groupSuffix: GID_
  enable: true
  delay: 1000
参数名 类型 是否必填 默认值 形容
accessKey String 用于身份认证的 AccessKeyId,创立于阿里云账号治理控制台。
secretKey String 用于身份认证的 AccessKeySecret,创立于阿里云账号治理控制台。
address String 设置 TCP 协定接入点。
groupSuffix String GID_ 控制台创立的 Group ID 的前缀,通常以 ”GID_” 结尾。
topic String 当生产者未指定 topic 时应用的默认绑定 topic。
delay Integer 1000 音讯发送提早毫秒数。
enable Boolean true 是否启用 starter。

2. 在主程序中启用 rocketONS-starter

@EnableRocketONS
public class App {public static void main(String[] args) {SpringApplication.run(App.class, args);
    }
}

3. 示例代码:应用 consumer

@ConsumerListener(tags = "msg_tag", consumers = 2)
@OnsConfiguration(topic = "topic-example", group = "GID_${example.group}")
public class ExampleConsumerListener implements RocketListener<MessageData> {

  @Override
  public Action consume(Message message, MessageData messageBody, ConsumeContext consumeContext) {

    // 解决业务逻辑

    return Action.CommitMessage;
  }
}
  • @OnsConfiguration:注册成一个 Spring 容器,并设置消费者绑定的 topic 和 group。可设置固定值,也可应用 ${propertiesKey} 的形式读取配置文件中的配置。
  • @ConsumerListener:标识这是一个 ONS 音讯消费者监听器。能够设置 tags 来进行音讯过滤,设置 consumers 来指定消费者线程数。

4. 示例代码:应用 producer

@Service
public class ExampleProducerService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(MessageData messageData) {rocketMQTemplate.syncSend("topic-example:msg_tag", messageData);
    }
}

通过注入 RocketMQTemplate,应用 syncSend 办法同步发送音讯。办法参数中的字符串格局为 topic:tag,示意发送到指定 topic 并设置音讯 tag。

高级性能

1. 程序音讯

应用 RocketMQTemplate 的 syncSendOrderly 办法发送程序音讯,确保消费者依照发送程序进行音讯解决。

rocketMQTemplate.syncSendOrderly("topic-example:msg_tag", messageData, orderId);

2. 延时音讯

在发送音讯时,通过设置 messageDelayLevel 参数指定延时级别。

rocketMQTemplate.syncSend("topic-example:msg_tag", messageData, messageDelayLevel);

3. 音讯过滤

应用 @ConsumerListener 注解的 tags 属性来实现音讯过滤。设置对应的 tag 值,消费者将只会生产带有该 tag 的音讯。

@ConsumerListener(tags = "msg_tag", consumers = 2)
public class ExampleConsumerListener implements RocketListener<MessageData> {...}

请留神,您须要在生产音讯时设置相应的 tag。


rocketMQTemplate.syncSend("topic-example:msg_tag", messageData); 

4. 异步发送音讯

除了同步发送音讯,RocketMQTemplate 还提供了异步发送音讯的办法。应用 asyncSend 办法发送音讯,提供一个回调函数来解决发送后果。

rocketMQTemplate.asyncSend("topic-example:msg_tag", messageData, new SendCallback() {
  @Override 
  public void onSuccess(SendResult sendResult) {// 解决发送胜利的逻辑}
  @Override 
  public void onException(Throwable e) {// 解决发送失败的逻辑} 
});

5. 播送模式

播送模式容许您将音讯发送给所有消费者。要启用播送模式,须要在消费者监听器中设置 broadcast 属性为 true。

@ConsumerListener(tags = "msg_tag", consumers = 2, broadcast = true)
public class ExampleConsumerListener implements RocketListener<MessageData> {...}

6. 音讯重试策略

当消费者解决音讯失败时,能够通过返回 Action.ReconsumeLater 来触发音讯重试。您能够在配置文件中设置重试策略。

yamlCopy code
rocket:
  retry: 3
  delay: 1000
参数名 类型 是否必填 默认值 形容
retry Integer 3 音讯重试次数。
delay Integer 1000 音讯发送提早毫秒数。

7. 自定义序列化与反序列化

默认状况下,rocketONS-starter 应用 Java 序列化和反序列化音讯。但您能够通过实现 MessageSerializer 接口来自定义序列化和反序列化策略。public class CustomMessageSerializer implements MessageSerializer {@Override public byte[] serialize(Object obj) {// 实现自定义的序列化逻辑} @Override public <T> T deserialize(byte[] data, Class<T> clazz) {// 实现自定义的反序列化逻辑} }

而后,在配置文件中指定自定义序列化器。


rocket:
  serializer: com.example.CustomMessageSerializer

注意事项

  • 确保在生产和生产环境中应用雷同的序列化和反序列化策略。
  • 当应用 Action.ReconsumeLater 触发音讯重试时,务必留神防止重试风暴,免得影响整体性能。

8. 提早音讯

RocketMQ 反对提早音讯发送,能够在发送时指定提早级别。要发送提早音讯,请应用 sendDelay 办法。


int delayLevel = 3; // 提早级别,具体延迟时间须要参考 RocketMQ 的提早级别配置
rocketMQTemplate.sendDelay("topic-example:msg_tag", messageData, delayLevel);

9. 事务音讯

RocketMQ 反对发送事务音讯,能够在发送音讯时关联一个本地事务。应用 sendMessageInTransaction 办法发送事务音讯。


rocketMQTemplate.sendMessageInTransaction("topic-example:msg_tag", messageData, new LocalTransactionExecuter() {
    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
        // 执行本地事务
        // 返回事务状态
    }
}, null);

10. 集成测试

为了不便集成测试,您能够应用 RocketMQTestListener 注解启动一个测试消费者。测试消费者会把收到的音讯存储在内存中,以便测试时验证。


@RocketMQTestListener(topics = "topic-example", tags = "msg_tag")
public class ExampleConsumerListener implements RocketListener<MessageData> {...}

在测试用例中,您能够应用 RocketMQTestListener.getMessages() 办法获取收到的音讯。

11. 性能调优

为了进步零碎性能,您能够通过以下形式调优 RocketMQ:

  • 调整线程池大小。
  • 优化网络参数,如连贯超时工夫。
  • 调整消费者拉取批次大小。
  • 优化音讯沉积参数。

具体参数配置请参考 RocketMQ 官网文档。

12. 社区与反对

  • 有问题请在 码云 提交 issue。
  • 欢送参加我的项目开发,可通过 Fork 仓库并提交 Pull Request。
  • 更多信息请关注作者 码云主页。
正文完
 0