关于大数据:大数据开发Spark开发Streaming处理数据-写入Kafka

45次阅读

共计 9028 个字符,预计需要花费 23 分钟才能阅读完成。

1.Spark Streaming 简介

Spark Streaming 从各种输出源中读取数据,并把数据分组为小的批次。新的批次按平均的工夫距离创立进去。在每个工夫区间开始的时候,一个新的批次就创立进去,在该区间内收到的数据都会被增加到这个批次中。在工夫区间完结时,批次进行增长,工夫区间的大小是由批次距离这个参数决定的。批次距离个别设在 500 毫秒到几秒之间,由开发者配置。每个输出批次都造成一个 RDD,以 Spark 作业的形式解决并生成其余的 RDD。解决的后果能够以批处理的形式传给内部零碎,Spark Streaming 的编程形象是离散化流,也就是 DStream。它是一个 RDD 序列,每个 RDD 代表数据流中一个工夫片内的数据。另外退出了窗口操作和状态转化,其余和批次解决相似。

与 StructedStreaming 的区别

StructedStreaming 诞生于 2.x 后,次要用于解决结构化数据,除了实现与 Spark Streaming 的批处理,还实现了 long-running 的 task,次要了解为解决的机会能够是数据的生产工夫,而非收到数据的工夫,能够细看下表:

流解决模式 SparkStreaming Structed Streaming
执行模式 Micro Batch Micro batch / Streaming
API Dstream/streamingContext Dataset/DataFrame,SparkSession
Job 生成形式 Timer 定时器定时生成 job Trigger 触发
反对数据源 Socket,filstream,kafka,zeroMq,flume,kinesis Socket,filstream,kafka,ratesource
executed-based Executed based on dstream api Executed based on sparksql
Time based Processing Time ProcessingTime & eventTIme
UI Built-in No

对于流解决,当初生产环境下应用 Flink 较多,数据源形式,当初根本是以 kafka 为主,所以本文对 Spark Streaming 的场景即 ETL 流解决结构化日志,将后果输出 Kafka 队列

2.Spark Sreaming 的运行流程

1、客户端提交 Spark Streaming 作业后启动 Driver,Driver 启动 Receiver,Receiver 接收数据源的数据

2、每个作业蕴含多个 Executor,每个 Executor 以线程的形式运行 task,SparkStreaming 至多蕴含一个 receiver task(个别状况下)

3、Receiver 接收数据后生成 Block,并把 BlockId 汇报给 Driver,而后备份到另外一个 Executor 上

4、ReceiverTracker 保护 Reciver 汇报的 BlockId

5、Driver 定时启动 JobGenerator,依据 Dstream 的关系生成逻辑 RDD,而后创立 Jobset,交给 JobScheduler

6、JobScheduler 负责调度 Jobset,交给 DAGScheduler,DAGScheduler 依据逻辑 RDD,生成相应的 Stages,每个 stage 蕴含一到多个 Task,将 TaskSet 提交给 TaskSchedule

7、TaskScheduler 负责把 Task 调度到 Executor 上,并保护 Task 的运行状态

罕用数据源的读取形式

常数据流:

    val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
    val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

Socket:

    val rdd: RDD[String] = ssc.sparkContext.makeRDD(strArray)
    val wordDStream: ConstantInputDStream[String] = new ConstantInputDStream(ssc, rdd)

RDD 队列:

    val queue = new Queue[RDD[Int]]()
    val queueDStream: InputDStream[Int] = ssc.queueStream(queue)

文件夹:

    val lines: DStream[String] = ssc.textFileStream("data/log/")

3. 案例阐明

生产上,罕用流程如下,批处理原始 Kafka 日志,比方申请打点日志等,应用 Spark Streaming 来将数据荡涤转变为肯定格局再导入 Kafka 中, 为了保障 exact-once,会将 offer 本人来保留,次要保留在 redis-offset 中

数据地址:链接:https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ 提取码:hell

3.1 原始 Kafka 日志

sample.log 格局如下:

咱们将它先放到文件里,模仿生产环境下 xx.log

3.2 创立两个 topic,并创立 KafkaProducer 来嫁给你数据写入 mytopic1

一个用来放原始的日志数据,一个用来放解决过后的日志

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic1 --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic mytopic2 --partitions 1 --replication-factor 1  

启动 redis 服务:

./redis-server redis.conf

查看 mytopic1 数据

kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic mytopic1 --from-beginning

3.3 代码实现

第一局部,解决原始文件数据写入 mytopic1

package com.hoult.Streaming.work

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FilerToKafka {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 定义 kafka producer 参数
    val lines: RDD[String] = sc.textFile("data/sample.log")

    // 定义 kafka producer 参数
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // 将读取到的数据发送到 mytopic1
    lines.foreachPartition{iter =>
      // KafkaProducer
      val producer = new KafkaProducer[String, String](prop)
      iter.foreach{line =>
        val record = new ProducerRecord[String, String]("mytopic1", line)
        producer.send(record)
      }
      producer.close()}
  }
}

第二局部,streaming 读取 mytopic1 的数据,写入 mytopic2

package com.hoult.Streaming.work

import java.util.Properties

import com.hoult.Streaming.kafka.OffsetsWithRedisUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
 * 每秒解决 Kafka 数据,生成结构化数据,输出另外一个 Kafka topic
 */
object KafkaStreamingETL {val log = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // 须要生产的 topic
    val topics: Array[String] = Array("mytopic1")
    val groupid = "mygroup1"
    // 定义 kafka 相干参数
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)
    // 从 Redis 获取 offset
    val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)

    // 创立 DStream
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      // 从 kafka 中读取数据
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )

    // 转换后的数据发送到另一个 topic
    dstream.foreachRDD{rdd =>
      if (!rdd.isEmpty) {val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(process)
        // 将 offset 保留到 Redis
        OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
      }
    }

    // 启动作业
    ssc.start()
    ssc.awaitTermination()}

  def process(iter: Iterator[ConsumerRecord[String, String]]) = {iter.map(line => parse(line.value))
      .filter(!_.isEmpty)
//      .foreach(println)
      .foreach(line =>sendMsg2Topic(line, "mytopic2"))
  }

  def parse(text: String): String = {
    try{val arr = text.replace("<<<!>>>", "").split(",")
      if (arr.length != 15) return ""arr.mkString("|")
    } catch {
      case e: Exception =>
        log.error("解析数据出错!", e)
        ""
    }
  }

  def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
  }

  def getKafkaProducerParameters(): Properties = {val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop
  }

  def sendMsg2Topic(msg: String, topic: String): Unit = {val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    val record = new ProducerRecord[String, String](topic, msg)
    producer.send(record)
  }
}

第三局部,从 redis 中读写 offset 的工具

package com.hoult.Streaming.kafka

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsWithRedisUtils {
  // 定义 Redis 参数
  private val redisHost = "linux121"
  private val redisPort = 6379

  // 获取 Redis 的连贯
  private val config = new JedisPoolConfig
  // 最大闲暇数
  config.setMaxIdle(5)
  // 最大连接数
  config.setMaxTotal(10)

  private val pool = new JedisPool(config, redisHost, redisPort, 10000)
  private def getRedisConnection: Jedis = pool.getResource

  private val topicPrefix = "kafka:topic"

  // Key:kafka:topic:TopicName:groupid
  private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

  // 依据 key 获取 offsets
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection

    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
      val key = getKey(topic, groupId)

      import scala.collection.JavaConverters._

      jedis.hgetAll(key)
        .asScala
        .map {case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
    }

    // 偿还资源
    jedis.close()

    offsets.flatten.toMap
  }

  // 将 offsets 保留到 Redis 中
  def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
    // 获取连贯
    val jedis: Jedis = getRedisConnection

    // 组织数据
    offsets.map{range => (range.topic, (range.partition.toString, range.untilOffset.toString))}
        .groupBy(_._1)
      .foreach{case (topic, buffer) =>
        val key: String = getKey(topic, groupId)

        import scala.collection.JavaConverters._
        val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

        // 保留数据
        jedis.hmset(key, maps)
      }

    jedis.close()}

  def main(args: Array[String]): Unit = {val topics = Array("mytopic1")
    val groupid = "mygroup1"
    val x: Map[TopicPartition, Long] = getOffsetsFromRedis(topics, groupid)
    x.foreach(println)
  }
}

3.4 演示

  • 启动 redis ./redis-server ./redis.conf
  • 启动 kafka 并创立 topic sh scripts/kafka.sh start 3.2 创立两个 topic,并创立 KafkaProducer 来嫁给你数据写入 mytopic1
  • 启动 FilerToKafka 和 KafkaStreamingETL


残缺代码:https://github.com/hulichao/bigdata-spark/blob/master/src/main/scala/com/hoult/Streaming/work

4.spark-streamin 注意事项

spark-streaming 读文件读不到的问题,读取本地文件时候,要留神,它不会读取本来就存在于该文件里的文本,只会读取在监听期间,传进文件夹里的数据,而且本文本还有要求,必须是它组后一次更改并且保留的操作,是在监听开始的那一刻
之后的,其实意思就是,如果要向被监听的文件夹里传一个文本,你就要在监听开始之后,先关上这个文本,轻易输出几个空格,或者回车,或者其余不影响文本内容的操作,而后保留,最初再传进文件夹里,这样它能力
检测到这个被传进来的文本。(预计它这个用意是只监听被更改过的文本吧),参考:https://www.codeleading.com/article/9561702251/
吴邪,小三爷,混迹于后盾,大数据,人工智能畛域的小菜鸟。
更多请关注

正文完
 0