Fork me on GitHub

Apache Flink 在斗鱼的应用与实践

作者: 夏畅@斗鱼 稿

摘要:本文整理自斗鱼实时计算负责人夏畅在 Flink Forward Asia 2021 行业实践专场的分享。

一、背景介绍

图片

斗鱼成立于 2014 年,是一家致力于为所有人带来欢乐的,弹幕式直播分享平台。在斗鱼,实时计算发展得并不算早。

2018 年前后,为了满足一些近实时数据需求,如 5 分钟、1 小时等场景,先后引入了 Spark streaming 和 Storm 技术。随着业务的持续发展,实时指标的需求愈加多样性,Spark streaming 和 Strom 也越加难以支持。

大概在 2019 年,斗鱼引入了 Flink 技术,起初以 Flink jar 的开发方式,来支持这类实时数据需求。但 Flink jar 的方式使用起来门槛和成本还是太高了。

在 19 年底 20 年初,设计开发落地了基于 K8s 的 Flink 实时计算平台,同时支持以 SQL 和 JAR 两种方式的作业开发,在内部这个平台称为 “玄武计算平台”。

图片

玄武计算平台上线后,支撑了不少业务场景,如广告、大屏,推荐、系统监控、风控,数据分析和实时标签等。

图片

截止到 2021 年 3 季度,斗鱼实时计算平台的用户数达到 100+,Vcore 达到 2000+,作业数达到 500+,日处理数据量超过千亿条。

图片

二、实时平台建设

在建设玄武实时计算平台之前,我们主要以 Flink jar 的方式开发,有以下几个痛点:

  • 开发门槛高;
  • 部署成本高;
  • 没有监控告警;
  • 没有作业版本管理。

基于以上四点,我们设计开发了自己的实时计算平台。

图片

玄武实时计算平台构建在 K8s 集群之上,支持多个 Flink 版本,一站式实时数据开发平台。架构上从上到下,可以分为四层:平台层、服务层、调度层、以及 K8s 集群层。

  • 平台层:提供包括元数据管理、作业管理、作业运维、案例示范、监控大盘、调度管理、告警管理等用户交互功能。
  • 服务层:分为 Flink 作业服务和 Flink 网关服务,提供 SQL 校验、SQL 调试、作业运行、作业停止、日志查询等能力。
  • 调度层:借助 K8s 的容器镜像,实现 Flink 多个版本的共存。每个 Flink 版本都对应一个 K8s 的镜像,从而实现作业版本的随时切换。当然,为了实现一个 SQL 在多个 Flink 版本下通用,我们还做了一层 SQL 的映射,主要为了解决 Flink 版本间 connector 的配置差异。此外,我们还在调度层内提供了完整的作业状态跟踪机制。
  • K8s 集群层:主要是提供基础的运行环境。

图片

上图是实时计算平台进行作业开发的实例图。可以看到整个平台提供如下能力:SQL 化作业开发、在线调试、语法校验、作业多版本、元数据管理、配置脱敏、集群管理、参数调优等。

搭建平台的过程中,我们也遇到了不少的挑战。

图片

第一个挑战是 Flink on K8s 集群的部署资源问题。方案上,我们是使用 Standalone Kubernetes 部署,实际是在 K8s 的集群中,创建了两个实例组。一个实例组用来运行 JM 进程,另一个实例组用来运行 TM 进程。两个实例组之间,通过设置 HA 的集群 id 相同来实现绑定。

  • JM 实例组运行多个 pod 时,除其中一个作为 master 节点外,其他的 pod 都将以 StandBy 的身份运行;
  • TM 实例组运行多个 pod 时,每一个 pod 都将注册到 JM 上,作为一个作业执行器存在。

为了使资源充分隔离,依托于 K8s 的能力,生产部署时,我们是一个作业创建一个 Flink 集群。我们知道 K8s 创建一个 pod 时,需要指定 CPU 和内存的设置。而 Flink 集群启动的时候,需要在 Flink-conf 文件指定 JM 和 TM 的资源配置。

在这个方案中,我们遇到的挑战就是如何统一设置 K8s 实例资源与 Flink 集群资源。

图片

为了解决这个问题,我们改造了 Flink 镜像启动脚本 entrypoint,在脚本中增加了两个操作:

  • 一个是拉取作业定义,以获取作业的运行配置;
  • 第二个是替换 flink-conf 文件 memory size 配置。

当然,在最新的 native kubenates 方案中,这个问题官方通过参数化配置解决了。

图片

平台遇到的第二个挑战,就是如何去监控每个作业的运行状态。方案上,我们将每个作业抽象成一条消息,存放在基于 ZK 开发的消息队列中。并且在消息队列虚化了 5 个状态,Accept、Running、Failed、Cancel 以及 Finish。

每个状态都有一个独立的线程池去监控消费。比如 Running 状态,线程池从消息队列中获取一条作业消息,从中解析 Flink 集群信息,获取 FlinkUI 域名,通过 K8s 的 Nginx Ingress,使用域名去访问 Flink JM Pod,从而获取运行作业的状态。当获取作业状态还是 Running 时,将重入队到队尾,否则将移动到对应状态队列下。

图片

实时计算平台上线初期,我们又遇到了新的挑战。在 Flink 的集群中,如何读取 Hive 表,以及如何使用 Hive-Udf 函数。

我们将一个 FlinkSQL 的提交拆分成三个部分:作业组装、上下文初始化和 SQL 执行。

作业组装,我们实现了 2 个方式:

  • 第一个是 SDK GET,通过 SDK 封装的方法,请求平台的服务层,去获取作业定义;
  • 第二个是 FILE GET,直接读取当前机器,指定路径下的 SQL 文件,生成作业定义。第二个方式主要是方便本地不依赖平台服务,可快速调试引擎。

上下文初始化部分,分为两个过程:

  • 一个是调优参数的设置,类似常用 HiveSQL 的 Set 命令;
  • 另外一个就是 Catalog 初始化,而 Flink 集群与 Hive 的集成,就是在整个环节实现的。

图片

以 Hive 为例,在 Catalog 注入之前,平台元数据管理模块有一个 Catalog 初始化的过程,预先将 Catalog 的创建语句存储起来。当一个 Flink 作业提交时,选择需要注入的 Catalog。创建 Catalog,并注册到 Flink 的上下文中,从而实现 Catalog 的元素注入。

图片

随着任务的增加,对于新手来说,在平台上开发 Flink 作业,从 SQL 编写到上线,往往需要改写数十个版本。平台缺少快速试错的能力。所以我们设计开发了实时监控、实时调试功能。

在架构方面,斗鱼引入了 Flink Gateway Server 对 Flink 集群接口二次分装。包含语法校验、SQL 提交、SQL 状态检查、SQL 停止、SQL mock 等功能。将 Flink 集群和网关服务的日志统一收集。通过预启动 Flink 集群,缩短作业启动时间,达到快速调试的能力。

图片

实时调试主要分为四个步骤,即 SQL 解析、规则校验、执行计划,和物理执行。

SQL mock 就是改写了原有的 SQL 解析过程。根据 SQL 解析后得到 Node 数,分析 SQL 的血缘关系,去判断 Source 来源表和 Sink 目的表。动态的将 Source 表改写为 dataGen 的数据源,和 Sink 表改写成 console 的数据源。

图片

动态修改 Source 和 Sink 表的配置,实现数据源的 mock。这个带来的好处是:线上开发 SQL 可直接用于调试,不需要修改,并且也不用担心会产生脏数据,可快速验证 SQL 逻辑是否符合预期。

图片

Flink 作业的监控告警,使用自定义 Metrics Reporter,将 metrics 指标上报到 Kafka 集群,继而使用 Flink 任务去消费 Kafka 里的 metrics 信息,完成如聚合、补充链路维度等操作,处理后的数据再推送到 Push Gateway,写入 Prometheus 中。最后监控大盘基于 Grafana 绘制。

图片

斗鱼的监控大盘分为资源监控,稳定性监控,Kafka 监控和 CPU 内存监控。

三、实时数仓探索

图片

第一版实时数仓方案,借鉴离线数仓的分层与开发思路,以 Kafka 作为中间层的数据存储。DB 和 LOG 数据分别经过 Canal 和打点服务写入 Kafka,作为实时数据的 ODS 层。

  1. 消费 ODS 层,使用 Flink 做维度补充和清洗等操作后,写回 Kafka,生成 DWD 层数据;
  2. 消费 DWD 层,以分钟、小时的窗口,和指定维度产生聚合数据,写回 Kafka,生成 DWS 层的数据;
  3. 最后消费 DWS 层的数据,写入到 HBase、MySQL、ES、Redis、ClickHouse 等数据源中,供数据服务使用。

图片

随着业务场景越来越多,这个方案显现出了四个问题:

  • Kafka 数据保留时间有限;
  • 离线、实时数据存储层不统一;
  • 中间层较难直接查询分析;
  • 数据回溯场景不友好。

图片

基于上诉问题,我们尝试了第二套方案,使用 Iceberg 作为中间层存储。利用前面提到的 Catalog 注入,我们注入了 Iceberg 的元数据,将 DWD、DWS 层使用 Iceberg 来存储。

这个方案解决了使用 Kafka 作为中间层的部分问题,但是又引入了新的问题。Flink 写入 Iceberg 表时,数据的可见性依赖 Checkpoint 的 Commit 操作。因此 Iceberg 数据的延迟取决于 Checkpoint 的周期。而 Checkpoint 是阻塞式操作,往往不建议设置过于小。也就是说 Iceberg 作为中间层会比 Kafka 延迟高。对于时延要求高的场景就不太适合。

图片

最终我们通过自定义元数据服务,维护库表的 Catalog 信息,以及动态注入 Catalog 能力,实现双方案并行。当然,我们也在继续探索更加便捷的方案去开发实时数仓。

四、未来发展与展望

图片

Flink 让实时计算更加简单,斗鱼在搭建实时计算平台过程中也并非一帆风顺。对于实时计算平台未来的发展,我们有三个展望:

  • 第一个是 Flink 的动态扩缩容,实现平台自动化,调整 Flink 作业资源,解决业务数据突增引起的问题;
  • 第二个是简化实时数仓开发模型,降低实时数仓开发门槛,在企业内,将实时数仓真正大规模推广使用;
  • 最后一个是完善实时数据质量监控体系,实现实时数据质量可验证与可追溯。

本文地址:https://www.6aiq.com/article/1647650726372
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出