聊聊rocketmq的AccessChannel

23次阅读

共计 3277 个字符,预计需要花费 9 分钟才能阅读完成。

本文主要研究一下 rocketmq 的 AccessChannel

AccessChannel

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/AccessChannel.java

public enum AccessChannel {
    /**
     * Means connect to private IDC cluster.
     */
    LOCAL,

    /**
     * Means connect to Cloud service.
     */
    CLOUD,
}
  • AccessChannel 定义了两个枚举值,分别是 LOCAL 及 CLOUD

TraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java

public interface TraceDispatcher {

    /**
     * Initialize asynchronous transfer data module
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

    /**
     * Append the transfering data
     * @param ctx data infomation
     * @return
     */
    boolean append(Object ctx);

    /**
     * Write flush action
     *
     * @throws IOException
     */
    void flush() throws IOException;

    /**
     * Close the trace Hook
     */
    void shutdown();}
  • TraceDispatcher 的 start 方法会接收 AccessChannel 类型的参数

AsyncTraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

public class AsyncTraceDispatcher implements TraceDispatcher {private final static InternalLogger log = ClientLogger.getLog();
    private final int queueSize;
    private final int batchSize;
    private final int maxMsgSize;
    private final DefaultMQProducer traceProducer;
    private final ThreadPoolExecutor traceExecutor;
    // The last discard number of log
    private AtomicLong discardCount;
    private Thread worker;
    private ArrayBlockingQueue<TraceContext> traceContextQueue;
    private ArrayBlockingQueue<Runnable> appenderQueue;
    private volatile Thread shutDownHook;
    private volatile boolean stopped = false;
    private DefaultMQProducerImpl hostProducer;
    private DefaultMQPushConsumerImpl hostConsumer;
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private String dispatcherId = UUID.randomUUID().toString();
    private String traceTopicName;
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AccessChannel accessChannel = AccessChannel.LOCAL;

    //......

    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {if (isStarted.compareAndSet(false, true)) {traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();}
        this.accessChannel = accessChannel;
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
        this.worker.setDaemon(true);
        this.worker.start();
        this.registerShutDownHook();}

    //......

    class AsyncAppenderRequest implements Runnable {
        List<TraceContext> contextList;

        public AsyncAppenderRequest(final List<TraceContext> contextList) {if (contextList != null) {this.contextList = contextList;} else {this.contextList = new ArrayList<TraceContext>(1);
            }
        }

        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
            String traceTopic = traceTopicName;
            if (AccessChannel.CLOUD == accessChannel) {traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;}
            final Message message = new Message(traceTopic, data.getBytes());
            // Keyset of message trace includes msgId of or original message
            message.setKeys(keySet);

            //......
        }

        //......
    }

    //......
}
  • AsyncTraceDispatcher 内部类 AsyncAppenderRequest 的 sendTraceDataByMQ 方法,针对 accessChannel 为 AccessChannel.CLOUD 类型的,会给 TraceConstants.TRACE_TOPIC_PREFIX 加上 regionId 作为 traceTopic

小结

AccessChannel 定义了两个枚举值,分别是 LOCAL 及 CLOUD;TraceDispatcher 的 start 方法会接收 AccessChannel 类型的参数;AsyncTraceDispatcher 内部类 AsyncAppenderRequest 的 sendTraceDataByMQ 方法,针对 accessChannel 为 AccessChannel.CLOUD 类型的,会给 TraceConstants.TRACE_TOPIC_PREFIX 加上 regionId 作为 traceTopic

doc

  • AccessChannel

正文完
 0