自定义-ForkJoinPool-提升并行流-ParallelStream-执行速度

35次阅读

共计 2163 个字符,预计需要花费 6 分钟才能阅读完成。

简介

    在 java8 中 添加了流 Stream,可以让你以一种声明的方式处理数据。使用起来非常简单优雅。ParallelStream 则是一个并行执行的流,采用 ForkJoinPool 并行执行任务,提高执行速度。
     
    下面我们看看 2 个简单的示例:

示例 1(list)

Arrays.asList(1,2,3,4,5,6)
    .parallelStream()
    .forEach((value) -> {String name = Thread.currentThread().getName();
        System.out.println("示例 1 Thread:" + name + "value:" + value);
    });

示例 2(array)

Stream.of(1,2,3,4,5,6)
    .parallel()
    .forEach((value) -> {String name = Thread.currentThread().getName();
        System.out.println("示例 2 Thread:" + name + "value:" + value);
    });

问题引出

    笔者最近在做一些爬虫相关的业务,其核心工具已开源 mica-http:https://gitee.com/596392912/mica/tree/master/mica-http,经过 2 个版本的迭代已经发展成了一个强大非账号爬虫利器,赶紧来试试吧。

    我们采集了大量的代理 ip 用来供爬虫使用,其中有个定时任务每 5 分钟去检测代理是否失效,代理 ip 检测比较费时,我们给每个检测的请求
设定了 2s 的超时,这样单线程的话 1000 个 ip 就得消耗半个多小时,当然笔者在校验的时候采用的 parallel Stream 简化开发。

    然后发现效果并不明显,代理 ip 数量上来之后 5 分钟完全检测不完,导致任务堆积。明明用了并发流为什么没有明显的提高执行速度呢?

    下面我们来看看刚刚的“示例”打印出的信息:

示例 1 Thread:main value:4
示例 1 Thread:ForkJoinPool.commonPool-worker-2 value:1
示例 1 Thread:main value:6
示例 1 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例 1 Thread:main value:3
示例 1 Thread:ForkJoinPool.commonPool-worker-1 value:2
示例 2 Thread:main value:4
示例 2 Thread:ForkJoinPool.commonPool-worker-3 value:3
示例 2 Thread:ForkJoinPool.commonPool-worker-2 value:5
示例 2 Thread:ForkJoinPool.commonPool-worker-4 value:1
示例 2 Thread:ForkJoinPool.commonPool-worker-5 value:2
示例 2 Thread:ForkJoinPool.commonPool-worker-1 value:6

我们可以看到 Parallel Stream,默认采用的是一个 ForkJoinPool.commonPool 的线程池,这样我们就算使用了 Parallel Stream,
整个 jvm 共用一个 common pool 线程池,一不小心就任务堆积了,在校验代理 ip 的时候我们还有采集代理等其他的任务中也大量使用了并发流,
这样也就印证了为什么会任务堆积了。

解决问题

    使用自定义 ForkJoinPool 执行速度。示例代码如下:

// 示例:自定义线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(8);

// 这里是从数据库里查出来的一批代理 ip
List<ProxyList> records = new ArrayList<>();

// 找出失效的代理 ip
List<String> needDeleteList = forkJoinPool.submit(() -> records.parallelStream()
    .map(ProxyList::getIpPort)
    .filter(IProxyListTask::isFailed)
    .collect(Collectors.toList())
).join();

// 删除失效的代理

    整个代码依然比较优雅,在使用自定义的 ForkJoin 线程池之后,执行速度有了明显的提升。以前 5 分钟执行不完的任务现在 2 分钟之内就能全部执行完毕。

结论

    java8 的并发流在大批量数据处理时可简化多线程的使用,在遇到耗时业务或者重度使用并发流不妨根据业务情况采用自定义线程池来提示处理速度。

开源推荐

  • Spring boot 微服务高效开发 mica 工具集:https://gitee.com/596392912/mica
  • pig 宇宙最强微服务(架构师必备):https://gitee.com/log4j/pig
  • SpringBlade 完整的线上解决方案(企业开发必备):https://gitee.com/smallc/SpringBlade
  • 加入【如梦技术】Spring QQ 群:479710041,了解更多。

关注我们

扫描上面二维码,更多 精彩内容 每天推荐!

正文完
 0