共计 5656 个字符,预计需要花费 15 分钟才能阅读完成。
作者:吴文池
背景
hudi 在数据汇集方面,反对应用 zorder 对数据进行重排。
做 zorder 排序次要流程分为三步:
- 对于用户指定的每个 zorder 字段,生成对应的 z 值。
- 把所有 zorder 字段生成的 z 值进行比特位的穿插组值,生成最终的 z 值。
- 依据最终 z 值,对所有数据进行排序。
在下面第一步中,每个字段生成本人的 z 值形式次要有两种:
间接的值映射形式。该形式实现简略,容易了解,但也有缺点:
- 参加生成 z 值的字段实践上须要是从 0 开始的正整数,这样能力生成很好的 z 曲线,但实在的数据集中根本不可能呈现,那么 zorder 的成果将会打折扣。
对于一些前缀雷同的数据,例如 url 字段大多数都以:http://www. 结尾,那么取后面几位做排序将毫无意义。
- 先对数据采样,依据采样数据对所有数据进行分区,最初应用该数据所对应的分区号作为该数据的 z 值。这种形式能够很好的解决形式一的两个问题:
分区肯定是从 0 开始的整数。
对于前缀雷同的数据,也可能依据字符串大小,将其很好的调配到不同分区中,失去不同的 z 值。
上面次要介绍第二种形式采样分区流程。
(代码以 hudi 的 master 分支、commit 22c45a7704cf4d5ec6fb56ee7cc1bf17d826315d 为准)
采样分区流程
整体流程图
(采样分区总体流程图,外面一些具体的计算流程剖析可参考上面代码剖析)
代码剖析
-
获取采样后果
// 该函数次要对 rdd 进行采样,并返回采样后果(采样后果蕴含采样的数据和该数据对应的权重)
def getRangeBounds(): ArrayBuffer[(K, Float)] = {// zEncodeNum:即目标分区个数,用户可配置,配置项为:hoodie.layout.optimize.build.curve.sample.size
// 默认分区个数为 200000
if (zEncodeNum <= 1) {
ArrayBuffer.empty[(K, Float)]
} else {// samplePointsPerPartitionHint:每个分区采样个数,默认为 20
// sampleSize:须要采样的个数,最多为 1e6 个
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * zEncodeNum, 1e6)// rdd.partitions:rdd 的分区个数
// 这里假如每个分区的数据都比拟均衡,且每个分区的数据量都比拟多,
// 所以多采集一些(因为实际上会呈现分区不均衡的状况,尽管前面会对数据量大的分区从新采集,但还是会可能呈现采样个数不够的状况),
// 并算出均匀每个分区须要采样的个数
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt// rdd.map(_._1):是获取只保留了用户指定的 zorder 字段的 rdd
// sketch 即开始对每个分区做采样,在下一节(对每个分区进行采样)具体阐明
// 返回值:
// numItems:该 rdd 数据总个数
// sketched:每个分区采集到的数据
val (numItems, sketched) = sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {ArrayBuffer.empty[(K, Float)]
} else {// 如果分区蕴含的内容远远超过均匀样本数(单个分区记录数 *fraction > sampleSizePartition),咱们将从中从新进行抽样
// 以确保从该分区收集足够的样本。// 计算 采样个数占总体个数的比例,为上面判断分区是否均衡提供根据
val fraction: Double = math.min(sampleSize / math.max(numItems, 1L), 1.0)
// 记录最终采集进去的所有数据
// K: 采集到的数据
// Float: 该数据对应的比重
val candidates = ArrayBuffer.empty[(K, Float)]
// 记录数据不均衡分区,后续对这些分区从新采样
val imbalancedPartitions = mutable.Set.empty[Int]
// sketched :
// _1: 分区号
// _2: 该分区数据总个数
// _3: 该分区采集的数据
sketched.foreach {case (idx, n, sample) =>
// 判断是否是不均衡的分区
// 这里能够把 fraction 和 sampleSizePerPartion 计算公式代回来化简一下,// 最终变成:(rdd.partitions.length * n) / numItems > 3
// 阐明,把以后分区内数据量看成均匀数据量的话,比理论总量大 3 倍
if (fraction * n > sampleSizePerPartition) {imbalancedPartitions += idx} else {
// 计算以后采样数据的权重 = 以后分区数据总数 / 以后分区采样总数
// 这里计算的权重,是为了前面确定分区边界
val weight = (n.toDouble / sample.length).toFloat
// 将该数据放到最终的后果集中
for (key <- sample) {candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// 对不均衡的分区从新采样,同时从新计算采样数据的权重
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
// 从新采样权重
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
// 计算权重
val weight = (1.0 / fraction).toFloat
// 将该数据放到最终的后果集中
candidates ++= reSampled.map(x => (x, weight))
}
// 返回采样后果集
candidates
}
}
}
1.1 对每个分区进行采样
def sketch[K: ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// 对每个分区进行采样
val sketched = rdd.mapPartitionsWithIndex {(idx, iter) =>
// 筹备随机种子
val seed = byteswap32(idx ^ (shift << 16))
// 应用蓄水池采样办法对该分区内数据进行采样
// sample:该分区采集到的数据
// n:该分区总数据量
val (sample, n) = SamplingUtils.reservoirSampleAndCount(iter, sampleSizePerPartition, seed)
// idx: 以后分区编号
// n: 以后分区总输出数据个数
// sample: 以后分区采样汇合
Iterator((idx, n, sample))
}.collect()
// numItems 是所有分区总共输出数据个数,即以后 rdd 数据总量,为了前面计算采样数据的权重
val numItems = sketched.map(_._2).sum
// 返回后果
// numItems:以后 rdd 数据总量,为了前面计算采样数据的权重
// sketched:每个分区采集到的数据
(numItems, sketched)
1.2 蓄水池采样办法
该算法次要解决的场景是:给定一个数据流,数据流长度 N 很大,且 N 直到解决完所有数据之前都不可知,请问如何在只遍历一遍数据(O(N))的状况下,可能随机选取出 k 个数据。
算法比较简单,次要分为两步:
应用源数据前 k 条数据,对后果集进行初始化。
遍历源数据 k 之后的数据,计算一个随机值,如果该随机值小于 k,则替换后果集中的数据。
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Long) = {
// input: 输出数据集,即 rdd 中的某个分区
// k: 须要采集数据的个数
// seed: 给定的种子
// 后果集,即采样到的数据
val reservoir = new Array[T](k)
var i = 0
// 先应用数据的前 k 个数据,对后果集做初始化,while (i < k && input.hasNext) {val item = input.next()
reservoir(i) = item
i += 1
}
if (i < k) {
// 数据量不够,则间接返回
val trimReservoir = new Array[T](i)
System.arraycopy(reservoir, 0, trimReservoir, 0, i)
(trimReservoir, i)
} else {
// 数据量足够,则开始做随机替换
// l:记录数据总量
var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {val item = input.next()
l += 1
// 对后果集中的数据做随机替换
// 如果随机进去的数据在后果集范畴内,则替换该数据
val replacementIndex = (rand.nextDouble() * l).toLong
if (replacementIndex < k) {reservoir(replacementIndex.toInt) = item
}
}
// 返回采集后果和数据总量
(reservoir, l)
}
}
-
获取分区边界
def determineBound[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int, ordering: Ordering[K]): Array[K] = {// candidates:采集到的数据
// K:以后采样的数据
// Float:以后采样数据的权重
// partitions:须要失去的分区个数(即用户配置的所需分区个数)
// ordering:排序// 将 candidate 依照第一个字段进行排序,其实这里也就只有一个字段
val ordered = candidates.sortBy(_._1)(ordering)// 采样样品个数
val numCandidates = ordered.size// 计算总权重
val sumWeights = ordered.map(_._2.toDouble).sum// 每个分区的均匀权重
val step = sumWeights / partitionsvar cumWeight = 0.0
var target = step// 记录边界后果集
val bounds = ArrayBuffer.empty[K]var i = 0
var j = 0
var previousBound = Option.empty[K]// 遍历已排序的 candidate,累加其权重 cumWeight,每当权重达到一个分区的
// 均匀权重 step,就获取一个 key 作为分区的距离符,最初返回所有获取到的分隔符
// 注:在算边界时,会跳过反复数据
while ((i < numCandidates) && (j < partitions – 1)) {
val (key, weight) = ordered(i)// 累加以后分区的权重
cumWeight += weight
if (cumWeight >= target) {// 跳过反复数据 if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { // 增加一个分区 bounds += key target += step j += 1 previousBound = Some(key) }
}
i += 1
}// 返回边界
bounds.toArray
} -
裁减分区边界
// sampleBounds:即下面返回的分区边界
// 因为 zorder 能够指定多个字段,每个字段都会确定一个边界,这里计算所有字段确定进去的边界长度的最大值
val maxLength = sampleBounds.map(_.length).max// 遍历每个分区边界,如果有分区边界长度很少的话,则对该分区边界裁减
val expandSampleBoundsWithFactor = sampleBounds.map {bound =>val fillFactor: Int = maxLength / bound.size if (bound.isInstanceOf[Array[Long]] && fillFactor > 1) { // 以后边界长度太小,则进行对以后分区边界进行裁减 val newBound = new Array[Double](bound.length * fillFactor) val longBound = bound.asInstanceOf[Array[Long]] for (i <- 0 to bound.length - 1) {for (j <- 0 to fillFactor - 1) { // 裁减的大小,就是下面 fillFactor 的倒数 // 例如 fillFactor=3,则裁减的大小为 0.33,当一个边界值为 2 时,则会裁减 2 个边界,为:2.33、2.66 和 3 newBound(j + i*(fillFactor)) = longBound(i) + (j + 1) * (1 / fillFactor.toDouble) } } (newBound, fillFactor) } else {(bound, 0) }
}