乐趣区

聊聊storm的LoggingClusterMetricsConsumer


本文主要研究一下 storm 的 LoggingClusterMetricsConsumer
LoggingClusterMetricsConsumer
storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/LoggingClusterMetricsConsumer.java
public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer {
public static final Logger LOG = LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class);

static private String padding = ” “;

@Override
public void prepare(Object registrationArgument) {
}

@Override
public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
StringBuilder sb = new StringBuilder();
String header = String.format(“%d\t%15s\t%40s\t”,
clusterInfo.getTimestamp(), “<cluster>”, “<cluster>”);
sb.append(header);
logDataPoints(dataPoints, sb, header);
}

@Override
public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
StringBuilder sb = new StringBuilder();
String header = String.format(“%d\t%15s\t%40s\t”,
supervisorInfo.getTimestamp(),
supervisorInfo.getSrcSupervisorHost(),
supervisorInfo.getSrcSupervisorId());
sb.append(header);
for (DataPoint p : dataPoints) {
sb.delete(header.length(), sb.length());
sb.append(p.getName())
.append(padding).delete(header.length() + 23, sb.length()).append(“\t”)
.append(p.getValue());
LOG.info(sb.toString());
}
}

@Override
public void cleanup() {
}

private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder sb, String header) {
for (DataPoint p : dataPoints) {
sb.delete(header.length(), sb.length());
sb.append(p.getName())
.append(padding).delete(header.length() + 23, sb.length()).append(“\t”)
.append(p.getValue());
LOG.info(sb.toString());
}
}
}

这个是 cluster 级别的 metrics consumer,只能在 storm.yaml 里头配置
它的 handleDataPoints 供 ClusterMetricsConsumerExecutor 回调
这里 handleDataPoints 仅仅是打印到日志文件

storm.yaml 配置
## Cluster Metrics Consumers
storm.cluster.metrics.consumer.register:
– class: “org.apache.storm.metric.LoggingClusterMetricsConsumer”
# – class: “com.example.demo.metric.FixedLoggingClusterMetricsConsumer”
# argument:
# – endpoint: “metrics-collector.mycompany.org”
#
storm.cluster.metrics.consumer.publish.interval.secs: 5
这里指定了 consumer 类为 LoggingClusterMetricsConsumer,同时指定了 publish interval 为 5 秒
cluster.xml
<?xml version=”1.0″ encoding=”UTF-8″?>
<!–
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the “License”); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an “AS IS” BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
–>

<configuration monitorInterval=”60″ shutdownHook=”disable” packages=”org.apache.logging.log4j.core,io.sentry.log4j2″>
<properties>
<property name=”pattern”>%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
<property name=”patternMetrics”>%d %-8r %m%n</property>
</properties>
<appenders>
<RollingFile name=”A1″ immediateFlush=”false”
fileName=”${sys:storm.log.dir}/${sys:logfile.name}”
filePattern=”${sys:storm.log.dir}/${sys:logfile.name}.%i.gz”>
<PatternLayout>
<pattern>${pattern}</pattern>
</PatternLayout>
<Policies>
<SizeBasedTriggeringPolicy size=”100 MB”/> <!– Or every 100 MB –>
</Policies>
<DefaultRolloverStrategy max=”9″/>
</RollingFile>
<RollingFile name=”WEB-ACCESS” immediateFlush=”false”
fileName=”${sys:storm.log.dir}/access-web-${sys:daemon.name}.log”
filePattern=”${sys:storm.log.dir}/access-web-${sys:daemon.name}.log.%i.gz”>
<PatternLayout>
<pattern>${pattern}</pattern>
</PatternLayout>
<Policies>
<SizeBasedTriggeringPolicy size=”100 MB”/> <!– Or every 100 MB –>
</Policies>
<DefaultRolloverStrategy max=”9″/>
</RollingFile>
<RollingFile name=”THRIFT-ACCESS” immediateFlush=”false”
fileName=”${sys:storm.log.dir}/access-${sys:logfile.name}”
filePattern=”${sys:storm.log.dir}/access-${sys:logfile.name}.%i.gz”>
<PatternLayout>
<pattern>${pattern}</pattern>
</PatternLayout>
<Policies>
<SizeBasedTriggeringPolicy size=”100 MB”/> <!– Or every 100 MB –>
</Policies>
<DefaultRolloverStrategy max=”9″/>
</RollingFile>
<RollingFile name=”METRICS”
fileName=”${sys:storm.log.dir}/${sys:logfile.name}.metrics”
filePattern=”${sys:storm.log.dir}/${sys:logfile.name}.metrics.%i.gz”>
<PatternLayout>
<pattern>${patternMetrics}</pattern>
</PatternLayout>
<Policies>
<SizeBasedTriggeringPolicy size=”2 MB”/>
</Policies>
<DefaultRolloverStrategy max=”9″/>
</RollingFile>
<Syslog name=”syslog” format=”RFC5424″ charset=”UTF-8″ host=”localhost” port=”514″
protocol=”UDP” appName=”[${sys:daemon.name}]” mdcId=”mdc” includeMDC=”true”
facility=”LOCAL5″ enterpriseNumber=”18060″ newLine=”true” exceptionPattern=”%rEx{full}”
messageId=”[${sys:user.name}:S0]” id=”storm” immediateFlush=”true” immediateFail=”true”/>
</appenders>
<loggers>

<Logger name=”org.apache.storm.logging.filters.AccessLoggingFilter” level=”info” additivity=”false”>
<AppenderRef ref=”WEB-ACCESS”/>
<AppenderRef ref=”syslog”/>
</Logger>
<Logger name=”org.apache.storm.logging.ThriftAccessLogger” level=”info” additivity=”false”>
<AppenderRef ref=”THRIFT-ACCESS”/>
<AppenderRef ref=”syslog”/>
</Logger>
<Logger name=”org.apache.storm.metric.LoggingClusterMetricsConsumer” level=”info” additivity=”false”>
<appender-ref ref=”METRICS”/>
</Logger>
<root level=”info”> <!– We log everything –>
<appender-ref ref=”A1″/>
<appender-ref ref=”syslog”/>
<appender-ref ref=”Sentry” level=”ERROR” />
</root>
</loggers>
</configuration>

cluster.xml 指定了 metrics logging 的相关配置,这里使用的是 METRICS appender,该 appender 是一个 RollingFile,文件名为 &dollar;{sys:storm.log.dir}/&dollar;{sys:logfile.name}.metrics,例如 nimbus 默认的 logfile.name 为 nimbus.log,supervisor 默认的 logfile.name 为 supervisor.log,因而写入的文件为 nimbus.log.metrics 及 supervisor.log.metrics
输出实例如下

2018-11-06 07:51:51,488 18628 1541490711 <cluster> <cluster> supervisors 1
2018-11-06 07:51:51,488 18628 1541490711 <cluster> <cluster> topologies 0
2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> slotsTotal 4
2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> slotsUsed 0
2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> slotsFree 4
2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> executorsTotal 0
2018-11-06 07:51:51,489 18629 1541490711 <cluster> <cluster> tasksTotal 0
2018-11-06 07:51:51,496 18636 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 slotsTotal 4
2018-11-06 07:51:51,497 18637 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 slotsUsed 0
2018-11-06 07:51:51,497 18637 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 totalMem 3072.0
2018-11-06 07:51:51,497 18637 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 totalCpu 400.0
2018-11-06 07:51:51,498 18638 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 usedMem 0.0
2018-11-06 07:51:51,498 18638 1541490711 192.168.99.100 5bbd576d-218c-4365-ac5e-865b1f6e9b29 usedCpu 0.0
ClusterMetricsConsumerExecutor
storm-2.0.0/storm-server/src/main/java/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java
public class ClusterMetricsConsumerExecutor {
public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class);
private static final String ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED =
“Preparation of Cluster Metrics Consumer failed. ” +
“Please check your configuration and/or corresponding systems and relaunch Nimbus. ” +
“Skipping handle metrics.”;

private IClusterMetricsConsumer metricsConsumer;
private String consumerClassName;
private Object registrationArgument;

public ClusterMetricsConsumerExecutor(String consumerClassName, Object registrationArgument) {
this.consumerClassName = consumerClassName;
this.registrationArgument = registrationArgument;
}

public void prepare() {
try {
metricsConsumer = (IClusterMetricsConsumer) Class.forName(consumerClassName).newInstance();
metricsConsumer.prepare(registrationArgument);
} catch (Exception e) {
LOG.error(“Could not instantiate or prepare Cluster Metrics Consumer with fully qualified name ” +
consumerClassName, e);

if (metricsConsumer != null) {
metricsConsumer.cleanup();
}
metricsConsumer = null;
}
}

public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo clusterInfo, final Collection<DataPoint> dataPoints) {
if (metricsConsumer == null) {
LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
return;
}

try {
metricsConsumer.handleDataPoints(clusterInfo, dataPoints);
} catch (Throwable e) {
LOG.error(“Error while handling cluster data points, consumer class: ” + consumerClassName, e);
}
}

public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo supervisorInfo, final Collection<DataPoint> dataPoints) {
if (metricsConsumer == null) {
LOG.error(ERROR_MESSAGE_PREPARATION_CLUSTER_METRICS_CONSUMER_FAILED);
return;
}

try {
metricsConsumer.handleDataPoints(supervisorInfo, dataPoints);
} catch (Throwable e) {
LOG.error(“Error while handling cluster data points, consumer class: ” + consumerClassName, e);
}
}

public void cleanup() {
if (metricsConsumer != null) {
metricsConsumer.cleanup();
}
}
}

ClusterMetricsConsumerExecutor 在 prepare 的时候,根据 consumerClassName 来实例化 IClusterMetricsConsumer 的实现类,之后传入调用 metricsConsumer.prepare(registrationArgument) 做些准备
ClusterMetricsConsumerExecutor 的 handleDataPoints 方法实际上是代理了 metricsConsumer 的 handleDataPoints
该 handleDataPoints 方法有两个,他们都有共同的参数 dataPoints,另外一个不同的参数,是一个传的是 ClusterInfo,一个是 SupervisorInfo,分别用于 nimbus 及 supervisor

Nimbus.launchServer
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
public void launchServer() throws Exception {
try {
BlobStore store = blobStore;
IStormClusterState state = stormClusterState;
NimbusInfo hpi = nimbusHostPortInfo;

LOG.info(“Starting Nimbus with conf {}”, ConfigUtils.maskPasswords(conf));
validator.prepare(conf);

//……

timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
() -> {
try {
if (isLeader()) {
sendClusterMetricsToExecutors();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});

timer.scheduleRecurring(5, 5, clusterMetricSet);
} catch (Exception e) {
if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
throw e;
}

if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
throw e;
}
LOG.error(“Error on initialization of nimbus”, e);
Utils.exitProcess(13, “Error on initialization of nimbus”);
}
}

private boolean isLeader() throws Exception {
return leaderElector.isLeader();
}

Nimbus 的 launchServer 方法创建了一个定时任务,如果是 leader 节点,则调用 sendClusterMetricsToExecutors 方法发送相关 metrics 到 metrics consumer
定时任务的调度时间间隔为 DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS(storm.cluster.metrics.consumer.publish.interval.secs),在 defaults.yaml 文件中默认为 60
除了发送 metrics 到 metrics consumer,它还有一个定时任务,每隔 5 秒调用一下 ClusterSummaryMetricSet 这个线程

Nimbus.sendClusterMetricsToExecutors
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
private void sendClusterMetricsToExecutors() throws Exception {
ClusterInfo clusterInfo = mkClusterInfo();
ClusterSummary clusterSummary = getClusterInfoImpl();
List<DataPoint> clusterMetrics = extractClusterMetrics(clusterSummary);
Map<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> supervisorMetrics = extractSupervisorMetrics(clusterSummary);
for (ClusterMetricsConsumerExecutor consumerExecutor : clusterConsumerExceutors) {
consumerExecutor.handleDataPoints(clusterInfo, clusterMetrics);
for (Entry<IClusterMetricsConsumer.SupervisorInfo, List<DataPoint>> entry : supervisorMetrics.entrySet()) {
consumerExecutor.handleDataPoints(entry.getKey(), entry.getValue());
}
}
}
nimbus 的 sendClusterMetricsToExecutors 方法通过 extractClusterMetrics 及 extractSupervisorMetrics 提取相关 metrics,然后调用 consumerExecutor.handleDataPoints 传递数据
ClusterSummaryMetricSet
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
private class ClusterSummaryMetricSet implements Runnable {
private static final int CACHING_WINDOW = 5;

private final ClusterSummaryMetrics clusterSummaryMetrics = new ClusterSummaryMetrics();

private final Function<String, Histogram> registerHistogram = (name) -> {
//This histogram reflects the data distribution across only one ClusterSummary, i.e.,
// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment.
// Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update
final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
clusterSummaryMetrics.put(name, histogram);
return histogram;
};
private volatile boolean active = false;

//NImbus metrics distribution
private final Histogram nimbusUptime = registerHistogram.apply(“nimbuses:uptime-secs”);

//Supervisor metrics distribution
private final Histogram supervisorsUptime = registerHistogram.apply(“supervisors:uptime-secs”);
private final Histogram supervisorsNumWorkers = registerHistogram.apply(“supervisors:num-workers”);
private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply(“supervisors:num-used-workers”);
private final Histogram supervisorsUsedMem = registerHistogram.apply(“supervisors:used-mem”);
private final Histogram supervisorsUsedCpu = registerHistogram.apply(“supervisors:used-cpu”);
private final Histogram supervisorsFragmentedMem = registerHistogram.apply(“supervisors:fragmented-mem”);
private final Histogram supervisorsFragmentedCpu = registerHistogram.apply(“supervisors:fragmented-cpu”);

//Topology metrics distribution
private final Histogram topologiesNumTasks = registerHistogram.apply(“topologies:num-tasks”);
private final Histogram topologiesNumExecutors = registerHistogram.apply(“topologies:num-executors”);
private final Histogram topologiesNumWorker = registerHistogram.apply(“topologies:num-workers”);
private final Histogram topologiesUptime = registerHistogram.apply(“topologies:uptime-secs”);
private final Histogram topologiesReplicationCount = registerHistogram.apply(“topologies:replication-count”);
private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply(“topologies:requested-mem-on-heap”);
private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply(“topologies:requested-mem-off-heap”);
private final Histogram topologiesRequestedCpu = registerHistogram.apply(“topologies:requested-cpu”);
private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply(“topologies:assigned-mem-on-heap”);
private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply(“topologies:assigned-mem-off-heap”);
private final Histogram topologiesAssignedCpu = registerHistogram.apply(“topologies:assigned-cpu”);
private final StormMetricsRegistry metricsRegistry;

/**
* Constructor to put all items in ClusterSummary in MetricSet as a metric.
* All metrics are derived from a cached ClusterSummary object,
* expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters.
* In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
* reporting interval to avoid outdated reporting.
*/
ClusterSummaryMetricSet(StormMetricsRegistry metricsRegistry) {
this.metricsRegistry = metricsRegistry;
//Break the code if out of sync to thrift protocol
assert ClusterSummary._Fields.values().length == 3
&& ClusterSummary._Fields.findByName(“supervisors”) == ClusterSummary._Fields.SUPERVISORS
&& ClusterSummary._Fields.findByName(“topologies”) == ClusterSummary._Fields.TOPOLOGIES
&& ClusterSummary._Fields.findByName(“nimbuses”) == ClusterSummary._Fields.NIMBUSES;

final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
@Override
protected ClusterSummary loadValue() {
try {
ClusterSummary newSummary = getClusterInfoImpl();
LOG.debug(“The new summary is {}”, newSummary);
/*
* Update histograms based on the new summary. Most common implementation of Reporter reports Gauges before
* Histograms. Because DerivativeGauge will trigger cache refresh upon reporter’s query, histogram will also be
* updated before query
*/
updateHistogram(newSummary);
return newSummary;
} catch (Exception e) {
LOG.warn(“Get cluster info exception.”, e);
throw new RuntimeException(e);
}
}
};

clusterSummaryMetrics.put(“cluster:num-nimbus-leaders”, new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
@Override
protected Long transform(ClusterSummary clusterSummary) {
return clusterSummary.get_nimbuses().stream()
.filter(NimbusSummary::is_isLeader)
.count();
}
});
clusterSummaryMetrics.put(“cluster:num-nimbuses”, new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
@Override
protected Integer transform(ClusterSummary clusterSummary) {
return clusterSummary.get_nimbuses_size();
}
});
clusterSummaryMetrics.put(“cluster:num-supervisors”, new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
@Override
protected Integer transform(ClusterSummary clusterSummary) {
return clusterSummary.get_supervisors_size();
}
});
clusterSummaryMetrics.put(“cluster:num-topologies”, new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
@Override
protected Integer transform(ClusterSummary clusterSummary) {
return clusterSummary.get_topologies_size();
}
});
clusterSummaryMetrics.put(“cluster:num-total-workers”, new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
@Override
protected Integer transform(ClusterSummary clusterSummary) {
return clusterSummary.get_supervisors().stream()
.mapToInt(SupervisorSummary::get_num_workers)
.sum();
}
});
clusterSummaryMetrics.put(“cluster:num-total-used-workers”, new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
@Override
protected Integer transform(ClusterSummary clusterSummary) {
return clusterSummary.get_supervisors().stream()
.mapToInt(SupervisorSummary::get_num_used_workers)
.sum();
}
});
clusterSummaryMetrics.put(“cluster:total-fragmented-memory-non-negative”, new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
@Override
protected Double transform(ClusterSummary clusterSummary) {
return clusterSummary.get_supervisors().stream()
//Filtered negative value
.mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0))
.sum();
}
});
clusterSummaryMetrics.put(“cluster:total-fragmented-cpu-non-negative”, new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
@Override
protected Double transform(ClusterSummary clusterSummary) {
return clusterSummary.get_supervisors().stream()
//Filtered negative value
.mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0))
.sum();
}
});
}

private void updateHistogram(ClusterSummary newSummary) {
for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
nimbusUptime.update(nimbusSummary.get_uptime_secs());
}
for (SupervisorSummary summary : newSummary.get_supervisors()) {
supervisorsUptime.update(summary.get_uptime_secs());
supervisorsNumWorkers.update(summary.get_num_workers());
supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
}
for (TopologySummary summary : newSummary.get_topologies()) {
topologiesNumTasks.update(summary.get_num_tasks());
topologiesNumExecutors.update(summary.get_num_executors());
topologiesNumWorker.update(summary.get_num_workers());
topologiesUptime.update(summary.get_uptime_secs());
topologiesReplicationCount.update(summary.get_replication_count());
topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
}
}

void setActive(final boolean active) {
if (this.active != active) {
this.active = active;
if (active) {
metricsRegistry.registerAll(clusterSummaryMetrics);
} else {
metricsRegistry.removeAll(clusterSummaryMetrics);
}
}
}

@Override
public void run() {
try {
setActive(isLeader());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

这个线程主要是调用 setActive 方法,做的工作的话,就是不断判断节点状态变化,如果 leader 发生变化,自己是 leader 则注册 clusterSummaryMetrics,如果自己变成不是 leader 则删除掉 clusterSummaryMetrics
clusterSummaryMetrics 添加了 cluster:num-nimbus-leaders、cluster:num-nimbuses、cluster:num-supervisors、cluster:num-topologies、cluster:num-total-workers、cluster:num-total-used-workers、cluster:total-fragmented-memory-non-negative、cluster:total-fragmented-cpu-non-negative 这几个指标

小结

LoggingClusterMetricsConsumer 消费的是 cluster 级别的指标,它消费了指标数据,然后打印到日志文件,log4j2 的配置读取的是 cluster.xml,最后写入的文件是 nimbus.log.metrics、supervisor.log.metrics
Nimbus 在 launchServer 的时候,会建立一个定时任务,在当前节点是 leader 的情况下,定时发送 metrics 指标到 ClusterMetricsConsumerExecutor,然后间接回调 LoggingClusterMetricsConsumer 的 handleDataPoints 方法,把数据打印到日志
handleDataPoints 处理两类 info,一类是 ClusterInfo,一类是 SupervisorInfo;这里要注意一下定时任务是在当前节点是 leader 的情况下才会 sendClusterMetricsToExecutors 的,正常情况 nimbus 与 supervisor 不在同一个节点上,因而 supervisor.log.metrics 可能是空的

doc
Storm Metrics

退出移动版