乐趣区

关于java:关于聚合和多线程的处理套路

概述

无差别地申请多个内部接口并聚合所有申请后果,应该有属于它本人的套路,应该将所有多线程的操作屏蔽之,咱们只关怀参数和后果。因而,应该摈弃 Callable/FutureTask/Future 等这些手工模式,这些代码应该交给框架来实现。

手工模式

何为手工模式,咱们以 Callable 为例设计申请内部的接口,可能像上面这样子,参数是 NumberParam,两个内部接口别离是 IntToStringCallable 和 DoubleToStringCallable,

    class IntToStringCallable implements Callable<String> {
        private final NumberParam param;

        IntToStringCallable(NumberParam numberParam) {this.param = numberParam;}
        @Override
        public String call() {return Integer.toHexString(param.getAge());
        }
    }

    class DoubleToStringCallable implements Callable<String> {

        private final NumberParam param;

        DoubleToStringCallable(NumberParam numberParam) {this.param = numberParam;}
        @Override
        public String call() {return Double.toHexString(param.getMoney());
        }
    }

如果采纳 FutureTask 的形式多线程执行这两个接口,可能是这样子的,

        FutureTask<String> r1 = new FutureTask<>(new IntToStringCallable(numberParam));
        new Thread(r1).start();
        FutureTask<String> r2 = new FutureTask<>(new DoubleToStringCallable(numberParam));
        new Thread(r2).start();
        try {List<String> ret = new ArrayList<>();
            ret.add(r1.get());
            ret.add(r2.get());
            log.info("ret=" + ret);
        } catch (Exception ignore) {}

须要首先结构 FutureTask,而后应用 Thread 比拟原始的 api 去执行,当然还能够再简化一下,比方应用 Future 形式,

        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        Future<String> r1 = threadPool.submit(new IntToStringCallable(numberParam));
        Future<String> r2 = threadPool.submit(new DoubleToStringCallable(numberParam));
        try {List<String> ret = new ArrayList<>();
            ret.add(r1.get());
            ret.add(r2.get());
            log.info("ret=" + ret);
        } catch (Exception ignore) {}

我置信这是一种广泛常见的做法了。这里没有必要持续评论这些做法的问题了。

Java 8 之后

Java 8 之后有了更加不便的异步编程形式了,不必再辛苦地去写 Callable 的,一句话就能够表白 Callable+FutureTask/…,

CompletableFuture<String> pf = CompletableFuture.supplyAsync(() -> new IntToStringCallable(numberParam).call());

革新之前的做法后果可能就是这个样子了,

        CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> new IntToStringCallable(numberParam).call());
        CompletableFuture<String> r2 = CompletableFuture.supplyAsync(() -> new DoubleToStringCallable(numberParam).call());
        try {List<String> ret = new ArrayList<>();
            ret.add(r1.get());
            ret.add(r2.get());
            log.info("ret=" + ret);
        } catch (Exception ignore) {}

其实能够看进去,这个时候咱们不肯定须要一个 Callable 了,提供异步的能力是 supplyAsync 来实现的,咱们只须要失常的入参出参的一般办法就能够了。

Java 8 之后再之后

Java 8 之后的异步编程形式的确简略了很多,然而在咱们的业务代码中还是呈现了和异步编程相干的无关业务逻辑的事件,可否持续简化呢。本案的设计灵感来自同样 Java 8 的优良设计——ParallelStream,举个简略的例子,

Arrays.asList("a", "b", "c").parallelStream().map(String::toUpperCase).collect(Collectors.toList());

异步及多线程是 ParallelStream 来实现的,用户只须要实现 String::toUpperCase 局部。

本案的设计次要有三个 interface 来实现,别离是,

public interface MyProvider<T,V> {T provide(V v);
}
public interface MyCollector<T> {void collectList(T t);

    List<T> retList();}
public interface MyStream<T,V> {List<T> toList(List<MyProvider<T,V>> providers, V v);
}

其实 MyProvider 表白是申请内部接口,MyStream 示意一种相似 ParallelStream 的思维,一种内化异步多线程的操作模式,MyCollector 属于外部设计 api 能够不裸露给用户;
一个改写下面的例子的例子,

    @Test
    public void testStream() {MyProvider<String,NumberParam> p1 = new IntToStringProvider();
        MyProvider<String,NumberParam> p2 = new DoubleToStringProvider();
        List<MyProvider<String, NumberParam>> providers = Arrays.asList(p1, p2);
        MyStream<String, NumberParam> myStream = new CollectStringStream();

        List<String> strings = myStream.toList(providers, numberParam);
        log.info("ret=" + strings);
    }

在这个办法内一点异步编程的内容都没有的,用户只须要编程本人关怀的逻辑即可,当然是要依照 Provider 的思路去写,这或者有一点心智累赘。
这个 CollectStringStream 帮咱们实现来一些脏活累活,

    public List<String> toList(List<MyProvider<String, NumberParam>> myProviders, NumberParam param) {MyCollector<String> myCollector = new NoMeaningCollector();
        List<CompletableFuture<Void>> pfs = new ArrayList<>(myProviders.size());
        for (MyProvider<String, NumberParam> provider : myProviders) {CompletableFuture<Void> pf = CompletableFuture.runAsync(() -> myCollector.collectList(provider.provide(param)), executor);
            pfs.add(pf);
        }
        try {CompletableFuture.allOf(pfs.toArray(new CompletableFuture[0])).get(3, TimeUnit.SECONDS);
        } catch (Exception e) {if (e instanceof TimeoutException) {
                pfs.forEach(p -> {if (!p.isDone()){p.cancel(true);
                    }});
            }
        }
        return myCollector.retList();}

这样看起这个设计又不美了,然而如果有更多的内部接口须要调用,CollectStringStream 就显得很有价值了,新退出再多的申请内部接口要改变的代码很少很少,所以这种思维我感觉是值得推广的。

总结

照例附上参考代码,不过值得思考的是咱们如何像优良的代码学习并使用到本人的我的项目中。
参考代码,java-toy

退出移动版