大家好,今天给各位分享web作业制作网站源码分享的一些知识,其中也会对web大作业制作网页代码进行解释,文章篇幅可能偏长,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在就马上开始吧!
本文分为两个部分:
作业提交流程思维导图关键函数列表
作业提交流程思维导图
collect后Job的提交流程
点击「链接」查看DataFrame.collect触发的作业提交流程思维导图。
关键函数列表
Dataset.collect
defcollect():Array[T]=withAction(&34;,queryExecution)(collectFromPlan)
Dataset.withAction
Dataset.collectFromPlan
触发物理计划的执行,其中plan的类型是SparkPlan
privatedefcollectFromPlan(plan:SparkPlan):Array[T]={\nvalfromRow=resolvedEnc.createDeserializer()\nplan.executeCollect().map(fromRow)\n}
Spark有很多action函数,比如:
collectcountshow
最终都是通过collectFromPlan去创建Job
SparkPlan.executeCollect
executeCollect
这个函数分为三部:
getByteArrayRdd函数将UnsafeRowRDD转化为bytearrayRDD,加速序列化然后调用了RDD.collect解析collect结果,并返回
RDD.collect
ResilientDistributedDataset(RDD),是一种不可变、支持分区的数据集合。由于支持分区,该数据集支持并行访问。
classRDD是一个基类,它有很多子类:
ShuffledRDD:存储shuffle结果数据,parentRDD是Javakey-value对ShuffledRowRDD:存储shuffle结果数据,parentRDD是InternalRow,SparkSQL使用MapPartitionsRDD:算子会被应用到parentRDD的所有分区UnionRDD:存储union的结果数据其他RDD子类
collect方法的主要职能是提交Spark作业,该功能代理给了SparkContext去支持:
SparkContext.runJob
runJob方法有很多重载,我们只关心最复杂的一个:
从功能上来说,它实现了
准备callSite,以便出问题知道是哪一行代码出错了通过DAGScheduler.runJob提交作业progressBar:命令行里stage的进度条显示doCheckpoint将RDD的中间和最后结果缓存下来
从代码上来说,方法声明如下:
defrunJob[T,U:ClassTag](\nrdd:RDD[T],\nfunc:(TaskContext,Iterator[T])=>U,\npartitions:Seq[Int],\nresultHandler:(Int,U)=>Unit):Unit
它有两个泛型类型参数:
T:ClassTag输入RDD的类型U:ClassTag输出数据的类型
参数列表:
rdd:RDD[T]指输入RDD类型,比如RDD[(Long,Array[Byte])]func:(TaskContext,Iterator[T])=>U。func会被作用到rdd的每个分区,返回Upartitions:Seq[Int]。分区下标列表resultHandler:(Int,U)。这是一个回调函数。处理func执行完返回的数据,第一个参数是分区index,第二个是func的返回值
返回值:Unit表示没有任何返回值
DAGScheduler.runJob
对于DAGScheduler而言,Stage是最小的调度单元。它会
给Job生成以Stage为调度单位的DAG图追踪RDD和Stage的输出状态,比如哪些已经被物化,并基于这些信息提供一个最优的调度方案提交Stage,以TaskSet的形式提交给TasksetManager
DAGScheduler对Job的调度是围绕DAGSchedulerEventProcessLoop展开的。这是一个经典的EventLoop使用场景。runJob方法的执行流程如下:
提交任务本质上是向EventLoop发送一个JobSubmitted事件通过一个JobWaiter对象等待结果
在EventLoop的另一端,onReceive接收到JobSubmitted事件,交给成员函数handleJobSubmitted处理该事件。
JobWaiter内部有一个Promise对象,它会不停接收到taskSucceeded,增加计数,知道成功task的数量等于task的总数量,将promise置为成功。
DAGSchedulerEventProcessLoop.onReceive
onReceive负责接收各类事件,并分发给特定的handler函数处理,具体可以看思维导图或spark代码。
这里我们只看handleJobSubmitted,它做了五件事情:
创建Stage:递归式地创建,先创建parentstage注册Stage创建Job注册Job提交Stage
由于stage是一个有向无环图,所以创建和执行都遵循topologicalorder。
DAGScheduler.createResultStage
在SparkPlan对象调用execute时,会递归地生成RDD,从而构成了RDDLineageGraph,它是一个有向无环图。那么在RDDLineage上如何切分stage呢?
RDD依赖分为宽依赖和窄依赖,代码体现为两个类ShuffleDependency和NarrowDependency。在构建RDDLineage时,相邻的两个RDD必须有其中一种依赖关系。Spark通过这种依赖关系划分Stage。根节点的RDD必须分配到ResultStage里,而之前所有的Stage,不管有多少级依赖,都是ShuffleMapStage。
在
DAGScheduler.getShuffleDependenciesAndResourceProfiles
方法中,通过一个栈来记录分配到当前stage中的RDD(窄依赖中的rdd都会被push到栈里),碰到宽依赖,则加到shuffleDeps中。
getShuffleDependenciesAndResourceProfiles
关于web作业制作网站源码分享的内容到此结束,希望对大家有所帮助。
