Spark Streaming No Receivers 方式的 createDirectStream 方法不使用接收器,而是创建输入流直接从 Kafka 集群节点拉取消息。输入流保证每个消息从 Kafka 集群拉取以后只完全转换一次,保证语义一致性。但是当作业发生故障或重启时,要保证从当前的消费位点继续处理数据 (即 Exactly Once 语义),单纯依靠 Spark Streaming 本身的机制是不太理想的,生产环境中通常借助手动管理 offset 的方式来维护 Kafka 的消费位点。(注意:先处理数据、再保存偏移量,本身只能保证至少一次 (At Least Once);要做到端到端的 Exactly Once,还需要输出操作是幂等的,或者与偏移量在同一个事务中提交。)本文将介绍如何手动管理 Kafka 的 Offset,希望对你有所帮助。本文主要包括以下内容:
- 如何使用 MySQL 管理 Kafka 的 Offset
- 如何使用 Redis 管理 Kafka 的 Offset
如何使用 MySQL 管理 Kafka 的 Offset
我们可以在 Spark Streaming 应用程序中编写代码来手动管理 Kafka 偏移量,偏移量可以从每一批流处理中生成的 RDD 中获取,获取方式为:
// Illustrative fragment (not compilable as-is, note the "..."): the RDD handed to
// foreachRDD by a direct stream can be cast to HasOffsetRanges to read this
// batch's Kafka offset ranges.
KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
// Get the offset ranges of the current batch
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
...
}
当获取到偏移量之后,可以将其保存到外部存储设备中 (MySQL、Redis、Zookeeper、HBase 等)。
使用案例代码
- MySQL 中用于保存偏移量的表
-- One row per (topic, partition, consumer group) holding the last saved offset.
CREATE TABLE `topic_par_group_offset` (
  `topic`     VARCHAR(255) NOT NULL,
  `partition` INT(11)      NOT NULL,
  `groupid`   VARCHAR(255) NOT NULL,
  `offset`    BIGINT(20)   DEFAULT NULL,
  PRIMARY KEY (`topic`, `partition`, `groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 常量配置类:ConfigConstants
/**
 * Hard-coded settings shared by the demo programs: Kafka, MySQL, Spark
 * checkpointing and Redis.
 *
 * NOTE(review): credentials and host names live in source here; a real
 * deployment should load them from external configuration instead.
 */
object ConfigConstants {
  // ----- Kafka -----
  /** Bootstrap servers, comma separated. */
  val kafkaBrokers: String = "kms-2:9092,kms-3:9092,kms-4:9092"
  /** Consumer group id; also part of the key under which offsets are stored. */
  val groupId: String = "group_test"
  /** Subscribed topic(s); the streaming jobs split this value on a single space. */
  val kafkaTopics: String = "test"
  /** Spark Streaming batch interval. */
  val batchInterval = Seconds(5)
  val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  /** Key / value serializer classes used by the test producer. */
  val kafkaKeySer: String = "org.apache.kafka.common.serialization.StringSerializer"
  val kafkaValueSer: String = "org.apache.kafka.common.serialization.StringSerializer"
  /** Value for the `spark.serializer` setting of the streaming jobs. */
  val sparkSerializer: String = "org.apache.spark.serializer.KryoSerializer"
  /** Producer tuning: `batch.size`, `linger.ms` and `buffer.memory`. */
  val batchSize: Int = 16384
  val lingerMs: Int = 1
  val bufferMemory: Int = 33554432

  // ----- MySQL (offset store) -----
  val user: String = "root"
  val password: String = "123qwe"
  val url: String = "jdbc:mysql://localhost:3306/kafka_offset"
  val driver: String = "com.mysql.jdbc.Driver"

  // ----- Checkpointing -----
  val checkpointDir: String = "file:///e:/checkpoint"
  /** DStream checkpoint interval; must be a multiple of the batch interval. */
  val checkpointInterval = Seconds(10)

  // ----- Redis (offset store) -----
  val redisAddress: String = "192.168.10.203"
  val redisPort: Int = 6379
  val redisAuth: String = "123qwe"
  /** Timeout value passed to the JedisPool constructor. */
  val redisTimeout: Int = 3000
}
- JDBC 连接工具类:JDBCConnPool
/**
 * Lazily created DBCP2 connection pool for the MySQL offset store,
 * configured from ConfigConstants.
 */
object JDBCConnPool {
  val log: Logger = Logger.getLogger(JDBCConnPool.getClass)
  /** The shared pool; null until first use and again after closeDataSource(). */
  var dataSource: BasicDataSource = null

  /**
   * Returns the shared data source, creating and configuring it on first use.
   *
   * Synchronized so that concurrent first calls cannot each build (and leak) a
   * separate pool; the field is only assigned once configuration is complete.
   *
   * @return the shared data source
   */
  def getDataSource(): BasicDataSource = synchronized {
    if (dataSource == null) {
      val ds = new BasicDataSource()
      ds.setDriverClassName(ConfigConstants.driver)
      ds.setUrl(ConfigConstants.url)
      ds.setUsername(ConfigConstants.user)
      ds.setPassword(ConfigConstants.password)
      ds.setMaxTotal(50)
      ds.setInitialSize(3)
      ds.setMinIdle(3)
      ds.setMaxIdle(10)
      ds.setMaxWaitMillis(2 * 10000)
      ds.setRemoveAbandonedTimeout(180)
      ds.setRemoveAbandonedOnBorrow(true)
      ds.setRemoveAbandonedOnMaintenance(true)
      ds.setTestOnReturn(true)
      ds.setTestOnBorrow(true)
      dataSource = ds
    }
    dataSource
  }

  /**
   * Closes the shared pool, if any, and forgets it. A later getDataSource()
   * then builds a fresh pool instead of returning the closed one.
   */
  def closeDataSource(): Unit = synchronized {
    if (dataSource != null) {
      try dataSource.close()
      finally dataSource = null
    }
  }

  /**
   * Borrows a connection from the pool, creating the pool if necessary.
   *
   * @return a connection, or null if one could not be obtained (the cause is
   *         logged); callers must check for null
   */
  def getConnection(): Connection =
    try getDataSource().getConnection()
    catch {
      case e: Exception =>
        log.error(e.getMessage(), e)
        null
    }

  /**
   * Closes the statement and then the connection. Either argument may be null;
   * close failures are logged rather than rethrown.
   */
  def closeConnection(ps: PreparedStatement, conn: Connection): Unit = {
    if (ps != null) {
      try ps.close()
      catch {
        case e: Exception =>
          log.error("预编译 SQL 语句对象 PreparedStatement 敞开异样!" + e.getMessage(), e)
      }
    }
    if (conn != null) {
      try conn.close()
      catch {
        case e: Exception =>
          log.error("敞开连贯对象 Connection 异样!" + e.getMessage(), e)
      }
    }
  }
}
- Kafka 生产者:KafkaProducerTest
/** Test producer that writes sample records into the demo topic. */
object KafkaProducerTest {

  /**
   * Sends 100 records (key "Spark", values "1".."100") to
   * ConfigConstants.kafkaTopics and prints the elapsed time.
   */
  def main(args: Array[String]): Unit = {
    val props: Properties = new Properties()
    props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
    props.put("batch.size", Int.box(ConfigConstants.batchSize))
    props.put("linger.ms", Int.box(ConfigConstants.lingerMs))
    props.put("buffer.memory", Int.box(ConfigConstants.bufferMemory))
    props.put("key.serializer", ConfigConstants.kafkaKeySer)
    props.put("value.serializer", ConfigConstants.kafkaValueSer)

    val producer: Producer[String, String] = new KafkaProducer[String, String](props)
    try {
      val startTime: Long = System.currentTimeMillis()
      for (i <- 1 to 100) {
        producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
      }
      // send() is asynchronous. Flush first so the timing covers delivery of
      // every record, not just handing them to the producer's buffer.
      producer.flush()
      println("耗费工夫:" + (System.currentTimeMillis() - startTime))
    } finally {
      // Release the producer even if a send fails.
      producer.close()
    }
  }
}
- 读取和保存 Offset:
该对象的作用是从外部设备中读取和写入 Offset,包括 MySQL 和 Redis。
/**
 * Reads consumer offsets from, and writes them to, the external offset stores:
 * the MySQL table topic_par_group_offset and Redis hashes keyed "topic_groupid".
 */
object OffsetReadAndSave {

  /**
   * Borrows a MySQL connection and fails with a descriptive error instead of a
   * bare NullPointerException. JDBCConnPool logs the cause and returns null on failure.
   */
  private def mysqlConnection(): java.sql.Connection = {
    val conn = JDBCConnPool.getConnection()
    if (conn == null) {
      throw new IllegalStateException("Unable to obtain a MySQL connection for the offset store")
    }
    conn
  }

  /**
   * Loads the stored offsets of one topic for a consumer group from MySQL.
   *
   * @param groupid consumer group id
   * @param topic   a single topic name (compared with `topic = ?`)
   * @return the stored offset per partition; empty if none have been saved
   */
  def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
    val conn = mysqlConnection()
    var ppst: java.sql.PreparedStatement = null
    try {
      ppst = conn.prepareStatement("select * from topic_par_group_offset where groupid = ? and topic = ?")
      ppst.setString(1, groupid)
      ppst.setString(2, topic)
      val result: ResultSet = ppst.executeQuery()
      val topicPartitionOffset = mutable.Map[TopicPartition, Long]()
      try {
        while (result.next()) {
          val topicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))
          topicPartitionOffset += (topicPartition -> result.getLong("offset"))
        }
      } finally {
        result.close()
      }
      topicPartitionOffset
    } finally {
      // Runs on success and on failure, so the pooled connection is never leaked.
      JDBCConnPool.closeConnection(ppst, conn)
    }
  }

  /**
   * Loads the stored offsets of one topic for a consumer group from Redis.
   * The hash key is "topic_groupid", each field a partition number and each
   * value the offset.
   *
   * @param groupid consumer group id
   * @param topic   a single topic name
   * @return the stored offset per partition; empty if the hash does not exist
   */
  def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {
    import scala.collection.JavaConverters._
    val jedis: Jedis = JedisConnPool.getConnection()
    try {
      val fields: java.util.Map[String, String] = jedis.hgetAll(s"${topic}_${groupid}")
      fields.asScala.map { case (partition, offset) =>
        new TopicPartition(topic, partition.toInt) -> offset.toLong
      }.toMap
    } finally {
      // Hand the connection back to the pool; otherwise each call permanently
      // occupies one of its slots.
      jedis.close()
    }
  }

  /**
   * Writes each range's untilOffset to MySQL, replacing any previous row for
   * the same (topic, partition, groupid).
   *
   * @param groupid     consumer group id
   * @param offsetRange offset ranges of the processed batch
   */
  def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]): Unit = {
    val conn = mysqlConnection()
    var ppst: java.sql.PreparedStatement = null
    try {
      ppst = conn.prepareStatement(
        "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
      for (range <- offsetRange) {
        ppst.setString(1, range.topic)
        ppst.setInt(2, range.partition)
        ppst.setString(3, groupid)
        ppst.setLong(4, range.untilOffset)
        ppst.addBatch()
      }
      if (offsetRange.nonEmpty) ppst.executeBatch()
    } finally {
      JDBCConnPool.closeConnection(ppst, conn)
    }
  }

  /**
   * Writes each range's untilOffset to Redis.
   * Hash key = "topic_groupid", field = partition, value = offset.
   *
   * @param groupid     consumer group id
   * @param offsetRange offset ranges of the processed batch
   */
  def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]): Unit = {
    val jedis: Jedis = JedisConnPool.getConnection()
    try {
      for (range <- offsetRange) {
        jedis.hset(s"${range.topic}_${groupid}", range.partition.toString, range.untilOffset.toString)
      }
    } finally {
      // Called once per batch; without this the pool is exhausted after a few minutes.
      jedis.close()
    }
  }
}
- 业务处理类
该对象是业务处理逻辑,主要是消费 Kafka 数据,处理之后再手动将偏移量保存到 MySQL 中。在启动程序时,会判断外部存储设备中是否存在偏移量:如果是首次启动,则从最初的消费位点开始消费;如果存在 Offset,则从已保存的 Offset 处继续消费。
观察现象:首次启动时会从头消费数据;手动停止程序,然后再次启动,会发现程序从上次提交的偏移量处继续消费数据。
/** Streaming job that keeps its Kafka offsets in MySQL. */
object ManualCommitOffset {

  /**
   * Consumes ConfigConstants.kafkaTopics and prints a running sum of the
   * integer message values. Each batch's untilOffsets are saved to MySQL after
   * the batch's output has run.
   *
   * On start-up, previously saved offsets for the consumer group are used as
   * starting positions. When none exist, consumption starts according to
   * auto.offset.reset ("earliest").
   */
  def main(args: Array[String]): Unit = {
    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffset.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)
    val ssc = new StreamingContext(conf, batchInterval)
    // A checkpoint directory is mandatory because updateStateByKey is used below.
    ssc.checkpoint(ConfigConstants.checkpointDir)
    ssc.sparkContext.setLogLevel("OFF")

    // Topics to subscribe to (space separated in the configuration).
    val topicSet = topics.split(" ").toSet
    // Consumer parameters; auto-commit is off because offsets are managed manually.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // Stored offsets of every subscribed topic. getOffsetMap matches a single
    // topic name, so look each topic up separately. Passing the raw
    // space-separated setting finds nothing once several topics are configured.
    val offsetMap = topicSet.flatMap(topic => OffsetReadAndSave.getOffsetMap(groupId, topic)).toMap

    val inputDStream: InputDStream[ConsumerRecord[String, String]] =
      if (offsetMap.nonEmpty) {
        // Resume from the stored offsets.
        println("存在偏移量,从该偏移量处进行生产!!")
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
      } else {
        // Nothing stored yet: fall back to auto.offset.reset.
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
      }

    // DStream checkpoint interval; must be a multiple of the batch interval.
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    /**
     * State update: adds this batch's values to the running total.
     *
     * @param newValues  values received for the key in the current batch
     * @param stateValue running total so far, if any
     * @return the new running total
     */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] =
      Some(stateValue.getOrElse(0) + newValues.sum)

    // Business logic: a running sum of the integer message values under one key.
    // It shows whether consumption resumed from the committed offsets.
    inputDStream.map(record => ("spark", record.value().toInt)).updateStateByKey(updateFunc).print()

    // Print each batch's records and offsets, then save the offsets to MySQL.
    inputDStream.foreachRDD { (rdd, time) =>
      // Read the offset ranges from this batch's own RDD. Copying them into a
      // shared var inside transform() can be overwritten when the next batch is
      // generated first, which would save offsets of unprocessed records.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
      }
      OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
如何使用 Redis 管理 Kafka 的 Offset
- Redis 连接类
/**
 * Shared Jedis connection pool for the Redis offset store, configured from
 * ConfigConstants.
 */
object JedisConnPool {
  /** Pool limits: at most 60 connections in total, at most 10 idle, tested on borrow. */
  val config: JedisPoolConfig = {
    val poolConfig = new JedisPoolConfig
    poolConfig.setMaxTotal(60)
    poolConfig.setMaxIdle(10)
    poolConfig.setTestOnBorrow(true)
    poolConfig
  }
  /** Redis server host. */
  val redisAddress: String = ConfigConstants.redisAddress
  /** Redis server port. */
  val redisPort: Int = ConfigConstants.redisPort
  /** Redis password. */
  val redisAuth: String = ConfigConstants.redisAuth
  /**
   * Passed as the `timeout` argument of the JedisPool constructor.
   * NOTE(review): in Jedis this is the socket connect/read timeout, not the time
   * spent waiting for a free pooled connection. Confirm for the Jedis version in use.
   */
  val redisTimeout: Int = ConfigConstants.redisTimeout
  val pool: JedisPool = new JedisPool(config, redisAddress, redisPort, redisTimeout, redisAuth)

  /**
   * Borrows a connection from the pool. Presumably callers must close() it to
   * hand it back (Jedis pooled-resource convention); verify.
   */
  def getConnection(): Jedis = pool.getResource
}
- 业务逻辑处理
该对象与上面的基本类似,只不过使用 Redis 来存储 Offset。存储到 Redis 的数据类型是 Hash,基本格式为:[key field value] -> [topic_groupid partition offset],即 key 为 topic_groupid,field 为 partition,value 为 offset。
/** Streaming job that keeps its Kafka offsets in Redis hashes. */
object ManualCommitOffsetToRedis {

  /**
   * Consumes ConfigConstants.kafkaTopics and prints a running sum of the
   * integer message values. Each batch's untilOffsets are saved to Redis after
   * the batch's output has run. The hash key is "topic_groupid", the field is
   * the partition and the value is the offset.
   *
   * On start-up, previously saved offsets for the consumer group are used as
   * starting positions. When none exist, consumption starts according to
   * auto.offset.reset ("earliest").
   */
  def main(args: Array[String]): Unit = {
    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      // Name the app after this object (it was copy-pasted from ManualCommitOffset).
      .setAppName(ManualCommitOffsetToRedis.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)
    val ssc = new StreamingContext(conf, batchInterval)
    // A checkpoint directory is mandatory because updateStateByKey is used below.
    ssc.checkpoint(ConfigConstants.checkpointDir)
    ssc.sparkContext.setLogLevel("OFF")

    // Topics to subscribe to (space separated in the configuration).
    val topicSet = topics.split(" ").toSet
    // Consumer parameters; auto-commit is off because offsets are managed manually.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // Stored offsets of every subscribed topic. The Redis key embeds a single
    // topic name, so look each topic up separately. Passing the raw
    // space-separated setting finds nothing once several topics are configured.
    val offsetMap = topicSet.flatMap(topic => OffsetReadAndSave.getOffsetFromRedis(groupId, topic)).toMap

    val inputDStream: InputDStream[ConsumerRecord[String, String]] =
      if (offsetMap.nonEmpty) {
        // Resume from the stored offsets.
        println("存在偏移量,从该偏移量处进行生产!!")
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
      } else {
        // Nothing stored yet: fall back to auto.offset.reset.
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
      }

    // DStream checkpoint interval; must be a multiple of the batch interval.
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    /**
     * State update: adds this batch's values to the running total.
     *
     * @param newValues  values received for the key in the current batch
     * @param stateValue running total so far, if any
     * @return the new running total
     */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] =
      Some(stateValue.getOrElse(0) + newValues.sum)

    // Business logic: a running sum of the integer message values under one key.
    // It shows whether consumption resumed from the committed offsets.
    inputDStream.map(record => ("spark", record.value().toInt)).updateStateByKey(updateFunc).print()

    // Print each batch's records and offsets, then save the offsets to Redis.
    inputDStream.foreachRDD { (rdd, time) =>
      // Read the offset ranges from this batch's own RDD. Copying them into a
      // shared var inside transform() can be overwritten when the next batch is
      // generated first, which would save offsets of unprocessed records.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
      }
      OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
总结
本文介绍了如何使用外部存储设备来保存 Kafka 的消费位点,通过具体的代码示例说明了使用 MySQL 和 Redis 管理消费位点的方式。当然,外部存储设备还有很多,用户也可以使用其他存储设备来管理 Offset,比如 Zookeeper 和 HBase 等,其基本处理思路都非常类似。
Spark Streaming No Receivers 方式的 createDirectStream 方法不使用接收器,而是创建输入流直接从 Kafka 集群节点拉取消息。输入流保证每个消息从 Kafka 集群拉取以后只完全转换一次,保证语义一致性。但是当作业发生故障或重启时,要保证从当前的消费位点继续处理数据 (即 Exactly Once 语义),单纯依靠 Spark Streaming 本身的机制是不太理想的,生产环境中通常借助手动管理 offset 的方式来维护 Kafka 的消费位点。(注意:先处理数据、再保存偏移量,本身只能保证至少一次 (At Least Once);要做到端到端的 Exactly Once,还需要输出操作是幂等的,或者与偏移量在同一个事务中提交。)本文将介绍如何手动管理 Kafka 的 Offset,希望对你有所帮助。本文主要包括以下内容:
- 如何使用 MySQL 管理 Kafka 的 Offset
- 如何使用 Redis 管理 Kafka 的 Offset
如何使用 MySQL 管理 Kafka 的 Offset
我们可以在 Spark Streaming 应用程序中编写代码来手动管理 Kafka 偏移量,偏移量可以从每一批流处理中生成的 RDD 中获取,获取方式为:
// Illustrative fragment (not compilable as-is, note the "..."): the RDD handed to
// foreachRDD by a direct stream can be cast to HasOffsetRanges to read this
// batch's Kafka offset ranges.
KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
// Get the offset ranges of the current batch
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
...
}
当获取到偏移量之后,可以将其保存到外部存储设备中 (MySQL、Redis、Zookeeper、HBase 等)。
使用案例代码
- MySQL 中用于保存偏移量的表
-- One row per (topic, partition, consumer group) holding the last saved offset.
CREATE TABLE `topic_par_group_offset` (
  `topic`     VARCHAR(255) NOT NULL,
  `partition` INT(11)      NOT NULL,
  `groupid`   VARCHAR(255) NOT NULL,
  `offset`    BIGINT(20)   DEFAULT NULL,
  PRIMARY KEY (`topic`, `partition`, `groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 常量配置类:ConfigConstants
/**
 * Hard-coded settings shared by the demo programs: Kafka, MySQL, Spark
 * checkpointing and Redis.
 *
 * NOTE(review): credentials and host names live in source here; a real
 * deployment should load them from external configuration instead.
 */
object ConfigConstants {
  // ----- Kafka -----
  /** Bootstrap servers, comma separated. */
  val kafkaBrokers: String = "kms-2:9092,kms-3:9092,kms-4:9092"
  /** Consumer group id; also part of the key under which offsets are stored. */
  val groupId: String = "group_test"
  /** Subscribed topic(s); the streaming jobs split this value on a single space. */
  val kafkaTopics: String = "test"
  /** Spark Streaming batch interval. */
  val batchInterval = Seconds(5)
  val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  /** Key / value serializer classes used by the test producer. */
  val kafkaKeySer: String = "org.apache.kafka.common.serialization.StringSerializer"
  val kafkaValueSer: String = "org.apache.kafka.common.serialization.StringSerializer"
  /** Value for the `spark.serializer` setting of the streaming jobs. */
  val sparkSerializer: String = "org.apache.spark.serializer.KryoSerializer"
  /** Producer tuning: `batch.size`, `linger.ms` and `buffer.memory`. */
  val batchSize: Int = 16384
  val lingerMs: Int = 1
  val bufferMemory: Int = 33554432

  // ----- MySQL (offset store) -----
  val user: String = "root"
  val password: String = "123qwe"
  val url: String = "jdbc:mysql://localhost:3306/kafka_offset"
  val driver: String = "com.mysql.jdbc.Driver"

  // ----- Checkpointing -----
  val checkpointDir: String = "file:///e:/checkpoint"
  /** DStream checkpoint interval; must be a multiple of the batch interval. */
  val checkpointInterval = Seconds(10)

  // ----- Redis (offset store) -----
  val redisAddress: String = "192.168.10.203"
  val redisPort: Int = 6379
  val redisAuth: String = "123qwe"
  /** Timeout value passed to the JedisPool constructor. */
  val redisTimeout: Int = 3000
}
- JDBC 连接工具类:JDBCConnPool
/**
 * Lazily created DBCP2 connection pool for the MySQL offset store,
 * configured from ConfigConstants.
 */
object JDBCConnPool {
  val log: Logger = Logger.getLogger(JDBCConnPool.getClass)
  /** The shared pool; null until first use and again after closeDataSource(). */
  var dataSource: BasicDataSource = null

  /**
   * Returns the shared data source, creating and configuring it on first use.
   *
   * Synchronized so that concurrent first calls cannot each build (and leak) a
   * separate pool; the field is only assigned once configuration is complete.
   *
   * @return the shared data source
   */
  def getDataSource(): BasicDataSource = synchronized {
    if (dataSource == null) {
      val ds = new BasicDataSource()
      ds.setDriverClassName(ConfigConstants.driver)
      ds.setUrl(ConfigConstants.url)
      ds.setUsername(ConfigConstants.user)
      ds.setPassword(ConfigConstants.password)
      ds.setMaxTotal(50)
      ds.setInitialSize(3)
      ds.setMinIdle(3)
      ds.setMaxIdle(10)
      ds.setMaxWaitMillis(2 * 10000)
      ds.setRemoveAbandonedTimeout(180)
      ds.setRemoveAbandonedOnBorrow(true)
      ds.setRemoveAbandonedOnMaintenance(true)
      ds.setTestOnReturn(true)
      ds.setTestOnBorrow(true)
      dataSource = ds
    }
    dataSource
  }

  /**
   * Closes the shared pool, if any, and forgets it. A later getDataSource()
   * then builds a fresh pool instead of returning the closed one.
   */
  def closeDataSource(): Unit = synchronized {
    if (dataSource != null) {
      try dataSource.close()
      finally dataSource = null
    }
  }

  /**
   * Borrows a connection from the pool, creating the pool if necessary.
   *
   * @return a connection, or null if one could not be obtained (the cause is
   *         logged); callers must check for null
   */
  def getConnection(): Connection =
    try getDataSource().getConnection()
    catch {
      case e: Exception =>
        log.error(e.getMessage(), e)
        null
    }

  /**
   * Closes the statement and then the connection. Either argument may be null;
   * close failures are logged rather than rethrown.
   */
  def closeConnection(ps: PreparedStatement, conn: Connection): Unit = {
    if (ps != null) {
      try ps.close()
      catch {
        case e: Exception =>
          log.error("预编译 SQL 语句对象 PreparedStatement 敞开异样!" + e.getMessage(), e)
      }
    }
    if (conn != null) {
      try conn.close()
      catch {
        case e: Exception =>
          log.error("敞开连贯对象 Connection 异样!" + e.getMessage(), e)
      }
    }
  }
}
- Kafka 生产者:KafkaProducerTest
/** Test producer that writes sample records into the demo topic. */
object KafkaProducerTest {

  /**
   * Sends 100 records (key "Spark", values "1".."100") to
   * ConfigConstants.kafkaTopics and prints the elapsed time.
   */
  def main(args: Array[String]): Unit = {
    val props: Properties = new Properties()
    props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
    props.put("batch.size", Int.box(ConfigConstants.batchSize))
    props.put("linger.ms", Int.box(ConfigConstants.lingerMs))
    props.put("buffer.memory", Int.box(ConfigConstants.bufferMemory))
    props.put("key.serializer", ConfigConstants.kafkaKeySer)
    props.put("value.serializer", ConfigConstants.kafkaValueSer)

    val producer: Producer[String, String] = new KafkaProducer[String, String](props)
    try {
      val startTime: Long = System.currentTimeMillis()
      for (i <- 1 to 100) {
        producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
      }
      // send() is asynchronous. Flush first so the timing covers delivery of
      // every record, not just handing them to the producer's buffer.
      producer.flush()
      println("耗费工夫:" + (System.currentTimeMillis() - startTime))
    } finally {
      // Release the producer even if a send fails.
      producer.close()
    }
  }
}
- 读取和保存 Offset:
该对象的作用是从外部设备中读取和写入 Offset,包括 MySQL 和 Redis。
/**
 * Reads consumer offsets from, and writes them to, the external offset stores:
 * the MySQL table topic_par_group_offset and Redis hashes keyed "topic_groupid".
 */
object OffsetReadAndSave {

  /**
   * Borrows a MySQL connection and fails with a descriptive error instead of a
   * bare NullPointerException. JDBCConnPool logs the cause and returns null on failure.
   */
  private def mysqlConnection(): java.sql.Connection = {
    val conn = JDBCConnPool.getConnection()
    if (conn == null) {
      throw new IllegalStateException("Unable to obtain a MySQL connection for the offset store")
    }
    conn
  }

  /**
   * Loads the stored offsets of one topic for a consumer group from MySQL.
   *
   * @param groupid consumer group id
   * @param topic   a single topic name (compared with `topic = ?`)
   * @return the stored offset per partition; empty if none have been saved
   */
  def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
    val conn = mysqlConnection()
    var ppst: java.sql.PreparedStatement = null
    try {
      ppst = conn.prepareStatement("select * from topic_par_group_offset where groupid = ? and topic = ?")
      ppst.setString(1, groupid)
      ppst.setString(2, topic)
      val result: ResultSet = ppst.executeQuery()
      val topicPartitionOffset = mutable.Map[TopicPartition, Long]()
      try {
        while (result.next()) {
          val topicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))
          topicPartitionOffset += (topicPartition -> result.getLong("offset"))
        }
      } finally {
        result.close()
      }
      topicPartitionOffset
    } finally {
      // Runs on success and on failure, so the pooled connection is never leaked.
      JDBCConnPool.closeConnection(ppst, conn)
    }
  }

  /**
   * Loads the stored offsets of one topic for a consumer group from Redis.
   * The hash key is "topic_groupid", each field a partition number and each
   * value the offset.
   *
   * @param groupid consumer group id
   * @param topic   a single topic name
   * @return the stored offset per partition; empty if the hash does not exist
   */
  def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {
    import scala.collection.JavaConverters._
    val jedis: Jedis = JedisConnPool.getConnection()
    try {
      val fields: java.util.Map[String, String] = jedis.hgetAll(s"${topic}_${groupid}")
      fields.asScala.map { case (partition, offset) =>
        new TopicPartition(topic, partition.toInt) -> offset.toLong
      }.toMap
    } finally {
      // Hand the connection back to the pool; otherwise each call permanently
      // occupies one of its slots.
      jedis.close()
    }
  }

  /**
   * Writes each range's untilOffset to MySQL, replacing any previous row for
   * the same (topic, partition, groupid).
   *
   * @param groupid     consumer group id
   * @param offsetRange offset ranges of the processed batch
   */
  def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]): Unit = {
    val conn = mysqlConnection()
    var ppst: java.sql.PreparedStatement = null
    try {
      ppst = conn.prepareStatement(
        "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
      for (range <- offsetRange) {
        ppst.setString(1, range.topic)
        ppst.setInt(2, range.partition)
        ppst.setString(3, groupid)
        ppst.setLong(4, range.untilOffset)
        ppst.addBatch()
      }
      if (offsetRange.nonEmpty) ppst.executeBatch()
    } finally {
      JDBCConnPool.closeConnection(ppst, conn)
    }
  }

  /**
   * Writes each range's untilOffset to Redis.
   * Hash key = "topic_groupid", field = partition, value = offset.
   *
   * @param groupid     consumer group id
   * @param offsetRange offset ranges of the processed batch
   */
  def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]): Unit = {
    val jedis: Jedis = JedisConnPool.getConnection()
    try {
      for (range <- offsetRange) {
        jedis.hset(s"${range.topic}_${groupid}", range.partition.toString, range.untilOffset.toString)
      }
    } finally {
      // Called once per batch; without this the pool is exhausted after a few minutes.
      jedis.close()
    }
  }
}
- 业务处理类
该对象是业务处理逻辑,主要是消费 Kafka 数据,处理之后再手动将偏移量保存到 MySQL 中。在启动程序时,会判断外部存储设备中是否存在偏移量:如果是首次启动,则从最初的消费位点开始消费;如果存在 Offset,则从已保存的 Offset 处继续消费。
观察现象:首次启动时会从头消费数据;手动停止程序,然后再次启动,会发现程序从上次提交的偏移量处继续消费数据。
/** Streaming job that keeps its Kafka offsets in MySQL. */
object ManualCommitOffset {

  /**
   * Consumes ConfigConstants.kafkaTopics and prints a running sum of the
   * integer message values. Each batch's untilOffsets are saved to MySQL after
   * the batch's output has run.
   *
   * On start-up, previously saved offsets for the consumer group are used as
   * starting positions. When none exist, consumption starts according to
   * auto.offset.reset ("earliest").
   */
  def main(args: Array[String]): Unit = {
    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffset.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)
    val ssc = new StreamingContext(conf, batchInterval)
    // A checkpoint directory is mandatory because updateStateByKey is used below.
    ssc.checkpoint(ConfigConstants.checkpointDir)
    ssc.sparkContext.setLogLevel("OFF")

    // Topics to subscribe to (space separated in the configuration).
    val topicSet = topics.split(" ").toSet
    // Consumer parameters; auto-commit is off because offsets are managed manually.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // Stored offsets of every subscribed topic. getOffsetMap matches a single
    // topic name, so look each topic up separately. Passing the raw
    // space-separated setting finds nothing once several topics are configured.
    val offsetMap = topicSet.flatMap(topic => OffsetReadAndSave.getOffsetMap(groupId, topic)).toMap

    val inputDStream: InputDStream[ConsumerRecord[String, String]] =
      if (offsetMap.nonEmpty) {
        // Resume from the stored offsets.
        println("存在偏移量,从该偏移量处进行生产!!")
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
      } else {
        // Nothing stored yet: fall back to auto.offset.reset.
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
      }

    // DStream checkpoint interval; must be a multiple of the batch interval.
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    /**
     * State update: adds this batch's values to the running total.
     *
     * @param newValues  values received for the key in the current batch
     * @param stateValue running total so far, if any
     * @return the new running total
     */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] =
      Some(stateValue.getOrElse(0) + newValues.sum)

    // Business logic: a running sum of the integer message values under one key.
    // It shows whether consumption resumed from the committed offsets.
    inputDStream.map(record => ("spark", record.value().toInt)).updateStateByKey(updateFunc).print()

    // Print each batch's records and offsets, then save the offsets to MySQL.
    inputDStream.foreachRDD { (rdd, time) =>
      // Read the offset ranges from this batch's own RDD. Copying them into a
      // shared var inside transform() can be overwritten when the next batch is
      // generated first, which would save offsets of unprocessed records.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
      }
      OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
如何使用 Redis 管理 Kafka 的 Offset
- Redis 连接类
/**
 * Shared Jedis connection pool for the Redis offset store, configured from
 * ConfigConstants.
 */
object JedisConnPool {
  /** Pool limits: at most 60 connections in total, at most 10 idle, tested on borrow. */
  val config: JedisPoolConfig = {
    val poolConfig = new JedisPoolConfig
    poolConfig.setMaxTotal(60)
    poolConfig.setMaxIdle(10)
    poolConfig.setTestOnBorrow(true)
    poolConfig
  }
  /** Redis server host. */
  val redisAddress: String = ConfigConstants.redisAddress
  /** Redis server port. */
  val redisPort: Int = ConfigConstants.redisPort
  /** Redis password. */
  val redisAuth: String = ConfigConstants.redisAuth
  /**
   * Passed as the `timeout` argument of the JedisPool constructor.
   * NOTE(review): in Jedis this is the socket connect/read timeout, not the time
   * spent waiting for a free pooled connection. Confirm for the Jedis version in use.
   */
  val redisTimeout: Int = ConfigConstants.redisTimeout
  val pool: JedisPool = new JedisPool(config, redisAddress, redisPort, redisTimeout, redisAuth)

  /**
   * Borrows a connection from the pool. Presumably callers must close() it to
   * hand it back (Jedis pooled-resource convention); verify.
   */
  def getConnection(): Jedis = pool.getResource
}
- 业务逻辑处理
该对象与上面的基本类似,只不过使用 Redis 来存储 Offset。存储到 Redis 的数据类型是 Hash,基本格式为:[key field value] -> [topic_groupid partition offset],即 key 为 topic_groupid,field 为 partition,value 为 offset。
/** Streaming job that keeps its Kafka offsets in Redis hashes. */
object ManualCommitOffsetToRedis {

  /**
   * Consumes ConfigConstants.kafkaTopics and prints a running sum of the
   * integer message values. Each batch's untilOffsets are saved to Redis after
   * the batch's output has run. The hash key is "topic_groupid", the field is
   * the partition and the value is the offset.
   *
   * On start-up, previously saved offsets for the consumer group are used as
   * starting positions. When none exist, consumption starts according to
   * auto.offset.reset ("earliest").
   */
  def main(args: Array[String]): Unit = {
    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      // Name the app after this object (it was copy-pasted from ManualCommitOffset).
      .setAppName(ManualCommitOffsetToRedis.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)
    val ssc = new StreamingContext(conf, batchInterval)
    // A checkpoint directory is mandatory because updateStateByKey is used below.
    ssc.checkpoint(ConfigConstants.checkpointDir)
    ssc.sparkContext.setLogLevel("OFF")

    // Topics to subscribe to (space separated in the configuration).
    val topicSet = topics.split(" ").toSet
    // Consumer parameters; auto-commit is off because offsets are managed manually.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    // Stored offsets of every subscribed topic. The Redis key embeds a single
    // topic name, so look each topic up separately. Passing the raw
    // space-separated setting finds nothing once several topics are configured.
    val offsetMap = topicSet.flatMap(topic => OffsetReadAndSave.getOffsetFromRedis(groupId, topic)).toMap

    val inputDStream: InputDStream[ConsumerRecord[String, String]] =
      if (offsetMap.nonEmpty) {
        // Resume from the stored offsets.
        println("存在偏移量,从该偏移量处进行生产!!")
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))
      } else {
        // Nothing stored yet: fall back to auto.offset.reset.
        KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
      }

    // DStream checkpoint interval; must be a multiple of the batch interval.
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    /**
     * State update: adds this batch's values to the running total.
     *
     * @param newValues  values received for the key in the current batch
     * @param stateValue running total so far, if any
     * @return the new running total
     */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] =
      Some(stateValue.getOrElse(0) + newValues.sum)

    // Business logic: a running sum of the integer message values under one key.
    // It shows whether consumption resumed from the committed offsets.
    inputDStream.map(record => ("spark", record.value().toInt)).updateStateByKey(updateFunc).print()

    // Print each batch's records and offsets, then save the offsets to Redis.
    inputDStream.foreachRDD { (rdd, time) =>
      // Read the offset ranges from this batch's own RDD. Copying them into a
      // shared var inside transform() can be overwritten when the next batch is
      // generated first, which would save offsets of unprocessed records.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")
      }
      OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
总结
本文介绍了如何使用外部存储设备来保存 Kafka 的消费位点,通过具体的代码示例说明了使用 MySQL 和 Redis 管理消费位点的方式。当然,外部存储设备还有很多,用户也可以使用其他存储设备来管理 Offset,比如 Zookeeper 和 HBase 等,其基本处理思路都非常类似。
公众号:大数据技术与数仓