关于spark:你的Parquet该升级了IOException-totalValueCount-0问题定位之旅

35次阅读

共计 4314 个字符,预计需要花费 11 分钟才能阅读完成。

摘要: 应用 Spark SQL 进行 ETL 工作,在读取某张表的时候报错:“IOException: totalValueCount == 0”,但该表在写入时,并没有什么异样。

本文分享自华为云社区《你的 Parquet 该降级了:IOException: totalValueCount == 0 问题定位之旅》,原文作者:wzhfy。

1. 问题形容

应用 Spark SQL 进行 ETL 工作,在读取某张表的时候报错:“IOException: totalValueCount == 0”,但该表在写入时,并没有什么异样。

2. 初步剖析

该表的后果是由两表 join 后生成。经剖析,join 的后果产生了数据歪斜,且歪斜 key 为 null。Join 后每个 task 写一个文件,所以 partition key 为 null 的那个 task 将大量的 null 值写入了一个文件,null 值个数达到 22 亿。

22 亿这个数字比拟敏感,正好超过 int 最大值 2147483647(21 亿多)。因而,初步狐疑 parquet 在写入超过 int.max 个 value 时有问题。

【注】本文只关注大量 null 值写入同一个文件导致读取时报错的问题。至于该列数据产生如此大量的 null 是否正当,不在本文探讨范畴之内。

3. Deep dive into Parquet (version 1.8.3,局部内容可能须要联合 Parquet 源码进行了解)

入口:Spark(Spark 2.3 版本)-> Parquet

Parquet 调用入口在 Spark,所以从 Spark 开始开掘调用栈。

InsertIntoHadoopFsRelationCommand.run()/SaveAsHiveFile.saveAsHiveFile() -> FileFormatWriter.write()

这里分几个步骤:

  1. 启动作业前,创立 outputWriterFactory: ParquetFileFormat.prepareWrite()。这里会设置一系列与 parquet 写文件无关的配置信息。其中次要的一个,是设置 WriteSupport 类:ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]),ParquetWriteSupport 是 Spark 本人定义的类。
  2. 在 executeTask() -> writeTask.execute() 中,先通过 outputWriterFactory 创立 OutputWriter (ParquetOutputWriter):outputWriterFactory.newInstance()。
  3. 对于每行记录,应用 ParquetOutputWriter.write(InternalRow) 办法顺次写入 parquet 文件。
  4. Task 完结前,调用 ParquetOutputWriter.close() 敞开资源。

3.1 Write 过程

在 ParquetOutputWriter 中,通过 ParquetOutputFormat.getRecordWriter 结构一个 RecordWriter(ParquetRecordWriter),其中蕴含了:

  1. prepareWrite() 时设置的 WriteSupport:负责转换 Spark record 并写入 parquet 构造
  2. ParquetFileWriter:负责写入文件

ParquetRecordWriter 中,其实是把 write 操作委托给了一个 internalWriter(InternalParquetRecordWriter,用 WriteSupport 和 ParquetFileWriter 结构)。

当初让咱们梳理一下,目前为止的大抵流程为:
SingleDirectoryWriteTask/DynamicPartitionWriteTask.execute
-> ParquetOutputWriter.write -> ParquetRecordWriter.write -> InternalParquetRecordWriter.write

接下来,InternalParquetRecordWriter.write 外面,就是三件事:

(1)writeSupport.write,即 ParquetWriteSupport.write,外面分三个步骤:

  1. MessageColumnIO.MessageColumnIORecordConsumer.startMessage;
  2. ParquetWriteSupport.writeFields:写入一行中各个列的值,null 值除外;
  3. MessageColumnIO.MessageColumnIORecordConsumer.endMessage:针对第二步中的 missing fields 写入 null 值。
    ColumnWriterV1.writeNull -> accountForValueWritten:
    1)减少计数器 valueCount(int 类型)
    2)查看空间是否已满,须要 writePage – 检查点 1

(2)减少计数器 recordCount(long 类型)

(3)查看 block size,是否须要 flushRowGroupToStore – 检查点 2

因为写入的值全是 null,在 1、2 两个检查点的 memSize 都为 0,所以不会刷新 page 和 row group。导致的后果就是,始终在往同一个 page 里减少 null 值。而 ColumnWriterV1 的计数器 valueCount 是 int 类型,当超过 int.max 时,溢出,变为了一个正数。

因而,只有当调用 close() 办法时(task 完结时),才会执行 flushRowGroupToStore:
ParquetOutputWriter.close -> ParquetRecordWriter.close
-> InternalParquetRecordWriter.close -> flushRowGroupToStore
-> ColumnWriteStoreV1.flush -> for each column ColumnWriterV1.flush

因为 valueCount 溢出为负,此处也不会写 page。

因为未调用过 writePage,所以此处的 totalValueCount 始终为 0。
ColumnWriterV1.writePage -> ColumnChunkPageWriter.writePage -> 累计 totalValueCount

在 write 完结时,InternalParquetRecordWriter.close -> flushRowGroupToStore -> ColumnChunkPageWriteStore.flushToFileWriter -> for each column ColumnChunkPageWriter.writeToFileWriter:

  1. ParquetFileWriter.startColumn:totalValueCount 赋值给 currentChunkValueCount
  2. ParquetFileWriter.writeDataPages
  3. ParquetFileWriter.endColumn:currentChunkValueCount(为 0)和其余元数据信息结构出一个 ColumnChunkMetaData,相干信息最终会被写入文件。

3.2 Read 过程

同样以 Spark 为入口,进行查看。
初始化阶段:ParquetFileFormat.BuildReaderWithPartitionValues -> VectorizedParquetRecordReader.initialize -> ParquetFileReader.readFooter -> ParquetMetadataConverter.readParquetMetadata -> fromParquetMetadata -> ColumnChunkMetaData.get,其中蕴含 valueCount(为 0)。

读取时:VectorizedParquetRecordReader.nextBatch -> checkEndOfRowGroup:
1)ParquetFileReader.readNextRowGroup -> for each chunk, currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages())

因为 getValueCount 为 0,所以 pagesInChunk 为空。

2)结构 ColumnChunkPageReader:

因为 page 列表为空,所以 totalValueCount 为 0,导致在结构 VectorizedColumnReader 时报了问题中的谬误。

4. 解决办法:Parquet 降级(version 1.11.1)

在新版本中,ParquetWriteSupport.write ->
MessageColumnIO.MessageColumnIORecordConsumer.endMessage ->
ColumnWriteStoreV1(ColumnWriteStoreBase).endRecord:

在 endRecord 中减少了每个 page 最大记录条数(默认 2w 条)的属性和查看逻辑,超出限度时会 writePage,使得 ColumnWriterV1 的 valueCount 不会溢出(每次 writePage 后会清零)。

而比照老版本 1.8.3 中,ColumnWriteStoreV1.endRecord 为空函数。

附:Parquet 中的一个小 trick

Parquet 中为了节约空间,当一个 long 类型的值,在肯定范畴内时,会应用 int 来存储,其办法如下:

  • 判断是否能够用 int 存储:

  • 如果能够,用 IntColumnChunkMetaData 代替 LongColumnChunkMetaData,结构时转换:

  • 应用时,再转回来,IntColumnChunkMetaData.getValueCount -> intToPositiveLong():

一般的 int 范畴是 -2^31 ~ (2^31 – 1),因为元数据信息(如 valueCount 等)都是非负整数,那么理论只能存储 0 ~ (2^31 – 1) 范畴的数。而用这种办法,能够示意 0 ~ (2^32 – 1) 范畴的数,表白范畴也大了一倍。

附件:可用于复现的测试用例代码(依赖 Spark 局部类,可置于 Spark 工程中运行)

测试用例代码.txt 1.88KB

点击关注,第一工夫理解华为云陈腐技术~

正文完
 0