关于java:深入学习RocketMQ之底层解析

7次阅读

共计 19172 个字符,预计需要花费 48 分钟才能阅读完成。

RocketMQ 生产者外围详解

外围参数详解

  • ProducerGroup:组名在一个利用外面是惟一的。
  • CreateTopicKey:理论生产中不会使此参数进行生产者创立 Topic。
  • defaultTopicQueueNums:默认大小为 4,一个 topic 下默认挂载的是四个队列。
  • sendMsgTimeout:单位 ms,音讯发送的超时工夫。
  • compressMsgBodyOverHowmuch:默认压缩字节 4096,主动压缩机制, 当音讯超过 4096 就会压缩。
  • retryTimesWhenSendFailed:同步重发次数。
  • retryAnotherBrokerWhenNotStoreOK:默认 false,没有存储胜利的话,是否能够向其它 Broker 存储。
  • maxMessageSize:默认 128k,最大音讯长度。

主从同步机制解析

咱们之前曾经理解了,当一条音讯发送到 Master 节点时候,会将音讯同步到 Slave 节点。然而怎么做的呢?

首先主从同步须要同步哪些内容?

第一点就是元数据信息同步,第二点就是音讯数据的同步。

元数据信息:是指 topic config 配置信息,还有 consumer 的 offset(生产端的进度信息)。须要留神的是,并不是即时同步,而是底层代码启动定时工作去同步的。

同步信息:数据内容 + 元数据信息。

数据内容:commitlog 理论音讯的存储信息, 是实时同步的,并且底层应用的是 Socket 而不是 Netty。

元数据信息:slave 和 master 基于 commitlog 外面的数据一直比照,而后一直的同步。

元数据失落是能够承受的,能够复原。如果元数据在 slave 和 master 外面不统一,能够做复原,能够调整 offset 地位或者重启 consumer。须要留神的是:commitlog 外面的数据失落了,无奈复原。

主从同步相干源码

如果 Broker 角色为从服务器,会通过定时工作调用 syncAll。

咱们点击 syncAll()办法。从主服务器定时同步 topic 配置信息、音讯生产偏移量、提早队列偏移量、生产组订阅信息。

commitlog 数据同步代码

HAConnection 次要用于音讯读写操作。外面蕴含两个外部类:ReadSocketService、WriteSocketService。

Master 节点:

  • AcceptSocketService:接管 Slave 节点连贯。
  • HAConnection

    • ReadSocketService:读来自 Slave 节点的数据。
    • WriteSocketService:写往到 Slave 节点的数据。

Slave 节点:

  • HAService

    • HAClient:对 Master 节点连贯、读写数据。

通信协议:Master 节点与 Slave 节点通信协议很简略,只有如下两条。

对象 用处 第几位 字段 数据类型 字节数 阐明
Slave=>Master 上报 CommitLog 曾经同步到的物理地位 0 maxPhyOffset Long 8 CommitLog 最大物理地位
Master=>Slave 传输新的 CommitLog 数据 0 fromPhyOffset Long 8 CommitLog 开始物理地位
1 size Int 4 传输的数据长度
2 body Bytes size 传输的数据

晓得了每个类的大略用处,上面咱们看一下代码。

在 HAService 中咱们能够看到 ConnectMaster(),用于连贯 Master 的办法。

应用 NIO 函数:目标很显著,就是为了更加的高效。

private boolean connectMaster() throws ClosedChannelException {if (null == socketChannel) {String addr = this.masterAddress.get();
                if (addr != null) {SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
                    if (socketAddress != null) {this.socketChannel = RemotingUtil.connect(socketAddress);
                        if (this.socketChannel != null) {this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                        }
                    }
                }

                this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                this.lastWriteTimestamp = System.currentTimeMillis();}

            return this.socketChannel != null;
        }

咱们能够点开 RemotingUtil.connect(socketAddress),而后持续跟进去

public static SocketChannel connect(SocketAddress remote) {return connect(remote, 1000 * 5); // 连贯近程地址,超时工夫 5000ms
}
public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
    SocketChannel sc = null;
    try {sc = SocketChannel.open(); // 关上 channel
        sc.configureBlocking(true); // 设置同步阻塞
        sc.socket().setSoLinger(false, -1); // 设置敞开 socket 的提早事件,当线程执行到 socket 的 close()办法时候, 进入阻塞状态,晓得底层数据发送实现,或者超过了延迟时间,才从 close()办法返回
        sc.socket().setTcpNoDelay(true);// 禁止应用 Nagle 算法,应用小数据即时传输
        sc.socket().setReceiveBufferSize(1024 * 64);// 设置缓冲区大小
        sc.socket().setSendBufferSize(1024 * 64);// 设置发送缓冲区大小
        sc.socket().connect(remote, timeoutMillis);// 连贯
        sc.configureBlocking(false); // 不分明为什么设置回去了?return sc;
    } catch (Exception e) {if (sc != null) {
            try {sc.close();
            } catch (IOException e1) {e1.printStackTrace();
            }
        }
    }

    return null;
}

接下来咱们看另一个重要的办法:dispatchReadRequest()。

读取 Master 传输的 CommitLog 数据,并返回是否 OK。

如果读取到数据,就写入 CommitLog。

如果产生异样:

  • Master 传输的数据开始地位 Offset 不等于 Slave 的 CommitLog 数据最大 Offset。
  • 上报到 Master 进度失败。

从 dispatchReadRequest()办法里能够看到,Slave 应用 dispatchPostion 变量来指定每次解决的地位,其目标是为了应答粘包问题。每次提取数据的 body 局部,追加到 CommitLog,当增加胜利一次就马上向 Master 上报此次的进度。

private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size
    int readSocketPos = this.byteBufferRead.position();

    while (true) {
          // begin -> 读取到申请数据
        int diff = this.byteBufferRead.position() - this.dispatchPostion;
        if (diff >= msgHeaderSize) {
           // 读取 MasterPhyOffset、BodySize,应用 dispatchPostion 的起因是:解决数据 "粘包" 导致数据读取不残缺
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
            int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
                       // 获取 slave 节点上 commitLog 文件最大的 offset 地位
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            if (slavePhyOffset != 0) {
              // 校验 Master 传输来的数据 offset 是否和 Slave 的 CommitLog 数据最大 offset 是否雷同
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE:"
                        + slavePhyOffset + "MASTER:" + masterPhyOffset);
                    return false;
                }
            }
                        // 读取到音讯
            if (diff >= (msgHeaderSize + bodySize)) {
                // 写入 CommitLog
                byte[] bodyData = new byte[bodySize];
                this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
                this.byteBufferRead.get(bodyData);

                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                                // 设置解决到的地位
                this.byteBufferRead.position(readSocketPos);
                this.dispatchPostion += msgHeaderSize + bodySize;
                                // 上报到 Master 进度
                if (!reportSlaveMaxOffsetPlus()) {return false;}
                                // 持续读数据
                continue;
            }
        }
                // 空间写满,重新分配空间
        if (!this.byteBufferRead.hasRemaining()) {this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}

音讯同步发送机制剖析

音讯的同步发送:Producer.send(msg)

同步发送音讯外围实现:DefaultMQProducerImpl

音讯的异步发送:Producer.send(msg,SendCallback sendCallback)

异步发送音讯外围实现:DefaultMQProducerImpl

producer.send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {System.out.println("音讯发送后果:" + sendResult);
    }

    @Override
    public void onException(Throwable e) {System.out.println("音讯发送失败:" + e);
    }
});

咱们能够看一下源码

最终调用 sendDefaultImpl() 办法,在此办法中次要做了:

  • 查找路由信息
  • 应用故障容错组件抉择音讯队列。
private SendResult sendDefaultImpl(
    Message msg, // 发送的音讯
    final CommunicationMode communicationMode, // 网络通信的模式:同步、异步、单向
    final SendCallback sendCallback, // 音讯发送后的回调函数,次要用在异步发送
    final long timeout // 超时工夫
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 查看音讯发送客户端是否是在运行状态
    this.makeSureStateOK();
    // 查看音讯,再一次查看
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 生成一个调用编号,用于上面打印日志,标记为同一次发送音讯
    final long invokeID = random.nextLong();
    // 开始工夫戳
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // 获取 topic 路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    // 只在有路由信息的时候,且路由信息失常(有音讯队列)if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // 次数,同步 = 重试次数 +1, 异步 =1,int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        // 以后为第几次发送
        int times = 0;
        // 存储每次发送音讯抉择的 Broker 名称
        String[] brokersSent = new String[timesTotal];
        // 循环 timesTotal 次数进行发送,直到发送胜利为止
        for (; times < timesTotal; times++) {
            // 抉择的 broker
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 依据路由信息和 Broker 抉择音讯队列
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                // 设置以后发送的 broker
                brokersSent[times] = mq.getBrokerName();
                try {
                    // 开始工夫
                    beginTimestampPrev = System.currentTimeMillis();
                    // 如果重试次数大于 0,表明曾经重试了
                    if (times > 0) {msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    // 曾经用了的工夫
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    // 超时了,就不持续了,间接退出循环,也就是重试必须在设置的超时工夫以内才从新发送
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // 找到后音讯队列和路由信息后处理
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    // 完结工夫戳
                    endTimestamp = System.currentTimeMillis();
                    // 没有出现异常时的更新故障容错
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    // 依据不同的发送形式返回不同的后果
                    // 异步和单向间接返回 null
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            // 如果返回的后果不是 OK 的话且能重试那么就重试,如果失去的后果不是 SEND_OK
                            // 没有返回后果时,比方超时了,那么此时就间接进行重试
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                // 同步发送胜利但存储有问题时候并且配置存储异样时从新发送开关时,进行重试
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}
                            }
                            // 如果不重试的话间接返回后果了
                            return sendResult;
                            // 如果通信模式是其余,那么间接返回
                        default:
                            break;
                    }
                } catch (RemotingException e) {endTimestamp = System.currentTimeMillis();
                    // 更新故障容错
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {return sendResult;}

                            throw e;
                    }
                } catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                // 如果抉择到的音讯队列为空,那么间接退出循环
                break;
            }
        }
        //
        if (sendResult != null) {return sendResult;}

        // 重试依然失败
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                                    times,
                                    System.currentTimeMillis() - beginTimestampFirst,
                                    msg.getTopic(),
                                    Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        // 如果超时了就抛出异样
        if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        // 呈现其余异样的状况
        if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic:" + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

音讯的返回状态

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}

FLUSH_DISK_TIMEOUT

如果设置了 FlushDiskType=SYNC_FLUSH (默认是 ASYNC_FLUSH),并且 Broker 没有在 syncFlushTimeout(默认是 5 秒)设置的工夫内实现刷盘,就会收到此状态码。

FLUSH_SLAVE_TIMEOUT

如果设置为 SYNC_MASTER,并且 slave Broker 没有在 syncFlushTimeout 设定工夫内实现同步,就会收到此状态码。

SLAVE_NOT_AVAILABLE

如果设置为 SYNC_MASTER,并没有配置 slave Broker,就会收到此状态码。

SEND_OK

这个状态能够简略了解为,没有产生下面列出的三个问题状态就是 SEND_OK。须要留神的是,SEND_OK 并不意味着牢靠,如果想严格确保没有音讯失落,须要开启 SYNC_MASTER or SYNC_FLUSH。

如果收到了 FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT,意味着音讯会失落,有 2 个抉择,一是无所谓,实用于音讯不关紧要的场景,二是重发,但可能产生音讯反复,这就须要 consumer 进行去重管制。

音讯的提早投递

提早音讯:音讯达到 Broker 后,要在特定的工夫后才会被 Consumer 生产。

目前只反对固定精度的定时音讯。

MessageSoreConfig 类中有 messageDelayLevel 属性。

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
message.setDelayTimeLevel(1); // 也就是提早 1 秒之后投递

音讯的自定义投递规定

实现音讯的自定义投递,咱们须要在发送的时候去指定某一个队列。重写 MessageQueueSelector 的 select 办法。

            SendResult sr = producer.send(message, new MessageQueueSelector() {
                
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer queueNumber = (Integer)arg;
                    return mqs.get(queueNumber);
                }
            }, 2); // 发送同一 topic 第二个队列外面
            System.err.println(sr);

RocketMQ 消费者外围详解

PushConsumer 外围参数详解

  • consumeFromWhere:消费者从那个地位开始生产。

    • CONSUME_FROM_LAST_OFFSET:第一次启动从队列最初地位生产,后续再启动接着上次生产的进度开始生产。
    • CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始地位生产,后续再启动接着上次生产的进度开始生产。
    • CONSUME_FROM_TIMESTAMP:第一次启动从指定工夫点地位生产,后续再启动接着上次生产的进度开始生产。
  • allocateMessageQueueStrategy:默认 AllocateMessageQueueAveragely,Rebalance(轮询)算法实现策略。
  • subscription:订阅。
  • offsetStore:音讯进度存储,存储理论的偏移量,两种实现:分为本地和近程的存储。
  • consumeThreadMin/consumeThreadMax:线程池的数量。
  • consumeConcurrentlyMaxSpan/pullThresholdForQueue:单队列并行生产容许的最大跨度,默认 2000;拉音讯本地队列缓存音讯最大数,默认 1000。
  • pullInterval:默认 0,拉音讯距离,因为是长轮询,所以为 0,然而如果利用为了流控,也能够设置大于 0 的值,单位毫秒。
  • pullBatchSize:默认 32,批量拉音讯,一次最多拉多少条。
  • consumeMessageBatchMaxSize:默认 1,批量生产,一次生产多少条音讯。

PushConsumer 生产模式 - 集群模式

RocketMQ 有两种生产模式:Broadcasting 播送模式,Clustering 集群模式,默认的是集群生产模式。
Clustering 集群模式(默认):

  • 通过 consumer.setMessageModel(MessageModel.CLUSTERING)进行设置。
  • GroupName 用于把多个 Consumer 组织到一起。
  • 雷同 GroupName 的 Consumer 只生产所订阅音讯的一部分,即 ConsumerGroup 中的 Consumer 实例均匀摊派生产 topic 的音讯。
  • 目标: 达到人造的负载平衡机制。
  • 音讯的生产进度,即 consumerOffset.json 保留在 broker 上。
  • 音讯生产失败后,consumer 会发回 broker,broker 依据生产失败次数设置不同的 delayLevel 进行重发。
  • 雷同 topic 不同的 consumerGroup 组成伪播送模式,可达到所有 consumer 都会收到音讯。

PushConsumer 生产模式 - 播送模式

  • 通过 consumer.setMessageModel(MessageModel.BROADCASTING)进行设置。
  • 音讯的生产进度保留在 consumer 的机器上。
  • 同一个 ConsumerGroup 里的 Consumer 都生产订阅 Topic 的全副信息。
  • 不同 ConsumerGroup 里的 Consumer 能够实现依据 tags 进行生产即:
consumer1.subscribe("test_model_topic","TagA");
consumer2.subscribe("test_model_topic","TagB");    
  • 音讯生产失败后间接抛弃,不会发回 broker 进行从新投递。
  • 因为所有 consumer 都须要收到音讯,所以不存在负载平衡策略。

音讯存储外围 -Offset 存储

Offset 是音讯生产进度的外围,指某个 topic 下的一条音讯在某个 MessageQueue 里的地位,通过 Offset 能够进行音讯的定位

Offset 的存储实现分为近程文件类型和本地文件类型两种:集群模式下 offset 存在 broker 中; 播送模式下 offset 存在 consumer 中。

RocketMQ 默认是集群生产模式 Clustering,采纳近程文件存储 Offset,即存储在 broker 中实质是因为多生产模式,每个 Consumer 只生产所订阅主题的一部分,这种状况下就须要由 Broker 去管制 Offset 的值,应用 RemoteBrokerOffsetStore 来实现。

在播送模式下,因为每个 Consumer 都会收到音讯且生产,那么各个 Consumer 之间没有任何烦扰,都是独立线程生产,所以应用 LocalFileOffsetStore,即把 Offset 存储到本地。

PushConsumer 消费者长轮询模式

DefaultPushConsumer 是应用长轮询模式进行实现的。

常见的数据同步形式有上面几种:

  • push:producer 发送音讯后,broker 马上把音讯投递给 consumer。这种形式好在实时性比拟高,然而会减少 broker 的负载;而且生产端能力不同,如果 push 推送过快,生产端会呈现很多问题。
  • pull:producer 发送音讯后,broker 什么也不做,等着 consumer 本人来读取。它的长处在于主动权在消费者端,可控性好;然而间隔时间不好设置,距离太短浪费资源,距离太长又会生产不及时。
  • 长轮询机制:当 consumer 过去申请时,broker 会放弃以后连贯一段时间 默认 15s, 如果这段时间内有音讯达到,则立即返回给 consumer;15s 没音讯的话则返回空而后从新申请。这种形式的毛病就是服务端要保留 consumer 状态,客户端过多会始终占用资源。

consumer 是长轮询拉音讯,当 consumer 拉音讯时,broker 端如果没有新音讯,broker 会通过 PullRequestHoldService 服务 hold 住这个申请。

public void run() {log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {this.waitForRunning(5 * 1000);
            } else {this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }

            long beginLockTimestamp = this.systemClock.now();
           // 查看是否有新的音讯
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {log.warn(this.getServiceName() + "service has exception.", e);
        }
    }

    log.info("{} service end", this.getServiceName());
}

RocketMQ 消费者 -PullConsumer 应用

pull 形式次要做了三件事:

  • 获取 MessageQueue 并遍历
  • 保护 OffsetStore
  • 依据不同的音讯状态做不同的解决

DefaultMQPullConsumer,Pull 模式简略样例

/**
 * @author 又坏又迷人
 * 公众号: Java 菜鸟程序员
 * @date 2021/1/27
 * @Description:
 */
public class Consumer {

    // Map<key,value> key 为指定队列,value 为这个队列拉取数据的最初地位
    private static final Map<MessageQueue, Long> offsetTable = new HashMap<>();
    public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161";
    public static void main(String[] args)  {

        try {
            String group_name = "test_pull_producer_name";
            DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
            consumer.setNamesrvAddr(NAME_SRV_ADDR);
            consumer.start();
            // 从 topicTest 这个主题去获取所有队列(默认会有 4 个队列)
            Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("test_pull_topic");
            // 遍历每一个队列进行数据拉取
            for (MessageQueue messageQueue : messageQueues) {System.out.println("consumer from the queue:" + messageQueue);
                SINGLE_MQ:
                while (true) {
                    try {
                        // 从 queue 中获取数据,从什么地位开始拉取数据,单次最多拉取 32 条数据
                        PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, getMessageQueueOffset(messageQueue), 32);
                        System.out.println(pullResult);
                        System.out.println(pullResult.getPullStatus());
                        putMessageQueueOffset(messageQueue, pullResult.getNextBeginOffset());
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                break;
                            case NO_MATCHED_MSG:
                                break;
                            case NO_NEW_MSG:
                                System.out.println("没有新的数据");
                                break SINGLE_MQ;
                            case OFFSET_ILLEGAL:
                                break;
                            default:
                                break;
                        }
                    } catch (Exception e) {e.printStackTrace();
                    }
                }

            }
        } catch (MQClientException e) {e.printStackTrace();
        }
    }

    private static long getMessageQueueOffset(MessageQueue mq) {Long offset = offsetTable.get(mq);
        if (offset != null) {return offset;}

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {offsetTable.put(mq, offset);
    }

}

RocketMQ Pull 模式下提供的负载平衡样例(基于 MQPullConsumerScheduleService)

/**
 * @author 又坏又迷人
 * 公众号: Java 菜鸟程序员
 * @date 2021/1/26
 * @Description:
 */
public class PullConsumerScheduleService {

    public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161";

    public static void main(String[] args) throws MQClientException {
        String group_name = "test_pull_consumer_name";

        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);

        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(NAME_SRV_ADDR);

        scheduleService.setMessageModel(MessageModel.CLUSTERING);

        scheduleService.registerPullTaskCallback("test_pull_topic", (mq, context) -> {MQPullConsumer consumer = context.getPullConsumer();
            System.err.println("-------------- queueId:" + mq.getQueueId() + "-------------");
            try {
                // 获取从哪里拉取
                long offset = consumer.fetchConsumeOffset(mq, false);
                if (offset < 0) {offset = 0;}

                PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> list = pullResult.getMsgFoundList();
                        for (MessageExt msg : list) {
                            // 生产数据
                            System.out.println(new String(msg.getBody()));
                        }
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                }
                consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                // 设置再过 3000ms 后从新拉取
                context.setPullNextDelayTimeMillis(3000);

            } catch (Exception e) {e.printStackTrace();
            }
        });

        scheduleService.start();}

}

外围原理解析

RocketMQ 音讯的存储构造

如下图所示:

  • 音讯主体以及元数据都存储在 CommitLog 文件当中,齐全程序写,随机读。
  • Consume Queue 相当于 kafka 中的 partition,是一个逻辑队列,存储了这个 Queue 在 CommiLog 中的起始 offset,log 大小和 MessageTag 的 hashCode。
  • 每次读取音讯队列先读取 consumerQueue, 而后再通过 consumerQueue 去 commitLog 中拿到音讯主体。

同步刷盘和异步刷盘

RocketMQ 音讯存储:内存 + 磁盘存储,两种刷盘形式。

RocketMQ 和 Redis 等其余存储系统相似,提供了同步和异步两种刷盘形式,同步刷盘形式可能保证数据被写入硬盘,做到真正的长久化,然而也会让零碎的写入速度受制于磁盘的 IO 速度;而异步刷盘形式在将数据写入缓冲之后就返回,提供了零碎的 IO 速度,却存在零碎产生故障时未来得及写入硬盘的数据失落的危险。

RocketMQ 的音讯是存储到磁盘上的,这样既能保障断电后复原,又能够让存储的音讯量超出内存的限度。

RocketMQ 为了进步性能,会尽可能地保障磁盘的程序写。音讯在通过 Producer 写入 RocketMQ 的时候,有两种:

  • 异步刷盘形式:在返回写胜利状态时,音讯可能只是被写入了内存的 PAGECACHE,写操作的返回快,吞吐量大;当内存里的音讯量积攒到肯定水平时,对立触发写磁盘操作,疾速写入。
  • 同步刷盘形式:在返回写胜利状态时,音讯曾经被写入磁盘。具体流程是,音讯写入内存的 PAGECACHE 后,立即告诉刷盘线程刷盘,而后期待刷盘实现,刷盘线程执行实现后唤醒期待的线程,返回音讯写胜利的状态。
    同步刷盘还是异步刷盘,是通过 Broker 配置文件里的 flushDiskType 参数设置的,这个参数被设置成 SYNC_FLUSH、ASYNC_FLUSH 中的一个。

同步复制和异步复制

同一组 broker 中有 Master 和 Slave,音讯须要从 Master 复制到 Slave 上,那么有同步和异步两种复制形式。

同步复制:是等 Master 和 Slave 均写胜利后才反馈给客户端写胜利状态。

异步复制:是只有 Master 写胜利即可反馈给客户端写胜利状态。

两种复制形式比照:

  • 异步复制形式下,零碎领有较低的提早和较高的吞吐量,然而如果 Master 出了故障,有些数据因为没有被写入 Slave,有可能会失落。
  • 同步复制形式下,如果 Master 出故障,Slave 上有全副的备份数据,容易复原,然而同步复制会增大数据写入提早,升高零碎吞吐量。
    配置形式:
    同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数能够被设置成 ASYNC_MASTER、SYNC_MASTER、SLAVE 三个值中的一个。

理论利用中要联合业务场景,正当设置刷盘形式和主从复制形式,尤其是 SYNC_FLUSH 形式,因为频繁的触发写磁盘动作,会明显降低性能。

通常状况下,应该把 Master 和 Slave 设置成 ASYNC_FLUSH 的刷盘形式,主从之间配置成 SYNC_MASTER 的复制形式,这样即便有一台机器出故障,依然能够保证数据不丢。

高可用机制

当 Master 节点忙碌,可主动切换到 Slave 节点读取信息。

当 Master 节点 down 机或不可用时,rocketmq 基于 raft 协定反对主从切换,引入了多正本机制,即 DLedger,反对主从切换,即当一个复制组内的主节点宕机后,会在该复制组内触发从新选主,选主实现后即可持续提供音讯写性能。

NameServer 协调服务

Namesrv 的性能,就相当于 RPC 或微服务中的注册核心。对于 MQ 而言,broker 启动,将本身创立的 topic 等信息注册到 Namesrv 上。consumer 和 producer 须要配置 namesrv 的地址,启动后,首先和 namesrv 建设长连贯,并获取相应的 topic 信息(比方,哪些 broker 有 topic 路由信息),而后再和 broker 建设长连贯。Namesrv 自身无状态,可集群横向扩大部署。所有的注册信息,都保留在 namesrv 的相似 map 内存数据结构中。

public class RouteInfoManager {
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
正文完
 0