乐趣区

大数据系列Storm安装和API

1. 实时计算

  • 有别于传统的离线批处理操作(对很多数据的集合进行的操作)
  • 实时处理,说白就是针对一条一条的数据 / 记录进行操作
  • 实时计算计算的是 无界数据

2. 有界数据和无界数据

2.1 有界数据

  • 离线计算面临的操作数据都是有界限的,无论是 1G、1T、1P、1EB、1NB
  • 数据的有界必然会导致计算的有界

2.2 无界数据

  • 实时计算面临的操作数据是源源不断的向水流一样,是没有界限的
  • 数据的无界必然导致计算的无界

3. 计算中心和计算引擎

在大数据领域中存在 三大计算中心 三大计算引擎

3.1 三大计算中心

  • 离线计算计算中心(mapreduce)
  • 实时计算中心(storm flink…)
  • 准实时计算中心(spark)

3.2 三大计算引擎

  • 交互式查询计算引擎(hive sparksql)
  • 图计算计算引擎
  • 机器学习计算引擎

4. Storm 简介

  • 免费 开源 分布式 实时计算系统
  • 处理无界的数据流
  • Tiwtter 开源的 cloujre
  • Storm 能实现高频数据和大规模数据的实时处理
  • 官网资料显示 storm 的一个节点 1 秒钟能够处理 100 万个 100 字节的消息(IntelE5645@2.4Ghz 的 CPU,24GB 的内存)
  • storm 是毫秒级的实时处理框架

Apache Storm 是 Twitter 开源的一个类似于 Hadoop 的实时数据处理框架,它原来是由 BackType 开发,后 BackType 被 Twitter 收购,将 Storm 作为 Twitter 的实时数据分析系统。

5. hadoop 与 storm 的计算

  • 数据来源

    • hadoop

      • HADOOP 处理的是 HDFS 上 TB 级别的数据(历史数据)
    • storm

      • STORM 是处理的是实时新增的某一笔数据(实时数据)
  • 处理过程

    • hadoop

      • HADOOP 是分 MAP 阶段到 REDUCE 阶段
      • HADOOP 最后是要结束的
    • storm

      • STORM 是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源 (SPOUT) 或处理逻辑(BOLT)
      • STORM 是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始
  • 处理速度

    • hadoop

      • HADOOP 是以处理 HDFS 上 TB 级别数据为目的,处理速度慢
    • storm

      • STORM 是只要处理新增的某一笔数据即可,可以做到很快 (毫秒级的响应)
  • 适用场景

    • HADOOP 是在要处理批量数据时用的,不讲究 时效性
    • STORM 是要处理某一新增数据时用的,要讲 时效性

6. Storm 的架构

  • Spout

    • Storm 认为每个 stream 都有一个 stream 源,也就是原始元组的源头,所以它将这个源头称为 Spout
    • 消息源,是消息生产者,他会从一个外部源读取数据并向 topology 里面面发出消息
  • Bolt

    • 消息处理者,所有的消息处理逻辑被封装在 bolts 里面,处理输入的数据流并产生新的输出数据流, 可执行过滤,聚合,查询数据库等操作
  • 数据流
  • Task 每一个 Spout 和 Bolt 会被当作很多 task 在整个集群里面执行, 每一个 task 对应到一个线程.
  • Stream groupings: 消息分发策略, 定义一个 Topology 的其中一步是定义每个 tuple 接受什么样的流作为输入,stream grouping 就是用来定义一个 stream 应该如何分配给 Bolts 们.

7. Storm 集群的安装

  • 准备安装文件

    ​ apache-storm-1.0.2.tar.gz

  • 解压
[root@uplooking01 /soft]
    tar -zxvf apache-storm-1.0.2.tar.gz -C /opt
    mv apache-storm-1.0.2/ storm
  • 配置 storm

storm-env.sh

[root@uplooking01 /soft]
    export JAVA_HOME=/opt/jdk
    export STORM_CONF_DIR="/opt/storm/conf"

storm.yaml

[root@uplooking01 /opt/storm/conf]


storm.zookeeper.servers:
  - "uplooking03"
  - "uplooking04"
  - "uplooking05"

#配置两个主节点, 实现主节点的单点故障
nimbus.seeds: ["uplooking01", "uplooking02"]
storm.local.dir: "/opt/storm/storm-local"
#配置从节点的槽数
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
  • 分发到其他节点
[root@uplooking01 /]
    scp -r /opt/storm uplooking02:/opt
    scp -r /opt/storm uplooking03:/opt
    scp -r /opt/storm uplooking04:/opt
    scp -r /opt/storm uplooking05:/opt
  • 启动 storm
[root@uplooking01 /]  
    #启动主进程和 ui 进程
    nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm ui >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking02 /]
    #启动主进程(numbus)
    nohup /opt/storm/bin/storm numbus >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
# 启动从节点进程(supervisor)
[root@uplooking03 /]
    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking04 /]
    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking05 /]
    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &

8. Storm 集群的启动脚本

#!/bin/bash
#启动 nimbus

for nimbusHost in  `cat /opt/shell/nimbus.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T  root@${nimbusHost}    << eeooff
    nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &
eeooff
done

#启动 supervisor
for supervisorHost in  `cat /opt/shell/supervisor.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T  root@${supervisorHost}    << eeooff
        nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
eeooff
done


#启动 logviewer
for logviewerHost in  `cat /opt/shell/logviewer.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T  root@${logviewerHost}    << eeooff
        nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
eeooff
done


#启动 ui
for uiHost in  `cat /opt/shell/ui.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T  root@${uiHost}    << eeooff
        nohup /opt/storm/bin/storm ui >/dev/null 2>&1 &
eeooff
done

9. Storm 实现数字累加

  • 编写 Spout
public class MySpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // 初始化累加的数字
    int num = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}

    @Override
    public void nextTuple() {collector.emit(new Values(num));
        num++;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("mynum"));
    }
}
  • 编写 Bolt
public class MyBolt extends BaseRichBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { }

    @Override
    public void execute(Tuple tuple) {Integer num = tuple.getIntegerByField("mynum");
        System.out.println(num);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}}
  • 编写 Topology
public class MyTopology {public static void main(String[] args) {
        // 创建自定义的 spout
        MySpout mySpout = new MySpout();
        // 创建自定义的 bolt
        MyBolt myBolt = new MyBolt();
        // 创建 topology 名称
        String topologyName = "MyNumTopology";
        // 创建 topology 的配置对象
        Map conf = new Config();

        // 创建 topology 的构造器
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // 为 topology 设置 spout 和 bolt
        topologyBuilder.setSpout("myspout", mySpout);
        topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout");

        // 创建本地的 topology 提交器
        StormTopology stormTopology = topologyBuilder.createTopology();
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, conf, stormTopology);
    }
}

10. 多个 Bolt 的问题

  • 定义下一个 Bolt
public class MyBolt02 extends BaseRichBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { }

    @Override
    public void execute(Tuple tuple) {System.out.println(tuple.getIntegerByField("mynum02") + ".....");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}}
  • 第一个 Bolt 中给第二个 Bolt 发射数据
public class MyBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}

    @Override
    public void execute(Tuple tuple) {Integer num = tuple.getIntegerByField("mynum");
        System.out.println(num);
        collector.emit(new Values(num));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("mynum02"));
    }
}
  • 在 Topology 中配置第二个 Bolt
public class MyTopology {public static void main(String[] args) {
        // 创建自定义的 spout
        MySpout mySpout = new MySpout();
        // 创建自定义的 bolt
        MyBolt myBolt = new MyBolt();

        MyBolt02 myBolt02 = new MyBolt02();
        // 创建 topology 名称
        String topologyName = "MyNumTopology";
        // 创建 topology 的配置对象
        Map conf = new Config();

        // 创建 topology 的构造器
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // 为 topology 设置 spout 和 bolt
        topologyBuilder.setSpout("myspout", mySpout);
        topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout");
        topologyBuilder.setBolt("mybolt02", myBolt02).shuffleGrouping("mybolt");

        // 创建本地的 topology 提交器
        StormTopology stormTopology = topologyBuilder.createTopology();
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, conf, stormTopology);
    }
}

11. 提交作业到集群

 StormSubmitter.submitTopology(topologyName, conf, stormTopology);

12. Storm 的并行度

在 storm 中的并行度说的就是一个进程的运行需要多少个线程来参与,如果 storm 运行的线程个数 +1,则并行度 +1

Worker :

  • worker 是一个进程级别的概念,可以通过 jps 查看的到
  • worker 是一个 Topology 实例的子集,也就是说一个 Topology 的实例在 supervisor 中运行,可以在一个或者多个 supervisor 中启动一个或者多个 worker 进程
  • 一个 worker 进程只能为一个 Topology 实例服务
  • 所以 Topology 和 worker 的关系 ===>1:N
  • 进程是由多个线程来组成,这里的线程就是 Executor
  • conf.setNumWorkers(int workers)
  • 所以 worker 和 executor 的关系 ===>1:N
  • 每一个 executor 线程具体干活是由一个个 task 任务的实例来完成的
  • 在 builer.setSpout/setBolt 的第三个参数设置
  • Task 真正在 topology 干活的实例,一个 executor 线程,默认情况下对应了 1 个 task 的实例的
  • Executor 和 Task 的关系 ===>1:N
  • builder.setSpout().setNumTasks(tasks)// 设置的是 spout 对应的 executor 拥有几个 task 实例 builder.setBolt().setNumTasks(tasks)// 设置的是 bolt 对应的 executor 拥有几个 task 实例

13. Storm 中的消息确认机制

  • 在 spout 中如果发送消息时指定 messageId 则代表开启消息确认机制, 如果不指定 messageID 则代表不开启消息确认机制
  • 如果 Spout 中开启了消息确认机制则在 bolt 中需要用 ack()方法来确认消息接收成功
  • 在 Soput 中重写响应的 fail()和 ack()方法来处理消息成功或者失败的回调逻辑
  • Storm 默认如果不确认消息接收成功则 30s 之后返回消息失败
  • 消息确认机制要慎重使用 ( 效率换取安全)
退出移动版