关于数据库:hudi中zorder采样分区流程分析DEEPNOVA开发者社区

1次阅读

共计 5656 个字符,预计需要花费 15 分钟才能阅读完成。

作者:吴文池
背景
hudi 在数据汇集方面,反对应用 zorder 对数据进行重排。

做 zorder 排序次要流程分为三步:

- 对于用户指定的每个 zorder 字段,生成对应的 z 值。
- 把所有 zorder 字段生成的 z 值进行比特位的穿插组值,生成最终的 z 值。
- 依据最终 z 值,对所有数据进行排序。

在下面第一步中,每个字段生成本人的 z 值形式次要有两种:

间接的值映射形式。该形式实现简略,容易了解,但也有缺点:
- 参加生成 z 值的字段实践上须要是从 0 开始的正整数,这样能力生成很好的 z 曲线,但实在的数据集中根本不可能呈现,那么 zorder 的成果将会打折扣。
对于一些前缀雷同的数据,例如 url 字段大多数都以:http://www. 结尾,那么取后面几位做排序将毫无意义。
- 先对数据采样,依据采样数据对所有数据进行分区,最初应用该数据所对应的分区号作为该数据的 z 值。这种形式能够很好的解决形式一的两个问题:
分区肯定是从 0 开始的整数。
对于前缀雷同的数据,也可能依据字符串大小,将其很好的调配到不同分区中,失去不同的 z 值。
上面次要介绍第二种形式采样分区流程。

(代码以 hudi 的 master 分支、commit 22c45a7704cf4d5ec6fb56ee7cc1bf17d826315d 为准)

采样分区流程
整体流程图
(采样分区总体流程图,外面一些具体的计算流程剖析可参考上面代码剖析)

代码剖析

  1. 获取采样后果
    // 该函数次要对 rdd 进行采样,并返回采样后果(采样后果蕴含采样的数据和该数据对应的权重)
    def getRangeBounds(): ArrayBuffer[(K, Float)] = {

    // zEncodeNum:即目标分区个数,用户可配置,配置项为:hoodie.layout.optimize.build.curve.sample.size
    // 默认分区个数为 200000
    if (zEncodeNum <= 1) {
    ArrayBuffer.empty[(K, Float)]
    } else {

    // samplePointsPerPartitionHint:每个分区采样个数,默认为 20
    // sampleSize:须要采样的个数,最多为 1e6 个
    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * zEncodeNum, 1e6)

    // rdd.partitions:rdd 的分区个数
    // 这里假如每个分区的数据都比拟均衡,且每个分区的数据量都比拟多,
    // 所以多采集一些(因为实际上会呈现分区不均衡的状况,尽管前面会对数据量大的分区从新采集,但还是会可能呈现采样个数不够的状况),
    // 并算出均匀每个分区须要采样的个数
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt

    // rdd.map(_._1):是获取只保留了用户指定的 zorder 字段的 rdd
    // sketch 即开始对每个分区做采样,在下一节(对每个分区进行采样)具体阐明
    // 返回值:
    // numItems:该 rdd 数据总个数
    // sketched:每个分区采集到的数据
    val (numItems, sketched) = sketch(rdd.map(_._1), sampleSizePerPartition)


if (numItems == 0L) {ArrayBuffer.empty[(K, Float)]
} else {// 如果分区蕴含的内容远远超过均匀样本数(单个分区记录数 *fraction > sampleSizePartition),咱们将从中从新进行抽样
  // 以确保从该分区收集足够的样本。// 计算 采样个数占总体个数的比例,为上面判断分区是否均衡提供根据
  val fraction: Double = math.min(sampleSize / math.max(numItems, 1L), 1.0)
  
  // 记录最终采集进去的所有数据
  // K: 采集到的数据
  // Float: 该数据对应的比重
  val candidates = ArrayBuffer.empty[(K, Float)]
  
  // 记录数据不均衡分区,后续对这些分区从新采样
  val imbalancedPartitions = mutable.Set.empty[Int]
  
  // sketched :
  //   _1: 分区号
  //   _2: 该分区数据总个数
  //   _3: 该分区采集的数据
  sketched.foreach {case (idx, n, sample) =>
    
    // 判断是否是不均衡的分区
    // 这里能够把 fraction 和 sampleSizePerPartion 计算公式代回来化简一下,// 最终变成:(rdd.partitions.length * n) / numItems > 3
    // 阐明,把以后分区内数据量看成均匀数据量的话,比理论总量大 3 倍
    if (fraction * n > sampleSizePerPartition) {imbalancedPartitions += idx} else {
      
      // 计算以后采样数据的权重 = 以后分区数据总数 / 以后分区采样总数
      // 这里计算的权重,是为了前面确定分区边界
      val weight = (n.toDouble / sample.length).toFloat
      
      // 将该数据放到最终的后果集中
      for (key <- sample) {candidates += ((key, weight))
      }
    }
  }
  
  if (imbalancedPartitions.nonEmpty) {
    // 对不均衡的分区从新采样,同时从新计算采样数据的权重
    val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
    val seed = byteswap32(-rdd.id - 1)
    
    // 从新采样权重
    val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
    
    // 计算权重
    val weight = (1.0 / fraction).toFloat
    
    // 将该数据放到最终的后果集中
    candidates ++= reSampled.map(x => (x, weight))
  }
  
  // 返回采样后果集
  candidates
}

}
}
1.1 对每个分区进行采样
def sketch[K: ClassTag](

  rdd: RDD[K],
  sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {

val shift = rdd.id

// 对每个分区进行采样
val sketched = rdd.mapPartitionsWithIndex {(idx, iter) =>
  
  // 筹备随机种子
  val seed = byteswap32(idx ^ (shift << 16))
  
  // 应用蓄水池采样办法对该分区内数据进行采样
  // sample:该分区采集到的数据
  // n:该分区总数据量
  val (sample, n) = SamplingUtils.reservoirSampleAndCount(iter, sampleSizePerPartition, seed)
  
  // idx: 以后分区编号
  // n: 以后分区总输出数据个数
  // sample: 以后分区采样汇合
  Iterator((idx, n, sample))
}.collect()

// numItems 是所有分区总共输出数据个数,即以后 rdd 数据总量,为了前面计算采样数据的权重
val numItems = sketched.map(_._2).sum

// 返回后果
// numItems:以后 rdd 数据总量,为了前面计算采样数据的权重
// sketched:每个分区采集到的数据
(numItems, sketched)

1.2 蓄水池采样办法
该算法次要解决的场景是:给定一个数据流,数据流长度 N 很大,且 N 直到解决完所有数据之前都不可知,请问如何在只遍历一遍数据(O(N))的状况下,可能随机选取出 k 个数据。

算法比较简单,次要分为两步:

应用源数据前 k 条数据,对后果集进行初始化。
遍历源数据 k 之后的数据,计算一个随机值,如果该随机值小于 k,则替换后果集中的数据。
def reservoirSampleAndCount[T: ClassTag](

  input: Iterator[T],
  k: Int,
  seed: Long = Random.nextLong())
: (Array[T], Long) = {
  
// input: 输出数据集,即 rdd 中的某个分区
// k: 须要采集数据的个数
// seed: 给定的种子
  
// 后果集,即采样到的数据  
val reservoir = new Array[T](k)

var i = 0
// 先应用数据的前 k 个数据,对后果集做初始化,while (i < k && input.hasNext) {val item = input.next()
  reservoir(i) = item
  i += 1
}

if (i < k) {

  // 数据量不够,则间接返回
  val trimReservoir = new Array[T](i)
  System.arraycopy(reservoir, 0, trimReservoir, 0, i)
  (trimReservoir, i)
} else {
  
  // 数据量足够,则开始做随机替换
  // l:记录数据总量
  var l = i.toLong
  val rand = new XORShiftRandom(seed)
  while (input.hasNext) {val item = input.next()
    l += 1

    // 对后果集中的数据做随机替换
    // 如果随机进去的数据在后果集范畴内,则替换该数据
    val replacementIndex = (rand.nextDouble() * l).toLong
    if (replacementIndex < k) {reservoir(replacementIndex.toInt) = item
    }
  }
  
  // 返回采集后果和数据总量
  (reservoir, l)
}

}

  1. 获取分区边界
    def determineBound[K : Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int, ordering: Ordering[K]): Array[K] = {

    // candidates:采集到的数据
    // K:以后采样的数据
    // Float:以后采样数据的权重
    // partitions:须要失去的分区个数(即用户配置的所需分区个数)
    // ordering:排序

    // 将 candidate 依照第一个字段进行排序,其实这里也就只有一个字段
    val ordered = candidates.sortBy(_._1)(ordering)

    // 采样样品个数
    val numCandidates = ordered.size

    // 计算总权重
    val sumWeights = ordered.map(_._2.toDouble).sum

    // 每个分区的均匀权重
    val step = sumWeights / partitions

    var cumWeight = 0.0
    var target = step

    // 记录边界后果集
    val bounds = ArrayBuffer.empty[K]

    var i = 0
    var j = 0
    var previousBound = Option.empty[K]

    // 遍历已排序的 candidate,累加其权重 cumWeight,每当权重达到一个分区的
    // 均匀权重 step,就获取一个 key 作为分区的距离符,最初返回所有获取到的分隔符
    // 注:在算边界时,会跳过反复数据
    while ((i < numCandidates) && (j < partitions – 1)) {
    val (key, weight) = ordered(i)

    // 累加以后分区的权重
    cumWeight += weight
    if (cumWeight >= target) {

     
     // 跳过反复数据
     if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
       
       // 增加一个分区
       bounds += key
       target += step
       j += 1
       previousBound = Some(key)
     }

    }
    i += 1
    }

    // 返回边界
    bounds.toArray
    }

  2. 裁减分区边界
    // sampleBounds:即下面返回的分区边界
    // 因为 zorder 能够指定多个字段,每个字段都会确定一个边界,这里计算所有字段确定进去的边界长度的最大值
    val maxLength = sampleBounds.map(_.length).max

    // 遍历每个分区边界,如果有分区边界长度很少的话,则对该分区边界裁减
    val expandSampleBoundsWithFactor = sampleBounds.map {bound =>

     
     val fillFactor: Int = maxLength / bound.size
     if (bound.isInstanceOf[Array[Long]] && fillFactor > 1) {
       
       // 以后边界长度太小,则进行对以后分区边界进行裁减
       val newBound = new Array[Double](bound.length * fillFactor)
       val longBound = bound.asInstanceOf[Array[Long]]
       for (i <- 0 to bound.length - 1) {for (j <- 0 to fillFactor - 1) {
           
           // 裁减的大小,就是下面 fillFactor 的倒数
           // 例如 fillFactor=3,则裁减的大小为 0.33,当一个边界值为 2 时,则会裁减 2 个边界,为:2.33、2.66 和 3
           newBound(j + i*(fillFactor)) = longBound(i) + (j + 1) * (1 / fillFactor.toDouble)
         }
       }
       (newBound, fillFactor)
     } else {(bound, 0)
     }

    }

正文完
 0