共计 10379 个字符,预计需要花费 26 分钟才能阅读完成。
1. 背景
许多批处理问题都能够通过单线程、单过程作业来解决,因而在思考更简单的实现之前,最好先查看这些作业是否满足您的须要。掂量一份事实工作的体现,首先看看最简略的实现是否满足您的需要。即便应用规范硬件,您也能够在一分钟内读写数百 MB 的文件。
2. 并行处理
Spring Batch 提供了一系列选项,本章对此进行了形容,只管其余中央介绍了一些性能。在较高级别上,有两种并行处理模式:
- 单过程、多线程
- 多过程
这些也可分为以下几类:
- 多线程步骤(单过程)
- 并行步骤(单过程)
- 基于 step 的近程分块(多过程)
- 基于 step 进行分区(单个或多个过程)
2.1 多线程 step
启动并行处理的最简略办法是将 TaskExecutor 增加到步骤配置中。
应用 java 配置时,能够将 TaskExecutor 增加到步骤中,如下例所示:
@Bean
public TaskExecutor taskExecutor(){return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleMutliStep(TaskExecutor taskExecutor) {return this.stepBuilderFactory.get("sampleMutliStep")
.<DemoUser, DemoUser>chunk(2)
.reader(new DemoReader(10))
.writer(new DemoWriter<>())
.taskExecutor(taskExecutor)
.throttleLimit(5)
.build();}
2.2 并行 step
只有须要并行化的利用程序逻辑能够划分为不同的职责并调配给各个步骤,那么它就能够在单个过程中并行化。并行步骤执行易于配置和应用。
应用 java 配置时,与 step3 并行执行步骤(step1、step2)非常简单,如下例所示:
@Bean
public Job job() {return jobBuilderFactory.get("job")
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance}
@Bean
public Flow splitFlow() {return new FlowBuilder<SimpleFlow>("splitFlow")
.split(paralTaskExecutor())
.add(flow1(), flow2())
.build();}
@Bean
public Flow flow1() {return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();}
@Bean
public Flow flow2() {return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();}
2.3 近程分块
在近程分块中,步骤解决被宰割到多个过程中,通过一些中间件互相通信。下图显示了该模式:
manager 组件是单个过程,Worker 是多个近程过程。如果管理器不是瓶颈,则此模式成果最好,因而解决的老本必须高于读取我的项目的老本(在实践中常常如此)。
manager 是 Spring 批处理步骤的一个实现,ItemWriter 被一个通用版本所取代,该版本晓得如何将我的项目块作为音讯发送到中间件。工作人员是应用任何中间件的规范侦听器(例如,对于 JMS,他们将是 MessageListener 实现),他们的角色是通过 ChunkProcessor 接口应用规范 ItemWriter 或 ItemProcessor plus ItemWriter 解决我的项目块。应用此模式的长处之一是读卡器、处理器和写入器组件是现成的(与本地执行步骤时应用的组件雷同)。我的项目被动静划分,工作通过中间件共享,因而,如果监听器都是热心的消费者,那么负载平衡是主动的。中间件必须是长久的,有保障的交付,每个音讯都有一个消费者。JMS 是不言而喻的候选者,但网格计算和共享内存产品空间中存在其余选项(如 JavaSpace)。
更进一步能够通过应用 ChunkMessageChannelItemWriter(由 Spring Batch Integration 提供)将区块解决内部化,它将我的项目发送进来并收集后果。发送后,Spring Batch 将持续读取和分组我的项目的过程,而无需期待后果。相同,ChunkMessageChannelItemWriter 负责收集后果并将其集成回 Spring 批处理过程。
通过 Spring 集成,您能够齐全管制过程的并发性(例如,通过应用 QueueChannel 而不是 DirectChannel)。此外,通过依赖 Spring Integration 丰盛的通道适配器汇合(如 JMS 和 AMQP),您能够将批处理作业的块分发给内部零碎进行解决。
具备要近程分块的步骤的简略作业可能具备相似以下配置:
从版本 4.1 开始,Spring Batch Integration 引入了 @EnableBatchIntegration 正文,可用于简化近程分块设置。此正文提供了两个能够在应用程序上下文中主动连贯的 bean:
RemoteChunkingManagerStepBuilderFactory:用于配置管理器步骤
RemoteChunkingWorkerBuilder:用于配置近程工作者集成流
这些 API 负责配置许多组件,如下图所述:
- 代码如下:看下生产者 remoteChunkManager
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public Job remoteChunkingJob() {return this.jobBuilderFactory.get("remoteChunkingJob")
.start(remoteChunkManagerStep()).build();}
@Bean
public DirectChannel mangerRequests() {return new DirectChannel();
}
@Bean
public IntegrationFlow managerOutboundFlow() {return IntegrationFlows.from(mangerRequests())
.handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("requests"))
.get();}
@Bean
public QueueChannel managerReplies() {return new QueueChannel();
}
@Bean
public IntegrationFlow managerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) {
return IntegrationFlows
.from(Amqp.inboundAdapter(rabbitmqConnectionFactory,"replies")
)
.channel(managerReplies()).get();}
@Bean
public TaskletStep remoteChunkManagerStep() {return this.managerStepBuilderFactory.get("remoteChunkManagerStep")
.<Integer, Integer>chunk(3).reader(itemReader())
.outputChannel(mangerRequests())
.inputChannel(managerReplies()).build();}
@Bean
public ListItemReader<Integer> itemReader() {return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6));
}
能够看到。Manager 须要定义 Reader。定义发送的音讯通道,定义接管的音讯通道。
这外面只定义了 reader。好奇收回去的是什么?接管到的又是什么?
在 RemoteChunkingManagerStepBuilder#build()中又增加了 ChunkMessageChannelItemWriter 相当于外部拼接了个实现的 step。
那收回去的是啥?
收回去的 ChunkRequest
-
看下消费者 remoteChunkWorker
@Autowired private RemoteChunkingWorkerBuilder<Integer, Integer> remoteChunkingWorkerBuilder; @Autowired private RabbitTemplate rabbitTemplate; @Bean public DirectChannel workerRequests() {return new DirectChannel(); } @Bean public IntegrationFlow workerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) { return IntegrationFlows .from(Amqp.inboundAdapter(rabbitmqConnectionFactory,"requests")) .channel(workerRequests()).get();} @Bean public DirectChannel workerReplies() {return new DirectChannel(); } @Bean public IntegrationFlow workerOutboundFlow() {return IntegrationFlows.from(workerReplies()) .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("replies")) .get();} @Bean public ItemProcessor<Integer, Integer> itemProcessor() { return item -> {System.out.println("processing item" + item); return item; }; } @Bean public ItemWriter<Integer> itemWriter() { return items -> {for (Integer item : items) {System.out.println("writing item" + item); } }; } @Bean public IntegrationFlow workerIntegrationFlow() { return this.remoteChunkingWorkerBuilder .itemProcessor(itemProcessor()) .itemWriter(itemWriter()) .inputChannel(workerRequests()) .outputChannel(workerReplies()).build();}
接管到的音讯是如何解决的?
执行 process 后续过程。执行实现返回 ChunResponse
并将 response 发送到队列。发回到生产者。生产者是如何解决的呢
生产者从 replychannel 接管音讯。而后将音讯更新到内存。进行运算。内存运算实体为 LocalState
-
解读一下哈:
-
在 Manager 方面,RemoteChunkingManagerStepBuilderFactory 容许您通过申明以下内容来配置管理器步骤:
reader 读取我的项目并将其发送给 Worker
向 Worker 发送申请的输入通道(“requests”)
接管 Worker 回复的输出通道(“replies”)
无需显式配置 ChunkMessageChannelItemWriter 和 MessagingTemplate(如果须要,依然能够显式配置)
-
在 Worker 方面,RemoteChunkingWorkerBuilder 容许您将工作者配置为:
侦听管理器在输出通道上发送的申请(“requests”)
应用配置的 ItemProcessor 和 ItemWriter 为每个申请调用 ChunkProcessorChunkHandler 的 handleChunk 办法
将输入通道上的回复(“replies”)发送给 Manager
无需显式配置 SimpleChunkProcessor 和 ChunkProcessorChunkHandler(如果须要,能够显式配置)。
-
整体流程能够概括如下:
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public TaskletStep managerStep() {return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the manager
.outputChannel(replies()) // replies sent to the manager
.build();}
// Middleware beans setup omitted
}
}
2.4 近程分区
近程分区是通过分区器 partitioner 来管制整体流程。真正的执行 (蕴含 reader 这里是一个残缺的 step 性能) 不同于近程分块模块。
另一方面,当导致瓶颈的不是我的项目的解决而是相干的 I / O 时,近程分区很有用。应用近程分区,能够将工作调配给执行残缺的 Spring 批处理步骤的工作人员。因而,每个 worker 都有本人的 ItemReader、ItemProcessor 和 ItemWriter。为此,Spring Batch Integration 提供了 MessageChannelPartitionHandler。
PartitionHandler 接口的这个实现应用 MessageChannel 实例向 remote Worker 发送指令并接管他们的响应。这为用于与 remote Worker 通信的传输(如 JMS 和 AMQP 例子用的是 rabbitmq)提供了一个很好的形象。
“可伸缩性”一章中波及近程分区的局部概述了配置近程分区所需的概念和组件,并显示了应用默认 TaskExecutionPartitionHandler 在独自的本地执行线程中进行分区的示例。对于到多个 JVM 的近程分区,须要另外两个组件:
- 远程结构或网格环境
- 反对所需近程解决构造或网格环境的 PartitionHandler 实现
与近程分块相似,JMS/AMQP 能够用作“近程解决构造”。在这种状况下,应用 MessageChannelPartitionHandler 实例作为 PartitionHandler 实现,如上所述。以下示例假设存在一个分区作业,并重点介绍 MessageChannelPartitionHandler 和 JMS/AMQP 配置:
-
代码如下:
- 看下 manager 代码
@Bean public DirectChannel managerRequests() {return new DirectChannel(); } @Bean public IntegrationFlow managerOutboundFlow() {return IntegrationFlows.from(managerRequests()) .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("requests")) .get();} @Bean public DirectChannel managerReplies() {return new DirectChannel(); } @Bean public IntegrationFlow managerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) { return IntegrationFlows .from(Amqp.inboundAdapter(rabbitmqConnectionFactory,"replies")) .channel(managerReplies()).get();} @Bean public Step managerStep() {return this.managerStepBuilderFactory.get("managerStep") .partitioner("workerStep", new SimplePartitioner()) .gridSize(GRID_SIZE) .outputChannel(managerRequests()) .inputChannel(managerReplies()).build();} @Bean public Job remotePartitioningJob() { return this.jobBuilderFactory .get("remotePartitioningJob") .start(managerStep()) .build();}
- 看 worker 代码
@Bean public DirectChannel workerRequests() {return new DirectChannel(); } @Bean public IntegrationFlow workerInboundFlow(ConnectionFactory rabbitmqConnectionFactory) { return IntegrationFlows .from(Amqp.inboundAdapter(rabbitmqConnectionFactory,"requests")) .channel(workerRequests()).get();} @Bean public DirectChannel workerReplies() {return new DirectChannel(); } @Bean public IntegrationFlow workerOutboundFlow() {return IntegrationFlows.from(workerReplies()) .handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("replies")) .get();} @Bean public Step workerStep() { return this.workerStepBuilderFactory .get("workerStep") .inputChannel(workerRequests()) .outputChannel(workerReplies()) .tasklet(tasklet(null)).build();} @Bean @StepScope public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) {return (contribution, chunkContext) -> {System.out.println("processing" + partition); return RepeatStatus.FINISHED; }; }
可用于简化近程分区设置的 @EnableBatchIntegration 正文。此正文提供了两个对近程分区有用的 bean:
RemotePartitioningManagerStepBuilderFactory:用于配置 Manager 步骤
RemotePartitioningWorkerStepBuilderFactory:用于配置 Worker 步骤
这些 API 负责配置许多组件,如下图所述:(应用 db 轮询模式)
Remote Partitioning Configuration (with job repository polling)
Remote Partitioning Configuration (with replies aggregation)
在 Manager 方面,RemotePartitioningManagerStepBuilderFactory 容许您通过申明以下内容来配 Manager 步骤:
- 用于对数据进行分区的 Partitioner
- 向 Worker 发送申请的输入通道(“传出申请”)
- 接管 Worker 回复的输出通道(“传入回复”)(配置回复聚合时)
- 轮询距离和超时参数(配置作业存储库轮询时)
无需显式配置 MessageChannelPartitionHandler 和 MessagingTemplate(如果须要,依然能够显式配置)。
在工作者方面,RemotePartitioningWorkersStepBuilderFactory 容许您将工作者配置为:
- 侦听管理器在输出通道上发送的申请(“传入申请”)
- 为每个申请调用 StepExecutionRequestHandler 的句柄办法
- = 将输入通道上的回复(“传出回复”)发送给经理
无需显式配置 StepExecutionRequestHandler(如果须要,能够显式配置)。
以下示例显示了如何应用这些 API:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();}
// Middleware beans setup omitted
}
}