菜鸟网络实时数仓 2.0 进阶之路

分享嘉宾:张庭 菜鸟 数据工程师
文章整理:comn
出品平台:DataFunTalk

导读: 供应链物流场景下的业务复杂度高,业务链路长,节点多,实体多,实时数仓建设难度高。菜鸟跨境进口业务场景更是如此,更复杂的场景带来更复杂的实体数据模型,对接的业务系统多导致 ETL 流程特别复杂,还有海量的日均处理数据量,使得团队在建设进口实时数仓的过程中,面临着诸多挑战:如何保证复杂实体关系下的数据准确性?如何降低多数据源情况下的数据处理复杂度?如何提升实时多流 Join 的处理效率?如何实现实时超时统计?如何实现异常情况下的数据状态恢复?本文主要分享菜鸟进口实时数仓的升级经验,以及如何利用 Flink 的特性解决在开发实践中遇到的问题。

主要内容包括:

  • 相关背景介绍
  • 进口实时数仓演进过程
  • 挑战及实践
  • 总结与展望

01 相关背景介绍

1. 进口业务简介

进口业务的流程大致比较清晰,国内的买家下单之后,国外的卖家发货,经过清关,干线运输,到国内的清关,配送,到消费者手里,菜鸟在整个过程中负责协调链路上的各个资源,完成物流履约的服务。去年考拉融入到阿里体系之后,整个进口业务规模占国内进口单量的规模是非常高的。并且每年的单量都在迅速增长,订单履行周期特别长,中间涉及的环节多,所以在数据建设时,既要考虑把所有数据融合到一起,还要保证数据有效性,是非常困难的一件事情。

2. 实时数仓加工流程

① 一般过程

下面简单介绍一下实时数仓的加工流程,一般会对接业务库或者日志源,通过数据同步的方式,比如 Sqoop 或 DataX 把消息同步到消息中间件中暂存,下游会接一个实时计算引擎,对消息进行消费,消费之后会进行计算、加工,产出一些明细表或汇总指标,放到查询服务上供数据应用端使用。

② 菜鸟内部流程

在菜鸟内部也是同样的流程,我们将业务库数据通过 DRC ( 数据备份中心 ) 增量采集 Binlog 日志的方式,同步到 TT ( 类似 Kafka 的消息中间件 ) 做一个消息暂存,后面会接一个 Flink 实时计算引擎进行消费,计算好之后写入两种查询服务,一种是 ADB,一种是 HBase ( Lindorm ),ADB 是一个 OLAP 引擎,阿里云对外也提供服务,主要是提供一些丰富的多维分析查询,写入的也是一些维度比较丰富的轻度汇总或明细数据,对于实时大屏的场景,因为维度比较少,指标比较固定,我们会沉淀一些高度汇总指标写到 HBase 中供实时大屏使用。

02 进口实时数仓演进过程

接下来讲一下进口实时数仓的演进过程:

2014 年: 进口业务线大概在 14 年时,建好了离线数仓,能提供日报。

2015 年: 能提供小时报,更新频度从天到小时。

2016 年: 基于 JStorm 探索了一些实时指标的计算服务,越来越趋向于实时化。由于 16 年刚开始尝试实时指标,指标还不是特别丰富。

2017 年: 菜鸟引进了 Blink,也就是 Flink 在阿里的内部版本,作为我们的流计算引擎,并且进口业务线在同一年打通了实时明细,通过实时明细大宽表对外提供数据服务。

2018 年: 完成了菜鸟进口实时数仓 1.0 的建设。

**2020 年:**开始了实时数仓 2.0 的建设,为什么开始 2.0?因为 1.0 在设计过程中存在了很多问题,整个模型架构不够灵活,扩展性不高,还有一些是因为没有了解 Blink 的特性,导致误用带来的一些运维成本的增加,所以后面进行了大的升级改造。

1. 实时数仓 1.0

接下来讲一下实时数仓 1.0 的情况,一开始因为在发展初期,业务模式不太稳定,所以一开始的策略就是围绕业务小步快跑,比如针对业务 1 会开发一套实时明细层,针对业务 2 也会开发一套实时任务,好处是可以随着业务发展快速迭代,互相之间不影响,早期会更灵活。

如上图右侧所示,最底层是各个业务系统的消息源,实时任务主要有两层,一层是实时明细层,针对业务线会开发不同的明细表,明细表就是针对该条业务线需要的数据把它抽取过来,在这之上是 ADM 层,也就是实时应用层,应用层主要针对具体的场景定制,比如有个场景要看整体汇总指标,则从各个明细表抽取数据,产生一张实时汇总层表,整个过程是竖向烟囱式开发,模型比较混乱,难扩展,并且存在很多重复计算。

后面也是由于重复计算的问题,进行了一层抽象,加了一个前置中间层,对公共的部分进行提取,但是治标不治本,整个模型还是比较混乱的,数据建设上也没有进行统一,模型扩展性上也很差。

2. 实时数仓 2.0

2.0 升级完之后是比较清晰的一张图:

  • 前置层: 底层数据源会接入到前置中间层,屏蔽掉底层一些非常复杂的逻辑。
  • 明细层: 前置层会把比较干净的数据给到明细表,明细层打通了各个业务线,进行了模型的统一。
  • 汇总层: 明细层之上会有轻度汇总和高度汇总,轻度汇总表维度非常多,主要写入到 OLAP 引擎中供多维查询分析,高度汇总指标主要针对实时大屏场景进行沉淀。
  • 接口服务: 汇总层之上会根据统一的接口服务对外提供数据输出。
  • 数据应用: 应用层主要接入包括实时大屏,数据应用,实时报表以及消息推送等。

这就是实时数仓 2.0 升级之后的模型,整个模型虽然看起来比较简单,其实背后从模型设计到开发落地,遇到了很多困难,花费了很大的精力。下面为大家分享下我们在升级过程中遇到的挑战及实践。

03 挑战及实践

我们在实时数仓升级的过程中,面临的挑战如下:

1. 业务线和业务模式多

第一个就是对接的业务线比较多,不同的业务线有不同的模式,导致一开始小步快跑方式的模型比较割裂,模型和模型之间没有复用性,开发和运维成本都很高,资源消耗严重。

解决方案:逻辑中间层升级

我们想到的比较简单的思路就是建设统一的数据中间层,比如业务 A 有出库、揽收、派送等几个业务节点,业务 B 可能是另外几个节点,整个模型是割裂的状态,但实际上业务发展到中后期比较稳定的时候,各个业务模式之间相对比较稳定,这个时候可以对数据进行一个抽象,比如业务 A 有节点 1、节点 5 和其他几个业务模式是一样的,通过这种对齐的方式,找出哪些是公共的,哪些是非公共的,提取出来沉淀到逻辑中间层里,从而屏蔽各业务之间的差距,完成统一的数据建设。把逻辑中间层进行统一,还有一个很大的原因,业务 A,B,C 虽然是不同的业务系统,比如履行系统,关务系统,但是本质上都是同一套,底层数据源也是进行各种抽象,所以数仓建模上也要通过统一的思路进行建设。

2. 业务系统多,超大数据源

第二个就是对接的系统非常多,每个系统数据量很大,每天亿级别的数据源就有十几个,梳理起来非常困难。带来的问题也比较明显,第一个问题就是大状态的问题,需要在 Flink 里维护特别大的状态,还有就是接入这么多数据源之后,成本怎么控制。

解决方案:善用 State

State 是 Flink 的一大特性,因为它才能保证状态计算,需要更合理的利用。我们要认清 State 是干什么的,什么时候需要 State,如何优化它,这些都是需要考虑的事情。State 有两种,一种是 KeyedState,具体是跟数据的 Key 相关的,例如 SQL 中的 Group By,Flink 会按照键值进行相关数据的存储,比如存储到二进制的一个数组里。第二个是 OperatorState,跟具体的算子相关,比如用来记录 Source Connector 里读取的 Offset,或者算子之间任务 Failover 之后,状态怎么在不同算子之间进行恢复。

① 数据接入时"去重"

下面举个例子,怎么用到 KeyedState,比如物流订单流和履行日志流,两个作业关联产生出最终需要的一张大表,Join 是怎么存储的呢?流是一直不停的过来的,消息到达的前后顺序可能不一致,需要把它存在算子里面,对于 Join 的状态节点,比较简单粗暴的方式是把左流和右流同时存下来,通过这样的方式保证不管消息是先到还是后到,至少保证算子里面数据是全的,哪怕其中一个流很晚才到达,也能保证匹配到之前的数据,需要注意的一点是,State 存储根据上游不同而不同,比如在上游定义了一个主键 Rowkey,并且 JoinKey 包含了主键,就不存在多笔订单对应同一个外键,这样就告诉 State 只需要按照 JoinKey 存储唯一行就可以了。如果上游有主键,但是 JoinKey 不包含 Rowkey 的话,就需要在 State 里将两个 Rowkey 的订单同时存下来。最差的情况是,上游没有主键,比如同一笔订单有 10 条消息,会有先后顺序,最后一条是有效的,但是对于系统来说不知道哪条是有效的,没有指定主键也不好去重,它就会全部存下来,特别耗资源和性能,相对来说是特别差的一种方式。

因此,我们在数据接入时进行"去重"。数据接入时,按照 row_number 进行排序,告诉系统按照主键进行数据更新就可以了,解决 10 条消息不知道应该存几条的问题。在上面这个 case 里面,就是按照主键进行更新,每次取最后一条消息。

按照 row_number 这种方式并不会减少数据处理量,但是会大大减少 State 存储量,每一个 State 只存一份有效的状态,而不是把它所有的历史数据都记录下来。

② 多流 join 优化

第二个是多流 Join 的优化,比如像上图左侧的伪代码,一张主表关联很多数据源产生一个明细大宽表,这是我们喜欢的方式,但是这样并不好,为什么呢?这样一个 SQL 在实时计算里会按照双流 Join 的方式依次处理,每次只能处理一个 Join,所以像左边这个代码里有 10 个 Join,在右边就会有 10 个 Join 节点,Join 节点会同时将左流和右流的数据全部存下来,所以会看到右边这个图的红框里,每一个 Join 节点会同时存储左流和右流的节点,假设我们订单源有 1 亿,里面存的就是 10 亿,这个数据量存储是非常可怕的。



另外一个就是链路特别长,不停的要进行网络传输,计算,任务延迟也是很大的。像十几个数据源取数关联在一起,在我们的实际场景是真实存在的,而且我们的关联关系比这个还要更复杂。

那我们怎么优化呢?我们采用 Union All 的方式,把数据错位拼接到一起,后面加一层 Group By,相当于将 Join 关联转换成 Group By,它的执行图就像上图右侧这样,黄色是数据接入过程中需要进行的存储,红色是一个 Join 节点,所以整个过程需要存储的 State 是非常少的,主表会在黄色框和红色框分别存一份,别看数据源非常多,其实只会存一份数据,比如我们的物流订单是 1000 万,其他数据源也是 1000 万,最终的结果有效行就是 1000 万,数据存储量其实是不高的,假设又新接了数据源,可能又是 1000 万的日志量,但其实有效记录就是 1000 万,只是增加了一个数据源,进行了一个数据更新,新增数据源成本近乎为 0,所以用 Union All 替换 Join 的方式在 State 里是一个大大的优化。

3. 取数外键多,易乱序

第三个是取数外键多,乱序的问题,乱序其实有很多种,采集系统采集过来就是乱序的,或者传输过程中导致的乱序,我们这边要讨论的是,在实际开发过程中不小心导致的乱序,因为其他层面的东西平台已经帮我们考虑好了,提供了很好的端到端的一致性保证。

举个例子比如说有两个单子都是物流单,根据单号取一些仓内的消息,消息 1 和消息 2 先后进入流处理里面,关联的时候根据 JoinKey 进行 Shuffle,在这种情况下,两个消息会流到不同的算子并发上,如果这两个并发处理速度不一致,就有可能导致先进入系统的消息后完成处理,比如消息 1 先到达系统的,但是处理比较慢,消息 2 反倒先产出,导致最终的输出结果是不对的,本质上是多并发场景下,数据处理流向的不确定性,同一笔订单的多笔消息流到不同的地方进行计算,就可能会导致乱序。

所以,同一笔订单消息处理完之后,如何保证是有序的?

上图是一个简化的过程,业务库流入到 Kafka,Binlog 日志是顺序写入的,需要采用一定的策略,也是顺序采集,可以根据主键进行 Hash 分区,写到 Kafka 里面,保证 Kafka 里面每个分区存的数据是同一个 Key,首先在这个层面保证有序。然后 Flink 消费 Kafka 时,需要设置合理的并发,保证一个分区的数据由一个 Operator 负责,如果一个分区由两个 Operator 负责,就会存在类似于刚才的情况,导致消息乱序。另外还要配合下游的应用,能保证按照某些主键进行更新或删除操作,这样才能保证端到端的一致性。

Flink 已经配合上下游系统已经帮我们实现了端到端的一致性功能,我们只需要保证内部处理任务不能乱序。我们的解法是避免 Join Key 发生变化,如提前通过特殊映射关系把 Join Key 变为业务主键,来保证任务处理是有序的。

4. 统计指标依赖明细,服务压力大

另外一个难点就是我们的很多统计指标都依赖明细,主要是一些实时统计,这种风险比较明显,服务端压力特别大,尤其是大促时,极其容易把系统拖垮。

实时超时统计就是一个典型的场景,比如说会有这样两笔订单,一笔订单 1 点钟创建了物流订单,2 点钟进行出库,如何统计超 6 小时未揽收的收单量,因为没有消息就无法触发计算,Flink 是基于消息触发的,比如说 2 点钟出库了,那理论上在 8 点钟的时候超 6 小时未揽收的单量要加 1,但是因为没有消息触发,下游系统不会触发计算,这是比较难的事情,所以一开始没有特别好的方案,我们直接从明细表出,比如订单的出库时间是 2 点钟,生成这条明细之后,写到数据库的 OLAP 引擎里,和当前明细进行比较计算。

我们也探索了一些方案比如基于消息中间件,进行一些定时超时消息下发,或者也探索过基于 Flink CEP 的方式,第一种方式需要引入第三方的中间件,维护成本会更高,CEP 这种方式采用时间窗口稳步向前走,像我们这种物流场景下会存在很多这样的情况,比如回传一个 2 点出库的时间,后面发现回传错了,又会补一个 1 点半的时间,那么我们需要重新触发计算,Flink CEP 是不能很好的支持的。后面我们探索了基于 Flink Timer Service 这种方式,基于 Flink 自带的 Timer Service 回调方法,来制造一个消息流,首先在我们的方法里面接入数据流,根据我们定义的一些规则,比如出库时间是 2 点,会定义 6 小时的一个超时时间,注册到 Timer Service 里面,到 8 点会触发一次比较计算,没有的话就会触发一个超时消息,整个方案不依赖第三方组件,开发成本比较低。

5. 履行环节多,数据链路长

另外一个难点就是我们的履行环节比较多,数据链路比较长,导致异常情况很难处理。比如消息要保留 20 多天的有效期,State 也要存 20 多天,状态一直存在 Flink 里面,如果某一天数据出现错误或者逻辑加工错误,追溯是个很大问题,因为上游的消息系统一般保持三天数据的有效期。

这边说几个真实的案例。

案例 1:

我们在双十一期间发现了一个 Bug,双十一已经过去好几天了,因为我们的履行链路特别长,要 10~20 天,第一时间发现错误要改已经改不了了,改了之后 DAG 执行图会发生变化,状态就无法恢复,而且上游只能追 3 天的数,改了之后相当于上游的数全没了,这是不能接受的。

案例 2:

疫情期间的一些超长尾单,State 的 TTL 设置都是 60 天,我们认为 60 天左右肯定能够全部完结,后来发现超过 24 天数据开始失真,明明设置的有效期是 60 天,后来发现底层 State 存储用的是 int 型,所以最多只能存 20 多天的有效期,相当于触发了 Flink 的一个边界 case,所以也证明了我们这边的场景的确很复杂,很多状态需要超长的 State 生命周期来保证的。

案例 3:

每次代码停止升级之后,状态就丢失了,需要重新拉取数据计算,但是一般上游的数据只保留 3 天有效期,这样的话业务只能看 3 天的数据,用户体验很不好。

解决方案:批流混合

我们怎么做?

采用批流混合的方式来完成状态复用,基于 Blink 流处理来处理实时消息流,基于 Blink 的批处理完成离线计算,通过两者的融合,在同一个任务里完成历史所有数据的计算,举个例子,订单消息流和履行消息流进行一个关联计算,那么会在任务里增加一个离线订单消息源,跟我们的实时订单消息源 Union All 合并在一起,下面再增加一个 Group By 节点,按照主键进行去重,基于这种方式就可以实现状态复用。有几个需要注意的点,第一个需要自定义 Source Connector 去开发,另外一个涉及到离线消息和实时消息合并的一个问题,GroupBy 之后是优先取离线消息还是实时消息,实时消息可能消费的比较慢,哪个消息是真实有效的需要判断一下,所以我们也定制了一些,比如 LastValue 来解决任务是优先取离线消息还是实时消息,整个过程是基于 Blink 和 MaxCompute 来实现的。

6. 一些小的 Tips

① 消息下发无法撤回问题

第一个就是消息一旦下发无法撤回,所以有些订单一开始有效,后面变成无效了,这种订单不应该在任务中过滤,而是打上标记下传,统计的时候再用。

② 增加数据版本,数据处理时间以及数据处理版本

  • 数据版本是消息结构体的版本定义,避免模型升级后,任务重启读到脏数据。
  • 处理时间就是消息当前的处理时间,比如消息回流到离线,我们会按照主键进行时间排序,取到最新记录,通过这种方式还原一份准实时数据。
  • 增加数据处理版本是因为即使到毫秒级也不够精确,无法区分消息的前后顺序。

③ 实时对数方案

实时对数方案有两个层面,实时明细和离线明细,刚刚也提到将实时数据回流到离线,我们可以看当前 24 点前产生的消息,因为离线 T+1 只能看到昨天 23 点 59 分 59 秒的数据,实时也可以模拟,我们只截取那个时刻的数据还原出来,然后实时和离线进行对比,这样也可以很好的进行数据比对,另外可以进行实时明细和实时汇总对比,因为都在同一个 DB 里,对比起来也特别方便。

03 总结与展望

1. 总结

简单做下总结:

  • 模型与架构:好的模型和架构相当于成功了 80%。
  • 准确性要求评估:需要评估数据准确性要求,是否真的需要对齐 CheckPoint 或者一致性的语义保证,有些情况下保证一般准确性就 ok 了,那么就不需要这么多额外消耗资源的设计。
  • 合理利用 Flink 特性:需要合理利用 Fink 的一些特性,避免一些误用之痛,比如 State 和 CheckPoint 的使用。
  • 代码自查:保证数据处理是正常流转的,合乎目标。
  • SQL 理解:写 SQL 并不是有多高大上,更多考验的是在数据流转过程中的一些思考。

2. 展望

① 实时数据质量监控

实时处理不像批处理,批处理跑完之后可以在跑个小脚本统计一下主键是否唯一,记录数波动等,实时的数据监控是比较麻烦的事情·。

② 流批统一

流批统一有几个层面,第一个就是存储层面的统一,实时和离线写到同一个地方去,应用的时候更方便。第二个就是计算引擎的统一,比如像 Flink 可以同时支持批处理和流处理,还能够写到 Hive 里面。更高层次的就是可以做到处理结果的统一,同一段代码,在批和流的语义可能会不一样,如何做到同一段代码,批和流的处理结果是完全统一的。

③ 自动调优

自动调优有两种,比如在大促的时候,我们申请了 1000 个 Core 的资源,1000 个 Core 怎么合理的分配,哪些地方可能是性能瓶颈,要多分配一些,这是给定资源的自动调优。还有一种比如像凌晨没什么单量,也没什么数据流量,这个时候可以把资源调到很小,根据数据流量情况自动调整,也就是自动伸缩能力。

以上是我们整体对未来的展望和研究方向。


本文地址:https://www.6aiq.com/article/1605221673311
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出