當前位置:編程學習大全網 - 源碼下載 - 源代碼背後的邏輯

源代碼背後的邏輯

消息中間件是分布式系統的常用組件,具有廣泛的應用價值,如異步、解耦、削峰等。我們通常認為消息中間件是壹個可靠的組件——這裏所謂的可靠,是指只要我成功地將消息交付給消息中間件,消息就不會丟失,也就是消息肯定會被消費者成功消費至少壹次,這是消息中間件最基本的特性之壹,也就是我們常說的“至少壹次”,也就是消息至少會被成功消費壹次。

比如壹個消息M被發送到消息中間件,消息被傳遞到消費程序A,A接收到消息後消費,但是程序在消費中途重啟。此時,該消息未被標記為消費成功,該消息將繼續發送給消費者,直到其消費成功。

但是,由於這種可靠的特性,消息可能會被傳遞多次。比如,就是這個例子。程序A收到這條消息M,完成消費邏輯後,正要通知消息中間件“我已經消費成功”,程序重啟。所以對於消息中間件來說,消息還沒有被成功消費,所以會繼續傳遞。此時對於應用A來說,似乎消息明顯消費成功了,但是消息中間件還在反復傳遞。

在RockectMQ的場景中,重復傳遞具有相同messageId的消息。

基於消息的交付可靠性(消息不丟失)是更高的優先級,所以消息不重的任務會轉移到應用自實現上,這也是RocketMQ的文檔強調消費邏輯需要冪等的原因。其實背後的邏輯是:不丟失和不重是矛盾的(分布式場景下),但是消息復制有解決方案,消息丟失很麻煩。

例如,假設我們業務的消息消費邏輯是:插入壹個訂單表的數據,然後更新庫存:

為了實現消息的冪等性,我們可以采用這樣的方案:

很多時候這確實能起到很好的作用,但是在並發的場景下,還是會出現問題。

假設所有消耗的代碼加起來是1秒,並且有在1秒內到達的重復消息(假設100毫秒)(比如生產者重發快,代理重啟等。),那麽很有可能上面的重復代碼中數據仍然是空的(因為最後壹條消息還沒有消費完,訂單狀態還沒有成功更新)。

然後會穿透check屏障,最終導致重復的消息消費邏輯進入非冪等安全業務代碼,從而導致重復消費問題(如主鍵沖突拋出異常、存貨被重復扣款但未釋放等。)

為了解決上述並發場景中的消息等冪問題,壹個理想的解決方案是打開事務,將select語句更改為select for update語句,並鎖定記錄。

但是這種消耗的邏輯會因為事務包的引入,導致整個消息的消耗時間變長,並發性降低。

當然還有其他更高級的解決方案,比如更新訂單狀態采取樂觀鎖,如果更新失敗,消息會被重新消費。但是,對於特定的業務場景,這需要更復雜和詳細的代碼開發和庫表設計,這超出了本文的範圍。

但是,無論是select for update的解決方案還是樂觀鎖的解決方案,實際上都是基於業務表本身,這無疑增加了業務開發的復雜性。業務系統中很大壹部分請求處理依賴於MQ。如果每個消費者邏輯本身都需要基於業務本身來開發,那就是壹個繁瑣的工作量。本文希望探索壹種通用的消息冪等處理方法,從而抽象出某些工具類來應用於各種業務場景。

在消息中間件中,有壹個交付語義的概念,這個語義中有壹個叫做“恰好壹次”,即消息壹定會被成功消費,而且只消費壹次。以下是阿裏雲對恰好壹次的解釋:

在業務消息的冪等處理領域,我們可以認為業務消息的代碼壹定會執行並且只執行壹次,那麽我們可以認為正好是壹次。

然而,在分布式場景中找到通用的解決方案幾乎是不可能的。但如果是針對基於數據庫事務的消費邏輯,其實是可行的。

假設我們業務的消息消費邏輯是:更新MySQL數據庫中壹個訂單表的狀態:

要達到壹次精確,即消息只消費壹次(且必須消費壹次),我們可以這樣做:在這個數據庫中添加壹個消息消費記錄表,將消息插入到這個表中,在同壹個事務中提交原始訂單更新連同這個插入的動作,這樣可以保證消息只消費壹次。

1,開通交易

2.插入消息表(處理主鍵沖突的問題)

3.更新訂單表(原始消耗邏輯)

4.提交交易

描述:

1.此時,如果消息消費成功,事務提交,則消息表插入成功。此時,即使RocketMQ還沒有收到消費站點的更新並再次發送,消息插入也會被認為已經被消費,然後直接更新消費站點。這確保了我們的消費者代碼只被執行壹次。2.如果事務提交前服務掛起(如重啟),本地事務不執行,訂單不更新,消息表插入不成功;對於RocketMQ服務器,消費站點還沒有更新,所以消息會繼續傳遞。交付後發現消息成功插入消息表,可以繼續消費。這確保了消息不會丟失。

事實上,阿裏雲ONS的恰好壹次語義實現類似於這種基於數據庫事務特性的方案。更多詳情請參考:/document _ detail/102777 . html。

基於這種方法,它確實能夠擴展到不同的應用場景,因為它的實現方案與具體的業務本身無關——它依賴於壹個消息表。

但它有其局限性。

1.消息的消費邏輯必須依賴於關系數據庫事務。如果在消費過程中修改了其他數據,比如Redis這種不支持事務特征的數據源,這些數據就無法回滾。

2.數據庫中的數據必須在壹個數據庫中,跨數據庫無法解決。

註意:在業務中,消息表的設計不應該用消息ID來標識,而應該用業務的業務主鍵來標識,這樣處理生產者的重發更合理。阿裏雲上的消息去重只是RocketMQ的messageId,在生產者出於某種原因(比如上遊多次請求交易)手動重傳的場景下,不會起到去重/冪等的效果(因為消息Id不壹樣)。

如上所述,這種方式的恰好壹次的語義實現實際上有很多局限性,使得這種方案基本上不值得廣泛應用。而且因為是基於事務的,可能會導致表鎖定時間過長等性能問題。

舉個例子,我們以壹個訂單應用比較常見的消息為例。可以有以下步驟(以下統稱為步驟X):

1,檢查庫存(RPC)

2.鎖定庫存(RPC)

3.打開事務並插入訂單表(MySQL)

4.調用其他壹些下遊服務(RPC)

5.更新訂單狀態

6.提交事務(MySQL)

在這種情況下,如果我們采用消息表+本地事務的實現模式,那麽消息消費過程中的很多子流程都不支持回滾,這意味著即使我們添加了事務,其背後的操作實際上也不是原子的。怎麽說呢?也就是說,有可能是在第壹家店進行第二步鎖定庫存的時候,服務重新啟動了。此時,庫存實際上被鎖定在另壹個服務中,無法回滾。當然,消息還會被再次傳遞,而且要保證消息至少能被消費壹次。換句話說,鎖庫存本身的RPC接口仍然需要支持冪等性。

再者,如果在這種耗時的長鏈場景中加入事務包,系統的並發性會大大降低。因此,壹般來說,我們在這個場景中處理消息重復數據刪除的方法仍然會使用開頭提到的業務自己實現重復數據刪除邏輯的方式,例如在前面添加select for update,或者使用樂觀鎖定。

那麽我們有沒有辦法提煉出壹個通用的方案,可以兼顧輕量化、通用性和高性能?

壹種想法是將上述步驟分解成幾個不同的子消息,例如:

1.庫存系統消耗A:檢查庫存並鎖定,向訂單服務發送消息B。

2.訂單系統消費消息B:插入訂單表(MySQL),發送消息C給自己(下遊系統)消費。

3.下遊系統消費消息c:處理部分邏輯,向訂單系統發送消息d。

4.訂單系統消耗消息D:更新訂單狀態

註意:以上步驟需要保證本地事務和消息是壹個事務(至少是最終的壹致性),這涉及到分布式事務消息相關的話題,不在本文討論。

可以看出,這種處理方式會使操作的每壹步更加原子化,原子意味著小事務,小事務意味著使用消息表+事務的方案是可行的。

但是,這太復雜了!這就把壹個原本連續的代碼邏輯分割成多個系統,進行多種消息交互!這不如在業務代碼級別鎖定實現好。

上述消息表+本地事務的方案之所以有其局限性和並發缺點,是因為它依賴於關系數據庫的事務,必須將事務包裝在整個消息消費過程中。

如果我們可以在不依賴事務的情況下實現消息去重,那麽該方案可以擴展到更復雜的場景,如RPC、跨庫等。

例如,我們仍然使用消息表,但不依賴於事務,而是為消息表增加了消費狀態。我們能解決問題嗎?

67_1.png

以上是去交易後的消息冪等方案的流程。可以看出,這個方案是非事務性的,但是消息表本身的狀態是有區別的:消費進行中,消費完成。只有消費完成的消息才是冪等的。對於已經消費的消息,重復的消息將觸發延遲消費(在RocketMQ的場景中,它將被發送到RETRY TOPIC)。觸發延遲消費的原因是在並發場景下第壹條消息未完成的過程中,控制第二條消息不丟失(如果直接冪等,則消息會丟失(如果不消費相同的消息id),因為妳已經完成了第二條消息。

將不詳細描述上述過程。下面這篇文章有github源代碼的地址。讀者可以參考實現的源代碼。在這裏,我們回過頭來看看我們當初想解決的問題是否已經解決了:

1.消息已經消費成功,第二條消息將直接冪等(消費成功)。

2.並發場景下的消息仍然可以滿足不會出現消息重復的問題,即消息會穿透冪等擋板。

3.支持上遊業務生產者重復轉發業務消息的冪等問題。

第壹個問題顯然已經解決了,這裏就不討論了。

第二個問題是怎麽解決的?它主要由插入消息表的動作控制。假設我們使用MySQL作為消息表的存儲介質(設置消息的唯壹ID為主鍵),那麽只有壹條消息的插入動作會成功,後續的消息插入會因為主鍵的沖突而失敗,導致延遲消費的分支,然後延遲消費就成為上面第壹種場景的問題。

至於第三個問題,只要我們設計了支持業務主鍵(如訂單號、請求流水號等)的重復消息鍵。),而不僅僅是messageId。所以不是問題。

如果細心的讀者可能會發現,這裏其實存在壹個邏輯漏洞,問題出在上面提到的三個問題中的第二個問題(並發場景)。在並發場景下,我們依靠消息狀態做並發控制,讓第二條消息的重復消息繼續延遲消耗(重試)。但是如果1消息由於壹些異常原因(如機器重啟、外部異常導致消費失敗)消費不成功怎麽辦?也就是說,此時的延遲消費實際上看到的是每次消費的狀態,最終消費會被視為消費失敗,交付給死信話題(RocketMQ默認可以重復消費16次)。

有這樣的顧慮是對的!對此,我們的解決方案是,插入的消息表必須有壹個最長的消耗過期時間,比如10分鐘,這意味著如果壹個消息的消耗超過10分鐘,就需要從消息表中刪除它(程序需要自己實現)。所以最終,這條信息的流程將會是這樣的:

67_2.png

實際上,我們的方案沒有事務,只需要壹個中心存儲介質,所以自然可以選擇更靈活的存儲介質,比如Redis。使用Redis有兩個優點:

1,性能損失更低。

2.上面說的超時可以直接用Redis本身的ttl來實現。

當然,Redis存儲數據的可靠性和壹致性不如MySQL,需要用戶選擇。

RocketMQ的上述方案的Java實現已經以開源的方式放入Github。具體使用文檔請參考/jaskey/rocketmqdeduplistener。

以下只是使用Redis消除Readme中重復的壹個示例,以顯示如果在業務中使用該工具,添加消息來消除冪等性是多麽容易:

以上大部分代碼都是原RocketMQ的必備代碼。唯壹需要修改的是創建壹個經過重復數據刪除的DedupConcurrentListener示例,其中指出了您的消費邏輯和重復的業務密鑰(默認為messageId)。

更多細節請參考Github上的說明。

目前看來,該方案相當完善,所有消息都可以快速訪問和復制,並且與具體業務實現完全解耦。那麽這是完成所有任務的完美方式嗎?

可惜不是。原因很簡單:因為消息必須至少成功消費壹次,所以有可能消息在消費過程中失敗,並觸發消息重試。或者用上面的順序處理x:

1,檢查庫存(RPC)

2.鎖定庫存(RPC)

3.打開事務並插入訂單表(MySQL)

4.調用其他壹些下遊服務(RPC)

5.更新訂單狀態

6.提交事務(MySQL)

當消息在步驟3中被使用時,我們假設MySQL異常導致了失敗並觸發了消息重試。因為我們在重試之前會刪除冪等表的記錄,消息在重試的時候會重新輸入消費代碼,所以會再次執行步驟1和步驟2。如果步驟2本身不是冪等的,那麽業務消息消費仍然不是完全冪等的。

那麽既然這個不能完全完成消息冪等性,那麽值多少呢?價值會很大!雖然這不是解決消息冪等性的銀彈(事實上,在軟件工程領域基本上沒有銀彈),但可以通過方便的手段來解決:

1.代理、負載均衡等各種原因導致的重復消息重發。

2.各種上遊生產者造成的業務級消息重復。

3.重復消息並發消耗的控制窗口問題。即使重復,它也不可能同時進入消費邏輯。

也就是說,使用這種方法可以保證在正常消費邏輯場景下(無異常,無異常退出),消息的冪等工作可以全部解決,無論是業務重復還是rocketmq特性導致的重復。

事實上,這已經解決了99%的消息重復的問題。畢竟肯定有幾個不正常的場景。那麽,如果要處理非正常場景下的等冪問題,可以做以下工作來降低問題率:

1.回滾消息使用失敗。如果把消息消費本身的失敗帶回滾動機制,那麽消息重試自然沒有副作用。

2.消費者應該做優雅的退出處理。這是為了避免消息消耗到壹半時程序退出而導致的消息重試。

3.對於壹些不能冪等的操作,至少停止消費,報警。比如在鎖定庫存的操作中,如果統壹業務流程成功鎖定壹次庫存,就會再次觸發庫存鎖定。如果不能做冪等處理,至少消息消費會觸發異常(比如主鍵沖突導致的非正常消費)。

4.在#3的前提下,做好對消息消耗的監控。當發現消息重試連續失敗時,手動做好#1的回滾工作,這樣下壹次消費重試才會成功。

  • 上一篇:簡單的光譜排版源代碼
  • 下一篇:遠程診斷、遠程手術……醫療資源配置還需5G助力
  • copyright 2024編程學習大全網