當前位置:編程學習大全網 - 源碼下載 - Spark源代碼編譯失敗

Spark源代碼編譯失敗

分析基於spark1.3.1的源代碼。

Spark master啟動源代碼分析

1.在start-master.sh調用master的main方法。

def main(arg strings:Array[String]){

SignalLogger.register(日誌)

val conf =新SparkConf

val args = new master arguments(arg strings,conf)

Val (actor system,_,_,_)= startsystemandactor(args . host,args.port,args.webuiport,conf)//啟動系統和actor。

actorSystem.awaitTermination()

}

2.調用startSystemAndActor來啟動系統並創建執行元。

def startSystemAndActor(

主持人:字符串,

port: Int,

webUiPort: Int,

conf: SparkConf): (ActorSystem,Int,Int,Option[Int]) = {

val securityMgr =新的安全管理器(conf)

val (actorSystem,bound port)= akkautils . createactorsystem(系統名,主機,端口,conf = conf,

securityManager = securityMgr)

val actor = actorSystem.actorOf(

Props(classOf[Master],host,boundPort,webUiPort,securityMgr,conf),actorName)

val time out = akkautils . ask time out(conf)

val ports request = actor . ask(BoundPortsRequest)(超時)

val ports response = await . result(ports request,timeout)。作為[BoundPortsResponse]的實例

(actorSystem,boundPort,portsResponse.webUIPort,portsResponse.restPort)

3.調用AkkaUtils.createActorSystem來創建ActorSystem。

def createActorSystem(

名稱:字符串,

主持人:字符串,

port: Int,

conf: SparkConf,

security manager:security manager):(ActorSystem,Int) = {

val startService:Int = & gt;(ActorSystem,Int)= { actual port = & gt;

doCreateActorSystem(名稱、主機、實際端口、配置、安全管理器)

}

Utils.startServiceOnPort(端口,啟動服務,配置,名稱)

}

4.調用Utils.startServiceOnPort在端口上啟動服務,創建成功後再調用DocCreator System創建ActorSystem。

5.成功創建ActorSystem後創建Actor。

6.調用Master的主構造函數並執行preStart()。

1和start-slaves.sh調用Worker類的main方法。

def main(arg strings:Array[String]){

SignalLogger.register(日誌)

val conf =新SparkConf

val args = new worker arguments(arg strings,conf)

val (actorSystem,_)= startSystemAndActor(args . host,args.port,args.webUiPort,args.cores,

args.memory,args.masters,args.workDir)

actorSystem.awaitTermination()

}

2.調用startSystemAndActor來啟動系統並創建執行元。

def startSystemAndActor(

主持人:字符串,

port: Int,

webUiPort: Int,

核心:Int,

內存:Int,

master URL:Array[String],

workDir: String,

workerNumber: Option[Int] =無,

conf:spark conf = new spark conf):(actor system,Int) = {

LocalSparkCluster運行多個本地sparkWorkerX參與者系統

val system name = " spark worker "+worker number . map(_。toString)。getOrElse(" ")

val actorName = "Worker "

val securityMgr =新的安全管理器(conf)

val (actorSystem,bound port)= akkautils . createactorsystem(系統名,主機,端口,

conf = conf,securityManager = securityMgr)

val masterAkkaUrls = master URLs . map(master . toakkaurl(_,AkkaUtils.protocol(actorSystem)))

actor system . actor of(Props(class of[Worker],host,boundPort,webUiPort,cores,memory,

masterAkkaUrls,systemName,actorName,workDir,conf,securityMgr),name = actorName)

(actorSystem,boundPort)

}

3.調用AkkaUtils的createActorSystem來創建ActorSystem。

def createActorSystem(

名稱:字符串,

主持人:字符串,

port: Int,

conf: SparkConf,

security manager:security manager):(ActorSystem,Int) = {

val startService:Int = & gt;(ActorSystem,Int)= { actual port = & gt;

doCreateActorSystem(名稱、主機、實際端口、配置、安全管理器)

}

Utils.startServiceOnPort(端口,啟動服務,配置,名稱)

}

4.創建ActorSystem後,調用Worker的主構造函數,執行preStart方法。

覆蓋def preStart() {

斷言(!已註冊)

logInfo("正在啟動Spark worker %s:%d,有%d個內核,%s個RAM ")。格式(

主機、端口、核心、實用程序。兆字節字符串(內存)))

logInfo(s "運行Spark版本$ { org . Apache . Spark . Spark _ VERSION } ")

logInfo("Spark home: " + sparkHome)

createWorkDir()

context . system . event stream . subscribe(self,class of[RemotingLifecycleEvent])

shuffleService.startIfEnabled()

webUi = new WorkerWebUI(this,workDir,webUiPort)

webUi.bind()

registerWithMaster()

metrics system . register source(worker source)

metricsSystem.start()

//在指標系統啟動後,將worker指標servlet處理程序附加到web ui。

metrics system . getservlethandlers . foreach(webui . attach handler)

}

5.調用registerWithMaster方法向主進程註冊已啟動的工作進程。

def registerWithMaster() {

// DisassociatedEvent可能會被觸發多次,所以不要嘗試註冊

//如果計劃了未完成的註冊嘗試。

註冊重試計時器匹配{

case None = & gt

註冊=假

tryRegisterAllMasters()

連接嘗試計數= 0

registrationRetryTimer = Some {

context . system . scheduler . schedule(初始註冊重試間隔,

INITIAL _ REGISTRATION _ RETRY _ INTERVAL,self,registerWithMaster)

}

case Some(_)= & gt;

logInfo("不會產生另壹個向主服務器註冊的嘗試,因為有壹個"+

"已經安排了嘗試。")

}

}

6.調用tryRegisterAllMasters將註冊的工作消息發送到主服務器。

private def tryRegisterAllMasters(){

for(masterAkkaUrl & lt;- masterAkkaUrls) {

logInfo("連接到主服務器"+ masterAkkaUrl +" ... ")

val actor = context . actor selection(masterAkkaUrl)

演員!RegisterWorker(workerId,host,port,cores,memory,webUi.boundPort,publicAddress)

}

}

7.主機的receiveWithLogging接收消息並執行它。

case RegisterWorker(id,workerHost,workerPort,cores,memory,workerUiPort,public address)= & gt;

{

logInfo("正在註冊工作進程%s:%d,具有%d個內核,%s個RAM ")。格式(

workerHost,workerPort,cores,Utils.megabytesToString(memory)))

if (state == RecoveryState。待機){

//忽略,不發送響應

} else if(idtoworker . contains(id)){

發件人!RegisterWorkerFailed("重復的工作ID ")

}否則{

val worker = new WorkerInfo(id,workerHost,workerPort,cores,memory,

發件人、工作報告、公共地址)

if (registerWorker(worker)) {

persistence engine . add worker(worker)

發件人!RegisteredWorker(masterUrl,masterWebUiUrl)

計劃()

}否則{

val worker address = worker . actor . path . address

logWarning("工作者註冊失敗。試圖在同壹個"+

"地址:"+工作地址)

發件人!RegisterWorkerFailed("試圖在同壹地址重新註冊工作程序: "

+工作地址)

}

}

}

8.如果失敗,則向worker返回失敗消息,如果成功,則返回Master的相關信息。

9.返回消息後調用schedule,但由於沒有應用程序,此時不會分配資源。

至此,整個星火集群已經啟動。

  • 上一篇:unity和unreal4 哪個做室內漫遊虛擬現實更有優勢?
  • 下一篇:清遠馬頭山
  • copyright 2024編程學習大全網