Spark Source Code Analysis: How spark-submit Launches Executors in Standalone Client Mode


Earlier posts analyzed the startup of the Master and the Worker; see the previous articles in this series.

This post follows a spark-submit job in standalone client mode through the source code, up to the point where the executors are launched successfully.

For remote-debugging setup, see the earlier post: 「Spark源码分析」- Configuring IDEA remote debugging.

The submission flow differs across deploy modes; here we only walk through the executor-startup code path in standalone client mode.

Main architecture: spark-submit → spark-class → launcher.Main → SparkSubmit → user main class / SparkContext → StandaloneSchedulerBackend + StandaloneAppClient → Master → Worker → ExecutorRunner → CoarseGrainedExecutorBackend.

4.1. Submitting the job with spark-submit (remote debugging)

spark-submit --class com.kyrie.first.FirstRemote \
  --master spark://192.168.1.62:7077 \
  --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=10000" \
  --conf spark.executor.extraJavaOptions="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=10002" \
  /home/hadoop/changyue/spark-source-2.4-1.0-SNAPSHOT.jar

--driver-java-options attaches a debug agent to the driver JVM; spark.executor.extraJavaOptions attaches one to each executor JVM.

So two Remote debug configurations are needed in IDEA, one attached to port 10000 (the driver) and one to port 10002 (the executor).

4.2. The spark-submit script calls spark-class, forwarding all arguments

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

4.3. The spark-class script launches the Main class

build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

4.4. Executing launcher.Main's main method

Purpose: parse the arguments passed over from the shell and emit (NUL-separated) the command that launches SparkSubmit's main method.

public static void main(String[] argsArray) throws Exception {
  checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
  List<String> args = new ArrayList<>(Arrays.asList(argsArray));
  String className = args.remove(0);
  boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
  AbstractCommandBuilder builder;
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    try {
      // Parse the arguments from the shell; mainClass is our application's
      // entry point: com.kyrie.first.FirstRemote
      builder = new SparkSubmitCommandBuilder(args);
    } catch (IllegalArgumentException e) {
      printLaunchCommand = false;
      System.err.println("Error: " + e.getMessage());
      System.err.println();
      MainClassOptionParser parser = new MainClassOptionParser();
      try {
        parser.parse(args);
      } catch (Exception ignored) {
        // Ignore parsing exceptions.
      }
      List<String> help = new ArrayList<>();
      if (parser.className != null) {
        help.add(parser.CLASS);
        help.add(parser.className);
      }
      help.add(parser.USAGE_ERROR);
      builder = new SparkSubmitCommandBuilder(help);
    }
  } else {
    builder = new SparkClassCommandBuilder(className, args);
  }

  Map<String, String> env = new HashMap<>();
  List<String> cmd = builder.buildCommand(env);
  if (printLaunchCommand) {
    System.err.println("Spark Command: " + join(" ", cmd));
    System.err.println("========================================");
  }

  if (isWindows()) {
    System.out.println(prepareWindowsCommand(cmd, env));
  } else {
    // In bash, use NULL as the arg separators since it cannot be used in an argument.
    // Build the command that launches the SparkSubmit class.
    List<String> bashCmd = prepareBashCommand(cmd, env);
    for (String c : bashCmd) {
      System.out.print(c);   // spark-class reads these tokens back and execs SparkSubmit
      System.out.print('\0');
    }
  }
}

4.5. Executing SparkSubmit's main method

Purpose: dispatch to SparkSubmit.submit.

def main(args: Array[String]): Unit = {
  // Parse the command-line arguments
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

4.6. The SparkSubmit.submit function

Purpose:
1. Prepare the launch environment. prepareSubmitEnvironment returns a different main class per deploy mode; in standalone client mode, childMainClass is our own class, com.kyrie.first.FirstRemote.
2. Call runMain.

private def submit(args: SparkSubmitArguments): Unit = {
  // returns the environment configuration for this deploy mode
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

4.7. Executing SparkSubmit.runMain

Creates a JavaMainApplication object and calls its start method; start invokes the main method of com.kyrie.first.FirstRemote.

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sparkConf: SparkConf,
    childMainClass: String,
    verbose: Boolean): Unit = {
  if (verbose) {
    logInfo(s"Main class:\n$childMainClass")
    logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
    // sysProps may contain sensitive information, so redact before printing
    logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
    logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    logInfo("\n")
  }
  val loader =
    if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)

  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  var mainClass: Class[_] = null
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      logWarning(s"Failed to load $childMainClass.", e)
      if (childMainClass.contains("thriftserver")) {
        logInfo(s"Failed to load main class $childMainClass.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        logInfo(s"Failed to load hive class.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
  }

  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.newInstance().asInstanceOf[SparkApplication]
  } else {
    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }
    new JavaMainApplication(mainClass)
  }

  // irrelevant code omitted
  ...

  try {
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}

4.8. JavaMainApplication's start method

start invokes the main method of com.kyrie.first.FirstRemote via reflection.

private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }
}

4.9. Executing the user-defined class's main method

It creates the SparkContext object.

var appName = "FirstRemote"

def main(args: Array[String]) {
  System.setProperty("HADOOP_USER_NAME", "hadoop")
  val sc = sparkContext()
  val rdd = sc.textFile("hdfs://192.168.1.61:9000/a.txt")
  println(rdd.collect().mkString(","))
}

def sparkContext(): SparkContext = {
  val conf = new SparkConf().setAppName(appName).setMaster("spark://192.168.1.61:7077")
  conf.set("spark.driver.memory", "400m")
  conf.set("spark.executor.memory", "500m")
  conf.set("spark.executor.cores", "1")
  val sc = new SparkContext(conf)
  sc
}

4.10. Executing the SparkContext constructor

The constructor creates many objects; here we only discuss two of them:

_schedulerBackend = StandaloneSchedulerBackend

_taskScheduler = TaskSchedulerImpl

createTaskScheduler instantiates this pair according to the deploy mode, and then _taskScheduler.start() is called:

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
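For reference, here is roughly what the standalone branch of createTaskScheduler looks like (abridged from SparkContext.createTaskScheduler in Spark 2.x; the other deploy-mode branches are omitted):

master match {
  // SPARK_REGEX matches master URLs of the form spark://host:port
  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)
  // local, yarn, mesos, ... cases omitted
}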

4.11. Executing TaskSchedulerImpl's start method

Calls the backend's (StandaloneSchedulerBackend's) start method;

checks whether spark.speculation is enabled.

override def start() {
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}

4.12. Executing StandaloneSchedulerBackend's start method

Packages up the driver's connection info.

Sets the executor launch class (CoarseGrainedExecutorBackend) and its launch environment.

Creates a StandaloneAppClient object and calls its start method.

Waits for the application registration to complete.

override def start() {
  super.start()

  // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
  // mode. In cluster mode, the code that submits the application to the Master needs to connect
  // to the launcher instead.
  if (sc.deployMode == "client") {
    launcherBackend.connect()
  }

  // package up the driver's connection info
  val driverUrl = RpcEndpointAddress(
    sc.conf.get("spark.driver.host"),
    sc.conf.get("spark.driver.port").toInt,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  ...

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val webUrl = sc.ui.map(_.webUrl).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  // If we're using dynamic allocation, set our initial executor limit to 0 for now.
  // ExecutorAllocationManager will send the real initial limit to the Master later.
  val initialExecutorLimit =
    if (Utils.isDynamicAllocationEnabled(conf)) {
      Some(0)
    } else {
      None
    }
  val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
  client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
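Note that {{EXECUTOR_ID}}, {{HOSTNAME}}, {{CORES}}, {{APP_ID}} and {{WORKER_URL}} are left as placeholders here; the Worker fills in the real values when it builds the actual launch command in 4.24. A rough look at the substitution (abridged from ExecutorRunner.substituteVariables in Spark 2.x):

// the Worker replaces the {{...}} tokens with the concrete values
// it knows at executor launch time
private[worker] def substituteVariables(argument: String): String = argument match {
  case "{{WORKER_URL}}" => workerUrl
  case "{{EXECUTOR_ID}}" => execId.toString
  case "{{HOSTNAME}}" => host
  case "{{CORES}}" => cores.toString
  case "{{APP_ID}}" => appId
  case _ => argument
}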

4.13. Executing StandaloneAppClient's start method

Creates a ClientEndpoint object and registers it with the rpcEnv; registering an endpoint causes its onStart method to run. This endpoint lives in the driver process.

def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}

4.14. Executing ClientEndpoint's onStart method

Calls the ClientEndpoint.registerWithMaster() method.

override def onStart(): Unit = {
  try {
    registerWithMaster(1)
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}

4.15. Executing the registerWithMaster method

Calls tryRegisterAllMasters, and schedules retries until registration succeeds or the retry limit is reached.

private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures.set(tryRegisterAllMasters())
  registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
    override def run(): Unit = {
      if (registered.get) {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerMasterThreadPool.shutdownNow()
      } else if (nthRetry >= REGISTRATION_RETRIES) {
        markDead("All masters are unresponsive! Giving up.")
      } else {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerWithMaster(nthRetry + 1)
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}

4.16. Executing the tryRegisterAllMasters method

Sends a RegisterApplication message to each master.

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
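For orientation, these are the deploy RPC messages that appear in steps 4.16 through 4.22 (case classes abridged from org.apache.spark.deploy.DeployMessages in Spark 2.x; field lists may differ slightly between versions):

// driver → master: ask the master to register this application
case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
// master → driver: registration succeeded, here is the app id
case class RegisteredApplication(appId: String, master: RpcEndpointRef)
// master → worker: start an executor for this app with these resources
case class LaunchExecutor(masterUrl: String, appId: String, execId: Int,
    appDesc: ApplicationDescription, cores: Int, memory: Int)
// master → driver: an executor has been allocated on a worker
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int)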

4.17. The Master handles the RegisterApplication message

Registers the application with the master.

Sends a RegisteredApplication message back to the driver.

Calls the schedule function.

// from the Master.receive method
case RegisterApplication(description, driver) =>
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }

registerApplication adds the app's info to the master's bookkeeping collections:

private def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }

  applicationMetricsSystem.registerSource(app.appSource)
  apps += app
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
}

4.18. Executing the schedule method

Checks which alive workers have enough resources, launching waiting drivers round-robin.

Calls the startExecutorsOnWorkers method.

private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}

4.19. Executing the startExecutorsOnWorkers method

Decides how many cores to allocate to executors on each worker (a small illustrative sketch follows the code below).

Calls allocateWorkerResourceToExecutors, which sends the message that makes a worker launch an executor.

private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps) {
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    // If the cores left is less than the coresPerExecutor, the cores left will not be allocated
    if (app.coresLeft >= coresPerExecutor) {
      // Filter out workers that don't have enough resources to launch an executor
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor)
        .sortBy(_.coresFree).reverse
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}
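scheduleExecutorsOnWorkers itself is not shown above. Under the spread-out strategy it hands out one executor's worth of cores at a time, round-robin across the usable workers, until the app's demand is met or no worker can take another executor. A minimal, self-contained sketch of that idea (our own illustration, not Spark's actual code):

object SpreadOutSketch {
  // Distribute `coresNeeded` cores across workers with the given free-core
  // counts, `coresPerExecutor` at a time, round-robin.
  def assign(coresNeeded: Int, coresPerExecutor: Int, freeCores: Array[Int]): Array[Int] = {
    val assigned = Array.fill(freeCores.length)(0)
    var remaining = coresNeeded
    var progress = true
    while (remaining >= coresPerExecutor && progress) {
      progress = false
      for (i <- freeCores.indices
           if remaining >= coresPerExecutor &&
              freeCores(i) - assigned(i) >= coresPerExecutor) {
        assigned(i) += coresPerExecutor   // one more executor's worth on worker i
        remaining -= coresPerExecutor
        progress = true
      }
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    // 3 workers with 4/3/2 free cores, the app wants 6 cores, 2 cores per executor
    println(assign(6, 2, Array(4, 3, 2)).mkString(","))
  }
}

Running it prints 2,2,2: the six requested cores are spread evenly, one executor per worker.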

4.20. Executing the allocateWorkerResourceToExecutors method

Calls the launchExecutor method.

private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}

4.21. Executing launchExecutor

Sends a LaunchExecutor message to the worker.

Sends an ExecutorAdded message to the driver.

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

4.22. The Worker handles the LaunchExecutor message

Creates an ExecutorRunner object and calls its start method.

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.getOrElse(appId, {
        val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
        val dirs = localRootDirs.flatMap { dir =>
          try {
            val appDir = Utils.createDirectory(dir, namePrefix = "executor")
            Utils.chmod700(appDir)
            Some(appDir.getAbsolutePath())
          } catch {
            case e: IOException =>
              logWarning(s"${e.getMessage}. Ignoring this directory.")
              None
          }
        }.toSeq
        if (dirs.isEmpty) {
          throw new IOException("No subfolder can be created in " +
            s"${localRootDirs.mkString(",")}.")
        }
        dirs
      })
      appDirectories(appId) = appLocalDirs
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs, ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception =>
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
    }
  }

4.23. Executing the ExecutorRunner.start method

Spawns a thread that runs the fetchAndRunExecutor method.

private[worker] def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
    // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
    if (state == ExecutorState.RUNNING) {
      state = ExecutorState.FAILED
    }
    killProcess(Some("Worker shutting down"))
  }
}

4.24. Executing the fetchAndRunExecutor method

Builds and runs the command that launches the CoarseGrainedExecutorBackend class.

private def fetchAndRunExecutor() {
  try {
    // Launch the process
    val subsOpts = appDesc.command.javaOpts.map {
      Utils.substituteAppNExecIds(_, appId, execId.toString)
    }
    val subsCommand = appDesc.command.copy(javaOpts = subsOpts)
    val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
      } else {
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      }
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}

With --conf spark.executor.extraJavaOptions="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=10002" set (as in 4.1), you can step through CoarseGrainedExecutorBackend's startup in the second debugger.

4.25. CoarseGrainedExecutorBackend's main function

Creates the executor-side SparkEnv.

Creates the CoarseGrainedExecutorBackend object.

Creates a WorkerWatcher object.

val env = SparkEnv.createExecutorEnv(
  driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)

env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
  env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
  env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()

Registering the endpoint triggers CoarseGrainedExecutorBackend's onStart method, which sends a RegisterExecutor message to the driver:

override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
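On the driver side, CoarseGrainedSchedulerBackend answers this ask and then sends a RegisteredExecutor message back; only at that point does the backend create the Executor object that actually runs tasks. Roughly (abridged from CoarseGrainedExecutorBackend.receive in Spark 2.x):

case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    // the Executor object hosts the thread pool that will run tasks
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }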

With that, the executor is fully started. A lot of code was pasted here; it is worth stepping through the whole flow yourself. The next post will analyze the source path by which Spark triggers job submission.

