當前位置:編程學習大全網 - 編程語言 - Hadoop從入門到精通33:MapReduce核心原理之Shuffle過程分析

Hadoop從入門到精通33:MapReduce核心原理之Shuffle過程分析

在安裝Hadoop集群的時候,我們在yarn-site.xml文件中配置了MapReduce的運行方式為yarn.nodemanager.aux-services=mapreduce_shuffle。本節就來詳細介紹壹下MapReduce的shuffle過程。

shuffle,即混洗、洗牌的意思,是指MapReduce程序在執行過程中,數據在各個Mapper(Combiner、Sorter、Partitioner)、Reducer等進程之間互相交換的過程。

關於上圖Shuffle過程的幾點說明:

說明:map節點執行map task任務生成map的輸出結果。

shuffle的工作內容:

從運算效率的出發點,map輸出結果優先存儲在map節點的內存中。每個map task都有壹個內存緩沖區,存儲著map的輸出結果,當達到內存緩沖區的閥值(80%)時,需要將緩沖區中的數據以壹個臨時文件的方式存到磁盤,當整個map task結束後再對磁盤中這個map task所產生的所有臨時文件做合並,生成最終的輸出文件。最後,等待reduce task來拉取數據。當然,如果map task的結果不大,能夠完全存儲到內存緩沖區,且未達到內存緩沖區的閥值,那麽就不會有寫臨時文件到磁盤的操作,也不會有後面的合並。

詳細過程如下:

(1)map task任務執行,輸入數據的來源是:HDFS的block。當然在mapreduce概念中,map task讀取的是split分片。split與block的對應關系:壹對壹(默認)。

此處有必要說明壹下block與split:

block(物理劃分):文件上傳到HDFS,就要劃分數據成塊,這裏的劃分屬於物理的劃分,塊的大小可配置(默認:第壹代為64M,第二代為128M)可通過 dfs.block.size配置。為保證數據的安 全,block采用冗余機制:默認為3份,可通過dfs.replication配置。註意:當更改塊大小的配置後,新上傳的文件的塊大小為新配置的值,以前上傳的文件的塊大小為以前的配置值。

split(邏輯劃分):Hadoop中split劃分屬於邏輯上的劃分,目的只是為了讓map task更好地獲取數據。split是通過hadoop中的InputFormat接口中的getSplit()方法得到的。那麽,split的大小具體怎麽得到呢?

首先介紹幾個數據量:

totalSize:整個mapreduce job所有輸入的總大小。註意:基本單位是block個數,而不是Bytes個數。

numSplits:來自job.getNumMapTasks(),即在job啟動時用戶利用 org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設置的值,從方法的名稱上看,是用於設置map的個數。但是,最終map的個數也就是split的個數並不壹定取用戶設置的這個值,用戶設置的map個數值只是給最終的map個數壹個提示,只是壹個影響因素,而不是決定因素。

goalSize:totalSize/numSplits,即期望的split的大小,也就是每個mapper處理多少的數據。但是僅僅是期望

minSize:split的最小值,該值可由兩個途徑設置:

最終取goalSize和minSize中的最大值!

最終:split大小的計算原則:finalSplitSize=max(minSize,min(goalSize,blockSize))

那麽,map的個數=totalSize/finalSplitSize

註意: 新版的API中InputSplit劃分算法不再考慮用戶設定的Map Task個數,而是用mapred.max.split.size(記為maxSize)代替

即:InputSplit大小的計算公式為:splitSize=max{minSize,min{maxSize,blockSize}}

接下來就簡答說說怎麽根據業務需求,調整map的個數。

當我們用hadoop處理大批量的大數據時,壹種最常見的情況就是job啟動的mapper數量太多而超出系統限制,導致hadoop拋出異常終止執行。

解決方案:減少mapper的數量!具體如下:

a.輸入文件數量巨大,但不是小文件

這種情況可通過增大每個mapper的inputsize,即增大minSize或者增大blockSize來減少所需的mapper的數量。增大blocksize通常不可行,因為HDFS被hadoop namenode -format之後,blocksize就已經確定了(由格式化時dfs.block.size決定),如果要更改blocksize,需要重新格式化HDFS,這樣當然會丟失已有的數據。所以通常情況下只能增大minSize,即增大mapred.min.split.size的值。

b.輸入文件數量巨大,且都是小文件

所謂小文件,就是單個文件的size小於blockSize。這種情況通過增大mapred.min.split.size不可行,需要使用FileInputFormat衍生的CombineFileInputFormat將多個input path合並成壹個InputSplit送給mapper處理,從而減少mapper的數量。增加mapper的數量,可以通過減少每個mapper的輸入做到,即減小blockSize或者減少mapred.min.split.size的值。

(2)map執行後,得到key/value鍵值對。接下來的問題就是,這些鍵值對應該交給哪個reduce做?註意:reduce的個數是允許用戶在提交job時,通過設置方法設置的!

MapReduce提供partitioner接口解決上述問題。默認操作是:對key hash後再以reduce task數量取模,返回值決定著該鍵值對應該由哪個reduce處理。這種默認的取模方式只是為了平均reduce的處理能力,防止數據傾斜,保證負載均衡。如果用戶自己對Partition有需求,可以自行定制並設置到job上。

接下來,需要將key/value以及Partition結果都寫入到緩沖區,緩沖區的作用:批量收集map結果,減少磁盤IO的影響。當然,寫入之前,這些數據都會被序列化成字節數組。而整個內存緩沖區就是壹個字節數組。這個內存緩沖區是有大小限制的,默認100MB。當map task的輸出結果很多時,就可能撐爆內存。需將緩沖區的數據臨時寫入磁盤,然後重新利用這塊緩沖區。

從內存往磁盤寫數據被稱為Spill(溢寫),由單獨線程完成,不影響往緩沖區寫map結果的線程。溢寫比例:spill.percent(默認0.8)。

當緩沖區的數據達到閥值,溢寫線程啟動,鎖定這80MB的內存,執行溢寫過程。剩下的20MB繼續寫入map task的輸出結果。互不幹涉!

當溢寫線程啟動後,需要對這80MB空間內的key做排序(Sort)。排序是mapreduce模型的默認行為,也是對序列化的字節做的排序。排序規則:字典排序!

map task的輸出結果寫入內存後,當溢寫線程未啟動時,對輸出結果並沒有做任何的合並。從官方圖可以看出,合並是體現在溢寫的臨時磁盤文件上的,且這種合並是對不同的reduce端的數值做的合並。所以溢寫過程壹個很重要的細節在於,如果有很多個key/value對需要發送到某個reduce端,那麽需要將這些鍵值對拼接到壹塊,減少與partition相關的索引記錄。如果client設置Combiner,其會將有相同key的key/value對的value加起來,減少溢寫到磁盤的數據量。註意:這裏的合並並不能保證map結果中所有的相同的key值的鍵值對的value都合並了,它合並的範圍只是這80MB,它能保證的是在每個單獨的溢寫文件中所有鍵值對的key值均不相同!

溢寫生成的臨時文件的個數隨著map輸出結果的數據量變大而增多,當整個map task完成,內存中的數據也全部溢寫到磁盤的壹個溢寫文件。也就是說,不論任何情況下,溢寫過程生成的溢寫文件至少有壹個!但是最終的文件只能有壹個,需要將這些溢寫文件歸並到壹起,稱為merge。merge是將所有的溢寫文件歸並到壹個文件,結合上面所描述的combiner的作用範圍,歸並得到的文件內鍵值對有可能擁有相同的key,這個過程如果client設置過Combiner,也會合並相同的key值的鍵值對,如果沒有,merge得到的就是鍵值集合,如{“aaa”, [5, 8, 2, …]}。註意:combiner的合理設置可以提高效率,但是如果使用不當會影響效率!

至此,map端的所有工作都已經結束!

當mapreduce任務提交後,reduce task就不斷通過RPC從JobTracker那裏獲取map task是否完成的信息,如果獲知某臺TaskTracker上的map task執行完成,Shuffle的後半段過程就開始啟動。其實呢,reduce task在執行之前的工作就是:不斷地拉取當前job裏每個map task的最終結果,並對不同地方拉取過來的數據不斷地做merge,也最終形成壹個文件作為reduce task的輸入文件。

1.Copy過程,簡單地拉取數據。Reduce進程啟動壹些數據copy線程(Fether),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。因為map task早已結束,這些文件就歸TaskTracker管理在本地磁盤。

2.Merge過程。這裏的merge如map端的merge動作,只是數組中存放的是不同map端copy過來的數值。Copy過來的數據會先放入內存緩沖區中,這裏緩沖區的大小要比map端的更為靈活,它是基於JVM的heap size設置,因為shuffler階段reducer不運行,所以應該把絕大部分的內存都給shuffle用。

merge的三種形式:內存到內存、內存到磁盤、磁盤到磁盤。默認情況下,第壹種形式不啟用。當內存中的數據量達到壹定的閥值,就啟動內存到磁盤的merge。與map端類似,這也是溢寫過程,當然如果這裏設置了Combiner,也是會啟動的,然後在磁盤中生成了眾多的溢寫文件。第二種merge方式壹直在運行,直到沒有map端的數據時才結束,然後啟動第三種磁盤到磁盤的merge方式生成最終的那個文件。

3.reducer的輸入文件。不斷地merge後,最後會生成壹個“最終文件”。這個最終文件可能在磁盤中也可能在內存中。當然我們希望它在內存中,直接作為reducer的輸入,但默認情況下,這個文件是存放於磁盤中的。當reducer的輸入文件已定,整個shuffle才最終結束。然後就是reducer執行,把結果存放到HDFS上。

  • 上一篇:歷年諾貝爾生理學或醫學獎?
  • 下一篇:克拉克和阿西莫夫誰是科幻作家中的NO.1
  • copyright 2024編程學習大全網