1.Spark Streaming 简介
Spark Streaming 从各种输出源中读取数据,并把数据分组为小的批次。新的批次按平均的工夫距离创立进去。在每个工夫区间开始的时候,一个新的批次就创立进去,在该区间内收到的数据都会被增加到这个批次中。在工夫区间完结时,批次进行增长,工夫区间的大小是由批次距离这个参数决定的。批次距离个别设在 500 毫秒到几秒之间,由开发者配置。每个输出批次都造成一个 RDD,以 Spark 作业的形式解决并生成其余的 RDD。解决的后果能够以批处理的形式传给内部零碎,Spark Streaming 的编程形象是离散化流,也就是 DStream。它是一个 RDD 序列,每个 RDD 代表数据流中一个工夫片内的数据。另外退出了窗口操作和状态转化,其余和批次解决相似。
与 StructedStreaming 的区别
StructedStreaming 诞生于 2.x 后,次要用于解决结构化数据,除了实现与 Spark Streaming 的批处理,还实现了 long-running 的 task,次要了解为解决的机会能够是数据的生产工夫,而非收到数据的工夫,能够细看下表:
流解决模式 | SparkStreaming | Structed Streaming |
---|---|---|
执行模式 | Micro Batch | Micro batch / Streaming |
API | Dstream/streamingContext | Dataset/DataFrame,SparkSession |
Job 生成形式 | Timer 定时器定时生成 job | Trigger 触发 |
反对数据源 | Socket,filstream,kafka,zeroMq,flume,kinesis | Socket,filstream,kafka,ratesource |
executed-based | Executed based on dstream api | Executed based on sparksql |
Time based | Processing Time | ProcessingTime & eventTIme |
UI | Built-in | No |
对于流解决,当初生产环境下应用 Flink 较多,数据源形式,当初根本是以 kafka 为主,所以本文对 Spark Streaming 的场景即 ETL 流解决结构化日志,将后果输出 Kafka 队列
2.Spark Sreaming 的运行流程
1、客户端提交 Spark Streaming 作业后启动 Driver,Driver 启动 Receiver,Receiver 接收数据源的数据
2、每个作业蕴含多个 Executor,每个 Executor 以线程的形式运行 task,SparkStreaming 至多蕴含一个 receiver task(个别状况下)
3、Receiver 接收数据后生成 Block,并把 BlockId 汇报给 Driver,而后备份到另外一个 Executor 上
4、ReceiverTracker 保护 Reciver 汇报的 BlockId
5、Driver 定时启动 JobGenerator,依据 Dstream 的关系生成逻辑 RDD,而后创立 Jobset,交给 JobScheduler
6、JobScheduler 负责调度 Jobset,交给 DAGScheduler,DAGScheduler 依据逻辑 RDD,生成相应的 Stages,每个 stage 蕴含一到多个 Task,将 TaskSet 提交给 TaskSchedule
7、TaskScheduler 负责把 Task 调度到 Executor 上,并保护 Task 的运行状态
罕用数据源的读取形式
常数据流:
val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)
Socket:
val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)
RDD 队列:
val queue = new Queue[RDD[Int]]()
val queueDStream: InputDStream[Int] = ssc.queueStream(queue)
文件夹:
val lines: DStream[String] = ssc.textFileStream("data/log/")
3. 案例阐明
生产上,罕用流程如下,批处理原始 Kafka 日志,比方申请打点日志等,应用 Spark Streaming 来将数据荡涤转变为肯定格局再导入 Kafka 中, 为了保障 exact-once,会将 offer 本人来保留,次要保留在 redis-offset 中
数据地址:链接:https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ 提取码:hell
3.1 原始 Kafka 日志
sample.log 格局如下:
咱们将它先放到文件里,模仿生产环境下 xx.log
3.2 创立两个 topic,并创立 KafkaProducer 来嫁给你数据写入 mytopic1
一个用来放原始的日志数据,一个用来放解决过后的日志
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic1 --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic2 --partitions 1 --replication-factor 1
启动 redis 服务:
./redis-server redis.conf
查看 mytopic1 数据
kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic mytopic1 --from-beginning
3.3 代码实现
第一局部,解决原始文件数据写入 mytopic1
package com.hoult.Streaming.work
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object FilerToKafka {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
// 定义 kafka producer 参数
val lines: RDD[String] = sc.textFile("data/sample.log")
// 定义 kafka producer 参数
val prop = new Properties()
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
// 将读取到的数据发送到 mytopic1
lines.foreachPartition{iter =>
// KafkaProducer
val producer = new KafkaProducer[String, String](prop)
iter.foreach{line =>
val record = new ProducerRecord[String, String]("mytopic1", line)
producer.send(record)
}
producer.close()}
}
}
第二局部,streaming 读取 mytopic1 的数据,写入 mytopic2
package com.hoult.Streaming.work
import java.util.Properties
import com.hoult.Streaming.kafka.OffsetsWithRedisUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 每秒解决 Kafka 数据,生成结构化数据,输出另外一个 Kafka topic
*/
object KafkaStreamingETL {val log = Logger.getLogger(this.getClass)
def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
// 须要生产的 topic
val topics: Array[String] = Array("mytopic1")
val groupid = "mygroup1"
// 定义 kafka 相干参数
val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)
// 从 Redis 获取 offset
val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)
// 创立 DStream
val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
// 从 kafka 中读取数据
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
)
// 转换后的数据发送到另一个 topic
dstream.foreachRDD{rdd =>
if (!rdd.isEmpty) {val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition(process)
// 将 offset 保留到 Redis
OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
}
}
// 启动作业
ssc.start()
ssc.awaitTermination()}
def process(iter: Iterator[ConsumerRecord[String, String]]) = {iter.map(line => parse(line.value))
.filter(!_.isEmpty)
// .foreach(println)
.foreach(line =>sendMsg2Topic(line, "mytopic2"))
}
def parse(text: String): String = {
try{val arr = text.replace("<<<!>>>", "").split(",")
if (arr.length != 15) return ""arr.mkString("|")
} catch {
case e: Exception =>
log.error("解析数据出错!", e)
""
}
}
def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupid,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
)
}
def getKafkaProducerParameters(): Properties = {val prop = new Properties()
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
prop
}
def sendMsg2Topic(msg: String, topic: String): Unit = {val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
val record = new ProducerRecord[String, String](topic, msg)
producer.send(record)
}
}
第三局部,从 redis 中读写 offset 的工具
package com.hoult.Streaming.kafka
import java.util
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
import scala.collection.mutable
object OffsetsWithRedisUtils {
// 定义 Redis 参数
private val redisHost = "linux121"
private val redisPort = 6379
// 获取 Redis 的连贯
private val config = new JedisPoolConfig
// 最大闲暇数
config.setMaxIdle(5)
// 最大连接数
config.setMaxTotal(10)
private val pool = new JedisPool(config, redisHost, redisPort, 10000)
private def getRedisConnection: Jedis = pool.getResource
private val topicPrefix = "kafka:topic"
// Key:kafka:topic:TopicName:groupid
private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"
// 依据 key 获取 offsets
def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
val jedis: Jedis = getRedisConnection
val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
val key = getKey(topic, groupId)
import scala.collection.JavaConverters._
jedis.hgetAll(key)
.asScala
.map {case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
}
// 偿还资源
jedis.close()
offsets.flatten.toMap
}
// 将 offsets 保留到 Redis 中
def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
// 获取连贯
val jedis: Jedis = getRedisConnection
// 组织数据
offsets.map{range => (range.topic, (range.partition.toString, range.untilOffset.toString))}
.groupBy(_._1)
.foreach{case (topic, buffer) =>
val key: String = getKey(topic, groupId)
import scala.collection.JavaConverters._
val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava
// 保留数据
jedis.hmset(key, maps)
}
jedis.close()}
def main(args: Array[String]): Unit = {val topics = Array("mytopic1")
val groupid = "mygroup1"
val x: Map[TopicPartition, Long] = getOffsetsFromRedis(topics, groupid)
x.foreach(println)
}
}
3.4 演示
- 启动 redis
./redis-server ./redis.conf
- 启动 kafka 并创立 topic
sh scripts/kafka.sh start
3.2 创立两个 topic,并创立 KafkaProducer 来嫁给你数据写入 mytopic1 - 启动 FilerToKafka 和 KafkaStreamingETL
残缺代码:https://github.com/hulichao/bigdata-spark/blob/master/src/main/scala/com/hoult/Streaming/work
4.spark-streamin 注意事项
spark-streaming 读文件读不到的问题,读取本地文件时候,要留神,它不会读取本来就存在于该文件里的文本,只会读取在监听期间,传进文件夹里的数据,而且本文本还有要求,必须是它组后一次更改并且保留的操作,是在监听开始的那一刻
之后的,其实意思就是,如果要向被监听的文件夹里传一个文本,你就要在监听开始之后,先关上这个文本,轻易输出几个空格,或者回车,或者其余不影响文本内容的操作,而后保留,最初再传进文件夹里,这样它能力
检测到这个被传进来的文本。(预计它这个用意是只监听被更改过的文本吧),参考:https://www.codeleading.com/article/9561702251/
吴邪,小三爷,混迹于后盾,大数据,人工智能畛域的小菜鸟。
更多请关注