當前位置:編程學習大全網 - 源碼下載 - 如何確定Kafka的分區數,key和consumer線程數

如何確定Kafka的分區數,key和consumer線程數

壹、客戶端/服務器端需要使用的內存就越多

先說說客戶端的情況。Kafka 0.8.2之後推出了Java版的全新的producer,這個producer有個參數batch.size,默認是16KB。它會為每個分區緩存消息,壹旦滿了就打包將消息批量發出。看上去這是個能夠提升性能的設計。不過很顯然,因為這個參數是分區級別的,如果分區數越多,這部分緩存所需的內存占用也會更多。假設妳有10000個分區,按照默認設置,這部分緩存需要占用約157MB的內存。而consumer端呢?我們拋開獲取數據所需的內存不說,只說線程的開銷。如果還是假設有10000個分區,同時consumer線程數要匹配分區數(大部分情況下是最佳的消費吞吐量配置)的話,那麽在consumer client就要創建10000個線程,也需要創建大約10000個Socket去獲取分區數據。這裏面的線程切換的開銷本身已經不容小覷了。

服務器端的開銷也不小,如果閱讀Kafka源碼的話可以發現,服務器端的很多組件都在內存中維護了分區級別的緩存,比如controller,FetcherManager等,因此分區數越多,這種緩存的成本越久越大。

二、文件句柄的開銷

每個分區在底層文件系統都有屬於自己的壹個目錄。該目錄下通常會有兩個文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager會為每個broker都保存這兩個文件句柄(file handler)。很明顯,如果分區數越多,所需要保持打開狀態的文件句柄數也就越多,最終可能會突破妳的ulimit -n的限制。

三、降低高可用性

Kafka通過副本(replica)機制來保證高可用。具體做法就是為每個分區保存若幹個副本(replica_factor指定副本數)。每個副本保存在不同的broker上。期中的壹個副本充當leader 副本,負責處理producer和consumer請求。其他副本充當follower角色,由Kafka controller負責保證與leader的同步。如果leader所在的broker掛掉了,contorller會檢測到然後在zookeeper的幫助下重選出新的leader——這中間會有短暫的不可用時間窗口,雖然大部分情況下可能只是幾毫秒級別。但如果妳有10000個分區,10個broker,也就是說平均每個broker上有1000個分區。此時這個broker掛掉了,那麽zookeeper和controller需要立即對這1000個分區進行leader選舉。比起很少的分區leader選舉而言,這必然要花更長的時間,並且通常不是線性累加的。如果這個broker還同時是controller情況就更糟了。

說了這麽多“廢話”,很多人肯定已經不耐煩了。那妳說到底要怎麽確定分區數呢?答案就是:視情況而定。基本上妳還是需要通過壹系列實驗和測試來確定。當然測試的依據應該是吞吐量。雖然LinkedIn這篇文章做了Kafka的基準測試,但它的結果其實對妳意義不大,因為不同的硬件、軟件、負載情況測試出來的結果必然不壹樣。我經常碰到的問題類似於,官網說每秒能到10MB,為什麽我的producer每秒才1MB? —— 且不說硬件條件,最後發現他使用的消息體有1KB,而官網的基準測試是用100B測出來的,因此根本沒有可比性。不過妳依然可以遵循壹定的步驟來嘗試確定分區數:創建壹個只有1個分區的topic,然後測試這個topic的producer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位可以是MB/s。然後假設總的目標吞吐量是Tt,那麽分區數 = Tt / max(Tp, Tc)

Tp表示producer的吞吐量。測試producer通常是很容易的,因為它的邏輯非常簡單,就是直接發送消息到Kafka就好了。Tc表示consumer的吞吐量。測試Tc通常與應用的關系更大, 因為Tc的值取決於妳拿到消息之後執行什麽操作,因此Tc的測試通常也要麻煩壹些。

另外,Kafka並不能真正地做到線性擴展(其實任何系統都不能),所以妳在規劃妳的分區數的時候最好多規劃壹下,這樣未來擴展時候也更加方便。

消息-分區的分配

默認情況下,Kafka根據傳遞消息的key來進行分區的分配,即hash(key) % numPartitions,如下圖所示:

def partition(key: Any, numPartitions: Int): Int = {

Utils.abs(key.hashCode) % numPartitions

}

這就保證了相同key的消息壹定會被路由到相同的分區。如果妳沒有指定key,那麽Kafka是如何確定這條消息去往哪個分區的呢?

復制代碼

if(key == null) { // 如果沒有指定key

val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有沒有緩存的現成的分區Id

id match {

case Some(partitionId) =>

partitionId // 如果有的話直接使用這個分區Id就好了

case None => // 如果沒有的話,

val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分區的leader所在的broker

if (availablePartitions.isEmpty)

throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)

val index = Utils.abs(Random.nextInt) % availablePartitions.size // 從中隨機挑壹個

val partitionId = availablePartitions(index).partitionId

sendPartitionPerTopicCache.put(topic, partitionId) // 更新緩存以備下壹次直接使用

partitionId

}

}

復制代碼

可以看出,Kafka幾乎就是隨機找壹個分區發送無key的消息,然後把這個分區號加入到緩存中以備後面直接使用——當然了,Kafka本身也會清空該緩存(默認每10分鐘或每次請求topic元數據時)

如何設定consumer線程數

我個人的觀點,如果妳的分區數是N,那麽最好線程數也保持為N,這樣通常能夠達到最大的吞吐量。超過N的配置只是浪費系統資源,因為多出的線程不會被分配到任何分區。讓我們來看看具體Kafka是如何分配的。

topic下的壹個分區只能被同壹個consumer group下的壹個consumer線程來消費,但反之並不成立,即壹個consumer線程可以消費多個分區的數據,比如Kafka提供的ConsoleConsumer,默認就只是壹個線程來消費所有分區的數據。——其實ConsoleConsumer可以使用通配符的功能實現同時消費多個topic數據,但這和本文無關。

再討論分配策略之前,先說說KafkaStream——它是consumer的關鍵類,提供了遍歷方法用於consumer程序調用實現數據的消費。其底層維護了壹個阻塞隊列,所以在沒有新消息到來時,consumer是處於阻塞狀態的,表現出來的狀態就是consumer程序壹直在等待新消息的到來。——妳當然可以配置成帶超時的consumer,具體參看參數consumer.timeout.ms的用法。

下面說說Kafka提供的兩種分配策略: range和roundrobin,由參數partition.assignment.strategy指定,默認是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。舉個例子就明白了,假設妳有10個分區,P0 ~ P9,consumer線程數是3, C0 ~ C2,那麽每個線程都分配哪些分區呢?

C0 消費分區 0, 1, 2, 3

C1 消費分區 4, 5, 6

C2 消費分區 7, 8, 9

具體算法就是:

復制代碼

val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每個consumer至少保證消費的分區數

val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 還剩下多少個分區需要單獨分配給開頭的線程們

...

for (consumerThreadId <- consumerThreadIdSet) { // 對於每壹個consumer線程

val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //算出該線程在所有線程中的位置,介於[0, n-1]

assert(myConsumerPosition >= 0)

// startPart 就是這個線程要消費的起始分區數

val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)

// nParts 就是這個線程總***要消費多少個分區

val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

...

}

復制代碼

針對於這個例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart為10%3=1,說明每個線程至少保證3個分區,還剩下1個分區需要單獨分配給開頭的若幹個線程。這就是為什麽C0消費4個分區,後面的2個線程每個消費3個分區,具體過程詳見下面的Debug截圖信息:

ctx.myTopicThreadIds

nPartsPerConsumer = 10 / 3 = 3

nConsumersWithExtraPart = 10 % 3 = 1

第壹次:

myConsumerPosition = 1

startPart = 1 * 3 + min(1, 1) = 4 ---也就是從分區4開始讀

nParts = 3 + (if (1 + 1 > 1) 0 else 1) = 3 讀取3個分區, 即4,5,6

第二次:

myConsumerPosition = 0

startPart = 3 * 0 + min(1, 0) =0 --- 從分區0開始讀

nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 4 讀取4個分區,即0,1,2,3

第三次:

myConsumerPosition = 2

startPart = 3 * 2 + min(2, 1) = 7 --- 從分區7開始讀

nParts = 3 + if (2 + 1 > 1) 0 else 1) = 3 讀取3個分區,即7, 8, 9

至此10個分區都已經分配完畢

說到這裏,經常有個需求就是我想讓某個consumer線程消費指定的分區而不消費其他的分區。坦率來說,目前Kafka並沒有提供自定義分配策略。做到這點很難,但仔細想壹想,也許我們期望Kafka做的事情太多了,畢竟它只是個消息引擎,在Kafka中加入消息消費的邏輯也許並不是Kafka該做的事情。

  • 上一篇:kafka源代碼功能介紹
  • 下一篇:請問誰能幫我解釋壹下單片機發光二極管流水燈實驗源代碼,我剛學單片機,源代碼看不太懂,萬分感謝!
  • copyright 2024編程學習大全網