乐趣区

关于rocketmq:RocketMQ学习四消息发送高可用设计

咱们晓得 RocketMQ 的 NameServer 并非强统一而是最终一致性的,也就是客户端隔一段时间定时去获取 Broker 信息,如果 Broker 一段时间内呈现了故障,客户端并不能马上感应到,那 RocketMQ 如何做到音讯发送的高可用呢?大抵能够从上面三个方面来开展:

  • 重试机制
  • 顺次更换队列
  • 躲避己故障的 Broker

一,重试机制
在发送音讯过程中有这样一段代码:

    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    for (; times < timesTotal; times++) {...}

如果是同步发送的形式默认会重试 3 次,重试次数也能够通过 retryTimesWhenSendFailed 进行配置。

二,顺次更换队列
在 selectOneMessageQueue 办法在里,会从 sendWhichQueue 获得上一次应用过的队列的索引,这个 sendWhichQueue 是一个 ThreadLocalIndex 类型,外面有一个 ThreadLocal,队列索引就是存储在这个 ThreadLocal 里,获得索引后会有一个自增的动作,而后再依据新的索引获取新的队列。举个例子:有一个名为 broker- a 的 broker,外面有队列 q1,q2,q3,q4,如果上一次应用的是队列 q1 那么索引自增后会抉择 q2 作为新的发送队列。通过这样的形式会让这 4 个队列的负载尽量保持一致。代码如下:

                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    // 获取一个队列,队列里记录着所属 broker
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 判断 broker 是否生效
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        // 第一次获取会是空;获取的 broker 是无效的且还是上一次获取应用过的
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

躲避己故障的 Broker

如果上次应用的 Broker 呈现了故障或延时超过了肯定的范畴,RocketMQ 是如何躲避的呢?
每次发送完音讯后 RocketMQ 都应用 ConcurrentHashMap 记录该 broker 与发送音讯所花工夫的对应关系,在下次发送音讯之前会依据这个工夫判断一下队列所在 Broker 是否能够应用,如果是可用的并且也是上次应用的 Broker 则间接应用,这里咱们重点看一下获取的 Broker 不可应用的状况。咱们假如有 broker-a,broker- b 两个 broker,每个队列里都有 q1,q2,q3,q4 这四个队列:

在发送之前 sendWhichQueue 该值为 broker-a 的 q1,如果因为此时 broker-a 的突发流量异样大导致音讯发送失败,会触发重试,依照轮循机制,下一个抉择的队列为 broker-a 的 q2 队列,此次音讯发送大概率还是会失败,即只管会重试 2 次,但都是发送给同一个 Broker 解决,此过程会显得不那么靠谱,即大概率还是会失败,那这样重试的其实是没多大意义的。

RocketMQ 为了解决该问题,引入了故障躲避机制,在音讯重试的时候,会尽量躲避上一次发送的 Broker,回到上述示例,当音讯发往 broker-a q1 队列时返回发送失败,那重试的时候会先排除 broker-a 中所有队列,即这次会抉择 broker-b q1 队列,增大音讯发送的成功率。RocketMQ 提供了两种躲避策略,该参数由 sendLatencyFaultEnable 管制,用户可干涉,示意是否开启提早躲避机制,默认为不开启。躲避 broker- a 的代码如下:

public String pickOneAtLeast() {final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        while (elements.hasMoreElements()) {final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {Collections.shuffle(tmpList);

            Collections.sort(tmpList);

            final int half = tmpList.size() / 2;
            if (half <= 0) {return tmpList.get(0).getName();} else {final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();}
        }

        return null;
    }

开启提早躲避机制,一旦音讯发送失败会将 broker-a“乐观”地认为在接下来的一段时间内该 Broker 不可用,在为将来某一段时间内所有的客户端不会向该 Broker 发送音讯。这个延迟时间就是通过 notAvailableDuration、latencyMax 独特计算的,就首先先计算本次音讯发送失败所耗的时延,而后对应 latencyMax 中哪个区间,即计算在 latencyMax 的下标,而后返回 notAvailableDuration 同一个下标对应的提早值. 例如,如果上次申请的 latency 超过 550Lms,就退却 3000Lms;超过 1000L,就退却 60000L;

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

private long computeNotAvailableDuration(final long currentLatency) {for (int i = latencyMax.length - 1; i >= 0; i--) {if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }

RocketMQ 就是通过失败重试,顺次更换队列还有躲避有故障的 broker 这三个方面来保障音讯发送的高可用的。

退出移动版