关于java:Kafka-实战四Kafka-Stream-详解

3次阅读

共计 8685 个字符,预计需要花费 22 分钟才能阅读完成。

一、Kafka Stream 背景

1、Kafka Stream 简介

提供了对存储于 Kafka 内的树进行流式解决和剖析的性能

Kafka Stream 的特点:

  • Kafka Stream 提供了一个非常简单而轻量的 Library,它能够十分不便地嵌入任意 Java 利用中,也能够任意形式打包和部署
  • 除了 Kafka 外,无任何内部依赖
  • 充分利用 Kafka 分区机制实现程度扩大和程序性保障
  • 通过可容错的 state store 实现高效的状态操作(如 windowed join 和 aggregation)
  • 反对正好一次解决语义
  • 提供记录级的解决能力,从而实现毫秒级的低提早
  • 反对基于事件工夫的窗口操作,并且可解决晚到的数据(late arrival of records)
  • 同时提供底层的解决原语 Processor(相似于 Storm 的 spout 和 bolt),以及高层形象的 DSL(相似于 Spark 的 map/group/reduce)

2、流式计算

流式计算模型中,输出是继续的,意味着永远拿不到全量数据去做计算。同时,计算结果是继续输入的,也即计算结果在工夫上也是无界的。流式计算个别对实时性要求较高,同时个别是先定义指标计算,而后数据到来之后将计算逻辑利用于数据。同时为了进步计算效率,往往尽可能采纳增量计算代替全量计算。

批量解决模型中,个别先有全量数据集,而后定义计算逻辑,并将计算利用于全量数据。特点是全量计算,并且计算结果一次性全量输入。

3、为什么要有 Kafka Stream

以后曾经有十分多的流式解决零碎,最出名且利用最多的开源流式解决零碎有 Spark Streaming 和 Apache Storm。Apache Storm 倒退多年,利用宽泛,提供记录级别的解决能力,以后也反对 SQL on Stream。而 Spark Streaming 基于 Apache Spark,能够十分不便与图计算,SQL 解决等集成,功能强大,对于相熟其它 Spark 利用开发的用户而言应用门槛低。另外,目前支流的 Hadoop 发行版,如 MapR,Cloudera 和 Hortonworks,都集成了 Apache Storm 和 Apache Spark,使得部署更容易。

既然 Apache Spark 与 Apache Storm 拥用如此多的劣势,那为何还须要 Kafka Stream 呢?笔者认为次要有如下起因:

第一,Spark 和 Storm 都是流式解决框架,而 Kafka Stream 提供的是一个基于 Kafka 的流式解决类库。框架要求开发者依照特定的形式去开发逻辑局部,供框架调用。开发者很难理解框架的具体运行形式,从而使得调试老本高,并且应用受限。而 Kafka Stream 作为流式解决类库,间接提供具体的类给开发者调用,整个利用的运行形式次要由开发者管制,方便使用和调试。

第二,尽管 Cloudera 与 Hortonworks 不便了 Storm 和 Spark 的部署,然而这些框架的部署依然绝对简单。而 Kafka Stream 作为类库,能够十分不便的嵌入应用程序中,它对利用的打包和部署根本没有任何要求。更为重要的是,Kafka Stream 充分利用了 Kafka 的分区机制和 Consumer 的 Rebalance 机制,使得 Kafka Stream 能够十分不便的程度扩大,并且各个实例能够应用不同的部署形式。具体来说,每个运行 Kafka Stream 的应用程序实例都蕴含了 Kafka Consumer 实例,多个同一利用的实例之间并行处理数据集。而不同实例之间的部署形式并不要求统一,比方局部实例能够运行在 Web 容器中,局部实例可运行在 Docker 或 Kubernetes 中。

第三,就流式解决零碎而言,根本都反对 Kafka 作为数据源。例如 Storm 具备专门的 kafka-spout,而 Spark 也提供专门的 spark-streaming-kafka 模块。事实上,Kafka 基本上是支流的流式解决零碎的规范数据源。换言之,大部分流式零碎中都已部署了 Kafka,此时应用 Kafka Stream 的老本非常低。

第四,应用 Storm 或 Spark Streaming 时,须要为框架自身的过程预留资源,如 Storm 的 supervisor 和 Spark on YARN 的 node manager。即便对于利用实例而言,框架自身也会占用局部资源,如 Spark Streaming 须要为 shuffle 和 storage 预留内存。

第五,因为 Kafka 自身提供数据长久化,因而 Kafka Stream 提供滚动部署和滚动降级以及从新计算的能力。

第六,因为 Kafka Consumer Rebalance 机制,Kafka Stream 能够在线动静调整并行度。

二、Kafka Stream 架构

1、Kafka Stream 整体架构

Kafka Stream 的整体架构图如下所示。

目前(Kafka 0.11.0.0)Kafka Stream 的数据源只能如上图所示是 Kafka。然而处理结果并不一定要如上图所示输入到 Kafka。实际上 KStream 和 Ktable 的实例化都须要指定 Topic。

KStream<String, String> stream = builder.stream("words-stream");
KTable<String, String> table = builder.table("words-table", "words-store");

另外,上图中的 Consumer 和 Producer 并不需要开发者在利用中显示实例化,而是由 Kafka Stream 依据参数隐式实例化和治理,从而升高了应用门槛。开发者只须要专一于开发外围业务逻辑,也即上图中 Task 内的局部。

2、Processor Topology

基于 Kafka Stream 的流式利用的业务逻辑全副通过一个被称为 Processor Topology 的中央执行。它与 Storm 的 Topology 和 Spark 的 DAG 相似,都定义了数据在各个处理单元(在 Kafka Stream 中被称作 Processor)间的流动形式,或者说定义了数据的解决逻辑。

3、Kafka Stream 并行模型

Kafka Stream 的并行模型中,最小粒度为 Task,而每个 Task 蕴含一个特定子 Topology 的所有 Processor。因而每个 Task 所执行的代码齐全一样,惟一的不同在于所解决的数据集互补。这一点跟 Storm 的 Topology 齐全不一样。Storm 的 Topology 的每一个 Task 只蕴含一个 Spout 或 Bolt 的实例。因而 Storm 的一个 Topology 内的不同 Task 之间须要通过网络通信传递数据,而 Kafka Stream 的 Task 蕴含了残缺的子 Topology,所以 Task 之间不须要传递数据,也就不须要网络通信。这一点升高了零碎复杂度,也进步了解决效率。

如果某个 Stream 的输出 Topic 有多个 (比方 2 个 Topic,1 个 Partition 数为 4,另一个 Partition 数为 3),则总的 Task 数等于 Partition 数最多的那个 Topic 的 Partition 数(max(4,3)=4)。这是因为 Kafka Stream 应用了 Consumer 的 Rebalance 机制,每个 Partition 对应一个 Task。

下图展现了在一个过程(Instance)中以 2 个 Topic(Partition 数均为 4)为数据源的 Kafka Stream 利用的并行模型。从图中能够看到,因为 Kafka Stream 利用的默认线程数为 1,所以 4 个 Task 全副在一个线程中运行。


为了充分利用多线程的劣势,能够设置 Kafka Stream 的线程数。下图展现了线程数为 2 时的并行模型。


前文有提到,Kafka Stream 可被嵌入任意 Java 利用(实践上基于 JVM 的利用都能够)中,下图展现了在同一台机器的不同过程中同时启动同一 Kafka Stream 利用时的并行模型。留神,这里要保障两个过程的 StreamsConfig.APPLICATION_ID_CONFIG 齐全一样。因为 Kafka Stream 将 APPLICATION_ID_CONFIG 作为隐式启动的 Consumer 的 Group ID。只有保障 APPLICATION_ID_CONFIG 雷同,能力保障这两个过程的 Consumer 属于同一个 Group,从而能够通过 Consumer Rebalance 机制拿到互补的数据集。

既然实现了多过程部署,能够以同样的形式实现多机器部署。该部署形式也要求所有过程的 APPLICATION_ID_CONFIG 齐全一样。从图上也能够看到,每个实例中的线程数并不要求一样。然而无论如何部署,Task 总数总会保障统一。

这里比照一下 Kafka Stream 的 Processor Topology 与 Storm 的 Topology。

  • Storm 的 Topology 由 Spout 和 Bolt 组成,Spout 提供数据源,而 Bolt 提供计算和数据导出。Kafka Stream 的 Processor Topology 齐全由 Processor 组成,因为它的数据固定由 Kafka 的 Topic 提供。
  • Storm 的不同 Bolt 运行在不同的 Executor 中,很可能位于不同的机器,须要通过网络通信传输数据。而 Kafka Stream 的 Processor Topology 的不同 Processor 齐全运行于同一个 Task 中,也就齐全处于同一个线程,无需网络通信。
  • Storm 的 Topology 能够同时蕴含 Shuffle 局部和非 Shuffle 局部,并且往往一个 Topology 就是一个残缺的利用。而 Kafka Stream 的一个物理 Topology 只蕴含非 Shuffle 局部,而 Shuffle 局部须要通过 through 操作显示实现,该操作将一个大的 Topology 分成了 2 个子 Topology。
  • Storm 的 Topology 内,不同 Bolt/Spout 的并行度能够不一样,而 Kafka Stream 的子 Topology 内,所有 Processor 的并行度齐全一样。
  • Storm 的一个 Task 只蕴含一个 Spout 或者 Bolt 的实例,而 Kafka Stream 的一个 Task 蕴含了一个子 Topology 的所有 Processor。

4、KTable vs. KStream

KTable 和 KStream 是 Kafka Stream 中十分重要的两个概念,它们是 Kafka 实现各种语义的根底。因而这里有必要剖析下二者的区别。

KStream 是一个数据流,能够认为所有记录都通过 Insert only 的形式插入进这个数据流里。而 KTable 代表一个残缺的数据集,能够了解为数据库中的表。因为每条记录都是 Key-Value 对,这里能够将 Key 了解为数据库中的 Primary Key,而 Value 能够了解为一行记录。能够认为 KTable 中的数据都是通过 Update only 的形式进入的。也就意味着,如果 KTable 对应的 Topic 中新进入的数据的 Key 曾经存在,那么从 KTable 只会取出同一 Key 对应的最初一条数据,相当于新的数据更新了旧的数据。

以下图为例,假如有一个 KStream 和 KTable,基于同一个 Topic 创立,并且该 Topic 中蕴含如下图所示 5 条数据。此时遍历 KStream 将失去与 Topic 内数据齐全一样的所有 5 条数据,且程序不变。而此时遍历 KTable 时,因为这 5 条记录中有 3 个不同的 Key,所以将失去 3 条记录,每个 Key 对应最新的值,并且这三条数据之间的程序与原来在 Topic 中的程序保持一致。这一点与 Kafka 的日志 compact 雷同。


此时如果对该 KStream 和 KTable 别离基于 key 做 Group,对 Value 进行 Sum,失去的后果将会不同。对 KStream 的计算结果是 <Jack,4>,<Lily,7>,<Mike,4>。而对 Ktable 的计算结果是 <Mike,4>,<Jack,3>,<Lily,5>。

5、State store

流式解决中,局部操作是无状态的,例如过滤操作(Kafka Stream DSL 中用 filer 办法实现)。而局部操作是有状态的,须要记录中间状态,如 Window 操作和聚合计算。State store 被用来存储中间状态。它能够是一个长久化的 Key-Value 存储,也能够是内存中的 HashMap,或者是数据库。Kafka 提供了基于 Topic 的状态存储。

Topic 中存储的数据记录自身是 Key-Value 模式的,同时 Kafka 的 log compaction 机制可对历史数据做 compact 操作,保留每个 Key 对应的最初一个 Value,从而在保障 Key 不失落的前提下,缩小总数据量,从而进步查问效率。

结构 KTable 时,须要指定其 state store name。默认状况下,该名字也即用于存储该 KTable 的状态的 Topic 的名字,遍历 KTable 的过程,理论就是遍历它对应的 state store,或者说遍历 Topic 的所有 key,并取每个 Key 最新值的过程。为了使得该过程更加高效,默认状况下会对该 Topic 进行 compact 操作。
另外,除了 KTable,所有状态计算,都须要指定 state store name,从而记录中间状态。

三、Kafka Stream 如何解决流式零碎中关键问题

1、工夫

在流式数据处理中,工夫是数据的一个十分重要的属性。从 Kafka 0.10 开始,每条记录除了 Key 和 Value 外,还减少了 timestamp 属性。目前 Kafka Stream 反对三种工夫

  • 事件产生工夫。事件产生的工夫,蕴含在数据记录中。产生工夫由 Producer 在结构 ProducerRecord 时指定。并且须要 Broker 或者 Topic 将 message.timestamp.type 设置为 CreateTime(默认值)能力失效。
  • 音讯接管工夫。也即音讯存入 Broker 的工夫。当 Broker 或 Topic 将 message.timestamp.type 设置为 LogAppendTime 时失效。此时 Broker 会在接管到音讯后,存入磁盘前,将其 timestamp 属性值设置为以后机器工夫。个别音讯接管工夫比拟靠近于事件产生工夫,局部场景下可代替事件产生工夫。
  • 音讯解决工夫。也即 Kafka Stream 解决音讯时的工夫。

注:Kafka Stream 容许通过实现 org.apache.kafka.streams.processor.TimestampExtractor 接口自定义记录时间。

2、窗口

前文提到,流式数据是在工夫上无界的数据。而聚合操作只能作用在特定的数据集,也即有界的数据集上。因而须要通过某种形式从无界的数据集上按特定的语义选取出有界的数据。窗口是一种十分罕用的设定计算边界的形式。不同的流式解决零碎反对的窗口相似,但不尽相同。

Kafka Stream 反对的窗口如下。

(1)Hopping Time Window 该窗口定义如下图所示。它有两个属性,一个是 Window size,一个是 Advance interval。Window size 指定了窗口的大小,也即每次计算的数据集的大小。而 Advance interval 定义输入的工夫距离。一个典型的利用场景是,每隔 5 秒钟输入一次过来 1 个小时内网站的 PV 或者 UV。

(2)Tumbling Time Window 该窗口定义如下图所示。能够认为它是 Hopping Time Window 的一种特例,也即 Window size 和 Advance interval 相等。它的特点是各个 Window 之间齐全不相交。

(3)Sliding Window 该窗口只用于 2 个 KStream 进行 Join 计算时。该窗口的大小定义了 Join 两侧 KStream 的数据记录被认为在同一个窗口的最大时间差。假如该窗口的大小为 5 秒,则参加 Join 的 2 个 KStream 中,记录时间差小于 5 的记录被认为在同一个窗口中,能够进行 Join 计算。

(4)Session Window 该窗口用于对 Key 做 Group 后的聚合操作中。它须要对 Key 做分组,而后对组内的数据依据业务需要定义一个窗口的起始点和完结点。一个典型的案例是,心愿通过 Session Window 计算某个用户拜访网站的工夫。对于一个特定的用户(用 Key 示意)而言,当产生登录操作时,该用户(Key)的窗口即开始,当产生退出操作或者超时时,该用户(Key)的窗口即完结。窗口完结时,可计算该用户的拜访工夫或者点击次数等。

3、Join

Kafka Stream 因为蕴含 KStream 和 Ktable 两种数据集,因而提供如下 Join 计算

  • KTable Join KTable 后果仍为 KTable。任意一边有更新,后果 KTable 都会更新。
  • KStream Join KStream 后果为 KStream。必须带窗口操作,否则会造成 Join 操作始终不完结。
  • KStream Join KTable / GlobalKTable 后果为 KStream。只有当 KStream 中有新数据时,才会触发 Join 计算并输入后果。KStream 无新数据时,KTable 的更新并不会触发 Join 计算,也不会输入数据。并且该更新只对下次 Join 失效。一个典型的应用场景是,KStream 中的订单信息与 KTable 中的用户信息做关联计算。

对于 Join 操作,如果要失去正确的计算结果,须要保障参加 Join 的 KTable 或 KStream 中 Key 雷同的数据被调配到同一个 Task。具体方法是

  • 参加 Join 的 KTable 或 KStream 的 Key 类型雷同(实际上,业务含意也应该雷同)
  • 参加 Join 的 KTable 或 KStream 对应的 Topic 的 Partition 数雷同
  • Partitioner 策略的最终后果等效(实现不须要齐全一样,只有成果一样即可),也即 Key 雷同的状况下,被调配到 ID 雷同的 Partition 内

如果上述条件不满足,可通过调用如下办法使得它满足上述条件。

KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)

4、聚合与乱序解决

聚合操作可利用于 KStream 和 KTable。当聚合产生在 KStream 上时必须指定窗口,从而限定计算的指标数据集。

须要阐明的是,聚合操作的后果必定是 KTable。因为 KTable 是可更新的,能够在晚到的数据到来时(也即产生数据乱序时)更新后果 KTable。

这里举例说明。假如对 KStream 以 5 秒为窗口大小,进行 Tumbling Time Window 上的 Count 操作。并且 KStream 先后呈现工夫为 1 秒, 3 秒, 5 秒的数据,此时 5 秒的窗口已达下限,Kafka Stream 敞开该窗口,触发 Count 操作并将后果 3 输入到 KTable 中(假如该后果示意为 <1-5,3>)。若 1 秒后,又收到了工夫为 2 秒的记录,因为 1 - 5 秒的窗口已敞开,若间接摈弃该数据,则可认为之前的后果 <1-5,3> 不精确。而如果间接将残缺的后果 <1-5,4> 输入到 KStream 中,则 KStream 中将会蕴含该窗口的 2 条记录,<1-5,3>, <1-5,4>,也会存在肮数据。因而 Kafka Stream 抉择将聚合后果存于 KTable 中,此时新的后果 <1-5,4> 会代替旧的后果 <1-5,3>。用户可失去残缺的正确的后果。

这种形式保障了数据准确性,同时也进步了容错性。

但须要阐明的是,Kafka Stream 并不会对所有晚到的数据都从新计算并更新后果集,而是让用户设置一个 retention period,将每个窗口的后果集在内存中保留肯定工夫,该窗口内的数据晚到时,间接合并计算,并更新后果 KTable。超过 retention period 后,该窗口后果将从内存中删除,并且晚到的数据即便落入窗口,也会被间接抛弃。

5、容错

Kafka Stream 从如下几个方面进行容错

  • 高可用的 Partition 保障无数据失落。每个 Task 计算一个 Partition,而 Kafka 数据复制机制保障了 Partition 内数据的高可用性,故无数据失落危险。同时因为数据是长久化的,即便工作失败,仍然能够从新计算。
  • 状态存储实现疾速故障复原和从故障点持续解决。对于 Join 和聚合及窗口等有状态计算,状态存储可保留中间状态。即便产生 Failover 或 Consumer Rebalance,依然能够通过状态存储复原中间状态,从而能够持续从 Failover 或 Consumer Rebalance 前的点持续计算。
  • Table 与 retention period 提供了对乱序数据的解决能力。

四、总结

  • Kafka Stream 的并行模型齐全基于 Kafka 的分区机制和 Rebalance 机制,实现了在线动静调整并行度
  • 同一 Task 蕴含了一个子 Topology 的所有 Processor,使得所有解决逻辑都在同一线程内实现,防止了不用的网络通信开销,从而进步了效率。
  • through 办法提供了相似 Spark 的 Shuffle 机制,为应用不同分区策略的数据提供了 Join 的可能
  • log compact 进步了基于 Kafka 的 state store 的加载效率
  • state store 为状态计算提供了可能
  • 基于 offset 的计算进度治理以及基于 state store 的中间状态治理为产生 Consumer rebalance 或 Failover 时从断点处持续解决提供了可能,并为零碎容错性提供了保障
  • KTable 的引入,使得聚合计算拥用了解决乱序问题的能力
正文完
 0