Spark-Streaming介绍

49次阅读

共计 3181 个字符,预计需要花费 8 分钟才能阅读完成。

Spark Streaming 特点:

  • 高吞吐量:Streaming 在 Spark 的基础上集成了流式处理,可以以类似 Spark 批处理的方式写流式作业,” 接收 + 处理 + 输出 ” 大量数据。一个吞吐,可以说是,一个服务器接受客户端的请求 ==》然后处理完(可能是 CPU 计算、可能是文件处理、数据库处理、网络任务等)==》最后返回结果。
  • 容错能力强:可以恢复丢失的作业及操作状态
  • 支持多数据源输入:hdfs、flume、kafka、twitter、zeromq 和自定义数据源读取数据,做交互应用、数据分析
  • 结果存储:能保存在很多地方,如 HDFS,数据库等
  • 能和 MLlib(机器学习)以及 Graphx 完美融合

构建一个 Spark Streaming 应用程序的步骤:

  1. 构建 Streaming Context 对象时,注意 时间窗口参数 Second(1),需要积攒的时间长度,如 1s。该参数设置依赖于需求和集群处理能力。
  2. 创建 InputDStream
  3. 操作 DStream
  4. 启动 Spark Streaming,当 ssc.start() 启动后,程序才真正进行所有预期的操作

离散数据流 (DStream):在其内部,DStream 是通过一组时间序列上连续的 RDD 来表示的,每一个 RDD 都包含了特定时间间隔内的数据流

相关算子

  • map(func)
  • flatMap(func)
  • reduceByKey(func,numTasks):numTasks 并行的提交任务个数,默认情况下,本地环境下是 2,集群环境下是 8
  • foreachRDD(func):基本的输出操作

Spark Streaming 接收 Kafka 数据:

  • 先把数据接收过来,转换为 spark streaming 中的数据结构 Dstream,有两种方式:利用 Receiver 接收数据 OR 直接从 kafka 读取数据

方式一 基于 Receiver 的方式:

  • 对于所有的接收器,从 kafka 接收来的数据会存储在 Spark 的 exector 中,之后 Streaming 提交的 job 会处理这些数据
  • 注意:Receiver 的方式中,Spark 的 partition 和 Kafka 的 partition 并不相关,加大每个 topic 的 partition 仅仅增加线程来处理单一 Receiver 消费的 topic,并没有增加 Spark 在处理数据上的并行度
  • 对于不同的 Group 和 topic 可以使用多个 Receiver 创建不同的 DStream 来并行接收数据,之后可以用 union 来统一成一个 DStream
  • 如果我们启用了 Write Ahead Logs 复制到文件系统如 HDFS,那么 storage level 需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

方式二 直接读取方式

  • Spark1.3 之后,引入了 Direct 方式,它会周期性的获取 Kafka 中每个 topic 的每个 partition 中的最新 offsets, 之后根据设定的 maxRatePerPartition 来处理每个 batch
  • 简化的并行:在 **Direct 方式中,Kafka 中的 partition 与 RDD 中的 partition 是一一对应的并行读取 Kafka 数据,这种映射关系也更利于理解和优化
  • 高效:在 Receiver 的方式中,为了达到 0 数据丢失需要将数据存入 Write Ahead Log 中,这样在 Kafka 和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们 Kafka 的数据保留时间足够长,我们都能够从 Kafka 进行数据恢复
  • 精确一次消费:在 Receiver 的方式中,使用的是 Kafka 的高阶 API 接口从 Zookeeper 中获取 offset 值,这也是传统的从 Kafka 中读取数据的方式,但由于 Spark Streaming 消费的数据和 Zookeeper 中记录的 offset 不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶 Kafka API,Offsets 则利用 Spark Streaming 的 checkpoints 进行记录,消除了这种不一致性

Spark streaming+Kafka 调优方法

参考:https://www.cnblogs.com/xltur…

目的是,Spark Streaming 能够实时的拉取 Kafka 当中的数据,并且能够保持稳定

  1. 合理的批处理时间(batchDuration): 过小会导致数据处理不完,导致阻塞。一般对于 batchDuration 的设置不会小于 500ms
  2. 合理的 Kafka 拉取量,配置参数为 spark.streaming.kafka.maxRatePerPartition:默认是没有上线的,即 kafka 当中有多少数据它就会直接全部拉出。而根据生产者写入 Kafka 的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的 batchDuration,使得每个 partition 拉取在每个 batchDuration 期间拉取的数据 能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的 Input Rate 和 Processing Time
  3. 缓存反复使用的 DStream(RDD):Spark 中的 RDD 和 SparkStreaming 中的 Dstream,如果被反复的使用,最好利用 cache(),将该数据流缓存起来,防止过度的调度资源造成的网络开销
  4. 设置合理的 GC:即 java 的垃圾回收机制,让我们不用关注内存的分配回收,更加专注业务逻辑。java 的 GC 分为初生代、年轻代、老年代、永久代,每次 GC 都是需要耗费时间的,特别是老年代的 GC 回收。通常使用中建议: --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
  5. 设置合理的 CPU 资源数:CPU 的 core 数量,每个 exectutor 可以占用多个 core,但是 executor 并不总能充分利用多核的能力。通过观察 CPU 的资源使用情况,可以适量增加 executor 减少 core 数量,注意 executor 增加,每个 exector 的内存就越小哦(executor 太多的话,出现过多数据时就会出现 spill over 甚至 out of memory)!
  6. 设置合理的 parallelism:在 SparkStreaming+kafka 的使用中,我们采用了 Direct 连接方式,而 Spark 中的 partition 和 Kafka 中的 Partition 是一一对应的,我们一 般默认设置为 Kafka 中 Partition 的数量
  7. 使用 Kryo 优化序列化性能:官方介绍,Kryo 序列化机制比 Java 序列化机制,性能高 10 倍左右
  8. 使用高性能的算子:
  • 使用 reduceByKey/aggregateByKey 替代 groupByKey
  • 使用 mapPartitions 替代普通 map
  • 使用 foreachPartitions 替代 foreach
  • 使用 filter 之后进行 coalesce 操作
  • 使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作
// 创建 SparkConf 对象。val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为 KryoSerializer。conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

正文完
 0