关于spark:spark-on-k8skubernetes-DynamicResourceAllocation

44次阅读

共计 4461 个字符,预计需要花费 12 分钟才能阅读完成。

 随着大数据时代的到来,以及 kubernetes 的愈发炽热,好多公司曾经把 spark 利用从 yarn 迁徙到 k8s,当然也踩了不少的坑,当初咱们来剖析一下 spark on k8s 的 DynamicResourceAllocation 这个坑

留神:该文基于 spark 3.0.0 剖析

spark on yarn 中的 DynamicResourceAllocation

spark on yarn 对于 DynamicResourceAllocation 调配来说,从 spark 1.2 版本就曾经开始反对了.
对于 spark 相熟的人都晓得,如果咱们要开启 DynamicResourceAllocation,就得有 ExternalShuffleService 服务,
对于 yarn 来说 ExternalShuffleService 是作为辅助服务开启的,具体配置如下:

<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>spark_shuffle</value>
</property>

<property>
   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

<property>
   <name>spark.shuffle.service.port</name>
   <value>7337</value>
</property>

重启 nodeManager,这样在 每个 nodeManager 节点就会启动一个 YarnShuffleService,之后在 spark 利用中设置 spark.dynamicAllocation.enabled 为 true, 这样就能达到运行时资源动态分配的成果

咱们间接从 CoarseGrainedExecutorBackend 中 SparkEnv 创立开始说,每一个 executor 的启动,必然会通过 CoarseGrainedExecutorBackend main 办法, 而 main 中就波及到 SparkEnv 的创立

 val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
       arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

而 sparkEnv 的创立就波及到 BlockManager 的创立。沿着代码往下走,最终

val blockTransferService =
     new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
       blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
val blockManager = new BlockManager(
     executorId,
     rpcEnv,
     blockManagerMaster,
     serializerManager,
     conf,
     memoryManager,
     mapOutputTracker,
     shuffleManager,
     blockTransferService,
     securityManager,
     externalShuffleClient)

在 blockManager 的 initialize 办法中, 就会进行 registerWithExternalShuffleServer

 // Register Executors' configuration with the local shuffle service, if one should exist.
   if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {registerWithExternalShuffleServer()
   }

如果咱们开启了 ExternalShuffleService,对于 yarn 就是 YarnShuffleService,就会把以后的 ExecutorShuffleInfo 注册到 host 为 shuffleServerId.host, port 为 shuffleServerId.port 的 ExternalShuffleService 中,ExecutorShuffleInfo 的信息如下:

val shuffleConfig = new ExecutorShuffleInfo(
     diskBlockManager.localDirsString,
     diskBlockManager.subDirsPerLocalDir,
     shuffleManager.getClass.getName)

这里我重点剖析一下 registerWithExternalShuffleServer 的办法中的以下片段

// Synchronous and will throw an exception if we cannot connect.
       blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)            

该代码中 shuffleServerId 来自于:

shuffleServerId = if (externalShuffleServiceEnabled) {logInfo(s"external shuffle service port = $externalShuffleServicePort")
     BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
   } else {blockManagerId}

而 blockTransferService.hostName 是咱们在 SparkEnv 中创立的时候由 advertiseAddress 传过来的,
最终由 CoarseGrainedExecutorBackend 主类参数 hostname 过去的,那到底怎么传过来的呢?
参照 ExecutorRunnable 的 prepareCommand 办法,

val commands = prefixEnv ++
     Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
     javaOpts ++
     Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
       "--driver-url", masterAddress,
       "--executor-id", executorId,
       "--hostname", hostname,
       "--cores", executorCores.toString,
       "--app-id", appId,
       "--resourceProfileId", resourceProfileId.toString) ++

而这个 hostname 的值最终由 YarnAllocator 的办法 runAllocatedContainers

val executorHostname = container.getNodeId.getHost

传递过去的,也就是说咱们最终获取到了yarn 节点,也就是 nodeManager 的 host
这样每个启动的 executor,就向 executor 所在的 nodeManager 的 YarnShuffleService 注册了 ExecutorShuffleInfo 信息,这样对于开启了动静资源分配的
ExternalBlockStoreClient 来说 fetchBlocksg 过程就和未开启动静资源分配的 NettyBlockTransferService 大同小异了

spark on k8s(kubernetes) 中的 DynamicResourceAllocation

参考之前的文章, 咱们晓得在 entrypoint 中咱们在启动 executor 的时候,咱们传递了 hostname 参数

executor)
    shift 1
    CMD=(${JAVA_HOME}/bin/java
      "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
      -Xms$SPARK_EXECUTOR_MEMORY
      -Xmx$SPARK_EXECUTOR_MEMORY
      -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
      org.apache.spark.executor.CoarseGrainedExecutorBackend
      --driver-url $SPARK_DRIVER_URL
      --executor-id $SPARK_EXECUTOR_ID
      --cores $SPARK_EXECUTOR_CORES
      --app-id $SPARK_APPLICATION_ID
      --hostname $SPARK_EXECUTOR_POD_IP
    )

而 SPARK_EXECUTOR_POD_IP 是运行中的 POD IP, 参考 BasicExecutorFeatureStep 类片段:

Seq(new EnvVarBuilder()
          .withName(ENV_EXECUTOR_POD_IP)
          .withValueFrom(new EnvVarSourceBuilder()
            .withNewFieldRef("v1", "status.podIP")
            .build())
          .build())

这样依照以上流程的剖析,
executor 也不能向 k8s 节点 ExternalShuffleService 服务注册,因为咱们注册的节点是 POD IP, 而不是节点 IP,
当然 spark 社区早就提出了未开启 external shuffle service 的动静资源分配, 且曾经合并到 master 分支.
具体配置,能够参照如下:

spark.dynamicAllocation.enabled  true 
spark.dynamicAllocation.shuffleTracking.enabled  true
spark.dynamicAllocation.minExecutors  1
spark.dynamicAllocation.maxExecutors  4
spark.dynamicAllocation.executorIdleTimeout     60s

本文由博客群发一文多发等经营工具平台 OpenWrite 公布

正文完
 0