根目錄下有以下5個checkpoint文件: cleaner-offset-checkpoint, log-start-offset-checkpoint, meta.properties, recovery-point-offset-checkpoint, replication-offset-checkpoint
分區目錄下有以下目錄: 0000xxx.index(偏移量為64位長整形,長度固定為20位), 0000xxx.log, 0000xxx.timeindex.
還有可能包含.deleted .cleaned .swap等臨時文件, 以及可能的.snapshot .txnindex leader-epoch-checkpoint
5.2日誌格式演變
5.2.1 v0版本
kafka0.10.0之前
RECORD_OVERHEAD包括offset(8B)和message size(4B)
RECORD包括:
crc32(4B):crc32校驗值
magic(1B):消息版本號0
attributes(1B):消息屬性。低3位表示壓縮類型:0-NONE 1-GZIP 2-SNAPPY 3-LZ4(0.9.x引入)
key length(4B):表示消息的key的長度。-1代表null
key: 可選
value length(4B):實際消息體的長度。-1代表null
value: 消息體。可以為空,如墓碑消息
5.2.2 v1版本
kafka0.10.0-0.11.0
比v0多了timestamp(8B)字段,表示消息的時間戳
attributes的第4位也被利用起來,0表示timestamp的類型為CreateTime,1表示timestamp的類型為LogAppendTime
timestamp類型由broker端參數log.message.timestamp.type來配置,默認為CreateTime,即采用生產者創建的時間戳
5.2.3 消息壓縮
保證端到端的壓縮,服務端配置compression.type,默認為"producer",表示保留生產者使用的壓縮方式,還可以配置為"gzip","snappy","lz4"
多條消息壓縮至value字段,以提高壓縮率
5.2.4 變長字段
變長整形(Varints):每壹個字節都有壹個位於最高位的msb位(most significant bit),除了最後壹個字節為1,其余都為0,字節倒序排列
為了使編碼更加高效,Varints使用ZigZag編碼:sint32對應 (n<<1)^(n>>31) sint64對應 (n<<1)^(n>>63)
5.2.5 v2版本
Record Batch
first offset:
length:
partition leader epoch:
magic:固定為2
attributes:兩個字節。低3位表示壓縮格式,第4位表示時間戳類型,第5位表示事務(0-非事務1-事務),第6位控制消息(0-非控制1控制)
first timestamp:
max timestamp:
producer id:
producer epoch:
first sequence:
records count:
v2版本的消息去掉了crc字段,另外增加了length(消息總長度)、timestamp delta(時間戳增量)、offset delta(位移增量)和headers信息,並且棄用了attributes
Record
length:
attributes:棄用,但仍占據1B
timestamp delta:
offset delta:
headers:
5.3日誌索引
稀疏索引(sparse index):每當寫入壹定量(broker端參數log.index.interval.bytes指定,默認為4096B),偏移量索引文件和時間索引文件分別對應壹個索引項
日誌段切分策略:
1.大小超過broker端參數log.segment.bytes配置的值,默認為1073741824(1GB)
2.當前日誌段消息的最大時間戳與當前系統的時間戳差值大於log.roll.ms或者log.roll.hours,ms優先級高,默認log.roll.hours=168(7天)
3.索引文件或者時間戳索引文件的大小大於log.index.size.max.bytes配置的值,默認為10485760(10MB)
4.偏移量差值(offset-baseOffset)>Integer.MAX_VALUE
5.3.1 偏移量索引
每個索引項占用8個字節,分為兩個部分:1.relativeOffset相對偏移量(4B) 2.position物理地址(4B)
使用kafka-dump-log.sh腳本來解析.index文件(包括.timeindex、.snapshot、.txnindex等文件),如下:
bin/kafka-dump-log.sh --files /tmp/kafka-logs/topicId-0/00……00.index
如果broker端參數log.index.size.max.bytes不是8的倍數,內部會自動轉換為8的倍數
5.3.2 時間戳索引
每個索引項占用12個字節,分為兩個部分:1.timestamp當前日誌分段的最大時間戳(12B) 2.relativeOffset時間戳對應的相對偏移量(4B)
如果broker端參數log.index.size.max.bytes不是12的倍數,內部會自動轉換為12的倍數
5.4日誌清理
日誌清理策略可以控制到主題級別
5.4.1 日誌刪除
broker端參數log.cleanup.policy設置為delete(默認為delete)
檢測周期broker端參數log.retention.check.interval.ms=300000(默認5分鐘)
1.基於時間
broker端參數log.retention.hours,log.retention.minutes,log.retention.ms,優先級ms>minutes>hours
刪除時先增加.delete後綴,延遲刪除根據file.delete.delay.ms(默認60000)配置
2.基於日誌大小
日誌總大小為broker端參數log.retention.bytes(默認為-1,表示無窮大)
日誌段大小為broker端參數log.segment.bytes(默認為1073741824,1GB)
3.基於日誌起始偏移量
DeleteRecordRequest請求
1.KafkaAdminClient的deleteRecord()
2.kafka-delete-record.sh腳本
5.4.2 日誌壓縮
broker端參數log.cleanup.policy設置為compact,且log.cleaner.enable設置為true(默認為true)
5.5磁盤存儲
相關測試:壹個由6塊7200r/min的RAID-5陣列組成的磁盤簇的線性寫入600MB/s,隨機寫入100KB/s,隨機內存寫入400MB/s,線性內存3.6GB/s
5.5.1 頁緩存
Linux操作系統的vm.dirty_background_ratio參數用來指定臟頁數量達到系統的百分比之後就觸發pdflush/flush/kdmflush,壹般小於10,不建議為0
vm.dirty_ratio表示臟頁百分比之後刷盤,但是阻塞新IO請求
kafka同樣提供同步刷盤及間斷性強制刷盤(fsync)功能,可以通過log.flush.interval.messages、log.flush.interval.ms等參數來控制
kafka不建議使用swap分區,vm.swappiness參數上限為100,下限為0,建議設置為1
5.5.2 磁盤I/O流程
壹般磁盤IO的場景有以下4種:
1.用戶調用標準C庫進行IO操作,數據流為:應用程序Buffer->C庫標準IOBuffer->文件系統也緩存->通過具體文件系統到磁盤
2.用戶調用文件IO,數據流為:應用程序Buffer->文件系統也緩存->通過具體文件系統到磁盤
3.用戶打開文件時使用O_DIRECT,繞過頁緩存直接讀寫磁盤
4.用戶使用類似dd工具,並使用direct參數,繞過系統cache與文件系統直接讀寫磁盤
Linux系統中IO調度策略有4種:
1.NOOP:no operation
2.CFQ
3.DEADLINE
4.ANTICIPATORY
5.5.3 零拷貝
指數據直接從磁盤文件復制到網卡設備中,不需要經應用程序
對linux而言依賴於底層的sendfile()
對java而言,FileChannal.transferTo()的底層實現就是sendfile()