當前位置:編程學習大全網 - 編程語言 - spark 啟動worker時出錯,求解答

spark 啟動worker時出錯,求解答

基於spark1.3.1的源碼進行分析

Spark master啟動源碼分析

1、在start-master.sh調用master的main方法,main方法調用

// Entry point invoked by start-master.sh: parses the command-line arguments,
// boots the master's ActorSystem plus the Master actor, and blocks until the
// actor system terminates.
def main(argStrings: Array[String]) {

// Install a signal handler so termination signals are logged before exit.
SignalLogger.register(log)

val conf = new SparkConf

val args = new MasterArguments(argStrings, conf)

// Start the actor system and the Master actor; only the ActorSystem is kept here.
val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)

// Block the main thread until the actor system shuts down.
actorSystem.awaitTermination()

}

2、調用startSystemAndActor啟動系統和創建actor

// Creates the master's ActorSystem, spawns the Master actor inside it, and
// asks the actor which ports it actually bound.
// Returns (actorSystem, boundPort, webUIPort, restPort).
// NOTE: the article's transcription dropped the method's closing brace; it is
// restored here so the snippet is syntactically complete.
def startSystemAndActor(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  // createActorSystem retries on successive ports, so boundPort may differ from `port`.
  val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
    securityManager = securityMgr)
  val actor = actorSystem.actorOf(
    Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
  val timeout = AkkaUtils.askTimeout(conf)
  // Ask the Master actor for the web UI / REST ports it ended up binding.
  val portsRequest = actor.ask(BoundPortsRequest)(timeout)
  val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
  (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
}

3、調用AkkaUtils.createActorSystem來創建ActorSystem

// Creates an Akka ActorSystem bound to `host`, delegating the actual creation
// to doCreateActorSystem and letting Utils.startServiceOnPort retry on
// successive ports if `port` is unavailable.
// Returns the ActorSystem together with the port it actually bound.
def createActorSystem(

name: String,

host: String,

port: Int,

conf: SparkConf,

securityManager: SecurityManager): (ActorSystem, Int) = {

// Factory invoked by startServiceOnPort for each candidate port.
val startService: Int => (ActorSystem, Int) = { actualPort =>

doCreateActorSystem(name, host, actualPort, conf, securityManager)

}

Utils.startServiceOnPort(port, startService, conf, name)

}

4、調用Utils.startServiceOnPort啟動一個端口上的服務,創建成功後調用doCreateActorSystem創建ActorSystem

5、ActorSystem創建成功後創建Actor

6、調用Master的主構造函數,執行preStart()

1、start-slaves.sh調用Worker類的main方法

// Entry point invoked by start-slaves.sh on each slave node: parses the
// command-line arguments, boots the worker's ActorSystem plus the Worker
// actor, and blocks until the actor system terminates.
def main(argStrings: Array[String]) {

// Install a signal handler so termination signals are logged before exit.
SignalLogger.register(log)

val conf = new SparkConf

val args = new WorkerArguments(argStrings, conf)

val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,

args.memory, args.masters, args.workDir)

// Block the main thread until the actor system shuts down.
actorSystem.awaitTermination()

}

2、調用startSystemAndActor啟動系統和創建actor

// Boots a Worker: creates its ActorSystem (named "sparkWorkerN" when a worker
// number is supplied) and spawns the Worker actor that will register itself
// with the masters listed in `masterUrls`.
// Returns the ActorSystem and the port it actually bound.
def startSystemAndActor(

host: String,

port: Int,

webUiPort: Int,

cores: Int,

memory: Int,

masterUrls: Array[String],

workDir: String,

workerNumber: Option[Int] = None,

conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

// The LocalSparkCluster runs multiple local sparkWorkerX actor systems

val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")

val actorName = "Worker"

val securityMgr = new SecurityManager(conf)

val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,

conf = conf, securityManager = securityMgr)

// Normalize every master URL to its Akka address form for later actorSelection.
val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))

actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,

masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)

(actorSystem, boundPort)

}

3、調用AkkaUtils的createActorSystem創建ActorSystem

// Creates an Akka ActorSystem bound to `host`, delegating the actual creation
// to doCreateActorSystem and letting Utils.startServiceOnPort retry on
// successive ports if `port` is unavailable.
// Returns the ActorSystem together with the port it actually bound.
def createActorSystem(

name: String,

host: String,

port: Int,

conf: SparkConf,

securityManager: SecurityManager): (ActorSystem, Int) = {

// Factory invoked by startServiceOnPort for each candidate port.
val startService: Int => (ActorSystem, Int) = { actualPort =>

doCreateActorSystem(name, host, actualPort, conf, securityManager)

}

Utils.startServiceOnPort(port, startService, conf, name)

}

4、創建完ActorSystem後調用Worker的主構造函數,執行preStart方法

// Actor lifecycle hook run once before the Worker starts processing messages:
// creates the work directory and web UI, starts the (optional) shuffle
// service and metrics system, and kicks off registration with the masters.
// Statement order matters here (e.g. the web UI must be bound before
// registration reports its port, and servlet handlers are attached only
// after the metrics system has started).
override def preStart() {

// A Worker must not already be registered when this hook runs.
assert(!registered)

logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(

host, port, cores, Utils.megabytesToString(memory)))

logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")

logInfo("Spark home: " + sparkHome)

createWorkDir()

// Subscribe to remoting lifecycle events (e.g. disconnects from the master).
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

shuffleService.startIfEnabled()

webUi = new WorkerWebUI(this, workDir, webUiPort)

webUi.bind()

registerWithMaster()

metricsSystem.registerSource(workerSource)

metricsSystem.start()

// Attach the worker metrics servlet handler to the web ui after the metrics system is started.

metricsSystem.getServletHandlers.foreach(webUi.attachHandler)

}

5、調用registerWithMaster方法向Master註冊啟動的worker

// Starts registration with all masters and schedules periodic retries.
// Idempotent: if a retry timer is already outstanding, it only logs and
// does nothing else.
def registerWithMaster() {

// DisassociatedEvent may be triggered multiple times, so don't attempt registration

// if there are outstanding registration attempts scheduled.

registrationRetryTimer match {

case None =>

registered = false

tryRegisterAllMasters()

connectionAttemptCount = 0

// Periodically send ReregisterWithMaster to ourselves until registration succeeds.
registrationRetryTimer = Some {

context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,

INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)

}

case Some(_) =>

logInfo("Not spawning another attempt to register with the master, since there is an" +

" attempt scheduled already.")

}

}

6、調用tryRegisterAllMasters向Master發送註冊的Worker消息

// Fire-and-forget a RegisterWorker message to every configured master URL.
// No reply is awaited here; the Master's response (RegisteredWorker or
// RegisterWorkerFailed) is handled by this actor's message loop.
private def tryRegisterAllMasters() {

for (masterAkkaUrl <- masterAkkaUrls) {

logInfo("Connecting to master " + masterAkkaUrl + "...")

// Look up the remote Master actor by its Akka URL.
val actor = context.actorSelection(masterAkkaUrl)

actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)

}

}

7、Master的receiveWithLogging接收到消息執行

// Master-side handler for a Worker's registration request (part of the
// Master actor's receiveWithLogging partial function).
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>

{

logInfo("Registering worker %s:%d with %d cores, %s RAM".format(

workerHost, workerPort, cores, Utils.megabytesToString(memory)))

// A standby master stays silent; the worker will be registered by the active one.
if (state == RecoveryState.STANDBY) {

// ignore, don't send response

} else if (idToWorker.contains(id)) {

// Same worker ID already registered: reject the duplicate.
sender ! RegisterWorkerFailed("Duplicate worker ID")

} else {

val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,

sender, workerUiPort, publicAddress)

if (registerWorker(worker)) {

// Persist the worker so it can be recovered after a master failover.
persistenceEngine.addWorker(worker)

sender ! RegisteredWorker(masterUrl, masterWebUiUrl)

// Re-run scheduling now that new resources are available.
schedule()

} else {

val workerAddress = worker.actor.path.address

logWarning("Worker registration failed. Attempted to re-register worker at same " +

"address: " + workerAddress)

sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "

+ workerAddress)

}

}

}

8、失敗向worker返回失敗消息,成功則返回Master的相關信息

9、返回消息後調用schedule,但是因為沒有application,所以這時候不會進行資源的分配

至此整個Spark集群就已經啟動完成

  • 上一篇:第四代計算機(超大規模集成電路時代)的特點是什麽?
  • 下一篇:樂理入門基礎知識
  • copyright 2024編程學習大全網