kafka数据会不会重复消费,什么情况下会重复消费?

2020-06-01 22:36发布

5条回答
无需指教
2楼 · 2020-06-02 08:35

要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费


1、消息发送

         Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:


0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;


1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;(数据要求快,重要性不高时用)


-1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,数据一般不会丢失,延迟时间长但是可靠性高。(数据重要时用)


综上所述,有6种消息生产的情况,下面分情况来分析消息丢失的场景:


(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;


(2)acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;


kafka中维护了一个AR列表,包括所有分区的副本,AR又分为ISR和OSR,


ISR中副本都要同步leader中的数据,ack为-1时,只有都同步完数据才认为是成功提交了,成功提交后才能供外界访问。


OSR内的副本是否同步了leader的数据并不影响数据的提交,OSR内的follower尽力去同步leader,可能同步不是很及时。


最开始所有副本都在ISR里,kafka工作的过程中,如果某个副本同步速度慢于默认阈值,则被踢出ISR存入OSR中,如果后续速度恢复则回到ISR中。



2、消息消费


        Kafka消息消费有两个consumer接口,Low-level API和High-level API:


Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制,可以指定读取哪个topic、partition、offset;


                         缺点是太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。


High-level API:不需要管理offset,系统通过zookeeper自动管理;不需要管理partition,系统自动管理;消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据。


                         缺点是不能自行控制offset(对于某些特殊需求来说),不能细化控制如分区、副本、zk等。


使用低级接口Low-level API,每次消费的offset都是自行指定,可以有目的地去重复消费数据;


使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;    


kafka的ack设置为-1虽然不会丢数据,但是可能产生冗余数据,例如生产者发送数据给leader,leader同步数据给ISR队列中follower,同步到一半leader宕机,此时选出新的leader,可能成为leader的这个follower具有部分刚才同步的数据,而生产者收到失败消息重发数据,新的leader接收数据就有部分数据重复了。


解决办法:

        针对消息丢失:


同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;


异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;


        针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。


ps:其实理论上kafka,在同步模式下ack设置为-1,异步模式在配置文件中设置不限制阻塞超时时间,防止缓冲区满;这样理论上就不会丢数据了;


但是实际生产环境还是会丢数据,正常丢个百分之一影响已经很大了,例如日常数据500G的日志,丢个5G影响很大,所以离线写入数据的场合不适合用kafka就是这个道理。



小新没有蜡笔
3楼 · 2021-09-07 17:28

你需要确保两点:幂等producer,offset commit和外部记录的xa提交。

退一步讲,就假设你这两点都保证,redis是异步复制,xa成功了,failover后之前存的offset一样会丢。

其实kafka streams就靠上面这两点来提供所谓exactly once的语意,后者改成多topic的xa提交,因为kafka的offset本身就存在一个topic里面。

即便如此,也不是说保证业务代码绝绝对对的只运行一次,也只是能保证整个状态的完整正确而已,用redis这点都无法保证。


一个Ai
4楼 · 2021-09-09 15:38

导致kafka的重复消费问题原因在于,已经消费了数据,但是offset没来得及提交(比如Kafka没有或者不知道该数据已经被消费)。
总结以下场景导致Kakfa重复消费:

  • 原因1:强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)。

  • 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。

例如:

try {    consumer.unsubscribe();} catch (Exception e) {}
try {    consumer.close();} catch (Exception e) {}

上面代码会导致部分offset没提交,下次启动时会重复消费。

解决方法:设置offset自动提交为false

整合了Spring配置的修改如下配置
spring配置:

spring.kafka.consumer.enable-auto-commit=falsespring.kafka.consumer.auto-offset-reset=latest

整合了API方式的修改enable.auto.commit为false
API配置:

Properties�0�2props�0�2=�0�2new�0�2Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit",�0�2"false");

一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。

  • 原因3:(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。

  • 原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。

  • 原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。

  • 原因6:并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费

    问题描述:
    我们系统压测过程中出现下面问题:异常rebalance,而且平均间隔3到5分钟就会触发rebalance,分析日志发现比较严重。错误日志如下:

    08-09 11:01:11 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]        at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]�0�2�0�2�0�2�0�2�0�2�0�2�0�2�0�2at�0�2java.lang.Thread.run(Thread.java:748)�0�2[na:1.8.0_161]

    这个错误的意思是,消费者在处理完一批poll的消息后,在同步提交偏移量给broker时报的错。初步分析日志是由于当前消费者线程消费的分区已经被broker给回收了,因为kafka认为这个消费者死了,那么为什么呢?

    问题分析:

    这里就涉及到问题是消费者在创建时会有一个属性max.poll.interval.ms(默认间隔时间为300s),

    该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

    处理重复数据

    因为offset此时已经不准确,生产环境不能直接去修改offset偏移量。

    所以重新指定了一个消费组(group.id=order_consumer_group),然后指定auto-offset-reset=latest这样我就只需要重启我的服务了,而不需要动kafka和zookeeper了!

    #consumerspring.kafka.consumer.group-id=order_consumer_groupspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.enable-auto-commit=falsespring.kafka.consumer.auto-offset-reset=latest

    注:如果你想要消费者从头开始消费某个topic的全量数据,可以重新指定一个全新的group.id=new_group,然后指定auto-offset-reset=earliest即可


    freediandianer
    5楼 · 2021-10-15 09:34

    问题描述:

           在消费者处理数据慢的时候(消费能力低),消费者会重复消费某个局部数据。在消费能力不变的情况下,陷入死循环。只有在消费能力增强后,才会跳出这个重复消费的死循环。


    原理解析:



    上图就是完整的kafka消费的过程,在consumer里面配置了一个超时时间。如果步骤2处理消息超时,那么consumer进行第3步会失败。这时候再次进入步骤1拉取重复的数据,如此往复。


    验证过程:

    搭建一个简单的springboot,集成kakfa,添加配置信息如下:


    spring.kafka.bootstrap_servers=${KAFKA_ADDRESS:10.199.1.0:9092}

    spring.kafka.consumer.group_id=${KAFKA_GROUP_ID:0}

    spring.kafka.consumer.max-poll-records=1000

    spring.kafka.consumer.properties.max.partition.fetch.bytes=65536

    spring.kafka.consumer.properties.receive.buffer.bytes=65536

    spring.kafka.consumer.properties.session.timeout.ms=10000

    spring.kafka.consumer.max-poll-records表示每次最多能拉取1000个数据,spring.kafka.consumer.properties.session.timeout.ms表示超时时间,这里设置10秒。


    再来看一下consumer的代码:


    @Component

    public class KafkaConsumer {

        @KafkaListener(topics = {"test"})

        public void receive(ConsumerRecord consumerRecord){

            System.out.println("test--消费消息:" + consumerRecord.key());

            try {

                Thread.sleep(500);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

    这样的预期结果是每次都在重复执行同样的连续1000条数据。下面看一下验证结果:


    开始消费的日志:




    重复消费的日志:




    结果正好是在1000个数据之后,提交失败。并且日志打出提示,建议你增加session timeout的时间或者减少从kafka里面取到一批的数据量。

    征戰撩四汸
    6楼 · 2021-11-04 10:06

    原因1:强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)。
    原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。

    原因3:(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。

    原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。

    原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。

    原因6:并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费




    相关问题推荐

    • 什么是大数据时代?2021-01-13 21:23
      回答 100

      大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,而这个海量数据的时代则被称为大数据时代。随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。大数据(Big data)通常用来形容一个公司创造的大量非结...

    • 回答 84

      Java和大数据的关系:Java是计算机的一门编程语言;可以用来做很多工作,大数据开发属于其中一种;大数据属于互联网方向,就像现在建立在大数据基础上的AI方向一样,他两不是一个同类,但是属于包含和被包含的关系;Java可以用来做大数据工作,大数据开发或者...

    • 回答 52
      已采纳

      学完大数据可以从事很多工作,比如说:hadoop 研发工程师、大数据研发工程师、大数据分析工程师、数据库工程师、hadoop运维工程师、大数据运维工程师、java大数据工程师、spark工程师等等都是我们可以从事的工作岗位!不同的岗位,所具备的技术知识也是不一样...

    • 回答 29

      简言之,大数据是指大数据集,这些数据集经过计算分析可以用于揭示某个方面相关的模式和趋势。大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。大数据的特点:数据量大、数据种类多、 要求实时性强、数据所蕴藏的...

    • 回答 14

      tail -f的时候,发现一个奇怪的现象,首先 我在一个窗口中 tail -f test.txt 然后在另一个窗口中用vim编辑这个文件,增加了几行字符,并保存,这个时候发现第一个窗口中并没有变化,没有将最新的内容显示出来。tail -F,重复上面的实验过程, 发现这次有变化了...

    • 回答 18

      您好针对您的问题,做出以下回答,希望有所帮助!1、大数据行业还是有非常大的人才需求的,对于就业也有不同的岗位可选,比如大数据工程师,大数据运维,大数据架构师,大数据分析师等等,就业难就难在能否找到适合的工作,能否与你的能力和就业预期匹配。2、...

    • 回答 17

      最小的基本单位是Byte应该没多少人不知道吧,下面先按顺序给出所有单位:Byte、KB、MB、GB、TB、PB、EB、ZB、YB、DB、NB,按照进率1024(2的十次方)计算:1Byte = 8 Bit1 KB = 1,024 Bytes 1 MB = 1,024 KB = 1,048,576 Bytes 1 GB = 1,024 MB = 1,048,576...

    • 回答 33

      大数据的定义。大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法通过人脑甚至主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。大数据是对大量、动态、能持续的数据,通过运用新系统、新工具、新...

    • 回答 5

      MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL的版本:针对不同的用户,MySQL分为两种不同的版本:MySQL Community Server社区版本,免费,但是Mysql不提供...

    • mysql安装步骤mysql 2022-05-07 18:01
      回答 2

      mysql安装需要先使用yum安装mysql数据库的软件包 ;然后启动数据库服务并运行mysql_secure_installation去除安全隐患,最后登录数据库,便可完成安装

    • 回答 5

      1.查看所有数据库showdatabases;2.查看当前使用的数据库selectdatabase();3.查看数据库使用端口showvariableslike'port';4.查看数据库编码showvariableslike‘%char%’;character_set_client 为客户端编码方式; character_set_connection 为建立连接...

    • 回答 5

      CREATE TABLE IF NOT EXISTS `runoob_tbl`(    `runoob_id` INT UNSIGNED AUTO_INCREMENT,    `runoob_title` VARCHAR(100) NOT NULL,    `runoob_author` VARCHAR(40) NOT NULL,    `submission_date` DATE,    PRI...

    • 回答 9

      学习多久,我觉得看你基础情况。1、如果原来什么语言也没有学过,也没有基础,那我觉得最基础的要先选择一种语言来学习,是VB,C..,pascal,看个人的喜好,一般情况下,选择C语言来学习。2、如果是有过语言的学习,我看应该一个星期差不多,因为语言的理念互通...

    • 回答 7

      添加语句 INSERT插入语句:INSERT INTO 表名 VALUES (‘xx’,‘xx’)不指定插入的列INSERT INTO table_name VALUES (值1, 值2,…)指定插入的列INSERT INTO table_name (列1, 列2,…) VALUES (值1, 值2,…)查询插入语句: INSERT INTO 插入表 SELECT * FROM 查...

    • 回答 5

      看你什么岗位吧。如果是后端,只会CRUD。应该是可以找到实习的,不过公司应该不会太好。如果是数据库开发岗位,那这应该是不会找到的。

    • 回答 7

      查找数据列 SELECT column1, column2, … FROM table_name; SELECT column_name(s) FROM table_name 

    没有解决我的问题,去提问