Apache Flink是壹個面向分布式數據流處理和批量數據處理的開源計算平臺,它能夠基於同壹個Flink運行時,提供支持流處理和批處理兩種類型應用的功能。
現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為它們所提供的SLA(Service-Level-Aggreement)是完全不相同的:流處理壹般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理。
Flink從另壹個視角看待流處理和批處理,將二者統壹起來:Flink是完全支持流處理,也就是說作為流處理看待時輸入數據流是無界的; 批處理被作為壹種特殊的流處理,只是它的輸入數據流被定義為有界的。
Flink流處理特性:
Flink以層級式系統形式組件其軟件棧,不同層的棧建立在其下層基礎上,並且各層接受程序不同層的抽象形式。
1. 流、轉換、操作符
Flink程序是由Stream和Transformation這兩個基本構建塊組成,其中Stream是壹個中間結果數據,而Transformation是壹個操作,它對壹個或多個輸入Stream進行計算處理,輸出壹個或多個結果Stream。
Flink程序被執行的時候,它會被映射為Streaming Dataflow。壹個Streaming Dataflow是由壹組Stream和Transformation Operator組成,它類似於壹個DAG圖,在啟動的時候從壹個或多個Source Operator開始,結束於壹個或多個Sink Operator。
2. 並行數據流
壹個Stream可以被分成多個Stream分區(Stream Partitions),壹個Operator可以被分成多個Operator Subtask,每壹個Operator Subtask是在不同的線程中獨立執行的。壹個Operator的並行度,等於Operator Subtask的個數,壹個Stream的並行度總是等於生成它的Operator的並行度。
One-to-one模式
比如從Source[1]到map()[1],它保持了Source的分區特性(Partitioning)和分區內元素處理的有序性,也就是說map()[1]的Subtask看到數據流中記錄的順序,與Source[1]中看到的記錄順序是壹致的。
Redistribution模式
這種模式改變了輸入數據流的分區,比如從map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上遊的Subtask向下遊的多個不同的Subtask發送數據,改變了數據流的分區,這與實際應用所選擇的Operator有關系。
3.任務、操作符鏈
Flink分布式執行環境中,會將多個Operator Subtask串起來組成壹個Operator Chain,實際上就是壹個執行鏈,每個執行鏈會在TaskManager上壹個獨立的線程中執行。
4. 時間
處理Stream中的記錄時,記錄中通常會包含各種典型的時間字段:
Event Time:表示事件創建時間
Ingestion Time:表示事件進入到Flink Dataflow的時間
Processing Time:表示某個Operator對事件進行處理的本地系統時間
Flink使用WaterMark衡量時間的時間,WaterMark攜帶時間戳t,並被插入到stream中。
5. 窗口
Flink支持基於時間窗口操作,也支持基於數據的窗口操作:
窗口分類:
Tumbling/Sliding Time Window
// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
// key stream by sensorId
.keyBy(0)
// tumbling time window of 1 minute length
.timeWindow(Time.minutes(1))
// compute sum over carCnt
.sum(1)
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding time window of 1 minute length and 30 secs trigger interval
.timeWindow(Time.minutes(1), Time.seconds(30))
.sum(1)
Tumbling/Sliding Count Window
// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
// key stream by sensorId
.keyBy(0)
// tumbling count window of 100 elements size
.countWindow(100)
// compute the carCnt sum
.sum(1)
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding count window of 100 elements size and 10 elements trigger interval
.countWindow(100, 10)
.sum(1)
自定義窗口
基本操作:
6. 容錯
Barrier機制:
對齊:
當Operator接收到多個輸入的數據流時,需要在Snapshot Barrier中對數據流進行排列對齊:
基於Stream Aligning操作能夠實現Exactly Once語義,但是也會給流處理應用帶來延遲,因為為了排列對齊Barrier,會暫時緩存壹部分Stream的記錄到Buffer中,尤其是在數據流並行度很高的場景下可能更加明顯,通常以最遲對齊Barrier的壹個Stream為處理Buffer中緩存記錄的時刻點。在Flink中,提供了壹個開關,選擇是否使用Stream Aligning,如果關掉則Exactly Once會變成At least once。
CheckPoint:
Snapshot並不僅僅是對數據流做了壹個狀態的Checkpoint,它也包含了壹個Operator內部所持有的狀態,這樣才能夠在保證在流處理系統失敗時能夠正確地恢復數據流處理。狀態包含兩種:
7. 調度
在JobManager端,會接收到Client提交的JobGraph形式的Flink Job,JobManager會將壹個JobGraph轉換映射為壹個ExecutionGraph,ExecutionGraph是JobGraph的並行表示,也就是實際JobManager調度壹個Job在TaskManager上運行的邏輯視圖。
物理上進行調度,基於資源的分配與使用的壹個例子:
8. 叠代
機器學習和圖計算應用,都會使用到叠代計算,Flink通過在叠代Operator中定義Step函數來實現叠代算法,這種叠代算法包括Iterate和Delta Iterate兩種類型。
Iterate
Iterate Operator是壹種簡單的叠代形式:每壹輪叠代,Step函數的輸入或者是輸入的整個數據集,或者是上壹輪叠代的結果,通過該輪叠代計算出下壹輪計算所需要的輸入(也稱為Next Partial Solution),滿足叠代的終止條件後,會輸出最終叠代結果。
流程偽代碼:
IterationState state = getInitialState();
while (!terminationCriterion()) {
state = step(state);
}
setFinalState(state);
Delta Iterate
Delta Iterate Operator實現了增量叠代。
流程偽代碼:
IterationState workset = getInitialState();
IterationState solution = getInitialSolution();
while (!terminationCriterion()) {
(delta, workset) = step(workset, solution);
solution.update(delta)
}
setFinalState(solution);
最小值傳播:
9. Back Pressure監控
流處理系統中,當下遊Operator處理速度跟不上的情況,如果下遊Operator能夠將自己處理狀態傳播給上遊Operator,使得上遊Operator處理速度慢下來就會緩解上述問題,比如通過告警的方式通知現有流處理系統存在的問題。
Flink Web界面上提供了對運行Job的Backpressure行為的監控,它通過使用Sampling線程對正在運行的Task進行堆棧跟蹤采樣來實現。
默認情況下,JobManager會每間隔50ms觸發對壹個Job的每個Task依次進行100次堆棧跟蹤調用,過計算得到壹個比值,例如,radio=0.01,表示100次中僅有1次方法調用阻塞。Flink目前定義了如下Backpressure狀態:
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1
1. Table
Flink的Table API實現了使用類SQL進行流和批處理。
詳情參考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/table_api.html
2. CEP
Flink的CEP(Complex Event Processing)支持在流中發現復雜的事件模式,快速篩選用戶感興趣的數據。
詳情參考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/programming-model.html#next-steps
3. Gelly
Gelly是Flink提供的圖計算API,提供了簡化開發和構建圖計算分析應用的接口。
詳情參考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/gelly/index.html
4. FlinkML
FlinkML是Flink提供的機器學習庫,提供了可擴展的機器學習算法、簡潔的API和工具簡化機器學習系統的開發。
詳情參考:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/index.html
明天更新部署與測試
本文僅代表個人的觀點,如果闡述的不好歡迎大家指導糾正,在此感激不盡。