kafka为什么有高吞吐的特点?

2020-09-09 09:08发布

kafka为什么有高吞吐的特点?

kafka为什么有高吞吐的特点?

4条回答
天天
2楼 · 2020-09-09 09:27

1.顺序读写

kafka的消息是不断追加到文件中的,这个特性使kafka可以充分利用磁盘的顺序读写性能

2.零拷贝

Kafka把所有的消息存放到一个文件中,当消费者需要数据的时候直接将文件发送给消费者,比如10W的消息共10M,全部发送给消费者,10M的消息在内网中传输是非常快的,假如需要1s,那么kafka的tps就是10w。

3.分区

kafka中的topic中的内容可以被分为多分partition存在,每个partition又分为多个段segment,所以每次操作都是针对一小部分做操作,很轻便,并且增加并行操作的能力

4.批量发送

kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到kafka

1)等消息条数到固定条数

2)一段时间发送一次

5.数据压缩

Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩。

压缩的好处就是减少传输的数据量,减轻对网络传输的压力。


猜不到结尾
3楼 · 2020-09-09 16:32

Jafka/KafkaKafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台。

我的网名不再改
4楼 · 2020-09-11 20:13

kafka作为时下最流行的开源消息系统,被广泛地应用在数据缓冲、异步通信、汇集日志、系统解耦等方面。相比较于RocketMQ等其他常见消息系统,Kafka在保障了大部分功能特性的同时,还提供了超一流的读写性能。

本文将针对Kafka性能方面进行简单分析,首先简单介绍一下Kafka的架构和涉及到的名词:

1. Topic:用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上。
2. Partition:是Kafka中横向扩展和一切并行化的基础,每个Topic都至少被切分为1个Partition。
3. Offset:消息在Partition中的编号,编号顺序不跨Partition。
4. Consumer:用于从Broker中取出/消费Message。
5. Producer:用于往Broker中发送/生产Message。
6. Replication:Kafka支持以Partition为单位对Message进行冗余备份,每个Partition都可以配置至少1个Replication(当仅1个Replication时即仅该Partition本身)。
7. Leader:每个Replication集合中的Partition都会选出一个唯一的Leader,所有的读写请求都由Leader处理。其他Replicas从Leader处把数据更新同步到本地,过程类似大家熟悉的MySQL中的Binlog同步。
8. Broker:Kafka中使用Broker来接受Producer和Consumer的请求,并把Message持久化到本地磁盘。每个Cluster当中会选举出一个Broker来担任Controller,负责处理Partition的Leader选举,协调Partition迁移等工作。
9. ISR(In-Sync Replica):是Replicas的一个子集,表示目前Alive且与Leader能够“Catch-up”的Replicas集合。由于读写都是首先落到Leader上,所以一般来说通过同步机制从Leader上拉取数据的Replica都会和Leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该Replica踢出ISR。每个Partition都有它自己独立的ISR。

以上几乎是我们在使用Kafka的过程中可能遇到的所有名词,同时也无一不是最核心的概念或组件,感觉到从设计本身来说,Kafka还是足够简洁的。这次本文围绕Kafka优异的吞吐性能,逐个介绍一下其设计与实现当中所使用的各项“黑科技”。

Broker
不同于Redis和MemcacheQ等内存消息队列,Kafka的设计是把所有的Message都要写入速度低容量大的硬盘,以此来换取更强的存储能力。实际上,Kafka使用硬盘并没有带来过多的性能损失,“规规矩矩”的抄了一条“近道”。

首先,说“规规矩矩”是因为Kafka在磁盘上只做Sequence I/O,由于消息系统读写的特殊性,这并不存在什么问题。关于磁盘I/O的性能,引用一组Kafka官方给出的测试数据(Raid-5,7200rpm):

Sequence I/O: 600MB/s
Random I/O: 100KB/s

所以通过只做Sequence I/O的限制,规避了磁盘访问速度低下对性能可能造成的影响。

接下来我们再聊一聊kafka是如何“抄近道的”。

首先,Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收PageCache的代价又很小,所以现代的OS都支持PageCache。
使用PageCache功能同时可以避免在JVM内部缓存数据,JVM为我们提供了强大的GC能力,同时也引入了一些问题不适用与Kafka的设计。
 
• 如果在Heap内管理缓存,JVM的GC线程会频繁扫描Heap空间,带来不必要的开销。如果Heap过大,执行一次Full GC对系统的可用性来说将是极大的挑战。
• 所有在在JVM内的对象都不免带有一个Object Overhead(千万不可小视),内存的有效空间利用率会因此降低。
• 所有的In-Process Cache在OS中都有一份同样的PageCache。所以通过只在PageCache中做缓存至少可以提高一倍的缓存空间。
• 如果Kafka重启,所有的In-Process Cache都会失效,而OS管理的PageCache依然可以继续使用。

PageCache还只是第一步,Kafka为了进一步的优化性能还采用了Sendfile技术。在解释Sendfile之前,首先介绍一下传统的网络I/O操作流程,大体上分为以下4步。

1. OS从硬盘把数据读到内核区的PageCache。
2. 用户进程把数据从内核区Copy到用户区。
3. 然后用户进程再把数据写入到Socket,数据流入内核区的Socket Buffer上。
4. OS再把数据从Buffer中Copy到网卡的Buffer上,这样完成一次发送。


整个过程共经历两次Context Switch,四次System Call。同一份数据在内核Buffer与用户Buffer之间重复拷贝,效率低下。其中2、3两步没有必要,完全可以直接在内核区完成数据拷贝。这也正是Sendfile所解决的问题,经过Sendfile优化后,整个I/O过程就变成了下面这个样子。


通过以上的介绍不难看出,Kafka的设计初衷是尽一切努力在内存中完成数据交换,无论是对外作为一整个消息系统,或是内部同底层操作系统的交互。如果Producer和Consumer之间生产和消费进度上配合得当,完全可以实现数据交换零I/O。这也就是我为什么说Kafka使用“硬盘”并没有带来过多性能损失的原因。下面是我在生产环境中采到的一些指标。
(20 Brokers, 75 Partitions per Broker, 110k msg/s)


此时的集群只有写,没有读操作。10M/s左右的Send的流量是Partition之间进行Replicate而产生的。从recv和writ的速率比较可以看出,写盘是使用Asynchronous+Batch的方式,底层OS可能还会进行磁盘写顺序优化。而在有Read Request进来的时候分为两种情况,第一种是内存中完成数据交换。



Send流量从平均10M/s增加到了到平均60M/s,而磁盘Read只有不超过50KB/s。PageCache降低磁盘I/O效果非常明显。

接下来是读一些收到了一段时间,已经从内存中被换出刷写到磁盘上的老数据。


其他指标还是老样子,而磁盘Read已经飚高到40+MB/s。此时全部的数据都已经是走硬盘了(对硬盘的顺序读取OS层会进行Prefill PageCache的优化)。依然没有任何性能问题。

Tips
1. Kafka官方并不建议通过Broker端的log.flush.interval.messageslog.flush.interval.ms来强制写盘,认为数据的可靠性应该通过Replica来保证,而强制Flush数据到磁盘会对整体性能产生影响。
2. 可以通过调整/proc/sys/vm/dirty_background_ratio/proc/sys/vm/dirty_ratio来调优性能。
a. 脏页率超过第一个指标会启动pdflush开始Flush Dirty PageCache。
b. 脏页率超过第二个指标会阻塞所有的写操作来进行Flush。
c. 根据不同的业务需求可以适当的降低dirty_background_ratio和提高dirty_ratio

Partition
Partition是Kafka可以很好的横向扩展和提供高并发处理以及实现Replication的基础。

扩展性方面。首先,Kafka允许Partition在集群内的Broker之间任意移动,以此来均衡可能存在的数据倾斜问题。其次,Partition支持自定义的分区算法,例如可以将同一个Key的所有消息都路由到同一个Partition上去。 同时Leader也可以在In-Sync的Replica中迁移。由于针对某一个Partition的所有读写请求都是只由Leader来处理,所以Kafka会尽量把Leader均匀的分散到集群的各个节点上,以免造成网络流量过于集中。

并发方面。任意Partition在某一个时刻只能被一个Consumer Group内的一个Consumer消费(反过来一个Consumer则可以同时消费多个Partition),Kafka非常简洁的Offset机制最小化了Broker和Consumer之间的交互,这使Kafka并不会像同类其他消息队列一样,随着下游Consumer数目的增加而成比例的降低性能。此外,如果多个Consumer恰巧都是消费时间序上很相近的数据,可以达到很高的PageCache命中率,因而Kafka可以非常高效的支持高并发读操作,实践中基本可以达到单机网卡上限。

不过,Partition的数量并不是越多越好,Partition的数量越多,平均到每一个Broker上的数量也就越多。考虑到Broker宕机(Network Failure, Full GC)的情况下,需要由Controller来为所有宕机的Broker上的所有Partition重新选举Leader,假设每个Partition的选举消耗10ms,如果Broker上有500个Partition,那么在进行选举的5s的时间里,对上述Partition的读写操作都会触发LeaderNotAvailableException

再进一步,如果挂掉的Broker是整个集群的Controller,那么首先要进行的是重新任命一个Broker作为Controller。新任命的Controller要从Zookeeper上获取所有Partition的Meta信息,获取每个信息大概3-5ms,那么如果有10000个Partition这个时间就会达到30s-50s。而且不要忘记这只是重新启动一个Controller花费的时间,在这基础上还要再加上前面说的选举Leader的时间 -_-!!!!!!

此外,在Broker端,对Producer和Consumer都使用了Buffer机制。其中Buffer的大小是统一配置的,数量则与Partition个数相同。如果Partition个数过多,会导致Producer和Consumer的Buffer内存占用过大。


tips
1. Partition的数量尽量提前预分配,虽然可以在后期动态增加Partition,但是会冒着可能破坏Message Key和Partition之间对应关系的风险。
2. Replica的数量不要过多,如果条件允许尽量把Replica集合内的Partition分别调整到不同的Rack。
3. 尽一切努力保证每次停Broker时都可以Clean Shutdown,否则问题就不仅仅是恢复服务所需时间长,还可能出现数据损坏或其他很诡异的问题。

Producer
Kafka的研发团队表示在0.8版本里用Java重写了整个Producer,据说性能有了很大提升。我还没有亲自对比试用过,这里就不做数据对比了。本文结尾的扩展阅读里提到了一套我认为比较好的对照组,有兴趣的同学可以尝试一下。

其实在Producer端的优化大部分消息系统采取的方式都比较单一,无非也就化零为整、同步变异步这么几种。

Kafka系统默认支持MessageSet,把多条Message自动地打成一个Group后发送出去,均摊后拉低了每次通信的RTT。而且在组织MessageSet的同时,还可以把数据重新排序,从爆发流式的随机写入优化成较为平稳的线性写入。

此外,还要着重介绍的一点是,Producer支持End-to-End的压缩。数据在本地压缩后放到网络上传输,在Broker一般不解压(除非指定要Deep-Iteration),直至消息被Consume之后在客户端解压。

当然用户也可以选择自己在应用层上做压缩和解压的工作(毕竟Kafka目前支持的压缩算法有限,只有GZIP和Snappy),不过这样做反而会意外的降低效率!!!! Kafka的End-to-End压缩与MessageSet配合在一起工作效果最佳,上面的做法直接割裂了两者间联系。至于道理其实很简单,压缩算法中一条基本的原理“重复的数据量越多,压缩比越高”。无关于消息体的内容,无关于消息体的数量,大多数情况下输入数据量大一些会取得更好的压缩比。

不过Kafka采用MessageSet也导致在可用性上一定程度的妥协。每次发送数据时,Producer都是send()之后就认为已经发送出去了,但其实大多数情况下消息还在内存的MessageSet当中,尚未发送到网络,这时候如果Producer挂掉,那就会出现丢数据的情况。

为了解决这个问题,Kafka在0.8版本的设计借鉴了网络当中的ack机制。如果对性能要求较高,又能在一定程度上允许Message的丢失,那就可以设置request.required.acks=0 来关闭ack,以全速发送。如果需要对发送的消息进行确认,就需要设置request.required.acks为1或-1,那么1和-1又有什么区别呢?这里又要提到前面聊的有关Replica数量问题。如果配置为1,表示消息只需要被Leader接收并确认即可,其他的Replica可以进行异步拉取无需立即进行确认,在保证可靠性的同时又不会把效率拉得很低。如果设置为-1,表示消息要Commit到该Partition的ISR集合中的所有Replica后,才可以返回ack,消息的发送会更安全,而整个过程的延迟会随着Replica的数量正比增长,这里就需要根据不同的需求做相应的优化。

tips
1. Producer的线程不要配置过多,尤其是在Mirror或者Migration中使用的时候,会加剧目标集群Partition消息乱序的情况(如果你的应用场景对消息顺序很敏感的话)。
2. 0.8版本的request.required.acks默认是0(同0.7)。

Consumer
Consumer端的设计大体上还算是比较常规的。

• 通过Consumer Group,可以支持生产者消费者和队列访问两种模式。
• Consumer API分为High level和Low level两种。前一种重度依赖Zookeeper,所以性能差一些且不自由,但是超省心。第二种不依赖Zookeeper服务,无论从自由度和性能上都有更好的表现,但是所有的异常(Leader迁移、Offset越界、Broker宕机等)和Offset的维护都需要自行处理。
• 大家可以关注下不日发布的0.9 Release。这帮货又用Java重写了一套Consumer。把两套API合并在一起,同时去掉了对Zookeeper的依赖。据说性能有大幅度提升哦~~

tips
强烈推荐使用Low level API,虽然繁琐一些,但是目前只有这个API可以对Error数据进行自定义处理,尤其是处理Broker异常或由于Unclean Shutdown导致的Corrupted Data时,否则无法Skip只能等着“坏消息”在Broker上被Rotate掉,在此期间该Replica将会一直处于不可用状态。


studentaaa
5楼 · 2021-01-15 09:48

(1)使用了页缓存技术
Kafka 每次接收到数据都会往磁盘上去写,它是基于操作系统的页缓存来实现文件写入的
操作系统本身有一层缓存,叫做 Page Cache,是在内存里的缓存,我们也可以称之为 OS Cache,意思就是操作系统自己管理的缓存。
在写入磁盘文件的时候,可以直接写入这个 OS Cache 里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 OS Cache 里的数据真的刷入磁盘文件中
(2)以磁盘顺序写的方式来写
将数据追加到文件的末尾,不是在文件的随机位置来修改数据
(3)零拷贝技术
零拷贝就是不需要把 OS Cache 里的数据拷贝到应用缓存,再从应用缓存拷贝到 Socket 缓存了,两次拷贝都省略了
Kafka就是直接让操作系统的 Cache 中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤

相关问题推荐

  • 什么是大数据时代?2021-01-13 21:23
    回答 100

    大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,而这个海量数据的时代则被称为大数据时代。随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。大数据(Big data)通常用来形容一个公司创造的大量非结...

  • 回答 84

    Java和大数据的关系:Java是计算机的一门编程语言;可以用来做很多工作,大数据开发属于其中一种;大数据属于互联网方向,就像现在建立在大数据基础上的AI方向一样,他两不是一个同类,但是属于包含和被包含的关系;Java可以用来做大数据工作,大数据开发或者...

  • 回答 52
    已采纳

    学完大数据可以从事很多工作,比如说:hadoop 研发工程师、大数据研发工程师、大数据分析工程师、数据库工程师、hadoop运维工程师、大数据运维工程师、java大数据工程师、spark工程师等等都是我们可以从事的工作岗位!不同的岗位,所具备的技术知识也是不一样...

  • 回答 29

    简言之,大数据是指大数据集,这些数据集经过计算分析可以用于揭示某个方面相关的模式和趋势。大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。大数据的特点:数据量大、数据种类多、 要求实时性强、数据所蕴藏的...

  • 回答 14

    tail -f的时候,发现一个奇怪的现象,首先 我在一个窗口中 tail -f test.txt 然后在另一个窗口中用vim编辑这个文件,增加了几行字符,并保存,这个时候发现第一个窗口中并没有变化,没有将最新的内容显示出来。tail -F,重复上面的实验过程, 发现这次有变化了...

  • 回答 18

    您好针对您的问题,做出以下回答,希望有所帮助!1、大数据行业还是有非常大的人才需求的,对于就业也有不同的岗位可选,比如大数据工程师,大数据运维,大数据架构师,大数据分析师等等,就业难就难在能否找到适合的工作,能否与你的能力和就业预期匹配。2、...

  • 回答 33

    大数据的定义。大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法通过人脑甚至主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。大数据是对大量、动态、能持续的数据,通过运用新系统、新工具、新...

  • 回答 17

    最小的基本单位是Byte应该没多少人不知道吧,下面先按顺序给出所有单位:Byte、KB、MB、GB、TB、PB、EB、ZB、YB、DB、NB,按照进率1024(2的十次方)计算:1Byte = 8 Bit1 KB = 1,024 Bytes 1 MB = 1,024 KB = 1,048,576 Bytes 1 GB = 1,024 MB = 1,048,576...

  • 回答 5

    MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL的版本:针对不同的用户,MySQL分为两种不同的版本:MySQL Community Server社区版本,免费,但是Mysql不提供...

  • mysql安装步骤mysql 2022-05-07 18:01
    回答 2

    mysql安装需要先使用yum安装mysql数据库的软件包 ;然后启动数据库服务并运行mysql_secure_installation去除安全隐患,最后登录数据库,便可完成安装

  • 回答 5

    1.查看所有数据库showdatabases;2.查看当前使用的数据库selectdatabase();3.查看数据库使用端口showvariableslike'port';4.查看数据库编码showvariableslike‘%char%’;character_set_client 为客户端编码方式; character_set_connection 为建立连接...

  • 回答 5

    CREATE TABLE IF NOT EXISTS `runoob_tbl`(    `runoob_id` INT UNSIGNED AUTO_INCREMENT,    `runoob_title` VARCHAR(100) NOT NULL,    `runoob_author` VARCHAR(40) NOT NULL,    `submission_date` DATE,    PRI...

  • 回答 9

    学习多久,我觉得看你基础情况。1、如果原来什么语言也没有学过,也没有基础,那我觉得最基础的要先选择一种语言来学习,是VB,C..,pascal,看个人的喜好,一般情况下,选择C语言来学习。2、如果是有过语言的学习,我看应该一个星期差不多,因为语言的理念互通...

  • 回答 7

    添加语句 INSERT插入语句:INSERT INTO 表名 VALUES (‘xx’,‘xx’)不指定插入的列INSERT INTO table_name VALUES (值1, 值2,…)指定插入的列INSERT INTO table_name (列1, 列2,…) VALUES (值1, 值2,…)查询插入语句: INSERT INTO 插入表 SELECT * FROM 查...

  • 回答 5

    看你什么岗位吧。如果是后端,只会CRUD。应该是可以找到实习的,不过公司应该不会太好。如果是数据库开发岗位,那这应该是不会找到的。

  • 回答 7

    查找数据列 SELECT column1, column2, … FROM table_name; SELECT column_name(s) FROM table_name 

没有解决我的问题,去提问