Apache Flink 端到端(end-to-end)Exactly-Once 特性概览 (翻译)


本文地址:http://www.6aiq.com/article/1545571124974
知乎专栏 点击关注
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出

原文地址: https://my.oschina.net/u/992559/blog/1819948
作者: moyiguke

Apache Flink 端到端(end-to-end)Exactly-Once 特性概览

本文是 flink 博文的翻译,原文链接https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

2017 年 12 月份发布的 Apache Flink 1.4 版本,引进了一个重要的特性:TwoPhaseCommitSinkFunction (关联 Jirahttps://issues.apache.org/jira/browse/FLINK-7210) ,它抽取了两阶段提交协议的公共部分,使得构建端到端 Excatly-Once 的 Flink 程序变为了可能。这些外部系统包括 Kafka0.11 及以上的版本,以及一些其他的数据输入(data sources)和数据接收 (data sink)。它提供了一个抽象层,需要用户自己手动去实现 Exactly-Once 语义。

如果仅仅是使用,可以查看这个文档https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html

如果想要了解更多,这篇文章我们会深入了解这个特性,以及 Flink 背后做的工作。

纵览全篇,有以下几点:

  • 描述 Flink checkpoints 的作用,以及它是如何保障 Flink 程序 Exactly-Once 的语义的。
  • 展现 Flink 如何与两阶段提交协议与输入输出(data sources and data sinks)交互,借此传递端到端的 Exactly-Once 语义保证。
  • 通过一个简单的例子来展现如何使用 TwoPhaseCommitSinkFunction,来实现 Exactly-Once 的文件输出(file sink)。

Apache Flink 程序的 Exactly-Once 语义

当我们在讨论 Exactly-Once 语义的时候,我们指的是每一个到来的事件仅会影响最终结果一次。就算机器宕机或者软件崩溃,即没有数据重复,也没有数据丢失。

Flink 很久之前就提供了 Exactly-Once 语义。在过去的几年时间里,我们对 Flink 的 checkpoint 做了深入的描述 ,这个是 Flink 能够提供 Exactly-Once 语义的核心。Flink 文档也对这个特性做了深入的介绍 。

在我们继续之前,有一个关于 checkpoint 算法的简要介绍,这对于了解更广的主题来说是十分必要的。

一个 checkpoint 是 Flink 的一致性快照,它包括:

  1. 程序当前的状态
  2. 输入流的位置

Flink 通过一个可配置的时间,周期性的生成 checkpoint,将它写入到存储中,例如 S3 或者 HDFS。写入到存储的过程是异步的,意味着 Flink 程序在 checkpoint 运行的同时还可以处理数据。

在机器或者程序遇到错误重启的时候,Flink 程序会使用最新的 checkpoint 进行恢复。Flink 会恢复程序的状态,将输入流回滚到 checkpoint 保存的位置,然后重新开始运行。这意味着 Flink 可以像没有发生错误一样计算结果。

在 Flink 1.4.0 版本之前,Flink 仅保证 Flink 程序内部的 Exactly-Once 语义,没有扩展到在 Flink 数据处理完成后存储的外部系统。

Flink 程序可以和不同的接收器(sink)交互,开发者需要有能力在一个组件的上下文中维持 Exactly-Once 语义。

为了提供端到端 Exactly-Once 语义,除了 Flink 应用程序本身的状态,Flink 写入的外部存储也需要满足这个语义。也就是说,这些外部系统必须提供提交或者回滚的方法,然后通过 Flink 的 checkpoint 来协调。

在分布式系统中,协调提交和回滚的通用做法是两阶段提交。接下来,我们讨论 Flink 的 TwoPhaseCommitSinkFunction 如何使用两阶段提交协议来保证端到端的 Exactly-Once 语义。

Flink 程序端到端的 Exactly-Once 语义

我们简略的看一下两阶段提交协议,以及它如何在一个读写 Kafka 的 Flink 实例程序中提供端到端的 Exactly-Once 语义。Kafka 是一个流行的消息中间件,经常被拿来和 Flink 一起使用,Kafka 在最近的 0.11 版本中添加了对事务的支持。这意味着现在 Flink 读写 Kafka 有了必要的支持,使之能提供端到端的 Exactly-Once 语义。

Flink 对端到端的 Exactly-Once 语义不仅仅局限在 Kafka,你可以使用任一输入输出源(source、sink),只要他们提供了必要的协调机制。例如Pravega ,来自 DELL/EMC 的流数据存储系统,通过 Flink 的 TwoPhaseCommitSinkFunction 也能支持端到端的 Exactly-Once 语义。

在这个示例程序中,我们有:

  • 从 Kafka 读取数据的 data source(KafkaConsumer,在 Flink 中)
  • 窗口聚合
  • 将数据写回到 Kafka 的 data sink(KafkaProducer,在 Flink 中)

在 data sink 中要保证 Exactly-Once 语义,它必须将所有的写入数据通过一个事务提交到 Kafka。在两个 checkpoint 之间,一个提交绑定了所有要写入的数据。

这保证了当出错的时候,写入的数据可以被回滚。



然而在分布式系统中,通常拥有多个并行执行的写入任务,简单的提交和回滚是效率低下的。为了保证一致性,所有的组件必须先达成一致,才能进行提交或者回滚。Flink 使用了两阶段提交协议以及预提交阶段来解决这个问题。

在 checkpoint 开始的时候,即两阶段提交中的预提交阶段。首先,Flink 的 JobManager 在数据流中注入一个 checkpoint 屏障(它将数据流中的记录分割开,一些进入到当前的 checkpoint,另一些进入下一个 checkpoint)。

屏障通过 operator 传递。对于每一个 operator,它将触发 operator 的状态快照写入到 state backend。

data source 保存了 Kafka 的 offset,之后把 checkpoint 屏障传递到后续的 operator。

这种方式仅适用于 operator 有他的内部状态。内部状态是指,Flink state backends 保存和管理的内容 - 举例来说,第二个 operator 中 window 聚合算出来的 sum。当一个进程有它的内部状态的时候,除了在 checkpoint 之前将需要将数据更改写入到 state backend,不需要在预提交阶段做其他的动作。在 checkpoint 成功的时候,Flink 会正确的提交这些写入,在 checkpoint 失败的时候会终止提交。 

然而,当一个进程有外部状态的时候,需要用一种不同的方式来处理。外部状态通常由需要写入的外部系统引入,例如 Kafka。因此,为了提供 Exactly-Once 保证,外部系统必须提供事务支持,借此和两阶段提交协议交互。

我们知道在我们的例子中,由于需要将数据写到 Kafka,data sink 有外部的状态。因此,在预提交阶段,除了将状态写入到 state backend 之外,data sink 必须预提交自己的外部事务。

当 checkpoint 屏障在所有 operator 中都传递了一遍,以及它触发的快照写入完成,预提交阶段结束。这个时候,快照成功结束,整个程序的状态,包括预提交的外部状态是一致的。万一出错的时候,我们可以通过 checkpoint 重新初始化。

下一步是通知所有 operator,checkpoint 已经成功了。这时两阶段提交中的提交阶段,Jobmanager 为程序中的每一个 operator 发起 checkpoint 已经完成的回调。data source 和 window operator 没有外部的状态,在提交阶段中,这些 operator 不会执行任何动作。data sink 拥有外部状态,所以通过事务提交外部写入。

让我们对上述的知识点汇总一下:

  • 一旦所有的 operator 完成预提交,就提交一个 commit。
  • 如果至少有一个预提交失败,其他的都会失败,这时回滚到上一个 checkpoint 保存的位置。
  • 预提交成功后,提交的 commit 也需要保障最终成功 -operator 和外部系统需要提供这个保障。如果 commit 失败了(比如网络中断引起的故障),整个 flink 程序也因此失败,它会根据用户的重启策略重启,可能还会有一个尝试性的提交。这个过程非常严苛,因为如果提交没有最终生效,会导致数据丢失。

因此,我们可以确定所有的 operator 同意 checkpoint 的最终结果:要么都同意提交数据,要么提交被终止然后回滚。

在 Flink 程序中实现两阶段提交

完整的实现两阶段提交协议可能会有一点复杂,因此 Flink 将通用逻辑提取到一个 abstract 的类 TwoPhaseCommitSinkFunction。

让我们通过一个简单的文件操作例子来说明如何使用 TwoPhaseCommitSinkFunction。我们只需要实现四个 method,并使 sink 呈现 Exactly-Once 语义。

  1. beginTransaction - 在事务开始前,我们在目标文件系统上面的临时目录上创建一个临时文件。随后,我们在程序处理的时候可以将数据写入到这个文件。
  2. preCommit - 在预提交阶段,我们刷新文件到磁盘,关闭文件,不要重新打开写入。我们也会为下一个 checkpoint 的文件写入开启一个新的事务。
  3. commit - 在提交阶段,我们原子性的将预提交阶段的文件移动到真正的目标目录。需要注意的是,这增加了输出数据的可见性的延迟。
  4. abort - 在终止阶段,我们删除临时文件。

我们知道,如果步骤中有任何错误,Flink 会通过最新的 checkpoint 来恢复程序状态。在一个罕见的场景中,预提交成功了,在通知到达 operator 之前失败了。这时候,Flink 将 operator 的状态恢复到预提交阶段,即还未真正提交的时候。

为了能在重启的时候能够正确的终止或者提交事务,我们需要在预提交阶段将足够的信息保存到 checkpoint 中。在这个例子中,这些信息是临时文件以及目标目录的地址。

TwoPhaseCommitSinkFunction 已经把这个场景考虑进去了,在从 checkpoint 恢复的时候,它会优先提交一个 commit。我们的任务是将 commit 实现成一个幂等的操作。一般的,这不是难题。在这个例子中,我们可以发现这种情况:临时文件不在临时目录中,但是已经移动到目标目录了。

在 TwoPhaseCommitSinkFunction 中,有一些其他的边界条件也考虑在内了。通过Flink 文档查看更多。

总结

如果你看到了这么后面,很感谢你通读这个详细的帖子。以下是我们主要覆盖的关键点:

  • Flink 的 checkpoint 系统是它支撑两阶段协议和保障 Exactly-Once 语义的基础设施,
  • 这种实现方案的优点是,Flink 不像其他系统那样,通过网络传输存储数据 - 它不需要像大部分批处理程序那样,将每一个计算结果保存到磁盘。
  • Flink 的 TwoPhaseCommitSinkFunction 提取了两阶段提交协议的通用部分,通过这个方法结合 Flink 以及支持事务的外部系统,可以构建端到端的 Exactly-Once 程序。
  • Flink 1.4.0开始,Pravega 和 Kafka 0.11 producer 提供了 Exactly-Once 语义;通过 Kafka 在 0.11 版本第一次引入的事务,为在 Flink 中使用 Kafka producer 提供 Exactly-Once 语义提供了可能性。
  • Kafka 0.11 版本的 producer 是在 TwoPhaseCommitSinkFunction 基础上实现的,它 at-least-once 的 producer 的基础上增加了很小的开销。

我们为这个特性能提供的功能感到很兴奋,今后希望能找到更多支持 TwoPhaseCommitSinkFunction 的 producer。


本文地址:http://www.6aiq.com/article/1545571124974
知乎专栏 点击关注
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出