【Spark学习】Apache Spark作业调度机制

Spark版本:1.1.1

本文系从官方文档翻译而来,转载请尊重译者的工作,注明以下链接:

http://www.cnblogs.com/zhangningbo/p/4135905.html


  • 概述
  • 不同应用程序间的资源调度
  • 同一应用程序内的资源调度
  1. Fair调度池
  2. 调度池的默认行为
  3. 调度池的属性配置

概述


Spark有几个功能用于在作业之间(译者注:作业包含两类:1)不同应用程序所执行的作业;2)同一应用程序内的不同作业所执行的作业。无论哪种作业,Spark都可以完成作业之间的资源调度。)调度资源。首先,回顾一下集群模式概述一节所讲的内容,每个Spark应用程序(SparkContext实例)都运行着一组独立的executor进程,而应用程序之间的资源调度工作由Spark所采用的集群管理器来提供。其次,在每个Spark应用程序内部,如果多个作业分别是由不同的线程提交的,那么,它们可能会并发执行。要是你的应用程序通过网络提供访问请求,那么这种多个作业并发执行的情况就会很常见。例如,Shark服务器就是以这种方式工作的。Spark使用fair scheduler在每个SparkContext内部调度资源。

不同应用程序间的资源调度


当Spark应用程序在集群上执行时,它会获得一组独立的executor JVM只用来为自己运行任务(task)及保存数据。如果多个用户需要共享集群,那么可以通过不同的选项来管理资源的分配方式。这取决于集群管理器的类型。

在所有集群管理器上都可用的最简单的选项,就是静态资源分区。利用这种方法,每个应用程序都可以设置其所用资源的最大数量,并在其整个执行周期内占有这些资源。这正是Spark在Standalone、YARN和coarse-grained Mesos模式下所采用的方案。集群上的资源分配方式配置如下(依据不同的集群类型):

  • Standalone集群模式:默认情况下,提交给Standalone集群的应用程序以FIFO(first in, first out)顺序执行,同时,每个应用程序都会尝试使用所有可用的节点。你可以通过设置属性spark.cores.max来限制应用程序能使用的节点数目,或者,对于那些未设置该属性的应用程序,还可以通过设置属性spark.deploy.defaultCores来改变默认的资源分配方式(即每个应用使用所有可用节点)。最后,除了控制CPU核数外,每个应用程序可以通过属性spark.executor.memory来控制其内存用量。
  • Mesos集群模式:要在Mesos集群上使用静态资源分区,将属性spark.mesos.coarse配置为true即可,还可以根据需要设置属性spark.cores.max来限制每个应用程序所占用的资源,如同在Standalone集群模式下所做的配置一样。你还应当设置属性spark.executor.memor来控制executor的内存占用量。
  • YARN集群模式:用于Spark YARN客户端的命令行选项--num-executors控制着应用程序要在集群上分配多少个executor来执行作业,而选项--executor-memory和--executor-cores则分别控制着每个executor所用的内存量和CPU核数。

在Mesos集群上还有一个可用选项,就是动态共享CPU核数。在这种模式下,虽然每个spark应用程序仍然有其固定且独立的内存分配方式(由属性spark.executor.memory设置),但是当一个应用程序没有运行任务(task)的时候,其他应用程序就会占用该应用程序的CPU核来运行他们自己的任务(task)。这种模式在那些有着大量的需要间断执行的应用程序的场合非常有用,比如,来自多个用户的shell会话。然而,该模式也伴随着一个风险——不可预测的延迟,因为当一个应用程序有任务执行时,它想再次获得那些CPU核可能就需要等待一些时间。要想使用这种模式,可以简单地用 mesos://URL而无需设置属性spark.mesos.coarse为true。

需要注意的是以上模式中,当前还没有一个提供不同应用程序间的内存共享。如果你想用这种方式来共享数据,我们推荐运行一个单服务器应用程序,让它可以通过查询相同的RDD来响应多个服务请求。例如,Shark JDBC服务器就是以这种方式来做SQL查询的。在未来的Spark版本中,内存存储系统(如Tachyon)将提供另一种途径来共享RDD。

同一应用程序内的资源调度


在Spark应用程序(即SparkContext实例)内部,如果多个作业分别由单独的线程提交,那么它们就可以同时并行执行。在本节,我们所说的“job”指的是Spark action(如save、collect等)以及需要执行该action的任何任务(task)。Spark调度器是完全线程安全的,而且支持这种特性能让应用程序响应多个服务请求(比如为多个用户提供查询服务)。

默认情况下,spark调度器以FIFO方式运行作业。每个作业被分成若干个“stage”(比如map和reduce阶段),同时,当第一个作业中的stage有任务(task)要启动时,那么该作业优先获得所有可用资源,接着是第二个作业获得优先权,依次类推。如果队列首位的作业不需要占用集群上的所有资源,那么后续的作业就可以立即启动,但是如果队列首位的作业规模很大,那么后续作业的执行会显著地延迟。

从Spark 0.8版开始,通过配置我们就可以让各个作业公平地共享资源。在公平资源共享模式下,spark以轮转(round robin)方式在各个作业(job)之间分配任务(task),以便所有作业都能大致平等地共享集群资源。这就意味着,在长期作业正在运行时提交的短期作业可以立即获得资源并得到良好的响应时间,而无需等待长期作业执行完毕。该模式对于多用户环境是最佳的。

要使能这种公平调度器(fair scheduler),可以在配置SparkContext时简单地将属性spark.scheduler.mode设置为FAIR即可:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

Fair调度池

fair scheduler调度器还支持把作业分组放入调度池(pool)里,并为每个调度池设置不同的调度选项(如权重weight)。这个功能是很有用的,例如,对于那些更重要的作业设置优先级,或者把每个用户的作业分别合并为一组,无论他们各自有多少个并发作业,都让其平等地共享资源,而不是让每个作业平等地共享资源。这种方法是基于Hadoop Fair Scheduler来实现的。

无需任何干预,新提交的作业会进入默认调度池,但是作业所属的调度池也是可以设置的,即在提交作业的那个线程内,向SparkContext中添加本地属性(local property)spark.scheduler.pool就能实现。方法如下:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

设置了该本地属性之后,此线程中提交的所有作业(在此线程中调用RDD.save,count,collect等)都将使用这个调度池名称。这个设置是每个线程中都有一个,便于让一个线程以同一用户的名义运行多个作业。如果你想清空与该线程关联的调度池,可以简单地调用:

sc.setLocalProperty("spark.scheduler.pool", null)

调度池的默认行为

默认情况下,每个调度池都会平等地共享集群资源(在默认调度池内的每个作业也都是平等地共享资源) ,但在每个调度池内,各个作业以FIFO顺序运行。例如,如果你为每个用户各创建一个调度池,也就意味着每个用户都将平等地共享集群资源,且每个用户的查询作业将按顺序运行,而不是后续的查询作业抢占该用户之前的查询作业已经获得的资源。

调度池的属性配置

对于特定的调度池,其属性也可以通过配置文件来修改。每个调度池都支持以下三个属性:

  • schedulingMode:该属性可以设为FIFO或FAIR,用来控制调度池内的各个作业是按照排队顺序执行(默认行为),还是平等地共享该调度池的资源。
  • weight:该属性控制一个调度池与其他调度池共享集群资源的方式。默认情况下,所有调度池的权重均为1。举例来说,如果你将某个调度池的权重设为2,那么,它获得的资源将是其他调度池的两倍。设置一个较高的权重(比如1000)还可能实现调度池之间的优先级——基本上,权重为1000的调度池无论何时有活动的作业,它都总是优先执行其任务(task)。
  • minShare:除了设定调度池的总体权重外,每个调度池都可以设置共享资源的最小数量(如CPU核数)。在根据权重重新分发额外的资源之前,fair scheduler调度器总是尝试去满足所有活跃调度池所需的共享资源的最小数量。因此,minShare属性就成了确保调度池在无需设置高优先级的情况下就能快速获得定量资源(如10个CPU核)的又一种方法。默认情况下,每个调度池的minShare属性值为0.

调度池的属性可以通过创建一个XML文件(类似于conf/fairscheduler.xml.template)并在你的SparkContext中设定属性spark.scheduler.allocation.file来配置。

conf.set("spark.scheduler.allocation.file", "/path/to/file")

这个XML文件的格式很简单,就是每个调度池一个<pool>元素,该元素内部含有不同的子元素用于设置各个属性。例如:

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

文件conf/fairscheduler.xml.template也可以作为一个完整的示例。需要注意的是任何一个未在XML文件中配置过的调度池都会简单地设置为默认值(即schedulingMode为FIFO,weight为1,minShare为0)。