【干货】Spark 之性能优化



转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com

AIQ 机器学习大数据 知乎专栏 点击关注

1、诊断Spark 程序内存的消耗

ASpark 程序内存都花费在哪里?

Ø每个 Java 对象,都有一个对象头,会占用 16 个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个 int 类型的 field,那么它的对象头实际上占用的内存比对象自己还要大;

ØJava 的 String 对象,会比它内部的原始数据,要多出 40 个字节。因为它内部使用 char 数组来保存内部的字符序列,并且还得保存诸如数组长度之类的信息,而且由于 String 使用的是 UTF-16 编码,因此每个字符会占用 2 个字节。比如,包含 10 个字符的 String,会占用 60 个字节;

ØJava 中的集合类型,比如 HashMap 和 LinkedList,内部使用的是链表数据结构,所以对链表中的每一个数据,都使用了 Entry 对象来包装。Entry 对象不光有对象头,还有指向下一个 Entry 的指针,通常占用 8 个字节;

Ø元素类型为原始数据类型(比如 int)的集合,内部通常会使用原始数据类型的包装类型,比如 Integer,来存储元素。

B、如何判断Spark程序消耗的内存

Ø首先,自己设置 RDD 的并行度,有两种方式:要不然,在 parallelize()、textFile() 等方法中,传入第二个参数,设置 RDD 的 task/partition 的数量;要不然,用 SparkConf.set() 方法,设置一个参数,spark.default.parallelism,可以统一设置这个 application 所有 RDD 的 partition 数量;

Ø其次,在程序中将 RDD cache 到内存中,调用 RDD.cache() 方法即可;

Ø接着,观察 Driver 的 log,会发现类似于:“INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)”的日志信息,这就显示了每个 partition 占用了多少内存;

Ø最后,将这个内存信息乘以 partition 数量,即可得出 RDD 的内存占用量。

2、使用高性能序列化类库

A、Java 序列化机制 &Kryo 序列化机制

默认情况下,Spark 使用 Java 自身的 ObjectInputStream 和 ObjectOutputStream 机制进行对象的序列化。只要你的类实现了 Serializable 接口,那么都是可以序列化的,而且 Java 序列化机制是提供了自定义序列化支持的,只要你实现 Externalizable 接口即可实现自己的更高性能的序列化算法。Java 序列化机制有两个缺陷,其一是速度比较慢,其二是序列化后的数据占用的内存空间比较大;

Spark 同样支持使用 Kryo 类库来进行序列化。Kryo 序列化机制比 Java 序列化机制更快,而且序列化后的数据占用的空间更小,通常比 Java 序列化的数据占用的空间要小 10 倍。Kryo 序列化机制之所以不是 Spark 默认序列化机制的原因是,有些类型虽然实现了 Seriralizable 接口,但是它不一定能够进行 Kryo 序列化;此外,如果你要得到最佳的性能,Kryo 还要求你在 Spark 应用程序中,对所有你需要序列化的类型都进行注册。

B、如何使用 Kryo 序列化机制

如果要使用 Kryo 序列化机制,首先要用 SparkConf 设置一个参数,使用 new SparkConf().set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) 即可,将 Spark 的序列化器设置为 KryoSerializer。这样,Spark 在内部的一些操作,比如 Shuffle,进行序列化时,就会使用 Kryo 类库进行高性能、快速、更低内存占用量的序列化了;

使用 Kryo 时,它要求是需要序列化的类,是要预先进行注册的,以获得最佳性能——如果不注册的话,那么 Kryo 必须时刻保存类型的全限定名,反而占用不少内存。Spark 默认是对 Scala 中常用的类型自动注册了 Kryo 的,都在 AllScalaRegistry 类中。但是,比如自己的算子中,使用了外部的自定义类型的对象,那么还是需要将其进行注册。举例如下:

val counter = new Counter();

val numbers = sc.parallelize(Array(1, 2, 3, 4, 5))

numbers.foreach(num => counter.add(num));

(实际上,上面的写法是错误的,因为 counter 不是共享的,所以累加的功能是无法实现的)

如果要注册自定义的类型,那么就需要使用如下的代码:

Scala 版本:

val conf = new SparkConf().setMaster(…).setAppName(…)

conf.registerKryoClasses(Array(classOf[Counter]))

val sc = new SparkContext(conf)

Java 版本:

SparkConf conf = new SparkConf().setMaster(…).setAppName(…);

conf.registerKryoClasses(Counter.class);

JavaSparkContext sc = new JavaSparkContext(conf);

C、优化 Kryo 类库使用

优化缓存大小

如果注册的要序列化的自定义类型,本身特别大,比如包含了超过 100 个 field。那么就会导致要序列化的对象过大,此时就需要对 Kryo 本身进行优化。因为 Kryo 内部的缓存可能不够存放那么大的 class 对象。此时就需要调用 SparkConf.set() 方法,设置 spark.kryoserializer.buffer.mb 参数的值,将其调大。默认情况下它的值是 2,就是说最大能缓存 2M 的对象,然后进行序列化。可以在必要时将其调大,比如设置为 10。

预先注册自定义类型

虽然不注册自定义类型,Kryo 类库也能正常工作,但是那样的话,对于它要序列化的每个对象,都会保存一份它的全限定类名。此时反而会耗费大量内存,因此通常都建议预先注册好要序列化的自定义类。

D、什么场景适合使用 Kryo 序列化类库

这里针对的 Kryo 序列化类库的使用场景,就是算子函数使用到了外部的大数据的情况。比如说,在外部定义了一个封装了应用所有配置信息的对象,比如自定义了一个 MyConfiguration 对象,里面包含了 100m 的数据,然后,在算子函数里面,使用到了这个外部的大对象。此时呢,如果默认情况下,让 Spark 用 java 序列化机制来序列化这种外部的大对象,那么就会导致,序列化速度非常缓慢,并且序列化以后的数据还是很大,比较占用内存空间。

因此,在这种情况下,比较适合切换到 Kryo 序列化类库,来对算子外部的大对象进行序列化操作。一是,序列化速度会变快;二是,会减少序列化后的数据占用的内存空间。

3、优化数据结构

要减少内存的消耗,除了使用高校的序列化类库以外,还有一个很重要的事情,就是优化数据结构,从而避免 Java 语法特性中所导致的额外内存开销,比如基于指针的 Java 数据结构以及包装类型。有一个关键问题,就是优化什么数据结构?其实主要就是优化你的算子函数内部使用到的局部数据,或者是算子函数外部的数据,都可以进行数据结构的优化。优化之后,都会减少其对内存的消耗的占用。

那么,如何优化数据结构呢?

优先使用数组以及字符串,而不是集合类。也就是说,优先用 array,而不是 ArrayList、LinkedList、HashMap 等集合。比如,有个 List list = new ArrayList(),将其替换为 int[] arr = new int[]。这样的话,array 既比 List 少了额外信息的存储开销,还能使用原始数据类型(int)来存储数据,比 List 中用 Integer 这种包装类型存储数据,要节省内存得多;还比如,通常企业级应用中的做法是,对于 HashMap、List 这种数据,统一用 String 拼接成特殊格式的字符串,比如 Map persons = new HashMap(),可以优化为特殊的字符串格式,如:id:name,address|id:name,address…。

避免使用多层嵌套的对象结构,比如:public class Teacher {private List students = new ArrayList() },就是非常不好的例子,因为 Teacher 类的内部又嵌套了大量的小 Student 对象。对于上述例子,完全可以使用特殊的字符串来进行数据的存储,比如,用 json 字符串来存储数据,就是一个很好的选择:{“teacherId”: 1, “teacherName”: “leo”, students:[{“studentId”: 1, “studentName”: “tom”},{“studentId”:2, “studentName”:“marry”}]}

例如 mapreduce 程序:

context.write(new Text(key),new person());

替换为:

Person p1 = new Person(1,“zahngsan”);

String str = JSON.toJSON(p1);

context.write(new text(key),str);

对于有些能够避免的场景,尽量使用 int 替代 String。因为 String 虽然比 ArrayList、HashMap 等数据结构高效一些,占用内存量少很多,但是之前分析过,还是有额外内存的消耗。比如之前用 String 表示 id,那么现在完全可以用数字类型的 int,来进行替代。这里提醒下,在 spark 应用中,id 就不要用常用的 uuid 了,因为无法转成 int,就用自增的 int 类型的 id 即可。(uuid 如 -234242342-)。

4、对多次操作的 RDD 进行持久化或 CheckPoint

如果在程序中,对某一个 RDD,基于它进行了多次 transformation 或 action 操作,那么就非常有必要进行持久化操作,以避免对一个 RDD 反复进行计算。此外,如果要保证 RDD 的持久化数据可能丢失的情况下,还要保证高性能,那么可以对 RDD 进行 CheckPoint 操作。整个过程及详细说明,如下图所示:

除了对多次使用的 RDD 进行持久化操作之外,还可以进一步优化其性能,因为很可能,RDD 的数据是持久化到内存或者磁盘中的。那么,此时,如果内存大小不是特别充足,完全可以使用序列化的持久化级别,比如 MEMORY_ONLY_SER、MEMORY_AND_DISK_SER 等,使用 RDD.persist(StorageLevel.MEMORY_ONLY_SER) 这样的语法即可。这样的话,将数据序列化之后,再持久化,可以大大减小对内存的消耗。此外,数据量小了之后,如果要写入磁盘,那么磁盘 io 性能消耗也比较小。

对 RDD 持久化序列化后,RDD 的每个 partition 的数据,都是序列化为一个巨大的字节数组,这样,对于内存的消耗就小得多了。但是唯一的缺点就是,获取 RDD 数据时,需要对其进行反序列,会增大其性能开销。因此,对于序列化的持久化级别,还可以进一步优化,就是说,使用 Kryo 序列化类库,这样可以获得更快的序列化速度,并且占用更小的内存空间。但是要记住,如果 RDD 的元素(如 RDD 的泛型类型),是自定义类型的话,在 Kryo 中需要提前注册该自定义类型。

5、措施四:优化 Java 虚拟机垃圾回收

如果在持久化 RDD 的时候,持久化了大量的数据,那么 Java 虚拟机垃圾回收就可能成为一个性能瓶颈。因为 Java 虚拟机会定期进行垃圾回收,此时就会追踪所有的 Java 对象,并且在垃圾回收时,找到那些已经不再使用的对象,然后清理旧的对象,来给新的对象腾出内存空间。垃圾回收的性能开销,是跟内存中的对象数量成正比的,所以,对于垃圾回收的性能问题,首先要做的就是使用更高效的数据结构,比如 Array 和 String;其次就是在持久化 RDD 时,使用序列化级别,而且用 Kryo 序列化类库,这样每个 Partition 就只是一个对象——一个字节数组。

在优化 Java 虚拟机垃圾回收之前,可以对垃圾回收进行检测,包括多久进行一次垃圾回收,以及每次垃圾回收耗费的时间。只要在 spark-submit 脚本中,增加一个配置即可,即–conf spark.executor.extraJavaOptions=-verbose:gc –XX:+PrintGCDetails –XX:+PrintGCTimerStamps。但是要记住,这里虽然会打印出 Java 虚拟机的垃圾回收的相关信息,但是是输出到了 worker 节点上的日志中,而不是 driver 的日志中;其实也完全可以通过 SparkUI(4044 端口) 来观察每个 stage 的垃圾回收的情况。

lgc 本身有性能消耗,如果频繁发生,对性能的影响不能忽视;

l 数据量过大,每次 gc 回收的数据量也会很大,gc 的速度会比较慢;

lgc 也是由一个线程来执行,而 gc 线程会阻塞工作线程,严重影响 Spark 性能。

A、优化 executor 内存比例

对于垃圾回收来说,最重要的就是调节 RDD 缓存占用的内存空间,与算子执行时创建的对象占用的内存空间的比例。默认情况下,Spark 使用每个 executor 60% 的内存空间来缓存 RDD,那么在 task 执行期间创建的对象,只有 40% 的内存空间来存放。在这种情况下,很有可能因为内存空间的不足,task 创建的对象过大,那么一旦发现 40% 的内存空间不够用了,就会触发 Java 虚拟机的垃圾回收操作。因此,在极端情况下,垃圾回收操作可能会被频繁触发。

在上述情况下,如果发现垃圾回收频繁发生,就需要对那个比例进行调优,使用 new SparkConf().set(“spark.storage.memoryFraction”,”0.5”) 即可,可以将 RDD 缓存占用空间的比例降低,从而给更多的空间让 task 创建的对象进行使用。因此,对于 RDD 持久化,完全可以使用 Kryo 序列化,加上降低其 executor 内存占比的方式,来减少其内存消耗,给 task 提供更多的内存,从而避免 task 的执行频繁触发垃圾回收。

B、高级垃圾回收调优

Java 堆空间被划分成了两块空间,一个是年轻代,一个是老年代。年轻代存放的是短时间存活的对象,老年代存放的是长时间存活的对象。年轻代又被划分为了三块空间,Eden、Survivor1、Survivor2。首先,Eden 区域和 Survivor1 区域用于存放对象,Survivor2 区域备用。创建的对象,先放入 Eden 区域和 Survivor1 区域,如果 Eden 区域满了,那么就会触发一次 Minor GC,进行年轻代的垃圾回收。Eden 和 Survivor1 区域中存活的对象,会被移动到 Survivor2 区域中。然后 Survivor1 和 Survivor2 的角色调换,Survivor1 变成了备用。如果一个对象,在年轻代中,撑过了多次垃圾回收,都没有被回收掉,那么会被认为是长时间存活的,此时就会被移入老年代。此外,如果在将 Eden 和 Survivor1 中的存活对象,尝试放入 Survivor2 中时,发现 Survivor2 满了,那么会直接放入老年代。此时就出现了,短时间存活的对象,进入老年代的问题。如果老年代的空间满了,那么就会触发 Full GC,进行老年代的垃圾回收操作。

Spark 中,垃圾回收调优的目标就是,只有真正长时间存活的对象,才能进入老年代,短时间存活的对象,只能待在年轻代。不能因为某个 Survivor 区域空间不够,在 Minor GC 时,就进入老年代,从而造成短时间存活的对象,长期待在老年代中占据空间,而且 Full GC 时要回收大量的短时间存活的对象,导致 Full GC 速度缓慢。如果发现,在 task 执行期间,大量 full gc 发生了,那么说明,年轻代的 Eden 区域,给的空间不够大。此时可以执行一些操作来优化垃圾回收行为:

n 降低 spark.storage.memoryFraction 的比例,给年轻代更多的内存空间,来存放短时间存活的对象;

n 给 Eden 区域分配更大的空间,使用 -Xmn 即可,通常建议给 Eden 区域分配预计大小的 4/3;

n 如果使用的是 HDFS 文件,那么很好估计 Eden 区域大小,如果每个 executor 有 4 个 task,然后每个 hdfs 压缩块解压缩后大小是 3 倍,此外每个 hdfs 块的大小是 64M,那么 Eden 区域的预计大小就是:4 * 3 * 64MB,然后呢,再通过 -Xmn 参数,将 Eden 区域大小设置为 4 * 3 * 64 * 4/3。

根据经验来看,对于垃圾回收的调优,尽量就是调节 executor 内存的比例就可以了。因为 jvm 调优是非常复杂和敏感的,除非真的到了万不得已的时候,而且开发者又对 jvm 相关的技术比较了解,那么此时可以进行 eden 区域的调节和调优。对应的 2 个高级参数说明如下:

-XX:SurvivorRatio=4: 如果值为 4,那么就是两个 Survivor 跟 Eden 的比例是 2:4,也就是说每个 Survivor 占据的年轻代的比例是 1/6,所以,可以尝试调大 Survivor 区域的大小;

-XX:NewRatio=4: 新生代(eden+2*s)和老年代(不包含永久区)的比值,4 表示新生代:老年代 =1:4,即年轻代占堆的 1/5。

6、提高 Spark 并行度

实际上 Spark 集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源,才能充分提高 Spark 应用程序的性能。Spark 会自动设置以文件作为输入源的 RDD 的并行度,依据其大小,比如 HDFS,就会给每一个 block 创建一个 partition,也依据这个设置并行度。对于 reduceByKey 等会发生 shuffle 的操作,就使用并行度最大的父 RDD 的并行度即可。

可以手动使用 textFile()、parallelize() 等方法的第二个参数来设置并行度;也可以使用 spark.default.parallelism 参数,来设置统一的并行度。Spark 官方的推荐是,给集群中的每个 cpu core 设置 2~3 个 task。比如说,spark-submit 设置了 executor 数量是 10 个,每个 executor 要求分配 2 个 core,那么 application 总共会有 20 个 core。此时可以设置 new SparkConf().set(“spark.default.parallelism”, “60”) 来设置合理的并行度,从而充分利用集群资源。

7、广播共享数据

如果算子函数中使用到了特别大的数据,那么这个时候推荐将该数据进行广播。这样的话,就不至于将一个大数据拷贝到每一个 task 中,而是给每个节点拷贝一份该大数据,然后节点上的 task 共享该数据。如此,就可以减少大数据在节点上的内存消耗,并且可以减少数据到节点的网络传输开销。广播共享数据的原理,如下图所示:

8、数据本地化

数据本地化对于 Spark Job 的性能有着巨大的影响,如果数据以及要计算它的代码是在一起的,那么性能当然会非常高。但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。通常来说,移动代码到其他节点,会比移动数据到代码所在的节点上去,速度要快得多,因为代码比较小。Spark 也正是基于这个数据本地化的原则来构建 task 调度算法的。

数据本地化,指的是,数据离计算它的代码有多近。基于数据距离代码的距离,有如下几种数据本地化级别:

nPROCESS_LOCAL:数据和计算它的代码在同一个 JVM 进程中;

nNODE_LOCAL:数据和计算它的代码在同一个节点上,但是不在同一个进程中,比如在不同的 executor 进程中,或者是数据在 HDFS 文件的 block 中;

nNO_PREF:数据从哪里过来,性能都是一样的;

nRACK_LOCAL:数据和计算它的代码在同一个机架上;

nANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上。

Spark 倾向于使用最好的本地化级别来调度 task,但是这是不可能的。如果没有任何未处理的数据在空闲的 executor 上,那么 Spark 就会放低本地化级别。这时有两个选择:第一,等待,直到 executor 上的 cpu 释放出来,那么就分配 task 过去;第二,立即在任意一个 executor 上启动一个 task。

Spark 默认会等待一会儿,来期望 task 要处理的数据所在的节点上的 executor 空闲出一个 cpu,从而将 task 分配过去。只要超过了时间,那么 Spark 就会将 task 分配到其他任意一个空闲的 executor 上。可以设置 spark.locality 系列参数,来调节 Spark 等待 task 可以进行数据本地化的时间,最好是采用分层设置的方式,对于最好的本地化级别,可以设置等待的时间长些,毕竟本地化级别越好数据传输的效率也越高。spark.locality.wait(默认 3000 毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。数据本地化的原理,如下图所示:

9、用 reduceByKey 替代 groupByKey

对于单词计数 WordCount 程序来说,同样可以使用 groupByKey 算子来实现对应的功能,但是它的效率会远远低于 reduceByKey,程序代码参考如下:

//reduceByKey 代码:

val counts = pairs.reduceByKey(_ + _)

//groupByKey 代码:

val counts = pairs.groupByKey().map(wordCounts => (wordCounts._1, wordCounts._2.sum))

如果能用 reduceByKey,那就用 reduceByKey,因为它会在 map 端,先进行本地 combine,可以大大减少要传输到 reduce 端的数据量,减小网络传输的开销。只有在 reduceByKey 处理不了时,才用 groupByKey().map() 来替代。groupByKey 算子的原理,如下图所示:

reduceByKey 算子的原理,如下图所示:

10、shuffle 性能优化

大多数 Spark 作业的性能主要消耗在了 shuffle 环节,因为该环节包含了大量的磁盘 IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对 shuffle 过程进行调优。但是也必须提醒的是,影响一个 Spark 作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle 调优只能在整个 Spark 的性能调优中占到一小部分而已。因此务必把握住调优的基本原则,千万不要舍本逐末。下面详细讲解 shuffle 的原理,以及相关参数的说明,同时给出各个参数的调优建议。

A、未经优化的 HashShuffleManager

下图说明了未经优化的 HashShuffleManager 的原理,这里先明确一个假设前提:每个 executor 只有 1 个 cpu core,也就是说,无论这个 executor 上分配多少个 task 线程,同一时间都只能执行一个 task 线程。

从 shuffle write 说起,shuffle write 阶段,主要就是在一个 stage 结束计算之后,为了下一个 stage 可以执行 shuffle 类的算子(比如 reduceByKey),而将每个 task 处理的数据按 key 进行“分类”。所谓“分类”,就是对相同的 key 执行 hash 算法,从而将相同 key 都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 stage 的一个 task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

那么每个执行 shuffle write 的 task,要为下一个 stage 创建多少个磁盘文件呢?很简单,下一个 stage 的 task 有多少个,当前 stage 的每个 task 就要创建多少份磁盘文件。比如下一个 stage 总共有 100 个 task,那么当前 stage 的每个 task 都要创建 100 份磁盘文件。如果当前 stage 有 50 个 task,总共有 10 个 executor,每个 executor 执行 5 个 Task,那么每个 executor 上总共就要创建 500 个磁盘文件,所有 executor 上会创建 5000 个磁盘文件。由此可见,未经优化的 shuffle write 操作所产生的磁盘文件的数量是极其惊人的。

接着来说说 shuffle read,shuffle read,通常就是一个 stage 刚开始时要做的事情。此时该 stage 的每一个 task 就需要将上一个 stage 的计算结果中的所有相同 key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行 key 的聚合或连接等操作。由于 shuffle write 的过程中,task 给下游 stage 的每个 task 都创建了一个磁盘文件,因此 shuffle read 的过程中,每个 task 只要从上游 stage 的所有 task 所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read 的拉取过程是一边拉取一边进行聚合。每个 shuffle read task 都会有一个自己的 buffer 缓冲,每次都只能拉取与 buffer 缓冲相同大小的数据,然后通过内存中的一个 Map 进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到 buffer 缓冲中进行聚合操作。以此类推,直到最后将所有数据拉取完,并得到最终的结果。

B、优化后的 HashShuffleManager

下图说明了优化后的 HashShuffleManager 的原理,这里说的优化,是指可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为 false,将其设置为 true 即可开启优化机制。通常来说,如果使用 HashShuffleManager,那么都建议开启这个选项。

开启 consolidate 机制之后,在 shuffle write 过程中,task 就不是为下游 stage 的每个 task 创建一个磁盘文件了。此时会出现 shuffleFileGroup 的概念,每个 shuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量与下游 stage 的 task 数量是相同的。一个 executor 上有多少个 cpu core,就可以并行执行多少个 task。而第一批并行执行的每个 task 都会创建一个 shuffleFileGroup,并将数据写入对应的磁盘文件中。

当 executor 的 cpu core 执行完一批 task,接着执行下一批 task 时,下一批 task 就会复用之前已有的 shuffleFileGroup,包括其中的磁盘文件。也就是说,此时 task 会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate 机制允许不同的 task 复用同一批磁盘文件,这样就可以有效地将多个 task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能。

假设第二个 stage 有 100 个 task,第一个 stage 有 50 个 task,总共还是有 10 个 executor,每个 executor 执行 5 个 task。那么原本使用未经优化的 HashShuffleManager 时,每个 executor 会产生 500 个磁盘文件,所有 executor 会产生 5000 个磁盘文件。但是此时经过优化之后,每个 executor 创建的磁盘文件的数量的计算公式为:cpu core 的数量 * 下一个 stage 的 task 数量。也就是说,每个 executor 此时只会创建 100 个磁盘文件,所有 executor 只会创建 1000 个磁盘文件。

C、SortShuffleManager 运行原理

下图说明了普通的 SortShuffleManager 的原理,在该模式下,数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。

一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是 merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如第一个 stage 有 50 个 task,总共有 10 个 Executor,每个 executor 执行 5 个 task,而第二个 stage 有 100 个 task。由于每个 task 最终只有一个磁盘文件,因此,此时每个 executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件。

D、bypass SortShuffleManager运行机制

下图说明了 bypass SortShuffleManager 的原理,bypass 运行机制的触发条件如下:

lshuffle map task 数量小于 spark.shuffle.sort.bypassMergeThreshold 参数的值;

l 不是聚合类的 shuffle 算子(比如 reduceByKey)。

此时 task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。

而该机制与普通 SortShuffleManager 运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

E、shuffle 相关参数调优

以下是 Shuffle 过程中的一些主要参数,这里详细讲解了各个参数的功能、默认值以及基于实践经验给出的调优建议。

spark.shuffle.file.buffer

默认值:32k

参数说明:该参数用于设置 shuffle write task 的 BufferedOutputStream 的 buffer 缓冲区大小。将数据写到磁盘文件之前,会先写入 buffer 缓冲区中,待缓冲区写满之后,才会溢写到磁盘。

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如 64k),从而减少 shuffle write 过程中溢写磁盘文件的次数,也就可以减少磁盘 IO 次数,进而提升性能。在实践中发现,合理调节该参数,性能会有 1%~5% 的提升。

spark.reducer.maxSizeInFlight

默认值:48m

参数说明:该参数用于设置 shuffle read task 的 buffer 缓冲区大小,而这个 buffer 缓冲区决定了每次能够拉取多少数据。

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如 96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有 1%~5% 的提升。

spark.shuffle.io.maxRetries

默认值:3

参数说明:shuffle read task 从 shuffle write task 所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。

调优建议:对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最大次数(比如 60 次),以避免由于 JVM 的 full gc 或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿 ~ 上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

默认值:5s

参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是 5s。

调优建议:建议加大间隔时长(比如 60s),以增加 shuffle 操作的稳定性。

spark.shuffle.memoryFraction

默认值:0.2

参数说明:该参数代表了 executor 内存中,分配给 shuffle read task 进行聚合操作的内存比例,默认是 20%。

调优建议:如果内存充足,而且很少使用持久化操作,建议调高这个比例,给 shuffle read task 的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升 10% 左右。

spark.shuffle.manager

默认值:sort

参数说明:该参数用于设置 ShuffleManager 的类型。Spark 1.5 以后,有三个可选项:hash、sort 和 tungsten-sort。HashShuffleManager 是 Spark 1.2 以前的默认选项,但是 Spark 1.2 以及之后的版本默认都是 SortShuffleManager 了。tungsten-sort 与 sort 类似,但是使用了 tungsten 计划中的堆外内存管理机制,内存使用效率更高。

调优建议:由于 SortShuffleManager 默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的 SortShuffleManager 就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过 bypass 机制或优化的 HashShuffleManager 来避免排序操作,同时提供较好的磁盘读写性能。

spark.shuffle.sort.bypassMergeThreshold

默认值:200

参数说明:当 ShuffleManager 为 SortShuffleManager 时,如果 shuffle read task 的数量小于这个阈值(默认是 200),则 shuffle write 过程中不会进行排序操作,而是直接按照未经优化的 HashShuffleManager 的方式去写数据,但是最后会将每个 task 产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

调优建议:当你使用 SortShuffleManager 时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于 shuffle read task 的数量。那么此时就会自动启用 bypass 机制,map-side 就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此 shuffle write 性能有待提高。

spark.shuffle.consolidateFiles

默认值:false

参数说明:如果使用 HashShuffleManager,该参数有效。如果设置为 true,那么就会开启 consolidate 机制,会大幅度合并 shuffle write 的输出文件,对于 shuffle read task 数量特别多的情况下,这种方法可以极大地减少磁盘 IO 开销,提升性能。

调优建议:如果的确不需要 SortShuffleManager 的排序机制,那么除了使用 bypass 机制,还可以尝试将 spark.shffle.manager 参数手动指定为 hash,使用 HashShuffleManager,同时开启 consolidate 机制。在实践中尝试过,发现其性能比开启了 bypass 机制的 SortShuffleManager 要高出 10%~30%。


更多高质资源 尽在AIQ 机器学习大数据 知乎专栏 点击关注

转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com