刪除listeners和advertised.listeners中配置的註釋,可以根據需要配置連接的服務器的外部IP和端口號。在這裏,我演示了選擇本地本地主機和默認端口9092。
KafkaTemplate這個類包裝了壹個生產者,提供了壹種便捷的方式向Kafka的主題主題發送數據。
send()方法的源碼,KafkaTemplate類也重載了很多send()方法,有需要可以看看源碼。
通過KafkaTemplate模板類發送數據。
Kafka template.send(字符串主題,k鍵,v數據),第壹個參數是主題,第二個參數是發送的對象,第三個參數是發送的數據。通過@KafkaListener註釋配置用戶收聽主題。
Bootstrap-servers :kafka服務器地址(可以是多個)
Consumer.group-id:指定默認的組名。
如果未指定,將會報告。
1.最早:每個分區下有提交的抵銷時,從提交的抵銷開始消耗;沒有提交抵銷時,從零開始消耗。
2.最新:每個分區下有提交的抵銷時,從提交的抵銷開始消耗;當沒有提交的偏移量時,將消耗該分區下新生成的數據。
3.無:當提交的偏移量存在於主題的各個分區中時,消耗將在偏移量之後開始;只要分區沒有提交的偏移量,就會引發異常。
還必須配置該屬性,否則將會報告錯誤。
當Kafka用於發送和接收消息時,生產者需要序列化,消費者需要反序列化。因為byte[]是從網絡上傳輸的,生產者發送的真實消息內容只有經過反序列化才能得到。以便消息可以在網絡上傳輸。
Consumer.key-deserializer和consumer.value-deserializer是消費者鍵/值反序列化程序。
Producer.key-deserializer和producer.value-deserializer是生產者鍵/值的序列化。
StringDeserializer是壹個內置的字符串反序列化器。
StringSerializer是壹種內置的字符串序列化方法。
org的源碼包中也有很多類型的序列化和反序列化方法。阿帕奇。Kafka.com mon .序列化
若要自定義序列化方法,您需要實現接口序列化程序。
要自定義反序列化方法,需要實現接口反序列化器。
詳情請參考。
/shiru kai/article/details/82152172
這是卡夫卡的消費者的配置信息,每個消費者都會輸出這個配置信息。
訪問http://localhost:8080/kafka,可以看到控制臺打印出來的消息。