乐趣区

关于后端:解析-RocketMQ-业务消息顺序消息

简介:本篇将持续业务音讯集成的场景,从性能原理、利用案例、最佳实际以及实战等角度介绍 RocketMQ 的程序音讯性能。作者:绍舒 引言 Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里团体外部业务以及阿里云数以万计的企业客户。作为金融级牢靠的业务音讯计划,RocketMQ 从创立之初就始终专一于业务集成畛域的异步通信能力构建。本篇将持续业务音讯集成的场景,从性能原理、利用案例、最佳实际以及实战等角度介绍 RocketMQ 的程序音讯性能。简介 程序音讯是音讯队列 RocketMQ 版提供的一种对音讯发送和生产程序有严格要求的音讯。对于一个指定的 Topic,同一 MessageGroup 的音讯依照严格的先进先出(FIFO)准则进行公布和生产,即先公布的音讯先生产,后公布的音讯后生产,服务端严格依照发送程序进行存储、生产。同一 MessageGroup 的音讯保障程序,不同 MessageGroup 之间的音讯程序不做要求,因而需做到两点,发送的程序性和生产的程序性。

 性能原理 在这里首先抛出一个问题,在日常的接触中,许多 RocketMQ 使用者会认为,既然程序音讯能在一般音讯的根底上实现程序,看起来就是一般音讯的加强版,那么为什么不全副都应用程序音讯呢?接下来就会围绕这个问题,比照一般音讯和程序音讯进行论述。程序发送 在分布式环境下,保障音讯的全局程序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有沟通的状况下各自向 RocketMQ 服务端发送音讯 a 和音讯 b,因为分布式系统的限度,咱们无奈保障 a 和 b 的程序。因而业界音讯零碎通常保障的是分区的程序性,即保障带有同一属性的音讯的程序,咱们将该属性称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 属性为 A 的两条音讯 A1,A2 和 MessageGroup 属性为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 属性为 C 的两条属性 C1,C2。

 同时,对于同一 MessageGroup,为了保障其发送程序的先后性,比较简单的做法是结构一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 负责,并且对于每一个 Producer 而言,程序音讯是同步发送的。同步发送的益处是不言而喻的,在客户端失去上一条音讯的发送后果后再发送下一条,即能精确保障发送程序,若应用异步发送或多线程则很难保障这一点。

 因而能够看到,尽管在底层原理上,程序音讯发送和一般音讯发送并无二异,然而为了保障程序音讯的发送程序性,同步发送的形式相比拟一般音讯,实际上升高了音讯的最大吞吐。程序生产 与程序音讯不同的是,一般音讯的生产实际上没有任何限度,消费者拉取的音讯是被异步、并发生产的,而程序音讯,须要保障对于同一个 MessageGroup,同一时刻只有一个客户端在生产音讯,并且在该条音讯被确认生产实现之前(或者进入死信队列),消费者无奈生产同一 MessageGroup 的下一条音讯,否则生产的程序性将得不到保障。因而这里存在着一个生产瓶颈,该瓶颈取决于用户本身的业务解决逻辑。极其状况下当某一 MessageGroup 的音讯过多时,就可能导致生产沉积。当然也须要明确的是,这里的语境都指的是同一 MessageGroup,不同 MessageGroup 的音讯之间并不存在程序性的关联,是能够进行并发生产的。因而全文中提到的程序实际上是一种偏序。

 小结 无论对于发送还是生产,咱们通过 MessageGroup 的形式将音讯分组,即并发的根本单元是 MessageGroup,不同的 MessageGroup 能够并发的发送和生产,从而肯定水平具备了可拓展性,反对多队列存储、程度拆分、并发生产,且不受影响。回顾一般音讯,站在程序音讯的视角,能够认为一般音讯的并发根本单元是单条音讯,即每条音讯均领有不同的 MessageGroup。咱们回到结尾那个问题:既然程序音讯能在一般音讯的根底上实现程序,看起来就是一般音讯的加强版,那么为什么不全副都应用程序音讯呢?当初大家对于这个问题可能有一个根本的印象了,音讯的程序性当然很好,然而为了实现程序性也是有代价的。下述是一个表格,简要比照了程序音讯和一般音讯。

 最佳实际 正当设置 MessageGroup MessageGroup 会有很多谬误的抉择,以某电商平台为例,某电商平台将商家 ID 作为 MessageGroup,因为局部规模较大的商家会产出较多订单,因为上游生产能力的限度,因而这部分商家所对应的订单就产生了重大的沉积。正确的做法该当是将订单号作为 MessageGroup,而且站在背地的业务逻辑上来说,同一订单才有程序性的要求。即抉择 MessageGroup 的最佳实际是:MessageGroup 生命周期最好较为短暂,且不同 MessageGroup 的数量该当尽量雷同且平均。同步发送和发送重试 如之前章节所述,需应用同步发送和发送重试来保障发送的程序性。生产幂等 音讯传输链路在异样场景下会有大量反复,业务生产是须要做生产幂等,防止反复解决带来的危险。利用案例 用户注册须要发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的音讯都会依照公布的先后顺序来生产。电商的订单创立,以订单 ID 作为 MessageGroup,那么同一个订单相干的创立订单音讯、订单领取音讯、订单退款音讯、订单物流音讯都会依照公布的先后顺序来生产。

 实战 发送 能够看到,该发送案例设置了 MessageGroup 并且应用了同步发送,发送的代码如下:public class ProducerFifoMessageExample {

private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class);

private ProducerFifoMessageExample() {}

public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();

    // Credential provider is optional for client configuration.
    String accessKey = "yourAccessKey";
    String secretKey = "yourSecretKey";
    SessionCredentialsProvider sessionCredentialsProvider =
        new StaticSessionCredentialsProvider(accessKey, secretKey);

    String endpoints = "foobar.com:8080";
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .setCredentialProvider(sessionCredentialsProvider)
        .build();
    String topic = "yourFifoTopic";
    final Producer producer = provider.newProducerBuilder()
        .setClientConfiguration(clientConfiguration)
        // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before 
        // message publishing.
        .setTopics(topic)
        // May throw {@link ClientException} if the producer is not initialized.
        .build();
    // Define your message body.
    byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
    String tag = "yourMessageTagA";
    final Message message = provider.newMessageBuilder()
        // Set topic for the current message.
        .setTopic(topic)
        // Message secondary classifier of message besides topic.
        .setTag(tag)
        // Key(s) of the message, another way to mark message besides message id.
        .setKeys("yourMessageKey-1ff69ada8e0e")
        // Message group decides the message delivery order.
        .setMessageGroup("youMessageGroup0")
        .setBody(body)
        .build();
    try {final SendReceipt sendReceipt = producer.send(message);
        LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    } catch (Throwable t) {LOGGER.error("Failed to send message", t);
    }
    // Close the producer when you don't need it anymore.
    producer.close();}

} 生产 生产的代码如下:public class SimpleConsumerExample {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

private SimpleConsumerExample() {}

public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();

    // Credential provider is optional for client configuration.
    String accessKey = "yourAccessKey";
    String secretKey = "yourSecretKey";
    SessionCredentialsProvider sessionCredentialsProvider =
        new StaticSessionCredentialsProvider(accessKey, secretKey);

    String endpoints = "foobar.com:8080";
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .setCredentialProvider(sessionCredentialsProvider)
        .build();
    String consumerGroup = "yourConsumerGroup";
    Duration awaitDuration = Duration.ofSeconds(30);
    String tag = "yourMessageTagA";
    String topic = "yourTopic";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        // Set the consumer group name.
        .setConsumerGroup(consumerGroup)
        // set await duration for long-polling.
        .setAwaitDuration(awaitDuration)
        // Set the subscription for the consumer.
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        .build();
    // Max message num for each long polling.
    int maxMessageNum = 16;
    // Set message invisible duration after it is received.
    Duration invisibleDuration = Duration.ofSeconds(5);
    final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
    for (MessageView message : messages) {
        try {consumer.ack(message);
        } catch (Throwable t) {LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t);
        }
    }
    // Close the simple consumer when you don't need it anymore.
    consumer.close();}

} 明天通过对 RocketMQ 程序音讯的介绍,心愿可能帮大家对程序音讯的原理和利用有更深刻的理解,同时也冀望 RocketMQ 的程序音讯可能帮忙您更无效的解决业务问题。如果您对 RocktMQ 的业务音讯感兴趣,也欢迎您扫描下方二维码退出钉钉群一起沟通交流~ 

 点击此处,进入官网理解更多详情~原文链接:https://click.aliyun.com/m/10… 本文为阿里云原创内容,未经容许不得转载。

退出移动版