关于java:Spring-Batch远程分区的本地Jar包模式

37次阅读

共计 5951 个字符,预计需要花费 15 分钟才能阅读完成。

1 前言

欢送拜访南瓜慢说 www.pkslow.com 获取更多精彩文章!

Spring 相干文章:Springboot-Cloud

Spring Batch近程分区对于大量数据的解决十分善于,它的实现有多种形式,如 本地 Jar 包模式MQ 模式Kubernetes 模式。这三种模式的如下:

(1)本地 Jar 包模式 :分区解决的worker 为一个 Java 过程,从jar 包启动,通过 jvm 参数和数据库传递参数;官网提供示例代码。

(2)MQ 模式 worker 是一个常驻过程,ManagerWorker 通过音讯队列来传递参数;网上有不少相干示例代码。

(3)Kubernetes 模式 workerK8s中的 PodManager 间接启动 Pod 来解决;网上并没有找到任何示例代码。

本文将通过代码来解说第一种模式(本地 Jar 包模式),其它后续再介绍。

倡议先看上面文章理解一下:

Spring Batch 入门:通过例子解说 Spring Batch 入门,优良的批处理框架

Spring Batch 并行处理介绍:大量数据也不在话下,Spring Batch 并行处理四种模式初探

2 代码解说

本文代码中,ManagerWorker 是放在一起的,在同一个我的项目里,也只会打一个 jar 包而已;咱们通过 profile 来区别是 manager 还是 worker,也就是通过Spring Profile 实现一份代码,两份逻辑。实际上也能够拆成两份代码,但放一起更不便测试,而且代码量不大,就没有必要了。

2.1 我的项目筹备

2.1.1 数据库

首先咱们须要筹备一个数据库,因为 ManagerWorker都须要同步状态到 DB 上,不能间接应用嵌入式的内存数据库了,须要一个内部可独特拜访的数据库。这里我应用的是H2 Database,装置可参考:把 H2 数据库从 jar 包部署到 Kubernetes,并解决 Ingress 不反对 TCP 的问题。

2.1.2 引入依赖

maven引入依赖如下所示:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-task</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-deployer-local</artifactId>
  <version>2.4.1</version>
</dependency>

<dependency>
  <groupId>org.springframework.batch</groupId>
  <artifactId>spring-batch-integration</artifactId>
</dependency>

spring-cloud-deployer-local用于部署和启动 worker,十分要害;其它就是Spring BatchTask相干的依赖;以及数据库连贯。

2.1.3 主类入口

Springboot的主类入口如下:

@EnableTask
@SpringBootApplication
@EnableBatchProcessing
public class PkslowRemotePartitionJar {public static void main(String[] args) {SpringApplication.run(PkslowRemotePartitionJar.class, args);
    }
}

Springboot 的根底上,增加了 Spring BatchSpring Cloud Task的反对。

2.2 要害代码编写

后面的数据库搭建和其它代码没有太多可讲的,接下来就开始要害代码的编写。

2.2.1 分区治理 Partitioner

Partitioner是近程分区中的外围 bean,它定义了分成多少个区、怎么分区,要把什么变量传递给worker。它会返回一组 < 分区名,执行上下文 > 的键值对,即返回Map<String, ExecutionContext>。把要传递给worker 的变量放在 ExecutionContext 中去,反对多种类型的变量,如 Stringintlong 等。实际上,咱们不倡议通过 ExecutionContext 来传递太多数据;能够传递一些标识或主键,而后 worker 本人去拿数据即可。

具体代码如下:

private static final int GRID_SIZE = 4;
@Bean
public Partitioner partitioner() {return new Partitioner() {
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);

      for (int i = 0; i < GRID_SIZE; i++) {ExecutionContext executionContext = new ExecutionContext();
        executionContext.put("partitionNumber", i);
        partitions.put("partition" + i, executionContext);
      }

      return partitions;
    }
  };
}

下面分成 4 个区,程序会启动 4 个 worker 来解决;给 worker 传递的参数是partitionNumber

2.2.2 分区处理器 PartitionHandler

PartitionHandler也是外围的 bean,它决定了怎么去启动worker,给它们传递什么jvm 参数(跟之前的 ExecutionContext 传递不一样)。

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) throws Exception {Resource resource = this.resourceLoader.getResource(workerResource);

  DeployerPartitionHandler partitionHandler =
    new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep", taskRepository);

  List<String> commandLineArgs = new ArrayList<>(3);
  commandLineArgs.add("--spring.profiles.active=worker");
  commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
  commandLineArgs.add("--spring.batch.initializer.enabled=false");

  partitionHandler
    .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
  partitionHandler
    .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
  partitionHandler.setMaxWorkers(2);
  partitionHandler.setApplicationName("PkslowWorkerJob");

  return partitionHandler;
}

下面代码中:

resourceworkerjar包地址,示意将启动该程序;

workerStepworker 将要执行的step

commandLineArgs定义了启动 workerjvm参数,如--spring.profiles.active=worker

environmentmanager 的零碎环境变量,能够传递给worker,当然也能够抉择不传递;

MaxWorkers是最多能同时启动多少个 worker,相似于线程池大小;设置为 2,示意最多同时有 2 个worker 来解决 4 个分区。

2.2.3 Manager 和 Worker 的 Batch 定义

实现了分区相干的代码,剩下的就只是如何定义 ManagerWorker的业务代码了。

Manager作为管理者,不必太多业务逻辑,代码如下:

@Bean
@Profile("!worker")
public Job partitionedJob(PartitionHandler partitionHandler) throws Exception {Random random = new Random();
  return this.jobBuilderFactory.get("partitionedJob" + random.nextInt())
    .start(step1(partitionHandler))
    .build();}

@Bean
public Step step1(PartitionHandler partitionHandler) throws Exception {return this.stepBuilderFactory.get("step1")
    .partitioner(workerStep().getName(), partitioner())
    .step(workerStep())
    .partitionHandler(partitionHandler)
    .build();}

Worker次要作用是解决数据,是咱们的业务代码,这里就演示一下如何获取 Manager 传递过去的partitionNumber

@Bean
public Step workerStep() {return this.stepBuilderFactory.get("workerStep")
    .tasklet(workerTasklet(null, null))
    .build();}

@Bean
@StepScope
public Tasklet workerTasklet(final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) {return new Tasklet() {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {Thread.sleep(6000); // 减少延时,查看成果,通过 jps:在 jar 状况下会新起 java 过程
      System.out.println("This tasklet ran partition:" + partitionNumber);
     
      return RepeatStatus.FINISHED;
    }
  };
}

通过表达式 @Value("#{stepExecutionContext['partitionNumber']}") 获取 Manager 传递过去的变量;留神要加注解@StepScope

3 程序运行

因为咱们分为 ManagerWorker,但都是同一份代码,所以咱们先打包一个 jar 进去,不然 manager 无奈启动。配置数据库和 Workerjar包地址如下:

spring.datasource.url=jdbc:h2:tcp://localhost:9092/test
spring.datasource.username=pkslow
spring.datasource.password=pkslow
spring.datasource.driver-class-name=org.h2.Driver

pkslow.worker.resource=file://pkslow/target/remote-partitioning-jar-1.0-SNAPSHOT.jar

执行程序如下:

能够看到启动了 4 次 Java 程序,还给出日志门路。

通过 jps 命令查看,能看到一个 Manager 过程,还有两个 worker 过程:

4 简单变量传递

后面讲了 Manager 能够通过 ExecutionContext 传递变量,如简略的 Stringlong 等。但其实它也是能够传递简单的 Java 对象的,但对应的类须要可序列化,如:

import java.io.Serializable;

public class Person implements Serializable {
    private Integer age;
    private String name;
    private String webSite;
  //getter and setter
}

Manager传递:

executionContext.put("person", new Person(0, "pkslow", "www.pkslow.com"));

Worker接管:

@Value("#{stepExecutionContext['person']}") Person person

5 总结

本文介绍了 Spring Batch 近程分区的 本地 Jar 包模式,只能在一台机器上运行,所以也是无奈真正施展出近程分区的作用。但它对咱们后续了解更简单的模式是有很大帮忙的;同时,咱们也能够应用本地模式进行开发测试,毕竟它只须要一个数据库就行了,依赖很少。


欢送关注微信公众号 <南瓜慢说>,将继续为你更新 …

多读书,多分享;多写作,多整顿。

正文完
 0