关于spark-streaming:第十篇SparkStreaming手动维护Kafka-Offset的几种方式

Spark Streaming No Receivers 形式的createDirectStream 办法不应用接收器,而是创立输出流间接从Kafka 集群节点拉取音讯。输出流保障每个音讯从Kafka 集群拉取当前只齐全转换一次,保障语义一致性。然而当作业产生故障或重启时,要保障从以后的生产位点去解决数据(即Exactly Once语义),单纯的依附SparkStreaming自身的机制是不太现实的,生产环境中通常借助手动治理offset的形式来保护kafka的生产位点。本文分享将介绍如何手动治理Kafka的Offset,心愿对你有所帮忙。本文次要包含以下内容:

  • 如何应用MySQL治理Kafka的Offset
  • 如何应用Redis治理Kafka的OffSet

如何应用MySQL治理Kafka的Offset

咱们能够从Spark Streaming 应用程序中编写代码来手动治理Kafka偏移量,偏移量能够从每一批流解决中生成的RDDS偏移量来获取,获取形式为:

KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
// 获取偏移量
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 ...
  }

当获取到偏移量之后,能够将将其保留到内部存储设备中(MySQL、Redis、Zookeeper、HBase等)。

应用案例代码

  • MySQL中用于保留偏移量的表
CREATE TABLE `topic_par_group_offset` (
  `topic` varchar(255) NOT NULL,
  `partition` int(11) NOT NULL,
  `groupid` varchar(255) NOT NULL,
  `offset` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ;
  • 常量配置类:ConfigConstants
object ConfigConstants {
  // Kafka配置
  val kafkaBrokers = "kms-2:9092,kms-3:9092,kms-4:9092"
  val groupId = "group_test"
  val kafkaTopics = "test"
  val batchInterval = Seconds(5)
  val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  val kafkaKeySer = "org.apache.kafka.common.serialization.StringSerializer"
  val kafkaValueSer = "org.apache.kafka.common.serialization.StringSerializer"
  val sparkSerializer = "org.apache.spark.serializer.KryoSerializer"
  val batchSize = 16384
  val lingerMs = 1
  val bufferMemory = 33554432
  // MySQL配置
  val user = "root"
  val password = "123qwe"
  val url = "jdbc:mysql://localhost:3306/kafka_offset"
  val driver = "com.mysql.jdbc.Driver"
  // 检查点配置
  val checkpointDir = "file:///e:/checkpoint"
  val checkpointInterval = Seconds(10)
  // Redis配置
  val redisAddress = "192.168.10.203"
  val redisPort = 6379
  val redisAuth = "123qwe"
  val redisTimeout = 3000
}
  • JDBC连贯工具类:JDBCConnPool
object JDBCConnPool {

  val log: Logger = Logger.getLogger(JDBCConnPool.getClass)
  var dataSource: BasicDataSource = null
  /**
    * 创立数据源
    *
    * @return
    */
  def getDataSource(): BasicDataSource = {
    if (dataSource == null) {
      dataSource = new BasicDataSource()
      dataSource.setDriverClassName(ConfigConstants.driver)
      dataSource.setUrl(ConfigConstants.url)
      dataSource.setUsername(ConfigConstants.user)
      dataSource.setPassword(ConfigConstants.password)
      dataSource.setMaxTotal(50)
      dataSource.setInitialSize(3)
      dataSource.setMinIdle(3)
      dataSource.setMaxIdle(10)
      dataSource.setMaxWaitMillis(2 * 10000)
      dataSource.setRemoveAbandonedTimeout(180)
      dataSource.setRemoveAbandonedOnBorrow(true)
      dataSource.setRemoveAbandonedOnMaintenance(true)
      dataSource.setTestOnReturn(true)
      dataSource.setTestOnBorrow(true)
    }
    return dataSource
  }
  /**
    * 开释数据源
    */
  def closeDataSource() = {
    if (dataSource != null) {
      dataSource.close()
    }
  }
  /**
    * 获取数据库连贯
    *
    * @return
    */
  def getConnection(): Connection = {
    var conn: Connection = null
    try {
      if (dataSource != null) {
        conn = dataSource.getConnection()
      } else {
        conn = getDataSource().getConnection()
      }
    } catch {
      case e: Exception =>
        log.error(e.getMessage(), e)
    }
    conn
  }

  /**
    * 敞开连贯
    */
 def closeConnection (ps:PreparedStatement , conn:Connection ) {
    if (ps != null) {
      try {
        ps.close();
      } catch  {
        case e:Exception =>
          log.error("预编译SQL语句对象PreparedStatement敞开异样!" + e.getMessage(), e);
      }
    }
    if (conn != null) {
      try {
        conn.close();
      } catch  {
        case e:Exception =>
        log.error("敞开连贯对象Connection异样!" + e.getMessage(), e);
      }
    }
  }
}
  • Kafka生产者:KafkaProducerTest
object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    val  props : Properties = new Properties()
    props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
    props.put("batch.size", ConfigConstants.batchSize.asInstanceOf[Integer])
    props.put("linger.ms", ConfigConstants.lingerMs.asInstanceOf[Integer])
    props.put("buffer.memory", ConfigConstants.bufferMemory.asInstanceOf[Integer])
    props.put("key.serializer",ConfigConstants.kafkaKeySer)
    props.put("value.serializer", ConfigConstants.kafkaValueSer)
   val  producer :  Producer[String, String] = new KafkaProducer[String, String](props)
    val startTime : Long  = System.currentTimeMillis()
    for ( i <- 1 to 100) {
      producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
    }
  println("耗费工夫:" + (System.currentTimeMillis() - startTime))
    producer.close()
  }
}
  • 读取和保留Offset:

该对象的作用是从外部设备中读取和写入Offset,包含MySQL和Redis

object OffsetReadAndSave {

  /**
    * 从MySQL中获取偏移量
    *
    * @param groupid
    * @param topic
    * @return
    */

  def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {

    val conn = JDBCConnPool.getConnection()
    val selectSql = "select * from topic_par_group_offset where groupid = ? and topic = ?"
    val ppst = conn.prepareStatement(selectSql)
    ppst.setString(1, groupid)
    ppst.setString(2, topic)

    val result: ResultSet = ppst.executeQuery()

    // 主题分区偏移量
    val topicPartitionOffset = mutable.Map[TopicPartition, Long]()

    while (result.next()) {

      val topicPartition: TopicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))

      topicPartitionOffset += (topicPartition -> result.getLong("offset"))
    }

    JDBCConnPool.closeConnection(ppst, conn)
    topicPartitionOffset
  }

  /**
    * 从Redis中获取偏移量
    *
    * @param groupid
    * @param topic
    * @return
    */
  def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = JedisConnPool.getConnection()
    var offsets = mutable.Map[TopicPartition, Long]()

    val key = s"${topic}_${groupid}"
    val fields : java.util.Map[String, String] = jedis.hgetAll(key)
    for (partition <- JavaConversions.mapAsScalaMap(fields)) {

      offsets.put(new TopicPartition(topic, partition._1.toInt), partition._2.toLong)
    }

    offsets.toMap

  }
  /**
    * 将偏移量写入MySQL
    *
    * @param groupid     消费者组ID
    * @param offsetRange 音讯偏移量范畴
    */

  def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {

    val conn = JDBCConnPool.getConnection()
    val insertSql = "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)"
    val ppst = conn.prepareStatement(insertSql)

    for (offset <- offsetRange) {

      ppst.setString(1, offset.topic)
      ppst.setInt(2, offset.partition)
      ppst.setString(3, groupid)
      ppst.setLong(4, offset.untilOffset)
      ppst.executeUpdate()
    }
    JDBCConnPool.closeConnection(ppst, conn)

  }
  /**
    * 将偏移量保留到Redis中
    * @param groupid
    * @param offsetRange
    */
  def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]) = {
    val jedis :Jedis = JedisConnPool.getConnection()
    for(offsetRange<-offsetRange){
      val topic=offsetRange.topic
      val partition=offsetRange.partition
      val offset=offsetRange.untilOffset
      // key为topic_groupid,field为partition,value为offset
      jedis.hset(s"${topic}_${groupid}",partition.toString,offset.toString)
    }
  }
}
  • 业务解决类

该对象是业务解决逻辑,次要是生产Kafka数据,再解决之后进行手动将偏移量保留到MySQL中。在启动程序时,会判断内部存储设备中是否存在偏移量,如果是首次启动则从最后的生产位点生产,如果存在Offset,则从以后的Offset去生产。

察看景象:当首次启动时会从头生产数据,手动进行程序,而后再次启动,会发现会从以后提交的偏移量生产数据。

object ManualCommitOffset {
  
  def main(args: Array[String]): Unit = {

    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffset.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer",ConfigConstants.sparkSerializer)

    val ssc = new StreamingContext(conf, batchInterval)
    // 必须开启checkpoint,否则会报错
    ssc.checkpoint(ConfigConstants.checkpointDir)

    ssc.sparkContext.setLogLevel("OFF")
    //应用broker和topic创立direct kafka stream
    val topicSet = topics.split(" ").toSet

    // kafka连贯参数
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )


    // 从MySQL中读取该主题对应的消费者组的分区偏移量
    val offsetMap = OffsetReadAndSave.getOffsetMap(groupId, topics)
    var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

    //如果MySQL中曾经存在了偏移量,则应该从该偏移量处开始生产
    if (offsetMap.size > 0) {
      println("存在偏移量,从该偏移量处进行生产!!")

      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))

    } else {
      //如果MySQL中没有存在了偏移量,从最早开始生产
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))

    }
    // checkpoint工夫距离,必须是batchInterval的整数倍
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    // 保留batch的offset
    var offsetRanges = Array[OffsetRange]()
    // 获取以后DS的音讯偏移量
    val transformDS = inputDStream.transform { rdd =>
      // 获取offset
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }


    /**
      * 状态更新函数
      * @param newValues:新的value值
      * @param stateValue:状态值
      * @return
      */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldvalue = stateValue.getOrElse(0) // 获取状态值
      // 遍历以后数据,并更新状态
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // 返回最新的状态
      Option(oldvalue)
    }
    // 业务逻辑解决
    // 该示例统计音讯key的个数,用于查看是否是从曾经提交的偏移量生产数据
    transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()

    // 打印偏移量和数据信息,察看输入的后果
    transformDS.foreachRDD { (rdd, time) =>
      // 遍历打印该RDD数据
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      // 打印生产偏移量信息
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")

      }

      //将偏移量保留到到MySQL中
      OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

如何应用Redis治理Kafka的OffSet

  • Redis连贯类
object JedisConnPool {
  val config = new JedisPoolConfig
  //最大连接数
  config.setMaxTotal(60)
  //最大闲暇连接数
  config.setMaxIdle(10)
  config.setTestOnBorrow(true)

  //服务器ip
  val redisAddress :String = ConfigConstants.redisAddress.toString
  // 端口号
  val redisPort:Int = ConfigConstants.redisPort.toInt
  //拜访明码
  val redisAuth :String = ConfigConstants.redisAuth.toString
  //期待可用连贯的最大工夫
  val redisTimeout:Int = ConfigConstants.redisTimeout.toInt

  val pool = new JedisPool(config,redisAddress,redisPort,redisTimeout,redisAuth)

  def getConnection():Jedis = {
    pool.getResource
  }

}
  • 业务逻辑解决

该对象与下面的根本相似,只不过应用的是Redis来进行存储Offset,存储到Redis的数据类型是Hash,根本格局为:[key field value] -> [ topic_groupid partition offset],即 key为topic_groupid,field为partition,value为offset。

object ManualCommitOffsetToRedis {

  def main(args: Array[String]): Unit = {

    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffset.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)


    val ssc = new StreamingContext(conf, batchInterval)
    // 必须开启checkpoint,否则会报错
    ssc.checkpoint(ConfigConstants.checkpointDir)

    ssc.sparkContext.setLogLevel("OFF")
    //应用broker和topic创立direct kafka stream
    val topicSet = topics.split(" ").toSet

    // kafka连贯参数
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )


    // 从Redis中读取该主题对应的消费者组的分区偏移量
    val offsetMap = OffsetReadAndSave.getOffsetFromRedis(groupId, topics)
    var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

    //如果Redis中曾经存在了偏移量,则应该从该偏移量处开始生产
    if (offsetMap.size > 0) {
      println("存在偏移量,从该偏移量处进行生产!!")

      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))

    } else {
      //如果Redis中没有存在了偏移量,从最早开始生产
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))

    }
    // checkpoint工夫距离,必须是batchInterval的整数倍
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    // 保留batch的offset
    var offsetRanges = Array[OffsetRange]()
    // 获取以后DS的音讯偏移量
    val transformDS = inputDStream.transform { rdd =>
      // 获取offset
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }


    /**
      * 状态更新函数
      *
      * @param newValues  :新的value值
      * @param stateValue :状态值
      * @return
      */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldvalue = stateValue.getOrElse(0) // 获取状态值
      // 遍历以后数据,并更新状态
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // 返回最新的状态
      Option(oldvalue)
    }
    // 业务逻辑解决
    // 该示例统计音讯key的个数,用于查看是否是从曾经提交的偏移量生产数据
    transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()

    // 打印偏移量和数据信息,察看输入的后果
    transformDS.foreachRDD { (rdd, time) =>
      // 遍历打印该RDD数据
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      // 打印生产偏移量信息
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")

      }

      //将偏移量保留到到Redis中
      OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }

}

总结

本文介绍了如何应用内部存储设备来保留Kafka的生产位点,通过具体的代码示例阐明了应用MySQL和Redis治理生产位点的形式。当然,内部存储设备很多,用户也能够应用其余的存储设备进行治理Offset,比方Zookeeper和HBase等,其根本解决思路都十分相似。

Spark Streaming No Receivers 形式的createDirectStream 办法不应用接收器,而是创立输出流间接从Kafka 集群节点拉取音讯。输出流保障每个音讯从Kafka 集群拉取当前只齐全转换一次,保障语义一致性。然而当作业产生故障或重启时,要保障从以后的生产位点去解决数据(即Exactly Once语义),单纯的依附SparkStreaming自身的机制是不太现实的,生产环境中通常借助手动治理offset的形式来保护kafka的生产位点。本文分享将介绍如何手动治理Kafka的Offset,心愿对你有所帮忙。本文次要包含以下内容:

  • 如何应用MySQL治理Kafka的Offset
  • 如何应用Redis治理Kafka的OffSet

如何应用MySQL治理Kafka的Offset

咱们能够从Spark Streaming 应用程序中编写代码来手动治理Kafka偏移量,偏移量能够从每一批流解决中生成的RDDS偏移量来获取,获取形式为:

KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
// 获取偏移量
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 ...
  }

当获取到偏移量之后,能够将将其保留到内部存储设备中(MySQL、Redis、Zookeeper、HBase等)。

应用案例代码

  • MySQL中用于保留偏移量的表
CREATE TABLE `topic_par_group_offset` (
  `topic` varchar(255) NOT NULL,
  `partition` int(11) NOT NULL,
  `groupid` varchar(255) NOT NULL,
  `offset` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ;
  • 常量配置类:ConfigConstants
object ConfigConstants {
  // Kafka配置
  val kafkaBrokers = "kms-2:9092,kms-3:9092,kms-4:9092"
  val groupId = "group_test"
  val kafkaTopics = "test"
  val batchInterval = Seconds(5)
  val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  val kafkaKeySer = "org.apache.kafka.common.serialization.StringSerializer"
  val kafkaValueSer = "org.apache.kafka.common.serialization.StringSerializer"
  val sparkSerializer = "org.apache.spark.serializer.KryoSerializer"
  val batchSize = 16384
  val lingerMs = 1
  val bufferMemory = 33554432
  // MySQL配置
  val user = "root"
  val password = "123qwe"
  val url = "jdbc:mysql://localhost:3306/kafka_offset"
  val driver = "com.mysql.jdbc.Driver"
  // 检查点配置
  val checkpointDir = "file:///e:/checkpoint"
  val checkpointInterval = Seconds(10)
  // Redis配置
  val redisAddress = "192.168.10.203"
  val redisPort = 6379
  val redisAuth = "123qwe"
  val redisTimeout = 3000
}
  • JDBC连贯工具类:JDBCConnPool
object JDBCConnPool {

  val log: Logger = Logger.getLogger(JDBCConnPool.getClass)
  var dataSource: BasicDataSource = null
  /**
    * 创立数据源
    *
    * @return
    */
  def getDataSource(): BasicDataSource = {
    if (dataSource == null) {
      dataSource = new BasicDataSource()
      dataSource.setDriverClassName(ConfigConstants.driver)
      dataSource.setUrl(ConfigConstants.url)
      dataSource.setUsername(ConfigConstants.user)
      dataSource.setPassword(ConfigConstants.password)
      dataSource.setMaxTotal(50)
      dataSource.setInitialSize(3)
      dataSource.setMinIdle(3)
      dataSource.setMaxIdle(10)
      dataSource.setMaxWaitMillis(2 * 10000)
      dataSource.setRemoveAbandonedTimeout(180)
      dataSource.setRemoveAbandonedOnBorrow(true)
      dataSource.setRemoveAbandonedOnMaintenance(true)
      dataSource.setTestOnReturn(true)
      dataSource.setTestOnBorrow(true)
    }
    return dataSource
  }
  /**
    * 开释数据源
    */
  def closeDataSource() = {
    if (dataSource != null) {
      dataSource.close()
    }
  }
  /**
    * 获取数据库连贯
    *
    * @return
    */
  def getConnection(): Connection = {
    var conn: Connection = null
    try {
      if (dataSource != null) {
        conn = dataSource.getConnection()
      } else {
        conn = getDataSource().getConnection()
      }
    } catch {
      case e: Exception =>
        log.error(e.getMessage(), e)
    }
    conn
  }

  /**
    * 敞开连贯
    */
 def closeConnection (ps:PreparedStatement , conn:Connection ) {
    if (ps != null) {
      try {
        ps.close();
      } catch  {
        case e:Exception =>
          log.error("预编译SQL语句对象PreparedStatement敞开异样!" + e.getMessage(), e);
      }
    }
    if (conn != null) {
      try {
        conn.close();
      } catch  {
        case e:Exception =>
        log.error("敞开连贯对象Connection异样!" + e.getMessage(), e);
      }
    }
  }
}
  • Kafka生产者:KafkaProducerTest
object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    val  props : Properties = new Properties()
    props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
    props.put("batch.size", ConfigConstants.batchSize.asInstanceOf[Integer])
    props.put("linger.ms", ConfigConstants.lingerMs.asInstanceOf[Integer])
    props.put("buffer.memory", ConfigConstants.bufferMemory.asInstanceOf[Integer])
    props.put("key.serializer",ConfigConstants.kafkaKeySer)
    props.put("value.serializer", ConfigConstants.kafkaValueSer)
   val  producer :  Producer[String, String] = new KafkaProducer[String, String](props)
    val startTime : Long  = System.currentTimeMillis()
    for ( i <- 1 to 100) {
      producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
    }
  println("耗费工夫:" + (System.currentTimeMillis() - startTime))
    producer.close()
  }
}
  • 读取和保留Offset:

该对象的作用是从外部设备中读取和写入Offset,包含MySQL和Redis

object OffsetReadAndSave {

  /**
    * 从MySQL中获取偏移量
    *
    * @param groupid
    * @param topic
    * @return
    */

  def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {

    val conn = JDBCConnPool.getConnection()
    val selectSql = "select * from topic_par_group_offset where groupid = ? and topic = ?"
    val ppst = conn.prepareStatement(selectSql)
    ppst.setString(1, groupid)
    ppst.setString(2, topic)

    val result: ResultSet = ppst.executeQuery()

    // 主题分区偏移量
    val topicPartitionOffset = mutable.Map[TopicPartition, Long]()

    while (result.next()) {

      val topicPartition: TopicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))

      topicPartitionOffset += (topicPartition -> result.getLong("offset"))
    }

    JDBCConnPool.closeConnection(ppst, conn)
    topicPartitionOffset
  }

  /**
    * 从Redis中获取偏移量
    *
    * @param groupid
    * @param topic
    * @return
    */
  def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = JedisConnPool.getConnection()
    var offsets = mutable.Map[TopicPartition, Long]()

    val key = s"${topic}_${groupid}"
    val fields : java.util.Map[String, String] = jedis.hgetAll(key)
    for (partition <- JavaConversions.mapAsScalaMap(fields)) {

      offsets.put(new TopicPartition(topic, partition._1.toInt), partition._2.toLong)
    }

    offsets.toMap

  }
  /**
    * 将偏移量写入MySQL
    *
    * @param groupid     消费者组ID
    * @param offsetRange 音讯偏移量范畴
    */

  def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {

    val conn = JDBCConnPool.getConnection()
    val insertSql = "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)"
    val ppst = conn.prepareStatement(insertSql)

    for (offset <- offsetRange) {

      ppst.setString(1, offset.topic)
      ppst.setInt(2, offset.partition)
      ppst.setString(3, groupid)
      ppst.setLong(4, offset.untilOffset)
      ppst.executeUpdate()
    }
    JDBCConnPool.closeConnection(ppst, conn)

  }
  /**
    * 将偏移量保留到Redis中
    * @param groupid
    * @param offsetRange
    */
  def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]) = {
    val jedis :Jedis = JedisConnPool.getConnection()
    for(offsetRange<-offsetRange){
      val topic=offsetRange.topic
      val partition=offsetRange.partition
      val offset=offsetRange.untilOffset
      // key为topic_groupid,field为partition,value为offset
      jedis.hset(s"${topic}_${groupid}",partition.toString,offset.toString)
    }
  }
}
  • 业务解决类

该对象是业务解决逻辑,次要是生产Kafka数据,再解决之后进行手动将偏移量保留到MySQL中。在启动程序时,会判断内部存储设备中是否存在偏移量,如果是首次启动则从最后的生产位点生产,如果存在Offset,则从以后的Offset去生产。

察看景象:当首次启动时会从头生产数据,手动进行程序,而后再次启动,会发现会从以后提交的偏移量生产数据。

object ManualCommitOffset {
  
  def main(args: Array[String]): Unit = {

    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffset.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer",ConfigConstants.sparkSerializer)

    val ssc = new StreamingContext(conf, batchInterval)
    // 必须开启checkpoint,否则会报错
    ssc.checkpoint(ConfigConstants.checkpointDir)

    ssc.sparkContext.setLogLevel("OFF")
    //应用broker和topic创立direct kafka stream
    val topicSet = topics.split(" ").toSet

    // kafka连贯参数
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )


    // 从MySQL中读取该主题对应的消费者组的分区偏移量
    val offsetMap = OffsetReadAndSave.getOffsetMap(groupId, topics)
    var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

    //如果MySQL中曾经存在了偏移量,则应该从该偏移量处开始生产
    if (offsetMap.size > 0) {
      println("存在偏移量,从该偏移量处进行生产!!")

      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))

    } else {
      //如果MySQL中没有存在了偏移量,从最早开始生产
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))

    }
    // checkpoint工夫距离,必须是batchInterval的整数倍
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    // 保留batch的offset
    var offsetRanges = Array[OffsetRange]()
    // 获取以后DS的音讯偏移量
    val transformDS = inputDStream.transform { rdd =>
      // 获取offset
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }


    /**
      * 状态更新函数
      * @param newValues:新的value值
      * @param stateValue:状态值
      * @return
      */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldvalue = stateValue.getOrElse(0) // 获取状态值
      // 遍历以后数据,并更新状态
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // 返回最新的状态
      Option(oldvalue)
    }
    // 业务逻辑解决
    // 该示例统计音讯key的个数,用于查看是否是从曾经提交的偏移量生产数据
    transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()

    // 打印偏移量和数据信息,察看输入的后果
    transformDS.foreachRDD { (rdd, time) =>
      // 遍历打印该RDD数据
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      // 打印生产偏移量信息
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")

      }

      //将偏移量保留到到MySQL中
      OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

如何应用Redis治理Kafka的OffSet

  • Redis连贯类
object JedisConnPool {
  val config = new JedisPoolConfig
  //最大连接数
  config.setMaxTotal(60)
  //最大闲暇连接数
  config.setMaxIdle(10)
  config.setTestOnBorrow(true)

  //服务器ip
  val redisAddress :String = ConfigConstants.redisAddress.toString
  // 端口号
  val redisPort:Int = ConfigConstants.redisPort.toInt
  //拜访明码
  val redisAuth :String = ConfigConstants.redisAuth.toString
  //期待可用连贯的最大工夫
  val redisTimeout:Int = ConfigConstants.redisTimeout.toInt

  val pool = new JedisPool(config,redisAddress,redisPort,redisTimeout,redisAuth)

  def getConnection():Jedis = {
    pool.getResource
  }

}
  • 业务逻辑解决

该对象与下面的根本相似,只不过应用的是Redis来进行存储Offset,存储到Redis的数据类型是Hash,根本格局为:[key field value] -> [ topic_groupid partition offset],即 key为topic_groupid,field为partition,value为offset。

object ManualCommitOffsetToRedis {

  def main(args: Array[String]): Unit = {

    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffset.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)


    val ssc = new StreamingContext(conf, batchInterval)
    // 必须开启checkpoint,否则会报错
    ssc.checkpoint(ConfigConstants.checkpointDir)

    ssc.sparkContext.setLogLevel("OFF")
    //应用broker和topic创立direct kafka stream
    val topicSet = topics.split(" ").toSet

    // kafka连贯参数
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )


    // 从Redis中读取该主题对应的消费者组的分区偏移量
    val offsetMap = OffsetReadAndSave.getOffsetFromRedis(groupId, topics)
    var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

    //如果Redis中曾经存在了偏移量,则应该从该偏移量处开始生产
    if (offsetMap.size > 0) {
      println("存在偏移量,从该偏移量处进行生产!!")

      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))

    } else {
      //如果Redis中没有存在了偏移量,从最早开始生产
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))

    }
    // checkpoint工夫距离,必须是batchInterval的整数倍
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    // 保留batch的offset
    var offsetRanges = Array[OffsetRange]()
    // 获取以后DS的音讯偏移量
    val transformDS = inputDStream.transform { rdd =>
      // 获取offset
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }


    /**
      * 状态更新函数
      *
      * @param newValues  :新的value值
      * @param stateValue :状态值
      * @return
      */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldvalue = stateValue.getOrElse(0) // 获取状态值
      // 遍历以后数据,并更新状态
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // 返回最新的状态
      Option(oldvalue)
    }
    // 业务逻辑解决
    // 该示例统计音讯key的个数,用于查看是否是从曾经提交的偏移量生产数据
    transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()

    // 打印偏移量和数据信息,察看输入的后果
    transformDS.foreachRDD { (rdd, time) =>
      // 遍历打印该RDD数据
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      // 打印生产偏移量信息
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")

      }

      //将偏移量保留到到Redis中
      OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }

}

总结

本文介绍了如何应用内部存储设备来保留Kafka的生产位点,通过具体的代码示例阐明了应用MySQL和Redis治理生产位点的形式。当然,内部存储设备很多,用户也能够应用其余的存储设备进行治理Offset,比方Zookeeper和HBase等,其根本解决思路都十分相似。

公主号:大数据技术与数仓

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理