乐趣区

关于java:通过例子讲解Spring-Batch入门优秀的批处理框架

1 前言

欢送拜访南瓜慢说 www.pkslow.com 获取更多精彩文章!

Spring 相干文章:Springboot-Cloud 相干

Spring Batch是一个轻量级的、欠缺的批处理框架,作为 Spring 体系中的一员,它领有灵便、不便、生产可用的特点。在应答高效解决大量信息、定时解决大量数据等场景非常简便。

联合调度框架能更大地施展 Spring Batch 的作用。

2 Spring Batch 的概念常识

2.1 分层架构

Spring Batch的分层架构图如下:

能够看到它分为三层,别离是:

  • Application应用层:蕴含了所有工作 batch jobs 和开发人员自定义的代码,次要是依据我的项目须要开发的业务流程等。
  • Batch Core核心层:蕴含启动和治理工作的运行环境类,如 JobLauncher 等。
  • Batch Infrastructure根底层:下面两层是建设在根底层之上的,蕴含根底的 读入 reader 写出 writer、重试框架等。

2.2 要害概念

了解下图所波及的概念至关重要,不然很难进行后续开发和问题剖析。

2.2.1 JobRepository

专门负责与数据库打交道,对整个批处理的新增、更新、执行进行记录。所以 Spring Batch 是须要依赖数据库来治理的。

2.2.2 工作启动器 JobLauncher

负责启动工作Job

2.2.3 工作 Job

Job是封装整个批处理过程的单位,跑一个批处理工作,就是跑一个 Job 所定义的内容。

上图介绍了 Job 的一些相干概念:

  • Job:封装解决实体,定义过程逻辑。
  • JobInstanceJob的运行实例,不同的实例,参数不同,所以定义好一个 Job 后能够通过不同参数运行屡次。
  • JobParameters:与 JobInstance 相关联的参数。
  • JobExecution:代表 Job 的一次理论执行,可能胜利、可能失败。

所以,开发人员要做的事件,就是定义Job

2.2.4 步骤 Step

Step是对 Job 某个过程的封装,一个 Job 能够蕴含一个或多个 Step,一步步的Step 按特定逻辑执行,才代表 Job 执行实现。

通过定义 Step 来组装 Job 能够更灵便地实现简单的业务逻辑。

2.2.5 输出——解决——输入

所以,定义一个 Job 要害是定义好一个或多个 Step,而后把它们组装好即可。而定义Step 有多种办法,但有一种罕用的模型就是 输出——解决——输入 ,即Item ReaderItem ProcessorItem Writer。比方通过 Item Reader 从文件输出数据,而后通过 Item Processor 进行业务解决和数据转换,最初通过 Item Writer 写到数据库中去。

Spring Batch为咱们提供了许多开箱即用的 ReaderWriter,十分不便。

3 代码实例

了解了基本概念后,就间接通过代码来感受一下吧。整个我的项目的性能是从多个 csv 文件中读数据,解决后输入到一个 csv 文件。

3.1 根本框架

增加依赖:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

须要增加 Spring Batch 的依赖,同时应用 H2 作为内存数据库比拟不便,理论生产必定是要应用内部的数据库,如OraclePostgreSQL

入口主类:

@SpringBootApplication
@EnableBatchProcessing
public class PkslowBatchJobMain {public static void main(String[] args) {SpringApplication.run(PkslowBatchJobMain.class, args);
    }
}

也很简略,只是在 Springboot 的根底上增加注解@EnableBatchProcessing

畛域实体类Employee

package com.pkslow.batch.entity;
public class Employee {
    String id;
    String firstName;
    String lastName;
}

对应的 csv 文件内容如下:

id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller

3.2 输出——解决——输入

3.2.1 读取 ItemReader

因为有多个输出文件,所以定义如下:

@Value("input/inputData*.csv")
private Resource[] inputResources;

@Bean
public MultiResourceItemReader<Employee> multiResourceItemReader()
{MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>();
  resourceItemReader.setResources(inputResources);
  resourceItemReader.setDelegate(reader());
  return resourceItemReader;
}

@Bean
public FlatFileItemReader<Employee> reader()
{FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();
  // 跳过 csv 文件第一行,为表头
  reader.setLinesToSkip(1);
  reader.setLineMapper(new DefaultLineMapper() {
    {setLineTokenizer(new DelimitedLineTokenizer() {
        {
          // 字段名
          setNames(new String[] {"id", "firstName", "lastName"});
        }
      });
      setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {
        {
          // 转换化后的指标类
          setTargetType(Employee.class);
        }
      });
    }
  });
  return reader;
}

这里应用了FlatFileItemReader,不便咱们从文件读取数据。

3.2.2 解决 ItemProcessor

为了简略演示,解决很简略,就是把最初一列转为大写:

public ItemProcessor<Employee, Employee> itemProcessor() {
  return employee -> {employee.setLastName(employee.getLastName().toUpperCase());
    return employee;
  };
}

3.2.3 输入 ItremWriter

比较简单,代码及正文如下:

private Resource outputResource = new FileSystemResource("output/outputData.csv");

@Bean
public FlatFileItemWriter<Employee> writer()
{FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();
  writer.setResource(outputResource);
  // 是否为追加模式
  writer.setAppendAllowed(true);
  writer.setLineAggregator(new DelimitedLineAggregator<Employee>() {
    {
      // 设置宰割符
      setDelimiter(",");
      setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {
        {
          // 设置字段
          setNames(new String[] {"id", "firstName", "lastName"});
        }
      });
    }
  });
  return writer;
}

3.3 Step

有了 Reader-Processor-Writer 后,就能够定义 Step 了:

@Bean
public Step csvStep() {return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
    .reader(multiResourceItemReader())
    .processor(itemProcessor())
    .writer(writer())
    .build();}

这里有一个 chunk 的设置,值为5,意思是 5 条记录后再提交输入,能够依据本人需要定义。

3.4 Job

实现了 Step 的编码,定义 Job 就容易了:

@Bean
public Job pkslowCsvJob() {
  return jobBuilderFactory
    .get("pkslowCsvJob")
    .incrementer(new RunIdIncrementer())
    .start(csvStep())
    .build();}

3.5 运行

实现以上编码后,执行程序,后果如下:

胜利读取数据,并将最初字段转为大写,并输入到 outputData.csv 文件。

4 监听 Listener

能够通过 Listener 接口对特定事件进行监听,以实现更多业务性能。比方如果解决失败,就记录一条失败日志;解决实现,就告诉上游拿数据等。

咱们别离对 ReadProcessWrite事件进行监听,对应别离要实现 ItemReadListener 接口、ItemProcessListener接口和 ItemWriteListener 接口。因为代码比较简单,就是打印一下日志,这里只贴出 ItemWriteListener 的实现代码:

public class PkslowWriteListener implements ItemWriteListener<Employee> {private static final Log logger = LogFactory.getLog(PkslowWriteListener.class);
    @Override
    public void beforeWrite(List<? extends Employee> list) {logger.info("beforeWrite:" + list);
    }

    @Override
    public void afterWrite(List<? extends Employee> list) {logger.info("afterWrite:" + list);
    }

    @Override
    public void onWriteError(Exception e, List<? extends Employee> list) {logger.info("onWriteError:" + list);
    }
}

把实现的监听器 listener 整合到 Step 中去:

@Bean
public Step csvStep() {return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)
    .reader(multiResourceItemReader())
    .listener(new PkslowReadListener())
    .processor(itemProcessor())
    .listener(new PkslowProcessListener())
    .writer(writer())
    .listener(new PkslowWriteListener())
    .build();}

执行后看一下日志:

这里就能显著看到之前设置的 chunk 的作用了。Writer每次是解决 5 条记录,如果一条输入一次,会对 IO 造成压力。

5 总结

Spring Batch还有许多优良的个性,如面对大量数据时的并行处理。本文次要入门介绍为主,不一一介绍,后续会专门解说。


欢送关注微信公众号 <南瓜慢说>,将继续为你更新 …

多读书,多分享;多写作,多整顿。

退出移动版