當前位置:編程學習大全網 - 源碼下載 - RocketMQ之offset確認機制

RocketMQ之offset確認機制

本文要探討的offset指的是上圖中的Queue Offset。

為了保存消費的消費進度,避免重復消費,我們需要將offset保存下來。

針對集群消費,offset保存在broker,在客戶端使用RemoteBrokerOffsetStore。

針對廣播消費,offset保存在本地,在客戶端使用LocalFileOffsetStore。

最後,比較重要的壹點是,保存的offset指的是下壹條消息的offset,而不是消費完最後壹條消息的offset。

比如,妳消費了上圖中第壹個Queue的offset為0的消息,其實保存的offset為1,表示下次我從offset=1的位置進行消費。

在broker端,通過ConsumerOffsetManager中的offsetTable來保存Topic下各個ConsumerGroup的消費進度。

從offsetTable的雙層Map結構也是能夠看出,我上面說的消費進度,細指為ConsumerGroup在Topic下每個queue的消費進度。

offsetTable畢竟只是內存結構,因此ConsumerOffsetManager繼承了ConfigManager實現了持久化功能。

實現了encode,decode,configFilePath三個模板方法。用於指定序列化,反序列化的邏輯以及保存位置

其中序列化,反序列化的邏輯很簡單,就是使用到了我們的FastJson。

保存文件名為consumerOffset.json。

broker啟動時從本地文件加載

org.apache.rocketmq.broker.BrokerController#initialize

定時觸發,持久化到磁盤

org.apache.rocketmq.broker.BrokerController#initialize

BrokerController#shutdown

用於consumer定時同步offset

拉取消息時會順帶確認offset

事務回查觸發,暫不深入研究

本文只討論PUSH模式的集群消費,本地的offset緩存到RemoteBrokerOffsetStore的offsetTable中,定期同步到broker。

因為consumer每次重啟都會重新拉取offset,只是壹個臨時存儲,因此RemoteBrokerOffsetStore的offsetTable的設計沒有像ConsumerOffsetManager那麽復雜。

consumer啟動後會進行第壹次rebalance,並且之後都會定期rebalance。

在rebalance分配好messagequeue之後,會根據messagequeue生成processqueue進行消息拉取。

而在進行消息拉取前,有壹個關鍵的操作, 拉取對應messagequeue的offset

RebalanceImpl#updateProcessQueueTableInRebalance

其中獲取消息拉取初始位置有三種策略

CONSUME_FROM_LAST_OFFSET 最新的offset

CONSUME_FROM_FIRST_OFFSET 第壹個offset

CONSUME_FROM_TIMESTAMP 根據時間戳獲取offset

但是從源碼中可以看出來,實際上的邏輯和我們想象的有點不同,上面三個的邏輯的觸發前提是,從broker拉取不到offset進度。

這應該是為了防止重復消費以及少消費,畢竟rocketmq是業務相關的mq。

在consumer端,針對offsetTable的更新,當然通過消費消息觸發。

ConsumeMessageConcurrentlyService#processConsumeResult

針對並發消費的offset,更新值來源於ProcessQueue#removeMessage方法

removeMessage的邏輯,用到了滑動窗口的算法。

比如10條消息,offset為 0 - 9。

在多線程並發消費的場景下

比如我第壹個線程消費了offset為0的消息,那麽offsetTable中的offset更新為1

然後我第二個線程消費了offset為5的消息,removeMessage返回的offset還是為1

只有前面的消息全被消費了,窗口才會滑動

ConsumeMessageOrderlyService#processConsumeResult

順序消費,暫不研究。

最終的offset以broker為準,因此本地的offset要定期持久化到offset。

主要持久化邏輯在persistAll和persist方法。

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist

persistAll和persist邏輯大致相同,核心邏輯都是通過updateConsumeOffsetToBroker持久化到broker。

觸發持久化邏輯的時機有以下4個

MQClientInstance#startScheduledTask

DefaultMQPullConsumerImpl#shutdown

DefaultMQPushConsumerImpl#shutdown

當壹個queue不再屬於當前consumer的時候,需要同步進步給broker,以便於新拿到queue的consumer從最新未消費的消息開始拉取

RebalancePullImpl#removeUnnecessaryMessageQueue

拉取消息的時候會順帶commit offset

DefaultMQPushConsumerImpl#pullMessage

PullMessageProcessor#processRequest

正常情況下,消息消費失敗不會影響窗口滑動,因為針對消費失敗的消息,client會進行sendback。

sendback之後,消息經過延遲之後會發往Topic=%RETRY%{CONSUMERGROUP}的Retry隊列

ConsumeMessageConcurrentlyService#processConsumeResult

而sendMessageBack失敗的消息,會重新封裝成另壹個ConsumeRequest在本地再次消費。

這些失敗的消息會從之前的consumeRequest移除,因此也就影響到了ProcessQueue#removeMessage的返回值。

但是這是壹個優化,重試之後窗口大概率上還是會正常滑動。

如何保證並發消費提交偏移量正確?

基於TreeMap的滑動窗口

如何保證消息消費不丟失?

滑動窗口+broker遠端保存+sendback+本地重試兜底

如果broker保存了offset

那麽從對應offset重新拉取消息

如果broker沒有保存offset,或者其他情況丟失

那麽根據配置的策略,從對應的offset開始拉取

  • 上一篇:二、OpenGL坐標系
  • 下一篇:準絕殺是什麽意思
  • copyright 2024編程學習大全網