當前位置:編程學習大全網 - 源碼下載 - 如何把二份數據作用於壹個reducer

如何把二份數據作用於壹個reducer

Mapper的處理過程:

1.1. InputFormat 產生 InputSplit,並且調用RecordReader將這些邏輯單元(InputSplit)轉化為map task的輸入。其中InputSplit是map task處理的最小輸入單元的邏輯表示。

1.2. 在客戶端代碼中調用Job類來設置參數,並執行在hadoop集群的上的MapReduce程序。

1.3. Mapper類在Job中被實例化,並且通過MapContext對象來傳遞參數設置。可以調用Job.getConfiguration().set(“myKey”, “myVal”)來設置參數。

1.4. Mapper的run()方法調用了自身的setup()來設置參數。

1.5. Mapper的run()方法調用map(KeyInType, ValInType, Context)方法來依次處理InputSplit中輸入的key/value數據對。並且可以通過Context參數的Context.write(KeyOutType, ValOutType)方法來發射處理的結果。用戶程序還可以用Context來設置狀態信息等。同時,用戶還可以用Job.setCombinerClass(Class)來設置Combiner,將map產生的中間態數據在Mapper本地進行匯聚,從而減少傳遞給Reducer的數據。

1.6. Mapper的run()方法調用cleanup()方法。

壹些說明:

所有的中間結果都會被MapReduce框架自動的分組,然後傳遞給Reducer(s)去產生最後的結果。用戶可以通過Job.setGroupingComparatorClass(Class)來設置Comparator。如果這個Comparator沒有被設置,那麽所有有壹樣的key的數據不會被排序。

Mapper的結果都是被排序過的,並被劃分為R個區塊(R是Reducer的個數)。用戶可以通過實現自定義的Partitioner類來指定哪些數據被劃分給哪個Reducer。

中間態的數據往往用(key-len, key, value-len, value)的簡單格式存儲。用戶程序可以指定CompressionCodec來壓縮中間數據。

Map的數目由輸入數據的總大小決定。壹般來說,壹個計算節點10-100個map任務有較好的並行性。如果cpu的計算量很小,那麽平均每個計算節點300個map任務也是可以的。但是每個任務都是需要時間來初始化的,因此每個任務不能劃分的太小,至少也要平均壹個任務執行個壹分鐘。可以通過mapreduce.job.maps參數來設置map的數目。更進壹步說,任務的數量是有InputFormat.getSplits()方法來控制的,用戶可以重寫這個方法。

------------------------------------------------------------------------------------------------------------------------------------------------

Reducer主要分三個步驟:

1. Shuffle洗牌

這步驟是Reducer從相關的Mapper節點獲取中間態的數據。

2. Sort排序

在洗牌的同時,Reducer對獲取的數據進行排序。在這個過程中,可以用Job.setGroupingComparatorClass(Class)來對同壹個key的數據進行Secondary Sort二次排序。

3. Reduce

Reduce的調用順序和Map差不多,也是通過run()方法調用setup(),reduce(),cleanup()來實現的。Reducer輸出的數據是沒有經過排序的。

Reduce 的數目可以通過Job.setNumReduceTasks(int)來設置。壹般來說,Reduce的數目是節點數的0.95到1.75倍。

Reduce的數目也可以設置為0,那麽這樣map的輸出會直接寫到文件系統中。

Reduce中還可以使用Mark-Reset的功能。簡而言之就是可以在遍歷map產生的中間態的數據的時候可以進行標記,然後在後面適當的時候用reset回到最近標記的位置。當然這是有壹點限制的,如下面的例子,必須自己在Reduce方法中對reduce的Iterator重新new 壹個MarkableIterator才能使用。

public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

MarkableIterator<IntWritable> mitr = new MarkableIterator<IntWritable>(values.iterator());

// Mark the position

mitr.mark();

while (mitr.hasNext()) {

i = mitr.next();

// Do the necessary processing

}

// Reset

mitr.reset();

// Iterate all over again. Since mark was called before the first

// call to mitr.next() in this example, we will iterate over all

// the values now

while (mitr.hasNext()) {

i = mitr.next();

// Do the necessary processing

}

}

3. Partitioner

Partitioner 對map輸出的中間態數據按照reduce的數目進行分區,壹般通過hash來進行劃分。默認的實現是HashPartitioner。

4. Reporting Progress

MapReduce程序可以通過mapper或者reducer的Context來匯報應用程序的狀態。

5. Job

當我們在寫MapReduce程序的時候,通常,在main函數裏。建立壹個Job對象,設置它的JobName,然後配置輸入輸出路徑,設置我們的Mapper類和Reducer類,設置InputFormat和正確的輸出類型等等。然後我們會使用job.waitForCompletion()提交到JobTracker,等待job運行並返回,這就是壹般的Job設置過程。JobTracker會初始化這個Job,獲取輸入分片,然後將壹個壹個的task任務分配給TaskTrackers執行。TaskTracker獲取task是通過心跳的返回值得到的,然後TaskTracker就會為收到的task啟動壹個JVM來運行。

Job其實就是提供配置作業、獲取作業配置、以及提交作業的功能,以及跟蹤作業進度和控制作業。Job類繼承於JobContext類。JobContext提供了獲取作業配置的功能,如作業ID,作業的Mapper類,Reducer類,輸入格式,輸出格式等等,它們除了作業ID之外,都是只讀的。 Job類在JobContext的基礎上,提供了設置作業配置信息的功能、跟蹤進度,以及提交作業的接口和控制作業的方法。

壹個Job對象有兩種狀態,DEFINE和RUNNING,Job對象被創建時的狀態時DEFINE,當且僅當Job對象處於DEFINE狀態,才可以用來設置作業的壹些配置,如Reduce task的數量、InputFormat類、工作的Mapper類,Partitioner類等等,這些設置是通過設置配置信息conf來實現的;當作業通過submit()被提交,就會將這個Job對象的狀態設置為RUNNING,這時候作業以及提交了,就不能再設置上面那些參數了,作業處於調度運行階段。處於RUNNING狀態的作業我們可以獲取作業、maptask和reduce task的進度,通過代碼中的*Progress()獲得,這些函數是通過info來獲取的,info是RunningJob對象,它是實際在運行的作業的壹組獲取作業情況的接口,如Progress。

在waitForCompletion()中,首先用submit()提交作業,然後等待info.waitForCompletion()返回作業執行完畢。verbose參數用來決定是否將運行進度等信息輸出給用戶。submit()首先會檢查是否正確使用了new API,這通過setUseNewAPI()檢查舊版本的屬性是否被設置來實現的,接著就connect()連接JobTracker並提交。實際提交作業的是壹個JobClient對象,提交作業後返回壹個RunningJob對象,這個對象可以跟蹤作業的進度以及含有由JobTracker設置的作業ID。

getCounter()函數是用來返回這個作業的計數器列表的,計數器被用來收集作業的統計信息,比如失敗的map task數量,reduce輸出的記錄數等等。它包括內置計數器和用戶定義的計數器,用戶自定義的計數器可以用來收集用戶需要的特定信息。計數器首先被每個task定期傳輸到TaskTracker,最後TaskTracker再傳到JobTracker收集起來。這就意味著,計數器是全局的。

  • 上一篇:嗶哩嗶哩電競與動視暴雪電競達成長期戰略合作
  • 下一篇:兩同方向同頻率的簡諧振動的振動方程為x=6cos(5t+)
  • copyright 2024編程學習大全網