With the arrival of the big-data era and the growing popularity of Kubernetes, many companies have migrated their Spark workloads from YARN to k8s, and hit quite a few pitfalls along the way. Here we analyze one of them: DynamicResourceAllocation in spark on k8s.

Note: this article is based on Spark 3.0.0.

DynamicResourceAllocation in spark on yarn

Spark on YARN has supported DynamicResourceAllocation since Spark 1.2.
Anyone familiar with Spark knows that enabling DynamicResourceAllocation requires an ExternalShuffleService.
On YARN, the ExternalShuffleService runs as an auxiliary service of the NodeManager, configured as follows:

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
  <name>spark.shuffle.service.port</name>
  <value>7337</value>
</property>

Restart each NodeManager, and a YarnShuffleService will start on every NodeManager node. Then set spark.dynamicAllocation.enabled to true in the Spark application, and resources will be allocated dynamically at runtime.
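On the Spark side, the corresponding application settings can be sketched as a minimal spark-defaults fragment (the min/max executor counts are illustrative values, not from the original text):

```
spark.shuffle.service.enabled          true
spark.shuffle.service.port             7337
spark.dynamicAllocation.enabled        true
spark.dynamicAllocation.minExecutors   1
spark.dynamicAllocation.maxExecutors   10
```

The port must match the spark.shuffle.service.port configured for the YarnShuffleService above.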

Let's start from the creation of SparkEnv in CoarseGrainedExecutorBackend. Every executor starts through the main method of CoarseGrainedExecutorBackend, and main creates the SparkEnv:

val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
  arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

Creating the SparkEnv in turn creates the BlockManager. Following the code down, we eventually reach:

val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
    blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)

val blockManager = new BlockManager(
  executorId,
  rpcEnv,
  blockManagerMaster,
  serializerManager,
  conf,
  memoryManager,
  mapOutputTracker,
  shuffleManager,
  blockTransferService,
  securityManager,
  externalShuffleClient)

In the BlockManager's initialize method, registerWithExternalShuffleServer is invoked:

// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
  registerWithExternalShuffleServer()
}

If the ExternalShuffleService is enabled (on YARN it is the YarnShuffleService), the current ExecutorShuffleInfo is registered with the ExternalShuffleService at host shuffleServerId.host and port shuffleServerId.port. The ExecutorShuffleInfo looks like this:

val shuffleConfig = new ExecutorShuffleInfo(
  diskBlockManager.localDirsString,
  diskBlockManager.subDirsPerLocalDir,
  shuffleManager.getClass.getName)

Let's focus on the following fragment of the registerWithExternalShuffleServer method:

// Synchronous and will throw an exception if we cannot connect.
blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
  shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)

Here shuffleServerId comes from:

shuffleServerId = if (externalShuffleServiceEnabled) {
  logInfo(s"external shuffle service port = $externalShuffleServicePort")
  BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
  blockManagerId
}

blockTransferService.hostName is the advertiseAddress passed in when the SparkEnv was created,
which ultimately comes from the hostname argument of the CoarseGrainedExecutorBackend main class. So how does it get there?
See the prepareCommand method of ExecutorRunnable:

val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++
  Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
    "--driver-url", masterAddress,
    "--executor-id", executorId,
    "--hostname", hostname,
    "--cores", executorCores.toString,
    "--app-id", appId,
    "--resourceProfileId", resourceProfileId.toString) ++

And this hostname value is ultimately set in YarnAllocator's runAllocatedContainers method:

val executorHostname = container.getNodeId.getHost

In other words, what we end up with is the host of the YARN node, i.e. the NodeManager.
So every executor that starts registers its ExecutorShuffleInfo with the YarnShuffleService on the NodeManager it runs on. With dynamic resource allocation enabled, the fetchBlocks flow of
ExternalBlockStoreClient is then largely the same as the NettyBlockTransferService flow without dynamic allocation.

DynamicResourceAllocation in spark on k8s (kubernetes)

As covered in a previous article, the entrypoint script passes the hostname argument when starting the executor:

executor)
  shift 1
  CMD=(
    ${JAVA_HOME}/bin/java
    "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
    -Xms$SPARK_EXECUTOR_MEMORY
    -Xmx$SPARK_EXECUTOR_MEMORY
    -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
    org.apache.spark.executor.CoarseGrainedExecutorBackend
    --driver-url $SPARK_DRIVER_URL
    --executor-id $SPARK_EXECUTOR_ID
    --cores $SPARK_EXECUTOR_CORES
    --app-id $SPARK_APPLICATION_ID
    --hostname $SPARK_EXECUTOR_POD_IP
  )

SPARK_EXECUTOR_POD_IP is the IP of the running pod; see this fragment of the BasicExecutorFeatureStep class:

Seq(new EnvVarBuilder()
  .withName(ENV_EXECUTOR_POD_IP)
  .withValueFrom(new EnvVarSourceBuilder()
    .withNewFieldRef("v1", "status.podIP")
    .build())
  .build())

Following the flow analyzed above, on k8s the executor cannot register with an ExternalShuffleService on its node,
because the address it registers with is the pod IP, not the node IP.
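The mismatch can be sketched with a toy registry (hypothetical names, not Spark's real API): each shuffle service is keyed by the host it listens on, so a registration addressed to the pod IP never reaches the service running on the node IP.

```python
# Toy model of the registration flow above (hypothetical names):
# one ExternalShuffleService per node, keyed by the host it listens on.
class ToyShuffleServiceRegistry:
    def __init__(self):
        self.services = {}  # node host -> {executorId -> ExecutorShuffleInfo}

    def start_service(self, node_host):
        """One shuffle service per node, bound to the node's host."""
        self.services[node_host] = {}

    def register(self, host, executor_id, shuffle_info):
        """Mirrors registerWithShuffleServer(shuffleServerId.host, ...)."""
        if host not in self.services:
            raise ConnectionError(f"cannot connect to shuffle service at {host}")
        self.services[host][executor_id] = shuffle_info

registry = ToyShuffleServiceRegistry()
registry.start_service("10.0.0.5")  # the node's host (NodeManager / k8s node)

# On YARN, hostname comes from container.getNodeId.getHost, so it matches:
registry.register("10.0.0.5", "1", {"localDirs": ["/data1/blockmgr"]})

# On k8s, hostname is status.podIP, where no shuffle service listens:
try:
    registry.register("172.17.0.8", "2", {"localDirs": ["/data1/blockmgr"]})
except ConnectionError as err:
    print(err)
```

The IPs are made up for illustration; the point is only that the registration key and the service's listen address come from two different sources on k8s.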
Of course, the Spark community has proposed dynamic resource allocation without an external shuffle service, and it has already been merged into the master branch.
The configuration looks like this:

spark.dynamicAllocation.enabled                 true
spark.dynamicAllocation.shuffleTracking.enabled true
spark.dynamicAllocation.minExecutors            1
spark.dynamicAllocation.maxExecutors            4
spark.dynamicAllocation.executorIdleTimeout     60s
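Putting it together, such a job can be submitted to k8s roughly as follows; the API server address, container image, and example class are placeholders, not values from the original text:

```shell
spark-submit \
  --master k8s://https://<apiserver>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<your-spark-image> \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=4 \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  --class org.apache.spark.examples.SparkPi \
  local:///opt/spark/examples/jars/spark-examples.jar
```

With shuffleTracking enabled, the driver tracks which executors hold shuffle data and avoids releasing them while that data may still be needed, so no external shuffle service is required.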