共计 17056 个字符,预计需要花费 43 分钟才能阅读完成。
Spark Streaming 是构建在 Spark Core 根底之上的流解决框架,是 Spark 十分重要的组成部分。Spark Streaming 于 2013 年 2 月在 Spark0.7.0 版本中引入,倒退至今曾经成为了在企业中宽泛应用的流解决平台。在 2016 年 7 月,Spark2.0 版本中引入了 Structured Streaming,并在 Spark2.2 版本中达到了生产级别,Structured Streaming 是构建在 Spark SQL 之上的流解决引擎,用户能够应用 DataSet/DataFreame API 进行流解决,目前 Structured Streaming 在不同的版本中倒退速度很快。值得注意的是,本文不会对 Structured Streaming 做过多解说,次要针对 Spark Streaming 进行探讨,包含以下内容:
- Spark Streaming 介绍
- Transformations 与 Output Operations
- Spark Streaming 数据源(Sources)
- Spark Streaming 数据汇(Sinks)
Spark Streaming 介绍
什么是 DStream
Spark Streaming 是构建在 Spark Core 的 RDD 根底之上的,与此同时 Spark Streaming 引入了一个新的概念:DStream(Discretized Stream,离散化数据流),示意连续不断的数据流。DStream 形象是 Spark Streaming 的流解决模型,在外部实现上,Spark Streaming 会对输出数据依照工夫距离(如 1 秒)分段,每一段数据转换为 Spark 中的 RDD,这些分段就是 Dstream,并且对 DStream 的操作都最终转变为对相应的 RDD 的操作。如下图所示:
如上图,这些底层的 RDD 转换操作是由 Spark 引擎来实现的,DStream 的操作屏蔽了许多底层的细节,为用户提供了比拟方便使用的高级 API。
计算模型
在 Flink 中,批处理是流解决的特例,所以 Flink 是人造的流解决引擎。而 Spark Streaming 则不然,Spark Streaming 认为流解决是批处理的特例,即 Spark Streaming 并不是纯实时的流解决引擎,在其外部应用的是 microBatch
模型,行将流解决看做是在较小工夫距离内 (batch interval) 的一些列的批处理。对于工夫距离的设定,须要联合具体的业务提早需要,能够实现秒级或者分钟级的距离。
Spark Streaming 会将每个短时间距离内接管的数据存储在集群中,而后对其作用一系列的算子操作(map,reduce, groupBy 等)。执行过程见下图:
如上图:Spark Streaming 会将输出的数据流宰割成一个个小的 batch,每一个 batch 都代表这一些列的 RDD,而后将这些 batch 存储在内存中。通过启动 Spark 作业来解决这些 batch 数据,从而实现一个流解决利用。
Spark Streaming 的工作机制
概览
- 在 Spark Streaming 中,会有一个组件 Receiver,作为一个长期运行的 task 跑在一个 Executor 上
- 每个 Receiver 都会负责一个 input DStream(比方从文件中读取数据的文件流,比方套接字流,或者从 Kafka 中读取的一个输出流等等)
- Spark Streaming 通过 input DStream 与内部数据源进行连贯,读取相干数据
执行细节
- 1. 启动 StreamingContext
- 2.StreamingContext 启动 receiver,该 receiver 会始终运行在 Executor 的 task 中。用于连续不断地接收数据源,有两种次要的 reciver,一种是牢靠的 reciver,当数据被接管并且存储到 spark,发送回执确认,另一种是不牢靠的 reciver,对于数据源不发送回执确认。接管的数据会被缓存到 work 节点内存中,也会被复制到其余 executor 的所在的节点内存中,用于容错解决。
- 3.Streaming context 周期触发 job(依据 batch-interval 工夫距离)进行数据处理。
- 4. 将数据输入。
Spark Streaming 编程步骤
通过下面的剖析,对 Spark Streaming 有了初步的意识。那么该如何编写一个 Spark Streaming 应用程序呢?一个 Spark Streaming 个别包含一下几个步骤:
- 1. 创立
StreamingContext
- 2. 创立输出
DStream
来定义输出源 - 3. 通过对 DStream 利用转换操作和输入操作来定义解决逻辑
- 4. 用 streamingContext.start()来开始接收数据和解决流程
- 5.streamingContext.awaitTermination()办法来期待解决完结
object StartSparkStreaming {def main(args: Array[String]): Unit = {val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("Streaming")
// 1. 创立 StreamingContext
val ssc = new StreamingContext(conf, Seconds(5))
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
// 2. 创立 DStream
val lines = ssc.socketTextStream("localhost", 9999)
// 3. 定义流计算解决逻辑
val count = lines.flatMap(_.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
// 4. 输入后果
count.print()
// 5. 启动
ssc.start()
// 6. 期待执行
ssc.awaitTermination()}
}
Transformations 与 Output Operations
DStream 是不可变的,这意味着不能间接扭转它们的内容,而是通过对 DStream 进行一系列转换 (Transformation) 来实现预期的利用程序逻辑。每次转换都会创立一个新的 DStream,该 DStream 示意来自父 DStream 的转换后的数据。DStream 转换是惰性 (lazy) 的,这象征只有执行 output 操作之后,才会去执行转换操作,这些触发执行的操作称之为output operation
。
Transformations
Spark Streaming 提供了丰盛的 transformation 操作,这些 transformation 又分为了 有状态的 transformation和 无状态的 transformation。除此之外,Spark Streaming 也提供了一些 window 操作,值得注意的是 window 操作也是有状态的。具体细节如下:
无状态的 transformation
无状态的 transformation 是指每一个 micro-batch 的解决是互相独立的,即以后的计算结果不受之前计算结果的影响,Spark Streaming 的大部分算子都是无状态的,比方常见的 map(),flatMap(),reduceByKey()等等。
- map(func)
对源 DStream 的每个元素,采纳 func 函数进行转换,失去一个新的 Dstream
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
- flatMap(func)
与 map 类似,然而每个输出项可用被映射为 0 个或者多个输入项
/**
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
- filter(func)
返回一个新的 DStream,仅蕴含源 DStream 中满足函数 func 的项
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {new FilteredDStream(this, context.sparkContext.clean(filterFunc))
}
- repartition(numPartitions)
通过创立更多或者更少的分区扭转 DStream 的并行水平
/**
* Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
* returned DStream has exactly numPartitions partitions.
*/
def repartition(numPartitions: Int): DStream[T] = ssc.withScope {this.transform(_.repartition(numPartitions))
}
- reduce(func)
利用函数 func 汇集源 DStream 中每个 RDD 的元素,返回一个蕴含单元素 RDDs 的新 DStream
/**
* Return a new DStream in which each RDD has a single element generated by reducing each RDD
* of this DStream.
*/
def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2)
}
- count()
统计源 DStream 中每个 RDD 的元素数量
/**
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
def count(): DStream[Long] = ssc.withScope {this.map(_ => (null, 1L))
.transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
.reduceByKey(_ + _)
.map(_._2)
}
- union(otherStream)
返回一个新的 DStream,蕴含源 DStream 和其余 DStream 的元素
/**
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same slideDuration as this DStream.
*/
def union(that: DStream[T]): DStream[T] = ssc.withScope {new UnionDStream[T](Array(this, that))
}
- countByValue()
利用于元素类型为 K 的 DStream 上,返回一个(K,V)键值对类型的新 DStream,每个键的值是在原 DStream 的每个 RDD 中的呈现次数, 比方lines.flatMap(_.split(" ")).countByValue().print()
, 对于输出:spark spark flink
, 将输入:(spark,2),(flink,1)
, 即依照元素值进行分组,而后统计每个分组的元素个数。
从源码能够看出:底层实现为 map((_,1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions),即先按以后的元素映射为一个 tuple,其中 key 即为以后元素的值,而后再依照 key 做汇总。
/**
* Return a new DStream in which each RDD contains the counts of each distinct value in
* each RDD of this DStream. Hash partitioning is used to generate
* the RDDs with `numPartitions` partitions (Spark's default number of partitions if
* `numPartitions` not specified).
*/
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
: DStream[(T, Long)] = ssc.withScope {this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}
- reduceByKey(func, [numTasks])
当在一个由 (K,V) 键值对组成的 DStream 上执行该操作时,返回一个新的由 (K,V) 键值对组成的 DStream,每一个 key 的值均由给定的 recuce 函数(func)汇集起来
比方:lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).print()
对于输出:spark spark flink,将输入:(spark,2),(flink,1)
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the associative and commutative reduce function. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {reduceByKey(reduceFunc, defaultPartitioner())
}
- join(otherStream, [numTasks])
当利用于两个 DStream(一个蕴含(K,V)键值对, 一个蕴含 (K,W) 键值对),返回一个蕴含 (K, (V, W)) 键值对的新 Dstream
/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope {join[W](other, defaultPartitioner())
}
- cogroup(otherStream, [numTasks])
当利用于两个 DStream(一个蕴含(K,V)键值对, 一个蕴含 (K,W) 键值对),返回一个蕴含 (K, Seq[V], Seq[W]) 的元组
// 输出:spark // 输入:(spark,(CompactBuffer(1),CompactBuffer(1))) val DS1 = lines.flatMap(_.split(" ")).map((_,1)) val DS2 = lines.flatMap(_.split(" ")).map((_,1)) DS1.cogroup(DS2).print()
/**
* Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {cogroup(other, defaultPartitioner())
}
- transform(func)
通过对源 DStream 的每个 RDD 利用 RDD-to-RDD 函数,创立一个新的 DStream。反对在新的 DStream 中做任何 RDD 操作
// 输出:spark spark flink // 输入:(spark,2)、(flink,1) val lines = ssc.socketTextStream("localhost", 9999) val resultDStream = lines.transform(rdd => {rdd.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _) }) resultDStream.print()
/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {val cleanedF = context.sparkContext.clean(transformFunc, false)
transform((r: RDD[T], _: Time) => cleanedF(r))
}
有状态的 transformation
有状态的 transformation 是指每个 micro-batch 的解决不是互相独立的,即以后的 micro-batch 解决依赖于之前的 micro-batch 计算结果。常见的有状态的 transformation 次要有 countByValueAndWindow, reduceByKeyAndWindow , mapWithState, updateStateByKey 等等。其实所有的基于 window 的操作都是有状态的,因为追踪整个窗口内的数据。
对于有状态的 transformation 和 Window Operations,参见下文。
Output Operations
应用 Output operations 能够将 DStream 写入多内部存储设备或打印到控制台。上文提到,Spark Streaming 的 transformation 是 lazy 的,因而须要 Output Operation 进行触发计算,其性能相似于 RDD 的 action 操作。具体详见下文 Spark Streaming 数据汇(Sinks)。
Spark Streaming 数据源
Spark Streaming 的目标是成为一个通用的流解决框架,为了实现这一指标,Spark Streaming 应用 Receiver 来集成各种各样的数据源。然而,对于有些数据源 (如 kafka),Spark Streaming 反对应用Direct 的形式去接收数据,这种形式比 Receiver 形式性能要好。
基于 Receiver 的形式
Receiver 的作用是从数据源收集数据,而后将数据传送给 Spark Streaming。基本原理是:随着数据的一直到来,在绝对应的 batch interval 工夫距离内,这些数据会被收集并且打包成 block,只有等到 batch interval 工夫实现了,收集的数据 block 会被发送给 spark 进行解决。
如上图:当 Spark Streaming 启动时,receiver 开始收集数据。在 t0
的 batch interval 完结时 (即收集完了该时间段内的数据),收集到的 block #0 会被发送到 Spark 进行解决。在 t2
时刻,Spark 会解决 t1
的 batch interval 的数据 block,与此同时会不停地收集 t2
的 batch interval 对应的 block#2。
常见的基于 Receiver 的数据源包含:Kafka, Kinesis, Flume,Twitter。除此之外,用户也能够通过继承 Receiver抽象类,实现 onStart()
与onStop()
两个办法,进行自定义 Receiver。本文不会对基于 Receiver 的数据源做过多探讨,次要针对基于 Direct 的 Kafka 数据源进行具体解释。
基于 Direct 的形式
Spark 1.3 中引入了这种新的无 Receiver 的 Direct 办法,以确保更强的端到端保障。该办法不是应用 Receiver 来接收数据,而是定期查问 Kafka 每个 topic+partition 中的最新偏移量,并相应地定义要在每个批次中解决的偏移量范畴。启动用于解决数据的作业时,Kafka 的简略 consumer API 用于读取 Kafka 定义的偏移量范畴(相似于从文件系统读取文件)。请留神,此性能是在 Scala 和 Java API 的 Spark 1.3 引入的,在 Python API 的 Spark 1.4 中引入的。
基于 Direct 的形式具备以下长处:
- 简化并行读取
如果要读取多个 partition,不须要创立多个输出 DStream 而后对他们进行 union 操作。Spark 会创立跟 Kafka partition 一样多的 RDD partition,并且会并行从 kafka 中读取数据。所以在 kafka partition 和 RDD partition 之间,有一一对应的关系。
- 高性能
如果要保证数据零失落,在基于 Receiver 的形式中,须要开启 WAL 机制。这种形式其实效率很低,因为数据理论被复制了两份,kafka 本人自身就有高牢靠的机制,会对数据复制一份,而这里又会复制一份到 WAL 中。而基于 Direct 的形式,不依赖于 Receiver,不须要开启 WAL 机制,只有 kafka 中做了数据的复制,那么就能够通过 kafka 的正本进行复原。
- Exactly-once 语义
基于 Receiver 的形式,应用 kafka 的高阶 API 来在 Zookeeper 中保留生产过的 offset。这是生产 kafka 数据的传统形式。这种形式配合 WAL 机制,能够保证数据零失落的高可靠性,然而却无奈保障 Exactly-once 语义(Spark 和 Zookeeper 之间可能是不同步的)。基于 Direct 的形式,应用 kafka 的简略 API,Spark Streaming 本人就负责追踪生产的 offset,并保留在 checkpoint 中。Spark 本人肯定是同步的,因而能够保证数据时生产一次且仅生产一次。
Spark Streaming 集成 kafka
应用形式
应用 KafkaUtils 增加 Kafka 数据源,源码如下:
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]] = {val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)
createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
}
具体参数解释:
- K:Kafka 音讯 key 的类型
- V:Kafka 音讯 value 的类型
- ssc:StreamingContext
-
locationStrategy: LocationStrategy,依据 Executor 中的主题的分区来调度 consumer,即尽可能地让 consumer 凑近 leader partition。该配置能够晋升性能,但对于 location 的抉择只是一种参考,并不是相对的。能够抉择如下形式:
- PreferBrokers:Spark 和 Kafka 运行在同一个节点上,能够应用此种形式
- PreferConsistent:大部分状况应用此形式,它将统一地在所有 Executor 之间调配分区
- PreferFixed:将特定的主题分区搁置到特定的主机上,在数据负载不平衡时应用
留神:少数状况下应用 PreferConsisten,其余两种形式只是在特定的场景应用。这种配置只是一种参考,具体的状况还是会依据集群的资源主动调整。
-
consumerStrategy:生产策略,次要有上面三种形式:
- Subscribe:订阅指定主题名称的主题汇合
- SubscribePattern:通过正则匹配,订阅相匹配的主题数据
- Assign:订阅一个主题 + 分区的汇合
留神:大多数状况下应用 Subscribe 形式。
应用案例
object TolerateWCTest {def createContext(checkpointDirectory: String): StreamingContext = {val sparkConf = new SparkConf()
.set("spark.streaming.backpressure.enabled", "true")
// 每秒钟从 kafka 分区中读取的 records 数量, 默认 not set
.set("spark.streaming.kafka.maxRatePerPartition", "1000") //
//Driver 为了获取每个 leader 分区的最近 offsets,间断进行重试的次数,// 默认是 1,示意最多重试 2 次,仅仅实用于 new Kafka direct stream API
.set("spark.streaming.kafka.maxRetries", "2")
.setAppName("TolerateWCTest")
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint(checkpointDirectory)
val topic = Array("testkafkasource2")
val kafkaParam = Map[String, Object](
"bootstrap.servers" -> "kms-1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group0",
"auto.offset.reset" -> "latest", // 默认 latest,"enable.auto.commit" -> (false: java.lang.Boolean)) // 默认 true,false: 手动提交
val lines = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topic, kafkaParam))
val words = lines.flatMap(_.value().split(" "))
val wordDstream = words.map(x => (x, 1))
val stateDstream = wordDstream.reduceByKey(_ + _)
stateDstream.cache()
// 参照 batch interval 设置,// 不得低于 batch interval,否则会报错,// 设为 batch interval 的 2 倍
stateDstream.checkpoint(Seconds(6))
// 把 DStream 保留到 MySQL 数据库中
stateDstream.foreachRDD(rdd =>
rdd.foreachPartition { record =>
var conn: Connection = null
var stmt: PreparedStatement = null
// 给每个 partition,获取一个连贯
conn = ConnectionPool.getConnection
// 遍历 partition 中的数据,应用一个连贯,插入数据库
while (record.hasNext) {val wordcounts = record.next()
val sql = "insert into wctbl(word,count) values (?,?)"
stmt = conn.prepareStatement(sql);
stmt.setString(1, wordcounts._1.trim)
stmt.setInt(2, wordcounts._2.toInt)
stmt.executeUpdate()}
// 用完当前,将连贯还回去
ConnectionPool.returnConnection(conn)
})
ssc
}
def main(args: Array[String]) {
val checkpointDirectory = "hdfs://kms-1:8020/docheckpoint"
val ssc = StreamingContext.getOrCreate(
checkpointDirectory,
() => createContext(checkpointDirectory))
ssc.start()
ssc.awaitTermination()}
}
Spark Streaming 数据汇(Sinks)
Output Operation 介绍
Spark Streaming 提供了上面内置的 Output Operation,如下:
- print()
打印数据数据到规范输入,如果不传递参数,默认打印前 10 个元素
- saveAsTextFiles(prefix, [suffix])
将 DStream 内容存储到文件系统,每个 batch interval 的文件名称为 `prefix-TIME_IN_MS[.suffix]
- saveAsObjectFiles(prefix, [suffix])
将 DStream 的内容保留为序列化的 java 对象的 SequenceFile,每个 batch interval 的文件名称为prefix-TIME_IN_MS[.suffix]
,Python API 不反对此办法。
- saveAsHadoopFiles(prefix, [suffix])
将 DStream 内容保留为 Hadoop 文件,每个 batch interval 的文件名称为prefix-TIME_IN_MS[.suffix]
,Python API 不反对此办法。
- foreachRDD(func)
通用的数据输入算子,func 函数将每个 RDD 的数据输入到内部存储设备,比方将 RDD 写入到文件或者数据库。
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {val cleanedF = context.sparkContext.clean(foreachFunc, false)
foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
foreachRDD(foreachFunc, displayInnerRDDOps = true)
}
private def foreachRDD(foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()}
foreachRDD是一个十分重要的操作,用户能够应用它将解决的数据输入到内部存储设备。对于 foreachRDD 的应用,须要特点别留神一些细节问题。具体分析如下:
如果将数据写入到 MySQL,须要获取连贯 Connection。用户可能不经意的在 Spark Driver 中创立一个连贯对象,而后在 Work 中应用它将数据写入外部设备,代码如下:
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // ①留神:该段代码在 driver 上执行
rdd.foreach { record =>
connection.send(record) // ②留神:该段代码在 worker 上执行
}
}
尖叫提醒:下面的应用形式是谬误的,因为须要将 connection 对象进行序列化,而后发送到 driver 节点,而这种 connection 对象是不能被序列化,所以不能跨节点传输。下面代码会报序列化谬误,正确的应用形式是在 worker 节点创立 connection,即在
rdd.foreach
外部创立 connection。形式如下:
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()}
}
下面的形式解决了不能序列化的问题,然而会为每个 RDD 的 record 创立一个 connection,通常创立一个 connection 对象是会存在肯定性能开销的,所以频繁创立和销毁 connection 对象会造成整体的吞吐量升高。一个比拟好的做法是将 rdd.foreach
替换为`rdd.foreachPartition
`, 这样就不必频繁为每个 record 创立 connection,而是为 RDD 的 partition 创立 connection,大大减少了创立 connection 带来的开销。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()}
}
其实下面的应用形式还能够进一步优化,能够通过在多个 RDD 或者批数据间重用连贯对象。用户能够保护一个动态的连贯对象池,重复使用池中的对象将多批次的 RDD 推送到内部零碎,以进一步节俭开销:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection)
}
}
应用案例
- 模仿数据库连接池
/**
* 简易版的连接池
*/
public class ConnectionPool {
// 动态的 Connection 队列
private static LinkedList<Connection> connectionQueue;
/**
* 加载驱动
*/
static {
try {Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {e.printStackTrace();
}
}
/**
* 获取连贯,多线程拜访并发管制
*
* @return
*/
public synchronized static Connection getConnection() {
try {if (connectionQueue == null) {connectionQueue = new LinkedList<Connection>();
for (int i = 0; i < 10; i++) {
Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/wordcount", "root",
"123qwe");
connectionQueue.push(conn);
}
}
} catch (Exception e) {e.printStackTrace();
}
return connectionQueue.poll();}
/**
* 用完之后,返回一个连贯
*/
public static void returnConnection(Connection conn) {connectionQueue.push(conn);
}
}
- 实时统计写入 MySQL
object WordCount {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
// 存储到 MySQL
wordCounts.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
var conn: Connection = null
var stmt: PreparedStatement = null
// 给每个 partition,获取一个连贯
conn = ConnectionPool.getConnection
// 遍历 partition 中的数据,应用一个连贯,插入数据库
while (partition.hasNext) {val wordcounts = partition.next()
val sql = "insert into wctbl(word,count) values (?,?)"
stmt = conn.prepareStatement(sql);
stmt.setString(1, wordcounts._1.trim)
stmt.setInt(2, wordcounts._2.toInt)
stmt.executeUpdate()}
// 用完当前,将连贯还回去
ConnectionPool.returnConnection(conn)
}
}
ssc.start()
ssc.awaitTermination()}
}
总结
因为篇幅限度,本文次要对 Spark Streaming 执行机制、Transformations 与 Output Operations、Spark Streaming 数据源 (Sources)、Spark Streaming 数据汇(Sinks) 进行了探讨。下一篇将分享 基于工夫的窗口操作 、 有状态的计算 、 检查点 Checkpoint、性能调优 等内容。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包