當前位置:編程學習大全網 - 編程軟體 - flink處理數據從kafka到另外壹個kafka

flink處理數據從kafka到另外壹個kafka

需求就是將流量數據(json格式)中某個接口數據抽取壹下。如:有個identityUri="yiyang/user/getById/13782" , 這裏的13782,是個userId,我們需要將其處理成 identityUri="yiyang/user/getById/{}"

實際上我們生產中是將二者接口使用的。先使用2,如果沒有匹配到,在使用1

這裏是演示flink kafka的用法,我們簡單使用正則處理

註意:kafka消費的方式是: kafkaConsumer.setStartFromGroupOffsets();

看下上面的啟動日誌,有這樣的信息:Resetting offset for partition yiyang-0 to offset 22.

我們另外啟動壹個程序,發送消息,並消費兩個topic中的數據

看下 ConsumeKafkaTest 中的日誌

在看下另外壹個服務(消費兩個topic數據)的日誌:

說明已經成功的把處理好的消息發送到另外壹個topic中了

關於數據處理,如果只是簡單的增加字段,減少字段,正則替換,也可以使用logstash工具

  • 上一篇:萊迪狗智能機器狗使用方法
  • 下一篇:labview公式節點程序問題 求1000以內的完數
  • copyright 2024編程學習大全網