【Spark学习】Apache Spark调优

Spark版本:1.1.0

本文系以开源中国社区的译文为基础,结合官方文档翻译修订而来,转载请注明以下链接:

http://www.cnblogs.com/zhangningbo/p/4117981.html

http://www.oschina.net/translate/spark-tuning

目录


  • 数据序列化
  • 内存优化
  1. 确定内存消耗
  2. 优化数据结构
  3. 序列化RDD存储
  4. 优化内存回收
  • 其他考虑因素
  1. 并行度
  2. Reduce任务的内存用量
  3. 广播”大变量“
  • 总结

  因为大多数Spark程序都具有“内存计算”的特性,所以集群中的任何资源都有可能成为Spark程序的瓶颈,比如,CPU、网络带宽或者内存。通常情况下, 如果数据完全加载到内存,那么,网络带宽就会成为瓶颈。但有时候,你还需要对程序进行优化,以便减少内存使用,例如以序列化的形式存储RDD数据(Resilient Distributed Datasets)。本文包含两个主要议题:数据序列化和内存优化,数据序列化不但可以提高网络性能还能减少内存使用。此外,我们还讨论了其他几个小议题。

数据序列化


  序列化对于提高分布式程序的性能起着重要的作用。一个不好的序列化方式(如,序列化的速度非常慢或者序列化的结果非常大)会极大降低计算速度。通常,这是你优化Spark程序时要调整的首要参数。Spark试图在便利性(可以让用户在操作中使用任何Java类型)和性能之间取得平衡。它提供了两个序列化类库:

  • Java 序列化:在默认情况下,Spark采用Java的ObjectOutputStream框架来序列化对象,而且该方式适用于所有实现了java.io.Serializable的类。通过继承 java.io.Externalizable,你可以进一步控制序列化的性能。Java序列化非常灵活,但是速度较慢,在某些情况下序列化的结果也比较大。
  • Kryo序列化:Spark也可以使用Kryo(版本2)来更快地序列化对象。Kryo不但速度极快,而且比Java序列化所产生的结果更为紧凑(通常能提高10倍)。但Kryo的缺点是不支持所有的Serializable类型,而且为了获得最佳性能,你必须提前注册要在程序中使用的类。

  你可以在创建SparkContext之前,通过SparkConf类并调用conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),将序列化方式切换成Kryo。这一设置可以让序列化器serializer既适用于worker节点间shuffling数据,又能用于序列化RDD时将结果存入磁盘。Kryo不能成为默认方式的唯一原因是需要用户进行注册;但是,对于任何 “网络密集型”(network-intensive)的应用,我们都建议尝试该方式。

  Spark会自动包含Kryo序列化器serializer以用于含在AllScalaRegister(来自Twitter chill库)当中的许多通用的核心Scala类。

  为了将自定义类注册到Kryo,你需要继承公共类org.apache.spark.serializer.KryoRegistrator并且设置配置属性spark.kryo.registrator指向该类,如下所示:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass1])
    kryo.register(classOf[MyClass2])
  }
}

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "mypackage.MyRegistrator")
val sc = new SparkContext(conf)

  Kryo文档描述了很多便于注册的高级选项,例如添加用户自定义的序列化代码。

  如果对象非常大,你还需要增加配置属性spark.kryoserializer.buffer.mb的值。其默认值是2,但是该属性需要足够大以便能够容纳需要序列化的最大对象。

  最后,如果你不注册自定义的类,Kryo仍然可以工作,但是需要为每一个对象保存其对应的完整类名,这是非常浪费的。

内存优化


  内存优化有三个方面的考虑:对象所占用的内存量(你或许希望将所有数据都加载到内存),访问对象的代价以及垃圾回收所占用的开销(如果你的对象具有很高的周转率)。

  默认情况下,Java对象的访问速度很快,但其占用的空间通常是其内部”raw“数据的2-5倍。这是由以下几方面原因引起的:

  • 每一个不同的Java对象都包含一个“对象头”(object header),对象头大约有16字节,并且包含了指向对象所对应的类(class)的指针等信息。如果对象本身包含的数据非常少(比如就一个Int字段),那么对象头可能会比对象数据还要大。
  • Java String在实际的raw字符串数据之外,还需要大约40字节的额外开销(因为String使用一个Char数组来保存字符串,而且需要保存长度等额外数据);同时,因为String在内部使用UTF-16编码,每一个字符需要占用两个字节,所以,一个长度为10字节的字符串需要占用60个字节。
  • 通用的集合类,例如HashMap、LinkedList等,都采用了链表数据结构,对于每一个条目(如Map.Entry)都进行了封装。每一个条目不仅包含对象头,还包含了一个指向下一条目的指针(通常每个指针占8字节)。
  • 基本类型(primitive type)的集合通常都保存为”boxed“对象,例如java.lang.Integer。

  本节讨论如何估算对象所占用的内存量以及如何改进——通过改变数据结构或者采用序列化方式存储数据。接下来,我们将讨论如何优化Spark的缓存以及Java内存回收(garbage collection)。

确定内存消耗

  确定数据集(dataset)所需内存量的最佳方法就是创建一个RDD,然后将其放入缓存,最后查看驱动程序(driver program)中SparkContext的日志。日志会告诉你每个分区占用了多少内存量;你可以合计这些内存占用量以确定RDD所需的内存总量。日志信息如下:

INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)

  该信息表明RDD0的1号分区消耗了717.5KB内存。

优化数据结构

  减少内存占用的首要途径是避免使用一些增加额外开销的Java特性,例如基于指针的数据结构和二次封装对象等。有以下几种方法可以达到此目的:

  1. 使用对象数组以及原始类型(primitive type)数组来设计数据结构,以替代标准Java或Scala集合类(如HashMap)。 fastutil 库为原始数据类型提供了非常方便的集合类,且兼容Java标准类库。
  2. 尽可能避免使用包含大量小对象和指针的嵌套数据结构。
  3. 考虑采用数字ID或者枚举类型来替代String类型的主键。
  4. 如果内存少于32GB,那么设置JVM参数-XX:+UseCompressedOops以便将8字节指针压缩成4字节。你可以将这些选项添加到spark-env.sh。(同时,在Java 7或者更高版本,设置JVM参数-XX:+UseCompressedStrings以便采用8比特来编码每一个ASCII字符。)

序列化RDD存储

  尽管经过了上述优化,但是对象还是太大以至不能高效存储数据,那么,还有一个减少内存使用的简单方法——以序列化形式存储数据,即在RDD持久化API中使用序列化的StorageLevel,例如MEMORY_ONLY_SER。Spark将每个RDD分区都保存为byte数组。序列化带来的唯一缺点是会降低访问速度,因为需要将对象反序列化。如果需要采用序列化的方式缓存数据,我们强烈建议采用Kryo,Kryo序列化结果比Java标准序列化的更小(从某种程度上,甚至比对象内部的raw数据都要小)。

优化内存回收

  如果你需要不断的“翻动”程序保存的RDD数据,那么JVM内存回收可能会成为问题(通常,如果读取RDD一次,然后再进行多个操作,这样是不会有问题的)。当Java需要回收旧对象以便为新对象腾出内存空间时,JVM需要跟踪所有的Java对象以确定哪些对象是不再需要的。需要记住的一点是,内存回收的代价与Java对象的数量成正比;因此,使用含有对象数量更小的数据结构(例如使用Int数组而不是LinkedList)能显著降低这种开销。另一种更好的方法是采用对象序列化,如上所述:现在每个RDD分区只有一个对象(一个字节数组)。如果内存回收(GC)存在问题,那么在尝试其他方法之前,首先应尝试使用序列化缓存(serialized caching)

  任务(task)的工作内存(运行任务所需的内存量)与缓存在节点的RDD之间会相互影响,这种影响也会造成内存回收问题。下面我们将讨论如何为RDD缓存分配空间以便减轻这种影响。

估算内存回收的影响

  优化内存回收的第一步是获取一些统计信息,包括内存回收的频率、内存回收耗费的时间等。为了获取这些统计信息,我们可以把参数-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps添加到环境变量SPARK_JAVA_OPTS。设置完成后,下次Spark作业运行时,我们可以在worker的日志中看到每一次内存回收时产生的信息。注意,这些日志保存在集群中的worker节点(在工作路径下的stdout文件中)而不是你的驱动程序(driver program).

优化缓存大小

  用多大的内存来缓存RDD是内存回收一个重要的配置参数。默认情况下,Spark采用运行内存(executor memory,spark.executor.memory或者SPARK_MEM)的60%来缓存RDD。这表明在任务执行期间,有40%的内存可以用来创建对象。

  如果任务运行速度变慢且JVM频繁进行内存回收,或者内存空间不足,那么降低缓存大小设置有助于减少内存消耗。为了将缓存大小修改为50%,你可以在SparkConf上调用方法conf.set("spark.storage.memoryFraction", "0.5")。结合序列化缓存,使用较小的缓存足够解决大部分内存回收问题。如果你有兴趣进一步优化Java内存回收,请继续阅读下面文章。

内存回收高级优化

  为了进一步优化内存回收,我们首先需要了解JVM内存管理的一些基本知识。

  • Java堆(heap)空间分为两部分:新生代和老生代。新生代用于保存生命周期较短的对象;老生代用于保存生命周期较长的对象。
  • 新生代进一步划分为三部分[Eden, Survivor1, Survivor2]
  • 内存回收过程的简要描述:如果Eden区域已满,则在Eden上运行的minor GC与在Eden和Survivor1中仍然活跃的对象将被复制到Survivor2。然后将Survivor1和Survivor2对换。如果对象活跃的时间已经足够长或者Survivor2区域已满,那么会将对象拷贝到Old区域。最终,如果Old区域消耗殆尽,则执行full GC。

  Spark内存回收优化的目标是确保只有长时间存活的RDD才被保存到老生代区域;同时,新生代区域足够大以保存生命周期比较短的对象。这样,在任务执行期间就可以避免执行full GC去回收任务执行期间所创建的临时对象。下面是一些可能有用的执行步骤:

  • 通过收集GC状态信息来检查内存回收是否过于频繁。如果在任务结束之前执行了很多次full GC,则表明任务执行的内存空间不足。
  • 在打印的内存回收状态信息中,如果老生代接近消耗殆尽,那么就减少用于缓存的内存空间。这可以通过属性spark.storage.memoryFraction来完成。减少缓存对象以提高执行速度是非常值得的。
  • 如果有过多的minor GC而不是full GC,那么为Eden分配更大的内存是有益的。你可以为Eden分配大于任务执行所需要的内存空间。如果Eden的大小确定为E,那么可以通过 -Xmn=4/3*E来设置新生代的大小(将内存扩大到4/3是考虑到survivor所需要的空间)。
  • 举一个例子,如果你的任务从HDFS读取数据,那么该任务所需要的内存量可以从读取的block数量估算出来。注意,解压后的block通常为解压前的2-3 倍。所以,如果我们需要同时执行3或4个任务,且HDFS的block大小为64MB,那么我们可以估算出Eden的大小为4*3*64MB。
  • 监控在采用新的参数设置时内存回收的频率以及消耗的时间。

  我们的经验表明内存回收优化的效果取决于你的程序和可用的内存量。 网上还有很多其他的优化选项, 但从深层次来讲,控制内存回收的频率有助于降低额外开销。

其他考虑因素


并行度

  集群资源不会被充分利用,除非为每一个操作都设置足够高的并行度。Spark会根据每一个文件的大小自动设置运行在该文件上的“Map"任务的数量(你也可以通过SparkContext.textFile的配置参数来控制);对于分布式"reduce"任务(例如groupByKey或者reduceByKey),则利用最大父级(parent)RDD的分区数。你可以通过第二个参数传入并行度(阅读文档spark.PairRDDFunctions )或者通过设置系统参数spark.default.parallelism来改变默认值。通常来讲,在集群中,我们建议为每一个CPU核(core)分配2-3个任务。

Reduce任务的内存使用

  有时,你会碰到OutOfMemoryError错误,这不是因为你的RDD不能加载到内存,而是因为任务执行的数据集过大,例如正在执行groupByKey操作的reduce任务。Spark的”混洗“(shuffle)操作(sortByKey、groupByKey、reduceByKey、join等)为了完成分组会为每一个任务创建哈希表,哈希表通常都比较大。最简单的修复方法是增加并行度,这样,每一个任务的输入会变的更小。Spark能够高效地支持耗时短至200ms的任务,因为它会对所有的任务复用一个worker JVM,这样能减小任务启动的消耗。所以,你可以放心地使任务的并行度大于集群的CPU核数。

广播”大变量“

  使用SparkContext的广播功能可以极大地减小每一个序列化任务的大小以及在集群中启动作业的消耗。如果任务使用驱动程序(driver program)中比较大的对象(例如静态查找表),考虑将其变成可广播变量。Spark会在master上打印每一个任务序列化后的大小,所以你可以通过检查那些信息来确定任务是否过于庞大。通常来讲,大于20KB的任务很可能都是值得优化的。

总结


  本文指出了Spark程序优化所需了解的几个要点——最重要的是数据序列化和内存优化。对于大多数程序而言,采用Kryo框架并以序列化形式存储数据,能够解决大部分性能问题。非常欢迎在Spark mailing list提问优化相关的问题。

【扩展】

    1. Spark调优

    2. Spark&Spark性能调优实战

    3. Spark on Yarn:性能调优

    4. spark调优经验

    5. Spark性能优化的10大问题及其解决方案

    6. Sharethrough使用Spark Streaming优化实时竞价