本文由作者林洋港授权网易云社区发布。
作为服务端程序,我们总是需要向外界报告一些统计数据,以助于了解系统的运行情况,比如某个接口的调用时间、系统处理的请求数等等。当我们的程序以 Storm Topology 的形式运行时同样需要输出这些统计数据。Storm 为我们提供了 Metric 接口,可以方便的把一些统计指标输出到指定的地方。Storm Metric 的统计方式为每隔指定的时间间隔输出统计内容。本文首先介绍 Storm Metric 相关的接口以及它们之间的关系,然后以实际应用中的一个例子来说明如何使用 Metric 接口。本文使用的 Storm 版本为 0.9.1-incubating。
IMetric 是 Storm 用于保存统计数据的接口
public interface IMetric {
public Object getValueAndReset();
}
接口只有一个 getValueAndReset 方法,当需要输出统计内容时,Storm 就会调用这个方法。值得注意的是 getValueAndReset 方法返回的是 Object 类型,这为统计内容的形式提供了灵活性,我们可以返回任意的类型作为统计信息,这一点在后面的例子中我们会再提到。另一个引起我们注意的地方是 IMetric 接口并没有声明更新统计数据的方法,这样当我们实现 IMetric 接口的时候就更加灵活了——参数类型、参数个数都没有限制。Storm 自身提供了 6 个 IMetric 实现:AssignableMetric、CombinedMetric、CountMetric、MultiCountMetric、ReducedMetric、StateMetric。这里只介绍 CountMetric 和 MultiCountMetric 的使用方式,以印证前面说的 IMetric 接口统计数据更新方式的灵活性以及 getValueAndReset 返回 Object 类型的灵活性。CountMetric 就是一个简单的计数器,有两个方法 incr() 和 incrBy(long incrementBy),其 getValueAndReset 方法返回一个 long 类型的值:
public Object getValueAndReset() {
long ret = _value;
_value = 0;
return ret;
}
MultiCountMetric,顾名思义,就是多个指标的计数器,维护着一个 Map,只有一个方法 CountMetric scope(String key)。因此 MultiCountMetric 的更新方式为 MultiCountMetric.scope(key).incr() 或 MultiCountMetric.scope(key).incrBy(long incrementBy)。它的 getValueAndReset 返回的是一个 Map:
public Object getValueAndReset() {
Map ret = new HashMap();
for(Map.Entry e : _value.entrySet()) {
ret.put(e.getKey(), e.getValue().getValueAndReset());
}
return ret;
}
除了 IMetric 接口,还有另外一个接口 IMetricsConsumer,它负责向外输出统计信息,即把 IMetric getValueAndReset 方法返回的数据输出到外面。IMetricsConsumer 有三个方法
void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter);
void handleDataPoints(TaskInfo taskInfo, Collection dataPoints);
void cleanup();
其中 prepare 是初始化,cleanup 是生命周期结束时的清理工作,handleDataPoints 才是真正的统计信息输出方法,taskInfo 参数存储当前 task 的信息(host、port、component id、task id 等等),dataPoints 存储的是 IMetric 返回的统计信息,可能是出于性能考虑,dataPoints 是一个集合,包含了多个 IMetric 返回的数据。让我们来具体看看 DataPoint 这个类:
public static class DataPoint {
@Override
public String toString() {
return “[” + name + ” = ” + value + “]”;
}
public String name;
public Object value;
}
name 是 IMetric 注册时的名字,value 就是 IMetric getValueAndReset 返回的那个 Object。
Storm 只提供了一个 IMetricsConsumer 实现——LoggingMetricsConsumer。LoggingMetricsConsumer 做的事情很简单,就是把 dataPoints 输出到日志文件 metrics.log,下面是其 handleDataPoints 方法的部分代码:
for (DataPoint p : dataPoints) {
sb.delete(header.length(), sb.length());
sb.append(p.name)
.append(padding).delete(header.length()+23,sb.length()).append(“\t”)
.append(p.value);
LOG.info(sb.toString());
}
可以看到它通过调用 DataPoint 的 value 的 toString 方法把统计信息输出到日志里面的,所以如果你的 IMetric 实现返回的是自己定义的类型,记得重载 toString() 方法,让统计信息以可读的格式输出。
到这里 Storm 的 Metric 接口和自带的实现基本介绍完了,接下来我们来看看怎么使用 Storm 自带的这些实现。首先,Storm 默认的配置是关掉 Metric 功能的,可以有两种方式开启 Metric 功能:
1)在 storm.yaml 里面配置,这种是集群级别的设置,个人不建议这么做,所以就不多介绍了
2)conf.registerMetricsConsumer(Class klass, long parallelismHint); 这是 topology 级别的,klass 是 IMetricsConsumer 的实现类,parallelismHint 这个参数 Storm 代码里面没注释我也没深入看底层的实现,这里结合自己的实验谈谈它的意义:topology 是在 1 个或多个 worker 上面以多个 task 的方式跑的嘛,parallelismHint 就是指定多少个并发来输出统计信息。这里我也不知道 parallelismHint 指的是多个 task、worker 还是 supervisor,反正 parallelismHint= 1 的时候只在特定的一个 supervisor 下面的 metrics.log 有统计信息,parallelismHint>1 时可能取决于 worker 的数量,我测试的时候由于是在多个 supervisor 上跑的,因此观察到多个 supervisor 都有 metrics.log 的输出。个人经验是 parallelismHint 设为 1,这样可以在一个 supervisor 下面的 metrics.log 就能看到所有 task 的统计信息。
由于我建议采用第二种方法,所以示例代码为:
// 客户端注册 IMetricsConsumer
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
StormSubmitter.submitTopology(name, conf, builder.createTopology());
// 我们假设要统计 spout 某段代码的调用次数
// 注册 IMetric
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
…
metric=new CountMetric();
context.registerMetric(“spout time cost”, metric, 60); // 因此 DataPoint 的 name 为 spout time cost,60 表示 1 分钟统计一次
…
}
// 更新统计数据
@Override
public void nextTuple() {
if(…)…
else{
…
metric.incr();
}
}
这样就可以了,然后你就能在 metrics.log 看到统计数据了。
现在,假设我们的需求跟上面不太一样:1)metrics.log 只打印我们自己维护的统计信息,屏蔽__system、__fail-count 这种系统自己的统计信息;2)不只统计代码的调用次数,还要统计调用时间——最小时间、最大时间、平均时间。
第一点可以通过重载 LoggingMetricsConsumer 的方法来实现:
public class AppLoggingMetricsConsumer extends LoggingMetricsConsumer {
@Override
public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
if (taskInfo.srcComponentId != null && taskInfo.srcComponentId.startsWith(“__”)) return;
if (dataPoints == null || dataPoints.isEmpty()) return;
List<DataPoint> list = new ArrayList<DataPoint>();
for (DataPoint p : dataPoints) {
if (p.name == null || p.name.startsWith(“__”)) continue;
list.add(p);
}
if (list.isEmpty()) return;
super.handleDataPoints(taskInfo, list);
}
}
第二点需要开发我们自己的 IMetric 接口实现类 TimeCostMetric,以下是其主要代码:
@Override
public Object getValueAndReset() {
TimeCost timeCost=new TimeCost();
timeCost.count=count;
if(timeCost.count>0){
timeCost.min=min;
timeCost.max=max;
timeCost.mean=all*1.0/timeCost.count;
}
init();
return timeCost;
}
public void update(long time){
count++;
all+=time;
if(min>time)min=time;
if(max<time)max=time;
}
public static class TimeCost implements Serializable{
private static final long serialVersionUID = 8355726599226036228L;
int count;
long min;
long max;
double mean;
public String toString(){
return “count: “+count+”, min: “+min+”, max:”+max+”, mean: “+mean;
}
}
TimeCostMetric 的 getValueAndReset 方法返回的是一个 TimeCost 对象,日志中最终打印的就是其 toString 方法的内容。然后把前面红色部分的代码改成下面的内容:
① conf.registerMetricsConsumer(AppLoggingMetricsConsumer .class);
② metric=new TimeCostMetric();
context.registerMetric(“MQ spout time cost”, metric, 60);
③ metric.incr();
再来看看 metrics.log
本文中是直接把统计信息打到日志中,你也可以自己实现 IMetricsConsumer 接口,把统计信息保存到指定的地方,如数据库、监控平台等等。
免费领取验证码、内容安全、短信发送、直播点播体验包及云服务器等套餐
更多网易技术、产品、运营经验分享请访问网易云社区。
文章来源:网易云社区