Preface

This article takes a look at Storm's LoggingClusterMetricsConsumer.

LoggingClusterMetricsConsumer

storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java

    public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer {
        public static final Logger LOG = LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class);

        // At least 23 spaces, so the delete() calls below can pad or cut every metric name to 23 characters
        static private String padding = "                       ";

        @Override
        public void prepare(Object registrationArgument) {
        }

        @Override
        public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
            StringBuilder sb = new StringBuilder();
            String header = String.format("%d\t%15s\t%40s\t",
                                          clusterInfo.getTimestamp(),
                                          "<cluster>", "<cluster>");
            sb.append(header);
            logDataPoints(dataPoints, sb, header);
        }

        @Override
        public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
            StringBuilder sb = new StringBuilder();
            String header = String.format("%d\t%15s\t%40s\t",
                                          supervisorInfo.getTimestamp(),
                                          supervisorInfo.getSrcSupervisorHost(),
                                          supervisorInfo.getSrcSupervisorId());
            sb.append(header);
            for (DataPoint p : dataPoints) {
                // Reset the builder to the header, then append name (fixed width), a tab and the value
                sb.delete(header.length(), sb.length());
                sb.append(p.getName())
                  .append(padding).delete(header.length() + 23, sb.length()).append("\t")
                  .append(p.getValue());
                LOG.info(sb.toString());
            }
        }

        @Override
        public void cleanup() {
        }

        private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder sb, String header) {
            for (DataPoint p : dataPoints) {
                sb.delete(header.length(), sb.length());
                sb.append(p.getName())
                  .append(padding).delete(header.length() + 23, sb.length()).append("\t")
                  .append(p.getValue());
                LOG.info(sb.toString());
            }
        }
    }

This is a cluster-level metrics consumer, so it can only be configured in storm.yaml.

Its handleDataPoints methods are called back by ClusterMetricsConsumerExecutor.

Here handleDataPoints does nothing more than print the data points to a log file, one line per data point.

storm.yaml configuration

    ## Cluster Metrics Consumers
    storm.cluster.metrics.consumer.register:
      - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
    #  - class: "com.example.demo.metric.FixedLoggingClusterMetricsConsumer"
    #    argument:
    #      - endpoint: "metrics-collector.mycompany.org"
    #
    storm.cluster.metrics.consumer.publish.interval.secs: 5

This registers LoggingClusterMetricsConsumer as the consumer class and sets the publish interval to 5 seconds.

cluster.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
    -->

    <configuration monitorInterval="60" shutdownHook="disable" packages="org.apache.logging.log4j.core,io.sentry.log4j2">
    <properties>
        <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
        <property name="patternMetrics">%d %-8r %m%n</property>
    </properties>
    <appenders>
        <RollingFile name="A1" immediateFlush="false"
                     fileName="${sys:storm.log.dir}/${sys:logfile.name}"
                     filePattern="${sys:storm.log.dir}/${sys:logfile.name}.%i.gz">
            <PatternLayout>
                <pattern>${pattern}</pattern>
            </PatternLayout>
            <Policies>
                <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
            </Policies>
            <DefaultRolloverStrategy max="9"/>
        </RollingFile>
        <RollingFile name="WEB-ACCESS" immediateFlush="false"
                     fileName="${sys:storm.log.dir}/access-web-${sys:daemon.name}.log"
                     filePattern="${sys:storm.log.dir}/access-web-${sys:daemon.name}.log.%i.gz">
            <PatternLayout>
                <pattern>${pattern}</pattern>
            </PatternLayout>
            <Policies>
                <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
            </Policies>
            <DefaultRolloverStrategy max="9"/>
        </RollingFile>
        <RollingFile name="THRIFT-ACCESS" immediateFlush="false"
                     fileName="${sys:storm.log.dir}/access-${sys:logfile.name}"
                     filePattern="${sys:storm.log.dir}/access-${sys:logfile.name}.%i.gz">
            <PatternLayout>
                <pattern>${pattern}</pattern>
            </PatternLayout>
            <Policies>
                <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
            </Policies>
            <DefaultRolloverStrategy max="9"/>
        </RollingFile>
        <RollingFile name="METRICS"
                     fileName="${sys:storm.log.dir}/${sys:logfile.name}.metrics"
                     filePattern="${sys:storm.log.dir}/${sys:logfile.name}.metrics.%i.gz">
            <PatternLayout>
                <pattern>${patternMetrics}</pattern>
            </PatternLayout>
            <Policies>
                <SizeBasedTriggeringPolicy size="2 MB"/>
            </Policies>
            <DefaultRolloverStrategy max="9"/>
        </RollingFile>
        <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
                protocol="UDP" appName="[${sys:daemon.name}]" mdcId="mdc" includeMDC="true"
                facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
                messageId="[${sys:user.name}:S0]" id="storm" immediateFlush="true" immediateFail="true"/>
    </appenders>
    <loggers>
        <Logger name="org.apache.storm.logging.filters.AccessLoggingFilter" level="info" additivity="false">
            <AppenderRef ref="WEB-ACCESS"/>
            <AppenderRef ref="syslog"/>
        </Logger>
        <Logger name="org.apache.storm.logging.ThriftAccessLogger" level="info" additivity="false">
            <AppenderRef ref="THRIFT-ACCESS"/>
            <AppenderRef ref="syslog"/>
        </Logger>
        <Logger name="org.apache.storm.metric.LoggingClusterMetricsConsumer" level="info" additivity="false">
            <appender-ref ref="METRICS"/>
        </Logger>
        <root level="info"> <!-- We log everything -->
            <appender-ref ref="A1"/>
            <appender-ref ref="syslog"/>
            <appender-ref ref="Sentry" level="ERROR" />
        </root>
    </loggers>
    </configuration>

cluster.xml holds the logging configuration for these metrics: the LoggingClusterMetricsConsumer logger is bound to the METRICS
appender, a RollingFile that writes to ${sys:storm.log.dir}/${sys:logfile.name}.metrics. For example, the default logfile.name is nimbus.log for nimbus and supervisor.log for the supervisor, so the target files are nimbus.log.metrics and supervisor.log.metrics.

Sample output:

    2018-11-06 07:51:51,488 18628 1541490711 <cluster> <cluster> supervisors 1
    2018-11-06 07:51:51,488 18628 1541490711 <cluster> <cluster> topologies 0
    2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> slotsTotal 4
    2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> slotsUsed 0
    2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> slotsFree 4
    2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> executorsTotal 0
    2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> tasksTotal 0
    2018-11-06 07:51:51,496 18636 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 slotsTotal 4
    2018-11-06 07:51:51,497 18637 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 slotsUsed 0
    2018-11-06 07:51:51,497 18637 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 totalMem 3072.0
    2018-11-06 07:51:51,497 18637 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 totalCpu 400.0
    2018-11-06 07:51:51,498 18638 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 usedMem 0.0
    2018-11-06 07:51:51,498 18638 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 usedCpu 0.0

ClusterMetricsConsumerExecutor

storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java

    public class ClusterMetricsConsumerExecutor {
        public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class);
        private static final String ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED =
            "Preparation of Cluster Metrics Consumer failed. "
            + "Please check your configuration and/or corresponding systems and relaunch Nimbus. "
            + "Skipping handle metrics.";

        private IClusterMetricsConsumer metricsConsumer;
        private String consumerClassName;
        private Object registrationArgument;

        public ClusterMetricsConsumerExecutor(String consumerClassName, Object registrationArgument) {
            this.consumerClassName = consumerClassName;
            this.registrationArgument = registrationArgument;
        }

        public void prepare() {
            try {
                metricsConsumer = (IClusterMetricsConsumer) Class.forName(consumerClassName).newInstance();
                metricsConsumer.prepare(registrationArgument);
            } catch (Exception e) {
                LOG.error("Could not instantiate or prepare Cluster Metrics Consumer with fully qualified name "
                          + consumerClassName, e);

                if (metricsConsumer != null) {
                    metricsConsumer.cleanup();
                }
                // A null consumer makes every later handleDataPoints call a logged no-op
                metricsConsumer = null;
            }
        }

        public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo clusterInfo, final Collection<DataPoint> dataPoints) {
            if (metricsConsumer == null) {
                LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
                return;
            }

            try {
                metricsConsumer.handleDataPoints(clusterInfo, dataPoints);
            } catch (Throwable e) {
                LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
            }
        }

        public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo supervisorInfo, final Collection<DataPoint> dataPoints) {
            if (metricsConsumer == null) {
                LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
                return;
            }

            try {
                metricsConsumer.handleDataPoints(supervisorInfo, dataPoints);
            } catch (Throwable e) {
                LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
            }
        }

        public void cleanup() {
            if (metricsConsumer != null) {
                metricsConsumer.cleanup();
            }
        }
    }

In prepare, ClusterMetricsConsumerExecutor instantiates the IClusterMetricsConsumer implementation named by consumerClassName and then calls metricsConsumer.prepare(registrationArgument) so the consumer can set itself up.

The handleDataPoints methods of ClusterMetricsConsumerExecutor simply delegate to the handleDataPoints methods of metricsConsumer.

There are two handleDataPoints overloads. Both take dataPoints; they differ in the other parameter, which is a ClusterInfo in one and a SupervisorInfo in the other, for the cluster-wide metrics reported by nimbus and the metrics of an individual supervisor respectively.

Nimbus.launchServer

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    public void launchServer() throws Exception {
        try {
            BlobStore store = blobStore;
            IStormClusterState state = stormClusterState;
            NimbusInfo hpi = nimbusHostPortInfo;

            LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
            validator.prepare(conf);

            //……

            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
                                    () -> {
                                        try {
                                            if (isLeader()) {
                                                sendClusterMetricsToExecutors();
                                            }
                                        } catch (Exception e) {
                                            throw new RuntimeException(e);
                                        }
                                    });

            timer.scheduleRecurring(5, 5, clusterMetricSet);
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                throw e;
            }

            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
                throw e;
            }
            LOG.error("Error on initialization of nimbus", e);
            Utils.exitProcess(13, "Error on initialization of nimbus");
        }
    }

    private boolean isLeader() throws Exception {
        return leaderElector.isLeader();
    }

Nimbus's launchServer method schedules a recurring task that, when this node is the leader, calls sendClusterMetricsToExecutors to push the metrics to the metrics consumers.

The interval of that task is DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS (storm.cluster.metrics.consumer.publish.interval.secs), which defaults to 60 in defaults.yaml.

Besides pushing metrics to the metrics consumers, launchServer schedules a second recurring task that runs the ClusterSummaryMetricSet Runnable every 5 seconds.

Nimbus.sendClusterMetricsToExecutors

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private void sendClusterMetricsToExecutors() throws Exception {
        ClusterInfo clusterInfo = mkClusterInfo();
        ClusterSummary clusterSummary = getClusterInfoImpl();
        List<DataPoint> clusterMetrics = extractClusterMetrics(clusterSummary);
        Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> supervisorMetrics = extractSupervisorMetrics(clusterSummary);
        for (ClusterMetricsConsumerExecutor consumerExecutor : clusterConsumerExceutors) {
            consumerExecutor.handleDataPoints(clusterInfo, clusterMetrics);
            for (Entry<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> entry : supervisorMetrics.entrySet()) {
                consumerExecutor.handleDataPoints(entry.getKey(), entry.getValue());
            }
        }
    }

Nimbus's sendClusterMetricsToExecutors method extracts the metrics with extractClusterMetrics and extractSupervisorMetrics and then hands them over by calling consumerExecutor.handleDataPoints, once for the cluster and once per supervisor.

ClusterSummaryMetricSet

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    private class ClusterSummaryMetricSet implements Runnable {
        private static final int CACHING_WINDOW = 5;

        private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();

        private final Function<String, Histogram> registerHistogram = (name) -> {
            //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
            // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
            // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update
            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
            clusterSummaryMetrics.put(name, histogram);
            return histogram;
        };
        private volatile boolean active = false;

        //Nimbus metrics distribution
        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");

        //Supervisor metrics distribution
        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");

        //Topology metrics distribution
        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
        private final StormMetricsRegistry metricsRegistry;

        /**
         * Constructor to put all items in ClusterSummary in MetricSet as a metric.
         * All metrics are derived from a cached ClusterSummary object,
         * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters.
         * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
         * reporting interval to avoid outdated reporting.
         */
        ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
            this.metricsRegistry = metricsRegistry;
            //Break the code if out of sync to thrift protocol
            assert ClusterSummary._Fields.values().length == 3
                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;

            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
                @Override
                protected ClusterSummary loadValue() {
                    try {
                        ClusterSummary newSummary = getClusterInfoImpl();
                        LOG.debug("The new summary is {}", newSummary);
                        /*
                         * Update histograms based on the new summary. Most common implementation of Reporter reports Gauges before
                         * Histograms. Because DerivativeGauge will trigger cache refresh upon reporter's query, histogram will also be
                         * updated before query
                         */
                        updateHistogram(newSummary);
                        return newSummary;
                    } catch (Exception e) {
                        LOG.warn("Get cluster info exception.", e);
                        throw new RuntimeException(e);
                    }
                }
            };

            clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
                @Override
                protected Long transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_nimbuses().stream()
                                         .filter(NimbusSummary::is_isLeader)
                                         .count();
                }
            });
            clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_nimbuses_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_topologies_size();
                }
            });
            clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                                         .mapToInt(SupervisorSummary::get_num_workers)
                                         .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
                @Override
                protected Integer transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                                         .mapToInt(SupervisorSummary::get_num_used_workers)
                                         .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
                @Override
                protected Double transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                                         //Filtered negative value
                                         .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0))
                                         .sum();
                }
            });
            clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
                @Override
                protected Double transform(ClusterSummary clusterSummary) {
                    return clusterSummary.get_supervisors().stream()
                                         //Filtered negative value
                                         .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0))
                                         .sum();
                }
            });
        }

        private void updateHistogram(ClusterSummary newSummary) {
            for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
                nimbusUptime.update(nimbusSummary.get_uptime_secs());
            }
            for (SupervisorSummary summary : newSummary.get_supervisors()) {
                supervisorsUptime.update(summary.get_uptime_secs());
                supervisorsNumWorkers.update(summary.get_num_workers());
                supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
                supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
                supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
                supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
                supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
            }
            for (TopologySummary summary : newSummary.get_topologies()) {
                topologiesNumTasks.update(summary.get_num_tasks());
                topologiesNumExecutors.update(summary.get_num_executors());
                topologiesNumWorker.update(summary.get_num_workers());
                topologiesUptime.update(summary.get_uptime_secs());
                topologiesReplicationCount.update(summary.get_replication_count());
                topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
                topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
                topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
                topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
                topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
                topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
            }
        }

        void setActive(final boolean active) {
            // Only act on a state change: register on becoming leader, remove on losing leadership
            if (this.active != active) {
                this.active = active;
                if (active) {
                    metricsRegistry.registerAll(clusterSummaryMetrics);
                } else {
                    metricsRegistry.removeAll(clusterSummaryMetrics);
                }
            }
        }

        @Override
        public void run() {
            try {
                setActive(isLeader());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

This Runnable does little more than call setActive. Each run checks whether the node's leadership state has changed: when the node becomes the leader it registers clusterSummaryMetrics, and when it stops being the leader it removes clusterSummaryMetrics again.

The constructor adds the gauges cluster:num-nimbus-leaders, cluster:num-nimbuses, cluster:num-supervisors, cluster:num-topologies, cluster:num-total-workers, cluster:num-total-used-workers, cluster:total-fragmented-memory-non-negative and cluster:total-fragmented-cpu-non-negative to clusterSummaryMetrics; the histograms created through registerHistogram are stored in the same set.

Summary

LoggingClusterMetricsConsumer consumes cluster-level metrics and prints them to a log file. Its log4j2 configuration is read from cluster.xml, and the target files are nimbus.log.metrics and supervisor.log.metrics.

When Nimbus runs launchServer it sets up a recurring task which, while the current node is the leader, periodically sends the metrics to ClusterMetricsConsumerExecutor. The executor in turn calls back the handleDataPoints methods of LoggingClusterMetricsConsumer, which print the data to the log.

handleDataPoints handles two kinds of info, ClusterInfo and SupervisorInfo. Note that sendClusterMetricsToExecutors is only invoked by the recurring task when the current node is the leader, so both kinds of data points are logged by the leader nimbus. Nimbus and the supervisors normally run on different nodes, so supervisor.log.metrics may well be empty.
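The prepare and handleDataPoints behaviour of ClusterMetricsConsumerExecutor described above (instantiate the consumer by class name, fall back to a null consumer on failure, then delegate defensively) can be sketched without any Storm dependency. Everything in the block below is a hypothetical stand-in for illustration: SketchConsumer, RecordingConsumer and ConsumerExecutorSketch are not Storm classes. The sketch also uses getDeclaredConstructor().newInstance() in place of the Class.newInstance() call shown in the Storm source, since the latter is deprecated on newer JDKs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for IClusterMetricsConsumer, reduced to two methods. */
interface SketchConsumer {
    void prepare(Object registrationArgument);

    void handle(List<String> dataPoints);
}

/** Hypothetical consumer that just remembers what it was given. */
final class RecordingConsumer implements SketchConsumer {
    static final List<String> SEEN = new ArrayList<>();

    @Override
    public void prepare(Object registrationArgument) {
    }

    @Override
    public void handle(List<String> dataPoints) {
        SEEN.addAll(dataPoints);
    }
}

/** Sketch of the instantiate-by-name and delegate pattern; not the Storm class. */
final class ConsumerExecutorSketch {
    private final String consumerClassName;
    private final Object registrationArgument;
    private SketchConsumer consumer;

    ConsumerExecutorSketch(String consumerClassName, Object registrationArgument) {
        this.consumerClassName = consumerClassName;
        this.registrationArgument = registrationArgument;
    }

    /** Instantiates the consumer reflectively; on any failure the consumer stays null. */
    void prepare() {
        try {
            consumer = (SketchConsumer) Class.forName(consumerClassName)
                                             .getDeclaredConstructor()
                                             .newInstance();
            consumer.prepare(registrationArgument);
        } catch (Exception e) {
            consumer = null;
        }
    }

    /** Delegates to the consumer; returns false when the call was skipped or failed. */
    boolean handle(List<String> dataPoints) {
        if (consumer == null) {
            return false;
        }
        try {
            consumer.handle(dataPoints);
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public static void main(String[] args) {
        ConsumerExecutorSketch ok = new ConsumerExecutorSketch(RecordingConsumer.class.getName(), null);
        ok.prepare();
        System.out.println(ok.handle(Arrays.asList("slotsTotal=4")));

        ConsumerExecutorSketch broken = new ConsumerExecutorSketch("no.such.Consumer", null);
        broken.prepare();
        System.out.println(broken.handle(Arrays.asList("slotsTotal=4")));
    }
}
```

The point of the null fallback is that a misconfigured consumer class does not stop the caller's recurring task; each publish round is simply skipped.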
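The line layout produced by LoggingClusterMetricsConsumer (a fixed header, then the metric name padded or cut to 23 characters, a tab and the value) is easier to see in isolation. The following is a minimal sketch of that StringBuilder technique, assuming a plain Map as a stand-in for the collection of DataPoint objects; MetricLineFormatSketch and its format method are hypothetical names, not part of Storm.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone sketch of the header plus fixed-width name formatting; not Storm code. */
final class MetricLineFormatSketch {
    private static final int NAME_WIDTH = 23;
    // NAME_WIDTH spaces, enough to pad even an empty metric name
    private static final String PADDING = String.format("%" + NAME_WIDTH + "s", "");

    /** Builds one line per metric while reusing a single StringBuilder. */
    static List<String> format(long timestamp, String host, String id, Map<String, Object> points) {
        List<String> lines = new ArrayList<>();
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s\t%40s\t", timestamp, host, id);
        sb.append(header);
        for (Map.Entry<String, Object> p : points.entrySet()) {
            // Drop the previous metric, keeping only the header
            sb.delete(header.length(), sb.length());
            // Append name plus padding, then cut everything beyond NAME_WIDTH characters
            sb.append(p.getKey())
              .append(PADDING)
              .delete(header.length() + NAME_WIDTH, sb.length())
              .append("\t")
              .append(p.getValue());
            lines.add(sb.toString());
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<String, Object> points = new LinkedHashMap<>();
        points.put("slotsTotal", 4);
        points.put("aVeryLongMetricNameThatOverflows", 0.5);
        for (String line : format(1541490711L, "<cluster>", "<cluster>", points)) {
            System.out.println(line);
        }
    }
}
```

Short names such as slotsTotal come out padded with spaces, while a name longer than 23 characters is silently truncated, which is worth keeping in mind when choosing metric names for this consumer.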
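The setActive logic of ClusterSummaryMetricSet only reacts to leadership transitions, not to every timer tick. A small sketch under stated assumptions makes this visible: the BooleanSupplier stands in for leaderElector.isLeader(), and two counters stand in for metricsRegistry.registerAll and metricsRegistry.removeAll. LeaderGatedRegistrationSketch is a hypothetical name used only for this illustration.

```java
import java.util.function.BooleanSupplier;

/** Sketch of the transition-only register/remove toggle; not the Storm class. */
final class LeaderGatedRegistrationSketch implements Runnable {
    private final BooleanSupplier isLeader; // stand-in for leaderElector.isLeader()
    private volatile boolean active = false;
    int registerCalls = 0; // stand-in for metricsRegistry.registerAll(...)
    int removeCalls = 0;   // stand-in for metricsRegistry.removeAll(...)

    LeaderGatedRegistrationSketch(BooleanSupplier isLeader) {
        this.isLeader = isLeader;
    }

    void setActive(boolean active) {
        // Act only when the state actually changes
        if (this.active != active) {
            this.active = active;
            if (active) {
                registerCalls++;
            } else {
                removeCalls++;
            }
        }
    }

    @Override
    public void run() {
        setActive(isLeader.getAsBoolean());
    }

    public static void main(String[] args) {
        boolean[] leader = {false};
        LeaderGatedRegistrationSketch task = new LeaderGatedRegistrationSketch(() -> leader[0]);
        task.run();          // not leader yet: nothing happens
        leader[0] = true;
        task.run();          // became leader: register once
        task.run();          // still leader: no second registration
        leader[0] = false;
        task.run();          // lost leadership: remove once
        System.out.println(task.registerCalls + " register, " + task.removeCalls + " remove");
    }
}
```

Run every 5 seconds, as in launchServer, this pattern keeps the registry in step with leadership while making repeated ticks in the same state free of side effects.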