1、需要

零碎每日从某个固定的目录中读取csv文件,并在管制台上打印。

2、解决方案

要解决上述需要,能够应用的办法有很多,此处抉择应用Spring Batch来实现。

3、注意事项

1、文件门路的获取

此处简略解决,读取 JobParameters 中的日期,而后构建一个文件门路,并将文件门路放入到 ExecutionContext中。此处为了简略,文件门路会在程序中写死,然而同时也会将文件门路存入到 ExecutionContext 中,并且在具体的某个Step中从ExecutionContext中获取门路。

留神:
ExecutionContext中存入的数据尽管在各个Step中都能够获取到,然而不举荐存入比拟大的数据到ExecutionContext中,因为这个对象的数据须要存入到数据库中。

2、各个Step如果获取到ExecutionContext中的值

  1. 类上退出 @StepScope 注解
  2. 通过 @Value("#{jobExecutionContext['importPath']}") 来获取

eg:

@Bean@StepScopepublic FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {    // 读取数据    return new FlatFileItemReaderBuilder<Person>()            .name("read-csv-file")            .resource(new ClassPathResource(importPath))            .delimited().delimiter(",")            .names("username", "age", "sex")            .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))            .build();}

解释:在程序实例化FlatFileItemReader的时候,此时是没有jobExecutionContext的,那么就会报错,如果加上@StepScope,此时就没有问题了。@StepScope示意达到Step阶段才实例化这个Bean

3、FlatFileItemReader应用留神

当咱们应用FlatFileItemReader来读取咱们的csv文件时,此处须要返回 FlatFileItemReader类型,而不能间接返回ItemReader,否则可能呈现如下谬误 Reader must be open before it can be read

4、实现步骤

1、导入依赖,配置

1、导入依赖

<dependencies>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-batch</artifactId>    </dependency></dependencies>

2、初始化SpringBatch数据库

spring.datasource.username=rootspring.datasource.password=root@1993spring.datasource.url=jdbc:mysql://127.0.0.1:3306/spring-batch?useUnicode=true&characterEncoding=utf8&autoReconnectForPools=true&useSSL=falsespring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver# 程序启动时,默认不执行jobspring.batch.job.enabled=falsespring.batch.jdbc.initialize-schema=always# 初始化spring-batch数据库脚本spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-mysql.sql

2、构建文件读取门路

此处我的想法是,在JobExecutionListener中实现文件门路的获取,并将之放入到ExecutionContext,而后在各个Step中就能够获取到文件门路的值了。

/** * 在此监听器中,获取到具体的须要读取的文件门路,并保留到 ExecutionContext * * @author huan.fu * @date 2022/8/30 - 22:22 */@Slf4jpublic class AssemblyReadCsvPathListener implements JobExecutionListener {    @Override    public void beforeJob(JobExecution jobExecution) {        ExecutionContext executionContext = jobExecution.getExecutionContext();        JobParameters jobParameters = jobExecution.getJobParameters();        String importDate = jobParameters.getString("importDate");        log.info("从 job parameter 中获取的 importDate 参数的值为:[{}]", importDate);        String readCsvPath = "data/person.csv";        log.info("依据日期组装须要读取的csv门路为:[{}],此处排除日期,间接写一个死的门路", readCsvPath);        executionContext.putString("importPath", readCsvPath);    }    @Override    public void afterJob(JobExecution jobExecution) {    }}

3、构建Tasklet,输入文件门路

@Slf4j@Component@StepScopepublic class PrintImportFilePathTaskLet implements Tasklet {    @Value("#{jobExecutionContext['importPath']}")    private String importFilePath;    @Value("#{jobParameters['importDate']}")    private String importDate;    @Override    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {        log.info("从job parameter 中获取到的 importDate:[{}],从 jobExecutionContext 中获取的 importPath:[{}]",                importDate, importFilePath);        return RepeatStatus.FINISHED;    }}

须要留神的是,此类上退出了 @StepScope注解

4、编写实体类

@AllArgsConstructor@Getter@ToStringpublic class Person {    /**     * 用户名     */    private String username;    /**     * 年龄     */    private Integer age;    /**     * 性别     */    private String sex;}

5、编写Job配置

@Configuration@AllArgsConstructor@Slf4jpublic class ImportPersonJobConfig {    private final JobBuilderFactory jobBuilderFactory;    private final StepBuilderFactory stepBuilderFactory;    private final PrintImportFilePathTaskLet printImportFilePathTaskLet;    private final ItemReader<Person> readCsvItemReader;    @Bean    public Job importPersonJob() {        // 获取一个job builder, jobName能够是不存在的        return jobBuilderFactory.get("import-person-job")                // 增加job execution 监听器                .listener(new AssemblyReadCsvPathListener())                // 打印 job parameters 和 ExecutionContext 中的值                .start(printParametersAndContextVariables())                // 读取csv的数据并解决                .next(handleCsvFileStep())                .build();    }    /**     * 读取数据     * 留神:此处须要返回 FlatFileItemReader类型,而不要返回ItemReader     * 否则可能报如下异样 Reader must be open before it can be read     *     * @param importPath 文件门路     * @return reader     */    @Bean    @StepScope    public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {        // 读取数据        return new FlatFileItemReaderBuilder<Person>()                .name("read-csv-file")                .resource(new ClassPathResource(importPath))                .delimited().delimiter(",")                .names("username", "age", "sex")                .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))                .build();    }    @Bean    public Step handleCsvFileStep() {        // 每读取一条数据,交给这个解决        ItemProcessor<Person, Person> processor = item -> {            if (item.getAge() > 25) {                log.info("用户[{}]的年龄:[{}>25]不解决", item.getUsername(), item.getAge());                return null;            }            return item;        };        // 读取到了 chunk 大小的数据后,开始执行写入        ItemWriter<Person> itemWriter = items -> {            log.info("开始写入数据");            for (Person item : items) {                log.info("{}", item);            }        };        return stepBuilderFactory.get("handle-csv-file")                // 每读取2条数据,执行一次write,当每read一条数据后,都会执行process                .<Person, Person>chunk(2)                // 读取数据                .reader(readCsvItemReader)                // 读取一条数据就开始解决                .processor(processor)                // 当读取的数据的数量达到 chunk 时,调用该办法进行解决                .writer(itemWriter)                .build();    }    /**     * 打印 job parameters 和 ExecutionContext 中的值     * <p>     * TaskletStep是一个非常简单的接口,仅有一个办法——execute。     * TaskletStep会重复的调用这个办法直到获取一个RepeatStatus.FINISHED返回或者抛出一个异样。     * 所有的Tasklet调用都会包装在一个事物中。     *     * @return Step     */    private Step printParametersAndContextVariables() {        return stepBuilderFactory.get("print-context-params")                .tasklet(printImportFilePathTaskLet)                // 当job重启时,如果达到了3此,则该step不在执行                .startLimit(3)                // 当job重启时,如果该step的是曾经解决实现即COMPLETED状态时,下方给false示意该step不在重启,即不在执行                .allowStartIfComplete(false)                // 增加 step 监听                .listener(new CustomStepExecutionListener())                .build();    }}

6、编写Job启动类

@Component@Slf4jpublic class StartImportPersonJob {    @Autowired    private Job importPersonJob;    @Autowired    private JobLauncher jobLauncher;    @PostConstruct    public void startJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {        JobParameters jobParameters = new JobParametersBuilder()                .addString("importDate", LocalDate.of(2022, 08, 31).format(DateTimeFormatter.ofPattern("yyyyMMdd")))                .toJobParameters();        JobExecution execution = jobLauncher.run(importPersonJob, jobParameters);        log.info("job invoked");    }}

7、主动配置SpringBatch

@SpringBootApplication@EnableBatchProcessingpublic class SpringBatchReadCsvApplication {    public static void main(String[] args) {        SpringApplication.run(SpringBatchReadCsvApplication.class, args);    }}

次要是 @EnableBatchProcessing 注解

5、执行后果

6、残缺代码

https://gitee.com/huan1993/spring-cloud-parent/tree/master/spring-batch/spring-batch-read-csv