共计 1520 个字符,预计需要花费 4 分钟才能阅读完成。
工作切分
DataX 实现单个数据同步的作业,称之为 Job,这个 Job 会做 split 阶段,拆分成一个个的 Task,每个 Task 都会有一个 Reader 和 Writer 的工作。
拆分成多个 Task 是为了并发的执行,这个并发数,就是之前的 Channel 数量。
咱们假如这个 Job 在 split 阶段,有 100 个 Task,而 Channel 数量是 5,那理论的 Channel 数量取 100 和 5 的最小值,也就是 5。
每个并发执行的 Task 数量并不是简略地 100/5=20,而是先把 Task 放入 TaskGroup 中,而后把并发分给 TaskGroup。
比方每个 TaskGroup 的并发数量是 5,通过 core.container.taskGroup.channel 进行设置,那 TaskGroup 的数量就是总的 Channel 数 /TaskGroup 须要的数量,所有 TaskGroup 的数量就是 20/5=4。
因为有 100 个 Task,一共 4 个 TaskGroup,所有每个 TaskGroup 就有 100/4=25 个 Task。
工作启动
首先是启动线程池,这个线程池具备一个固定的数量,即 taskGroup 的数量。每个线程都启动一个 TaskGroupContainer,每个 TaskGroupContainer 都蕴含 JobId、taskGroupId、channelClazz 以及 configuration 等信息。
TaskGroupContainer 启动的时候,就会把 25 个工作,寄存在 taskQueue 汇合中。另外会创立一个大小为 5 的 runTasks 汇合中,寄存着 Task 执行器 TaskExecutor。
Task 执行器的创立,依赖着 taskQueue 汇合,每次从 taskQueue 汇合中拿出一个 task 创立 Task 执行器并启动,就须要从 taskQueue 汇合移除这个 task。
因为每个 TaskGroup 的并发数量是 5,所以 Task 执行器最多同时存在 5 个,也就是说,25 个 task,先执行 5 个,等某个或者多个执行完后,再执行剩下的 task。直至 taskQueue 汇合为空以及每个 Task 执行器都执行完,那这个 25 个 task 也就执行完了。
Task 执行器
每个 Task 都对应一个 Task 执行器,Task 执行器包含工作的运行配置、taskId、channel、用来读写的线程等。
读线程启动的是 ReaderRunner,外面有插件(比方 StreamReader$Task)、RecordSender 等属性。
写线程启动的是 WriterRunner,外面有插件(比方 StreamWriter$Task)、RecordReceiver 等属性。
Task 执行器启动的时候,首先是启动 WriterRunner 的线程,而后是 ReaderRunner 的线程。
WriterRunner 和 ReaderRunner 的执行程序如下,右边是 Reader 左边是 Writer。
次要的局部是 Reader 的 startRead 和 Writer 的 startWriter。
首先是 Reader 把数据读取进去,而后封装在 Record 中,这个 Record 蕴含了数据汇合 Column、byteSize、以及须要的内存 memorySize。Column 就是咱们读取的每条数据,包含数据的类型,数据的内容等。
读取进去的数据,发送给 Channel,目前默认的是基于内存的 MemoryChannel,也能够本人扩大。MemoryChannel 保护着 Record 的阻塞队列 queue。当有数据存进的 queue 时候,就会唤醒 Writer 的读取操作。
因为 Channel 是共用的,所以 Writer 就会从 Channel 的 queue 里读取 Reader 存入的数据,进行业务操作。