阿里第五届中间件复赛思路最终排名15

27次阅读

共计 10783 个字符,预计需要花费 27 分钟才能阅读完成。

赛题分析

赛题分为 3 个阶段:发送阶段、查询聚合消息阶段、查询聚合结果阶段。

  • 发送阶段:发送线程多个(线上评测程序为 12 个发送线程),消息属性为: a(long 类型、8 字节、随机), t(输入时间戳模拟值、long 类型、8 字节、线程内升序),body(byte 数组,大小固定 34 字节,内容随机)。消息条数在 20 亿条左右,总数据在 100G 左右,也就是 a 和 t 分别 16G、body 68G。可用内存堆内 4G、堆外 2G,也就是数据内存肯定存不下,需要落盘。SSD 盘,性能大致是 iops 1w 左右,块读写能力(一次读写 4K 以上) 在 200MB/s 左右。

    接口定义如下:

  • 查询聚合消息阶段:有多次查询(线上评测程序大概是 3w 多次,12 个查询线程),给定 t 的范围 [tMin,tMax] 和 a 的范围[aMin,aMax],返回以 t 和 a 为条件的消息, 返回消息按照 t 升序排列。

    接口定义如下:

  • 查询聚合结果阶段: 有多次查询(线上评测程序大概是 3w 多次,12 个查询线程),给定 t 的范围 [tMin,tMax] 和 a 的范围[aMin,aMax],返回以 t 和 a 为条件对 a 求平均的值。

    接口定义如下:

需要选手按照题目要求实现这三个接口。

赛题思路

拿到赛题之后,根据以往的比赛经验就是先实现一个简单的版本,把三个阶段跑通,然后在此基础上统计数据去分析数据特点,比如 t 的间隔分布(每个线程的 t 都是满足非递减的),a 的随机性以及 body 的随机性等。所以整个比赛过程中主要有两个版本:分线程版本和全局版本。

分线程版本

整体思路如下图

实现之后在这个版本基础上开始统计数据,发现分线程的间隔 t 很小,主要集中在 0,1 等小间隔内,全局的间隔更小更是集中在 0 和 1 等小间隔内,说明 t 和内存化。a 和 body 随机性太大,而且无法压缩(使用 snappy 压缩之后比不压缩还大点,java 自带的压缩太耗 cpu,而且这次比赛的数据也没法压缩)。

既然 t 可以压缩,接下来就是把 t 压缩到内存上。压缩 t 先是采用的 zigzag 压缩算法存差值,后面和别人交流发现存 deltaOfDelta 差值的差值((t3-t2)-(t2-t1))压缩率更大。由于压缩是版本通用的,后面再单独讲。

全局版本

put 和 getMessage 阶段和分线程版本之前一样(区别在于 t 内存化了以及 get avg 阶段),如下图所示

t 内存化做法如下图

不管是分线程还是全局版本,t 内存化的做法都是一样的,对 t 排序(分线程本身就是有序的不用再排,接下来的讲解的和 t 有关的都是全局排序之后的),然后按照固定个数进行分区,每个分区 n 个 t,有辅助索引来记录每个分区在内存中的起始偏移位置以及第一个 t,这样对每个分区编码 t 的时候,第一个 t 就不需要在编码了,然后每个分区的第一个值就存 delta,其它的全存 deltaOfDelta。

全局版本排序是在 put 阶段完成之后,getMessage 阶段刚开始的时候去做的,对每个线程所有的 t 和 a 进行一个 merge sort,并将 t 按照上图的思路内存化,每个分区数是:1024 * 24,整个下来编码内存占用 252M,辅助索引内存占用不到 1M。(从工程角度上来看,肯定是在 put 阶段进行 merge sort 更具有意义性)
t 内存化了,给定一个 t 查询,先在辅助索引上进行二分查询找到 t 所在对应的分区块,然后在分区上进行解码查找具体 t 就可以了。

这样只解决了 t,a 还要是从磁盘中读取在过滤。在比赛过程中可以看出,get avg 阶段是最重要的阶段,前排选手都是 get avg 阶段得分高。按照先找符合[tMin,tMax]t 的位置,再去磁盘读取 a 过滤不符合的 a 再去算平均值,一次查询只需要读一次磁盘,但是读取的数据量太多,根据统计出来的数据可以得到其中大部分的 a 其实都不满足条件。get avg 阶段提分的做法就是需要降读取的 a 数量,降读取 a 的数量必然带来读盘次数的增加(本次给的磁盘的 iops 是 1w),所以需要去找一个平衡点。

既然 t 已经分区了,是不是可以对分区中的 a 做一个排序来减少数据量呢,答案肯定是可以的,后面具体会详解。对 a 排序再去求符合条件的平均值有两种做法,分别是:

  1. 分区内的 a 排序,并记录 a 对应的 t,这样求平均值就是先找符合条件的分区 t,再去每个分区中找符合条件的 a,对于一个分区 t 要么全部满足条件,要么部分满足条件。如果全部满足条件,就只需要从分区排序后的 a 找到满足条件的 a 即可;如果是部分满足,还会多一步,从分区排序后的 a 找到满足条件的 a 之后,还需要在从符合条件的 a 中再去过滤不符合条件的 t。
  2. t 排序后 t 对应的 a 存一份,分区内的 a 排序,但不记录 a 对应的 t,这样求平均值也是先找符合条件的分区 t,再去每个分区中找符合条件的 a,同样对于一个分区 t 要么全部满足条件,要么部分满足条件。如果全部满足条件,就只需要从分区排序后的 a 找到满足条件的 a 即可;如果是部分满足,就读 t 排序对应的 a 再去过滤 a 即可。

实际做的时候采用的方案 2,接下来重点阶段方案 2,两个方案其实比较类似,所有优化细节类似,所以方案 2 理解了方案 1 同样可以理解。(从赛后总结来看:方案 1 会更优,并且更具有工程意义性)

方案 2 的思路如下图,t 排序,然后按照 1024*48 分区,每个分区内的 a 进行排序,然后再按照 128 进行分组。

首先根据 [tMin,tMax] 可以确定 t 所在的区间,这样区间可以分为两种情况:

  1. 整个区间 t 都符合条件,只需要利用分组内 a 的辅助索引二分来确定 a 的所在分组,对于首尾分组的 a 是部分满足,最多需要两次读盘,每次读盘都是 128 个 a,对于中间的分组中的所有 a 肯定都满足条件,直接累加和就行。
  2. 区间部分满足,区间部分满足也有 3 种情况
    t 完全在中间,直接读取 t 对应的 a 来找符合条件的 a;对于 t 在左半部分或者右半部分,如果符合条件的 t 会大于等于整个区间的 3 /4,可以进行一个求反来优化,用整个区间内所有符合条件的 a 减去不符合条件 t 中符合条件的 a,来降低读取数据只是最多多了两次 128 的磁盘读取。

为什么 t 分区内 a 排序采用的 128 来进行分组呢,经过测试,一次读取 128 个 a 时间最短。12 个线程每个线程读取 1w 次,随机读取一个分区中的首 k 个 a,测试结果如下。

分组之后,原本一次读取变成了最多 8 次读取(分组合理 t 最可以最多跨 3 个分区),分组完之后,每次读取最终换成几次读取是可以计算出来的,然后再进行比对看是一次读取还是划分成多次读取节省时间,当时并没有去做这个分析。当时队友提到说可以进行分次分组,一开始没有采纳,觉得实现比较麻烦,后面仔细思考了下,还是很好实现的,所以后面就开始搞多层分组。

多层的思路如下图所示,1024*48 里面分 1024*24,1024*24 再分 1024*12,每一层按照 a 排序都需要存一份文件到磁盘中,并且有一份辅助索引在内存中。

来一个查询从大的分组开始找,找到最合适的分层,最合适的分层就是 t 可以跨了 3 个分区,关键代码如下图所示,再按照前面讲的思路去做。

public long getAvgValue(long aMin, long aMax, long tMin, long tMax) {if (aMin > aMax || tMin > tMax) {return 0;}

        GetAvgItem getItem = getItemThreadLocal.get();
        ByteBuffer tBufDup = tBuf.duplicate();
        // 对 t 进行精确定位,省去不必要的操作,查找的区间是左闭右开
        int beginTPos = findLeftClosedInterval(tMin, getItem.tDecoder, tBufDup);
        int endTPos = findRightOpenInterval(tMax, getItem.tDecoder, tBufDup);
        if (beginTPos >= endTPos) {return 0;}

        // t 符合提交的个数
        int tCount = endTPos - beginTPos;
        IntervalSum intervalSum = getItem.intervalSum;
        intervalSum.reset();

        if (tCount <= Const.T_INDEX_INTERVALS[Const.T_INDEX_INTERVALS.length - 1]) {
            // 读取的数量比最小层的间隔还小,直接从排序后的 t 对应的 a 文件读取过滤 a 求平均值返回
            readAndSumFromAPartition(beginTPos, tCount, aMin, aMax, getItem);
        } else {
            // 走分层索引查询
            sumByPartitionIndex(beginTPos, endTPos, tCount, aMin, aMax, getItem);
        }
        return intervalSum.avg();}
private void sumByPartitionIndex(int beginTPos, int endTPos, int tCount, long aMin, long aMax, GetAvgItem getItem) {PartitionIndex partitionIndex = findBestPartitionIndex(beginTPos, endTPos);
        if (partitionIndex == null) { // 没有找到最合适的索引,直接从排序后的 t 对应的 a 文件读取过滤 a 求平均值返回
            readAndSumFromAPartition(beginTPos, tCount, aMin, aMax, getItem);
            return;
        }

        int interval = partitionIndex.getInterval();
        int doubleHalfInterval = interval / 4; // 4 分 1 的分区大小
        int beginPartition = beginTPos / interval, endPartition = endTPos / interval;// 求首尾所在分区
        int firstPartitionFilterCount = beginTPos % interval, lastPartitionNeedCount = endTPos % interval;// 求首分区需要过滤的个数,尾分区需要读取的个数
        long sum = 0;
        int count = 0;

        if (firstPartitionFilterCount > 0) {// 处理首分区
            int firstReadCount = interval - firstPartitionFilterCount;
            if (firstPartitionFilterCount < doubleHalfInterval) {
                // 求反,先减后加,防止溢出
                inverseReadAndSumFromAPartition(beginTPos - firstPartitionFilterCount, firstPartitionFilterCount, aMin, aMax, getItem);
                partitionIndex.partitionSum(beginPartition, aMin, aMax, getItem);
            } else {sumByPartitionIndex(beginTPos, beginTPos + firstReadCount, firstReadCount, aMin, aMax, getItem);
            }
            beginPartition++;
        }

        if (lastPartitionNeedCount > 0) {// 处理尾分区
            if (interval - lastPartitionNeedCount < doubleHalfInterval && (endTPos - endPartition * interval) >= interval) {
                // 求反,先减后加,防止溢出
                inverseReadAndSumFromAPartition(endTPos, interval - lastPartitionNeedCount, aMin, aMax, getItem);
                partitionIndex.partitionSum(endPartition, aMin, aMax, getItem);
            } else {sumByPartitionIndex(endTPos - lastPartitionNeedCount, endTPos, lastPartitionNeedCount, aMin, aMax, getItem);
            }
        }

        // 首尾区间处理之后,[beginPartition, endPartition)中的 t 都是符合条件,不用再判断
        while (beginPartition < endPartition) {partitionIndex.partitionSum(beginPartition, aMin, aMax, getItem);
            beginPartition++;
        }
        getItem.intervalSum.add(sum, count);
    }
private PartitionIndex findBestPartitionIndex(int beginTPos, int endTPos) {for (int i = 0; i < Const.T_INDEX_INTERVALS.length; i++){int interval = partitionIndices[i].getInterval();
            int beginPartition = beginTPos / interval;
            int endPartition = endTPos / interval;

            if (endPartition - beginPartition > 1) {// 找到了最合适的分区
                return partitionIndices[i];
            }
        }
        return null;
    }

分层的好处就是可以找最合适的层去处理查询,缺点就是每一层都需要存一份文件以及一份辅助索引。

压缩算法

zigzag 压缩算法

zigzag 特别适合用来压缩小整数,虽然题目给的 t 是 long 型,范围很大,但是 t 的间隔很小,这样就可以存 delta 差值,就可以使用 zigzag 算法了。

zigzag 的思想是每次使用固定的比特位编码(至少 2 位以上),一位标志位,其它位位数据位,比如用 8 个比特位编码,其中第 1 位位标志位,用来标志是否结束(下一个 8 比特是否还有数据),剩余 7 位来表示数据。下面举一个 8 比特位编码的例子,其中标志位如果位为 1 表示还有数据,0 表示结束。

比如 0b_1000_1010_1110 使用 zigzag 的思想来编码,最终的编码就是 0b_(1_1000101)_(0_0001110),也就是 16 个比特位就能表示。

到这里会发现如果是一个负数用 zigzag 又怎么编码呢?zigzag 会将数左移一位,进行异或来进行编码,比如 -10:

-10 的二进制是 0b1111 1111 1111 1111 1111 1111 1111 0110,

0b1111 1111 1111 1111 1111 1111 1111 1111 符号位移到末尾

0b1111 1111 1111 1111 1111 1111 1110 1100 -10 左移一位

0b0000 0000 0000 0000 0000 0000 0001 0011 异或的结果,然后再按照上面的方式进行编码。

int intToZigzag(int n){return (n << 1) ^ (n >> 31);
}
int zigzagToInt(int n) {return (n >>> 1) ^ -(n & 1);
}

因为这里 t 的 delta 差值都是正数,所以不需要考虑负数,这样使用 zigzag 的思想来编码,分线程版本内存占用是 600 来 M 左右,全局版本需要 500M。

Beringei 压缩算法

不管是分线程还是全局版本,t 的 delta 分布中,其中 0 的占比最多,由于 zigzag 编码有 1 位标志位,0 最少也要使用 2 个比特位来编码,有没有其它的编码方式更省内存呢?

在和别人的交流中,得知了 Beringei 压缩算法(zigzag 和这种编码都是从 fackbook 开源的),这种编码特别适合时序数据库的压缩,t 的差值很小,t 的差值的差值就更小也就是 deltaOfDelta。后面在 github 上找到了一个相关的实现,在实现编码的地方十分精巧,在比赛中使用到了里面移位编码的代码。

本次比赛在使用的时候的简化了这种编码,并不完全一致,下面大概介绍下这种编码思想。

在编码的时候有标记位,标记位标识的是数据位的有效长度(可标识数的范围)和数据位,数据位不是一定要有,可以只有控制位,这样 0 就可以采用一个比特位来编码。下面用例子来讲解(只考虑正数,负数的话只需要加一个符号位就行):

控制位:0b0 表示 0,控制位 0 就是数据位

控制位:0b10 表示数据有效长度为 1,可表示的数据范围是 0~1

控制位:0b110 表示数据有效长度为 2,可表示的数据范围是 0~3

控制位:0b1110 表示数据有效长度为 3,可表示的数据范围是 0~7

以此类推(控制位可以根据实际场景自己设定)编码如下:

对于数字串:0 1 7 6 0 5 4 2 3 1 0 1 的编码如下

感兴趣的可以再去研究下代码:https://github.com/jecyhw/ByteBuffer-Beringei-Compress 从 https://github.com/haidfs/ByteBuffer-Beringei-Compressfork 过来的。

使用这种编码,分线程版本内存占用是 440M 左右(节省将近 1 / 3 的内存),全局版本只需要 252M(节省一半的内存)。对于限制内存的比赛,内存使用就会特别重要,内存使用的越少,就有越多内存可以当缓存使用。

缓存

按照前面讲的思路,整个内存使用情况:分线程版本内存占用是 440M(编码占用内存)+5.8M(辅助索引内存),这一部分是可以省掉的;全局版本内存占用是 252M(编码占用内存)+1M(辅助索引内存)。每一层 a 的辅助索引内存占用是 240M。也就是整个下来内存占用不到 1G,甚至可以到 500M,总共内存 6G(堆内 4G+ 堆外 2G),还有将近 5G 的内存来做缓存,需要预留一部分的内存给 jvm。

缓存比较简单,对 t 合并排序的时候从头开始缓存 a 知道将所有内存填满为此。实际使用了 2.5g 来缓存,统计出来的缓存命中率在 1 /6,但由于代码写得有问题,不是在一开始就先查缓存,还是先走分层,最后才会走缓存,比赛结束之后才发现缓存没怎么用上,当时缓存写好提交之后分数提高在 1k 左右,效果不好当时也觉得奇怪但没有细查代码,还有内存可用作缓存也就没再搞了。

总结

  1. 采用 t 全局排序分区,分区内 a 排序在分组这种思路时,当时写代码的时候队友就是想将 a 排序后的 t 编码到内存中。但是当时考虑到 a 排序后的 t 要编码到内存中,还需要去统计怎么做编码才能将 t 全部编码到内存中,而将原来 t 排序后对应的 a 保存一份,t 排序后分区内 a 排序的 a 保存一份一样可以做,而且写起来还简单点,还有一个原因是想快速验证下,所以就选择了方案 2,并且一直陷入到最后。
  2. 忽略了 t 排序后分区内 a 排序可以压缩,赛后和别人交流的时候发现压缩的提分最高。由于方案 2 的会至少保留 2 份文件,其中一份是不能压缩的,所以压缩带来的效果肯定没有只存一份 a 而且还能压缩的效果好。之前评测数据没改的时候,一直在搞压缩,改完评测数据采用全局的方案的时候就把压缩也忽略了,到比赛最后一天才想起来可以压缩。赛后和别人交流,a 压缩可以从 16g 压缩到 9g 多点,自己当时最后一天也统计过 a 的 delta 的有效数据比特位的分布,但是已经晚了。
  3. 本赛题的瓶颈点就是磁盘 io,get 两个阶段 cpu 完全富裕。当时一直在降读取数量并且用尽可能少的 io 读取次数,没能够好好分析一次读取 a 的数量的耗时和转换成几次小 io 的耗时比对。
  4. 前期 debug 耗的时间最多,基本都是在 debug,一个小小的细节没注意到就可以让你 debug 到崩溃,后面才慢慢找到感觉。
  5. 前排猜测应该都是采用的方案 1 来进行实现优化的,赛后总结确实是方案 1 更优,而且更具有工程意义性。

赛题

赛题链接:

https://tianchi.aliyun.com/competition/entrance/231714/information

https://code.aliyun.com/middlewarerace2019/mqrace2019?spm=5176.12281978.0.0.562d1556Ek96Hp&accounttraceid=ec081c33-412a-478d-89a2-3e20ab14e0de

1. 赛题描述

Apache RocketMQ 作为的一款分布式的消息中间件,历年双十一承载了万亿级的消息流转,为业务方提供高性能低延迟的稳定可靠的消息服务。随着业务的逐步发展和云上的输出,各种依赖消息作为输入输出的流计算场景层出不穷,这些都给 RocketMQ 带来了新的挑战。请实现一个进程内消息持久化存储引擎,可支持简单的条件查询,以及支持简单的聚合计算,如指定时间窗口的内对于消息属性某些字段的求平均等。

2. 题目内容

实现一个进程内消息持久化存储引擎,要求包含以下功能:

  • 发送消息功能 – 根据一定的条件做查询或聚合计算,包括 A. 查询一定时间窗口内的消息 B. 对一定时间窗口内的消息属性某个字段求平均

例子:t 表示时间,时间窗口 [1000, 1002] 表示: t>=1000 & t<=1002 (这里的 t 和实际时间戳没有任何关系, 只是一个模拟时间范围)

对接口层而言,消息包括两个字段,一个是业务字段 a,一个是时间戳,以及一个 byte 数组消息体。实际存储格式用户自己定义,只要能实现对应的读写接口就好。

发送消息如下(忽略消息体):

消息 1,消息属性{“a”:1,”t”:1001} 消息 2,消息属性{“a”:2,”t”:1002} 消息 3,消息属性{“a”:3,”t”:1003}

查询如下:

示例 1 - 输入:时间窗口[1001,9999],对 a 求平均 输出:2, 即:(1+2+3)/3=2 示例 2 - 输入:时间窗口[1002,9999],求符合的消息 输出:{“a”:1,”t”:1002},{“a”:3,”t”:1003} 示例 3 - 输入:时间窗口[1000,9999]&(a>=2),对 a 求平均 输出:2 (去除小数位)

3. 语言限定

JAVA

4. 程序目标

仔细阅读 demo 项目中的 MessageStore,DefaultMessageStoreImpl,DemoTester 三个类。

你的 coding 目标是实现 DefaultMessageStoreImpl

注:评测时的数据存储路径为:/alidata1/race2019/data。日志请直接打印在控制台标准输出,可以使用 System.out.println,如果使用日志框架,请配置为 ConsoleAppender。注意不要把日志输出到 Error 通道(也即不要使用 System.err.println,如果使用日志框架,则不要使用 log.error)。评测程序会把控制台标准输出的内容搜集出来,放置在 OSS 上面供用户排错,但是请不要密集打印日志,单次评测,最多不能超过 100M,超过会截断

5. 参赛方法说明

  1. 在阿里天池找到 ” 中间件性能挑战赛 ”,并报名参加
  2. 在 code.aliyun.com 注册一个账号,并新建一个仓库名,并将大赛官方账号 middleware2019 添加为项目成员,权限为 reporter
  3. fork 或者拷贝本仓库的代码到自己的仓库,并实现自己的逻辑
  4. 在天池提交成绩的入口,提交自己的仓库 git 地址,等待评测结果
  5. 坐等每天 10 点排名更新

6. 测试环境描述

测试环境为 4c8g 的 ECS,Jvm 相关参数 -Xmx4g -XX:MaxDirectMemorySize=2g -XX:+UseConcMarkSweepGC。带一块 300G 左右大小的 SSD 磁盘。

SSD 性能大致如下:iops 1w 左右;块读写能力(一次读写 4K 以上) 在 200MB/s 左右。

ulimit -a:

core file size (blocks, -c) 0 data seg size (kbytes, -d) unlimited scheduling priority (-e) 0 file size (blocks, -f) unlimited pending signals (-i) 31052 max locked memory (kbytes, -l) 64 max memory size (kbytes, -m) unlimited open files (-n) 655350 pipe size (512 bytes, -p) 8 POSIX message queues (bytes, -q) 819200 real-time priority (-r) 0 stack size (kbytes, -s) 8192 cpu time (seconds, -t) unlimited max user processes (-u) 31052 virtual memory (kbytes, -v) unlimited file locks (-x) unlimited

磁盘调度算法是 deadline 其它系统参数都是默认的。

7. 评测指标和规模

评测程序分为 3 个阶段:发送阶段、查询聚合消息阶段、查询聚合结果阶段:

  • 发送阶段:假设发送消息条数为 N1,所有消息发送完毕的时间为 T1; 发送线程多个,消息属性为: a(随机整数), t(输入时间戳模拟值,和实际时间戳没有关系, 线程内升序). 消息总大小为 50 字节,消息条数在 20 亿条左右,总数据在 100G 左右
  • 查询聚合消息阶段:有多次查询,消息总数为 N2,所有查询时间为 T2; 返回以 t 和 a 为条件的消息, 返回消息按照 t 升序排列
  • 查询聚合结果阶段: 有多次查询,消息总数为 N3,所有查询时间为 T3; 返回以 t 和 a 为条件对 a 求平均的值

若查询结果都正确,则最终成绩为 N1/T1 + N2/T2 + N3/T3 附加,无成绩情况:

  1. 发送阶段耗时超过 1800s; 查询聚合消息和聚合结构阶段不能超过 1800s
  2. 查询结果有错误
  3. 发现有作弊行为,比如通过 hack 评测程序,绕过了必须的评测逻辑

8. 排名规则

按照上述计算的成绩从高到低来排名

9. 第二 / 三方库规约

  • 仅允许依赖 JavaSE 8 包含的 lib
  • 可以参考别人的实现,拷贝少量的代码
  • 我们会对排名靠前的代码进行 review,如果发现大量拷贝别人的代码,将扣分
  • 不允许使用 jni/jna
  • 不允许使用 dio (direct IO)
  • 允许使用堆外内存

10. 作弊说明

所有消息都应该进行按实际发送的信息进行存储,可以压缩,但不能伪造。程序不能针对数据规律, 查询条件等进行针对性优化, 所有优化必须符合随机数据的通用性 程序不能绕过 JVM 的内存参数限制进行内存分配, 否则取消成绩 如果发现有作弊行为,比如通过 hack 评测程序,绕过了必须的评测逻辑,则程序无效,且取消参赛资格。

比赛排名

最终排名截图

各阶段得分截图

代码仓库

https://github.com/jecyhw/mqrace2019

正文完
 0