大家好,今天小编来为大家梳理Flink在YARN per-job模式下的作业提交流程及相关源码,这部分很多人还不太了解,现在让我们一起来看看吧!
Flink on YARN per-job模式的提交流程如下所示:
1、启动Yarn客户端
AbstractJobClusterExecutor.java
// Submits a job in per-job mode: builds the JobGraph from the pipeline, creates a cluster
// descriptor (the YARN client) inside try-with-resources, reads the ClusterSpecification from
// the configuration, deploys a job cluster via deployJobCluster (honouring detached mode),
// and returns an already-completed future holding a ClusterClientJobClientAdapter for the job ID.
// NOTE(review): the LOG.info message literal was lost during extraction ("&34;").
publicCompletableFuture<JobClient>execute(@NonnullfinalPipelinepipeline,@NonnullfinalConfigurationconfiguration)throwsException{\nfinalJobGraphjobGraph=ExecutorUtils.getJobGraph(pipeline,configuration);\n//Create and start the YARN client\ntry(finalClusterDescriptor<ClusterID>clusterDescriptor=clusterClientFactory.createClusterDescriptor(configuration)){\nfinalExecutionConfigAccessorconfigAccessor=ExecutionConfigAccessor.fromConfiguration(configuration);\n//Get the cluster configuration parameters\nfinalClusterSpecificationclusterSpecification=clusterClientFactory.getClusterSpecification(configuration);\n//Deploy the cluster\nfinalClusterClientProvider<ClusterID>clusterClientProvider=clusterDescriptor\n.deployJobCluster(clusterSpecification,jobGraph,configAccessor.getDetachedMode());\nLOG.info(&34;+jobGraph.getJobID());\n\nreturnCompletableFuture.completedFuture(\nnewClusterClientJobClientAdapter<>(clusterClientProvider,jobGraph.getJobID()));\n}\n}
YarnClusterClientFactory.java
// createClusterDescriptor (partially elided with "……") delegates to getClusterDescriptor, which
// creates a YarnClient, initializes it with a new YarnConfiguration, starts it, and wraps it
// together with the Flink configuration in a YarnClusterDescriptor.
// NOTE(review): the meaning of the trailing `false` constructor argument is not visible here —
// confirm against YarnClusterDescriptor's constructor.
publicYarnClusterDescriptorcreateClusterDescriptor(Configurationconfiguration){\n……\nreturngetClusterDescriptor(configuration);\n}\n\nprivateYarnClusterDescriptorgetClusterDescriptor(Configurationconfiguration){\nfinalYarnClientyarnClient=YarnClient.createYarnClient();\nfinalYarnConfigurationyarnConfiguration=newYarnConfiguration();\n\nyarnClient.init(yarnConfiguration);\nyarnClient.start();\n\nreturnnewYarnClusterDescriptor(\nconfiguration,\nyarnConfiguration,\nyarnClient,\nYarnClientYarnClusterInformationRetriever.create(yarnClient),\nfalse);\n}
2、获取集群配置参数
AbstractContainerizedClusterClientFactory.java
// Derives a ClusterSpecification from the Flink configuration (excerpt, partially elided):
// - master (JobManager) memory: total process memory size in MiB;
// - TaskManager memory: total process memory size in MiB;
// - slots per TaskManager: TaskManagerOptions.NUM_TASK_SLOTS.
// NOTE(review): judging by the helper names, legacy heap-size options are mapped onto
// TOTAL_PROCESS_MEMORY before computing the sizes — confirm against the helpers.
publicClusterSpecificationgetClusterSpecification(Configurationconfiguration){\n……\nfinalintjobManagerMemoryMB=JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(\nconfiguration,\nJobManagerOptions.TOTAL_PROCESS_MEMORY)\n.getTotalProcessMemorySize()\n.getMebiBytes();\n\nfinalinttaskManagerMemoryMB=TaskExecutorProcessUtils\n.processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(\nconfiguration,TaskManagerOptions.TOTAL_PROCESS_MEMORY))\n.getTotalProcessMemorySize()\n.getMebiBytes();\n\nintslotsPerTaskManager=configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);\n\nreturnnewClusterSpecification.ClusterSpecificationBuilder()\n.setMasterMemoryMB(jobManagerMemoryMB)\n.setTaskManagerMemoryMB(taskManagerMemoryMB)\n.setSlotsPerTaskManager(slotsPerTaskManager)\n.createClusterSpecification();\n}
3、部署集群
YarnClusterDescriptor.java
// Deploys a per-job cluster by delegating to deployInternal, using YarnJobClusterEntrypoint as
// the ApplicationMaster entry point. Any exception is wrapped in a ClusterDeploymentException
// with the original exception kept as its cause.
// NOTE(review): the application-name and error-message literals were lost in extraction ("&34;").
publicClusterClientProvider<ApplicationId>deployJobCluster(\nClusterSpecificationclusterSpecification,\nJobGraphjobGraph,\nbooleandetached)throwsClusterDeploymentException{\ntry{\nreturndeployInternal(\nclusterSpecification,\n&34;,\ngetYarnJobClusterEntrypoint(),//Get YarnJobClusterEntrypoint, the entry point used to launch the AM\njobGraph,\ndetached);\n}catch(Exceptione){\nthrownewClusterDeploymentException(&34;,e);\n}\n}
上传jar包和配置文件到HDFS
YarnClusterDescriptor.java
// Excerpt (elided with "……"): creates a new YARN application through the YarnClient and then
// starts the ApplicationMaster via startAppMaster, which returns an ApplicationReport.
// NOTE(review): validClusterSpecification and flinkConfiguration come from the elided parts.
privateClusterClientProvider<ApplicationId>deployInternal(\nClusterSpecificationclusterSpecification,\nStringapplicationName,\nStringyarnClusterEntrypoint,\n@NullableJobGraphjobGraph,\nbooleandetached)throwsException{\n……\n//Create the YARN application\nfinalYarnClientApplicationyarnApplication=yarnClient.createApplication();\n……\nApplicationReportreport=startAppMaster(\nflinkConfiguration,\napplicationName,\nyarnClusterEntrypoint,\njobGraph,\nyarnClient,\nyarnApplication,\nvalidClusterSpecification);\n……\n}
// Excerpt of startAppMaster (elided with "……"). In order it:
// - obtains the FileSystem;
// - creates a YarnApplicationFileUploader for the application;
// - sets the max application attempts (HA vs. non-HA);
// - registers/uploads ship files, flink-dist, the JobGraph and the Flink configuration;
// - builds the AM ContainerLaunchContext;
// - fills in the ApplicationSubmissionContext and submits it via yarnClient.submitApplication.
// NOTE(review): the JobGraph registration and the flink-conf upload appear twice below (once via
// setupSingleLocalResource, once via fileUploader), apparently from two Flink versions pasted
// together — confirm which one applies. String literals were lost in extraction ("&34;").
privateApplicationReportstartAppMaster(\nConfigurationconfiguration,\nStringapplicationName,\nStringyarnClusterEntrypoint,\nJobGraphjobGraph,\nYarnClientyarnClient,\nYarnClientApplicationyarnApplication,\nClusterSpecificationclusterSpecification)throwsException{\n……\n//Initialize the file system (HDFS)\nfinalFileSystemfs=FileSystem.get(yarnConfiguration);\n……\nApplicationSubmissionContextappContext=yarnApplication.getApplicationSubmissionContext();\n\nfinalList<Path>providedLibDirs=getRemoteSharedPaths(configuration);\n//Helper used to upload files\nfinalYarnApplicationFileUploaderfileUploader=YarnApplicationFileUploader.from(\nfs,\nfs.getHomeDirectory(),\nprovidedLibDirs,\nappContext.getApplicationId(),\ngetFileReplication());\n……\nfinalApplicationIdappId=appContext.getApplicationId();\n……\nif(HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)){\n//YARN application attempts; default 2 (YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)\nappContext.setMaxAppAttempts(\nconfiguration.getInteger(\nYarnConfigOptions.APPLICATION_ATTEMPTS.key(),\nYarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));\n\nactivateHighAvailabilitySupport(appContext);\n}else{\n//Without high availability the attempts default to 1\nappContext.setMaxAppAttempts(\nconfiguration.getInteger(\nYarnConfigOptions.APPLICATION_ATTEMPTS.key(),\n1));\n}\n……\n\n//The HDFS upload method is called several times, for:\n//=>systemShipFiles: logging config files and the jars under lib/ except flink-dist\n//=>shipOnlyFiles: files under plugins/\n//=>userJarFiles: the user-code jars\nfileUploader.registerMultipleLocalResources(……);\n……\n//Upload and register the ApplicationMaster jar: flink-dist*.jar\nfinalYarnLocalResourceDescriptorlocalResourceDescFlinkJar=fileUploader.uploadFlinkDist(flinkJarPath);\n……\n//Register the JobGraph file as a local resource\nfileUploader.registerSingleLocalResource(\njobGraphFilename,\nnewPath(tmpJobGraphFile.toURI()),\n&34;,\ntrue,\nfalse);\n……\n//Upload the Flink configuration file\nStringflinkConfigKey=&34;;\nPathremotePathConf=setupSingleLocalResource(\nflinkConfigKey,\nfs,\nappId,\nnewPath(tmpConfigurationFile.getAbsolutePath()),\nlocalResources,\nhomeDir,\n&34;);\n……\n//Write the JobGraph to a tmp file, add it to the local resources and upload it to HDFS\nfileUploader.registerSingleLocalResource(\njobGraphFilename,\nnewPath(tmpJobGraphFile.to
URI()),\n&34;,\ntrue,\nfalse);\n……\n//Upload the Flink configuration file\nStringflinkConfigKey=&34;;\nfileUploader.registerSingleLocalResource(\nflinkConfigKey,\nnewPath(tmpConfigurationFile.getAbsolutePath()),\n&34;,\ntrue,\ntrue);\n……\nfinalJobManagerProcessSpecprocessSpec=JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(\nflinkConfiguration,\nJobManagerOptions.TOTAL_PROCESS_MEMORY);\n//Build the Java command that launches the AM container\nfinalContainerLaunchContextamContainer=setupApplicationMasterContainer(\nyarnClusterEntrypoint,\nhasKrb5,\nprocessSpec);\n……\nappContext.setApplicationName(customApplicationName);\nappContext.setApplicationType(applicationType!=null?applicationType:&34;);\nappContext.setAMContainerSpec(amContainer);\nappContext.setResource(capability);\n……\nyarnClient.submitApplication(appContext);\n……\n}
封装AM参数和命令
YarnClusterDescriptor.java
// Builds the ContainerLaunchContext for the ApplicationMaster. It:
// - assembles the JVM options (general FLINK_JVM_OPTIONS plus FLINK_JM_JVM_OPTIONS when non-empty,
//   plus an extra option when hasKrb5);
// - fills a map of start-command placeholders (JVM memory parameters, JVM options, logging
//   command, entry-point class, log redirection);
// - renders the configured YARN container start-command template via BootstrapTools.getStartCommand;
// - sets the result as the container's single command.
// NOTE(review): all placeholder keys and several literals were lost in extraction ("&34;").
ContainerLaunchContextsetupApplicationMasterContainer(\nStringyarnClusterEntrypoint,\nbooleanhasKrb5,\nJobManagerProcessSpecprocessSpec){\n\n//Respect custom JVM options in the YAML file\nStringjavaOpts=flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);\nif(flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length()>0){\njavaOpts+=&34;+flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);\n}\n//Applicable only for the YarnMiniCluster secure test run\n//krb5.conf file will be available as a local resource in the JM/TM container\nif(hasKrb5){\njavaOpts+=&34;;\n}\n\n//Create the AM's container launch context\nContainerLaunchContextamContainer=Records.newRecord(ContainerLaunchContext.class);\n\nfinalMap<String,String>startCommandValues=newHashMap<>();\nstartCommandValues.put(&34;,&34;);\n\nStringjvmHeapMem=JobManagerProcessUtils.generateJvmParametersStr(processSpec,flinkConfiguration);\nstartCommandValues.put(&34;,jvmHeapMem);\n\nstartCommandValues.put(&34;,javaOpts);\nstartCommandValues.put(&34;,YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));\n\nstartCommandValues.put(&34;,yarnClusterEntrypoint);\nstartCommandValues.put(&34;,\n&34;+ApplicationConstants.LOG_DIR_EXPANSION_VAR+&34;+\n&34;+ApplicationConstants.LOG_DIR_EXPANSION_VAR+&34;);\nstartCommandValues.put(&34;,&34;);\n\nfinalStringcommandTemplate=flinkConfiguration\n.getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,\nConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);\nfinalStringamCommand=\nBootstrapTools.getStartCommand(commandTemplate,startCommandValues);\n\namContainer.setCommands(Collections.singletonList(amCommand));\n\nLOG.debug(&34;+amCommand);\n\nreturnamContainer;\n}
封装AM参数:
privateApplicationReportstartAppMaster(\nConfigurationconfiguration,\nStringapplicationName,\nStringyarnClusterEntrypoint,\nJobGraphjobGraph,\nYarnClientyarnClient,\nYarnClientApplicationyarnApplication,\nClusterSpecificationclusterSpecification)throwsException{\n\n……\nfinalContainerLaunchContextamContainer=setupApplicationMasterContainer(\nyarnClusterEntrypoint,\nhasKrb5,\nprocessSpec);\n……\n//封装AM的classpath和环境参数\nfinalMap<String,String>appMasterEnv=newHashMap<>();\n//setuserspecifiedappmasterenvironmentvariables\nappMasterEnv.putAll(\nConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX,configuration));\n//setFlinkappclasspath\nappMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH,classPathBuilder.toString());\n\n//setFlinkonYARNinternalconfigurationvalues\nappMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR,localResourceDescFlinkJar.toString());\nappMasterEnv.put(YarnConfigKeys.ENV_APP_ID,appId.toString());\nappMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR,fileUploader.getHomeDir().toString());\nappMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES,encodeYarnLocalResourceDescriptorListToString(fileUploader.getEnvShipResourceList()));\nappMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE,getZookeeperNamespace());\nappMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES,fileUploader.getApplicationDir().toUri().toString());\n\n//https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md34;StartingDispatcherRESTendpoint.&34;StartingDispatcher.&34;StartingResourceManager.&34;Start{}.&34;StartingResourceManager.&34;CouldnotstarttheJobManagerbecausetheleaderelectionservicedidnotstart.&34;Couldnotstarttheleaderelectionservice.&34;JobManagerRunnercannotbegrantedleadershipbecauseitisalreadyshutdown.&34;Couldnotstartthejobmanager.&34;Startingexecutionofjob{}({})underjobmasterid{}.&34;Cannotinitializeresourceprovider.&34;Couldnotstartresourcemanagerclient.&34;
StartingtheSlotManager.&34;StartingJobMastercomponent.&34;ResourceManager&34;CouldnotcomputethecontainerResourcefromthegivenTaskExecutorProcessSpec%s.&34;ThisusuallyindicatestherequestedresourceislargerthanYarn&34;,\ntaskExecutorProcessSpec)));\n}else{\nfinalPrioritypriority=priorityAndResourceOpt.get().getPriority();\nfinalResourceresource=priorityAndResourceOpt.get().getResource();\nresourceManagerClient.addContainerRequest(getContainerRequest(resource,priority));\n\n//makesurewetransmittherequestfastandreceivefastnewsofgrantedallocations\nresourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);\n\nrequestResourceFutures.computeIfAbsent(taskExecutorProcessSpec,ignore->newLinkedList<>()).add(requestResourceFuture);\n\nlog.info(&34;,taskExecutorProcessSpec,priority);\n}\n\nreturnrequestResourceFuture;\n}
10、TaskManager启动
YarnTaskExecutorRunner.java
// Entry point of the YARN TaskExecutor process. It logs environment information, registers the
// signal handler and installs the JVM shutdown safeguard hook. It then runs the TaskManager via
// runTaskManagerSecurely.
// NOTE(review): the logEnvironmentInfo component-name literal was lost in extraction ("&34;").
publicstaticvoidmain(String[]args){\nEnvironmentInformation.logEnvironmentInfo(LOG,&34;,args);\nSignalHandler.register(LOG);\nJvmShutdownSafeguard.installAsShutdownHook(LOG);\n\nrunTaskManagerSecurely(args);\n}
// Reads the container working directory (PWD) from ENV and loads the Flink configuration from
// args. It adjusts that configuration via setupAndModifyConfiguration, then starts the TaskManager
// through TaskManagerRunner.runTaskManagerSecurely. Any Throwable is first stripped of
// UndeclaredThrowableException wrappers, then logged, and the JVM exits with INIT_ERROR_EXIT_CODE.
// NOTE(review): log-message literals were lost in extraction ("&34;").
privatestaticvoidrunTaskManagerSecurely(String[]args){\ntry{\nLOG.debug(&34;,ENV);\n\nfinalStringcurrDir=ENV.get(Environment.PWD.key());\nLOG.info(&34;,currDir);\n\nfinalConfigurationconfiguration=TaskManagerRunner.loadConfiguration(args);\nsetupAndModifyConfiguration(configuration,currDir,ENV);\n\nTaskManagerRunner.runTaskManagerSecurely(configuration);\n}\ncatch(Throwablet){\nfinalThrowablestrippedThrowable=ExceptionUtils.stripException(t,UndeclaredThrowableException.class);\n//Make sure that whatever happened ends up in the log\nLOG.error(&34;,strippedThrowable);\nSystem.exit(INIT_ERROR_EXIT_CODE);\n}\n}
TaskManagerRunner.java
publicvoidstart()throwsException{\ntaskExecutorService.start();\n}
TaskExecutorToServiceAdapter.java
publicvoidstart(){\ntaskExecutor.start();\n}
TaskExecutor.java
// Start hook of the TaskExecutor. It starts the TaskExecutor services. If that throws, the cause
// is wrapped in a TaskManagerException whose message is formatted with getAddress(); the exception
// is reported via onFatalError and rethrown. On success, the registration timeout is started.
// NOTE(review): the String.format pattern was lost in extraction ("&34;").
publicvoidonStart()throwsException{\ntry{\nstartTaskExecutorServices();\n}catch(Throwablet){\nfinalTaskManagerExceptionexception=newTaskManagerException(String.format(&34;,getAddress()),t);\nonFatalError(exception);\nthrowexception;\n}\n\nstartRegistrationTimeout();\n}
11、向ResourceManager注册
TaskExecutor.java
privatevoidstartTaskExecutorServices()throwsException{\ntry{\n//startbyconnectingtotheResourceManager\nresourceManagerLeaderRetriever.start(newResourceManagerLeaderListener());\n\n//tellthetaskslottablewho&34;ResourceManager&34;Successfulregistrationatresourcemanager{}underregistrationid{}.&34;EstablishingResourceManagerconnectioninTaskExecutorfailed&34;UnknownTaskManagerregistrationid%s.&34;RegisteringTaskManager{}under{}attheSlotManager.&34;Thetotalnumberofslotsexceedsthemaxlimitation{},releasetheexcessresource.&34;Thetotalnumberofslotsexceedsthemaxlimitation.&34;Re-registrationofslot%s.ThisindicatesthattheTaskExecutorhasre-connected.&39;ve\n//justallocatedtheslot.Solet&34;Couldnotfreeslot&34;Couldnotcreatenewjob.&34;Offerreservedslotstotheleaderofjob{}.&34;Therearenounassignedslotsforthejob{}.&34;UnknownTaskManager”+taskManagerId));\n}\n\nfinalTaskManagerLocationtaskManagerLocation=taskManager.f0;\nfinalTaskExecutorGatewaytaskExecutorGateway=taskManager.f1;\n\nfinalRpcTaskManagerGatewayrpcTaskManagerGateway=newRpcTaskManagerGateway(taskExecutorGateway,getFencingToken());\n\nreturnCompletableFuture.completedFuture(\nslotPool.offerSlots(\ntaskManagerLocation,\nrpcTaskManagerGateway,\nslots));\n}
SlotPoolImpl.java
publicCollection<SlotOffer>offerSlots(\nTaskManagerLocationtaskManagerLocation,\nTaskManagerGatewaytaskManagerGateway,\nCollection<SlotOffer>offers){\n\nArrayList<SlotOffer>result=newArrayList<>(offers.size());\n\nfor(SlotOfferoffer:offers){\nif(offerSlot(\ntaskManagerLocation,\ntaskManagerGateway,\noffer)){\n\nresult.add(offer);\n}\n}\n\nreturnresult;\n}
// Excerpt (elided with "……"): hands the offered slot to tryFulfillSlotRequestOrMakeAvailable and
// then returns true.
// NOTE(review): allocatedSlot is created in the elided part, which may also contain early
// returns — confirm before relying on "always returns true".
booleanofferSlot(\nfinalTaskManagerLocationtaskManagerLocation,\nfinalTaskManagerGatewaytaskManagerGateway,\nfinalSlotOfferslotOffer){\n\n……\n\n//Use the slot to fulfill a pending request, in requested order\n//i.e. pending requests are served in the order in which they were requested\ntryFulfillSlotRequestOrMakeAvailable(allocatedSlot);\n\n//We accepted the request in any case. The slot will be released after it idled for\n//too long and timed out\nreturntrue;\n}
往期精彩内容:
用Flink能替代Spark的批处理功能吗?
Flink进阶之滑动窗口统计实时热门商品
Flink进阶之使用CEP实现恶意登陆检测
重磅!Flink源码解析环境准备及提交流程之环境准备
大咖分享|通过制作一个迷你版Flink来学习Flink源码
关于Flink YARN per-job模式提交流程的源码分析到此分享完毕,希望能帮助到您。