Kafka Compression
Overview
To understand Kafka compression, you first need to understand how Kafka stores messages.
Kafka Storage Format
RecordBatch
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
    bit 0~2:
        0: no compression
        1: gzip
        2: snappy
        3: lz4
        4: zstd
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record] (on the wire, preceded by an int32 record count)
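To make the header layout concrete, here is a minimal Python sketch that decodes the fixed-size part of a v2 RecordBatch with the standard `struct` module. It assumes magic value 2 and also reads the int32 record count that precedes the records array, which makes the header 61 bytes. `BatchHeader`, `parse_batch_header`, `BATCH_HEADER` and `CODECS` are hypothetical names for illustration, not part of any Kafka client.

```python
import struct
from typing import NamedTuple

# v2 RecordBatch header, big-endian, no padding. The last "i" is the int32
# record count in front of the records array. crc is read as unsigned ("I")
# only so it prints like the dump; the schema calls it int32.
BATCH_HEADER = struct.Struct(">qiibIhiqqqhii")

CODECS = {0: "none", 1: "gzip", 2: "snappy", 3: "lz4", 4: "zstd"}


class BatchHeader(NamedTuple):
    base_offset: int
    batch_length: int
    partition_leader_epoch: int
    magic: int
    crc: int
    attributes: int
    last_offset_delta: int
    first_timestamp: int
    max_timestamp: int
    producer_id: int
    producer_epoch: int
    base_sequence: int
    record_count: int

    @property
    def codec(self) -> str:
        # bits 0~2 of attributes select the compression codec
        return CODECS.get(self.attributes & 0x07, "unknown")

    @property
    def is_transactional(self) -> bool:
        return bool(self.attributes & 0x10)  # bit 4


def parse_batch_header(buf: bytes, pos: int = 0) -> BatchHeader:
    """Decode the 61-byte header of a magic-v2 RecordBatch starting at pos."""
    return BatchHeader(*BATCH_HEADER.unpack_from(buf, pos))
```

Everything this sketch reads sits before the records array, which is why it works the same way for compressed and uncompressed batches.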
Record
length: varint
attributes: int8
    bit 0~7: unused
timestampDelta: varint
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]
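Record fields use variable-length integers. Kafka zigzag-encodes them (the scheme Protocol Buffers uses for sint32/sint64), so small negative values such as the -1 that marks a null key stay one byte long. The sketch below decodes one Record; `read_varint`, `read_bytes` and `parse_record` are hypothetical helpers, and decoding header keys as UTF-8 is an assumption.

```python
from typing import List, Optional, Tuple


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Read a zigzag-encoded varint; return (value, next position)."""
    raw, shift = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        raw |= (b & 0x7F) << shift  # low 7 bits carry data
        if not b & 0x80:            # high bit clear: last byte
            break
        shift += 7
    # zigzag decode: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    return (raw >> 1) ^ -(raw & 1), pos


def read_bytes(buf: bytes, pos: int) -> Tuple[Optional[bytes], int]:
    """Read a varint length followed by that many bytes; length -1 means null."""
    n, pos = read_varint(buf, pos)
    if n < 0:
        return None, pos
    return bytes(buf[pos:pos + n]), pos + n


def parse_record(buf: bytes, pos: int = 0) -> Tuple[dict, int]:
    """Decode one v2 Record starting at pos; return (record, next position)."""
    length, pos = read_varint(buf, pos)
    end = pos + length  # length counts the bytes after the length field
    attributes = buf[pos]
    pos += 1
    timestamp_delta, pos = read_varint(buf, pos)
    offset_delta, pos = read_varint(buf, pos)
    key, pos = read_bytes(buf, pos)
    value, pos = read_bytes(buf, pos)
    header_count, pos = read_varint(buf, pos)
    headers: List[Tuple[str, Optional[bytes]]] = []
    for _ in range(header_count):
        header_key, pos = read_bytes(buf, pos)
        header_value, pos = read_bytes(buf, pos)
        headers.append((header_key.decode("utf-8"), header_value))
    if pos != end:
        raise ValueError(f"record length mismatch: {pos} != {end}")
    return {
        "attributes": attributes,
        "timestamp_delta": timestamp_delta,
        "offset_delta": offset_delta,
        "key": key,
        "value": value,
        "headers": headers,
    }, pos
```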
Record Header
headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]
Note: image source. That article is recommended reading for a better understanding of how the Kafka message format has evolved.
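Going the other way, a short encoder shows how the Record and Record Header fields are laid out on the wire. This is a sketch, not Kafka's own serializer: `write_varint`, `write_bytes` and `encode_record` are hypothetical names, headers are assumed to be `(str, bytes)` pairs, and the record-level attributes byte is always written as 0.

```python
from typing import Optional, Sequence, Tuple


def write_varint(n: int) -> bytes:
    """Zigzag-encode a signed 64-bit integer, then emit it as a base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    out = bytearray()
    while z >= 0x80:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def write_bytes(data: Optional[bytes]) -> bytes:
    """Varint length prefix followed by the payload; None is written as length -1."""
    if data is None:
        return write_varint(-1)
    return write_varint(len(data)) + data


def encode_record(
    key: Optional[bytes],
    value: Optional[bytes],
    headers: Sequence[Tuple[str, Optional[bytes]]] = (),
    timestamp_delta: int = 0,
    offset_delta: int = 0,
) -> bytes:
    """Serialize one v2 Record, including its leading length varint."""
    body = bytearray()
    body.append(0)  # attributes: bits 0~7 unused
    body += write_varint(timestamp_delta)
    body += write_varint(offset_delta)
    body += write_bytes(key)
    body += write_bytes(value)
    body += write_varint(len(headers))  # header count
    for header_key, header_value in headers:
        body += write_bytes(header_key.encode("utf-8"))  # headerKeyLength + headerKey
        body += write_bytes(header_value)                # headerValueLength + Value
    return write_varint(len(body)) + bytes(body)
```

For example, `encode_record(b"key", b"value")` produces the same 15 record bytes (`1c 00 00 00 06 6b 65 79 0a 76 61 6c 75 65 00`) that appear in the uncompressed batch below.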
Message Comparison
The hex dump below holds two record batches stored back to back: a 76-byte uncompressed batch at offset 0, followed by a 96-byte gzip-compressed batch at offset 1.
0000 0000 0000 0000 0000 0040 0000 0000
02e3 0171 9400 0000 0000 0000 0001 6ad9
0153 7e00 0001 6ad9 0153 7eff ffff ffff
ffff ffff ffff ffff ff00 0000 011c 0000
0006 6b65 790a 7661 6c75 6500
0000 0000 0000 0001 0000 0054 0000 0000
02e5 cb48 0600 0100 0000 0000 0001 6ad9
5427 af00 0001 6ad9 5427 afff ffff ffff
ffff ffff ffff ffff ff00 0000 011f 8b08
0000 0000 0000 0093 6160 6060 cb4e ade4
2a4b cc29 4d65 0000 55dc 0454 0f00 0000
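Because batchLength counts only the bytes that follow it, each batch occupies 12 + batchLength bytes (8 for baseOffset plus 4 for batchLength itself). The sketch below uses that rule to walk the dump; `DUMP` holds the bytes exactly as printed above, and `split_batches` is a hypothetical helper.

```python
import struct

# The dump above, pasted as-is; bytes.fromhex ignores spaces and newlines (Python 3.7+).
DUMP = bytes.fromhex("""
0000 0000 0000 0000 0000 0040 0000 0000
02e3 0171 9400 0000 0000 0000 0001 6ad9
0153 7e00 0001 6ad9 0153 7eff ffff ffff
ffff ffff ffff ffff ff00 0000 011c 0000
0006 6b65 790a 7661 6c75 6500
0000 0000 0000 0001 0000 0054 0000 0000
02e5 cb48 0600 0100 0000 0000 0001 6ad9
5427 af00 0001 6ad9 5427 afff ffff ffff
ffff ffff ffff ffff ff00 0000 011f 8b08
0000 0000 0000 0093 6160 6060 cb4e ade4
2a4b cc29 4d65 0000 55dc 0454 0f00 0000
""")


def split_batches(log: bytes):
    """Return [(baseOffset, batch bytes)] for consecutive RecordBatches in log."""
    batches = []
    pos = 0
    while pos + 12 <= len(log):
        base_offset, batch_length = struct.unpack_from(">qi", log, pos)
        end = pos + 12 + batch_length
        if end > len(log):
            break  # truncated tail: stop rather than return a partial batch
        batches.append((base_offset, log[pos:end]))
        pos = end
    return batches
```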
Visual breakdown of the uncompressed batch
76B
=======================Header============================
0000 0000 0000 0000 => first offset => 0
0000 0040 => length => 64 (bytes after this field: 76 - 8 - 4)
0000 0000 => partition leader epoch => 0
02 => magic => 2
e3 0171 94 => crc (CRC-32C) => 3808522644
00 00 => attributes => 0 (no compression)
00 0000 00 => last offset delta => 0
00 0001 6ad9 0153 7e => first timestamp => 1558418903934
00 0001 6ad9 0153 7e => max timestamp => 1558418903934
ff ffff ffff ffff ff => producer id => -1
ff ff => producer epoch => -1
ff ffff ff => first sequence => -1
00 0000 01 => record count => 1
=======================Records===========================================
1c => length (varint) => 14
00 => attributes => unused
00 => timestamp delta (varint) => 0
00 => offset delta (varint) => 0
06 => key length (varint) => 3
6b65 79 => key => "key"
0a => value length (varint) => 5
7661 6c75 65 => value => "value"
00 => headers count (varint) => 0
(Varints are zigzag-encoded: 0x1c = 28 decodes to 14, 0x06 to 3, 0x0a to 5.)
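The crc field of a v2 batch is a CRC-32C (Castagnoli), not the CRC-32 that `zlib.crc32` computes, and it covers every byte from attributes to the end of the batch. Python's standard library has no CRC-32C, so this sketch uses a slow bitwise version; `crc32c` and `batch_crc_ok` are hypothetical names.

```python
import struct


def crc32c(data: bytes) -> int:
    """Bitwise CRC-32C (reflected polynomial 0x82F63B78).
    Slow but dependency-free; production code uses tables or CPU instructions."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ (0x82F63B78 if crc & 1 else 0)
    return crc ^ 0xFFFFFFFF


def batch_crc_ok(batch: bytes) -> bool:
    """crc sits at bytes 17..20; it covers attributes (byte 21) through the end."""
    (stored,) = struct.unpack_from(">I", batch, 17)
    return crc32c(batch[21:]) == stored
```

Applied to either batch in the dump, `batch_crc_ok` should return True if the bytes were copied intact, since baseOffset, batchLength, partitionLeaderEpoch and magic are outside the checksummed range.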
With Compression Enabled
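0000 0000 0000 0000 0000 0040 0000 0000
02e3 0171 9400 0000 0000 0000 0001 6ad9
0153 7e00 0001 6ad9 0153 7eff ffff ffff
ffff ffff ffff ffff ff00 0000 011c 0000
0006 6b65 790a 7661 6c75 6500
================ Above: the 76B uncompressed batch; below: the 96B compressed batch =====================
0000 0000 0000 0001 => first offset => 1
0000 0054 => length => 84 (96 - 8 - 4)
0000 0000 => partition leader epoch => 0
02 => magic => 2
e5 cb48 06 => crc (CRC-32C) => 3855304710
00 01 => attributes => 1 (gzip)
00 0000 00 => last offset delta => 0
00 0001 6ad9 5427 af => first timestamp => 1558424332207
00 0001 6ad9 5427 af => max timestamp => 1558424332207
ff ffff ffff ffff ff => producer id => -1
ff ff => producer epoch => -1
ff ffff ff => first sequence => -1
00 0000 01 => record count => 1
1f 8b08 0000 0000 0000 00 => gzip header (1f 8b = magic, 08 = DEFLATE, no flags)
93 6160 6060 cb4e ade4 2a4b cc29 4d65 0000 => DEFLATE-compressed records (17B)
55dc 0454 => gzip CRC-32 of the uncompressed records (little-endian)
0f00 0000 => gzip ISIZE => 15 (the uncompressed record 1c ... 00 is 15 bytes)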
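To confirm that only the records part is compressed, this sketch slices the 96-byte batch at byte 61 (the end of the header and record count) and inflates what follows. It parses the 10-byte gzip header by hand and hands the rest to zlib as raw DEFLATE, so the 8-byte gzip trailer stays visible; it assumes the gzip header has no optional fields (FLG = 0, as in this dump). `COMPRESSED_BATCH`, `HEADER_SIZE` and `gunzip_records` are hypothetical names.

```python
import struct
import zlib

# The second batch from the dump, pasted as-is.
COMPRESSED_BATCH = bytes.fromhex("""
0000 0000 0000 0001 0000 0054 0000 0000
02e5 cb48 0600 0100 0000 0000 0001 6ad9
5427 af00 0001 6ad9 5427 afff ffff ffff
ffff ffff ffff ffff ff00 0000 011f 8b08
0000 0000 0000 0093 6160 6060 cb4e ade4
2a4b cc29 4d65 0000 55dc 0454 0f00 0000
""")

HEADER_SIZE = 61  # batch header + int32 record count, stored uncompressed


def gunzip_records(batch: bytes):
    """Return (inflated records, gzip CRC-32 field, gzip ISIZE field)."""
    (attributes,) = struct.unpack_from(">h", batch, 21)
    if attributes & 0x07 != 1:
        raise ValueError("not a gzip batch")
    gz = batch[HEADER_SIZE:]
    # 10-byte gzip header: magic 1f 8b, method 08, FLG, MTIME(4), XFL, OS
    if gz[:3] != b"\x1f\x8b\x08" or gz[3] != 0:
        raise ValueError("unexpected gzip header")  # this sketch only handles FLG = 0
    inflater = zlib.decompressobj(-15)  # negative wbits: raw DEFLATE, no wrapper
    payload = inflater.decompress(gz[10:])
    trailer = inflater.unused_data  # bytes after the DEFLATE stream ends
    crc, isize = struct.unpack("<II", trailer)  # both little-endian
    return payload, crc, isize
```

The inflated payload is the same 15-byte record (`1c ... 00`) as in the uncompressed batch, and ISIZE matches its length. A fuller check would also compare the returned CRC-32 with `zlib.crc32(payload)`.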
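Conclusion
Compression applies only to the records section. The 61-byte batch header (offsets, length, CRC, attributes, timestamps, producer fields, record count) is stored uncompressed in both batches; the codec is recorded only in the low 3 bits of attributes (0 = none, 1 = gzip). With a single 15-byte record there is nothing to gain: the gzip stream is 35 bytes (10-byte header, 17 bytes of DEFLATE data, 8-byte trailer), so the compressed batch is 96B versus 76B.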
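The size trade-off can be sketched with Python's `gzip` module. The numbers will not match Kafka's Java output byte for byte (gzip header fields and compression levels differ), and repeating the same record bytes n times is only a stand-in for a real multi-record batch, which would carry distinct offset deltas. `RECORD`, `BATCH_HEADER_SIZE` and `batch_size` are hypothetical names.

```python
import gzip

# The 15-byte record from the uncompressed batch above.
RECORD = bytes.fromhex("1c 00 00 00 06 6b 65 79 0a 76 61 6c 75 65 00")
BATCH_HEADER_SIZE = 61  # header + record count; never compressed


def batch_size(records: bytes, compress: bool) -> int:
    """Approximate batch size: fixed header plus (optionally gzipped) records."""
    body = gzip.compress(records) if compress else records
    return BATCH_HEADER_SIZE + len(body)


for n in (1, 100):
    records = RECORD * n  # illustration only: not a valid multi-record batch
    print(n, batch_size(records, False), batch_size(records, True))
```

With one record the fixed gzip framing dominates; with many similar records the records section shrinks well below its raw size, while the 61-byte header cost stays the same either way.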