當前位置:編程學習大全網 - 編程語言 - mapreduce中map是怎麽做的?參數又是怎麽解析傳遞給map方法的

mapreduce中map是怎麽做的?參數又是怎麽解析傳遞給map方法的

1.首先介紹壹下wordcount 早mapreduce框架中的 對應關系

大家都知道 mapreduce 分為 map 和reduce 兩個部分,那麽在wordcount例子中,很顯然 對文件word 計數部分為map,對 word 數量累計部分為 reduce;

大家都明白 map接受壹個參數,經過map處理後,將處理結果作為reduce的入參分發給reduce,然後在reduce中統計了word 的數量,最終輸出到輸出結果;

但是初看遇到的問題:

壹、map的輸入參數是個 Text之類的 對象,並不是 file對象

二、reduce中並沒有if-else之類的判斷語句 ,來說明 這個word 數量 加 壹次,那個word 加壹次。那麽這個判斷到底只是在 map中已經區分了 還是在reduce的時候才判斷的

三、map過程到底做了什麽,reduce過程到底做了什麽?為什麽它能夠做到多個map多個reduce?

壹、

1. 怎麽將 文件參數 傳遞 到 job中呢?

在 client 我們調用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

實際上 addInputPath 做了以下的事情(將文件路徑加載到了conf中)

public static void addInputPath(Job job,

Path path) throws IOException {

Configuration conf = job.getConfiguration();

path = path.getFileSystem(conf).makeQualified(path);

String dirStr = StringUtils.escapeString(path.toString());

String dirs = conf.get(INPUT_DIR);

conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);

}

我們再來看看 FileInputFormat 是做什麽用的, FileInputFormat 實現了 InputFormat 接口 ,這個接口是hadoop用來接收客戶端輸入參數的。所有的輸入格式都繼承於InputFormat,這是壹個抽象類,其子類有專門用於讀取普通文件的FileInputFormat,用來讀取數據庫的DBInputFormat等等。

我們會看到 在 InputFormat 接口中 有getSplits方法,也就是說分片操作實際上實在 map之前 就已經做好了

List<InputSplit>getSplits(JobContext job)

Generate the list of files and make them into FileSplits.

具體實現參考 FileInputFormat getSplits 方法:

上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它們會被用來計算分片大小。可以通過設置mapred.min.split.size和mapred.max.split.size來設置。splits鏈表用來存儲計算得到的輸入分片,files則存儲作為由listStatus()獲取的輸入文件列表。然後對於每個輸入文件,判斷是否可以分割,通過computeSplitSize計算出分片大小splitSize,計算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保證在minSize和maxSize之間,且如果minSize<=blockSize<=maxSize,則設為blockSize。然後我們根據這個splitSize計算出每個文件的inputSplits集合,然後加入分片列表splits中。註意到我們生成InputSplit的時候按上面說的使用文件路徑,分片起始位置,分片大小和存放這個文件的hosts列表來創建。最後我們還設置了輸入文件數量:mapreduce.input.num.files。

二、計算出來的分片有時怎麽傳遞給 map呢 ?對於單詞數量如何累加?

我們使用了 就是InputFormat中的另壹個方法createRecordReader() 這個方法:

RecordReader:

RecordReader是用來從壹個輸入分片中讀取壹個壹個的K -V 對的抽象類,我們可以將其看作是在InputSplit上的叠代器。我們從API接口中可以看到它的壹些方法,最主要的方法就是nextKeyvalue()方法,由它獲取分片上的下壹個K-V 對。

可以看到接口中有:

public abstract boolean nextKeyValue() throws IOException, InterruptedException;

public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

public abstract float getProgress() throws IOException, InterruptedException;

public abstract void close() throws IOException;

FileInputFormat<K,V>

Direct Known Subclasses:

CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat

對於 wordcount 測試用了 NLineInputFormat和 TextInputFormat 實現類

在 InputFormat 構建壹個 RecordReader 出來,然後調用RecordReader initialize 的方法,初始化RecordReader 對象

那麽 到底 Map是怎麽調用 的呢? 通過前邊我們 已經將 文件分片了,並且將文件分片的內容存放到了RecordReader中,

下面繼續看看這些RecordReader是如何被MapReduce框架使用的

終於 說道 Map了 ,我麽如果要實現Map 那麽 壹定要繼承 Mapper這個類

public abstract class Context

implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

}

protected void setup(Context context) throws IOException, InterruptedException

protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException { }

protected void cleanup(Context context ) throws IOException, InterruptedException { }

public void run(Context context) throws IOException, InterruptedException { }

我們寫MapReduce程序的時候,我們寫的mapper都要繼承這個Mapper.class,通常我們會重寫map()方法,map()每次接受壹個K-V對,然後我們對這個K-V對進行處理,再分發出處理後的數據。我們也可能重寫setup()以對這個map task進行壹些預處理,比如創建壹個List之類的;我們也可能重寫cleanup()方法對做壹些處理後的工作,當然我們也可能在cleanup()中寫出K-V對。舉個例子就是:InputSplit的數據是壹些整數,然後我們要在mapper中算出它們的和。我們就可以在先設置個sum屬性,然後map()函數處理壹個K-V對就是將其加到sum上,最後在cleanup()函數中調用context.write(key,value);

最後我們看看Mapper.class中的run()方法,它相當於map task的驅動,我們可以看到run()方法首先調用setup()進行初始操作,然後對每個context.nextKeyValue()獲取的K-V對,就調用map()函數進行處理,最後調用cleanup()做最後的處理。事實上,從context.nextKeyValue()就是使用了相應的RecordReader來獲取K-V對的。

我們看看Mapper.class中的Context類,它繼承與MapContext,使用了壹個RecordReader進行構造。下面我們再看這個MapContext。

public MapContextImpl(Configuration conf, TaskAttemptID taskid,

RecordReader<KEYIN,VALUEIN> reader,

RecordWriter<KEYOUT,VALUEOUT> writer,

OutputCommitter committer,

StatusReporter reporter,

InputSplit split) {

super(conf, taskid, writer, committer, reporter);

this.reader = reader;

this.split = split;

}

RecordReader 看來是在這裏構造出來了, 那麽 是誰調用這個方法,將這個承載著關鍵數據信息的 RecordReader 傳過來了 ?

我們可以想象 這裏 應該被框架調用的可能性比較大了,那麽mapreduce 框架是怎麽分別來調用map和reduce呢?

還以為分析完map就完事了,才發現這裏僅僅是做了mapreduce 框架調用前的壹些準備工作,

1.在 job提交 任務之後 首先由jobtrack 分發任務,

在 任務分發完成之後 ,執行 task的時候,這時 調用了 maptask 中的 runNewMapper

在這個方法中調用了 MapContextImpl, 至此 這個map 和框架就可以聯系起來了

  • 上一篇:青海公務員考試中計算機應用考什麽?
  • 下一篇:編程零基礎學習難嗎?
  • copyright 2024編程學習大全網