1. 实时计算

  • 有别于传统的离线批处理操作(对很多数据的集合进行的操作)
  • 实时处理,说白就是针对一条一条的数据/记录进行操作
  • 实时计算计算的是无界数据

2. 有界数据和无界数据

2.1 有界数据

  • 离线计算面临的操作数据都是有界限的,无论是1G、1T、1P、1EB、1NB
  • 数据的有界必然会导致计算的有界

2.2 无界数据

  • 实时计算面临的操作数据是源源不断的向水流一样,是没有界限的
  • 数据的无界必然导致计算的无界

3. 计算中心和计算引擎

在大数据领域中存在三大计算中心三大计算引擎

3.1 三大计算中心

  • 离线计算计算中心(mapreduce)
  • 实时计算中心(storm flink...)
  • 准实时计算中心(spark)

3.2 三大计算引擎

  • 交互式查询计算引擎(hive sparksql)
  • 图计算计算引擎
  • 机器学习计算引擎

4. Storm简介

  • 免费 开源 分布式 实时计算系统
  • 处理无界的数据流
  • Tiwtter开源的cloujre
  • Storm能实现高频数据和大规模数据的实时处理
  • 官网资料显示storm的一个节点1秒钟能够处理100万个100字节的消息(IntelE5645@2.4Ghz的CPU,24GB的内存)
  • storm是毫秒级的实时处理框架

Apache Storm是Twitter开源的一个类似于Hadoop的实时数据处理框架,它原来是由BackType开发,后BackType被Twitter收购,将Storm作为Twitter的实时数据分析系统。

5. hadoop与storm的计算

  • 数据来源

    • hadoop

      • HADOOP处理的是HDFS上TB级别的数据(历史数据)
    • storm

      • STORM是处理的是实时新增的某一笔数据(实时数据)
  • 处理过程

    • hadoop

      • HADOOP是分MAP阶段到REDUCE阶段
      • HADOOP最后是要结束的
    • storm

      • STORM是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOLT)
      • STORM是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始
  • 处理速度

    • hadoop

      • HADOOP是以处理HDFS上TB级别数据为目的,处理速度慢
    • storm

      • STORM是只要处理新增的某一笔数据即可,可以做到很快 (毫秒级的响应)
  • 适用场景

    • HADOOP是在要处理批量数据时用的 ,不讲究时效性
    • STORM是要处理某一新增数据时用的,要讲时效性

6. Storm的架构

  • Spout

    • Storm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头称为Spout
    • 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息
  • Bolt

    • 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生新的输出数据流,可执行过滤,聚合,查询数据库等操作
  • 数据流
  • Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.
  • Stream groupings: 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该如何分配给Bolts们.

7. Storm集群的安装

  • 准备安装文件

    apache-storm-1.0.2.tar.gz

  • 解压
[root@uplooking01 /soft]    tar -zxvf apache-storm-1.0.2.tar.gz -C /opt    mv apache-storm-1.0.2/ storm
  • 配置storm

storm-env.sh

[root@uplooking01 /soft]    export JAVA_HOME=/opt/jdk    export STORM_CONF_DIR="/opt/storm/conf"

storm.yaml

[root@uplooking01 /opt/storm/conf]storm.zookeeper.servers:  - "uplooking03"  - "uplooking04"  - "uplooking05"#配置两个主节点,实现主节点的单点故障nimbus.seeds: ["uplooking01", "uplooking02"]storm.local.dir: "/opt/storm/storm-local"#配置从节点的槽数supervisor.slots.ports:  - 6700  - 6701  - 6702  - 6703
  • 分发到其他节点
[root@uplooking01 /]    scp -r /opt/storm uplooking02:/opt    scp -r /opt/storm uplooking03:/opt    scp -r /opt/storm uplooking04:/opt    scp -r /opt/storm uplooking05:/opt
  • 启动storm
[root@uplooking01 /]      #启动主进程和ui进程    nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &    nohup /opt/storm/bin/storm ui >/dev/null 2>&1 &    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking02 /]    #启动主进程(numbus)    nohup /opt/storm/bin/storm numbus >/dev/null 2>&1 &    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
#启动从节点进程(supervisor)[root@uplooking03 /]    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &[root@uplooking04 /]    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &[root@uplooking05 /]    nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &    nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &

8. Storm集群的启动脚本

#!/bin/bash#启动nimbusfor nimbusHost in  `cat /opt/shell/nimbus.host`do#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端ssh -T  root@${nimbusHost}    << eeooff    nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &eeooffdone#启动supervisorfor supervisorHost in  `cat /opt/shell/supervisor.host`do#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端ssh -T  root@${supervisorHost}    << eeooff        nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &eeooffdone#启动logviewerfor logviewerHost in  `cat /opt/shell/logviewer.host`do#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端ssh -T  root@${logviewerHost}    << eeooff        nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &eeooffdone#启动uifor uiHost in  `cat /opt/shell/ui.host`do#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端ssh -T  root@${uiHost}    << eeooff        nohup /opt/storm/bin/storm ui >/dev/null 2>&1 &eeooffdone

9. Storm实现数字累加

  • 编写Spout
public class MySpout extends BaseRichSpout {    private SpoutOutputCollector collector;    //初始化累加的数字    int num = 0;    @Override    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {        this.collector = collector;    }    @Override    public void nextTuple() {        collector.emit(new Values(num));        num++;    }    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("mynum"));    }}
  • 编写Bolt
public class MyBolt extends BaseRichBolt {    @Override    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {    }    @Override    public void execute(Tuple tuple) {        Integer num = tuple.getIntegerByField("mynum");        System.out.println(num);    }    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {    }}
  • 编写Topology
public class MyTopology {    public static void main(String[] args) {        //创建自定义的spout        MySpout mySpout = new MySpout();        //创建自定义的bolt        MyBolt myBolt = new MyBolt();        //创建topology名称        String topologyName = "MyNumTopology";        //创建topology的配置对象        Map conf = new Config();        //创建topology的构造器        TopologyBuilder topologyBuilder = new TopologyBuilder();        //为topology设置spout和bolt        topologyBuilder.setSpout("myspout", mySpout);        topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout");        //创建本地的topology提交器        StormTopology stormTopology = topologyBuilder.createTopology();        LocalCluster localCluster = new LocalCluster();        localCluster.submitTopology(topologyName, conf, stormTopology);    }}

10. 多个Bolt的问题

  • 定义下一个Bolt
public class MyBolt02 extends BaseRichBolt {    @Override    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {    }    @Override    public void execute(Tuple tuple) {        System.out.println(tuple.getIntegerByField("mynum02") + ".....");    }    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {    }}
  • 第一个Bolt中给第二个Bolt发射数据
public class MyBolt extends BaseRichBolt {    private OutputCollector collector;    @Override    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {        this.collector = collector;    }    @Override    public void execute(Tuple tuple) {        Integer num = tuple.getIntegerByField("mynum");        System.out.println(num);        collector.emit(new Values(num));    }    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("mynum02"));    }}
  • 在Topology中配置第二个Bolt
public class MyTopology {    public static void main(String[] args) {        //创建自定义的spout        MySpout mySpout = new MySpout();        //创建自定义的bolt        MyBolt myBolt = new MyBolt();        MyBolt02 myBolt02 = new MyBolt02();        //创建topology名称        String topologyName = "MyNumTopology";        //创建topology的配置对象        Map conf = new Config();        //创建topology的构造器        TopologyBuilder topologyBuilder = new TopologyBuilder();        //为topology设置spout和bolt        topologyBuilder.setSpout("myspout", mySpout);        topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout");        topologyBuilder.setBolt("mybolt02", myBolt02).shuffleGrouping("mybolt");        //创建本地的topology提交器        StormTopology stormTopology = topologyBuilder.createTopology();        LocalCluster localCluster = new LocalCluster();        localCluster.submitTopology(topologyName, conf, stormTopology);    }}

11. 提交作业到集群

 StormSubmitter.submitTopology(topologyName, conf, stormTopology);

12. Storm的并行度

在storm中的并行度说的就是一个进程的运行需要多少个线程来参与,如果storm运行的线程个数+1,则并行度+1

Worker :

  • worker是一个进程级别的概念,可以通过jps查看的到
  • worker是一个Topology实例的子集,也就是说一个Topology的实例在supervisor中运行,可以在一个或者多个supervisor中启动一个或者多个worker进程
  • 一个worker进程只能为一个Topology实例服务
  • 所以Topology和worker的关系===>1:N
  • 进程是由多个线程来组成,这里的线程就是Executor
  • conf.setNumWorkers(int workers)
  • 所以worker和executor的关系===>1:N
  • 每一个executor线程具体干活是由一个个task任务的实例来完成的
  • 在builer.setSpout/setBolt的第三个参数设置
  • Task真正在topology干活的实例,一个executor线程,默认情况下对应了1个task的实例的
  • Executor和Task的关系===>1:N
  • builder.setSpout().setNumTasks(tasks)//设置的是spout对应的executor拥有几个task实例builder.setBolt().setNumTasks(tasks)//设置的是bolt对应的executor拥有几个task实例

13. Storm中的消息确认机制

  • 在spout中如果发送消息时指定messageId则代表开启消息确认机制,如果不指定messageID则代表不开启消息确认机制
  • 如果Spout中开启了消息确认机制则在bolt中需要用ack()方法来确认消息接收成功
  • 在Soput中重写响应的fail()和ack()方法来处理消息成功或者失败的回调逻辑
  • Storm默认如果不确认消息接收成功则30s之后返回消息失败
  • 消息确认机制要慎重使用(效率换取安全)