Preface
This article takes a look at Elasticsearch's ZenPing.
ZenPing
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
public interface ZenPing extends Releasable {
    void start();
    void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout);
    class PingResponse implements Writeable {//......}
    class PingCollection {//......}
}
- The ZenPing interface extends Releasable and declares the start and ping methods; it also defines two nested classes, PingResponse and PingCollection.
PingResponse
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
class PingResponse implements Writeable {
/**
* An ID of a ping response that was generated on behalf of another node. Needs to be less than all other ping IDs so that fake ping
* responses don't override real ones.
*/
public static long FAKE_PING_ID = -1;
private static final AtomicLong idGenerator = new AtomicLong();
// an always increasing unique identifier for this ping response.
// lower values means older pings.
private final long id;
private final ClusterName clusterName;
private final DiscoveryNode node;
private final DiscoveryNode master;
private final long clusterStateVersion;
/**
* @param node the node which this ping describes
* @param master the current master of the node
* @param clusterName the cluster name of the node
* @param clusterStateVersion the current cluster state version of that node
* ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
*/
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {this(idGenerator.incrementAndGet(), node, master, clusterName, clusterStateVersion);
}
/**
* @param id the ping's ID
* @param node the node which this ping describes
* @param master the current master of the node
* @param clusterName the cluster name of the node
* @param clusterStateVersion the current cluster state version of that node
* ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
*/
public PingResponse(long id, DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
this.id = id;
this.node = node;
this.master = master;
this.clusterName = clusterName;
this.clusterStateVersion = clusterStateVersion;
}
public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterState state) {this(node, master, state.getClusterName(),
state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) ?
ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION : state.version());
}
PingResponse(StreamInput in) throws IOException {this.clusterName = new ClusterName(in);
this.node = new DiscoveryNode(in);
this.master = in.readOptionalWriteable(DiscoveryNode::new);
this.clusterStateVersion = in.readLong();
this.id = in.readLong();}
@Override
public void writeTo(StreamOutput out) throws IOException {clusterName.writeTo(out);
node.writeTo(out);
out.writeOptionalWriteable(master);
out.writeLong(clusterStateVersion);
out.writeLong(id);
}
/**
* an always increasing unique identifier for this ping response.
* lower values means older pings.
*/
public long id() {return this.id;}
/**
* the name of the cluster this node belongs to
*/
public ClusterName clusterName() {return this.clusterName;}
/** the node which this ping describes */
public DiscoveryNode node() {return node;}
/** the current master of the node */
public DiscoveryNode master() {return master;}
/**
* the current cluster state version of that node ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION}
* for not recovered) */
public long getClusterStateVersion() {return clusterStateVersion;}
@Override
public String toString() {return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "]," +
"cluster_state_version [" + clusterStateVersion + "], cluster_name[" + clusterName.value() + "]}";
}
}
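- PingResponse implements Writeable; its writeTo method writes clusterName, node, master, clusterStateVersion, and id, in that order, and the StreamInput constructor reads them back in the same order.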
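The write-then-read symmetry described above can be sketched with plain JDK streams. This is only an illustration under stated assumptions: PingResponseWireSketch is a hypothetical stand-in, strings replace ClusterName and DiscoveryNode, and a boolean presence flag stands in for writeOptionalWriteable; none of it is the Elasticsearch wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for PingResponse; not the Elasticsearch class.
final class PingResponseWireSketch {
    final String clusterName;
    final String node;
    final String master; // may be null, like the optional master in the original
    final long clusterStateVersion;
    final long id;

    PingResponseWireSketch(String clusterName, String node, String master, long clusterStateVersion, long id) {
        this.clusterName = clusterName;
        this.node = node;
        this.master = master;
        this.clusterStateVersion = clusterStateVersion;
        this.id = id;
    }

    // Same field order as the original writeTo: clusterName, node, master, clusterStateVersion, id.
    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(clusterName);
        out.writeUTF(node);
        out.writeBoolean(master != null); // presence flag (assumption) for the optional master
        if (master != null) {
            out.writeUTF(master);
        }
        out.writeLong(clusterStateVersion);
        out.writeLong(id);
    }

    // Reads must follow exactly the order used by writeTo.
    static PingResponseWireSketch readFrom(DataInputStream in) throws IOException {
        String clusterName = in.readUTF();
        String node = in.readUTF();
        String master = in.readBoolean() ? in.readUTF() : null;
        long clusterStateVersion = in.readLong();
        long id = in.readLong();
        return new PingResponseWireSketch(clusterName, node, master, clusterStateVersion, id);
    }

    // Serializes to a byte array and parses it back.
    static PingResponseWireSketch roundTrip(PingResponseWireSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.writeTo(out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return readFrom(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the read order drifted from the write order, the two long fields would be swapped silently, which is why the constructor and writeTo in the original mirror each other line by line.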
PingCollection
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java
class PingCollection {
Map<DiscoveryNode, PingResponse> pings;
public PingCollection() {pings = new HashMap<>();
}
/**
* adds a ping if newer than previous pings from the same node
*
* @return true if added, false o.w.
*/
public synchronized boolean addPing(PingResponse ping) {PingResponse existingResponse = pings.get(ping.node());
// in case both existing and new ping have the same id (probably because they come
// from nodes from version <1.4.0) we prefer to use the last added one.
if (existingResponse == null || existingResponse.id() <= ping.id()) {pings.put(ping.node(), ping);
return true;
}
return false;
}
/** serialize current pings to a list. It is guaranteed that the list contains one ping response per node */
public synchronized List<PingResponse> toList() {return new ArrayList<>(pings.values());
}
/** the number of nodes for which there are known pings */
public synchronized int size() {return pings.size();
}
}
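- PingCollection keeps a map from DiscoveryNode to PingResponse and provides addPing to add a response. A new response replaces the existing one for the same node only when its id is greater than or equal to the existing id, so toList returns at most one response per node.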
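The "newest ping per node wins" rule can be shown with a small stand-alone sketch. NewestPingPerNodeSketch is a hypothetical class; a node is reduced to a string name and a ping to its id, which is an assumption made to keep the example free of Elasticsearch types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified analogue of PingCollection: node name -> newest ping id.
final class NewestPingPerNodeSketch {
    private final Map<String, Long> newestIdByNode = new HashMap<>();

    // Returns true if the ping was stored; ties are resolved in favor of the last one added.
    synchronized boolean addPing(String node, long id) {
        Long existing = newestIdByNode.get(node);
        if (existing == null || existing <= id) {
            newestIdByNode.put(node, id);
            return true;
        }
        return false;
    }

    // Number of nodes for which a ping is known.
    synchronized int size() {
        return newestIdByNode.size();
    }

    // Id of the stored ping for a node, or null if none is known.
    synchronized Long idFor(String node) {
        return newestIdByNode.get(node);
    }
}
```

Because the comparison is on the id, a response carrying FAKE_PING_ID (-1) can fill an empty slot but cannot displace a real response, whose ids come from a counter that starts above zero.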
UnicastZenPing
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
public class UnicastZenPing implements ZenPing {private static final Logger logger = LogManager.getLogger(UnicastZenPing.class);
public static final String ACTION_NAME = "internal:discovery/zen/unicast";
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterName clusterName;
private final PingContextProvider contextProvider;
private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();
private final Map<Integer, PingingRound> activePingingRounds = newConcurrentMap();
// a list of temporal responses a node will return for a request (holds responses from other nodes)
private final Queue<PingResponse> temporalResponses = ConcurrentCollections.newQueue();
private final SeedHostsProvider hostsProvider;
protected final EsThreadPoolExecutor unicastZenPingExecutorService;
private final TimeValue resolveTimeout;
private final String nodeName;
private volatile boolean closed = false;
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
SeedHostsProvider seedHostsProvider, PingContextProvider contextProvider) {
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.hostsProvider = seedHostsProvider;
this.contextProvider = contextProvider;
final int concurrentConnects = SeedHostsResolver.getMaxConcurrentResolvers(settings);
resolveTimeout = SeedHostsResolver.getResolveTimeout(settings);
nodeName = Node.NODE_NAME_SETTING.get(settings);
logger.debug("using max_concurrent_resolvers [{}], resolver timeout [{}]",
concurrentConnects,
resolveTimeout);
transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SAME, UnicastPingRequest::new,
new UnicastPingRequestHandler());
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
unicastZenPingExecutorService = EsExecutors.newScaling(
nodeName + "/" + "unicast_connect",
0,
concurrentConnects,
60,
TimeUnit.SECONDS,
threadFactory,
threadPool.getThreadContext());
}
private SeedHostsProvider.HostsResolver createHostsResolver() {return (hosts, limitPortCounts) -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts,
limitPortCounts, transportService, resolveTimeout);
}
@Override
public void close() {ThreadPool.terminate(unicastZenPingExecutorService, 10, TimeUnit.SECONDS);
Releasables.close(activePingingRounds.values());
closed = true;
}
@Override
public void start() {}
/**
* Clears the list of cached ping responses.
*/
public void clearTemporalResponses() {temporalResponses.clear();
}
/**
* Sends three rounds of pings notifying the specified {@link Consumer} when pinging is complete. Pings are sent after resolving
* configured unicast hosts to their IP address (subject to DNS caching within the JVM). A batch of pings is sent, then another batch
* of pings is sent at half the specified {@link TimeValue}, and then another batch of pings is sent at the specified {@link TimeValue}.
* The pings that are sent carry a timeout of 1.25 times the specified {@link TimeValue}. When pinging each node, a connection and
* handshake is performed, with a connection timeout of the specified {@link TimeValue}.
*
* @param resultsConsumer the callback when pinging is complete
* @param duration the timeout for various components of the pings
*/
@Override
public void ping(final Consumer<PingCollection> resultsConsumer, final TimeValue duration) {ping(resultsConsumer, duration, duration);
}
protected void ping(final Consumer<PingCollection> resultsConsumer,
final TimeValue scheduleDuration,
final TimeValue requestDuration) {final List<TransportAddress> seedAddresses = new ArrayList<>();
seedAddresses.addAll(hostsProvider.getSeedAddresses(createHostsResolver()));
final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
// add all possible master nodes that were active in the last known cluster configuration
for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {seedAddresses.add(masterNode.value.getAddress());
}
final ConnectionProfile connectionProfile =
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer,
nodes.getLocalNode(), connectionProfile);
activePingingRounds.put(pingingRound.id(), pingingRound);
final AbstractRunnable pingSender = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {if (e instanceof AlreadyClosedException == false) {logger.warn("unexpected error while pinging", e);
}
}
@Override
protected void doRun() throws Exception {sendPings(requestDuration, pingingRound);
}
};
threadPool.generic().execute(pingSender);
threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC);
threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC);
threadPool.schedule(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {finishPingingRound(pingingRound);
}
@Override
public void onFailure(Exception e) {logger.warn("unexpected error while finishing pinging round", e);
}
}, scheduleDuration, ThreadPool.Names.GENERIC);
}
protected void sendPings(final TimeValue timeout, final PingingRound pingingRound) {final ClusterState lastState = contextProvider.clusterState();
final UnicastPingRequest pingRequest = new UnicastPingRequest(pingingRound.id(), timeout, createPingResponse(lastState));
List<TransportAddress> temporalAddresses = temporalResponses.stream().map(pingResponse -> {assert clusterName.equals(pingResponse.clusterName()) :
"got a ping request from a different cluster. expected " + clusterName + " got " + pingResponse.clusterName();
return pingResponse.node().getAddress();
}).collect(Collectors.toList());
final Stream<TransportAddress> uniqueAddresses = Stream.concat(pingingRound.getSeedAddresses().stream(),
temporalAddresses.stream()).distinct();
// resolve what we can via the latest cluster state
final Set<DiscoveryNode> nodesToPing = uniqueAddresses
.map(address -> {DiscoveryNode foundNode = lastState.nodes().findByAddress(address);
if (foundNode != null && transportService.nodeConnected(foundNode)) {return foundNode;} else {
return new DiscoveryNode(address.toString(),
address,
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
}
}).collect(Collectors.toSet());
nodesToPing.forEach(node -> sendPingRequestToNode(node, timeout, pingingRound, pingRequest));
}
private void sendPingRequestToNode(final DiscoveryNode node, TimeValue timeout, final PingingRound pingingRound,
final UnicastPingRequest pingRequest) {submitToExecutor(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
Connection connection = null;
if (transportService.nodeConnected(node)) {
try {
// concurrency can still cause disconnects
connection = transportService.getConnection(node);
} catch (NodeNotConnectedException e) {logger.trace("[{}] node [{}] just disconnected, will create a temp connection", pingingRound.id(), node);
}
}
if (connection == null) {connection = pingingRound.getOrConnect(node);
}
logger.trace("[{}] sending to {}", pingingRound.id(), node);
transportService.sendRequest(connection, ACTION_NAME, pingRequest,
TransportRequestOptions.builder().withTimeout((long) (timeout.millis() * 1.25)).build(),
getPingResponseHandler(pingingRound, node));
}
@Override
public void onFailure(Exception e) {if (e instanceof ConnectTransportException || e instanceof AlreadyClosedException) {
// can't connect to the node - this is more common path!
logger.trace(() -> new ParameterizedMessage("[{}] failed to ping {}", pingingRound.id(), node), e);
} else if (e instanceof RemoteTransportException) {
// something went wrong on the other side
logger.debug(() -> new ParameterizedMessage("[{}] received a remote error as a response to ping {}", pingingRound.id(), node), e);
} else {logger.warn(() -> new ParameterizedMessage("[{}] failed send ping to {}", pingingRound.id(), node), e);
}
}
@Override
public void onRejection(Exception e) {
// The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings
// But don't bail here, we can retry later on after the send ping has been scheduled.
logger.debug("Ping execution rejected", e);
}
});
}
//......
}
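- UnicastZenPing implements ZenPing. Its constructor registers UnicastPingRequestHandler and creates the unicastZenPingExecutorService thread pool; the class also maintains a queue of PingResponse objects (temporalResponses) and the activePingingRounds map.
- The ping method runs pingSender immediately, schedules it again at one third and two thirds of scheduleDuration, and schedules finishPingingRound at scheduleDuration (the Javadoc still says the second batch goes out at half the duration, which does not match the code). pingSender's doRun calls sendPings, which builds a UnicastPingRequest, merges the round's seedAddresses with the addresses found in temporalResponses, deduplicates them, resolves each address to a DiscoveryNode, and calls sendPingRequestToNode for every node.
- sendPingRequestToNode submits an AbstractRunnable to the executor. Its doRun reuses an existing connection or opens a temporary one through pingingRound.getOrConnect, then sends the pingRequest with transportService.sendRequest, using a request timeout of 1.25 times the given timeout.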
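The timing arithmetic in ping and sendPingRequestToNode is easy to misread, so here is a small sketch of it. PingScheduleSketch and its method names are hypothetical; the expressions follow the code quoted above (duration / 3, duration / 3 * 2, and timeout * 1.25 truncated to a long).

```java
import java.util.Arrays;

// Hypothetical helper that reproduces only the arithmetic of the ping schedule.
final class PingScheduleSketch {
    // Offsets in milliseconds from the start of the round at which a batch of pings is sent.
    static long[] sendOffsetsMillis(long scheduleMillis) {
        return new long[] {0L, scheduleMillis / 3, scheduleMillis / 3 * 2};
    }

    // Offset at which the round is finished and the results are handed to the consumer.
    static long finishOffsetMillis(long scheduleMillis) {
        return scheduleMillis;
    }

    // Timeout attached to each individual ping request.
    static long requestTimeoutMillis(long requestMillis) {
        return (long) (requestMillis * 1.25);
    }

    public static void main(String[] args) {
        long duration = 3000L;
        System.out.println("send at   " + Arrays.toString(sendOffsetsMillis(duration)));
        System.out.println("finish at " + finishOffsetMillis(duration));
        System.out.println("timeout   " + requestTimeoutMillis(duration));
    }
}
```

With a 3000 ms duration the batches go out at 0, 1000, and 2000 ms, the round finishes at 3000 ms, and each request carries a 3750 ms timeout, so a response to the last batch can still arrive after the round has closed and is then skipped by the response handler.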
PingingRound
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
protected class PingingRound implements Releasable {
private final int id;
private final Map<TransportAddress, Connection> tempConnections = new HashMap<>();
private final KeyedLock<TransportAddress> connectionLock = new KeyedLock<>(true);
private final PingCollection pingCollection;
private final List<TransportAddress> seedAddresses;
private final Consumer<PingCollection> pingListener;
private final DiscoveryNode localNode;
private final ConnectionProfile connectionProfile;
private AtomicBoolean closed = new AtomicBoolean(false);
PingingRound(int id, List<TransportAddress> seedAddresses, Consumer<PingCollection> resultsConsumer, DiscoveryNode localNode,
ConnectionProfile connectionProfile) {
this.id = id;
this.seedAddresses = Collections.unmodifiableList(seedAddresses.stream().distinct().collect(Collectors.toList()));
this.pingListener = resultsConsumer;
this.localNode = localNode;
this.connectionProfile = connectionProfile;
this.pingCollection = new PingCollection();}
public int id() {return this.id;}
public boolean isClosed() {return this.closed.get();
}
public List<TransportAddress> getSeedAddresses() {ensureOpen();
return seedAddresses;
}
public Connection getOrConnect(DiscoveryNode node) throws IOException {
Connection result;
try (Releasable ignore = connectionLock.acquire(node.getAddress())) {result = tempConnections.get(node.getAddress());
if (result == null) {ensureOpen();
boolean success = false;
logger.trace("[{}] opening connection to [{}]", id(), node);
result = transportService.openConnection(node, connectionProfile);
try {transportService.handshake(result, connectionProfile.getHandshakeTimeout().millis());
synchronized (this) {
// acquire lock and check if closed, to prevent leaving an open connection after closing
ensureOpen();
Connection existing = tempConnections.put(node.getAddress(), result);
assert existing == null;
success = true;
}
} finally {if (success == false) {logger.trace("[{}] closing connection to [{}] due to failure", id(), node);
IOUtils.closeWhileHandlingException(result);
}
}
}
}
return result;
}
private void ensureOpen() {if (isClosed()) {throw new AlreadyClosedException("pinging round [" + id + "] is finished");
}
}
public void addPingResponseToCollection(PingResponse pingResponse) {if (localNode.equals(pingResponse.node()) == false) {pingCollection.addPing(pingResponse);
}
}
@Override
public void close() {
List<Connection> toClose = null;
synchronized (this) {if (closed.compareAndSet(false, true)) {activePingingRounds.remove(id);
toClose = new ArrayList<>(tempConnections.values());
tempConnections.clear();}
}
if (toClose != null) {
// we actually closed
try {pingListener.accept(pingCollection);
} finally {IOUtils.closeWhileHandlingException(toClose);
}
}
}
public ConnectionProfile getConnectionProfile() {return connectionProfile;}
}
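- PingingRound provides getOrConnect, which returns the cached temporary Connection for a node's address or creates one by calling transportService.openConnection followed by transportService.handshake. Its close method takes effect only once: it removes the round from activePingingRounds, passes pingCollection to the listener, and closes the temporary connections.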
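The get-or-create and close-once behavior can be sketched without any transport layer. In this hypothetical TempConnectionCacheSketch an AutoCloseable stands in for Connection, a Function stands in for openConnection plus handshake, IllegalStateException stands in for AlreadyClosedException, and a single coarse lock replaces the per-address KeyedLock; all of these are simplifying assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Hypothetical, simplified analogue of the temporary-connection handling in PingingRound.
final class TempConnectionCacheSketch implements AutoCloseable {
    private final Map<String, AutoCloseable> connections = new HashMap<>();
    private final Function<String, AutoCloseable> opener;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    TempConnectionCacheSketch(Function<String, AutoCloseable> opener) {
        this.opener = opener;
    }

    // Returns the cached connection for the address, opening one only if none exists.
    synchronized AutoCloseable getOrConnect(String address) {
        AutoCloseable existing = connections.get(address);
        if (existing != null) {
            return existing;
        }
        ensureOpen();
        AutoCloseable opened = opener.apply(address);
        connections.put(address, opened);
        return opened;
    }

    private void ensureOpen() {
        if (closed.get()) {
            throw new IllegalStateException("round is finished");
        }
    }

    // Only the first call closes the cached connections; later calls are no-ops.
    @Override
    public void close() {
        List<AutoCloseable> toClose = null;
        synchronized (this) {
            if (closed.compareAndSet(false, true)) {
                toClose = new ArrayList<>(connections.values());
                connections.clear();
            }
        }
        if (toClose != null) {
            for (AutoCloseable connection : toClose) {
                try {
                    connection.close();
                } catch (Exception e) {
                    // ignored, in the spirit of closeWhileHandlingException
                }
            }
        }
    }
}
```

Copying the connections out under the lock and closing them afterwards keeps slow close calls outside the critical section, which is the same shape as the close method quoted above.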
UnicastPingRequest
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
public static class UnicastPingRequest extends TransportRequest {
public final int id;
public final TimeValue timeout;
public final PingResponse pingResponse;
public UnicastPingRequest(int id, TimeValue timeout, PingResponse pingResponse) {
this.id = id;
this.timeout = timeout;
this.pingResponse = pingResponse;
}
public UnicastPingRequest(StreamInput in) throws IOException {super(in);
id = in.readInt();
timeout = in.readTimeValue();
pingResponse = new PingResponse(in);
}
@Override
public void readFrom(StreamInput in) throws IOException {throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {super.writeTo(out);
out.writeInt(id);
out.writeTimeValue(timeout);
pingResponse.writeTo(out);
}
}
- UnicastPingRequest extends TransportRequest; its writeTo method writes id, timeout, and pingResponse.
UnicastPingResponse
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
public static class UnicastPingResponse extends TransportResponse {
final int id;
public final PingResponse[] pingResponses;
public UnicastPingResponse(int id, PingResponse[] pingResponses) {
this.id = id;
this.pingResponses = pingResponses;
}
public UnicastPingResponse(StreamInput in) throws IOException {id = in.readInt();
pingResponses = new PingResponse[in.readVInt()];
for (int i = 0; i < pingResponses.length; i++) {pingResponses[i] = new PingResponse(in);
}
}
@Override
public void readFrom(StreamInput in) throws IOException {throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public void writeTo(StreamOutput out) throws IOException {super.writeTo(out);
out.writeInt(id);
out.writeVInt(pingResponses.length);
for (PingResponse pingResponse : pingResponses) {pingResponse.writeTo(out);
}
}
}
- UnicastPingResponse extends TransportResponse; its writeTo method writes id, then pingResponses.length, then each element of pingResponses.
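Writing the element count before the elements is what lets the reader allocate the array before parsing it. A minimal sketch of that length-prefixed layout follows; LengthPrefixedResponseSketch is hypothetical, each ping is reduced to a long id, and a fixed-width int replaces the variable-length writeVInt used in the original.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of a length-prefixed array on the wire: roundId, count, then the elements.
final class LengthPrefixedResponseSketch {
    static byte[] write(int roundId, long[] pingIds) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(roundId);
            out.writeInt(pingIds.length); // fixed-width count (assumption; the original uses a VInt)
            for (long pingId : pingIds) {
                out.writeLong(pingId);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static int readRoundId(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static long[] readPingIds(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readInt(); // skip the round id
            long[] pingIds = new long[in.readInt()]; // the count sizes the array up front
            for (int i = 0; i < pingIds.length; i++) {
                pingIds[i] = in.readLong();
            }
            return pingIds;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```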
UnicastPingRequestHandler
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
class UnicastPingRequestHandler implements TransportRequestHandler<UnicastPingRequest> {
@Override
public void messageReceived(UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {if (closed) {throw new AlreadyClosedException("node is shutting down");
}
if (request.pingResponse.clusterName().equals(clusterName)) {channel.sendResponse(handlePingRequest(request));
} else {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"mismatched cluster names; request: [%s], local: [%s]",
request.pingResponse.clusterName().value(),
clusterName.value()));
}
}
}
private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) {assert clusterName.equals(request.pingResponse.clusterName()) :
"got a ping request from a different cluster. expected " + clusterName + " got " + request.pingResponse.clusterName();
temporalResponses.add(request.pingResponse);
// add to any ongoing pinging
activePingingRounds.values().forEach(p -> p.addPingResponseToCollection(request.pingResponse));
threadPool.schedule(() -> temporalResponses.remove(request.pingResponse),
TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME);
List<PingResponse> pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses);
pingResponses.add(createPingResponse(contextProvider.clusterState()));
return new UnicastPingResponse(request.id, pingResponses.toArray(new PingResponse[pingResponses.size()]));
}
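- UnicastPingRequestHandler implements TransportRequestHandler. Its messageReceived method rejects requests from a different cluster and otherwise calls handlePingRequest and sends back the result. handlePingRequest adds request.pingResponse to temporalResponses and to every active PingingRound via addPingResponseToCollection, schedules a task that removes it from temporalResponses after twice the request timeout, and finally returns a UnicastPingResponse containing the temporal responses plus the local node's own PingResponse.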
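The request-handling flow can be sketched with plain collections. PingRequestHandlerSketch is hypothetical: nodes and cluster names are strings, the step that feeds active pinging rounds is left out, and the scheduled removal is replaced by an explicit expire call so the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified analogue of messageReceived plus handlePingRequest.
final class PingRequestHandlerSketch {
    private final String localCluster;
    private final String localNode;
    // Recently seen requesters, gossiped back to whoever pings this node next.
    private final Queue<String> temporalResponses = new ConcurrentLinkedQueue<>();

    PingRequestHandlerSketch(String localCluster, String localNode) {
        this.localCluster = localCluster;
        this.localNode = localNode;
    }

    // Returns everything this node currently knows: the temporal responses plus itself.
    List<String> handle(String requestCluster, String requestingNode) {
        if (!localCluster.equals(requestCluster)) {
            throw new IllegalStateException(
                "mismatched cluster names; request: [" + requestCluster + "], local: [" + localCluster + "]");
        }
        temporalResponses.add(requestingNode);
        List<String> reply = new ArrayList<>(temporalResponses);
        reply.add(localNode);
        return reply;
    }

    // Stands in for the task the original schedules after twice the request timeout.
    void expire(String requestingNode) {
        temporalResponses.remove(requestingNode);
    }
}
```

The effect is a small gossip step: a node that was pinged by A and then by B tells B about A, so two nodes that do not list each other as seeds can still discover each other through a common seed.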
getPingResponseHandler
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
protected TransportResponseHandler<UnicastPingResponse> getPingResponseHandler(final PingingRound pingingRound,
final DiscoveryNode node) {return new TransportResponseHandler<UnicastPingResponse>() {
@Override
public UnicastPingResponse read(StreamInput in) throws IOException {return new UnicastPingResponse(in);
}
@Override
public String executor() {return ThreadPool.Names.SAME;}
@Override
public void handleResponse(UnicastPingResponse response) {logger.trace("[{}] received response from {}: {}", pingingRound.id(), node, Arrays.toString(response.pingResponses));
if (pingingRound.isClosed()) {if (logger.isTraceEnabled()) {logger.trace("[{}] skipping received response from {}. already closed", pingingRound.id(), node);
}
} else {Stream.of(response.pingResponses).forEach(pingingRound::addPingResponseToCollection);
}
}
@Override
public void handleException(TransportException exp) {if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException ||
exp.getCause() instanceof AlreadyClosedException) {
// ok, not connected...
logger.trace(() -> new ParameterizedMessage("failed to connect to {}", node), exp);
} else if (closed == false) {logger.warn(() -> new ParameterizedMessage("failed to send ping to [{}]", node), exp);
}
}
};
}
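- getPingResponseHandler creates an anonymous TransportResponseHandler for UnicastPingResponse. Its handleResponse method adds each received PingResponse through pingingRound.addPingResponseToCollection, unless the round is already closed, in which case the response is skipped.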
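Summary
- The ZenPing interface extends Releasable and declares the start and ping methods; it also defines two nested classes, PingResponse and PingCollection.
- UnicastZenPing implements ZenPing. It creates the unicastZenPingExecutorService thread pool and maintains a queue of PingResponse objects as well as the activePingingRounds map.
- The ping method runs pingSender immediately and on a schedule; pingSender's doRun calls sendPings, which builds a UnicastPingRequest and calls sendPingRequestToNode for each node resolved from the round's seedAddresses and the temporal responses. sendPingRequestToNode submits an AbstractRunnable to the executor, and its doRun sends the pingRequest with transportService.sendRequest.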