关于数据库:如何应对SparkRedis行海量数据插入查询作业时碰到的问题

38次阅读

共计 3267 个字符,预计需要花费 9 分钟才能阅读完成。

摘要:因为 redis 是基于内存的数据库,稳定性并不是很高,尤其是 standalone 模式下的 redis。于是工作中在应用 Spark-Redis 时也会碰到很多问题,尤其是执行海量数据插入与查问的场景中。

海量数据查问

Redis 是基于内存读取的数据库,相比其它的数据库,Redis 的读取速度会更快。然而当咱们要查问上千万条的海量数据时,即便是 Redis 也须要破费较长时间。这时候如果咱们想要终止 select 作业的执行,咱们心愿的是所有的 running task 立刻 killed。

Spark 是有作业调度机制的。SparkContext 是 Spark 的入口,相当于应用程序的 main 函数。SparkContext 中的 cancelJobGroup 函数能够勾销正在运行的 job。

/**
  * Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup`
  * for more information.
  */
 def cancelJobGroup(groupId: String) {assertNotStopped()
   dagScheduler.cancelJobGroup(groupId)
 }

按理说勾销 job 之后,job 下的所有 task 应该也终止。而且当咱们勾销 select 作业时,executor 会 throw TaskKilledException,而这个时候负责 task 作业的 TaskContext 在捕捉到该异样之后,会执行 killTaskIfInterrupted。

 // If this task has been killed before we deserialized it, let's quit now. Otherwise,
 // continue executing the task.
 val killReason = reasonIfKilled
 if (killReason.isDefined) {// Throw an exception rather than returning, because returning within a try{} block
   // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
   // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
   // for the task.
   throw new TaskKilledException(killReason.get)
 }
/**
 * If the task is interrupted, throws TaskKilledException with the reason for the interrupt.
 */
 private[spark] def killTaskIfInterrupted(): Unit

然而 Spark-Redis 中还是会呈现终止作业然而 task 依然 running。因为 task 的计算逻辑最终是在 RedisRDD 中实现的,RedisRDD 的 compute 会从 Jedis 中取获取 keys。所以说要解决这个问题,应该在 RedisRDD 中勾销正在 running 的 task。这里有两种办法:

办法一:参考 Spark 的 JDBCRDD,定义 close(),联合 InterruptibleIterator。

def close() {if (closed) return
   try {if (null != rs) {rs.close()
     }
   } catch {case e: Exception => logWarning("Exception closing resultset", e)
   }
   try {if (null != stmt) {stmt.close()
     }
   } catch {case e: Exception => logWarning("Exception closing statement", e)
   }
   try {if (null != conn) {if (!conn.isClosed && !conn.getAutoCommit) {
         try {conn.commit()
         } catch {case NonFatal(e) => logWarning("Exception committing transaction", e)
         }
       }
       conn.close()}
     logInfo("closed connection")
   } catch {case e: Exception => logWarning("Exception closing connection", e)
   }
   closed = true
 }
 
 context.addTaskCompletionListener{context => close() } 
CompletionIterator[InternalRow, Iterator[InternalRow]](new InterruptibleIterator(context, rowsIterator), close())

办法二:异步线程执行 compute,主线程中判断 task isInterrupted

try{val thread = new Thread() {override def run(): Unit = {
       try {keys = doCall} catch {
         case e =>
           logWarning(s"execute http require failed.")
       }
       isRequestFinished = true
     }
   }
 
   // control the http request for quite if user interrupt the job
   thread.start()
   while (!context.isInterrupted() && !isRequestFinished) {Thread.sleep(GetKeysWaitInterval)
   }
   if (context.isInterrupted() && !isRequestFinished) {logInfo(s"try to kill task ${context.getKillReason()}")
     context.killTaskIfInterrupted()}
   thread.join()
   CompletionIterator[T, Iterator[T]](new InterruptibleIterator(context, keys), close)

咱们能够异步线程来执行 compute,而后在另外的线程中判断是否 task isInterrupted,如果是的话就执行 TaskContext 的 killTaskIfInterrupted。避免 killTaskIfInterrupted 无奈杀掉 task,再联合 InterruptibleIterator:一种迭代器,以提供工作终止性能。通过查看 [TaskContext] 中的中断标记来工作。

海量数据插入

咱们都曾经 redis 的数据是保留在内存中的。当然 Redis 也反对长久化,能够将数据备份到硬盘中。当插入海量数据时,如果 Redis 的内存不够的话,很显然会失落局部数据。这里让使用者困惑的点在于:当 Redis 已应用内存大于最大可用内存时,Redis 会报错:command not allowed when used memory >‘maxmemory’。然而当 insert job 的数据大于 Redis 的可用内存时,局部数据失落了,并且还没有任何报错。

因为不论是 Jedis 客户端还是 Redis 服务器,当插入数据时内存不够,不会插入胜利,但也不会返回任何 response。所以目前能想到的解决办法就是当 insert 数据失落时,扩充 Redis 内存。

总结

Spark-Redis 是一个利用还不是很宽泛的开源我的项目,不像 Spark JDBC 那样曾经商业化。所以 Spark-Redis 还是存在很多问题。置信随着 commiter 的致力,Spark-Redis 也会越来越弱小。

点击关注,第一工夫理解华为云陈腐技术~

正文完
 0