本文主要研究一下Elasticsearch的MonitorService

MonitorService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/MonitorService.java

public class MonitorService extends AbstractLifecycleComponent {    private final JvmGcMonitorService jvmGcMonitorService;    private final OsService osService;    private final ProcessService processService;    private final JvmService jvmService;    private final FsService fsService;    public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool,                          ClusterInfoService clusterInfoService) throws IOException {        this.jvmGcMonitorService = new JvmGcMonitorService(settings, threadPool);        this.osService = new OsService(settings);        this.processService = new ProcessService(settings);        this.jvmService = new JvmService(settings);        this.fsService = new FsService(settings, nodeEnvironment, clusterInfoService);    }    public OsService osService() {        return this.osService;    }    public ProcessService processService() {        return this.processService;    }    public JvmService jvmService() {        return this.jvmService;    }    public FsService fsService() {        return this.fsService;    }    @Override    protected void doStart() {        jvmGcMonitorService.start();    }    @Override    protected void doStop() {        jvmGcMonitorService.stop();    }    @Override    protected void doClose() {        jvmGcMonitorService.close();    }}
  • MonitorService的构造器创建了jvmGcMonitorService、osService、processService、jvmService、fsService;其doStart、doStop、doClose分别调用了jvmGcMonitorService的start、stop、close方法

JvmGcMonitorService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java

public class JvmGcMonitorService extends AbstractLifecycleComponent {    private static final Logger logger = LogManager.getLogger(JvmGcMonitorService.class);    private final ThreadPool threadPool;    private final boolean enabled;    private final TimeValue interval;    private final Map<String, GcThreshold> gcThresholds;    private final GcOverheadThreshold gcOverheadThreshold;    private volatile Cancellable scheduledFuture;    public static final Setting<Boolean> ENABLED_SETTING =        Setting.boolSetting("monitor.jvm.gc.enabled", true, Property.NodeScope);    public static final Setting<TimeValue> REFRESH_INTERVAL_SETTING =        Setting.timeSetting("monitor.jvm.gc.refresh_interval", TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(1),            Property.NodeScope);    private static String GC_COLLECTOR_PREFIX = "monitor.jvm.gc.collector.";    public static final Setting<Settings> GC_SETTING = Setting.groupSetting(GC_COLLECTOR_PREFIX, Property.NodeScope);    public static final Setting<Integer> GC_OVERHEAD_WARN_SETTING =        Setting.intSetting("monitor.jvm.gc.overhead.warn", 50, 0, 100, Property.NodeScope);    public static final Setting<Integer> GC_OVERHEAD_INFO_SETTING =        Setting.intSetting("monitor.jvm.gc.overhead.info", 25, 0, 100, Property.NodeScope);    public static final Setting<Integer> GC_OVERHEAD_DEBUG_SETTING =        Setting.intSetting("monitor.jvm.gc.overhead.debug", 10, 0, 100, Property.NodeScope);    //......    
public JvmGcMonitorService(Settings settings, ThreadPool threadPool) {        this.threadPool = threadPool;        this.enabled = ENABLED_SETTING.get(settings);        this.interval = REFRESH_INTERVAL_SETTING.get(settings);        Map<String, GcThreshold> gcThresholds = new HashMap<>();        Map<String, Settings> gcThresholdGroups = GC_SETTING.get(settings).getAsGroups();        for (Map.Entry<String, Settings> entry : gcThresholdGroups.entrySet()) {            String name = entry.getKey();            TimeValue warn = getValidThreshold(entry.getValue(), entry.getKey(), "warn");            TimeValue info = getValidThreshold(entry.getValue(), entry.getKey(), "info");            TimeValue debug = getValidThreshold(entry.getValue(), entry.getKey(), "debug");            gcThresholds.put(name, new GcThreshold(name, warn.millis(), info.millis(), debug.millis()));        }        gcThresholds.putIfAbsent(GcNames.YOUNG, new GcThreshold(GcNames.YOUNG, 1000, 700, 400));        gcThresholds.putIfAbsent(GcNames.OLD, new GcThreshold(GcNames.OLD, 10000, 5000, 2000));        gcThresholds.putIfAbsent("default", new GcThreshold("default", 10000, 5000, 2000));        this.gcThresholds = unmodifiableMap(gcThresholds);        if (GC_OVERHEAD_WARN_SETTING.get(settings) <= GC_OVERHEAD_INFO_SETTING.get(settings)) {            final String message =                String.format(                    Locale.ROOT,                    "[%s] must be greater than [%s] [%d] but was [%d]",                    GC_OVERHEAD_WARN_SETTING.getKey(),                    GC_OVERHEAD_INFO_SETTING.getKey(),                    GC_OVERHEAD_INFO_SETTING.get(settings),                    GC_OVERHEAD_WARN_SETTING.get(settings));            throw new IllegalArgumentException(message);        }        if (GC_OVERHEAD_INFO_SETTING.get(settings) <= GC_OVERHEAD_DEBUG_SETTING.get(settings)) {            final String message =                String.format(                    Locale.ROOT,                    "[%s] must be 
greater than [%s] [%d] but was [%d]",                    GC_OVERHEAD_INFO_SETTING.getKey(),                    GC_OVERHEAD_DEBUG_SETTING.getKey(),                    GC_OVERHEAD_DEBUG_SETTING.get(settings),                    GC_OVERHEAD_INFO_SETTING.get(settings));            throw new IllegalArgumentException(message);        }        this.gcOverheadThreshold = new GcOverheadThreshold(            GC_OVERHEAD_WARN_SETTING.get(settings),            GC_OVERHEAD_INFO_SETTING.get(settings),            GC_OVERHEAD_DEBUG_SETTING.get(settings));        logger.debug(            "enabled [{}], interval [{}], gc_threshold [{}], overhead [{}, {}, {}]",            this.enabled,            this.interval,            this.gcThresholds,            this.gcOverheadThreshold.warnThreshold,            this.gcOverheadThreshold.infoThreshold,            this.gcOverheadThreshold.debugThreshold);    }    @Override    protected void doStart() {        if (!enabled) {            return;        }        scheduledFuture = threadPool.scheduleWithFixedDelay(new JvmMonitor(gcThresholds, gcOverheadThreshold) {            @Override            void onMonitorFailure(Exception e) {                logger.debug("failed to monitor", e);            }            @Override            void onSlowGc(final Threshold threshold, final long seq, final SlowGcEvent slowGcEvent) {                logSlowGc(logger, threshold, seq, slowGcEvent, JvmGcMonitorService::buildPools);            }            @Override            void onGcOverhead(final Threshold threshold, final long current, final long elapsed, final long seq) {                logGcOverhead(logger, threshold, current, elapsed, seq);            }        }, interval, Names.SAME);    }     @Override    protected void doStop() {        if (!enabled) {            return;        }        scheduledFuture.cancel();    }    @Override    protected void doClose() {    }    //......}
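  • JvmGcMonitorService的doStart方法通过threadPool.scheduleWithFixedDelay注册了JvmMonitor的定时任务(间隔由monitor.jvm.gc.refresh_interval配置,默认1秒);其doStop方法则cancel掉该定时任务;若monitor.jvm.gc.enabled为false,doStart与doStop都直接返回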

JvmMonitor

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java

    /**
     * Periodic task that compares the current JVM GC statistics with the previous sample and
     * reports slow collections and excessive GC overhead through abstract callbacks.
     *
     * <p>Subclasses implement {@code onMonitorFailure}, {@code onSlowGc} and {@code onGcOverhead}.
     * {@code now()} and {@code jvmStats()} are non-final, package-private hooks that a subclass
     * can override.
     */
    abstract static class JvmMonitor implements Runnable {

        /** Severity of a reported event. */
        enum Threshold { DEBUG, INFO, WARN }

        /** One collector's activity during an interval whose average collection time crossed a threshold. */
        static class SlowGcEvent {
            final GarbageCollector currentGc;
            // number of collections during the interval
            final long collectionCount;
            // total collection time during the interval
            final TimeValue collectionTime;
            // interval length in milliseconds
            final long elapsed;
            final JvmStats lastJvmStats;
            final JvmStats currentJvmStats;
            // NOTE: despite the name, monitorSlowGc fills this with JvmInfo's heap maximum
            final ByteSizeValue maxHeapUsed;

            SlowGcEvent(
                final GarbageCollector currentGc,
                final long collectionCount,
                final TimeValue collectionTime,
                final long elapsed,
                final JvmStats lastJvmStats,
                final JvmStats currentJvmStats,
                final ByteSizeValue maxHeapUsed) {
                this.currentGc = currentGc;
                this.collectionCount = collectionCount;
                this.collectionTime = collectionTime;
                this.elapsed = elapsed;
                this.lastJvmStats = lastJvmStats;
                this.currentJvmStats = currentJvmStats;
                this.maxHeapUsed = maxHeapUsed;
            }
        }

        // Baseline for the first interval, captured when the monitor is constructed.
        // NOTE: these initializers invoke the overridable hooks now() and jvmStats() during construction.
        private long lastTime = now();
        private JvmStats lastJvmStats = jvmStats();
        // Sampling-round counter, incremented at the start of every monitorGc() call.
        private long seq = 0;
        private final Map<String, JvmGcMonitorService.GcThreshold> gcThresholds;
        final GcOverheadThreshold gcOverheadThreshold;

        JvmMonitor(final Map<String, GcThreshold> gcThresholds, final GcOverheadThreshold gcOverheadThreshold) {
            this.gcThresholds = Objects.requireNonNull(gcThresholds);
            this.gcOverheadThreshold = Objects.requireNonNull(gcOverheadThreshold);
        }

        /** Runs one sampling round; any Exception is handed to onMonitorFailure instead of being thrown. */
        @Override
        public void run() {
            try {
                monitorGc();
            } catch (Exception e) {
                onMonitorFailure(e);
            }
        }

        /** Called when a sampling round throws. */
        abstract void onMonitorFailure(Exception e);

        /**
         * Takes a new sample, checks it for slow collections and GC overhead relative to the previous
         * sample, then makes it the new baseline. synchronized, so rounds on one instance never interleave.
         */
        synchronized void monitorGc() {
            seq++;
            final long currentTime = now();
            JvmStats currentJvmStats = jvmStats();
            // now() returns nanoseconds; everything downstream works in milliseconds.
            final long elapsed = TimeUnit.NANOSECONDS.toMillis(currentTime - lastTime);
            monitorSlowGc(currentJvmStats, elapsed);
            monitorGcOverhead(currentJvmStats, elapsed);
            lastTime = currentTime;
            lastJvmStats = currentJvmStats;
        }

        /**
         * For each collector, computes the average time per collection since the previous sample and
         * reports the most severe slow-GC threshold it exceeds via onSlowGc.
         */
        final void monitorSlowGc(JvmStats currentJvmStats, long elapsed) {
            // Collectors are paired by array index between the two samples.
            // NOTE(review): assumes both samples list the same collectors in the same order.
            for (int i = 0; i < currentJvmStats.getGc().getCollectors().length; i++) {
                GarbageCollector gc = currentJvmStats.getGc().getCollectors()[i];
                GarbageCollector prevGc = lastJvmStats.getGc().getCollectors()[i];
                // skip collectors for which no collection has happened since the previous sample
                long collections = gc.getCollectionCount() - prevGc.getCollectionCount();
                if (collections == 0) {
                    continue;
                }
                // skip collectors whose cumulative collection time did not advance
                long collectionTime = gc.getCollectionTime().millis() - prevGc.getCollectionTime().millis();
                if (collectionTime == 0) {
                    continue;
                }
                // fall back to the "default" thresholds for collectors without a dedicated entry
                GcThreshold gcThreshold = gcThresholds.get(gc.getName());
                if (gcThreshold == null) {
                    gcThreshold = gcThresholds.get("default");
                }
                // truncating integer average, in milliseconds
                long avgCollectionTime = collectionTime / collections;
                // checked from most to least severe; strict '>' comparison against each level
                Threshold threshold = null;
                if (avgCollectionTime > gcThreshold.warnThreshold) {
                    threshold = Threshold.WARN;
                } else if (avgCollectionTime > gcThreshold.infoThreshold) {
                    threshold = Threshold.INFO;
                } else if (avgCollectionTime > gcThreshold.debugThreshold) {
                    threshold = Threshold.DEBUG;
                }
                if (threshold != null) {
                    onSlowGc(threshold, seq, new SlowGcEvent(
                        gc,
                        collections,
                        TimeValue.timeValueMillis(collectionTime),
                        elapsed,
                        lastJvmStats,
                        currentJvmStats,
                        JvmInfo.jvmInfo().getMem().getHeapMax()));
                }
            }
        }

        /** Sums the collection time (ms) of all collectors since the previous sample and checks it for overhead. */
        final void monitorGcOverhead(final JvmStats currentJvmStats, final long elapsed) {
            long current = 0;
            for (int i = 0; i < currentJvmStats.getGc().getCollectors().length; i++) {
                GarbageCollector gc = currentJvmStats.getGc().getCollectors()[i];
                GarbageCollector prevGc = lastJvmStats.getGc().getCollectors()[i];
                current += gc.getCollectionTime().millis() - prevGc.getCollectionTime().millis();
            }
            checkGcOverhead(current, elapsed, seq);
        }

        /**
         * Expresses {@code current} GC time as a percentage of {@code elapsed} (truncated to int) and
         * reports the most severe overhead threshold reached ('>=' comparison) via onGcOverhead.
         */
        void checkGcOverhead(final long current, final long elapsed, final long seq) {
            // NOTE(review): elapsed == 0 is a floating-point division by zero (Infinity/NaN), not an exception.
            final int fraction = (int) ((100 * current) / (double) elapsed);
            Threshold overheadThreshold = null;
            if (fraction >= gcOverheadThreshold.warnThreshold) {
                overheadThreshold = Threshold.WARN;
            } else if (fraction >= gcOverheadThreshold.infoThreshold) {
                overheadThreshold = Threshold.INFO;
            } else if (fraction >= gcOverheadThreshold.debugThreshold) {
                overheadThreshold = Threshold.DEBUG;
            }
            if (overheadThreshold != null) {
                onGcOverhead(overheadThreshold, current, elapsed, seq);
            }
        }

        // Hook: current JVM statistics (overridable).
        JvmStats jvmStats() {
            return JvmStats.jvmStats();
        }

        // Hook: current timestamp in nanoseconds from System.nanoTime() (overridable).
        long now() {
            return System.nanoTime();
        }

        /** Called for each collector whose average collection time crossed a slow-GC threshold. */
        abstract void onSlowGc(Threshold threshold, long seq, SlowGcEvent slowGcEvent);

        /** Called when total GC time over the interval crossed an overhead threshold. */
        abstract void onGcOverhead(Threshold threshold, long total, long elapsed, long seq);
    }
  • JvmMonitor实现了Runnable接口,其run方法执行monitorGc方法,异常时执行onMonitorFailure方法;JvmMonitor还定义了onMonitorFailure、onSlowGc、onGcOverhead抽象方法需要子类去实现
  • monitorGc方法首先递增seq并获取currentJvmStats,计算距上次采样经过的时间elapsed(毫秒),然后执行monitorSlowGc及monitorGcOverhead,最后用本次的时间和currentJvmStats更新lastTime与lastJvmStats
  • monitorSlowGc方法针对每个垃圾收集器计算本次间隔内的平均单次回收耗时avgCollectionTime(collectionTime / collections),然后判断是否超出指定level的阈值,超出则回调onSlowGc方法;monitorGcOverhead方法则汇总所有收集器在本次间隔内的gc耗时,计算其占elapsed的百分比,再判断是否达到指定level的阈值,达到则回调onGcOverhead方法

小结

  • MonitorService的构造器创建了jvmGcMonitorService、osService、processService、jvmService、fsService;其doStart、doStop、doClose分别调用了jvmGcMonitorService的start、stop、close方法
  • JvmGcMonitorService的doStart方法通过scheduleWithFixedDelay注册了JvmMonitor的定时任务;其doStop方法则是cancel掉该定时任务
  • JvmMonitor实现了Runnable接口,其run方法执行monitorGc方法,异常时执行onMonitorFailure方法;JvmMonitor还定义了onMonitorFailure、onSlowGc、onGcOverhead抽象方法需要子类去实现

doc

  • MonitorService