博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Spark】DAGScheduler源代码浅析
阅读量:6416 次
发布时间:2019-06-23

本文共 7416 字,大约阅读时间需要 24 分钟。

DAGScheduler

DAGScheduler的主要任务是基于Stage构建DAG,决定每个任务的最佳位置

  • 记录哪个RDD或者Stage输出被物化
  • 面向stage的调度层。为job生成以stage组成的DAG。提交TaskSet给TaskScheduler运行
  • 又一次提交shuffle输出丢失的stage

每个Stage内。都是独立的tasks,他们共同运行同一个computefunction,享有同样的shuffledependencies。DAG在切分stage的时候是按照出现shuffle为界限的。

DAGScheduler实例化

以下的代码是SparkContext实例化DAGScheduler的过程:

@volatile private[spark] var dagScheduler: DAGScheduler = _  try {    dagScheduler = new DAGScheduler(this)  } catch {    case e: Exception => {      try {        stop()      } finally {        throw new SparkException("Error while constructing DAGScheduler", e)      }    }  }

以下代码显示了DAGScheduler的构造函数定义中,通过绑定TaskScheduler的方式创建,当中次构造函数去调用主构造函数来将sc的字段填充入參:

private[spark]class DAGScheduler(    private[scheduler] val sc: SparkContext,    private[scheduler] val taskScheduler: TaskScheduler,    listenerBus: LiveListenerBus,    mapOutputTracker: MapOutputTrackerMaster,    blockManagerMaster: BlockManagerMaster,    env: SparkEnv,    clock: Clock = new SystemClock())  extends Logging {  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {    this(      sc,      taskScheduler,      sc.listenerBus,      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],      sc.env.blockManager.master,      sc.env)  }  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

作业提交与DAGScheduler操作

Action的大部分操作会进行作业(job)的提交,源代码1.0版的job提交过程的大致调用链是:sc.runJob()–>dagScheduler.runJob–>dagScheduler.submitJob—>dagSchedulerEventProcessActor.JobSubmitted–>dagScheduler.handleJobSubmitted–>dagScheduler.submitStage–>dagScheduler.submitMissingTasks–>taskScheduler.submitTasks

详细的作业提交运行期的函数调用为:

  1. sc.runJob->dagScheduler.runJob->submitJob
  2. DAGScheduler::submitJob会创建JobSummitted的event发送给内嵌类eventProcessActor(在源代码1.4中,submitJob函数中,使用DAGSchedulerEventProcessLoop类进行事件的处理)
  3. eventProcessActor在接收到JobSubmmitted之后调用processEvent处理函数
  4. job到stage的转换,生成finalStage并提交运行。关键是调用submitStage
  5. 在submitStage中会计算stage之间的依赖关系,依赖关系分为宽依赖和窄依赖两种
  6. 假设计算中发现当前的stage没有不论什么依赖或者全部的依赖都已经准备完毕,则提交task
  7. 提交task是调用函数submitMissingTasks来完毕
  8. task真正运行在哪个worker上面是由TaskScheduler来管理,也就是上面的submitMissingTasks会调用TaskScheduler::submitTasks
  9. TaskSchedulerImpl中会依据Spark的当前运行模式来创建对应的backend,假设是在单机运行则创建LocalBackend
  10. LocalBackend收到TaskSchedulerImpl传递进来的ReceiveOffers事件
  11. receiveOffers->executor.launchTask->TaskRunner.run

DAGScheduler的runJob函数

DAGScheduler.runjob最后把结果通过resultHandler保存返回。

这里DAGScheduler的runJob函数调用DAGScheduler的submitJob函数来提交任务:

def runJob[T, U: ClassTag](      rdd: RDD[T],      func: (TaskContext, Iterator[T]) => U,      partitions: Seq[Int],      callSite: CallSite,      allowLocal: Boolean,      resultHandler: (Int, U) => Unit,      properties: Properties): Unit = {    val start = System.nanoTime    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)    waiter.awaitResult() match {      case JobSucceeded => {        logInfo("Job %d finished: %s, took %f s".format          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))      }      case JobFailed(exception: Exception) =>        logInfo("Job %d failed: %s, took %f s".format          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))        throw exception    }  }

作业提交的调度

在Spark源代码1.4.0中,DAGScheduler的submitJob函数不再使用DAGEventProcessActor进行事件处理和消息通信,而是使用DAGSchedulerEventProcessLoop类实例eventProcessLoop进行JobSubmitted事件的post动作。

以下是submitJob函数代码:

/**   * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object   * can be used to block until the the job finishes executing or can be used to cancel the job.   */  def submitJob[T, U](      rdd: RDD[T],      func: (TaskContext, Iterator[T]) => U,      partitions: Seq[Int],      callSite: CallSite,      allowLocal: Boolean,      resultHandler: (Int, U) => Unit,      properties: Properties): JobWaiter[U] = {    // Check to make sure we are not launching a task on a partition that does not exist.    val maxPartitions = rdd.partitions.length    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>      throw new IllegalArgumentException(        "Attempting to access a non-existent partition: " + p + ". " +          "Total number of partitions: " + maxPartitions)    }    val jobId = nextJobId.getAndIncrement()    if (partitions.size == 0) {      return new JobWaiter[U](this, jobId, 0, resultHandler)    }    assert(partitions.size > 0)    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)    eventProcessLoop.post(JobSubmitted(      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))    waiter  }

当eventProcessLoop对象投递了JobSubmitted事件之后,对象内的eventThread线程实例对事件进行处理。不断从事件队列中取出事件,调用onReceive函数处理事件。当匹配到JobSubmitted事件后。调用DAGScheduler的handleJobSubmitted函数并传入jobid、rdd等參数来处理Job。

handleJobSubmitted函数

Job处理过程中handleJobSubmitted比較关键,该函数主要负责RDD的依赖性分析。生成finalStage,并依据finalStage来产生ActiveJob。

在handleJobSubmitted函数源代码中。给出了部分凝视:

private[scheduler] def handleJobSubmitted(jobId: Int,      finalRDD: RDD[_],      func: (TaskContext, Iterator[_]) => _,      partitions: Array[Int],      allowLocal: Boolean,      callSite: CallSite,      listener: JobListener,      properties: Properties) {    var finalStage: Stage = null    try {      // New stage creation may throw an exception if, for example, jobs are run on a      // HadoopRDD whose underlying HDFS files have been deleted.      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)    } catch {      //错误处理,告诉监听器作业失败。返回....      case e: Exception =>        logWarning("Creating new stage failed due to exception - job: " + jobId, e)        listener.jobFailed(e)        return    }    if (finalStage != null) {      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)      clearCacheLocs()      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(        job.jobId, callSite.shortForm, partitions.length, allowLocal))      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")      logInfo("Parents of final stage: " + finalStage.parents)      logInfo("Missing parents: " + getMissingParentStages(finalStage))      val shouldRunLocally =        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1      val jobSubmissionTime = clock.getTimeMillis()      if (shouldRunLocally) {        // 非常短、没有父stage的本地操作,比方 first() or take() 的操作本地运行        // Compute very short actions like first() or take() with no parent stages locally.        listenerBus.post(          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))        runLocally(job)      } else {        // collect等操作走的是这个过程。更新相关的关系映射,用监听器监听,然后提交作业        jobIdToActiveJob(jobId) = job        activeJobs += job        finalStage.resultOfJob = Some(job)        val stageIds = jobIdToStageIds(jobId).toArray        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))        listenerBus.post(          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))        // 提交stage        submitStage(finalStage)      }    }    // 提交stage    submitWaitingStages()  }

小结

该篇文章介绍了DAGScheduler从SparkContext中进行实例化,到运行Action操作时提交任务调用runJob函数,进而介绍了提交任务的消息调度,和处理Job函数handleJobSubmitted函数。

因为在handleJobSubmitted函数中涉及到依赖性分析和stage的源代码内容,于是我计划在下一篇文章里进行介绍和源代码分析。

转载请注明作者Jason Ding及其出处

Google搜索jasonding1354进入我的博客主页

你可能感兴趣的文章
直播疑难杂症排查(8)— 播放杂音、噪音、回声问题
查看>>
安装乌班图系统,并且演示有趣的linux命令,你还怕对linux无兴趣吗
查看>>
处理器高端之路停滞,联发科欲进军诺基亚领地
查看>>
IBM存储部门换了新老板:还是6年前那个
查看>>
IBM公司公布三层单元PCM-MLC,向3DX堆栈方案发起挑战
查看>>
《2040大预言:高科技引擎与社会新秩序》—— 导读
查看>>
数据库操作:添加、插入、更新语句
查看>>
降低数据中心能源消耗
查看>>
IT众包不养技术人员该怎么玩?
查看>>
自主编写部署性能测试的备
查看>>
10余智慧项目建设初见成效 南岸用智慧城市开启智慧生活
查看>>
1个月千余人参加!阿里云大学互联网技能“轻”认证受热捧
查看>>
互金网络安全良好率才一半 信息泄露成主要风险
查看>>
wap前端测试改进总结
查看>>
大数据对企业的种种影响
查看>>
Windows8正式谢幕
查看>>
《Adobe Illustrator CC 2014中文版经典教程(彩色版)》—第1课0.4节创建形状
查看>>
《大咖讲Wireshark网络分析》—从一道面试题开始说起
查看>>
《算法基础:打开算法之门》一3.6 小结
查看>>
《构建高可用Linux服务器 第3版》—— 1.1 使用PXE+DHCP+Apache+Kickstart无人值守安装CentOS 5.8 x86_64...
查看>>