Spark Streaming's receiver-less `createDirectStream` API does not use a receiver; it creates an input stream that pulls messages directly from the Kafka cluster. The input stream guarantees that every message pulled from Kafka is transformed exactly once, which keeps the processing semantics consistent. However, when a job fails or is restarted, relying only on Spark Streaming's own mechanisms to resume from the correct consumption position (i.e., to achieve exactly-once semantics) is not ideal, so in production the Kafka consumption position is usually maintained by managing offsets manually. This article explains how to manage Kafka offsets by hand and covers:
- How to manage Kafka offsets with MySQL
- How to manage Kafka offsets with Redis
How to manage Kafka offsets with MySQL
We can manage Kafka offsets manually from inside a Spark Streaming application. The offsets can be read from the offset ranges of the RDD generated for each micro-batch:
```scala
KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
  // get the offset ranges of this batch
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ...
}
```
Once the offsets have been obtained, they can be saved to an external store (MySQL, Redis, ZooKeeper, HBase, and so on).
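Before the full example code, here is a minimal sketch of the overall read–process–save flow. The placeholder helpers `loadOffsets` and `storeOffsets` are assumptions that stand in for the MySQL/Redis utilities shown later; they are not part of the original code.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object OffsetPatternSketch {
  // Placeholder stores: swap in the MySQL or Redis helpers from the sections below.
  def loadOffsets(groupId: String, topic: String): Map[TopicPartition, Long] = Map.empty
  def storeOffsets(groupId: String, ranges: Array[OffsetRange]): Unit = ()

  def main(args: Array[String]): Unit = {
    val topic = "test"
    val groupId = "group_test"
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kms-2:9092,kms-3:9092,kms-4:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")

    val ssc = new StreamingContext(
      new SparkConf().setAppName("OffsetPatternSketch").setMaster("local[1]"), Seconds(5))

    // 1. read the stored offsets before creating the stream
    val fromOffsets = loadOffsets(groupId, topic)
    val strategy =
      if (fromOffsets.nonEmpty)
        ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaParams, fromOffsets)
      else
        ConsumerStrategies.Subscribe[String, String](Set(topic), kafkaParams)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, strategy)

    // 2. process each batch, then 3. persist its untilOffset values
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... write the batch results to the sink here ...
      storeOffsets(groupId, offsetRanges) // save only after the output has succeeded
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Saving the offsets after the batch output succeeds gives at-least-once behavior on restart; the full examples below follow the same ordering.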
Example code
- MySQL table for storing offsets; the primary key on (`topic`, `partition`, `groupid`) lets the `REPLACE INTO` statement used later upsert one row per topic partition per consumer group
```sql
CREATE TABLE `topic_par_group_offset` (
  `topic` varchar(255) NOT NULL,
  `partition` int(11) NOT NULL,
  `groupid` varchar(255) NOT NULL,
  `offset` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`topic`, `partition`, `groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
- Configuration constants: ConfigConstants
```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds

object ConfigConstants {
  // Kafka settings
  val kafkaBrokers = "kms-2:9092,kms-3:9092,kms-4:9092"
  val groupId = "group_test"
  val kafkaTopics = "test"
  val batchInterval = Seconds(5)
  val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  val kafkaKeySer = "org.apache.kafka.common.serialization.StringSerializer"
  val kafkaValueSer = "org.apache.kafka.common.serialization.StringSerializer"
  val sparkSerializer = "org.apache.spark.serializer.KryoSerializer"
  val batchSize = 16384
  val lingerMs = 1
  val bufferMemory = 33554432
  // MySQL settings
  val user = "root"
  val password = "123qwe"
  val url = "jdbc:mysql://localhost:3306/kafka_offset"
  val driver = "com.mysql.jdbc.Driver"
  // checkpoint settings
  val checkpointDir = "file:///e:/checkpoint"
  val checkpointInterval = Seconds(10)
  // Redis settings
  val redisAddress = "192.168.10.203"
  val redisPort = 6379
  val redisAuth = "123qwe"
  val redisTimeout = 3000
}
```
- JDBC connection pool utility: JDBCConnPool
```scala
import java.sql.{Connection, PreparedStatement}
import org.apache.commons.dbcp2.BasicDataSource
import org.apache.log4j.Logger

object JDBCConnPool {
  val log: Logger = Logger.getLogger(JDBCConnPool.getClass)
  var dataSource: BasicDataSource = null

  /**
   * Create the data source.
   */
  def getDataSource(): BasicDataSource = {
    if (dataSource == null) {
      dataSource = new BasicDataSource()
      dataSource.setDriverClassName(ConfigConstants.driver)
      dataSource.setUrl(ConfigConstants.url)
      dataSource.setUsername(ConfigConstants.user)
      dataSource.setPassword(ConfigConstants.password)
      dataSource.setMaxTotal(50)
      dataSource.setInitialSize(3)
      dataSource.setMinIdle(3)
      dataSource.setMaxIdle(10)
      dataSource.setMaxWaitMillis(2 * 10000)
      dataSource.setRemoveAbandonedTimeout(180)
      dataSource.setRemoveAbandonedOnBorrow(true)
      dataSource.setRemoveAbandonedOnMaintenance(true)
      dataSource.setTestOnReturn(true)
      dataSource.setTestOnBorrow(true)
    }
    dataSource
  }

  /**
   * Release the data source.
   */
  def closeDataSource() = {
    if (dataSource != null) {
      dataSource.close()
    }
  }

  /**
   * Get a database connection.
   */
  def getConnection(): Connection = {
    var conn: Connection = null
    try {
      if (dataSource != null) {
        conn = dataSource.getConnection()
      } else {
        conn = getDataSource().getConnection()
      }
    } catch {
      case e: Exception => log.error(e.getMessage(), e)
    }
    conn
  }

  /**
   * Close the statement and the connection.
   */
  def closeConnection(ps: PreparedStatement, conn: Connection) {
    if (ps != null) {
      try {
        ps.close()
      } catch {
        case e: Exception => log.error("Failed to close PreparedStatement: " + e.getMessage(), e)
      }
    }
    if (conn != null) {
      try {
        conn.close()
      } catch {
        case e: Exception => log.error("Failed to close Connection: " + e.getMessage(), e)
      }
    }
  }
}
```
- Kafka producer: KafkaProducerTest
```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord}

object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    val props: Properties = new Properties()
    props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
    props.put("batch.size", ConfigConstants.batchSize.asInstanceOf[Integer])
    props.put("linger.ms", ConfigConstants.lingerMs.asInstanceOf[Integer])
    props.put("buffer.memory", ConfigConstants.bufferMemory.asInstanceOf[Integer])
    props.put("key.serializer", ConfigConstants.kafkaKeySer)
    props.put("value.serializer", ConfigConstants.kafkaValueSer)

    val producer: Producer[String, String] = new KafkaProducer[String, String](props)
    val startTime: Long = System.currentTimeMillis()
    for (i <- 1 to 100) {
      producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
    }
    println("Elapsed time: " + (System.currentTimeMillis() - startTime))
    producer.close()
  }
}
```
- Reading and saving offsets:
This object reads offsets from and writes offsets to the external stores, namely MySQL and Redis.
```scala
import java.sql.ResultSet
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis
import scala.collection.JavaConversions
import scala.collection.mutable

object OffsetReadAndSave {

  /**
   * Read offsets from MySQL.
   */
  def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
    val conn = JDBCConnPool.getConnection()
    val selectSql = "select * from topic_par_group_offset where groupid = ? and topic = ?"
    val ppst = conn.prepareStatement(selectSql)
    ppst.setString(1, groupid)
    ppst.setString(2, topic)

    val result: ResultSet = ppst.executeQuery()
    // offsets per topic partition
    val topicPartitionOffset = mutable.Map[TopicPartition, Long]()
    while (result.next()) {
      val topicPartition: TopicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))
      topicPartitionOffset += (topicPartition -> result.getLong("offset"))
    }
    JDBCConnPool.closeConnection(ppst, conn)
    topicPartitionOffset
  }

  /**
   * Read offsets from Redis.
   */
  def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = JedisConnPool.getConnection()
    val offsets = mutable.Map[TopicPartition, Long]()
    val key = s"${topic}_${groupid}"
    val fields: java.util.Map[String, String] = jedis.hgetAll(key)
    for (partition <- JavaConversions.mapAsScalaMap(fields)) {
      offsets.put(new TopicPartition(topic, partition._1.toInt), partition._2.toLong)
    }
    jedis.close() // return the connection to the pool
    offsets.toMap
  }

  /**
   * Write offsets to MySQL.
   *
   * @param groupid     consumer group id
   * @param offsetRange offset ranges of the batch
   */
  def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
    val conn = JDBCConnPool.getConnection()
    val insertSql = "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)"
    val ppst = conn.prepareStatement(insertSql)
    for (offset <- offsetRange) {
      ppst.setString(1, offset.topic)
      ppst.setInt(2, offset.partition)
      ppst.setString(3, groupid)
      ppst.setLong(4, offset.untilOffset)
      ppst.executeUpdate()
    }
    JDBCConnPool.closeConnection(ppst, conn)
  }

  /**
   * Write offsets to Redis.
   *
   * @param groupid     consumer group id
   * @param offsetRange offset ranges of the batch
   */
  def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]) = {
    val jedis: Jedis = JedisConnPool.getConnection()
    for (range <- offsetRange) {
      val topic = range.topic
      val partition = range.partition
      val offset = range.untilOffset
      // key is topic_groupid, field is the partition, value is the offset
      jedis.hset(s"${topic}_${groupid}", partition.toString, offset.toString)
    }
    jedis.close() // return the connection to the pool
  }
}
```
- Business logic class
This object holds the business logic: it consumes data from Kafka, processes it, and then manually saves the offsets to MySQL. On startup it checks whether the external store already holds offsets; on the first run it consumes from the initial position, and if offsets exist it resumes from them.
Observed behavior: the first run consumes the data from the beginning; if you stop the program manually and start it again, it resumes from the most recently committed offsets.
```scala
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._

object ManualCommitOffset {

  def main(args: Array[String]): Unit = {
    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffset.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)

    val ssc = new StreamingContext(conf, batchInterval)
    // checkpointing must be enabled (updateStateByKey requires it), otherwise the job fails
    ssc.checkpoint(ConfigConstants.checkpointDir)
    ssc.sparkContext.setLogLevel("OFF")

    // create a direct Kafka stream from the brokers and topics
    val topicSet = topics.split(" ").toSet
    // Kafka connection parameters
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // read the partition offsets stored in MySQL for this topic and consumer group
    val offsetMap = OffsetReadAndSave.getOffsetMap(groupId, topics)
    var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

    // if MySQL already holds offsets, resume consumption from them
    if (offsetMap.size > 0) {
      println("Stored offsets found, resuming consumption from them!")
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
    } else {
      // if MySQL holds no offsets, consume from the earliest position
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
    }

    // checkpoint interval, which must be an integer multiple of batchInterval
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    // holds the offset ranges of the current batch
    var offsetRanges = Array[OffsetRange]()
    // capture the offset ranges of each batch
    val transformDS = inputDStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    /**
     * State update function.
     *
     * @param newValues  new values of the current batch
     * @param stateValue previous state
     */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldvalue = stateValue.getOrElse(0) // previous state
      // fold the new values into the state
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // return the updated state
      Option(oldvalue)
    }

    // business logic:
    // keep a running sum per key, which makes it easy to see whether the job
    // resumed from the committed offsets after a restart
    transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()

    // print the offsets and records to observe the output
    transformDS.foreachRDD { (rdd, time) =>
      // print every record of the RDD
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      // print the consumed offset ranges
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
      }
      // save the offsets to MySQL
      OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```
How to manage Kafka offsets with Redis
- Redis connection class
```scala
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object JedisConnPool {
  val config = new JedisPoolConfig
  // maximum number of connections
  config.setMaxTotal(60)
  // maximum number of idle connections
  config.setMaxIdle(10)
  config.setTestOnBorrow(true)

  // server address
  val redisAddress: String = ConfigConstants.redisAddress.toString
  // port
  val redisPort: Int = ConfigConstants.redisPort.toInt
  // password
  val redisAuth: String = ConfigConstants.redisAuth.toString
  // maximum time to wait for an available connection
  val redisTimeout: Int = ConfigConstants.redisTimeout.toInt

  val pool = new JedisPool(config, redisAddress, redisPort, redisTimeout, redisAuth)

  def getConnection(): Jedis = {
    pool.getResource
  }
}
```
- Business logic
This object is essentially the same as the one above, except that the offsets are stored in Redis. The data is stored as a Redis hash in the form [key field value] -> [topic_groupid partition offset]: the key is topic_groupid, the field is the partition, and the value is the offset.
```scala
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._

object ManualCommitOffsetToRedis {

  def main(args: Array[String]): Unit = {
    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffsetToRedis.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)

    val ssc = new StreamingContext(conf, batchInterval)
    // checkpointing must be enabled (updateStateByKey requires it), otherwise the job fails
    ssc.checkpoint(ConfigConstants.checkpointDir)
    ssc.sparkContext.setLogLevel("OFF")

    // create a direct Kafka stream from the brokers and topics
    val topicSet = topics.split(" ").toSet
    // Kafka connection parameters
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // read the partition offsets stored in Redis for this topic and consumer group
    val offsetMap = OffsetReadAndSave.getOffsetFromRedis(groupId, topics)
    var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

    // if Redis already holds offsets, resume consumption from them
    if (offsetMap.size > 0) {
      println("Stored offsets found, resuming consumption from them!")
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
    } else {
      // if Redis holds no offsets, consume from the earliest position
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
    }

    // checkpoint interval, which must be an integer multiple of batchInterval
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    // holds the offset ranges of the current batch
    var offsetRanges = Array[OffsetRange]()
    // capture the offset ranges of each batch
    val transformDS = inputDStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    /**
     * State update function.
     *
     * @param newValues  new values of the current batch
     * @param stateValue previous state
     */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldvalue = stateValue.getOrElse(0) // previous state
      // fold the new values into the state
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // return the updated state
      Option(oldvalue)
    }

    // business logic:
    // keep a running sum per key, which makes it easy to see whether the job
    // resumed from the committed offsets after a restart
    transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()

    // print the offsets and records to observe the output
    transformDS.foreachRDD { (rdd, time) =>
      // print every record of the RDD
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      // print the consumed offset ranges
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
      }
      // save the offsets to Redis
      OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```
Summary
This article showed how to store Kafka consumption positions in an external store, with complete code examples for managing offsets with MySQL and Redis. Many other external stores can serve the same purpose, for example ZooKeeper and HBase, and the basic approach is very similar in each case.
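As an illustration of that point, the sketch below maps the same read/save pattern onto ZooKeeper using the Apache Curator client. It is only a sketch under stated assumptions, not code from this article: the znode layout (`/kafka_offsets/<topic>/<groupid>/<partition>`), the quorum address, and the object and method names are all made up for the example.

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scala.collection.JavaConverters._

object ZkOffsetStoreSketch {
  // assumed ZooKeeper quorum; replace with your own
  private val client = CuratorFrameworkFactory.newClient(
    "kms-2:2181,kms-3:2181,kms-4:2181", new ExponentialBackoffRetry(1000, 3))
  client.start()

  private def partitionPath(topic: String, groupid: String, partition: Int) =
    s"/kafka_offsets/$topic/$groupid/$partition"

  /** Read the stored offsets; returns an empty map on the first start. */
  def readOffsets(groupid: String, topic: String): Map[TopicPartition, Long] = {
    val parent = s"/kafka_offsets/$topic/$groupid"
    if (client.checkExists().forPath(parent) == null) return Map.empty
    client.getChildren.forPath(parent).asScala.map { partition =>
      val offset = new String(client.getData.forPath(s"$parent/$partition"), "UTF-8").toLong
      new TopicPartition(topic, partition.toInt) -> offset
    }.toMap
  }

  /** Persist the until-offsets of a finished batch, one znode per partition. */
  def saveOffsets(groupid: String, ranges: Array[OffsetRange]): Unit =
    ranges.foreach { r =>
      val path = partitionPath(r.topic, groupid, r.partition)
      val bytes = r.untilOffset.toString.getBytes("UTF-8")
      if (client.checkExists().forPath(path) == null)
        client.create().creatingParentsIfNeeded().forPath(path, bytes)
      else
        client.setData().forPath(path, bytes)
    }
}
```

Keeping one znode per partition keeps writes small and lets `readOffsets` rebuild the same `Map[TopicPartition, Long]` that `ConsumerStrategies.Subscribe` expects, so the driver code shown above stays unchanged.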
WeChat official account: 大数据技术与数仓