关于java:Spring-Cloud-Eureka源码分析之心跳续约及自我保护机制

0次阅读

共计 12811 个字符,预计需要花费 33 分钟才能阅读完成。

Eureka-Server 是如何判断一个服务不可用的?

Eureka 是通过心跳续约的形式来查看各个服务提供者的衰弱状态。

实际上,在判断服务不可用这个局部,会分为两块逻辑。

  1. Eureka-Server 须要定期检查服务提供者的衰弱状态。
  2. Eureka-Client 在运行过程中须要定期更新注册信息。

Eureka 的心跳续约机制如下图所示。

  1. 客户端在启动时,会开启一个心跳工作,每隔 30s 向服务单发送一次心跳申请。
  2. 服务端保护了每个实例的最初一次心跳工夫,客户端发送心跳包过去后,会更新这个心跳工夫。
  3. 服务端在启动时,开启了一个定时工作,该工作每隔 60s 执行一次,查看每个实例的最初一次心跳工夫是否超过 90s,如果超过则认为过期,须要剔除。

对于上述流程中波及到的工夫,能够通过以下配置来更改.

#Server 至上一次收到 Client 的心跳之后,期待下一次心跳的超时工夫,在这个工夫内若没收到下一次心跳,则将移除该 Instance。eureka.instance.lease-expiration-duration-in-seconds=90
# Server 清理有效节点的工夫距离,默认 60000 毫秒,即 60 秒。eureka.server.eviction-interval-timer-in-ms=60

客户端心跳发动流程

心跳续约是客户端发动的,每隔 30s 执行一次。

DiscoveryClient.initScheduledTasks

持续回到 DiscoveryClient.initScheduledTasks 办法中,

private void initScheduledTasks() {
    // 省略....
    heartbeatTask = new TimedSupervisorTask(
        "heartbeat",
        scheduler,
        heartbeatExecutor,
        renewalIntervalInSecs,
        TimeUnit.SECONDS,
        expBackOffBound,
        new HeartbeatThread());
    scheduler.schedule(
        heartbeatTask,
        renewalIntervalInSecs, TimeUnit.SECONDS);
    // 省略....
}

renewalIntervalInSecs=30s, 默认每隔 30s 执行一次。

HeartbeatThread

这个线程的实现很简略,调用 renew() 续约,如果续约胜利,则更新最初一次心跳续约工夫。

private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

renew() 办法中,调用 EurekaServer 的 "apps/" + appName + '/' + id; 这个地址,进行心跳续约。

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

服务端收到心跳解决

服务端具体为调用 [com.netflix.eureka.resources] 包下的 InstanceResource 类的 renewLease 办法进行续约,代码如下

@PUT
public Response renewLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {boolean isFromReplicaNode = "true".equals(isReplication);
    // 调用 renew 进行续约
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // Not found in the registry, immediately ask for a register
    if (!isSuccess) { // 如果续约失败,返回异样
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();}
    // Check if we need to sync based on dirty time stamp, the client
    // instance might have changed some value
    Response response;
    // 校验客户端与服务端的工夫差别,如果存在问题则须要从新发动注册
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {response = Response.ok().build(); // 续约胜利,返回 200}
    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}

InstanceRegistry.renew

renew 的实现办法如下,次要有两个流程

  1. 从服务注册列表中找到匹配以后申请的实例
  2. 公布 EurekaInstanceRenewedEvent 事件
@Override
public boolean renew(final String appName, final String serverId,
                     boolean isReplication) {log("renew" + appName + "serverId" + serverId + ", isReplication {}"
        + isReplication);
    // 获取所有服务注册信息
    List<Application> applications = getSortedApplications();
    for (Application input : applications) { // 逐个遍历
        if (input.getName().equals(appName)) { // 如果以后续约的客户端和某个服务注册信息节点雷同
            InstanceInfo instance = null;
            for (InstanceInfo info : input.getInstances()) { // 遍历这个服务集群下的所有节点,找到某个匹配的实例 instance 返回。if (info.getId().equals(serverId)) {
                    instance = info; //
                    break;
                }
            }
            // 公布 EurekaInstanceRenewedEvent 事件,这个事件在 EurekaServer 中并没有解决,咱们能够监听这个事件来做一些事件,比方做监控。publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                                                        instance, isReplication));
            break;
        }
    }
    return super.renew(appName, serverId, isReplication);
}

super.renew

public boolean renew(final String appName, final String id, final boolean isReplication) {if (super.renew(appName, id, isReplication)) { // 调用父类的续约办法,如果续约胜利
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); // 同步给集群中的所有节点
        return true;
    }
    return false;
}

AbstractInstanceRegistry.renew

在这个办法中,会拿到利用对应的实例列表,而后调用 Lease.renew()去进行心跳续约。

public boolean renew(String appName, String id, boolean isReplication) {RENEW.increment(isReplication);
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); // 依据服务名字获取实例信息
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {leaseToRenew = gMap.get(id);  // 获取须要续约的服务实例,}
    if (leaseToRenew == null) { // 如果为空,阐明这个服务实例不存在,间接返回续约失败
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else { // 示意实例存在
        InstanceInfo instanceInfo = leaseToRenew.getHolder(); // 获取实例的根本信息
        if (instanceInfo != null) { // 实例根本信息不为空
            // touchASGCache(instanceInfo.getASGName());
            // 获取实例的运行状态
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { // 如果运行状态未知,也返回续约失败
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            // 如果以后申请的实例信息
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info("The instance status {} is different from overridden instance status {} for instance {}."
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                overriddenInstanceStatus.name(),
                                instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

            }
        }
        // 更新上一分钟的续约数量
        renewsLastMin.increment();
        leaseToRenew.renew(); // 续约
        return true;
    }
}

续约的实现,就是更新服务端最初一次收到心跳申请的工夫。

public void renew() {lastUpdateTimestamp = System.currentTimeMillis() + duration;

}

Eureka 的自我爱护机制

理论,心跳检测机制有肯定的不确定行,比方服务提供者可能是失常的,然而因为网络通信的问题,导致在 90s 内没有收到心跳申请,那将会导致衰弱的服务被误杀。

为了防止这种问题,Eureka 提供了一种叫 自我爱护 机制的货色。简略来说,就是开启自我爱护机制后,Eureka Server 会包这些服务实例爱护起来,防止过期导致实例被剔除的问题,从而保障 Eurreka 集群更加强壮和稳固。

进入自我爱护状态后,会呈现以下几种状况

  • Eureka Server 不再从注册列表中移除因为长时间没有收到心跳而应该剔除的过期服务,如果在爱护期内如果服务刚好这个服务提供者非正常下线了,此时服务消费者就会拿到一个有效的服务实例,此时会调用失败,对于这个问题须要服务消费者端要有一些容错机制,如重试,断路器等!
  • Eureka Server 依然可能承受新服务的注册和查问申请,然而不会被同步到其余节点上,保障以后节点仍然可用。

Eureka 自我爱护机制,通过配置 eureka.server.enable-self-preservation 来【true】关上 /【false禁用】自我爱护机制,默认关上状态,倡议生产环境关上此配置。

自我爱护机制应该如何设计,能力更加精准的管制到 “是网络异样” 导致的通信提早,而不是服务宕机呢?

Eureka 是这么做的:如果低于 85% 的客户端节点都没有失常的心跳,那么 Eureka Server 就认为客户端与注册核心呈现了网络故障,Eureka Server 主动进入自我爱护状态.

其中,85%这个阈值,能够通过上面这个配置来设置

# 自我爱护续约百分比,默认是 0.85
eureka.server.renewal-percent-threshold=0.85

然而还有个问题,超过谁的 85% 呢?这里有一个预期的续约数量,这个数量计算公式如下:

// 自我爱护阀值 = 服务总数 * 每分钟续约数(60S/ 客户端续约距离) * 自我爱护续约百分比阀值因子

假如如果有 100 个服务,续约距离是30S,自我爱护阈值0.85,那么它的预期续约数量为:

自我爱护阈值 =100 * 60 / 30 * 0.85 = 170。

主动续约的阈值设置

在 EurekaServerBootstrap 这个类的 contextInitialized 办法中,会调用 initEurekaServerContext 进行初始化

public void contextInitialized(ServletContext context) {
    try {initEurekaEnvironment();
        initEurekaServerContext();

        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}

持续往下看。

protected void initEurekaServerContext() throws Exception {EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
    //...
    registry.openForTraffic(applicationInfoManager, registryCount);
}

在 openForTraffic 办法中,会初始化 expectedNumberOfClientsSendingRenews 这个值,这个值的含意是:预期每分钟收到续约的客户端数量,取决于注册到 eureka server 上的服务数量

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfClientsSendingRenews = count; // 初始值是 1.
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {this.peerInstancesTransferEmptyOnStartup = false;}
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();}

updateRenewsPerMinThreshold

接着调用 updateRenewsPerMinThreshold 办法,会更新一个每分钟最小的续约数量,也就是 Eureka Server 冀望每分钟收到客户端实例续约的总数的阈值。如果小于这个阈值,就会触发自我爱护机制。

protected void updateRenewsPerMinThreshold() {this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}
// 自我爱护阀值 = 服务总数 * 每分钟续约数(60S/ 客户端续约距离) * 自我爱护续约百分比阀值因子
  • getExpectedClientRenewalIntervalSeconds,客户端的续约距离,默认为 30s
  • getRenewalPercentThreshold,自我爱护续约百分比阈值因子,默认 0.85。也就是说每分钟的续约数量要大于 85%

预期值的变动触发机制

expectedNumberOfClientsSendingRenewsnumberOfRenewsPerMinThreshold 这两个值,会随着新增服务注册以及服务下线的触发而发生变化。

PeerAwareInstanceRegistryImpl.cancel

当服务提供者被动下线时,示意这个时候 Eureka-Server 要剔除这个服务提供者的地址,同时也代表这这个心跳续约的阈值要发生变化。所以在 PeerAwareInstanceRegistryImpl.cancel 中能够看到数据的更新

调用门路 PeerAwareInstanceRegistryImpl.cancel -> AbstractInstanceRegistry.cancel->internalCancel

服务下线之后,意味着须要发送续约的客户端数量递加了,所以在这里进行批改

protected boolean internalCancel(String appName, String id, boolean isReplication) {
  //....
    synchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Since the client wants to cancel it, reduce the number of clients to send renews.
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
            updateRenewsPerMinThreshold();}
    }
}

PeerAwareInstanceRegistryImpl.register

当有新的服务提供者注册到 eureka-server 上时,须要减少续约的客户端数量,所以在 register 办法中会进行解决

register ->super.register(AbstractInstanceRegistry)

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    //....    
    // The lease does not exist and hence it is a new registration
    synchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Since the client wants to register it, increase the number of clients sending renews
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
            updateRenewsPerMinThreshold();}
    }
}

每隔 15 分钟刷新自我爱护阈值

PeerAwareInstanceRegistryImpl.scheduleRenewalThresholdUpdateTask

每隔 15 分钟,更新一次自我爱护阈值!

private void updateRenewalThreshold() {
    try {
        // 1. 计算利用实例数
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        for (Application app : apps.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {if (this.isRegisterable(instance)) {++count;}
            }
        }
        
        synchronized (lock) {
            // Update threshold only if the threshold is greater than the
            // current expected threshold or if self preservation is disabled.
            // 当节点数量 count 大于最小续约数量时,或者没有开启自我爱护机制的状况下,从新计算 expectedNumberOfClientsSendingRenews 和 numberOfRenewsPerMinThreshold
            if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                || (!this.isSelfPreservationModeEnabled())) {
                this.expectedNumberOfClientsSendingRenews = count;
                updateRenewsPerMinThreshold();}
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {logger.error("Cannot update renewal threshold", e);
    }
}

自我爱护机制的触发

AbstractInstanceRegistrypostInit办法中,会开启一个 EvictionTask 的工作,这个工作用来检测是否须要开启自我爱护机制。

这个办法也是在 EurekaServerBootstrap 办法启动时触发。

protected void postInit() {renewsLastMin.start(); // 开启一个定时工作,用来实现每分钟的续约数量,每隔 60s 归 0 从新计算
    if (evictionTaskRef.get() != null) {evictionTaskRef.get().cancel();}
    evictionTaskRef.set(new EvictionTask()); // 启动一个定时工作 EvictionTask, 每隔 60s 执行一次
    evictionTimer.schedule(evictionTaskRef.get(),
                           serverConfig.getEvictionIntervalTimerInMs(),
                           serverConfig.getEvictionIntervalTimerInMs());
}

其中,EvictionTask 的代码如下。

private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

@Override
public void run() {
    try {
        // 获取弥补工夫毫秒数
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {logger.error("Could not run the evict task", e);
    }
}

evict 办法

public void evict(long additionalLeaseMs) {logger.debug("Running the evict task");
     // 是否须要开启自我爱护机制,如果须要,那么间接 RETURE,不须要持续往下执行了
    if (!isLeaseExpirationEnabled()) {logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // 这上面次要是做服务主动下线的操作的。}

isLeaseExpirationEnabled

  • 是否开启了自我爱护机制,如果没有,则跳过,默认是开启
  • 计算是否须要开启自我爱护,判断最初一分钟收到的续约数量是否大于numberOfRenewsPerMinThreshold
public boolean isLeaseExpirationEnabled() {if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;}

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic 带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

正文完
 0