共计 5110 个字符,预计需要花费 13 分钟才能阅读完成。
1 前言
欢送拜访南瓜慢说 www.pkslow.com 获取更多精彩文章!
Spring 相干文章:
Springboot-Cloud 相干
Spring Batch
是一个轻量级的、欠缺的批处理框架,作为 Spring
体系中的一员,它领有灵便、不便、生产可用的特点。在应答高效解决大量信息、定时解决大量数据等场景非常简便。
联合调度框架能更大地施展 Spring Batch
的作用。
2 Spring Batch 的概念常识
2.1 分层架构
Spring Batch
的分层架构图如下:
能够看到它分为三层,别离是:
Application
应用层:蕴含了所有工作batch jobs
和开发人员自定义的代码,次要是依据我的项目须要开发的业务流程等。Batch Core
核心层:蕴含启动和治理工作的运行环境类,如JobLauncher
等。Batch Infrastructure
根底层:下面两层是建设在根底层之上的,蕴含根底的读入 reader
和写出 writer
、重试框架等。
2.2 要害概念
了解下图所波及的概念至关重要,不然很难进行后续开发和问题剖析。
2.2.1 JobRepository
专门负责与数据库打交道,对整个批处理的新增、更新、执行进行记录。所以 Spring Batch
是须要依赖数据库来治理的。
2.2.2 工作启动器 JobLauncher
负责启动工作Job
。
2.2.3 工作 Job
Job
是封装整个批处理过程的单位,跑一个批处理工作,就是跑一个 Job
所定义的内容。
上图介绍了 Job
的一些相干概念:
Job
:封装解决实体,定义过程逻辑。JobInstance
:Job
的运行实例,不同的实例,参数不同,所以定义好一个Job
后能够通过不同参数运行屡次。JobParameters
:与JobInstance
相关联的参数。JobExecution
:代表Job
的一次理论执行,可能胜利、可能失败。
所以,开发人员要做的事件,就是定义Job
。
2.2.4 步骤 Step
Step
是对 Job
某个过程的封装,一个 Job
能够蕴含一个或多个 Step
,一步步的Step
按特定逻辑执行,才代表 Job
执行实现。
通过定义 Step
来组装 Job
能够更灵便地实现简单的业务逻辑。
2.2.5 输出——解决——输入
所以,定义一个 Job
要害是定义好一个或多个 Step
,而后把它们组装好即可。而定义Step
有多种办法,但有一种罕用的模型就是 输出——解决——输入
,即Item Reader
、Item Processor
和Item Writer
。比方通过 Item Reader
从文件输出数据,而后通过 Item Processor
进行业务解决和数据转换,最初通过 Item Writer
写到数据库中去。
Spring Batch
为咱们提供了许多开箱即用的 Reader
和Writer
,十分不便。
3 代码实例
了解了基本概念后,就间接通过代码来感受一下吧。整个我的项目的性能是从多个 csv
文件中读数据,解决后输入到一个 csv
文件。
3.1 根本框架
增加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
须要增加 Spring Batch
的依赖,同时应用 H2
作为内存数据库比拟不便,理论生产必定是要应用内部的数据库,如Oracle
、PostgreSQL
。
入口主类:
@SpringBootApplication
@EnableBatchProcessing
public class PkslowBatchJobMain {public static void main(String[] args) {SpringApplication.run(PkslowBatchJobMain.class, args);
}
}
也很简略,只是在 Springboot
的根底上增加注解@EnableBatchProcessing
。
畛域实体类Employee
:
package com.pkslow.batch.entity;
public class Employee {
String id;
String firstName;
String lastName;
}
对应的 csv
文件内容如下:
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
3.2 输出——解决——输入
3.2.1 读取 ItemReader
因为有多个输出文件,所以定义如下:
@Value("input/inputData*.csv")
private Resource[] inputResources;
@Bean
public MultiResourceItemReader<Employee> multiResourceItemReader()
{MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>();
resourceItemReader.setResources(inputResources);
resourceItemReader.setDelegate(reader());
return resourceItemReader;
}
@Bean
public FlatFileItemReader<Employee> reader()
{FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
// 跳过 csv 文件第一行,为表头
reader.setLinesToSkip(1);
reader.setLineMapper(new DefaultLineMapper() {
{setLineTokenizer(new DelimitedLineTokenizer() {
{
// 字段名
setNames(new String[] {"id", "firstName", "lastName"});
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
{
// 转换化后的指标类
setTargetType(Employee.class);
}
});
}
});
return reader;
}
这里应用了FlatFileItemReader
,不便咱们从文件读取数据。
3.2.2 解决 ItemProcessor
为了简略演示,解决很简略,就是把最初一列转为大写:
public ItemProcessor<Employee, Employee> itemProcessor() {
return employee -> {employee.setLastName(employee.getLastName().toUpperCase());
return employee;
};
}
3.2.3 输入 ItremWriter
比较简单,代码及正文如下:
private Resource outputResource = new FileSystemResource("output/outputData.csv");
@Bean
public FlatFileItemWriter<Employee> writer()
{FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();
writer.setResource(outputResource);
// 是否为追加模式
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<Employee>() {
{
// 设置宰割符
setDelimiter(",");
setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {
{
// 设置字段
setNames(new String[] {"id", "firstName", "lastName"});
}
});
}
});
return writer;
}
3.3 Step
有了 Reader-Processor-Writer
后,就能够定义 Step
了:
@Bean
public Step csvStep() {return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
.reader(multiResourceItemReader())
.processor(itemProcessor())
.writer(writer())
.build();}
这里有一个 chunk
的设置,值为5
,意思是 5 条记录后再提交输入,能够依据本人需要定义。
3.4 Job
实现了 Step
的编码,定义 Job
就容易了:
@Bean
public Job pkslowCsvJob() {
return jobBuilderFactory
.get("pkslowCsvJob")
.incrementer(new RunIdIncrementer())
.start(csvStep())
.build();}
3.5 运行
实现以上编码后,执行程序,后果如下:
胜利读取数据,并将最初字段转为大写,并输入到 outputData.csv
文件。
4 监听 Listener
能够通过 Listener
接口对特定事件进行监听,以实现更多业务性能。比方如果解决失败,就记录一条失败日志;解决实现,就告诉上游拿数据等。
咱们别离对 Read
、Process
和Write
事件进行监听,对应别离要实现 ItemReadListener
接口、ItemProcessListener
接口和 ItemWriteListener
接口。因为代码比较简单,就是打印一下日志,这里只贴出 ItemWriteListener
的实现代码:
public class PkslowWriteListener implements ItemWriteListener<Employee> {private static final Log logger = LogFactory.getLog(PkslowWriteListener.class);
@Override
public void beforeWrite(List<? extends Employee> list) {logger.info("beforeWrite:" + list);
}
@Override
public void afterWrite(List<? extends Employee> list) {logger.info("afterWrite:" + list);
}
@Override
public void onWriteError(Exception e, List<? extends Employee> list) {logger.info("onWriteError:" + list);
}
}
把实现的监听器 listener
整合到 Step
中去:
@Bean
public Step csvStep() {return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
.reader(multiResourceItemReader())
.listener(new PkslowReadListener())
.processor(itemProcessor())
.listener(new PkslowProcessListener())
.writer(writer())
.listener(new PkslowWriteListener())
.build();}
执行后看一下日志:
这里就能显著看到之前设置的 chunk
的作用了。Writer
每次是解决 5 条记录,如果一条输入一次,会对 IO
造成压力。
5 总结
Spring Batch
还有许多优良的个性,如面对大量数据时的并行处理。本文次要入门介绍为主,不一一介绍,后续会专门解说。
欢送关注微信公众号 <南瓜慢说>,将继续为你更新 …
多读书,多分享;多写作,多整顿。