A look at storm worker's executor and task

This article mainly looks into the executor and task of a storm worker.
Worker
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
public static void main(String[] args) throws Exception {
Preconditions.checkArgument(args.length == 5, "Illegal number of arguments. Expected: 5, Actual: " + args.length);
String stormId = args[0];
String assignmentId = args[1];
String supervisorPort = args[2];
String portStr = args[3];
String workerId = args[4];
Map<String, Object> conf = ConfigUtils.readStormConfig();
Utils.setupDefaultUncaughtExceptionHandler();
StormCommon.validateDistributedMode(conf);
Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(supervisorPort),
Integer.parseInt(portStr), workerId);
worker.start();
Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown);
}
The main method creates a Worker and then calls start.
Worker.start
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
public void start() throws Exception {
LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
ConfigUtils.maskPasswords(conf));
// because in local mode, its not a separate
// process. supervisor will register it in this case
// if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
if (!ConfigUtils.isLocalMode(conf)) {
// Distributed mode
SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
String pid = Utils.processPid();
FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
Charset.forName("UTF-8"));
}
final Map<String, Object> topologyConf =
ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);

StormMetricRegistry.start(conf, DaemonType.WORKER);

Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
Map<String, String> initCreds = new HashMap<>();
if (initialCredentials != null) {
initCreds.putAll(initialCredentials.get_creds());
}
autoCreds = ClientAuthUtils.getAutoCredentials(topologyConf);
subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);

Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
() -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
);

}
The heart of start() is the call to loadWorker.
Worker.loadWorker
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
private AtomicReference<List<IRunningExecutor>> executorsAtom;

private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
Map<String, String> initCreds, Credentials initialCredentials)
throws Exception {
workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorPort, port, workerId,
topologyConf, stateStorage, stormClusterState, autoCreds);

// Heartbeat here so that worker process dies if this fails
// it's important that worker heartbeat to supervisor ASAP so that supervisor knows
// that worker is running and moves on
doHeartBeat();

executorsAtom = new AtomicReference<>(null);

// launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
// to the supervisor
workerState.heartbeatTimer
.scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
try {
doHeartBeat();
} catch (IOException e) {
throw new RuntimeException(e);
}
});

workerState.executorHeartbeatTimer
.scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
Worker.this::doExecutorHeartbeats);

workerState.registerCallbacks();

workerState.refreshConnections(null);

workerState.activateWorkerWhenAllConnectionsReady();

workerState.refreshStormActive(null);

workerState.runWorkerStartHooks();

List<Executor> execs = new ArrayList<>();
for (List<Long> e : workerState.getLocalExecutors()) {
if (ConfigUtils.isLocalMode(topologyConf)) {
Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
execs.add(executor);
for (int i = 0; i < executor.getTaskIds().size(); ++i) {
workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
}
} else {
Executor executor = Executor.mkExecutor(workerState, e, initCreds);
for (int i = 0; i < executor.getTaskIds().size(); ++i) {
workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
}
execs.add(executor);
}
}

List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
for (Executor executor : execs) {
newExecutors.add(executor.execute());
}
executorsAtom.set(newExecutors);

//……

setupFlushTupleTimer(topologyConf, newExecutors);
setupBackPressureCheckTimer(topologyConf);

LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(topologyConf));
LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
return this;
}

Here workerState.getLocalExecutors() returns the set of executor ids (each a List<Long>) assigned to this worker
For each id, Executor.mkExecutor creates an Executor; calling its execute() method wraps it into an ExecutorShutdown, and the resulting list is stored in AtomicReference<List<IRunningExecutor>> executorsAtom
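An executor id here is a two-element List<Long> of the form [startTaskId, endTaskId], describing the inclusive range of task ids that executor runs. A minimal, self-contained sketch of that expansion (mirroring what StormCommon.executorIdToTasks does; illustrative, not the verbatim source):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExecutorIdDemo {
    // expand an executor id [startTaskId, endTaskId] into its task ids
    static List<Integer> executorIdToTasks(List<Long> executorId) {
        List<Integer> taskIds = new ArrayList<>();
        for (long taskId = executorId.get(0); taskId <= executorId.get(1); taskId++) {
            taskIds.add((int) taskId);
        }
        return taskIds;
    }

    public static void main(String[] args) {
        // executor [3,5] runs tasks 3, 4 and 5
        System.out.println(executorIdToTasks(Arrays.asList(3L, 5L))); // [3, 4, 5]
    }
}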

WorkerState.getLocalExecutors
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
// local executors and localTaskIds running in this worker
final Set<List<Long>> localExecutors;

public Set<List<Long>> getLocalExecutors() {
return localExecutors;
}

public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId,
int supervisorPort, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage,
IStormClusterState stormClusterState, Collection<IAutoCredentials> autoCredentials) throws IOException,
InvalidTopologyException {
this.autoCredentials = autoCredentials;
this.conf = conf;
this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
//……
}

private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
int port) {
LOG.info("Reading assignments");
List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
NodeInfo nodeInfo = entry.getValue();
if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
executorsAssignedToThisWorker.add(entry.getKey());
}
}
return executorsAssignedToThisWorker;
}

private Assignment getLocalAssignment(Map<String, Object> conf, IStormClusterState stormClusterState, String topologyId) {
if (!ConfigUtils.isLocalMode(conf)) {
try (SupervisorClient supervisorClient = SupervisorClient.getConfiguredClient(conf, Utils.hostname(),
supervisorPort)) {
Assignment assignment = supervisorClient.getClient().getLocalAssignmentForStorm(topologyId);
return assignment;
} catch (Throwable tr1) {
//if any error/exception thrown, fetch it from zookeeper
return stormClusterState.remoteAssignmentInfo(topologyId, null);
}
} else {
return stormClusterState.remoteAssignmentInfo(topologyId, null);
}
}

In its constructor, WorkerState calls readWorkerExecutors to obtain the executor ids that run in this worker
readWorkerExecutors fetches the Assignment via getLocalAssignment, then calls get_executor_node_port to get Map<List<Long>, NodeInfo> executorToNodePort
getLocalAssignment asks the local supervisor via supervisorClient.getClient().getLocalAssignmentForStorm(topologyId); if anything goes wrong there, it falls back to stormClusterState.remoteAssignmentInfo, which reads from zookeeper
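A hedged illustration of the shape of executorToNodePort; NodeInfo is the thrift struct from the assignment, and the node name and port below are made-up values:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.generated.NodeInfo;

// hypothetical placement: executor [1,2] (tasks 1..2) on supervisor "node-1", worker port 6700
Map<List<Long>, NodeInfo> executorToNodePort = new HashMap<>();
NodeInfo nodeInfo = new NodeInfo();
nodeInfo.set_node("node-1");   // compared against this worker's assignmentId
nodeInfo.add_to_port(6700L);   // compared against this worker's port
executorToNodePort.put(Arrays.asList(1L, 2L), nodeInfo);

readWorkerExecutors keeps an entry only when both the node and the first port match this worker.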

StormClusterStateImpl.remoteAssignmentInfo
storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
public Assignment remoteAssignmentInfo(String stormId, Runnable callback) {
if (callback != null) {
assignmentInfoCallback.put(stormId, callback);
}
byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
}

It builds the zookeeper path for the topologyId via ClusterUtils.assignmentPath and fetches the data from there
The data is thrift-serialized, so it must be deserialized after retrieval
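A minimal sketch of what ClusterUtils.maybeDeserialize amounts to, assuming Storm's thrift helper Utils.thriftDeserialize (a null check followed by thrift deserialization):

// sketch: a missing znode yields null instead of a deserialization error
static <T> T maybeDeserialize(byte[] serialized, Class<T> clazz) {
    if (serialized == null) {
        return null;
    }
    return Utils.thriftDeserialize(clazz, serialized);
}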

ClusterUtils.assignmentPath
storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
public static final String ZK_SEPERATOR = "/";

public static final String ASSIGNMENTS_ROOT = "assignments";

public static final String ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT;

public static String assignmentPath(String id) {
return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id;
}
The path is /assignments/{topology}, for example /assignments/DemoTopology-1-1539163962
Executor.mkExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) {
Executor executor;

WorkerTopologyContext workerTopologyContext = workerState.getWorkerTopologyContext();
List<Integer> taskIds = StormCommon.executorIdToTasks(executorId);
String componentId = workerTopologyContext.getComponentId(taskIds.get(0));

String type = getExecutorType(workerTopologyContext, componentId);
if (ClientStatsUtil.SPOUT.equals(type)) {
executor = new SpoutExecutor(workerState, executorId, credentials);
} else {
executor = new BoltExecutor(workerState, executorId, credentials);
}

int minId = Integer.MAX_VALUE;
Map<Integer, Task> idToTask = new HashMap<>();
for (Integer taskId : taskIds) {
minId = Math.min(minId, taskId);
try {
Task task = new Task(executor, taskId);
idToTask.put(taskId, task);
} catch (IOException ex) {
throw Utils.wrapInRuntime(ex);
}
}

executor.idToTaskBase = minId;
executor.idToTask = Utils.convertToArray(idToTask, minId);
return executor;
}

Depending on the component type, either a SpoutExecutor or a BoltExecutor is created
Then the tasks are created and bound to the executor, indexed by task id
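A hedged sketch of how getExecutorType makes that decision: the component id is resolved against the topology definition (condensed; treat it as an approximation of the real method):

private static String getExecutorType(WorkerTopologyContext workerTopologyContext, String componentId) {
    StormTopology topology = workerTopologyContext.getRawTopology();
    if (topology.get_spouts().containsKey(componentId)) {
        return ClientStatsUtil.SPOUT;   // component declared as a spout
    } else if (topology.get_bolts().containsKey(componentId)) {
        return ClientStatsUtil.BOLT;    // component declared as a bolt
    }
    throw new RuntimeException("Could not find " + componentId + " in topology");
}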

Executor.execute
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
/**
* separated from mkExecutor in order to replace executor transfer in executor data for testing.
*/
public ExecutorShutdown execute() throws Exception {
LOG.info("Loading executor tasks " + componentId + ":" + executorId);

String handlerName = componentId + "-executor" + executorId;
Utils.SmartThread handler =
Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);

LOG.info("Finished loading executor " + componentId + ":" + executorId);
return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask, receiveQueue);
}
Here Utils.asyncLoop creates a Utils.SmartThread and starts it.
Utils.asyncLoop
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/Utils.java
/**
* Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous
* call.
*
* The given afn may be a callable that returns the number of seconds to sleep, or it may be a Callable that returns another Callable
* that in turn returns the number of seconds to sleep. In the latter case isFactory.
*
* @param afn the code to call on each iteration
* @param isDaemon whether the new thread should be a daemon thread
* @param eh code to call when afn throws an exception
* @param priority the new thread's priority
* @param isFactory whether afn returns a callable instead of sleep seconds
* @param startImmediately whether to start the thread before returning
* @param threadName a suffix to be appended to the thread name
* @return the newly created thread
*
* @see Thread
*/
public static SmartThread asyncLoop(final Callable afn, boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
int priority, final boolean isFactory, boolean startImmediately,
String threadName) {
SmartThread thread = new SmartThread(new Runnable() {
public void run() {
try {
final Callable<Long> fn = isFactory ? (Callable<Long>) afn.call() : afn;
while (true) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
final Long s = fn.call();
if (s == null) {// then stop running it
break;
}
if (s > 0) {
Time.sleep(s);
}
}
} catch (Throwable t) {
if (Utils.exceptionCauseIsInstanceOf(
InterruptedException.class, t)) {
LOG.info("Async loop interrupted!");
return;
}
LOG.error("Async loop died!", t);
throw new RuntimeException(t);
}
}
});
if (eh != null) {
thread.setUncaughtExceptionHandler(eh);
} else {
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Async loop died!", e);
Utils.exitProcess(1, "Async loop died!");
}
});
}
thread.setDaemon(isDaemon);
thread.setPriority(priority);
if (threadName != null && !threadName.isEmpty()) {
thread.setName(thread.getName() + "-" + threadName);
}
if (startImmediately) {
thread.start();
}
return thread;
}

The run method loops forever calling fn.call(); since execute passes isFactory=true, this amounts to calling Executor.call().call()
BoltExecutor.call essentially calls receiveQueue.consume
SpoutExecutor.call also calls receiveQueue.consume, and additionally calls spouts.get(j).nextTuple()
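A hedged, heavily condensed sketch of the bolt-side loop body (the real BoltExecutor.call also deals with pending emits and back pressure):

// Executor.call is a factory (isFactory=true in asyncLoop above), so it
// returns the Callable that the SmartThread invokes on every iteration
@Override
public Callable<Long> call() throws Exception {
    init(idToTask, idToTaskBase);
    return () -> {
        // drain the receive queue; this executor is the JCQueue.Consumer,
        // so every tuple ends up in Executor.accept (shown below)
        receiveQueue.consume(BoltExecutor.this);
        return 0L; // 0 => no sleep, loop again immediately
    };
}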

receiveQueue.consume
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
/**
* Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
*/
public int consume(JCQueue.Consumer consumer) {
return consume(consumer, continueRunning);
}

/**
* Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of
* elements consumed from Q
*/
public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
try {
return consumeImpl(consumer, exitCond);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

/**
* Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
*
* @param consumer
* @param exitCond
*/
private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
int drainCount = 0;
while (exitCond.keepRunning()) {
Object tuple = recvQueue.poll();
if (tuple == null) {
break;
}
consumer.accept(tuple);
++drainCount;
}

int overflowDrainCount = 0;
int limit = overflowQ.size();
while (exitCond.keepRunning() && (overflowDrainCount < limit)) {// 2nd cond prevents staying stuck with consuming overflow
Object tuple = overflowQ.poll();
++overflowDrainCount;
consumer.accept(tuple);
}
int total = drainCount + overflowDrainCount;
if (total > 0) {
consumer.flush();
}
return total;
}
The consume method drains the queue, invoking the consumer's accept method for each element and flush once at the end of a non-empty batch
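A minimal usage sketch of the Consumer contract that consume() drives, assuming a JCQueue named receiveQueue: accept() runs once per drained element, flush() once per non-empty batch. In the real code path the Executor itself is the Consumer, which is what leads into Executor.accept below.

JCQueue.Consumer printer = new JCQueue.Consumer() {
    @Override
    public void accept(Object event) {
        System.out.println("drained: " + event); // per-element callback
    }

    @Override
    public void flush() {
        // batch boundary; the Executor flushes its pending emits here
    }
};
int drained = receiveQueue.consume(printer); // non-blocking; 0 if the queue is empty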
Task
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java
public class Task {

private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private final TaskMetrics taskMetrics;
private Executor executor;
private WorkerState workerData;
private TopologyContext systemTopologyContext;
private TopologyContext userTopologyContext;
private WorkerTopologyContext workerTopologyContext;
private Integer taskId;
private String componentId;
private Object taskObject; // Spout/Bolt object
private Map<String, Object> topoConf;
private BooleanSupplier emitSampler;
private CommonStats executorStats;
private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
private HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> streamToGroupers;
private boolean debug;

public Task(Executor executor, Integer taskId) throws IOException {
this.taskId = taskId;
this.executor = executor;
this.workerData = executor.getWorkerData();
this.topoConf = executor.getTopoConf();
this.componentId = executor.getComponentId();
this.streamComponentToGrouper = executor.getStreamToComponentToGrouper();
this.streamToGroupers = getGroupersPerStream(streamComponentToGrouper);
this.executorStats = executor.getStats();
this.workerTopologyContext = executor.getWorkerTopologyContext();
this.emitSampler = ConfigUtils.mkStatsSampler(topoConf);
this.systemTopologyContext = mkTopologyContext(workerData.getSystemTopology());
this.userTopologyContext = mkTopologyContext(workerData.getTopology());
this.taskObject = mkTaskObject();
this.debug = topoConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) topoConf.get(Config.TOPOLOGY_DEBUG);
this.addTaskHooks();
this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, this.taskId);
}

//……
}
Executor.accept
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@Override
public void accept(Object event) {
AddressedTuple addressedTuple = (AddressedTuple) event;
int taskId = addressedTuple.getDest();

TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
if (isDebug) {
LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
}

try {
if (taskId != AddressedTuple.BROADCAST_DEST) {
tupleActionFn(taskId, tuple);
} else {
for (Integer t : taskIds) {
tupleActionFn(t, tuple);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

The accept method resolves the destination taskId of the tuple and calls tupleActionFn for it, or for every taskId when the tuple is a broadcast
BoltExecutor.tupleActionFn fetches the bolt object from the task and calls boltObject.execute(tuple)
SpoutExecutor.tupleActionFn takes the TupleInfo out of RotatingMap<Long, TupleInfo> pending and acks or fails the tuple accordingly
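A hedged, stripped-down sketch of the bolt side (the real BoltExecutor.tupleActionFn also special-cases system streams such as metrics ticks and credential changes):

public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
    // resolve the Task for this id; idToTaskBase offsets into the task array
    Task task = idToTask.get(taskId - idToTaskBase);
    IBolt bolt = (IBolt) task.getTaskObject(); // the user's bolt instance
    bolt.execute(tuple);
}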

ExecutorShutdown
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
public class ExecutorShutdown implements Shutdownable, IRunningExecutor {

private static final Logger LOG = LoggerFactory.getLogger(ExecutorShutdown.class);

private final Executor executor;
private final List<Utils.SmartThread> threads;
private final ArrayList<Task> taskDatas;
private final JCQueue receiveQueue;

//……

@Override
public void credentialsChanged(Credentials credentials) {
TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials),
Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID,
Constants.CREDENTIALS_CHANGED_STREAM_ID);
AddressedTuple addressedTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
try {
executor.getReceiveQueue().publish(addressedTuple);
executor.getReceiveQueue().flush();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public void loadChanged(LoadMapping loadMapping) {
executor.reflectNewLoadMapping(loadMapping);
}

@Override
public JCQueue getReceiveQueue() {
return receiveQueue;
}

@Override
public boolean publishFlushTuple() {
return executor.publishFlushTuple();
}

@Override
public void shutdown() {
try {
LOG.info("Shutting down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
executor.getReceiveQueue().close();
for (Utils.SmartThread t : threads) {
t.interrupt();
}
for (Utils.SmartThread t : threads) {
LOG.debug("Executor " + executor.getComponentId() + ":" + executor.getExecutorId() + " joining thread " + t.getName());
t.join();
}
executor.getStats().cleanupStats();
for (Task task : taskDatas) {
if (task == null) {
continue;
}
TopologyContext userContext = task.getUserContext();
for (ITaskHook hook : userContext.getHooks()) {
hook.cleanup();
}
}
executor.getStormClusterState().disconnect();
if (executor.getOpenOrPrepareWasCalled().get()) {
for (Task task : taskDatas) {
if (task == null) {
continue;
}
Object object = task.getTaskObject();
if (object instanceof ISpout) {
((ISpout) object).close();
} else if (object instanceof IBolt) {
((IBolt) object).cleanup();
} else {
LOG.error("unknown component object");
}
}
}
LOG.info("Shut down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
}
}
}
ExecutorShutdown mainly wraps the shutdown handling: close the receive queue, interrupt and join the executor threads, clean up stats and task hooks, and finally call close/cleanup on the spout or bolt objects.
Summary

After it starts, the worker reads the assignment info, asking its local supervisor first and falling back to zookeeper's /assignments/{topology} path, e.g. /assignments/DemoTopology-1-1539163962
From the assignment it derives Map<List<Long>, NodeInfo> executorToNodePort, keeps the executors belonging to this worker, and creates an Executor for each via Executor.mkExecutor
While creating an Executor, the Tasks in its task-id range are created and bound to it
Each executor's execute method is then called; it starts a Utils.SmartThread that loops calling Executor.call().call()

BoltExecutor.call essentially calls receiveQueue.consume; SpoutExecutor.call calls receiveQueue.consume too, and additionally calls spouts.get(j).nextTuple()
receiveQueue.consume feeds each tuple into the Executor's accept method, and accept invokes tupleActionFn for the destination taskId (or for every taskId when the tuple is a broadcast)
BoltExecutor.tupleActionFn fetches the bolt object from the task and calls boltObject.execute(tuple); SpoutExecutor.tupleActionFn takes the TupleInfo out of RotatingMap<Long, TupleInfo> pending and acks or fails the tuple accordingly

A worker can be understood as a process; executors are the threads inside that process; a task can be thought of as an instance of a spout or bolt. By default one executor runs exactly one task of its spout or bolt
Adding workers or executors scales the topology out across supervisors, a process called rebalance: tasks, as the abstraction that carries the actual work, move from executors on heavily loaded workers to executors on the new workers. Note that the rebalance command can only adjust the numbers of workers and executors; it cannot change the number of tasks, as the sketch below illustrates
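As a hedged illustration of these three levels, using the standard TopologyBuilder API (the component names and the BlueSpout/GreenBolt classes are hypothetical, in the spirit of the parallelism doc cited below):

import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;

// BlueSpout and GreenBolt are placeholder classes for this example
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("blue-spout", new BlueSpout(), 2);   // 2 executors
builder.setBolt("green-bolt", new GreenBolt(), 2)     // 2 executors...
       .setNumTasks(4)                                // ...running 4 tasks, i.e. 2 tasks per executor
       .shuffleGrouping("blue-spout");

Config conf = new Config();
conf.setNumWorkers(2); // 2 worker processes for this topology

A later storm rebalance mytopology -n 3 -e green-bolt=4 could then move to 3 workers and 4 green-bolt executors (one task each), but the task count of 4 is fixed at submission time.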

doc

Storm 源码分析: Component, Executor, Task 之间关系
Understanding the Parallelism of a Storm Topology
