This article was first published on the WeChat official account 《程序员果果》.
Link: https://mp.weixin.qq.com/s/Ff…
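Introduction
The previous article, 《Eureka 源码分析之 Eureka Client》 (Eureka Source Code Analysis: Eureka Client), showed from the source that the Eureka client talks to the Eureka server over HTTP REST to register services, renew leases, and take services offline. This article looks at the Eureka server.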
Source Code Analysis
We start from the @EnableEurekaServer annotation. The source shows that it is a marker annotation:
/**
 * Annotation to activate Eureka Server related configuration {@link EurekaServerAutoConfiguration}
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {}
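As the Javadoc says, the annotation activates the configuration in the Eureka server configuration class EurekaServerAutoConfiguration. It does so indirectly: it imports EurekaServerMarkerConfiguration, and EurekaServerAutoConfiguration is conditional on the Marker bean from that class (see its @ConditionalOnBean annotation). The key code of EurekaServerAutoConfiguration is as follows: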
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    /**
     * List of packages containing Jersey resources required by the Eureka server
     */
    private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",
            "com.netflix.eureka" };
    @Autowired
    private ApplicationInfoManager applicationInfoManager;
    @Autowired
    private EurekaServerConfig eurekaServerConfig;
    @Autowired
    private EurekaClientConfig eurekaClientConfig;
    @Autowired
    private EurekaClient eurekaClient;
    @Autowired
    private InstanceRegistryProperties instanceRegistryProperties;
    public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();
    @Bean
    public HasFeatures eurekaServerFeature() {
        return HasFeatures.namedFeature("Eureka Server",
                EurekaServerAutoConfiguration.class);
    }
    @Configuration
    protected static class EurekaServerConfigBeanConfiguration {
        // Create and load the EurekaServerConfig implementation, which mainly holds the Eureka server's configuration
        @Bean
        @ConditionalOnMissingBean
        public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
            EurekaServerConfigBean server = new EurekaServerConfigBean();
            if (clientConfig.shouldRegisterWithEureka()) {
                // Set a sensible default if we are supposed to replicate
                server.setRegistrySyncRetries(5);
            }
            return server;
        }
    }
    // Load EurekaController; Spring Cloud provides some extra endpoints for retrieving information about the Eureka server
    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }
    // omitted ...
    // Client requests such as registration are handled by InstanceRegistry; it is the class that does the real work and is analyzed in detail later
    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }
    // Configure the peer node information, i.e. which Eureka peer nodes must be notified when an instance registers
    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
            ServerCodecs serverCodecs) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
    }
    // omitted ...
    // The Eureka server context
    @Bean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
            PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
                registry, peerEurekaNodes, this.applicationInfoManager);
    }
    // Initialize the Eureka server; this syncs data from the other registries into the current one
    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
            EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }
    // Configure the filter: ServletContainer is Jersey's entry point, through which the Eureka server exposes its RESTful API
    @Bean
    public FilterRegistrationBean jerseyFilterRegistration(javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean bean = new FilterRegistrationBean();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
        return bean;
    }
    // omitted ...
}
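The way the marker bean gates the auto-configuration can be illustrated without Spring. The following is only a minimal sketch: the MarkerActivationSketch class, its map-based "container", and the ServerBeans placeholder are hypothetical names invented for illustration, and the code imitates the effect of @Import plus @ConditionalOnBean rather than reproducing how Spring evaluates them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a bean container; this is not Spring's implementation.
class MarkerActivationSketch {

    // Empty marker type, playing the role of EurekaServerMarkerConfiguration.Marker.
    static final class Marker {}

    // Placeholder for the beans that the auto-configuration would contribute.
    static final class ServerBeans {}

    private final Map<Class<?>, Object> beans = new HashMap<>();

    // Imitates what @EnableEurekaServer achieves through @Import: register the marker bean.
    void enableServer() {
        beans.put(Marker.class, new Marker());
    }

    // Imitates @ConditionalOnBean(Marker.class): contribute beans only if the marker exists.
    void runAutoConfiguration() {
        if (beans.containsKey(Marker.class)) {
            beans.put(ServerBeans.class, new ServerBeans());
        }
    }

    boolean hasServerBeans() {
        return beans.containsKey(ServerBeans.class);
    }

    public static void main(String[] args) {
        MarkerActivationSketch without = new MarkerActivationSketch();
        without.runAutoConfiguration();
        System.out.println("without marker: " + without.hasServerBeans());

        MarkerActivationSketch with = new MarkerActivationSketch();
        with.enableServer();
        with.runAutoConfiguration();
        System.out.println("with marker: " + with.hasServerBeans());
    }
}
```

The point of the design is that the annotation itself carries no logic; it only leaves a marker behind, and the auto-configuration stays inert when the marker is absent.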
The @Import(EurekaServerInitializerConfiguration.class) annotation on EurekaServerAutoConfiguration shows that EurekaServerInitializerConfiguration is loaded together with it. Its code is as follows:
@Configuration
@CommonsLog
public class EurekaServerInitializerConfiguration
        implements ServletContextAware, SmartLifecycle, Ordered {
    // some code omitted here
    @Override
    public void start() {
        // Start a new thread
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Initialize and start the Eureka server; this is the focus of the following sections
                    eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                    log.info("Started Eureka Server");
                    // Publish the event signaling that the Eureka registry is available
                    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    // Mark the server as running
                    EurekaServerInitializerConfiguration.this.running = true;
                    // Publish the Eureka server started event. There are other events as well;
                    // we can listen for them to implement specific business requirements, which is covered later.
                    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                }
                catch (Exception ex) {
                    // Help!
                    log.error("Could not initialize Eureka servlet context", ex);
                }
            }
        }).start();
    }
    // some code omitted here
}
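The shape of this start method (do the bootstrap work on a separate thread, then flip a running flag and publish events) can be sketched in plain Java. This is an illustration under stated assumptions, not Spring Cloud code: BackgroundStartSketch, the string event names, and the listener list are all hypothetical stand-ins for the Spring event publisher and event classes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of "bootstrap on a background thread, then publish events".
class BackgroundStartSketch {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private volatile boolean running = false;

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    boolean isRunning() {
        return running;
    }

    private void publish(String event) {
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
    }

    // Placeholder for the real bootstrap work (contextInitialized in the article).
    protected void bootstrap() {
    }

    // Starts the bootstrap on a new thread and returns that thread.
    Thread start() {
        Thread thread = new Thread(() -> {
            try {
                bootstrap();
                publish("registry-available");
                running = true;
                publish("server-started");
            } catch (RuntimeException ex) {
                // Like the original, a failure is logged and the flag stays false.
                System.err.println("Could not initialize: " + ex);
            }
        });
        thread.start();
        return thread;
    }

    // Convenience for callers that want to block until startup has finished.
    void startAndJoin() {
        Thread thread = start();
        try {
            thread.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BackgroundStartSketch server = new BackgroundStartSketch();
        List<String> seen = new CopyOnWriteArrayList<>();
        server.addListener(seen::add);
        server.startAndJoin();
        System.out.println(seen + " running=" + server.isRunning());
    }
}
```

Because the work runs on its own thread, the caller of start returns immediately; a listener is the natural place to react once the server is actually up.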
The start method spawns a new thread that performs the Eureka server initialization, for example by calling the contextInitialized method of eurekaServerBootstrap. The code of EurekaServerBootstrap is as follows:
public class EurekaServerBootstrap {
    // some code omitted here
    public void contextInitialized(ServletContext context) {
        try {
            // Initialize the Eureka environment variables
            initEurekaEnvironment();
            // Initialize the Eureka server context
            initEurekaServerContext();
            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            log.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }
    protected void initEurekaEnvironment() throws Exception {
        log.info("Setting the eureka configuration..");
        String dataCenter = ConfigurationManager.getConfigInstance()
                .getString(EUREKA_DATACENTER);
        if (dataCenter == null) {
            log.info("Eureka data center value eureka.datacenter is not set, defaulting to default");
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
        }
        else {
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
        }
        String environment = ConfigurationManager.getConfigInstance()
                .getString(EUREKA_ENVIRONMENT);
        if (environment == null) {
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
            log.info("Eureka environment value eureka.environment is not set, defaulting to test");
        }
        else {
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
        }
    }
    protected void initEurekaServerContext() throws Exception {
        // For backward compatibility
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);
        if (isAws(this.applicationInfoManager.getInfo())) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                    this.eurekaClientConfig, this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }
        // Initialize the Eureka server context holder
        EurekaServerContextHolder.initialize(this.serverContext);
        log.info("Initialized server context");
        // Copy registry from neighboring eureka node
        int registryCount = this.registry.syncUp();
        // Clients send a heartbeat every 30 seconds by default, i.e. 2 per minute.
        // Change the Eureka status to UP.
        // This also starts a scheduled task (every 60 seconds by default) that evicts
        // instances that have not renewed their lease (90 seconds by default).
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);
        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }
    public void contextDestroyed(ServletContext context) {
        try {
            log.info("Shutting down Eureka Server..");
            context.removeAttribute(EurekaServerContext.class.getName());
            destroyEurekaServerContext();
            destroyEurekaEnvironment();
        }
        catch (Throwable e) {
            log.error("Error shutting down eureka", e);
        }
        log.info("Eureka Service is now shutdown...");
    }
}
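The fallback logic in initEurekaEnvironment boils down to "copy a value if it is set, otherwise use a default". A minimal sketch follows, using java.util.Properties in place of the Archaius ConfigurationManager. The source keys eureka.datacenter and eureka.environment and the defaults "default" and "test" come from the log messages above; the target key names archaius.deployment.datacenter and archaius.deployment.environment are assumptions, since the article does not show the constant values, and EnvironmentDefaultsSketch is a hypothetical class.

```java
import java.util.Properties;

// Hypothetical sketch; java.util.Properties stands in for Archaius' ConfigurationManager.
class EnvironmentDefaultsSketch {

    // Returns the configured value, or the fallback when the key is not set.
    static String resolve(Properties config, String key, String fallback) {
        String value = config.getProperty(key);
        return value != null ? value : fallback;
    }

    // Mirrors the two if/else branches of initEurekaEnvironment.
    static Properties initEnvironment(Properties config) {
        Properties deployment = new Properties();
        // Target key names are assumed; the article only shows the constant names.
        deployment.setProperty("archaius.deployment.datacenter",
                resolve(config, "eureka.datacenter", "default"));
        deployment.setProperty("archaius.deployment.environment",
                resolve(config, "eureka.environment", "test"));
        return deployment;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("eureka.environment", "prod");
        // Data center is unset and falls back to "default"; environment is taken as given.
        System.out.println(initEnvironment(config));
    }
}
```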
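After the Eureka server context has been initialized, openForTraffic is executed. It sets the expected number of heartbeats per minute, sets the status of the service instance to UP, and finally calls postInit to start a scheduled task that periodically (every 60 seconds by default) evicts service instances that have not renewed their lease (90 seconds without renewal by default). The code of openForTraffic is as follows: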
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // Compute the expected number of renewals per minute
    this.expectedNumberOfRenewsPerMin = count * 2;
    // Compute the renewal threshold, i.e. the minimum number of renewals expected per minute
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // Change the status of the service instance to UP
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // Start the scheduled task that periodically (every 60 seconds by default) evicts
    // service instances that have not renewed their lease (90 seconds by default)
    super.postInit();
}
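The two assignments at the top of openForTraffic are plain arithmetic, and a worked example makes the numbers concrete. The sketch below assumes a renewal percent threshold of 0.85, which is what I understand the default of getRenewalPercentThreshold to be; RenewThresholdSketch and its method names are hypothetical.

```java
// Hypothetical sketch of the threshold arithmetic in openForTraffic.
class RenewThresholdSketch {

    // One heartbeat every 30 seconds means two renewals per instance per minute.
    static int expectedRenewsPerMin(int instanceCount) {
        return instanceCount * 2;
    }

    // The threshold is the expected number scaled by the configured percentage, truncated to int.
    static int renewsPerMinThreshold(int expectedRenewsPerMin, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        int count = 10; // instances copied from a neighboring node by syncUp()
        int expected = expectedRenewsPerMin(count);
        int threshold = renewsPerMinThreshold(expected, 0.85); // 0.85 is an assumed default
        System.out.println("expected=" + expected + " threshold=" + threshold);
        // prints "expected=20 threshold=17"
    }
}
```

With 10 synced instances the server expects 20 renewals per minute and treats 17 as the minimum. Note that the cast truncates rather than rounds.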
The postInit method starts a new scheduled task. Its code is as follows:
protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
The intervals used here all come from the EurekaServerConfigBean class and can be set in the configuration file with properties that start with eureka.server.
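The cancel-and-reschedule pattern in postInit, together with the idea of evicting instances whose lease has expired, can be sketched with java.util.Timer. This is a simplified illustration, not Eureka's EvictionTask: EvictionSketch, its map of last-renewal timestamps, and the explicit nowMs parameter are hypothetical, and the sketch leaves out any other checks the real eviction performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified registry that evicts instances with expired leases.
class EvictionSketch {
    private final Map<String, Long> lastRenewal = new ConcurrentHashMap<>();
    private final Timer evictionTimer = new Timer("eviction-sketch", true); // daemon thread
    private final AtomicReference<TimerTask> evictionTaskRef = new AtomicReference<>();
    private final long leaseDurationMs;

    EvictionSketch(long leaseDurationMs) {
        this.leaseDurationMs = leaseDurationMs;
    }

    // Records a heartbeat for the instance at the given time.
    void renew(String instanceId, long nowMs) {
        lastRenewal.put(instanceId, nowMs);
    }

    // Removes every instance whose last renewal is older than the lease duration.
    List<String> evict(long nowMs) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastRenewal.entrySet()) {
            if (nowMs - entry.getValue() > leaseDurationMs
                    && lastRenewal.remove(entry.getKey(), entry.getValue())) {
                evicted.add(entry.getKey());
            }
        }
        return evicted;
    }

    int size() {
        return lastRenewal.size();
    }

    // Same shape as postInit: cancel the previous task, then schedule a fresh one.
    void postInit(long intervalMs) {
        TimerTask previous = evictionTaskRef.get();
        if (previous != null) {
            previous.cancel();
        }
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                evict(System.currentTimeMillis());
            }
        };
        evictionTaskRef.set(task);
        evictionTimer.schedule(task, intervalMs, intervalMs);
    }

    public static void main(String[] args) {
        EvictionSketch registry = new EvictionSketch(90_000L); // 90 s lease, as in the article
        registry.renew("a", 0L);
        registry.renew("b", 60_000L);
        // At t = 100 s, "a" is 100 s stale (expired) and "b" is 40 s stale (still valid).
        System.out.println("evicted=" + registry.evict(100_000L) + " remaining=" + registry.size());
        registry.postInit(60_000L); // run every 60 s
        registry.postInit(60_000L); // calling it again cancels and replaces the first task
    }
}
```

Passing the current time into evict keeps the eviction rule testable without waiting for the timer to fire.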