kafka的消息存储机制和原理分析(kafka消息保留机制)

网友投稿 405 2022-08-08


kafka的消息存储机制和原理分析(kafka消息保留机制)

目录消息的保存路径数据分片log分段日志和索引文件内容分析在 partition 中通过 offset 查找 message过程日志的清除策略以及压缩策略日志的清理策略有两个日志压缩策略消息写入的性能顺序写零拷贝

消息的保存路径

消息发送端发送消息到 broker 上以后,消息是如何持久化的?

数据分片

kafka 使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。

Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是_

比如创建一个名为firstTopic的topic,其中有3个partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有http:// 3 个目录,firstTopic-0~3

多个分区在集群中多个broker上的分配方法

1.将所有 N Broker 和待分配的 i 个 Partition 排序

2.将第 i 个 Partition 分配到第(i mod n)个 Broker 上

log分段

每个分片目录中,kafka 通过分段的方式将 数据 分为多个 LogSegment,一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件(如上:00000000000000000000.index),其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。

每个LogSegment 的大小可以在server.properties 中log.segment.bytes=107370 (设置分段大小,默认是1gb)选项进行设置。

segment 的 index file 和 data file 2 个文件一一对应,成对出现,后缀".index"和“.log”分别表示为 segment 索引文件、数据文件.命名规则:partion 全局的第一个 segment从 0 开始,后续每个 segment 文件名为上一个 segment文件最后一条消息的 offset 值进行递增。数值最大为 64 位long 大小,20 位数字字符长度,没有数字用 0 填充

第一个 log 文件的最后一个 offset 为:5376,所以下一个segment 的文件命名为: 0000000000000005376.log。

对应的 index 为 00000000000000005376.index

kafka 这种分片和分段策略,避免了数据量过大时,数据文件文件无限扩张带来的隐患,更有助于消息文件的维护以及被消费的消息的清理。

日志和索引文件内容分析

通过下面这条命令可以看到 kafka 消息日志的内容

sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log

输出结果为:

offset: 5376 position: 102124 CreateTime: 1531477349287isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5376

可以看到一条消息,会包含很多的字段,如下:

offset: 5371 position: 102124 CreateTime: 1531477349286isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5371

各字段的意义:

offset:记录号 ;position:偏移量;createTime:创建时间、keysize 和 valuesize 表示 key 和 value 的大小compresscodec:表示压缩编码payload:表示消息的具体内容

为了提高查找消息的性能,kafka为每一个日志文件添加 了2 个索引文件:OffsetIndex 和 TimeIndex,分别对应*.index以及*.timeindex, *.TimeIndex 是映射时间戳和相对 offset的文件

查看索引内容命令:

sh kafka-run-class.shkafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log

索引文件和日志文件内容关系如下

如上图所示,index 文件中存储了索引以及物理偏移量。

log 文件存储了消息的内容。

索引文件中保存了部分offset和偏移量position的对应关系。

比如 index文件中 [4053,80899],表示在 log 文件中,对应的是第 4053 条记录,物理偏移量(position)为 80899.

在 partition 中通过 offset 查找 message过程

根据 offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一个文件的最后一个offset 进行命名的,所以,使用二分查找算法能够根据offset 快速定位到指定的索引文件找到索引文件后,根据 offset 进行定位,找到索引文件中的匹配范围的偏移量position。(kafka 采用稀疏索引的方式来提高查找性能)得到 position 以后,再到对应的 log 文件中,从 position处开始查找 offset 对应的消息,将每条消息的 offset 与目标 offset 进行比较,直到找到消息

比如说,我们要查找 offset=2490 这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到 log 文件中,根据 49111 这个 position 开始查找,比较每条消息的 offset 是否大于等于 2490。最后查找到对应的消息以后返回

日志的清除策略以及压缩策略

日志的清理策略有两个

根据消息的保留时间,当消息在 kafka 中保存的时间超过了指定的时间,就会触发清理过程根据 topic 存储的数据大小,当 topic 所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。

通过 log.retention.bytes 和 log.retention.hours 这两个参数来设置,当其中任意一个达到要求,都会执行删除。默认的保留时间是:7 天

kafka会启动一个后台线程,定期检查是否存在可以删除的消息。

日志压缩策略

Kafka 还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。

因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动Cleaner线程池,定期将相同的key进行合并,只保留最新的 value 值。日志的压缩原理如下图:

消息写入的性能

顺序写

我们现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的扇区;

这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka 采用顺序写的方式存储数据。

零拷贝

即使采用顺序写,但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈,所以 kafka还有一个性能策略:零拷贝

消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。

在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。

虽然这个操作描述起来KtBpn很简单,但实际上经历了很多步骤。如下:

操作系统将数据http://从磁盘读入到内核空间的页缓存应用程序将数据从内核空间读入到用户空间缓存中应用程序将数据写回到内核空间到 socket 缓存中操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出

这个过程涉及到 4 次上下文切换以及 4 次数据复制,并且有两次复制操作是由 CPU 完成。但是这个过程中,数据完全没有进行变化,仅仅是从磁盘复制到网卡缓冲区。通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。

现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;在 linux 中,是通过 sendfile 系统调用来完成的。

java 提供了访问这个系统调用的方法:FileChannel.transferTo API。

使用 sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。

所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:SpringBoot整合MybatisPlus实现增删改查功能(spring mybatis增删改查)
下一篇:关于Kafka消费者订阅方式
相关文章

 发表评论

暂时没有评论,来抢沙发吧~