序
本文主要研究一下 storm nimbus 的 LeaderElector
Nimbus
org/apache/storm/daemon/nimbus/Nimbus.java
public static void main(String[] args) throws Exception {
Utils.setupDefaultUncaughtExceptionHandler();
launch(new StandaloneINimbus());
}
public static Nimbus launch(INimbus inimbus) throws Exception {
Map<String, Object> conf = Utils.merge(ConfigUtils.readStormConfig(),
ConfigUtils.readYamlConfig(“storm-cluster-auth.yaml”, false));
boolean fixupAcl = (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP);
boolean checkAcl = fixupAcl || (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK);
if (checkAcl) {
AclEnforcement.verifyAcls(conf, fixupAcl);
}
return launchServer(conf, inimbus);
}
private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {
StormCommon.validateDistributedMode(conf);
validatePortAvailable(conf);
StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
final Nimbus nimbus = new Nimbus(conf, inimbus, metricsRegistry);
nimbus.launchServer();
final ThriftServer server = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);
metricsRegistry.startMetricsReporters(conf);
Utils.addShutdownHookWithDelayedForceKill(() -> {
metricsRegistry.stopMetricsReporters();
nimbus.shutdown();
server.stop();
}, 10);
if (ClientAuthUtils.areWorkerTokensEnabledServer(server, conf)) {
nimbus.initWorkerTokenManager();
}
LOG.info(“Starting nimbus server for storm version ‘{}'”, STORM_VERSION);
server.serve();
return nimbus;
}
public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper,
StormMetricsRegistry metricsRegistry)
throws Exception {
//……
if (blobStore == null) {
blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo, null);
}
this.blobStore = blobStore;
if (topoCache == null) {
topoCache = new TopoCache(blobStore, conf);
}
if (leaderElector == null) {
leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState, getNimbusAcls(conf),
metricsRegistry);
}
this.leaderElector = leaderElector;
this.blobStore.setLeaderElector(this.leaderElector);
//……
}
public void launchServer() throws Exception {
try {
BlobStore store = blobStore;
IStormClusterState state = stormClusterState;
NimbusInfo hpi = nimbusHostPortInfo;
LOG.info(“Starting Nimbus with conf {}”, ConfigUtils.maskPasswords(conf));
validator.prepare(conf);
//add to nimbuses
state.addNimbusHost(hpi.getHost(),
new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION));
leaderElector.addToLeaderLockQueue();
this.blobStore.startSyncBlobs();
for (ClusterMetricsConsumerExecutor exec: clusterConsumerExceutors) {
exec.prepare();
}
if (isLeader()) {
for (String topoId : state.activeStorms()) {
transition(topoId, TopologyActions.STARTUP, null);
}
clusterMetricSet.setActive(true);
}
//……
} catch (Exception e) {
if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
throw e;
}
if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
throw e;
}
LOG.error(“Error on initialization of nimbus”, e);
Utils.exitProcess(13, “Error on initialization of nimbus”);
}
}
Nimbus 在构造器里头调用 Zookeeper.zkLeaderElector 创建 leaderElector
launchServer 方法调用了 leaderElector.addToLeaderLockQueue() 参与 leader 选举
Zookeeper.zkLeaderElector
storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java
public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
return _instance.zkLeaderElectorImpl(conf, blobStore);
}
protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
CuratorFramework zk = mkClientImpl(conf, servers, port, “”, conf);
String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + “/leader-lock”;
String id = NimbusInfo.fromConf(conf).toHostPortString();
AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
leaderLatchListenerAtomicReference, blobStore);
}
这里使用 /leader-lock 路径创建了 LeaderLatch,然后使用 leaderLatchListenerImpl 创建了 LeaderLatchListener
最后使用 LeaderElectorImp 创建 ILeaderElector
leaderLatchListenerImpl
storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java
// Leader latch listener that will be invoked when we either gain or lose leadership
public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
return new LeaderLatchListener() {
final String STORM_JAR_SUFFIX = “-stormjar.jar”;
final String STORM_CODE_SUFFIX = “-stormcode.ser”;
final String STORM_CONF_SUFFIX = “-stormconf.ser”;
@Override
public void isLeader() {
Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);
// this finds all active topologies blob keys from all local topology blob keys
Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
LOG.info(“active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]”,
generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),
generateJoinedString(diffTopology));
if (diffTopology.isEmpty()) {
Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);
// this finds all dependency blob keys from active topologies from all local blob keys
Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
LOG.info(“active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]”,
generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
generateJoinedString(diffDependencies));
if (diffDependencies.isEmpty()) {
LOG.info(“Accepting leadership, all active topologies and corresponding dependencies found locally.”);
} else {
LOG.info(“Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.”);
closeLatch();
}
} else {
LOG.info(“code for all active topologies not available locally, giving up leadership.”);
closeLatch();
}
}
@Override
public void notLeader() {
LOG.info(“{} lost leadership.”, hostName);
}
//……
private void closeLatch() {
try {
leaderLatch.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
}
leaderLatchListenerImpl 返回一个 LeaderLatchListener 接口的实现类
isLeader 接口里头做了一些校验,即当被 zookeeper 选中为 leader 的时候,如果本地没有所有的 active topologies 或者本地没有所有 dependencies,那么就需要调用 leaderLatch.close() 放弃 leadership
notLeader 接口主要打印一下 log
LeaderElectorImp
org/apache/storm/zookeeper/LeaderElectorImp.java
public class LeaderElectorImp implements ILeaderElector {
private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
private final Map<String, Object> conf;
private final List<String> servers;
private final CuratorFramework zk;
private final String leaderlockPath;
private final String id;
private final AtomicReference<LeaderLatch> leaderLatch;
private final AtomicReference<LeaderLatchListener> leaderLatchListener;
private final BlobStore blobStore;
private final TopoCache tc;
private final IStormClusterState clusterState;
private final List<ACL> acls;
private final StormMetricsRegistry metricsRegistry;
public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
StormMetricsRegistry metricsRegistry) {
this.conf = conf;
this.servers = servers;
this.zk = zk;
this.leaderlockPath = leaderlockPath;
this.id = id;
this.leaderLatch = leaderLatch;
this.leaderLatchListener = leaderLatchListener;
this.blobStore = blobStore;
this.tc = tc;
this.clusterState = clusterState;
this.acls = acls;
this.metricsRegistry = metricsRegistry;
}
@Override
public void prepare(Map<String, Object> conf) {
// no-op for zookeeper implementation
}
@Override
public void addToLeaderLockQueue() throws Exception {
// if this latch is already closed, we need to create new instance.
if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls,
metricsRegistry);
leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));
LOG.info(“LeaderLatch was in closed state. Resetted the leaderLatch and listeners.”);
}
// Only if the latch is not already started we invoke start
if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {
leaderLatch.get().addListener(leaderLatchListener.get());
leaderLatch.get().start();
LOG.info(“Queued up for leader lock.”);
} else {
LOG.info(“Node already in queue for leader lock.”);
}
}
@Override
// Only started latches can be closed.
public void removeFromLeaderLockQueue() throws Exception {
if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {
leaderLatch.get().close();
LOG.info(“Removed from leader lock queue.”);
} else {
LOG.info(“leader latch is not started so no removeFromLeaderLockQueue needed.”);
}
}
@Override
public boolean isLeader() throws Exception {
return leaderLatch.get().hasLeadership();
}
@Override
public NimbusInfo getLeader() {
try {
return Zookeeper.toNimbusInfo(leaderLatch.get().getLeader());
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
}
@Override
public List<NimbusInfo> getAllNimbuses() throws Exception {
List<NimbusInfo> nimbusInfos = new ArrayList<>();
Collection<Participant> participants = leaderLatch.get().getParticipants();
for (Participant participant : participants) {
nimbusInfos.add(Zookeeper.toNimbusInfo(participant));
}
return nimbusInfos;
}
@Override
public void close() {
//Do nothing now.
}
}
LeaderElectorImp 实现了 ILeaderElector 接口
addToLeaderLockQueue 方法检测如果 latch 已经 closed,则重新创建一个新的,然后检测 latch 的状态,如果还没有 start 的话,则调用 start 参与选举
之所以对 closed 状态的 latch 创建一个,主要有两个原因:一是对已经 closed 的 latch 进行方法调用会抛异常,二是被 zk 选举为 leader,但是不满意 storm 的一些 leader 条件会放弃 leadership 即 close 掉
小结
storm nimbus 的 LeaderElector 主要是基于 zookeeper recipies 的 LeaderLatch 来实现
storm nimbus 自定义了 LeaderLatchListener,对成为 leader 之后的 nimbus 进行校验,需要本地拥有所有的 active topologies 以及所有 dependencies,否则放弃 leadership
doc
Highly Available Nimbus Design