Fork me on GitHub

Blaze:SparkSQL Native 算子优化在快手的设计与实践

以下文章来源于 https://zhuanlan.zhihu.com/p/630869130

导读: 本次分享的主题是 Blaze:SparkSQL Native 算子优化在快手的设计与实践。当前,Spark 由于其相比 Hive 更强大的性能,已经成为大部分公司的主要执行引擎。随着业务的发展以及数据规模的不断提升,我们对 Spark 性能提升的追求也一直没有停止过。Spark 性能提升主要来自两大方向:执行计划本身的优化、运行时执行效率的优化。

本次分享主要从以下三部分展开介绍:

  1. Blaze 是什么

  2. Blaze 的整体架构及实现细节

  3. Blaze 的当前进展及未来工作


分享嘉宾|王磊博士 快手 大数据 SQL 引擎负责人

编辑整理|刘鹏鹏 嘀嘀

出品社区|DataFun


01/Blaze 是什么


1. Spark 的发展历程



(1)Spark 1.0

在 1.0 阶段,采用的解释执行模型,每个算子自成一个函数,函数内部根据具体的数据类型选择程序执行分支。解释执行模型最大的问题是效率比较低,存在大量的分支判断检查、无法充分利用 CPU 的指令流水、编译器难以优化、CPU 缓存不友好等问题,导致其执行效率很低。

(2)Spark 2.0

到了 Spark 2.0 阶段,Spark 社区推出了多算子编译的概念,其基本思想是将几个简单算子的计算逻辑组织到最近的一个复杂算子中,通过减少运行时函数调用数量的方式来避免大量的函数调用开销。多算子编译功能在 Spark 2.0 之后已经作为 SparkSQL 的默认执行模式,称为 WholeStageCodegen 技术。相比于 1.0 阶段,它是在运行时执行效率上的优化。

(3)Spark 3.0

到了 Spark 3.0,引入了一个很重要的功能 Adaptive Query Execution(AQE 自适应执行引擎),其核心改变是从之前在编译阶段静态地生成执行计划,变为可以在运行时对执行计划动态地优化,这个优化点可以归结为执行计划本身的优化。

(4)Spark 接下来的优化方向

向量化执行是其中一个非常重要的方向,这也基本上是业界的共识。向量化执行的思想是将算子的执行粒度从每次处理一行变成每次处理一个行组,以此来避免大量的函数调用。通过对行组内部处理按列进行计算,同时利用编译技术减少分支判断检查以及更多的 SIMD 优化执行计划。

2. Spark 向量化执行的相关进展

在向量化执行这块,已经有不少公司对其进行了探索:



**(1)Velox:**由 Meta(Facebook)公司开源的 C++ 的向量化执行引擎库,其本身并不是一个端到端的面向用户的 SQL 执行引擎,其输入是一个逻辑执行计划,输出是执行结果。Velox 可以内嵌到其他执行引擎来进行运行加速;

**(2)Gluten:**由 Intel 开源的用于解耦 Spark JVM 和 Native 执行引擎的项目,通过把 Spark 的物理执行计划转化为对应的 Substrait plan,然后再传递给底层的 Native 执行引擎,正好可以结合上面的 Velox 使用;

**(3)Photon:**由 Databricks 推出的下一代执行引擎,也是用 C++ 实现的,本身复用了 Spark 的逻辑执行计划,物理执行计划层面在 task 级别判断哪些算子是可以替换为向量化执行引擎来进行加速;

**(4)Native Codegen:**是由阿里巴巴推出的,以 stage 粒度采用向量化执行,通过 Weld + WholeStageCodegen 的方式,动态生成 Stage 的 LLVM IR,最终经历 LLVM compiler 产生会 Stage 对应的 Native code。

3. Blaze 是什么



在快手内部,我们很早之前也已经开始对 Spark 引擎的向量化执行进行探索,经过两年多的持续迭代,目前已经初步具备了上线使用的能力,这就是今天要和大家分享的 Blaze 项目。

简单说 Blaze 是一个基于 Apache DataFusion 项目封装的向量化执行引擎中间件,通过该组件既能够充分发挥 Spark 分布式计算框架的优势,同时也能够利用 DataFusion Native 向量化算子执行的性能优势。

4. Blaze 和 DataFusion 的关系



Blaze + DataFusion 架构整体类似于前面提到的 Gluten + Velox 组合,它们属于相同架构下的两套不同技术实现方案。Blaze + DataFusion 是基于 Rust 编写,而 Gluten + Velox 是基于 C++ 编写的,在物理执行计划传递部分各自采用了不同的格式 ProtoBuf 和 Substrait,在 Native 执行层都采用了 Apache Arrow 作为内存数据格式。

--

02/整体架构及实现细节


1. Spark On Blaze 架构的整体流向



上图是 Spark on Blaze 的整体架构,左边绿色部分是当前 Spark 原生的执行流程。在右边黄色部分,通过引入 Blaze 的 Extension 组件,把 Spark 生成的物理执行计划转换为对应的 Native 执行计划,并且通过 JNI 的方式把对应的执行计划最终传递给底层的 DataFusion 执行。DataFusion 项目采用列式计算,并对主要的表达式计算实现了 Native 向量化支持,经过翻译后的 Native 算子执行效率远高于 Spark 原生 JVM 执行。

对于各种向量化实现方案来说,因为要兼容当前的应用场景,所以都要面对同现有执行框架打通的问题,并且要兼顾到当只有部分算子支持向量化计算的时候,在一个作业内部如何能够融合 Spark 原生的行式执行和向量化的列式执行。

Native 算子和 Spark 原生算子之间的交互,是通过 Arrow 的 FFI(Foreign Function Interface)来实现传递的。Native 算子本身是基于 Arrow 格式的列存,Spark 原生算子是 InternalRow 行式存储,所以转换过程中会引入各种行列转换的开销。我们在 Blaze 的 SessionExtention 组件中实现了一套基于规则的转换策略,如果预期某一部分操作转换为 Native 算子后引入的行列转换开销会降低最终执行的效率,那就不会对它进行转换,以此来保证在部分算子未完全覆盖的场景下执行效率也至少不会低于原生的 Spark 引擎。

2. 执行计划转换示例



这是一个执行计划转换的 DEMO,其中可以支持转化到 Native 执行的部分,我们就把这部分的物理执行计划转换为对应的 Native Plan,如果中间存在 Native 和原生 JVM 的转换,我们会执行行列互转的步骤。

3. 四大核心组件



根据前文提到的整体架构,核心可以抽象出以下四个组件:

**(1)Blaze Session Extension:**主要负责控制整个 Spark 执行过程中算子的检查和转换操作;

**(2)Plan SerDe:**通过 protobuf 来对 DataFusion 执行计划进行序列化和反序列化;

**(3)JNI Gateways:**通过 JNI Gateways 实现数据和执行计划的传递转换;

**(4)Native Operators:**将 Spark 的执行计划映射到 Native 引擎的具体执行。

具体的执行过程可以分为三个部分:

**(1)物理执行计划的转换:**把 Spark 生成的物理执行计划转换为对应的 Native Plan;

(2)Native Plan 的生成和提交;

(3)Native 真正的执行。

下面将逐一展开介绍。

4. 物理执行计划的转换



在物理执行计划转换部分,我们通过 Spark 3.0 引入的 Session Extension 机制实现了 Blaze Session Extension。在生成物理执行计划到每个 Stage 实际执行之前,通过 Blaze 的 Extension 组件对整个 Stage 中的所有算子进行检查和翻译。

由于工作进度的原因,会有部分算子还没有支持、无法使用 Native 执行。对于这部分算子就需要在算子前后加入列转行和行转列的算子,然后通过 FFI 实现数据在 Native 和 JVM 之间的交换。

为了尽可能减少行列互转带来的性能开销,我们实现了一个基于规则的策略来决策部分算子是否需要翻译。我们当前在算子层面还没有支持 Generate 算子,假设 Generate 后还有 Filter 算子,如果直接按照算子支持粒度自动转换的话,Generate 由于不支持自动转换,会先生成行存格式的数据,Filter 算子因为已经支持了,会自动把它转为列存然后再进行 Filter,这样开销就会很大。这种情况我们采取的优化策略是在 Generate 完之后先不转为列存,而是先进行 Filter 操作,在接下来执行的 Native 算子之前再行转列,这样行转列的操作只对 Filter 之后很少量的数据生效,可以减少很多行转列的开销。

5. Native Plan 的生成和提交



在对物理执行计划完成算子的检查和翻译后,需要进一步生成 Native 的执行计划。

我们使用 ProtoBuf 来描述 Native 的执行计划,其中包含了需要执行的所有算子、算子间的依赖关系、算子所需要的执行参数(如文件路径、Shuffle read 地址、Project 表达式等)。

我们所有的 Native 算子都需要实现 doExecuteNative():Native RDD 接口,生成的 Native RDD 包含了本算子以 protobuf 描述的 Native 执行计划和 Partition 信息,同时提供它真正的 Compute 方法,将执行计划通过 protobuf 序列化,使用 JNI 提交到 Blaze native engine 开始实际的执行。

6. Native 的执行



在完成了 Native 执行计划的生成后,native broadcast/shuffle、C2R、rdd.collect 操作都会触发 NativeRDD.compute(),从而调起 native 计划的生成、提交、执行。

执行过程首先将 protobuf 字节序列在 native(rust) 端反解出来,转换为 DataFusion 的 ExecutionPlan 结构,并使用 DataFusion 框架异步执行整个 ExecutionPlan。在 JVM 端我们会维护一个监听线程来接收 Native 执行的状态和输出结果。

以上就是整体的执行流程。

在具体实现上,除了利用 DataFusion 已有的基础能力来完成 Spark 常规算子和数据类型的 Native 执行外,还有一些比较重要的实现和优化点需要特别考虑,如 UDF 的兼容、内存管理以及特定算子的更高效实现。

7. UDF 的兼容



如果只是单纯跑 TPC-DS 的话,不会包含太多的 UDF 调用,但我们最终要把这个能力上线到实际应用中,就要考虑到对大量业务 UDF 调用的兼容。

直接把现有的业务 UDF 放到 Native 端重新实现一遍,成本过于巨大,因此我们需要实现在 Native 端对 Java UDF 的兼容。

目前采用的方式是在 Native 端遇到 UDF 时,将整个数据打包成 Arrow RPC 格式,传回到 JVM 端进行计算,在计算完之后再将计算结果传回 Native 端,这些操作会涉及到数据序列化和 JNI 调用的额外开销。由于 Native 端是采用列式计算的优势,单次操作可以处理批量的数据,一定程度上能够在把这些开销均摊之后,最终对执行效率并不会有太大的影响。

8. 内存的管理



在 Native 执行部分的内存使用上是没有办法被 Spark 的内存管理组件来很好地管理的,所以针对一些特定场景会出现内存不足导致任务执行失败的问题。

我们在 Blaze 重点对 Shuffle、Sort、Aggregate 这三个 Native 算子实现了内存管理,支持将中间数据 Spill 到文件,处理完所有的数据之后,再通过外部排序的方式合并所有 Spill 产出的计算结果。使得在输入数据量大、内存不足的场景下,依然能够正常的完成计算。

由于 JVM 内存模型限制,Blaze Native 代码只能直接申请堆外内存。在我们的使用场景中,Spark 的作业配置主要是堆内+堆外模式,并且堆内配置通常会远大于堆外,为了防止堆外内存过小造成 Native 算子频繁 Spill,同时也将大空间的堆内内存利用起来,我们实现了二级的 Spill 策略:

① 在堆外内存超限、Native 算子触发 Spill 的时候,会在堆内开辟一块空间,通过 JNI+ NIO 方式将需要 Spill 的数据直接写入堆内,这样就能够把堆外的空间释放出来;

② 在堆内就可以通过 Spark 本身的 MemoryManager 管理由堆外写进来的内存块,同时实现 Spill 接口,复用 Spark 本身的内存管理功能,支持在堆内空间也不足的时候,将这部分内存块写到磁盘文件。

9. 其他优化实现

前面提到 DataFusion 项目本身提供了很多适配 Spark 功能的算子,直接接入即可使用。为了获得更好的性能,我们也对一些特定的算子进行了深度的定制优化,比如 Sort、Aggregate、SortMergeJoin、Shuffle 等。

**(1)Comparasion operation:**使用了 arrow-rs 项目中的 arrow-row 模块,支持将多个需要比较的列序列化为 byte[] 格式的行数据(类似于 Spark 中的 UnsafeRow,但是可以直接用于比较大小),使得在排序、SMJ 等场景进行行与行之间的大小比较时可以忽略数据类型,直接比较字节序列;

**(2)Sort operation:**Sort、SortShuffle、Agg 等算子都需要进行大量的排序操作,我们使用 rust 自带的 sort_unstable 进行内存排序,它内部实现使用了更先进的 PDQSort 算法,相比于 JDK 自带的排序算法效率更高;同时在进行外部排序的时候使用 TournamentTree 结构进行结果归并,也比 Spark ExternalSorter 中使用 PriorityQueue(二叉堆)有更高的效率;

**(3)HashMap:**在 Agg 算子中,我们使用 hashbrown 进行 hash 表的维护,它是 google SwissTable 的高效实现,在性能和内存开销上都有很大的优势;

**(4)Columnarized Shuffle:**在 Shuffle 数据这块,我们一开始把 Shuffle 数据进行列存转换的时候采用的是 Arrow-RPC 格式,但在实际过程中我们发现一些不符合预期的问题,理论上把 Shuffle 数据转为列存后压缩率更高、压缩后的数据量会更小、Shuffle 执行效率会提升。但是我们在实际测试中发现并不是这样,很多场景出现了压缩率不及预期反而影响性能的情况。

我们在排查的过程中发现,在处理字符串列的时候,Arrow-RPC 存储的是所有字符串拼接在一起的 Byte 数组,加上每一行在 Byte 数组里的偏移量。而偏移量是 Int32 格式的数据,大部分的数据前三个字节是相同的。而 Shuffle 数据压缩所采用的 ZSTD 算法默认的压缩级别处理不了三字节重复串,导致这部分数据实际上不会压缩生效。所以我们实现了自定义的格式,通过把字符串存储改成按顺序写入每行的长度和字符串字节数组,并且使用前缀编码的处理方式,让长度在大部分 case 下只占 1~2 字节,这样就能够很好的触发 ZSTD 的压缩。

10. 对社区的贡献



这是我们所做的一些深度的定制优化,一方面我们用到了 DataFusion 提供的很多适配 Spark 功能的算子,可以直接接入。另一方面我们内部开发过程中也发现 DataFusion 当前还有一些能力是待完善的,所以我们也把很多的代码反馈给了社区,包括前边提到的内存管理机制、支持 HDFS 读写的 Remote Storage API、以及各种关于 sort 的深度优化。

--

03/当前进展及未来工作


1. 算子的覆盖度



在算子覆盖度方面,我们的开发工作基本是按照线上使用频率来推进的,最常见的算子基本都支持;还有一些算子虽然支持了但还有一些待完善的地方,如 FileSourceScan,由于我们 80%+ 的数据是采用 Parquet 格式存储,所以当前支持了 Parquet 格式,而文本、ORC 等格式暂时还没有支持;对于 SortMergeJoin 这块,我们暂时还不支持 Post Filter,当然在列存下要实现这个本身也比较困难;BroadcastHashJoin 这种算子本身支持了,但暂时还不支持在 Driver 端构建 HashMap 和内存管理这些操作。

还有一些算子虽然暂不支持,但有些已经开发完成,如复杂数据类型 Struct、List 这块,目前正在进行全面的测试,后续很快也能支持。

2. 基准测试



这是我们在测试环境 80 个 Task 并发的情况下,在 TPC-DS 1TB 数据下的测试性能对比,当前能够跑通所有的 TPC-DS 查询,并且对于单个查询最高能达到 10 倍的性能提升,整体平均下来也有 2 倍以上的性能提升。

3. 线上收益



除了在 Benchmark 上跑通之外,最重要的目标还是能够在线上落地并拿到收益。这是我们在内部生产环境的实际落地情况,目前已经在一部分生产 ETL 任务上灰度上线了这个功能。

当然因为在前期上线会相对比较谨慎,所以我们采用线下双跑的方式,首先确认它不会引入数据一致性问题,因为把数据算错是一个非常大的问题。其次,我们目前在对 SQL 类型上有针对性地筛选了一些计算密集型任务,以及目前支持的比较好的算子的 SQL。

目前已取得了线上实际的收益,单个任务最大可以达到四倍多的性能提升,平均也已经达到两倍以上的性能提升。

4. 未来工作



我们未来的工作计划主要在以下四部分:

(1)持续地覆盖当前还不支持的算子和数据类型;

(2)今年会大规模推广 Blaze 在内部的上线,整个离线数仓都会在业务无感知的情况下,把当前的执行引擎从原生 Spark 切换到 Spark + Blaze;

(3)现在 Blaze 开发是和 Spark 引擎以及 DataFusion 深度定制的,后续会进一步把一些接口抽象出来,以支持更多的引擎;

(4)回馈开源社区,像之前在 DataFusion,我们已经把很多工作反馈给了社区,同时我们的 Blaze 也已经在社区开放了一个初步可用的版本,当然后续还会持续地把最新的一些优化反馈给社区,也欢迎大家参与到 Blaze 社区的建设中。

今天的分享就到这里,谢谢大家。



▌2023亚马逊云科技中国峰会


2023年6月27-28日9:00-17:00,2023亚马逊云科技中国峰会将在上海世博中心举办。

本次峰会将会分享数百个技术话题与最佳实践,覆盖汽车、制造、金融、医疗与生命科学、电商、游戏、泛娱乐、电信、教育、数字化营销等领域。

下面给大家预告一些精彩议题 报名参会,请点击"下方链接"。
2023年亚马逊云科技中国峰会 - 因构建_而可见

|---------------------|-------------------------------|
| 大数据方向议题 | 算法方向议题 |
| 下一代"智能湖仓"架构演进 | 玩转Stable Diffusion模型的微调与提示词工程 |
| 数据合规与云上安全架构构建实践 | 智能搜索技术在金融行业的应用 |
| 敏捷数据分析架构详解 | 基于开源LLM模型如何快速构建类ChatGPT应用? |
| 云原生数据库最佳实践 | 大语言模型(LLM)驱动的AIGC应用架构解密 |
| 智慧医疗: 本地化与全球化精选案例合集 | 生成式AI在游戏行业的应用 |
| 技术人员如何抓住风口获取成功? | AIGC在互联网行业与传统行业的应用与创新案例 |



本文地址:https://www.6aiq.com/article/1684655112016
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出