关于大数据:大数据开发技术之Storm原理与实践

一、Storm简介
1. 引例
在介绍Storm之前,咱们先看一个日志统计的例子:如果咱们想要依据用户的拜访日志统计应用斗鱼客户端的用大数据培训户的地区散布状况,个别状况下咱们会分这几步:
• 取出拜访日志中客户端的IP
• 把IP转换成对应地区
• 依照地区进行统计

Hadoop貌似就能够轻松搞定:
• map做ip提取,转换成地区
• reduce以地区为key聚合,计数统计
• 从HDFS取出后果

如果有时效性要求呢?
• 小时级:还行,每小时跑一个MapReduce Job
• 10分钟:还对付能跑
• 5分钟 :够呛了,等槽位可能要几分钟呢
• 1分钟 :算了吧,启动Job就要几十秒呢
• 秒级 :… 要满足秒级别的数据统计需要,须要
• 过程常驻运行;
• 数据在内存中

Storm正好适宜这种需要。
2. 个性
Storm是一个分布式实时流式计算平台。次要个性如下:
• 简略的编程模型:相似于MapReduce升高了并行批处理复杂性,Storm升高了实时处理的复杂性,只需实现几个接口即可(Spout实现ISpout接口,Bolt实现IBolt接口)。
• 反对多种语言:你能够在Storm之上应用各种编程语言。默认反对Clojure、Java、Ruby和Python。要减少对其余语言的反对,只需实现一个简略的Storm通信协议即可。
• 容错性:nimbus、supervisor都是无状态的, 能够用kill -9来杀死Nimbus和Supervisor过程, 而后再重启它们,工作照常进行; 当worker失败后, supervisor会尝试在本机重启它。
• 分布式:计算是在多个线程、过程和服务器之间并行进行的。
• 持久性、可靠性:音讯被长久化到本地磁盘,并且反对数据备份避免数据失落。
• 牢靠的音讯解决:Storm保障每个音讯至多能失去一次残缺解决。工作失败时,它会负责从音讯源重试音讯(ack机制)。
• 疾速、实时:Storm保障每个音讯能能失去疾速的解决。

3. 与罕用其余大数据计算平台比照
• Storm vs. MapReduce Storm的一个拓扑常驻内存运行,MR作业运行完了进行就被kill了;storm是流式解决,MR是批处理;Storm数据在内存中不写磁盘,而MR会与磁盘进行交互;Storm的DAG(有向无环图)模型能够组合多个阶段,而MR只能够有MAP和REDUCE两个阶段。

Storm vs. Spark Streaming Storm解决的是每次传入的一条数据,Spark Streaming理论解决的是微批量数据。
二、Storm的架构和运行时原理
1. 集群架构

如上图所示,一个典型的storm集群蕴含一个主控节点Nimbus,负责资源分配和任务调度;还有若干个子节点Supervisor,负责承受nimbus调配的工作,启动和进行属于本人治理的worker过程;Nimbus和Supervisor之间的所有协调工作都是通过Zookeeper集群实现。
2. Storm的容错(Fault Tolerance)机制
Nimbus和Supervisor过程被设计成疾速失败(fail fast)的(当遇到异样的状况,过程就会挂掉)并且是无状态的(状态都保留在Zookeeper或者在磁盘上)。

  1. Nimbus与Supervisor自身也是无状态的,状态信息是由zookeeper存储(实现了高可用,当nimbus挂掉,能够找另外一个节点启动nimbus过程,状态信息从zookeeper取得)。
  2. 在Nimbus过程失败后,能够疾速重启恢复正常工作,不须要很长的工夫来进行初始化和状态复原。
  3. 当Nimbus从zookeeper得悉有supervisor节点挂掉,能够将该节点的工作重新分配给其余子节点。
  4. Nimbus在“某种程度”上属于单点故障的。在理论中,即便Nimbus过程挂掉,也不会有灾难性的事件产生 。

当Nimbus挂掉会怎么?

  1. 曾经存在的拓扑能够持续失常运行,然而不能提交新拓扑;
  2. 正在运行的worker过程依然能够持续工作。而且当worker挂掉,Supervisor会始终重启worker。
  3. 失败的工作不会被调配到其余机器(是Nimbus的职责)上了

当一个Supervisor(slave节点)挂掉会怎么?

  1. 调配到这台机器的所有工作(task)会超时,Nimbus会把这些工作(task)重新分配给其余机器。当一个worker挂掉会怎么样?
  2. 当一个worker挂掉,Supervisor会重启它。如果启动始终失败那么此时worker也就不能和Nimbus放弃心跳了,Nimbus会重新分配worker到其余机器

3. Storm的编程模型
Strom在运行中可分为spout与bolt两个组件,其中,数据源从spout开始,数据以tuple的形式发送到bolt,多个bolt能够串连起来,一个bolt也能够接入多个spot/bolt。运行时Topology如下图:

编程模型的一些基本概念:
• 元组

• storm应用tuple(元组)来作为它的数据模型。每个tuple由一堆域(field)组成,每个域有一个值,并且每个值能够是任何类型。
• 一个tuple能够看作一个没有办法的java对象。总体来看,storm反对所有的根本类型、字符串以及字节数组作为tuple的值类型。

• Spout

• i. BaseRichSpout是实现 IRichSpout接口的类,对上述必要的办法有默认的实现;
• ii. 如果业务须要自定义ack()、fail() 等办法,抉择实现 IRichSpout接口;
• iii. 如果业务没有自定义需要,抉择继承BaseRichSpout类,能够不实现并不一定须要用户实现的办法,简化开发。
• i. open办法是初始化动作。容许你在该spout初始化时做一些动作,传入了上下文,不便取上下文的一些数据。
• ii. close办法在该spout敞开前执行。
• iii. activate和deactivate :一个spout能够被临时激活和敞开,这两个办法别离在对应的时刻被调用。
• iv. nextTuple 用来发射数据。Spout中最重要的办法。
• v. ack(Object)传入的Object其实是一个id,惟一示意一个tuple。该办法是这个id所对应的tuple被胜利解决后执行。
• vi. fail(Object)同ack,只不过是tuple解决失败时执行。
• Spout是在一个topology中产生源数据流的组件。通常状况下spout会从内部数据源中读取数据,而后转换为topology外部的源数据。Spout是一个被动的角色,其接口中有个nextTuple()函数,storm框架会不停地调用此函数,用户只有在其中生成源数据即可。
• 实现Spout时,须要实现最顶层形象ISpout接口外面的几个办法

• 实现Spout时,还须要实现Icomponent接口,来申明发射到上游bolt的字段名称。
• 通常状况下,实现一个Spout,能够间接实现接口IRichSpout,如果不想写多余的代码,能够间接继承BaseRichSpout。

• Bolt

• prepare办法是初始化动作。容许你在该Bolt初始化时做一些动作,传入了上下文,不便取上下文的一些数据。
• excute 用来解决数据。Bolt中最重要的办法。
• cleanup在该Bolt敞开前执行.
• 在拓扑中所有的计算逻辑都是在Bolt中实现的。一个Bolt能够解决任意数量的输出流,产生任意数量新的输入流。Bolt能够做函数解决,过滤,流的合并,聚合,存储到数据库等操作。在Bolt中最次要的函数是execute函数,它应用一个新的元组当作输出。Bolt应用OutputCollector对象来吐出新的元组。
• 实现Bolt时,须要实现IBolt接口,它申明了Bolt的外围办法,负责Topology所有的计算逻辑:
• 实现Bolt时,还须要实现Icomponent接口,来申明发射到上游bolt的字段名称
• 通常状况下,实现一个Bolt ,能够间接实现接口IRichBolt/IBasicBolt,也能够间接继承BaseRichBolt/BaseBasicBolt。IBasicBolt/BaseBasicBolt在emit数据的时候,会主动和输出的tuple相关联,而在execute办法完结的时候那个输出tuple会被主动ack。应用IRichBolt/BaseRichBolt须要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以放弃tracker链路,即collector.emit(oldTuple,newTuple);并且须要在execute执行胜利后调用OutputCollector.ack(tuple), 当失败解决时,执行OutputCollector.fail(tuple)。

• Stream Groupings(流分组)

• 定义了一个流在Bolt工作间该如何被切分。

  • 随机分组(Shuffle grouping):随机散发tuple到Bolt的工作,保障每个工作取得相等数量的tuple。
  • 字段分组(Fields grouping):依据指定字段宰割数据流,并分组。例如,依据“user-id”字段,雷同“user-id”的元组总是散发到同一个工作,不同“user-id”的元组可能散发到不同的工作。
  • 全副分组(All grouping):tuple被复制到bolt的所有工作。这种类型须要审慎应用。
  • 全局分组(Global grouping):全副流都调配到bolt的同一个工作。明确地说,是调配给ID最小的那个task。
  • 无分组(None grouping):你不须要关怀流是如何分组。目前,无分组等效于随机分组。
  • 间接分组(Direct grouping):这是一个特地的分组类型。元组生产者决定tuple由哪个元组解决者工作接管。
    4. Storm音讯解决的可靠性机制
    可靠性机制(Ack机制)指的是Storm能够保障从Spout收回的每个音讯都能被齐全解决。一条音讯被“残缺解决”,指一个从Spout收回的元组所触发的音讯树中所有的音讯都被Storm解决了。如果在指定的超时工夫里,这个Spout元组触发的音讯树中有任何一个音讯没有解决完,就认为这个Spout元组解决失败了。这个超时工夫是通过每个拓扑的Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置项来进行配置的,默认是30秒。
    Storm 是这样实现可靠性机制的:
    • Storm 的拓扑有一些非凡的称为“acker”的工作,这些工作负责跟踪每个 Spout 收回的 tuple 的 DAG。当一个 acker 发现一个 DAG 完结了,它就会给创立 spout tuple 的 Spout 工作发送一条音讯,让这个工作来应答这个音讯。你能够应用Config.TOPOLOGY_ACKERS 来配置拓扑的 acker 数量。Storm 默认会将 acker 的数量设置为1,不过如果你有大量音讯的解决需要,你可能须要减少这个数量。
    • acker工作跟踪一个元组树,只占用固定大小的空间(大概20字节)。若采纳 Ack机制,每个解决的tuple, 必须被ack或者fail。因为storm追踪每个tuple要占用内存。所以如果不ack/fail每一个tuple, 那么最终你会看到OutOfMemory谬误。
    • 编程实现(必要条件):acker数设置大于0;Spout发送元组时,指定messageId;bolt解决完元组时,肯定要调用ack/fail办法。

5. Storm的并发机制
在一个 Storm 集群中,Storm 次要通过以下三个部件来运行拓扑:工作过程(worker processes)、执行器(executors)、工作(tasks)。三者的关系如下:

• 1个worker过程执行的是1个topology的子集(注:不会呈现1个worker为多个topology服务)。1个worker过程会启动1个或多个executor线程来执行1个topology的component(spout或bolt)。因而,1个运行中的topology就是由集群中多台物理机上的多个worker过程组成的。
• executor是1个被worker过程启动的独自线程。每个executor只会运行1个topology的1个component(spout或bolt)的task(注:task能够是1个或多个,storm默认是1个component只生成1个task,executor线程里会在每次循环里顺序调用所有task实例)。
• task是最终运行spout或bolt中代码的单元(注:1个task即为spout或bolt的1个实例,executor线程在执行期间会调用该task的nextTuple或execute办法)。topology启动后,1个component(spout或bolt)的task数目是固定不变的,但该component应用的executor线程数能够动静调整(例如:1个executor线程能够执行该component的1个或多个task实例)。这意味着,对于1个component存在这样的条件:#threads<=#tasks(即:线程数小于等于task数目)。默认状况下task的数目等于executor线程数目,即1个executor线程只运行1个task。

三、构建基于Storm的实时数据分析平台实战经验
构建基于Storm的实时数据分析平台,第一步当然应该是搭建storm集群。这个网上的教程还有轮子切实是太多,我就不贴出来了。请大家Google或者Baidu之,而后一步步搭建集群就完了。
1. Storm应用的一些实战经验
• 在架构上,举荐 “消息中间件 + storm + 内部存储” 3架马车式架构

• Storm从消息中间件中取出数据,计算出后果,存储到内部存储上
• 通常消息中间件举荐应用RocketMQ,Kafka
• 内部存储举荐应用HBase,Redis
• 该架构,十分不便Storm程序进行重启(如因为减少业务降级程序)
• 职责清晰化,缩小和内部零碎的交互,Storm将计算结果存储到内部存储后,用户的查问就无需拜访Storm中服务过程,查问内部存储即可。在理论计算中,经常发现须要做数据勘误,因而在设计整个我的项目时,须要思考重跑性能 。在最终生成的后果中,数据最好带工夫戳 。

• 联合Storm UI查看topology各个组件的负载,合理配置各组件的并发度。
• Spout和Bolt的构造函数只会在submit Topology时调一次,而后序列化起来,间接发给工作节点,工作节点里实例化时不会被调用里,所以简单的成员变量记得都定义成transient,在open(),prepare()里初始化及连贯数据库等资源。
• 依照性能来说, 应用ack机制一般接口 < 关掉ack机制的一般接口, 因而,须要依据业务对数据处理的速率需要决定是否采纳ack机制。
• 当应用fieldGrouping形式时,有可能造成有的task工作重,有的task工作轻,因而让整个数据流变慢, 尽量让task之间压力平均。
• KafkaSpout的并发度最好设置成Kafka的分区数。生产Kafka时, 一个分区只能一个线程生产,因而有可能简略的减少并发无奈解决问题, 能够尝试减少Kafka的分区数。
• 如果topology性能有问题, 能够尝试关掉ack机制,查看性能如何,如果性能有大幅晋升,则预示着瓶颈不在spout, 有可能是Acker的并发少了,或者业务解决逻辑慢了。

2. Storm编程实际-WordCount
• Spout

• SpiltSentenceBolt

• WordCountBolt

• ReportBolt

• Topology

• Result

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理