當前位置:編程學習大全網 - 源碼下載 - flink sql 知其所以然(十三):流 join問題解決

flink sql 知其所以然(十三):流 join問題解決

本節是 flink sql 流 join 系列的下篇,上篇的鏈接如下:

廢話不多說,咱們先直接上本文的目錄和結論,小夥伴可以先看結論快速了解博主期望本文能給小夥伴們帶來什麽幫助:

書接上文,上文介紹了曝光流在關聯點擊流時,使用 flink sql regular join 存在的 retract 問題。

本文介紹怎麽使用 flink sql interval join 解決這些問題。

flink sql 知其所以然(十二):流 join 很難嘛?(上)

看看上節的實際案例,來看看在具體輸入值的場景下,輸出值應該長啥樣。

場景:即常見的曝光日誌流(show_log)通過 log_id 關聯點擊日誌流(click_log),將數據的關聯結果進行下發。

來壹波輸入數據:

曝光數據:

點擊數據:

預期輸出數據如下:

上節的 flink sql regular join 解決方案如下:

上節說道,flink sql left join 在流數據到達時,如果左表流(show_log)join 不到右表流(click_log) ,則不會等待右流直接輸出(show_log,null),在後續右表流數據代打時,會將(show_log,null)撤回,發送(show_log,click_log)。這就是為什麽產生了 retract 流,從而導致重復寫入 kafka。

對此,我們也是提出了對應的解決思路,既然 left join 中左流不會等待右流,那麽能不能讓左流強行等待右流壹段時間,實在等不到在數據關聯不到的數據即可。

當當當!!!

本文的 flink sql interval join 登場,它就能等。

大家先通過下面這句話和圖簡單了解壹下 interval join 的作用(熟悉 DataStream 的小夥伴萌可能已經使用過了),後續會詳細介紹原理。

interval join 就是用壹個流的數據去關聯另壹個流的壹段時間區間內的數據。關聯到就下發關聯到的數據,關聯不到且在超時後就根據是否是 outer join(left join,right join,full join)下發沒關聯到的數據。

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>

來看看上述案例的 flink sql interval join sql 怎麽寫:

這裏設置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE 代表 show_log 表中的數據會和 click_log 表中的 row_time 在前後 10 分鐘之內的數據進行關聯。

運行結果如下:

如上就是我們期望的正確結果了。

flink web ui 算子圖如下:

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>

那麽此時妳可能有壹個問題,結果中的前兩條數據 join 到了輸出我是理解的,那當 show_log join 不到 click_log 時為啥也輸出了?原理是啥?

博主帶妳們來定位到具體的實現源碼。先看壹下 transformations。

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>

可以看到事件時間下 interval join 的具體 operator 是 org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay 。

其核心邏輯就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 來處理具體 join 邏輯。 RowTimeIntervalJoin 重要方法如下圖所示。

TimeIntervalJoin

下面詳細給大家解釋壹下。

join 時,左流和右流會在 interval 時間之內相互等待,如果等到了則輸出數據[+(show_log,click_log)],如果等不到,並且另壹條流的時間已經推進到當前這條數據在也不可能 join 到另壹條流的數據時,則直接輸出[+(show_log,null)],[+(null,click_log)]。

舉個例子, show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE , 當 click_log 的時間推進到 2021-11-01 11:00:00 時,這時 show_log 來壹條 2021-11-01 02:00:00 的數據, 那這條 show_log 必然不可能和 click_log 中的數據 join 到了,因為 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之間的數據以及過期刪除了。則 show_log 直接輸出 [+(show_log,null)]

以上面案例的 show_log(左表) interval join click_log(右表) 為例(不管是 inner interval join,left interval join,right interval join 還是 full interval join,都會按照下面的流程執行):

上面只是左流 show_log 數據到達時的執行流程(即 ProcessElement1 ),當右流 click_log 到達時也是完全類似的執行流程(即 ProcessElement2 )。

小夥伴萌在使用 interval join 需要註意的兩點事項:

本文主要介紹了 flink sql interval 是怎麽避免出現 flink regular join 存在的 retract 問題的,並通過解析其實現說明了運行原理,博主期望妳讀完本文之後能了解到:

  • 上一篇:人民幣成全球第四位支付貨幣,有哪些重大意義?
  • 下一篇:基於JAVA 的汽車租賃系統~~源代碼~謝謝~高分~~!
  • copyright 2024編程學習大全網