关于rocketmq:RocketMQ学习三消息发送方式

4次阅读

共计 4299 个字符,预计需要花费 11 分钟才能阅读完成。

RocketMQ 反对同步、异步、Oneway 三种音讯发送形式。

  • 同步:客户端发动一次音讯发送后会同步期待服务器的响应后果。
  • 异步:客户端发动一下音讯发动申请后不期待服务器响应后果而是立刻返回,这样不会阻塞客户端子线程,当客户端收到服务端(Broker)的响应后果后会主动调用回调函数。
  • Oneway:客户端发动音讯发送申请后并不会期待服务器的响应后果,也不会调用回调函数,即不关怀音讯的最终发送后果。

这里重点介绍下异步与同步。

异步音讯

  1. 每一个音讯发送者实例(DefaultMQProducer)外部会创立一个异步音讯发送线程池,默认线程数量为 CPU 核数,线程池外部持有一个有界队列,默认长度为 5W,并且会管制异步调用的最大并发度,默认为 65536,其能够通过参数 clientAsyncSemaphoreValue 来配置。
  2. 客户端使线程池将音讯发送到服务端,服务端解决实现后,返回构造并依据是否产生异样调用 SendCallback 回调函数

下面是发送异步音讯的过程,上面再从源码上剖析下。

public void start() throws MQClientException {this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {traceDispatcher.start(this.getNamesrvAddr());
            } catch (MQClientException e) {log.warn("trace dispatcher start failed", e);
            }
        }
    }

这个是 Producer 服务的启动入口。接着看 DefaultMQProducerImpl 类:

public void start(final boolean startFactory) throws MQClientException {
    ...
    if (startFactory) {
        // 启动 MQClientInstance
        mQClientFactory.start();}
    ...
}
public void start(final boolean startFactory) throws MQClientException {
        ...
        this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
        ...
    }

在 getAndCreateMQClientInstance 办法里会创立 MQClientInstance 实例,接着在 MQClientInstance 创立过程上又会创立 DefaultMQProducerImpl 对象,这时会创立一个异步音讯发送线程池。

        this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
        this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.asyncSenderThreadPoolQueue,
            new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                }
            });

上面看下异步回调的中央,波及 MQClientAPIImpl#sendMessageAsync 办法:

this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {RemotingCommand response = responseFuture.getResponseCommand();
                ...
                if (response != null) {
                    try {SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        assert sendResult != null;
                        if (context != null) {context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }

                        try {sendCallback.onSuccess(sendResult);
                        } catch (Throwable e) { }
            ...
            }
           }
        });

其中的 sendCallback.onSuccess(sendResult) 就是 broker 解决完申请后在进行回调。下面提到的限度 65535 并发是通过 NettyRemotingAbstract#invokeAsyncImpl()里设置的 Semaphore 实现的,它默认是 65535 且可通过 clientAsyncSemaphoreValue 调整。

同步发送
因为 RocketMQ 是借助 Netty 进行 IO 读写,而 Netty 是多主从 Ractor 模型,所以同步调用其实也是异步,只不过 RocketMQ 应用了一点技巧将异步转成了同步。咱们来看下代码:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque();

        try {final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            // 有响应后进行回调,这就是异步异步
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {// 相干点 1
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);
                        return;
                    } else {responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);// 相干点 2
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);// 相干点 3
            if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {this.responseTable.remove(opaque);
        }
    }

在‘相干点 1’外面的回调就是 broker 解决完后应用另一个线程进行了回调。让主线程期待就是‘相干点 2’外的代码

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis)

外面应用了 countDownLatch 这个工具,那是在哪里进行的 countDown 呢?答案就是在回调里的‘相干点 2’处的responseFuture.putResponse(null)。这个就是异步转同步的办法。

Oneway 的形式

Oneway 形式通常用于发送一些不太重要的音讯,例如操作日志,偶尔呈现音讯失落对业务无影响,这里就不过多的提了。

总结

本文次要提到了 RocketMQ 三种音讯发送形式,重点介绍了异步发送逻辑与同步形式里如何将异步转成同步。
依据笔者在网上查到的一些材料来看应用异步发送的形式并不是特地多,如果想进步音讯发送效率,个别是能够从刷盘策略和复制策略动手进行优化,应用同步发送形式基本上是能够满足需要的,当然所有也得从理论的业务场景登程。
最初还要提一点就是失败重试,在三种发送形式里如果 SendStatus 不是 SEND_OK,只有同步的形式才会进行重试,也就是说在弥补机制、容错机制上,如果是异步或 Oneway 也是咱们在应用时须要思考的问题。

正文完
 0