从国外网站上翻译的,主要业务是创建移动电话日志分析器。

场景 - 移动呼叫日志分析器

移动电话及其持续时间将作为Apache Storm的输入提供,Storm将处理并分组相同呼叫者和接收者之间的呼叫及其呼叫总数。

创建Spout

Spout是用于数据生成的组件。基本上,spout将实现一个IRichSpout接口。“IRichSpout”界面有以下重要方法 -

  • open - 为spout提供执行环境。执行者将运行此方法来初始化spout。
  • nextTuple - 通过收集器发出生成的数据。
  • close - spout将要关闭时调用此方法。
  • declareOutputFields - 声明元组的输出模式。
  • ack - 确认处理了特定的tuple
  • fail - 指定一个特定的tuple不被处理并且不被重新处理。

open

__open__方法的签名如下 -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - 为此spout提供storm暴配置。
  • context - 提供关于topology中spout位置,其任务ID,输入和输出信息的完整信息。
  • collector - 使我们能够发出将由bolts处理的tuple。

nextTuple

__nextTuple__方法的签名如下 -

nextTuple()

nextTuple()从与ack()和fail()方法相同的循环周期性地调用。当没有工作要做时,它必须释放对线程的控制,以便其他方法有机会被调用。所以nextTuple的第一行检查处理是否完成。如果是这样,它应该睡眠至少一毫秒,以在返回之前减少处理器上的负载。

close

__close__方法的签名如下 -

close()

declareOutputFields

__declareOutputFields__方法的签名如下所示 -

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - 它用于声明输出流ID,输出字段等。此方法用于指定tuple的输出模式。

ack

__ack__方法的签名如下 -

ack(Object msgId)

该方法确认已经处理了特定的元组。

fail

__fail__方法的签名如下 -

fail(Object msgId)

此方法通知某个特定的元组尚未完全处理。Storm将重新处理特定的元组。

FakeCallLogReaderSpout

在我们的场景中,我们需要收集通话记录详细信息。通话记录的信息包含。

  • 来电号码
  • 接收器号码
  • 持续时间

由于我们没有实时的通话记录信息,我们会生成虚假的通话记录。假信息将使用Random类创建。完整的程序代码如下。

编码 - FakeCallLogReaderSpout.java

[code lang=“java”]package spout;/** * @author: liyj * @Date:Created in 2018/5/8 /import java.util.;//import storm tuple packages//import Spout interface packagesimport org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichSpout;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Values;//Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalitiespublic class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add(“1234123401”); mobileNumbers.add(“1234123402”); mobileNumbers.add(“1234123403”); mobileNumbers.add(“1234123404”); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(“from”, “to”, “duration”)); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; }}[/code] 

创建Bolt

Bolt是一个将tuple作为输入,处理tuple并生成新的tuple作为输出的组件。Bolts将实现__IRichBolt__接口。在这个程序中,使用两个螺栓类__CallLogCreatorBolt__和__CallLogCounterBolt__来执行操作。IRichBolt接口有以下方法 -

  • prepare - 为bolt提供执行的环境。执行者将运行此方法来初始化spout。
  • execute - 处理输入的单个tuple。
  • cleanup - 当bolt即将关闭时调用。
  • declareOutputFields - 声明tuple的输出模式。

准备

__准备__方法的签名如下 -

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - 为此bolt提供storm配置。
  • context - 提供有关topology中bolt位置,其任务ID,输入和输出信息等的完整信息。
  • collector - 使我们能够发出处理过的tuple。

execute

__execute__方法的签名如下 -

execute(Tuple tuple)

这里的__tuple__是要处理的输入tuple。所述__execute__方法一次处理单tuple。tuple数据可以通过Tuple类的getValue方法访问。没有必要立即处理输入tuple。多tuple可以作为单个输出tuple进行处理和输出。处理过的tuple可以通过使用OutputCollector类发出。

cleanup

__cleanup__方法的签名如下 -

cleanup()

declareOutputFields

__declareOutputFields__方法的签名如下所示 -

declareOutputFields(OutputFieldsDeclarer declarer)

这里参数__declarer__用于声明输出流ID,输出字段等。此方法用于指定tuple的输出模式

通话记录创建者bolt

通话记录创建器bolt接收通话记录tuple。通话记录tuple具有主叫号码,接收者号码和通话时长。通过组合主叫方号码和接收方号码,此bolt简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“call”。完整的代码如下。

编码 - CallLogCreatorBolt.java

[code lang=“java”]package bolt;/** * @author: liyj * @Date:Created in 2018/5/8 *///import util packagesimport org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;import java.util.HashMap;import java.util.Map;//Create a class CallLogCreatorBolt which implement IRichBolt interfacepublic class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(“call”, “duration”)); } @Override public Map<String, Object> getComponentConfiguration() { return null; }}[/code]

通话记录计数器bolt

呼叫记录计数器bolt接收呼叫及其持续时间作为tuple。这个bolt在prepare方法中初始化一个字典(Map)对象。在__execute__方法中,它检查tuple并在tuple中为每个新的“call”值在字典对象中创建一个新条目,并在字典对象中设置值1。对于字典中已有的条目,它只是递增其值。简单地说,这个bolt将call和它的计数保存在字典对象中。我们可以将它保存到数据源中,而不是将call和它的计数保存在字典中。完整的程序代码如下所示 -

编码 - CallLogCounterBolt.java

[code lang=“java”]package bolt;/** * @author: liyj * @Date:Created in 2018/5/8 */import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import java.util.HashMap;import java.util.Map;public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(“call”)); } @Override public Map<String, Object> getComponentConfiguration() { return null; }}[/code]

创建topology

Storm topology基本上是一个Thrift结构。TopologyBuilder类提供了简单和轻松的方法来创建复杂的topology。TopologyBuilder类具有设置spout__(setSpout)和设置bolt(setBolt)的方法__。最后,TopologyBuilder创建Topology来创建topology。使用下面的代码片段来创建一个拓扑 -[code lang=“java”]TopologyBuilder builder = new TopologyBuilder();builder.setSpout(“call-log-reader-spout”, new FakeCallLogReaderSpout());builder.setBolt(“call-log-creator-bolt”, new CallLogCreatorBolt()) .shuffleGrouping(“call-log-reader-spout”);builder.setBolt(“call-log-counter-bolt”, new CallLogCounterBolt()) .fieldsGrouping(“call-log-creator-bolt”, new Fields(“call”));[/code]__shuffleGrouping__和__fieldsGrouping__方法有助于设置spout和bolt的流分组。

本地集群

出于开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交topology。“submitTopology”的一个参数是“Config”类的一个实例。在提交topology之前,“Config”类用于设置配置选项。该配置选项将在运行时与集群配置合并,并通过prepare方法发送到所有任务(spout和bolt)。将topology提交到集群后,我们将等待10秒钟,以便集群计算提交的topology,然后使用“LocalCluster”的“close”方法关闭群集。完整的程序代码如下所示 -

编码 - LogAnalyserStorm.java

[code lang=“java”]package topology;import bolt.CallLogCounterBolt;import bolt.CallLogCreatorBolt;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.tuple.Fields;import spout.FakeCallLogReaderSpout;/** * @author: liyj * @Date:Created in 2018/5/8 *///Create main class LogAnalyserStorm submit topology.public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(“call-log-reader-spout”, new FakeCallLogReaderSpout()); builder.setBolt(“call-log-creator-bolt”, new CallLogCreatorBolt()) .shuffleGrouping(“call-log-reader-spout”); builder.setBolt(“call-log-counter-bolt”, new CallLogCounterBolt()) .fieldsGrouping(“call-log-creator-bolt”, new Fields(“call”)); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(“LogAnalyserStorm”, config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); }}[/code]

构建和运行应用程序(我已经将程序编写为maven项目,大家可以将项目clone到本地在自己的ide中查看)

完整的应用程序有四个Java代码。他们是 -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

应用程序可以使用以下命令构建 -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

应用程序可以使用以下命令运行 -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

我在github上写的demo,直接clone到本地,在ide中打开即可。https://github.com/wukuili/storm_test.git

产量

一旦应用程序启动,它将输出有关集群启动过程,spout和bolt处理的完整详细信息,最后还会输出集群关闭过程。在“CallLogCounterBolt”中,我们打印了call及其计数详细信息。这些信息将如下显示在控制台上 -

1234123402 - 1234123401 : 781234123402 - 1234123404 : 881234123402 - 1234123403 : 1051234123401 - 1234123404 : 741234123401 - 1234123403 : 811234123401 - 1234123402 : 811234123403 - 1234123404 : 861234123404 - 1234123401 : 631234123404 - 1234123402 : 821234123403 - 1234123402 : 831234123404 - 1234123403 : 861234123403 - 1234123401 : 93

非JVM语言

Storm风格的topologies结构通过Thrift接口实现,这使得用任何语言提交topologies变得非常容易。Storm支持Ruby,Python和许多其他语言。我们来看看python绑定。

Python绑定

Python是一种通用的解释型,交互式,面向对象和高级编程语言。Storm支持Python来实现其topology。Python支持emitting, anchoring, acking和记录操作。如你所知,bolt可以用任何语言来定义。以另一种语言编写的bolt作为子流程执行,Storm通过标准输入/标准输出与JSON消息通信。首先拿一个支持python绑定的示例bolt WordCount。[code lang=“python”]public static class WordCount implements IRichBolt { public WordSplit() { super(“python”, “splitword.py”); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(“word”)); }}[/code]这里的__WordCount__类实现了__IRichBolt__接口,并使用python实现指定的超级方法参数“splitword.py”运行。现在创建一个名为“splitword.py”的python实现。[code lang=“python”]import storm class WordCountBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" “) for word in words: storm.emit([word])WordCountBolt().run()[/code]这是用于计算给定句子中的单词数量的Python示例实现。同样,您也可以使用其他支持语言进行绑定。