1. Introduction to Spark Streaming

Spark Streaming reads data from a variety of input sources and groups it into small batches. New batches are created at regular time intervals: at the start of each interval a new batch is created, any data received during that interval is added to the batch, and at the end of the interval the batch stops growing. The size of the interval is set by the batch interval parameter, which the developer typically configures to somewhere between 500 milliseconds and a few seconds. Each input batch forms an RDD that is processed as a Spark job to produce other RDDs, and the results can be pushed to external systems in batch fashion. Spark Streaming's programming abstraction is the discretized stream, or DStream: a sequence of RDDs, each holding the data from one time slice of the stream. On top of this it adds window operations and stateful transformations; everything else works much like ordinary batch processing.
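
To make the batch interval and window ideas concrete, here is a minimal sketch under a few assumptions: it reads from a netcat-style text source on localhost:9999 (for example `nc -lk 9999`), uses a 1-second batch interval so each second of input becomes one RDD in the DStream, and counts words over a sliding 10-second window. Host, port, and window sizes are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object WindowedWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WindowedWordCount").setMaster("local[*]")
        // each 1-second interval of input becomes one RDD in the DStream
        val ssc = new StreamingContext(conf, Seconds(1))

        val lines = ssc.socketTextStream("localhost", 9999)
        val counts = lines
          .flatMap(_.split("\\s+"))
          .map(word => (word, 1))
          // count over a 10-second window, sliding every 2 seconds
          .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(10), Seconds(2))

        counts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }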

Differences from Structured Streaming

Structured Streaming was introduced in Spark 2.x and is aimed at structured data. Besides the micro-batch processing that Spark Streaming offers, it also supports long-running tasks, and, most importantly, processing can be keyed to the time the data was produced (event time) rather than the time it was received. The differences are summarized in the table below (a short event-time sketch follows the table):

| Stream processing | Spark Streaming | Structured Streaming |
| --- | --- | --- |
| Execution model | Micro-batch | Micro-batch / continuous streaming |
| API | DStream / StreamingContext | Dataset/DataFrame, SparkSession |
| Job generation | Jobs generated periodically by a Timer | Jobs fired by a Trigger |
| Supported sources | Socket, fileStream, Kafka, ZeroMQ, Flume, Kinesis | Socket, fileStream, Kafka, rate source |
| Execution based on | DStream API | Spark SQL |
| Time semantics | Processing time | Processing time & event time |
| UI | Built-in | None |
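
As a contrast with the DStream code later in this article, here is a minimal Structured Streaming sketch of event-time windowing. It is only a sketch under assumptions: it needs the spark-sql-kafka connector on the classpath, assumes a broker at localhost:9092, and simply reuses the Kafka record timestamp as the event time; the topic name, window, and watermark durations are placeholders.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object EventTimeWindowSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("event-time-sketch").master("local[*]").getOrCreate()
        import spark.implicits._

        // read a Kafka topic as an unbounded DataFrame
        val raw = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "mytopic1")
          .load()

        // window on the event time carried with the data, not on processing time
        val counts = raw
          .selectExpr("CAST(value AS STRING) AS value", "timestamp AS eventTime")
          .withWatermark("eventTime", "10 minutes")
          .groupBy(window($"eventTime", "5 minutes"))
          .count()

        counts.writeStream
          .outputMode("update")
          .format("console")
          .start()
          .awaitTermination()
      }
    }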

For stream processing, Flink is now the more common choice in production, and Kafka has become the de facto data source. This article therefore focuses on a typical Spark Streaming scenario: an ETL job that processes structured logs and writes the result to a Kafka topic.

2. How Spark Streaming Runs

1. After the client submits a Spark Streaming job, the Driver is started; the Driver starts the Receiver, and the Receiver ingests data from the source.

2. Each job consists of multiple Executors, and each Executor runs tasks as threads; a Spark Streaming application normally contains at least one receiver task.

3. After receiving data, the Receiver packages it into Blocks, reports the BlockIds to the Driver, and replicates the Blocks to another Executor.

4. The ReceiverTracker keeps track of the BlockIds reported by the Receiver.

5. The Driver periodically runs the JobGenerator, which builds the logical RDDs from the DStream lineage, creates a JobSet, and hands it to the JobScheduler.

6. The JobScheduler schedules the JobSet and passes it to the DAGScheduler; the DAGScheduler turns the logical RDDs into Stages, each containing one or more Tasks, and submits the TaskSet to the TaskScheduler.

7. The TaskScheduler schedules the Tasks onto Executors and tracks their execution state (the sketch after this list shows how these steps map onto user code).
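
To relate the flow above to user code, here is a minimal sketch (hosts and ports are placeholders): each socketTextStream call creates one long-running receiver task (steps 1-3), the batch interval drives the JobGenerator (step 5), and the print() output operation is what each batch turns into a job for the JobScheduler, DAGScheduler, and TaskScheduler (steps 6-7).

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ReceiverFlowSketch {
      def main(args: Array[String]): Unit = {
        // receivers occupy cores, so make sure more cores than receivers are available
        val conf = new SparkConf().setAppName("ReceiverFlowSketch").setMaster("local[*]")
        val ssc = new StreamingContext(conf, Seconds(5))

        // two receiver tasks, one per input stream; their Blocks are reported to the Driver
        val s1 = ssc.socketTextStream("localhost", 9998)
        val s2 = ssc.socketTextStream("localhost", 9999)

        // every 5 seconds the JobGenerator builds a job from this DStream lineage
        s1.union(s2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }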

Common ways to read data sources
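
The fragments below assume an already-created StreamingContext named ssc and, for the constant stream, a string collection strArray; a minimal setup (with placeholder names and values) might look like this:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("source-demo").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // placeholder input for the constant-stream example
    val strArray: Array[String] = Array("hello world", "hello spark")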

Constant input stream:

    val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
    val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

Socket:

    // the snippet here originally duplicated the constant-stream example; a socket
    // source is typically created like this (host and port are placeholders)
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

RDD queue:

    val queue = new Queue[RDD[Int]]()
    val queueDStream: InputDStream[Int] = ssc.queueStream(queue)

Directory (file stream):

    val lines: DStream[String] = ssc.textFileStream("data/log/")

3. Case Study

In production a common pipeline looks like this: raw Kafka logs (for example request tracking logs) are cleaned and reshaped into a fixed format with Spark Streaming and then written back into Kafka. To achieve exactly-once behaviour, the offsets are managed by the application itself and stored in Redis.

Data: https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ (extraction code: hell)

3.1 Raw Kafka logs

sample.log is formatted as follows:

We first put it into a file to simulate an xx.log in a production environment.

3.2 Create two topics and use a KafkaProducer to write the data into mytopic1

One topic holds the raw log data; the other holds the processed logs.

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic1 --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic2 --partitions 1 --replication-factor 1

Start the Redis service:

./redis-server redis.conf

Check the data in mytopic1:

kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic mytopic1 --from-beginning

3.3 Code

Part 1: read the raw file and write the data into mytopic1

    package com.hoult.Streaming.work

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object FilerToKafka {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
        val sc = new SparkContext(conf)

        // read the raw log file
        val lines: RDD[String] = sc.textFile("data/sample.log")

        // define the Kafka producer parameters
        val prop = new Properties()
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

        // send every line to mytopic1, one producer per partition
        lines.foreachPartition { iter =>
          val producer = new KafkaProducer[String, String](prop)
          iter.foreach { line =>
            val record = new ProducerRecord[String, String]("mytopic1", line)
            producer.send(record)
          }
          producer.close()
        }
      }
    }

Part 2: a streaming job that reads mytopic1 and writes the cleaned records to mytopic2

    package com.hoult.Streaming.work

    import java.util.Properties

    import com.hoult.Streaming.kafka.OffsetsWithRedisUtils
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * Consume Kafka data every 5 seconds, turn it into structured records,
     * and write the result to another Kafka topic.
     */
    object KafkaStreamingETL {
      val log = Logger.getLogger(this.getClass)

      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
        val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
        val ssc = new StreamingContext(conf, Seconds(5))

        // topic(s) to consume
        val topics: Array[String] = Array("mytopic1")
        val groupid = "mygroup1"

        // Kafka consumer parameters
        val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)

        // fetch the starting offsets from Redis
        val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)

        // create the direct DStream
        val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
          ssc,
          LocationStrategies.PreferConsistent,
          // read from Kafka starting at the saved offsets
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
        )

        // send the transformed data to the other topic
        dstream.foreachRDD { rdd =>
          if (!rdd.isEmpty) {
            val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            rdd.foreachPartition(process)

            // save the offsets to Redis
            OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
          }
        }

        // start the job
        ssc.start()
        ssc.awaitTermination()
      }

      def process(iter: Iterator[ConsumerRecord[String, String]]) = {
        iter.map(line => parse(line.value))
          .filter(!_.isEmpty)
          .foreach(line => sendMsg2Topic(line, "mytopic2"))
      }

      def parse(text: String): String = {
        try {
          val arr = text.replace("<<<!>>>", "").split(",")
          if (arr.length != 15) return ""
          arr.mkString("|")
        } catch {
          case e: Exception =>
            log.error("Failed to parse record!", e)
            ""
        }
      }

      def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
        Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.GROUP_ID_CONFIG -> groupid,
          ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
        )
      }

      def getKafkaProducerParameters(): Properties = {
        val prop = new Properties()
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
        prop
      }

      def sendMsg2Topic(msg: String, topic: String): Unit = {
        // NOTE: creating a producer per message is simple but inefficient;
        // closing it here flushes the send and releases resources
        val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
        val record = new ProducerRecord[String, String](topic, msg)
        producer.send(record)
        producer.close()
      }
    }

Part 3: a utility that reads and writes offsets in Redis

    package com.hoult.Streaming.kafka

    import java.util

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.OffsetRange
    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

    import scala.collection.mutable

    object OffsetsWithRedisUtils {
      // Redis connection parameters
      private val redisHost = "linux121"
      private val redisPort = 6379

      // Redis connection pool
      private val config = new JedisPoolConfig
      // max idle connections
      config.setMaxIdle(5)
      // max total connections
      config.setMaxTotal(10)

      private val pool = new JedisPool(config, redisHost, redisPort, 10000)
      private def getRedisConnection: Jedis = pool.getResource

      private val topicPrefix = "kafka:topic"

      // key layout: kafka:topic:TopicName:groupid
      private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

      // read the offsets stored under each key
      def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
        val jedis: Jedis = getRedisConnection
        val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
          val key = getKey(topic, groupId)

          import scala.collection.JavaConverters._
          jedis.hgetAll(key)
            .asScala
            .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
        }

        // return the connection to the pool
        jedis.close()

        offsets.flatten.toMap
      }

      // save the offsets to Redis
      def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
        // get a connection
        val jedis: Jedis = getRedisConnection

        // group the offset ranges by topic
        offsets.map { range => (range.topic, (range.partition.toString, range.untilOffset.toString)) }
          .groupBy(_._1)
          .foreach { case (topic, buffer) =>
            val key: String = getKey(topic, groupId)

            import scala.collection.JavaConverters._
            val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

            // write the partition -> offset hash
            jedis.hmset(key, maps)
          }

        jedis.close()
      }

      def main(args: Array[String]): Unit = {
        val topics = Array("mytopic1")
        val groupid = "mygroup1"
        val x: Map[TopicPartition, Long] = getOffsetsFromRedis(topics, groupid)
        x.foreach(println)
      }
    }

3.4 Demo

  • Start Redis: ./redis-server ./redis.conf
  • Start Kafka and create the two topics as described in section 3.2: sh scripts/kafka.sh start
  • Run FilerToKafka and then KafkaStreamingETL


Full code: https://github.com/hulichao/bigdata-spark/blob/master/src/main/scala/com/hoult/Streaming/work

4. Spark Streaming Caveats

A common problem is that Spark Streaming seems to read nothing from a monitored directory. When reading local files, note that textFileStream does not pick up files that already exist in the directory; it only picks up files placed into the directory while the stream is running, and only if the file's last modification (save) happened after monitoring started. In practice, if you want a file to be detected, start the stream first, then open the file and make a trivial change (a space or a newline that does not affect the content), save it, and only then move it into the monitored directory; otherwise it will not be detected. (Presumably the intent is to monitor only newly modified files.) Reference: https://www.codeleading.com/article/9561702251/
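
A minimal sketch of the safe pattern, with a placeholder directory: start the StreamingContext first, then save and move a fresh file into the monitored directory from another shell.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FileStreamCaveatSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("FileStreamCaveatSketch").setMaster("local[*]")
        val ssc = new StreamingContext(conf, Seconds(5))

        // only files moved into data/log/ after this stream starts, with a fresh
        // modification time, will be picked up
        val lines = ssc.textFileStream("data/log/")
        lines.count().print()

        ssc.start()
        // now, from another shell: save a new file and `mv` it into data/log/
        ssc.awaitTermination()
      }
    }
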
Wu Xie, Xiao San Ye, a small rookie knocking around in backend development, big data, and AI.
Follow for more.