setup-pyflink-virtual-env.sh 腳本會自動安裝 miniconda 並在當前目錄下創建虛擬環境文件夾 venv,然後自動安裝 apache-flink 模塊後壓縮虛擬環境為 venv.zip。
Pyflink 腳本開發請認真參考 官方文檔v1.11 。
Pyflink 正在快速發展的階段,每次版本更新都會增加很多新的特性,同時會取消舊特性,因此務必確保開發時所參考的文檔與本地 pyflink 版本壹致。
Flink 中的 Jar 包是 connector 的擴展,允許在 flink 腳本中連接和使用各種數據存儲工具,包括:
Pyflink 默認支持有限的幾種 jar 包,如有特殊需要(例如以 json 格式來消費 kafka 裏的數據),需要手動指定腳本依賴的 jar 包所在的路徑。
已知有 3 種方式來指定 jar 包依賴。
在腳本中完成 TableEnvironment 的初始化後,添加下面的腳本以指定 jar 包路徑(多個 jar 包的路徑用 ; 隔開)。
註意,本地環境的 jar 包路徑與線上環境的 jar 包路徑可能不同,因此每次提交到線上時還需要修改腳本中的路徑為對應的路徑。
找到 pyflink 模塊的安裝路徑,以及對應的 lib 目錄。
然後使用 cp 命令復制 jar 包到 lib 目錄下即可。
這種方法壹次運行,壹勞永逸,比較適合本地開發。
這種方式不適用於本地開發,而是用於提交到集群上時指定 jar 包的路徑,但為了較為系統地介紹 jar 包依賴的指定方式,故在此介紹。
命令如下:
通過 -j 參數來指定壹個 jar 包路徑,多個 jar 包則使用多個 -j 。
Flink 支持使用 local-singleJVM 模式 來進行本地測試,即只需簡單的執行 Python xxxx.py 命令,pyflink 就會默認啟動壹個 local-singleJVM 的 flink 環境來執行作業。
在運行過程中,可以另起終端,輸入 jps 來查看 java 進程。