背景介绍
咱们小组次要负责四轮场景下的司乘匹配工作,基于开源分布式搜索引擎 ElasticSearch 实现订单的召回。同时咱们应用 Flink 实时生产 kafka 音讯,将订单数据写入到对应的 ES 索引中。
想要应用 Elasticsearch 服务,则要先创立一个能够连贯到 ES 集群的客户端。ES 官网提供了很多版本的 Java 客户端,蕴含但不限于:
- Transport 客户端
- Low Level REST 客户端
- High Level REST 客户端
- Java API 客户端
晚期咱们应用的是向 SLB 域名(SLB 是负载平衡服务的缩写)发送申请,在 SLB 前面有配置对应的 ES 节点 ip,应用 SLB 来实现负载平衡。但随着流量的增长,在节假日和线上压测时,经常出现 SLB 带宽超限的问题,影响零碎的稳定性。
于是咱们改用 ip 直连的形式,RestClient 客户端自身自带 ip 节点的负载平衡策略,实现上应用了 Collections.rotate() 办法,感兴趣的能够能够看底层的算法思路。
尽管解决了 SLB 带宽超限的问题,但当初需手动配置 ip 列表,比拟容易出错,一旦配错一个节点,就会引起线上报错,而且每次扩缩容时都要进行 ip 列表的变更。
于是咱们开始调研节点的嗅探机制,先从官网文档动手。
官网文档给出的示例如下:
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"))
.build();
Sniffer sniffer = Sniffer.builder(restClient)
.setSniffIntervalMillis(60000).build();
下面代码的含意就是初始化一个嗅探器,每 60s 更新一次节点列表。
还能够启用失败嗅探,在每次申请失败之后,节点列表都会立刻更新,而不是在下一轮一般的嗅探中更新。这种首先须要创立一个 SniffOnfalureListener,并在创立 RestClient 时设置一个监听器,在每次节点失败时告诉该监听器。
SniffOnFailureListener sniffOnFailureListener =
new SniffOnFailureListener();
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200))
.setFailureListener(sniffOnFailureListener)
.build();
Sniffer sniffer = Sniffer.builder(restClient)
.setSniffIntervalMillis(60000)
.build();
sniffOnFailureListener.setSniffer(sniffer);
除了嗅探器之外,还发现了另一个非凡的配置:节点选择器。
节点选择器能够实现对节点列表的过滤,比方我想过滤出协调节点,只向协调节点发送申请,就能够定义一个 NodeSelector 来实现。
源码分析
上面咱们开始分析 Sniffer 组件的底层实现,先进入 build() 办法:
public Sniffer build() {if (nodesSniffer == null) {this.nodesSniffer = new ElasticsearchNodesSniffer(restClient);
}
return new Sniffer(restClient, nodesSniffer, sniffIntervalMillis, sniffAfterFailureDelayMillis);
}
咱们能够看到实例化一个 sniffer 对象之前,先创立了一个 nodeSniffer 对象,指向的类型是 ElasticsearchNodesSniffer,这个是真正的发送 ES 节点嗅探申请的类,咱们待会再看。
接着来到 Sniffer 的构造方法,看一下作了哪些参数的初始化:
- nodesSniffer:发送节点嗅探申请的对象
- restClient:向 ES 集群发送申请的客户端
- sniffIntervalMillis、sniffAfterFailureDelayMillis:定时嗅探周期、失败后嗅探延迟时间
- scheduler:定时工作线程池
Sniffer(RestClient restClient, NodesSniffer nodesSniffer, Scheduler scheduler, long sniffInterval, long sniffAfterFailureDelay) {
this.nodesSniffer = nodesSniffer;
this.restClient = restClient;
this.sniffIntervalMillis = sniffInterval;
this.sniffAfterFailureDelayMillis = sniffAfterFailureDelay;
this.scheduler = scheduler;
/*
* The first sniffing round is async, so this constructor returns before nextScheduledTask is assigned to a task.
* The initialized flag is a protection against NPE due to that.
*/
Task task = new Task(sniffIntervalMillis) {
@Override
public void run() {super.run();
initialized.compareAndSet(false, true);
}
};
/*
* We do not keep track of the returned future as we never intend to cancel the initial sniffing round, we rather
* prevent any other operation from being executed till the sniffer is properly initialized
*/
scheduler.schedule(task, 0L);
}
除此之外,咱们能够看到实例化了一个 Task 对象,同时提交工作到了线程池中,工作的延迟时间是 0,阐明立刻执行。从正文能够看出这是初始嗅探,即利用刚启动时立刻触发一轮嗅探。
咱们看一下 Task 对象的 run() 办法是咋样的。Task 有一个属性:nextTaskDelay,从名字能够看出是下一次执行的延迟时间,下面咱们初始化时传入的参数是 sniffIntervalMillis,即下一次工作在通过一个嗅探周期后执行。能够看到在 finally 代码块中,向线程池提交的新工作,延迟时间设置的也是 nextTaskDelay。
class Task implements Runnable {
final long nextTaskDelay;
final AtomicReference<TaskState> taskState = new AtomicReference<>(TaskState.WAITING);
Task(long nextTaskDelay) {this.nextTaskDelay = nextTaskDelay;}
@Override
public void run() {
/*
* Skipped or already started tasks do nothing. In most cases tasks will be cancelled and not run, but we want to protect for
* cases where future#cancel returns true yet the task runs. We want to make sure that such tasks do nothing otherwise they will
* schedule another round at the end and so on, leaving us with multiple parallel sniffing "tracks" whish is undesirable.
*/
if (taskState.compareAndSet(TaskState.WAITING, TaskState.STARTED) == false) {return;}
try {sniff();
} catch (Exception e) {logger.error("error while sniffing nodes", e);
} finally {Task task = new Task(sniffIntervalMillis);
Future<?> future = scheduler.schedule(task, nextTaskDelay);
//tasks are run by a single threaded executor, so swapping is safe with a simple volatile variable
ScheduledTask previousTask = nextScheduledTask;
nextScheduledTask = new ScheduledTask(task, future);
...
}
}
}
具体的嗅探逻辑就在这个 sniff() 办法里了,咱们接着往下看。
final void sniff() throws IOException {List<Node> sniffedNodes = nodesSniffer.sniff();
if (logger.isDebugEnabled()) {logger.debug("sniffed nodes:" + sniffedNodes);
}
if (sniffedNodes.isEmpty()) {logger.warn("no nodes to set, nodes will be updated at the next sniffing round");
} else {restClient.setNodes(sniffedNodes);
}
}
能够看到调用了 nodesSniffer 对象的 sniff() 办法,所以说这个对象才是真正用于发送 ES 节点嗅探申请的,只好持续看它的 sniff() 是怎么实现的啦。
咱们发现它应用 restClient 向 ES 集群发了一次申请!
@Override
public List<Node> sniff() throws IOException {Response response = restClient.performRequest(request);
return readHosts(response.getEntity(), scheme, jsonFactory);
}
这个申请具体是啥呢,咱们在构造函数里找一找:
public ElasticsearchNodesSniffer(RestClient restClient, long sniffRequestTimeoutMillis, Scheme scheme) {
...
this.request = new Request("GET", "/_nodes/http");
...
}
能够看到结构了一个 Request 对象,向集群发送 GET 申请,申请的 url 是 /_nodes/http。
申请实现之后,须要对集群的返回后果作解析,readHosts 办法次要是作一些 json 解析工作,咱们就不细看了。最终返回一个蕴含集群中全副节点的 Node 对象列表。
static List<Node> readHosts(HttpEntity entity, Scheme scheme, JsonFactory jsonFactory) throws IOException {try (InputStream inputStream = entity.getContent()) {JsonParser parser = jsonFactory.createParser(inputStream);
...
return nodes;
}
}
拿到节点列表之后,当然是设置到 restClient 实例的属性当中去,所以回到 Sniffer.sniff() 办法,能够看到最初的确设置到了 restClient 当中。
final void sniff() throws IOException {List<Node> sniffedNodes = nodesSniffer.sniff();
if (logger.isDebugEnabled()) {logger.debug("sniffed nodes:" + sniffedNodes);
}
if (sniffedNodes.isEmpty()) {logger.warn("no nodes to set, nodes will be updated at the next sniffing round");
} else {restClient.setNodes(sniffedNodes);
}
}
好了,到这咱们根本就把 Sniffer 组件的外围逻辑看完了,能够发现就是每次嗅探后同时向线程池提交新的嗅探工作来实现的,工作的执行工夫是设置的嗅探周期。
当然,咱们后面还讲到了能够设置失败嗅探,那这块又是怎么实现的呢?再看一下是怎么设置的:
SniffOnFailureListener sniffOnFailureListener =
new SniffOnFailureListener();
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200))
.setFailureListener(sniffOnFailureListener)
.build();
Sniffer sniffer = Sniffer.builder(restClient)
.setSniffIntervalMillis(60000)
.build();
sniffOnFailureListener.setSniffer(sniffer);
编码经验丰富的大佬应该看出采纳的是监听器模式。监听器模式的实质就是观察者模式,先将回调函数注册到被察看对象,当被察看对象发生变化时,通过回调函数告知观察者 / 监听者。
咱们看 performRequest 办法,所有向集群发送的同步申请(比方一次查问申请)最终都会调用这个办法。
private Response performRequest(final NodeTuple<Iterator<Node>> nodeTuple,
final InternalRequest request,
Exception previousException) throws IOException {RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
HttpResponse httpResponse;
try {httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get();} catch(Exception e) {RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e);
onFailure(context.node);
...
}
...
}
能够看到在申请执行异样时(比方查问超时、网络异样),会执行到 onFailure 办法,在这里就会调用监听器中的办法。
private void onFailure(Node node) {
...
failureListener.onFailure(node);
}
咱们看一下 SniffOnFailureListener 这个监听器的实现:
public class SniffOnFailureListener extends RestClient.FailureListener {
private volatile Sniffer sniffer;
@Override
public void onFailure(Node node) {if (sniffer == null) {throw new IllegalStateException("sniffer was not set, unable to sniff on failure");
}
sniffer.sniffOnFailure();}
}
public void sniffOnFailure() {if (this.initialized.get() && this.nextScheduledTask.skip()) {this.scheduler.schedule(new Task(this.sniffAfterFailureDelayMillis), 0L);
}
}
能够看到的确是会立刻触发一次嗅探。启用失败后嗅探的益处就是如果集群中有节点下线可能及时将其从节点列表中移除,而不必等到下一个嗅探周期。
总结
咱们 ES 客户端的节点配置初始化从 SLB 域名切换到动态 ip 列表,目标是移除对 SLB 的依赖,但因为人工配置 ip 列表容易出错,应用 ElasticSearch 节点嗅探机制,缩小人工操作,进步扩缩容效率。
(本文作者:方选豪)