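In this post, I'll use the example of near-real-time sessionization of website events to introduce some common and advanced Spark Streaming features. The job then loads statistics about that activity into Apache HBase, where you can graph and analyze them with your favorite BI tool. (Sessionization means capturing all of the clickstream activity within a single visitor's website session.) You can find the code for this demo here.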
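A system like this is super useful for understanding visitor behavior, whether the visitor is a human or a machine. With some additional work, it could also be designed around windowing patterns to detect possible fraud asynchronously.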
Spark Streaming Code
The main class for our example is:
com.cloudera.sa.example.sparkstreaming.sessionization.SessionizeData
Let's walk through the code, skipping lines 1 to 59, which contain imports and other boring stuff.
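Lines 60 to 112: Setting up Spark Streaming
These lines are fairly basic. They set up Spark Streaming and give you the option of receiving the data stream from either HDFS or a socket. If you're new to Spark Streaming, I've added detailed comments to help you follow the code. (I won't go into detail here, because the explanations are in the sample code itself.)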
//This is just creating a Spark Config object. I don't do much here but
//add the app name. There are tons of options to put into the Spark config,
//but none are needed for this simple example.
val sparkConf = new SparkConf().
setAppName("SessionizeData " + args(0)).
set("spark.cleaner.ttl", "120000")
//These two lines get us our SparkContext and our StreamingContext (10-second batches).
//These objects have all the root functionality we need to get started.
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(10))
//Here we are loading our HBase Configuration object. This will have
//all the information needed to connect to our HBase cluster.
//There is nothing different here from when you normally interact with HBase.
val conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
//This is an HBaseContext object. This is a nice abstraction that will hide
//any complex HBase stuff from us so we can focus on our business case
//HBaseContext is from the SparkOnHBase project which can be found at
// /tmalaska/SparkOnHBase
val hbaseContext = new HBaseContext(sc, conf);
//This creates a reference to our root DStream. DStreams are like RDDs but
//in the context of the micro-batch world. I set this to null for now
//because I later give the option of populating this data from HDFS or from
//a socket. There is no reason this could not also be populated by Kafka,
//Flume, an MQ system, or anything else. I just focused on these because
//they are the easiest to set up.
var lines: DStream[String] = null
//Options for data load. Will be adding Kafka and Flume at some point
if (args(0).equals("socket")) {
  val host = args(FIXED_ARGS)
  val port = args(FIXED_ARGS + 1)
  println("host:" + host)
  println("port:" + Integer.parseInt(port))
  //Simple example of how you set up a receiver from a Socket Stream
  lines = ssc.socketTextStream(host, port.toInt)
} else if (args(0).equals("newFile")) {
  val directory = args(FIXED_ARGS)
  println("directory:" + directory)
  //Simple example of how you set up a receiver from an HDFS folder
  lines = ssc.fileStream[LongWritable, Text, TextInputFormat](directory, (t: Path) => true, true).map(_._2.toString)
} else {
  throw new RuntimeException("bad input type")
}
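The if/else above can also be written as a small, Spark-free function, which makes the argument handling easy to test on its own. The following is a hypothetical refactoring, not the demo's code. `InputSource`, `SocketSource`, `NewFileSource`, and the `fixedArgs` parameter are names introduced for illustration. `fixedArgs` stands in for the job's `FIXED_ARGS` constant, whose value is not shown here.

```scala
// Hypothetical refactoring of the input-selection if/else; not the demo's code.
object InputSourceSketch {
  // Each supported source type carries exactly the arguments it needs.
  sealed trait InputSource
  final case class SocketSource(host: String, port: Int) extends InputSource
  final case class NewFileSource(directory: String) extends InputSource

  // args(0) selects the source; source-specific arguments start at fixedArgs
  // (a stand-in for FIXED_ARGS, whose real value is defined elsewhere).
  def parse(args: Array[String], fixedArgs: Int): InputSource = args(0) match {
    case "socket"  => SocketSource(args(fixedArgs), args(fixedArgs + 1).toInt)
    case "newFile" => NewFileSource(args(fixedArgs))
    case other     => throw new IllegalArgumentException(s"bad input type: $other")
  }
}
```

Once the source is decided up front, the Spark-specific part reduces to a single match that calls `socketTextStream` or `fileStream`. That also lets `lines` be a `val` rather than a `var` initialized to `null`.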
Lines 114 to 124: String parsing
This is where the Spark Streaming work begins. Look at the following lines:
val ipKeyLines = lines.map[(String, (Long, Long, String))](eventRecord => {
//Get the time and ip address out of the original event
val time = dateFormat.parse(
eventRecord.substring(eventRecord.indexOf('[') + 1, eventRecord.indexOf(']'))).
getTime()
val ipAddress = eventRecord.substring(0, eventRecord.indexOf(' '))
//We return the time twice because we will use the first as the start time
//and the second as the end time
(ipAddress, (time, time, eventRecord))
})
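The first command above runs a map function on the DStream object "lines", parsing each raw event to separate out the IP address, the timestamp, and the event body. For those new to Spark Streaming: a DStream holds a batch of records to be processed. These records are populated by the receiver object defined earlier. The map function produces another DStream within this micro-batch that stores the transformed records for further processing.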
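To see what the parsing map does to a single record, here is a minimal plain-Scala sketch of the same logic outside Spark. It assumes the events are Apache access-log lines and that `dateFormat` uses the pattern `dd/MMM/yyyy:HH:mm:ss Z`. The job's actual `dateFormat` is defined in the omitted lines. Unlike the original, the sketch wraps the parse in `Try`, so a malformed line yields `None` instead of throwing and failing the batch. `ParseSketch` is a hypothetical name.

```scala
import java.text.SimpleDateFormat
import java.util.Locale
import scala.util.Try

object ParseSketch {
  // Assumed log timestamp format, e.g. [10/Oct/2000:13:55:36 -0700].
  // A new instance per call, because SimpleDateFormat is not thread-safe.
  private def dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  // Same shape as the map in lines 114 to 124: (ip, (startTime, endTime, rawEvent)).
  // A missing '[' or ']', or an unparseable date, produces None.
  def parse(eventRecord: String): Option[(String, (Long, Long, String))] = Try {
    val time = dateFormat.parse(
      eventRecord.substring(eventRecord.indexOf('[') + 1, eventRecord.indexOf(']'))).getTime
    val ipAddress = eventRecord.substring(0, eventRecord.indexOf(' '))
    (ipAddress, (time, time, eventRecord))
  }.toOption
}
```

In a real Spark job you would more likely create one formatter per partition (for example inside `mapPartitions`) rather than one per record.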
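When looking at a Spark Streaming diagram like the one above, there are a few things to note:
Each micro-batch fires at the batch interval that was set when the StreamingContext was constructed (10 seconds in this example)
The receiver is always populating the RDDs for the next micro-batch
Older RDDs from previous micro-batches are cleaned up and discarded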
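Lines 126 to 135: Making sessions
Now that we have the IP address and time from the web log, it's time to build sessions. The following code first aggregates events by IP address within the micro-batch, and then merges those partial sessions with the sessions held in the stateful DStream.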
val latestSessionInfo = ipKeyLines.
map[(String, (Long, Long, Long))](a => {
//transform to (ipAddress, (time, time, counter))
(a._1, (a._2._1, a._2._2, 1))
}).
reduceByKey((a, b) => {
//transform to (ipAddress, (lowestStartTime, MaxFinishTime, sumOfCounter))
(Math.min(a._1, b._1), Math.max(a._2, b._2), a._3 + b._3)
}).
updateStateByKey(updateStatbyOfSessions)
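The map and reduceByKey step can be modeled on an ordinary Scala collection to show what one micro-batch's worth of events collapses to. This is a local sketch, not Spark code: `groupBy` followed by `reduce` stands in for `reduceByKey`, and `reduceBatch` is a name introduced here.

```scala
object ReduceSketch {
  // Local model of the map + reduceByKey step on one micro-batch:
  // (ip, (start, end, rawEvent)) -> (ip, (lowestStart, highestFinish, eventCount))
  def reduceBatch(events: Seq[(String, (Long, Long, String))]): Map[String, (Long, Long, Long)] =
    events
      .map { case (ip, (start, end, _)) => (ip, (start, end, 1L)) } // one count per event
      .groupBy(_._1)                                                  // group by IP address
      .map { case (ip, recs) =>
        ip -> recs.map(_._2).reduce((a, b) =>
          (math.min(a._1, b._1), math.max(a._2, b._2), a._3 + b._3))
      }
}
```

Because min, max, and addition are associative and commutative, Spark is free to apply this reduction in any order across partitions, which is what `reduceByKey` requires.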
Here's an example of how records are reduced within a micro-batch:
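With the micro-batch-scoped sessions in hand, we can use the super-cool updateStateByKey function to perform a join/reduce-like operation against the sessions carried over from earlier micro-batches. The diagram below illustrates how this process looks over time in terms of DStreams.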
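A simplified, single-machine model of the updateStateByKey semantics may help here. For each batch, the update function runs for every key that either has new values or already has state, and a `None` result drops the key from the state. This illustrates the semantics only and is not Spark's implementation, which also partitions and checkpoints the state. `step` is a hypothetical name.

```scala
object UpdateStateSketch {
  // One batch of a local updateStateByKey model.
  def step[K, V, S](state: Map[K, S],
                    batch: Seq[(K, V)],
                    update: (Seq[V], Option[S]) => Option[S]): Map[K, S] = {
    // Collect this batch's values per key.
    val newValues: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // The update function sees keys with new data, keys with existing state, or both.
    val keys = state.keySet ++ newValues.keySet
    keys.toSeq.flatMap { k =>
      // An empty Seq means "no new events for this key in this batch";
      // returning None removes the key from the next state.
      update(newValues.getOrElse(k, Seq.empty), state.get(k)).map(s => k -> s)
    }.toMap
  }
}
```

For example, with an update function that keeps a running count and returns `None` for keys that received nothing, a key that goes quiet for one batch disappears from the state, much as a timed-out session does.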
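Now let's dig into the updateStatbyOfSessions function, which is defined at the bottom of the file. This code (note the detailed comments) contains much of the magic that makes sessionization work continuously across micro-batches.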
/**
 * This function will be called for the union of keys in the reduced DStream
 * with the active sessions from the last micro-batch, with the ipAddress
 * being the key
 *
 * The goal is that this produces a stateful RDD that has all the active
 * sessions. So we add new sessions, remove sessions that have timed
 * out, and extend sessions that are still going
 */
def updateStatbyOfSessions(
    //(sessionStartTime, sessionFinishTime, countOfEvents)
    a: Seq[(Long, Long, Long)],
    //(sessionStartTime, sessionFinishTime, countOfEvents, isNewSession)
    b: Option[(Long, Long, Long, Boolean)]
  ): Option[(Long, Long, Long, Boolean)] = {
  //This function will return an Option value.
  //If we want to delete the value we can return None.
  //This value contains four parts
  //(startTime, endTime, countOfEvents, isNewSession)
  var result: Option[(Long, Long, Long, Boolean)] = null
  //These if statements are saying if we didn't get a new event for
  //this session's ip address for longer than the session
  //timeout + the batch time then it is safe to remove this key value
  //from the future Stateful DStream
  //(11000 ms = the 10-second batch interval plus 1 second of slack)
  if (a.size == 0) {
    if (System.currentTimeMillis() - b.get._2 > SESSION_TIMEOUT + 11000) {
      result = None
    } else {
      if (b.get._4 == false) {
        result = b
      } else {
        result = Some((b.get._1, b.get._2, b.get._3, false))
      }
    }
  }
  //Now because we used the reduce function before this function we are
  //only ever going to get at most one event in the Sequence.
  a.foreach(c => {
    if (b.isEmpty) {
      //If there was no value in the Stateful DStream then just add it
      //new, with a true for being a new session
      result = Some((c._1, c._2, c._3, true))
    } else {
      if (c._1 - b.get._2 < SESSION_TIMEOUT) {
        //If the session from the stateful DStream has not timed out
        //then extend the session
        result = Some((
          Math.min(c._1, b.get._1), //newStartTime
          Math.max(c._2, b.get._2), //newFinishTime
          b.get._3 + c._3,          //newSumOfEvents
          false                     //This is not a new session
        ))
      } else {
        //Otherwise replace the old session with a new one
        result = Some((
          c._1, //newStartTime
          c._2, //newFinishTime
          c._3, //newSumOfEvents: only the events of the new session
          true  //new session
        ))
      }
    }
  })
  result
}
} //end of the enclosing object
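There's a lot going on in this code, and in many ways it is the most complex part of the whole job. In short, it tracks active sessions so you know whether an event continues an existing session or starts a new one.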
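Because updateStatbyOfSessions reads System.currentTimeMillis() directly, it is hard to test in isolation. The sketch below restates the same decision tree as a pure function that takes the current time as a parameter, and it uses pattern matching in place of the mutable `result`. The 30-minute timeout is an assumption; the real `SESSION_TIMEOUT` is defined elsewhere in the file. When the gap exceeds the timeout, the sketch starts the new session with only the new batch's event count. `SessionUpdateSketch` is a hypothetical name.

```scala
object SessionUpdateSketch {
  // Assumed for illustration; the real SESSION_TIMEOUT is defined elsewhere.
  val SessionTimeoutMs: Long = 30L * 60L * 1000L
  // Batch interval (10 s) plus 1 s of slack, matching the 11000 in the original.
  val BatchSlackMs: Long = 11000L

  // (startTime, finishTime, eventCount, isNewSession)
  type Session = (Long, Long, Long, Boolean)

  // Same decisions as updateStatbyOfSessions, with "now" passed in.
  // After reduceByKey, the batch holds at most one entry per key.
  def update(now: Long)(batch: Seq[(Long, Long, Long)], state: Option[Session]): Option[Session] =
    (batch.headOption, state) match {
      // No new events and idle past timeout + slack: drop the session.
      case (None, Some((_, finish, _, _))) if now - finish > SessionTimeoutMs + BatchSlackMs =>
        None
      // No new events but still alive: keep it, no longer flagged as new.
      case (None, Some((start, finish, count, _))) =>
        Some((start, finish, count, false))
      // Not reached under updateStateByKey; handled so the match is total.
      case (None, None) =>
        None
      // First events seen for this key: a brand-new session.
      case (Some((start, finish, count)), None) =>
        Some((start, finish, count, true))
      // New events arrived before the timeout: extend the existing session.
      case (Some((start, finish, count)), Some((prevStart, prevFinish, prevCount, _)))
          if start - prevFinish < SessionTimeoutMs =>
        Some((math.min(start, prevStart), math.max(finish, prevFinish), prevCount + count, false))
      // Gap longer than the timeout: start a new session from this batch only.
      case (Some((start, finish, count)), Some(_)) =>
        Some((start, finish, count, true))
    }
}
```

Passing the clock in as a parameter is the main design change here. It lets each branch be exercised with fixed timestamps instead of waiting on real time.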
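Lines 126 to 207: Counting and HBase
This section does most of the counting. Much of it is repetitive, so let's look at just one count example and then walk step by step through how all of the resulting counts are stored together in a single HBase record.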
val onlyActiveSessions = latestSessionInfo.filter(t => System.currentTimeMillis() - t._2._2 < SESSION_TIMEOUT)
…
val newSessionCount = onlyActiveSessions.filter(t => {
//is the session newer than the last micro batch
//and is the boolean saying this is a new session true
(System.currentTimeMillis() - t._2._2 < 11000 && t._2._4)
}).
count.
map[HashMap[String, Long]](t => HashMap((NEW_SESSION_COUNTS, t)))
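In short, the code above filters out everything except the active sessions, counts them, and stores that final count in a HashMap instance. The HashMap serves as a container: once all the counts are done, we can call the reduce function below to combine them into a single record. (I'm sure there are better ways to do this, but this approach works well.)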
Next, the following code takes all of those HashMaps and merges their values into a single HashMap.
val allCounts = newSessionCount.
union(totalSessionCount).
union(totals).
union(totalEventsCount).
union(deadSessionsCount).
union(totalSessionEventCount).
reduce((a, b) => b ++ a)
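Stripped of Spark, the "one-entry map per count, then merge" trick looks like this. The column names are hypothetical stand-ins for constants such as NEW_SESSION_COUNTS, and an immutable `Map` replaces the original's `HashMap`.

```scala
object CountMergeSketch {
  // Each count becomes a one-entry map: column name -> value.
  def singleEntry(column: String, count: Long): Map[String, Long] = Map(column -> count)

  // Same combining step as union(...).reduce((a, b) => b ++ a):
  // all one-entry maps are merged into one row's worth of columns.
  def mergeAll(counts: Seq[Map[String, Long]]): Map[String, Long] =
    counts.reduce((a, b) => b ++ a)
}
```

This merge is only order-independent because every count uses a distinct key. With `b ++ a`, a duplicate key keeps the value from `a`, and `DStream.reduce` does not promise any particular order for `a` and `b`.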
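HBaseContext makes it super easy for Spark Streaming to interact with HBase. All you need to do is supply the DStream of HashMaps and a function that converts each HashMap into a Put object.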
hbaseContext.streamBulkPut[HashMap[String, Long]](
  allCounts,  //The input DStream
  hTableName, //The name of the table we want to put to
  (t) => {
    //Here we are converting our input record into a put
    //The rowKey is C for Count and a backward counting time so the newest
    //counts show up first in HBase's sorted order
    val put = new Put(Bytes.toBytes("C." + (Long.MaxValue - System.currentTimeMillis())))
    //We are iterating through the HashMap to make all the columns with their counts
    t.foreach(kv => put.add(Bytes.toBytes(hFamily), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2.toString)))
    put
  },
  false)
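The row key trick is worth checking on its own. The following minimal sketch assumes the same `"C." + (Long.MaxValue - timestamp)` expression. A newer timestamp produces a numerically smaller suffix, and HBase sorts row keys as unsigned bytes, so the newest counts come first. Lexicographic order matches numeric order here only because every suffix has the same number of digits (19, for any realistic millisecond timestamp). `countRowKey` and `compareAsHBaseBytes` are hypothetical helpers, and the second one is a local stand-in for HBase's byte ordering, not the HBase API.

```scala
import java.nio.charset.StandardCharsets

object RowKeySketch {
  // Same expression as the put function: a reversed timestamp after "C.".
  def countRowKey(timestampMs: Long): String = "C." + (Long.MaxValue - timestampMs)

  // Unsigned lexicographic byte comparison, the order HBase uses for row keys.
  def compareAsHBaseBytes(a: String, b: String): Int = {
    val x = a.getBytes(StandardCharsets.UTF_8)
    val y = b.getBytes(StandardCharsets.UTF_8)
    val n = math.min(x.length, y.length)
    var i = 0
    while (i < n) {
      val d = (x(i) & 0xff) - (y(i) & 0xff)
      if (d != 0) return d
      i += 1
    }
    x.length - y.length
  }
}
```

A key built from the plain timestamp would sort oldest first. Subtracting from `Long.MaxValue` flips the order without any extra padding logic.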
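This information in HBase can now be wrapped in an Apache Hive table. You can then run a query from your favorite BI tool to get a graph like the one below, which refreshes with every micro-batch.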
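Lines 209 to 215: Writing to HDFS
The final task is to join the active session information with the event data and then persist the events to HDFS, each tagged with its session's start time.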
//Persist to HDFS
ipKeyLines.join(onlyActiveSessions).
map(t => {
//Session root start time | Event message
dateFormat.format(new Date(t._2._2._1)) + "\t" + t._2._1._3
}).
saveAsTextFiles(outputDir + "/session", "txt")
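The join-and-format step can be modeled with plain collections to show which events get written. Only events whose IP has an active session survive the inner join, and each output line is the session start time, a tab, and the raw event. The output date pattern and the UTC time zone are assumptions chosen so the example is deterministic; the job itself reuses its own `dateFormat`. `HdfsLineSketch` is a hypothetical name.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object HdfsLineSketch {
  // Assumed output format (UTC); the real job formats with its own dateFormat.
  private def outFormat = {
    val f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    f.setTimeZone(TimeZone.getTimeZone("UTC"))
    f
  }

  // Local model of ipKeyLines.join(onlyActiveSessions).map(...):
  // inner join on IP, then "sessionStartTime<TAB>rawEvent" per event.
  def toLines(events: Seq[(String, (Long, Long, String))],
              sessions: Map[String, (Long, Long, Long, Boolean)]): Seq[String] =
    for {
      (ip, (_, _, raw)) <- events
      (start, _, _, _) <- sessions.get(ip).toSeq // no active session => event dropped
    } yield outFormat.format(new Date(start)) + "\t" + raw
}
```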