序
本文主要研究一下 storm 的 GraphiteStormReporter
GraphiteStormReporter
storm-core-1.2.2-sources.jar!/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
public class GraphiteStormReporter extends ScheduledStormReporter {
private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class);
public static final String GRAPHITE_PREFIXED_WITH = “graphite.prefixed.with”;
public static final String GRAPHITE_HOST = “graphite.host”;
public static final String GRAPHITE_PORT = “graphite.port”;
public static final String GRAPHITE_TRANSPORT = “graphite.transport”;
@Override
public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
LOG.debug(“Preparing…”);
GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry);
TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
if (durationUnit != null) {
builder.convertDurationsTo(durationUnit);
}
TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
if (rateUnit != null) {
builder.convertRatesTo(rateUnit);
}
StormMetricsFilter filter = getMetricsFilter(reporterConf);
if(filter != null){
builder.filter(filter);
}
String prefix = getMetricsPrefixedWith(reporterConf);
if (prefix != null) {
builder.prefixedWith(prefix);
}
//defaults to 10
reportingPeriod = getReportPeriod(reporterConf);
//defaults to seconds
reportingPeriodUnit = getReportPeriodUnit(reporterConf);
// Not exposed:
// * withClock(Clock)
String host = getMetricsTargetHost(reporterConf);
Integer port = getMetricsTargetPort(reporterConf);
String transport = getMetricsTargetTransport(reporterConf);
GraphiteSender sender = null;
if (transport.equalsIgnoreCase(“udp”)) {
sender = new GraphiteUDP(host, port);
} else {
sender = new Graphite(host, port);
}
reporter = builder.build(sender);
}
private static String getMetricsPrefixedWith(Map reporterConf) {
return Utils.getString(reporterConf.get(GRAPHITE_PREFIXED_WITH), null);
}
private static String getMetricsTargetHost(Map reporterConf) {
return Utils.getString(reporterConf.get(GRAPHITE_HOST), null);
}
private static Integer getMetricsTargetPort(Map reporterConf) {
return Utils.getInt(reporterConf.get(GRAPHITE_PORT), null);
}
private static String getMetricsTargetTransport(Map reporterConf) {
return Utils.getString(reporterConf.get(GRAPHITE_TRANSPORT), “tcp”);
}
}
继承了 ScheduledStormReporter,实现 prepare 方法
prepare 方法根据配置文件创建 com.codahale.metrics.graphite.GraphiteSender,然后创建 com.codahale.metrics.graphite.GraphiteReporter
ScheduledStormReporter
storm-core-1.2.2-sources.jar!/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
public abstract class ScheduledStormReporter implements StormReporter{
private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class);
protected ScheduledReporter reporter;
protected long reportingPeriod;
protected TimeUnit reportingPeriodUnit;
@Override
public void start() {
if (reporter != null) {
LOG.debug(“Starting…”);
reporter.start(reportingPeriod, reportingPeriodUnit);
} else {
throw new IllegalStateException(“Attempt to start without preparing ” + getClass().getSimpleName());
}
}
@Override
public void stop() {
if (reporter != null) {
LOG.debug(“Stopping…”);
reporter.stop();
} else {
throw new IllegalStateException(“Attempt to stop without preparing ” + getClass().getSimpleName());
}
}
public static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) {
TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS);
return unit == null ? TimeUnit.SECONDS : unit;
}
private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) {
String rateUnitString = Utils.getString(reporterConf.get(configName), null);
if (rateUnitString != null) {
return TimeUnit.valueOf(rateUnitString);
}
return null;
}
public static long getReportPeriod(Map reporterConf) {
return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
}
public static StormMetricsFilter getMetricsFilter(Map reporterConf){
StormMetricsFilter filter = null;
Map<String, Object> filterConf = (Map)reporterConf.get(“filter”);
if(filterConf != null) {
String clazz = (String) filterConf.get(“class”);
if (clazz != null) {
filter = Utils.newInstance(clazz);
filter.prepare(filterConf);
}
}
return filter;
}
}
ScheduledStormReporter 封装了对 reporter 的生命周期的控制,启动时调用 start,关闭时调用 stop
小结
storm 从 1.2 版本开始启用了新的 metrics,即 metrics2,新版的 metrics 基于 Dropwizard Metrics
默认提供了 Console Reporter、CSV Reporter、Ganglia Reporter、Graphite Reporter、JMX Reporter
doc
New Metrics Reporting API
ubuntu-graphite-grafana