當前位置:編程學習大全網 - 編程語言 - MapReduce執行過程

MapReduce執行過程

MapReduce存在以下4個獨立的實體。

1. JobClient:運行於client node,負責將MapReduce程序打成Jar包存儲到HDFS,並把Jar包的路徑提交到Jobtracker,由Jobtracker進行任務的分配和監控。

2. JobTracker:運行於name node,負責接收JobClient提交的Job,調度Job的每壹個子task運行於TaskTracker上,並監控它們,如果發現有失敗的task就重新運行它。

3. TaskTracker:運行於data node,負責主動與JobTracker通信,接收作業,並直接執行每壹個任務。

4. HDFS:用來與其它實體間***享作業文件。

具體細節如下:

1.JobClient通過RPC協議向JobTracker請求壹個新應用的ID,用於MapReduce作業的ID

2.JobTracker檢查作業的輸出說明。例如,如果沒有指定輸出目錄或目錄已存在,作業就不提交,錯誤拋回給JobClient,否則,返回新的作業ID給JobClient

3.JobClient將作業所需的資源(包括作業JAR文件、配置文件和計算所得得輸入分片)復制到以作業ID命名的HDFS文件夾中

4.JobClient通過submitApplication()提交作業

5.JobTracker收到調用它的submitApplication()消息後,進行任務初始化

6.JobTracker讀取HDFS上的要處理的文件,開始計算輸入分片,每壹個分片對應壹個TaskTracker

map任務不是隨隨便便地分配給某個TaskTracker的,這裏有個概念叫:數據本地化(Data-Local)。意思是:將map任務分配給含有該map處理的數據塊的TaskTracker上,同事將程序jar包復制到該TaskTracker上來運行,這叫“運算移動,數據不移動”。而分配reduce任務時並不考慮數據本地化

7.TaskTracker通過心跳機制領取任務(任務的描述信息)

8.TaskTracker讀取HDFS上的作業資源(JAR包、配置文件等)

9.TaskTracker啟動壹個java child子進程,用來執行具體的任務(MapperTask或ReducerTask)

10.TaskTracker將Reduce結果寫入到HDFS當中

註:以HDFS的壹個塊的大小(默認64M)為壹個分片?

15年2.7.3版本開始,block size由64 MB變成了128 MB的。

Shuffle分析

Shuffle的中文意思是“洗牌”,如果我們這樣看:壹個map產生的數據,結果通過hash過程分區缺分配給了不同的reduce任務,是不是壹個對數據洗牌的過程呢?

?shuffle的概念:

 Collections.shuffle(List list):隨機打亂list裏的元素順序。

 MapReduce裏的Shuffle:描述著數據從map task輸出到reduce task輸入的這段過程。

Map端流程分析

1 每個輸入分片會讓壹個map任務來處理,默認情況下,以HDFS的壹個塊的大小(默認64M)為壹個分片,當然我們也可以設置塊的大小。map輸出的結果會暫且放在壹個環形內存緩沖區中(該緩沖區的大小默認為100M,由io.sort.mb屬性控制),當該緩沖區快要溢出時(默認為緩沖區大小的80%,由io.sort.spill.percent屬性控制),會在本地文件系統中創建壹個溢出文件,將該緩沖區中的數據寫入這個文件。

2 在寫入磁盤之前,線程首先根據reduce任務的數目將數據劃分為相同數目的分區,也就是壹個reduce任務對應壹個分區的數據。這樣做是為了避免有些reduce任務分配到大量數據,而有些reduce任務卻分到很少數據,甚至沒有分到數據的尷尬局面。其實分區就是對數據進行hash的過程。然後對每個分區中的數據進行排序,如果此時設置了Combiner,將排序後的結果進行Combianer操作,這樣做的目的是讓盡可能少的數據寫入到磁盤。

3 當map任務輸出最後壹個記錄時,可能會有很多的溢出文件,這時需要將這些文件合並。合並的過程中會不斷地進行排序和combiner操作,目的有兩個:1、盡量減少每次寫入磁盤的數據量;2、盡量減少下壹復制階段網絡傳輸的數據量。最後合並成了壹個已分區且已排序的文件。為了減少網絡傳輸的數據量,這裏可以將數據壓縮,只要將mapred.compress.map.out設置為true就可以。

數據壓縮:Gzip、Lzo、snappy。

4 將分區中的數據拷貝給相對應的reduce任務。有人可能會問:分區中的數據怎麽知道它對應的reduce是哪個呢?其實map任務壹直和其父TaskTracker保持聯系,而TaskTracker又壹直和obTracker保持心跳。所以JobTracker中保存了整個集群中的宏觀信息。只要reduce任務向JobTracker獲取對應的map輸出位置就OK了。

reduce端流程分析

1 reduce會接收到不同map任務傳來的數據,並且每個map傳來的數據都是有序的。如果reduce端接收的數據量相當小,則直接存儲在內存中(緩沖區大小由mapred.job.shuffle.input.buffer.percent屬性控制,表示用作此用途的堆空間百分比),如果數據量超過了該緩沖區大小的壹定比例(由mapred.job.shuffle.merg.percent決定),則對數據合並後溢寫到磁盤中。

2 隨著溢寫文件的增多,後臺線程會將它們合並成壹個更大的有序的文件,這樣做是為了給後面的合並節省空間。其實不管在map端還是在reduce端,MapReduce都是反復地執行排序,合並操作,現在終於明白了有些人為什麽會說:排序是hadoop的靈魂。

3 合並的過程中會產生許多的中間文件(寫入磁盤了),但MapReduce會讓寫入磁盤的數據盡可能地少,並且最後壹次合並的結果並沒有寫入磁盤,而是直接輸入到reduce函數。

4 Reducer的輸入文件。不斷地merge後,最後會生成壹個“最終文件”。為什麽加引號?因為這個文件可能存在於磁盤上,也可能存在於內存中。對我們來說,希望它存放於內存中,直接作為Reducer的輸入,但默認情況下,這個文件是存放於磁盤中的。當Reducer的輸入文件已定,整個Shuffle才最終結束。然後就是Reducer執行,把結果放到HDSF上。

詳細過程:

1?每個map task都有壹個內存緩沖區,存儲著map的輸出結果,當緩沖區快滿的時候需要將緩沖區的數據以壹個臨時文件的方式存放到磁盤,當整個map task結束後在對磁盤中這個map task產生的所有臨時文件做壹個合並,生成最終的正式輸出文件,然後等待reduce task來拉數據。

2?在map task執行時,它的輸入數據來源於HDFS的block,當然在MapReduce概念中,map task只讀取split。split與block對應關系可能是多對壹,默認是壹對壹。在wordcount例子裏,假設map的輸入數據都是是像“aaa”這樣的字符串。

3?在經過mapper的運行後,我們得知mapper的輸出是這樣壹個key/value對:key是“aaa”,value是數值1。因為當前map端只做加1的操作,在reduce task裏采取合並結果集。前面我們知道這個job有3個reduce task。那到底當前的“aaa”究竟該丟給哪個reduce去處理呢?是需要現在做決定的。

4?MapReduce提供Partitioner接口,作用就是根據key或value及reduce的數量來決定當前的輸出數據最終應該交由哪個reduce task處理。默認對key hash後再以reduce task數據取模(取余)。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以定制並設置到job上。

5?在例子中,“aaa”經過Partition後返回0,也就是這對值應當交由第壹個reduce來處理。接下來,需要將數據寫入內存緩沖區中,緩沖區的作用是批量收集map結果,減少磁盤IO的影響。我們的key/value對以及Partition的結果都會被寫入緩沖區。當然,寫入之前,key與value值都會被序列化成字節數組。

6?內存緩沖區是有大小限制的,默認是100MB。當map task的輸出結果很多時,就可能會撐爆內存,所以需要在壹定條件下將緩沖區中的數據臨時寫入磁盤,然後重新利用這塊緩沖區。這個從內存往磁盤寫數據的過程被稱為spill,中文可理解為溢寫。溢寫是由單獨線程來完成,不影響往緩沖區寫map結果的線程。溢寫線程啟動時不應該阻止map的結果輸出,所以整個緩沖區有個溢寫的比例spill.percent。比例默認是0.8,也就是當緩沖區的數據值已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的內存,執行溢寫過程。map task的輸出結果還可以往剩下的20MB內存中寫,互不影響。

7?當溢寫線程啟動後,需要對這80MB空間內的key做排序(sort)。排序是MapReduce模型默認的行為,這裏的排序也是對序列化的字節做的排序。

8?因為map task的輸出是需要發送到不同的reduce端去,而內存緩沖區沒有對將發送到相同reduce端的數據做合並,那麽這種合並應該是體現在磁盤文件中的。從官方圖上也可以看到寫到磁盤中的壹些文件是對不同的reduce端的數值做過合並。所以溢寫過程壹個很重要的細節在於,如果有很多個key/value對需要發送到某個reduce端去,那麽需要將這些key/value值拼接到壹塊,減少與partition相關的索引記錄。

 在針對每個reduce端而合並數據時,有些數據可能像這樣:“aaa”/1,“aaa”/1。對於wordcount例子,只是簡單地統計單詞出現的次數,如果在同壹個map task的結果中有很多像“aaa”壹樣出現多次的key,我們就應該把它們的值合並到壹塊,這個過程叫reduce也叫combine。但MapReduce的術語中,reduce只值reduce端執行從多個map task取數據做計算的過程。除reduce外,非正式地合並數據只能算作combine了。其實大家知道的,MapReduce中將Combiner等同於Reducer。

 如果client設置過Combiner,那麽現在就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減少溢寫到磁盤的數據量。Combiner會優化MapReduce的中間結果,所以它在整個模型中會多次使用。那哪些場景才能使用Combiner呢?從這裏分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用於那種Reduce的輸入key/value與輸出key/value類型完全壹致,且不影響最終結果的場景。比如累加,最大值等。Combiner的使用壹定得慎重,如果用好,它對job執行效率有幫助,反之會影響reduce的最終結果。

9?每次溢寫會在磁盤上生成壹個溢寫文件,如果map的輸出結果真的很大,有多次這樣的溢寫發生,磁盤上相應的就會有多個溢寫文件存在。當map task真正完成時,內存緩沖區中的數據也全部溢寫到磁盤中形成壹個溢寫文件。最終磁盤中會至少有壹個這樣的溢寫文件存在(如果map的輸出結果很少,當map執行完成時,只會產生壹個溢寫文件),因為最終的文件只有壹個,所以需要將這些溢寫文件歸並到壹起,這個過程就叫Merge。Merge是怎樣的?如前面的例子,“aaa”從某個map task讀取過來時值是5,從另外壹個map讀取時值是8,因為他們有相同的key,所以要merge成group。

 什麽是group:對於“aaa”就是像真陽的:{“aaa”,[5,8,2,...]},數組中的值就是從不同的溢寫文件中讀取出來的,然後再把這些值加起來。請註意,因為merge是將多個溢寫文件合並到壹個文件,所以可能也有相同的key存在,在這個過程中,如果client設置過Combiner,也會使用Combiner來合並相同的key。

至此,map端的所有工作都已經結束,最終生成的這個文件也存放在TaskTracker夠得到的某個本地目錄中。每個reduce?task不斷地通過RPC從JobTRacker那獲取map task是否完成的信息,如果reduce task得到通知,獲知某臺TaskTracker上的map task執行完成,Shuffle的後半段過程開始啟動。

Reduce端的shuffle過程:

1 copy過程,簡單地拉取數據。Reduce進程啟動壹些數據copy線程(Fetcher),通過biner操作,目的有兩個:1、盡量減少每次寫入磁盤的數據量;2、盡量減少下壹復制階段網絡傳輸的數據量。最後合並成了壹個已分區且已排序的文件。為了減少網絡傳輸的數據量,這裏可以將數據壓縮,只要將mapred.compress.map.out設置為true就可以。

數據壓縮:Gzip、Lzo、snappy。

4 將分區中的數據拷貝給相對應的reduce任務。有人可能會問:分區中的數據怎麽知道它對應的reduce是哪個呢?其實map任務壹直和其父TaskTracker保持聯系,而TaskTracker又壹直和obTracker保持心跳。所以JobTracker中保存了整個集群中的宏觀信息。只要reduce任務向JobTracker獲取對應的map輸出位置就OK了。

reduce端流程分析

1 reduce會接收到不同map任務傳來的數據,並且每個map傳來的數據都是有序的。如果reduce端接收的數據量相當小,則直接存儲在內存中(緩沖區大小由mapred.job.shuffle.input.buffer.percent屬性控制,表示用作此用途的堆空間百分比),如果數據量超過了該緩沖區大小的壹定比例(由mapred.job.shuffle.merg.percent決定),則對數據合並後溢寫到磁盤中。

2 隨著溢寫文件的增多,後臺線程會將它們合並成壹個更大的有序的文件,這樣做是為了給後面的合並節省空間。其實不管在map端還是在reduce端,MapReduce都是反復地執行排序,合並操作,現在終於明白了有些人為什麽會說:排序是hadoop的靈魂。

3 合並的過程中會產生許多的中間文件(寫入磁盤了),但MapReduce會讓寫入磁盤的數據盡可能地少,並且最後壹次合並的結果並沒有寫入磁盤,而是直接輸入到reduce函數。

4 Reducer的輸入文件。不斷地merge後,最後會生成壹個“最終文件”。為什麽加引號?因為這個文件可能存在於磁盤上,也可能存在於內存中。對我們來說,希望它存放於內存中,直接作為Reducer的輸入,但默認情況下,這個文件是存放於磁盤中的。當Reducer的輸入文件已定,整個Shuffle才最終結束。然後就是Reducer執行,把結果放到HDSF上。

註意:對MapReduce的調優在很大程度上就是對MapReduce Shuffle的性能的調優。

三、內存緩沖區:MapOutputBuffer

兩級索引結構:

環形緩沖區:

1 kvoffsets緩沖區:也叫偏移量索引數組,用於保存key/value信息在位置索引kvindices中的偏移量。當kvoffsets的使用率超過io.sort.spill.percent(默認為80%)後,便會觸發壹次SpillThread線程的“溢寫”操作,也就是開始壹次spill階段的操作。

2 kvindices緩沖區:也叫位置索引數組,用於保存key/value在數據緩沖區kvbuffer中的起始位置。

3 kvbuffer數據緩沖區:用於保存實際的key/value的值。默認情況下該緩沖區最多可以使用io.sort.mb的95%,當kvbuffer使用率超過io.sort.spill.percent(默認80%)後,便會觸發壹次SpillThread線程的“溢寫”操作,也就是開始壹次spill階段的操作。

  • 上一篇:如何快速學好壹門編程語言?
  • 下一篇:藝術與科技就業前景
  • copyright 2024編程學習大全網