共计 4214 个字符,预计需要花费 11 分钟才能阅读完成。
序
本文主要研究一下 parallelStream 怎么使用自定义的线程池
ForkJoinPool
java/util/concurrent/ForkJoinPool.java
public class ForkJoinPool extends AbstractExecutorService { | |
public ForkJoinPool(int parallelism, | |
ForkJoinWorkerThreadFactory factory, | |
UncaughtExceptionHandler handler, | |
boolean asyncMode) {this(checkParallelism(parallelism), | |
checkFactory(factory), | |
handler, | |
asyncMode ? FIFO_QUEUE : LIFO_QUEUE, | |
"ForkJoinPool-" + nextPoolId() + "-worker-"); | |
checkPermission();} | |
private ForkJoinPool(int parallelism, | |
ForkJoinWorkerThreadFactory factory, | |
UncaughtExceptionHandler handler, | |
int mode, | |
String workerNamePrefix) { | |
this.workerNamePrefix = workerNamePrefix; | |
this.factory = factory; | |
this.ueh = handler; | |
this.config = (parallelism & SMASK) | mode; | |
long np = (long)(-parallelism); // offset ctl counts | |
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); | |
} | |
private static ForkJoinPool makeCommonPool() { | |
int parallelism = -1; | |
ForkJoinWorkerThreadFactory factory = null; | |
UncaughtExceptionHandler handler = null; | |
try { // ignore exceptions in accessing/parsing properties | |
String pp = System.getProperty | |
("java.util.concurrent.ForkJoinPool.common.parallelism"); | |
String fp = System.getProperty | |
("java.util.concurrent.ForkJoinPool.common.threadFactory"); | |
String hp = System.getProperty | |
("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); | |
if (pp != null) | |
parallelism = Integer.parseInt(pp); | |
if (fp != null) | |
factory = ((ForkJoinWorkerThreadFactory)ClassLoader. | |
getSystemClassLoader().loadClass(fp).newInstance()); | |
if (hp != null) | |
handler = ((UncaughtExceptionHandler)ClassLoader. | |
getSystemClassLoader().loadClass(hp).newInstance()); | |
} catch (Exception ignore) { } | |
if (factory == null) {if (System.getSecurityManager() == null) | |
factory = new DefaultCommonPoolForkJoinWorkerThreadFactory(); | |
else // use security-managed default | |
factory = new InnocuousForkJoinWorkerThreadFactory();} | |
if (parallelism < 0 && // default 1 less than #cores | |
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) | |
parallelism = 1; | |
if (parallelism > MAX_CAP) | |
parallelism = MAX_CAP; | |
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, | |
"ForkJoinPool.commonPool-worker-"); | |
} | |
} |
parallelStream 默认使用的是 common 的 ForkJoinPool,可以通过系统属性来设置 parallelism 等
ForkJoinPoolFactoryBean
org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java
public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean { | |
private boolean commonPool = false; | |
private int parallelism = Runtime.getRuntime().availableProcessors(); | |
private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory; | |
@Nullable | |
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler; | |
private boolean asyncMode = false; | |
private int awaitTerminationSeconds = 0; | |
@Nullable | |
private ForkJoinPool forkJoinPool; | |
//...... | |
@Override | |
public void destroy() {if (this.forkJoinPool != null) { | |
// Ignored for the common pool. | |
this.forkJoinPool.shutdown(); | |
// Wait for all tasks to terminate - works for the common pool as well. | |
if (this.awaitTerminationSeconds > 0) { | |
try {this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS); | |
} | |
catch (InterruptedException ex) {Thread.currentThread().interrupt();} | |
} | |
} | |
} | |
} |
Spring 3.1 提供了 ForkJoinPoolFactoryBean,可以用于创建并托管 ForkJoinPool
示例
配置
@Configuration | |
public class ForkJoinConfig { | |
@Bean | |
public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() {ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean(); | |
factoryBean.setCommonPool(false); | |
// NOTE LIFO_QUEUE FOR working steal from tail of queue | |
factoryBean.setAsyncMode(true); // NOTE true FIFO_QUEUE, false LIFO_QUEUE | |
factoryBean.setParallelism(10); | |
// factoryBean.setUncaughtExceptionHandler(); | |
factoryBean.setAwaitTerminationSeconds(60); | |
return factoryBean; | |
} | |
} |
使用
@Autowired | |
ForkJoinPoolFactoryBean forkJoinPoolFactoryBean; | |
public void streamParallel() throws ExecutionException, InterruptedException {List<TodoTask> result = forkJoinPoolFactoryBean.getObject().submit(new Callable<List<TodoTask>>() { | |
@Override | |
public List<TodoTask> call() throws Exception {return IntStream.rangeClosed(1, 20).parallel().mapToObj(i -> {log.info("thread:{}", Thread.currentThread().getName()); | |
return new TodoTask(i, "name"+i); | |
}).collect(Collectors.toList()); | |
} | |
}).get(); | |
result.stream().forEach(System.out::println); | |
} |
common 的 workerName 前缀为 ForkJoinPool.commonPool-worker-
自定义的 workerName 前缀默认为 "ForkJoinPool-" + nextPoolId() + "-worker-"
小结
parallelStream 默认使用的是 commonPool,是 static 代码块默认初始化,针对个别场景可以自定义 ForkJoinPool,将 parallelStream 作为一个任务丢进去,这样子就不会影响默认的 commonPool。
正文完