关于iceberg:分析Iceberg合并任务解决数据冲突
作者:吴文池 背景 iceberg提供合并小文件性能,能够依照用户的配置文件,把多个合乎配置的小文件合并成多个大文件。该流程次要是对源数据做了一次复制。 但在理论生产环境中,数据是始终在变动的,有可能会呈现这种状况:在还未实现数据合并工作时,对之前的数据做出了批改,这就导致正在合并的数据与新数据可能会产生抵触。 那么iceberg是如何解决这种数据抵触的呢? iceberg行级删除 iceberg的行级批改次要是通过行级删除记录再加上数据记录实现的,所以上面先说一下iceberg行级删除的实现。 iceberg行级删除实现概要 iceberg的行级更新和删除,次要是通过减少delete记录做到的。iceberg总共有两种delete类型,一种是position delete,一种是equality delete,次要区别是在于该数据的插入和批改操作是否在同一个checkpoint内。此次剖析的抵触场景次要是不同快照间的,所以以下阐明都以equality delete为背景,简略说下流程(非upsert模式)。 先在 writer 中设置 equalityFieldColumns : DataStreamSink<Void> append = FlinkSink.forRowData(input) .tableLoader(tableLoader).overwrite(false).distributionMode(DistributionMode.HASH).equalityFieldColumns(Lists.newArrayList("column1", "column2")) .append();设置好之后,writer就会依据设置的列对数据做写入,如果是插入的数据,则写为数据记录,如果是删除数据,则写为删除记录(非upsert模式): case INSERT:case UPDATE_AFTER: if (upsert) { writer.delete(row);} writer.write(row); break;case UPDATE_BEFORE: if (upsert) { break; // UPDATE_BEFORE is not necessary for UPDATE, we do nothing to prevent delete one row twice} writer.delete(row); break;case DELETE: writer.delete(row); break;在读取数据的时候,会依据数据生成的程序(即sequenceNumber),判断该数据记录是否有对应的删除记录,如果有,则认为该数据曾经被删除,如果没有,则认为该数据仍然存在。 利用删除记录的判断条件 iceberg中应用接口FileScanTask来记录以后扫描工作: public interface FileScanTask extends ScanTask { /** The {@link DataFile file} to scan. *@return the file to scan */DataFile file(); ...