1.RDD介紹:
RDD,彈性分布式數據集,即分布式的元素集合。在spark中,對所有數據的操作不外乎是創建RDD、轉化已有的RDD以及調用RDD操作進行求值。在這壹切的背後,Spark會自動將RDD中的數據分發到集群中,並將操作並行化。
Spark中的RDD就是壹個不可變的分布式對象集合。每個RDD都被分為多個分區,這些分區運行在集群中的不同節點上。RDD可以包含Python,Java,Scala中任意類型的對象,甚至可以包含用戶自定義的對象。
用戶可以使用兩種方法創建RDD:讀取壹個外部數據集,或在驅動器程序中分發驅動器程序中的對象集合,比如list或者set。
RDD的轉化操作都是惰性求值的,這意味著我們對RDD調用轉化操作,操作不會立即執行。相反,Spark會在內部記錄下所要求執行的操作的相關信息。我們不應該把RDD看做存放著特定數據的數據集,而最好把每個RDD當做我們通過轉化操作構建出來的、記錄如何計算數據的指令列表。數據讀取到RDD中的操作也是惰性的,數據只會在必要時讀取。轉化操作和讀取操作都有可能多次執行。
2.創建RDD數據集
(1)讀取壹個外部數據集
val input=sc.textFile(inputFileDir)
(2)分發對象集合,這裏以list為例
val lines =sc.parallelize(List("hello world","this is a test"));
3.RDD操作
(1)轉化操作
實現過濾器轉化操作:
val lines =sc.parallelize(List("error:a","error:b","error:c","test"));
val errors=lines.filter(line => line.contains("error"));
errors.collect().foreach(println);
輸出:
error:a
error:b
error:c
可見,列表list中包含詞語error的表項都被正確的過濾出來了。
(2)合並操作
將兩個RDD數據集合並為壹個RDD數據集
接上述程序示例:
val lines =sc.parallelize(List("error:a","error:b","error:c","test","warnings:a"));
val errors=lines.filter(line => line.contains("error"));
val warnings =lines.filter(line => line.contains("warnings"));
val unionLines =errors.union(warnings);
unionLines.collect().foreach(println);
輸出:
error:a
error:b
error:c
warning:a
可見,將原始列表項中的所有error項和warning項都過濾出來了。
(3)獲取RDD數據集中的部分或者全部元素
①獲取RDD數據集中的部分元素 ? .take(int num) ?返回值List<T> ?
獲取RDD數據集中的前num項。
/**
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
*/
def take(num: Int): JList[T]
程序示例:接上
unionLines.take(2).foreach(println);
輸出:
error:a
error:b
可見,輸出了RDD數據集unionLines的前2項
②獲取RDD數據集中的全部元素 .collect() 返回值 List<T>
程序示例:
val all =unionLines.collect();
all.foreach(println);
遍歷輸出RDD數據集unionLines的每壹項
4.向spark傳遞函數
在scala中,我們可以把定義的內聯函數、方法的引用或靜態方法傳遞給Spark,就像Scala的其他函數式API壹樣。我們還要考慮其他壹些細節,必須所傳遞的函數及其引用的數據需要是可序列化的(實現了Java的Serializable接口)。除此之外,與Python類似,傳遞壹個對象的方法或者字段時,會包含對整個對象的引用。我們可以把需要的字段放在壹個局部變量中,來避免包含該字段的整個對象。
class searchFunctions (val query:String){
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchFunctionReference(rdd: RDD[String]) :RDD[String]={
//問題: isMach表示 this.isMatch ,因此我們需要傳遞整個this
rdd.filter(isMatch)
}
def getMatchesFunctionReference(rdd: RDD[String]) :RDD[String] ={
//問題: query表示 this.query ,因此我們需要傳遞整個this
rdd.flatMap(line => line.split(query))
}
def getMatchesNoReference(rdd:RDD[String]):RDD[String] ={
//安全,只把我們需要的字段拿出來放入局部變量之中
val query1=this.query;
rdd.flatMap(x =>x.split(query1)
)
}
}
5.針對每個元素的轉化操作:
轉化操作map()接收壹個函數,把這個函數用於RDD中的每個元素,將函數的返回結果作為結果RDD中對應的元素。關鍵詞:轉化
轉化操作filter()接受壹個函數,並將RDD中滿足該函數的元素放入新的RDD中返回。關鍵詞:過濾
示例圖如下所示:
①map()
計算RDD中各值的平方
val rdd=sc.parallelize(List(1,2,3,4));
val result=rdd.map(value => value*value);
println(result.collect().mkString(","));
輸出:
1,4,9,16
filter()
②?去除RDD集合中值為1的元素:
val rdd=sc.parallelize(List(1,2,3,4));
val result=rdd.filter(value => value!=1);
println(result.collect().mkString(","));
結果:
2,3,4
我們也可以采取傳遞函數的方式,就像這樣:
函數:
def filterFunction(value:Int):Boolean ={
value!=1
}
使用:
val rdd=sc.parallelize(List(1,2,3,4));
val result=rdd.filter(filterFunction);
println(result.collect().mkString(","));
③ 有時候,我們希望對每個輸入元素生成多個輸出元素。實現該功能的操作叫做flatMap()。和map()類似,我們提供給flatMap()的函數被分別應用到了輸入的RDD的每個元素上。不過返回的不是壹個元素,而是壹個返回值序列的叠代器。輸出的RDD倒不是由叠代器組成的。我們得到的是壹個包含各個叠代器可以訪問的所有元素的RDD。flatMap()的壹個簡單用途是將輸入的字符串切分成單詞,如下所示:?
val rdd=sc.parallelize(List("Hello world","hello you","world i love you"));
val result=rdd.flatMap(line => line.split(" "));
println(result.collect().mkString("\n"));
輸出:
hello
world
hello
you
world
i
love
you
6.集合操作
RDD中的集合操作
函數
用途
RDD1.distinct()
生成壹個只包含不同元素的新RDD。需要數據混洗。
RDD1.union(RDD2)
返回壹個包含兩個RDD中所有元素的RDD
RDD1.intersection(RDD2)
只返回兩個RDD中都有的元素
RDD1.substr(RDD2)
返回壹個只存在於第壹個RDD而不存在於第二個RDD中的所有元素組成的RDD。需要數據混洗。
集合操作對笛卡爾集的處理:
RDD1.cartesian(RDD2)
返回兩個RDD數據集的笛卡爾集
程序示例:生成RDD集合{1,2} 和{1,2}的笛卡爾集
val rdd1=sc.parallelize(List(1,2));
val rdd2=sc.parallelize(List(1,2));
val rdd=rdd1.cartesian(rdd2);
println(rdd.collect().mkString("\n"));
輸出:
(1,1)
(1,2)
(2,1)
(2,2)
7.行動操作
(1)reduce操作
reduce()接收壹個函數作為參數,這個函數要操作兩個RDD的元素類型的數據並返回壹個同樣類型的新元素。壹個簡單的例子就是函數+,可以用它來對我們的RDD進行累加。使用reduce(),可以很方便地計算出RDD中所有元素的總和,元素的個數,以及其他類型的聚合操作。
以下是求RDD數據集所有元素和的程序示例:
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.reduce((x,y) =>x+y);
println(results);
輸出:55
(2)fold()操作
接收壹個與reduce()接收的函數簽名相同的函數,再加上壹個初始值來作為每個分區第壹次調用時的結果。妳所提供的初始值應當是妳提供的操作的單位元素,也就是說,使用妳的函數對這個初始值進行多次計算不會改變結果(例如+對應的0,*對應的1,或者拼接操作對應的空列表)。
程序實例:
①計算RDD數據集中所有元素的和:
zeroValue=0;//求和時,初始值為0。
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.fold(0)((x,y) =>x+y);
println(results);
②計算RDD數據集中所有元素的積:
zeroValue=1;//求積時,初始值為1。
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.fold(1)((x,y) =>x*y);
println(results);
(3)aggregate()操作
aggregate()函數返回值類型不必與所操作的RDD類型相同。
與fold()類似,使用aggregate()時,需要提供我們期待返回的類型的初始值。然後通過壹個函數把RDD中的元素合並起來放入累加器。考慮到每個節點是在本地進行累加的,最終,還需要提供第二個函數來將累加器兩兩合並。
以下是程序實例:
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val result=rdd.aggregate((0,0))(
(acc,value) =>(acc._1+value,acc._2+1),
(acc1,acc2) => (acc1._1+acc2._1, acc1._2+acc2._2)
)
val average=result._1/result._2;
println(average)
輸出:5
最終返回的是壹個Tuple2<int,int>對象, 他被初始化為(0,0),當遇到壹個int值時,將該int數的值加到Tuple2對象的_1中,並將_2值加1,如果遇到壹個Tuple2對象時,將這個Tuple2的_1和_2的值歸並到最終返回的Tuple2值中去。
表格:對壹個數據為{1,2,3,3}的RDD進行基本的RDD行動操作
函數名 目的 示例 結果
collect() 返回RDD的所有元素 rdd.collect() {1,2,3,3}
count() RDD的元素個數 rdd.count() 4
countByValue() 各元素在RDD中出現的次數 rdd.countByValue() {(1,1),
(2,1),
(3,2)
}
take(num) 從RDD中返回num個元素 rdd.take(2) {1,2}
top(num) 從RDD中返回最前面的num個元素 rdd.takeOrdered(2)(myOrdering) {3,3}
takeOrdered(num)
(ordering) 從RDD中按照提供的順序返回最前面的num個元素
rdd.takeSample(false,1) 非確定的
takeSample(withReplacement,num,[seed]) 從RDD中返回任意壹些元素 rdd.takeSample(false,1) 非確定的
reduce(func) 並行整合RDD中所有數據 rdd.reduce((x,y) => x+y)
9
fold(zero)(func) 和reduce()壹樣,但是需要提供初始值 rdd.fold(0)((x,y) => x+y)
9
aggregate(zeroValue)(seqOp,combOp) 和reduce()相似,但是通常返回不同類型的函數 rdd.aggregate((0,0))
((x,y) =>
(x._1+y,x._2+1),
(x,y)=>
(x._1+y._1,x._2+y._2)
) (9,4)
foreach(func) 對RDD中的每個元素使用給定的函數 rdd.foreach(func) 無
8.持久化緩存
因為Spark RDD是惰性求值的,而有時我們希望能多次使用同壹個RDD。如果簡單地對RDD調用行動操作,Spark每次都會重算RDD以及它的所有依賴。這在叠代算法中消耗格外大,因為叠代算法常常會多次使用同壹組數據。
為了避免多次計算同壹個RDD,可以讓Spark對數據進行持久化。當我們讓Spark持久化存儲壹個RDD時,計算出RDD的節點會分別保存它們所求出的分區數據。
出於不同的目的,我們可以為RDD選擇不同的持久化級別。默認情況下persist()會把數據以序列化的形式緩存在JVM的堆空間中
不同關鍵字對應的存儲級別表
級別
使用的空間
cpu時間
是否在內存
是否在磁盤
備註
MEMORY_ONLY
高
低
是
否
直接儲存在內存
MEMORY_ONLY_SER
低
高
是
否
序列化後儲存在內存裏
MEMORY_AND_DISK
低
中等
部分
部分
如果數據在內存中放不下,溢寫在磁盤上
MEMORY_AND_DISK_SER
低
高
部分
部分
數據在內存中放不下,溢寫在磁盤中。內存中存放序列化的數據。
DISK_ONLY
低
高
否
是
直接儲存在硬盤裏面
程序示例:將RDD數據集持久化在內存中。
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)).persist(StorageLevel.MEMORY_ONLY);
println(rdd.count())
println(rdd.collect().mkString(","));
RDD還有unpersist()方法,調用該方法可以手動把持久化的RDD從緩存中移除。
9.不同的RDD類型
在scala中,將RDD轉為由特定函數的RDD(比如在RDD[Double]上進行數值操作),是由隱式轉換來自動處理的。這些隱式轉換可以隱式地將壹個RDD轉為各種封裝類,比如DoubleRDDFunctions(數值數據的RDD)和PairRDDFunctions(鍵值對RDD),這樣我們就有了諸如mean()和variance()之類的額外的函數。
示例程序:
val rdd=sc.parallelize(List(1.0,2.0,3.0,4.0,5.0));
println(rdd.mean());
其實RDD[T]中並沒有mean()函數,只是隱式轉換自動將其轉換為DoubleRDDFunctions。