當前位置:編程學習大全網 - 源碼下載 - flink的背壓問題產生原因和解決方法

flink的背壓問題產生原因和解決方法

最近flink job出現了背壓的問題, 後果是導致了checkpoint的生成超時, 影響了flink job的運行.

定位問題:

如下圖:

1) flink的checkpoint生成超時, 失敗:

2) 查看jobmanager日誌,定位問題:

3) 找大神幫忙定位問題, 原來是出現了背壓的問題,? 緩沖區的數據處理不過來,barrier流動慢,導致checkpoint生成時間長, 出現超時的現象. (checkpoint超時時間設置了30分鐘)

下圖是背壓過高, input 和 output緩沖區都占滿的情況

4) 背壓的情況也可以在flink後臺的job的JobGraph中查看

下面說說flink感應反壓的過程:

下面這張圖簡單展示了兩個 Task 之間的數據傳輸以及 Flink 如何感知到反壓的:

記錄“A”進入了 Flink 並且被 Task 1 處理。(這裏省略了 Netty 接收、反序列化等過程)

記錄被序列化到 buffer 中。

該 buffer 被發送到 Task 2,然後 Task 2 從這個 buffer 中讀出記錄。

註意 : 記錄能被 Flink 處理的前提是,必須有空閑可用的 Buffer。

結合上面兩張圖看:Task 1 在輸出端有壹個相關聯的 LocalBufferPool(稱緩沖池1),Task 2 在輸入端也有壹個相關聯的 LocalBufferPool(稱緩沖池2)。如果緩沖池1中有空閑可用的 buffer 來序列化記錄 “A”,我們就序列化並發送該 buffer。

這裏我們需要註意兩個場景:

本地傳輸:如果 Task 1 和 Task 2 運行在同壹個 worker 節點(TaskManager),該 buffer 可以直接交給下壹個 Task。壹旦 Task 2 消費了該 buffer,則該 buffer 會被緩沖池1回收。如果 Task 2 的速度比 1 慢,那麽 buffer 回收的速度就會趕不上 Task 1 取 buffer 的速度,導致緩沖池1無可用的 buffer,Task 1 等待在可用的 buffer 上。最終形成 Task 1 的降速。

遠程傳輸:如果 Task 1 和 Task 2 運行在不同的 worker 節點上,那麽 buffer 會在發送到網絡(TCP Channel)後被回收。在接收端,會從 LocalBufferPool 中申請 buffer,然後拷貝網絡中的數據到 buffer 中。如果沒有可用的 buffer,會停止從 TCP 連接中讀取數據。在輸出端,通過 Netty 的水位值機制來保證不往網絡中寫入太多數據(後面會說)。如果網絡中的數據(Netty輸出緩沖中的字節數)超過了高水位值,我們會等到其降到低水位值以下才繼續寫入數據。這保證了網絡中不會有太多的數據。如果接收端停止消費網絡中的數據(由於接收端緩沖池沒有可用 buffer),網絡中的緩沖數據就會堆積,那麽發送端也會暫停發送。另外,這會使得發送端的緩沖池得不到回收,writer 阻塞在向 LocalBufferPool 請求 buffer,阻塞了 writer 往 ResultSubPartition 寫數據。

這種固定大小緩沖池就像阻塞隊列壹樣,保證了 Flink 有壹套健壯的反壓機制,使得 Task 生產數據的速度不會快於消費的速度。我們上面描述的這個方案可以從兩個 Task 之間的數據傳輸自然地擴展到更復雜的 pipeline 中,保證反壓機制可以擴散到整個 pipeline。

解決辦法:

1)? 首先說壹下flink原來的JobGraph, 如下圖,? 產生背壓的是中間的算子,?

2) 背壓是什麽

如果您看到任務的背壓警告(例如High),這意味著它生成的數據比下遊算子可以消耗的速度快。下遊工作流程中的記錄(例如從源到匯)和背壓沿著相反的方向傳播到流上方。

以壹個簡單的Source -> Sink工作為例。如果您看到警告Source,這意味著Sink消耗數據的速度比Source生成速度慢。Sink正在向上遊算子施加壓力Source。

可以得出:? 第三個算子的處理數據速度比第二個算子生成數據的速度,? 明顯的解決方法:? 提高第三個算子的並發度,? 問題又出現了:? 並發度要上調到多少呢?

3) 第壹次上調, 從原來的10並發 上調到 40?

觀察緩存池對比的情況:?

並發是10的buffer情況: (背壓的情況比較嚴重, 曲線持續性地達到峰值, 會導致資源占光)

? 並發是40的buffer情況:(有了比較大的改善, 但是還是存在背壓的問題, 因為曲線有達到頂峰的時候)

4)? 從網上了解到flink的並發度的優化策略後, 有了壹個比較好的解決方法, 把第三個算子的並行度設置成100, 與第二個算子的並發度壹致:

這樣做的好處是, ?flink會自動將條件合適的算子鏈化, 形成算子鏈,

滿足上下遊形成算子鏈的條件比較苛刻的:

1.上下遊的並行度壹致

2.下遊節點的入度為1 (也就是說下遊節點沒有來自其他節點的輸入)

3.上下遊節點都在同壹個 slot group 中(下面會解釋 slot group)

4.下遊節點的 chain 策略為 ALWAYS(可以與上下遊鏈接,map、flatmap、filter等默認是ALWAYS)

5.上遊節點的 chain 策略為 ALWAYS 或 HEAD(只能與下遊鏈接,不能與上遊鏈接,Source默認是HEAD)

6.兩個節點間數據分區方式是 forward(參考 理解數據流的分區 )

7.用戶沒有禁用 chain

算子鏈的好處: 鏈化成算子鏈可以減少線程與線程間的切換和數據緩沖的開銷,並在降低延遲的同時提高整體吞吐量。

flink還有另外壹種優化手段就是 槽***享 ,

flink默認開啟slot***享(所有operator都在default***享組)

默認情況下,Flink 允許同壹個job裏的不同的子任務可以***享同壹個slot,即使它們是不同任務的子任務但是可以分配到同壹個slot上。?這樣的結果是,壹個 slot 可以保存整個管道pipeline, 換句話說,? flink會安排並行度壹樣的算子子任務在同壹個槽裏運行

意思是每壹個taskmanager的slot裏面都可以運行上述的整個完整的流式任務, 減少了數據在不同機器不同分區之間的傳輸損耗, (如果算子之間的並發度不同, 會造成數據分區的重新分配(rebalance, shuffle, hash....等等), 就會導致數據需要在不同機器之間傳輸)

優化後的JobGraph, 如下圖,?

再次觀察緩存池對比的情況:?

並發是100的buffer情況: (背壓的情況已經大大緩解)

checkpoint生成的時間沒有出現超時的情況

  • 上一篇:十種常用的python圖像處理工具
  • 下一篇:彩色信紙寫作文
  • copyright 2024編程學習大全網