本文主要研究一下nacos的HttpHealthCheckProcessor

HealthCheckProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckProcessor.java

public interface HealthCheckProcessor {    /**     * Run check task for service     *     * @param task check task     */    void process(HealthCheckTask task);    /**     * Get check task type, refer to enum HealthCheckType     *     * @return check type     */    String getType();}
  • HealthCheckProcessor接口定义了process、getType方法

HttpHealthCheckProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HttpHealthCheckProcessor.java

@Componentpublic class HttpHealthCheckProcessor implements HealthCheckProcessor {    @Autowired    private SwitchDomain switchDomain;    @Autowired    private HealthCheckCommon healthCheckCommon;    private static AsyncHttpClient asyncHttpClient;    private static final int CONNECT_TIMEOUT_MS = 500;    static {        try {            AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder();            builder.setMaximumConnectionsTotal(-1);            builder.setMaximumConnectionsPerHost(-1);            builder.setAllowPoolingConnection(false);            builder.setFollowRedirects(false);            builder.setIdleConnectionTimeoutInMs(CONNECT_TIMEOUT_MS);            builder.setConnectionTimeoutInMs(CONNECT_TIMEOUT_MS);            builder.setCompressionEnabled(false);            builder.setIOThreadMultiplier(1);            builder.setMaxRequestRetry(0);            builder.setUserAgent("VIPServer");            asyncHttpClient = new AsyncHttpClient(builder.build());        } catch (Throwable e) {            SRV_LOG.error("[HEALTH-CHECK] Error while constructing HTTP asynchronous client", e);        }    }    @Override    public String getType() {        return "HTTP";    }    @Override    public void process(HealthCheckTask task) {        List<Instance> ips = task.getCluster().allIPs(false);        if (CollectionUtils.isEmpty(ips)) {            return;        }        if (!switchDomain.isHealthCheckEnabled()) {            return;        }        Cluster cluster = task.getCluster();        for (Instance ip : ips) {            try {                if (ip.isMarked()) {                    if (SRV_LOG.isDebugEnabled()) {                        SRV_LOG.debug("http check, ip is marked as to skip health check, ip: {}" + ip.getIp());                    }                    continue;                }                if (!ip.markChecking()) {                    SRV_LOG.warn("http check started before last one finished, service: {}:{}:{}",                        task.getCluster().getService().getName(), task.getCluster().getName(), ip.getIp());                    healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getHttpHealthParams());                    continue;                }                AbstractHealthChecker.Http healthChecker = (AbstractHealthChecker.Http) cluster.getHealthChecker();                int ckPort = cluster.isUseIPPort4Check() ? ip.getPort() : cluster.getDefCkport();                URL host = new URL("http://" + ip.getIp() + ":" + ckPort);                URL target = new URL(host, healthChecker.getPath());                AsyncHttpClient.BoundRequestBuilder builder = asyncHttpClient.prepareGet(target.toString());                Map<String, String> customHeaders = healthChecker.getCustomHeaders();                for (Map.Entry<String, String> entry : customHeaders.entrySet()) {                    if ("Host".equals(entry.getKey())) {                        builder.setVirtualHost(entry.getValue());                        continue;                    }                    builder.setHeader(entry.getKey(), entry.getValue());                }                builder.execute(new HttpHealthCheckCallback(ip, task));                MetricsMonitor.getHttpHealthCheckMonitor().incrementAndGet();            } catch (Throwable e) {                ip.setCheckRT(switchDomain.getHttpHealthParams().getMax());                healthCheckCommon.checkFail(ip, task, "http:error:" + e.getMessage());                healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task, switchDomain.getHttpHealthParams());            }        }    }    //......}
  • HttpHealthCheckProcessor实现了HealthCheckProcessor接口,其static方法初始化了AsyncHttpClient;其getType返回的是HTTP;其process方法会遍历instances,然后对于非marked及markChecking的执行health check,并注册HttpHealthCheckCallback;对于非markChecking的或者出现异常的则执行healthCheckCommon.reEvaluateCheckRT

HttpHealthCheckCallback

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HttpHealthCheckProcessor.java

    private class HttpHealthCheckCallback extends AsyncCompletionHandler<Integer> {        private Instance ip;        private HealthCheckTask task;        private long startTime = System.currentTimeMillis();        public HttpHealthCheckCallback(Instance ip, HealthCheckTask task) {            this.ip = ip;            this.task = task;        }        @Override        public Integer onCompleted(Response response) throws Exception {            ip.setCheckRT(System.currentTimeMillis() - startTime);            int httpCode = response.getStatusCode();            if (HttpURLConnection.HTTP_OK == httpCode) {                healthCheckCommon.checkOK(ip, task, "http:" + httpCode);                healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task, switchDomain.getHttpHealthParams());            } else if (HttpURLConnection.HTTP_UNAVAILABLE == httpCode || HttpURLConnection.HTTP_MOVED_TEMP == httpCode) {                // server is busy, need verification later                healthCheckCommon.checkFail(ip, task, "http:" + httpCode);                healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getHttpHealthParams());            } else {                //probably means the state files has been removed by administrator                healthCheckCommon.checkFailNow(ip, task, "http:" + httpCode);                healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task, switchDomain.getHttpHealthParams());            }            return httpCode;        }        @Override        public void onThrowable(Throwable t) {            ip.setCheckRT(System.currentTimeMillis() - startTime);            Throwable cause = t;            int maxStackDepth = 50;            for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {                if (cause instanceof SocketTimeoutException                        || cause instanceof ConnectTimeoutException                        || cause instanceof org.jboss.netty.channel.ConnectTimeoutException                        || cause instanceof TimeoutException                        || cause.getCause() instanceof TimeoutException) {                    healthCheckCommon.checkFail(ip, task, "http:timeout:" + cause.getMessage());                    healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getHttpHealthParams());                    return;                }                cause = cause.getCause();            }            // connection error, probably not reachable            if (t instanceof ConnectException) {                healthCheckCommon.checkFailNow(ip, task, "http:unable2connect:" + t.getMessage());                healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task, switchDomain.getHttpHealthParams());            } else {                healthCheckCommon.checkFail(ip, task, "http:error:" + t.getMessage());                healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task, switchDomain.getHttpHealthParams());            }        }    }
  • HttpHealthCheckCallback实现了async http client的AsyncCompletionHandler;其onCompleted方法对于HTTP_OK执行healthCheckCommon.checkOK;对于HTTP_UNAVAILABLE及HTTP_MOVED_TEMP执行healthCheckCommon.checkFail,其余的执行healthCheckCommon.checkFailNow;其onThrowable针对不同条件执行healthCheckCommon.checkFail或者healthCheckCommon.checkFailNow

小结

HttpHealthCheckProcessor实现了HealthCheckProcessor接口,其static方法初始化了AsyncHttpClient;其getType返回的是HTTP;其process方法会遍历instances,然后对于非marked及markChecking的执行health check,并注册HttpHealthCheckCallback;对于非markChecking的或者出现异常的则执行healthCheckCommon.reEvaluateCheckRT

doc

  • HttpHealthCheckProcessor