apache storm demo示例

从国外网站上翻译的,主要业务是创建移动电话日志分析器。

场景 – 移动呼叫日志分析器

移动电话及其持续时间将作为Apache Storm的输入提供,Storm将处理并分组相同呼叫者和接收者之间的呼叫及其呼叫总数。

创建Spout

Spout是用于数据生成的组件。基本上,spout将实现一个IRichSpout接口。“IRichSpout”界面有以下重要方法 –

  • open – 为spout提供执行环境。执行者将运行此方法来初始化spout。
  • nextTuple – 通过收集器发出生成的数据。
  • close – spout将要关闭时调用此方法。
  • declareOutputFields – 声明元组的输出模式。
  • ack – 确认处理了特定的tuple
  • fail – 指定一个特定的tuple不被处理并且不被重新处理。

open

open方法的签名如下 –

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf – 为此spout提供storm暴配置。
  • context – 提供关于topology中spout位置,其任务ID,输入和输出信息的完整信息。
  • collector – 使我们能够发出将由bolts处理的tuple。

nextTuple

nextTuple方法的签名如下 –

nextTuple()

nextTuple()从与ack()和fail()方法相同的循环周期性地调用。当没有工作要做时,它必须释放对线程的控制,以便其他方法有机会被调用。所以nextTuple的第一行检查处理是否完成。如果是这样,它应该睡眠至少一毫秒,以在返回之前减少处理器上的负载。

close

close方法的签名如下 –

close()

declareOutputFields

declareOutputFields方法的签名如下所示 –

declareOutputFields(OutputFieldsDeclarer declarer)

declarer – 它用于声明输出流ID,输出字段等。

此方法用于指定tuple的输出模式。

ack

ack方法的签名如下 –

ack(Object msgId)

该方法确认已经处理了特定的元组。

fail

fail方法的签名如下 –

fail(Object msgId)

此方法通知某个特定的元组尚未完全处理。Storm将重新处理特定的元组。

FakeCallLogReaderSpout

在我们的场景中,我们需要收集通话记录详细信息。通话记录的信息包含。

  • 来电号码
  • 接收器号码
  • 持续时间

由于我们没有实时的通话记录信息,我们会生成虚假的通话记录。假信息将使用Random类创建。完整的程序代码如下。

编码 – FakeCallLogReaderSpout.java

[code lang=”java”]
package spout;
/**
* @author: liyj
* @Date:Created in 2018/5/8
*/
import java.util.*;
//import storm tuple packages

//import Spout interface packages

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

//Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities

public class FakeCallLogReaderSpout implements IRichSpout {
//Create instance for SpoutOutputCollector which passes tuples to bolt.
private SpoutOutputCollector collector;
private boolean completed = false;

//Create instance for TopologyContext which contains topology data.
private TopologyContext context;

//Create instance for Random class.
private Random randomGenerator = new Random();
private Integer idx = 0;

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
}

@Override
public void nextTuple() {
if(this.idx <= 1000) {
List<String> mobileNumbers = new ArrayList<String>();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");

Integer localIdx = 0;
while(localIdx++ < 100 && this.idx++ < 1000) {
String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

while(fromMobileNumber == toMobileNumber) {
toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
}

Integer duration = randomGenerator.nextInt(60);
this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
}
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}

//Override all the interface methods
@Override
public void close() {}

public boolean isDistributed() {
return false;
}

@Override
public void activate() {}

@Override
public void deactivate() {}

@Override
public void ack(Object msgId) {}

@Override
public void fail(Object msgId) {}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}

[/code]

 

创建Bolt

Bolt是一个将tuple作为输入,处理tuple并生成新的tuple作为输出的组件。Bolts将实现IRichBolt接口。在这个程序中,使用两个螺栓类CallLogCreatorBoltCallLogCounterBolt来执行操作。

IRichBolt接口有以下方法 –

  • prepare – 为bolt提供执行的环境。执行者将运行此方法来初始化spout。
  • execute – 处理输入的单个tuple。
  • cleanup – 当bolt即将关闭时调用。
  • declareOutputFields – 声明tuple的输出模式。

准备

准备方法的签名如下 –

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf – 为此bolt提供storm配置。
  • context – 提供有关topology中bolt位置,其任务ID,输入和输出信息等的完整信息。
  • collector – 使我们能够发出处理过的tuple。

execute

execute方法的签名如下 –

execute(Tuple tuple)

这里的tuple是要处理的输入tuple。

所述execute方法一次处理单tuple。tuple数据可以通过Tuple类的getValue方法访问。没有必要立即处理输入tuple。多tuple可以作为单个输出tuple进行处理和输出。处理过的tuple可以通过使用OutputCollector类发出。

cleanup

cleanup方法的签名如下 –

cleanup()

declareOutputFields

declareOutputFields方法的签名如下所示 –

declareOutputFields(OutputFieldsDeclarer declarer)

这里参数declarer用于声明输出流ID,输出字段等。

此方法用于指定tuple的输出模式

通话记录创建者bolt

通话记录创建器bolt接收通话记录tuple。通话记录tuple具有主叫号码,接收者号码和通话时长。通过组合主叫方号码和接收方号码,此bolt简单地创建一个新值。新值的格式为“来电号码 – 接收方号码”,并将其命名为新字段“call”。完整的代码如下。

编码 – CallLogCreatorBolt.java

[code lang=”java”]
package bolt;

/**
* @author: liyj
* @Date:Created in 2018/5/8
*/
//import util packages
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
//Create instance for OutputCollector which collects and emits tuples to produce output
private OutputCollector collector;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(new Values(from + " – " + to, duration));
}

@Override
public void cleanup() {}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}

[/code]

通话记录计数器bolt

呼叫记录计数器bolt接收呼叫及其持续时间作为tuple。这个bolt在prepare方法中初始化一个字典(Map)对象。在execute方法中,它检查tuple并在tuple中为每个新的“call”值在字典对象中创建一个新条目,并在字典对象中设置值1。对于字典中已有的条目,它只是递增其值。简单地说,这个bolt将call和它的计数保存在字典对象中。我们可以将它保存到数据源中,而不是将call和它的计数保存在字典中。完整的程序代码如下所示 –

编码 – CallLogCounterBolt.java

[code lang=”java”]
package bolt;

/**
* @author: liyj
* @Date:Created in 2018/5/8
*/
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class CallLogCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);

if(!counterMap.containsKey(call)){
counterMap.put(call, 1);
}else{
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}

collector.ack(tuple);
}

@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}

}
[/code]

创建topology

Storm topology基本上是一个Thrift结构。TopologyBuilder类提供了简单和轻松的方法来创建复杂的topology。TopologyBuilder类具有设置spout(setSpout)和设置bolt(setBolt)的方法。最后,TopologyBuilder创建Topology来创建topology。使用下面的代码片段来创建一个拓扑 –

[code lang=”java”]
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
[/code]

shuffleGroupingfieldsGrouping方法有助于设置spout和bolt的流分组。

本地集群

出于开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交topology。“submitTopology”的一个参数是“Config”类的一个实例。在提交topology之前,“Config”类用于设置配置选项。该配置选项将在运行时与集群配置合并,并通过prepare方法发送到所有任务(spout和bolt)。将topology提交到集群后,我们将等待10秒钟,以便集群计算提交的topology,然后使用“LocalCluster”的“close”方法关闭群集。完整的程序代码如下所示 –

编码 – LogAnalyserStorm.java

[code lang=”java”]
package topology;

import bolt.CallLogCounterBolt;
import bolt.CallLogCreatorBolt;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import spout.FakeCallLogReaderSpout;

/**
* @author: liyj
* @Date:Created in 2018/5/8
*/

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
public static void main(String[] args) throws Exception{
//Create Config instance for cluster configuration
Config config = new Config();
config.setDebug(true);

//
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
Thread.sleep(10000);

//Stop the topology

cluster.shutdown();
}
}
[/code]

构建和运行应用程序(我已经将程序编写为maven项目,大家可以将项目clone到本地在自己的ide中查看)

完整的应用程序有四个Java代码。他们是 –

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

应用程序可以使用以下命令构建 –

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

应用程序可以使用以下命令运行 –

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

我在github上写的demo,直接clone到本地,在ide中打开即可。

https://github.com/wukuili/storm_test.git

产量

一旦应用程序启动,它将输出有关集群启动过程,spout和bolt处理的完整详细信息,最后还会输出集群关闭过程。在“CallLogCounterBolt”中,我们打印了call及其计数详细信息。这些信息将如下显示在控制台上 –

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

非JVM语言

Storm风格的topologies结构通过Thrift接口实现,这使得用任何语言提交topologies变得非常容易。Storm支持Ruby,Python和许多其他语言。我们来看看python绑定。

Python绑定

Python是一种通用的解释型,交互式,面向对象和高级编程语言。Storm支持Python来实现其topology。Python支持emitting, anchoring, acking和记录操作。

如你所知,bolt可以用任何语言来定义。以另一种语言编写的bolt作为子流程执行,Storm通过标准输入/标准输出与JSON消息通信。首先拿一个支持python绑定的示例bolt WordCount。

[code lang=”python”]
public static class WordCount implements IRichBolt {
public WordSplit() {
super("python", "splitword.py");
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

[/code]

这里的WordCount类实现了IRichBolt接口,并使用python实现指定的超级方法参数“splitword.py”运行。现在创建一个名为“splitword.py”的python实现。

[code lang=”python”]
import storm
class WordCountBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
WordCountBolt().run()
[/code]

这是用于计算给定句子中的单词数量的Python示例实现。同样,您也可以使用其他支持语言进行绑定。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理