共计 8241 个字符,预计需要花费 21 分钟才能阅读完成。
序
本文主要研究一下 Elasticsearch 的 FixedExecutorBuilder
FixedExecutorBuilder
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java
public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBuilder.FixedExecutorSettings> { | |
private final Setting<Integer> sizeSetting; | |
private final Setting<Integer> queueSizeSetting; | |
/** | |
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name. | |
* | |
* @param settings the node-level settings | |
* @param name the name of the executor | |
* @param size the fixed number of threads | |
* @param queueSize the size of the backing queue, -1 for unbounded | |
*/ | |
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) {this(settings, name, size, queueSize, "thread_pool." + name); | |
} | |
/** | |
* Construct a fixed executor builder. | |
* | |
* @param settings the node-level settings | |
* @param name the name of the executor | |
* @param size the fixed number of threads | |
* @param queueSize the size of the backing queue, -1 for unbounded | |
* @param prefix the prefix for the settings keys | |
*/ | |
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {super(name); | |
final String sizeKey = settingsKey(prefix, "size"); | |
this.sizeSetting = | |
new Setting<>( | |
sizeKey, | |
s -> Integer.toString(size), | |
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey), | |
Setting.Property.NodeScope); | |
final String queueSizeKey = settingsKey(prefix, "queue_size"); | |
this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope); | |
} | |
@Override | |
public List<Setting<?>> getRegisteredSettings() {return Arrays.asList(sizeSetting, queueSizeSetting); | |
} | |
@Override | |
FixedExecutorSettings getSettings(Settings settings) {final String nodeName = Node.NODE_NAME_SETTING.get(settings); | |
final int size = sizeSetting.get(settings); | |
final int queueSize = queueSizeSetting.get(settings); | |
return new FixedExecutorSettings(nodeName, size, queueSize); | |
} | |
@Override | |
ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) { | |
int size = settings.size; | |
int queueSize = settings.queueSize; | |
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name())); | |
final ExecutorService executor = | |
EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext); | |
final ThreadPool.Info info = | |
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize)); | |
return new ThreadPool.ExecutorHolder(executor, info); | |
} | |
@Override | |
String formatInfo(ThreadPool.Info info) { | |
return String.format( | |
Locale.ROOT, | |
"name [%s], size [%d], queue size [%s]", | |
info.getName(), | |
info.getMax(), | |
info.getQueueSize() == null ? "unbounded" : info.getQueueSize()); | |
} | |
static class FixedExecutorSettings extends ExecutorBuilder.ExecutorSettings { | |
private final int size; | |
private final int queueSize; | |
FixedExecutorSettings(final String nodeName, final int size, final int queueSize) {super(nodeName); | |
this.size = size; | |
this.queueSize = queueSize; | |
} | |
} | |
} |
- FixedExecutorBuilder 继承了 ExecutorBuilder,其 ExecutorSettings 类型为 FixedExecutorSettings,它定义 size 及 queueSize 两个属性;其 build 方法使用 EsExecutors.newFixed 创建 ExecutorService,同时还创建了 ThreadPool.Info,最后返回包含二者的 ThreadPool.ExecutorHolder
EsExecutors.newFixed
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
public class EsExecutors { | |
//...... | |
public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, | |
ThreadFactory threadFactory, ThreadContext contextHolder) { | |
BlockingQueue<Runnable> queue; | |
if (queueCapacity < 0) {queue = ConcurrentCollections.newBlockingQueue(); | |
} else {queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity); | |
} | |
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, | |
queue, threadFactory, new EsAbortPolicy(), contextHolder); | |
} | |
//...... | |
} |
- EsExecutors.newFixed 判断 queueCapacity 是否小于 0,小于 0 则使用 ConcurrentCollections.newBlockingQueue() 创建 BlockingQueue,否则创建 SizeBlockingQueue;最后创建的是 EsThreadPoolExecutor
实例
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
public class ThreadPool implements Scheduler, Closeable { | |
//...... | |
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {assert Node.NODE_NAME_SETTING.exists(settings); | |
final Map<String, ExecutorBuilder> builders = new HashMap<>(); | |
final int availableProcessors = EsExecutors.numberOfProcessors(settings); | |
final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors); | |
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); | |
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); | |
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); | |
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, availableProcessors, 200)); | |
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); | |
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); | |
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, | |
Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000)); | |
builders.put(Names.SEARCH_THROTTLED, new AutoQueueAdjustingExecutorBuilder(settings, | |
Names.SEARCH_THROTTLED, 1, 100, 100, 100, 200)); | |
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); | |
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded | |
// the assumption here is that the listeners should be very lightweight on the listeners side | |
builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1)); | |
builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); | |
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); | |
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); | |
builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); | |
builders.put(Names.FETCH_SHARD_STARTED, | |
new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); | |
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1)); | |
builders.put(Names.FETCH_SHARD_STORE, | |
new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); | |
for (final ExecutorBuilder<?> builder : customBuilders) {if (builders.containsKey(builder.name())) {throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists"); | |
} | |
builders.put(builder.name(), builder); | |
} | |
this.builders = Collections.unmodifiableMap(builders); | |
threadContext = new ThreadContext(settings); | |
final Map<String, ExecutorHolder> executors = new HashMap<>(); | |
for (final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings); | |
final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext); | |
if (executors.containsKey(executorHolder.info.getName())) {throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered"); | |
} | |
logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info)); | |
executors.put(entry.getKey(), executorHolder); | |
} | |
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT))); | |
this.executors = unmodifiableMap(executors); | |
final List<Info> infos = | |
executors | |
.values() | |
.stream() | |
.filter(holder -> holder.info.getName().equals("same") == false) | |
.map(holder -> holder.info) | |
.collect(Collectors.toList()); | |
this.threadPoolInfo = new ThreadPoolInfo(infos); | |
this.scheduler = Scheduler.initScheduler(settings); | |
TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings); | |
this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); | |
this.cachedTimeThread.start();} | |
//...... | |
} |
- ThreadPool 的构造器给 Names.WRITE、Names.GET、Names.ANALYZE、Names.LISTENER、Names.FORCE_MERGE 分配了 FixedExecutorBuilder
小结
FixedExecutorBuilder 继承了 ExecutorBuilder,其 ExecutorSettings 类型为 FixedExecutorSettings,它定义 size 及 queueSize 两个属性;其 build 方法使用 EsExecutors.newFixed 创建 ExecutorService,同时还创建了 ThreadPool.Info,最后返回包含二者的 ThreadPool.ExecutorHolder
doc
- FixedExecutorBuilder
正文完
发表至:无分类
2019-06-15