當前位置:編程學習大全網 - 源碼下載 - 如何將MapReduce轉換成Spark

如何將MapReduce轉換成Spark

MapReduce 轉換到 Spark

Spark 是類似於 MapReduce 的計算引擎,它提出的內存方式解決了 MapReduce 存在的讀取磁盤速度較慢的困難,此外,它基於 Scala 的函數式編程風格和 API,進行並行計算時效率很高。

於 Spark 采用的是 RDD(彈性分布式結果集) 方式對數據進行計算,這種方式與 MapReduce 的 Map()、Reduce()

方式差距較大,所以很難直接使用 Mapper、Reducer 的 API,這也是阻礙 MapReduce 轉為 Spark 的絆腳石。

Scala 或者 Spark 裏面的 map() 和 reduce() 方法與 Hadoop MapReduce 裏面的 map()、reduce() 方法相比,Hadoop

MapReduce 的 API 更加靈活和復雜,下面列出了 Hadoop MapReduce 的壹些特性:

Mappers 和 Reducers 通常使用 key-value 鍵值對作為輸入和輸出;

壹個 key 對應壹個 Reducer 的 reduce;

每壹個 Mapper 或者 Reducer 可能發出類似於 0,1 這樣的鍵值對作為每壹次輸出;

Mappers 和 Reducers 可能發出任意的 key 或者 value,而不是標準數據集方式;

Mapper 和 Reducer 對象對每壹次 map() 和 reduce() 的調用都存在生命周期。它們支持壹個 setup() 方法和 cleanup() 方法,這些方法可以被用來在處理批量數據之前的操作。

試想這麽壹個場景,我們需要計算壹個文本文件裏每壹行的字符數量。在 Hadoop

MapReduce 裏,我們需要為 Mapper 方法準備壹個鍵值對,key 用作行的行數,value 的值是這壹行的字符數量。

清單 9.

MapReduce 方式 Map 函數

public class LineLengthCountMapper

extends Mapper<LongWritable,Text,IntWritable,IntWritable> {

@Override

protected void map(LongWritable lineNumber, Text line, Context context)

throws IOException, InterruptedException {

context.write(new IntWritable(line.getLength()), new IntWritable(1));

}

}

清單 9 所示代碼,由於 Mappers 和 Reducers 只處理鍵值對,所以對於類

LineLengthCountMapper 而言,輸入是 TextInputFormat 對象,它的 key 由行數提供,value

就是該行所有字符。換成 Spark 之後的代碼如清單 10 所示。

清單 10. Spark 方式 Map 函數

lines.map(line => (line.length, 1))

在 Spark 裏,輸入是彈性分布式數據集 (Resilient Distributed

Dataset),Spark 不需要 key-value 鍵值對,代之的是 Scala 元祖 (tuple),它是通過 (line.length,

1) 這樣的 (a,b) 語法創建的。以上代碼中 map() 操作是壹個 RDD,(line.length,

1) 元祖。當壹個 RDD 包含元祖時,它依賴於其他方法,例如 reduceByKey(),該方法對於重新生成 MapReduce 特性具有重要意義。

清單 11 所示代碼是 Hadoop MapReduce 統計每壹行的字符數,然後以 Reduce 方式輸出。

清單 11.

MapReduce 方式 Reduce 函數

public class LineLengthReducer

extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {

@Override

protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context)

throws IOException, InterruptedException {

int sum = 0;

for (IntWritable count : counts) {

sum += count.get();

}

context.write(length, new IntWritable(sum));

}

}

Spark 裏面的對應代碼如清單 12 所示。

清單 12.

Spark 方式 Reduce 函數

val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)

Spark 的 RDD API 有壹個 reduce() 方法,它會 reduce 所有的 key-value 鍵值對到壹個獨立的 value。

我們現在需要統計大寫字母開頭的單詞數量,對於文本的每壹行而言,壹個 Mapper 可能需要統計很多個鍵值對,代碼如清單 13 所示。

清單 13.

MapReduce 方式計算字符數量

public class CountUppercaseMapper

extends Mapper<LongWritable,Text,Text,IntWritable> {

@Override

protected void map(LongWritable lineNumber, Text line, Context context)

throws IOException, InterruptedException {

for (String word : line.toString().split(" ")) {

if (Character.isUpperCase(word.charAt(0))) {

context.write(new Text(word), new IntWritable(1));

}

}

}

}

在 Spark 裏面,對應的代碼如清單 14 所示。

清單 14. Spark 方式計算字符數量

lines.flatMap(

_.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1))

)

MapReduce 依賴的 Map

方法這裏並不適用,因為每壹個輸入必須對應壹個輸出,這樣的話,每壹行可能占用到很多的輸出。相反的,Spark 裏面的 Map

方法比較簡單。Spark

裏面的方法是,首先對每壹行數據進行匯總後存入壹個輸出結果物數組,這個數組可能是空的,也可能包含了很多的值,最終這個數組會作為壹個 RDD

作為輸出物。這就是 flatMap() 方法的功能,它對每壹行文本裏的單詞轉換成函數內部的元組後進行了過濾。

在 Spark 裏面,reduceByKey() 方法可以被用來統計每篇文章裏面出現的字母數量。如果我們想統計每壹篇文章裏面出現的大寫字母數量,在 MapReduce 裏程序可以如清單 15 所示。

清單 15. MapReduce 方式

public class CountUppercaseReducer

extends Reducer<Text,IntWritable,Text,IntWritable> {

@Override

protected void reduce(Text word, Iterable<IntWritable> counts, Context context)

throws IOException, InterruptedException {

int sum = 0;

for (IntWritable count : counts) {

sum += count.get();

}

context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));

}

}

在 Spark 裏,代碼如清單 16 所示。

清單 16. Spark 方式

groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }

groupByKey() 方法負責收集壹個 key 的所有值,不應用於壹個 reduce 方法。本例中,key 被轉換成大寫字母,值被直接相加算出總和。但這裏需要註意,如果壹個 key 與很多個 value 相關聯,可能會出現 Out

Of Memory 錯誤。

Spark 提供了壹個簡單的方法可以轉換 key 對應的值,這個方法把 reduce 方法過程移交給了 Spark,可以避免出現 OOM 異常。

reduceByKey(_ + _).map { case (word,total) => (word.toUpperCase,total)

}

setup() 方法在 MapReduce 裏面主要的作用是在 map 方法開始前對輸入進行處理,常用的場景是連接數據庫,可以在 cleanup() 方法中釋放在 setup() 方法裏面占用的資源。

清單 17. MapReduce 方式

public class SetupCleanupMapper extends Mapper<LongWritable,Text,Text,IntWritable> {

private Connection dbConnection;

@Override

protected void setup(Context context) {

dbConnection = ...;

}

...

@Override

protected void cleanup(Context context) {

dbConnection.close();

}

}

  • 上一篇:linux中yum和rpm的用法和區別
  • 下一篇:LATEX制作表格,自動換行、水平和豎直對齊的解決方法
  • copyright 2024編程學習大全網