关于后端:使用SpringBatch读取csv文件

225次阅读

共计 6212 个字符,预计需要花费 16 分钟才能阅读完成。

1、需要

零碎每日从某个固定的目录中读取 csv 文件,并在管制台上打印。

2、解决方案

要解决上述需要,能够应用的办法有很多,此处抉择应用 Spring Batch 来实现。

3、注意事项

1、文件门路的获取

此处简略解决,读取 JobParameters 中的日期,而后构建一个文件门路,并将文件门路放入到 ExecutionContext中。此处为了简略,文件门路会在程序中写死,然而同时也会将文件门路存入到 ExecutionContext 中,并且在具体的某个 Step 中从 ExecutionContext 中获取门路。

留神:
ExecutionContext中存入的数据尽管在各个 Step 中都能够获取到,然而 不举荐 存入 比拟大 的数据到 ExecutionContext 中,因为这个对象的数据须要存入到数据库中。

2、各个 Step 如果获取到 ExecutionContext 中的值

  1. 类上退出 @StepScope 注解
  2. 通过 @Value("#{jobExecutionContext['importPath']}") 来获取

eg:

@Bean
@StepScope
public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {
    // 读取数据
    return new FlatFileItemReaderBuilder<Person>()
            .name("read-csv-file")
            .resource(new ClassPathResource(importPath))
            .delimited().delimiter(",")
            .names("username", "age", "sex")
            .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
            .build();}

解释:在程序实例化 FlatFileItemReader 的时候,此时是没有 jobExecutionContext 的,那么就会报错,如果加上 @StepScope,此时就没有问题了。@StepScope 示意达到 Step 阶段才实例化这个 Bean

3、FlatFileItemReader 应用留神

当咱们应用 FlatFileItemReader 来读取咱们的 csv 文件时,此处须要返回 FlatFileItemReader类型,而不能间接返回ItemReader,否则可能呈现如下谬误 Reader must be open before it can be read

4、实现步骤

1、导入依赖,配置

1、导入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
</dependencies>

2、初始化 SpringBatch 数据库

spring.datasource.username=root
spring.datasource.password=root@1993
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/spring-batch?useUnicode=true&characterEncoding=utf8&autoReconnectForPools=true&useSSL=false
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# 程序启动时,默认不执行 job
spring.batch.job.enabled=false
spring.batch.jdbc.initialize-schema=always
# 初始化 spring-batch 数据库脚本
spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-mysql.sql

2、构建文件读取门路

此处我的想法是,在 JobExecutionListener 中实现文件门路的获取,并将之放入到 ExecutionContext,而后在各个Step 中就能够获取到文件门路的值了。

/**
 * 在此监听器中,获取到具体的须要读取的文件门路,并保留到 ExecutionContext
 *
 * @author huan.fu
 * @date 2022/8/30 - 22:22
 */
@Slf4j
public class AssemblyReadCsvPathListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {ExecutionContext executionContext = jobExecution.getExecutionContext();
        JobParameters jobParameters = jobExecution.getJobParameters();
        String importDate = jobParameters.getString("importDate");
        log.info("从 job parameter 中获取的 importDate 参数的值为:[{}]", importDate);
        String readCsvPath = "data/person.csv";
        log.info("依据日期组装须要读取的 csv 门路为:[{}], 此处排除日期,间接写一个死的门路", readCsvPath);
        executionContext.putString("importPath", readCsvPath);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {}}

3、构建 Tasklet,输入文件门路

@Slf4j
@Component
@StepScope
public class PrintImportFilePathTaskLet implements Tasklet {@Value("#{jobExecutionContext['importPath']}")
    private String importFilePath;

    @Value("#{jobParameters['importDate']}")
    private String importDate;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {log.info("从 job parameter 中获取到的 importDate:[{}], 从 jobExecutionContext 中获取的 importPath:[{}]",
                importDate, importFilePath);

        return RepeatStatus.FINISHED;
    }
}

须要留神的是,此类上退出了 @StepScope注解

4、编写实体类

@AllArgsConstructor
@Getter
@ToString
public class Person {
    /**
     * 用户名
     */
    private String username;
    /**
     * 年龄
     */
    private Integer age;
    /**
     * 性别
     */
    private String sex;
}

5、编写 Job 配置

@Configuration
@AllArgsConstructor
@Slf4j
public class ImportPersonJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private final PrintImportFilePathTaskLet printImportFilePathTaskLet;
    private final ItemReader<Person> readCsvItemReader;

    @Bean
    public Job importPersonJob() {
        // 获取一个 job builder, jobName 能够是不存在的
        return jobBuilderFactory.get("import-person-job")
                // 增加 job execution 监听器
                .listener(new AssemblyReadCsvPathListener())
                // 打印 job parameters 和 ExecutionContext 中的值
                .start(printParametersAndContextVariables())
                // 读取 csv 的数据并解决
                .next(handleCsvFileStep())
                .build();}

    /**
     * 读取数据
     * 留神:此处须要返回 FlatFileItemReader 类型,而不要返回 ItemReader
     * 否则可能报如下异样 Reader must be open before it can be read
     *
     * @param importPath 文件门路
     * @return reader
     */
    @Bean
    @StepScope
    public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {
        // 读取数据
        return new FlatFileItemReaderBuilder<Person>()
                .name("read-csv-file")
                .resource(new ClassPathResource(importPath))
                .delimited().delimiter(",")
                .names("username", "age", "sex")
                .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
                .build();}

    @Bean
    public Step handleCsvFileStep() {

        // 每读取一条数据,交给这个解决
        ItemProcessor<Person, Person> processor = item -> {if (item.getAge() > 25) {log.info("用户 [{}] 的年龄:[{}>25]不解决", item.getUsername(), item.getAge());
                return null;
            }
            return item;
        };

        // 读取到了 chunk 大小的数据后,开始执行写入
        ItemWriter<Person> itemWriter = items -> {log.info("开始写入数据");
            for (Person item : items) {log.info("{}", item);
            }
        };

        return stepBuilderFactory.get("handle-csv-file")
                // 每读取 2 条数据,执行一次 write,当每 read 一条数据后,都会执行 process
                .<Person, Person>chunk(2)
                // 读取数据
                .reader(readCsvItemReader)
                // 读取一条数据就开始解决
                .processor(processor)
                // 当读取的数据的数量达到 chunk 时,调用该办法进行解决
                .writer(itemWriter)
                .build();}

    /**
     * 打印 job parameters 和 ExecutionContext 中的值
     * <p>
     * TaskletStep 是一个非常简单的接口,仅有一个办法——execute。* TaskletStep 会重复的调用这个办法直到获取一个 RepeatStatus.FINISHED 返回或者抛出一个异样。* 所有的 Tasklet 调用都会包装在一个事物中。*
     * @return Step
     */
    private Step printParametersAndContextVariables() {return stepBuilderFactory.get("print-context-params")
                .tasklet(printImportFilePathTaskLet)
                // 当 job 重启时,如果达到了 3 此,则该 step 不在执行
                .startLimit(3)
                // 当 job 重启时,如果该 step 的是曾经解决实现即 COMPLETED 状态时,下方给 false 示意该 step 不在重启,即不在执行
                .allowStartIfComplete(false)
                // 增加 step 监听
                .listener(new CustomStepExecutionListener())
                .build();}
}

6、编写 Job 启动类

@Component
@Slf4j
public class StartImportPersonJob {

    @Autowired
    private Job importPersonJob;
    @Autowired
    private JobLauncher jobLauncher;

    @PostConstruct
    public void startJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {JobParameters jobParameters = new JobParametersBuilder()
                .addString("importDate", LocalDate.of(2022, 08, 31).format(DateTimeFormatter.ofPattern("yyyyMMdd")))
                .toJobParameters();
        JobExecution execution = jobLauncher.run(importPersonJob, jobParameters);
        log.info("job invoked");
    }
}

7、主动配置 SpringBatch

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchReadCsvApplication {public static void main(String[] args) {SpringApplication.run(SpringBatchReadCsvApplication.class, args);
    }
}

次要是 @EnableBatchProcessing 注解

5、执行后果

6、残缺代码

https://gitee.com/huan1993/spring-cloud-parent/tree/master/spring-batch/spring-batch-read-csv

正文完
 0