Preface
This article takes a look at ConsulCatalogWatch in Spring Cloud Consul.
ConsulCatalogWatch
spring-cloud-consul-discovery-2.1.2.RELEASE-sources.jar!/org/springframework/cloud/consul/discovery/ConsulCatalogWatch.java
public class ConsulCatalogWatch
        implements ApplicationEventPublisherAware, SmartLifecycle {

    private static final Log log = LogFactory.getLog(ConsulDiscoveryClient.class);

    private final ConsulDiscoveryProperties properties;

    private final ConsulClient consul;

    private final TaskScheduler taskScheduler;

    private final AtomicReference<BigInteger> catalogServicesIndex = new AtomicReference<>();

    private final AtomicBoolean running = new AtomicBoolean(false);

    private ApplicationEventPublisher publisher;

    private ScheduledFuture<?> watchFuture;

    public ConsulCatalogWatch(ConsulDiscoveryProperties properties, ConsulClient consul) {
        this(properties, consul, getTaskScheduler());
    }

    public ConsulCatalogWatch(ConsulDiscoveryProperties properties, ConsulClient consul,
            TaskScheduler taskScheduler) {
        this.properties = properties;
        this.consul = consul;
        this.taskScheduler = taskScheduler;
    }

    private static ThreadPoolTaskScheduler getTaskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.initialize();
        return taskScheduler;
    }

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    @Override
    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
                    this::catalogServicesWatch,
                    this.properties.getCatalogServicesWatchDelay());
        }
    }

    @Override
    public void stop() {
        if (this.running.compareAndSet(true, false) && this.watchFuture != null) {
            this.watchFuture.cancel(true);
        }
    }

    @Override
    public boolean isRunning() {
        return false;
    }

    @Override
    public int getPhase() {
        return 0;
    }

    @Timed("consul.watch-catalog-services")
    public void catalogServicesWatch() {
        try {
            long index = -1;
            if (this.catalogServicesIndex.get() != null) {
                index = this.catalogServicesIndex.get().longValue();
            }

            Response<Map<String, List<String>>> response = this.consul.getCatalogServices(
                    new QueryParams(
                            this.properties.getCatalogServicesWatchTimeout(), index),
                    this.properties.getAclToken());
            Long consulIndex = response.getConsulIndex();
            if (consulIndex != null) {
                this.catalogServicesIndex.set(BigInteger.valueOf(consulIndex));
            }

            if (log.isTraceEnabled()) {
                log.trace("Received services update from consul:" + response.getValue()
                        + ", index:" + consulIndex);
            }
            this.publisher.publishEvent(new HeartbeatEvent(this, consulIndex));
        }
        catch (Exception e) {
            log.error("Error watching Consul CatalogServices", e);
        }
    }

}
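- The ConsulCatalogWatch constructor takes a ConsulDiscoveryProperties, a ConsulClient, and a TaskScheduler. Its start method uses taskScheduler.scheduleWithFixedDelay to schedule catalogServicesWatch as a recurring task, and its stop method cancels that task. catalogServicesWatch calls consul.getCatalogServices, passing the last known index and catalogServicesWatchTimeout as QueryParams, reads the consulIndex from the response, stores it in the local catalogServicesIndex, and then publishes a HeartbeatEvent.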
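The start/stop/watch cycle described above can be sketched with plain JDK classes. This is a minimal illustration, not the library code: CatalogWatchSketch, catalogQuery, and heartbeatListener are hypothetical names, the ScheduledExecutorService stands in for Spring's TaskScheduler, and the LongUnaryOperator stands in for the call to consul.getCatalogServices.

```java
import java.math.BigInteger;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongUnaryOperator;

/** Hypothetical stand-in for ConsulCatalogWatch, built only on the JDK. */
final class CatalogWatchSketch {

    // Daemon thread so a forgotten stop() does not keep the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "catalog-watch-sketch");
                t.setDaemon(true);
                return t;
            });
    private final AtomicReference<BigInteger> lastIndex = new AtomicReference<>();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final LongUnaryOperator catalogQuery;   // assumption: last index in, current index out
    private final Consumer<Long> heartbeatListener; // assumption: replaces publishEvent(...)
    private final long delayMillis;
    private ScheduledFuture<?> watchFuture;

    CatalogWatchSketch(LongUnaryOperator catalogQuery, Consumer<Long> heartbeatListener,
            long delayMillis) {
        this.catalogQuery = catalogQuery;
        this.heartbeatListener = heartbeatListener;
        this.delayMillis = delayMillis;
    }

    void start() {
        // compareAndSet guards against scheduling the task twice.
        if (running.compareAndSet(false, true)) {
            watchFuture = scheduler.scheduleWithFixedDelay(
                    this::watchOnce, 0, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        if (running.compareAndSet(true, false) && watchFuture != null) {
            watchFuture.cancel(true);
        }
    }

    /** One watch round: query with the last index, remember the new one, notify. */
    void watchOnce() {
        try {
            BigInteger previous = lastIndex.get();
            long index = previous != null ? previous.longValue() : -1;
            long current = catalogQuery.applyAsLong(index);
            lastIndex.set(BigInteger.valueOf(current));
            heartbeatListener.accept(current);
        } catch (Exception e) {
            // Swallow the error so the fixed-delay task keeps running.
            System.err.println("Error watching catalog: " + e);
        }
    }

    BigInteger lastIndex() {
        return lastIndex.get();
    }

    public static void main(String[] args) throws InterruptedException {
        CatalogWatchSketch watch = new CatalogWatchSketch(
                index -> index + 1, // fake query: the index advances on every call
                index -> System.out.println("heartbeat index=" + index),
                200L);
        watch.start();
        Thread.sleep(700L);
        watch.stop();
    }
}
```

Catching the exception inside the task matters with fixed-delay scheduling on a ScheduledExecutorService: an exception that escapes the task suppresses all later executions, so one failed query would otherwise end the watch.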
ConsulDiscoveryClientConfiguration
spring-cloud-consul-discovery-2.1.2.RELEASE-sources.jar!/org/springframework/cloud/consul/discovery/ConsulDiscoveryClientConfiguration.java
@Configuration
@ConditionalOnConsulEnabled
@ConditionalOnProperty(value = "spring.cloud.consul.discovery.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@EnableConfigurationProperties
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class })
public class ConsulDiscoveryClientConfiguration {

    /**
     * Name of the catalog watch task scheduler bean.
     */
    public static final String CATALOG_WATCH_TASK_SCHEDULER_NAME = "catalogWatchTaskScheduler";

    @Autowired
    private ConsulClient consulClient;

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty("spring.cloud.consul.discovery.heartbeat.enabled")
    // TODO: move to service-registry for Edgware
    public TtlScheduler ttlScheduler(HeartbeatProperties heartbeatProperties) {
        return new TtlScheduler(heartbeatProperties, this.consulClient);
    }

    @Bean
    @ConditionalOnMissingBean
    // TODO: move to service-registry for Edgware
    public HeartbeatProperties heartbeatProperties() {
        return new HeartbeatProperties();
    }

    @Bean
    @ConditionalOnMissingBean
    // TODO: Split appropriate values to service-registry for Edgware
    public ConsulDiscoveryProperties consulDiscoveryProperties(InetUtils inetUtils) {
        return new ConsulDiscoveryProperties(inetUtils);
    }

    @Bean
    @ConditionalOnMissingBean
    public ConsulDiscoveryClient consulDiscoveryClient(
            ConsulDiscoveryProperties discoveryProperties) {
        return new ConsulDiscoveryClient(this.consulClient, discoveryProperties);
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(name = "spring.cloud.consul.discovery.catalog-services-watch.enabled", matchIfMissing = true)
    public ConsulCatalogWatch consulCatalogWatch(
            ConsulDiscoveryProperties discoveryProperties,
            @Qualifier(CATALOG_WATCH_TASK_SCHEDULER_NAME) TaskScheduler taskScheduler) {
        return new ConsulCatalogWatch(discoveryProperties, this.consulClient,
                taskScheduler);
    }

    @Bean(name = CATALOG_WATCH_TASK_SCHEDULER_NAME)
    @ConditionalOnProperty(name = "spring.cloud.consul.discovery.catalog-services-watch.enabled", matchIfMissing = true)
    public TaskScheduler catalogWatchTaskScheduler() {
        return new ThreadPoolTaskScheduler();
    }

}
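- ConsulDiscoveryClientConfiguration registers ConsulCatalogWatch and wires it with the TaskScheduler bean named catalogWatchTaskScheduler, which is created here as a ThreadPoolTaskScheduler. Both beans are conditional on spring.cloud.consul.discovery.catalog-services-watch.enabled with matchIfMissing = true, so the watch is active unless that property is explicitly set to false.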
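The effect of matchIfMissing on the two watch beans can be approximated in a few lines of plain Java. This is a rough model under stated assumptions, not Spring's implementation: WatchEnabledSketch and matches are hypothetical names, the environment is reduced to a Map, and relaxed property-name binding is not modeled. It follows the documented behavior of @ConditionalOnProperty when no havingValue is given: a present property matches unless its value is "false", and a missing property matches only if matchIfMissing is true.

```java
import java.util.Map;

/** Hypothetical model of @ConditionalOnProperty without havingValue. */
final class WatchEnabledSketch {

    static final String WATCH_ENABLED =
            "spring.cloud.consul.discovery.catalog-services-watch.enabled";
    static final String HEARTBEAT_ENABLED =
            "spring.cloud.consul.discovery.heartbeat.enabled";

    /** Missing property: fall back to matchIfMissing. Present: anything but "false" matches. */
    static boolean matches(Map<String, String> env, String name, boolean matchIfMissing) {
        String value = env.get(name);
        if (value == null) {
            return matchIfMissing;
        }
        return !"false".equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of();
        Map<String, String> watchOff = Map.of(WATCH_ENABLED, "false");

        // The watch beans declare matchIfMissing = true.
        System.out.println("watch, no property set: " + matches(defaults, WATCH_ENABLED, true));
        System.out.println("watch, set to false:    " + matches(watchOff, WATCH_ENABLED, true));
        // ttlScheduler declares no matchIfMissing, which defaults to false.
        System.out.println("ttlScheduler, no property set: "
                + matches(defaults, HEARTBEAT_ENABLED, false));
    }
}
```

Under this model the catalog watch and its scheduler are opt-out, while the TtlScheduler bean is opt-in because its condition does not set matchIfMissing.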
Summary
The ConsulCatalogWatch constructor takes a ConsulDiscoveryProperties, a ConsulClient, and a TaskScheduler. Its start method uses taskScheduler.scheduleWithFixedDelay to schedule catalogServicesWatch as a recurring task, and its stop method cancels that task. catalogServicesWatch calls consul.getCatalogServices to obtain the consulIndex, updates the local catalogServicesIndex with it, and publishes a HeartbeatEvent.
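Because catalogServicesWatch publishes a HeartbeatEvent after every query that returns, including queries that return with an unchanged index, a consumer of the event usually wants to react only when the index actually moves. A minimal JDK-only sketch of that check follows; IndexChangeDetector is a hypothetical name and the class is not part of Spring Cloud.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: reports whether a heartbeat carries a new index. */
final class IndexChangeDetector {

    private final AtomicReference<Long> lastSeen = new AtomicReference<>();

    /** Returns true only when the index is non-null and differs from the last one seen. */
    boolean update(Long index) {
        Long previous = lastSeen.get();
        if (index != null && !index.equals(previous)) {
            // CAS fails only if another thread updated the value in between.
            return lastSeen.compareAndSet(previous, index);
        }
        return false;
    }

    public static void main(String[] args) {
        IndexChangeDetector detector = new IndexChangeDetector();
        for (Long index : new Long[] {42L, 42L, 43L, null}) {
            System.out.println(index + " -> changed=" + detector.update(index));
        }
    }
}
```

A listener could call update with the value carried by each event and refresh its view of the service catalog only when the call returns true.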
doc
- ConsulCatalogWatch