共计 6212 个字符,预计需要花费 16 分钟才能阅读完成。
1、需要
零碎每日从某个固定的目录中读取 csv
文件,并在管制台上打印。
2、解决方案
要解决上述需要,能够应用的办法有很多,此处抉择应用 Spring Batch
来实现。
3、注意事项
1、文件门路的获取
此处简略解决,读取 JobParameters
中的日期,而后构建一个文件门路,并将文件门路放入到 ExecutionContext
中。此处为了简略,文件门路会在程序中写死,然而同时也会将文件门路存入到 ExecutionContext
中,并且在具体的某个 Step
中从 ExecutionContext
中获取门路。
留神: ExecutionContext
中存入的数据尽管在各个 Step
中都能够获取到,然而 不举荐
存入 比拟大
的数据到 ExecutionContext
中,因为这个对象的数据须要存入到数据库中。
2、各个 Step 如果获取到 ExecutionContext 中的值
- 类上退出
@StepScope
注解 - 通过
@Value("#{jobExecutionContext['importPath']}")
来获取
eg:
@Bean
@StepScope
public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {
// 读取数据
return new FlatFileItemReaderBuilder<Person>()
.name("read-csv-file")
.resource(new ClassPathResource(importPath))
.delimited().delimiter(",")
.names("username", "age", "sex")
.fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
.build();}
解释:在程序实例化 FlatFileItemReader
的时候,此时是没有 jobExecutionContext 的,那么就会报错,如果加上 @StepScope
,此时就没有问题了。@StepScope
示意达到 Step 阶段才实例化这个 Bean
3、FlatFileItemReader 应用留神
当咱们应用 FlatFileItemReader
来读取咱们的 csv 文件时,此处须要返回 FlatFileItemReader
类型,而不能间接返回ItemReader
,否则可能呈现如下谬误 Reader must be open before it can be read
4、实现步骤
1、导入依赖,配置
1、导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
</dependencies>
2、初始化 SpringBatch 数据库
spring.datasource.username=root
spring.datasource.password=root@1993
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/spring-batch?useUnicode=true&characterEncoding=utf8&autoReconnectForPools=true&useSSL=false
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# 程序启动时,默认不执行 job
spring.batch.job.enabled=false
spring.batch.jdbc.initialize-schema=always
# 初始化 spring-batch 数据库脚本
spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-mysql.sql
2、构建文件读取门路
此处我的想法是,在 JobExecutionListener
中实现文件门路的获取,并将之放入到 ExecutionContext
,而后在各个Step
中就能够获取到文件门路的值了。
/**
* 在此监听器中,获取到具体的须要读取的文件门路,并保留到 ExecutionContext
*
* @author huan.fu
* @date 2022/8/30 - 22:22
*/
@Slf4j
public class AssemblyReadCsvPathListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {ExecutionContext executionContext = jobExecution.getExecutionContext();
JobParameters jobParameters = jobExecution.getJobParameters();
String importDate = jobParameters.getString("importDate");
log.info("从 job parameter 中获取的 importDate 参数的值为:[{}]", importDate);
String readCsvPath = "data/person.csv";
log.info("依据日期组装须要读取的 csv 门路为:[{}], 此处排除日期,间接写一个死的门路", readCsvPath);
executionContext.putString("importPath", readCsvPath);
}
@Override
public void afterJob(JobExecution jobExecution) {}}
3、构建 Tasklet,输入文件门路
@Slf4j
@Component
@StepScope
public class PrintImportFilePathTaskLet implements Tasklet {@Value("#{jobExecutionContext['importPath']}")
private String importFilePath;
@Value("#{jobParameters['importDate']}")
private String importDate;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {log.info("从 job parameter 中获取到的 importDate:[{}], 从 jobExecutionContext 中获取的 importPath:[{}]",
importDate, importFilePath);
return RepeatStatus.FINISHED;
}
}
须要留神的是,此类上退出了 @StepScope
注解
4、编写实体类
@AllArgsConstructor
@Getter
@ToString
public class Person {
/**
* 用户名
*/
private String username;
/**
* 年龄
*/
private Integer age;
/**
* 性别
*/
private String sex;
}
5、编写 Job 配置
@Configuration
@AllArgsConstructor
@Slf4j
public class ImportPersonJobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final PrintImportFilePathTaskLet printImportFilePathTaskLet;
private final ItemReader<Person> readCsvItemReader;
@Bean
public Job importPersonJob() {
// 获取一个 job builder, jobName 能够是不存在的
return jobBuilderFactory.get("import-person-job")
// 增加 job execution 监听器
.listener(new AssemblyReadCsvPathListener())
// 打印 job parameters 和 ExecutionContext 中的值
.start(printParametersAndContextVariables())
// 读取 csv 的数据并解决
.next(handleCsvFileStep())
.build();}
/**
* 读取数据
* 留神:此处须要返回 FlatFileItemReader 类型,而不要返回 ItemReader
* 否则可能报如下异样 Reader must be open before it can be read
*
* @param importPath 文件门路
* @return reader
*/
@Bean
@StepScope
public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {
// 读取数据
return new FlatFileItemReaderBuilder<Person>()
.name("read-csv-file")
.resource(new ClassPathResource(importPath))
.delimited().delimiter(",")
.names("username", "age", "sex")
.fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
.build();}
@Bean
public Step handleCsvFileStep() {
// 每读取一条数据,交给这个解决
ItemProcessor<Person, Person> processor = item -> {if (item.getAge() > 25) {log.info("用户 [{}] 的年龄:[{}>25]不解决", item.getUsername(), item.getAge());
return null;
}
return item;
};
// 读取到了 chunk 大小的数据后,开始执行写入
ItemWriter<Person> itemWriter = items -> {log.info("开始写入数据");
for (Person item : items) {log.info("{}", item);
}
};
return stepBuilderFactory.get("handle-csv-file")
// 每读取 2 条数据,执行一次 write,当每 read 一条数据后,都会执行 process
.<Person, Person>chunk(2)
// 读取数据
.reader(readCsvItemReader)
// 读取一条数据就开始解决
.processor(processor)
// 当读取的数据的数量达到 chunk 时,调用该办法进行解决
.writer(itemWriter)
.build();}
/**
* 打印 job parameters 和 ExecutionContext 中的值
* <p>
* TaskletStep 是一个非常简单的接口,仅有一个办法——execute。* TaskletStep 会重复的调用这个办法直到获取一个 RepeatStatus.FINISHED 返回或者抛出一个异样。* 所有的 Tasklet 调用都会包装在一个事物中。*
* @return Step
*/
private Step printParametersAndContextVariables() {return stepBuilderFactory.get("print-context-params")
.tasklet(printImportFilePathTaskLet)
// 当 job 重启时,如果达到了 3 此,则该 step 不在执行
.startLimit(3)
// 当 job 重启时,如果该 step 的是曾经解决实现即 COMPLETED 状态时,下方给 false 示意该 step 不在重启,即不在执行
.allowStartIfComplete(false)
// 增加 step 监听
.listener(new CustomStepExecutionListener())
.build();}
}
6、编写 Job 启动类
@Component
@Slf4j
public class StartImportPersonJob {
@Autowired
private Job importPersonJob;
@Autowired
private JobLauncher jobLauncher;
@PostConstruct
public void startJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {JobParameters jobParameters = new JobParametersBuilder()
.addString("importDate", LocalDate.of(2022, 08, 31).format(DateTimeFormatter.ofPattern("yyyyMMdd")))
.toJobParameters();
JobExecution execution = jobLauncher.run(importPersonJob, jobParameters);
log.info("job invoked");
}
}
7、主动配置 SpringBatch
@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchReadCsvApplication {public static void main(String[] args) {SpringApplication.run(SpringBatchReadCsvApplication.class, args);
}
}
次要是 @EnableBatchProcessing
注解
5、执行后果
6、残缺代码
https://gitee.com/huan1993/spring-cloud-parent/tree/master/spring-batch/spring-batch-read-csv