Spark 宽依赖 窄依赖 Job Stage Executor Task 总结



转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com

AIQ 机器学习大数据 知乎专栏 点击关注

79284fc78ea1414bbcaa1bc5422a1700.png

宽依赖与窄依赖

窄依赖(narrow dependency)和宽依赖(wide dependency, 也称 shuffle dependency).

  1. 窄依赖是指父 RDD 的每个分区只被子 RDD 的一个分区所使用,子 RDD 分区通常对应常数个父 RDD 分区(O(1),与数据规模无关 ), map/filter 和 union

  2. 相应的,宽依赖是指父 RDD 的每个分区都可能被多个子 RDD 分区所使用,子 RDD 分区通常对应所有的父 RDD 分区(O(n),与数据规模有关 ), join

宽依赖和窄依赖如下图所示:

476685204a7644a2b6d60d2ed8e5604a.png

相比于宽依赖,窄依赖对优化很有利 ,主要基于以下两点:

  1. 宽依赖往往对应着 shuffle 操作,需要在运行过程中将同一个父 RDD 的分区传入到不同的子 RDD 分区中,中间可能涉及多个节点之间的数据传输;而窄依赖的每个父 RDD 的分区只会传入到一个子 RDD 分区中,通常可以在一个节点内完成转换。

  2. 当 RDD 分区丢失时(某个节点故障),spark 会对数据进行重算。

    1. 对于窄依赖,由于父 RDD 的一个分区只对应一个子 RDD 分区,这样只需要重算和子 RDD 分区对应的父 RDD 分区即可,所以这个重算对数据的利用率是 100% 的;

    2. 对于宽依赖,重算的父 RDD 分区对应多个子 RDD 分区,这样实际上父 RDD 中只有一部分的数据是被用于恢复这个丢失的子 RDD 分区的,另一部分对应子 RDD 的其它未丢失分区,这就造成了多余的计算;更一般的,宽依赖中子 RDD 分区通常来自多个父 RDD 分区,极端情况下,所有的父 RDD 分区都要进行重新计算。

    3. 如下图所示,b1 分区丢失,则需要重新计算 a1,a2 和 a3,这就产生了冗余计算 (a1,a2,a3 中对应 b2 的数据)。

b8b193420285452fb728b73e1da7e2db.png

Job & Stage & Executor & Task

  • Job: A job is triggered by an action, like count()or saveAsTextFile(). 由 action 触发

  • stage: 一个 job 会被拆分为多组 Task,每组任务被称为一个 Stage, Stage 包含两类:

    • ShuffleMapStage in Spark

      ShuffleMapStage 是在 DAG 中 属于中间层的 stage, 会产生 map output file 的 write disk 操作,提供给下一个 stage 的 task 使用,并且增加了网络 IO,跨 stage 会是一种高昂的开销,很多情况下应尽量避免, 另外ShuffleMapStage 可以被多个 job 共享!

    • ResultStage in Spark

    action 操作会包含在 ResultStage 里,是 DAG 中最终的 stage, 里面的 task 会计算最终的 action 结果
    53025fc4db434c39af057427657417d7.png

d67fc206b2d64619bfb180e2cfbfbc1a.png

bbefc3c9b64448a69ff70e00b924e927.png

stage 内只会存在窄依赖, stage 间存在宽依赖,一定存在 shuffle 过程!

  • 在 Spark 中有两类 task:
    • 一类是 shuffleMapTask
    • 一类是 resultTask
      第一类 task 的输出是 shuffle 所需数据,第二类 task 的输出是 result,stage 的划分也以此为依据,shuffle 之前的所有变换是一个 stage,shuffle 之后的操作是另一个 stage。比如
rdd.parallize(1 to 10).foreach(println)

这个操作没有 shuffle,直接就输出了,那么只有它的 task 是 resultTask,stage 也只有一个;如果是

rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println)

这个 job 因为有 reduce,所以有一个 shuffle 过程,那么 reduceByKey 之前的是一个 stage,执行 shuffleMapTask,输出 shuffle 所需的数据,reduceByKey 到最后是一个 stage,直接就输出结果了。如果 job 中有多次 shuffle,那么每个 shuffle 之前都是一个 stage. 会根据 RDD 之间的依赖关系将 DAG 图划分为不同的阶段,对于窄依赖,由于 partition 依赖关系的确定性,partition 的转换处理就可以在同一个线程里完成,窄依赖就被 spark 划分到同一个 stage 中,而对于宽依赖,只能等父 RDD shuffle 处理完成后,下一个 stage 才能开始接下来的计算。之所以称之为 ShuffleMapTask 是因为它需要将自己的计算结果通过 shuffle 到下一个 stage 中
5e15d1f9241444b7ae2e0efd62c805c8.png

因此 spark 划分 stage 的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个 stage;遇到窄依赖就将这个 RDD 加入该 stage 中。因此在图 2 中 RDD C,RDD D,RDD E,RDDF 被构建在一个 stage 中,RDD A 被构建在一个单独的 Stage 中, 而 RDD B 和 RDD G 又被构建在同一个 stage 中。

参考链接
[1] https://data-flair.training/blogs/spark-stage/


更多高质资源 尽在AIQ 机器学习大数据 知乎专栏 点击关注

转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com