RocketMQ 反对同步、异步、Oneway 三种音讯发送形式。

  • 同步:客户端发动一次音讯发送后会同步期待服务器的响应后果。
  • 异步:客户端发动一下音讯发动申请后不期待服务器响应后果而是立刻返回,这样不会阻塞客户端子线程,当客户端收到服务端(Broker)的响应后果后会主动调用回调函数。
  • Oneway:客户端发动音讯发送申请后并不会期待服务器的响应后果,也不会调用回调函数,即不关怀音讯的最终发送后果。

这里重点介绍下异步与同步。

异步音讯

  1. 每一个音讯发送者实例(DefaultMQProducer)外部会创立一个异步音讯发送线程池,默认线程数量为 CPU 核数,线程池外部持有一个有界队列,默认长度为 5W,并且会管制异步调用的最大并发度,默认为 65536,其能够通过参数 clientAsyncSemaphoreValue 来配置。
  2. 客户端使线程池将音讯发送到服务端,服务端解决实现后,返回构造并依据是否产生异样调用 SendCallback 回调函数

下面是发送异步音讯的过程,上面再从源码上剖析下。

public void start() throws MQClientException {        this.defaultMQProducerImpl.start();        if (null != traceDispatcher) {            try {                traceDispatcher.start(this.getNamesrvAddr());            } catch (MQClientException e) {                log.warn("trace dispatcher start failed ", e);            }        }    }

这个是Producer服务的启动入口。接着看DefaultMQProducerImpl类:

public void start(final boolean startFactory) throws MQClientException {    ...    if (startFactory) {        //启动MQClientInstance        mQClientFactory.start();    }    ...}public void start(final boolean startFactory) throws MQClientException {        ...        this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);        ...    }

在getAndCreateMQClientInstance办法里会创立MQClientInstance实例,接着在MQClientInstance创立过程上又会创立DefaultMQProducerImpl对象,这时会创立一个异步音讯发送线程池。

        this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);        this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(            Runtime.getRuntime().availableProcessors(),            Runtime.getRuntime().availableProcessors(),            1000 * 60,            TimeUnit.MILLISECONDS,            this.asyncSenderThreadPoolQueue,            new ThreadFactory() {                private AtomicInteger threadIndex = new AtomicInteger(0);                @Override                public Thread newThread(Runnable r) {                    return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());                }            });

上面看下异步回调的中央,波及MQClientAPIImpl#sendMessageAsync办法:

this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {            @Override            public void operationComplete(ResponseFuture responseFuture) {                RemotingCommand response = responseFuture.getResponseCommand();                ...                if (response != null) {                    try {                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);                        assert sendResult != null;                        if (context != null) {                            context.setSendResult(sendResult);                            context.getProducer().executeSendMessageHookAfter(context);                        }                        try {                            sendCallback.onSuccess(sendResult);                        } catch (Throwable e) {                        }            ...            }           }        });

其中的sendCallback.onSuccess(sendResult)就是broker解决完申请后在进行回调。下面提到的限度65535并发是通过NettyRemotingAbstract#invokeAsyncImpl()里设置的Semaphore实现的,它默认是65535且可通过clientAsyncSemaphoreValue调整。

同步发送
因为RocketMQ是借助Netty进行IO读写,而Netty是多主从Ractor模型,所以同步调用其实也是异步,只不过RocketMQ应用了一点技巧将异步转成了同步。咱们来看下代码:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,        final long timeoutMillis)        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {        final int opaque = request.getOpaque();        try {            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);            this.responseTable.put(opaque, responseFuture);            final SocketAddress addr = channel.remoteAddress();            //有响应后进行回调,这就是异步异步            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {//相干点1                @Override                public void operationComplete(ChannelFuture f) throws Exception {                    if (f.isSuccess()) {                        responseFuture.setSendRequestOK(true);                        return;                    } else {                        responseFuture.setSendRequestOK(false);                    }                    responseTable.remove(opaque);                    responseFuture.setCause(f.cause());                    responseFuture.putResponse(null);//相干点2                    log.warn("send a request command to channel <" + addr + "> failed.");                }            });            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);//相干点3            if (null == responseCommand) {                if (responseFuture.isSendRequestOK()) {                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,                        responseFuture.getCause());                } else {                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());                }            }            return responseCommand;        } finally {            this.responseTable.remove(opaque);        }    }

在‘相干点1’外面的回调就是broker解决完后应用另一个线程进行了回调。让主线程期待就是‘相干点2’外的代码

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis)

外面应用了countDownLatch这个工具,那是在哪里进行的countDown呢?答案就是在回调里的‘相干点2’处的responseFuture.putResponse(null)。这个就是异步转同步的办法。

Oneway的形式

Oneway 形式通常用于发送一些不太重要的音讯,例如操作日志,偶尔呈现音讯失落对业务无影响,这里就不过多的提了。

总结

本文次要提到了RocketMQ三种音讯发送形式,重点介绍了异步发送逻辑与同步形式里如何将异步转成同步。
依据笔者在网上查到的一些材料来看应用异步发送的形式并不是特地多,如果想进步音讯发送效率,个别是能够从刷盘策略和复制策略动手进行优化,应用同步发送形式基本上是能够满足需要的,当然所有也得从理论的业务场景登程。
最初还要提一点就是失败重试,在三种发送形式里如果SendStatus不是SEND_OK,只有同步的形式才会进行重试,也就是说在弥补机制、容错机制上,如果是异步或Oneway也是咱们在应用时须要思考的问题。