基于1.3.6, 仅集体了解,欢送斧正.
架构
master
启动类为org.apache.dolphinscheduler.server.master.MasterServer
,通过spring注解@PostConstruct
启动run办法.
master节点在启动时,次要做了以下4个事:
- 通过netty监听端口,与worker节点通信
- 在注册核心(zk)上注册本人
- 启动任务调度线程
- 启动quartz
其中quartz是一个定时工作的组件,能够通过数据库做集群.
worker
worker节点在启动时,次要做了以下:
- 监听端口,和master通信
- 通过注册核心注册
- 启动工作执行线程
- 启动工作ack和后果上报重试线程
RetryReportTaskStatusThread
master和worker节点都会监听端口,是因为单方都会作为客户端被动发送音讯给对方.worker节点可查看org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService#getRemoteChannel(int)
代码,在原有连贯不可用时会通过注册核心,找到原有的master的ip和监听端口,而后发动新的连贯申请.master节点可查看NettyExecutorManager,作为客户端向worker发动申请.
logger
Logger节点目前仅仅是通过netty监听了一个端口,承受对日志文件的读取申请,次要逻辑都在org.apache.dolphinscheduler.server.log.LoggerRequestProcessor
中.目前日志文件都是写到worker节点的本地文件,因而logger节点必须和worker节点一对一部署在一起.日志写到本地文件,对容器部署不是很敌对,如果要长久化,势必须要通过长久化存储.个人感觉没什么必要特地的抽出这么一个节点,性能特地简略,又必须和worker节点一一部署在一起,齐全能够合并入worker节点中,或者是为了后续扩大吧.
api
api节点是个web利用,提供controller接口为前端提供服务,次要就是crud.
alter
定时拉取数据库,发送告警信息,该节点利用只有部署一个就能够了.
服务注册与发现
master和worker节点在启动时都会作为服务端通过netty监听端口,只有客户端晓得服务端ip和该端口,即可通过向其发动连贯进行通信.
master注册
org.apache.dolphinscheduler.server.master.registry.MasterRegistry#registry
public void registry() { // 1.获取本机地址 String address = NetUtils.getAddr(masterConfig.getListenPort()); // 2.master注册门路 String localNodePath = getMasterPath(); // 3.创立长期节点 zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); // 4.注册连贯状态监听器 zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener( (client, newState) -> { if (newState == ConnectionState.LOST) { logger.error("master : {} connection lost from zookeeper", address); } else if (newState == ConnectionState.RECONNECTED) { logger.info("master : {} reconnected to zookeeper", address); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); } else if (newState == ConnectionState.SUSPENDED) { logger.warn("master : {} connection SUSPENDED ", address); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); } }); // 5.定时上报zk状态 int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory(), Sets.newHashSet(getMasterPath()), Constants.MASTER_TYPE, zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); }
在状态上报实现后,zk节点状态如下图,data为本机状态等信息,用逗号隔开.
worker注册
org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry#registry
public void registry() { // 1.获取本机地址 String address = NetUtils.getAddr(workerConfig.getListenPort()); // 2. 获取zk门路,须要依据配置文件的worker group注册 Set<String> workerZkPaths = getWorkerZkPaths(); int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); for (String workerZKPath : workerZkPaths) { // 3.创立长期节点 zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, ""); // 4.注册连贯状态监听器 zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener( (client,newState) -> { if (newState == ConnectionState.LOST) { logger.error("worker : {} connection lost from zookeeper", address); } else if (newState == ConnectionState.RECONNECTED) { logger.info("worker : {} reconnected to zookeeper", address); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, ""); } else if (newState == ConnectionState.SUSPENDED) { logger.warn("worker : {} connection SUSPENDED ", address); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, ""); } }); logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); } // 5.定时上报zk状态 HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime, this.workerConfig.getWorkerMaxCpuloadAvg(), this.workerConfig.getWorkerReservedMemory(), workerZkPaths, Constants.WORKER_TYPE, this.zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); }
在状态上报实现后,zk节点状态如下图:
可见在default组下注册胜利.
服务发现
服务注册实现之后,即可通过zk注册核心做到服务发现.通过查看ServerNodeManager
代码能够得悉,ds的服务发现是master负责的.其通过继承spring的InitializingBean
在启动时执行以下代码
@Override public void afterPropertiesSet() throws Exception { /** * 从zk中拿到master和worker节点 */ load(); /** * 定时同步表t_ds_worker_group的work group */ executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS); /** * 注册master节点变更监听器 */ registryCenter.getRegisterOperator().addListener(new MasterNodeListener()); /** * 注册worker节点变更监听器 */ registryCenter.getRegisterOperator().addListener(new WorkerGroupNodeListener()); }
查看WorkerNodeInfoAndGroupDbSyncTask
代码,t_ds_worker_group表的工作组,应该不能够和配置文件的工作组重名.在有节点新增和删除时,zk回调监听器,而后同步批改本地缓存,为ExecutorDispatcher
散发工作时应用.
调度流程
MasterSchedulerService
该线程在master节点启动时启动,循环调用scheduleProcess
办法.
scheduleProcess
- 通过zk获取分布式锁
- 从t_ds_command表上拉取一条命令
解决该命令失去工作实例
- 结构工作实例
- 若工作实例为空,插入谬误命令表t_ds_error_command,并删除该命令
- 若线程池数量有余,设置为期待线程状态
- 保留工作实例并删除该命令
- 线程池执行工作实例
command有很多种类型,定时触发的起源能够查看ProcessScheduleJob
类,其继承Job,每次定时工作触发就会回调execute办法向command插入数据.
MasterExecThread
每个工作实例都会有一个该线程进行执行监控,一般工作调用executeProcess
办法执行
prepareProcess
- 构建dag
初始化各个状态的工作节点队列
- readyToSubmitTaskQueue 筹备好去提交的工作节点队列
- activeTaskNode 运行中的工作节点
- dependFailedTask 依赖节点失败
runProcess
- 提交没有依赖的工作节点到readyToSubmitTaskQueue队列
- 循环判断是否有实现的工作节点,并提交后继的工作节点
从readyToSubmitTaskQueue提交工作节点执行
endProcess
保留工作实例,如果是期待线程状态就创立一个复原期待线程命令,最初发送告警
MasterTaskExecThread
在MasterExecThread循环中,会调用
submitStandByTask
办法将readyToSubmitTaskQueue
队列的工作节点提交到activeTaskNode中,提交工作节点的办法为submitTaskExec
.其中一般工作创立了一个MasterTaskExecThread
线程.每个工作节点都会有一个该线程来负责执行和监控状态.submit
首先提交到数据库,再通过dispatchTask办法散发工作到worker.循环提交尽量确保两者都实现
dispatchTask
构建TaskPriority对象并放入TaskPriorityQueueImpl队列,其中外部是一个线程平安的阻塞优先级队列PriorityBlockingQueue
waitTaskQuit
循环判断工作节点是否勾销,暂停,实现,超时.其中勾销会向worker节点发送TASK_KILL_REQUEST命令
TaskPriorityQueueConsumer
在dispatchTask办法中,工作被放入了一个工作优先级队列中,之后由TaskPriorityQueueConsumer线程来生产,该线程也是一直循环,从队列中获取工作,而后进行散发
dispatch
构建执行上下文,而后由ExecutorDispatcher散发
ExecutorDispatcherdispatch
首先获取ExecutorManager,目前只有一个NettyExecutorManager实现类.通过HostManager筛选出一个可执行的worker,默认会通过ServerNodeManager筛选出worker group下的worker节点
NettyExecutorManager
作为一个netty客户端向worker节点发送音讯,其中对每个节点会重试3次,若失败则会向其余同组节点重试
工作执行的通信流程
下面貌似梳理了很多,但实际上才实现了工作开始执行时,master节点向worker节点发送的第一条音讯TASK_EXECUTE_REQUEST的过程,而且过程中也省略了很多看不懂的代码.下图是工作节点执行过程中整个的master和worker的通信交互流程.
ds的netty封装
因为工作发送给worker当前,代码就执行到其余的节点上了,为了串联代码的调用流程,这里记录一下dolphinscheduler对netty的封装.
NettyRemotingServer
在构造方法中,依据入参配置,初始化了boss和work两个EventLoopGroup,而后在start办法中,设置tcp参数和ChannelPipeline,最初阻塞监听端口.其中业务逻辑都放在ChannelPipeline中.
initNettyChannel1
private void initNettyChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("encoder", encoder); pipeline.addLast("decoder", new NettyDecoder()); pipeline.addLast("handler", serverHandler);}
首先增加的是编解码器,因为tcp传输的对象都是字节流,须要编解码器来负责字节流和java对象的互相转换.其中能够看到,编码器是共享的,解码器是每个管道都从新new的,这是因为在解码时因为tcp的粘包和拆包,须要缓存没有被解码的字节.
NettyServerHandler
首先先看一下userEventTriggered办法,这里解决了IdleStateEvent事件,然而方才在initNettyChannel办法中是没有看到增加了IdleStateHandler的.在dev的最新代码上曾经修复了.
processReceived
在承受并解码实现时,会回调该办法.音讯体解码成Command对象之后,依据type找到通过registerProcessor办法注册的处理器NettyRequestProcessor对象和解决的线程池.耗时申请是不容许在io线程上执行的,会阻塞导致无奈解决新的音讯,所以这里应用线程池异步解决.
NettyRemotingClient
同样在构造方法中依据入参配置初始化boss和work两个EventLoopGroup,在start中设置tcp配置和初始化ChannelPipeline.此外多了一个responseFutureExecutor线程池解决超时的ResponseFuture.
sendAsync
异步发送音讯,应用asyncSemaphore信号量管制了并发申请数量,opaque作为音讯id对应申请和响应.在netty发送音讯的回调中判断发送是否胜利.
sendSync
同步发送音讯,返回参为近程服务返回的响应.
ResponseFuture
因为worker和master之间的通信不是同步申请,因而ds设计了这个类用来异步转同步,和申请超时管制.其在构造方法中会放入全局的FUTURE_TABLE中被responseFutureExecutor线程监控是否超时.
waitResponse
阻塞期待响应,或者超时.latch在putResponse中被开释.
putResponse
放入响应,并去除全局的缓存.
NettyClientHandler
NettyRequestProcessor
这是具体业务逻辑的接口,负责解决不同的Command类型.
工作执行
在worker节点启动的时候能够看到,接管到TASK_EXECUTE_REQUEST音讯时,注册的处理器是TaskExecuteProcessor
TaskExecuteProcessor
- 创立日志和执行文件夹
- 发送TASK_EXECUTE_ACK给master
交给线程池workerExecService执行
TaskExecuteThread
工作节点的执行线程
- 设置超时
- 通过TaskManager依据类型取得工作对象
- 顺次调用工作对象的init,handle,after
发送TASK_EXECUTE_RESPONSE把执行后果告知master
TaskResponseService
master在承受到TASK_EXECUTE_ACK和TASK_EXECUTE_RESPONSE之后,都会把音讯放到该类的eventQueue中,外部启动了一个线程负责长久化到数据库.而后发送DB_TASK_ACK和DB_TASK_RESPONSE音讯到worker,让worker去除重试的缓存.
因为数据库中状态曾经批改,那么MasterTaskExecThread的waitTaskQuit会退出循环.而后MasterExecThread的runProcess在遍历activeTaskNode时,future.isDone()的判断会通过,最终通过submitPostNode办法提交上面的工作节点.
容错
ZKMasterClient
dataChanged
通过zk的监听回调解决节点挂掉的容错.在该办法内依据类型别离解决master和worker节点挂掉的容错.
removeZKNodePath
通过获取zk锁避免多个节点容错.
failoverMaster
master容错.通过查表找到挂掉的master节点在解决的t_ds_process_instance.对每个ProcessInstance去除host,并创立一个容错复原的command.最终调度线程会解决该命令.
failoverWorker
worker容错.找到所有挂掉的worker节点的host正在解决的TaskInstance,而后遍历批改状态为须要容错.对照MasterTaskExecThread线程,发现在typeIsFinished判断之后跳出循环,最终MasterExecThread线程也会感知到并从新提交给其余worker.
网络抖动
因为” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而产生节点的remove事件。对于这种状况,咱们应用最简略的形式,那就是节点一旦和ZooKeeper产生超时连贯,则间接将Master或Worker服务停掉。
这段官网阐明,对应代码中很多循环都会判断Stopper是否在运行.在HeartBeatTask中判断节点是否为挂掉的节点,若是调用stop.
改良
工作运行时须要很多线程配合,每个工作实例须要一个MasterExecThread线程,外部taskExecService线程池为20个线程(默认,当然不肯定会启动这么多).单机可调度的数量会受到线程数量的限度.amee之前也是对每个工作都须要有一个线程来监控,前面批改为应用工夫轮全局同一监控.在MasterSchedulerService中masterExecService线程池数量默认为100,阐明默认状况下单机master只能同时调度100个工作实例.
很多状态是通过数据库来同步的,因而须要一直的在各处查询数据库,而全局保护的缓存此时曾经失去了意义.不仅对数据库产生了压力,代码个人感觉也比拟乱.有一个全局对立的模型,各处批改都对该模型进行批改或者更好.api节点对数据库的批改(例如暂停)须要告诉到master.
劣势
可视化,这是ds相较而言最大的长处.dag的创立,各种工作参数的设置,数据源,文件资源,运行时的状态监控等等都能够在页面上直观的看到.不过如果是二次开发成咱们本人的产品,页面设置和展现必定是本人做的.
反对程度扩大,容错,HA,非常适合于容器化部署(外部应用ip来标识,这里在k8s环境会有问题)
深度反对大数据环境的各种工作,对大数据场景来说能够做到真正意义上的开箱即用.