當前位置:編程學習大全網 - 源碼下載 - kafka存儲結構以及Log清理機制

kafka存儲結構以及Log清理機制

本文主要聚焦 kafka 的日誌存儲以及日誌清理相關。

首先我們來看壹張 kafak 的存儲結構圖。

如上圖所示、kafka 中消息是以主題 topic 為基本單位進行歸類的,這裏的 topic 是邏輯上的概念,實際上在磁盤存儲是根據分區存儲的,每個主題可以分為多個分區、分區的數量可以在主題創建的時候進行指定。例如下面 kafka 命令創建了壹個 topic 為 test 的主題、該主題下有 4 個分區、每個分區有兩個副本保證高可用。

分區的修改除了在創建的時候指定。還可以動態的修改。如下將 kafka 的 test 主題分區數修改為 12 個

分區內每條消息都會被分配壹個唯壹的消息 id,也就是我們通常所說的 offset, 因此 kafak 只能保證每壹個分區內部有序性,不能保證全局有序性。

如果分區設置的合理,那麽所有的消息都可以均勻的分布到不同的分區中去,這樣可以實現水平擴展。不考慮多副本的情況下,壹個分區對應壹個 log 日誌、如上圖所示。為了防止 log 日誌過大,kafka 又引入了日誌分段(LogSegment)的概念,將 log 切分為多個 LogSegement,相當於壹個巨型文件被平均分配為相對較小的文件,這樣也便於消息的維護和清理。事實上,Log 和 LogSegement 也不是純粹物理意義上的概念,Log 在物理上只是以文件夾的形式存儲,而每個 LogSegement 對應於磁盤上的壹個日誌文件和兩個索引文件,以及可能的其他文件(比如以".txindex"為後綴的事務索引文件)。

kafak 中的 Log 對應了壹個命名為<topic>-<partition> 的文件夾。舉個例子、假如有壹個 test 主題,此主題下遊 3 個分區,那麽在實際物理上的存儲就是 "test-0","test-1","test-2" 這三個文件夾。

向 Log 中寫入消息是順序寫入的。只有最後壹個 LogSegement 才能執行寫入操作,在此之前的所有 LogSegement 都不能執行寫入操作。為了方便描述,我們將最後壹個 LogSegement 成為"ActiveSegement",即表示當前活躍的日誌分段。隨著消息的不斷寫入,當 ActiveSegement 滿足壹定的條件時,就需要創建新的 activeSegement,之後在追加的消息寫入新的 activeSegement。

為了便於消息的檢索,每個 LogSegement 中的日誌文件(以".log" 為文件後綴)都有對應的兩個文件索引:偏移量索引文件(以".index" 為文件後綴)和時間戳索引文件(以".timeindex"為文件後綴)。每個 LogSegement 都有壹個“基準偏移量” baseOffset,用來標識當前 LogSegement 中第壹條消息的 offset。偏移量是壹個 64 位的長整形。日誌文件和兩個索引文件都是根據基準偏移量(baseOffset)命名的,名稱固定為 20 位數字,沒有達到的位數則用 0 填充。比如第壹個 LogSegment 的基準偏移量為 0,對應的日誌文件為 00000000000000000000.log

示例中第 2 個 LogSegment 對應的基準位移是 256,也說明了該 LogSegment 中的第壹條消息的偏移量為 256,同時可以反映出第壹個 LogSegment 中***有 256 條消息(偏移量從 0 至 254 的消息)。

由於 kafak 是把消息存儲 在磁盤上,為了控制消息的不斷增加我們就必須對消息做壹定的清理和壓縮。kakfa 中的每壹個分區副本都對應的壹個 log 日誌文件。而 Log 又分為多個 LogSegement 日誌分段。這樣也便於日誌清理。kafka 內部提供了兩種日誌清理策略。

按照壹定的保留策略直接刪除不符合條件的日誌分段。

我們可以通過 broker 端參數 log.cleanup.policy 來設置日誌清理策略,此參數的默認值為“delete”,即采用日誌刪除的清理策略。如果要采用日誌壓縮的清理策略,就需要將 log.cleanup.policy 設置為“compact”,並且還需要將 log.cleaner.enable(默認值為 true)設定為 true。通過將 log.cleanup.policy 參數設置為“delete,compact”,還可以同時支持日誌刪除和日誌壓縮兩種策略。日誌清理的粒度可以控制到主題級別,比如與 log.cleanup.policy 對應的主題級別的參數為 cleanup.policy,為了簡化說明,本文只采用 broker 端參數做陳述。

日誌刪除任務會檢查當前日誌文件中是否有保留時間超過設定的閾值(retentionMs)來尋找可刪除的日誌分段文件集合(deletableSegments),如圖下圖所示。retentionMs 可以通過 broker 端參數 log.retention.hours、log.retention.minutes 和 log.retention.ms 來配置,其中 log.retention.ms 的優先級最高,log.retention.minutes 次之,log.retention.hours 最低。默認情況下只配置了 log.retention.hours 參數,其值為 168,故默認情況下日誌分段文件的保留時間為 7 天。

查找過期的日誌分段文件,並不是簡單地根據日誌分段的最近修改時間 lastModifiedTime 來計算的,而是根據日誌分段中最大的時間戳 largestTimeStamp 來計算的。因為日誌分段的 lastModifiedTime 可以被有意或無意地修改,比如執行了 touch 操作,或者分區副本進行了重新分配,lastModifiedTime 並不能真實地反映出日誌分段在磁盤的保留時間。要獲取日誌分段中的最大時間戳 largestTimeStamp 的值,首先要查詢該日誌分段所對應的時間戳索引文件,查找時間戳索引文件中最後壹條索引項,若最後壹條索引項的時間戳字段值大於 0,則取其值,否則才設置為最近修改時間 lastModifiedTime.

若待刪除的日誌分段的總數等於該日誌文件中所有的日誌分段的數量,那麽說明所有的日誌分段都已過期,但該日誌文件中還要有壹個日誌分段用於接收消息的寫入,即必須要保證有壹個活躍的日誌分段 activeSegment,在此種情況下,會先切分出壹個新的日誌分段作為 activeSegment,然後執行刪除操作。

刪除日誌分段時,首先會從 Log 對象中所維護日誌分段的跳躍表中移除待刪除的日誌分段,以保證沒有線程對這些日誌分段進行讀取操作。然後將日誌分段所對應的所有文件添加上“.deleted”的後綴(當然也包括對應的索引文件)。最後交由壹個以“delete-file”命名的延遲任務來刪除這些以“.deleted”為後綴的文件,這個任務的延遲執行時間可以通過 file.delete.delay.ms 參數來調配,此參數的默認值為 60000,即 1 分鐘。

日誌刪除任務會檢查當前日誌的大小是否超過設定的閾值(retentionSize)來尋找可刪除的日誌分段的文件集合(deletableSegments),如下圖所示。retentionSize 可以通過 broker 端參數 log.retention.bytes 來配置,默認值為-1,表示無窮大。註意 log.retention.bytes 配置的是 Log 中所有日誌文件的總大小,而不是單個日誌分段(確切地說應該為.log 日誌文件)的大小。單個日誌分段的大小由 broker 端參數 log.segment.bytes 來限制,默認值為 1073741824,即 1GB。

基於日誌大小的保留策略與基於時間的保留策略類似,首先計算日誌文件的總大小 size 和 retentionSize 的差值 diff,即計算需要刪除的日誌總大小,然後從日誌文件中的第壹個日誌分段開始進行查找可刪除的日誌分段的文件集合 deletableSegments。查找出 deletableSegments 之後就執行刪除操作,這個刪除操作和基於時間的保留策略的刪除操作相同

  • 上一篇:快速電機源代碼
  • 下一篇:什麽是初始化控件
  • copyright 2024編程學習大全網