Fork me on GitHub

B 站基于 StarRocks 构建大数据元仓和诊断系统

以下文章来源于 https://zhuanlan.zhihu.com/p/671882091

导读 今天的介绍会围绕下面四点展开:

  1. 大数据元仓背景

  2. 元仓技术选型及方案

  3. 元仓与诊断效果

  4. 总结与未来规划

分享嘉宾|杨洋 bilibili 高级开发工程师

编辑整理|马信宏

内容校对|李瑶

出品社区|DataFun

01大数据元仓背景



首先介绍一下大数据元仓的背景,在大数据引擎运行的过程中,由于缺乏一些运行时的切面数据,我们难以实时观测引擎的运行情况。另外,由于缺乏作业维度的统计信息,我们也难以推动用户对大作业进行治理。为了满足这些需求,B 站构建了大数据元仓系统。

大数据元仓主要指的是内部离线的一些大数据组件,例如 Yarn、Presto、Spark 等。未来,还计划将 StarRocks 的相关信息纳入到大数据元仓的体系中。



先以 Presto 元仓为例,来介绍一下整个元仓的结构和功能。对于 Presto 元仓,主要分为三个维度进行分析:集群维度、队列维度和 query 维度。

在集群维度下,可以细分为以下几个方面:

  • 节点汇总信息:主要包括节点资源情况,例如 CPU、内存、扫描数据量等。
  • 集群 query 汇总信息:主要包括各种 query 状态的统计信息,例如正在运行的作业数量、失败的作业数量、已完成的作业数量等。

队列维度主要展示队列的一些维度信息,包括队列的资源信息和水位信息。而 query 维度则是对前面的query汇总信息的一个补充。通过 query 维度,可以获取更详细的信息,例如查询的失败次数,但有时候除了知道失败次数之外,我们还想具体了解导致查询失败的异常情况。

02元仓技术选型及方案

接下来介绍元仓技术选型。



首先简单介绍一下内部监控架构。其基于 Prometheus 搭建,主要包括三个模块:

  • 数据源层:各个组件通过 export 方式暴露其指标数据。这些指标数据可以是应用程序、服务或者系统的性能指标。
  • 数据归集与处理层:Prometheus 通过 HTTP Pull 的方式从各个 exporter 拉取其对应的指标数据,并将其存储到时序数据库中。
  • 数据应用层:通过 Prometheus Web UI 使用 PromQL 语句向用户展示监控指标数据。此外,还配置了一些阈值来触发监控报警,并发送相应的通知信息。


Prometheus 存在的缺陷是其存储数据量有限,不适合存储长时间的历史数据。通常,Prometheus 存储的数据量仅为一两个月的数据。此外,Prometheus 是基于度量的系统,更多地用于展示趋势性数据,例如集群的 CPU 和内存情况等。但是,对于像元仓这样需要下钻到具体明细数据的需求,Prometheus 则难以满足。

基于以上问题,我们打算设计一个新的架构来构建大数据元仓。我们的大数据元仓应该满足以下特点:

  • 实时观测:能够实时观测到集群的指标数据,并在多维分析场景中实现秒级或亚秒级的查询返回。
  • 复杂逻辑计算:支持复杂的逻辑计算,不需要将数据落库后打成大宽表的形式。有较高的灵活性,以便后期满足不同的需求,并在现有逻辑的基础上进行处理和分析。
  • 存储及回放:能够存储半年甚至更久的数据,并支持数据的回放。


基于以上需求,我们进行了市场调研,发现了一些比较热门的技术,例如数据湖技术。数据湖是一种流行的架构模式,其中一些典型的代表包括 Iceberg、Hudi 和 Delta Lake,它们都建立在 Hadoop 生态系统之上,使用更加廉价的机器来进行大规模数据的存储,通过多副本机制保证其高可靠性。

相对于现有的 Hive 数仓技术,数据湖技术具有更强大的功能。数据湖技术可以支持 ACID 事务,支持 Schema evolution,并为用户提供更好的表格格式。随着数据湖技术的发展,它还支持更丰富的索引,以实现 Data skipping,从而提高查询引擎的查询能力。此外,通过聚类等技术,数据湖可以更好地组织和编排文件格式,以提高查询性能。

除了数据湖技术,我们还调研了市面上一些主流的数仓技术,如 ClickHouse 和 StarRocks。传统的数据湖技术可能在时效性方面存在一定的不足,而像 Hudi 和 Iceberg 等技术虽然可以达到分钟级别的时效性,但要实现秒级别的时效性可能仍然存在一些技术难题。

第二点是关于数据湖和数仓技术的远程 IO 成本。相对而言,数据湖的远程IO成本可能会较高,因为数据湖技术相对不够成熟,一些索引的优化可能还不够完善,导致查询成本相对较高。而数仓技术更多地采用本地 IO,可以更有效地减少远程 IO 的开销。在数仓技术中,有一些成熟的加速手段,例如通过物化视图和索引等方式来提高查询性能。相对于数据湖技术,数仓技术在这方面更加成熟。



基于上述分析,目前更倾向于以数仓技术作为大数据元仓技术的底座,关于采用 StarRocks 还是 ClickHouse,接下来从六个维度进行比较,包括标准 SQL 支持和性能方面。

首先,StarRocks 支持标准 SQL,并兼容 MySQL 协议,这对于应用程序迁移来说是一个优点。而 ClickHouse 在标准 SQL 方面并不完全支持。

其次,从性能方面来看,StarRocks 的读写性能都较好,而 ClickHouse 在单机性能方面可能更强大。StarRocks 是一个全面的向量化引擎,通过多机多核的方式提高并发能力。而 ClickHouse 的并发能力相对较弱,默认的 QPS 大约为 100。在 join 操作方面,StarRocks 的支持较好,而 ClickHouse 的 join 能力相对较弱,通常需要将数据导入 ClickHouse 并进行查询。

从运维的角度来看,StarRocks 相对于 ClickHouse 更为方便。StarRocks 不依赖第三方组件,主要通过 FE 和 BE 节点来构建集群。如果出现资源不足的情况,可以很容易地对 FE 和 BE 进行横向扩展,因此其运维成本相对较低。而 ClickHouse 依赖于第三方组件,如 Zookeeper 来构建集群,因此相对于 StarRocks,其运维成本会更高。

从社区的角度来看,StarRocks 是由国人主导的国产MPP数据库,其社区活跃度相对较高。在我们进行 StarRocks 的调研和测试时,如果遇到问题,社区往往能够快速给出建议和回复。



根据以上分析,我们更倾向于选择 StarRocks 作为大数据元仓技术的底座。为此,这里进行了 StarRocks 内外表与内部 Presto 集群的性能比较,使用了 TPCH 数据集,并随机选择了一些 SQL 进行性能测试。

图中橙色线表示 StarRocks 外表的查询,灰色线表示 Presto 的查询。可以直观地感受到,相对于 Presto,StarRocks 具有更强大的查询性能。据计算,StarRocks 的外表查询时间相对于 Presto 缩短了大约 70% 至 80%。此外,还比较了 StarRocks 内表和外表的查询性能。发现 StarRocks 内表的查询性能比外表更强大。这可能是由于 StarRocks 内表的列表划分为本地 IO,以及各种文件组织格式和索引的查询优化手段所致。

除了查询性能,我们还关注计算引擎的资源消耗。因此,还比较了 StarRocks 和 Presto 的查询资源消耗。



这里特别说明一下,以上对比主要采用内表,主要原因是,StarRocks 的内表在稳定性和性能方面更好,并且在市场上使用内表的情况更为普遍。另外,StarRocks 的外表查询技术可能还不够成熟。考虑到我们的元仓场景更倾向于使用内表进行查询,因此采用内表进行了资源、内存和 CPU 方面的比较。总体而言,相对于 Presto,StarRocks 的资源消耗更小。



在元仓架构上,我们倾向于 StarRocks 作为技术底座,提供存储和查询能力。架构主要包含采集、存储计算及数据服务。我们使用采集模块收集各个集群如 Presto、Yarn 的指标,并将其推送到 Kafka。为了实现这一功能,内部实现了一个代理(agent),该代理封装了从采集器(collector)将数据推送到 Kafka 的逻辑。

对于业务方来说,他们不需要关心底层如何将数据推送到 Kafka,只需要按照规范实现一个 collect 接口,通过实现 collect 方法来收集对应的业务逻辑。具体的推送工作由代理(agent)来处理。一旦数据被写入 Kafka,便可以使用 Routine Load 或 Flink 的方式进行导入。对于 StarRocks 集群中的数据,不会直接对外暴露查询接口,而是通过一个数据服务层(data service)来解耦用户和 StarRocks 之间的关系,并通过数据服务提供对外的查询接口。



在数据导入方面,StarRocks 通常有两种方式:Routine Load 和 Flink。Routine Load 是 StarRocks 自带的一种导入作业方式,可以消费 Kafka 数据并将其写入 StarRocks。使用 Routine Load 相对来说比较简单,用户只需要创建一个 Routine Load 作业,并指定列、Kafka 主题以及一些分区信息即可进行数据消费和写入 StarRocks。所以对于新业务我们首推 Routine Load 方式。

对于存量数据,用户可能已经在 Kafka 端采集了一些度量指标,此时让用户按照之前定义的规范重新将数据写入 Kafka 可能并不合适。对于一些特殊的业务逻辑,Routine Load 可能无法满足需求,这时就需要用到 Flink 来处理。

相比 Routine Load,Flink 通过编码的方式更加灵活,特别适用于处理复杂的多表关联查询。然而,由于 Flink 即使是对于简单的表也需要进行编码,这对于一些不常开发代码的用户来说可能会增加上手成本。因此,在内部我们会将 Routine Load 与 Flink 结合使用。

另外,由于 Routine Load 是 StarRocks 自带组件,通常需要自行完善配套监控和告警。我们线上遇到过一些 Routine Load 的问题。比较常见的是遇到了脏数据导致处于 RUNNING 状态的作业转变为了 PAUSED。对于这种情况,可能需要对 Routine Load 作业进行修改,重新拉取数据并进行处理。但我们也遇到过特殊 Case,发现 Routine Load 无法重新恢复。原因是它的 Kafka 偏移量已经小于当前线上 Kafka 主题的起始偏移量,导致无法将数据拉取到最新状态。对于这种情况,目前只能与业务方进行沟通,是否可以接受一定程度的数据丢失,我们会在后期通过添加监控和告警从而来完善这一部分。

关于 Flink 的监控和告警,公司已经有完备的流计算平台,可以提供相关能力。举个例子,如果线上的一个 Flink 作业由于某些原因失败,它会自动拉起并重试以及通过邮件等方式来告知用户。常见的失败原因为遇到了脏数据。对于这种情况,通常是代码健壮性问题,缺少对脏数据的处理。可以在平台上设置检查点下线作业,待代码修复后,从检查点进行恢复进而保证数据的连续性。



下面分享一下我们在线上使用 Flink 导入数据时遇到的问题。背景是有一些存量数据,是一种日志类型的格式,原先的 Routine Load 并不适合处理这种格式。因此,我们需要使用 Flink 进行转换,进行 ETL 操作,然后将数据导入到 StarRocks 中。此外,我们内部之前有使用 Flink 消费 Kafka 导入其他数据库的例子,再加上 StarRocks 兼容 MySQL 协议,我们认为稍作一些改动,数据应该可以落入 StarRocks 集群。在功能测试阶段,该思路确实可行。但上线后,业务方反馈对于一些简单的查询,StarRocks 平常在一秒左右就能返回计算结果,但在某段时间内可能需要十几秒甚至二十几秒才能返回结果,这显然是不合理的,存在问题。

我们查看了 StarRocks 的GC日志,发现 StarRocks 发生了频繁的 Full GC,大约每几十秒就会发生一次。对于 StarRocks 而言,虽然我们线上设置的堆内存不大,为 8GB 再加上导入数据量不多,但频繁的 Full GC 显然是不太正常的。



我们提出的解决方案首先是对 JVM 参数进行调优。例如,可以增加堆内存的大小。在这个案例中,理论上增加堆内存可以解决问题。然而,随着后续推广 StarRocks,更多的业务方进行接入,频繁 GC 问题可能仍然会出现。

第二个方案是从根本上解决 Flink 导入导致频繁 Full GC 的问题。问题原因可能有两方面,第一是 Flink 作业导入不规范,第二是 StarRocks 自身代码的问题。第二点的可能性不大,因为业界已经有了使用 Flink 进行数据导入的成功案例。因此,我们更倾向于认为,可能是一开始使用 JDBC 方式导入数据到 StarRocks 时,性能不太好导致出现了问题。

我们进一步调研发现,StarRocks 官方已经提供了 Flink 连接器,它可以帮助我们实现数据导入至 StarRocks。其实现原理为在内存中积攒小批数据,然后通过流式加载(Stream Load)的方式进行导入。

具体的思路是,当 Flink 从 Kafka 消费数据并 sink 到 StarRocks 时,它不会立即进行导入,而是将数据写入到一个内存数据结构,比如一个 Map 中。然后启动一个异步线程来扫描这个 Map。通常情况下,每隔 5 秒,会发起一个 Stream Load 请求。当 StarRocks 收到这个请求后,它会将请求重定向到另一台 BE 上进行具体的执行。当我们修改完代码并重新上线后,发现 GC 日志中不再频繁出现 GC 的问题。



刚刚分享了一些数据导入时遇到的问题,现在继续分享一下我们的数据服务。做该服务的目的主要是希望解耦用户与 StarRocks。首先,该做法可以防止用户随意编写 SQL。如果用户随意编写一些大型 SQL 进行执行,可能会影响到 StarRocks 集群的稳定性,从而影响到线上的其他业务作业。

另外可以增强安全性,我们收回用户的访问路径,统一通过数据服务(Data Service)对外暴露接口。具体实现方面,我们使用了 Spring Boot 框架来搭建元数据服务。鉴于 StarRocks 支持 MySQL 协议,我们可以不必使用 JDBC 方式进行访问,而是可以采用市面上比较成熟的 ORM 框架,例如 Mybatis 框架,来建立应用与 StarRocks 之间的连接。

在研发数据服务的过程中,我们遇到了一个细节问题。前端向数据服务发送请求时,长时间没有响应,导致 Gateway 返回 504 错误。前面提到,我们可以使用元仓查看失败作业的数量,但有时候除了失败量,我们还想深入到具体的明细数据。然而某些明细数据非常庞大,可能有上千万甚至上亿条数据,如果直接拉取全量数据,就可能导致数据服务延迟问题。

我们的数据服务部署在 K8S 集群中,它的 Pod 实例可能会在不同的节点上进行调度和迁移,这样可以实现资源的弹性扩容和容器的自动拉起。在这种情况下,数据服务的访问地址可能会发生变化,因此我们使用 SLB(负载均衡器)通过域名进行访问,以便动态地解析到正确的服务实例。这样可以确保即使服务实例发生迁移或重启,用户仍然可以通过域名访问到服务。

实际上,对于拉取明细数据的 SQL 查询,StarRocks 的查询速度非常快,可能在表级别上只需要大约 1 秒左右就可以完成数据的拉取和返回。然而,真正慢的部分在于数据服务层,主要是由于 Mybatis 的关系映射和 Java Bean 对象的封装。对于大量的明细数据,这个操作可能会比较耗时,导致前端响应超时。

解决方案是分页,因为有时我们并不需要大量的明细数据,可能只是需要查看部分数据。通过分页的方式可以减少查询量,从而提高响应速度。

03元仓与诊断效果



在前面的章节中,我们讨论了大数据元仓的选型、方案和实践。本章将介绍元仓与诊断的效果。首先从 StarRocks 的角度来看,StarRocks 整体的性能表现非常好,在 99 分位延迟方面表现出色,大部分查询都能在几百毫秒内返回结果。



接着从元仓的角度,以 Presto 元仓为例展示其效果。该图为 Presto CPU 资源使用情况的监控。通过 Presto worker CPU 指标(如可用处理器数量和负载等)以及用户选择的时间范围(如 3000 分钟)与时间粒度(分钟、小时或天),对 CPU 使用情况进行分组和聚合,以获取整体 CPU 使用情况的统计数据。



上图展示了 B 站内部 Presto 集群作业的概况。有时用户会反馈 Presto 作业运行较慢或失败较多。在遇到这些问题时,我们可以通过这张图进行量化分析,以确定是否存在排队查询或失败等情况。图中,排队查询量、正在执行的作业成功量以及失败的作业数量等数据主要来源于 Presto Coordinator 的查询信息。通过这些信息,我们可以更好的监控和管理 Presto 集群。



除了了解作业整体失败的数量,我们还希望可以进行细化,下钻到具体的明细数据,知道主要是哪些异常导致。

因此我们采用了 Presto 自身的异常定义,将其分为四大类:用户异常、资源不足、外部异常和内部异常。比如对于用户异常,我们可以进行下钻操作,具体查看哪些用户异常导致了作业失败。常见的用户异常包括列不存在(column not found)、UDF未注册等。此外,用户手动取消作业,状态变为 user cancel,通常是用户认为查询语句有误或查询过程太长



除了上述元仓提供的能力之外,有时用户还会关心作业的执行速度和资源消耗情况,再加上较大的人员值班运维压力,我们急需一套系统能够监控和分析作业的执行情况并且能够诊断用户异常作业同时给出优化建议。

通过市场调研发现,OPPO 的开源项目罗盘是一个功能比较完善的诊断系统,包含了各种规则和功能。因此,我们基于罗盘系统构建了内部的诊断系统。诊断架构如上图所示。

该架构包含了采集模块和多种异常检测模块。有两种采集方式:定时采集和主动采集。定时采集是指定期从 History 中获取需要分析的作业信息,并将其推送至 Kafka;主动采集是指根据用户在 UI 界面上输入的 APP ID,将其需要分析的信息推送至 Kafka。

异常检测模块是整个诊断系统中非常重要的模块。可以订阅 Kafka 消息进行消费,并使用多种诊断规则,如大表扫描、数据倾斜和 OOM 预警等。数据智囊服务会根据之前的诊断信息生成相应的诊断报告,并通过用户界面展示详细的诊断详情。此外,数据智囊服务还可以将诊断信息推送到其他平台,例如公司的治理平台和调度平台,以提供用户更专业的诊断建议。

接下来展示一下诊断效果。



首先,主界面展示了内部作业的异常情况,包括诊断异常任务数、实例数目和资源消耗情况。其次,分享几个诊断规则。



第一个是大表扫描,它的实现逻辑是通过解析计划来分析扫描的数据量。例如,如果扫描的数据量达到 500 亿,我们就认为它是一个大表扫描。并会给出一些建议,比如让用户确认代码逻辑是否添加了分区,是否有一些不必要的数据可以提前过滤。



第二个诊断规则是全局排序异常。我们会关注它 Stage Task 数据量是否达到了阈值。如果达到阈值,就会诊断为全局排序异常。针对排序异常,会建议修改逻辑,减少全局排序。例如,可以将全局排序改为局部排序,通过 ROW_NUMBER 进行分区内排序,从而减少全局排序的数据量,提高排序性能。



还有一个诊断规则是数据倾斜,这是我们经常遇到的一个问题。数据倾斜是指数据分布不均匀,导致大量 Key 集中在个别任务中。对于数据倾斜的诊断,我们会依据 App 与Stage 的运行时长,以及最大 Task 和中位数 Task 所处理的数据量的倍数是否满足阈值来进行判断。

对于数据倾斜,通常的解决方法有两种。从技术角度来看,可以调整一些参数,例如增加 Spark Executor 内存或增加 Spark SQL 的 Shuffle Partition,以提高分区并发性。从业务角度来看,建议用户找出倾斜的 key,通过对 key 进行分组聚合来判断是否存在倾斜。如果找到了倾斜的 key,可以通过加盐打散等方式进行处理。

04总结与未来规划

最后进行一下总结。首先我们在内部完成了 StarRocks 的初步落地,将其应用于公司的元仓场景。另外,构建了一个大数据元仓系统,为用户提供实时的资源观测能力。第三,通过诊断系统推动用户治理异常作业。

未来,会在如下一些方向开展工作:

首先,由于 StarRocks 在大数据元仓场景中表现非常出色,我们希望将其接入更多的业务场景,例如 BI 和 DQC 等。

第二,解决权限、UDF 等问题,与其它引擎对齐。权限方面,目前内部主要通过 Ranger 进行权限管理,因此,如果想要将 StarRocks 与其它业务进行整合,就必须与 Ranger 进行权限打通。在公司内部,我们主要推荐使用 Hive UDF。Spark 和 Hive 之间大部分是兼容的。而对于 Presto,我们进行了一些改造,通过反射等技术使其能够运行 Hive 的 UDF。对于 StarRocks,我们希望在后期能够接入 Hive UDF,与其它计算引擎进行对齐。

第三是关于元仓架构的规划。目前的架构主要是以仓库为中心。但随着原始仓库的推广,更多的组件将被引入,并且数据的生命周期将变得更长。例如,我们计划将半年或者更长时间的数据回流到数据湖中,从而变为湖仓一体的架构。

第四是关于 StarRocks 的加速能力。我们希望能够开启 StarRocks 的一些加速功能,例如物化视图及索引,以提升现有元仓查询的速度。目前元仓主要接入了一些组件的元信息,例如 Presto 和 Spark。在未来,我们希望能够接入更多的组件,例如 HDFS、Kyuubi 的大数据元信息,将其纳入元仓体系中。

第五,诊断系统方面,目前主要以 Spark 诊断为主。未来,我们希望能够支持更多类型的作业诊断,如 Presto 和 Flink 作业的智能诊断。此外,我们还希望将诊断系统与公司内部其他平台打通,为用户提供更专业的诊断建议。

以上就是本次分享的内容。如需更多信息,可以关注B站内部的技术公众号。谢谢大家。



以上就是本次分享的内容,谢谢大家。




本文地址:https://www.6aiq.com/article/1702386651910
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出