可能很多人用 Presto 只用 Hive Connector,其实 Iceberg connector 跟 Hive 差不多,不论从实现,还是从性能上都有相互的参照,尤其是在实现方面应用了十分多的 Hive connector 底层的代码。它创立 table 也是一样,咱们能够从一个 TPC-DS 数据的 customer 表里抽几列再创立一个 table,你能够指定这个数据的格局,能够是 Parquet 也能够是 ORC 格局。也能够同时指定分区(partition),这里用出世的月份这样会容易些,因为月份只有 12 个,也就是 12 个分区。咱们创立了这个表之后,跟 Hive 表一样,你能够 select 这个表,select* from Iceberg.test,test1 是表名,你能够用一个美元符号 $ 加上 partitions,这样你能够把这个表的所有分区给列出来。每个分区都会有一个统计,比如说上面第一行 8 月,能看到它有多少行有多少个文件,大小总共有多大,同时对于 customer_sk 这一列,能看到最小值多少最大值多少。前面的 birth date 就是日期,对于 8 月最小值是 1,最大值是 31,空值有若干。因为 8 月是大月,前面的 9 月是小月最大值是 30,每一个 partition 都会有本人的统计,前面咱们会再讲,predicate pushdown 会用到这个,能够让咱们跳过很多的分区,其实 Hive 也有这个性能,只不过可能有些数据在 Hive metastore 上,元数据这里没有的话,用不上这个性能,但在 Iceberg 上它内嵌在 table 里了,就会比拟好用一些。
后面提到 Iceberg 会有一些事务(transaction)反对。咱们试着往这个表里退出一行,SK 是 1000,日期是 40,我特意插入了一个不可能存在的日期月份是 13,这样等于说我新创建了一个 partition。其实不论是不是新建 partition 都会产生一个新的快照(snapshot),在 Presto 里,通过 select * from 表, 表的名字下面加一个美元 $ 符号,而后再加一个 snapshots,就能够列出这个 table 所有的 snapshots。大家能够看到有两个 snapshots,因为新建 table 时出一个,插入一行之后又出一个,manifest list 就有两个 avro 文件,第二个 snapshot 基于第一个,第二个 snapshot 的 parent ID 就是第一个 snapshot 的 parent ID,待会咱们会用 snapshot ID 来做 time travel。
对于这样一个文件,咱们加了一个 partition 进去之后会怎么样,看一下这个目录,其实 Iceberg 的目录非常简单,咱们指定一个目录,它在这上面就创立一个 test1,里边有两个文件夹,一个是 data (数据),一个是 metadata(元数据)。数据里边是依照月份来分区的,这外面是 14 个分区,因为 12 个月份,还有个空值,再加上咱们新加的月份 13,等于当初一共 14 个分区,这个文件就是这么组织的,而每个目录上面就是 parquet 文件。
那么咱们在 query 的时候会产生什么呢?
这个其实大家都会——写个 SQL,从这个 select*from Test1 的时候,指定一个条件,我这个月份是 13,那就把我方才新插入的那一条记录给调进去了。我前面会介绍一下怎么做工夫穿梭 (Time travel),咱们能够看到在这个表名 test1 这有个 @符,前面我能够加一个 snapshot ID,如果我用第二个(快照)snapshot,就能查到这个记录,如果我用第一个 snapshot 就没有这个记录,为什么?因为第一个 query 产生在插入这条记录之前。这还挺有用的,因为有的时候就想查一下我这个表昨天是什么样的。但这也有问题,如果你频繁插入数据的话,就会产生大量的 snapshot,avro 外面就会有大量的数据。那咱们是不是要丢掉一些过期的快照?这也是个优化点,当初 presto 还没有,但当前我感觉咱们会把它做进去。
另外有些敌人会问,既然 Iceberg connector 有这个性能了,能不能用它来取代 MySQL,做 OLTP 来解决一些在线的 transaction 数据? ——能够,然而不能像 MySQL 那么用,频繁的插入数据还会带来一些问题,须要做更深刻的优化,间接这么用的话会产生大量的小文件和快照,但这些都有方法解决,咱们前面会把它缓缓迭代进去。
这个是我的前共事 Chunxu 做 Schema Evolution 的时候截的一张图。能够看到这也是 Iceberg 的一个亮点,就是说这个表原来有几列,我能够加一列或者改一列,当然这也不难,因为原来 Hive table 也能够这样做,然而做完之后,你的 table 还能不能查?Iceberg 给咱们的答案是 table 改完了还能查,当然这里边也有 tricky 的中央,外面的数据也不是这么残缺,然而不论怎么样它没出错,你先改好 table,用老的 query 还可能查到。这个性能我感觉还是挺实用的,因为各个公司 table 总在改,改完之后 table 在 presto 这边还是能够查的。
最近这两三个月,有几个性能进来之后给咱们的一些货色解锁了。
1. 第一份 credit 要重点给亚马逊公司 AWS 的 Jack Ye,他做了一个 native folder 的反对,这个在 Iceberg 叫做 Hadoop catalog,盘活了咱们的很多性能,解决了咱们十分多的痛点。
2. 另外就是腾讯的 Baolong,他把 local cache 这个性能给加上去了,当初 Iceberg connector 能够和 RaptorX 那一套,就是 Hive connector 里的 cache,同样享受 local cache,失去提速。当然这个不是那么简略,那么开箱即用,可能会须要一些配置,前面我会再具体地讲。
3. 接下来就是 Uber 的 Xinli Shang,咱们俩给 parquet 做了降级。Xinli Shang 是 Parquet 社区的 chair,他给 parquet 做了降级后,咱们拿过去放在 presto 里,咱们的降级工作历时大略半年,降级到新的 parquet 之后,咱们也解锁了 Iceberg 1.12,有更多新的性能,包含对 v2 的 Iceberg table 的反对。
4. 还有一个 predicate pushdown,在前面 Beinan(Alluxio)也会具体地讲一下,这是能够优化查问的一个性能。
这就是我方才提到的 Jack 做的 native catalog——原来在 Iceberg 叫 Hadoop catalog,其实 Iceberg 数据也是存在 S3、HDFS、GCS 里的。它的每一个 table 上面既有元数据,又有数据,那为什么还须要 Hive metastore,还要去 Hive metastore 里取元数据呢?这是因为最开始的 Hive catalog 还是要依赖 Hive 的元数据的,咱们须要找到 table 的门路,到这个 table 里把 Iceberg 本人的元数据加载进去,而后用 presto 进行查问。有了 Jack 这个很好的批改,咱们能够反对 Hadoop catalog,你间接给它一个门路,table 都放在这个门路上面,它到这个门路下来扫一下,就能够录入所有的 table,像 table1,table2,table3,每个 table 多少元数据,咱们就不再须要 Hive metastore 了。有了这个 native catalog 之后,presto 和 Iceberg 的联合就残缺了。原本咱们还依赖于一个额定的元数据存储,当初咱们能够间接应用 native catalog,这解决了十分多的痛点。
这个是之前有敌人问的 local cache,这个性能可能两个礼拜以前才 merge 的,腾讯的 Baolong 特地厉害几天就把这性能给做好了。那么为什么这个货色这么快就做好了,这个得从 Iceberg connector 的实现说起,是因为 Iceberg connector 和 Hive connector 用的是一套货色,都是同一个 Parquet reader 或者是 ORC reader。所以说咱们过后在 RaptorX 这个我的项目里,就是在 Hive 上面做 local cache,这个我的项目里用的很好的 local cache,在 Facebook、头条、还有 Uber 都获得很好成果,咱们就把这个 local cache 间接搬到 Iceberg 里来用,间接能获得一个很好的成果。
这个里边有关键点得跟大家说一下,这个 local cache 像是每个 worker 本人的公有缓存,它不像 Alluxio cache 那样,是一个分布式的、弹性的,能够部署比如说 100 个节点或 200 个节点,能够程度扩大的。然而这里不一样,这里边给你一个就近的小容量的 local 缓存,给每个 worker 可能 500g 或者 1TB 的一个本地磁盘,用来当作缓存应用。
这里就有一个问题,Presto 在做 plan 的时候是随机分的,比如说每一个大的 table 上面有 1 万个 partition,下面可能有 100 万个文件,轻易拿一个文件,它不肯定去哪个 worker,每个 worker cache 不了那么多数据。于是咱们就有一个 soft affinity scheduling,有点像做负载平衡的时候会有一个 affinity 的性能一样,也就是粘连,比如说这个文件去 work1,当前就始终去 work1,这样的话 work1 只有把它 cache 了,你再拜访这个文件它的 cache 命中率就进步了,所以这个 affinity 的性能是肯定要关上的。
如果你发现 local cache 命中率很低,你就要看是不是 affinity 做的不对,是不是你的节点频繁减少或者缩小,即便你什么都不调,你只有把 soft affinity 关上,用一个比如说 500g 或者 1TB 的 local cache,它的命中率应该是不会低的,应该是有百分之六七十的命中率,这个是数据量很大的状况,数据量小的话可能有 100% 的命中率。
事实上就是 Presto 交给 Iceberg 来做一个 plan。在收到 SQL 申请后,Presto 解析,把 SQL 拆一拆,通知 Iceberg 要查什么,Iceberg 就会生成一个 plan,说要扫哪些文件,而后 presto 通过 soft affinity 把这些文件分发给特定的 worker,这些 worker 就会去扫这些文件,如果命中了本地 cache 扫本地文件,如果没命中本地 cache 就扫近程文件。其实 Alluxio 就是一个二级的存储,本地没命中去 Alluxio,Alluxio 还没命中去三级存储。
当然咱们后续会有 semantic cache,这个次要是给 Hive 做的,然而就像我后面提的,因为 Iceberg 和 Hive 底层的实现是同源的,因而后果咱们都能够用。这里跟大家通报一个最新的停顿,是 AWS 的 Iceberg 团队 Jack 刚跟咱们讲的,咱们可能会不再应用 presto 的 Hive 实现,当然这是可选的,你能够持续应用 presto 的 Hive 实现,也能够应用 Iceberg 的 native 实现。这样当前 Iceberg 有什么新的性能,咱们就不依赖 Hive,这也是坏事,而且咱们能够引入更多向量化的货色,这是个长期的 plan,可能明年大家才会见到。
对于一个 presto 的查问来说,咱们就说 select* from table,比如说 city=‘Beijing’,profile age>18 岁,这样一个查问,它其实就生成了右边这三个块的 plan,先 scan,scan 完了给 filter,filter 完了给输入,其实就是扫完表之后,看哪些符合条件就输入但这个没有必要。比如说咱们这个表是依照城市做的分区,没有必要扫整个表,而且每一个文件都有统计的,可能这个文件里年龄都是小于 18 岁的,就不必扫能够间接跳过了。
在 presto 里有个 connector optimizer,这个是 prestoDB 特有的一个货色,能够针对不同的 connector 来做优化,为什么要针对不同的 connector 做优化?因为很多人可能是一个 Iceberg table 去照映一个 Hive table,这两个 table 底下 scan 数据源是不一样的,所以你要决定到底把什么条件下推下去。其实有一个最简略的规定,就是 Iceberg 目前不反对 profile.age 这种有嵌套的字段的下推,那我就把 city 下推,就把 city 的 filter 下推到 table scan,和 table scan 合并成一个 plan 节点,就是这个中央既做 filter 又做 scan,而后下推给 Iceberg,这里我把 age>18 岁留着,scan 好了之后再 filter,这个不是最优的计划,但这是最根本的规定。
咱们来看看成果,这不是一个正规的 Benchmark,就是方才我建的 table,新加了一条记录,月份等于 13 的那个,如果我不开下推的话,是右边这种状况,它扫了 200 万条记录,input 数据是 200 多 KB;开了下推之后,它只扫描一条记录,工夫和数据都有十分大的晋升,它只扫特定的分区。这种查问在事实中遇不到,因为事实中必定是有更多的组合,每个 partition 上面可能还有更多的文件,这里比拟极其一些,只有这一个文件。其实这样成果不显著,文件越多成果越好,举荐大家试一试。
之前提到了 Native Iceberg IO,咱们会应用 Iceberg reader 和 writer 来取代 Hive 实现,让它彻底的离开,也可能两个都反对,这个就是咱们将要进行的工作,敬请期待。
另外一个是物化视图(materialized view),是我的前共事 Chunxu 在做的,这也是重要的一个性能,让长期表可能把数据存到一起,这个也没有那么简略,Facebook 也有共事在做这个。咱们这里不再多说了,Facebook 很快会有一个对于物化视图的新的博客进去。
我会持续把 v2 table 实现一下删原来只能是依照分区来删,不能删除某一行或某几行,因为删除操作和 insert 是一样的,会再产生一些新的文件,标注说你这几行要被删掉了,而后真正出后果的时候,会把它合并到一起。当初是不反对这个性能的,那么咱们要想反对这个性能,有两个做法,一个是应用 native Iceberg IO,另外一个就是在 Parquet 的 reader 上把要删的这些行标注进去,示意这些行不再显示。以上就是今年年底或明年年初的一些工作和构想,谢谢大家。