序
本文主要研究一下 nacos 的 TcpSuperSenseProcessor
TcpSuperSenseProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java
@Component
public class TcpSuperSenseProcessor implements HealthCheckProcessor, Runnable {
@Autowired
private HealthCheckCommon healthCheckCommon;
@Autowired
private SwitchDomain switchDomain;
public static final int CONNECT_TIMEOUT_MS = 500;
private Map<String, BeatKey> keyMap = new ConcurrentHashMap<>();
private BlockingQueue<Beat> taskQueue = new LinkedBlockingQueue<Beat>();
/**
* this value has been carefully tuned, do not modify unless you're confident
*/
private static final int NIO_THREAD_COUNT = Runtime.getRuntime().availableProcessors() <= 1 ?
1 : Runtime.getRuntime().availableProcessors() / 2;
/**
* because some hosts doesn't support keep-alive connections, disabled temporarily
*/
private static final long TCP_KEEP_ALIVE_MILLIS = 0;
private static ScheduledExecutorService TCP_CHECK_EXECUTOR
= new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {Thread t = new Thread(r);
t.setName("nacos.naming.tcp.check.worker");
t.setDaemon(true);
return t;
}
});
private static ScheduledExecutorService NIO_EXECUTOR
= Executors.newScheduledThreadPool(NIO_THREAD_COUNT,
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("nacos.supersense.checker");
return thread;
}
}
);
private Selector selector;
public TcpSuperSenseProcessor() {
try {selector = Selector.open();
TCP_CHECK_EXECUTOR.submit(this);
} catch (Exception e) {throw new IllegalStateException("Error while initializing SuperSense(TM).");
}
}
@Override
public void process(HealthCheckTask task) {List<Instance> ips = task.getCluster().allIPs(false);
if (CollectionUtils.isEmpty(ips)) {return;}
for (Instance ip : ips) {if (ip.isMarked()) {if (SRV_LOG.isDebugEnabled()) {SRV_LOG.debug("tcp check, ip is marked as to skip health check, ip:" + ip.getIp());
}
continue;
}
if (!ip.markChecking()) {
SRV_LOG.warn("tcp check started before last one finished, service:"
+ task.getCluster().getService().getName() + ":"
+ task.getCluster().getName() + ":"
+ ip.getIp() + ":"
+ ip.getPort());
healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getTcpHealthParams());
continue;
}
Beat beat = new Beat(ip, task);
taskQueue.add(beat);
MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
}
}
private void processTask() throws Exception {Collection<Callable<Void>> tasks = new LinkedList<>();
do {Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
if (beat == null) {return;}
tasks.add(new TaskProcessor(beat));
} while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
for (Future<?> f : NIO_EXECUTOR.invokeAll(tasks)) {f.get();
}
}
@Override
public void run() {while (true) {
try {processTask();
int readyCount = selector.selectNow();
if (readyCount <= 0) {continue;}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {SelectionKey key = iter.next();
iter.remove();
NIO_EXECUTOR.execute(new PostProcessor(key));
}
} catch (Throwable e) {SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
}
}
}
//......
@Override
public String getType() {return "TCP";}
}
- TcpSuperSenseProcessor 实现了 HealthCheckProcessor、Runnable 接口
- 其 process 方法会遍历 instances:对于 marked 的实例直接跳过;对于 markChecking() 返回 false 的实例(即上一次检查尚未结束)会执行 healthCheckCommon.reEvaluateCheckRT 后跳过;其余实例(markChecking() 返回 true)会创建 Beat 添加到 taskQueue
- 其构造器会往 TCP_CHECK_EXECUTOR 提交自己(Runnable),其 run 方法在循环中不断执行 processTask 方法,然后通过 selector.selectNow() 取出就绪的 key,为每个 key 创建 PostProcessor 提交给 NIO_EXECUTOR;processTask 方法会从 taskQueue 取出 Beat 并创建 TaskProcessor 添加到 tasks,直到队列为空或 tasks 数量达到 NIO_THREAD_COUNT * 64,再使用 NIO_EXECUTOR.invokeAll(tasks) 批量执行并等待全部完成
PostProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java
public class PostProcessor implements Runnable {
SelectionKey key;
public PostProcessor(SelectionKey key) {this.key = key;}
@Override
public void run() {Beat beat = (Beat) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
try {if (!beat.isHealthy()) {
//invalid beat means this server is no longer responsible for the current service
key.cancel();
key.channel().close();
beat.finishCheck();
return;
}
if (key.isValid() && key.isConnectable()) {
//connected
channel.finishConnect();
beat.finishCheck(true, false, System.currentTimeMillis() - beat.getTask().getStartTime(), "tcp:ok+");
}
if (key.isValid() && key.isReadable()) {
//disconnected
ByteBuffer buffer = ByteBuffer.allocate(128);
if (channel.read(buffer) == -1) {key.cancel();
key.channel().close();
} else {// not terminate request, ignore}
}
} catch (ConnectException e) {
// unable to connect, possibly port not opened
beat.finishCheck(false, true, switchDomain.getTcpHealthParams().getMax(), "tcp:unable2connect:" + e.getMessage());
} catch (Exception e) {beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage());
try {key.cancel();
key.channel().close();
} catch (Exception ignore) {}}
}
}
- PostProcessor 实现了 Runnable 接口,其 run 方法主要是执行 beat.finishCheck
TaskProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java
private class TaskProcessor implements Callable<Void> {
private static final int MAX_WAIT_TIME_MILLISECONDS = 500;
Beat beat;
public TaskProcessor(Beat beat) {this.beat = beat;}
@Override
public Void call() {long waited = System.currentTimeMillis() - beat.getStartTime();
if (waited > MAX_WAIT_TIME_MILLISECONDS) {Loggers.SRV_LOG.warn("beat task waited too long:" + waited + "ms");
}
SocketChannel channel = null;
try {Instance instance = beat.getIp();
Cluster cluster = beat.getTask().getCluster();
BeatKey beatKey = keyMap.get(beat.toString());
if (beatKey != null && beatKey.key.isValid()) {if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {instance.setBeingChecked(false);
return null;
}
beatKey.key.cancel();
beatKey.key.channel().close();
}
channel = SocketChannel.open();
channel.configureBlocking(false);
// only by setting this can we make the socket close event asynchronous
channel.socket().setSoLinger(false, -1);
channel.socket().setReuseAddress(true);
channel.socket().setKeepAlive(true);
channel.socket().setTcpNoDelay(true);
int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
channel.connect(new InetSocketAddress(instance.getIp(), port));
SelectionKey key
= channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
key.attach(beat);
keyMap.put(beat.toString(), new BeatKey(key));
beat.setStartTime(System.currentTimeMillis());
NIO_EXECUTOR.schedule(new TimeOutTask(key),
CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage());
if (channel != null) {
try {channel.close();
} catch (Exception ignore) {}}
}
return null;
}
}
- TaskProcessor 实现了 Callable<Void> 接口,其 call 方法主要是为目标 instance 打开非阻塞的 SocketChannel 发起连接,并以 OP_CONNECT | OP_READ 注册到 selector(把 beat 作为 attachment),同时它会往 NIO_EXECUTOR 注册 TimeOutTask 的延时任务(延时为 CONNECT_TIMEOUT_MS)
TimeOutTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/TcpSuperSenseProcessor.java
/**
 * Delayed task that fails a check with {@code "tcp:timeout"} when its channel is
 * still not connected at the time the task runs.
 */
private static class TimeOutTask implements Runnable {

    SelectionKey key;

    /**
     * @param key the selection key of the connect attempt to watch
     */
    public TimeOutTask(SelectionKey key) {
        this.key = key;
    }

    /**
     * Does nothing when the key is missing, no longer valid, or its channel is already
     * connected; otherwise finishes the beat as timed out and releases the key and channel.
     */
    @Override
    public void run() {
        if (key == null || !key.isValid()) {
            return;
        }

        final SocketChannel pending = (SocketChannel) key.channel();
        final Beat probe = (Beat) key.attachment();

        if (pending.isConnected()) {
            return;
        }

        // Attempt to complete the connect; any failure here is ignored.
        try {
            pending.finishConnect();
        } catch (Exception ignore) {
        }

        try {
            probe.finishCheck(false, false, probe.getTask().getCheckRTNormalized() * 2, "tcp:timeout");
            key.cancel();
            key.channel().close();
        } catch (Exception ignore) {
        }
    }
}
- TimeOutTask 实现了 Runnable 接口,其 run 方法在 key 仍然有效且 channel 尚未连接时会执行 channel.finishConnect(),然后执行 beat.finishCheck 标记 success 为 false,msg 为 tcp:timeout,最后取消 key 并关闭 channel
小结
- TcpSuperSenseProcessor 实现了 HealthCheckProcessor、Runnable 接口
- 其 process 方法会遍历 instances:对于 marked 的实例直接跳过;对于 markChecking() 返回 false 的实例(即上一次检查尚未结束)会执行 healthCheckCommon.reEvaluateCheckRT 后跳过;其余实例(markChecking() 返回 true)会创建 Beat 添加到 taskQueue
- 其构造器会往 TCP_CHECK_EXECUTOR 提交自己(Runnable),其 run 方法在循环中不断执行 processTask 方法,然后通过 selector.selectNow() 取出就绪的 key,为每个 key 创建 PostProcessor 提交给 NIO_EXECUTOR;processTask 方法会从 taskQueue 取出 Beat 并创建 TaskProcessor 添加到 tasks,直到队列为空或 tasks 数量达到 NIO_THREAD_COUNT * 64,再使用 NIO_EXECUTOR.invokeAll(tasks) 批量执行并等待全部完成
doc
- TcpSuperSenseProcessor