1. 实时计算
- 有别于传统的离线批处理操作(对很多数据的集合进行的操作)
- 实时处理,说白就是针对一条一条的数据 / 记录进行操作
- 实时计算计算的是 无界数据
2. 有界数据和无界数据
2.1 有界数据
- 离线计算面临的操作数据都是有界限的,无论是 1G、1T、1P、1EB、1NB
- 数据的有界必然会导致计算的有界
2.2 无界数据
- 实时计算面临的操作数据是源源不断的向水流一样,是没有界限的
- 数据的无界必然导致计算的无界
3. 计算中心和计算引擎
在大数据领域中存在 三大计算中心 和三大计算引擎
3.1 三大计算中心
- 离线计算计算中心(mapreduce)
- 实时计算中心(storm flink…)
- 准实时计算中心(spark)
3.2 三大计算引擎
- 交互式查询计算引擎(hive sparksql)
- 图计算计算引擎
- 机器学习计算引擎
4. Storm 简介
- 免费 开源 分布式 实时计算系统
- 处理无界的数据流
- Tiwtter 开源的 cloujre
- Storm 能实现高频数据和大规模数据的实时处理
- 官网资料显示 storm 的一个节点 1 秒钟能够处理 100 万个 100 字节的消息(IntelE5645@2.4Ghz 的 CPU,24GB 的内存)
- storm 是毫秒级的实时处理框架
Apache Storm 是 Twitter 开源的一个类似于 Hadoop 的实时数据处理框架,它原来是由 BackType 开发,后 BackType 被 Twitter 收购,将 Storm 作为 Twitter 的实时数据分析系统。
5. hadoop 与 storm 的计算
-
数据来源
-
hadoop
- HADOOP 处理的是 HDFS 上 TB 级别的数据(历史数据)
-
storm
- STORM 是处理的是实时新增的某一笔数据(实时数据)
-
-
处理过程
-
hadoop
- HADOOP 是分 MAP 阶段到 REDUCE 阶段
- HADOOP 最后是要结束的
-
storm
- STORM 是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源 (SPOUT) 或处理逻辑(BOLT)
- STORM 是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始
-
-
处理速度
-
hadoop
- HADOOP 是以处理 HDFS 上 TB 级别数据为目的,处理速度慢
-
storm
- STORM 是只要处理新增的某一笔数据即可,可以做到很快 (毫秒级的响应)
-
-
适用场景
- HADOOP 是在要处理批量数据时用的,不讲究 时效性
- STORM 是要处理某一新增数据时用的,要讲 时效性
6. Storm 的架构
-
Spout
- Storm 认为每个 stream 都有一个 stream 源,也就是原始元组的源头,所以它将这个源头称为 Spout
- 消息源,是消息生产者,他会从一个外部源读取数据并向 topology 里面面发出消息
-
Bolt
- 消息处理者,所有的消息处理逻辑被封装在 bolts 里面,处理输入的数据流并产生新的输出数据流, 可执行过滤,聚合,查询数据库等操作
- 数据流
- Task 每一个 Spout 和 Bolt 会被当作很多 task 在整个集群里面执行, 每一个 task 对应到一个线程.
- Stream groupings: 消息分发策略, 定义一个 Topology 的其中一步是定义每个 tuple 接受什么样的流作为输入,stream grouping 就是用来定义一个 stream 应该如何分配给 Bolts 们.
7. Storm 集群的安装
- 准备安装文件
apache-storm-1.0.2.tar.gz
- 解压
[root@uplooking01 /soft]
tar -zxvf apache-storm-1.0.2.tar.gz -C /opt
mv apache-storm-1.0.2/ storm
- 配置 storm
storm-env.sh
[root@uplooking01 /soft]
export JAVA_HOME=/opt/jdk
export STORM_CONF_DIR="/opt/storm/conf"
storm.yaml
[root@uplooking01 /opt/storm/conf]
storm.zookeeper.servers:
- "uplooking03"
- "uplooking04"
- "uplooking05"
#配置两个主节点, 实现主节点的单点故障
nimbus.seeds: ["uplooking01", "uplooking02"]
storm.local.dir: "/opt/storm/storm-local"
#配置从节点的槽数
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 分发到其他节点
[root@uplooking01 /]
scp -r /opt/storm uplooking02:/opt
scp -r /opt/storm uplooking03:/opt
scp -r /opt/storm uplooking04:/opt
scp -r /opt/storm uplooking05:/opt
- 启动 storm
[root@uplooking01 /]
#启动主进程和 ui 进程
nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &
nohup /opt/storm/bin/storm ui >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking02 /]
#启动主进程(numbus)
nohup /opt/storm/bin/storm numbus >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
# 启动从节点进程(supervisor)
[root@uplooking03 /]
nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking04 /]
nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
[root@uplooking05 /]
nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
8. Storm 集群的启动脚本
#!/bin/bash
#启动 nimbus
for nimbusHost in `cat /opt/shell/nimbus.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T root@${nimbusHost} << eeooff
nohup /opt/storm/bin/storm nimbus >/dev/null 2>&1 &
eeooff
done
#启动 supervisor
for supervisorHost in `cat /opt/shell/supervisor.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T root@${supervisorHost} << eeooff
nohup /opt/storm/bin/storm supervisor >/dev/null 2>&1 &
eeooff
done
#启动 logviewer
for logviewerHost in `cat /opt/shell/logviewer.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T root@${logviewerHost} << eeooff
nohup /opt/storm/bin/storm logviewer >/dev/null 2>&1 &
eeooff
done
#启动 ui
for uiHost in `cat /opt/shell/ui.host`
do
#-T 进制分配伪终端 一般自动化脚本不需要分配伪终端
ssh -T root@${uiHost} << eeooff
nohup /opt/storm/bin/storm ui >/dev/null 2>&1 &
eeooff
done
9. Storm 实现数字累加
- 编写 Spout
public class MySpout extends BaseRichSpout {
private SpoutOutputCollector collector;
// 初始化累加的数字
int num = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collector = collector;}
@Override
public void nextTuple() {collector.emit(new Values(num));
num++;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("mynum"));
}
}
- 编写 Bolt
public class MyBolt extends BaseRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { }
@Override
public void execute(Tuple tuple) {Integer num = tuple.getIntegerByField("mynum");
System.out.println(num);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}}
- 编写 Topology
public class MyTopology {public static void main(String[] args) {
// 创建自定义的 spout
MySpout mySpout = new MySpout();
// 创建自定义的 bolt
MyBolt myBolt = new MyBolt();
// 创建 topology 名称
String topologyName = "MyNumTopology";
// 创建 topology 的配置对象
Map conf = new Config();
// 创建 topology 的构造器
TopologyBuilder topologyBuilder = new TopologyBuilder();
// 为 topology 设置 spout 和 bolt
topologyBuilder.setSpout("myspout", mySpout);
topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout");
// 创建本地的 topology 提交器
StormTopology stormTopology = topologyBuilder.createTopology();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topologyName, conf, stormTopology);
}
}
10. 多个 Bolt 的问题
- 定义下一个 Bolt
public class MyBolt02 extends BaseRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { }
@Override
public void execute(Tuple tuple) {System.out.println(tuple.getIntegerByField("mynum02") + ".....");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}}
- 第一个 Bolt 中给第二个 Bolt 发射数据
public class MyBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}
@Override
public void execute(Tuple tuple) {Integer num = tuple.getIntegerByField("mynum");
System.out.println(num);
collector.emit(new Values(num));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("mynum02"));
}
}
- 在 Topology 中配置第二个 Bolt
public class MyTopology {public static void main(String[] args) {
// 创建自定义的 spout
MySpout mySpout = new MySpout();
// 创建自定义的 bolt
MyBolt myBolt = new MyBolt();
MyBolt02 myBolt02 = new MyBolt02();
// 创建 topology 名称
String topologyName = "MyNumTopology";
// 创建 topology 的配置对象
Map conf = new Config();
// 创建 topology 的构造器
TopologyBuilder topologyBuilder = new TopologyBuilder();
// 为 topology 设置 spout 和 bolt
topologyBuilder.setSpout("myspout", mySpout);
topologyBuilder.setBolt("mybolt", myBolt).shuffleGrouping("myspout");
topologyBuilder.setBolt("mybolt02", myBolt02).shuffleGrouping("mybolt");
// 创建本地的 topology 提交器
StormTopology stormTopology = topologyBuilder.createTopology();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(topologyName, conf, stormTopology);
}
}
11. 提交作业到集群
StormSubmitter.submitTopology(topologyName, conf, stormTopology);
12. Storm 的并行度
在 storm 中的并行度说的就是一个进程的运行需要多少个线程来参与,如果 storm 运行的线程个数 +1,则并行度 +1
Worker :
- worker 是一个进程级别的概念,可以通过 jps 查看的到
- worker 是一个 Topology 实例的子集,也就是说一个 Topology 的实例在 supervisor 中运行,可以在一个或者多个 supervisor 中启动一个或者多个 worker 进程
- 一个 worker 进程只能为一个 Topology 实例服务
- 所以 Topology 和 worker 的关系 ===>1:N
- 进程是由多个线程来组成,这里的线程就是 Executor
- conf.setNumWorkers(int workers)
- 所以 worker 和 executor 的关系 ===>1:N
- 每一个 executor 线程具体干活是由一个个 task 任务的实例来完成的
- 在 builer.setSpout/setBolt 的第三个参数设置
- Task 真正在 topology 干活的实例,一个 executor 线程,默认情况下对应了 1 个 task 的实例的
- Executor 和 Task 的关系 ===>1:N
- builder.setSpout().setNumTasks(tasks)// 设置的是 spout 对应的 executor 拥有几个 task 实例 builder.setBolt().setNumTasks(tasks)// 设置的是 bolt 对应的 executor 拥有几个 task 实例
13. Storm 中的消息确认机制
- 在 spout 中如果发送消息时指定 messageId 则代表开启消息确认机制, 如果不指定 messageID 则代表不开启消息确认机制
- 如果 Spout 中开启了消息确认机制则在 bolt 中需要用 ack()方法来确认消息接收成功
- 在 Soput 中重写响应的 fail()和 ack()方法来处理消息成功或者失败的回调逻辑
- Storm 默认如果不确认消息接收成功则 30s 之后返回消息失败
- 消息确认机制要慎重使用 ( 效率换取安全)