其实在线业务提交网站源码分享的问题并不复杂,但是又很多的朋友都不太了解在线接业务,因此呢,今天小编就来为大家分享在线业务提交网站源码分享的一些知识,希望可以帮助到大家,下面我们一起来看看这个问题的分析吧!
实际上spark各种脚本,spark-shell、spark-sql实现方式都是通过调用spark-submit脚本来实现的,而spark-submit又是通过spark-class脚本来实现的,spark-class脚本最终执行org.apache.spark.launcher.Main,作为整个Spark程序的主入口。下面我们具体来看一下。
spark-submit
首先查看一下spark-submit的位置,可以看出/usr/bin/spark-submit是个软连接:
[root@master~]ll/usr/bin/spark-submit\nlrwxrwxrwx1rootroot302月610:50/usr/bin/spark-submit->/etc/alternatives/spark-submit\n[root@master~]!/bin/bash\nAutodetectJAVA_HOMEifnotdefined\n.$LIB_DIR/bigtop-utils/bigtop-detect-javahome\n此时$LIB_DIR=/opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/spark/bin/spark-submit\nexec$LIB_DIR/spark/bin/spark-submit”$@”\n
上述脚本的主要功能是:找到spark提交的真正脚本的所在位置,在CDH版本中该位置在/opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/spark/bin/spark-submit,另一种查找spark-submit的位置方法是使用find/-namespark-submit命令。
真正的spark-submit脚本内容是:
disablerandomizedhashforstringinPython3.3+\nexportPYTHONHASHSEED=0\nspark-submit调用了spark-class\n传入的类是org.apache.spark.deploy.SparkSubmit\n以及它传入的参数,如deploymode、executor-memory等\n此时的$SPARK_HOME=/opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/\nexec”$SPARK_HOME”/bin/spark-classorg.apache.spark.deploy.SparkSubmit”$@”\n
可以看出该脚本又调用了spark-class脚本,传递过去的第一个参数是org.apache.spark.deploy.SparkSubmit,$@是执行/usr/bin/spark-submit脚本时传入的参数,同样传给了spark-class。
接着来看spark-class脚本,spark-class脚本会加载spark配置的环境变量信息、定位依赖包spark-assembly-1.5.0-cdh5.6.0-hadoop2.6.0-cdh5.6.0.jar文件(以cdh2.6.0-spark1.5.0为例)等,然后再调用org.apache.spark.launcher.Main正式启动Spark应用程序的运行,spark-class脚本具体内容:
定位SAPRK_HOME目录,SPARK_HOME=/opt/cloudera/parcels/CDH-5.6.0-1.cdh5.6.0.p0.45/lib/spark\nexportSPARK_HOME=”$(cd”`dirname”$0″`”/..;pwd)”\n例如配置文件conf下的spark-env.sh等\n.”$SPARK_HOME”/bin/load-spark-env.sh\n定位spark-assembly-1.5.0-cdh5.6.0-hadoop2.6.0-cdh5.6.0.jar文件(以cdh2.6.0-spark1.5.0为例)\n把launcher加到类路径中\nif[-n”$SPARK_PREPEND_CLASSES”];then\nLAUNCH_CLASSPATH=”$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH”\nfi\nexport_SPARK_ASSEMBLY=”$SPARK_ASSEMBLY_JAR”\ncharactersthatwouldbeotherwiseinterpretedbytheshell.Readthatinawhileloop,populating\n#执行org.apache.spark.launcher.Main作为Spark应用程序的主入口\nCMD=()\nwhileIFS=read-d”-rARG;do\nCMD+=(“$ARG”)\ndone<<(“$RUNNER”-cp”$LAUNCH_CLASSPATH”org.apache.spark.launcher.Main”$@”)\nexec”${CMD[@]}”\n
最关键的就是下面这句了:
CMD=()\nwhileIFS=read-d”-rARG;do\nCMD+=(“$ARG”)\ndone<<(“$RUNNER”-cp”$LAUNCH_CLASSPATH”org.apache.spark.launcher.Main”$@”)\nexec”${CMD[@]}”\n
首先执行了”$RUNNER”-cp”$LAUNCH_CLASSPATH”org.apache.spark.launcher.Main”$@这个是真正执行的第一个spark的类。返回值由while循环读取,加入到CMD中。
launcher.Main返回的数据存储到CMD中。
然后执行命令:
exec”${CMD[@]}”\n
launcher.Main
org.apache.spark.launcher.Main是Spark启动器的命令行接口,spark-class调用完org.apache.spark.launcher.Main后,执行exec”${CMD[@]}”,会启动org.apache.spark.deploy.SparkSubmit的执行,org.apache.spark.launcher.Main部分源码如下:
publicstaticvoidmain(String[]argsArray)throwsException{\n…\nList<String>args=newArrayList<String>(Arrays.asList(argsArray));\nStringclassName=args.remove(0);\n…\n//创建命令解析器\nAbstractCommandBuilderbuilder;\nif(className.equals(“org.apache.spark.deploy.SparkSubmit”)){\ntry{\n//调用org.apache.spark.deploy.SparkSubmit\nbuilder=newSparkSubmitCommandBuilder(args);\n}catch(IllegalArgumentExceptione){\n//……省略\nbuilder=newSparkSubmitCommandBuilder(help);\n}\n}else{\n//启动其他类\nbuilder=newSparkClassCommandBuilder(className,args);\n}\nList<String>cmd=builder.buildCommand(env);//解析器解析参数\n…\n//返回有效的参数\nif(isWindows()){\nSystem.out.println(prepareWindowsCommand(cmd,env));\n}else{\nList<String>bashCmd=prepareBashCommand(cmd,env);\nfor(Stringc:bashCmd){\nSystem.out.print(c);\nSystem.out.print(‘\\0’);\n}\n}\n}\n
这样就进入了org.apache.spark.deploy.SparkSubmit方法入口函数。SparkSubmit部分源码如下:
defmain(args:Array[String]):Unit={\n//任务提交时设置的参数,例如master、executorMemory等\nvalappArgs=newSparkSubmitArguments(args)\nif(appArgs.verbose){\n//scalastyle:offprintln\nprintStream.println(appArgs)\n//scalastyle:onprintln\n}\nappArgs.actionmatch{\n//任务提交时,执行submit(appArgs)\ncaseSparkSubmitAction.SUBMIT=>submit(appArgs)\ncaseSparkSubmitAction.KILL=>kill(appArgs)\ncaseSparkSubmitAction.REQUEST_STATUS=>requestStatus(appArgs)\n}\n}\n
创建SparkSubmitArguments对象,并解析参数来初始化成员。这里只分析submit过程。
SparkSubmitArguments
此类,解析并封装来自spark-submit脚本的参数。当newSparkSubmitArguments(args)的时候会顺序触发
//Setparametersfromcommandlinearguments\ntry{\n//解析命令行参数\nparse(args.toList)\n}catch{\n//省略\n}\n//Populate`sparkProperties`mapfrompropertiesfile\n//将默认属性文件(默认为spark-defaults.conf)中配置的参数值与通过-conf指定的值合并;\nmergeDefaultSparkProperties()\n//Removekeysthatdon’tstartwith”spark.”from`sparkProperties`.\n//删除不以”spark.”开头的属性\nignoreNonSparkProperties()\n//Use`sparkProperties`mapalongwithenvvarstofillinanymissingparameters\n//从环境变量、Spark属性等加载参数。此方法会设置action参数,默认为SUBMIT;\nloadEnvironmentArguments()\n//验证action操作所需要的属性都已设置\nvalidateArguments()\n
解析Sprak-submit脚本传递过来的参数将默认属性文件的配置值和–conf传递过来的属性值合并,取–conf的值,删除不以”spark.”开头的属性从环境变量、Spark属性等加载参数。此方法会设置action参数,默认为SUBMIT;根据action值的不同,检查action对应操作所需要的属性是否都已设置,不同的action操作有不同的检测方法。
privatedefvalidateArguments():Unit={\nactionmatch{\ncaseSUBMIT=>validateSubmitArguments()\ncaseKILL=>validateKillArguments()\ncaseREQUEST_STATUS=>validateStatusRequestArguments()\n}\n}\n
Spark属性参数优先级
应用程序中设置的参数;命令行设置的参数;在配置文件中设置的参数(默认为spark-defaults.conf);环境变量中设置的属性值。
SparkSubmit.submit
该方法的功能分为两步骤:
首先,我们通过设置类路径、系统属性和应用程序参数准备启动环境来运行基于集群管理器和部署模式的子主类。其次,我们使用这个启动环境来调用子类的主方法。
下面来看submit方法的实现:
privatedefsubmit(args:SparkSubmitArguments):Unit={\nval(childArgs,childClasspath,sysProps,childMainClass)=prepareSubmitEnvironment(args)\ndefdoRunMain():Unit={\n//args.proxyUser可以在命令行中通过args.proxyUser传递,模拟指定的用户提交程序\nif(args.proxyUser!=null){\nvalproxyUser=UserGroupInformation.createProxyUser(args.proxyUser,\nUserGroupInformation.getCurrentUser())\ntry{\nproxyUser.doAs(newPrivilegedExceptionAction[Unit](){\noverridedefrun():Unit={\n//重点!!!\nrunMain(childArgs,childClasspath,sysProps,childMainClass,args.verbose)\n}\n})\n}catch{\ncasee:Exception=>\n….\n}\n}else{\n//重点!!!\nrunMain(childArgs,childClasspath,sysProps,childMainClass,args.verbose)\n}\n}\n//Instandaloneclustermode,therearetwosubmissiongateways:\n//(1)ThetraditionalAkkagatewayusingo.a.s.deploy.Clientasawrapper\n//(2)ThenewREST-basedgatewayintroducedinSpark1.3\n//ThelatteristhedefaultbehaviorasofSpark1.3,butSparksubmitwillfailover\n//tousethelegacygatewayifthemasterendpointturnsouttobenotaRESTserver.\nif(args.isStandaloneCluster&&args.useRest){\ntry{\nprintStream.println(“RunningSparkusingtheRESTapplicationsubmissionprotocol.”)\ndoRunMain()\n}catch{\n//Failovertousethelegacysubmissiongateway\ncasee:SubmitRestConnectionException=>\nprintWarning(s”Masterendpoint${args.master}wasnotaRESTserver.”+\n”Fallingbacktolegacysubmissiongatewayinstead.”)\nargs.useRest=false\nsubmit(args)\n}\n//Inallothermodes,justrunthemainclassasprepared\n}else{\ndoRunMain()\n}\n}\n
prepareSubmitEnvironment其中一个职责就是设置childMainClass,它决定了应用程序主类的调用方式;调用doRunMain内部方法,它将调用runMain方法。
SparkSubmit.prepareSubmitEnvironment
该方法主要是为提交应用程序做准备,最终返回一个Tuple类型,具有4个元素:
childArgs:返回的第一个参数是子程序的参数childClasspath:进入子程序所需要的类路径sysProps:一个包括系统属性的map类型childMainClass:主类名,即org.apache.spark.deploy.yarn.Client、org.apache.spark.deploy.Client、org.apache.spark.deploy.rest.RestSubmissionClient或应用程序主类名等,这些Client类是userClass的包装类,userClass就是应用程序主类名,也就是用户自己编写的Spark程序的主类。prepareSubmitEnvironment的方法签名:
private[deploy]defprepareSubmitEnvironment(args:SparkSubmitArguments)\n:(Seq[String],Seq[String],Map[String,String],String)={\n
设置集群资源管理器,表示程序将会运行在哪个集群资源管理器上
valclusterManager:Int=args.mastermatch{\ncasemifm.startsWith(“yarn”)=>YARN\ncasemifm.startsWith(“spark”)=>STANDALONE\ncasemifm.startsWith(“mesos”)=>MESOS\ncasemifm.startsWith(“local”)=>LOCAL\ncase_=>printErrorAndExit(“Mastermuststartwithyarn,spark,mesos,orlocal”);-1\n}\n
设置应用程序部署模式
vardeployMode:Int=args.deployModematch{\ncase”client”|null=>CLIENT\ncase”cluster”=>CLUSTER\ncase_=>printErrorAndExit(“Deploymodemustbeeitherclientorcluster”);-1\n}\n
在client模式下,直接将childMainClass设置为应用程序主类名
//在client模式下,直接启动应用程序主类,并将应用程序jar和其它依赖的jar添加到类路径中\nif(deployMode==CLIENT){\nchildMainClass=args.mainClass\nif(isUserJar(args.primaryResource)){\nchildClasspath+=args.primaryResource\n}\nif(args.jars!=null){childClasspath++=args.jars.split(“,”)}\nif(args.childArgs!=null){childArgs++=args.childArgs}\n}\n
在standalonecluster模式下,将childMainClass设置org.apache.spark.deploy.rest.RestSubmissionClient或org.apache.spark.deploy.Client
if(args.isStandaloneCluster){\nif(args.useRest){\nchildMainClass=”org.apache.spark.deploy.rest.RestSubmissionClient”\nchildArgs+=(args.primaryResource,args.mainClass)\n}else{\n//Inlegacystandaloneclustermode,useClientasawrapperaroundtheuserclass\nchildMainClass=”org.apache.spark.deploy.Client”\nif(args.supervise){childArgs+=”–supervise”}\nOption(args.driverMemory).foreach{m=>childArgs+=(“–memory”,m)}\nOption(args.driverCores).foreach{c=>childArgs+=(“–cores”,c)}\nchildArgs+=”launch”\nchildArgs+=(args.master,args.primaryResource,args.mainClass)\n}\nif(args.childArgs!=null){\nchildArgs++=args.childArgs\n}\n}\n
如果提交方式是yarn-cluster,则将childMainClass设置为org.apache.spark.deploy.yarn.Client
//Inyarn-clustermode,useyarn.Clientasawrapperaroundtheuserclass\n//在yarn-cluster模式下,使用yarn.Client用户包装类\nif(isYarnCluster){\nchildMainClass=”org.apache.spark.deploy.yarn.Client”\n//…………….\n}\n
SparkSubmit.runMain
runMain方法的定义:
privatedefrunMain(\nchildArgs:Seq[String],\nchildClasspath:Seq[String],\nsysProps:Map[String,String],\nchildMainClass:String,\nverbose:Boolean):Unit={\n
verbose可以在命令行中使用–verbose,-v设置,将runMain的参数输出到控制台:
if(verbose){\nprintStream.println(s”Mainclass:\\n$childMainClass”)\nprintStream.println(s”Arguments:\\n${childArgs.mkString(“\\n”)}”)\nprintStream.println(s”Systemproperties:\\n${sysProps.mkString(“\\n”)}”)\nprintStream.println(s”Classpathelements:\\n${childClasspath.mkString(“\\n”)}”)\nprintStream.println(“\\n”)\n}\n
举个例子,如果提交方式为client:spark-submit-v–classcom.carol.SparkTest–masteryarn-clienttestSpark.jar1002,那么,会有如下输出:
Mainclass:\ncom.carol.SparkTest\nArguments:\n100\n2\n–name\ntestSpark\n–jar\nfile:/home/jars/testSpark.jar\n–class\ncom.carol.SparkTest\nSystemproperties:\n…..\nClasspathelements:\n…..\n
如果提交方式为cluster:spark-submit-v–classcom.carol.SparkTest–masteryarn-clustertestSpark.jar1002,那么,会有如下输出:
Mainclass:\norg.apache.spark.deploy.yarn.Client\nArguments:\n–name\ntestSpark\n–jar\nfile:/home/jars/testSpark.jar\n–class\ncom.carol.SparkTest\n–arg\n100\n–arg\n2\nSystemproperties:\n…..\nClasspathelements:\n…..\n
加载jar
for(jar<-childClasspath){\naddJarToClasspath(jar,loader)\n}\n
将Spark属性参数设置为系统属性(很多地方采用从System属性中获取参数,比如创建SparkConf时从系统中加载):
for((key,value)<-sysProps){\nSystem.setProperty(key,value)\n}\n
创建childMainClass的类对象:
try{\nmainClass=Utils.classForName(childMainClass)\n}catch{\n//…..\n}\n
获取main方法对象:
valmainMethod=mainClass.getMethod(“main”,newArray[String](0).getClass)\n
调用main方法:
try{\nmainMethod.invoke(null,childArgs.toArray)\n}catch{\n//…..\n}\n
到此,就已经调用prepareSubmitEnvironment方法设置的childMainClass类了。那么childMainClass的取值可为:
应用程序主类名;//Client\norg.apache.spark.deploy.rest.StandaloneRestClient;//StandalonerestClient\norg.apache.spark.deploy.Client;//StandaloneClient\norg.apache.spark.deploy.yarn.Client//yarncluster\n
接下来就是通过Master、Worker启动DriverWrapper进程,进而启动应用程序主类的过程。
好了,内容就到这里了,照例放张美图休息一下
感兴趣的朋友可以点个关注,我们一起学习进步!
在线业务提交网站源码分享的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于在线接业务、在线业务提交网站源码分享的信息别忘了在本站进行查找哦。
