关于java:如何高效的使用并行流

30次阅读

共计 3813 个字符,预计需要花费 10 分钟才能阅读完成。

在 Java7 之前想要并行处理大量数据是很艰难的,首先把数据拆分成很多个局部,而后把这这些子局部放入到每个线程中去执行计算逻辑,最初在把每个线程返回的计算结果进行合并操作;在 Java7 中提供了一个解决大数据的 fork/join 框架,屏蔽掉了线程之间交互的解决,更加专一于数据的解决。


Fork/Join 框架

Fork/Join 框架采纳的是思维就是分而治之,把大的工作拆分成小的工作,而后放入到独立的线程中去计算,同时为了最大限度的利用多核 CPU,采纳了一个种 工作窃取 的算法来运行工作,也就是说当某个线程解决完本人工作队列中的工作后,尝试当其余线程的工作队列中窃取一个工作来执行,直到所有工作处理完毕。所以为了缩小线程之间的竞争,通常会应用双端队列,被窃取工作线程永远从双端队列的头部拿工作执行,而窃取工作的线程永远从双端队列的尾部拿工作执行;在百度找了一张图

  • 应用RecursiveTask

应用 Fork/Join 框架首先须要创立本人的工作,须要继承RecursiveTask,实现形象办法

protected abstract V compute();

实现类须要在该办法中实现工作的拆分,计算,合并;伪代码能够示意成这样:

if(工作曾经不可拆分){return 程序计算结果;} else {
    1. 工作拆分成两个子工作
    2. 递归调用本办法,拆分子工作
    3. 期待子工作执行实现
    4. 合并子工作的后果
}
  • Fork/Join 实战

工作:实现对一亿个自然数求和

咱们先应用串行的形式实现,代码如下:

long result = LongStream.rangeClosed(1, 100000000)
                .reduce(0, Long::sum);
System.out.println("result:" + result);

应用 Fork/Join 框架实现,代码如下:

public class SumRecursiveTask extends RecursiveTask<Long> {private long[] numbers;
    private int start;
    private int end;

    public SumRecursiveTask(long[] numbers) {
        this.numbers = numbers;
        this.start = 0;
        this.end = numbers.length;
    }

    public SumRecursiveTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length < 20000) {  // 小于 20000 个就不在进行拆分
            return sum();}
        SumRecursiveTask leftTask = new SumRecursiveTask(numbers, start, start + length / 2); // 进行工作拆分
        SumRecursiveTask rightTask = new SumRecursiveTask(numbers, start + (length / 2), end); // 进行工作拆分
        leftTask.fork(); // 把该子工作交友 ForkJoinPoll 线程池去执行
        rightTask.fork(); // 把该子工作交友 ForkJoinPoll 线程池去执行
        return leftTask.join() + rightTask.join(); // 把子工作的后果相加
    }


    private long sum() {
        int sum = 0;
        for (int i = start; i < end; i++) {sum += numbers[i];
        }
        return sum;
    }


    public static void main(String[] args) {long[] numbers = LongStream.rangeClosed(1, 100000000).toArray();

        Long result = new ForkJoinPool().invoke(new SumRecursiveTask(numbers));
        System.out.println("result:" +result);
    }
}

Fork/Join 默认的线程数量就是你的处理器数量,这个值是由 Runtime.getRuntime().available- Processors() 失去的。然而你能够通过零碎属性 java.util.concurrent.ForkJoinPool.common. parallelism 来扭转线程池大小,如下所示:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12"); 这是一个全局设置,因而它将影响代码中所有的并行流。目前还无奈专为某个 并行流指定这个值。因为会影响到所有的并行流,所以在工作中经验防止网络 /IO 操作,否则可能会拖慢其余并行流的运行速度


parallelStream

以上咱们说到的都是在 Java7 中应用并行流的操作,Java8 并没有止步于此,为咱们提供更加便当的形式,那就是 parallelStreamparallelStream 底层还是通过 Fork/Join 框架来实现的。

  • 常见的应用形式

1. 串行流转化成并行流

LongStream.rangeClosed(1,1000)
                .parallel()
                .forEach(System.out::println);

2. 间接生成并行流

 List<Integer> values = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {values.add(i);
        }
        values.parallelStream()
                .forEach(System.out::println);
  • 正确的应用 parallelStream

咱们应用 parallelStream 来实现下面的累加例子看看成果,代码如下:

public static void main(String[] args) {Summer summer = new Summer();
    LongStream.rangeClosed(1, 100000000)
            .parallel()
            .forEach(summer::add);
    System.out.println("result:" + summer.sum);

}

static class Summer {
    public long sum = 0;

    public void add(long value) {sum += value;}
}

运行后果如下:

运行之后,咱们发现运行的后果不正确,并且每次运行的后果都不一样,这是为什么呢?
这里其实就是错用 parallelStream 常见的状况,parallelStream是非线程平安的,在这个外面中应用多个线程去批改了共享变量 sum, 执行了 sum += value 操作,这个操作自身是非原子性的,所以在应用并行流时应该防止去批改共享变量。

批改下面的例子,正确应用 parallelStream 来实现,代码如下:

long result = LongStream.rangeClosed(1, 100000000)
        .parallel()
        .reduce(0, Long::sum);
System.out.println("result:" + result);

在后面咱们曾经说过了 fork/join 的操作流程是:拆子局部,计算,合并后果;因为 parallelStream 底层应用的也是 fork/join 框架,所以这些步骤也是须要做的,然而从下面的代码,咱们看到 Long::sum 做了计算,reduce做了合并后果,咱们并没有去做工作的拆分,所以这个过程必定是 parallelStream 曾经帮咱们实现了,这个时候就必须的说说Spliterator

Spliterator是 Java8 退出的新接口,是为了并行执行工作而设计的。

public interface Spliterator<T> {boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();}

tryAdvance: 遍历所有的元素,如果还有能够遍历的就返回 ture,否则返回 false

trySplit: 对所有的元素进行拆分成小的子局部,如果曾经不能拆分就返回 null

estimateSize: 以后拆分外面还残余多少个元素

characteristics: 返回以后 Spliterator 个性集的编码


总结

  1. 要证实并行处理比程序解决效率高,只能通过测试,不能靠猜想(本文累加的例子在多台电脑上运行了屡次,也并不能证实采纳并行来解决累加就肯定比串行的快多少,所以只能通过多测试,环境不同可能后果就会不同)
  2. 数据量较少,并且计算逻辑简略,通常不倡议应用并行流
  3. 须要思考流的操作工夫耗费
  4. 在有些状况下须要本人去实现拆分的逻辑,并行流能力高效

感激大家能够急躁地读到这里。
当然,文中或者会存在或多或少的有余、谬误之处,有倡议或者意见也十分欢送大家在评论交换。
最初,心愿敌人们能够点赞评论关注三连,因为这些就是我分享的全副能源起源????

正文完
 0