关于rocketmq:RocketMQ学习五选择队列等特性

47次阅读

共计 4339 个字符,预计需要花费 11 分钟才能阅读完成。

本文次要波及的内容有发送音讯时:

  • 程序音讯之队列抉择机制
  • RocketMQ key
  • RocketMQ tag
  • RocketMQ msgId

程序音讯之队列抉择机制

很多业务场景下须要保障音讯的程序解决,比方订单流转到不同状态都会向同一个 topic 发送音讯,但消费者在进行生产时心愿依照订单的的变动程序进行解决,如果不管制的话音讯会发送到 topic 里的不同队列里去,这样消费者就没方法进行程序生产了。本文先只剖析 Producer 是如何发送程序音讯的,至于 Consumer 的解决当前再进行剖析。
咱们晓得 RocketMQ 是反对队列级别的程序音讯的,那么在发送音讯的时候只有做到将须要程序生产的音讯按程序都发送到一个队列里就能够了。RocketMQ 在音讯发送时提供了自定义的队列负载机制,音讯发送的默认队列负载机制为轮询,那如何进行队列抉择呢?RocketMQ 提供了如下 API(这里只举了其中一个 API 的例子):

SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

应用示例:

public static void main(String[] args) throws UnsupportedEncodingException {
        try {MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                Message msg =
                    new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();
        }
    }

在发送音讯的时候,咱们指定了抉择队列的实现,传入参考 arg(能够是 orderId 或 userId 等) 而后对整个队列取模,这样就能够做到同一个 arg 都会发到同一个队列里了。

对于程序音讯上面几点须要特地留神:

  1. 如果咱们应用了 MessageQueueSelector,那音讯发送的重试机制将生效,即 RocketMQ 客户端并不会重试,音讯发送的高可用须要由业务方来保障,一个方法就是音讯发送失败后存在数据库中,而后定时调度,最终将音讯发送到 MQ。
  2. 当应用的是异步发送的形式且进行了重试 (异步时 RocketMQ 自身不会重试,业务方可能会进行重试),比方有 1,2,3 三条音讯,但音讯 2 失败了再进行重发,那它可能会在音讯 3 的前面重发胜利,像这种情景也就做不到音讯的有序性了。所以咱们在应用程序音讯时不要应用异步发送而要采纳同步发送形式。
  3. 当 Broker 宕机重启,因为分区会产生重均衡动作,此时生产端依据 key 哈希取模失去的分区发生变化,这时会产生短暂音讯程序不统一的景象。针对这一问题,如果业务方不能容忍短时的程序一致性,要么集群呈现故障后集群立马不可用,要么主题做成单分区,但这么做大大就义了集群的高可用,单分区也会另集群性能大大降低

RocketMQ key 的应用

RocketMQ 提供了丰盛的音讯查问机制,例如应用音讯偏移量、音讯全局惟一 msgId、音讯 Key。

RocketMQ 在音讯发送的时候,能够为一条音讯设置索引建,例如下面示例中咱们指定了 ”KEY”+ 序号作为音讯的 Key,这样咱们能够通过该索引 Key 进行查问音讯。

如果须要为音讯指定 Key,只须要在构建 Message 的时候传入 Key 参数即可,例如上面的 API:

public Message(String topic, String tags, String keys, byte[] body)

RocketMQ tag 的应用

RocketMQ 能够为 Topic 设置 Tag(标签),这样生产端能够对 Topic 中的音讯基于 Tag 进行过滤,即选择性的对 Topic 中的音讯进行解决。

例如一个订单的全生命流程:创立订单、待领取、领取实现、商家审核,商家发货、买家发货,订单每一个状态的变更都会向同一个主题 order_topic 发送音讯,但不同上游零碎只关注订单流中某几个阶段的音讯,并不是须要解决所有音讯。咱们就能够对下面每个状态指定不同的 tag,生产端在订阅音讯的时候也指定相应的 tag,这样消费者就能够只生产本人指定 tag 的音讯了。API 与指定 message key 一样雷同:

public Message(String topic, String tags, String keys, byte[] body)

生产端订阅时指定 tag API:

void subscribe(final String topic, final String subExpression) throws MQClientException;

在控制台咱们能够看到不合乎订阅的 Tag,其生产状态显示为 CONSUMED_BUT_FILTERED(生产但被过滤掉)。

RocketMQ msgId

咱们先来看下生成 msgId 的代码:

    public static String createUniqID() {StringBuilder sb = new StringBuilder(LEN * 2);
        sb.append(FIX_STRING);
        sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
        return sb.toString();}

    public static String createUniqID() {StringBuilder sb = new StringBuilder(LEN * 2);
        sb.append(FIX_STRING);
        sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
        return sb.toString();}

    private static byte[] createUniqIDBuffer() {ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
        long current = System.currentTimeMillis();
        if (current >= nextStartTime) {setStartTime(current);
        }
        buffer.position(0);
        buffer.putInt((int) (System.currentTimeMillis() - startTime));
        buffer.putShort((short) COUNTER.getAndIncrement());
        return buffer.array();}

FIX_STRING 是什么内容呢?在 MessageClientIDSetter 类的动态代码块里有:

    static {
        LEN = 4 + 2 + 4 + 4 + 2;
        ByteBuffer tempBuffer = ByteBuffer.allocate(10);
        tempBuffer.position(2);
        tempBuffer.putInt(UtilAll.getPid());
        tempBuffer.position(0);
        try {tempBuffer.put(UtilAll.getIP());
        } catch (Exception e) {tempBuffer.put(createFakeIP());
        }
        tempBuffer.position(6);
        tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
        FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
        setStartTime(System.currentTimeMillis());
        COUNTER = new AtomicInteger(0);
    }

组成成份有:

  • 客户端发送 IP,反对 IPV4 和 IPV6
  • 过程 PID(2 字节)
  • 类加载器的 hashcode(4 字节)
  • 以后零碎工夫戳与启动工夫戳的差值(4 字节)
  • 自增序列(2 字节)

对于每个 producer 实例来说 ip 都是惟一的,所以不同 producer 生成的 msgId 是不会反复的。对于 producer 单个实例来说的辨别因子是:time + counter。利用不重启的状况下 msgId 是能够保障唯一性的,利用重启了只有零碎的时钟不变 msgId 也是惟一的。所以只有零碎的时钟不回拨咱们就能够保障 msgId 的全局惟一。

下面组成部分里的工夫戳差值是以后工夫戳与上个月工夫戳的差值,那利用运行了一个月再进行重启 msgId 就会反复了。从生成算法上来说是的!然而 MQ 的 message 是有时效性的,有效期是 72 小时也就是 3 天。每天的凌晨 4 点 rocketMQ 会把过期的 message 革除掉。所以 msgId 也是保障全局惟一的。

最初再提一下 offsetMsgId.
offsetMsgId 指的是音讯所在 Broker 的物理偏移量,即在 commitlog 文件中的偏移量,其组成如下两局部组成:

  • Broker 的 IP 与端口号
  • commitlog 中的物理偏移量

咱们能够依据 offsetMsgId 即能够定位到具体的音讯,无需晓得该音讯的 Topic 等其余所有信息。

正文完
 0