A Look at Storm's AssignmentDistributionService


This article takes a look at Storm's AssignmentDistributionService.
AssignmentDistributionService
storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
/**
* A service for distributing master assignments to supervisors, this service makes the assignments notification
* asynchronous.
*
* <p>We support multiple working threads to distribute assignment, every thread has a queue buffer.
*
* <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request,
* let the supervisors sync instead.
*
* <p>Caution: this class is not thread safe.
*
* <pre>{@code
* Working mode
*                      +--------+         +-----------------+
*                      | queue1 |   ==>   | Working thread1 |
* +--------+ shuffle   +--------+         +-----------------+
* | Master |   ==>
* +--------+           +--------+         +-----------------+
*                      | queue2 |   ==>   | Working thread2 |
*                      +--------+         +-----------------+
* }
* </pre>
*/
public class AssignmentDistributionService implements Closeable {
//……
private ExecutorService service;

/**
* Assignments request queue.
*/
private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue;

/**
* Add an assignments for a node/supervisor for distribution.
* @param node node id of supervisor.
* @param host host name for the node.
* @param serverPort node thrift server port.
* @param assignments the {@link org.apache.storm.generated.SupervisorAssignments}
*/
public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
try {
//For some reasons, we can not get supervisor port info, eg: supervisor shutdown,
//Just skip for this scheduling round.
if (serverPort == null) {
LOG.warn("Discard an assignment distribution for node {} because server port info is missing.", node);
return;
}

boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS);
if (!success) {
LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node);
}

} catch (InterruptedException e) {
LOG.error("Add node assignments interrupted: {}", e.getMessage());
throw new RuntimeException(e);
}
}

private LinkedBlockingQueue<NodeAssignments> nextQueue() {
return this.assignmentsQueue.get(nextQueueId());
}
}

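Nimbus notifies supervisors of scheduling results by calling addAssignmentsForNode on AssignmentDistributionService.
addAssignmentsForNode wraps the SupervisorAssignments in a NodeAssignments and offers it to one of the queues in assignmentsQueue, waiting up to 5 seconds. If the supervisor port is unknown or the target queue stays full, the request is discarded and the supervisor is left to sync on its own.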
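
The "offer with a timeout, drop if still full" behavior can be tried in isolation. The following is only a minimal sketch, not Storm code: BoundedOfferDemo, submit and the String payload are hypothetical stand-ins for the service, addAssignmentsForNode and NodeAssignments.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a single sub queue of the distribution service.
class BoundedOfferDemo {
    private final LinkedBlockingQueue<String> queue;
    private int dropped = 0;

    BoundedOfferDemo(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Waits up to timeoutMillis for free space; drops the request otherwise. */
    boolean submit(String request, long timeoutMillis) {
        try {
            boolean accepted = queue.offer(request, timeoutMillis, TimeUnit.MILLISECONDS);
            if (!accepted) {
                dropped++; // queue stayed full: discard and rely on a later sync
            }
            return accepted;
        } catch (InterruptedException e) {
            // Unlike the Storm method, this sketch restores the flag instead of rethrowing.
            Thread.currentThread().interrupt();
            dropped++;
            return false;
        }
    }

    int size() {
        return queue.size();
    }

    int dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        BoundedOfferDemo demo = new BoundedOfferDemo(2);
        System.out.println(demo.submit("node-1", 50));
        System.out.println(demo.submit("node-2", 50));
        // Nobody drains the queue here, so the third offer times out.
        System.out.println(demo.submit("node-3", 50));
        System.out.println("dropped=" + demo.dropped());
    }
}
```

Dropping is acceptable in this design because, as the class Javadoc says, supervisors can still sync assignments themselves.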

AssignmentDistributionService.getInstance
storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
/**
* Factory method for initialize a instance.
* @param conf config.
* @return an instance of {@link AssignmentDistributionService}
*/
public static AssignmentDistributionService getInstance(Map conf) {
AssignmentDistributionService service = new AssignmentDistributionService();
service.prepare(conf);
return service;
}

/**
* Function for initialization.
*
* @param conf config
*/
public void prepare(Map conf) {
this.conf = conf;
this.random = new Random(47);

this.threadsNum = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
this.queueSize = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100);

this.assignmentsQueue = new HashMap<>();
for (int i = 0; i < threadsNum; i++) {
this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize));
}
//start the thread pool
this.service = Executors.newFixedThreadPool(threadsNum);
this.active = true;
//start the threads
for (int i = 0; i < threadsNum; i++) {
this.service.submit(new DistributeTask(this, i));
}
// for local cluster
localSupervisors = new HashMap<>();
if (ConfigUtils.isLocalMode(conf)) {
isLocalMode = true;
}
}

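getInstance creates a new AssignmentDistributionService and calls prepare to initialize it.
prepare creates threadsNum LinkedBlockingQueue instances, each bounded by DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE (default 100); threadsNum comes from DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS (default 10).
It also creates a thread pool with Executors.newFixedThreadPool(threadsNum) and submits threadsNum DistributeTask instances, one per queue.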
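
To make the "one bounded queue per worker thread" layout concrete, here is a small self-contained sketch. It is an illustration under assumptions rather than the Storm implementation: QueuePerWorkerDemo, add and shutdownAndCount are hypothetical names, the caller picks the queue id explicitly instead of shuffling, and the workers only count items instead of sending them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QueuePerWorkerDemo {
    private final Map<Integer, LinkedBlockingQueue<String>> queues = new HashMap<>();
    private final ExecutorService pool;
    private final AtomicInteger processed = new AtomicInteger();
    private volatile boolean active = true;

    QueuePerWorkerDemo(int threads, int queueSize) {
        // One bounded queue per worker, keyed by worker index.
        for (int i = 0; i < threads; i++) {
            queues.put(i, new LinkedBlockingQueue<>(queueSize));
        }
        pool = Executors.newFixedThreadPool(threads);
        // One long-running task per queue.
        for (int i = 0; i < threads; i++) {
            final LinkedBlockingQueue<String> queue = queues.get(i);
            pool.submit(() -> drain(queue));
        }
    }

    private void drain(LinkedBlockingQueue<String> queue) {
        try {
            // Keep going until shutdown was requested and the queue is empty.
            while (active || !queue.isEmpty()) {
                String item = queue.poll(10, TimeUnit.MILLISECONDS);
                if (item != null) {
                    processed.incrementAndGet();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Non-blocking add to the chosen queue; false means the queue is full. */
    boolean add(int queueId, String item) {
        return queues.get(queueId).offer(item);
    }

    int shutdownAndCount() {
        active = false;
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        QueuePerWorkerDemo demo = new QueuePerWorkerDemo(2, 100);
        for (int i = 0; i < 6; i++) {
            demo.add(i % 2, "node-" + i);
        }
        System.out.println("processed=" + demo.shutdownAndCount());
    }
}
```

Because each worker owns exactly one queue, a slow supervisor only delays the requests that landed in the same queue.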

DistributeTask
storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
/**
* Task to distribute assignments.
*/
static class DistributeTask implements Runnable {
private AssignmentDistributionService service;
private Integer queueIndex;

DistributeTask(AssignmentDistributionService service, Integer index) {
this.service = service;
this.queueIndex = index;
}

@Override
public void run() {
while (service.isActive()) {
try {
NodeAssignments nodeAssignments = this.service.nextAssignments(queueIndex);
sendAssignmentsToNode(nodeAssignments);
} catch (InterruptedException e) {
if (service.isActive()) {
LOG.error("Get an unexpected interrupt when distributing assignments to node, {}", e.getCause());
} else {
// service is off now just interrupt it.
Thread.currentThread().interrupt();
}
}
}
}

private void sendAssignmentsToNode(NodeAssignments assignments) {
if (this.service.isLocalMode) {
//local node
Supervisor supervisor = this.service.localSupervisors.get(assignments.getNode());
if (supervisor != null) {
supervisor.sendSupervisorAssignments(assignments.getAssignments());
} else {
LOG.error("Can not find node {} for assignments distribution", assignments.getNode());
throw new RuntimeException("null for node " + assignments.getNode() + " supervisor instance.");
}
} else {
// distributed mode
try (SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(),
assignments.getHost(), assignments.getServerPort())) {
try {
client.getClient().sendSupervisorAssignments(assignments.getAssignments());
} catch (Exception e) {
//just ignore the exception.
LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
}
} catch (Throwable e) {
//just ignore any error/exception.
LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage());
}

}
}
}

/**
* Get an assignments from the target queue with the specific index.
* @param queueIndex index of the queue
* @return an {@link NodeAssignments}
* @throws InterruptedException
*/
public NodeAssignments nextAssignments(Integer queueIndex) throws InterruptedException {
NodeAssignments target = null;
while (true) {
target = getQueueById(queueIndex).poll();
if (target != null) {
return target;
}
Time.sleep(100L);
}
}

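During prepare, AssignmentDistributionService submits the DistributeTask instances to the thread pool.
The run method of DistributeTask loops while the service is active: it takes a NodeAssignments from its own queue (nextAssignments polls the queue and sleeps 100 ms whenever it is empty) and passes it to sendAssignmentsToNode.
In distributed mode, sendAssignmentsToNode calls client.getClient().sendSupervisorAssignments(assignments.getAssignments()), and failures are logged and otherwise ignored; in local mode it calls the local Supervisor instance directly.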
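
The important property of the distributed-mode send is that a failure for one node is logged and skipped, so the worker keeps serving the rest of its queue. A rough sketch of that idea follows, with hypothetical names (DistributeLoopDemo, drain) and a Consumer standing in for the Thrift client. Unlike the real task, this loop stops as soon as the queue is empty instead of sleeping and retrying.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class DistributeLoopDemo {
    /**
     * Drains the queue and hands every node to the sender.
     * A failed send is recorded and skipped; it never aborts the loop.
     */
    static List<String> drain(Queue<String> queue, Consumer<String> sender) {
        List<String> failed = new ArrayList<>();
        String node;
        while ((node = queue.poll()) != null) {
            try {
                sender.accept(node);
            } catch (RuntimeException e) {
                // The Storm task logs here; this sketch just remembers the node.
                failed.add(node);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Queue<String> queue = new LinkedBlockingQueue<>();
        queue.add("node-1");
        queue.add("node-down");
        queue.add("node-2");

        List<String> delivered = new ArrayList<>();
        List<String> failed = drain(queue, node -> {
            if (node.equals("node-down")) {
                throw new IllegalStateException("connection refused");
            }
            delivered.add(node);
        });
        System.out.println("delivered=" + delivered + " failed=" + failed);
    }
}
```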

Supervisor.launchSupervisorThriftServer
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
private void launchSupervisorThriftServer(Map<String, Object> conf) throws IOException {
// validate port
int port = getThriftServerPort();
try {
ServerSocket socket = new ServerSocket(port);
socket.close();
} catch (BindException e) {
LOG.error("{} is not available. Check if another process is already listening on {}", port, port);
throw new RuntimeException(e);
}

TProcessor processor = new org.apache.storm.generated.Supervisor.Processor(
new org.apache.storm.generated.Supervisor.Iface() {
@Override
public void sendSupervisorAssignments(SupervisorAssignments assignments)
throws AuthorizationException, TException {
checkAuthorization("sendSupervisorAssignments");
LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments);
SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments,
getReadClusterState());
getEventManger().add(syn);
}

//……
});
this.thriftServer = new ThriftServer(conf, processor, ThriftConnectionType.SUPERVISOR);
this.thriftServer.serve();
}

In launchSupervisorThriftServer, the Supervisor registers a TProcessor whose sendSupervisorAssignments handler wraps the received SupervisorAssignments in a SynchronizeAssignments and adds it to the EventManager.
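
The handler itself does very little: it wraps the payload in a Runnable and hands it off, so the RPC returns quickly while the sync work runs elsewhere. The sketch below imitates that hand-off with a single-threaded executor. This is an assumption made for illustration only: Storm's EventManager is its own class, and EventHandOffDemo, onAssignments and stopAndGetApplied are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EventHandOffDemo {
    // Stand-in for the event manager: one thread, events run in submission order.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final List<String> applied = Collections.synchronizedList(new ArrayList<>());

    /** Stand-in for the RPC handler: enqueue the work and return immediately. */
    void onAssignments(String assignments) {
        eventLoop.execute(() -> applied.add("sync:" + assignments));
    }

    /** Stops the loop, waits for queued events, and returns what was applied. */
    List<String> stopAndGetApplied() {
        eventLoop.shutdown();
        try {
            eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(applied);
    }

    public static void main(String[] args) {
        EventHandOffDemo demo = new EventHandOffDemo();
        demo.onAssignments("a1");
        demo.onAssignments("a2");
        System.out.println(demo.stopAndGetApplied());
    }
}
```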
SynchronizeAssignments.run
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
/**
* A runnable which will synchronize assignments to node local and then worker processes.
*/
public class SynchronizeAssignments implements Runnable {
//……
@Override
public void run() {
// first sync assignments to local, then sync processes.
if (null == assignments) {
getAssignmentsFromMaster(this.supervisor.getConf(), this.supervisor.getStormClusterState(), this.supervisor.getAssignmentId());
} else {
assignedAssignmentsToLocal(this.supervisor.getStormClusterState(), assignments);
}
this.readClusterState.run();
}

private static void assignedAssignmentsToLocal(IStormClusterState clusterState, SupervisorAssignments assignments) {
if (null == assignments) {
//unknown error, just skip
return;
}
Map<String, byte[]> serAssignments = new HashMap<>();
for (Map.Entry<String, Assignment> entry : assignments.get_storm_assignment().entrySet()) {
serAssignments.put(entry.getKey(), Utils.serialize(entry.getValue()));
}
clusterState.syncRemoteAssignments(serAssignments);
}
}

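run calls assignedAssignmentsToLocal when assignments were pushed (or getAssignmentsFromMaster when assignments is null), and then triggers this.readClusterState.run().
assignedAssignmentsToLocal serializes each Assignment and calls clusterState.syncRemoteAssignments(serAssignments).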

StormClusterStateImpl.syncRemoteAssignments
storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@Override
public void syncRemoteAssignments(Map<String, byte[]> remote) {
if (null != remote) {
this.assignmentsBackend.syncRemoteAssignments(remote);
} else {
Map<String, byte[]> tmp = new HashMap<>();
List<String> stormIds = this.stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, false);
for (String stormId : stormIds) {
byte[] assignment = this.stateStorage.get_data(ClusterUtils.assignmentPath(stormId), false);
tmp.put(stormId, assignment);
}
this.assignmentsBackend.syncRemoteAssignments(tmp);
}
}

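This writes the serAssignments data into assignmentsBackend, that is, local memory.
If remote is null, the assignments are read from ZooKeeper instead and then loaded into memory; the ZooKeeper path is ClusterUtils.assignmentPath(stormId), i.e. /assignments/{topologyId}.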
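
The "use the pushed snapshot, otherwise read everything back from the store" logic can be sketched without ZooKeeper. Everything here is a stand-in and should be read as an assumption: LocalAssignmentsCacheDemo and storeRemotely are hypothetical, a plain HashMap plays the role of the state storage, and the sketch assumes the backend replaces its whole snapshot on each sync.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class LocalAssignmentsCacheDemo {
    private static final String ROOT = "/assignments";

    // Hypothetical stand-in for the remote state storage: path -> serialized assignment.
    private final Map<String, byte[]> stateStore = new HashMap<>();
    // Stand-in for the in-memory backend: topology id -> serialized assignment.
    private Map<String, byte[]> local = new HashMap<>();

    static String assignmentPath(String topologyId) {
        return ROOT + "/" + topologyId;
    }

    void storeRemotely(String topologyId, byte[] data) {
        stateStore.put(assignmentPath(topologyId), data);
    }

    /** Replaces the local snapshot with remote, or with the stored data if remote is null. */
    void sync(Map<String, byte[]> remote) {
        if (remote != null) {
            local = new HashMap<>(remote);
            return;
        }
        Map<String, byte[]> tmp = new HashMap<>();
        String prefix = ROOT + "/";
        for (Map.Entry<String, byte[]> entry : stateStore.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                tmp.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        local = tmp;
    }

    Set<String> localTopologyIds() {
        return new TreeSet<>(local.keySet());
    }

    public static void main(String[] args) {
        LocalAssignmentsCacheDemo cache = new LocalAssignmentsCacheDemo();
        cache.storeRemotely("topo-1", new byte[] {1});
        cache.sync(null); // nothing pushed: fall back to the store
        System.out.println(cache.localTopologyIds());
    }
}
```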

ReadClusterState.run
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@Override
public synchronized void run() {
try {
List<String> stormIds = stormClusterState.assignments(null);
Map<String, Assignment> assignmentsSnapshot = getAssignmentsSnapshot(stormClusterState);

Map<Integer, LocalAssignment> allAssignments = readAssignments(assignmentsSnapshot);
if (allAssignments == null) {
//Something odd happened try again later
return;
}
Map<String, List<ProfileRequest>> topoIdToProfilerActions = getProfileActions(stormClusterState, stormIds);

HashSet<Integer> assignedPorts = new HashSet<>();
LOG.debug("Synchronizing supervisor");
LOG.debug("All assignment: {}", allAssignments);
LOG.debug("Topology Ids -> Profiler Actions {}", topoIdToProfilerActions);
for (Integer port : allAssignments.keySet()) {
if (iSuper.confirmAssigned(port)) {
assignedPorts.add(port);
}
}
HashSet<Integer> allPorts = new HashSet<>(assignedPorts);
iSuper.assigned(allPorts);
allPorts.addAll(slots.keySet());

Map<Integer, Set<TopoProfileAction>> filtered = new HashMap<>();
for (Entry<String, List<ProfileRequest>> entry : topoIdToProfilerActions.entrySet()) {
String topoId = entry.getKey();
if (entry.getValue() != null) {
for (ProfileRequest req : entry.getValue()) {
NodeInfo ni = req.get_nodeInfo();
if (host.equals(ni.get_node())) {
Long port = ni.get_port().iterator().next();
Set<TopoProfileAction> actions = filtered.get(port.intValue());
if (actions == null) {
actions = new HashSet<>();
filtered.put(port.intValue(), actions);
}
actions.add(new TopoProfileAction(topoId, req));
}
}
}
}

for (Integer port : allPorts) {
Slot slot = slots.get(port);
if (slot == null) {
slot = mkSlot(port);
slots.put(port, slot);
slot.start();
}
slot.setNewAssignment(allAssignments.get(port));
slot.addProfilerActions(filtered.get(port));
}

} catch (Exception e) {
LOG.error("Failed to Sync Supervisor", e);
throw new RuntimeException(e);
}
}

For each port, run calls setNewAssignment on the Slot, which sets the slot's AtomicReference<LocalAssignment> newAssignment.
The run method of Slot loops, and each stateMachineStep call checks newAssignment and updates nextState accordingly.
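
The hand-off between ReadClusterState and Slot is a "write to an AtomicReference, let the owner poll it" pattern. The sketch below shows only that pattern, under simplifying assumptions: SlotHandOffDemo and step are hypothetical, the assignment is a plain String, and the real state machine has many more states than "changed or not".

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

class SlotHandOffDemo {
    // Written by the syncing thread, read by the slot's own loop.
    private final AtomicReference<String> newAssignment = new AtomicReference<>();
    // Owned by the slot loop only.
    private String current;

    void setNewAssignment(String assignment) {
        newAssignment.set(assignment);
    }

    /** One iteration of the polling loop; returns true if the slot had to change state. */
    boolean step() {
        String wanted = newAssignment.get();
        if (Objects.equals(wanted, current)) {
            return false; // nothing to do this round
        }
        current = wanted; // the real slot would kill or launch a worker here
        return true;
    }

    String current() {
        return current;
    }

    public static void main(String[] args) {
        SlotHandOffDemo slot = new SlotHandOffDemo();
        System.out.println(slot.step());
        slot.setNewAssignment("topo-1");
        System.out.println(slot.step());
        System.out.println(slot.step());
    }
}
```

The writer never blocks and never calls into the slot's logic; the cost is that a change is only noticed on the next polling round.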

Summary

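Nimbus notifies supervisors of scheduling results by calling addAssignmentsForNode on AssignmentDistributionService.

addAssignmentsForNode puts the SupervisorAssignments, wrapped in a NodeAssignments, into assignmentsQueue; AssignmentDistributionService creates a thread pool with the configured number of threads, plus the same number of queues and DistributeTask instances.
Each DistributeTask loops, pulling NodeAssignments from its own queue and calling sendAssignmentsToNode to notify the supervisor.

On startup the Supervisor runs launchSupervisorThriftServer, which registers a processor that handles sendSupervisorAssignments by wrapping the received SupervisorAssignments in a SynchronizeAssignments and adding it to the EventManager.

When the EventManager processes a SynchronizeAssignments it executes its run method, which calls assignedAssignmentsToLocal and then triggers this.readClusterState.run().
assignedAssignmentsToLocal calls clusterState.syncRemoteAssignments(serAssignments) to update the assignments in local memory; readClusterState.run() mainly updates each slot's newAssignment value, after which the Slot's polling loop notices the change and triggers the corresponding handling.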

