摘要:应用Spark SQL进行ETL工作,在读取某张表的时候报错:“IOException: totalValueCount == 0”,但该表在写入时,并没有什么异样。
本文分享自华为云社区《你的Parquet该降级了:IOException: totalValueCount == 0问题定位之旅》,原文作者:wzhfy 。
1. 问题形容
应用Spark SQL进行ETL工作,在读取某张表的时候报错:“IOException: totalValueCount == 0”,但该表在写入时,并没有什么异样。
2. 初步剖析
该表的后果是由两表join后生成。经剖析,join的后果产生了数据歪斜,且歪斜key为null。Join后每个task写一个文件,所以partition key为null的那个task将大量的null值写入了一个文件,null值个数达到22亿。
22亿这个数字比拟敏感,正好超过int最大值2147483647(21亿多)。因而,初步狐疑parquet在写入超过int.max个value时有问题。
【注】本文只关注大量null值写入同一个文件导致读取时报错的问题。至于该列数据产生如此大量的null是否正当,不在本文探讨范畴之内。
3. Deep dive into Parquet (version 1.8.3,局部内容可能须要联合Parquet源码进行了解)
入口:Spark(Spark 2.3版本) -> Parquet
Parquet调用入口在Spark,所以从Spark开始开掘调用栈。
InsertIntoHadoopFsRelationCommand.run()/SaveAsHiveFile.saveAsHiveFile() -> FileFormatWriter.write()
这里分几个步骤:
- 启动作业前,创立outputWriterFactory: ParquetFileFormat.prepareWrite()。这里会设置一系列与parquet写文件无关的配置信息。其中次要的一个,是设置WriteSupport类:ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]),ParquetWriteSupport是Spark本人定义的类。
- 在executeTask() -> writeTask.execute()中,先通过outputWriterFactory创立OutputWriter (ParquetOutputWriter):outputWriterFactory.newInstance()。
- 对于每行记录,应用ParquetOutputWriter.write(InternalRow)办法顺次写入parquet文件。
- Task完结前,调用ParquetOutputWriter.close()敞开资源。
3.1 Write过程
在ParquetOutputWriter中,通过ParquetOutputFormat.getRecordWriter结构一个RecordWriter(ParquetRecordWriter),其中蕴含了:
- prepareWrite()时设置的WriteSupport:负责转换Spark record并写入parquet构造
- ParquetFileWriter:负责写入文件
ParquetRecordWriter中,其实是把write操作委托给了一个internalWriter(InternalParquetRecordWriter,用WriteSupport和ParquetFileWriter结构)。
当初让咱们梳理一下,目前为止的大抵流程为:
SingleDirectoryWriteTask/DynamicPartitionWriteTask.execute
-> ParquetOutputWriter.write -> ParquetRecordWriter.write -> InternalParquetRecordWriter.write
接下来,InternalParquetRecordWriter.write外面,就是三件事:
(1)writeSupport.write,即ParquetWriteSupport.write,外面分三个步骤:
- MessageColumnIO.MessageColumnIORecordConsumer.startMessage;
- ParquetWriteSupport.writeFields:写入一行中各个列的值,null值除外;
- MessageColumnIO.MessageColumnIORecordConsumer.endMessage:针对第二步中的missing fields写入null值。
ColumnWriterV1.writeNull -> accountForValueWritten:
1) 减少计数器valueCount (int类型)
2) 查看空间是否已满,须要writePage - 检查点1
(2)减少计数器recordCount(long类型)
(3)查看block size,是否须要flushRowGroupToStore - 检查点2
因为写入的值全是null,在1、2两个检查点的memSize都为0,所以不会刷新page和row group。导致的后果就是,始终在往同一个page里减少null值。而ColumnWriterV1的计数器valueCount是int类型,当超过int.max时,溢出,变为了一个正数。
因而,只有当调用close()办法时(task完结时),才会执行flushRowGroupToStore:
ParquetOutputWriter.close -> ParquetRecordWriter.close
-> InternalParquetRecordWriter.close -> flushRowGroupToStore
-> ColumnWriteStoreV1.flush -> for each column ColumnWriterV1.flush
因为valueCount溢出为负,此处也不会写page。
因为未调用过writePage,所以此处的totalValueCount始终为0。
ColumnWriterV1.writePage -> ColumnChunkPageWriter.writePage -> 累计totalValueCount
在write完结时,InternalParquetRecordWriter.close -> flushRowGroupToStore -> ColumnChunkPageWriteStore.flushToFileWriter -> for each column ColumnChunkPageWriter.writeToFileWriter:
- ParquetFileWriter.startColumn:totalValueCount赋值给currentChunkValueCount
- ParquetFileWriter.writeDataPages
- ParquetFileWriter.endColumn:currentChunkValueCount(为0)和其余元数据信息结构出一个ColumnChunkMetaData,相干信息最终会被写入文件。
3.2 Read过程
同样以Spark为入口,进行查看。
初始化阶段:ParquetFileFormat.BuildReaderWithPartitionValues -> VectorizedParquetRecordReader.initialize -> ParquetFileReader.readFooter -> ParquetMetadataConverter.readParquetMetadata -> fromParquetMetadata -> ColumnChunkMetaData.get,其中蕴含valueCount(为0)。
读取时:VectorizedParquetRecordReader.nextBatch -> checkEndOfRowGroup:
1) ParquetFileReader.readNextRowGroup -> for each chunk, currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages())
因为getValueCount为0,所以pagesInChunk为空。
2)结构ColumnChunkPageReader:
因为page列表为空,所以totalValueCount为0,导致在结构VectorizedColumnReader时报了问题中的谬误。
4. 解决办法:Parquet降级(version 1.11.1)
在新版本中,ParquetWriteSupport.write ->
MessageColumnIO.MessageColumnIORecordConsumer.endMessage ->
ColumnWriteStoreV1(ColumnWriteStoreBase).endRecord:
在endRecord中减少了每个page最大记录条数(默认2w条)的属性和查看逻辑,超出限度时会writePage,使得ColumnWriterV1的valueCount不会溢出(每次writePage后会清零)。
而比照老版本1.8.3中,ColumnWriteStoreV1.endRecord为空函数。
附:Parquet中的一个小trick
Parquet中为了节约空间,当一个long类型的值,在肯定范畴内时,会应用int来存储,其办法如下:
- 判断是否能够用int存储:
- 如果能够,用IntColumnChunkMetaData代替LongColumnChunkMetaData,结构时转换:
- 应用时,再转回来,IntColumnChunkMetaData.getValueCount -> intToPositiveLong():
一般的int范畴是 -2^31 ~ (2^31 - 1),因为元数据信息(如valueCount等)都是非负整数,那么理论只能存储0 ~ (2^31 - 1) 范畴的数。而用这种办法,能够示意0 ~ (2^32 - 1) 范畴的数,表白范畴也大了一倍。
附件:可用于复现的测试用例代码(依赖Spark局部类,可置于Spark工程中运行)
测试用例代码.txt 1.88KB
点击关注,第一工夫理解华为云陈腐技术~