MQ-Kafka4-日志存储

文件目录

image-20220106155935199

为了防止Log过大,Kafka映日了日志分段(LogSegment)的概念,将Log 切分为多个LogSegment。

Log在物理上只已文件夹的形式存储,而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以".txnindex"为后缀的事务索引文件)

Log对应的是一个命名形式为 <topic>-<partition>的文件夹,假设有一个名为 "topic-log"的主题,此主题具有4个分区,那么实际物理存储上表现为:“topic-log-0”、“topic-log-1”、“topic-log-2”、“topic-log-3”

image-20220106162816388

向Log 中追加消息时是顺序写入的,只有最后一个LogSegment 才能执行写入操作,在此之前所有的LogSegment 都不能写入数据

为了方便描述,将最后一个LogSegment 称为activeSegment ,表示当前活跃的日志分段。随着消息的不断写入,当 activeSegment 满足一定的条件时,就创建新的activeSegment,并将消息追加到新的activeSegment

为了便于消息的检索,每个LogSegment 中的日志文件(以“ .log ”为文件后缀)都有对应的两个索引文件:

  • 偏移量索引文件,以“ .index ”为文件后缀
  • 时间戳索引文件,以“ .timeindex ”为文件后缀

每个LogSegment 都有一个基准偏移量baseOffset,用来表示当前LogSegment中第一条消息的offset 。偏移量是一个64 位的长整型数,日志文件和两个索引文件都是根据基准偏移量( baseOffset )命名的,名称固定为20 位数字,没有达到的位数则用0 填充。比如第一个LogSegment 的基准偏移量为0 ,对应的日志文件为00000000000000000000.log

image-20220106163610613

第2 个 LogSegment 对应的基准位移是133 ,也说明了该LogSegment 中的第一条消息的偏移量为133 '同时可以反映出第一个LogSegment 中共有133 条消息(偏移量从0 至132的消息)

消费者提交的位移是保存在Kafka 内部的主题consumer offsets中的,初始情况下这个主题并不存在,当第一次有消费者消费消息时会自动创建这个主题

Kafka文件目录布局

image-20220106163922633

每一个根目录都会包含最基本的4个检查点文件( xxx-checkpoint )和meta.propties 文件。在创建主题的时候,如果当前broker中不止配置了一个根目录,那么会挑选分区数最少的那个根目录来完成本次创建任务

日志格式

消息集称为 Record Batch,其内部可包含一条或多条消息

image-20220106170910446

在消息压缩的情形下, Record Batch Header 部分(参见图5-7 左部, 从first offset 到 records count 字段)是不被压缩的,而被压缩的是records 字段中的所有内容。生产者客户端中的ProducerBatch 对应这里的RecordBatch,而ProducerRecord 对应这里的Record

Record Record包含:

  1. length :消息总长度。
  2. attributes : 弃用,但还是在消息格式中占据1B 的大小, 以备未来的格式扩展。
  3. timestamp delta : 时间戳增量。通常一个time stamp 需要占用8 个字节,如果像这里一样保存与RecordBatch 的起始时间戳的差值,则可以进一步节省占用的字节数。
  4. offset delta : 位移增量。保存与RecordBatc h 起始位移的差值,可以节省占用的字节数
  5. headers :这个字段用来支持应用级别的扩展,包含key和value ,一个Record 里面可以包含0 至多个Header

RecodeBatch包含:

  1. first offset :表示当前RecordBatch 的起始位移。

  2. length :计算从partition leader epoeh 字段开始到末尾的长度。

  3. partition leader epoeh :分区leader 纪元,可以看作分区leader 的版本号或更新次数

  4. magic :消息格式的版本号,对v2 版本而言, magie 等于2 。

  5. attributes :消息属性(2B)

    • 低 3 位表示压缩格式,
    • 第4 位表示时间戳类型;
    • 第5 位表示此RecordBatch 是否处于事务中,0 表示非事务, l 表示事务。
    • 第6 位表示是否是控制消息(ControlBatch)。0 表示非控制消息,而 1 表示是控制消息,控制消息用来支持事务功能。
  6. last offset delta: RecordBatch 中最后一个Record 的offset 与自rst offset 的差值。主要被broker 用来确保RecordBatch 中Record 组装的正确性。

  7. first timestamp: RecordBatch 中第一条Record 的时间戳。

  8. max timestamp: RecordBatch 中最大的时间戳, 一般情况下是指最后一个Record的时间戳,和last offset delta 的作用一样,用来确保消息组装的正确性。

  9. produeer id: PID ,用来支持幂等和事务

日志索引

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。

  • 偏移量索引文件用来建立消息偏移量( offset )到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;
  • 时间戳索引文件则根据指定的时间戳( timestamp )来查找对应的偏移量信息

Kafka 中的索引文件以 稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引页。每当写入一定量(由broker 端参数log.index.interval.bytes 指定,默认值为4096 ,即4KB )的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小log.index.interval.bytes的值,对应地可以增加或缩小索引项的密度。

稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。

偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。

时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。

当日志分段达到一定条件则创建新的日志分段,一定条件包括:

  1. 当前日志分段文件的大小超过了broker 端参数log.segment.bytes 配置的值,默认值1GB
  2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.hours参数配置的值。如果同时配置了log.roll.mslog.roll.hours 参数,那么log.roll.ms 的优先级高。默认情况下,只配置了log.roll.hours 参数,其值为168,即7天。
  3. 偏移量索引文件或时间戳索引文件的大小达到broker 端参数 log.index.size.max.bytes,默认10M
  4. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于Integer.MAX_VALUE,即offset - baseOffset > Integer.MAX_VALUE

对非当前活跃的日志分段而言,其对应的索引文件内容己经固定而不需要再写入索引项,所以会被设定为只读,而对当前活跃的日志分段C activeSegment )而言,索引文件还会追加更多的索引项,所以被设定为可读写

在索引文件切分的时候, Kafka 会关闭当前正在写入的索引文件并置为只读模式,同时以可读写的模式创建新的索引文件

Kafka 在创建索引文件的时候会为其预分配log.index.size.max.bytes 大小的空间,注意这一点与日志分段文件不同,只有当索引文件进行切分的时候, Kafka 才会把该索引文件裁剪到实际的数据大小。也就是说,与当前活跃的日志分段对应的索引文件的大小固定为log.index.size.max.bytes,而其余日志分段对应的索引文件的大小为实际的占用空间。

偏移量索引

偏移量索引项的格式如下图,每个索引项占用8 个字节,分为两个部分。

image-20220106182254352
  1. relativeOffset:相对偏移量(4B),表示消息相对于baseOffset 的偏移量,当前索引文件的文件名即为baseOffset 的值。

  2. position:物理地址(4B),也就是消息在日志分段文件中对应的物理位置

消息的偏移量(offset)占用8 个字节,也称绝对偏移量。索引项中没有直接使用绝对偏移量而改为只占用4 个字节的相对偏移量CrelativeOffset =offset - baseOffset,这样可以减小索引文件占用的空间。

举个例子, 一个日志分段的baseOffset 为32 ,则文件名是00000000000000000032.log,offset 为35 的消息在索引文件中的relativeOffset 的值为35 - 32=3

image-20220106191956870

如果我们要查找偏移量为23 的消息,那么应该怎么做呢?

  1. 首先通过二分法在偏移量索引文件中找到不大于23 的最大索引项,即[ 22 , 656 ],

  2. 然后从日志分段文件中的物理位置656 开始顺序查找偏移量为23 的消息。

那么用户又是如何查找日志分段的呢?

并不是顺序查找,而是使用跳跃表的结构。Kafka的每个日志对象中使用了 ConcurrentSkipListMap来保存各个日志分段,每个日志分段的baseOffset 作为key,这样可以根据指定偏移量快速定位到消息所在的日志分段

Kafka 强制要求索引文件大小必须是索引项大小的整数倍,对偏移量索引文件而言,必须为8 的整数倍

时间戳索引

image-20220106193739690

每个索引项占用12 个字节,分为两个部分

  1. timestamp : 当前日志分段最大的时间戳。
  2. relativeOffset :时间戳所对应的消息的相对偏移量

时间戳索引文件中包含若干时间戳索引项, 每个追加的时间戳索引项中的 timestamp 必须大于之前追加的索引项的timestamp ,否则不予追加

与偏移量索引文件相似,时间戳索引文件大小必须是索引项大小(12B)的整数倍

会在偏移量索引文件和时间戳索引文件中分别增加一个偏移量索引项和时间戳索引项。两个文件增加索引项的操作是同时进行的,但并不意味着偏移量索引中的relativeOffset 和时间戳索引项中的relativeOffset 是同一个值

如果一个失败了怎么办?

为什么会出现不是同一个值的情况

image-20220106195049244

如果要查找指定时间戳 targetTimeStamp = 1526384718288 开始的消息,首先是找到不小于

指定时间戳的日志分段。这里就无法使用跳跃表来快速定位到相应的日志分段了, 需要分以下

几个步骤来完成。

  1. 步骤1 : 将 targetTimeStamp 和每个日志分段中的最大时间戳 largestTimeStamp 逐一对比,直到找到不小于targetTimeStamplargestTimeStamp 所对应的日志分段。日志分段中的 largestTimeStamp 的计算是先查询该日志分段所对应的时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取其值,否则取该日志分段的最近修改时间。

  2. 步骤2 : 找到相应的日志分段之后,在时间戳索引文件中使用二分查找算法查找到不大于targetTimeStamp 的最大索引项,即[152638478283, 28],如此便找到了一个相对偏移量28 。

  3. 步骤3 : 在偏移量索引文件中使用二分算法查找到不大于28 的最大索引项,即[26, 838 ]

  4. 步骤4 :从步骤1中找到日志分段文件中的838 的物理位置开始查找不小于targetTimeStamp的消息

日志清理

Kafka 提供了两种日志清理策略

  1. 日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段
  2. 日志压缩(Log Compaction):针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本

通过broker端参数 log.cleanup.policy 来设置日志清理策略

  • “delete”:即采用日志删除的清理策略
  • “compact”: 即采用日志压缩的清理策略
  • “delete,compact”:同时迟滞日志删除和日志压缩两种策略

日志清理

在Kafka的日志管理器中有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过 broker 端参数 log.retention.check.interval.ms配置,默认5 分钟。当前日志分段的保留策略有3 种:

  • 基于时间的保留策略
  • 基于日志大小的保留策略
  • 基于日志起始偏移量的保留策略
基于时间

日志删除任务 检查日志文件中是否有保留时间超过设定的阀值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments),retentionMs 可以通过broker端参数log.retention.hourslog.retention.minuteslog.retention.ms 来配置,默认7 天

image-20220106201417174

查找过期的日志分段文件,并不是简单地根据日志分段的最近修改时间lastModifiedTime来计算的, 而是通过日志分段对应的时间戳索引文件,找出最后一条索引项(如果不大于0,则取最近修改时间lastModifiedTime)

如果所有的日志分段都己过期, 但该日志文件中还要有一个日志分段用于接收消息的写入,即必须要保证有一个活跃的日志分段acti veSegment ,在此种情况下,会先切分出一个新的日志分段作为activeSegment , 然后执行删除操作

删除日志分段步骤

  1. 首先会从Log 对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。
  2. 然后将日志分段所对应的所有文件添加上 .deleted 的后缀(当然也包括对应的索引文件) 。
  3. 最后交由一个以 delete-file 命名的延迟任务来删除这些以 .deleted为后缀的文件,这个任务的延迟执行时间可以通过 file.delete.delay.ms 参数默认1 分钟
基于日志大小

日志删除任务会检查当前日志的大小是否超过设定的阔值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments),retentionSize 可以通过broker 端参数log.retention.bytes 来配置,默认值为 -1,表示无穷大

注意 log.retention.bytes 配置的是 Log 中所有日志文件的总大小,而不是单个日志分段(确切地说应该为.log 日志文件)的大小。单个日志分段的大小由broker 端参数 log.segment.bytes 来限制,默认值为1GB

image-20220106202837928

基于日志大小的保留策略与基于时间的保留策略类似

基于日志起始偏移量

基于日志起始偏移量的保留策略的判断依据是某日志分段的下一个日志分段的起始偏移量 baseOffset 是否小于等于logStartOffset,若是,则可以删除此日志分段。

如图,假设 logStartOffset 等于25,日志分段 1 的起始偏移量为0,日志分段2 的起始偏移量为11,日志分

段3 的起始偏移量为23 ,通过如下动作收集可删除的日志分段的文件集合deletableSegments :

image-20220106203450370
  1. 从头开始遍历每个日志分段,日志分段 1 的下一个日志分段的起始偏移量为11 ,小于 logStartOffset 的大小,将日志分段 1 加入deletableSegments 。

  2. 日志分段2 的下一个日志偏移量的起始偏移量为23 ,也小于logStartOffset 的大小,将日志分段2 页加入deletableSegments

  3. 日志分段3 的下一个日志偏移量在 logStartOffset 的右侧,故从日志分段3 开始的所有日志分段都不会加deletableSegments

logStartOffset是怎么来的?

一般情况下:日志文件的起始偏移量 logStartOffset 等于第一个日志分段的baseOffset,但可以通过脚本或请求进行修改

日志压缩

如果只关心 key 对应的最新 value 值,则可以开启Kafka 的日志清理功能,Kafka 会定期将相同 key 的消息进行合井,只保留最新的value值

注意区分日志压缩与消息压缩

image-20220106204542174

Log Compaction 执行前后,日志分段中的每条消息的偏移量和写入时的偏移量保持一致。Log Compaction 会生成新的日志分段文件,日志分段中每条消息的物理位置会重新按照新文件来组织

拉取状态是客户端保存的,这个时候如果进行了日志压缩,是否导致乱序?

如何对日志文件中消息的Key进行筛选操作?

每个日志清理线程都会使用 SkimpyOffsetMap的对象来构建key与offset的映射关系的哈希表

截屏2022-01-06 下午8.51.33

日志清理需要遍历两次日志文件

第一次:遍历把每个key的哈希值和最后出现的offset都保存在SkimpyOffsetMap中

第二次:检查每个消息的偏移量在Map中是否一样,否则就清理

墓碑消息是什么?

执行日志压缩之后,日志分段的大小会比原来小,如何防止出现大量小文件?

清理过程中并不对单个的日志分段进行单独清理,而是将日志文件中 offset 从 0 - firstUncleanableOffset的所有日志进行分组。每组中日志分段占用空间大小之和不超过 segmentSize( log.segment.bytes),清理后生成一个新的日志分段

image-20220106211250999

磁盘存储

在印象中,磁盘的速率要远低于内存,其实这要看我们怎么样使用磁盘。顺序写盘的速度不仅比随机写盘的速度快,而且也比随机写内存的速度快

image-20220106213056352

Kafka在设计时候采用了文件追击的方式来写入消息,只能在日志文件的尾部追加新的消息,并且也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作

日志压缩是不是破坏了顺序写盘?

页缓存

页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘I/O 的操作。其实是将磁盘中的数据缓存到内存中,将对磁盘的访问变为内存的访问

当进程准备读取磁盘上的文件时,操作系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中

  • 如果存在(命中)则直接返回数据

  • 如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。

同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在, 则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性

Linux 操作系统中的vm.dirty background ratio 参数用来指定当脏页数量达到系统内存的百分之多少之后就会触发 pdflush/flush/kdmflush 等后台回写进程的运行来处理脏页, 一般设置为小于10 的值即可

Kafka 中大量使用了页缓存,这是Kafka 实现高吞吐的重要因素之一。

Kafka 中也提供了同步刷盘及间断性强制刷盘(fsync)的功能,这些功能可通过log.flush.interval.messageslog.flush.int erval.ms 等参数来控制,但不建议使用,会严重影响性能,消息的可靠性应该由多副本机制保证

另外Linux会使用磁盘的一部分做为swap分区,非活跃进行调入swap分区,把进程空出来让活跃的进程使用,Kafka也应该尽量避免使用 vm.swappiness

vm.swappiness

100 表示积极使用

0 表示任何时候都不要发生交换

磁盘I/O流程

磁盘IO四种场景如下:

  1. 用户调用IO操作接口,数据流为:应用程序buffer -> C 库标准IObuffer -> 文件系统页缓存 -> 通过具体文件系统到磁盘
  2. 用户调用文件I/O,数据流为:应用程序buffer -> 文件系统页缓存 -> 通过具体文件系统到磁盘
  3. 用户打开文件时使用O_DIRECT,绕过页缓存直接读写磁盘
  4. 用户使用类似dd工具,并使用direct参数,绕过系统cache与文件系统直接写磁盘。
image-20220106231139238

零拷贝

参考链接

  1. 《深入理解Kafka核心设计与实践原理》