Fork me on GitHub

百度流批一体的实时多维分析实践

导读: 今天给大家带来的分享主题是流批一体的实时多维分析。分享主要分为 4 个部分,首先是大数据架构演进介绍,从传统的经典离线架构到 Lambda 架构, Kappa 架构,分析一下各种架构的优缺点;接下来介绍我们的业务痛点,以及流批一体的解决方案;第三部分会重点介绍我们的流批一体方案在落地过程中遇到的问题,以及我们的解决方法;最后是总结以及未来规划。

主要内容如下:

  • 大数据架构演进
  • 流批一体方案
  • 关键问题突破
  • 总结和规划

分享嘉宾|郑德来 百度 资深研发工程师
编辑整理|Brandon OPPO
出品社区|DataFun


01 大数据架构演进

1. 经典离线数仓架构

在经典离线数仓架构中,最底层是数据源,包括日志打点、DB 存储的数据,以及第三方回传数据等等;ODS 层为数据操作层,主要用来做简单的清洗,存储一些基础数据;在 ODS 层之上是 DWD 层,在该层会构建出最细粒度的明细事实表;DWD 层之上是 DWS 汇总数据层,在这一层会按主题对明细数据进行汇总;最上层是 ADS 应用数据层,存放业务的个性化统计指标。

图片

经典离线数仓优点是架构简单,开发成本与资源成本低,数据容易管理,同时因为没有实时数据,也不会存在离线实时数据 diff 的问题。

经典离线数仓最大的问题 首先是数据延迟的问题 ,随着业务的复杂性越来越高,数仓复杂度也会不断的提升,需要关联的数据源也越来越多,数据的整体产出时效,就会有各种问题,包括输入数据流断流,数据延迟等在传统数仓常见的问题。

第二个问题是缺失实时数据,通常数据的产出时效都是小时或者天级别,有些数据量比较大可能会是周级别甚至月级别产出,无法支撑起业务对数据时效性的诉求。

第三个问题是表的数量太多,因为一些复杂的业务场景 ODS 表可能有几百张,产出数据时需要做大量的关联,数仓使用体验变差,同时关联查询也会导致数据产出的时效变差。

图片

2. Lambda架构介绍

Lambda 架构出现的初衷是为了弥补经典数仓时效性差的问题。

整个 Lambda 架构可以分为 3 层,首先图中 最左边的是 Batch Layer 层 ,也就是批处理层,在批处理层复用了经典离线数仓的分层架构,在技术栈上也是与经典数仓一致,使用 MR, Hive, Spark 等离线计算框架。

第二层是右边的 Speed Layer 加速数据层,用于产出时效性高的一些数据。Speed Layer 层对数据准确性和完整性会有一些降级。Speed Layer 层采用 Kafka 等消息中间件来进行数据的传输和存储,使用 Storm,Spark Streaming,Flink 进行数据的计算。

Lambda 架构的第三层就是 Server 层服务层,在服务层中会把 Speed 层和 Batch 层的数据进行合并,或者替换输出到数据库中来支持数据应用。

图片

Lambda 层中的 Speed 层让数据的产出时效大大提前了,同时具有 Speed 层和Batch 层让 Lambda 架构兼顾了数据的准确性和时效性。另外Lambda架构中的 Batch 层沿用经典离线数仓的架构,在经典离线数仓迁移到 Lambda 架构时,Lambda 架构是可以兼容经典数仓架构的。

Lambda 架构的缺点就是一个需求会有两套代码,一套离线,一套实时,造成了开发成本和运维成本的浪费。第二就是资源的浪费,一份离线资源一份实时资源,两份资源导致整体资源占用比较多。第三个问题就是数据的 diff 问题,因为它是实时和离线两条流,它的数据逻辑本质是不可能完全对齐的,这也是 Lambda 架构最大的一个痛点。

图片

3. Kappa 架构介绍

Lambda 架构本身具有 Batch 和 Speed 两条流,Speed 层的数据准确性是不被信任的,主要需要 Batch 层来保证数据的准确性,随着流式计算框架的发展,不重不丢语义的支持,最终演变出了 Kappa 架构。

图片

Kappa 架构的核心思想就是去掉 Lambda 架构的 Batch 层,实时和离线使用同一套代码,通过一套代码来满足业务对数据的准确性和实时性的要求。

Kappa 架构也不是完美的,比如我们正常的业务会经常遇到一些口径的变更,这些口径的变更会带来数据回溯的问题,因为它是没有离线这条数据流的,数据回溯就需要靠数据重新消费来解决。这种回溯成本非常高,对整个系统的吞吐压力挑战比较大。

同时,随着业务的复杂度不断增加,它的数据源的复杂度也会增加,随着数据的增加,关联场景比较多,会面临各种复杂关联场景的挑战。关联越来越复杂,开发和维护的成本也会变得非常高。

第三个就是对于一些新业务,它的数仓基本是从零开始搭建,这个时候搭建一个 Kappa 架构会相对简单,但是我们的数仓建设其实都是有历史包袱的,并非一蹴而就,而是经过一定的演变过来的,这样在改造时就需要对一些存量逻辑做实时的改造,Kappa架构是没有 Batch 这条流的,需要完全替换调离线,这样的改造往往成本是非常高的,同时收益比较小,这也是 Kappa 架构很难大规范铺开的一个很关键原因。

图片

02 流批一体方案

1. 流批一体背景——旧架构

首先看一下我们的旧架构, 其实也一个 Lambda 架构,左边深蓝色是离线数据流,右边浅蓝这部分是实时数据流 。离线数据流最底层是我们的数据源层,数据源主要有两类,一类是日志的打点数据,比如展现的打点,点击的打点,活动的打点。另一类是来源于业务的 DB 数据库,比如 MySQL,主要是一些订单数据或者物料数据。

对于这些数据我们会进行采集,像打点数据一般会采用 Flume 这种日志采集工具,小时级或者天级采集到文件系统 HDFS 上。对于业务的数据库数据,一般会采用类似与 Sqoop 工具采集到文件系统上去。这两部数据采集到文件系统上之后,就是经过离线数据的清洗和处理,来构建我们的数据仓库,数据仓库也是采用经典的离线数据仓库架构,再在数据仓库上层承接多维分析以及数据报表需求。

右边实时流的日志打点数据采集,我们是把它写到消息队列里面去,并不是所有日志都采集到消息队列里,因为消息队列成本比文件系统高很多,所以有实时需求时我们才会把它采集到消息队列。对于业务 DB 数据也一样,也是根据业务时效性需要,将 Binlog 日志采集到消息队列里。数据写入到消息队列中之后就是流式计算环节,在流式计算环节中,主要会按照需求分别对数据进行加工,最终满足策略信号,实时报表,实时应用的一些需求。

图片

2. 流批一体背景——旧架构问题

旧架构从业务诉求角度来看有这么几个问题。

第一个问题就是,表的数量非常的多,尤其是比较复杂的业务,ODS 表可能有几百张,这时业务在使用起来就非常困难,尤其是一些人员的流动比较大,新人刚来对数仓没有那么熟悉,完全学习起来的成本很高。

第二个问题就是表的关联场景比较多,一次查询经常要关联几十张表,查询时效慢,这是业务比较敏感的一点。第三个问题是实时分析能力比较弱,因为只有一些定制的实时报表,只面向了某一个场景,缺少多维的实时数据分析能力。

图片

3. 流批一体整体方案

整体上我们的流批一体方案其实是采用了一种 Lambda 和 Kappa 的混合架构 。首先最底层的数据源,数据采集和数据存储这一块,其实没有大的变化, 最关键的变化主要是数据清洗和数仓这两个环节

首先,每个字段我们会根据它的使用场景的时效性要求,来确定这个字段是走实时流还是走离线流,比如某个字段时效性要求在分钟级别,这个时候就需要走实时流,而如果只是天级别,周级别那就没必要走实时。但是如果一个字段既有实时又有离线需求,这个时候实时和离线它不再是一个补充关系,而是一个替换关系,这样来避免 Lambda 架构中最经典的问题,离线实时两套代码导致的数据不一致问题。对于没有时效性需求的字段继续沿用离线的处理逻辑,如果强行切换到实时流,消息队列和流式计算的成本比较高,同时也会增加开发和维护的成本。

数据仓库也由之前的分层建模的方式,变成了宽表建模,实时字段和离线字段通过分钟级 Merge 成一张宽表。整体的建模思路也是不再面向数据源建模,而是面向最终业务侧的视角,业务使用去建模,那最关键的保证业务方在使用表的时候,表尽量少,尽量不做关联,降低使用和学习成本,同时最主要是查询性能,这是业务最敏感的一个点。然后对数据应用层,我们升级了一些数据查询引擎。

目前这个架构整个时效性基本是分钟级别。 端到端的耗时,根据数据量,关联逻辑的复杂程度不一样,它的时效性在 5 分钟到 20 分钟不等,满足了我们业务对于实时多维分析的诉求。总结起来,业务的诉求是导入时效要求是分钟级别,查询时效是秒级别。偶尔一些秒级别的延迟需求场景,比如类似大屏或者榜单,这部分我们还是会通过定制的实时报表实现。

图片

03 关键问题突破

1. DB 数据更新问题——背景

在方案落地过程中,第一个问题是 DB 更新数据的问题。因为典型的实时数仓,比较简单的一个场景就是纯日志场景,所有数据源都来自于日志的。

一个典型的解决方案是,原始日志经过实时采集,写入消息队列中,在流式计算环节,通过固定的时间窗口,把数据写入文件系统,然后经过上层的查询引擎去实时查询。日志数据有个特点就是它是不会变化的,日志在打印的那一刻数据就固定下来了,但是 DB 数据不一样,它是会更新的,比如订单的一些状态,收款退款,或者是一些物料属性,它都会随时发生变化。

但因为我们的分布式文件系统,往往它是不支持更新的,随着计算窗口的变大,存储能力和可维护性都会变差。



图片

2. DB 数据更新问题——解决方案

我们的解决方案,首先 DB 数据会采集变更信息 Binlog 日志,写入到消息队列中。然后在流式计算框架对变更的日志数据,比如通过分钟级窗口,写成一些 Delta 文件,来保存 DB 数据的变更过程。

在初次上线过程中,我们会把 DB 的全量数据进行归档,产生 Base 文件,然后会增加一个 Copy On Write 机制,通过滚动 5min 的合并将 Base 文件和 Delta 文件不断进行合并,不断产生最新的一个可用的版本。

这里我们没有采用 Merge On Read 的这个方案,关键原因是在我们的业务诉求,业务对我们的查询时效敏感,要求在秒级,如果使用 Merge On Read,就会导致我们在查询过程中需要更多的数据处理,查询时效会降低,所以这个一块我们牺牲了一部分数据导入性能,来尽量满足查询的性能要求。

图片

3. 多表关联问题——背景

第二个问题是多表关联的问题,一个典型场景是,有一张 DB 的主表,然后几张关联表,通过 Spark 或者其它的离线计算框架,把它们关联在一起然后写入文件系统中,供查询引擎进行查询。因为 DB 的数据一般记录数特别多,每一个表的数据量都非常大,那在关联的过程中可能会促发 Shuffle 机制,性能就会非常差。

图片

4. 多表关联问题——解决方案

我们的方案是采用三次关联。

首先对每一张表根据前面的方案产出一个 Base 文件和一个 Delta 文件,那 Base 文件其实就包含了主表和关联表的 Base 文件,是一个截止到某个时刻包含全部记录数的数据快照。Delta 文件它其实是主表或关联表在某一个流式计算时间窗口的变更记录。

第一次关联是用主表的 Delta 文件跟关联表的 Base 文件进行关联 ,这样就拿到这个主表在窗口内变更字段的全部状态,这个数据结果本写入到一个 tmpDelta 文件中。

第二次关联是将 tmpDelta 文件与主表的 Base 文件进行关联 ,这样就生成了在某个时间点的全部记录数的 tmpBase 文件,只不过有部分关联字段还没更新。

第三次关联就是将上步中生成的 tmpBase 与关联表的 Delta 文件进行关联 ,生成了一个新的可查询版本。关联表的变更字段在主表文件里进行更新解释,与上一个问题类似,会随五分钟的时间窗口不断滚动下去。

通常情况下,DB 数据的存量数据比较多,但增量数据比较少,所以 Base 文件基本上非常大,而 Delta 文件都会非常小,虽然是三次关联,但每次关联都存在小数据集,虽然整体关联多次,但整体的时效性还是满足预期的。

图片

5. DB 和日志关联问题——背景

第三个问题是日志 DB 数据和日志数据关联的问题。这个问题会更复杂一些,有一部分数据是来自日志打点,另外一部分数据是来自业务 DB 数据库,我们的宽表是同时依赖与这两份数据。

这个场景比较典型的方案就是 ,把 DB 数据写入到一个类似于高性能的缓存之中,在流式计算环节查询这个缓存,把需要依赖 DB 的那部分字段拼接上,最终再通过一个固定的窗口写入到文件系统,给查询引擎查询。

这里有两个问题,一个是要求缓存的容量比较大,能够缓存下所有的 DB 数据,另外一个是日志的吞吐非常高,也会导致在拼接字段的过程中频繁的查询这个高性能的缓存。这就对缓存的 QPS 和容量有比较高的要求,导致缓存的成本非常高。

图片

6. DB 和日志关联问题——解决方案

我们的解决方案是,首先对日志数据通过日志采集写入消息队列中,在流式计算环节,通过固定的窗口产出 Delta 文件,Delta 文件是不会变的,对于 DB 数据可能是一张表或者多张表,多张表就采用之前介绍的多表关联的方案,也是滚动产出一个可查询的版本

但这里也有一些变化,采用了一个冷数据分离的方案,因为日志的 Delta 文件它是分钟级滚动,合并的时候我们只合并热数据,对冷数据是会进行天级别的合并,这个实现的降级主要是为了以最低成本来满足业务最核心的诉求,因为业务最主要的诉求其实是热数据能够最快速到达,对于冷数据可以接受隔天查询。它最关键的是数据准确性一致,整体也参考了 Lambda 的架构思想,虽然有冷热数据不同的两次合并,但合并的逻辑是一致的,不需要同时写两套代码,只不过在资源层面,新增了一份全量冷数据的关联,会多消耗一些资源,但这也不可避免,这样既满足业务的需求,又不会过多的消耗资源。

图片

7. 数据到位时间问题——背景

我们建模从之前的分层建模改成了宽表建模,宽表建模最大的一个问题是数据到位时间问题。通常情况下宽表的产出需要依赖所有依赖表,实际情况数据源往往比较复杂,一部分表是实时产出,而其它表需要复杂逻辑处理,可能需要 T+1, 甚至 T+2 才能产出。

图片

8. 数据到位时间问题——解决方案

针对数据到位时间问题,我们给出的方案是数据分版本产出,字段按虚实化机制。对于一些原始日志,DB 数据,通过前面介绍的几个方案,基本可以保证我们数据字段分钟级产出可查询版本。对于时效性不敏感的字段,可以 T+1 产出生成一个 V2 的数据版本,而一些复杂计算的或者第三方回传的字段可以 T+2 产出生成最终的数据版本。同一张宽表,业务在使用时会展示字段的可用状态,预计产出的时间。

图片

04 总结和规划

架构选型,首先要符合业务的现状,解决业务实际问题,不要为了技术而做技术,关键是解决业务的实际问题,没有最好的架构,只有最适合我们业务的架构。第二就是架构选型需要综合考量资源复杂度与维护成本,尤其是资源,期望以最低的成本去解决业务痛点,把钱花在刀刃上。

未来规划一个是查询引擎性能持续优化,来提升查询体验;另一个是上层查询工具的体验优化。

图片

05 提问

Q1:流批一体架构下,数仓里的大宽表是否会存在离线数据和实时数据的拼接,还是会严格分开离线宽表和实时宽表?

A1:我们的宽表实时表和离线表是拼在一起的,是一张宽表,只不过是分版本产出。也是面向需求,如果一个字段有实时诉求,那这个字段是实时产出,如果对时效没有那么敏感,那可能就是天级别,周级别,甚至月级别都可以,只不过是多个版本。实时和离线使用的是一张宽表,这样查询的时候不需要再做二次拼接,保证查询的时效性足够快。

Q2:10 亿级多个大表关联中,Delta 文件不需要跟 Base 文件关联吗?

A2:Delta文件是需要与 Base 文件关联,只不过它有关联顺序,Delta 文件跟 Base文件关联,然后 Base 文件再跟 Delta 文件关联,然后得 Base 文件再跟其他 Delta 文件关联。有三次关联,这里边是包含 Base 文件和 Delta 文件关联的,因为 Delta 比较小,可以做一些 Mapjoin,Mapjoin 的性能会比较好一些,不像几个大表的直接关联,它的性能会比较差。

Q3:高性能查询缓存应用了什么技术?

A3:高性能缓存技术有很多,比如 Redis、HBase 等,但我们最终选择了文件系统,主要是出于成本的考虑,我们在没有增加太多资源的情况,就满足了业务的时效性诉求。成本对我们研发来说压力是比较大的,成本可能就是第一个比较要务的事情。

Q4:数仓使用了什么技术,Doris、Clickhouse、Hive 还是 Iceberg?

A4:我们没有使用开源的引擎,而是使用我们内部自研的图灵引擎,它是基于文件存储的一个引擎。

|分享嘉宾|

图片

郑德来

百度 资深研发工程师

郑德来,毕业于吉林大学,目前主要负责百度信息流、百家号、电商业务的数据建设工作。


本文地址:https://www.6aiq.com/article/1673188017615
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出