RocketMQ生产者外围详解
外围参数详解
- ProducerGroup:组名在一个利用外面是惟一的。
- CreateTopicKey:理论生产中不会使此参数进行生产者创立Topic。
- defaultTopicQueueNums:默认大小为4,一个topic下默认挂载的是四个队列。
- sendMsgTimeout:单位ms,音讯发送的超时工夫。
- compressMsgBodyOverHowmuch:默认压缩字节4096,主动压缩机制,当音讯超过4096就会压缩。
- retryTimesWhenSendFailed:同步重发次数。
- retryAnotherBrokerWhenNotStoreOK:默认false,没有存储胜利的话,是否能够向其它Broker存储。
- maxMessageSize:默认128k,最大音讯长度。
主从同步机制解析
咱们之前曾经理解了,当一条音讯发送到Master节点时候,会将音讯同步到Slave节点。然而怎么做的呢?
首先主从同步须要同步哪些内容?
第一点就是元数据信息同步,第二点就是音讯数据的同步。
元数据信息:是指topic config配置信息,还有consumer的offset(生产端的进度信息)。须要留神的是,并不是即时同步,而是底层代码启动定时工作去同步的。
同步信息:数据内容+元数据信息。
数据内容:commitlog理论音讯的存储信息,是实时同步的,并且底层应用的是Socket而不是Netty。
元数据信息:slave和master基于commitlog外面的数据一直比照,而后一直的同步。
元数据失落是能够承受的,能够复原。如果元数据在slave和master外面不统一,能够做复原,能够调整offset地位或者重启consumer。须要留神的是:commitlog外面的数据失落了,无奈复原。
主从同步相干源码
如果Broker角色为从服务器,会通过定时工作调用syncAll。
咱们点击syncAll()办法。从主服务器定时同步topic配置信息、音讯生产偏移量、提早队列偏移量、生产组订阅信息。
commitlog数据同步代码
HAConnection次要用于音讯读写操作。外面蕴含两个外部类:ReadSocketService、WriteSocketService。
Master节点:
- AcceptSocketService:接管Slave节点连贯。
HAConnection
- ReadSocketService:读来自Slave节点的数据。
- WriteSocketService:写往到Slave节点的数据。
Slave节点:
HAService
- HAClient:对Master节点连贯、读写数据。
通信协议:Master节点与Slave节点通信协议很简略,只有如下两条。
对象 | 用处 | 第几位 | 字段 | 数据类型 | 字节数 | 阐明 |
---|---|---|---|---|---|---|
Slave=>Master | 上报CommitLog曾经同步到的物理地位 | 0 | maxPhyOffset | Long | 8 | CommitLog最大物理地位 |
Master=>Slave | 传输新的CommitLog数据 | 0 | fromPhyOffset | Long | 8 | CommitLog开始物理地位 |
1 | size | Int | 4 | 传输的数据长度 | ||
2 | body | Bytes | size | 传输的数据 |
晓得了每个类的大略用处,上面咱们看一下代码。
在HAService中咱们能够看到ConnectMaster(),用于连贯Master的办法。
应用NIO函数:目标很显著,就是为了更加的高效。
private boolean connectMaster() throws ClosedChannelException { if (null == socketChannel) { String addr = this.masterAddress.get(); if (addr != null) { SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr); if (socketAddress != null) { this.socketChannel = RemotingUtil.connect(socketAddress); if (this.socketChannel != null) { this.socketChannel.register(this.selector, SelectionKey.OP_READ); } } } this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); this.lastWriteTimestamp = System.currentTimeMillis(); } return this.socketChannel != null; }
咱们能够点开RemotingUtil.connect(socketAddress),而后持续跟进去
public static SocketChannel connect(SocketAddress remote) { return connect(remote, 1000 * 5); //连贯近程地址,超时工夫5000ms}
public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) { SocketChannel sc = null; try { sc = SocketChannel.open(); //关上channel sc.configureBlocking(true); // 设置同步阻塞 sc.socket().setSoLinger(false, -1); //设置敞开socket的提早事件,当线程执行到socket的close()办法时候,进入阻塞状态,晓得底层数据发送实现,或者超过了延迟时间,才从close()办法返回 sc.socket().setTcpNoDelay(true);//禁止应用Nagle算法,应用小数据即时传输 sc.socket().setReceiveBufferSize(1024 * 64);//设置缓冲区大小 sc.socket().setSendBufferSize(1024 * 64);//设置发送缓冲区大小 sc.socket().connect(remote, timeoutMillis);//连贯 sc.configureBlocking(false); //不分明为什么设置回去了? return sc; } catch (Exception e) { if (sc != null) { try { sc.close(); } catch (IOException e1) { e1.printStackTrace(); } } } return null;}
接下来咱们看另一个重要的办法:dispatchReadRequest()。
读取Master传输的CommitLog数据,并返回是否OK。
如果读取到数据,就写入CommitLog。
如果产生异样:
- Master传输的数据开始地位Offset不等于Slave的CommitLog数据最大Offset。
- 上报到Master进度失败。
从dispatchReadRequest( )办法里能够看到,Slave应用dispatchPostion变量来指定每次解决的地位,其目标是为了应答粘包问题。每次提取数据的body局部,追加到CommitLog,当增加胜利一次就马上向Master上报此次的进度。
private boolean dispatchReadRequest() { final int msgHeaderSize = 8 + 4; // phyoffset + size int readSocketPos = this.byteBufferRead.position(); while (true) { // begin -> 读取到申请数据 int diff = this.byteBufferRead.position() - this.dispatchPostion; if (diff >= msgHeaderSize) { // 读取MasterPhyOffset、BodySize,应用dispatchPostion的起因是:解决数据"粘包"导致数据读取不残缺 long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion); int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8); // 获取slave节点上commitLog文件最大的offset地位 long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); if (slavePhyOffset != 0) { // 校验 Master传输来的数据offset 是否和 Slave的CommitLog数据最大offset 是否雷同 if (slavePhyOffset != masterPhyOffset) { log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset); return false; } } // 读取到音讯 if (diff >= (msgHeaderSize + bodySize)) { // 写入CommitLog byte[] bodyData = new byte[bodySize]; this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize); this.byteBufferRead.get(bodyData); HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); // 设置解决到的地位 this.byteBufferRead.position(readSocketPos); this.dispatchPostion += msgHeaderSize + bodySize; // 上报到Master进度 if (!reportSlaveMaxOffsetPlus()) { return false; } //持续读数据 continue; } } // 空间写满,重新分配空间 if (!this.byteBufferRead.hasRemaining()) { this.reallocateByteBuffer(); } break; } return true;}
音讯同步发送机制剖析
音讯的同步发送:Producer.send(msg)
同步发送音讯外围实现:DefaultMQProducerImpl
音讯的异步发送:Producer.send(msg,SendCallback sendCallback)
异步发送音讯外围实现:DefaultMQProducerImpl
producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("音讯发送后果:" + sendResult); } @Override public void onException(Throwable e) { System.out.println("音讯发送失败:" + e); }});
咱们能够看一下源码
最终调用sendDefaultImpl()
办法,在此办法中次要做了:
- 查找路由信息
- 应用故障容错组件抉择音讯队列。
private SendResult sendDefaultImpl( Message msg, // 发送的音讯 final CommunicationMode communicationMode, // 网络通信的模式:同步、异步、单向 final SendCallback sendCallback, // 音讯发送后的回调函数,次要用在异步发送 final long timeout // 超时工夫) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 查看音讯发送客户端是否是在运行状态 this.makeSureStateOK(); // 查看音讯,再一次查看 Validators.checkMessage(msg, this.defaultMQProducer); // 生成一个调用编号,用于上面打印日志,标记为同一次发送音讯 final long invokeID = random.nextLong(); // 开始工夫戳 long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // 获取topic路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 只在有路由信息的时候,且路由信息失常(有音讯队列) if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; // 次数,同步=重试次数+1,异步=1, int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 以后为第几次发送 int times = 0; // 存储每次发送音讯抉择的Broker名称 String[] brokersSent = new String[timesTotal]; // 循环timesTotal次数进行发送,直到发送胜利为止 for (; times < timesTotal; times++) { // 抉择的broker String lastBrokerName = null == mq ? null : mq.getBrokerName(); //依据路由信息和Broker抉择音讯队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; // 设置以后发送的broker brokersSent[times] = mq.getBrokerName(); try { // 开始工夫 beginTimestampPrev = System.currentTimeMillis(); // 如果重试次数大于0,表明曾经重试了 if (times > 0) { msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } // 曾经用了的工夫 long costTime = beginTimestampPrev - beginTimestampFirst; // 超时了,就不持续了,间接退出循环,也就是重试必须在设置的超时工夫以内才从新发送 if (timeout < costTime) { callTimeout = true; break; } // 找到后音讯队列和路由信息后处理 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); // 完结工夫戳 endTimestamp = System.currentTimeMillis(); // 没有出现异常时的更新故障容错 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); // 依据不同的发送形式返回不同的后果 // 异步和单向间接返回null switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: // 如果返回的后果不是OK的话且能重试那么就重试,如果失去的后果不是 SEND_OK // 没有返回后果时,比方超时了,那么此时就间接进行重试 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { // 同步发送胜利但存储有问题时候并且配置存储异样时从新发送开关时,进行重试 if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } // 如果不重试的话间接返回后果了 return sendResult; // 如果通信模式是其余,那么间接返回 default: break; } } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); // 更新故障容错 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { // 如果抉择到的音讯队列为空,那么间接退出循环 break; } } // if (sendResult != null) { return sendResult; } // 重试依然失败 String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException(info, exception); // 如果超时了就抛出异样 if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } // 呈现其余异样的状况 if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); } throw mqClientException; } validateNameServerSetting(); throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}
音讯的返回状态
public enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE,}
FLUSH_DISK_TIMEOUT
如果设置了 FlushDiskType=SYNC_FLUSH (默认是 ASYNC_FLUSH),并且 Broker 没有在 syncFlushTimeout (默认是 5 秒)设置的工夫内实现刷盘,就会收到此状态码。
FLUSH_SLAVE_TIMEOUT
如果设置为 SYNC_MASTER,并且 slave Broker 没有在 syncFlushTimeout 设定工夫内实现同步,就会收到此状态码。
SLAVE_NOT_AVAILABLE
如果设置为 SYNC_MASTER,并没有配置 slave Broker,就会收到此状态码。
SEND_OK
这个状态能够简略了解为,没有产生下面列出的三个问题状态就是SEND_OK。须要留神的是,SEND_OK 并不意味着牢靠,如果想严格确保没有音讯失落,须要开启 SYNC_MASTER or SYNC_FLUSH。
如果收到了 FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT,意味着音讯会失落,有2个抉择,一是无所谓,实用于音讯不关紧要的场景,二是重发,但可能产生音讯反复,这就须要consumer进行去重管制。
音讯的提早投递
提早音讯:音讯达到Broker后,要在特定的工夫后才会被Consumer生产。
目前只反对固定精度的定时音讯。
MessageSoreConfig类中有messageDelayLevel属性。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
message.setDelayTimeLevel(1); // 也就是提早1秒之后投递
音讯的自定义投递规定
实现音讯的自定义投递,咱们须要在发送的时候去指定某一个队列。重写MessageQueueSelector的select办法。
SendResult sr = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer queueNumber = (Integer)arg; return mqs.get(queueNumber); } }, 2); //发送同一topic第二个队列外面 System.err.println(sr);
RocketMQ消费者外围详解
PushConsumer外围参数详解
consumeFromWhere:消费者从那个地位开始生产。
- CONSUME_FROM_LAST_OFFSET: 第一次启动从队列最初地位生产,后续再启动接着上次生产的进度开始生产。
- CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始地位生产,后续再启动接着上次生产的进度开始生产。
- CONSUME_FROM_TIMESTAMP:第一次启动从指定工夫点地位生产,后续再启动接着上次生产的进度开始生产。
- allocateMessageQueueStrategy:默认AllocateMessageQueueAveragely,Rebalance(轮询)算法实现策略。
- subscription:订阅。
- offsetStore:音讯进度存储,存储理论的偏移量,两种实现:分为本地和近程的存储。
- consumeThreadMin/consumeThreadMax:线程池的数量。
- consumeConcurrentlyMaxSpan/pullThresholdForQueue:单队列并行生产容许的最大跨度,默认2000;拉音讯本地队列缓存音讯最大数,默认1000。
- pullInterval:默认0,拉音讯距离,因为是长轮询,所以为0,然而如果利用为了流控,也能够设置大于0的值,单位毫秒。
- pullBatchSize: 默认32, 批量拉音讯,一次最多拉多少条。
- consumeMessageBatchMaxSize: 默认1,批量生产,一次生产多少条音讯。
PushConsumer生产模式-集群模式
RocketMQ有两种生产模式:Broadcasting播送模式,Clustering集群模式,默认的是集群生产模式。
Clustering集群模式(默认):
- 通过consumer.setMessageModel(MessageModel.CLUSTERING)进行设置。
- GroupName用于把多个Consumer组织到一起。
- 雷同GroupName的Consumer只生产所订阅音讯的一部分,即ConsumerGroup中的Consumer实例均匀摊派生产topic的音讯。
- 目标:达到人造的负载平衡机制。
- 音讯的生产进度,即consumerOffset.json保留在broker上。
- 音讯生产失败后,consumer会发回broker,broker依据生产失败次数设置不同的delayLevel进行重发。
- 雷同topic不同的consumerGroup组成伪播送模式,可达到所有consumer都会收到音讯。
PushConsumer生产模式-播送模式
- 通过consumer.setMessageModel(MessageModel.BROADCASTING)进行设置。
- 音讯的生产进度保留在consumer的机器上。
- 同一个ConsumerGroup里的Consumer都生产订阅Topic的全副信息。
- 不同ConsumerGroup里的Consumer能够实现依据tags进行生产即:
consumer1.subscribe("test_model_topic","TagA");consumer2.subscribe("test_model_topic","TagB");
- 音讯生产失败后间接抛弃,不会发回broker进行从新投递。
- 因为所有consumer都须要收到音讯,所以不存在负载平衡策略。
音讯存储外围-Offset存储
Offset是音讯生产进度的外围,指某个topic下的一条音讯在某个MessageQueue里的地位,通过Offset能够进行音讯的定位
Offset的存储实现分为近程文件类型和本地文件类型两种:集群模式下offset存在broker
中; 播送模式下offset存在consumer
中。
RocketMQ默认是集群生产模式Clustering,采纳近程文件存储Offset,即存储在broker中实质是因为多生产模式,每个Consumer只生产所订阅主题的一部分,这种状况下就须要由Broker去管制Offset的值,应用RemoteBrokerOffsetStore来实现。
在播送模式下,因为每个Consumer都会收到音讯且生产,那么各个Consumer之间没有任何烦扰,都是独立线程生产,所以应用LocalFileOffsetStore,即把Offset存储到本地。
PushConsumer消费者长轮询模式
DefaultPushConsumer是应用长轮询模式进行实现的。
常见的数据同步形式有上面几种:
- push:producer发送音讯后,broker马上把音讯投递给consumer。这种形式好在实时性比拟高,然而会减少broker的负载;而且生产端能力不同,如果push推送过快,生产端会呈现很多问题。
- pull:producer发送音讯后,broker什么也不做,等着consumer本人来读取。它的长处在于主动权在消费者端,可控性好;然而间隔时间不好设置,距离太短浪费资源,距离太长又会生产不及时。
- 长轮询机制:当consumer过去申请时,broker会放弃以后连贯一段时间 默认15s,如果这段时间内有音讯达到,则立即返回给consumer;15s没音讯的话则返回空而后从新申请。这种形式的毛病就是服务端要保留consumer状态,客户端过多会始终占用资源。
consumer是长轮询拉音讯,当consumer拉音讯时,broker端如果没有新音讯,broker会通过PullRequestHoldService服务hold住这个申请。
public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); // 查看是否有新的音讯 this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName());}
RocketMQ消费者-PullConsumer应用
pull形式次要做了三件事:
- 获取MessageQueue并遍历
- 保护OffsetStore
- 依据不同的音讯状态做不同的解决
DefaultMQPullConsumer,Pull模式简略样例
/** * @author 又坏又迷人 * 公众号: Java菜鸟程序员 * @date 2021/1/27 * @Description: */public class Consumer { // Map<key,value> key为指定队列,value为这个队列拉取数据的最初地位 private static final Map<MessageQueue, Long> offsetTable = new HashMap<>(); public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161"; public static void main(String[] args) { try { String group_name = "test_pull_producer_name"; DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name); consumer.setNamesrvAddr(NAME_SRV_ADDR); consumer.start(); //从topicTest这个主题去获取所有队列(默认会有4个队列) Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("test_pull_topic"); //遍历每一个队列进行数据拉取 for (MessageQueue messageQueue : messageQueues) { System.out.println("consumer from the queue:" + messageQueue); SINGLE_MQ: while (true) { try { //从queue中获取数据,从什么地位开始拉取数据,单次最多拉取32条数据 PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, getMessageQueueOffset(messageQueue), 32); System.out.println(pullResult); System.out.println(pullResult.getPullStatus()); putMessageQueueOffset(messageQueue, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: System.out.println("没有新的数据"); break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } } catch (MQClientException e) { e.printStackTrace(); } } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offsetTable.get(mq); if (offset != null) { return offset; } return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offsetTable.put(mq, offset); }}
RocketMQ Pull模式下提供的负载平衡样例(基于MQPullConsumerScheduleService)
/** * @author 又坏又迷人 * 公众号: Java菜鸟程序员 * @date 2021/1/26 * @Description: */public class PullConsumerScheduleService { public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161"; public static void main(String[] args) throws MQClientException { String group_name = "test_pull_consumer_name"; final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name); scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(NAME_SRV_ADDR); scheduleService.setMessageModel(MessageModel.CLUSTERING); scheduleService.registerPullTaskCallback("test_pull_topic", (mq, context) -> { MQPullConsumer consumer = context.getPullConsumer(); System.err.println("-------------- queueId: " + mq.getQueueId() + "-------------"); try { // 获取从哪里拉取 long offset = consumer.fetchConsumeOffset(mq, false); if (offset < 0) { offset = 0; } PullResult pullResult = consumer.pull(mq, "*", offset, 32); switch (pullResult.getPullStatus()) { case FOUND: List<MessageExt> list = pullResult.getMsgFoundList(); for (MessageExt msg : list) { //生产数据 System.out.println(new String(msg.getBody())); } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: case OFFSET_ILLEGAL: break; default: break; } consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); // 设置再过3000ms后从新拉取 context.setPullNextDelayTimeMillis(3000); } catch (Exception e) { e.printStackTrace(); } }); scheduleService.start(); }}
外围原理解析
RocketMQ音讯的存储构造
如下图所示:
- 音讯主体以及元数据都存储在CommitLog文件当中,齐全程序写,随机读。
- Consume Queue相当于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
- 每次读取音讯队列先读取consumerQueue,而后再通过consumerQueue去commitLog中拿到音讯主体。
同步刷盘和异步刷盘
RocketMQ音讯存储:内存+磁盘存储,两种刷盘形式。
RocketMQ和Redis等其余存储系统相似,提供了同步和异步两种刷盘形式,同步刷盘形式可能保证数据被写入硬盘,做到真正的长久化,然而也会让零碎的写入速度受制于磁盘的IO速度;而异步刷盘形式在将数据写入缓冲之后就返回,提供了零碎的IO速度,却存在零碎产生故障时未来得及写入硬盘的数据失落的危险。
RocketMQ的音讯是存储到磁盘上的,这样既能保障断电后复原,又能够让存储的音讯量超出内存的限度。
RocketMQ为了进步性能,会尽可能地保障磁盘的程序写。音讯在通过Producer写入RocketMQ的时候,有两种:
- 异步刷盘形式:在返回写胜利状态时,音讯可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的音讯量积攒到肯定水平时,对立触发写磁盘操作,疾速写入。
- 同步刷盘形式:在返回写胜利状态时,音讯曾经被写入磁盘。具体流程是,音讯写入内存的PAGECACHE后,立即告诉刷盘线程刷盘,而后期待刷盘实现,刷盘线程执行实现后唤醒期待的线程,返回音讯写胜利的状态。
同步刷盘还是异步刷盘,是通过Broker配置文件里的flushDiskType参数设置的,这个参数被设置成SYNC_FLUSH、ASYNC_FLUSH中的一个。
同步复制和异步复制
同一组broker中有Master和Slave,音讯须要从Master复制到Slave上,那么有同步和异步两种复制形式。
同步复制:是等Master和Slave均写胜利后才反馈给客户端写胜利状态。
异步复制:是只有Master写胜利即可反馈给客户端写胜利状态。
两种复制形式比照:
- 异步复制形式下,零碎领有较低的提早和较高的吞吐量,然而如果Master出了故障,有些数据因为没有被写入Slave,有可能会失落。
- 同步复制形式下,如果Master出故障,Slave上有全副的备份数据,容易复原,然而同步复制会增大数据写入提早,升高零碎吞吐量。
配置形式:
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数能够被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。
理论利用中要联合业务场景,正当设置刷盘形式和主从复制形式,尤其是SYNC_FLUSH形式,因为频繁的触发写磁盘动作,会明显降低性能。
通常状况下,应该把Master和Slave设置成ASYNC_FLUSH的刷盘形式,主从之间配置成SYNC_MASTER的复制形式,这样即便有一台机器出故障,依然能够保证数据不丢。
高可用机制
当Master节点忙碌,可主动切换到Slave节点读取信息。
当Master节点down机或不可用时,rocketmq基于raft 协定反对主从切换,引入了多正本机制,即DLedger,反对主从切换,即当一个复制组内的主节点宕机后,会在该复制组内触发从新选主,选主实现后即可持续提供音讯写性能。
NameServer协调服务
Namesrv的性能,就相当于RPC或微服务中的注册核心。对于MQ而言,broker启动,将本身创立的topic等信息注册到Namesrv上。consumer和producer须要配置namesrv的地址,启动后,首先和namesrv建设长连贯,并获取相应的topic信息(比方,哪些broker有topic路由信息),而后再和broker建设长连贯。Namesrv自身无状态,可集群横向扩大部署。所有的注册信息,都保留在namesrv的相似map内存数据结构中。
public class RouteInfoManager { private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;}