1 前言
欢送拜访南瓜慢说 www.pkslow.com获取更多精彩文章!
Spring相干文章:
Springboot-Cloud相干
Spring Batch
是一个轻量级的、欠缺的批处理框架,作为Spring
体系中的一员,它领有灵便、不便、生产可用的特点。在应答高效解决大量信息、定时解决大量数据等场景非常简便。
联合调度框架能更大地施展Spring Batch
的作用。
2 Spring Batch的概念常识
2.1 分层架构
Spring Batch
的分层架构图如下:
能够看到它分为三层,别离是:
Application
应用层:蕴含了所有工作batch jobs
和开发人员自定义的代码,次要是依据我的项目须要开发的业务流程等。Batch Core
核心层:蕴含启动和治理工作的运行环境类,如JobLauncher
等。Batch Infrastructure
根底层:下面两层是建设在根底层之上的,蕴含根底的读入reader
和写出writer
、重试框架等。
2.2 要害概念
了解下图所波及的概念至关重要,不然很难进行后续开发和问题剖析。
2.2.1 JobRepository
专门负责与数据库打交道,对整个批处理的新增、更新、执行进行记录。所以Spring Batch
是须要依赖数据库来治理的。
2.2.2 工作启动器JobLauncher
负责启动工作Job
。
2.2.3 工作Job
Job
是封装整个批处理过程的单位,跑一个批处理工作,就是跑一个Job
所定义的内容。
上图介绍了Job
的一些相干概念:
Job
:封装解决实体,定义过程逻辑。JobInstance
:Job
的运行实例,不同的实例,参数不同,所以定义好一个Job
后能够通过不同参数运行屡次。JobParameters
:与JobInstance
相关联的参数。JobExecution
:代表Job
的一次理论执行,可能胜利、可能失败。
所以,开发人员要做的事件,就是定义Job
。
2.2.4 步骤Step
Step
是对Job
某个过程的封装,一个Job
能够蕴含一个或多个Step
,一步步的Step
按特定逻辑执行,才代表Job
执行实现。
通过定义Step
来组装Job
能够更灵便地实现简单的业务逻辑。
2.2.5 输出——解决——输入
所以,定义一个Job
要害是定义好一个或多个Step
,而后把它们组装好即可。而定义Step
有多种办法,但有一种罕用的模型就是输出——解决——输入
,即Item Reader
、Item Processor
和Item Writer
。比方通过Item Reader
从文件输出数据,而后通过Item Processor
进行业务解决和数据转换,最初通过Item Writer
写到数据库中去。
Spring Batch
为咱们提供了许多开箱即用的Reader
和Writer
,十分不便。
3 代码实例
了解了基本概念后,就间接通过代码来感受一下吧。整个我的项目的性能是从多个csv
文件中读数据,解决后输入到一个csv
文件。
3.1 根本框架
增加依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope></dependency>
须要增加Spring Batch
的依赖,同时应用H2
作为内存数据库比拟不便,理论生产必定是要应用内部的数据库,如Oracle
、PostgreSQL
。
入口主类:
@SpringBootApplication@EnableBatchProcessingpublic class PkslowBatchJobMain { public static void main(String[] args) { SpringApplication.run(PkslowBatchJobMain.class, args); }}
也很简略,只是在Springboot
的根底上增加注解@EnableBatchProcessing
。
畛域实体类Employee
:
package com.pkslow.batch.entity;public class Employee { String id; String firstName; String lastName;}
对应的csv
文件内容如下:
id,firstName,lastName1,Lokesh,Gupta2,Amit,Mishra3,Pankaj,Kumar4,David,Miller
3.2 输出——解决——输入
3.2.1 读取ItemReader
因为有多个输出文件,所以定义如下:
@Value("input/inputData*.csv")private Resource[] inputResources;@Beanpublic MultiResourceItemReader<Employee> multiResourceItemReader(){ MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>(); resourceItemReader.setResources(inputResources); resourceItemReader.setDelegate(reader()); return resourceItemReader;}@Beanpublic FlatFileItemReader<Employee> reader(){ FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>(); //跳过csv文件第一行,为表头 reader.setLinesToSkip(1); reader.setLineMapper(new DefaultLineMapper() { { setLineTokenizer(new DelimitedLineTokenizer() { { //字段名 setNames(new String[] { "id", "firstName", "lastName" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() { { //转换化后的指标类 setTargetType(Employee.class); } }); } }); return reader;}
这里应用了FlatFileItemReader
,不便咱们从文件读取数据。
3.2.2 解决ItemProcessor
为了简略演示,解决很简略,就是把最初一列转为大写:
public ItemProcessor<Employee, Employee> itemProcessor() { return employee -> { employee.setLastName(employee.getLastName().toUpperCase()); return employee; };}
3.2.3 输入ItremWriter
比较简单,代码及正文如下:
private Resource outputResource = new FileSystemResource("output/outputData.csv");@Beanpublic FlatFileItemWriter<Employee> writer(){ FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>(); writer.setResource(outputResource); //是否为追加模式 writer.setAppendAllowed(true); writer.setLineAggregator(new DelimitedLineAggregator<Employee>() { { //设置宰割符 setDelimiter(","); setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() { { //设置字段 setNames(new String[] { "id", "firstName", "lastName" }); } }); } }); return writer;}
3.3 Step
有了Reader-Processor-Writer
后,就能够定义Step
了:
@Beanpublic Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .processor(itemProcessor()) .writer(writer()) .build();}
这里有一个chunk
的设置,值为5
,意思是5条记录后再提交输入,能够依据本人需要定义。
3.4 Job
实现了Step
的编码,定义Job
就容易了:
@Beanpublic Job pkslowCsvJob() { return jobBuilderFactory .get("pkslowCsvJob") .incrementer(new RunIdIncrementer()) .start(csvStep()) .build();}
3.5 运行
实现以上编码后,执行程序,后果如下:
胜利读取数据,并将最初字段转为大写,并输入到outputData.csv
文件。
4 监听Listener
能够通过Listener
接口对特定事件进行监听,以实现更多业务性能。比方如果解决失败,就记录一条失败日志;解决实现,就告诉上游拿数据等。
咱们别离对Read
、Process
和Write
事件进行监听,对应别离要实现ItemReadListener
接口、ItemProcessListener
接口和ItemWriteListener
接口。因为代码比较简单,就是打印一下日志,这里只贴出ItemWriteListener
的实现代码:
public class PkslowWriteListener implements ItemWriteListener<Employee> { private static final Log logger = LogFactory.getLog(PkslowWriteListener.class); @Override public void beforeWrite(List<? extends Employee> list) { logger.info("beforeWrite: " + list); } @Override public void afterWrite(List<? extends Employee> list) { logger.info("afterWrite: " + list); } @Override public void onWriteError(Exception e, List<? extends Employee> list) { logger.info("onWriteError: " + list); }}
把实现的监听器listener
整合到Step
中去:
@Beanpublic Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .listener(new PkslowReadListener()) .processor(itemProcessor()) .listener(new PkslowProcessListener()) .writer(writer()) .listener(new PkslowWriteListener()) .build();}
执行后看一下日志:
这里就能显著看到之前设置的chunk
的作用了。Writer
每次是解决5条记录,如果一条输入一次,会对IO
造成压力。
5 总结
Spring Batch
还有许多优良的个性,如面对大量数据时的并行处理。本文次要入门介绍为主,不一一介绍,后续会专门解说。
欢送关注微信公众号<南瓜慢说>,将继续为你更新...
多读书,多分享;多写作,多整顿。