1、需要
零碎每日从某个固定的目录中读取csv
文件,并在管制台上打印。
2、解决方案
要解决上述需要,能够应用的办法有很多,此处抉择应用Spring Batch
来实现。
3、注意事项
1、文件门路的获取
此处简略解决,读取 JobParameters
中的日期,而后构建一个文件门路,并将文件门路放入到 ExecutionContext
中。此处为了简略,文件门路会在程序中写死,然而同时也会将文件门路存入到 ExecutionContext
中,并且在具体的某个Step
中从ExecutionContext
中获取门路。
留神:ExecutionContext
中存入的数据尽管在各个Step
中都能够获取到,然而不举荐
存入比拟大
的数据到ExecutionContext
中,因为这个对象的数据须要存入到数据库中。
2、各个Step如果获取到ExecutionContext中的值
- 类上退出
@StepScope
注解 - 通过
@Value("#{jobExecutionContext['importPath']}")
来获取
eg:
@Bean@StepScopepublic FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) { // 读取数据 return new FlatFileItemReaderBuilder<Person>() .name("read-csv-file") .resource(new ClassPathResource(importPath)) .delimited().delimiter(",") .names("username", "age", "sex") .fieldSetMapper(new RecordFieldSetMapper<>(Person.class)) .build();}
解释:在程序实例化FlatFileItemReader
的时候,此时是没有jobExecutionContext的,那么就会报错,如果加上@StepScope
,此时就没有问题了。@StepScope
示意达到Step阶段才实例化这个Bean
3、FlatFileItemReader应用留神
当咱们应用FlatFileItemReader
来读取咱们的csv文件时,此处须要返回 FlatFileItemReader
类型,而不能间接返回ItemReader
,否则可能呈现如下谬误 Reader must be open before it can be read
4、实现步骤
1、导入依赖,配置
1、导入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency></dependencies>
2、初始化SpringBatch数据库
spring.datasource.username=rootspring.datasource.password=root@1993spring.datasource.url=jdbc:mysql://127.0.0.1:3306/spring-batch?useUnicode=true&characterEncoding=utf8&autoReconnectForPools=true&useSSL=falsespring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver# 程序启动时,默认不执行jobspring.batch.job.enabled=falsespring.batch.jdbc.initialize-schema=always# 初始化spring-batch数据库脚本spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-mysql.sql
2、构建文件读取门路
此处我的想法是,在JobExecutionListener
中实现文件门路的获取,并将之放入到ExecutionContext
,而后在各个Step
中就能够获取到文件门路的值了。
/** * 在此监听器中,获取到具体的须要读取的文件门路,并保留到 ExecutionContext * * @author huan.fu * @date 2022/8/30 - 22:22 */@Slf4jpublic class AssemblyReadCsvPathListener implements JobExecutionListener { @Override public void beforeJob(JobExecution jobExecution) { ExecutionContext executionContext = jobExecution.getExecutionContext(); JobParameters jobParameters = jobExecution.getJobParameters(); String importDate = jobParameters.getString("importDate"); log.info("从 job parameter 中获取的 importDate 参数的值为:[{}]", importDate); String readCsvPath = "data/person.csv"; log.info("依据日期组装须要读取的csv门路为:[{}],此处排除日期,间接写一个死的门路", readCsvPath); executionContext.putString("importPath", readCsvPath); } @Override public void afterJob(JobExecution jobExecution) { }}
3、构建Tasklet,输入文件门路
@Slf4j@Component@StepScopepublic class PrintImportFilePathTaskLet implements Tasklet { @Value("#{jobExecutionContext['importPath']}") private String importFilePath; @Value("#{jobParameters['importDate']}") private String importDate; @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { log.info("从job parameter 中获取到的 importDate:[{}],从 jobExecutionContext 中获取的 importPath:[{}]", importDate, importFilePath); return RepeatStatus.FINISHED; }}
须要留神的是,此类上退出了 @StepScope
注解
4、编写实体类
@AllArgsConstructor@Getter@ToStringpublic class Person { /** * 用户名 */ private String username; /** * 年龄 */ private Integer age; /** * 性别 */ private String sex;}
5、编写Job配置
@Configuration@AllArgsConstructor@Slf4jpublic class ImportPersonJobConfig { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final PrintImportFilePathTaskLet printImportFilePathTaskLet; private final ItemReader<Person> readCsvItemReader; @Bean public Job importPersonJob() { // 获取一个job builder, jobName能够是不存在的 return jobBuilderFactory.get("import-person-job") // 增加job execution 监听器 .listener(new AssemblyReadCsvPathListener()) // 打印 job parameters 和 ExecutionContext 中的值 .start(printParametersAndContextVariables()) // 读取csv的数据并解决 .next(handleCsvFileStep()) .build(); } /** * 读取数据 * 留神:此处须要返回 FlatFileItemReader类型,而不要返回ItemReader * 否则可能报如下异样 Reader must be open before it can be read * * @param importPath 文件门路 * @return reader */ @Bean @StepScope public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) { // 读取数据 return new FlatFileItemReaderBuilder<Person>() .name("read-csv-file") .resource(new ClassPathResource(importPath)) .delimited().delimiter(",") .names("username", "age", "sex") .fieldSetMapper(new RecordFieldSetMapper<>(Person.class)) .build(); } @Bean public Step handleCsvFileStep() { // 每读取一条数据,交给这个解决 ItemProcessor<Person, Person> processor = item -> { if (item.getAge() > 25) { log.info("用户[{}]的年龄:[{}>25]不解决", item.getUsername(), item.getAge()); return null; } return item; }; // 读取到了 chunk 大小的数据后,开始执行写入 ItemWriter<Person> itemWriter = items -> { log.info("开始写入数据"); for (Person item : items) { log.info("{}", item); } }; return stepBuilderFactory.get("handle-csv-file") // 每读取2条数据,执行一次write,当每read一条数据后,都会执行process .<Person, Person>chunk(2) // 读取数据 .reader(readCsvItemReader) // 读取一条数据就开始解决 .processor(processor) // 当读取的数据的数量达到 chunk 时,调用该办法进行解决 .writer(itemWriter) .build(); } /** * 打印 job parameters 和 ExecutionContext 中的值 * <p> * TaskletStep是一个非常简单的接口,仅有一个办法——execute。 * TaskletStep会重复的调用这个办法直到获取一个RepeatStatus.FINISHED返回或者抛出一个异样。 * 所有的Tasklet调用都会包装在一个事物中。 * * @return Step */ private Step printParametersAndContextVariables() { return stepBuilderFactory.get("print-context-params") .tasklet(printImportFilePathTaskLet) // 当job重启时,如果达到了3此,则该step不在执行 .startLimit(3) // 当job重启时,如果该step的是曾经解决实现即COMPLETED状态时,下方给false示意该step不在重启,即不在执行 .allowStartIfComplete(false) // 增加 step 监听 .listener(new CustomStepExecutionListener()) .build(); }}
6、编写Job启动类
@Component@Slf4jpublic class StartImportPersonJob { @Autowired private Job importPersonJob; @Autowired private JobLauncher jobLauncher; @PostConstruct public void startJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException { JobParameters jobParameters = new JobParametersBuilder() .addString("importDate", LocalDate.of(2022, 08, 31).format(DateTimeFormatter.ofPattern("yyyyMMdd"))) .toJobParameters(); JobExecution execution = jobLauncher.run(importPersonJob, jobParameters); log.info("job invoked"); }}
7、主动配置SpringBatch
@SpringBootApplication@EnableBatchProcessingpublic class SpringBatchReadCsvApplication { public static void main(String[] args) { SpringApplication.run(SpringBatchReadCsvApplication.class, args); }}
次要是 @EnableBatchProcessing
注解
5、执行后果
6、残缺代码
https://gitee.com/huan1993/spring-cloud-parent/tree/master/spring-batch/spring-batch-read-csv