linux 怎样查看kafka的某 topic数据

2020-06-12 20:15发布

16条回答
小猪仔
2楼 · 2020-12-28 14:32
  • 启动kafka:

  • ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &     # kafka-server-start.sh 脚本在 kafka_2.12-2.2.0/bin路径下
  • 查看已创建的topic列表:

  • ./kafka-topics.sh --list --zookeeper localhost:2181

  • 查看对应topic的描述信息: 

  • ./kafka-topics.sh --describe --zookeeper xx.x.xxx.xxx:2181 --topic xxxxx.xx.xxxxx.xxx     # --zookeeper为zookeeperIP, --topic为topic名称
  • 消费消息:

  • ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxxxx.xx.xxxxx.xxx --from-beginning    # 若没有任何返回或没有响应,则该topic中没有数据内容;否则就是有数据


嘿呦嘿呦拔萝卜
3楼 · 2021-09-08 17:24
  • 查看topic分布情况kafka-list-topic.sh:

    bin/kafka-list-topic.sh-zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分区情况)

    bin/kafka-list-topic.sh-zookeeper 192.168.197.170:2181,192.168.197.171:2181-topic test (查看test的分区情况)

  • 创建topickafka-create-topic.sh:

    bin/kafka-create-topic.sh-replica 2-partition 8-topic test-zookeeper 192.168.197.170:2181,192.168.197.171:2181

    创建名为test的topic,8个分区分别存放数据,数据备份总共2份


流流流年
4楼 · 2020-06-15 11:17

1、创建一个需要增加备份因子的topic列表的文件,文件格式是json格式的。

2、使用kafka官方提供的工具拿到上面topic的partions 分布情况,并重定向到文件中。

3、修改ressgintopic.conf 文件的,手动分配新增加的partion 备份因子。

4、通过下面命令执行备份因子扩容过程,bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json。

5、最后查看kafka的某 topic数据如图。

天天
5楼 · 2021-01-18 14:27

1、查看topic分布情况kafka-list-topic.sh:

bin/kafka-list-topic.sh-zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分区情况)

bin/kafka-list-topic.sh-zookeeper 192.168.197.170:2181,192.168.197.171:2181-topic test (查看test的分区情况)

2、创建topickafka-create-topic.sh:

bin/kafka-create-topic.sh-replica 2-partition 8-topic test-zookeeper 192.168.197.170:2181,192.168.197.171:2181

创建名为test的topic,8个分区分别存放数据,数据备份总共2份。


722
6楼 · 2021-08-31 09:29

如果你在0.9版本以上,可以用最新的Consumer client 客户端,有consumer.seekToEnd() / consumer.position() 可以用于得到当前最新的offset: ${log.dirs}/replication-offset-checkpoint

freediandianer
7楼 · 2021-09-02 11:55

基于0.8.0版本。

##查看topic分布情况kafka-list-topic.sh
bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分区情况)
bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --topic test (查看test的分区情况)

其实kafka-list-topic.sh里面就一句
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ListTopicCommand $@
实际是通过
kafka-run-class.sh脚本执行的包kafka.admin下面的类
##创建TOPIC kafka-create-topic.sh
bin/kafka-create-topic.sh --replica 2 --partition 8 --topic test --zookeeper 192.168.197.170:2181,192.168.197.171:2181
创建名为test的topic, 8个分区分别存放数据,数据备份总共2份

bin/kafka-create-topic.sh --replica 1 --partition 1 --topic test2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181
结果 topic: test2 partition: 0 leader: 170 replicas: 170 isr: 170
##重新分配分区kafka-reassign-partitions.sh
这个命令可以分区指定到想要的--broker-list上
bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "171" --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --execute
cat topic-to-move.json
{"topics":
[{"topic": "test2"}],
"version":1
}
##为Topic增加 partition数目kafka-add-partitions.sh
bin/kafka-add-partitions.sh --topic test --partition 2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (为topic test增加2个分区)

##控制台接收消息
bin/kafka-console-consumer.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --from-beginning --topic test
##控制台发送消息
bin/kafka-console-producer.sh --broker-list 192.168.197.170:9092,192.168.197.171: 9092 --topic test
##手动均衡topic, kafka-preferred-replica-election.sh
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --path-to-json-file preferred-click.json

cat preferred-click.json
{
"partitions":
[
{"topic": "click", "partition": 0},
{"topic": "click", "partition": 1},
{"topic": "click", "partition": 2},
{"topic": "click", "partition": 3},
{"topic": "click", "partition": 4},
{"topic": "click", "partition": 5},
{"topic": "click", "partition": 6},
{"topic": "click", "partition": 7},
{"topic": "play", "partition": 0},
{"topic": "play", "partition": 1},
{"topic": "play", "partition": 2},
{"topic": "play", "partition": 3},
{"topic": "play", "partition": 4},
{"topic": "play", "partition": 5},
{"topic": "play", "partition": 6},
{"topic": "play", "partition": 7}

]
}

##删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181

IT学习
8楼 · 2021-09-02 17:37

1、单节点启动(非集群)

(1)下载kafka安装包http://kafka.apache.org/downloads,我这里下载的是kafka_2.11-2.3.0.tgz

(2)解压(不需要安装,直接配置可用) tar -xzvf kafka_2.11-2.3.0.tgz

(3)对于单节点kafka,我们配置很简单,只需在.../kafka_2.11-2.3.0/config 目录下修改/kafka_2.11-2.3.0/config/server.properties这个配置文件的内容,只需取消#listeners=PLAINTEXT://:9092的注释,或者重新加一行listeners=PLAINTEXT:[本设备的ip]//:9092,最中修改的结果为listeners=PLAINTEXT:[本设备的ip]//:9092

(4)启动zookeeper:.../kafka_2.11-2.3.0/bin/zookeeper-server-start.sh -daemon .../kafka_2.11-2.3.0/config/zookeeper.properties

(5)启动kafka:.../kafka_2.11-2.3.0/bin/kafka-server-start.sh -daemon .../kafka_2.11-2.3.0/config/server.properties

(6)启动完成


我的网名不再改
9楼 · 2021-09-07 11:13

1、创建一个需要增加备份因子的topic列表的文件,文件格式是json格式的。

2、使用kafka官方提供的工具拿到上面topic的partions 分布情况,并重定向到文件中。

3、修改ressgintopic.conf 文件的,手动分配新增加的partion 备份因子。

4、通过下面命令执行备份因子扩容过程,bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json。

5、最后查看kafka的某 topic数据如图。

注意事项:

Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。


相关问题推荐

  • 回答 77

    云计算话题太大了,建议学具体点的可落地的,找个培训班摸两月项目比自学找各种杂资料性价比更高

  • 回答 4

    交换机不分进线和出线,找个接口插上即可。交换机每一端口都可视为独回立的物理网段(注:答非IP网段),连接在其上的网络设备独自享有全部的带宽,无须同其他设备竞争使用。当节点A向节点D发送数据时,节点B可同时向节点C发送数据,而且这两个传输都享有网络...

  • 回答 39

    编程和UI,一个程序向,每天更多的面对满屏幕的代码,对逻辑思维和数学有基本要求。UI设计则是美术向,网站UI、app应用UI和游戏UI要求又不一样(游戏要求更高)。一个好的UI设计师需要具备平面设计,交互设计,手绘等等。...

  • 回答 6
    已采纳

    静态资源和动态资源的概念:静态资源:我的理解是前端的固定页面,这里面包含HTML、CSS、JS、图片等等,不需要查数据库也不需要程序处理,直接就能够显示的页面。具体形式为:客户端发送请求到web服务器,web服务器拿到对应的文件,返回给客户端,客户端解析...

  • 回答 4
    已采纳

    查看进程--方法:ps -aux | grep 'zookeeper'系统有百返回,说明zookeeper启动。linux上进程有5种状态:运行(正在运行或在运行队列中等待);中断度(休眠中, 受阻, 在等待某个条件的形成或接受到信号);不可中断(收到信号不唤醒和不可运行, 进程必须等...

  • 回答 26

    Linux应用领域非常广泛,各个行业都得到了非常好的应用,从目前市场招聘需求量说,IT行业需求最为广泛,对Linux人才需求量也很高,而且Linux云计算入门简单、容易学习,适合零基础,学习之后可以就业领域有很多,涉及岗位广泛。...

  • 回答 8
    已采纳

    相对学的东西挺多的,有点儿复杂。云计算涉及的内容非常多,基本覆盖了计算机领域各方面的知识。涉及到基础架构,网络,灾备,容量规划,安全,分布式计算与存储,虚拟化等等,这些知识需要你对计算机系统有着非常深入而广泛的了解。无论是从事云计算的搭建,...

  • 回答 0

  • net.core.netdev_max_backl2021-12-13 14:01
    回答 3

    TCP SYN_REVD, ESTABELLISHED 状态对应的队列TCP 建立连接时要经过 3 次握手,在客户端向服务器发起连接时,对于服务器而言,一个完整的连接建立过程,服务器会经历 2 种 TCP 状态:SYN_REVD, ESTABELLISHED对应也会维护两个队列:一个存放 SYN ...

  • 回答 3

    对于一个TCP连接,Server与Client需要通过三次握手来建立网络连接.当三次握手成功后,我们可以看到端口的状态由LISTEN转变为ESTABLISHED,接着这条链路上就可以开始传送数据了.每一个处于监听(Listen)状态的端口,都有自己的监听队列.监听队列的长度,与如下两方...

  • net.ipv4.tcp_abort_on_ove2021-12-09 14:50
    回答 5
    已采纳

    设置该参数为 1 时,当系统在短时间内收到了大量的请求,而相关的应用程序未能处理时,就会发送 Reset 包直接终止这些链接。建议通过优化应用程序的效率来提高处理能力,而不是简单地 Reset。默认值: 0。...

  • 回答 7

    *tcpsyncookies是一个开关,是否打开SYN Cookie功能,该功能可以防止部分SYN×××。tcpsynackretries和tcpsynretries定义SYN的重试次数。YN Cookie是对TCP服务器端的三次握手做一些修改,专门用来防范SYN Flood×××的一种手段。它的原理是,在TCP服务器接...

  • net.ipv4.tcp_max_syn_back2021-12-07 14:32
    回答 7
    已采纳

    该参数决定了系统中处于 SYN_RECV 状态的 TCP 连接数量。SYN_RECV 状态指的是当系统收到 SYN 后,作了 SYN+ACK 响应后等待对方回复三次握手阶段中的最后一个 ACK 的阶段。

  • 回答 6

    先重头检查一下是不是配置出现问题,不行的话重装软件试试

  • 回答 10

    首先是我们将鼠标移动到如下图所示的计算机的图标上,点击右键,选择管理。然后我们点击如下图界面中的服务和应用程序。接下来可以看到它下面有一个我们的目标——服务。服务的话它是按字母排列的,我们往下拉,找到Mysql服务。此时可以看到它是一个手动启动...

没有解决我的问题,去提问