2020-05-15 10:25发布
遇到这种问题,基本上是心跳或offset更新不及时导致。
在kafka环境中,有以下几个参数对于数据重复有很好的效果。
auto.commit.interval.ms
consumer向zookeeper提交offset的频率,单位是秒,默认60*1000
此值太大会导致数据重复消费,将其调小可避免重复数据。建议值100(毫秒)
max.poll.interval.ms
数据处理时间
max.poll.records
一次从kafka中poll出来的数据条数
max.poll.records条数据需要在max.poll.interval.ms这个时间内处理完
session.timeout.ms
zookeeper 会话的超时限制。如果consumer在这段时间内没有向zookeeper发送心跳信息,则它会被认为挂掉了,并且reblance将会产生。默认6000
request.timeout.ms
broker尽力实现request.required.acks需求时的等待时间,否则会发送错误到客户端,默认10000
request.timeout.ms值应大于session.timeout.ms的值
fetch.min.bytes
每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。默认1
————————————————
版权声明:本文为CSDN博主「handu940955668」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/handu940955668/java/article/details/85010773
丢包问题:消息推送服务,每天早上,手机上各终端都会给用户推送消息,这时候流量剧增,可能会出现kafka发送数据过快,导致服务器网卡爆满,或者磁盘处于繁忙状态,可能会出现丢包现象。 解决方案:首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。 检测方法:使用重放机制,查看问题所在。 kafka配置如下:
props.put("compression.type", "gzip"); props.put("linger.ms", "50"); props.put("acks", "all"); props.put("retries ", 30); props.put("reconnect.backoff.ms ", 20000); props.put("retry.backoff.ms", 20000);
重发问题:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。 底层根本原因:已经消费了数据,但是offset没提交。 配置问题:设置了offset自动提交 解决办法:至少发一次+去重操作(幂等性) 问题场景:1.设置offset为自动提交,正在消费数据,kill消费者线程;2.设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费;3.消费kafka与业务逻辑在一个线程中处理,可能出现消费程序业务处理逻辑阻塞超时,导致一个周期内,offset还未提交;继而重复消费,但是业务逻辑可能采用发送kafka或者其他无法回滚的方式; 重复消费最常见的原因:re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。 去重问题:消息可以使用唯一id标识 保证不丢失消息:生产者(ack=all 代表至少成功发送一次) 消费者 (offset手动提交,业务逻辑成功处理后,提交offset) 保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据) 业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)
问题根源在于Kafka的平衡机制,Kafka什么时候平衡我们无从知晓,而消费又是没平衡好就开始消费了,所以解决也从这个角度来解决。和网友交流了下,了解到,新版本的API在平衡的时候可以注册一个对象,在平衡前和后可以调用这个对象的方法,我们在这个方法里面将此topic的stream提交(这可能会造成数据丢失,因为这些数据很可能还没处理),这个新API测试了下,基本没什么问题。高级API如何解决?用类分布式锁最终解决了这个问题,实现思路比较简单,就是通过ZK来实现,程序启动前先定义好需要启动的消费者数量,如果还没达到这个量,线程都不能启动,达到这个线程数后,休眠几秒后启动,在启动的时候,消费者线程已经得到了平衡,除非线程死掉否则不会发生平衡了,所以暂时解决了这个问题。
自己维护offset提交,任务正常执行再提交
flume和kafka的侧重点不同,flume追求的是数据和数据源、数据流向的多样性,适合多个生产者的场景;flume有自己内置的多种source和sink组件,具体操作方式是编写source、channel和sink的.conf配置文件,开启flume组件的时候用命令关联读取配置文件实现kafka追...
探究的是kafka的数据生产出来之后究竟落到了哪一个分区里面去了第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去第二种分区策略:没有给定分区号,给定数据的key值,通过key取上hashCode进行分区第三种分区策略:既没有给定分区号,也没有给...
读取数据的过程中,数据是属于某一个topic的某一个partition对应的某一个segment文件中的某一条记录。①定位到具体的segment日志文件②计算查找的offset在日志文件的相对偏移量
Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write...
消息丢失解决方案: 首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。消息重复解决方案: 消息可以使用唯一id标识 ...
kafka数据推送失败,如果是生产者推送kafka数据失败,有可能是网络端口问题,配置网络ip的域名映射,防火墙问题可以做防火墙策略,还有配置的ip和端口是否正确,还要确定kafka是否启动服务。kafka推送数据到其他系统,可以使用flume以及消费者来做...
数据一致性可以理解为数据前后是否一样,比如说数据的丢失或者重复都回造成数据的不一致!kafka的0.11之后引入了幂等性可以解决单次会话、单个partition的数据不重复,生产者可以利用其0.11后引入的事务来保证其数据一致性,消费者的话其实事务是无法保证的,...
需要明白kafka的底层机制及工作原理,这里只简要说明,详细的参考kafka官网。kafka是将每一条写入kafka的数据按分区分布存储,将每条写入的数据作一个offset标记,这个标记的顺序是按插入数据自增的。当消费程序的时候,会按照分区区分,逐个根据offset顺序消费。...
最多设置5个标签!
遇到这种问题,基本上是心跳或offset更新不及时导致。
在kafka环境中,有以下几个参数对于数据重复有很好的效果。
auto.commit.interval.ms
consumer向zookeeper提交offset的频率,单位是秒,默认60*1000
此值太大会导致数据重复消费,将其调小可避免重复数据。建议值100(毫秒)
max.poll.interval.ms
数据处理时间
max.poll.records
一次从kafka中poll出来的数据条数
max.poll.records条数据需要在max.poll.interval.ms这个时间内处理完
session.timeout.ms
zookeeper 会话的超时限制。如果consumer在这段时间内没有向zookeeper发送心跳信息,则它会被认为挂掉了,并且reblance将会产生。默认6000
request.timeout.ms
broker尽力实现request.required.acks需求时的等待时间,否则会发送错误到客户端,默认10000
request.timeout.ms值应大于session.timeout.ms的值
fetch.min.bytes
每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。默认1
————————————————
版权声明:本文为CSDN博主「handu940955668」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/handu940955668/java/article/details/85010773
遇到这种问题,基本上是心跳或offset更新不及时导致。
在kafka环境中,有以下几个参数对于数据重复有很好的效果。
auto.commit.interval.ms
consumer向zookeeper提交offset的频率,单位是秒,默认60*1000
此值太大会导致数据重复消费,将其调小可避免重复数据。建议值100(毫秒)
max.poll.interval.ms
丢包问题:消息推送服务,每天早上,手机上各终端都会给用户推送消息,这时候流量剧增,可能会出现kafka发送数据过快,导致服务器网卡爆满,或者磁盘处于繁忙状态,可能会出现丢包现象。
解决方案:首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。
检测方法:使用重放机制,查看问题所在。
kafka配置如下:
props.put("compression.type", "gzip");
props.put("linger.ms", "50");
props.put("acks", "all");
props.put("retries ", 30);
props.put("reconnect.backoff.ms ", 20000);
props.put("retry.backoff.ms", 20000);
重发问题:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。
底层根本原因:已经消费了数据,但是offset没提交。
配置问题:设置了offset自动提交
解决办法:至少发一次+去重操作(幂等性)
问题场景:1.设置offset为自动提交,正在消费数据,kill消费者线程;2.设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费;3.消费kafka与业务逻辑在一个线程中处理,可能出现消费程序业务处理逻辑阻塞超时,导致一个周期内,offset还未提交;继而重复消费,但是业务逻辑可能采用发送kafka或者其他无法回滚的方式;
重复消费最常见的原因:re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
去重问题:消息可以使用唯一id标识
保证不丢失消息:生产者(ack=all 代表至少成功发送一次)
消费者 (offset手动提交,业务逻辑成功处理后,提交offset)
保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据)
业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)
问题根源在于Kafka的平衡机制,Kafka什么时候平衡我们无从知晓,而消费又是没平衡好就开始消费了,所以解决也从这个角度来解决。
和网友交流了下,了解到,新版本的API在平衡的时候可以注册一个对象,在平衡前和后可以调用这个对象的方法,我们在这个方法里面将此topic的stream提交(这可能会造成数据丢失,因为这些数据很可能还没处理),这个新API测试了下,基本没什么问题。
高级API如何解决?用类分布式锁最终解决了这个问题,实现思路比较简单,就是通过ZK来实现,程序启动前先定义好需要启动的消费者数量,如果还没达到这个量,线程都不能启动,达到这个线程数后,休眠几秒后启动,在启动的时候,消费者线程已经得到了平衡,除非线程死掉否则不会发生平衡了,所以暂时解决了这个问题。
自己维护offset提交,任务正常执行再提交
相关问题推荐
flume和kafka的侧重点不同,flume追求的是数据和数据源、数据流向的多样性,适合多个生产者的场景;flume有自己内置的多种source和sink组件,具体操作方式是编写source、channel和sink的.conf配置文件,开启flume组件的时候用命令关联读取配置文件实现kafka追...
探究的是kafka的数据生产出来之后究竟落到了哪一个分区里面去了第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去第二种分区策略:没有给定分区号,给定数据的key值,通过key取上hashCode进行分区第三种分区策略:既没有给定分区号,也没有给...
读取数据的过程中,数据是属于某一个topic的某一个partition对应的某一个segment文件中的某一条记录。①定位到具体的segment日志文件②计算查找的offset在日志文件的相对偏移量
Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write...
消息丢失解决方案: 首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。消息重复解决方案: 消息可以使用唯一id标识 ...
kafka数据推送失败,如果是生产者推送kafka数据失败,有可能是网络端口问题,配置网络ip的域名映射,防火墙问题可以做防火墙策略,还有配置的ip和端口是否正确,还要确定kafka是否启动服务。kafka推送数据到其他系统,可以使用flume以及消费者来做...
数据一致性可以理解为数据前后是否一样,比如说数据的丢失或者重复都回造成数据的不一致!kafka的0.11之后引入了幂等性可以解决单次会话、单个partition的数据不重复,生产者可以利用其0.11后引入的事务来保证其数据一致性,消费者的话其实事务是无法保证的,...
需要明白kafka的底层机制及工作原理,这里只简要说明,详细的参考kafka官网。kafka是将每一条写入kafka的数据按分区分布存储,将每条写入的数据作一个offset标记,这个标记的顺序是按插入数据自增的。当消费程序的时候,会按照分区区分,逐个根据offset顺序消费。...