Spark master啟動源代碼分析
1.在start-master.sh調用master的main方法。
def main(arg strings:Array[String]){
SignalLogger.register(日誌)
val conf =新SparkConf
val args = new master arguments(arg strings,conf)
Val (actor system,_,_,_)= startsystemandactor(args . host,args.port,args.webuiport,conf)//啟動系統和actor。
actorSystem.awaitTermination()
}
2.調用startSystemAndActor來啟動系統並創建執行元。
def startSystemAndActor(
主持人:字符串,
port: Int,
webUiPort: Int,
conf: SparkConf): (ActorSystem,Int,Int,Option[Int]) = {
val securityMgr =新的安全管理器(conf)
val (actorSystem,bound port)= akkautils . createactorsystem(系統名,主機,端口,conf = conf,
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master],host,boundPort,webUiPort,securityMgr,conf),actorName)
val time out = akkautils . ask time out(conf)
val ports request = actor . ask(BoundPortsRequest)(超時)
val ports response = await . result(ports request,timeout)。作為[BoundPortsResponse]的實例
(actorSystem,boundPort,portsResponse.webUIPort,portsResponse.restPort)
3.調用AkkaUtils.createActorSystem來創建ActorSystem。
def createActorSystem(
名稱:字符串,
主持人:字符串,
port: Int,
conf: SparkConf,
security manager:security manager):(ActorSystem,Int) = {
val startService:Int = & gt;(ActorSystem,Int)= { actual port = & gt;
doCreateActorSystem(名稱、主機、實際端口、配置、安全管理器)
}
Utils.startServiceOnPort(端口,啟動服務,配置,名稱)
}
4.調用Utils.startServiceOnPort在端口上啟動服務,創建成功後再調用DocCreator System創建ActorSystem。
5.成功創建ActorSystem後創建Actor。
6.調用Master的主構造函數並執行preStart()。
1和start-slaves.sh調用Worker類的main方法。
def main(arg strings:Array[String]){
SignalLogger.register(日誌)
val conf =新SparkConf
val args = new worker arguments(arg strings,conf)
val (actorSystem,_)= startSystemAndActor(args . host,args.port,args.webUiPort,args.cores,
args.memory,args.masters,args.workDir)
actorSystem.awaitTermination()
}
2.調用startSystemAndActor來啟動系統並創建執行元。
def startSystemAndActor(
主持人:字符串,
port: Int,
webUiPort: Int,
核心:Int,
內存:Int,
master URL:Array[String],
workDir: String,
workerNumber: Option[Int] =無,
conf:spark conf = new spark conf):(actor system,Int) = {
LocalSparkCluster運行多個本地sparkWorkerX參與者系統
val system name = " spark worker "+worker number . map(_。toString)。getOrElse(" ")
val actorName = "Worker "
val securityMgr =新的安全管理器(conf)
val (actorSystem,bound port)= akkautils . createactorsystem(系統名,主機,端口,
conf = conf,securityManager = securityMgr)
val masterAkkaUrls = master URLs . map(master . toakkaurl(_,AkkaUtils.protocol(actorSystem)))
actor system . actor of(Props(class of[Worker],host,boundPort,webUiPort,cores,memory,
masterAkkaUrls,systemName,actorName,workDir,conf,securityMgr),name = actorName)
(actorSystem,boundPort)
}
3.調用AkkaUtils的createActorSystem來創建ActorSystem。
def createActorSystem(
名稱:字符串,
主持人:字符串,
port: Int,
conf: SparkConf,
security manager:security manager):(ActorSystem,Int) = {
val startService:Int = & gt;(ActorSystem,Int)= { actual port = & gt;
doCreateActorSystem(名稱、主機、實際端口、配置、安全管理器)
}
Utils.startServiceOnPort(端口,啟動服務,配置,名稱)
}
4.創建ActorSystem後,調用Worker的主構造函數,執行preStart方法。
覆蓋def preStart() {
斷言(!已註冊)
logInfo("正在啟動Spark worker %s:%d,有%d個內核,%s個RAM ")。格式(
主機、端口、核心、實用程序。兆字節字符串(內存)))
logInfo(s "運行Spark版本$ { org . Apache . Spark . Spark _ VERSION } ")
logInfo("Spark home: " + sparkHome)
createWorkDir()
context . system . event stream . subscribe(self,class of[RemotingLifecycleEvent])
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this,workDir,webUiPort)
webUi.bind()
registerWithMaster()
metrics system . register source(worker source)
metricsSystem.start()
//在指標系統啟動後,將worker指標servlet處理程序附加到web ui。
metrics system . getservlethandlers . foreach(webui . attach handler)
}
5.調用registerWithMaster方法向主進程註冊已啟動的工作進程。
def registerWithMaster() {
// DisassociatedEvent可能會被觸發多次,所以不要嘗試註冊
//如果計劃了未完成的註冊嘗試。
註冊重試計時器匹配{
case None = & gt
註冊=假
tryRegisterAllMasters()
連接嘗試計數= 0
registrationRetryTimer = Some {
context . system . scheduler . schedule(初始註冊重試間隔,
INITIAL _ REGISTRATION _ RETRY _ INTERVAL,self,registerWithMaster)
}
case Some(_)= & gt;
logInfo("不會產生另壹個向主服務器註冊的嘗試,因為有壹個"+
"已經安排了嘗試。")
}
}
6.調用tryRegisterAllMasters將註冊的工作消息發送到主服務器。
private def tryRegisterAllMasters(){
for(masterAkkaUrl & lt;- masterAkkaUrls) {
logInfo("連接到主服務器"+ masterAkkaUrl +" ... ")
val actor = context . actor selection(masterAkkaUrl)
演員!RegisterWorker(workerId,host,port,cores,memory,webUi.boundPort,publicAddress)
}
}
7.主機的receiveWithLogging接收消息並執行它。
case RegisterWorker(id,workerHost,workerPort,cores,memory,workerUiPort,public address)= & gt;
{
logInfo("正在註冊工作進程%s:%d,具有%d個內核,%s個RAM ")。格式(
workerHost,workerPort,cores,Utils.megabytesToString(memory)))
if (state == RecoveryState。待機){
//忽略,不發送響應
} else if(idtoworker . contains(id)){
發件人!RegisterWorkerFailed("重復的工作ID ")
}否則{
val worker = new WorkerInfo(id,workerHost,workerPort,cores,memory,
發件人、工作報告、公共地址)
if (registerWorker(worker)) {
persistence engine . add worker(worker)
發件人!RegisteredWorker(masterUrl,masterWebUiUrl)
計劃()
}否則{
val worker address = worker . actor . path . address
logWarning("工作者註冊失敗。試圖在同壹個"+
"地址:"+工作地址)
發件人!RegisterWorkerFailed("試圖在同壹地址重新註冊工作程序: "
+工作地址)
}
}
}
8.如果失敗,則向worker返回失敗消息,如果成功,則返回Master的相關信息。
9.返回消息後調用schedule,但由於沒有應用程序,此時不會分配資源。
至此,整個星火集群已經啟動。