Fork me on GitHub

基于 StarRocks 和 Paimon 打造湖仓分析新范式

以下文章来源于 https://zhuanlan.zhihu.com/p/684177680

导读 本文主题是基于 StarRocks 和 Paimon 打造湖仓分析的新范式。将介绍 StarRocks 和 Paimon 结合的应用场景以及其中的关键技术。

主要内容包含四个部分:

  1. 数据湖分析技术发展

  2. 使用 StarRocks+Paimon 进行湖仓分析主要场景介绍

  3. 使用 StarRocks+Pamon 数据湖分析的关键技术原理

  4. StarRocks+Paimon 湖仓分析能力的未来规划

分享嘉宾|王日宇 阿里云 高级软件开发工程师

编辑整理|张龙春

内容校对|李瑶

出品社区|DataFun


01数据湖分析技术发展

首先来看一下数据湖技术的产生和发展。

传统数据仓库的构建是基于 ETL 的概念,从业务系统拿到数据之后,先进行大量的清洗、加工、预聚合,再 Load 到数据仓库里,这个过程中会损失大量的原始数据信息。现在存储的成本越来越低,尤其是对象存储已经非常廉价,大家对存储的要求也就不那么苛刻了,允许容纳大量原始数据进来。原始数据中可能有脏数据,可以先容纳进来,再做 Load 和 Transform 的工作,这就是所谓的 ELT。

当前数据湖四剑客 Iceberg、Hudi、Delta 和 Paimon,主要解决的问题是原始数据经常大量批更新、ACID 的事务管理、并发处理以及 Schema Evolution 等。



Apache Paimon 是由 Flink Tablestore 项目孵化而来的,其最大的特点是入湖速度特别快,Compaction 也特别及时,所以适用于对实时性要求很高的场景,并且 Paimon 是原生流处理。

介绍了数据湖之后,接下来介绍如何对湖格式进行分析。这就引出了本次分享的主角------StarRocks。



StarRocks 是一个极速、统一的数据湖分析查询引擎。如上图,StarRocks 定位是分析层。从下到上看,最下面是存储层,包括常见的 HDFS,也有廉价的对象存储,如 S3、OSS 等。在此之上构建各种格式的数据,比如传统的 Hive 格式,还有新兴的 Paimon 格式。如果是流式实时任务,经常会用到 Flink 进行实时加工处理,而批任务会用到 Spark 进行 t+1 报表的处理。常见业务场景中前端连接 MySQL,构建相应的 CDC 任务,然后经过 Flink 消费聚合,再写到存储系统里,最后使用 StarRocks 进行分析。

在使用 StarRocks 之前,我们常用 Hive、Impala、Presto、Trino 等各种分析引擎去分析。StarRocks 的定位是统一这些分析引擎,把复杂的语义都放在一个引擎里去操作,再通过 StarRocks 直接对接上层的业务报表、Adhoc 查询等工作。

下图展示了 StarRocks 在湖仓方面的一些重大特性的发展。



现在 StarRocks 最新版本是 3.1,很快会发布 3.2 版本。从 1.x 到 3.x,其湖仓能力逐渐增强。这些能力分为两部分,一部分是湖仓重大特性的支持,另外一部分是底层性能的持续优化。

第一个湖仓重大特性,是支持了Hive、Elastic、MySQL 外表。之所以使用StarRocks 就是因为查询快,以 Hive 为例,现在的 StarRocks 直接查询 Hive 表,能够达到 Presto 的 3 到 5 倍。

实现这一能力,主要是利用其内部优秀的向量化执行引擎,以及 CBO 优化器,和各种 runtime filter 等。

去年 StarRocks 2.X 主要解决了湖仓分析的 Catalog 数据目录。之前如果想建Hive 表,需要按表来建,即执行 create external table,指定各种字段。然而大数据领域库表是非常多的,可能几十张、几百张,如果这样逐一建表,会浪费很多时间,也容易出错。所以 StarRocks 引入了 Catalog 数据目录,只需要指定一个 MetaStore URL 或一个 File system 的 URL,就能自动加载整个目录下的库表,不用建表就可以直接进行查询分析。

在 2.x 时代,除了对 Hive 的支持,还对其它数据湖格式进行了完整的支持,包括 Iceberg、Hudi 和 Delta lake。并引入了 JNI Connector 能力。大数据生态大部分都是 Java 语言编写的,无论是 Hive、Hudi、Iceberg,还是新兴的 Paimon,都是纯 Java 写的。而 StarRocks 的优势是 native,它是用 C++ 语言写的,如果不构造一个类似 JNI Connector 中间层,而使用 C++ 去对接 Java 的湖格式,就要用 C++ 重写各种湖格式的 reader,代码量会非常大,而且也不容易跟社区进行完整的结合。所以引入了 JNI Connector,简单配置几个类,如 reader 的类名就可以直接使用 C++ 读取 Java 的数据。

另一个重大特性就是引入外表物化视图,这就解决了如何查得更快的问题。比如Hive 外表或者 Paimon 外表,放到物化视图里以后,这些数据会使用 StarRocks 自己的存储格式存在自己的存储管理系统里,这样就可以用到大量的 StarRocks 内部的索引优化、CBO 优化等,存储和查询引擎相结合带来的查询优势。

2.x 时代最后一个重大特性就是支持了 JSON、map、struct 复杂类型。

同时,底层性能持续优化,包括 IO 性能优化等。IO 合并能力就是优化之一。大数据领域经常会有一些业务或 SQL 任务写得不当,或者某些业务天生特性导致产生大量小文件,继而会引起大量的 IO 请求,这些 IO 请求频繁又密集,不仅给存储系统带来压力,也会影响分析系统效率。在 StarRocks 内部引入了 IO 合并性能优化,其内部会把一些小的 Page,row group 在一次 IO 读取,增强 IO 效率,提升性能。

另一个底层优化是延迟物化,比如谓词有两个列,查询列 a 和列 b。这两个列的选择度可能不同。选择度指的是,比如一百行数据,通过谓词下推之后,可能只剩下 10 行,另外一个谓词下推之后还有 80 行,那么第一个列的选择度就更高。我们就可以直接用第一个列的谓词结果再读取其他列,大部分数据就不用读了,只需要读 10 行就行。

Pipeline 执行引擎,指的是调度优化。

再来介绍下直方图统计信息。一个 SQL 的查询效率很大一部分除了格式和 IO 优化,还取决于它的执行计划。得到的统计信息越多、越丰富,执行器优化器出来的执行计划也就会越好,执行效率也就会越高。但是在外表情况下,统计信息很多都是缺失的,这是现实问题,所以要使用外表物化视图把它物化进来,通过StarRocks 自动索引以及统计信息机制。把需要的统计信息都构建好,后面的查询就可以更快了,这就是为什么可以使用 StarRocks 加速外表查询。

StarRocks 3.x 版本主要有以下几个特性。首先是 Paimon 外表的支持,其次是 JNI Connector 复杂类型的支持,第三是算子落盘 Spill,这个也是 StarRocks 3.0 最重大的特性之一,支持了 Spill 之后,才能容纳大规模数据量的查询。比如机器内存比较小的情况,几个 G 或十几个 G,有了 Spill 之后,所有算子的中间结果可以落盘之后再复用。之前很难做到在小内存的机器下做大数据量的查询,在Spill 基础上就可以用 StarRocks 做大数据量 ETL 了。

第四大特性就是引入了 Trino 兼容性,StarRocks 社区有很多用户,最开始用Trino 进行 OLAP 分析,为了获得更快的查询速度而迁移到 StarRocks,但迁移后因为语法不兼容,导致原来 Presto、Trino 下的 SQL 不能用了。3.0 版本就推出了 Trino 兼容性这个特性,方便原来使用 Trino、Presto 的 SQL 可以一行不改,直接跑在 StarRocks 上。

第五个特性是外表物化视图的分区刷新,之前在 2.x 版本的外表物化视图是整表刷新,后来引入分区刷新,会自动感知对应的 Paimon 表、Hive 表是哪个分区被修改过,只刷新那个分区即可,从而减少刷新代价。

最后一个重大特性是支持 Hive、Iceberg 数据写入,现在完全可以使用StarRocks 去做数据湖仓分层的主要引擎。

StarRocks 的性能也在持续优化,包括 Data Cache 能力。Data Cache 顾名思义就是缓存,现在的 Hive、Iceberg、Delta lake 或 Paimon,大多使用对象存储,对象存储走 HTTP 有网络波动可能会带来不稳定。在遇到带宽瓶颈时,查询效率就上不来。这种情况下 CPU 是空闲的,只是网络占用高。因此引入 Data Cache 在本地做缓存。最后就是 JNI Connector 的性能优化,为了实现更快地查询。Paimon 就是使用 JNI Connector 接入的。

02使用 StarRocks+Paimon 进行湖仓分析主要场景介绍

第二个部分介绍使用 StarRocks 和 Paimon 可以做哪些事。

1. 联邦查询

第一个场景是联邦查询,这也是 Presto、Trino 的优势之一。联邦查询可以跨数据源查询,目前已经支持了大部分数据源,如 Hive、Paimon、Iceberg、Hudi、Delta Lake,能直接分析湖上数据。并且这些不同格式的数据可以联合在一起查询,比如一个 Paimon 表 Join 一个 Iceberg 表,一个 Hudi 表 Join 一个 Hive 表。当然 StarRocks 本身有自己的内表格式,这个内表格式也可以跟湖格式进行互相访问和 Join 查询。最终可以做到跨数据源的联邦分析,内外表的数据访问统一管理。使用方式也特别简单。



引入 Catalog 机制之后,使用 StarRocks 查 Paimon 表,只需要 create external catalog,指定 type 等于 Paimon。Paimon 目前有两种 Catalog 类型,StarRocks Catalog 的实现目前映射到 Paimon Catalog,这两种类型是Hive Metastore 和 File system。

Metastore 相当于元数据存在 Hive Metastore 里,而 File system 相当于 Paimon 表不依赖 Hive Metastore,它本身存在文件系统里。实际上使用 Metastore Catalog 的一般也能用 File system 查询,因为所有数据就是落在存储上的。

HMS 实现只是为了兼容重度依赖 HMS 系统的场景提供的。这里以 File system 这种 Catalog 为例,再指定一个 Warehouse 地址。可能很多业务都是多个 Warehouse,一个 Warehouse 有不同的库和表,只需要指定这个 Warehouse 地址,StarRocks 就会自动取到这个 Warehouse 下所有的库和表,然后就可以执行查询。Select * from 刚刚建的 Paimon FS catalog,之后 .db.table 就可以直接查到表。这是联邦查询的第一个场景,相对比较常见。

2. 透明加速




第二个场景是透明加速。如果直接查同样外表的话,SQL 也会有很多种,比如经常用到某两张表的聚合,有时候 Select A,有时候 Select B,有时候 Count* 以及 SUM 不同字段。

在传统的数据分析引擎里,每次查询都要扫描全表,现场做一些计算再返回。这就可以用到物化视图。如下面的 Select 语句(这是一个 TPCCH 查询),这个查询是最简单的聚合查询,执行一次要扫全表,执行第二次还是要扫全表。但如果先建立一个物化视图,如图中所示,三张表的内容就物化到 StarRocks 内部存储系统里了。

StarRocks 会自动构建索引,作为统计信息来优化它的执行计划。在查询时,比如写一个类似下面的 SQL,当匹配到能用的物化视图时就会直接使用物化视图预计算的结果,从而实现加速。这就是透明加速,声明物化视图让优化器自动改写,解决了 BI 报表不易修改 SQL、不易调优的问题。

3. 数据建模



第三个场景是数据建模。数仓一般都采用分层结构,最底层是 ODS 层,往上是 DWD、DWS、ADS 等。每一层都会面向不同的业务场景去做分析。StarRocks 的物化视图支持嵌套,前面的例子是在一个 Paimon 表里建物化视图,还可以在已有的物化视图之上再建立一个物化视图,达到数仓分层的效果。

底层存储可以使用 HDFS 或者 OSS 等作为基座,上面使用 Paimon 格式,最上面使用 StarRocks 做分析。

4. 冷热融合



第四个场景是冷热融合。很多业务场景中,数据都是按天分区,但大多业务用到的都是最近几天的分区,所以有冷数据、热数据的概念,例如三天之前的数据称之为冷数据,三天之内的数据称之为热数据。因为热数据经常查,希望快速返回结果。而对于全量冷数据,一般从业务场景来说,查询慢一点,也不会很在意。因为一旦查询冷数据,一般都是复杂的历史综合查询,不会要求那么实时。因此,可以利用StarRocks 和 Paimon 以及物化视图的特点自动为冷热数据做加速。

首先 create materialize view,里面可以指定一些 property 属性。只需要告诉Paimon 物化视图的 partition TTL 等于 3 month,那么物化视图在构建的时候就会把这三个月的物化视图自动拿到 StarRocks 里做加速。假如要查三个月的数据,全量地读物化视图即可,可以天然地用到索引加速。如果要查五个月,那三个月内的数据还是会走物化视图做加速,另外两个月不在物化视图内,会去走远端,拉取原始的 Paimon 或者外表信息,然后自动将五个月的数据做 union 再返回。不需要像传统一样全部 5 个月的数据都要从外部存储做一次 IO 查询。在热数据本身已经在物化视图里的场景,就会自动使用热数据加速。用户在 SQL 语义上不需要做任何改动。

03使用 StarRocks+Pamon 数据湖分析的关键技术原理

第三部分简单介绍一些关键技术原理。

StarRocks 依赖于 JNI Connector 读 Paimon 的数据库分析关键技术。



StarRocks 查询是用 C++ 写的,为了便捷地读取 Java 数据,引入了JNI Connector 存储层,整体架构如上图所示。底层 Java Data Source 是数据源,可能是 Paimon、Hudi 等。在接入一个新数据源时,只需要实现 Open、GetNext 和 Close 三个方法。调用原始 Java Reader 的一个封装,告诉 JNI Connector 数据信息。再进行一些配置,比如对于 Paimon,配置 Paimon Reader 的类名称,再配置一些必要的信息,就可以使用这套 JNI Connector 读取外部的 Paimon 数据源了。中间所有 Java 和 C++ 内存转换都不需要操心,所有的类型都在 JNI Connector 里封装好了。

主要功能有五点:

  • 可以快速接入各类的 Java 数据源,无需考虑数据转换。
  • 提供了简单易用的 Java 接口,只需要实现 open、getNext、close 三个方法即可。
  • 已支持 Hudi MOR table、Paimon table 等新兴的数据格式。
  • 支持 Struct、Map、Array 等复杂类型。
  • 完全做到 BE(Backend)C++ 代码的零侵入,不需要考虑任何 C++ 具体实现。


上图描述的是数据存储格式,所有的 C++ 和 Java 内存交互都是参考了 Spark Photon 项目。定长字段类型比如 long、int 等;变长字段类型有 varchar、string 等。实现 JNI Connector 的时候,要在 Java 开启一块堆外内存,因为要让 C++ 的代码去访问它,C++ 不太方便直接访问 Java 堆内存。另外,使用堆外内存所有的分配和释放都可以由自己来控制,相对比较灵活。直接使用堆内存的话,还需要考虑 Java 回收机制的影响。

对于定长字段类型,会有两块区域去存信息,第一块是 null indicator,每一个块都由一个个字节挨着放在一起。如果是 true,就说明这一行的数据为 null。如果 null 的话,就不用访问数据内存,如果非 null,就需要访问数据内存。对应的数据内存比较简单,比如 int 类型,就是四个字节内存排在一起。变长类型的存储多了 offset 地址,因为变长需要把 offset 记录起来,在内存里一个个排列好,后一个减前一个加一,就是整个行的数据长度。

这里所有的核心都是围绕 C++ 内存的访问,围绕 StarRocks 在 C++ 里是如何对不同类型进行布局的。最后 Java JNI Connector 实现的时候,只要把内存布局在堆外放成 StarRocks 可识别的布局,C++ 代码就可以访问了。实际访问的C++ 代码也比较简单,直接把整块内存做 mem copy,即可直接进行读取。

04StarRocks+Paimon 湖仓分析能力的未来规划

最后简单介绍 StarRocks 和 Paimon 湖仓分析能力的未来规划,这也是 Paimon 和 StarRocks 社区的共同规划。



  • 支持 Append Only 表类型。目前已经支持 primary key 表类型,Paimon 很快会发 0.6.0 版本,对 Append Only 表类型做完整的支持。
  • 优化 date 和 datetime 类型的处理效率。现在对这两个类型是通过 string 做转换,在一些场景下的效率比较低,像 datetime 等在原生语言里一般都是以 long 类型保存,因此希望直接通过 long 类型来读取。
  • 使用 native reader 加速查询。JNI Connector 虽然一直在优化,但是总会有一些 JNI 开销以及内存 copy 的开销,这些开销是无法抹平的。所以对于一些常用的数据湖格式,比如 Hudi、Paimon,其所有文件底层类型都是 ORC 或 Parquet,在此基础上,Paimon 加了 LSM tree,Hudi 加了 Delta log,reader 的核心原理是先把 Parquet 文件读出来,再把 log 读出来,做一个合并。如果 Key 相同的话,以 log 的值为准,然后 merge。但是并不是所有的 parquet 文件都有一个对应的 log,可能某个 Parquet 文件本身就没做更新。对于那些没有 log 的文件,完全可以用原生的 C++ reader 去读。因此,希望区分 Paimon 表的 split 类型,如果不需要做合并,就直接用 C++ reader 读,否则再走 JNI。
  • 支持列统计信息。调 Paimon 接口,拿到列统计信息,CPU 就会自动用一些列统计信息。
  • 支持元数据缓存。所有的数据湖格式都是在原生的 Parquet、ORC 基础上附加了一些元数据。目前读 Paimon 表时,每次都会读取元数据。但有些元数据没有更新是没必要读的,因此可以做缓存去加速 IO。
  • 支持 time travel 和 snapshot,也就是历史查询,回溯到某一天的数据段。
  • 支持 Paimon 外表的 sink 能力,通过 StarRocks 直接写 Paimon 表,完善使用 StarRocks 做 ETL 的数据链路。

以上就是本次分享的内容,谢谢大家。



本文地址:https://www.6aiq.com/article/1709017541548
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出