Fork me on GitHub

Spark SQL 在平安产险的应用

导读: 本文将分享 Spark SQL 在平安产险的应用。

主要包括以下几部分:

  • 业务背景
  • 部署方式的选择
  • 迁移流程
  • 典型问题和经验

分享嘉宾|李伟轩 中国平安 大数据开发工程师
编辑整理|唐洪超 敏捷云
出品社区|DataFun


01 业务背景

首先介绍一下业务背景。

1. 业务现状

图片

我们的业务现状是 离线集群在业务任务高峰期会出现 CPU 资源不足,而内存利用率又不高的情况 。如上图监控图表就是凌晨一点到八点的监控图,此时间段是业务的一个高峰期,集群 CPU 资源是占满的,而可用内存还是剩余较多。

2. 业务需求与特点

图片

基于上述现状,我们提出了节省集群资源,提高资源利用率,以及提升集群任务运行时效的业务优化需求。

离线集群的业务特点最主要的一点就是所有任务都是 Hive SQL 任务,且都是高并发的场景。 数据量有 50% 左右达到了百万甚至亿级,30% 是 1 亿到 50 亿,20% 是 50 亿以上的表,数仓任务通过专门的调度平台去定时执行。数据时效都是离线 T + 1 ,也就是基于昨天或者是昨天之前的那些数据去跑,调度平台任务量每天在 10 万以上。

3. 引擎对比

基于现状跟需求,本身我们有做 Hive 的调优,但是效果不是很理想。所以在 Hive 调优的基础上考虑细化任务引擎,从根本上去解决时效和资源问题。下图是三个引擎的对比。

图片

首先是 Hive, Hive 的优势就在于稳定性高,劣势是时效性较低,资源利用率也比较低 ,特别是它的内存利用率低,而且应用场景主要是 SQL 比较复杂、稳定性要求高、数据量巨大的一些离线任务。

Spark 时效性相对 Hive 来说更高, 主要原因是 Spark 消除了冗余的 HDFS 读写和 MR 阶段。其次 Spark 内存利用率也比较高。同时可以节省队列的 CPU 资源。通过实际测试,在 ODS 层中的 Spark 任务,在保证性能提升 70% 到 80% 的情况下,队列资源仅需要原本 Hive 的 30% 到 40%。缺点是 Spark 的稳定性较低,主要体现在运行任务规模比较大的任务时,容易出现 OOM (内存溢出),比如百亿数据量的两张表,做一个 join 或者是 group by 等操作。Spark 适合 SQL 规模相对较小,稳定性要求低,复杂 SQL 场景下数据量较小的那些离线任务。

Presto 是一款完全基于内存的计算框架。其优点是能够低延迟地做查询,查询速度能达到秒级。且它还能支持多个数据源 。它的一个劣势也是因为完全基于内存,导致它对集群的内存资源要求比较高,并且无法做过大数据集的查询。因此它的应用场景主要还是偏向那些实时性要求高的交互式查询。

4. 实现方案

通过对比,最终方案决定采取部分的 Hive SQL 任务切换成 Spark 任务去实现,主要是针对满足以下条件的任务:

  • 数据规模比较小(< 60 亿/<500 GB)
  • SQL 复杂度比较小(单条 SQL 子查询<5层,join 表个数<5)
  • 稳定性要求比较低(下游依赖较少/数据丢失能快速恢复)

其余任务继续使用 Hive SQL 去实现。

02 Spark 部署方式的选择

Spark SQL 的部署模式有以下三种,不同模式有各自的优缺点。

图片

1. ThrifServer 模式

ThrifServer 模式本质上是在多线程场景下的一个应用。 运行的时候一个 ThriftServer 对应一个 application ,所有的 SQL 共享这个 application 的资源,因此它的优点是资源利用率比较高,且参数跟资源的管理成本较低,可以统一配置。并且 ThriftServer 是一个常驻服务,启动后,后续提交 Spark SQL 不需要再临时连接 metastore 启动 application,耗时较少。ThriftServer 的缺点就是它没有高可用,同时它的资源没有隔离, SQL 之间相互会有影响,稳定性相对较低。并且它没有多租户的功能,因此权限依靠服务端用户掌控。

2. Spark SQL 模式

Spark SQL 模式本质就是 Spark submit 提交任务,每提交一个 SQL 任务就会启动一个 application。 因此它的优点是每个 SQL 任务之间相互隔离,互不影响,在稳定性方面相对高很多。缺点则是它的资源利用率比较低。同时也存在启动的开销,因为每提交一次就要去启动 application,在生产上启动 application 耗时一分钟左右。

3. Kyuubi 模式

Kyuubi,自身 Kyuubi server 这个服务就是充当一个 RPC 的代理,接收用户请求,然后提交到这个用户所属的一个 SparkSQL engine 上 。SparkSQL engine 其实就是一个基于 ThriftServer 的实现,相当于 ThriftServer 的一个高可用版本。其优点是它支持高可用,稳定性也比较高,且支持多租户。缺点是从平台管控层面来看,Spark engine 的启动时机,销毁时机。

4. 部署模式的选择

当前我们使用了根据业务特点采用了不同的部署模式:

Spark SQL 模式 :离线批处理任务。这类任务对即时性要求较低,都是每日定时离线运行,任务数据量相对较大,运行时长在10min以上,为保证任务稳定运行,采取了该模式。

Kyuubi 模式 :清单报表查询、开发平台用户查询/开发使用。主要针对的场景即时性要求较高,任务运行时长较短,因此采取Kyuubi的架构,减少服务启动耗时,使用用户级别的引擎隔离。注:由于ThriftServer服务有单点问题、无多租户功能,服务稳定性较差,因此我们使用Kyuubi模式代替ThriftServer模式。

03 迁移流程

下面介绍集群任务迁移的流程,流程中用到了自主开发的一些自动化工具。

图片

首先,在前期做语法兼容性的一个统计 。把相关的解决方案融入到语法兼容性校验工具中,工具会自动识别出是否有兼容性问题,且输出是何种兼容性问题,并提供解决方案供开发人员参考修改。通过校验工具去将归档的 Hive SQL 审计日志来对任务的 SQL 去自动化校验语法。如果不兼容,通过人工去修改复核兼容,此时工具会自动生成测试的 SQL ,这里主要是识别一些映射的语句去替换成映射进临时表,以及在这之前添加创建临时表的语句。

然后,跑测试任务,运行完之后通过工具比较测试任务执行结果表数据和生产上跑出来的表数据的一致性 。最后综合地去评估这个任务是否可以迁移,这一步主要依据包括数据一致性、所需资源、时效提升效果、SQL 的稳定性,以及对任务失败的容忍度。如果在验证阶段是确认要迁移的,就会进入到灰度阶段和上线阶段。

图片

灰度阶段主要是创建相关的演练任务,然后放到调度平台上去跑一周 。演练任务与生产任务双跑,自动统计演练任务的执行时长和资源消耗以及任务的异常运行状态。然后根据演练任务执行情况的统计分析结果,最终评估任务是否能够上线。

04 典型问题和经验

1. ThriftServer 稳定性建设

图片

在 Spark SQL 落地过程中遇到的一个典型的问题就是 ThriftServer 本身稳定性比较低。因此需要针对 ThriftServer 做一些稳定性的建设。

① 缓解 driver 压力

第一部分就是 driver 压力的缓解,因为 driver 本身就要处理 executor 的 RPC 信息,以及各个 task 的 RPC 信息。如果有结果集返回的话,也会 client 到 driver 端。如果有广播表,也会 client 到 driver 端,所以 driver 端的压力会比较大。

针对上述问题,我们采取了以下措施:

  • 增大 Driver 内存;
  • 降低广播 join 阈值,尽量让相对大一点的广播表改用 Spark SQL 执行;
  • 多服务分摊压力,用代理服务的方式,把私域拆分成多个服务分摊压力。比如原本启动一个 ThriftServer 分配的是 800 CPU,可以把它拆成四个 ThriftServer,每个 ThriftServer 分配 200 个 CPU,此时由原本的一个 driver 变成了四个 driver;
  • 结果集分批返回,针对有结果集的任务,可以设定 Spark SQL ThriftServer in in criminic collect 参数,把结果集分批返回,而不是一次性返回,这样可以减缓集群的压力;
  • 限制提交结果集过大 SQL 任务,通过生产实际验证,Spark SQL ThriftServer in in criminic collect 设置对性能影响比较严重,所以暂时我们没有采取这个措施,而是限定用户如果用户要返回较大结果集改用 Spark SQL或者 Hive 的方式去执行任务。然后限制结果及提交任务也是有相关参数,Spark.driver.maxresultsize,这个参数可以在服务端去配置;
  • 最后一个措施就是合理地调控整个 SQL 任务的并发度。

② 运维与治理

在运维与治理方面的措施包括:

  • 服务异常自动拉起;
  • 定期分析日志,识别异常任务,比如含有 fail task 比较多的一些 job 去找出已发生的问题,或者是有没有潜在的一些问题去做优化;
  • 实时预警和监控,将 event log 接入到 Kafka 做实时的预警和监控;
  • 健康检查、告警,配一个基础的健康检查,和相关的邮件告警。

③ 高可用&可拓展

最后因为数据 search 本身是不支持高可用的,所以搭建了代理服务,暴露出代理的地址。代理地址会映射多个 ThriftServer 地址。用户把请求发到代理服务里汇总,然后代理服务转发到 ThriftServer 上去执行。

2. 并行度

在 Spark 应用本身,一个 CPU 一次只能执行一个 task 。那 Spark 应用最大能同时执行的 task 数就是 Spark 应用分配到的总的 execute 数。如果我们运行的 task 数小于为它分配的总 CPU 数,就会有 CPU 空闲,会造成资源浪费,因此 task 数刚好跟并行度有关,task 数量就是该 stage 的并行度,合理提高我们的并行度,也就是增加 task 数,能够使我们的集群资源得到充分利用。

这里有三个需要注意的地方:

  • Spark.sql.shuffle.partitions 针对 Spark SQL 有效,Spark.default.parallelism 是针对 RDD 有效。
  • Spark.default.parallelism 在 shuffle 的时候有效,该参数设置的是 shuffle 的并行度,比如两个表的 join 操作,开始的两个 stage 就 scan 这两个表,这个 stage 的并行度,与这个参数没什么关系,只和表的文件个数以及设置的最大读取分区数有关。
  • 参数配置建议,如果是在 ThriftServer 模式下,服务端其实不需要额外去配置,如果要使资源得到充分利用,对 ThriftServer 模式来说,更应该关心的是业务的并发,假设四月 4 号资源利用率比较低,那就多提交一些 SQL 上去,资源自然会跑满。Spark SQL 模式,建议 application 个数为 CPU 的两到三倍。

3. 合理配置通信与重试参数

图片

合理的配置通信与重试的参数,有助于避免由于一些任务阻塞或者网络波动,导致通信的重试时间过长,一般来说需要减少重试次数,增大交互的超时时间,增大心跳的时间间隔。在生产上有遇到过相关案例,SQL 运行时间比较长,虽然是成功了,但是它运行时间比之前要长很多,定位发现耗时主要是在失败 task 上运行时间比较长,通过查看日志发现是该task 所属的 executor 与 driver 的通信发生了异常。又因为我们原本的 Spark.executor.heartbeat.maxFailures 参数设置的重试次数是 60,Spark.executor.heartbeatInterval 参数设置的是 20s,也就是每次要间隔 20 秒,导致需要 20 分钟才会把这个 executor 给 kill 掉并将这个 task 设置为失败状态,然后才去重跑这个 task,极大的增大了执行时长。

关于 task 的重试次数,不能调太大或太小,一般是 3-5。太小的话容错率会比较低。太大的话,如果发生异常,会拖慢整体的运行进度。比如某个 executor 分配内存可能不足,无论重试多少次都是会失败的。此时最好尽早暴露出此任务是失败的,然后去调整内存分配再去重跑,从而减少这个任务的运行时长。

ThriftServer 中需要调大 application 失败之前的最大 executor 失败次数。这个参数含义为如果 Spark 应用的 executor 挂掉的个数超过了这个值的话,它就会将这个 application 给 kill 掉。例如 ThriftServer 这种常驻服务,它的运行时间比较长,会积累 executor 的失败个数。达到 Spark.yarn.max.executor.failures 参数的值后就会把 ThriftServer 直接 kill 掉。这样一来对任务就会造成影响。所以建议在 ThriftServer 里面调大这个次数,同时定时重启 ThriftServer。

4. Spark-SQL资源预估

图片

首先,每个 Executor 的CPU 个数建议是配置成 nodemanager 的 CPU 与 4取模。

Executor 个数,需要针对任务去做 explain 以及试运行,统计耗时比较久且 stage 较多的 task 数。然后 execute 个数就等于统计的 task 数除以 Spark.executor.cores,再除以 2,其实就是为了提高它的并行度。

第三,每个 Executor 的内存,都用 Spark.executor.cores 去乘以 5G。

最后是 Driver 内存,主要是考虑这个 SQL 是否有结果,有结果集的话先要预估结果集大小,然后根据结果集去配。如果是广播 join 场景,则需要根据广播 join 的阈值,乘以广播表的个数再乘以 2,这样去计算预估。

5. 合理控制广播参数

针对广播参数,要合理地去控制。

图片

本身 Spark 做 join 操作时,我们可以用广播特性将小表广播到各个节点上去,转成非 join 操作非 shuffle 的操作。主要涉及到的参数就是 Spark.sql.autoBroadcastJoin.Threshold,该参数默认是 10 兆,-1 表示不开启广播 join,一般预估标准是根据小表的文件大小以及字段裁剪来判断,但存在预估不准确的情况,主要因为小表的存储格式会有差异。如用 easy lab 压缩的 ORC 跟 test 在存储上就会差很多,即使都是一样格式的,有可能压缩率也会不一样。对字段字节的估算也会有一定误差。且数据读取到内存后会有一定的放大。所以一般参数就是比预估值要大一点。为了避免因为表数据小于阈值后,Spark 选择广播 join,但实际上读取进内存时膨胀过大,导致超出 8g 的限制后异常退出,因此这个参数也不能调太大。

6. 部分兼容问题

① 源表和目标表为同一张表时,insert overwrite 异常

Spark 默认不支持 insert overwrite 自我覆盖的语法。典型的就是拉链表的更新,也就是全量表与增量表执行 join 操作,然后将结果集写回到全量表。

  • 方案一:设置

Spark.sql.Hive.convertMetastoreORC/Spark.sql.Hive.convertMetastoreParquet

为 false,但缺点在于如果任务异常,可能会有数据丢失。

  • 方案二:采用 Hudi/Iceberg 等数据湖来做拉链表更新的业务,特别是针对 ODS 一些拉链表的场景,可以考虑整个架构采用数据湖的技术。缺点是上下游的任务都得做相应的适配,改造成本会比较高。
  • 方案三:将结果数据先 insert overwrite 到临时表,然后删除原目标表,将临时表 rename 为目标表。这个方案的缺点是目标表的权限等元数据信息会丢失。
  • 方案四:在下面划分出多一级目录。如下图,全量 t1 表的路径是/user/Hive/warehouse/t1/executor=01,t1 和 t1_delta 执行完 join 操作后,insert overwrite 到 /user/Hive/warehouse/t1/executor=02 这个路径下。然后通过 alter 操作去把 t1 表的 location 指向 /user/Hive/warehouse/t1/executor=02 路径。这样操作的好处是,即使任务失败,数据也不会丢,主要的缺点是 metastore 服务的压力会增大。

图片

② 类型转换的异常

在 Spark 3.x 版本中,强制类型转换是按照 ANSI SQL 标准执行,如果出现了标准中定义的不合理的转换(如 string->int、double->boolean、decimal->double 等),将会抛出异常。

常见场景:insert 时源表和目标表字段类型不一致(Spark 自动触发转换)

解决方案:set Spark.sql.storeAssignmentPolicy=LEGACY;

参数值:

  • ANSI。默认值,会按 ANSI 标准做转换,不符合标准的将抛出异常
  • LEGACY。允许类型强制,只要它是有效的 Cast
  • STRICT。不允许任何可能的精度损失或数据截断

7. 迁移效果

整体来看队列资源使用能降低 30% 到 60%。其中 BDAS 业务线,迁移前队列需要 6000 CPU core,迁移后只需要 2000 CPU core 就能够支撑了。

  • 在 Spark SQL 模式下一般都能提升 65% 到 90%

图片

  • 在 ThriftServer 模式下的业务提升 70% 到 90%

图片

|分享嘉宾|

图片

李伟轩

中国平安 大数据开发工程师

平安产险大数据开发工程师,本科毕业于广东工业大学,先后供职于华为和中国平安,在离线计算领域有多年经验,目前致力于平安产险离线业务由hive sql迁移spark sql的应用与优化。


本文地址:https://www.6aiq.com/article/1674629446079
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出