关于spark:spark性能优化一

28次阅读

共计 8936 个字符,预计需要花费 23 分钟才能阅读完成。

本文内容阐明

  • 初始化配置给 rdd 和 dataframe 带来的影响
  • repartition 的相干阐明
  • cache&persist 的相干阐明
  • 性能优化的阐明倡议以及实例

配置阐明

spark:2.4.0
服务器:5 台(8 核 32G)

初始化配置项

%%init_spark
launcher.master = "yarn"
launcher.conf.spark.app.name = "BDP-xw"
launcher.conf.spark.driver.cores = 1
launcher.conf.spark.driver.memory = '1g'
launcher.conf.spark.executor.instances = 3
launcher.conf.spark.executor.memory = '1g'
launcher.conf.spark.executor.cores = 2
launcher.conf.spark.default.parallelism = 5
launcher.conf.spark.dynamicAllocation.enabled = False
import org.apache.spark.sql.SparkSession
var NumExecutors = spark.conf.getOption("spark.num_executors").repr
var ExecutorMemory = spark.conf.getOption("spark.executor.memory").repr
var AppName = spark.conf.getOption("spark.app.name").repr
var max_buffer = spark.conf.getOption("spark.kryoserializer.buffer.max").repr
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.functions.{udf, _}
import org.apache.spark.{SparkConf, SparkContext}


object LoadingData_from_files{def main(args: Tuple2[String, Array[String]]=Tuple2(hdfs_file, etl_date:Array[String])): Unit = {for( a <- etl_date){
            val hdfs_file_ = s"$hdfs_file" + a
            val rdd_20210113 = spark.sparkContext.textFile(hdfs_file_).cache()
            val num1 = rdd_20210113.count
            println(s"加载数据啦:$a RDD 的数据量是 $num1")
        }
        val rdd_20210113_test = spark.sparkContext.textFile(hdfs_file + "20210328").cache()
        var num1 = rdd_20210113_test.count()
        println(s"加载数据啦:20210113 RDD 的数据量是 $num1")
        rdd_20210113_test.unpersist() // 解除长久化
        val df_20210420 = spark.sparkContext.textFile(hdfs_file + "20210113").toDF.cache()
        num1 = df_20210420.count() // 指定 memory 之后,cache 的数量太多之前 cache 的后果会被干掉
        println(s"加载数据啦:20210420 DataFrame 的数据量是 $num1")
    }
}

// 配置参数 multiple_duplicated
val hdfs_file = "hdfs://path/etl_date="
val etl_date = Array("20210113","20210112","20210112","20210112","20210112","20210112", "20210113")
LoadingData_from_files.main(hdfs_file, etl_date)
  • 失去后果如下:

  • 后果剖析

    • 能够看到默认状况下,RDD 的缓存形式都是到 Memory 的,而 DataFrame 的缓存形式都是 Memory and Disk
    • 指定 memory 之后,cache 的数量太多之前 cache 的后果会被干掉

无特定配置项

import org.apache.spark.sql.SparkSession
var NumExecutors = spark.conf.getOption("spark.num_executors").repr
var ExecutorMemory = spark.conf.getOption("spark.executor.memory").repr
var AppName = spark.conf.getOption("spark.app.name").repr
object LoadingData_from_files{def main(args: Tuple2[String, Array[String]]=Tuple2(hdfs_file, etl_date:Array[String])): Unit = {for( a <- etl_date){
            val hdfs_file_ = s"$hdfs_file" + a
            val rdd_20210113 = spark.sparkContext.textFile(hdfs_file_).cache()
            val num1 = rdd_20210113.count
            println(s"加载数据啦:$a RDD 的数据量是 $num1")
        }
        val rdd_20210113_test = spark.sparkContext.textFile(hdfs_file + "20210328").cache()
        var num1 = rdd_20210113_test.count()
        println(s"加载数据啦:20210328 RDD 的数据量是 $num1")
        rdd_20210113_test.unpersist() // 解除长久化
        val df_20210420 = spark.sparkContext.textFile(hdfs_file + "20210113").toDF.cache()
        num1 = df_20210420.count() // 指定 memory 之后,cache 的数量太多之前 cache 的后果会被干掉
        println(s"加载数据啦:20210420 DataFrame 的数据量是 $num1 \n 以后环境下 cache 的个数及 id 为:")
        spark.sparkContext.getPersistentRDDs.foreach(i=>println("cache 的 id:" + i._1))
    }
}

// 无配置参数 multiple_duplicated
val hdfs_file = "hdfs://path/etl_date="
val etl_date = Array("20210113","20210112","20210112","20210112","20210112","20210112", "20210113")
LoadingData_from_files.main(hdfs_file, etl_date)
  • 失去后果如下:
  • 后果剖析

    • spark 的配置文件中,设置的也是动静分配内存;
    • cache 的后果也是达到 memory 限度的时候,曾经 cache 的后果会主动隐没;
    • 上述例子中,咱们减少了 8 个文件,但最终只保留了 5 个 cache 的后果;

      • 通过 for 反复从一个文件取数,并 val 申明给雷同变量并 cache,后果是会被屡次保留在 memory 或者 Disk 中的;

查看以后服务下的所有缓存并删除

spark.sparkContext.getPersistentRDDs.foreach(i=>println(i._1))
spark.sparkContext.getPersistentRDDs.foreach(i=>{i._2.unpersist()})

repartition

  • repartition 只是 coalesce 接口中 shuffle 为 true 的实现
  • repartition 能够减少和缩小分区,而应用 coalesce 则只能缩小分区
  • 每个 block 的大小为默认的 128M
//RDD
rdd.getNumPartitions
rdd.partitions.length
rdd.partitions.size

// For DataFrame, convert to RDD first
df.rdd.getNumPartitions
df.rdd.partitions.length
df.rdd.partitions.size

RDD

  • 默认 cache 的级别是Memory
val hdfs_file = "hdfs://path1/etl_date="
val rdd_20210113_test = spark.sparkContext.textFile(hdfs_file + "20210113").cache()
// 文件大小为 1.5G
rdd_20210113_test.getNumPartitions
// res2: Int = 13
val rdd_20210113_test_par1 = rdd_20210113_test.repartition(5)
rdd_20210113_test_par1.partitions.size
// res9: Int = 5
val rdd_20210113_test_par2 = rdd_20210113_test_par1.coalesce(13)
rdd_20210113_test_par2.partitions.length
// res14: Int = 5 减少分区没失效
val rdd_20210113_test_par3 = rdd_20210113_test_par1.coalesce(3)
rdd_20210113_test_par3.partitions.length 
// res16: Int = 3 减少分区失效

DataFrame

默认 cache 的级别是Memory and Disk

val hdfs_file = "hdfs://path1/etl_date="
val df_20210420 = spark.sparkContext.textFile(hdfs_file + "20210113").toDF().cache()
df_20210420.rdd.getNumPartitions
// res18: Int = 13
val df_20210420_par1 = df_20210420.repartition(20)
df_20210420_par1.rdd.getNumPartitions
// res19: Int = 20 减少分区失效
val df_20210420_par2 = df_20210420_par1.coalesce(5)
df_20210420_par2.rdd.getNumPartitions
// res20: Int = 5

cache&persist 比照

  • cache 调用的是无参数版本的 persist()
  • persist 的阐明
import org.apache.spark.storage.StorageLevel._
// MEMORY_AND_DISK
val hdfs_file = "hdfs://path1/etl_date="
var etl_date = "20210113"
var hdfs_file_ = s"$hdfs_file" + etl_date
val rdd_20210113_DISK_MEMORY = spark.sparkContext.textFile(hdfs_file_).persist(MEMORY_AND_DISK)
println("DISK_ONLY 数据量为" + rdd_20210113_DISK_MEMORY.count())
// MEMORY_ONLY
etl_date = "20210112"
hdfs_file_ = s"$hdfs_file" + etl_date
val rdd_20210113_MEMORY_ONLY = spark.sparkContext.textFile(hdfs_file_).persist(MEMORY_ONLY)
println("MEMORY_ONLY 数据量为" + rdd_20210113_MEMORY_ONLY.count())
// DISK_ONLY
etl_date = "20210328"
hdfs_file_ = s"$hdfs_file" + etl_date
val rdd_20210113_DISK_ONLY = spark.sparkContext.textFile(hdfs_file_).persist(DISK_ONLY)
println("DISK_ONLY 数据量为" + rdd_20210113_DISK_ONLY.count())
// DISK_ONLY 数据量为 4298617
// MEMORY_ONLY 数据量为 86340
// DISK_ONLY 数据量为 20000

性能优化

参数阐明

参数配置倡议

  • 优化方面阐明

  • tips

    • yarn 集群中个别有资源申请下限,如,executor-memory*num-executors < 400G 等,所以调试参数时要留神这一点
    • 如果 GC 工夫较长,能够适当减少 –executor-memory 的值或者缩小 –executor-cores 的值
    • yarn 下每个 executor 须要的 memory = spark-executor-memory + spark.yarn.executor.memoryOverhead.
    • 个别须要为,后盾过程留下足够的 cores(个别每个节点留一个 core)。
    • Yarn ApplicationMaster (AM):ApplicationMaster 负责从 ResourceManager 申请资源并且和 NodeManagers 一起执行和监控 containers 和它们的资源耗费。如果咱们是 spark on yarn 模式,那么咱们须要为 ApplicationMaster 预留一些资源(1G 和 1 个 Executor)
    • num-executors 大(30),executor-cores 小(1)→ 每个 executor 只调配了一个核,将无奈运行多个工作的长处
    • num-executors 小(5),executor-cores 大(7)→ 每个 executor 调配了 7 个核,HDFS 吞吐量会受到影响。同时过大的内存调配也会导致过多的 GC 提早
    • Spark shell required memory = (Driver Memory + 384 MB) + (Number of executors * (Executor memory + 384 MB))

设置资源的示例

  • 资源分配状况【非动态分配资源模式下】

    • contain、CPUVcores、AllocaMemory 为 hdfs 下对具体的 application 的资源占用状况
    • executors、storage-memory 为 spark web-ui 下的 executor 状况

  • spark-shell 下资源占用状况

    • 次要查看指定 num-executor 以及 total-executor-cores 状况下,资源占用是否依然会动态变化
    • 还是会动态变化

  • spark-submit 下资源占用状况

    • 次要查看指定 num-executor 以及 total-executor-cores 状况下,资源占用是否依然会动态变化
    • 还是会动态变化
    • CPUVcores 是因为其默认是 *4

  • 设置资源的形式(以咱们的集群为例,5 台 8 核 32G 的服务器)

    • 第一,给每个 Executor 调配 3 个 core 即 executor-cores=3,个别设置 5 对 HDFS 的吞吐量会比拟敌对。
    • 第二,为后盾过程留一个 core,则每个节点可用的 core 数是 8 – 1 = 7。所以集群总的可用 core 数是 7 x 5 = 35。
    • 第三,每个节点上的 Executor 数就是 7 / 3 = 2,集群总的可用的 Executor 数就是 2 * 5 = 10。为 ApplicationManager 留一个 Executor,则 num-executors=9。
    • 第四,每个节点上每个 Executor 可调配的内存是 (32GB-1GB) / 2 = 15GB(减去的 1GB 是留给后台程序用),除去 MemoryOverHead=max(384MB, 7% * 15GB)=2GB,所以 executor-memory=15GB – 2GB = 12GB。
    • 所以最初的参数配置是:num-executors=9、executor-cores=3、executor-memory=12G
  • 设置资源的形式(调整下,比方要升高 GC,executor-memory 不须要给那么多)

    • 依照上述形式,失去每个 Executor 调配到的内存是 12GB,但假如 8GB 内存就够用了
    • 那么此时咱们能够将 executor-cores 升高为 2,那么每个节点就能够有 7 / 2 = 3 个 Executor,那么总共能够取得的 Executor 数就是 (5 3) – 1 =14,每个节点上每个 Executor 可调配的内存是(32GB-1GB) / 3 = 10GB,除去 MemoryOverHead=max(384MB, 7% 10GB)=1GB,所以 executor-memory=10GB – 1GB = 9GB
    • 所以最初的参数配置是:num-executors=14、executor-cores=2、executor-memory=9G

查看 Memory 理论分配情况

// 计算 driver Memory 的
// spark 调配的理论资源状况
def getSparkMemory():Float={val driver_memory_set1 = sc.getConf.getSizeAsBytes("spark.driver.memory")/1024/1024/1024
    val systemMemory = Runtime.getRuntime.maxMemory.toFloat///1024/1024/1024
    // fixed amount of memory for non-storage, non-execution purposes
    val reservedMemory = 300 * 1024 * 1024
    // minimum system memory required
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = sc.getConf.getDouble("spark.memory.fraction", 0.6)
    val maxMemory = (usableMemory * memoryFraction).toLong
    import org.apache.spark.network.util.JavaUtils
    val allocateMemory = JavaUtils.byteStringAsMb(maxMemory + "b")
    println(f"driver_memory: $driver_memory_set1%1.1f, allocateMemory: $allocateMemory%1.1f,")
    maxMemory
}

val maxMemory = getSparkMemory()
// driver_memory: 2.0, allocateMemory: 912.0,

// // 查看 spark web ui 资源状况
def formatBytes(bytes: Double) = {
  val k = 1000
  val i = math.floor(math.log(bytes) / math.log(k))
  val maxMemoryWebUI = bytes / math.pow(k, i)
  f"$maxMemoryWebUI%1.1f"
}
println(formatBytes(maxMemory))
// 956.6
def allocateMemory(executorMemory:Float=1, executors:Float=1, driverMemory:Float=1):Double={val driver_overmemory = Array(384, driverMemory * 0.07).max
    val executor_Memory = Array(384, executorMemory * 0.07).max
    val allocateMemory = (driver_overmemory + driverMemory) + executors * (executor_Memory + executorMemory)
    allocateMemory/1024
}
allocateMemory(1 * 1024, 16, 1 * 1024) 
// res3: Double = 23.375

查看服务环境

  • 通过 8088 端口 proxy 查看工作信息http://ip:8088/proxy/application_jobid/executors/
  • 通过 8088 端口 cluster 查看工作信息http://ip:8088/cluster/apps

正文完
 0