作者:吴文池
背景
hudi在数据汇集方面,反对应用zorder对数据进行重排。
做zorder排序次要流程分为三步:
-对于用户指定的每个zorder字段,生成对应的z值。
-把所有zorder字段生成的z值进行比特位的穿插组值,生成最终的z值。
-依据最终z值,对所有数据进行排序。
在下面第一步中,每个字段生成本人的z值形式次要有两种:
间接的值映射形式。该形式实现简略,容易了解,但也有缺点:
-参加生成z值的字段实践上须要是从0开始的正整数,这样能力生成很好的z曲线,但实在的数据集中根本不可能呈现,那么zorder的成果将会打折扣。
对于一些前缀雷同的数据,例如url字段大多数都以:http://www.结尾,那么取后面几位做排序将毫无意义。
-先对数据采样,依据采样数据对所有数据进行分区,最初应用该数据所对应的分区号作为该数据的z值。这种形式能够很好的解决形式一的两个问题:
分区肯定是从0开始的整数。
对于前缀雷同的数据,也可能依据字符串大小,将其很好的调配到不同分区中,失去不同的z值。
上面次要介绍第二种形式采样分区流程。
(代码以hudi的master分支、commit 22c45a7704cf4d5ec6fb56ee7cc1bf17d826315d 为准)
采样分区流程
整体流程图
(采样分区总体流程图,外面一些具体的计算流程剖析可参考上面代码剖析)
代码剖析
获取采样后果
// 该函数次要对rdd进行采样,并返回采样后果(采样后果蕴含采样的数据和该数据对应的权重)
def getRangeBounds(): ArrayBuffer[(K, Float)] = {// zEncodeNum:即目标分区个数,用户可配置,配置项为:hoodie.layout.optimize.build.curve.sample.size
// 默认分区个数为200000
if (zEncodeNum <= 1) {
ArrayBuffer.empty[(K, Float)]
} else {// samplePointsPerPartitionHint:每个分区采样个数,默认为20
// sampleSize:须要采样的个数,最多为1e6个
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * zEncodeNum, 1e6)// rdd.partitions:rdd的分区个数
// 这里假如每个分区的数据都比拟均衡,且每个分区的数据量都比拟多,
// 所以多采集一些(因为实际上会呈现分区不均衡的状况,尽管前面会对数据量大的分区从新采集,但还是会可能呈现采样个数不够的状况),
// 并算出均匀每个分区须要采样的个数
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt// rdd.map(_._1):是获取只保留了用户指定的zorder字段的rdd
// sketch 即开始对每个分区做采样,在下一节(对每个分区进行采样)具体阐明
// 返回值:
// numItems:该rdd数据总个数
// sketched:每个分区采集到的数据
val (numItems, sketched) = sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) { ArrayBuffer.empty[(K, Float)]} else { // 如果分区蕴含的内容远远超过均匀样本数(单个分区记录数*fraction > sampleSizePartition),咱们将从中从新进行抽样 // 以确保从该分区收集足够的样本。 // 计算 采样个数占总体个数的比例,为上面判断分区是否均衡提供根据 val fraction: Double = math.min(sampleSize / math.max(numItems, 1L), 1.0) // 记录最终采集进去的所有数据 // K: 采集到的数据 // Float: 该数据对应的比重 val candidates = ArrayBuffer.empty[(K, Float)] // 记录数据不均衡分区,后续对这些分区从新采样 val imbalancedPartitions = mutable.Set.empty[Int] // sketched : // _1: 分区号 // _2: 该分区数据总个数 // _3: 该分区采集的数据 sketched.foreach { case (idx, n, sample) => // 判断是否是不均衡的分区 // 这里能够把 fraction 和 sampleSizePerPartion 计算公式代回来化简一下, // 最终变成:(rdd.partitions.length * n) / numItems > 3 // 阐明,把以后分区内数据量看成均匀数据量的话,比理论总量大3倍 if (fraction * n > sampleSizePerPartition) { imbalancedPartitions += idx } else { // 计算以后采样数据的权重 = 以后分区数据总数 / 以后分区采样总数 // 这里计算的权重,是为了前面确定分区边界 val weight = (n.toDouble / sample.length).toFloat // 将该数据放到最终的后果集中 for (key <- sample) { candidates += ((key, weight)) } } } if (imbalancedPartitions.nonEmpty) { // 对不均衡的分区从新采样,同时从新计算采样数据的权重 val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) val seed = byteswap32(-rdd.id - 1) // 从新采样权重 val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() // 计算权重 val weight = (1.0 / fraction).toFloat // 将该数据放到最终的后果集中 candidates ++= reSampled.map(x => (x, weight)) } // 返回采样后果集 candidates}
}
}
1.1 对每个分区进行采样
def sketch[K: ClassTag](
rdd: RDD[K], sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {val shift = rdd.id// 对每个分区进行采样val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => // 筹备随机种子 val seed = byteswap32(idx ^ (shift << 16)) // 应用蓄水池采样办法对该分区内数据进行采样 // sample:该分区采集到的数据 // n:该分区总数据量 val (sample, n) = SamplingUtils.reservoirSampleAndCount( iter, sampleSizePerPartition, seed) // idx: 以后分区编号 // n: 以后分区总输出数据个数 // sample: 以后分区采样汇合 Iterator((idx, n, sample))}.collect()// numItems 是所有分区总共输出数据个数,即以后rdd数据总量,为了前面计算采样数据的权重val numItems = sketched.map(_._2).sum// 返回后果// numItems:以后rdd数据总量,为了前面计算采样数据的权重// sketched:每个分区采集到的数据(numItems, sketched)
1.2 蓄水池采样办法
该算法次要解决的场景是:给定一个数据流,数据流长度N很大,且N直到解决完所有数据之前都不可知,请问如何在只遍历一遍数据(O(N))的状况下,可能随机选取出k个数据。
算法比较简单,次要分为两步:
应用源数据前k条数据,对后果集进行初始化。
遍历源数据k之后的数据,计算一个随机值,如果该随机值小于k,则替换后果集中的数据。
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T], k: Int, seed: Long = Random.nextLong()): (Array[T], Long) = { // input: 输出数据集,即rdd中的某个分区// k: 须要采集数据的个数// seed: 给定的种子 // 后果集,即采样到的数据 val reservoir = new Array[T](k)var i = 0// 先应用数据的前k个数据,对后果集做初始化,while (i < k && input.hasNext) { val item = input.next() reservoir(i) = item i += 1}if (i < k) { // 数据量不够,则间接返回 val trimReservoir = new Array[T](i) System.arraycopy(reservoir, 0, trimReservoir, 0, i) (trimReservoir, i)} else { // 数据量足够,则开始做随机替换 // l:记录数据总量 var l = i.toLong val rand = new XORShiftRandom(seed) while (input.hasNext) { val item = input.next() l += 1 // 对后果集中的数据做随机替换 // 如果随机进去的数据在后果集范畴内,则替换该数据 val replacementIndex = (rand.nextDouble() * l).toLong if (replacementIndex < k) { reservoir(replacementIndex.toInt) = item } } // 返回采集后果和数据总量 (reservoir, l)}
}
获取分区边界
def determineBound[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int, ordering: Ordering[K]): Array[K] = {// candidates:采集到的数据
// K:以后采样的数据
// Float:以后采样数据的权重
// partitions:须要失去的分区个数(即用户配置的所需分区个数)
// ordering:排序// 将candidate依照第一个字段进行排序,其实这里也就只有一个字段
val ordered = candidates.sortBy(_._1)(ordering)// 采样样品个数
val numCandidates = ordered.size// 计算总权重
val sumWeights = ordered.map(_._2.toDouble).sum// 每个分区的均匀权重
val step = sumWeights / partitionsvar cumWeight = 0.0
var target = step// 记录边界后果集
val bounds = ArrayBuffer.empty[K]var i = 0
var j = 0
var previousBound = Option.empty[K]// 遍历已排序的candidate,累加其权重cumWeight,每当权重达到一个分区的
// 均匀权重step,就获取一个key作为分区的距离符,最初返回所有获取到的分隔符
// 注:在算边界时,会跳过反复数据
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = ordered(i)// 累加以后分区的权重
cumWeight += weight
if (cumWeight >= target) {// 跳过反复数据 if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { // 增加一个分区 bounds += key target += step j += 1 previousBound = Some(key) }
}
i += 1
}// 返回边界
bounds.toArray
}裁减分区边界
// sampleBounds:即下面返回的分区边界
// 因为zorder能够指定多个字段,每个字段都会确定一个边界,这里计算所有字段确定进去的边界长度的最大值
val maxLength = sampleBounds.map(_.length).max// 遍历每个分区边界,如果有分区边界长度很少的话,则对该分区边界裁减
val expandSampleBoundsWithFactor = sampleBounds.map { bound =>val fillFactor: Int = maxLength / bound.size if (bound.isInstanceOf[Array[Long]] && fillFactor > 1) { // 以后边界长度太小,则进行对以后分区边界进行裁减 val newBound = new Array[Double](bound.length * fillFactor) val longBound = bound.asInstanceOf[Array[Long]] for (i <- 0 to bound.length - 1) { for (j <- 0 to fillFactor - 1) { // 裁减的大小,就是下面 fillFactor 的倒数 // 例如 fillFactor=3,则裁减的大小为0.33,当一个边界值为2时,则会裁减2个边界,为:2.33、2.66和3 newBound(j + i*(fillFactor)) = longBound(i) + (j + 1) * (1 / fillFactor.toDouble) } } (newBound, fillFactor) } else { (bound, 0) }
}