共计 6396 个字符,预计需要花费 16 分钟才能阅读完成。
一、Storm 简介
1. 引例
在介绍 Storm 之前,咱们先看一个日志统计的例子:如果咱们想要依据用户的拜访日志统计应用斗鱼客户端的用大数据培训户的地区散布状况,个别状况下咱们会分这几步:
• 取出拜访日志中客户端的 IP
• 把 IP 转换成对应地区
• 依照地区进行统计
Hadoop 貌似就能够轻松搞定:
• map 做 ip 提取,转换成地区
• reduce 以地区为 key 聚合,计数统计
• 从 HDFS 取出后果
如果有时效性要求呢?
• 小时级:还行,每小时跑一个 MapReduce Job
• 10 分钟:还对付能跑
• 5 分钟:够呛了,等槽位可能要几分钟呢
• 1 分钟:算了吧,启动 Job 就要几十秒呢
• 秒级:… 要满足秒级别的数据统计需要,须要
• 过程常驻运行;
• 数据在内存中
Storm 正好适宜这种需要。
2. 个性
Storm 是一个分布式实时流式计算平台。次要个性如下:
• 简略的编程模型:相似于 MapReduce 升高了并行批处理复杂性,Storm 升高了实时处理的复杂性,只需实现几个接口即可(Spout 实现 ISpout 接口,Bolt 实现 IBolt 接口)。
• 反对多种语言:你能够在 Storm 之上应用各种编程语言。默认反对 Clojure、Java、Ruby 和 Python。要减少对其余语言的反对,只需实现一个简略的 Storm 通信协议即可。
• 容错性:nimbus、supervisor 都是无状态的, 能够用 kill - 9 来杀死 Nimbus 和 Supervisor 过程, 而后再重启它们, 工作照常进行; 当 worker 失败后, supervisor 会尝试在本机重启它。
• 分布式:计算是在多个线程、过程和服务器之间并行进行的。
• 持久性、可靠性:音讯被长久化到本地磁盘,并且反对数据备份避免数据失落。
• 牢靠的音讯解决:Storm 保障每个音讯至多能失去一次残缺解决。工作失败时,它会负责从音讯源重试音讯(ack 机制)。
• 疾速、实时:Storm 保障每个音讯能能失去疾速的解决。
3. 与罕用其余大数据计算平台比照
• Storm vs. MapReduce Storm 的一个拓扑常驻内存运行,MR 作业运行完了进行就被 kill 了;storm 是流式解决,MR 是批处理;Storm 数据在内存中不写磁盘,而 MR 会与磁盘进行交互;Storm 的 DAG(有向无环图)模型能够组合多个阶段,而 MR 只能够有 MAP 和 REDUCE 两个阶段。
Storm vs. Spark Streaming Storm 解决的是每次传入的一条数据,Spark Streaming 理论解决的是微批量数据。
二、Storm 的架构和运行时原理
1. 集群架构
如上图所示,一个典型的 storm 集群蕴含一个主控节点 Nimbus,负责资源分配和任务调度;还有若干个子节点 Supervisor,负责承受 nimbus 调配的工作,启动和进行属于本人治理的 worker 过程;Nimbus 和 Supervisor 之间的所有协调工作都是通过 Zookeeper 集群实现。
2. Storm 的容错 (Fault Tolerance) 机制
Nimbus 和 Supervisor 过程被设计成疾速失败(fail fast) 的(当遇到异样的状况,过程就会挂掉)并且是无状态的(状态都保留在 Zookeeper 或者在磁盘上)。
- Nimbus 与 Supervisor 自身也是无状态的,状态信息是由 zookeeper 存储(实现了高可用,当 nimbus 挂掉,能够找另外一个节点启动 nimbus 过程,状态信息从 zookeeper 取得)。
- 在 Nimbus 过程失败后,能够疾速重启恢复正常工作,不须要很长的工夫来进行初始化和状态复原。
- 当 Nimbus 从 zookeeper 得悉有 supervisor 节点挂掉,能够将该节点的工作重新分配给其余子节点。
- Nimbus 在“某种程度”上属于单点故障的。在理论中,即便 Nimbus 过程挂掉,也不会有灾难性的事件产生。
当 Nimbus 挂掉会怎么?
- 曾经存在的拓扑能够持续失常运行,然而不能提交新拓扑;
- 正在运行的 worker 过程依然能够持续工作。而且当 worker 挂掉,Supervisor 会始终重启 worker。
- 失败的工作不会被调配到其余机器 (是 Nimbus 的职责) 上了
当一个 Supervisor(slave 节点)挂掉会怎么?
- 调配到这台机器的所有工作 (task) 会超时,Nimbus 会把这些工作 (task) 重新分配给其余机器。当一个 worker 挂掉会怎么样?
- 当一个 worker 挂掉,Supervisor 会重启它。如果启动始终失败那么此时 worker 也就不能和 Nimbus 放弃心跳了,Nimbus 会重新分配 worker 到其余机器
3. Storm 的编程模型
Strom 在运行中可分为 spout 与 bolt 两个组件,其中,数据源从 spout 开始,数据以 tuple 的形式发送到 bolt,多个 bolt 能够串连起来,一个 bolt 也能够接入多个 spot/bolt。运行时 Topology 如下图:
编程模型的一些基本概念:
• 元组
• storm 应用 tuple(元组)来作为它的数据模型。每个 tuple 由一堆域(field)组成,每个域有一个值,并且每个值能够是任何类型。
• 一个 tuple 能够看作一个没有办法的 java 对象。总体来看,storm 反对所有的根本类型、字符串以及字节数组作为 tuple 的值类型。
• Spout
• i. BaseRichSpout 是实现 IRichSpout 接口的类,对上述必要的办法有默认的实现;
• ii. 如果业务须要自定义 ack()、fail() 等办法,抉择实现 IRichSpout 接口;
• iii. 如果业务没有自定义需要,抉择继承 BaseRichSpout 类,能够不实现并不一定须要用户实现的办法,简化开发。
• i. open 办法是初始化动作。容许你在该 spout 初始化时做一些动作,传入了上下文,不便取上下文的一些数据。
• ii. close 办法在该 spout 敞开前执行。
• iii. activate 和 deactivate:一个 spout 能够被临时激活和敞开,这两个办法别离在对应的时刻被调用。
• iv. nextTuple 用来发射数据。Spout 中最重要的办法。
• v. ack(Object)传入的 Object 其实是一个 id,惟一示意一个 tuple。该办法是这个 id 所对应的 tuple 被胜利解决后执行。
• vi. fail(Object)同 ack,只不过是 tuple 解决失败时执行。
• Spout 是在一个 topology 中产生源数据流的组件。通常状况下 spout 会从内部数据源中读取数据,而后转换为 topology 外部的源数据。Spout 是一个被动的角色,其接口中有个 nextTuple()函数,storm 框架会不停地调用此函数,用户只有在其中生成源数据即可。
• 实现 Spout 时,须要实现最顶层形象 ISpout 接口外面的几个办法
• 实现 Spout 时,还须要实现 Icomponent 接口,来申明发射到上游 bolt 的字段名称。
• 通常状况下,实现一个 Spout,能够间接实现接口 IRichSpout,如果不想写多余的代码,能够间接继承 BaseRichSpout。
• Bolt
• prepare 办法是初始化动作。容许你在该 Bolt 初始化时做一些动作,传入了上下文,不便取上下文的一些数据。
• excute 用来解决数据。Bolt 中最重要的办法。
• cleanup 在该 Bolt 敞开前执行.
• 在拓扑中所有的计算逻辑都是在 Bolt 中实现的。一个 Bolt 能够解决任意数量的输出流,产生任意数量新的输入流。Bolt 能够做函数解决,过滤,流的合并,聚合,存储到数据库等操作。在 Bolt 中最次要的函数是 execute 函数,它应用一个新的元组当作输出。Bolt 应用 OutputCollector 对象来吐出新的元组。
• 实现 Bolt 时,须要实现 IBolt 接口,它申明了 Bolt 的外围办法,负责 Topology 所有的计算逻辑:
• 实现 Bolt 时,还须要实现 Icomponent 接口,来申明发射到上游 bolt 的字段名称
• 通常状况下,实现一个 Bolt,能够间接实现接口 IRichBolt/IBasicBolt,也能够间接继承 BaseRichBolt/BaseBasicBolt。IBasicBolt/BaseBasicBolt 在 emit 数据的时候,会主动和输出的 tuple 相关联,而在 execute 办法完结的时候那个输出 tuple 会被主动 ack。应用 IRichBolt/BaseRichBolt 须要在 emit 数据的时候,显示指定该数据的源 tuple 要加上第二个参数 anchor tuple,以放弃 tracker 链路,即 collector.emit(oldTuple,newTuple); 并且须要在 execute 执行胜利后调用 OutputCollector.ack(tuple), 当失败解决时,执行 OutputCollector.fail(tuple)。
• Stream Groupings(流分组)
• 定义了一个流在 Bolt 工作间该如何被切分。
- 随机分组(Shuffle grouping):随机散发 tuple 到 Bolt 的工作,保障每个工作取得相等数量的 tuple。
- 字段分组(Fields grouping):依据指定字段宰割数据流,并分组。例如,依据“user-id”字段,雷同“user-id”的元组总是散发到同一个工作,不同“user-id”的元组可能散发到不同的工作。
- 全副分组(All grouping):tuple 被复制到 bolt 的所有工作。这种类型须要审慎应用。
- 全局分组(Global grouping):全副流都调配到 bolt 的同一个工作。明确地说,是调配给 ID 最小的那个 task。
- 无分组(None grouping):你不须要关怀流是如何分组。目前,无分组等效于随机分组。
- 间接分组(Direct grouping):这是一个特地的分组类型。元组生产者决定 tuple 由哪个元组解决者工作接管。
4. Storm 音讯解决的可靠性机制
可靠性机制(Ack 机制)指的是 Storm 能够保障从 Spout 收回的每个音讯都能被齐全解决。一条音讯被“残缺解决”,指一个从 Spout 收回的元组所触发的音讯树中所有的音讯都被 Storm 解决了。如果在指定的超时工夫里,这个 Spout 元组触发的音讯树中有任何一个音讯没有解决完,就认为这个 Spout 元组解决失败了。这个超时工夫是通过每个拓扑的 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 配置项来进行配置的,默认是 30 秒。
Storm 是这样实现可靠性机制的:
• Storm 的拓扑有一些非凡的称为“acker”的工作,这些工作负责跟踪每个 Spout 收回的 tuple 的 DAG。当一个 acker 发现一个 DAG 完结了,它就会给创立 spout tuple 的 Spout 工作发送一条音讯,让这个工作来应答这个音讯。你能够应用 Config.TOPOLOGY_ACKERS 来配置拓扑的 acker 数量。Storm 默认会将 acker 的数量设置为 1,不过如果你有大量音讯的解决需要,你可能须要减少这个数量。
• acker 工作跟踪一个元组树,只占用固定大小的空间(大概 20 字节)。若采纳 Ack 机制,每个解决的 tuple,必须被 ack 或者 fail。因为 storm 追踪每个 tuple 要占用内存。所以如果不 ack/fail 每一个 tuple,那么最终你会看到 OutOfMemory 谬误。
• 编程实现(必要条件):acker 数设置大于 0;Spout 发送元组时,指定 messageId;bolt 解决完元组时,肯定要调用 ack/fail 办法。
5. Storm 的并发机制
在一个 Storm 集群中,Storm 次要通过以下三个部件来运行拓扑:工作过程(worker processes)、执行器(executors)、工作(tasks)。三者的关系如下:
• 1 个 worker 过程执行的是 1 个 topology 的子集(注:不会呈现 1 个 worker 为多个 topology 服务)。1 个 worker 过程会启动 1 个或多个 executor 线程来执行 1 个 topology 的 component(spout 或 bolt)。因而,1 个运行中的 topology 就是由集群中多台物理机上的多个 worker 过程组成的。
• executor 是 1 个被 worker 过程启动的独自线程。每个 executor 只会运行 1 个 topology 的 1 个 component(spout 或 bolt)的 task(注:task 能够是 1 个或多个,storm 默认是 1 个 component 只生成 1 个 task,executor 线程里会在每次循环里顺序调用所有 task 实例)。
• task 是最终运行 spout 或 bolt 中代码的单元(注:1 个 task 即为 spout 或 bolt 的 1 个实例,executor 线程在执行期间会调用该 task 的 nextTuple 或 execute 办法)。topology 启动后,1 个 component(spout 或 bolt)的 task 数目是固定不变的,但该 component 应用的 executor 线程数能够动静调整(例如:1 个 executor 线程能够执行该 component 的 1 个或多个 task 实例)。这意味着,对于 1 个 component 存在这样的条件:#threads<=#tasks(即:线程数小于等于 task 数目)。默认状况下 task 的数目等于 executor 线程数目,即 1 个 executor 线程只运行 1 个 task。
三、构建基于 Storm 的实时数据分析平台实战经验
构建基于 Storm 的实时数据分析平台,第一步当然应该是搭建 storm 集群。这个网上的教程还有轮子切实是太多,我就不贴出来了。请大家 Google 或者 Baidu 之,而后一步步搭建集群就完了。
1. Storm 应用的一些实战经验
• 在架构上,举荐“消息中间件 + storm + 内部存储”3 架马车式架构
• Storm 从消息中间件中取出数据,计算出后果,存储到内部存储上
• 通常消息中间件举荐应用 RocketMQ,Kafka
• 内部存储举荐应用 HBase,Redis
• 该架构,十分不便 Storm 程序进行重启(如因为减少业务降级程序)
• 职责清晰化,缩小和内部零碎的交互,Storm 将计算结果存储到内部存储后,用户的查问就无需拜访 Storm 中服务过程,查问内部存储即可。在理论计算中,经常发现须要做数据勘误,因而在设计整个我的项目时,须要思考重跑性能。在最终生成的后果中,数据最好带工夫戳。
• 联合 Storm UI 查看 topology 各个组件的负载,合理配置各组件的并发度。
• Spout 和 Bolt 的构造函数只会在 submit Topology 时调一次,而后序列化起来,间接发给工作节点,工作节点里实例化时不会被调用里,所以简单的成员变量记得都定义成 transient,在 open(),prepare()里初始化及连贯数据库等资源。
• 依照性能来说,应用 ack 机制一般接口 < 关掉 ack 机制的一般接口,因而,须要依据业务对数据处理的速率需要决定是否采纳 ack 机制。
• 当应用 fieldGrouping 形式时,有可能造成有的 task 工作重,有的 task 工作轻,因而让整个数据流变慢,尽量让 task 之间压力平均。
• KafkaSpout 的并发度最好设置成 Kafka 的分区数。生产 Kafka 时,一个分区只能一个线程生产,因而有可能简略的减少并发无奈解决问题,能够尝试减少 Kafka 的分区数。
• 如果 topology 性能有问题,能够尝试关掉 ack 机制,查看性能如何,如果性能有大幅晋升,则预示着瓶颈不在 spout,有可能是 Acker 的并发少了,或者业务解决逻辑慢了。
2. Storm 编程实际 -WordCount
• Spout
• SpiltSentenceBolt
• WordCountBolt
• ReportBolt
• Topology
• Result