當前位置:編程學習大全網 - 源碼下載 - scala 中rdd類型用什麽頭文件

scala 中rdd類型用什麽頭文件

1.RDD介紹:

RDD,彈性分布式數據集,即分布式的元素集合。在spark中,對所有數據的操作不外乎是創建RDD、轉化已有的RDD以及調用RDD操作進行求值。在這壹切的背後,Spark會自動將RDD中的數據分發到集群中,並將操作並行化。

Spark中的RDD就是壹個不可變的分布式對象集合。每個RDD都被分為多個分區,這些分區運行在集群中的不同節點上。RDD可以包含Python,Java,Scala中任意類型的對象,甚至可以包含用戶自定義的對象。

用戶可以使用兩種方法創建RDD:讀取壹個外部數據集,或在驅動器程序中分發驅動器程序中的對象集合,比如list或者set。

RDD的轉化操作都是惰性求值的,這意味著我們對RDD調用轉化操作,操作不會立即執行。相反,Spark會在內部記錄下所要求執行的操作的相關信息。我們不應該把RDD看做存放著特定數據的數據集,而最好把每個RDD當做我們通過轉化操作構建出來的、記錄如何計算數據的指令列表。數據讀取到RDD中的操作也是惰性的,數據只會在必要時讀取。轉化操作和讀取操作都有可能多次執行。

2.創建RDD數據集

(1)讀取壹個外部數據集

val input=sc.textFile(inputFileDir)

(2)分發對象集合,這裏以list為例

val lines =sc.parallelize(List("hello world","this is a test"));

3.RDD操作

(1)轉化操作

實現過濾器轉化操作:

val lines =sc.parallelize(List("error:a","error:b","error:c","test"));

val errors=lines.filter(line => line.contains("error"));

errors.collect().foreach(println);

輸出:

error:a

error:b

error:c

可見,列表list中包含詞語error的表項都被正確的過濾出來了。

(2)合並操作

將兩個RDD數據集合並為壹個RDD數據集

接上述程序示例:

val lines =sc.parallelize(List("error:a","error:b","error:c","test","warnings:a"));

val errors=lines.filter(line => line.contains("error"));

val warnings =lines.filter(line => line.contains("warnings"));

val unionLines =errors.union(warnings);

unionLines.collect().foreach(println);

輸出:

error:a

error:b

error:c

warning:a

可見,將原始列表項中的所有error項和warning項都過濾出來了。

(3)獲取RDD數據集中的部分或者全部元素

①獲取RDD數據集中的部分元素 ? .take(int num) ?返回值List<T> ?

獲取RDD數據集中的前num項。

/**

* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so

* it will be slow if a lot of partitions are required. In that case, use collect() to get the

* whole RDD instead.

*/

def take(num: Int): JList[T]

程序示例:接上

unionLines.take(2).foreach(println);

輸出:

error:a

error:b

可見,輸出了RDD數據集unionLines的前2項

②獲取RDD數據集中的全部元素 .collect() 返回值 List<T>

程序示例:

val all =unionLines.collect();

all.foreach(println);

遍歷輸出RDD數據集unionLines的每壹項

4.向spark傳遞函數

在scala中,我們可以把定義的內聯函數、方法的引用或靜態方法傳遞給Spark,就像Scala的其他函數式API壹樣。我們還要考慮其他壹些細節,必須所傳遞的函數及其引用的數據需要是可序列化的(實現了Java的Serializable接口)。除此之外,與Python類似,傳遞壹個對象的方法或者字段時,會包含對整個對象的引用。我們可以把需要的字段放在壹個局部變量中,來避免包含該字段的整個對象。

class searchFunctions (val query:String){

def isMatch(s: String): Boolean = {

s.contains(query)

}

def getMatchFunctionReference(rdd: RDD[String]) :RDD[String]={

//問題: isMach表示 this.isMatch ,因此我們需要傳遞整個this

rdd.filter(isMatch)

}

def getMatchesFunctionReference(rdd: RDD[String]) :RDD[String] ={

//問題: query表示 this.query ,因此我們需要傳遞整個this

rdd.flatMap(line => line.split(query))

}

def getMatchesNoReference(rdd:RDD[String]):RDD[String] ={

//安全,只把我們需要的字段拿出來放入局部變量之中

val query1=this.query;

rdd.flatMap(x =>x.split(query1)

)

}

}

5.針對每個元素的轉化操作:

轉化操作map()接收壹個函數,把這個函數用於RDD中的每個元素,將函數的返回結果作為結果RDD中對應的元素。關鍵詞:轉化

轉化操作filter()接受壹個函數,並將RDD中滿足該函數的元素放入新的RDD中返回。關鍵詞:過濾

示例圖如下所示:

①map()

計算RDD中各值的平方

val rdd=sc.parallelize(List(1,2,3,4));

val result=rdd.map(value => value*value);

println(result.collect().mkString(","));

輸出:

1,4,9,16

filter()

②?去除RDD集合中值為1的元素:

val rdd=sc.parallelize(List(1,2,3,4));

val result=rdd.filter(value => value!=1);

println(result.collect().mkString(","));

結果:

2,3,4

我們也可以采取傳遞函數的方式,就像這樣:

函數:

def filterFunction(value:Int):Boolean ={

value!=1

}

使用:

val rdd=sc.parallelize(List(1,2,3,4));

val result=rdd.filter(filterFunction);

println(result.collect().mkString(","));

③ 有時候,我們希望對每個輸入元素生成多個輸出元素。實現該功能的操作叫做flatMap()。和map()類似,我們提供給flatMap()的函數被分別應用到了輸入的RDD的每個元素上。不過返回的不是壹個元素,而是壹個返回值序列的叠代器。輸出的RDD倒不是由叠代器組成的。我們得到的是壹個包含各個叠代器可以訪問的所有元素的RDD。flatMap()的壹個簡單用途是將輸入的字符串切分成單詞,如下所示:?

val rdd=sc.parallelize(List("Hello world","hello you","world i love you"));

val result=rdd.flatMap(line => line.split(" "));

println(result.collect().mkString("\n"));

輸出:

hello

world

hello

you

world

i

love

you

6.集合操作

RDD中的集合操作

函數

用途

RDD1.distinct()

生成壹個只包含不同元素的新RDD。需要數據混洗。

RDD1.union(RDD2)

返回壹個包含兩個RDD中所有元素的RDD

RDD1.intersection(RDD2)

只返回兩個RDD中都有的元素

RDD1.substr(RDD2)

返回壹個只存在於第壹個RDD而不存在於第二個RDD中的所有元素組成的RDD。需要數據混洗。

集合操作對笛卡爾集的處理:

RDD1.cartesian(RDD2)

返回兩個RDD數據集的笛卡爾集

程序示例:生成RDD集合{1,2} 和{1,2}的笛卡爾集

val rdd1=sc.parallelize(List(1,2));

val rdd2=sc.parallelize(List(1,2));

val rdd=rdd1.cartesian(rdd2);

println(rdd.collect().mkString("\n"));

輸出:

(1,1)

(1,2)

(2,1)

(2,2)

7.行動操作

(1)reduce操作

reduce()接收壹個函數作為參數,這個函數要操作兩個RDD的元素類型的數據並返回壹個同樣類型的新元素。壹個簡單的例子就是函數+,可以用它來對我們的RDD進行累加。使用reduce(),可以很方便地計算出RDD中所有元素的總和,元素的個數,以及其他類型的聚合操作。

以下是求RDD數據集所有元素和的程序示例:

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));

val results=rdd.reduce((x,y) =>x+y);

println(results);

輸出:55

(2)fold()操作

接收壹個與reduce()接收的函數簽名相同的函數,再加上壹個初始值來作為每個分區第壹次調用時的結果。妳所提供的初始值應當是妳提供的操作的單位元素,也就是說,使用妳的函數對這個初始值進行多次計算不會改變結果(例如+對應的0,*對應的1,或者拼接操作對應的空列表)。

程序實例:

①計算RDD數據集中所有元素的和:

zeroValue=0;//求和時,初始值為0。

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));

val results=rdd.fold(0)((x,y) =>x+y);

println(results);

②計算RDD數據集中所有元素的積:

zeroValue=1;//求積時,初始值為1。

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));

val results=rdd.fold(1)((x,y) =>x*y);

println(results);

(3)aggregate()操作

aggregate()函數返回值類型不必與所操作的RDD類型相同。

與fold()類似,使用aggregate()時,需要提供我們期待返回的類型的初始值。然後通過壹個函數把RDD中的元素合並起來放入累加器。考慮到每個節點是在本地進行累加的,最終,還需要提供第二個函數來將累加器兩兩合並。

以下是程序實例:

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));

val result=rdd.aggregate((0,0))(

(acc,value) =>(acc._1+value,acc._2+1),

(acc1,acc2) => (acc1._1+acc2._1, acc1._2+acc2._2)

)

val average=result._1/result._2;

println(average)

輸出:5

最終返回的是壹個Tuple2<int,int>對象, 他被初始化為(0,0),當遇到壹個int值時,將該int數的值加到Tuple2對象的_1中,並將_2值加1,如果遇到壹個Tuple2對象時,將這個Tuple2的_1和_2的值歸並到最終返回的Tuple2值中去。

表格:對壹個數據為{1,2,3,3}的RDD進行基本的RDD行動操作

函數名 目的 示例 結果

collect() 返回RDD的所有元素 rdd.collect() {1,2,3,3}

count() RDD的元素個數 rdd.count() 4

countByValue() 各元素在RDD中出現的次數 rdd.countByValue() {(1,1),

(2,1),

(3,2)

}

take(num) 從RDD中返回num個元素 rdd.take(2) {1,2}

top(num) 從RDD中返回最前面的num個元素 rdd.takeOrdered(2)(myOrdering) {3,3}

takeOrdered(num)

(ordering) 從RDD中按照提供的順序返回最前面的num個元素

rdd.takeSample(false,1) 非確定的

takeSample(withReplacement,num,[seed]) 從RDD中返回任意壹些元素 rdd.takeSample(false,1) 非確定的

reduce(func) 並行整合RDD中所有數據 rdd.reduce((x,y) => x+y)

9

fold(zero)(func) 和reduce()壹樣,但是需要提供初始值 rdd.fold(0)((x,y) => x+y)

9

aggregate(zeroValue)(seqOp,combOp) 和reduce()相似,但是通常返回不同類型的函數 rdd.aggregate((0,0))

((x,y) =>

(x._1+y,x._2+1),

(x,y)=>

(x._1+y._1,x._2+y._2)

) (9,4)

foreach(func) 對RDD中的每個元素使用給定的函數 rdd.foreach(func) 無

8.持久化緩存

因為Spark RDD是惰性求值的,而有時我們希望能多次使用同壹個RDD。如果簡單地對RDD調用行動操作,Spark每次都會重算RDD以及它的所有依賴。這在叠代算法中消耗格外大,因為叠代算法常常會多次使用同壹組數據。

為了避免多次計算同壹個RDD,可以讓Spark對數據進行持久化。當我們讓Spark持久化存儲壹個RDD時,計算出RDD的節點會分別保存它們所求出的分區數據。

出於不同的目的,我們可以為RDD選擇不同的持久化級別。默認情況下persist()會把數據以序列化的形式緩存在JVM的堆空間中

不同關鍵字對應的存儲級別表

級別

使用的空間

cpu時間

是否在內存

是否在磁盤

備註

MEMORY_ONLY

直接儲存在內存

MEMORY_ONLY_SER

序列化後儲存在內存裏

MEMORY_AND_DISK

中等

部分

部分

如果數據在內存中放不下,溢寫在磁盤上

MEMORY_AND_DISK_SER

部分

部分

數據在內存中放不下,溢寫在磁盤中。內存中存放序列化的數據。

DISK_ONLY

直接儲存在硬盤裏面

程序示例:將RDD數據集持久化在內存中。

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)).persist(StorageLevel.MEMORY_ONLY);

println(rdd.count())

println(rdd.collect().mkString(","));

RDD還有unpersist()方法,調用該方法可以手動把持久化的RDD從緩存中移除。

9.不同的RDD類型

在scala中,將RDD轉為由特定函數的RDD(比如在RDD[Double]上進行數值操作),是由隱式轉換來自動處理的。這些隱式轉換可以隱式地將壹個RDD轉為各種封裝類,比如DoubleRDDFunctions(數值數據的RDD)和PairRDDFunctions(鍵值對RDD),這樣我們就有了諸如mean()和variance()之類的額外的函數。

示例程序:

val rdd=sc.parallelize(List(1.0,2.0,3.0,4.0,5.0));

println(rdd.mean());

其實RDD[T]中並沒有mean()函數,只是隱式轉換自動將其轉換為DoubleRDDFunctions。

  • 上一篇:html 顯示不了當前時間,代碼如下
  • 下一篇:設計算法求二叉樹所包含的度為1的結點的數目。(給出設計思想,再用代碼實現)
  • copyright 2024編程學習大全網