當前位置:編程學習大全網 - 源碼下載 - RabbitMQ消息過濾的壹個思路

RabbitMQ消息過濾的壹個思路

生產者 Producer 向 壹個 隊列發送消息,並且為消息打上不同的 Tag。假設這個隊列有 3 個消費者:Consumer #[1:3],Consumer #1 只想消費 tag1 標記的消息,Consumer #2 只想消費 tag2 標記的消息,Consumer #3 只想消費 tag3 標記的消息。

生產者 publish 消息時,將 Tag 保存在 Map<String, Object> 類型的 header 字段,作為構建 AMQP.BasicProperties 參數

消費者如何告知 Broker 只消費特定 Tag?

假設 Consumer #1 只希望消費帶 tag1 標記的消息,那麽 Consumer #1 可以在向 Broker 請求 Basic.Consume 指令時,捎帶自己期望的 Tag 字符串。Client 在具體生成 consumerTag 時可以用 Tag 關鍵字加上隨機字符串(避免 consumerTag 重復):

消費者通過 Basic.Consume 指令來監聽隊列的消息,這些消費者信息服務端是如何存儲的?

保存在隊列主進程(Pid)的 state 中(具體調試可以通過 sys:get_state(Pid) )

並且隊列進程在初始化時,會進行 consumers 初始化:

consumers 字段實際由 priority_queue:new() 初始化。當有新的 consumer 註冊到隊列進程,那麽會調用 rabbit_queue_consumers 模塊的 add_consumer 方法來向 priority_queue 添加壹個元素;同理當有 consumer下線時,最終也會調用該模塊的 remove_consumer 方法。 priority_queue 完整實現見 附二

Broker 向 Consumer 投遞消息時,底層是通過 rabbit_amqqueue_process 調用 rabbit_queue_consumers 模塊的 deliver 方法。默認采用

從 priority_queue 中獲取壹個 QEntry( {ChPid, Consumer} ),然後通過 FetchFun 從隊列中獲取消息,發送到 ChPid(Channel 進程)

在 consumers 不為空的情況下,通過 FetchFun 獲取消息,此時可以獲取該消息的 header,取出 Tag 值(如果消息打了 Tag 標記),然後通過 priority_queue 的 filter/2 方法

在 Pred 實現中,我們可以判斷當前消息 Tag 值是否被包含在 consumerTag 中,從而可以過濾出消費特定 tag 的consumers,最後向這些 consumers 中的壹個發送 Message 消息。

附壹 (隊列進程 state 中的 consumers 信息例子)

附二 (priority_queue 模塊實現

rabbit_common )

註 :上述思路建議在測試環境測試,考慮到有可能出現的性能問題,作為壹個調研也會有很多工作要做,整個過程會涉及 RabbitMQ 服務端源碼改造、編譯、打包( rabbitmq-public-umbrella )以及客戶端的相關改造,如果能實際嘗試下,也會有不小的收獲。

  • 上一篇:幾種富文本編輯器Editor比較
  • 下一篇:本草綱目讀後感
  • copyright 2024編程學習大全網