序
本文主要研究一下 rocketmq 的 AccessChannel
AccessChannel
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/AccessChannel.java
public enum AccessChannel {
/**
* Means connect to private IDC cluster.
*/
LOCAL,
/**
* Means connect to Cloud service.
*/
CLOUD,
}
- AccessChannel 定义了两个枚举值,分别是 LOCAL 及 CLOUD
TraceDispatcher
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java
public interface TraceDispatcher {
/**
* Initialize asynchronous transfer data module
*/
void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
/**
* Append the transfering data
* @param ctx data infomation
* @return
*/
boolean append(Object ctx);
/**
* Write flush action
*
* @throws IOException
*/
void flush() throws IOException;
/**
* Close the trace Hook
*/
void shutdown();}
- TraceDispatcher 的 start 方法会接收 AccessChannel 类型的参数
AsyncTraceDispatcher
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
public class AsyncTraceDispatcher implements TraceDispatcher {private final static InternalLogger log = ClientLogger.getLog();
private final int queueSize;
private final int batchSize;
private final int maxMsgSize;
private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecutor;
// The last discard number of log
private AtomicLong discardCount;
private Thread worker;
private ArrayBlockingQueue<TraceContext> traceContextQueue;
private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
private AccessChannel accessChannel = AccessChannel.LOCAL;
//......
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {if (isStarted.compareAndSet(false, true)) {traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();}
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();}
//......
class AsyncAppenderRequest implements Runnable {
List<TraceContext> contextList;
public AsyncAppenderRequest(final List<TraceContext> contextList) {if (contextList != null) {this.contextList = contextList;} else {this.contextList = new ArrayList<TraceContext>(1);
}
}
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
String traceTopic = traceTopicName;
if (AccessChannel.CLOUD == accessChannel) {traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;}
final Message message = new Message(traceTopic, data.getBytes());
// Keyset of message trace includes msgId of or original message
message.setKeys(keySet);
//......
}
//......
}
//......
}
- AsyncTraceDispatcher 内部类 AsyncAppenderRequest 的 sendTraceDataByMQ 方法,针对 accessChannel 为 AccessChannel.CLOUD 类型的,会给 TraceConstants.TRACE_TOPIC_PREFIX 加上 regionId 作为 traceTopic
小结
AccessChannel 定义了两个枚举值,分别是 LOCAL 及 CLOUD;TraceDispatcher 的 start 方法会接收 AccessChannel 类型的参数;AsyncTraceDispatcher 内部类 AsyncAppenderRequest 的 sendTraceDataByMQ 方法,针对 accessChannel 为 AccessChannel.CLOUD 类型的,会给 TraceConstants.TRACE_TOPIC_PREFIX 加上 regionId 作为 traceTopic
doc
- AccessChannel