AIQ | Spark Streaming 场景应用 -Kafka 数据读取方式



转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com

AIQ 机器学习大数据 知乎专栏 点击关注

概述

Spark Streaming 支持多种实时输入源数据的读取,其中包括 Kafka、flume、socket 流等等。除了 Kafka 以外的实时输入源,由于我们的业务场景没有涉及,在此将不会讨论。本篇文章主要着眼于我们目前的业务场景,只关注 Spark Streaming 读取 Kafka 数据的方式。 Spark Streaming 官方提供了两种方式读取 Kafka 数据:

➣一是 Receiver-based Approach。该种读取模式官方最先支持,并在 Spark 1.2 提供了数据零丢失 (zero-data loss) 的支持;

➣一是 Direct Approach (No Receivers)。该种读取方式在 Spark 1.3 引入。

此两种读取方式存在很大的不同,当然也各有优劣。接下来就让我们具体剖解这两种数据读取方式。

一、Receiver-based Approach

如前文所述,Spark 官方最先提供了基于 Receiver 的 Kafka 数据消费模式。但会存在程序失败丢失数据的可能,后在 Spark 1.2 时引入一个配置参数 spark.streaming.receiver.writeAheadLog.enable 以规避此风险。以下是官方的原话:

under default configuration, this approach can lose data under failures (see receiver reliability. To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure.

Receiver-based 读取方式

Receiver-based 的 Kafka 读取方式是基于 Kafka 高阶 (high-level) api 来实现对 Kafka 数据的消费。在提交 Spark Streaming 任务后,Spark 集群会划出指定的 Receivers 来专门、持续不断、异步读取 Kafka 的数据,读取时间间隔以及每次读取 offsets 范围可以由参数来配置。读取的数据保存在 Receiver 中,具体 StorageLevel 方式由用户指定,诸如 MEMORY_ONLY 等。当 driver 触发 batch 任务的时候,Receivers 中的数据会转移到剩余的 Executors 中去执行。在执行完之后,Receivers 会相应更新 ZooKeeper 的 offsets。如要确保 at least once 的读取方式,可以设置 spark.streaming.receiver.writeAheadLog.enable 为 true。具体 Receiver 执行流程见下图:

Receiver-based 读取实现

Kafka 的 high-level 数据读取方式让用户可以专注于所读数据,而不用关注或维护 consumer 的 offsets,这减少用户的工作量以及代码量而且相对比较简单。因此,在刚开始引入 Spark Streaming 计算引擎时,我们优先考虑采用此种方式来读取数据,具体的代码如下:

/*读取kafka数据函数*/

  def getKafkaInputStream(zookeeper: String,

                            topic: String,

                            groupId: String,

                            numRecivers: Int,

                            partition: Int,

                            ssc: StreamingContext): DStream[String] = {

    val kafkaParams = Map(

      ("zookeeper.connect", zookeeper),

      ("auto.offset.reset", "largest"),

      ("zookeeper.connection.timeout.ms", "30000"),

      ("fetch.message.max.bytes", (1024 * 1024 * 50).toString),

      ("group.id", groupId)

    )

    val topics = Map(topic -> partition / numRecivers)

    val kafkaDstreams = (1 to numRecivers).map { _ =>

      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,

        kafkaParams,

        topics,

        StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    }

    ssc.union(kafkaDstreams)

  }

如上述代码,函数 getKafkaInputStream 提供了 zookeeper, topic, groupId, numReceivers, partition 以及 ssc,其传入函数分别对应:

• zookeeper: ZooKeeper 连接信息

• topic: Kafka 中输入的 topic 信息

• groupId: consumer 信息

• numReceivers: 打算开启的 receiver 个数, 并用来调整并发

• partition: Kafka 中对应 topic 的分区数

以上几个参数主要用来连接 Kafka 并读取 Kafka 数据。具体执行的步骤如下:

• Kafka 相关读取参数配置,其中 zookeeper.connect 即传入进来的 zookeeper 参数;auto.offset.reset 设置从 topic 的最新处开始读取数据;zookeeper.connection.timeout.ms 指 zookeepr 连接超时时间,以防止网络不稳定的情况;fetch.message.max.bytes 则是指单次读取数据的大小;group.id 则是指定 consumer。

• 指定 topic 的并发数,当指定 receivers 个数之后,但是由于 receivers 个数小于 topic 的 partition 个数,所以在每个 receiver 上面会起相应的线程来读取不同的 partition。

• 读取 Kafka 数据,numReceivers 的参数在此用于指定我们需要多少 Executor 来作为 Receivers,开多个 Receivers 是为了提高应用吞吐量。

• union 用于将多个 Receiver 读取的数据关联起来

Receiver-based 读取问题

采用 Reveiver-based 方式满足我们的一些场景需求,并基于此抽象出了一些 micro-batch、内存计算模型等。在具体的应用场景中,我们也对此种的方式做了一些优化:

• 防数据丢失。做 checkpoint 操作以及配置 spark.streaming.receiver.writeAheadLog.enable 参数;

• 提高 receiver 数据吞吐量。采用 MEMORY_AND_DISK_SER 方式读取数据、提高单 Receiver 的内存或是调大并行度,将数据分散到多个 Receiver 中去。

以上处理方式在一定程度上满足了我们的应用场景,诸如 micro-batch 以及内存计算模型等。但是同时因为这两方面以及其他方面的一些因素,导致也会出现各种情况的问题:

• 配置 spark.streaming.receiver.writeAheadLog.enable 参数,每次处理之前需要将该 batch 内的日志备份到 checkpoint 目录中,这降低了数据处理效率,反过来又加重了 Receiver 端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。

• 采用 MEMORY_AND_DISK_SER 降低对内存的要求。但是在一定程度上影响计算的速度

• 单 Receiver 内存。由于 receiver 也是属于 Executor 的一部分,那么为了提高吞吐量,提高 Receiver 的内存。但是在每次 batch 计算中,参与计算的 batch 并不会使用到这么多的内存,导致资源严重浪费。

• 提高并行度,采用多个 Receiver 来保存 Kafka 的数据。Receiver 读取数据是异步的,并不参与计算。如果开较高的并行度来平衡吞吐量很不划算。

• Receiver 和计算的 Executor 的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而 Receiver 则在一直接收数据,这非常容易导致程序崩溃。

• 在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新 offsets 的情况,这导致数据重复消费。

为了回辟以上问题,降低资源使用,我们后来采用 Direct Approach 来读取 Kafka 的数据,具体接下来细说。

二、Direct Approach (No Receivers)

区别于 Receiver-based 的数据消费方法,Spark 官方在 Spark 1.3 时引入了 Direct 方式的 Kafka 数据消费方式。相对于 Receiver-based 的方法,Direct 方式具有以下方面的优势:

• 简化并行 (Simplified Parallelism)。 不现需要创建以及 union 多输入源,Kafka topic 的 partition 与 RDD 的 partition 一一对应,官方描述如下:

No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.

**
• 高效 (Efficiency)。**Receiver-based 保证数据零丢失(zero-data loss) 需要配置 spark.streaming.receiver.writeAheadLog.enable, 此种方式需要保存两份数据,浪费存储空间也影响效率。而 Direct 方式则不存在这个问题。

Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.

**
• 强一致语义 (Exactly-once semantics)。**High-level 数据由 Spark Streaming 消费,但是 Offsets 则是由 Zookeeper 保存。通过参数配置,可以实现 at-least once 消费,此种情况有重复消费数据的可能。

The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).

Direct 读取方式

Direct 方式采用 Kafka 简单的 consumer api 方式来读取数据,无需经由 ZooKeeper,此种方式不再需要专门 Receiver 来持续不断读取数据。当 batch 任务触发时,由 Executor 读取数据,并参与到其他 Executor 的数据计算过程中去。driver 来决定读取多少 offsets,并将 offsets 交由 checkpoints 来维护。将触发下次 batch 任务,再由 Executor 读取 Kafka 数据并计算。从此过程我们可以发现 Direct 方式无需 Receiver 读取数据,而是需要计算时再读取数据,所以 Direct 方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外 batch 任务堆积时,也不会影响数据堆积。其具体读取方式如下图:

Direct 读取实现

Spark Streaming 提供了一些重载读取 Kafka 数据的方法,本文中关注两个基于 Scala 的方法,这在我们的应用场景中会用到,具体的方法代码如下:

• 方法 createDirectStream 中,ssc 是 StreamingContext;kafkaParams 的具体配置见 Receiver-based 之中的配置,与之一样;这里面需要指出的是 fromOffsets ,其用来指定从什么 offset 处开始读取数据。

def createDirectStream[

    K: ClassTag,

    V: ClassTag,

    KD <: Decoder[K]: ClassTag,

    VD <: Decoder[V]: ClassTag,

    R: ClassTag] (

      ssc: StreamingContext,

      kafkaParams: Map[String, String],

      fromOffsets: Map[TopicAndPartition, Long],

      messageHandler: MessageAndMetadata[K, V] => R

  ): InputDStream[R] = {

    val cleanedHandler = ssc.sc.clean(messageHandler)

    new DirectKafkaInputDStream[K, V, KD, VD, R](

      ssc, kafkaParams, fromOffsets, cleanedHandler)

  }

• 方法 createDirectStream 中,该方法只需要 3 个参数,其中 kafkaParams 还是一样,并未有什么变化,不过其中有个配置 auto.offset.reset 可以用来指定是从 largest 或者是 smallest 处开始读取数据;topic 是指 Kafka 中的 topic,可以指定多个。具体提供的方法代码如下:

def createDirectStream[

    K: ClassTag,

    V: ClassTag,

    KD <: Decoder[K]: ClassTag,

    VD <: Decoder[V]: ClassTag] (

      ssc: StreamingContext,

      kafkaParams: Map[String, String],

      topics: Set[String]

  ): InputDStream[(K, V)] = {

    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)

    val kc = new KafkaCluster(kafkaParams)

    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)

    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](

      ssc, kafkaParams, fromOffsets, messageHandler)

  }

在实际的应用场景中,我们会将两种方法结合起来使用,大体的方向分为两个方面:

**• 应用启动。** 当程序开发并上线,还未消费 Kafka 数据,此时从 largest 处读取数据,采用第二种方法;

**•应用重启。** 因资源、网络等其他原因导致程序失败重启时,需要保证从上次的 offsets 处开始读取数据,此时就需要采用第一种方法来保证我们的场景。

总体方向上,我们采用以上方法满足我们的需要,当然具体的策略我们不在本篇中讨论,后续会有专门的文章来介绍。从 largest 或者是 smallest 处读 Kafka 数据代码实现如下:

/**

    * 读取kafka数据,从最新的offset开始读

    *

    * @param ssc         : StreamingContext

    * @param kafkaParams : kafka参数

    * @param topics      : kafka topic

    * @return : 返回流数据

    */

private def getDirectStream(ssc: StreamingContext,

                            kafkaParams: Map[String, String],

                            topics: Set[String]): DStream[String] = {

  val kafkaDStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

    ssc,

    kafkaParams,

    topics

  )

  kafkaDStreams.map(_._2)

}

程序失败重启的逻辑代码如下:

/**

    * 如果已有offset,则从offset开始读数据

    *

    * @param ssc         : StreamingContext

    * @param kafkaParams : kafkaParams配置参数

    * @param fromOffsets : 已有的offsets

    * @return : 返回流数据

    */

private def getDirectStreamWithOffsets(ssc: StreamingContext,

                                       kafkaParams: Map[String, String],

                                       fromOffsets: Map[TopicAndPartition, Long]): DStream[String] = {

  val kfkData = try {

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](

      ssc,

      kafkaParams,

      fromOffsets,

      (mmd: MessageAndMetadata[String, String]) => mmd.message()

    )

  } catch { //offsets失效, 从最新的offsets读。

    case _: Exception =>

    val topics = fromOffsets.map { case (tap, _) =>

      tap.topic

    }.toSet

    getDirectStream(ssc, kafkaParams, topics)

  }

  kfkData

}

代码中的 fromOffsets 参数从外部存储获取并需要处理转换,其代码如下:

val fromOffsets = offsets.map { consumerInfo =>

  TopicAndPartition(consumerInfo.topic, consumerInfo.part) -> consumerInfo.until_offset

}.toMap

该方法提供了从指定 offsets 处读取 Kafka 数据。如果发现读取数据异常,我们认为是 offsets 失败,此种情况去捕获这个异常,然后从 largest 处读取 Kafka 数据。

Direct 读取问题

在实际的应用中,Direct Approach 方式很好地满足了我们的需要,与 Receiver-based 方式相比,有以下几方面的优势:

**• 降低资源。**Direct 不需要 Receivers,其申请的 Executors 全部参与到计算任务中;而 Receiver-based 则需要专门的 Receivers 来读取 Kafka 数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。

•  ** 降低内存。**Receiver-based 的 Receiver 与其他 Exectuor 是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高 Receiver 的内存,但是参与计算的 Executor 并无需那么多的内存。而 Direct 因为没有 Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的 10G 降至现在的 2-4G 左右。

• ** 鲁棒性更好。**Receiver-based 方法需要 Receivers 来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但 Receivers 却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其 Driver 在触发 batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。

至于其他方面的优势,比如 简化并行 (Simplified Parallelism)、高效(Efficiency) 以及强一致语义 (Exactly-once semantics) 在之前已列出,在此不再介绍。虽然 Direct 有以上这些优势,但是也存在一些不足,具体如下:

**• 提高成本。**Direct 需要用户采用 checkpoint 或者第三方存储来维护 offsets,而不像 Receiver-based 那样,通过 ZooKeeper 来维护 Offsets,此提高了用户的开发成本。

**• 监控可视化。**Receiver-based 方式指定 topic 指定 consumer 的消费情况均能通过 ZooKeeper 来监控,而 Direct 则没有这种便利,如果做到监控并可视化,则需要投入人力开发。

总结

本文介绍了基于 Spark Streaming 的 Kafka 数据读取方式,包括 Receiver-based 以及 Direct 两种方式。两种方式各有优劣,但相对来说 Direct 适用于更多的业务场景以及有更好的可护展性。至于如何选择以上两种方式,除了业务场景外也跟团队相关,如果是应用初期,为了快速迭代应用,可以考虑采用第一种方式;如果要深入使用的话则建议采用第二种方式。本文只介绍了两种读取方式,并未涉及到读取策略、优化等问题。这些会在后续的文章中详细介绍。


更多高质资源 尽在AIQ 机器学习大数据 知乎专栏 点击关注

转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com