一、Kafka Stream 背景

1、Kafka Stream 简介

提供了对存储于Kafka内的树进行流式解决和剖析的性能

Kafka Stream的特点:

  • Kafka Stream提供了一个非常简单而轻量的Library,它能够十分不便地嵌入任意Java利用中,也能够任意形式打包和部署
  • 除了Kafka外,无任何内部依赖
  • 充分利用Kafka分区机制实现程度扩大和程序性保障
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 反对正好一次解决语义
  • 提供记录级的解决能力,从而实现毫秒级的低提早
  • 反对基于事件工夫的窗口操作,并且可解决晚到的数据(late arrival of records)
  • 同时提供底层的解决原语Processor(相似于Storm的spout和bolt),以及高层形象的DSL(相似于Spark的map/group/reduce)

2、流式计算

流式计算模型中,输出是继续的,意味着永远拿不到全量数据去做计算。同时,计算结果是继续输入的, 也即计算结果在工夫上也是无界的。流式计算个别对实时性要求较高,同时个别是先定义指标计算,而后数据到来之后将计算逻辑利用于数据。同时为了进步计算效率,往往尽可能采纳增量计算代替全量计算。

批量解决模型中,个别先有全量数据集,而后定义计算逻辑,并将计算利用于全量数据。特点是全量计算,并且计算结果一次性全量输入。

3、 为什么要有Kafka Stream

以后曾经有十分多的流式解决零碎,最出名且利用最多的开源流式解决零碎有Spark Streaming和Apache Storm。Apache Storm倒退多年,利用宽泛,提供记录级别的解决能力,以后也反对SQL on Stream。而Spark Streaming基于Apache Spark,能够十分不便与图计算,SQL解决等集成,功能强大,对于相熟其它Spark利用开发的用户而言应用门槛低。另外,目前支流的Hadoop发行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。

既然Apache Spark与Apache Storm拥用如此多的劣势,那为何还须要Kafka Stream呢?笔者认为次要有如下起因:

第一,Spark和Storm都是流式解决框架,而Kafka Stream提供的是一个基于Kafka的流式解决类库。框架要求开发者依照特定的形式去开发逻辑局部,供框架调用。开发者很难理解框架的具体运行形式,从而使得调试老本高,并且应用受限。而Kafka Stream作为流式解决类库,间接提供具体的类给开发者调用,整个利用的运行形式次要由开发者管制,方便使用和调试。

第二,尽管Cloudera与Hortonworks不便了Storm和Spark的部署,然而这些框架的部署依然绝对简单。而Kafka Stream作为类库,能够十分不便的嵌入应用程序中,它对利用的打包和部署根本没有任何要求。更为重要的是,Kafka Stream充分利用了Kafka的分区机制和Consumer的Rebalance机制,使得Kafka Stream能够十分不便的程度扩大,并且各个实例能够应用不同的部署形式。具体来说,每个运行Kafka Stream的应用程序实例都蕴含了Kafka Consumer实例,多个同一利用的实例之间并行处理数据集。而不同实例之间的部署形式并不要求统一,比方局部实例能够运行在Web容器中,局部实例可运行在Docker或Kubernetes中。

第三,就流式解决零碎而言,根本都反对Kafka作为数据源。例如Storm具备专门的kafka-spout,而Spark也提供专门的spark-streaming-kafka模块。事实上,Kafka基本上是支流的流式解决零碎的规范数据源。换言之,大部分流式零碎中都已部署了Kafka,此时应用Kafka Stream的老本非常低。

第四,应用Storm或Spark Streaming时,须要为框架自身的过程预留资源,如Storm的supervisor和Spark on YARN的node manager。即便对于利用实例而言,框架自身也会占用局部资源,如Spark Streaming须要为shuffle和storage预留内存。

第五,因为Kafka自身提供数据长久化,因而Kafka Stream提供滚动部署和滚动降级以及从新计算的能力。

第六,因为Kafka Consumer Rebalance机制,Kafka Stream能够在线动静调整并行度。

二、Kafka Stream架构

1、Kafka Stream整体架构

Kafka Stream的整体架构图如下所示。

目前(Kafka 0.11.0.0)Kafka Stream的数据源只能如上图所示是Kafka。然而处理结果并不一定要如上图所示输入到Kafka。实际上KStream和Ktable的实例化都须要指定Topic。

KStream<String, String> stream = builder.stream("words-stream");KTable<String, String> table = builder.table("words-table", "words-store");

另外,上图中的Consumer和Producer并不需要开发者在利用中显示实例化,而是由Kafka Stream依据参数隐式实例化和治理,从而升高了应用门槛。开发者只须要专一于开发外围业务逻辑,也即上图中Task内的局部。

2、Processor Topology

基于Kafka Stream的流式利用的业务逻辑全副通过一个被称为Processor Topology的中央执行。它与Storm的Topology和Spark的DAG相似,都定义了数据在各个处理单元(在Kafka Stream中被称作Processor)间的流动形式,或者说定义了数据的解决逻辑。

3、Kafka Stream并行模型

Kafka Stream的并行模型中,最小粒度为Task,而每个Task蕴含一个特定子Topology的所有Processor。因而每个Task所执行的代码齐全一样,惟一的不同在于所解决的数据集互补。这一点跟Storm的Topology齐全不一样。Storm的Topology的每一个Task只蕴含一个Spout或Bolt的实例。因而Storm的一个Topology内的不同Task之间须要通过网络通信传递数据,而Kafka Stream的Task蕴含了残缺的子Topology,所以Task之间不须要传递数据,也就不须要网络通信。这一点升高了零碎复杂度,也进步了解决效率。

如果某个Stream的输出Topic有多个(比方2个Topic,1个Partition数为4,另一个Partition数为3),则总的Task数等于Partition数最多的那个Topic的Partition数(max(4,3)=4)。这是因为Kafka Stream应用了Consumer的Rebalance机制,每个Partition对应一个Task。

下图展现了在一个过程(Instance)中以2个Topic(Partition数均为4)为数据源的Kafka Stream利用的并行模型。从图中能够看到,因为Kafka Stream利用的默认线程数为1,所以4个Task全副在一个线程中运行。


为了充分利用多线程的劣势,能够设置Kafka Stream的线程数。下图展现了线程数为2时的并行模型。


前文有提到,Kafka Stream可被嵌入任意Java利用(实践上基于JVM的利用都能够)中,下图展现了在同一台机器的不同过程中同时启动同一Kafka Stream利用时的并行模型。留神,这里要保障两个过程的StreamsConfig.APPLICATION_ID_CONFIG齐全一样。因为Kafka Stream将APPLICATION_ID_CONFIG作为隐式启动的Consumer的Group ID。只有保障APPLICATION_ID_CONFIG雷同,能力保障这两个过程的Consumer属于同一个Group,从而能够通过Consumer Rebalance机制拿到互补的数据集。

既然实现了多过程部署,能够以同样的形式实现多机器部署。该部署形式也要求所有过程的APPLICATION_ID_CONFIG齐全一样。从图上也能够看到,每个实例中的线程数并不要求一样。然而无论如何部署,Task总数总会保障统一。

这里比照一下Kafka Stream的Processor Topology与Storm的Topology。

  • Storm的Topology由Spout和Bolt组成,Spout提供数据源,而Bolt提供计算和数据导出。Kafka Stream的Processor Topology齐全由Processor组成,因为它的数据固定由Kafka的Topic提供。
  • Storm的不同Bolt运行在不同的Executor中,很可能位于不同的机器,须要通过网络通信传输数据。而Kafka Stream的Processor Topology的不同Processor齐全运行于同一个Task中,也就齐全处于同一个线程,无需网络通信。
  • Storm的Topology能够同时蕴含Shuffle局部和非Shuffle局部,并且往往一个Topology就是一个残缺的利用。而Kafka Stream的一个物理Topology只蕴含非Shuffle局部,而Shuffle局部须要通过through操作显示实现,该操作将一个大的Topology分成了2个子Topology。
  • Storm的Topology内,不同Bolt/Spout的并行度能够不一样,而Kafka Stream的子Topology内,所有Processor的并行度齐全一样。
  • Storm的一个Task只蕴含一个Spout或者Bolt的实例,而Kafka Stream的一个Task蕴含了一个子Topology的所有Processor。

4、KTable vs. KStream

KTable和KStream是Kafka Stream中十分重要的两个概念,它们是Kafka实现各种语义的根底。因而这里有必要剖析下二者的区别。

KStream是一个数据流,能够认为所有记录都通过Insert only的形式插入进这个数据流里。而KTable代表一个残缺的数据集,能够了解为数据库中的表。因为每条记录都是Key-Value对,这里能够将Key了解为数据库中的Primary Key,而Value能够了解为一行记录。能够认为KTable中的数据都是通过Update only的形式进入的。也就意味着,如果KTable对应的Topic中新进入的数据的Key曾经存在,那么从KTable只会取出同一Key对应的最初一条数据,相当于新的数据更新了旧的数据。

以下图为例,假如有一个KStream和KTable,基于同一个Topic创立,并且该Topic中蕴含如下图所示5条数据。此时遍历KStream将失去与Topic内数据齐全一样的所有5条数据,且程序不变。而此时遍历KTable时,因为这5条记录中有3个不同的Key,所以将失去3条记录,每个Key对应最新的值,并且这三条数据之间的程序与原来在Topic中的程序保持一致。这一点与Kafka的日志compact雷同。


此时如果对该KStream和KTable别离基于key做Group,对Value进行Sum,失去的后果将会不同。对KStream的计算结果是<Jack,4>,<Lily,7>,<Mike,4>。而对Ktable的计算结果是<Mike,4>,<Jack,3>,<Lily,5>。

5、State store

流式解决中,局部操作是无状态的,例如过滤操作(Kafka Stream DSL中用filer办法实现)。而局部操作是有状态的,须要记录中间状态,如Window操作和聚合计算。State store被用来存储中间状态。它能够是一个长久化的Key-Value存储,也能够是内存中的HashMap,或者是数据库。Kafka提供了基于Topic的状态存储。

Topic中存储的数据记录自身是Key-Value模式的,同时Kafka的log compaction机制可对历史数据做compact操作,保留每个Key对应的最初一个Value,从而在保障Key不失落的前提下,缩小总数据量,从而进步查问效率。

结构KTable时,须要指定其state store name。默认状况下,该名字也即用于存储该KTable的状态的Topic的名字,遍历KTable的过程,理论就是遍历它对应的state store,或者说遍历Topic的所有key,并取每个Key最新值的过程。为了使得该过程更加高效,默认状况下会对该Topic进行compact操作。
另外,除了KTable,所有状态计算,都须要指定state store name,从而记录中间状态。

三、Kafka Stream如何解决流式零碎中关键问题

1、工夫

在流式数据处理中,工夫是数据的一个十分重要的属性。从Kafka 0.10开始,每条记录除了Key和Value外,还减少了timestamp属性。目前Kafka Stream反对三种工夫

  • 事件产生工夫。事件产生的工夫,蕴含在数据记录中。产生工夫由Producer在结构ProducerRecord时指定。并且须要Broker或者Topic将message.timestamp.type设置为CreateTime(默认值)能力失效。
  • 音讯接管工夫。也即音讯存入Broker的工夫。当Broker或Topic将message.timestamp.type设置为LogAppendTime时失效。此时Broker会在接管到音讯后,存入磁盘前,将其timestamp属性值设置为以后机器工夫。个别音讯接管工夫比拟靠近于事件产生工夫,局部场景下可代替事件产生工夫。
  • 音讯解决工夫。也即Kafka Stream解决音讯时的工夫。

注:Kafka Stream容许通过实现org.apache.kafka.streams.processor.TimestampExtractor接口自定义记录时间。

2、窗口

前文提到,流式数据是在工夫上无界的数据。而聚合操作只能作用在特定的数据集,也即有界的数据集上。因而须要通过某种形式从无界的数据集上按特定的语义选取出有界的数据。窗口是一种十分罕用的设定计算边界的形式。不同的流式解决零碎反对的窗口相似,但不尽相同。

Kafka Stream反对的窗口如下。

(1)Hopping Time Window 该窗口定义如下图所示。它有两个属性,一个是Window size,一个是Advance interval。Window size指定了窗口的大小,也即每次计算的数据集的大小。而Advance interval定义输入的工夫距离。一个典型的利用场景是,每隔5秒钟输入一次过来1个小时内网站的PV或者UV。

(2)Tumbling Time Window该窗口定义如下图所示。能够认为它是Hopping Time Window的一种特例,也即Window size和Advance interval相等。它的特点是各个Window之间齐全不相交。

(3)Sliding Window该窗口只用于2个KStream进行Join计算时。该窗口的大小定义了Join两侧KStream的数据记录被认为在同一个窗口的最大时间差。假如该窗口的大小为5秒,则参加Join的2个KStream中,记录时间差小于5的记录被认为在同一个窗口中,能够进行Join计算。

(4)Session Window该窗口用于对Key做Group后的聚合操作中。它须要对Key做分组,而后对组内的数据依据业务需要定义一个窗口的起始点和完结点。一个典型的案例是,心愿通过Session Window计算某个用户拜访网站的工夫。对于一个特定的用户(用Key示意)而言,当产生登录操作时,该用户(Key)的窗口即开始,当产生退出操作或者超时时,该用户(Key)的窗口即完结。窗口完结时,可计算该用户的拜访工夫或者点击次数等。

3、Join

Kafka Stream因为蕴含KStream和Ktable两种数据集,因而提供如下Join计算

  • KTable Join KTable 后果仍为KTable。任意一边有更新,后果KTable都会更新。
  • KStream Join KStream 后果为KStream。必须带窗口操作,否则会造成Join操作始终不完结。
  • KStream Join KTable / GlobalKTable 后果为KStream。只有当KStream中有新数据时,才会触发Join计算并输入后果。KStream无新数据时,KTable的更新并不会触发Join计算,也不会输入数据。并且该更新只对下次Join失效。一个典型的应用场景是,KStream中的订单信息与KTable中的用户信息做关联计算。

对于Join操作,如果要失去正确的计算结果,须要保障参加Join的KTable或KStream中Key雷同的数据被调配到同一个Task。具体方法是

  • 参加Join的KTable或KStream的Key类型雷同(实际上,业务含意也应该雷同)
  • 参加Join的KTable或KStream对应的Topic的Partition数雷同
  • Partitioner策略的最终后果等效(实现不须要齐全一样,只有成果一样即可),也即Key雷同的状况下,被调配到ID雷同的Partition内

如果上述条件不满足,可通过调用如下办法使得它满足上述条件。

KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)

4、聚合与乱序解决

聚合操作可利用于KStream和KTable。当聚合产生在KStream上时必须指定窗口,从而限定计算的指标数据集。

须要阐明的是,聚合操作的后果必定是KTable。因为KTable是可更新的,能够在晚到的数据到来时(也即产生数据乱序时)更新后果KTable。

这里举例说明。假如对KStream以5秒为窗口大小,进行Tumbling Time Window上的Count操作。并且KStream先后呈现工夫为1秒, 3秒, 5秒的数据,此时5秒的窗口已达下限,Kafka Stream敞开该窗口,触发Count操作并将后果3输入到KTable中(假如该后果示意为<1-5,3>)。若1秒后,又收到了工夫为2秒的记录,因为1-5秒的窗口已敞开,若间接摈弃该数据,则可认为之前的后果<1-5,3>不精确。而如果间接将残缺的后果<1-5,4>输入到KStream中,则KStream中将会蕴含该窗口的2条记录,<1-5,3>, <1-5,4>,也会存在肮数据。因而Kafka Stream抉择将聚合后果存于KTable中,此时新的后果<1-5,4>会代替旧的后果<1-5,3>。用户可失去残缺的正确的后果。

这种形式保障了数据准确性,同时也进步了容错性。

但须要阐明的是,Kafka Stream并不会对所有晚到的数据都从新计算并更新后果集,而是让用户设置一个retention period,将每个窗口的后果集在内存中保留肯定工夫,该窗口内的数据晚到时,间接合并计算,并更新后果KTable。超过retention period后,该窗口后果将从内存中删除,并且晚到的数据即便落入窗口,也会被间接抛弃。

5、容错

Kafka Stream从如下几个方面进行容错

  • 高可用的Partition保障无数据失落。每个Task计算一个Partition,而Kafka数据复制机制保障了Partition内数据的高可用性,故无数据失落危险。同时因为数据是长久化的,即便工作失败,仍然能够从新计算。
  • 状态存储实现疾速故障复原和从故障点持续解决。对于Join和聚合及窗口等有状态计算,状态存储可保留中间状态。即便产生Failover或Consumer Rebalance,依然能够通过状态存储复原中间状态,从而能够持续从Failover或Consumer Rebalance前的点持续计算。
  • Table与retention period提供了对乱序数据的解决能力。

四、总结

  • Kafka Stream的并行模型齐全基于Kafka的分区机制和Rebalance机制,实现了在线动静调整并行度
  • 同一Task蕴含了一个子Topology的所有Processor,使得所有解决逻辑都在同一线程内实现,防止了不用的网络通信开销,从而进步了效率。
  • through办法提供了相似Spark的Shuffle机制,为应用不同分区策略的数据提供了Join的可能
  • log compact进步了基于Kafka的state store的加载效率
  • state store为状态计算提供了可能
  • 基于offset的计算进度治理以及基于state store的中间状态治理为产生Consumer rebalance或Failover时从断点处持续解决提供了可能,并为零碎容错性提供了保障
  • KTable的引入,使得聚合计算拥用了解决乱序问题的能力