摘要:因为redis是基于内存的数据库,稳定性并不是很高,尤其是standalone模式下的redis。于是工作中在应用Spark-Redis时也会碰到很多问题,尤其是执行海量数据插入与查问的场景中。
海量数据查问
Redis是基于内存读取的数据库,相比其它的数据库,Redis的读取速度会更快。然而当咱们要查问上千万条的海量数据时,即便是Redis也须要破费较长时间。这时候如果咱们想要终止select作业的执行,咱们心愿的是所有的running task立刻killed。
Spark是有作业调度机制的。SparkContext是Spark的入口,相当于应用程序的main函数。SparkContext中的cancelJobGroup函数能够勾销正在运行的job。
/**
* Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup`
* for more information.
*/
def cancelJobGroup(groupId: String) {
assertNotStopped()
dagScheduler.cancelJobGroup(groupId)
}
按理说勾销job之后,job下的所有task应该也终止。而且当咱们勾销select作业时,executor会throw TaskKilledException,而这个时候负责task作业的TaskContext在捕捉到该异样之后,会执行killTaskIfInterrupted。
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
val killReason = reasonIfKilled
if (killReason.isDefined) {
// Throw an exception rather than returning, because returning within a try{} block
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw new TaskKilledException(killReason.get)
}
/**
* If the task is interrupted, throws TaskKilledException with the reason for the interrupt.
*/
private[spark] def killTaskIfInterrupted(): Unit
然而Spark-Redis中还是会呈现终止作业然而task依然running。因为task的计算逻辑最终是在RedisRDD中实现的,RedisRDD的compute会从Jedis中取获取keys。所以说要解决这个问题,应该在RedisRDD中勾销正在running的task。这里有两种办法:
办法一:参考Spark的JDBCRDD,定义close(),联合InterruptibleIterator。
def close() {
if (closed) return
try {
if (null != rs) {
rs.close()
}
} catch {
case e: Exception => logWarning("Exception closing resultset", e)
}
try {
if (null != stmt) {
stmt.close()
}
} catch {
case e: Exception => logWarning("Exception closing statement", e)
}
try {
if (null != conn) {
if (!conn.isClosed && !conn.getAutoCommit) {
try {
conn.commit()
} catch {
case NonFatal(e) => logWarning("Exception committing transaction", e)
}
}
conn.close()
}
logInfo("closed connection")
} catch {
case e: Exception => logWarning("Exception closing connection", e)
}
closed = true
}
context.addTaskCompletionListener{ context => close() }
CompletionIterator[InternalRow, Iterator[InternalRow]](
new InterruptibleIterator(context, rowsIterator), close())
办法二:异步线程执行compute,主线程中判断task isInterrupted
try{
val thread = new Thread() {
override def run(): Unit = {
try {
keys = doCall
} catch {
case e =>
logWarning(s"execute http require failed.")
}
isRequestFinished = true
}
}
// control the http request for quite if user interrupt the job
thread.start()
while (!context.isInterrupted() && !isRequestFinished) {
Thread.sleep(GetKeysWaitInterval)
}
if (context.isInterrupted() && !isRequestFinished) {
logInfo(s"try to kill task ${context.getKillReason()}")
context.killTaskIfInterrupted()
}
thread.join()
CompletionIterator[T, Iterator[T]](
new InterruptibleIterator(context, keys), close)
咱们能够异步线程来执行compute,而后在另外的线程中判断是否task isInterrupted,如果是的话就执行TaskContext的killTaskIfInterrupted。避免killTaskIfInterrupted无奈杀掉task,再联合InterruptibleIterator:一种迭代器,以提供工作终止性能。通过查看[TaskContext]中的中断标记来工作。
海量数据插入
咱们都曾经redis的数据是保留在内存中的。当然Redis也反对长久化,能够将数据备份到硬盘中。当插入海量数据时,如果Redis的内存不够的话,很显然会失落局部数据。这里让使用者困惑的点在于: 当Redis已应用内存大于最大可用内存时,Redis会报错:command not allowed when used memory > ‘maxmemory’。然而当insert job的数据大于Redis的可用内存时,局部数据失落了,并且还没有任何报错。
因为不论是Jedis客户端还是Redis服务器,当插入数据时内存不够,不会插入胜利,但也不会返回任何response。所以目前能想到的解决办法就是当insert数据失落时,扩充Redis内存。
总结
Spark-Redis是一个利用还不是很宽泛的开源我的项目,不像Spark JDBC那样曾经商业化。所以Spark-Redis还是存在很多问题。置信随着commiter的致力,Spark-Redis也会越来越弱小。
点击关注,第一工夫理解华为云陈腐技术~
发表回复