背景介绍
咱们小组次要负责四轮场景下的司乘匹配工作,基于开源分布式搜索引擎ElasticSearch实现订单的召回。同时咱们应用Flink实时生产kafka音讯,将订单数据写入到对应的ES索引中。
想要应用Elasticsearch服务,则要先创立一个能够连贯到ES集群的客户端。ES官网提供了很多版本的 Java 客户端,蕴含但不限于:
- Transport 客户端
- Low Level REST 客户端
- High Level REST 客户端
- Java API 客户端
晚期咱们应用的是向SLB域名(SLB是负载平衡服务的缩写)发送申请,在SLB前面有配置对应的ES节点ip,应用SLB来实现负载平衡。但随着流量的增长,在节假日和线上压测时,经常出现SLB带宽超限的问题,影响零碎的稳定性。
于是咱们改用ip直连的形式,RestClient客户端自身自带ip节点的负载平衡策略,实现上应用了 Collections.rotate() 办法,感兴趣的能够能够看底层的算法思路。
尽管解决了SLB带宽超限的问题,但当初需手动配置ip列表,比拟容易出错,一旦配错一个节点,就会引起线上报错,而且每次扩缩容时都要进行ip列表的变更。
于是咱们开始调研节点的嗅探机制,先从官网文档动手。
官网文档给出的示例如下:
RestClient restClient = RestClient.builder( new HttpHost("localhost", 9200, "http")) .build();Sniffer sniffer = Sniffer.builder(restClient) .setSniffIntervalMillis(60000).build();
下面代码的含意就是初始化一个嗅探器,每60s更新一次节点列表。
还能够启用失败嗅探,在每次申请失败之后,节点列表都会立刻更新,而不是在下一轮一般的嗅探中更新。这种首先须要创立一个 SniffOnfalureListener,并在创立 RestClient 时设置一个监听器,在每次节点失败时告诉该监听器。
SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();RestClient restClient = RestClient.builder( new HttpHost("localhost", 9200)) .setFailureListener(sniffOnFailureListener) .build();Sniffer sniffer = Sniffer.builder(restClient) .setSniffIntervalMillis(60000) .build();sniffOnFailureListener.setSniffer(sniffer);
除了嗅探器之外,还发现了另一个非凡的配置:节点选择器。
节点选择器能够实现对节点列表的过滤,比方我想过滤出协调节点,只向协调节点发送申请,就能够定义一个NodeSelector来实现。
源码分析
上面咱们开始分析Sniffer组件的底层实现,先进入build()办法:
public Sniffer build() { if (nodesSniffer == null) { this.nodesSniffer = new ElasticsearchNodesSniffer(restClient); } return new Sniffer(restClient, nodesSniffer, sniffIntervalMillis, sniffAfterFailureDelayMillis);}
咱们能够看到实例化一个sniffer对象之前,先创立了一个nodeSniffer对象,指向的类型是ElasticsearchNodesSniffer,这个是真正的发送ES节点嗅探申请的类,咱们待会再看。
接着来到Sniffer的构造方法,看一下作了哪些参数的初始化:
- nodesSniffer:发送节点嗅探申请的对象
- restClient:向ES集群发送申请的客户端
- sniffIntervalMillis、sniffAfterFailureDelayMillis:定时嗅探周期、失败后嗅探延迟时间
- scheduler:定时工作线程池
Sniffer(RestClient restClient, NodesSniffer nodesSniffer, Scheduler scheduler, long sniffInterval, long sniffAfterFailureDelay) { this.nodesSniffer = nodesSniffer; this.restClient = restClient; this.sniffIntervalMillis = sniffInterval; this.sniffAfterFailureDelayMillis = sniffAfterFailureDelay; this.scheduler = scheduler; /* * The first sniffing round is async, so this constructor returns before nextScheduledTask is assigned to a task. * The initialized flag is a protection against NPE due to that. */ Task task = new Task(sniffIntervalMillis) { @Override public void run() { super.run(); initialized.compareAndSet(false, true); } }; /* * We do not keep track of the returned future as we never intend to cancel the initial sniffing round, we rather * prevent any other operation from being executed till the sniffer is properly initialized */ scheduler.schedule(task, 0L);}
除此之外,咱们能够看到实例化了一个Task对象,同时提交工作到了线程池中,工作的延迟时间是0,阐明立刻执行。从正文能够看出这是初始嗅探,即利用刚启动时立刻触发一轮嗅探。
咱们看一下Task对象的run()办法是咋样的。Task有一个属性:nextTaskDelay,从名字能够看出是下一次执行的延迟时间,下面咱们初始化时传入的参数是sniffIntervalMillis,即下一次工作在通过一个嗅探周期后执行。能够看到在finally代码块中,向线程池提交的新工作,延迟时间设置的也是nextTaskDelay。
class Task implements Runnable { final long nextTaskDelay; final AtomicReference<TaskState> taskState = new AtomicReference<>(TaskState.WAITING); Task(long nextTaskDelay) { this.nextTaskDelay = nextTaskDelay; } @Override public void run() { /* * Skipped or already started tasks do nothing. In most cases tasks will be cancelled and not run, but we want to protect for * cases where future#cancel returns true yet the task runs. We want to make sure that such tasks do nothing otherwise they will * schedule another round at the end and so on, leaving us with multiple parallel sniffing "tracks" whish is undesirable. */ if (taskState.compareAndSet(TaskState.WAITING, TaskState.STARTED) == false) { return; } try { sniff(); } catch (Exception e) { logger.error("error while sniffing nodes", e); } finally { Task task = new Task(sniffIntervalMillis); Future<?> future = scheduler.schedule(task, nextTaskDelay); //tasks are run by a single threaded executor, so swapping is safe with a simple volatile variable ScheduledTask previousTask = nextScheduledTask; nextScheduledTask = new ScheduledTask(task, future); ... } }}
具体的嗅探逻辑就在这个sniff()办法里了,咱们接着往下看。
final void sniff() throws IOException { List<Node> sniffedNodes = nodesSniffer.sniff(); if (logger.isDebugEnabled()) { logger.debug("sniffed nodes: " + sniffedNodes); } if (sniffedNodes.isEmpty()) { logger.warn("no nodes to set, nodes will be updated at the next sniffing round"); } else { restClient.setNodes(sniffedNodes); }}
能够看到调用了nodesSniffer对象的sniff()办法,所以说这个对象才是真正用于发送ES节点嗅探申请的,只好持续看它的sniff()是怎么实现的啦。
咱们发现它应用restClient向ES集群发了一次申请!
@Overridepublic List<Node> sniff() throws IOException { Response response = restClient.performRequest(request); return readHosts(response.getEntity(), scheme, jsonFactory);}
这个申请具体是啥呢,咱们在构造函数里找一找:
public ElasticsearchNodesSniffer(RestClient restClient, long sniffRequestTimeoutMillis, Scheme scheme) { ... this.request = new Request("GET", "/_nodes/http"); ...}
能够看到结构了一个Request对象,向集群发送GET申请,申请的url是/_nodes/http。
申请实现之后,须要对集群的返回后果作解析,readHosts办法次要是作一些json解析工作,咱们就不细看了。最终返回一个蕴含集群中全副节点的Node对象列表。
static List<Node> readHosts(HttpEntity entity, Scheme scheme, JsonFactory jsonFactory) throws IOException { try (InputStream inputStream = entity.getContent()) { JsonParser parser = jsonFactory.createParser(inputStream); ... return nodes; }}
拿到节点列表之后,当然是设置到restClient实例的属性当中去,所以回到Sniffer.sniff()办法,能够看到最初的确设置到了restClient当中。
final void sniff() throws IOException { List<Node> sniffedNodes = nodesSniffer.sniff(); if (logger.isDebugEnabled()) { logger.debug("sniffed nodes: " + sniffedNodes); } if (sniffedNodes.isEmpty()) { logger.warn("no nodes to set, nodes will be updated at the next sniffing round"); } else { restClient.setNodes(sniffedNodes); }}
好了,到这咱们根本就把Sniffer组件的外围逻辑看完了,能够发现就是每次嗅探后同时向线程池提交新的嗅探工作来实现的,工作的执行工夫是设置的嗅探周期。
当然,咱们后面还讲到了能够设置失败嗅探,那这块又是怎么实现的呢?再看一下是怎么设置的:
SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();RestClient restClient = RestClient.builder( new HttpHost("localhost", 9200)) .setFailureListener(sniffOnFailureListener) .build();Sniffer sniffer = Sniffer.builder(restClient) .setSniffIntervalMillis(60000) .build();sniffOnFailureListener.setSniffer(sniffer);
编码经验丰富的大佬应该看出采纳的是监听器模式。监听器模式的实质就是观察者模式,先将回调函数注册到被察看对象,当被察看对象发生变化时,通过回调函数告知观察者/监听者。
咱们看performRequest办法,所有向集群发送的同步申请(比方一次查问申请)最终都会调用这个办法。
private Response performRequest(final NodeTuple<Iterator<Node>> nodeTuple, final InternalRequest request, Exception previousException) throws IOException { RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); HttpResponse httpResponse; try { httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get(); } catch(Exception e) { RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e); onFailure(context.node); ... } ...}
能够看到在申请执行异样时(比方查问超时、网络异样),会执行到onFailure办法,在这里就会调用监听器中的办法。
private void onFailure(Node node) { ... failureListener.onFailure(node);}
咱们看一下SniffOnFailureListener这个监听器的实现:
public class SniffOnFailureListener extends RestClient.FailureListener { private volatile Sniffer sniffer; @Override public void onFailure(Node node) { if (sniffer == null) { throw new IllegalStateException("sniffer was not set, unable to sniff on failure"); } sniffer.sniffOnFailure(); }}
public void sniffOnFailure() { if (this.initialized.get() && this.nextScheduledTask.skip()) { this.scheduler.schedule(new Task(this.sniffAfterFailureDelayMillis), 0L); } }
能够看到的确是会立刻触发一次嗅探。启用失败后嗅探的益处就是如果集群中有节点下线可能及时将其从节点列表中移除,而不必等到下一个嗅探周期。
总结
咱们ES客户端的节点配置初始化从SLB域名切换到动态ip列表,目标是移除对SLB的依赖,但因为人工配置ip列表容易出错,应用ElasticSearch节点嗅探机制,缩小人工操作,进步扩缩容效率。
(本文作者:方选豪)