关于dolphinscheduler:dolphinscheduler-136源码解析
基于1.3.6, 仅集体了解,欢送斧正. 架构 master启动类为org.apache.dolphinscheduler.server.master.MasterServer,通过spring注解@PostConstruct启动run办法.master节点在启动时,次要做了以下4个事: 通过netty监听端口,与worker节点通信在注册核心(zk)上注册本人启动任务调度线程启动quartz其中quartz是一个定时工作的组件,能够通过数据库做集群. workerworker节点在启动时,次要做了以下: 监听端口,和master通信通过注册核心注册启动工作执行线程启动工作ack和后果上报重试线程RetryReportTaskStatusThreadmaster和worker节点都会监听端口,是因为单方都会作为客户端被动发送音讯给对方.worker节点可查看org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService#getRemoteChannel(int)代码,在原有连贯不可用时会通过注册核心,找到原有的master的ip和监听端口,而后发动新的连贯申请.master节点可查看NettyExecutorManager,作为客户端向worker发动申请. loggerLogger节点目前仅仅是通过netty监听了一个端口,承受对日志文件的读取申请,次要逻辑都在org.apache.dolphinscheduler.server.log.LoggerRequestProcessor中.目前日志文件都是写到worker节点的本地文件,因而logger节点必须和worker节点一对一部署在一起.日志写到本地文件,对容器部署不是很敌对,如果要长久化,势必须要通过长久化存储.个人感觉没什么必要特地的抽出这么一个节点,性能特地简略,又必须和worker节点一一部署在一起,齐全能够合并入worker节点中,或者是为了后续扩大吧. apiapi节点是个web利用,提供controller接口为前端提供服务,次要就是crud. alter定时拉取数据库,发送告警信息,该节点利用只有部署一个就能够了. 服务注册与发现master和worker节点在启动时都会作为服务端通过netty监听端口,只有客户端晓得服务端ip和该端口,即可通过向其发动连贯进行通信. master注册org.apache.dolphinscheduler.server.master.registry.MasterRegistry#registry public void registry() { // 1.获取本机地址 String address = NetUtils.getAddr(masterConfig.getListenPort()); // 2.master注册门路 String localNodePath = getMasterPath(); // 3.创立长期节点 zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); // 4.注册连贯状态监听器 zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener( (client, newState) -> { if (newState == ConnectionState.LOST) { logger.error("master : {} connection lost from zookeeper", address); } else if (newState == ConnectionState.RECONNECTED) { logger.info("master : {} reconnected to zookeeper", address); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); } else if (newState == ConnectionState.SUSPENDED) { logger.warn("master : {} connection SUSPENDED ", address); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); } }); // 5.定时上报zk状态 int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory(), Sets.newHashSet(getMasterPath()), Constants.MASTER_TYPE, zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); }在状态上报实现后,zk节点状态如下图,data为本机状态等信息,用逗号隔开. ...