本文要探討的offset指的是上圖中的Queue Offset。
為了保存消費的消費進度,避免重復消費,我們需要將offset保存下來。
針對集群消費,offset保存在broker,在客戶端使用RemoteBrokerOffsetStore。
針對廣播消費,offset保存在本地,在客戶端使用LocalFileOffsetStore。
最後,比較重要的壹點是,保存的offset指的是下壹條消息的offset,而不是消費完最後壹條消息的offset。
比如,妳消費了上圖中第壹個Queue的offset為0的消息,其實保存的offset為1,表示下次我從offset=1的位置進行消費。
在broker端,通過ConsumerOffsetManager中的offsetTable來保存Topic下各個ConsumerGroup的消費進度。
從offsetTable的雙層Map結構也是能夠看出,我上面說的消費進度,細指為ConsumerGroup在Topic下每個queue的消費進度。
offsetTable畢竟只是內存結構,因此ConsumerOffsetManager繼承了ConfigManager實現了持久化功能。
實現了encode,decode,configFilePath三個模板方法。用於指定序列化,反序列化的邏輯以及保存位置
其中序列化,反序列化的邏輯很簡單,就是使用到了我們的FastJson。
保存文件名為consumerOffset.json。
broker啟動時從本地文件加載
org.apache.rocketmq.broker.BrokerController#initialize
定時觸發,持久化到磁盤
org.apache.rocketmq.broker.BrokerController#initialize
BrokerController#shutdown
用於consumer定時同步offset
拉取消息時會順帶確認offset
事務回查觸發,暫不深入研究
本文只討論PUSH模式的集群消費,本地的offset緩存到RemoteBrokerOffsetStore的offsetTable中,定期同步到broker。
因為consumer每次重啟都會重新拉取offset,只是壹個臨時存儲,因此RemoteBrokerOffsetStore的offsetTable的設計沒有像ConsumerOffsetManager那麽復雜。
consumer啟動後會進行第壹次rebalance,並且之後都會定期rebalance。
在rebalance分配好messagequeue之後,會根據messagequeue生成processqueue進行消息拉取。
而在進行消息拉取前,有壹個關鍵的操作, 拉取對應messagequeue的offset 。
RebalanceImpl#updateProcessQueueTableInRebalance
其中獲取消息拉取初始位置有三種策略
CONSUME_FROM_LAST_OFFSET 最新的offset
CONSUME_FROM_FIRST_OFFSET 第壹個offset
CONSUME_FROM_TIMESTAMP 根據時間戳獲取offset
但是從源碼中可以看出來,實際上的邏輯和我們想象的有點不同,上面三個的邏輯的觸發前提是,從broker拉取不到offset進度。
這應該是為了防止重復消費以及少消費,畢竟rocketmq是業務相關的mq。
在consumer端,針對offsetTable的更新,當然通過消費消息觸發。
ConsumeMessageConcurrentlyService#processConsumeResult
針對並發消費的offset,更新值來源於ProcessQueue#removeMessage方法
removeMessage的邏輯,用到了滑動窗口的算法。
比如10條消息,offset為 0 - 9。
在多線程並發消費的場景下
比如我第壹個線程消費了offset為0的消息,那麽offsetTable中的offset更新為1
然後我第二個線程消費了offset為5的消息,removeMessage返回的offset還是為1
只有前面的消息全被消費了,窗口才會滑動
ConsumeMessageOrderlyService#processConsumeResult
順序消費,暫不研究。
最終的offset以broker為準,因此本地的offset要定期持久化到offset。
主要持久化邏輯在persistAll和persist方法。
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist
persistAll和persist邏輯大致相同,核心邏輯都是通過updateConsumeOffsetToBroker持久化到broker。
觸發持久化邏輯的時機有以下4個
MQClientInstance#startScheduledTask
DefaultMQPullConsumerImpl#shutdown
DefaultMQPushConsumerImpl#shutdown
當壹個queue不再屬於當前consumer的時候,需要同步進步給broker,以便於新拿到queue的consumer從最新未消費的消息開始拉取
RebalancePullImpl#removeUnnecessaryMessageQueue
拉取消息的時候會順帶commit offset
DefaultMQPushConsumerImpl#pullMessage
PullMessageProcessor#processRequest
正常情況下,消息消費失敗不會影響窗口滑動,因為針對消費失敗的消息,client會進行sendback。
sendback之後,消息經過延遲之後會發往Topic=%RETRY%{CONSUMERGROUP}的Retry隊列
ConsumeMessageConcurrentlyService#processConsumeResult
而sendMessageBack失敗的消息,會重新封裝成另壹個ConsumeRequest在本地再次消費。
這些失敗的消息會從之前的consumeRequest移除,因此也就影響到了ProcessQueue#removeMessage的返回值。
但是這是壹個優化,重試之後窗口大概率上還是會正常滑動。
如何保證並發消費提交偏移量正確?
基於TreeMap的滑動窗口
如何保證消息消費不丟失?
滑動窗口+broker遠端保存+sendback+本地重試兜底
如果broker保存了offset
那麽從對應offset重新拉取消息
如果broker沒有保存offset,或者其他情況丟失
那麽根據配置的策略,從對應的offset開始拉取