首先我们梳理Exactly once所涉及到的相关内容,并明确一个讨论基础: 首先要明确的是,spark streaming是一个计算框架,大部分的工作其实 都是数据转换或者统计,不会涉及到外部系统的交互,只有最后统计完 的结果可能需要保存下来,推到外部系统里面。

  1. 将Spark计算引擎看作是一个整体
  2. Spark Streaming(以下简写SS)需要对接上下游
  3. Exactly once语义(以下简写EO,或EO语义)涉及到
  4. 接收数据:上游是否EO到SS
  5. 转换数据:SS作为整体是否保证了EO
  6. 推出数据:SS是否将数据EO地写出到了下游
  7. 只有保证了上述三个过程的EO,才能保证SS是能够实现EO的

0 消费kafka的两种方式

接收方式

接收方式的数据源 DStream 一般都继承了 ReceiverInputDStream, spark streaming 的启动的时候,会分发一个spark job, 目的就是到 各个executor 上去启动数据接收器, 例如 SocketInputDStream 就是要到每个executor 上去启动一个线程监听端口去接收数据, 然后把数据一块块放在每个 executor 的BlockManager (不懂的可以参考 spark 自己的分布式存储系统 - BlockManager) 并且把元信息汇报到 driver 上面, 最终 ReceiverInputDStream 计算的时候会产生 一个可以根据元信息去 BlockManager中拿数据的BlockRDD。

示例图

拉取方式

DirectKafkaInputDStream 这个DStream 计算的时候就是直接去kafka中拉取数据。

1 Spark Streaming上游对接kafka时如何保证Exactly Once?

简要说一下,Spark Streaming使用Direct模式对接上游kafka。无论kafka有多少个partition, 使用Direct模式总能保证SS中有相同数量的partition与之相对, 也就是说SS中的KafkaRDD的并发数量在Direct模式下是由上游kafka决定的。 在这个模式下,kafka的offset是作为KafkaRDD的一部分存在,会存储在checkpoints中, 由于checkpoints只存储offset内容,而不存储数据,这就使得checkpoints是相对轻的操作。 这就使得SS在遇到故障时,可以从checkpoint中恢复上游kafka的offset,从而保证exactly once。

\(这里有个需要注意的地方\),那就是checkpoint方法在spark中主要有两块应用: 一块是在spark core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复; 另外一块是应用在spark streaming中,使用checkpoint用来保存DStreamGraph以及相关配置信息,以便在Driver崩溃重启的时候能够接着之前进度继续进行处理 (如之前waiting batch的job会在重启后继续处理)。

1.1 checkpoint 机制能准确保证不重复消费吗?

要解决这个问题,需要明确checkpoint这个功能是如何实现的。

  • cache 和checkpoint的区别和联系
  • cache 和 checkpoint 是有显著区别的, 缓存把 RDD 计算出来然后放在内存中, 但是RDD 的依赖链(相当于数据库中的redo 日志), 也不能丢掉, 当某个点某个 executor 宕了, 上面cache 的RDD就会丢掉, 需要通过 依赖链重放计算出来, 不同的是, checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存储, 所以依赖链就可以丢掉了,就斩断了依赖链, 是通过复制实现的高容错。
  • Checkpoint 的局限
  • 局限 Spark Streaming 的 checkpoint 机制看起来很美好,却有一个硬伤。 上文提到最终刷到外部存储的是类 Checkpoint 对象序列化后的数据。 那么在 Spark Streaming application 重新编译后, 再去反序列化 checkpoint 数据就会失败。这个时候就必须新建 StreamingContext。

1.2 解决checkpoint自身缺陷的方案?

针对这种情况,在我们结合 Spark Streaming + kafka 的应用中, 我们自行维护了消费的 offsets,这样一来即使重新编译 application, 还是可以从需要的 offsets 来消费数据。

1.3 checkpoint错误实现

主要出现的问题是针对stream操作的时候,出现在了functionToCreateContext函数的外面; 这样的话,会导致下次启动的时候,假如想通过checkpoint启动的话会没法初始化的错误;

val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

https://stackoverflow.com/questions/35090180/why-does-spark-throw-sparkexception-dstream-has-not-been-initialized-when-res

下面有个中文的详细的介绍: https://blog.csdn.net/Dax1n/article/details/53425668

1.4 正确的实现

package realtime

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent


object feed_ops  {

  def main(args: Array[String]): Unit = {

    val checkpointDirectory = "./check"

    def functionToCreateContext(): StreamingContext = {

      val conf = new SparkConf().setMaster("local[2]").setAppName("feed:alg:test")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory

      val kafkaParams = Map[String, Object] (
        "bootstrap.servers" -> "xxx:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "feed:alg:test",
        "auto.offset.reset" -> "earliest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )

      val topics = Array("test")

      val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
      stream.map(record => (record.key, record.value)).
        filter(x=>x._2.contains("kankan_v2")).filter(x=>x._2.contains("NewsList") || x._2.contains("NewsPage") ).
        map(x =>(x._1, x._2.split("\u0001",0).filter(x=>x!="")) ).map(x=>(x._1, x._2.mkString("^"))).
        print()

      stream.foreachRDD {
        rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd.foreachPartition { iter =>
            val o:OffsetRange = offsetRanges(TaskContext.get.partitionId)
            println("data:  ", iter.map(x=>(x.key, x.value)).take(2).foreach(println))
            println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
          }
      }

      ssc
    }
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
    ssc.start()
    ssc.awaitTermination()
  }
}

1.5 streaming中checkpoint写流程

同样,针对streaming中checkpoint的写流程,主要有以下三个问题,并对此做相关解释。

Q1:streaming中checkpoint是在何时做的?

A1:在spark streaming中,jobGenerator会定期生成任务(jobGenerator.generateJobs)。在任务生成后将会调用doCheckpoint方法对系统做checkpoint。此外,在当前批次任务结束,清理metadata(jobGenerator.clearMetadata)时,也会调用doCheckpoint方法。

Q2:在streaming checkpoint过程中,具体都写入了哪些数据到checkpoint目录?

A2: 做checkpoint的主要逻辑基本都在JobGenerator.doCheckpoint方法中。

在该方法中,首先更新当前时间段需要做checkpoint RDD的相关信息,如在DirectKafkaInputDStream中,将已经生成的RDD信息的时间,topic,partition,offset等相关信息进行更新。

其次,通过checkpointWriter将Checkpoint对象写入到checkpoint目录中(CheckPoint.write → CheckpointWriteHandle)。至此,我们清楚了,写入到checkpoint目录的数据其实就是Checkpoint对象。

Checkpoint主要包含的信息如下:

val master = ssc.sc.master
val framework = ssc.sc.appName
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val sparkConfPairs = ssc.conf.getAll

具体包括相关配置信息,checkpoint目录,DStreamGraph等。对于DStreamGraph,主要包含InputDstream以及outputStream等相关信息,从而我们可以看出定义应用相关的计算函数也被序列化保存到checkpoint目录中了。

Q3: streaming checkpoint都有哪些坑?

A3:

从A2中可以看到,应用定义的计算函数也被序列化到checkpoint目录,当应用代码发生改变时,此时就没法从checkpoint恢复。个人感觉这是checkpoint在生产环境使用中碰到的最大障碍。

另外,当从checkpoint目录恢复streamingContext时,配置信息啥的也都是从checkpoint读取的(只有很少的一部分配置是reload的,具体见读流程),当重启任务时,新改变的配置就可能不生效,导致很奇怪的问题。

此外,broadcast变量在checkpoint中使用也受到限制(SPARK-5206)。

2 Spark Streaming输出下游如何保证Exactly once?

    详情参考这里 首先输出操作是具有At-least Once语义的,也就是说SS可以保证需要输出的数据一定会输出出去,只不过由于失败等原因可能会输出多次。 那么如何保证Exactly once?
  • 第一种“鸵鸟做法”,就是期望下游(数据)具有幂等特性。
  • 多次尝试总是写入相同的数据,例如,saveAs***Files 总是将相同的数据写入生成的文件。
  • 第二种使用事务更新
  • 所有更新都是事务性的,以便更新完全按原子进行。这样做的一个方法如下: 使用批处理时间(在foreachRDD中可用)和RDD的partitionIndex(分区索引)来创建identifier(标识符)。 该标识符唯一地标识streaming application 中的blob数据。 使用该identifier,blob 事务地更新到外部系统中。也就是说,如果identifier尚未提交,则以 (atomicall)原子方式提交分区数据和identifier。否则,如果已经提交,请跳过更新。

简要代码如下:

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}

这样保证同一个partition要么一起更新成功,要么一起失败,通过uniqueId来标识这一次的更新,这就要求下游支持事务机制。

3 Spark Streaming 内部如何保证Exactly Once?

SS内部的实现机制是基于RDD模型的,RDD为保证计算过程中数据不丢失使用了checkpoint机制, 也就是说其计算逻辑是RDD的变换过程,也就是DAG,可以在计算过程中的任何一个阶段(也就是这个阶段的RDD) 上使用checkpoint方法,就可以保证当后续计算失败,可以从这个checkpoint重新算起,使得计算延续下去。 当Spark Streaming场景下,其天然会进行batch操作,也就是说kafka过来的数据, 每秒(一个固定batch的时间周期)会对当前kafka中的数据产生一个RDD, 那么后续计算就是在这个RDD上进行的。只需要在kafkaRDD这个位置合理使用了checkpoint (这一点在前面已经讲过,可以保证)就能保证SS内部的Exactly once。

注意一点:SS中没有Tuple级别的ACK,其操作必然是在RDD的某个partition上的, 要么全做,要么不做,要么失败,要么成功,都是基于RDD的partition的。

关于Event Time 在Spark最近的release note中没有明确提及event time。应该是对event time的支持力度还是有限的。 以下是我对event time在SS中可能的使用方式的理解 由于SS是基于batch rdd来实现的,如果在kafka接入到SS时就已经使用event time对数据进行划分了, 也就是说batch rdd的划分方式使用数据本身的时间戳就可以实现对业务时间的支持。

参考链接

Spark Streaming如何应对 Exactly once 语义

彻底理解 Spark 的 checkpoint 机制

上面博客

spark streaming 读取 kafka 的各种姿势解析

spark checkpoint详解

spark streming + kafka 0.8.0 中文文档

【容错篇】Spark Streaming的还原药水——Checkpoint

Spark Streaming + kafka 0.10 中文文档

Spark Streaming的Exactly-One的事务处理和不重复输出彻底掌握

!!! How to Achieve Exactly-Once Semantics in Spark Streaming

Spark Streaming 管理 Kafka Offsets 的方式探讨

必读:Spark与kafka010整合