當前位置:編程學習大全網 - 源碼下載 - rocketmq事務源代碼詳解

rocketmq事務源代碼詳解

我們的服務器已經從單機發展到多機分布式系統。之前各個系統需要借助網絡進行通信,原有單機中相對可靠的方法調用和進程間通信方式無法使用。同時網絡環境不穩定,造成了我們多臺機器之間的數據同步問題。這是壹個典型的分布式事務問題。

在分布式事務中,參與者、支持事務的服務器、資源服務器和事務管理器位於不同分布式系統的不同節點上。分布式事務是為了保證不同節點之間的數據壹致性。

1,2PC(兩階段提交)方案-強壹致性

2.3PC(三階段提交)方案

3.TCC(嘗試-確認-取消)交易-最終壹致性

4.Saga交易-最終壹致性

5.本地消息表-最終壹致性

6.MQ事務-最終壹致性

除了自己的業務邏輯之外,消息的生產者還需要維護壹個消息表。這個消息表中記錄的是需要與其他服務同步的信息。當然,在這個消息表中,每條消息都有壹個狀態值來標識消息是否被成功處理。

發送播放的業務邏輯和消息表中數據的插入將在壹個事務中完成,避免了業務處理成功+事務消息發送失敗,或者業務處理失敗+事務消息發送成功的問題。

舉個栗子:

我們假設目前有兩個服務,訂單服務和購物車服務。用戶將購物車中的幾種商品組合起來下訂單,然後他們需要關於剛剛放入購物車的商品的信息。

1.消息的生產者,也就是訂單服務,完成自己的邏輯(訂購商品)然後通過mq把這個消息發送給其他需要數據同步的服務,也就是我們栗子中的購物車服務。

2.其他服務(購物車服務)將偵聽此隊列;

1.如果收到這個消息,並且成功執行了數據同步,當然這也是壹個本地事務,那麽這個消息的生產者(訂單服務)的消息已經通過mq進行了處理,然後生產者就可以識別出這個事務已經結束了。如果是業務錯誤,回復消息的生產者,需要進行數據回滾。

2.很久沒有收到這條消息了,不會出現這種情況。消息的發送者將有壹個預定的任務,並將定期重試發送消息表中未處理的消息。

3.如果消息生產者(訂單服務)收到消息回執;

1.如果成功,修改為該消息已經被處理,即該分布式事務的同步已經完成;

2.如果消息的結果是執行失敗,則同時在本地回滾事務,表示消息已經被處理;

3.如果消息丟失,即未收到回執消息,這種情況不太可能發生。消息的發送方(訂單服務)會有壹個調度任務,重試發送消息表中未處理的消息,下遊服務需要冪等,可能會多次收到重復的消息。如果來自回復消息的生產者的接收消息丟失,它將繼續接收來自生產者的mq消息,然後再次回復來自消息生產者的接收信息。

有兩個非常重要的操作:

1.服務器在處理消息時需要冪等,消息的生產者和接收者都需要冪等;

2.發送方需要增加壹個定時器來遍歷和重推未處理的消息,避免消息丟失導致的事務執行斷裂。

這個方案的優點和缺點

優勢:

1,在設計層面實現了消息數據的可靠性,不依賴於消息中間件,弱化了對mq特性的依賴。

2.簡單且易於實現。

缺點:

主要是需要和業務數據綁定,高度耦合。使用同壹個數據庫會占用業務數據庫的壹些資源。

下面分析幾種消息隊列對事務的支持。

解決問題的RocketMQ中的事務是確保本地事務和發送消息都是成功或失敗的。而且RocketMQ增加了事務回溯機制,提高了事務執行的成功率和數據壹致性。

主要有兩個方面,正常的事務提交和事務消息補償。

正常交易提交

1.發送消息(半消息)。這種半消息與普通消息的區別在於,在提交交易之前,消費者看不到這種消息。

2.MQ服務器寫信息,返回響應結果;

3.根據MQ服務器響應的結果,決定是否執行本地事務。如果MQ服務器寫入信息,則本地事務成功執行,否則不會執行。

如果MQ服務器沒有收到提交或回滾的消息,這種情況需要進行補償。

補償流程

1.如果MQ SERVER沒有收到消息發送方的提交或回滾消息,它會向消息發送方即我們的服務器發起查詢,查詢當前的消息狀態;

2.消息發送方接收相應的查詢請求,查詢事務的狀態,然後將狀態推回MQ SERVER,以便MQ SERVER執行後續流程。

與本地消息表處理分布式事務相比,MQ事務是將本應在本地消息表中處理的邏輯放到MQ中完成的。

Kafka中的事務解決了這個問題,並確保在壹個事務中發送的多條信息要麽成功,要麽失敗。即,確保了對多個分區寫操作的原子性。

通過配合Kafka的冪等機制,實現了Kafka的恰好壹次,滿足了讀-處理-寫模式的應用程序。當然,卡夫卡中的交易主要處理的就是這種模式。

什麽是讀取-處理-寫入模式?

李茹:流計算中,以卡夫卡為數據源,計算結果保存到卡夫卡。數據從Kafka的某個主題消費,在計算集群中計算,然後保存在Kafka的其他主題中。在這個過程中,需要保證每條消息只處理壹次,這樣才能保證最終結果的成功。卡夫卡事務的原子性保證了讀寫的原子性,它們要麽壹起成功,要麽壹起回滾失敗。

我們來分析壹下卡夫卡的交易是如何實現的。

它的實現原理和RocketMQ事務類似,基於兩階段提交,實現起來可能比較麻煩。

我先介紹壹下事務協調器。為了解決分布式事務問題,Kafka引入了事務協調者的角色,負責在服務器端協調整個事務。這個協調器不是壹個獨立的進程,而是代理進程的壹部分。協調器和分區壹樣,通過選舉來保證其可用性。

Kafka cluster還有壹個專門記錄事務日誌的主題,記錄所有的事務日誌。同時會有多個協調器,每個協調器負責管理和使用事務日誌中的幾個分區。這可以並行執行事務並提高性能。

我們來看看具體流程

交易提交

1.協調器將事務的狀態設置為PrepareCommit,並將其寫入事務日誌;

2.協調器在每個分區中寫入事務結束的標識,然後客戶端可以將之前過濾的未提交事務消息釋放給消費者進行消費;

事務回滾

1.協調器將事務的狀態設置為PrepareAbort,並將其寫入事務日誌;

2.協調器在每個分區中寫入事務回滾的標識,然後可以丟棄之前未提交的事務消息;

這裏引用壹下消息隊列大師課上的圖片。

RabbitMQ中的事務解決的問題是保證生產者的消息到達MQ服務器,這和其他MQ事務有壹點不同,這裏就不討論了。

讓我們首先分析下壹條消息在MQ中經歷的階段。

生產階段:生產者生成消息,並通過網絡發送給代理。

存儲階段:當代理得到消息時,它需要卸載消息。如果是MQ的集群版本,它需要將數據同步到其他節點。

消費階段:消費者在經紀人端拉數據,通過網絡傳輸到消費者端。

網絡丟包、網絡故障等都會導致消息的丟失。

在生產者發送消息之前,通過channel.txSelect啟動壹個事務,然後發送消息。如果消息未能傳遞到服務器,事務將回滾到channel.txRollback,然後再次發送。如果服務器收到消息,它將提交事務channel.txCommit。

但是使用事務的性能並不好。這是壹個同步操作。壹條消息發出後,發送方會被屏蔽,等待RabbitMQ服務器的響應,然後才能發送下壹條消息。生產者的生產消息的吞吐量和性能將會大大降低。

使用確認機制,生產者將通道設置為確認模式。壹旦頻道進入確認模式,頻道上發布的所有消息將被分配壹個唯壹的ID(從1開始)。壹旦消息被傳遞到所有匹配的隊列,RabbitMQ將向生產者發送壹個確認(包括消息的唯壹deliveryTag和多個參數),這使得生產者知道消息已經被發送。

當multiple為真時,表示批量消息確認;為真時,表示小於等於返回的deliveryTag的消息id已被確認;當它為false時,意味著具有返回的deliveryTag的消息id的消息已經被確認。

有三種類型的確認機制。

1,同步確認

2.批量確認

3.異步確認

同步模式的效率非常低,因為每個消息都要等待確認才能被處理。

批量確認模式比同步模式效率高,但有壹個致命的缺陷。壹旦回復確認失敗,當前確認批次中的所有消息都會被重傳,導致消息重復。

異步模式是個不錯的選擇,不會有同步模式的阻塞問題,而且效率也很高,是個不錯的選擇。

卡夫卡介紹了壹個經紀人。Broker將消息確認給生產者和消費者,生產者將消息發送給broker,如果沒有收到broker的確認,可以選擇繼續發送。

生產者只要收到代理的確認響應,就可以保證消息在生產階段不會丟失。壹些消息隊列在長時間沒有收到確認響應後會自動重試。如果重試再次失敗,將通過返回值或異常通知用戶。

只要正確處理Broker的確認響應,就可以避免消息的丟失。

RocketMQ提供了三種發送消息的方式,即:

同步發送:生產者向代理發送消息,阻止當前線程等待代理響應並發送結果。

異步發送:Producer先構建壹個任務發送消息給broker,將任務提交給線程池,任務完成後回調壹個自定義的回調函數執行處理結果。

Oneway to send: Oneway只負責發送請求而不等待回復,Producer只負責發送請求而不處理響應結果。

在正常存儲階段下,只要代理正常運行,就不會出現消息丟失的問題,但是如果代理出現故障,比如進程死亡或者服務器宕機,消息仍然有可能丟失。

為了防止在存儲階段丟失消息量,可以將其持久化以防止異常情況(重啟、關閉、停機)。。。

RabbitMQ持久性有三個部分:

對於消息持久性,在傳遞時指定delivery_mode=2(1是非持久性的)。對於消息持久化,需要匹配隊列的持久化,只設置消息持久化。重啟後,隊列消失,然後消息會丟失。所以不設置隊列持久性而設置消息持久性沒有太大意義。

對於持久化,如果所有消息都設置為持久化,會影響寫的性能,所以可以選擇持久化可靠性要求高的消息。

然而,消息持久性不能完全避免消息丟失。

比如卸載過程中數據下去了,消息沒有及時同步到內存,也會丟失數據。這個問題可以通過引入鏡像隊列來解決。

鏡像隊列的功能:通過引入鏡像隊列,可以將隊列鏡像到集群中的其他代理節點上。如果集群中的壹個節點出現故障,隊列可以自動切換到鏡像中的另壹個節點,以確保服務的可用性。(更多細節這裏就不討論了。)

操作系統本身有壹層緩存,稱為頁面緩存。寫入磁盤文件時,系統會先將數據流寫入緩存。

Kafka收到消息後,也會先存儲在頁面緩存中,然後操作系統根據自己的策略刷盤或者通過fsync命令強制刷盤。如果系統掛起,PageCache中的數據將會丟失。也就是說,相應代理中的數據將會丟失。

加工思路

1,控制競選負責人的經紀人。如果壹個經紀人落後於原來的領導者太多,壹旦成為新的領導者,必然會造成消息的丟失。

2.控制消息只有寫入多份後才能提交,避免了上述1的問題。

1.將刷碟模式改為同步刷碟;

2.對於有多個節點的代理,有必要配置代理集群向至少兩個節點發送消息,然後向客戶端發送確認響應。這樣,當壹個經紀人倒下時,其他經紀人可以代替倒下的經紀人,不會有消息丟失。

消費階段很簡單。如果在網絡傳輸中丟失了,這條消息會繼續推送給消費者。在消費階段,我們只需要在業務邏輯處理完成後控制消費確認即可。

總結:對於消息的丟失,我們還可以利用本地消息表的思想,在消息產生時丟棄消息,通過定時將長時間沒有處理的消息推回隊列。

MQ中的消息傳輸可以大致分為以下三種類型:

1,最多壹次:最多壹次。消息傳遞時,最多傳遞壹次。不安全,可能會丟失數據。

2.至少壹次:至少壹次。郵件在傳遞時至少會傳遞壹次。換句話說,不允許丟失消息,但是允許少量的重復消息。

3.恰好壹次:恰好壹次。消息傳遞時,只傳遞壹次,不允許丟失或重復。這是最高境界。

大多數消息隊列至少有壹次,這意味著可以允許重復的消息。

我們消費者需要滿足冪等性,通常有以下幾種解決方案。

1,使用數據庫的唯壹性

根據業務情況,選擇業務中的唯壹值作為數據庫的唯壹鍵,創建新的運行列表,然後在同壹個事務中執行業務操作並插入運行列表數據。如果運行列表數據已經存在,則執行將失敗,以確保冪等性。也可以先查詢運行列表的數據,沒有數據再執行業務,插入運行列表的數據。但是要註意讀寫數據庫的延遲。

2.添加更新數據庫的前提條件。

3.給消息壹個唯壹的ID。

每條消息添加壹個唯壹的ID,重復消息的消耗借助數據庫的唯壹性,通過在方法1中添加壹個運行列表來處理。

  • 上一篇:腳本錯誤的控件錯誤
  • 下一篇:bch是什麽幣
  • copyright 2024編程學習大全網