
Consuming kafka-connect-jdbc messages (with schema) in pyflink

1. Data ingestion

Create a connector to MySQL through the Kafka Connect REST API and start it.

```json
{
  "name": "mysql_stream_test",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "timestamp.column.name": "",
    "incrementing.column.name": "ID",
    "connection.password": "",
    "validate.non.null": true,
    "tasks.max": 1,
    "batch.max.rows": 100,
    "table.whitelist": "baseqx.test_demo",
    "mode": "incrementing",
    "topic.prefix": "mysql_",
    "connection.user": "",
    "poll.interval.ms": 5000,
    "numeric.mapping": "best_fit",
    "connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/baseqx?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true"
  }
}
```
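The config above can be POSTed to the Connect worker's REST API (default port 8083; the `localhost` URL below is a placeholder) to create and start the connector. A minimal sketch using only the standard library; the `dry_run` flag is a convenience of this sketch that returns the encoded request body instead of sending it:

```python
import json
import urllib.request


def register_connector(connect_url: str, name: str, config: dict,
                       dry_run: bool = False) -> bytes:
    """POST a connector definition to the Kafka Connect REST API.

    connect_url: base URL of the Connect worker, e.g. "http://localhost:8083".
    Returns the encoded request body (dry_run) or the API response body.
    """
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    if dry_run:
        return body  # inspect the payload without touching the network
    req = urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read()


# Build the request for the JDBC source connector shown above (config trimmed).
payload = register_connector(
    "http://localhost:8083",
    "mysql_stream_test",
    {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
     "mode": "incrementing",
     "incrementing.column.name": "ID"},
    dry_run=True,
)
```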

2. Default format of the messages kafka-connect writes to the topic

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "int32", "optional": false, "field": "ID"},
      {"type": "string", "optional": false, "field": "NAME"},
      {"type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "CREATE_TIME"}
    ],
    "optional": false,
    "name": "test_demo"
  },
  "payload": {"ID": 1, "NAME": "prestoEtl", "CREATE_TIME": 1606902182000}
}
```
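Each message is a self-describing envelope: `schema` describes the field types and `payload` carries one row. A quick sketch of unpacking it in plain Python, converting Connect's logical `Timestamp` (epoch milliseconds) to a `datetime`:

```python
import json
from datetime import datetime, timezone

# The sample message from above, verbatim (whitespace is legal between JSON tokens).
message = '''{"schema":{"type":"struct","fields":[
  {"type":"int32","optional":false,"field":"ID"},
  {"type":"string","optional":false,"field":"NAME"},
  {"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp",
   "version":1,"field":"CREATE_TIME"}],
  "optional":false,"name":"test_demo"},
 "payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}'''

envelope = json.loads(message)
row = dict(envelope["payload"])

# Fields whose schema name is Connect's logical Timestamp hold epoch milliseconds.
for field in envelope["schema"]["fields"]:
    if field.get("name") == "org.apache.kafka.connect.data.Timestamp":
        row[field["field"]] = datetime.fromtimestamp(
            row[field["field"]] / 1000, tz=timezone.utc)

print(row["ID"], row["NAME"], row["CREATE_TIME"].isoformat())
```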

3. Consuming the schema-bearing messages with pyflink

```python
#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)

st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.get_config().set_python_executable("python3")
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")

# In the DDL the envelope's schema part can simply be declared as a STRING,
# while payload is declared as a ROW wrapping the actual columns.
# Note: the column name 'scheam' (sic) does not match the JSON key "schema",
# so that column deserializes as NULL.
ddlKafkaConn = """
create table sourceKafkaConn(
    `scheam` STRING comment 'schema of each kafka-connect record',
    `payload` ROW(ID BIGINT, NAME STRING, CREATE_TIME STRING) comment 'row data'
) comment 'schema-bearing data from kafka-connect'
with(
    'connector' = 'kafka',
    'topic' = 'mysql_test_demo',
    'properties.bootstrap.servers' = '192.168.113.11:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)
"""
# 'scan.startup.mode' = 'earliest-offset' reads from the beginning of the topic;
# 'latest-offset' reads only messages arriving after the job starts.
st_env.execute_sql(ddlKafkaConn)

sinkPrint = '''
CREATE TABLE sinkPrint WITH ('connector' = 'print')
LIKE sourceKafkaConn (EXCLUDING ALL)
'''
st_env.execute_sql(sinkPrint)

st_env.execute_sql("SHOW TABLES").print()

st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \
    .insert_into("sinkPrint")
st_env.execute("pyflink-kafka-v4")
```
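Note in the results below that the first column prints `null`: the DDL names it `scheam`, which never matches the JSON key `schema`, so the JSON format finds no such field and yields NULL. A plain-Python sketch of that by-name lookup (the `project` helper is hypothetical, not Flink code):

```python
import json

# One record as kafka-connect writes it (schema part trimmed for brevity).
record = json.loads('{"schema":{"name":"test_demo"},'
                    '"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}')


def project(record: dict, columns: list) -> tuple:
    """Mimic the JSON format's by-name field lookup: missing keys become None."""
    return tuple(record.get(col) for col in columns)


# The DDL declares `scheam`, so the real "schema" field is never matched.
scheam, payload = project(record, ["scheam", "payload"])
print(scheam)            # None, just like the null in the +I rows below
print(payload["NAME"])
```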

4. Run

4.1 python pyflink-kafka-v4.py

4.2 flink run -m xxx.xxx.xxx.xxx:8081 -py pyflink-kafka-v4.py

5. Output

```
+-----------------+
|   table name    |
+-----------------+
|       sinkPrint |
| sourceKafkaConn |
+-----------------+
2 rows in set

+I(null,1,prestoEtl,1606902182000)
+I(null,2,runs very well,1606902562000)
+I(null,3,parse the topic schema with flink,1607070278000)
```
