可能很多人用Presto只用 Hive Connector,其实Iceberg connector跟Hive差不多,不论从实现,还是从性能上都有相互的参照,尤其是在实现方面应用了十分多的Hive connector底层的代码。它创立table也是一样,咱们能够从一个 TPC-DS数据的 customer表里抽几列再创立一个table,你能够指定这个数据的格局,能够是Parquet也能够是ORC格局。也能够同时指定分区(partition),这里用出世的月份这样会容易些,因为月份只有12个,也就是12个分区。咱们创立了这个表之后,跟Hive表一样,你能够select这个表,select* from Iceberg.test,test1是表名,你能够用一个美元符号$加上partitions,这样你能够把这个表的所有分区给列出来。每个分区都会有一个统计,比如说上面第一行8月,能看到它有多少行有多少个文件,大小总共有多大,同时对于customer_sk这一列,能看到最小值多少最大值多少。前面的birth date就是日期,对于8月最小值是1,最大值是31,空值有若干。因为8月是大月,前面的9月是小月最大值是30,每一个partition都会有本人的统计,前面咱们会再讲, predicate pushdown会用到这个,能够让咱们跳过很多的分区,其实Hive也有这个性能,只不过可能有些数据在Hive metastore上,元数据这里没有的话,用不上这个性能,但在Iceberg上它内嵌在table里了,就会比拟好用一些。

后面提到Iceberg会有一些事务(transaction)反对。咱们试着往这个表里退出一行, SK是1000,日期是40,我特意插入了一个不可能存在的日期月份是13,这样等于说我新创建了一个partition。其实不论是不是新建partition都会产生一个新的快照(snapshot),在Presto里,通过select * from 表, 表的名字下面加一个美元$符号,而后再加一个snapshots,就能够列出这个 table所有的snapshots。大家能够看到有两个snapshots,因为新建table时出一个,插入一行之后又出一个,manifest list就有两个avro文件,第二个snapshot基于第一个,第二个snapshot 的parent ID就是第一个snapshot的parent ID,待会咱们会用snapshot ID来做time travel。

对于这样一个文件,咱们加了一个partition进去之后会怎么样,看一下这个目录,其实Iceberg的目录非常简单,咱们指定一个目录,它在这上面就创立一个test1,里边有两个文件夹,一个是data (数据),一个是metadata(元数据)。数据里边是依照月份来分区的,这外面是 14个分区,因为12个月份,还有个空值,再加上咱们新加的月份13,等于当初一共14个分区,这个文件就是这么组织的,而每个目录上面就是parquet文件。

那么咱们在query的时候会产生什么呢?

这个其实大家都会——写个SQL,从这个select*from Test1的时候,指定一个条件,我这个月份是13,那就把我方才新插入的那一条记录给调进去了。我前面会介绍一下怎么做工夫穿梭(Time travel),咱们能够看到在这个表名test1这有个@符,前面我能够加一个 snapshot ID,如果我用第二个(快照)snapshot,就能查到这个记录,如果我用第一个snapshot就没有这个记录,为什么?因为第一个query产生在插入这条记录之前。这还挺有用的,因为有的时候就想查一下我这个表昨天是什么样的。但这也有问题,如果你频繁插入数据的话,就会产生大量的snapshot,avro外面就会有大量的数据。那咱们是不是要丢掉一些过期的快照?这也是个优化点,当初presto还没有,但当前我感觉咱们会把它做进去。

另外有些敌人会问,既然Iceberg connector有这个性能了,能不能用它来取代MySQL,做OLTP来解决一些在线的transaction数据? ——能够,然而不能像MySQL那么用,频繁的插入数据还会带来一些问题,须要做更深刻的优化,间接这么用的话会产生大量的小文件和快照,但这些都有方法解决,咱们前面会把它缓缓迭代进去。

这个是我的前共事Chunxu做 Schema Evolution的时候截的一张图。能够看到这也是Iceberg的一个亮点,就是说这个表原来有几列,我能够加一列或者改一列,当然这也不难,因为原来 Hive table也能够这样做,然而做完之后,你的table还能不能查?Iceberg给咱们的答案是 table改完了还能查,当然这里边也有tricky的中央,外面的数据也不是这么残缺,然而不论怎么样它没出错,你先改好table,用老的query还可能查到。这个性能我感觉还是挺实用的,因为各个公司 table总在改,改完之后 table在presto这边还是能够查的。

最近这两三个月,有几个性能进来之后给咱们的一些货色解锁了。

1.第一份credit要重点给亚马逊公司AWS 的Jack Ye,他做了一个 native folder的反对,这个在Iceberg叫做Hadoop catalog,盘活了咱们的很多性能,解决了咱们十分多的痛点。

2.另外就是腾讯的Baolong,他把local cache这个性能给加上去了,当初Iceberg connector能够和 RaptorX那一套,就是Hive connector里的cache,同样享受local cache,失去提速。当然这个不是那么简略,那么开箱即用,可能会须要一些配置,前面我会再具体地讲。

3.接下来就是Uber的Xinli Shang,咱们俩给parquet做了降级。Xinli Shang是Parquet社区的chair,他给parquet做了降级后,咱们拿过去放在presto里,咱们的降级工作历时大略半年,降级到新的parquet之后,咱们也解锁了Iceberg 1.12,有更多新的性能,包含对v2 的Iceberg table的反对。

4.还有一个predicate pushdown,在前面Beinan(Alluxio)也会具体地讲一下,这是能够优化查问的一个性能。


这就是我方才提到的 Jack做的 native catalog——原来在Iceberg叫Hadoop catalog,其实Iceberg数据也是存在 S3、HDFS、GCS里的。它的每一个table上面既有元数据,又有数据,那为什么还须要Hive metastore,还要去Hive metastore里取元数据呢?这是因为最开始的Hive catalog还是要依赖Hive的元数据的,咱们须要找到 table的门路,到这个table里把Iceberg本人的元数据加载进去,而后用 presto进行查问。有了 Jack这个很好的批改,咱们能够反对Hadoop catalog,你间接给它一个门路,table都放在这个门路上面,它到这个门路下来扫一下,就能够录入所有的table,像table1,table2,table3, 每个table多少元数据,咱们就不再须要Hive metastore了。有了这个native catalog之后, presto和Iceberg的联合就残缺了。原本咱们还依赖于一个额定的元数据存储,当初咱们能够间接应用native catalog,这解决了十分多的痛点。


这个是之前有敌人问的 local cache,这个性能可能两个礼拜以前才merge的,腾讯的Baolong特地厉害几天就把这性能给做好了。那么为什么这个货色这么快就做好了,这个得从Iceberg connector的实现说起,是因为Iceberg connector和 Hive connector用的是一套货色,都是同一个Parquet reader或者是ORC reader。所以说咱们过后在 RaptorX这个我的项目里,就是在 Hive上面做local cache,这个我的项目里用的很好的 local cache,在 Facebook、头条、还有Uber都获得很好成果,咱们就把这个local cache间接搬到Iceberg里来用,间接能获得一个很好的成果。

这个里边有关键点得跟大家说一下,这个 local cache像是每个worker本人的公有缓存,它不像Alluxio cache那样,是一个分布式的、弹性的,能够部署比如说100个节点或200个节点,能够程度扩大的。然而这里不一样,这里边给你一个就近的小容量的local缓存,给每个worker可能500g或者1TB的一个本地磁盘,用来当作缓存应用。

这里就有一个问题, Presto在做plan的时候是随机分的,比如说每一个大的table上面有1万个partition,下面可能有100万个文件,轻易拿一个文件,它不肯定去哪个worker,每个worker cache不了那么多数据。于是咱们就有一个soft affinity scheduling,有点像做负载平衡的时候会有一个affinity的性能一样,也就是粘连,比如说这个文件去work1,当前就始终去work1,这样的话work1只有把它cache了,你再拜访这个文件它的cache命中率就进步了,所以这个affinity的性能是肯定要关上的。

如果你发现local cache命中率很低,你就要看是不是affinity做的不对,是不是你的节点频繁减少或者缩小,即便你什么都不调,你只有把 soft affinity关上,用一个比如说500g或者1TB的 local cache,它的命中率应该是不会低的,应该是有百分之六七十的命中率,这个是数据量很大的状况,数据量小的话可能有100%的命中率。

事实上就是Presto交给Iceberg来做一个plan。在收到SQL申请后,Presto解析,把 SQL拆一拆,通知Iceberg要查什么,Iceberg就会生成一个plan,说要扫哪些文件,而后presto通过 soft affinity把这些文件分发给特定的worker,这些worker就会去扫这些文件,如果命中了本地cache扫本地文件,如果没命中本地cache就扫近程文件。其实Alluxio就是一个二级的存储,本地没命中去Alluxio,Alluxio还没命中去三级存储。

当然咱们后续会有semantic cache,这个次要是给Hive做的,然而就像我后面提的,因为Iceberg和Hive底层的实现是同源的,因而后果咱们都能够用。这里跟大家通报一个最新的停顿,是AWS的Iceberg 团队Jack刚跟咱们讲的,咱们可能会不再应用presto的Hive 实现,当然这是可选的,你能够持续应用presto的 Hive实现,也能够应用Iceberg的native实现。这样当前Iceberg有什么新的性能,咱们就不依赖Hive,这也是坏事,而且咱们能够引入更多向量化的货色,这是个长期的 plan,可能明年大家才会见到。


对于一个presto的查问来说,咱们就说select* from table,比如说 city=‘Beijing’,profile age>18岁,这样一个查问,它其实就生成了右边这三个块的plan,先scan,scan完了给filter,filter完了给输入,其实就是扫完表之后,看哪些符合条件就输入但这个没有必要。比如说咱们这个表是依照城市做的分区,没有必要扫整个表,而且每一个文件都有统计的,可能这个文件里年龄都是小于18岁的,就不必扫能够间接跳过了。

在 presto里有个connector optimizer,这个是prestoDB特有的一个货色,能够针对不同的connector来做优化,为什么要针对不同的connector做优化?因为很多人可能是一个Iceberg table去照映一个Hive table,这两个table底下 scan数据源是不一样的,所以你要决定到底把什么条件下推下去。其实有一个最简略的规定,就是Iceberg目前不反对profile.age这种有嵌套的字段的下推,那我就把 city下推,就把city的 filter下推到table scan,和table scan合并成一个plan节点,就是这个中央既做filter又做scan,而后下推给Iceberg,这里我把age>18岁留着,scan好了之后再filter,这个不是最优的计划,但这是最根本的规定。

咱们来看看成果,这不是一个正规的Benchmark,就是方才我建的 table,新加了一条记录,月份等于13的那个,如果我不开下推的话,是右边这种状况,它扫了200万条记录,input数据是200多KB;开了下推之后,它只扫描一条记录,工夫和数据都有十分大的晋升,它只扫特定的分区。这种查问在事实中遇不到,因为事实中必定是有更多的组合,每个partition上面可能还有更多的文件,这里比拟极其一些,只有这一个文件。其实这样成果不显著,文件越多成果越好,举荐大家试一试。

之前提到了 Native Iceberg IO,咱们会应用Iceberg reader和writer来取代Hive实现,让它彻底的离开,也可能两个都反对,这个就是咱们将要进行的工作,敬请期待。


另外一个是物化视图(materialized view),是我的前共事Chunxu在做的,这也是重要的一个性能,让长期表可能把数据存到一起,这个也没有那么简略, Facebook也有共事在做这个。咱们这里不再多说了,Facebook很快会有一个对于物化视图的新的博客进去。

我会持续把v2 table实现一下删原来只能是依照分区来删,不能删除某一行或某几行,因为删除操作和insert是一样的,会再产生一些新的文件,标注说你这几行要被删掉了,而后真正出后果的时候,会把它合并到一起。当初是不反对这个性能的,那么咱们要想反对这个性能,有两个做法,一个是应用 native Iceberg IO,另外一个就是在 Parquet的 reader上把要删的这些行标注进去,示意这些行不再显示。以上就是今年年底或明年年初的一些工作和构想,谢谢大家。