當前位置:編程學習大全網 - 源碼下載 - RocketMQ的事務消息

RocketMQ的事務消息

RocketMQ的事務消息,是指發送消息事件和其他事件需要同時成功或同時失敗。比如銀行轉賬,A銀行的某賬戶要轉壹萬元到B銀行的某賬戶。A銀行發送“B銀行賬戶增加壹萬元”這個消息,要和“從A銀行賬戶扣除壹萬元”這個操作同時成功或者同時失敗。

RocketMQ采用兩階段提交的方式實現事務消息,TransactionMQProducer處理上面情況的流程是,先發壹個“準備從B銀行賬戶增加壹萬元”的消息,發送成功後做從A銀行賬戶扣除壹萬元的操作,根據操作結果是否成功,確定之前的“準備從B銀行賬戶增加壹萬元”的消息是做commit還是rollback,具體流程如下:

1)發送方向RocketMQ發送“待確認”消息。

2)RocketMQ將收到的“待確認”消息持久化成功後,向發送方回復消息已經發送成功,此時第壹階段消息發送完成。

3)發送方開始執行本地事件邏輯。

4)發送方根據本地事件執行結果向RocketMQ發送二次確認(Commit或是Rollback)消息,RocketMQ收到Commit狀態則將第壹階段消息標記為可投遞,訂閱方將能夠收到該消息;收到Rollback狀態則刪除第壹階段的消息,訂閱方接收不到該消息。

5)如果出現異常情況,步驟4)提交的二次確認最終未到達RocketMQ,服務器在經過固定時間段後將對“待確認”消息發起回查請求。

6)發送方收到消息回查請求後(如果發送壹階段消息的Producer不能工作,回查請求將被發送到和Producer在同壹個Group裏的其他Producer),通過檢查對應消息的本地事件執行結果返回Commit或Roolback狀態。

7)RocketMQ收到回查請求後,按照步驟4)的邏輯處理。

上面的邏輯似乎很好地實現了事務消息功能,它也是RocketMQ之前的版本實現事務消息的邏輯。

但是因為RocketMQ依賴將數據順序寫到磁盤這個特征來提高性能,步驟4)卻需要更改第壹階段消息的狀態,這樣會造成磁盤Catch的臟頁過多,降低系統的性能。所以RocketMQ在4.x的版本中將這部分功能去除。系統中的壹些上層Class都還在,用戶可以根據實際需求實現自己的事務功能。

客戶端有三個類來支持用戶實現事務消息,

第壹個類是LocalTransaction-Executer,用來實例化步驟3)的邏輯,根據情況返回LocalTransactionState.ROLLBACK_MESSAGE或者

LocalTransactionState.COMMIT_MESSAGE狀態。

第二個類是TransactionMQProducer,它的用法和DefaultMQProducer類似,要通過它啟動壹個Producer並發消息,但是比DefaultMQProducer多設置本地事務處理函數和回查狀態函數。

第三個類是TransactionCheckListener,實現步驟5)中MQ服務器的回查請求,返回LocalTransactionState.ROLLBACK_MESSAGE或者LocalTransactionState.COMMIT_MESSAGE

上圖說明了事務消息的大致方案,其中分為兩個流程:正常事務消息的發送及提交、事務消息的補償流程。

1.事務消息發送及提交:

(1) 發送消息(half消息)。

(2) 服務端響應消息寫入結果。

(3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)。

(4) 根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見)。

2.補償流程:

(1) 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起壹次“回查”。

(2) Producer收到回查消息,檢查回查消息對應的本地事務的狀態。

(3) 根據本地事務狀態,重新Commit或者Rollback。

其中,補償階段用於解決消息Commit或者Rollback發生超時或者失敗的情況。

在RocketMQ事務消息的主要流程中,壹階段的消息如何對用戶不可見。其中,事務消息相對普通消息最大的特點就是壹階段發送的消息對用戶是不可見的。那麽,如何做到寫入消息但是對用戶不可見呢?RocketMQ事務消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費隊列,然後改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由於消費組未訂閱該主題,故消費端無法消費half類型的消息。然後二階段會顯示執行提交或者回滾half消息(邏輯刪除)。當然,為了防止二階段操作失敗,RocketMQ會開啟壹個定時任務,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進行消費,根據生產者組獲取壹個服務提供者發送回查事務狀態請求,根據事務狀態來決定是提交或回滾消息。

在RocketMQ中,消息在服務端的存儲結構如下,每條消息都會有對應的索引信息,Consumer通 過ConsumeQueue這個二級索引來讀取消息實體內容,其流程如下:

RocketMQ的具體實現策略是:寫入的如果事務消息,對消息的Topic和Queue等屬性進行替換,同時將原來的Topic和Queue信息存儲到消息的屬性中,正因為消息主題被替換,故消息並不會轉發到該原主題的消息消費隊列,消費者無法感知消息的存在,不會消費。其實改變消息主題是RocketMQ的常用“套路”,回想壹下延時消息的實現機制。RMQ_SYS_TRANS_HALF_TOPIC

在完成壹階段寫入壹條對用戶不可見的消息後,二階段如果是Commit操作,則需要讓消息對用戶可見;如果是Rollback則需要撤銷壹階段的消息。先說Rollback的情況。對於Rollback,本身壹階段的消息對用戶是不可見的,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除壹條消息,因為是順序寫文件的)。但是區別於這條消息沒有確定狀態(Pending狀態,事務懸而未決),需要壹個操作來標識這條消息的最終狀態。RocketMQ事務消息方案中引入了Op消息的概念,用Op消息標識事務消息已經確定的狀態(Commit或者Rollback)。如果壹條事務消息沒有對應的Op消息,說明這個事務的狀態還無法確定(可能是二階段失敗了)。引入Op消息後,事務消息無論是Commit或者Rollback都會記錄壹個Op操作。Commit相對於Rollback只是在寫入Op消息前創建Half消息的索引。

RocketMQ將Op消息寫入到全局壹個特定的Topic中通過源碼中的方法—

TransactionalMessageUtil.buildOpTopic();這個Topic是壹個內部的Topic(像Half消息的Topic壹樣),不會被用戶消費。Op消息的內容為對應的Half消息的存儲的Offset,這樣通過Op消息能索引到Half消息進行後續的回查操作。

在執行二階段Commit操作時,需要構建出Half消息的索引。壹階段的Half消息由於是寫到壹個特殊的Topic,所以二階段構建索引時需要讀取出Half消息,並將Topic和Queue替換成真正的目標的Topic和Queue,之後通過壹次普通消息的寫入操作來生成壹條對用戶可見的消息。所以RocketMQ事務消息二階段其實是利用了壹階段存儲的消息的內容,在二階段時恢復出壹條完整的普通消息,然後走壹遍消息寫入流程。

如果在RocketMQ事務消息的二階段過程中失敗了,例如在做Commit操作時,出現網絡問題導致Commit失敗,那麽需要通過壹定的策略使這條消息最終被Commit。RocketMQ采用了壹種補償機制,稱為“回查”。Broker端對未確定狀態的消息發起回查,將消息發送到對應的Producer端(同壹個Group的Producer),由Producer根據消息來檢查本地事務的狀態,進而執行Commit或者Rollback。

Broker端通過對比Half消息和Op消息進行事務消息的回查並且推進CheckPoint(記錄那些事務消息的狀態是確定的)。

值得註意的是,rocketmq並不會無休止的的信息事務狀態回查,默認回查15次,如果15次回查還是無法得知事務狀態,rocketmq默認回滾該消息。

TxConsumer類實現

  • 上一篇:有哪些好的3D建模軟件,最近對3D建模很感興趣
  • 下一篇:退回源代碼
  • copyright 2024編程學習大全網