关于阿里云:解析-RocketMQ-业务消息顺序消息

32次阅读

共计 5900 个字符,预计需要花费 15 分钟才能阅读完成。

作者:绍舒

引言

Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里团体外部业务以及阿里云数以万计的企业客户。作为金融级牢靠的业务音讯计划,RocketMQ 从创立之初就始终专一于业务集成畛域的异步通信能力构建。本篇将持续业务音讯集成的场景,从性能原理、利用案例、最佳实际以及实战等角度介绍 RocketMQ 的程序音讯性能。

简介

程序音讯是音讯队列 RocketMQ 版提供的一种对音讯发送和生产程序有严格要求的音讯。对于一个指定的 Topic,同一 MessageGroup 的音讯依照严格的先进先出(FIFO)准则进行公布和生产,即先公布的音讯先生产,后公布的音讯后生产,服务端严格依照发送程序进行存储、生产。同一 MessageGroup 的音讯保障程序,不同 MessageGroup 之间的音讯程序不做要求,因而需做到两点,发送的程序性和生产的程序性。

性能原理

在这里首先抛出一个问题,在日常的接触中,许多 RocketMQ 使用者会认为,既然程序音讯能在一般音讯的根底上实现程序,看起来就是一般音讯的加强版,那么为什么不全副都应用程序音讯呢?接下来就会围绕这个问题,比照一般音讯和程序音讯进行论述。

程序发送

在分布式环境下,保障音讯的全局程序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有沟通的状况下各自向 RocketMQ 服务端发送音讯 a 和音讯 b,因为分布式系统的限度,咱们无奈保障 a 和 b 的程序。因而业界音讯零碎通常保障的是分区的程序性,即保障带有同一属性的音讯的程序,咱们将该属性称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 属性为 A 的两条音讯 A1,A2 和 MessageGroup 属性为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 属性为 C 的两条属性 C1,C2。

同时,对于同一 MessageGroup,为了保障其发送程序的先后性,比较简单的做法是结构一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 负责,并且对于每一个 Producer 而言,程序音讯是同步发送的。同步发送的益处是不言而喻的,在客户端失去上一条音讯的发送后果后再发送下一条,即能精确保障发送程序,若应用异步发送或多线程则很难保障这一点。

因而能够看到,尽管在底层原理上,程序音讯发送和一般音讯发送并无二异,然而为了保障程序音讯的发送程序性,同步发送的形式相比拟一般音讯,实际上升高了音讯的最大吞吐。

程序生产

与程序音讯不同的是,一般音讯的生产实际上没有任何限度,消费者拉取的音讯是被异步、并发生产的,而程序音讯,须要保障对于同一个 MessageGroup,同一时刻只有一个客户端在生产音讯,并且在该条音讯被确认生产实现之前(或者进入死信队列),消费者无奈生产同一 MessageGroup 的下一条音讯,否则生产的程序性将得不到保障。因而这里存在着一个生产瓶颈,该瓶颈取决于用户本身的业务解决逻辑。极其状况下当某一 MessageGroup 的音讯过多时,就可能导致生产沉积。当然也须要明确的是,这里的语境都指的是同一 MessageGroup,不同 MessageGroup 的音讯之间并不存在程序性的关联,是能够进行并发生产的。因而全文中提到的程序实际上是一种偏序。

小结

无论对于发送还是生产,咱们通过 MessageGroup 的形式将音讯分组,即并发的根本单元是 MessageGroup,不同的 MessageGroup 能够并发的发送和生产,从而肯定水平具备了可拓展性,反对多队列存储、程度拆分、并发生产,且不受影响。回顾一般音讯,站在程序音讯的视角,能够认为一般音讯的并发根本单元是单条音讯,即每条音讯均领有不同的 MessageGroup。

咱们回到结尾那个问题:

既然程序音讯能在一般音讯的根底上实现程序,看起来就是一般音讯的加强版,那么为什么不全副都应用程序音讯呢?

当初大家对于这个问题可能有一个根本的印象了,音讯的程序性当然很好,然而为了实现程序性也是有代价的。

下述是一个表格,简要比照了程序音讯和一般音讯。

最佳实际

正当设置 MessageGroup

MessageGroup 会有很多谬误的抉择,以某电商平台为例,某电商平台将商家 ID 作为 MessageGroup,因为局部规模较大的商家会产出较多订单,因为上游生产能力的限度,因而这部分商家所对应的订单就产生了重大的沉积。正确的做法该当是将订单号作为 MessageGroup,而且站在背地的业务逻辑上来说,同一订单才有程序性的要求。即抉择 MessageGroup 的最佳实际是:MessageGroup 生命周期最好较为短暂,且不同 MessageGroup 的数量该当尽量雷同且平均。

同步发送和发送重试

如之前章节所述,需应用同步发送和发送重试来保障发送的程序性。

生产幂等

音讯传输链路在异样场景下会有大量反复,业务生产是须要做生产幂等,防止反复解决带来的危险。

利用案例

  • 用户注册须要发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的音讯都会依照公布的先后顺序来生产。
  • 电商的订单创立,以订单 ID 作为 MessageGroup,那么同一个订单相干的创立订单音讯、订单领取音讯、订单退款音讯、订单物流音讯都会依照公布的先后顺序来生产。

实战

发送

能够看到,该发送案例设置了 MessageGroup 并且应用了同步发送,发送的代码如下:

public class ProducerFifoMessageExample {private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class);

    private ProducerFifoMessageExample() {}

    public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Credential provider is optional for client configuration.
        String accessKey = "yourAccessKey";
        String secretKey = "yourSecretKey";
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);

        String endpoints = "foobar.com:8080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();
        String topic = "yourFifoTopic";
        final Producer producer = provider.newProducerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before 
            // message publishing.
            .setTopics(topic)
            // May throw {@link ClientException} if the producer is not initialized.
            .build();
        // Define your message body.
        byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
            // Set topic for the current message.
            .setTopic(topic)
            // Message secondary classifier of message besides topic.
            .setTag(tag)
            // Key(s) of the message, another way to mark message besides message id.
            .setKeys("yourMessageKey-1ff69ada8e0e")
            // Message group decides the message delivery order.
            .setMessageGroup("youMessageGroup0")
            .setBody(body)
            .build();
        try {final SendReceipt sendReceipt = producer.send(message);
            LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {LOGGER.error("Failed to send message", t);
        }
        // Close the producer when you don't need it anymore.
        producer.close();}
}

生产

生产的代码如下:

public class SimpleConsumerExample {private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {}

    public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Credential provider is optional for client configuration.
        String accessKey = "yourAccessKey";
        String secretKey = "yourSecretKey";
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);

        String endpoints = "foobar.com:8080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();
        String consumerGroup = "yourConsumerGroup";
        Duration awaitDuration = Duration.ofSeconds(30);
        String tag = "yourMessageTagA";
        String topic = "yourTopic";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the consumer group name.
            .setConsumerGroup(consumerGroup)
            // set await duration for long-polling.
            .setAwaitDuration(awaitDuration)
            // Set the subscription for the consumer.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .build();
        // Max message num for each long polling.
        int maxMessageNum = 16;
        // Set message invisible duration after it is received.
        Duration invisibleDuration = Duration.ofSeconds(5);
        final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
        for (MessageView message : messages) {
            try {consumer.ack(message);
            } catch (Throwable t) {LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t);
            }
        }
        // Close the simple consumer when you don't need it anymore.
        consumer.close();}
}

明天通过对 RocketMQ 程序音讯的介绍,心愿可能帮大家对程序音讯的原理和利用有更深刻的理解,同时也冀望 RocketMQ 的程序音讯可能帮忙您更无效的解决业务问题。如果您对 RocktMQ 的业务音讯感兴趣,也欢迎您扫描下方二维码退出钉钉群一起沟通交流~

点击此处,进入官网理解更多详情~

正文完
 0