apache storm demo示例

11次阅读

共计 10454 个字符,预计需要花费 27 分钟才能阅读完成。

从国外网站上翻译的,主要业务是创建移动电话日志分析器。

场景 – 移动呼叫日志分析器

移动电话及其持续时间将作为 Apache Storm 的输入提供,Storm 将处理并分组相同呼叫者和接收者之间的呼叫及其呼叫总数。

创建 Spout

Spout 是用于数据生成的组件。基本上,spout 将实现一个 IRichSpout 接口。“IRichSpout”界面有以下重要方法 –

  • open – 为 spout 提供执行环境。执行者将运行此方法来初始化 spout。
  • nextTuple – 通过收集器发出生成的数据。
  • close – spout 将要关闭时调用此方法。
  • declareOutputFields – 声明元组的输出模式。
  • ack – 确认处理了特定的 tuple
  • fail – 指定一个特定的 tuple 不被处理并且不被重新处理。

open

open方法的签名如下 –

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf – 为此 spout 提供 storm 暴配置。
  • context – 提供关于 topology 中 spout 位置,其任务 ID,输入和输出信息的完整信息。
  • collector – 使我们能够发出将由 bolts 处理的 tuple。

nextTuple

nextTuple方法的签名如下 –

nextTuple()

nextTuple()从与 ack()和 fail()方法相同的循环周期性地调用。当没有工作要做时,它必须释放对线程的控制,以便其他方法有机会被调用。所以 nextTuple 的第一行检查处理是否完成。如果是这样,它应该睡眠至少一毫秒,以在返回之前减少处理器上的负载。

close

close方法的签名如下 –

close()

declareOutputFields

declareOutputFields方法的签名如下所示 –

declareOutputFields(OutputFieldsDeclarer declarer)

declarer – 它用于声明输出流 ID,输出字段等。

此方法用于指定 tuple 的输出模式。

ack

ack方法的签名如下 –

ack(Object msgId)

该方法确认已经处理了特定的元组。

fail

fail方法的签名如下 –

fail(Object msgId)

此方法通知某个特定的元组尚未完全处理。Storm 将重新处理特定的元组。

FakeCallLogReaderSpout

在我们的场景中,我们需要收集通话记录详细信息。通话记录的信息包含。

  • 来电号码
  • 接收器号码
  • 持续时间

由于我们没有实时的通话记录信息,我们会生成虚假的通话记录。假信息将使用 Random 类创建。完整的程序代码如下。

编码 – FakeCallLogReaderSpout.java

package spout;
/**
 * @author: liyj
 * @Date:Created in 2018/5/8
 */
import java.util.*;
//import storm tuple packages


//import Spout interface packages

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

//Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities

public class FakeCallLogReaderSpout implements IRichSpout {
    //Create instance for SpoutOutputCollector which passes tuples to bolt.
    private SpoutOutputCollector collector;
    private boolean completed = false;

    //Create instance for TopologyContext which contains topology data.
    private TopologyContext context;

    //Create instance for Random class.
    private Random randomGenerator = new Random();
    private Integer idx = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void nextTuple() {if(this.idx <= 1000) {List<String> mobileNumbers = new ArrayList<String>();
            mobileNumbers.add("1234123401");
            mobileNumbers.add("1234123402");
            mobileNumbers.add("1234123403");
            mobileNumbers.add("1234123404");

            Integer localIdx = 0;
            while(localIdx++ < 100 && this.idx++ < 1000) {String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

                while(fromMobileNumber == toMobileNumber) {toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                }

                Integer duration = randomGenerator.nextInt(60);
                this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("from", "to", "duration"));
    }

    //Override all the interface methods
    @Override
    public void close() {}

    public boolean isDistributed() {return false;}

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public void ack(Object msgId) {}

    @Override
    public void fail(Object msgId) {}

    @Override
    public Map<String, Object> getComponentConfiguration() {return null;}
}

 

创建 Bolt

Bolt 是一个将 tuple 作为输入,处理 tuple 并生成新的 tuple 作为输出的组件。Bolts 将实现 IRichBolt 接口。在这个程序中,使用两个螺栓类 CallLogCreatorBoltCallLogCounterBolt来执行操作。

IRichBolt 接口有以下方法 –

  • prepare – 为 bolt 提供执行的环境。执行者将运行此方法来初始化 spout。
  • execute – 处理输入的单个 tuple。
  • cleanup – 当 bolt 即将关闭时调用。
  • declareOutputFields – 声明 tuple 的输出模式。

准备

准备 方法的签名如下 –

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf – 为此 bolt 提供 storm 配置。
  • context – 提供有关 topology 中 bolt 位置,其任务 ID,输入和输出信息等的完整信息。
  • collector – 使我们能够发出处理过的 tuple。

execute

execute方法的签名如下 –

execute(Tuple tuple)

这里的 tuple 是要处理的输入 tuple。

所述 execute 方法一次处理单 tuple。tuple 数据可以通过 Tuple 类的 getValue 方法访问。没有必要立即处理输入 tuple。多 tuple 可以作为单个输出 tuple 进行处理和输出。处理过的 tuple 可以通过使用 OutputCollector 类发出。

cleanup

cleanup方法的签名如下 –

cleanup()

declareOutputFields

declareOutputFields方法的签名如下所示 –

declareOutputFields(OutputFieldsDeclarer declarer)

这里参数 declarer 用于声明输出流 ID,输出字段等。

此方法用于指定 tuple 的输出模式

通话记录创建者 bolt

通话记录创建器 bolt 接收通话记录 tuple。通话记录 tuple 具有主叫号码,接收者号码和通话时长。通过组合主叫方号码和接收方号码,此 bolt 简单地创建一个新值。新值的格式为“来电号码 – 接收方号码”,并将其命名为新字段“call”。完整的代码如下。

编码 – CallLogCreatorBolt.java

package bolt;

/**
 * @author: liyj
 * @Date:Created in 2018/5/8
 */
//import util packages
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;



//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
    //Create instance for OutputCollector which collects and emits tuples to produce output
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {this.collector = collector;}

    @Override
    public void execute(Tuple tuple) {String from = tuple.getString(0);
        String to = tuple.getString(1);
        Integer duration = tuple.getInteger(2);
        collector.emit(new Values(from + " - " + to, duration));
    }

    @Override
    public void cleanup() {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("call", "duration"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {return null;}
}

通话记录计数器 bolt

呼叫记录计数器 bolt 接收呼叫及其持续时间作为 tuple。这个 bolt 在 prepare 方法中初始化一个字典(Map)对象。在 execute 方法中,它检查 tuple 并在 tuple 中为每个新的“call”值在字典对象中创建一个新条目,并在字典对象中设置值 1。对于字典中已有的条目,它只是递增其值。简单地说,这个 bolt 将 call 和它的计数保存在字典对象中。我们可以将它保存到数据源中,而不是将 call 和它的计数保存在字典中。完整的程序代码如下所示 –

编码 – CallLogCounterBolt.java

package bolt;

/**
 * @author: liyj
 * @Date:Created in 2018/5/8
 */
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;



public class CallLogCounterBolt implements IRichBolt {
    Map<String, Integer> counterMap;
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {this.counterMap = new HashMap<String, Integer>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {String call = tuple.getString(0);
        Integer duration = tuple.getInteger(1);

        if(!counterMap.containsKey(call)){counterMap.put(call, 1);
        }else{Integer c = counterMap.get(call) + 1;
            counterMap.put(call, c);
        }

        collector.ack(tuple);
    }

    @Override
    public void cleanup() {for(Map.Entry<String, Integer> entry:counterMap.entrySet()){System.out.println(entry.getKey()+" : " + entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("call"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {return null;}

}

创建 topology

Storm topology 基本上是一个 Thrift 结构。TopologyBuilder 类提供了简单和轻松的方法来创建复杂的 topology。TopologyBuilder 类具有设置 spout(setSpout)和设置 bolt(setBolt)的方法。最后,TopologyBuilder 创建 Topology 来创建 topology。使用下面的代码片段来创建一个拓扑 –

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGroupingfieldsGrouping 方法有助于设置 spout 和 bolt 的流分组。

本地集群

出于开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交 topology。“submitTopology”的一个参数是“Config”类的一个实例。在提交 topology 之前,“Config”类用于设置配置选项。该配置选项将在运行时与集群配置合并,并通过 prepare 方法发送到所有任务(spout 和 bolt)。将 topology 提交到集群后,我们将等待 10 秒钟,以便集群计算提交的 topology,然后使用“LocalCluster”的“close”方法关闭群集。完整的程序代码如下所示 –

编码 – LogAnalyserStorm.java

package topology;

import bolt.CallLogCounterBolt;
import bolt.CallLogCreatorBolt;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import spout.FakeCallLogReaderSpout;

/**
 * @author: liyj
 * @Date:Created in 2018/5/8
 */


//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {public static void main(String[] args) throws Exception{
        //Create Config instance for cluster configuration
        Config config = new Config();
        config.setDebug(true);

        //
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

        builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
                .shuffleGrouping("call-log-reader-spout");

        builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
                .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
        Thread.sleep(10000);

        //Stop the topology

        cluster.shutdown();}
}

构建和运行应用程序(我已经将程序编写为 maven 项目,大家可以将项目 clone 到本地在自己的 ide 中查看)

完整的应用程序有四个 Java 代码。他们是 –

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

应用程序可以使用以下命令构建 –

javac -cp“/path/to/storm/apache-storm-0.9.5/lib/*”*.java

应用程序可以使用以下命令运行 –

java -cp“/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

我在 github 上写的 demo, 直接 clone 到本地,在 ide 中打开即可。

https://github.com/wukuili/storm_test.git

产量

一旦应用程序启动,它将输出有关集群启动过程,spout 和 bolt 处理的完整详细信息,最后还会输出集群关闭过程。在“CallLogCounterBolt”中,我们打印了 call 及其计数详细信息。这些信息将如下显示在控制台上 –

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

非 JVM 语言

Storm 风格的 topologies 结构通过 Thrift 接口实现,这使得用任何语言提交 topologies 变得非常容易。Storm 支持 Ruby,Python 和许多其他语言。我们来看看 python 绑定。

Python 绑定

Python 是一种通用的解释型,交互式,面向对象和高级编程语言。Storm 支持 Python 来实现其 topology。Python 支持 emitting, anchoring, acking 和记录操作。

如你所知,bolt 可以用任何语言来定义。以另一种语言编写的 bolt 作为子流程执行,Storm 通过标准输入 / 标准输出与 JSON 消息通信。首先拿一个支持 python 绑定的示例 bolt WordCount。

public static class WordCount implements IRichBolt {public WordSplit() {super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));
   }
}

这里的 WordCount 类实现了 IRichBolt 接口,并使用 python 实现指定的超级方法参数“splitword.py”运行。现在创建一个名为“splitword.py”的 python 实现。

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

这是用于计算给定句子中的单词数量的 Python 示例实现。同样,您也可以使用其他支持语言进行绑定。

正文完
 0