1、主線程: 由 KafkaProducer 創建消息,通過 攔截器、序列化器和分區器 後 緩存到消息累加器 (RecordAccumulator,也稱消息收集器)中。
2、Sender 線程: 從 RecordAccumulator 中 獲取消息,發送 到 Kafka
壹、攔截器: 發送前準備: 過濾、修改 消息,發送回調前: 統計
二、序列化器:對象 轉換成 字節數組 發送給 Kafka
三、分區器:根據 key 計算 partition
1、作用 發送前準備: 過濾、修改 消息,發送回調前: 統計
2、實現 org.apache.kafka.clients.producer. ProducerInterceptor 接口,包含3個方法:
2、何時KafkaProducer調攔截器:
1)序列化 和 計算分區前 調 攔截器 onSend() 修改(壹般不修改topic、key 和 partition等)
2) 消息被 應答前 (Acknowledgement)或 發送失敗 時調 攔截器onAcknowledgement() ,優先於用戶設定Callback前執行。
ps:運行在 Producer I/O線程中,實現越簡單越好,否則影響消息發送速度
3)close() 關閉攔截器時清理
1、作用:對象 轉換成 字節數組 發送給 Kafka。 消費者 用 反序列 化器轉成對象(對應序列化器)
2、實現 org.apache.kafka.common.serialization.Serializer 接口,3個方法:配置、序列化、關閉
configure(): 創建 KafkaProducer 實例時調, 確定編碼類型 。
serialize:編解碼 ,如幾種序列化器都無法滿足,可用 Avro、JSON、Thrift、ProtoBuf 和 Protostuff 等通用序列化工具或自定義
1、作用:根據 key 計算 partition 。如果=指定partition 字段,不需分區器
2、實現: 默認分區器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,實現 org.apache.kafka.clients.producer.Partitioner 接口,2個方法
3、partition() :哈希計算分區號 , 參數: 主題、鍵、序列化後鍵、值、序列化後值,集群元數據。默認分區器 DefaultPartitioner 中, key? null,輪詢發往各個分區
4、close() : 關閉分區器回收資源, 空方法 (默認分區器中)。
5、自定義分區器 ,實現某系列key都發到 同壹分區 有序消費, 實現Partitioner 接口。
/s/C6dfvzFkNDYgiNeZ4eWPBQ