Fork me on GitHub

字节基于 Hudi 的批流一体存储实践

以下文章来源于 https://zhuanlan.zhihu.com/p/654263972

导读 通过采用Hudi作为底层存储引擎,结合分布式文件系统进行数据存储和管理,实现批流一体业务高效的数据存储和查询。该方案已在金融风控、零售电商和物流运输等多个场景中得到成功应用,解决了业务痛点。未来,将进一步优化和完善批流一体存储实践方案。

今天的介绍会围绕下面五点展开:

  1. 背景与挑战

  2. 设计方案

  3. 落地场景

  4. 应用案例

  5. Q&A环节


分享嘉宾|耿筱喻 字节跳动 火山引擎 LAS 技术专家

编辑整理|阿东同学

内容校对|李瑶

出品社区|DataFun


01/背景与挑战

首先来介绍一下相关背景。

传统数仓存在实时和离线两条链路,来满足业务对于时效数据的时效性和数据量的不同需求。离线会维护历史的全量视图,实时会维护增量视图,最后在服务层去进行数据的汇总,从而支持后续的在线的serving、 OLAP 查询以及看板的应用等等。



因为处理场景的差异,在实时和离线数仓的具体实现上,依赖的底层存储计算引擎基本上是完全隔离的,实时依赖的主要是以 Flink 为代表的流式计算引擎来做计算,而离线依赖主要是以 Spark 为代表的引擎,实时主要依赖 KV 或 MQ 这样的多种存储选型。离线则常常采用 Hive 为代表的存储引擎,传统的数仓架构,它本质上结合了流计算和 批计算的优势,通过两套代码来兼容实时数据和离线数据的优势,弥补各自的缺点。但是这两套架构或代码也带来了两倍的资源成本,并且因为底层计算引擎的不同,对于相同算子的处理的语义不是完全一致的,它们的计算结果就会存在差异。所以对于研发同学来说,这部分差异也会给数据校验等其它一些工作带来额外的负担。

总结下来,传统数仓的 Lambda 架构主要存在三个问题要解决:第一个是一套计算逻辑,但需要写离线和实时两套代码,带来了两倍的运维成本;第二是离线实时两条链路带来了资源冗余的问题,双倍的资源的成本;第三是两套引擎计算口径不完全对齐,导致数据校准方面会有比较大的困难。所以业内提出了希望通过批流一体来解决当前传统 Lambda 架构的问题。

基于上述问题,再结合内部的实际场景,批流一体的诉求可以分为两种,第一种是计算的批流一体,第二种是存储的批流一体。计算的批流一体指的是希望通过底层一套系统,业务层的一套代码同时满足离线和实时的开发需求,从而解决两套系统带来的研发效率、人工成本、运维成本等资源成本相关的问题。另外我们希望一套系统能够对齐计算指标的口径来解决数据一致性的问题。



存储的流批一体包括,第一是实时和离线所在的存储统一,第二是实时和离线的数据能够复用。实时和离线存储统一是指我们希望实时和离线能够使用统一选型的存储,这就要求存储能够满足大规模、全量和增量数据的读写诉求。批流数据复用指的是批处理能够使用流处理的结果数据,提升整个离线数仓的产出时间。典型的 case 就是 ODS 层的数据复用,另外流处理也能够复用批处理的数据来解决链路冷启动的时候,需要将离线的数据回灌到实时存储中的额外成本,我们通过 LAS 去实现了批流一体的能力。

在计算层,我们的解决思路是对外暴露统一的 SQL,底层根据 SQL 处理的场景选择不同的执行引擎去执行,对用户屏蔽以底层执行引擎的差异性。之所以这样做的原因是我们认为不同引擎适用于不同场景,很难找到引擎能够同时满足实时和离线,不同时效性和数据规模的要求,在 SQL 层,我们对齐了底层执行引擎的差异性来实现计算口径的对齐,解决了上面说到的一致性问题。

在存储层,我们基于湖仓一体的架构,通过数据湖实现了批流一体存储的能力。除了能够支持流式的增量和批式的全量读写之外,我们还支持了高效的 OLAP 查询能力以及维表 join 的能力。

既然提到了LAS,我们看一下 LAS 的整体结构, LAS全称是 Lakehouse Analysis Service。湖仓一体的数据分析服务,融合了湖与仓的优势,既能够利用湖的优势将所有的数据都存储到廉价存储中,供积极学习、数据分析等场景使用,又能够基于数据湖构建数仓,让BI、报表等业务场景去使用。



LAS具有如下一些特性:首先是能够支持统一的元数据,避免在数据湖中存在数据孤岛的问题,每个数据都是可追溯的。第二个是依托数据湖提供ACID 的能力。第三点是机器支持企业级的权限管控。第四点是支持资源的极致弹性扩缩容,降低用户的使用成本。最后是引擎的内核的极致优化,提供高效的读写性能。

LAS 整体架构,最上面一层是湖仓开发工具,为数据应用场景提供能力支撑。下面一层是数据分析引擎,支持流批一体SQL,解决计算批流一体的问题,并且支持根据 SQL 的特点去做引擎的智能选择和加速。针对 OLAP 分析,我们会将 SQL 路由到不同的执行引擎去执行,比如对 Ad-hoc我们会用 Presto 去进行查询。再往下一层是统一元数据层,最后一层是基于 Hudi 去实现的流批存储层。

本文会聚焦在流批一体存储的细节实现上。



我们需要分析现有的离线数仓和实时数仓的具体需求,来考虑流批一体存储的实现方式。离线数仓的整体结构分层相对来说还是比较清晰的,使用的存储也会比较单一,主要是Spark 加 Hive 的形式,提供高效的数据处理和吞吐能力,能够支持离线数据回溯场景下的并发更新。但是实时数仓的使用存储相对来说会复杂一些,一般会依托 Kafka 或 MQ 进行每一层数据表的构建,为了支持高效的 join 的性能,在维表的存储选型上,我们往往会根据数据量的差异选择KV、Hbase 或者是 MySQL 去进行存储。

在整体的 DWS 数仓链路梳理完之后,到了数据应用层会对接 ClickHouse、Doris这样的高效的 OLAP 引擎,去对外提供计时的数据看板报表等等。数据应用层还会有一些 serving 的功能,服务层会将数据写到 KV 或者 MySQL 或者 ES 这些存储里,对外提供 serving 的服务。



在构建实时数仓新链路的时候,对于链路冷启动,需要使用历史分区的数据,所以我们需要将离线的数据回灌到实时链路的 MQ 里面,受限于 MQ 带宽的限制,整体的回溯周期可能会非常的长,并且操作很复杂。另外当计算指标有问题,或者是整体的计算口径需要调整的时候,也需要使用离线的数据去对实时数据进行回刷,同样它也会遇到回溯周期长,操作复杂等一系列问题。

通过介绍可以看出实时数据仓库整体相对比较复杂,存储方式和构建标准没有完全统一。为了更精细地分析实时数据仓库对于流批一体存储的需求,我们基于数据量延迟、数据一致性要求和计算周期等维度,将场景划分成了三类:日志计算、长周期计算和全量计算。



日志计算的场景特点在于数据量比较大,但是可以接受少量数据丢失。大部分数据要求在分钟内计算,并进行分组聚合。但该场景的痛点在于希望通过批流数据复用和统一以提升数据时效性和降低资源成本。

对于长周期计算场景,数据量相对中等。需要对指标进行复合计算,但整体数据周期可能较长。直播类业务场景可能持续一个月,数据要求在秒级。该场景的痛点在于冷启动和回溯过程复杂、周期长、成本高。

全量计算场景数据量不大,会将全量数据存储到Flink state中进行计算,要求强一致性,时效性要求在秒级别。但该场景遇到的最大问题在于因数据存储到Flink state中,未进行分层结构,回溯的中间结果可能不透出,不太利于开发人员进行调试操作。

总结来看,数仓对于存储的主要需求可以概括为以下几点:其一,实时存储不统一,运维复杂;其二,实时离线存储不统一,资源成本高;其三,冷启动或回溯过程复杂耗时;其四,无法查询中间数据。

因此,我们的批流一体存储方案,不仅要解决上述痛点,还需要具备以下基本能力:支持离线回溯场景的分区并发更新,且数据读写吞吐量不低于Hive。

对于流式场景的流批处理,需要满足低延迟的要求,数据延迟约为几秒钟,并能够提供高吞吐量以支持千万级RPS。此外,我们还需要提供支持Exactly-once和At-least-once数据一致性语义的功能。为了实现整体的流批一体的目标,还需要支持多引擎,例如Spark、Flink的读写,同时也需要支持多种OLAP引擎进行查询。

02/设计方案

接下来看一下我们的流批一体存储方案,结合刚刚讨论的流批一体存储目标,我们发现现有的基于数据湖仓一体的架构,实际上已经可以满足大多数要求了。当前数据湖仓一体的架构已经支持所有数据入湖,并支持Spark、Flink引擎,同时也可以进行离线和实时的数据操作。在下游数据应用方面,数据湖仓一体的架构还支持ihook、metastore、adhoc等OLAP查询方式。

在字节内部使用的场景中,业务会通过Flink实时将数据入湖,使用Spark批量回溯更新湖内的数据,并且下游会使用Presto查询服务来触及下游的看板。因此,我们信息湖仓库主要使用了Hudi这样的开源方案。在功能方面,数据湖仓库基本符合了实时和离线数仓对于流批一体存储的需求,而这主要是因为Hudi本身提供了事务支持,我们在内部还进行了桶索引机制的优化以进一步提高入湖的性能,并且通过metastore的元数据服务来支持并发写入功能。此外,Hudi原生支持多引擎,因此既可以对批流进行读写消费,也可以使用Presto进行交互式分析。



在内部,湖仓一体架构大规模地落地了离线的数仓场景和部分近实时的数仓场景。但是因为 Hudi 本身的或者数据库本身分钟级别的可见性,它还是没有办法做到实时数仓存储的标准方案。

为了解决时效性的问题,提供低延迟的能力,我们内部自研了基于内存的服务,它构建于数据湖之上,形成了一套整体的高吞吐、高并发、低延迟的实时数据服务方案。底层方案的整体架构如图所示。底层是持久化数据层,会复用Hudi当前的能力持久化数据,文件分布跟 Hudi一致,通过 log 的行存文件和 base 的列存文件进行数据存储,会通过 file slice 这种基于时间戳的方式去维护数据的版本信息,通过 file group 这样的方式去对文件进行分组,相同组件的数据会存储在同文件组内。这种文件变分组的方式,再结合索引的能力,能够有效地提升数据入湖的性能和查询的性能。



上层服务层主要分为两个组件, BTS 和 Table Service Management。BTS 是基于内存构建的服务层,它主要是来解决实时场景下数据读写的时效性的问题,通过内存去对数据读写进行加速,TSM (Table Service Management),是表优化的服务,它会异步地去执行一些表优化的操作,从而实现对查询的加速。



这里的表优化操作指的包括社区原生的压缩聚合 clustering,以及一些索引异步构建,视图异步构建的一些操作。压缩聚合指的是对日志文件和基础文件进一步合并以生成新的列式存储文件,这样对整体查询效率而言更优。而 clustering 则是合并小文件以减少文件开销。当前 TSM 只支持这两种能力以及清理能力,我们计划结合社区现有的 MDT 能力来异步构建多级索引,以提升交互式查询的性能。

表优化操作是一个完全异步的过程。这部分是我们自主开发的服务,因为一些社区原生实现并没有做到完全异步。为什么要异步呢?因为 compaction 和 clustering 的执行时间比较长,同步操作会影响数据湖的写入速度,特别是在实时场景下不可接受。而社区的异步操作仅指写入时不阻塞,但是 compaction 会共享写入资源在同一个应用程序内执行。这可能会影响写入作业的稳定性,因此我们在内部落地过程中发现了这个问题,最后实现了一个完全异步的调度执行,同时不共享写入资源的服务。在具体的执行层面上,我们还利用混部的资源以降低成本。

1. 数据组织形式

基于这样的新的流批存储架构,我们新增了中间的服务层,特别是BTS这样的实时元数据加速层,整体的数据组织形式如下图所示。

数据组织形式在逻辑层分为表分区、文件组和文件大小等概念。数据写入时,先写入对应分区,再根据主键写入对应文件组。文件组的底层文件存储分为内存数据和分布式文件系统数据两种类型。在内存中,数据由块构成,而在分布式文件系统中,数据的组织形式与Hudi相似,采用基础文件和日志文件的模式。值得注意的是,我们引入了日志存储层来存储WAL文件,以确保数据在写入过程中的有效性,并解决内存数据丢失的问题。因此,在写入内存数据之前,我们会先写入WAL文件,确保这部分数据已被持久化存储,实际操作才被认为是成功的。

对于内存文件块与持久化存储文件之间的映射关系,每个块对应一个WAL文件以保证数据的容错性。每个内存中的块都不会永久存储在内存中,而是定期地刷到持久化存储文件中。在实际操作中,多个块通常对应一个日志文件。

2. 数据读写方式

再来看一下整个数据读写的交互方式。首先,做了流批复杂的分离,因为流场景和批场景对于数据的可见性和时效性的要求是不完全一致的。对于批量回溯的场景,用户并不是希望能够马上可见,只是希望把这部分数据做好更新和校准而已。



在批量数据更新的场景中,数据会直接写入持久性存储中,也就是会写入到HDFS上,而不是通过内存。这种方法可以极大地提高我们的读写吞吐量。对于流式读写的情况,会首先访问BTS之类的内存服务进行读写。这里的主要实施细节是,在写入数据时,我们会优先写入内存中,会首先写WAL文件以保障容错。在读取数据时,由于内存中的数据不会一直存在,因为cache会定期清理,所以读取时会优先访问内存中的数据。

如果发现内存中没有数据,我们会先加载WAL文件并对其进行预加载,以尽可能地将数据优先加载到内存中,以保证流式读取的时效性。

如果发现WAL文件也不存在,或者被清理了,那么我们就会转而去读取持久性存储中的日志。

在大多数情况下,这是流式读写,整个周期相对较短。因此,在内存中,WAL文件的存储能够做到一周之内的时效性。一周之内的用户都可以正常从内存中消费这部分数据。如果用户希望存储长周期的数据,那么他可能需要承担更多的存储成本。我们需要尽可能避免从HDFS上加载日志文件,这是整体的数据读写方式。

3. BTS架构

刚刚提到的流读的场景,我们也做了读写的负载分离,会有单独的读集群去承接整体的读流量,来避免它影响写入节点的性能。

我们来看一下整体的BTS架构,BTS首先是Master-Slave架构。Master主要负责一些元数据信息的管理,它的结构与HDFS相似。Table Server以Slave的形式存在,负责数据的读写,并在其上存储由若干个block组织而成的文件。对于Master,它管理的元数据包括Table Server的信息以及block的元数据管理。由于元数据管理的需要,它必然会引入一定的负载均衡机制。因此,我们目前实现了比较简单的负载均衡机制,旨在避免某一Table Server内存被打爆的情况。



Table Server主要提供数据读写能力,维护本地的块并定期进行块清理。它异步将某些块刷新到HDFS上,整个数据读写流程是客户端请求master,获取需要写入的块,然后找到对应的Table Server进行数据写入。在写入时,它优先写WAL文件,再写内存文件。当数据全部写入并ACK返回后,表示这批数据已经成功写入,不会再丢失。此时涉及到数据提交问题。

这部分统一由主节点master去管理事务,其整体的事务机制与Hudi目前的实现相似,即依托于引擎进行提交。对于Flink来说,每次checkpoint都会触发主节点进行提交。因此,当下游消费这批数据时,如果我们需要达到秒级数据,则不太可能进行秒级信息源数据的提交。因此,在这部分下游可能会读取一些基于read on committed的数据,所以需要进行去重操作,以确保At least once的语义。以上就是整体的BTS结构的介绍。接下来介绍落地场景。

03/落地场景

我们的主要落地场景是流式数据计算,它类似于离线数仓,需要进行一些ETL清理和简单的聚合计算。左侧是整体架构方案,我们使用了基于Hudi加BTS的数据湖方案来替换MQ,从而实现每一层数据表的存储。在维表上,我们目前仍使用KV存储。我们当前的目标是替换MQ场景。



原先离线需要将实时表从 MQ dump 成 Hive,去进行后续的离线数仓的相关工作。切换成这套方案后,dump 操作就可以省掉,能够做到流批的数据复用的能力来减少整体的中间存储的成本。

第二个场景是多维分析场景,其特点是实时数据清洗后直接支持看板等实时的OLAP查询。基于批流一体方案结合 Presto 查询来满足业务侧分钟级时延诉求,和秒级查询响应诉求。我们团队针对Presto进行了许多优化,包括native engine等相关技术,以实现高性能查询。目前,该场景也在现场落地,并取得了不错的收益。另外,因为整体的流批是数据表,存储是统一的,所以不需要额外将其转储为Hive表,也不需要维护离线存储的快照。



第三个场景就是批流复用的日志场景,一般大家直觉上会从 ODS 层切换做批流复用,字节内部,在实时场景会先对接埋点数据, Flink 端去做清洗,落到实时的存储里面,然后对接看板等下游。在离线数仓上,也会将所有埋点信息存储下来,根据具体的业务场景落成不同的 ODS 表,再去构建离线数仓的任务。



当我们整体把存储换成 LAS 这套方案之后,只需要维护 ODS 层的数据,就能支持离线和实时两个的场景去进行分析。

最后是飞书数仓的场景落地,整体链路比较清晰,分为实时和离线两个链路,这里离线实时链路主要是去做一些人事信息变更之类的业务。离线链路主要是对一些长周期的问题去做一些数据的修正,把这部分修正的数据回补到我们的实时链路里面,让下游的看板数据变得更准确。



在实时数据传输和离线数据处理两个环节中,我们都采用了LAS之类的存储来替换底层存储。离线数据处理要求数据的处理时间在10-15分钟内完成,因此用户更惯用Spark处理离线数据处理环节。针对此,我们提供了基于Spark Thrift Server的解决方案,以减少离线数据处理中每个环节的资源申请开销,维持常驻资源,让用户运行SQL来构建离线数据处理模块。

在实时数据传输环节中,用户原本采用Kafka构建基础表并使用Hbase进行维表构建。因为Hbase对联合主键的支持并不友好,用户每次在读写时都需要去序列化和反序列化主键列,并将复合主键拼装成单一主键,最后将其写入Hbase中,然后再从Hbase中读取。这样的流程很复杂,且时间耗时较长。

那么除了替换之外,就像之前提到的一样,我们会替换掉MQ或Kafka这样的组件,并且我们还用其他存储替代了Hbase。我们的主要目标是实现基于Flink的lookup join功能,并将小表直接加载到内存中,以提高lookup join的性能。因此,在实时链路这一块,我们已经替换了所有的存储组件。



04/未来规划

最后分享一下未来规划。首先,我们会探索更多业务场景,大规模落地更多模式。其次,在技术迭代方面,我们会对负载均衡和分离做出优化。负载分离在BTS内部(即内存服务内部)会针对一些小组件进行优化,比如将WAL文件刷入持久化存储(如HDFS),这部分资源占用比较高,需要分离处理。另一方面,我们会更加细致化地对读写负载分离和内存服务负载均衡进行优化。此外,我们还会实现更精细的流批负载分离。第三点是查询优化,我们会结合索引的能力,在内存层和整体存储方案中通过构建索引优化块的数据结构来加速查询性能,包括点查和联合查询等。最后一点是与native engine的集成,以提升整体的读写速度。在这方面,我们需要对底层的log、block和parquet文件进行向量化处理。

05/Q&A环节

Q:Hudi支持流式的写入与更新,那Kafka 是否可以被取代了?

A:我不确定是指社区的Hudi还是我分享的Hudi。所以我谈一下我分享的整套方案,但我认为我们目前在某些方面还有欠缺。比较困难的是如何实现Kafka的exactly once语义。

Q:LAS支持的组件索引和二级索引是什么样的索引结构?

A:实际上,这部分主要是社区原生实现,包括我们内部的实现也大部分已经贡献到社区了。就组件而言,社区实现是基于哈希去进行分桶,并同时记录一些布隆过滤器(Bloom filter)。关于二级索引,社区目前正在进行迭代,但并没有完全合入。

Q:使用 BTS 可以加速 Flink 写入 Hudi 的性能吗?

A:会。时效性提升很多。因为第一个是我们写内存,第二个是整个数据结构会相对 Hudi 来说会比较轻量一些。

Q:BTS 与 Hudi分别适用的应用场景。BTS 与Hudi的具体关系。

A:首先对于我们来说,我们整体的这套方案叫LAS, LAS 底层是基于 Hudi 去做的实现,为了支持流批一体存储,我们在Hudi上加了一层内存缓存层BTS,结合查询引擎,一整套方案称之为 LAS。因为 BTS 是基于 Hudi 上架了一层,所以 BTS 整体的逻辑会跟 Hudi 强相关,它我两者之间的交互主要就是内存文件到持久化存储中间的一些交互,其他的大部分的设计会沿用Hudi的部分的逻辑,比方说我们会通过TSM, Table Service Management.去做一些比较优化的服务,比方说 Hudi 的compaction, BTS 的 WAL 的clean,就这些操作。

Q:LAS怎么保证查询数据的准确性?

A:我们当前支持的语义是 At least once,就是说首先用户能够在数据里面加业务字段来判断哪条数据是最新的At least once,但是这条数据有可能会被写入多次,所以用户下游需要做一些去重的操作。BTS 数据怎么保证一定是写进来的?首先我们会写 WAL 文件,就是 WAL 文件是持久化存储上的文件,当文件写完之后我们才会,我们会一边写 WAL文件一边写内存的数据,只有写到 WAL 文件才会写内存的数据,那整体都完成了之后才会返回给 client ACK,否则, client 会认为这次提交失败了,会重新去往里写,所以整体上一致性是不会有太大的问题的。

今天的分享就到这里,谢谢大家



本文地址:https://www.6aiq.com/article/1693822231924
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出