當前位置:編程學習大全網 - 源碼下載 - RabbitMQ,RocketMQ,Kafka 事務性,消息丟失和重復發送處理策略

RabbitMQ,RocketMQ,Kafka 事務性,消息丟失和重復發送處理策略

我們的服務器從單機發展到擁有多臺機器的分布式系統,各個系統之前需要借助於網絡進行通信,原有單機中相對可靠的方法調用以及進程間通信方式已經沒有辦法使用,同時網絡環境也是不穩定的,造成了我們多個機器之間的數據同步問題,這就是典型的分布式事務問題。

在分布式事務中事務的參與者、支持事務的服務器、資源服務器以及事務管理器分別位於不同的分布式系統的不同節點之上。分布式事務就是要保證不同節點之間的數據壹致性。

1、2PC(二階段提交)方案 - 強壹致性

2、3PC(三階段提交)方案

3、TCC (Try-Confirm-Cancel)事務 - 最終壹致性

4、Saga事務 - 最終壹致性

5、本地消息表 - 最終壹致性

6、MQ事務 - 最終壹致性

消息的生產方,除了維護自己的業務邏輯之外,同時需要維護壹個消息表。這個消息表裏面記錄的就是需要同步到別的服務的信息,當然這個消息表,每個消息都有壹個狀態值,來標識這個消息有沒有被成功處理。

發送放的業務邏輯以及消息表中數據的插入將在壹個事務中完成,這樣避免了業務處理成功 + 事務消息發送失敗,或業務處理失敗 + 事務消息發送成功,這個問題。

舉個栗子:

我們假定目前有兩個服務,訂單服務,購物車服務,用戶在購物車中對幾個商品進行合並下單,之後需要情況購物車中剛剛已經下單的商品信息。

1、消息的生產方也就是訂單服務,完成了自己的邏輯(對商品進行下單操作)然後把這個消息通過 mq 發送到需要進行數據同步的其他服務中,也就是我們栗子中的購物車服務。

2、其他服務(購物車服務)會監聽這個隊列;

1、如果收到這個消息,並且數據同步執行成功了,當然這也是壹個本地事務,就通過 mq 回復消息的生產方(訂單服務)消息已經處理了,然後生產方就能標識本次事務已經結束。如果是壹個業務上的錯誤,就回復消息的生產方,需要進行數據回滾了。

2、很久沒收到這個消息,這種情況是不會發生的,消息的發送方會有壹個定時的任務,會定時重試發送消息表中還沒有處理的消息;

3、消息的生產方(訂單服務)如果收到消息回執;

1、成功的話就修改本次消息已經處理完,也就是本次分布式事務的同步已經完成;

2、如果消息的結果是執行失敗,同時在本地回滾本次事務,標識消息已經處理完成;

3、如果消息丟失,也就是回執消息沒有收到,這種情況也不太會發生,消息的發送方(訂單服務)會有壹個定時的任務,定時重試發送消息表中還沒有處理的消息,下遊的服務需要做冪等,可能會收到多次重復的消息,如果壹個回復消息生產方中的某個回執信息丟失了,後面持續收到生產方的 mq 消息,然後再次回復消息的生產方回執信息,這樣總能保證發送者能成功收到回執,消息的生產方在接收回執消息的時候也要做到冪等性。

這裏有兩個很重要的操作:

1、服務器處理消息需要是冪等的,消息的生產方和接收方都需要做到冪等性;

2、發送放需要添加壹個定時器來遍歷重推未處理的消息,避免消息丟失,造成的事務執行斷裂。

該方案的優缺點

優點:

1、在設計層面上實現了消息數據的可靠性,不依賴消息中間件,弱化了對 mq 特性的依賴。

2、簡單,易於實現。

缺點:

主要是需要和業務數據綁定到壹起,耦合性比較高,使用相同的數據庫,會占用業務數據庫的壹些資源。

下面分析下幾種消息隊列對事務的支持

RocketMQ 中的事務,它解決的問題是,確保執行本地事務和發消息這兩個操作,要麽都成功,要麽都失敗。並且,RocketMQ 增加了壹個事務反查的機制,來盡量提高事務執行的成功率和數據壹致性。

主要是兩個方面,正常的事務提交和事務消息補償

正常的事務提交

1、發送消息(half消息),這個 half 消息和普通消息的區別,在事務提交 之前,對於消費者來說,這個消息是不可見的。

2、MQ SERVER寫入信息,並且返回響應的結果;

3、根據MQ SERVER響應的結果,決定是否執行本地事務,如果MQ SERVER寫入信息成功執行本地事務,否則不執行;

如果MQ SERVER沒有收到 Commit 或者 Rollback 的消息,這種情況就需要進行補償流程了

補償流程

1、MQ SERVER如果沒有收到來自消息發送方的 Commit 或者 Rollback 消息,就會向消息發送端也就是我們的服務器發起壹次查詢,查詢當前消息的狀態;

2、消息發送方收到對應的查詢請求,查詢事務的狀態,然後把狀態重新推送給MQ SERVER,MQ SERVER就能之後後續的流程了。

相比於本地消息表來處理分布式事務,MQ 事務是把原本應該在本地消息表中處理的邏輯放到了 MQ 中來完成。

Kafka 中的事務解決問題,確保在壹個事務中發送的多條信息,要麽都成功,要麽都失敗。也就是保證對多個分區寫入操作的原子性。

通過配合 Kafka 的冪等機制來實現 Kafka 的 Exactly Once,滿足了讀取-處理-寫入這種模式的應用程序。當然 Kafka 中的事務主要也是來處理這種模式的。

什麽是讀取-處理-寫入模式呢?

栗如:在流計算中,用 Kafka 作為數據源,並且將計算結果保存到 Kafka 這種場景下,數據從 Kafka 的某個主題中消費,在計算集群中計算,再把計算結果保存在 Kafka 的其他主題中。這個過程中,要保證每條消息只被處理壹次,這樣才能保證最終結果的成功。Kafka 事務的原子性就保證了,讀取和寫入的原子性,兩者要不壹起成功,要不就壹起失敗回滾。

這裏來分析下 Kafka 的事務是如何實現的

它的實現原理和 RocketMQ 的事務是差不多的,都是基於兩階段提交來實現的,在實現上可能更麻煩

先來介紹下事務協調者,為了解決分布式事務問題,Kafka 引入了事務協調者這個角色,負責在服務端協調整個事務。這個協調者並不是壹個獨立的進程,而是 Broker 進程的壹部分,協調者和分區壹樣通過選舉來保證自身的可用性。

Kafka 集群中也有壹個特殊的用於記錄事務日誌的主題,裏面記錄的都是事務的日誌。同時會有多個協調者的存在,每個協調者負責管理和使用事務日誌中的幾個分區。這樣能夠並行的執行事務,提高性能。

下面看下具體的流程

事務的提交

1、協調者設置事務的狀態為PrepareCommit,寫入到事務日誌中;

2、協調者在每個分區中寫入事務結束的標識,然後客戶端就能把之前過濾的未提交的事務消息放行給消費端進行消費了;

事務的回滾

1、協調者設置事務的狀態為PrepareAbort,寫入到事務日誌中;

2、協調者在每個分區中寫入事務回滾的標識,然後之前未提交的事務消息就能被丟棄了;

這裏引用壹下消息隊列高手課中的圖片

RabbitMQ 中事務解決的問題是確保生產者的消息到達MQ SERVER,這和其他 MQ 事務還是有點差別的,這裏也不展開討論了。

先來分析下壹條消息在 MQ 中流轉所經歷的階段。

生產階段 :生產者產生消息,通過網絡發送到 Broker 端。

存儲階段 :Broker 拿到消息,需要進行落盤,如果是集群版的 MQ 還需要同步數據到其他節點。

消費階段 :消費者在 Broker 端拉數據,通過網絡傳輸到達消費者端。

發生網絡丟包、網絡故障等這些會導致消息的丟失

在生產者發送消息之前,通過channel.txSelect開啟壹個事務,接著發送消息, 如果消息投遞 server 失敗,進行事務回滾channel.txRollback,然後重新發送, 如果 server 收到消息,就提交事務channel.txCommit

不過使用事務性能不好,這是同步操作,壹條消息發送之後會使發送端阻塞,以等待RabbitMQ Server的回應,之後才能繼續發送下壹條消息,生產者生產消息的吞吐量和性能都會大大降低。

使用確認機制,生產者將信道設置成 confirm 確認模式,壹旦信道進入 confirm 模式,所有在該信道上面發布的消息都會被指派壹個唯壹的ID(從1開始),壹旦消息被投遞到所有匹配的隊列之後,RabbitMQ 就會發送壹個確認(Basic.Ack)給生產者(包含消息的唯壹 deliveryTag 和 multiple 參數),這就使得生產者知曉消息已經正確到達了目的地了。

multiple 為 true 表示的是批量的消息確認,為 true 的時候,表示小於等於返回的 deliveryTag 的消息 id 都已經確認了,為 false 表示的是消息 id 為返回的 deliveryTag 的消息,已經確認了。

確認機制有三種類型

1、同步確認

2、批量確認

3、異步確認

同步模式的效率很低,因為每壹條消息度都需要等待確認好之後,才能處理下壹條;

批量確認模式相比同步模式效率是很高,不過有個致命的缺陷,壹旦回復確認失敗,當前確認批次的消息會全部重新發送,導致消息重復發送;

異步模式就是個很好的選擇了,不會有同步模式的阻塞問題,同時效率也很高,是個不錯的選擇。

Kafaka 中引入了壹個 broker。 broker 會對生產者和消費者進行消息的確認,生產者發送消息到 broker,如果沒有收到 broker 的確認就可以選擇繼續發送。

只要 Producer 收到了 Broker 的確認響應,就可以保證消息在生產階段不會丟失。有些消息隊列在長時間沒收到發送確認響應後,會自動重試,如果重試再失敗,就會以返回值或者異常的方式告知用戶。

只要正確處理 Broker 的確認響應,就可以避免消息的丟失。

RocketMQ 提供了3種發送消息方式,分別是:

同步發送:Producer 向 broker 發送消息,阻塞當前線程等待 broker 響應 發送結果。

異步發送:Producer 首先構建壹個向 broker 發送消息的任務,把該任務提交給線程池,等執行完該任務時,回調用戶自定義的回調函數,執行處理結果。

Oneway發送:Oneway 方式只負責發送請求,不等待應答,Producer 只負責把請求發出去,而不處理響應結果。

在存儲階段正常情況下,只要 Broker 在正常運行,就不會出現丟失消息的問題,但是如果 Broker 出現了故障,比如進程死掉了或者服務器宕機了,還是可能會丟失消息的。

防止在存儲階段消息額丟失,可以做持久化,防止異常情況(重啟,關閉,宕機)。。。

RabbitMQ 持久化中有三部分:

消息的持久化,在投遞時指定 delivery_mode=2(1是非持久化),消息的持久化,需要配合隊列的持久,只設置消息的持久化,重啟之後隊列消失,繼而消息也會丟失。所以如果只設置消息持久化而不設置隊列的持久化意義不大。

對於持久化,如果所有的消息都設置持久化,會影響寫入的性能,所以可以選擇對可靠性要求比較高的消息進行持久化處理。

不過消息持久化並不能百分之百避免消息的丟失

比如數據在落盤的過程中宕機了,消息還沒及時同步到內存中,這也是會丟數據的,這種問題可以通過引入鏡像隊列來解決。

鏡像隊列的作用:引入鏡像隊列,可已將隊列鏡像到集群中的其他 Broker 節點之上,如果集群中的壹個節點失效了,隊列能夠自動切換到鏡像中的另壹個節點上來保證服務的可用性。(更細節的這裏不展開討論了)

操作系統本身有壹層緩存,叫做 Page Cache,當往磁盤文件寫入的時候,系統會先將數據流寫入緩存中。

Kafka 收到消息後也會先存儲在也緩存中(Page Cache)中,之後由操作系統根據自己的策略進行刷盤或者通過 fsync 命令強制刷盤。如果系統掛掉,在 PageCache 中的數據就會丟失。也就是對應的 Broker 中的數據就會丟失了。

處理思路

1、控制競選分區 leader 的 Broker。如果壹個 Broker 落後原先的 Leader 太多,那麽它壹旦成為新的 Leader,必然會造成消息的丟失。

2、控制消息能夠被寫入到多個副本中才能提交,這樣避免上面的問題1。

1、將刷盤方式改成同步刷盤;

2、對於多個節點的 Broker,需要將 Broker 集群配置成:至少將消息發送到 2 個以上的節點,再給客戶端回復發送確認響應。這樣當某個 Broker 宕機時,其他的 Broker 可以替代宕機的 Broker,也不會發生消息丟失。

消費階段就很簡單了,如果在網絡傳輸中丟失,這個消息之後還會持續的推送給消費者,在消費階段我們只需要控制在業務邏輯處理完成之後再去進行消費確認就行了。

總結:對於消息的丟失,也可以借助於本地消息表的思路,消息產生的時候進行消息的落盤,長時間未處理的消息,使用定時重推到隊列中。

消息在 MQ 中的傳遞,大致可以歸類為下面三種:

1、At most once: 至多壹次。消息在傳遞時,最多會被送達壹次。是不安全的,可能會丟數據。

2、At least once: 至少壹次。消息在傳遞時,至少會被送達壹次。也就是說,不允許丟消息,但是允許有少量重復消息出現。

3、Exactly once:恰好壹次。消息在傳遞時,只會被送達壹次,不允許丟失也不允許重復,這個是最高的等級。

大部分消息隊列滿足的都是At least once,也就是可以允許重復的消息出現。

我們消費者需要滿足冪等性,通常有下面幾種處理方案

1、利用數據庫的唯壹性

根據業務情況,選定業務中能夠判定唯壹的值作為數據庫的唯壹鍵,新建壹個流水表,然後執行業務操作和流水表數據的插入放在同壹事務中,如果流水表數據已經存在,那麽就執行失敗,借此保證冪等性。也可先查詢流水表的數據,沒有數據然後執行業務,插入流水表數據。不過需要註意,數據庫讀寫延遲的情況。

2、數據庫的更新增加前置條件

3、給消息帶上唯壹ID

每條消息加上唯壹ID,利用方法1中通過增加流水表,借助數據庫的唯壹性來處理重復消息的消費。

  • 上一篇:神奇的“兩點半選股法”,抓尾盤拉升牛股!
  • 下一篇:有哪些波段操作技巧?
  • copyright 2024編程學習大全網