主要研究一下这种如标题的这种整合方式 如何在实际业务中应用。

一 问题引入

我发现在实际的使用过程中 配置好如下不自动提交offset信息如下

val kafkaParams = Map[String, Object] (
      "bootstrap.servers" -> "X:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "feed:alg:test",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

自己在代码也不手动提交offset的话。spark streaming其实也能正常的实时的处理新来的数据, 而不是说不断的重复处理,上一次的信息,也就是说,spark并不是每次都是去读对应kafka的 提交的那个offset来决定处理那些信息,应该是spark自身维护了一个offset信息。

如何验证上述的猜想,需要查看kafka服务器上的offset的更新情况。

二 kafka 服务器offset更新查看

首先要明确的是kafka在不同版本情况下offset信息保存到位置其实也不一样, 老版本保存在zk中,新版本可能保存在broker中。 kafka 消费者offset记录位置和方式

  • 自动提交,设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】来定。这种方式也被称为【at most once】,fetch到消息后就可以更新offset,无论是否消费成功。
  • 手动提交,设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。

这里面我后来把group.id设置成feed-alg-test,然后也没有手动提交,结果在执行如下命令的时候,根本就 看不到这个group.id

bin/kafka-consumer-groups.sh  --bootstrap-server X:9092 --list
Note: This will not show information about old Zookeeper-based consumers.

console-consumer-39216
kafka-node-energyBar
feed:alg:test
KMOffsetCache-cloud06-test-zk-00
device-shadow-cn-qa
iot-lbs-group

另外一个角度,我们去查看这个ofo:alg:test下面的信息

➜  kafka_2.11-1.0.0 bin/kafka-consumer-groups.sh  --bootstrap-server X:9092 --group feed:alg:test --describe
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'feed:alg:test' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
ofo_sc_test                    2          543512          545324          1812       -                                                 -                              -
ofo_sc_test                    3          543516          545328          1812       -                                                 -                              -
ofo_sc_test                    0          643068          644880          1812       -                                                 -                              -
ofo_sc_test                    4          643063          644875          1812       -                                                 -                              -
ofo_sc_test                    1          643061          644873          1812       -                                                 -                              -

然后我重新再启动看offset的变化, 可以看到服务器上的offset根本就没变化,但是程序里面的offset其实是一致在变化的

➜  kafka_2.11-1.0.0 bin/kafka-consumer-groups.sh  --bootstrap-server 10.6.26.183:9092,10.6.26.184:9092,10.6.26.185:9092 --group feed:alg:test --describe
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'feed:alg:test' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
ofo_sc_test                    2          543512          545359          1847       -                                                 -                              -
ofo_sc_test                    3          543516          545363          1847       -                                                 -                              -
ofo_sc_test                    0          643068          644915          1847       -                                                 -                              -
ofo_sc_test                    4          643063          644911          1848       -                                                 -                              -
ofo_sc_test                    1          643061          644909          1848       -                                                 -                              -
➜  kafka_2.11-1.0.0 bin/kafka-consumer-groups.sh  --bootstrap-server 10.6.26.183:9092,10.6.26.184:9092,10.6.26.185:9092 --group feed:alg:test --describe
Note: This will not show information about old Zookeeper-based consumers.


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
ofo_sc_test                    0          643068          644916          1848       consumer-1-f72f1d7a-dd91-4bc5-ae74-4dd33c585afb   /10.6.30.215                   consumer-1
ofo_sc_test                    1          643061          644909          1848       consumer-1-f72f1d7a-dd91-4bc5-ae74-4dd33c585afb   /10.6.30.215                   consumer-1
ofo_sc_test                    2          543512          545359          1847       consumer-1-f72f1d7a-dd91-4bc5-ae74-4dd33c585afb   /10.6.30.215                   consumer-1
ofo_sc_test                    3          543516          545364          1848       consumer-1-f72f1d7a-dd91-4bc5-ae74-4dd33c585afb   /10.6.30.215                   consumer-1
ofo_sc_test                    4          643063          644911          1848       consumer-1-f72f1d7a-dd91-4bc5-ae74-4dd33c585afb   /10.6.30.215                   consumer-1
➜  kafka_2.11-1.0.0 bin/kafka-consumer-groups.sh  --bootstrap-server 10.6.26.183:9092,10.6.26.184:9092,10.6.26.185:9092 --group feed:alg:test --describe
Note: This will not show information about old Zookeeper-based consumers.


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
ofo_sc_test                    0          643068          644917          1849       consumer-1-f72f1d7a-dd91-4bc5-ae74-4dd33c585afb   /10.6.30.215                   consumer-1
ofo_sc_test                    1          643061          644911          1850       consumer-1-f72f1d7a-dd91-4bc5-ae74-4dd33c585afb   /10.6.30.215                   consumer-1
ofo_sc_test                    2          543512          545361          1849       consumer-1-f72f1d7a-dd91-4bc5-ae74-4dd33c585afb   /10.6.30.215                   consumer-1
ofo_sc_test                    3          543516          545366          1850       consumer-1-f72f1d7a-dd91-4bc5-ae74-4dd33c585afb   /10.6.30.215                   consumer-1
ofo_sc_test                    4          643063          644913          1850       consumer-1-f72f1d7a-dd91-4bc5-ae74-4dd33c585afb   /10.6.30.215                   consumer-1

guoqiang:offset ofo_sc_test 0 645374 645374
guoqiang:offset ofo_sc_test 4 645369 645369
18/09/29 17:31:30 INFO Executor: Running task 4.0 in stage 79.0 (TID 265)
18/09/29 17:31:30 INFO KafkaRDD: Beginning offset 645367 is the same as ending offset skipping ofo_sc_test 1
guoqiang:offset ofo_sc_test 1 645367 645367
18/09/29 17:31:30 INFO Executor: Finished task 4.0 in stage 79.0 (TID 265). 751 bytes result sent to driver
18/09/29 17:31:30 INFO TaskSetManager: Finished task 4.0 in stage 79.0 (TID 265) in 8 ms on localhost (executor driver) (4/5)
guoqiang:offset ofo_sc_test 2 545817 545818

当我把手动提交加进去之后,在看一下变化

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
➜  kafka_2.11-1.0.0 bin/kafka-consumer-groups.sh  --bootstrap-server 10.6.26.183:9092,10.6.26.184:9092,10.6.26.185:9092 --group feed:alg:test --describe
Note: This will not show information about old Zookeeper-based consumers.


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
ofo_sc_test                    0          643068          645454          2386       consumer-1-dfe7d0e9-cc41-44a0-85c9-39093cc9f693   /10.6.30.215                   consumer-1
ofo_sc_test                    1          643061          645447          2386       consumer-1-dfe7d0e9-cc41-44a0-85c9-39093cc9f693   /10.6.30.215                   consumer-1
ofo_sc_test                    2          543512          545898          2386       consumer-1-dfe7d0e9-cc41-44a0-85c9-39093cc9f693   /10.6.30.215                   consumer-1
ofo_sc_test                    3          543516          545902          2386       consumer-1-dfe7d0e9-cc41-44a0-85c9-39093cc9f693   /10.6.30.215                   consumer-1
ofo_sc_test                    4          643063          645450          2387       consumer-1-dfe7d0e9-cc41-44a0-85c9-39093cc9f693   /10.6.30.215                   consumer-1
➜  kafka_2.11-1.0.0 bin/kafka-consumer-groups.sh  --bootstrap-server 10.6.26.183:9092,10.6.26.184:9092,10.6.26.185:9092 --group feed:alg:test --describe
Note: This will not show information about old Zookeeper-based consumers.


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
ofo_sc_test                    0          645462          645462          0          consumer-1-dfe7d0e9-cc41-44a0-85c9-39093cc9f693   /10.6.30.215                   consumer-1
ofo_sc_test                    1          645455          645456          1          consumer-1-dfe7d0e9-cc41-44a0-85c9-39093cc9f693   /10.6.30.215                   consumer-1
ofo_sc_test                    2          545906          545906          0          consumer-1-dfe7d0e9-cc41-44a0-85c9-39093cc9f693   /10.6.30.215                   consumer-1
ofo_sc_test                    3          545910          545910          0          consumer-1-dfe7d0e9-cc41-44a0-85c9-39093cc9f693   /10.6.30.215                   consumer-1
ofo_sc_test                    4          645457          645458          1          consumer-1-dfe7d0e9-cc41-44a0-85c9-39093cc9f693   /10.6.30.215                   consumer-1
➜  kafka_2.11-1.0.0 bin/kafka-consumer-groups.sh  --bootstrap-server 10.6.26.183:9092,10.6.26.184:9092,10.6.26.185:9092 --group feed:alg:test --describe
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'feed:alg:test' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
ofo_sc_test                    2          545990          546003          13         -                                                 -                              -
ofo_sc_test                    3          545995          546007          12         -                                                 -                              -
ofo_sc_test                    0          645547          645559          12         -                                                 -                              -
ofo_sc_test                    4          645542          645555          13         -                                                 -                              -
ofo_sc_test                    1          645540          645553          13         -                                                 -                              -

三 解决方案:存储在kafka上

这里面的关键点其实就是,程序恢复的时候如何从kafka拿到上次提交的offset; 还有一个点是,需要争取理解kafka启动参数里面的auto.offset.reset设置为latest的时候代表的是什么意思, 这里面可能和自己从字面意思的理解不一样;

  earliest 
  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 
  latest 
  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 
  none  
  topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}


这里面就要注意保存在kafka和其他位置的一些微妙的区别就是,在zk或者hbase下面的话,可以方便的自己手动 修改一下offset的,但是假如保存kafka自身的话,修改起来会麻烦一些;

另外一个区别是,假如kafka增加分区的话,不同的offset存储方案之间的区别是啥,以及如何影响的 也应该仔细考虑一下。关于Spark Streaming感知kafka动态分区的问题

四 解决方案:存储在zk上[Spark streaming 基础]--使用低阶API消费Kafka数据(手动更新offset)

ToDo streaming-offset-to-zk

使用低阶API消费Kafka数据(手动更新offset)

https://www.cnblogs.com/mlxx9527/p/9376202.html

五 解决方案:保存在其他数据源数据库(可实现事务)

http://shzhangji.com/blog/2017/07/31/how-to-achieve-exactly-once-semantics-in-spark-streaming/

https://github.com/jizhang/spark-sandbox/blob/master/src/main/scala/ExactlyOnce.scala

系统性总结

Kafka设计解析(八)- Exactly Once语义与事务机制原理

参考文章

sparkstreaming中kafka的offset提交

springboot和kafka结合其中enable.auto.commit等于false失效

Kafka查看topic、consumer group状态命令

Spark Streaming 管理 Kafka Offsets 的方式探讨

如何管理Spark Streaming消费Kafka的偏移量(一)

如何管理Spark Streaming消费Kafka的偏移量(二)

Zabbix监控之从Kafka中获取消费进度和lag

Kafka auto.offset.reset值详解

kafka+sparkstreaming 的offset管理