关于java:记录一次发送千万级别数量消息的定时任务优化

0次阅读

共计 4947 个字符,预计需要花费 13 分钟才能阅读完成。

业务场景

咱们每天都要对最近三个月内的沉闷用户进行批量营销、账单逾期计算等操作,用户数据大略是 800w。咱们的计划是发送一个 CUSTOMER_DAILY 音讯,而后订阅这个音讯再去别离发送批量营销、账单逾期等业务音讯。目前发送完 CUSTOMER_DAILY 音讯大概须要五个小时。

勿纠结当下

大家不用纠结当下为什么效率这么低 …… 因为零碎都是缓缓优化进去的嘛,以前的代码必定多少有些问题。或者再过一段时间咱们本人的代码也有很多问题,这都很失常。上面简略地贴一些我适当革新过的以后实现逻辑的代码来剖析以后计划存在的问题。

目前的计划

目前是采纳定时工作触发,线程池提交工作的形式,代码如下(行尾正文是我写的 example 值):

// 最大沉闷 id
long maxId = customerService.findMaxActiveId(beginTime); //20001
// 最小沉闷 id
long minId = customerService.findMinActiveId(beginTime); //1
// 查问的 id 最大可能总条数
long listSize = maxId - minId; //20000
// 开启的线程数
int runSize = 4;
// 均匀每次查问数目
long count = listSize / runSize; // 5000
// 创立一个线程池,外围线程数量和开启线程的数量一样
ExecutorService executor = CreateThreadUtil.createThread(runSize);
for (int i = 0; i <= runSize; i++) {
  // 计算 sql 语句中每个分页查问的起始和完结数据下标
  long min = minId + i * count; //1   ,  5001 ,  10001 , 15001
  long max = min + count;       //5001,  10001,  15001 , 20001
  executor.execute(() -> {List<Customer> customers = customerService.findByXxx(beginTime, min, max);
    customers.forEach(c -> {Message message = Message.build();// 省略结构音讯体
       messageService.save(message);
      applicationEventPublisher.publishEvent(new MessageSendEvent(message));
    });
  });
}

大家不必在意什么事务细节,因为这是我为了缩小代码简化的,看大抵实现逻辑即可。前面就是监听这个事件而后批改数据库音讯发送的状态,发送音讯

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async
public void listen(MessageSendEvent event) {
  // 批改数据库音讯发送状态为:发送中
  // 执行发送音讯 SDK 的 API
  // 批改数据库音讯发送状态为:胜利
}

存在的问题

线程池资源可能未充分利用

仔细观察第一段代码的逻辑,通过查问最大最小的沉闷用户 id 来计算沉闷用户总数,这并不是精确数值。它可能存在这样一种状况

maxId = 20001,minId = 1
2~5000 都是非沉闷用户,5001~20000 是沉闷用户 

这样一来 for 循环中负责 1-5001 的那个线程其实只有一个用户工作须要解决,也就是说总共 4 个线程,1 个线程执行工作是 0.5 秒,其余三个线程可能要十几分钟。这样第一个线程的资源就被节约着了。看过我后面文章 学习 CompletableFuture 进阶之前先把握两种线程池 的都晓得 ForkJoinPool 有工作窃取机制,能够解决这个问题。

循环拜访数据库

因为咱们发送音讯须要入库记录,发送过程中又要批改两次状态(第一次是发送中、第二次是发送胜利),也就是说一条音讯会有三次数据库的 IO 操作,这样在大量循环下是一个最大的性能瓶颈。咱们能够估算一个阈值,批量地对该阈值的一组音讯用一个 batchInsert 只拜访一次数据库。

循环调用发送音讯 SDK 的 API

其实这个和循环拜访数据库是一个情理,失常的音讯队列都有批量发送音讯的性能,而不是只能一条音讯调用一个 SDK 的 API。

只有一台机器执行该工作

目前应用的定时工作是 xxl-job,路由策略是第一个,也就是说在 N 个服务实例上,只有一台实例会执行。这就相当于有 N-1 台实例在这个工作上是处于闲置状态的。咱们能够让 N 台机器一起来做这个事件,xxl-job 的分片播送能够满足。

总结下来能够优化的点

  • xxl-job 分片播送
  • ForkJoinPool
  • 批量拜访数据库
  • 批量发送音讯

入手优化

xxl-job 分片播送

分片播送的含意是触发对应集群中所有机器执行一次工作,同时零碎主动传递分片参数;简略来说就是每一台机器都会触发工作,同时每台机器会接到不同的参数,咱们能够依据这不同的参数去调配不同的利用实例解决不同的数据。

具体操作很简略,在 xxl-job 治理页面编辑工作路由策略为分片播送即可。

int shardIndex = XxlJobHelper.getShardIndex();// 以后分片 0/1/2/3
int shardTotal = XxlJobHelper.getShardTotal();// 总分片 4
List<Long> customerIds = customerService.findIdsByShard(request);// 依据分片查问该机器要解决的数据 

findIdsByShard 的实现其实是一个非常简单的 SQL

SELECT ID FROM t_customer WHERE MOD (ID, ${request.shardTotal} ) = ${request.shardIndex}

咱们将 ID 对总分片数 4 进行求余,一个数对 4 求余只有四个后果,0,1,2,3。这样一条 SQL 在所有机器上执行的数据后果就能瓜分要执行的总数据。

ForkJoinPool

假如下面每个利用实例拿到的 customerIds 数量是 200w,那么咱们当初应用 ForkJoinPool 对这 200w 数据进行分治。首先定义工作类

public class CustomerDailyTask extends RecursiveTask<Integer> {
  private final List<Long> customerIds; // 客户 id 汇合
  private final CustomerService customerService;
  public static final int THRESHOLD = 1000; // 拆分阈值 (这里须要本人屡次试验效率最高的阈值,目前我试了十几个值,1000 是最合适的)
  // 省略构造方法
  @Override
  protected Integer compute() {if (customerIds.size() <= THRESHOLD) {return customerService.sendDailyMessage(customerIds); 
    }
    int groupSize = (int) Math.ceil(customerIds.size() * 1.0 / 2); // 对半拆分
    List<List<Long>> partition = Lists.partition(customerIds, groupSize);
    CustomerDailyTask task1 = new CustomerDailyTask(partition.get(0), customerService);
    CustomerDailyTask task2 = new CustomerDailyTask(partition.get(1), customerService);
    invokeAll(task1, task2);
    return task1.join() + task2.join();
  }
}

初始化 ForkJoinPool 执行

CustomerDailyTask task = new CustomerDailyTask(customerIds, customerService);
int core = Runtime.getRuntime().availableProcessors();
ForkJoinPool pool = new ForkJoinPool(core - 1); // 留一个线程
pool.invoke(task);

批量拜访数据库

这里其实就简略了,上面这行代码中

return customerService.sendDailyMessage(customerIds);

依据传入的 customerIds 结构一个 List<Message> 应用 batchInsert 办法插入数据库,而后发送一个 Spring 本地事件 Spring 事件公布,之后在事件监听器中 batchUpdate 去更新状态,这里省略。

批量发送音讯

这个很简略间接调用批量发送的 SDK 即可,因为咱们用的 AWS SNS-SQS,SDK 版本比拟低,我还降级了个 SDK 版本 …… 这都是小问题,蛋疼的是 SDK 最多反对一次发送 10 条音讯,我间接无语 ……

我示意很纳闷,没方法那就拆吧

// SNS 限度一次最多发 10 条音讯
List<List<Message>> split = Lists.partition(list, 10);
List<CompletableFuture<PublishBatchResponse>> future = split.stream().map(item -> senderFactory.batchSend(item)).collect(Collectors.toList());
List<PublishBatchResponse> response = CompletableFutureUtil.allOfCompleted(future);
//... 更新数据库音讯状态为 SUCCESS

这里咱们应用 CompletableFuture 批量异步发送音讯,其实它外部用的线程池默认也是 ForkJoinPool,allOfCompleted() 实现如下

public static <T> List<T> allOfCompleted(List<CompletableFuture<T>> list) {CompletableFuture<Void> future = CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()]));
  List<T> result = list.stream().map(CompletableFuture::join).collect(Collectors.toList());
  future.thenApply(v -> list); // 阻塞主线程,执行完所有异步工作
  return result;
}

里面是 ForkJoinPool 拆分数据,每一个拆分的子单元外面又是一个 ForkJoinPool 来发消息 …… 我感觉机器配置如果很高的话这个设计方案就只有两个字 NB!

作者求教

大家看到了,目前我公司应用音讯队列的时候是要长久化音讯发送到数据库的,先在以后事务中插入一条音讯发送记录,状态记为 CREATED,应用 Spring 事件机制实现以后业务办法事务提交后执行音讯发送的 API,状态记为 PENDING,发送胜利后状态记为 SUCCESS,反之 FAILED。

这样做的益处是能 100% 保障音讯失落有迹可寻,每一条音讯的发送都有记录。并且不便后续音讯重试,或者重发,因为我这里记录了音讯体。

然而我感觉这样 emmm 拜访数据库的 IO 操作感觉有点节约,在面对高并发业务时比方秒杀零碎,感觉这样的实现是不可用的,因为数据库面临微小性能瓶颈。问了很多敌人他们公司是怎么做的,一半是入库记录,一半是不入库记录 …… 在此求教宽广网友,贵司是咋做的,请留言指教,谢谢!

结语

本篇文章剖析了一个大批量工作优化的计划,从 集群实例分担工作、正当应用线程资源、工作的分治、缩小数据库 IO、缩小 API 的调用 一步步优化出一个目前较为适合的解决计划。

正文完
 0