Kafka】Kafka数据推送失败怎么处理呢?

2020-05-15 10:27发布

4条回答
小猴哥哥
2楼 · 2020-05-15 10:40

kafka数据推送失败,如果是生产者推送kafka数据失败,有可能是网络端口问题,配置网络ip的域名映射,防火墙问题可以做防火墙策略,还有配置的ip和端口是否正确,还要确定kafka是否启动服务。

kafka推送数据到其他系统,可以使用flume以及消费者来做

小橘子
3楼 · 2021-07-01 11:22

如图所示:image.png

ablabla
4楼 · 2021-11-05 14:48

这里梳理一下这个 原始数据 的整个流程:

upload 转发 原始数据到 kafka,用的topic为 OREGINAL_DATA ,   storm计算启动一个叫realtime的拓扑,消费 OREGINAL_DATA 上的数据,再把处理后的数据转发到 kafka, 用的topic为  OREGINAL_DATA_FORWARD。

 

排查过程:

1. 查看 upload日志,确定日志中有正常发数据到kafka ,且显示成功了   -- 这个日志有一个很大的误导作用

2. 通过 kafka 的消息积压命令查看(如下),确定是没有新的信息生产(logSize没有变化,通过kafka的 log.dirs 也确定该topic没有 生产到数据)

          kafka-consumer-offset-checker.sh   --topic OREGINAL_DATA --zookeeper zkIP:2181 --group GROUP_OREGINAL_DATA

3. 通过第2步的排除,怀疑 upload机器与kafka集群不通,用ping命令,确定机器是通的,查看 iptables ,没有信息, 用 telnet   kafkaIP   9092 ,不通,提供没有路由(no route to host),怀疑是hosts文件配置问题,确认有正常配置。 把 kafka上的防火墙关了,telnet 通了(这里还要恶补一下防火墙的知识,另外,这个是测试环境,所以可以直接关)。

4. 发数据,重复2,问题还是没有解决。通过修改程序中 topic为 OREGINAL_DATA_NEW 确定 程序是否与 kafka集群通。发现topic有正常创建,但是就是没有消息生产出来。

5. 在upload机器上安装 kafka,通过运行kafka自带的producer 行(命令如下),给新建的topic发数据,发现有报错:

         kafka-console-producer.sh --broker-list kafkaIP:9092 --topic  OREGINAL_DATA_NEW

    报错信息为:  org.apache.kafka.common.errors.TimeoutException,通过网上搜索,定位原因是发布到zookeeper的advertised.host.name如果没有设置,默认取 java.net.InetAddress.getCanonicalHostName().值,被用于生产端和消费端。因此外部网络或者未配置hostname映射的机器访问kafka集群时就会有网络问题了。

 

6. 配置 kafka 集群的主机名到 upload 机器的hosts 文件中, 发数据,问题解决。

 

这里注明一下,upload表本来是有配置 kafka集群的,但是用的是别名,且 公共配置文件上相关集群机器的配置信息也是用的别名,当时测试用的命令,也是别名。

 

 

关于测试这边的疏漏问题总结:

以前测试都是直接在公共配置文件用的主机名测试(这里用的主机别名的方式还没遇到过),另外,在kafka集群不通的情况下,upload日志居然没有报一点异常,还显示成功,这里对于 异常 情况的测试用例设置有些欠缺,后期要专门整理对程序的异常处理情况做测试,来验证程序的健壮性及自我恢复能力。

一: 通过控制防火墙来模拟网络异常的情况

    1.  在环境正常情况下,把 防火墙打开,确定程序是否正常,日志有没有对应的异常信息协助定位问题

    2.  关闭防火墙,确定程序能否自恢复,日志是否正常

二:要把各种基础服务分开部署,这个涉及到机器的申请,不然测试环境所有服务都部署在一台机器 或者几台机器上,所有网络都是通的,完全无法模拟这种测试环境


希希
5楼 · 2021-11-10 11:00

如果是生产者推送kafka数据失败,有可能是网络端口问题,配置网络ip的域名映射,防火墙问题可以做防火墙策略,还有配置的ip和端口是否正确,还要确定kafka是否启动服务。

相关问题推荐

  • 回答 17

    flume和kafka的侧重点不同,flume追求的是数据和数据源、数据流向的多样性,适合多个生产者的场景;flume有自己内置的多种source和sink组件,具体操作方式是编写source、channel和sink的.conf配置文件,开启flume组件的时候用命令关联读取配置文件实现kafka追...

  • 回答 2

    探究的是kafka的数据生产出来之后究竟落到了哪一个分区里面去了第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去第二种分区策略:没有给定分区号,给定数据的key值,通过key取上hashCode进行分区第三种分区策略:既没有给定分区号,也没有给...

  • 回答 2

    读取数据的过程中,数据是属于某一个topic的某一个partition对应的某一个segment文件中的某一条记录。①定位到具体的segment日志文件②计算查找的offset在日志文件的相对偏移量

  • 回答 2

    Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write...

  • 回答 3

    消息丢失解决方案:        首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。消息重复解决方案:        消息可以使用唯一id标识 ...

  • 回答 5

    遇到这种问题,基本上是心跳或offset更新不及时导致。在kafka环境中,有以下几个参数对于数据重复有很好的效果。auto.commit.interval.msconsumer向zookeeper提交offset的频率,单位是秒,默认60*1000此值太大会导致数据重复消费,将其调小可避免重复数据。建...

  • 回答 3

    数据一致性可以理解为数据前后是否一样,比如说数据的丢失或者重复都回造成数据的不一致!kafka的0.11之后引入了幂等性可以解决单次会话、单个partition的数据不重复,生产者可以利用其0.11后引入的事务来保证其数据一致性,消费者的话其实事务是无法保证的,...

  • 回答 1

    需要明白kafka的底层机制及工作原理,这里只简要说明,详细的参考kafka官网。kafka是将每一条写入kafka的数据按分区分布存储,将每条写入的数据作一个offset标记,这个标记的顺序是按插入数据自增的。当消费程序的时候,会按照分区区分,逐个根据offset顺序消费。...

没有解决我的问题,去提问