A Brief Look at Apache Spark 1.0.0, Part 5: Resource Scheduling - Task Creation and Distribution

As mentioned earlier, submitMissingTasks is where task distribution begins. It first checks whether the stage is a shuffle map stage: if so, it calls getPreferredLocs for every partition that still has no output location and creates a ShuffleMapTask for each; otherwise the stage is the final stage, and it creates a ResultTask (again using getPreferredLocs) for every unfinished partition of the job. It then posts a SparkListenerStageSubmitted event to listenerBus, preemptively serializes one task to make sure the tasks are serializable at all, and finally submits the TaskSet through taskScheduler.submitTasks.

/** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // Get our pending tasks and remember them in our pendingTasks entry
    val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
    myPending.clear()
    var tasks = ArrayBuffer[Task[_]]()
    if (stage.isShuffleMap) {
      for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
        val locs = getPreferredLocs(stage.rdd, p)
        tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
      }
    } else {
      // This is a final stage; figure out its job's missing partitions
      val job = resultStageToJob(stage)
      for (id <- 0 until job.numPartitions if !job.finished(id)) {
        val partition = job.partitions(id)
        val locs = getPreferredLocs(stage.rdd, partition)
        tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
      }
    }

    val properties = if (jobIdToActiveJob.contains(jobId)) {
      jobIdToActiveJob(stage.jobId).properties
    } else {
      // this stage will be assigned to "default" pool
      null
    }

    // must be run listener before possible NotSerializableException
    // should be "StageSubmitted" first and then "JobEnded"
    listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))

    if (tasks.size > 0) {
      // Preemptively serialize a task to make sure it can be serialized. We are catching this
      // exception here because it would be fairly hard to catch the non-serializable exception
      // down the road, where we have several different implementations for local scheduler and
      // cluster schedulers.
      try {
        SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
      } catch {
        case e: NotSerializableException =>
          abortStage(stage, "Task not serializable: " + e.toString)
          runningStages -= stage
          return
      }

      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      myPending ++= tasks
      logDebug("New pending tasks: " + myPending)
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
    } else {
      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
      runningStages -= stage
    }
  }
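
To see which branch produces which kind of task, here is a minimal user-level sketch (assuming an existing SparkContext named sc; the data is made up): reduceByKey introduces a shuffle dependency, so the upstream stage is submitted as ShuffleMapTasks, while the action on the final stage is submitted as ResultTasks.

import org.apache.spark.SparkContext._   // brings reduceByKey into scope for pair RDDs in Spark 1.0

// Hypothetical example, assuming an existing SparkContext `sc`.
val pairs  = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val counts = pairs.reduceByKey(_ + _)    // upstream stage -> ShuffleMapTasks
counts.collect()                         // final stage    -> ResultTasks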

taskScheduler.submitTasks wraps the TaskSet in a TaskSetManager and registers it in activeTaskSets and with the schedulableBuilder. If the scheduler is not running in local mode and has not received any task yet, it starts a starvation timer that periodically logs a warning while the job has not accepted any resources (and cancels itself once a task has been launched). Finally it calls backend.reviveOffers.

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient memory")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }
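
The properties handed to schedulableBuilder.addTaskSetManager come from the job and decide which pool the TaskSetManager lands in. As a hedged illustration (the pool name "production" is made up; the configuration keys are standard Spark properties), fair scheduling with a named pool can be selected like this:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: enable the FAIR scheduling mode and pick a pool per job.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("pool-example")
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
sc.setLocalProperty("spark.scheduler.pool", "production")   // carried in the TaskSet properties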

CoarseGrainedSchedulerBackend extends SchedulerBackend and overrides reviveOffers, which simply sends a ReviveOffers message to driverActor.

override def reviveOffers() {
    driverActor ! ReviveOffers
  }

The driver actor's receive handler picks up the ReviveOffers message and calls makeOffers.

def receive = {
      case RegisterExecutor(executorId, hostPort, cores) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorActor.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          sender ! RegisteredExecutor(sparkProperties)
          executorActor(executorId) = sender
          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
          totalCores(executorId) = cores
          freeCores(executorId) = cores
          executorAddress(executorId) = sender.path.address
          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          makeOffers()
        }

      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          if (executorActor.contains(executorId)) {
            freeCores(executorId) += scheduler.CPUS_PER_TASK
            makeOffers(executorId)
          } else {
            // Ignoring the update since we don't know about the executor.
            val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
            logWarning(msg.format(taskId, state, sender, executorId))
          }
        }

      case ReviveOffers =>
        makeOffers()

      case KillTask(taskId, executorId, interruptThread) =>
        executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)

      case StopDriver =>
        sender ! true
        context.stop(self)

      case StopExecutors =>
        logInfo("Asking each executor to shut down")
        for (executor <- executorActor.values) {
          executor ! StopExecutor
        }
        sender ! true

      case RemoveExecutor(executorId, reason) =>
        removeExecutor(executorId, reason)
        sender ! true

      case DisassociatedEvent(_, address, _) =>
        addressToExecutorId.get(address).foreach(removeExecutor(_,
          "remote Akka client disassociated"))

    }

makeOffers builds a resource offer from each executor's free cores and hands whatever the scheduler decides to launch on to launchTasks.

// Make fake resource offers on all executors
    def makeOffers() {
      launchTasks(scheduler.resourceOffers(
        executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
    }

First, WorkerOffer: it simply describes the free resources available on one executor, measured in cores (the backend fills it in from freeCores).

/**
 * Represents free resources available on an executor.
 */
private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)
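
For example, with two registered executors the sequence handed to resourceOffers might look like the following (executor ids, hostnames and core counts are invented for illustration):

// Hypothetical offers, mirroring what makeOffers builds from executorHost and freeCores.
val offers = Seq(
  WorkerOffer("0", "node1.example.com", 4),   // 4 free cores on node1
  WorkerOffer("1", "node2.example.com", 2))   // 2 free cores on node2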

Next, scheduler.resourceOffers: it marks each offered executor as alive and records its hostname, randomly shuffles the offers so that tasks are not always placed on the same set of workers, then takes each TaskSet in scheduling order and offers it every node at increasing locality levels, sweeping the offers round-robin, so that each TaskSet gets a chance to launch local tasks on all of them.

/**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   */
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    SparkEnv.set(sc.env)

    // Mark each slave as alive and remember its hostname
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
      do {
        launchedTask = false
        for (i <- 0 until shuffledOffers.size) {
          val execId = shuffledOffers(i).executorId
          val host = shuffledOffers(i).host
          if (availableCpus(i) >= CPUS_PER_TASK) {
            for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
              tasks(i) += task
              val tid = task.taskId
              taskIdToTaskSetId(tid) = taskSet.taskSet.id
              taskIdToExecutorId(tid) = execId
              activeExecutorIds += execId
              executorsByHost(host) += execId
              availableCpus(i) -= CPUS_PER_TASK
              assert (availableCpus(i) >= 0)
              launchedTask = true
            }
          }
        }
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }
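
The nested loops are the heart of the method, so here is a self-contained simplification (a sketch, not Spark code) of the same pattern for a single TaskSet: at each locality level, keep sweeping all offers until a full sweep launches nothing, then relax to the next, less local level.

// Simplified sketch of the resourceOffers loop structure; tryLaunch stands in for
// taskSet.resourceOffer and returns true if a task was launched on `worker`
// at a locality no worse than `maxLocality`.
def offerSketch(
    workers: Seq[String],
    localityLevels: Seq[String],   // ordered from most local to least local
    tryLaunch: (String, String) => Boolean): Unit = {
  for (maxLocality <- localityLevels) {
    var launchedTask = true
    while (launchedTask) {
      launchedTask = false
      for (worker <- workers) {
        if (tryLaunch(worker, maxLocality)) launchedTask = true
      }
    }
  }
}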

taskSet.resourceOffer responds to an offer from a single executor: it looks for a task that is allowed at the current locality level (delay scheduling), does the bookkeeping, serializes the task, and returns a TaskDescription.

/**
   * Respond to an offer of a single executor from the scheduler by finding a task
   */
  def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    if (!isZombie) {
      val curTime = clock.getTime()

      var allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
      }

      findTask(execId, host, allowedLocality) match {
        case Some((index, taskLocality)) => {
          // Found a task; do some bookkeeping and return a task description
          val task = tasks(index)
          val taskId = sched.newTaskId()
          // Figure out whether this should count as a preferred launch
          logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
            taskSet.id, index, taskId, execId, host, taskLocality))
          // Do various bookkeeping
          copiesRunning(index) += 1
          val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
          taskInfos(taskId) = info
          taskAttempts(index) = info :: taskAttempts(index)
          // Update our locality level for delay scheduling
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
          // Serialize and return the task
          val startTime = clock.getTime()
          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
          // we assume the task can be serialized without exceptions.
          val serializedTask = Task.serializeWithDependencies(
            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
          val timeTaken = clock.getTime() - startTime
          addRunningTask(taskId)
          logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
            taskSet.id, index, serializedTask.limit, timeTaken))
          val taskName = "task %s:%d".format(taskSet.id, index)
          sched.dagScheduler.taskStarted(task, info)
          return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
        }
        case _ =>
      }
    }
    None
  }
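
The delay-scheduling idea behind getAllowedLocalityLevel deserves a closer look. The sketch below is a standalone simplification (not the actual TaskSetManager code): the longer no task has been launched, the further the allowed locality is relaxed, one configured wait at a time.

// Simplified delay-scheduling sketch. `levels` are ordered from most local to
// least local; `waits(i)` is how long to wait at level i before relaxing to i + 1.
def allowedLocalitySketch(
    levels: IndexedSeq[String],
    waits: IndexedSeq[Long],
    lastLaunchTime: Long,
    curTime: Long): String = {
  var index = 0
  var elapsed = curTime - lastLaunchTime
  while (index < levels.length - 1 && elapsed >= waits(index)) {
    elapsed -= waits(index)   // time already spent waiting at this level
    index += 1
  }
  levels(index)
}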

Another detail worth noting in resourceOffer is that the dependency files and JARs registered on the SparkContext are bundled with the task when it is serialized:

val serializedTask = Task.serializeWithDependencies(
            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
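
Those two maps are filled on the driver side; as a hedged example (the paths are placeholders), anything registered through SparkContext.addFile or SparkContext.addJar is recorded there and shipped along with every serialized task:

// Illustrative only: the paths are placeholders.
sc.addFile("hdfs:///shared/lookup-table.txt")   // ends up in sc.addedFiles
sc.addJar("/opt/jobs/extra-udfs.jar")           // ends up in sc.addedJars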

With the TaskDescriptions ready, launchTasks deducts the cores and sends a LaunchTask(task) message to executorActor(task.executorId), shipping each task to the executor it was assigned to for execution.

// Launch tasks returned by a set of resource offers
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
        executorActor(task.executorId) ! LaunchTask(task)
      }
    }

This completes task creation and distribution.

END