乐趣区

聊聊elasticsearch的TransportProxyClient


本文主要研究一下 elasticsearch 的 TransportProxyClient
TransportProxyClient
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportProxyClient.java
final class TransportProxyClient {

private final TransportClientNodesService nodesService;
private final Map<Action, TransportActionNodeProxy> proxies;

TransportProxyClient(Settings settings, TransportService transportService,
TransportClientNodesService nodesService, List<GenericAction> actions) {
this.nodesService = nodesService;
Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
for (GenericAction action : actions) {
if (action instanceof Action) {
proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
}
}
this.proxies = unmodifiableMap(proxies);
}

public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action,
final Request request, ActionListener<Response> listener) {
final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
assert proxy != null : “no proxy found for action: ” + action;
nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
}
}

TransportProxyClient 的构造器接收 Settings、TransportService、TransportClientNodesService、List<GenericAction> 四个参数
TransportProxyClient 的构造器会根据 actions 来给每个 action 创建 TransportActionNodeProxy,并放入到名为 proxies 的 map 中
TransportProxyClient 主要是提供了 execute 方法,该方法从 proxies 取出对应的 TransportActionNodeProxy,然后通过 TransportClientNodesService 的 execute 方法来执行 proxy.execute 方法

TransportActionNodeProxy
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/action/TransportActionNodeProxy.java
public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {

private final TransportService transportService;
private final GenericAction<Request, Response> action;
private final TransportRequestOptions transportOptions;

public TransportActionNodeProxy(Settings settings, GenericAction<Request, Response> action, TransportService transportService) {
super(settings);
this.action = action;
this.transportService = transportService;
this.transportOptions = action.transportOptions(settings);
}

public void execute(final DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
transportService.sendRequest(node, action.name(), request, transportOptions,
new ActionListenerResponseHandler<>(listener, action::newResponse));
}
}
TransportActionNodeProxy 的构造器要求输入 Settings、GenericAction、TransportService 三个参数;TransportActionNodeProxy 提供了 execute 方法,它的方法参数要求输入 DiscoveryNode、Request、ActionListener,该方法主要是对 ActionListener 包装为 ActionListenerResponseHandler,然后调用 transportService.sendRequest
Nodes
TransportClientNodesService Nodes
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
final class TransportClientNodesService extends AbstractComponent implements Closeable {

private final TimeValue nodesSamplerInterval;

private final long pingTimeout;

private final ClusterName clusterName;

private final TransportService transportService;

private final ThreadPool threadPool;

private final Version minCompatibilityVersion;

// nodes that are added to be discovered
private volatile List<DiscoveryNode> listedNodes = Collections.emptyList();

private final Object mutex = new Object();

private volatile List<DiscoveryNode> nodes = Collections.emptyList();
// Filtered nodes are nodes whose cluster name does not match the configured cluster name
private volatile List<DiscoveryNode> filteredNodes = Collections.emptyList();

private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();

private final NodeSampler nodesSampler;

private volatile ScheduledFuture nodesSamplerFuture;

private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt());

private final boolean ignoreClusterName;

private volatile boolean closed;

private final TransportClient.HostFailureListener hostFailureListener;

//……

public TransportClientNodesService addTransportAddresses(TransportAddress… transportAddresses) {
synchronized (mutex) {
if (closed) {
throw new IllegalStateException(“transport client is closed, can’t add an address”);
}
List<TransportAddress> filtered = new ArrayList<>(transportAddresses.length);
for (TransportAddress transportAddress : transportAddresses) {
boolean found = false;
for (DiscoveryNode otherNode : listedNodes) {
if (otherNode.getAddress().equals(transportAddress)) {
found = true;
logger.debug(“address [{}] already exists with [{}], ignoring…”, transportAddress, otherNode);
break;
}
}
if (!found) {
filtered.add(transportAddress);
}
}
if (filtered.isEmpty()) {
return this;
}
List<DiscoveryNode> builder = new ArrayList<>(listedNodes);
for (TransportAddress transportAddress : filtered) {
DiscoveryNode node = new DiscoveryNode(“#transport#-” + tempNodeIdGenerator.incrementAndGet(),
transportAddress, Collections.emptyMap(), Collections.emptySet(), minCompatibilityVersion);
logger.debug(“adding address [{}]”, node);
builder.add(node);
}
listedNodes = Collections.unmodifiableList(builder);
nodesSampler.sample();
}
return this;
}

public TransportClientNodesService removeTransportAddress(TransportAddress transportAddress) {
synchronized (mutex) {
if (closed) {
throw new IllegalStateException(“transport client is closed, can’t remove an address”);
}
List<DiscoveryNode> listNodesBuilder = new ArrayList<>();
for (DiscoveryNode otherNode : listedNodes) {
if (!otherNode.getAddress().equals(transportAddress)) {
listNodesBuilder.add(otherNode);
} else {
logger.debug(“removing address [{}] from listed nodes”, otherNode);
}
}
listedNodes = Collections.unmodifiableList(listNodesBuilder);
List<DiscoveryNode> nodesBuilder = new ArrayList<>();
for (DiscoveryNode otherNode : nodes) {
if (!otherNode.getAddress().equals(transportAddress)) {
nodesBuilder.add(otherNode);
} else {
logger.debug(“disconnecting from node with address [{}]”, otherNode);
transportService.disconnectFromNode(otherNode);
}
}
nodes = Collections.unmodifiableList(nodesBuilder);
nodesSampler.sample();
}
return this;
}

//……

}

TransportClientNodesService 定义了三个关于 DiscoveryNode 的 List 属性,分别是 listedNodes、nodes、filteredNodes
addTransportAddresses 方法会更新 listedNodes,然后调用 nodesSampler.sample()更新 nodes 及 filteredNodes;removeTransportAddress 方法会更新 listedNodes,nodes,然后调用 nodesSampler.sample()更新 nodes 及 filteredNodes
listedNodes 即为通过 addTransportAddresses 方法添加的 node(一般是通过配置文件指定的 clusterNodes);nodesSampler.sample()方法会对 listedNodes 进行进一步检测,比如将 clusterName 不是当前配置的 clusterName 的放到 filteredNodes,剩下的再进行连接的建立,成功的放到 nodes 里头

TransportClient Nodes
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClient.java
public abstract class TransportClient extends AbstractClient {

private final TransportClientNodesService nodesService;

private final TransportProxyClient proxy;

//……

/**
* Returns the current connected transport nodes that this client will use.
* <p>
* The nodes include all the nodes that are currently alive based on the transport
* addresses provided.
*/
public List<DiscoveryNode> connectedNodes() {
return nodesService.connectedNodes();
}

/**
* The list of filtered nodes that were not connected to, for example, due to
* mismatch in cluster name.
*/
public List<DiscoveryNode> filteredNodes() {
return nodesService.filteredNodes();
}

/**
* Returns the listed nodes in the transport client (ones added to it).
*/
public List<DiscoveryNode> listedNodes() {
return nodesService.listedNodes();
}

/**
* Adds a transport address that will be used to connect to.
* <p>
* The Node this transport address represents will be used if its possible to connect to it.
* If it is unavailable, it will be automatically connected to once it is up.
* <p>
* In order to get the list of all the current connected nodes, please see {@link #connectedNodes()}.
*/
public TransportClient addTransportAddress(TransportAddress transportAddress) {
nodesService.addTransportAddresses(transportAddress);
return this;
}

/**
* Adds a list of transport addresses that will be used to connect to.
* <p>
* The Node this transport address represents will be used if its possible to connect to it.
* If it is unavailable, it will be automatically connected to once it is up.
* <p>
* In order to get the list of all the current connected nodes, please see {@link #connectedNodes()}.
*/
public TransportClient addTransportAddresses(TransportAddress… transportAddress) {
nodesService.addTransportAddresses(transportAddress);
return this;
}

/**
* Removes a transport address from the list of transport addresses that are used to connect to.
*/
public TransportClient removeTransportAddress(TransportAddress transportAddress) {
nodesService.removeTransportAddress(transportAddress);
return this;
}

//……
}
TransportClient 提供了 connectedNodes、filteredNodes、listedNodes 方法,可以看到它们内部都是调用的 TransportClientNodesService 对应的方法;从注释上可以看到,connectedNodes 返回的是当前已经建立连接的 nodes,供 client 端使用;filteredNodes 返回的是因为 clusterName 不匹配导致被过滤掉的 nodes,这些 nodes 不会被 client 使用;listedNodes 返回的是通过 addTransportAddresses 添加的 nodes
NodeSampler
ScheduledNodeSampler
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
TransportClientNodesService(Settings settings, TransportService transportService,
ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
super(settings);
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.transportService = transportService;
this.threadPool = threadPool;
this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();

this.nodesSamplerInterval = TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
this.pingTimeout = TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
this.ignoreClusterName = TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings);

if (logger.isDebugEnabled()) {
logger.debug(“node_sampler_interval[{}]”, nodesSamplerInterval);
}

if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(this.settings)) {
this.nodesSampler = new SniffNodesSampler();
} else {
this.nodesSampler = new SimpleNodeSampler();
}
this.hostFailureListener = hostFailureListener;
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
}

//……

class ScheduledNodeSampler implements Runnable {
@Override
public void run() {
try {
nodesSampler.sample();
if (!closed) {
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this);
}
} catch (Exception e) {
logger.warn(“failed to sample”, e);
}
}
}

//……
TransportClientNodesService 的构造器里头会根据 settings 的 client.transport.sniff 配置 (默认是 false) 来判断是创建 SniffNodesSampler 还是 SimpleNodeSampler,通过 threadPool 注册一个调度任务,每隔 nodesSamplerInterval 执行 ScheduledNodeSampler;ScheduledNodeSampler 实现了 Runnable 接口,其 fun 方法主要是调用 nodesSampler.sample(),之后只要 TransportClientNodesService 没有 close,则会继续注册调度任务,并更新 nodesSamplerFuture
NodeSampler
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
abstract class NodeSampler {
public void sample() {
synchronized (mutex) {
if (closed) {
return;
}
doSample();
}
}

protected abstract void doSample();

/**
* Establishes the node connections. If validateInHandshake is set to true, the connection will fail if
* node returned in the handshake response is different than the discovery node.
*/
List<DiscoveryNode> establishNodeConnections(Set<DiscoveryNode> nodes) {
for (Iterator<DiscoveryNode> it = nodes.iterator(); it.hasNext();) {
DiscoveryNode node = it.next();
if (!transportService.nodeConnected(node)) {
try {
logger.trace(“connecting to node [{}]”, node);
transportService.connectToNode(node);
} catch (Exception e) {
it.remove();
logger.debug(() -> new ParameterizedMessage(“failed to connect to discovered node [{}]”, node), e);
}
}
}

return Collections.unmodifiableList(new ArrayList<>(nodes));
}
}
NodeSampler 是个抽象类,它定义了 sample 方法,其内部是调用定义的抽象方法 doSample;NodeSampler 还提供了 establishNodeConnections 方法,它通过 transportService.nodeConnected(node)来判断 node 是否是 connected 的,如果不是则会通过 transportService.connectToNode(node)再尝试连接一次,如果抛异常则将该节点移除掉,最后返回这次检测是 connected 的 nodes;它有两个子类,分别是 SimpleNodeSampler、SniffNodesSampler
SimpleNodeSampler
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
class SimpleNodeSampler extends NodeSampler {

@Override
protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<>();
ArrayList<DiscoveryNode> newFilteredNodes = new ArrayList<>();
for (DiscoveryNode listedNode : listedNodes) {
try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
public LivenessResponse read(StreamInput in) throws IOException {
LivenessResponse response = new LivenessResponse();
response.readFrom(in);
return response;
}
});
transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
handler);
final LivenessResponse livenessResponse = handler.txGet();
if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) {
logger.warn(“node {} not part of the cluster {}, ignoring…”, listedNode, clusterName);
newFilteredNodes.add(listedNode);
} else {
// use discovered information but do keep the original transport address,
// so people can control which address is exactly used.
DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(),
nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(),
nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
}
} catch (ConnectTransportException e) {
logger.debug(() -> new ParameterizedMessage(“failed to connect to node [{}], ignoring…”, listedNode), e);
hostFailureListener.onNodeDisconnected(listedNode, e);
} catch (Exception e) {
logger.info(() -> new ParameterizedMessage(“failed to get node info for {}, disconnecting…”, listedNode), e);
}
}

nodes = establishNodeConnections(newNodes);
filteredNodes = Collections.unmodifiableList(newFilteredNodes);
}
}
SimpleNodeSampler 的 doSample 方法会对 nodes 进行更进一步的存活检测,主要是发送 LivenessRequest,如果能成功返回 LivenessResponse,则判断 clusterName 是否一致,不一致的添加到 newFilteredNodes,最后赋值给 filteredNodes;一致的添加到 newNodes 中,最后通过 establishNodeConnections 方法建立连接并移除连接失败的 node(重试一次)最后赋值给 nodes
SniffNodesSampler
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
class SniffNodesSampler extends NodeSampler {

@Override
protected void doSample() {
// the nodes we are going to ping include the core listed nodes that were added
// and the last round of discovered nodes
Set<DiscoveryNode> nodesToPing = new HashSet<>();
for (DiscoveryNode node : listedNodes) {
nodesToPing.add(node);
}
for (DiscoveryNode node : nodes) {
nodesToPing.add(node);
}

final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
try {
for (final DiscoveryNode nodeToPing : nodesToPing) {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {

/**
* we try to reuse existing connections but if needed we will open a temporary connection
* that will be closed at the end of the execution.
*/
Transport.Connection connectionToClose = null;

void onDone() {
try {
IOUtils.closeWhileHandlingException(connectionToClose);
} finally {
latch.countDown();
}
}

@Override
public void onFailure(Exception e) {
onDone();
if (e instanceof ConnectTransportException) {
logger.debug(() -> new ParameterizedMessage(“failed to connect to node [{}], ignoring…”, nodeToPing), e);
hostFailureListener.onNodeDisconnected(nodeToPing, e);
} else {
logger.info(() -> new ParameterizedMessage(
“failed to get local cluster state info for {}, disconnecting…”, nodeToPing), e);
}
}

@Override
protected void doRun() throws Exception {
Transport.Connection pingConnection = null;
if (nodes.contains(nodeToPing)) {
try {
pingConnection = transportService.getConnection(nodeToPing);
} catch (NodeNotConnectedException e) {
// will use a temp connection
}
}
if (pingConnection == null) {
logger.trace(“connecting to cluster node [{}]”, nodeToPing);
connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
pingConnection = connectionToClose;
}
transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
Requests.clusterStateRequest().clear().nodes(true).local(true),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
.withTimeout(pingTimeout).build(),
new TransportResponseHandler<ClusterStateResponse>() {

@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ClusterStateResponse response) {
clusterStateResponses.put(nodeToPing, response);
onDone();
}

@Override
public void handleException(TransportException e) {
logger.info(() -> new ParameterizedMessage(
“failed to get local cluster state for {}, disconnecting…”, nodeToPing), e);
try {
hostFailureListener.onNodeDisconnected(nodeToPing, e);
} finally {
onDone();
}
}
});
}
});
}
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}

HashSet<DiscoveryNode> newNodes = new HashSet<>();
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
logger.warn(“node {} not part of the cluster {}, ignoring…”,
entry.getValue().getState().nodes().getLocalNode(), clusterName);
newFilteredNodes.add(entry.getKey());
continue;
}
for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {
newNodes.add(cursor.value);
}
}

nodes = establishNodeConnections(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
}
}

SniffNodesSampler 的 doSample 方法首先将 listedNodes 及 nodes 合并为名为 nodesToPing 的 Set,之后就挨个将 nodesToPing 的 node 放入到线程池异步执行检测,这里通过 CountDownLatch 来等待所有节点异步执行完毕
异步线程池检测的逻辑是对 node 发送 Requests.clusterStateRequest().clear().nodes(true).local(true)请求,如果成功则返回 ClusterStateResponse,并添加到 clusterStateResponses 这个 ConcurrentMap 中
之后遍历 clusterStateResponses 这个 ConcurrentMap,clusterName 不一致的 node 添加到 newFilteredNodes,最后赋值给 filteredNodes;clusterName 一致的则遍历 ClusterStateResponse.getState().nodes().getDataNodes().values(),将这些 node 添加到 newNodes,最后通过 establishNodeConnections 方法建立连接并移除连接失败的 node(重试一次)最后赋值给 nodes

TransportClientNodesService.execute
elasticsearch-6.4.3-sources.jar!/org/elasticsearch/client/transport/TransportClientNodesService.java
final class TransportClientNodesService extends AbstractComponent implements Closeable {

private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt());

//……

public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) {
// we first read nodes before checking the closed state; this
// is because otherwise we could be subject to a race where we
// read the state as not being closed, and then the client is
// closed and the nodes list is cleared, and then a
// NoNodeAvailableException is thrown
// it is important that the order of first setting the state of
// closed and then clearing the list of nodes is maintained in
// the close method
final List<DiscoveryNode> nodes = this.nodes;
if (closed) {
throw new IllegalStateException(“transport client is closed”);
}
ensureNodesAreAvailable(nodes);
int index = getNodeNumber();
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, hostFailureListener);
DiscoveryNode node = retryListener.getNode(0);
try {
callback.doWithNode(node, retryListener);
} catch (Exception e) {
try {
//this exception can’t come from the TransportService as it doesn’t throw exception at all
listener.onFailure(e);
} finally {
retryListener.maybeNodeFailed(node, e);
}
}
}

private void ensureNodesAreAvailable(List<DiscoveryNode> nodes) {
if (nodes.isEmpty()) {
String message = String.format(Locale.ROOT, “None of the configured nodes are available: %s”, this.listedNodes);
throw new NoNodeAvailableException(message);
}
}

private int getNodeNumber() {
int index = randomNodeGenerator.incrementAndGet();
if (index < 0) {
index = 0;
randomNodeGenerator.set(0);
}
return index;
}

public static class RetryListener<Response> implements ActionListener<Response> {
private final NodeListenerCallback<Response> callback;
private final ActionListener<Response> listener;
private final List<DiscoveryNode> nodes;
private final int index;
private final TransportClient.HostFailureListener hostFailureListener;

private volatile int i;

RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener,
List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
this.callback = callback;
this.listener = listener;
this.nodes = nodes;
this.index = index;
this.hostFailureListener = hostFailureListener;
}

@Override
public void onResponse(Response response) {
listener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
Throwable throwable = ExceptionsHelper.unwrapCause(e);
if (throwable instanceof ConnectTransportException) {
maybeNodeFailed(getNode(this.i), (ConnectTransportException) throwable);
int i = ++this.i;
if (i >= nodes.size()) {
listener.onFailure(new NoNodeAvailableException(“None of the configured nodes were available: ” + nodes, e));
} else {
try {
callback.doWithNode(getNode(i), this);
} catch(final Exception inner) {
inner.addSuppressed(e);
// this exception can’t come from the TransportService as it doesn’t throw exceptions at all
listener.onFailure(inner);
}
}
} else {
listener.onFailure(e);
}
}

final DiscoveryNode getNode(int i) {
return nodes.get((index + i) % nodes.size());
}

final void maybeNodeFailed(DiscoveryNode node, Exception ex) {
if (ex instanceof NodeDisconnectedException || ex instanceof NodeNotConnectedException) {
hostFailureListener.onNodeDisconnected(node, ex);
}
}
}

//……
}

TransportClientNodesService 提供的 execute 方法主要是做了两个事情,一个是对 nodes 节点进行客户端的负载均衡,一个是通过 RetryListener 对请求增加重试机制
ensureNodesAreAvailable 方法首先确保 nodes 这个列表不为空,如果为空则抛出 NoNodeAvailableException;之后通过 getNodeNumber 方法来确定 index 值,该方法使用 randomNodeGenerator 递增得到 index,如果 index 大于等于 0 则返回,如果 index 小于 0 则重置 randomNodeGenerator 的值为 0 并返回 0;这里 randomNodeGenerator 是 AtomicInteger 类型,其初始值为 Randomness.get().nextInt()
RetryListener 的构造器接收上一步计算出来的 index 值,它有一个 i 变量,初始为 0,在 onFailure 的时候,如果是 ConnectTransportException 异常,则会进行重试,重试的时候首先将 i 递增,之后判断如果 i >=nodes 大小则停止重试,抛出 NoNodeAvailableException,否则继续调用 callback.doWithNode 进行重试,重试时是通过 getNode 方法获取 node,同时传入当前的 listener;getNode 方法采取的是 (index + i) % nodes.size() 来获取 node 的 index,形成 Round Robin 的效果;对于 RetryListener 来说,内部重试时 i 会递增,对于 execute 方法来说,index 值也是递增的,因而无论请求成功还是失败,对 nodes 的方法都形成 Round Robin 的效果

小结

TransportProxyClient 主要是提供了 execute 方法,该方法从 proxies 取出对应的 TransportActionNodeProxy,然后通过 TransportClientNodesService 的 execute 方法来执行 proxy.execute 方法;TransportActionNodeProxy 提供了 execute 方法,它的方法参数要求输入 DiscoveryNode、Request、ActionListener,该方法主要是对 ActionListener 包装为 ActionListenerResponseHandler,然后调用 transportService.sendRequest
TransportClientNodesService 定义了三个关于 DiscoveryNode 的 List 属性,分别是 listedNodes、nodes、filteredNodes;其中 listedNodes 是通过 addTransportAddresses 添加的 nodes;nodes 是当前已经建立连接的 node 列表,供 client 端使用;filteredNodes 是因为 clusterName 不匹配导致被过滤掉的 nodes,这些 nodes 不会被 client 使用
TransportClientNodesService 的构造器里头会根据 settings 的 client.transport.sniff 配置 (默认是 false) 来判断是创建 SniffNodesSampler 还是 SimpleNodeSampler,通过 threadPool 注册一个调度任务,每隔 nodesSamplerInterval 执行 ScheduledNodeSampler;ScheduledNodeSampler 实现了 Runnable 接口,其 fun 方法主要是调用 nodesSampler.sample(),之后只要 TransportClientNodesService 没有 close,则会继续注册调度任务,并更新 nodesSamplerFuture
NodeSampler 是个抽象类,它定义了 sample 方法,其内部是调用定义的抽象方法 doSample;NodeSampler 还提供了 establishNodeConnections 方法,它通过 transportService.nodeConnected(node)来判断 node 是否是 connected 的,如果不是则会通过 transportService.connectToNode(node)再尝试连接一次,如果抛异常则将该节点移除掉,最后返回这次检测是 connected 的 nodes;它有两个子类,分别是 SimpleNodeSampler、SniffNodesSampler
SimpleNodeSampler 的 doSample 方法会对 nodes 进行更进一步的存活检测,主要是发送 LivenessRequest,如果能成功返回 LivenessResponse,则判断 clusterName 是否一致,不一致的添加到 newFilteredNodes,最后赋值给 filteredNodes;一致的添加到 newNodes 中,最后通过 establishNodeConnections 方法建立连接并移除连接失败的 node(重试一次)最后赋值给 nodes
SniffNodesSampler 的 doSample 方法首先将 listedNodes 及 nodes 合并为名为 nodesToPing 的 Set,之后就挨个将 nodesToPing 的 node 放入到线程池异步执行检测,这里通过 CountDownLatch 来等待所有节点异步执行完毕;异步线程池检测的逻辑是对 node 发送 Requests.clusterStateRequest().clear().nodes(true).local(true)请求,如果成功则返回 ClusterStateResponse,并添加到 clusterStateResponses 这个 ConcurrentMap 中;之后遍历 clusterStateResponses 这个 ConcurrentMap,clusterName 不一致的 node 添加到 newFilteredNodes,最后赋值给 filteredNodes;clusterName 一致的则遍历 ClusterStateResponse.getState().nodes().getDataNodes().values(),将这些 node 添加到 newNodes,最后通过 establishNodeConnections 方法建立连接并移除连接失败的 node(重试一次)最后赋值给 nodes
TransportClientNodesService 提供的 execute 方法主要是做了两个事情,一个是对 nodes 节点进行客户端的负载均衡,一个是通过 RetryListener 对请求增加重试机制;其对 nodes 的负载均衡策略为 Round Robin,而 RetryListener 只对 ConnectTransportException 异常进行重试,最大重试次数为 nodes.size()-1

doc

no node available elasticsearch
Elasticsearch 之 client 源码简要分析
elasticsearch 源代码分析之客户端负载均衡

退出移动版