kafka 日志索引 AbstractIndex

AbstractIndex

AbstractIndex 是 Kafka 日志(Log)子系统中一个至关重要的基础类。它为 Kafka 的各种索引文件(如偏移量索引 .index 和时间戳索引 .timeindex)提供了一个统一的、抽象的框架。这个类的设计目标是实现极高的读写性能和可靠的文件管理。

AbstractIndex 最核心的设计思想是使用内存映射文件(mmap)来管理索引数据。这在类的注释和实现中都有清晰的体现。

// ... existing code ...private volatile MappedByteBuffer mmap;
// ... existing code ...private void createAndAssignMmap() throws IOException {
// ... existing code ...MappedByteBuffer mmap = createMappedBuffer(raf, newlyCreated, length, writable, entrySize());this.length = length;this.mmap = mmap;
// ... existing code ...}
// ... existing code ...

分析与逻辑:

  • 是什么:内存映射是一种将文件或设备直接映射到进程地址空间的技术。映射完成后,对这块内存的读写操作会由操作系统自动同步到对应的磁盘文件中。
  • 为什么用
    1. 高性能:Kafka 无需在用户空间(Java 堆)和内核空间之间频繁地复制数据。所有的读写操作都直接在 MappedByteBuffer 上进行,这本质上是在操作操作系统的页缓存(Page Cache)。这极大地减少了系统调用和内存拷贝的开销。
    2. 利用操作系统优化:将文件的缓存管理完全交给操作系统。现代操作系统在页缓存管理上(如 LRU 算法)已经做得非常成熟和高效,能够很好地适应 Kafka 索引的访问模式(通常是顺序写入和接近末尾的读取)。
    3. 持久化:通过调用 mmap.force() 方法,可以确保内存中的修改被刷写到磁盘,保证了数据的持久性。

文件管理与生命周期

AbstractIndex 封装了对底层索引文件的完整生命周期管理。

// ... existing code ...private volatile File file;
// ... existing code ...public AbstractIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException {
// ... existing code ...createAndAssignMmap();
// ... existing code ...}public boolean resize(int newSize) throws IOException {
// ... existing code ...}public void renameTo(File f) throws IOException {
// ... existing code ...}public boolean deleteIfExists() throws IOException {
// ... existing code ...}public void close() throws IOException {trimToValidSize();closeHandler();}
// ... existing code ...

分析与逻辑:

  • 创建:构造函数 AbstractIndex(...) 负责初始化。如果文件不存在,它会创建文件并预分配 maxIndexSize 大小的空间。预分配可以避免在写入过程中频繁地扩展文件,这是一种性能优化。
  • 调整大小 (resize):允许动态地改变索引文件的大小。一个关键点是,在 Windows 或 z/OS 上,必须先解除内存映射(safeForceUnmap())才能修改文件长度,这个方法处理了这种跨平台的兼容性问题。
  • 重命名 (renameTo):当日志段(Log Segment)滚动时,对应的索引文件也需要被重命名(例如,从 000...1.index 变为 000...1.snapshot),这个方法提供了原子性的重命名操作。
  • 关闭与清理 (closecloseHandlerdeleteIfExists)
    • close() 方法在关闭前会调用 trimToValidSize(),将文件裁剪到只包含有效数据的大小,回收未使用的预分配空间。
    • closeHandler() 会强制解除内存映射。注释中提到,这样做是为了避免 JVM 垃圾回收器在回收 MappedByteBuffer 时可能引发的长时间 STW(Stop-The-World)暂停(KAFKA-4614)。
    • deleteIfExists() 负责安全地删除索引文件。

并发控制

索引文件可能会被多个线程访问(例如,写入线程和读取线程),因此必须保证线程安全。

// ... existing code ...protected final ReentrantLock lock = new ReentrantLock();
// ... existing code ...public boolean resize(int newSize) throws IOException {lock.lock();try {
// ... existing code ...} finally {lock.unlock();}}public void flush() {lock.lock();try {mmap.force();} finally {lock.unlock();}}
// ... existing code ...

分析与逻辑:

  • AbstractIndex 使用了 java.util.concurrent.locks.ReentrantLock 来保护所有关键的修改操作,如 resizeflushcloseHandler 等。
  • 这确保了在进行文件结构性变更(如调整大小、刷盘、关闭)时,不会与其他操作发生冲突,保证了数据的一致性和完整性。

缓存友好的搜索算法(Warm Section 优化)

这是 AbstractIndex 中一个非常精妙的性能优化,在类的尾部有大段注释详细解释。

// ... existing code .../** Kafka mmaps index files into memory...* ...* However, when looking up index, the standard binary search algorithm is not cache friendly, and can cause unnecessary* page faults...* ...* Here, we use a more cache-friendly lookup algorithm:* if (target > indexEntry[end - N]) // if the target is in the last N entries of the index*    binarySearch(end - N, end)* else*    binarySearch(begin, end - N)** If possible, we only look up in the last N entries of the index. By choosing a proper constant N, all the in-sync* lookups should go to the 1st branch. We call the last N entries the "warm" section...** We set N (_warmEntries) to 8192, because...*/
// ... existing code ...

分析与逻辑:

  • 问题:标准的二分查找算法在访问大文件时,访问模式是跳跃式的,这对于操作系统的页缓存(通常使用 LRU 策略)非常不友好。它可能会导致频繁的“缺页中断”(Page Fault),即需要从磁盘加载数据到内存,从而阻塞线程,导致延迟飙升。
  • 解决方案:Kafka 观察到,绝大多数的索引查找(来自消费者或副本同步)都集中在索引文件的末尾部分。因此,AbstractIndex 将索引逻辑上划分为两部分:
    1. "Warm Section"(热区):索引文件末尾的 N 个条目(N 被设为 8192)。
    2. "Cold Section"(冷区):热区之前的所有条目。
  • 查找逻辑:当进行查找时,首先判断目标是否可能在“热区”内。如果是,则只在热区内进行二分查找;否则,才在整个“冷区”进行查找。
  • 效果:由于绝大多数查找都命中“热区”,而“热区”范围较小(8192 个条目)且被频繁访问,因此它所对应的内存页会一直保留在操作系统的页缓存中,从而避免了磁盘 I/O,保证了低延迟的查找性能。选择 8192 这个值也是经过计算的,以确保在常见 4KB 页大小的系统上,一次热区查找就能“触摸”到所有相关的内存页,使其保持“温热”。

indexSlotRangeFor

这是查找逻辑的入口点。它清晰地展示了如何决定是在热区还是冷区进行搜索。

// ... existing code .../*** Lookup lower or upper bounds for the given target.*/private int indexSlotRangeFor(ByteBuffer idx, long target, IndexSearchType searchEntity,SearchResultType searchResultType) {// check if the index is emptyif (entries == 0)return -1;// 1. 计算热区的起始位置int firstHotEntry = Math.max(0, entries - 1 - warmEntries());// 2. 判断目标是否大于热区的第一个条目,如果是,则在热区 [firstHotEntry, entries - 1] 中搜索if (compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {return binarySearch(idx, target, searchEntity,searchResultType, firstHotEntry, entries - 1);}// 3. 检查目标是否比整个索引的第一个条目还小(边界检查)if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) {switch (searchResultType) {case LARGEST_LOWER_BOUND:return -1;case SMALLEST_UPPER_BOUND:return 0;}}// 4. 如果目标不在热区,并且不小于第一个条目,则在冷区 [0, firstHotEntry] 中搜索return binarySearch(idx, target, searchEntity, searchResultType, 0, firstHotEntry);}
// ... existing code ...

代码逻辑分析:

  1. int firstHotEntry = Math.max(0, entries - 1 - warmEntries());
    • 这行代码计算出“热区”的起始条目索引。warmEntries() 方法返回热区包含的条目数量。
  2. if (compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0)
    • 这是一个关键的判断。它取出热区的第一个条目,并与目标值 target 进行比较。
    • 如果目标值比热区的起始值还要大,那么目标值只可能存在于热区中。此时,就调用 binarySearch 方法,但搜索范围被限定在 [firstHotEntry, entries - 1] 这个热区内。
  3. return binarySearch(idx, target, searchEntity, searchResultType, 0, firstHotEntry);
    • 如果上一步的判断不成立,说明目标值小于或等于热区的起始值,那么目标值就应该在“冷区”中。此时,调用 binarySearch 的搜索范围是 [0, firstHotEntry] 这个冷区。

热区大小的定义:warmEntries

这个方法定义了“热区”的大小。它的大小是固定的8KB,然后根据每个索引条目的大小(由子类定义)来计算出具体包含多少个条目。

// ... existing code ...* 1) support larger warm section* 2) make sure the warm section of low QPS topic-partitions are really warm.*/protected final int warmEntries() {return 8192 / entrySize();}protected void safeForceUnmap() {
// ... existing code ...

底层二分查找实现:binarySearch

这个方法是标准的二分查找算法,但它的特别之处在于,它接收 begin 和 end 参数,使其可以对索引的任意一个子区间(无论是热区还是冷区)进行操作。

// ... existing code ...private int binarySearch(ByteBuffer idx, long target, IndexSearchType searchEntity,SearchResultType searchResultType, int begin, int end) {// binary search for the entryint lo = begin;int hi = end;while (lo < hi) {int mid = (lo + hi + 1) >>> 1;IndexEntry found = parseEntry(idx, mid);int compareResult = compareIndexEntry(found, target, searchEntity);if (compareResult > 0)hi = mid - 1;else if (compareResult < 0)lo = mid;elsereturn mid;}
// ... existing code ...

AbstractIndex 通过 indexSlotRangeFor 方法作为分流器,先判断目标值的大致范围,然后调用通用的 binarySearch 方法在更小的、更有可能被操作系统缓存的“热区”或者“冷区”中进行精确查找,从而实现了高效且缓存友好的查询。

maybeLock 

这是一个设计得非常巧妙的辅助方法,它的主要目的是处理跨操作系统的兼容性问题,同时优化性能

// ... existing code .../*** Execute the given function in a lock only if we are running on windows or z/OS. We do this* because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it* and this requires synchronizing reads.*/protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)lock.lock();try {return action.execute();} finally {if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)lock.unlock();}}
// ... existing code ...

方法签名解析

  • protected finalprotected 意味着这个方法可以被同一个包下的类以及 AbstractIndex 的子类访问。final 意味着子类不能重写(override)这个方法,保证了其行为的一致性。
  • <T, E extends Exception>: 这是泛型定义。
    • T: 代表方法执行后返回值的类型。
    • E extends Exception: 代表方法在执行过程中可能抛出的异常类型,它必须是 Exception 的子类。
  • T maybeLock(Lock lock, StorageAction<T, E> action) throws E:
    • maybeLock: 方法名,意为“可能加锁”。
    • Lock lock: 接收一个 Lock 对象作为参数,用于实际的加锁解锁操作。
    • StorageAction<T, E> action: 接收一个 StorageAction 类型的对象。这本质上是一个函数式接口(类似于 Callable 或 Runnable),它封装了真正需要被执行的业务逻辑。
    • throws E: 声明了该方法可能会抛出 action 中定义的异常。

方法的注释已经清晰地解释了其设计意图:

Execute the given function in a lock only if we are running on windows or z/OS. We do this because Windows or z/OS won't let us resize a file while it is mmapped. As a result we have to force unmap it and this requires synchronizing reads.

翻译和解读:

  • 问题背景:在 Windows 和 z/OS 操作系统上,有一个限制:如果一个文件被内存映射(mmapped),你就不能改变这个文件的长度(比如 resize)。要改变文件长度,必须先解除内存映射(unmap),执行完操作后再重新映射。
  • 并发风险:解除和重新映射的过程不是原子的。如果在解除映射后、重新映射前,有另一个线程来读取这个索引,就可能会读到不一致或已失效的数据。因此,这个过程必须被同步机制(锁)保护起来,以防止并发读写冲突。
  • 性能优化:在 Linux/Unix 等其他操作系统上,没有这个限制,可以在文件被映射的同时调整其大小。在这些系统上,如果每次读取都加锁,会带来不必要的性能开销。
  • 解决方案 (maybeLock)
    1. 方法首先检查当前操作系统:if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
    2. 如果是 Windows 或 z/OS,它会执行 lock.lock(),获取锁。
    3. 然后,在 try 块中,执行传入的 action.execute(),也就是真正的业务逻辑(比如 lookup 操作)。
    4. 最后,在 finally 块中,再次检查操作系统,如果是 Windows 或 z/OS,则执行 lock.unlock() 释放锁。
    5. 如果不是 Windows 或 z/OSif 条件不满足,代码会直接执行 action.execute(),完全不会进行任何加锁解锁操作

在 OffsetIndex 类的 lookup 方法中,就使用了 maybeLock

// 在 OffsetIndex.java 中
public OffsetPosition lookup(long targetOffset) {return maybeLock(lock, () -> { // 使用 lambda 表达式传入一个 StorageActionByteBuffer idx = mmap().duplicate();int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);if (slot == -1)return new OffsetPosition(baseOffset(), 0);elsereturn parseEntry(idx, slot);});
}

当这段代码运行时:

  • 在 Linux 上:maybeLock 不会加锁,直接执行 lambda 表达式中的查找逻辑。
  • 在 Windows 上:maybeLock 会先获取 lock,然后执行 lambda 表达式中的查找逻辑,最后释放 lock

maybeLock 是一个典型的策略模式应用,它根据运行环境(操作系统)动态地选择不同的执行策略(加锁或不加锁)。它通过将“是否加锁”的判断逻辑与“具体业务逻辑”解耦,实现了:

  1. 跨平台兼容性:确保了在有特殊限制的操作系统上(Windows/z/OS)数据操作的线程安全。
  2. 性能最优化:避免了在没有限制的操作系统上(Linux/Unix)引入不必要的锁开销。
  3. 代码简洁性:将平台相关的锁逻辑封装在一个地方,使得调用方的代码(如 lookup)无需关心底层操作系统的差异,保持了业务逻辑的纯粹和清晰。

抽象与扩展

作为一个抽象类,它定义了所有索引的通用行为,并将与具体索引格式相关的部分留给子类实现。

// ... existing code ...public abstract class AbstractIndex implements Closeable {
// ... existing code ...public abstract void sanityCheck();public abstract void truncateTo(long offset);protected abstract void truncate();protected abstract int entrySize();protected abstract IndexEntry parseEntry(ByteBuffer buffer, int n);
// ... existing code ...}

分析与逻辑:

  • entrySize(): 不同的索引(偏移量索引、时间戳索引)每个条目的大小不同,由子类定义。
  • parseEntry(...): 如何从 ByteBuffer 中解析出一个具体的索引条目,其逻辑也由子类实现。
  • sanityCheck()truncateTo(...): 这些操作的具体逻辑也可能因索引类型而异。

这种设计遵循了面向对象的设计原则,使得代码结构清晰,易于扩展。如果未来需要引入新的索引类型(例如,基于 Producer ID 的索引),只需继承 AbstractIndex 并实现这些抽象方法即可。

总结

public abstract class AbstractIndex 是 Kafka 高性能存储引擎的基石。它通过内存映射文件实现了高效的 I/O,通过精细的文件生命周期管理和并发控制保证了数据安全和可靠性,并通过创新的缓存友好搜索算法解决了大规模数据查找的性能瓶颈。其良好的抽象设计也为系统的可扩展性提供了坚实的基础。理解了这个类,就等于掌握了 Kafka 日志索引实现的核心。

OffsetIndex

OffsetIndex 是 Kafka 存储引擎中一个具体且至关重要的索引实现。顾名思义,它负责维护 逻辑偏移量(Offset)物理文件位置(Position) 之间的映射关系。当消费者或副本需要从某个特定偏移量开始读取数据时,Kafka 正是利用 OffsetIndex 来快速定位到数据在日志文件(.log 文件)中的大致位置,从而避免了从头扫描整个巨大的日志文件。

OffsetIndex 的核心职责是提供高效的偏移量查找。它对应磁盘上的 .index 文件。为了实现极致的性能和空间效率,它采用了非常紧凑的文件格式。

文件格式分析:

类的注释中清晰地描述了其格式:

The physical format is a 4 byte "relative" offset and a 4 byte file location for the message with that offset.

  • 8字节条目(Entry): 每个索引条目固定为8个字节。这在代码中由常量定义:
    // ... existing code ...
    public final class OffsetIndex extends AbstractIndex {private static final Logger log = LoggerFactory.getLogger(OffsetIndex.class);private static final int ENTRY_SIZE = 8;
    // ... existing code ...
    
  • 4字节相对偏移量 (Relative Offset): 为了节省空间,索引文件中存储的不是绝对偏移量,而是相对于该日志段(Log Segment)的 baseOffset 的相对值。例如,如果一个日志段的 baseOffset 是 5000,那么逻辑偏移量 5050 在索引文件中会被存储为 50。这使得偏移量可以用一个4字节的整数表示,极大地扩展了单个日志段能覆盖的范围。
  • 4字节物理位置 (Physical Position): 这4个字节存储的是该消息在对应的 .log 文件中的物理字节位置。

这种设计体现了 Kafka 对性能和存储效率的极致追求。

继承 AbstractIndex 并实现其抽象方法

OffsetIndex 是 final 类,它继承了 AbstractIndex,这意味着它自动获得了父类提供的所有强大功能:

  • 内存映射(mmap):直接在操作系统的页缓存中进行读写,性能极高。
  • 文件生命周期管理:创建、关闭、重命名、调整大小等。
  • 并发控制:通过 ReentrantLock 保证线程安全。
  • 缓存友好的查找算法:自动拥有了“热区/冷区”分段二分查找的能力。

OffsetIndex 需要做的就是实现父类定义的抽象方法,告诉框架如何处理自己特有的8字节条目格式。

  • entrySize():

    // ... existing code ...@Overrideprotected int entrySize() {return ENTRY_SIZE;}
    

    这个方法非常简单,直接返回常量 8

  • parseEntry(ByteBuffer buffer, int n):

    // ... existing code ...@Overrideprotected OffsetPosition parseEntry(ByteBuffer buffer, int n) {return new OffsetPosition(baseOffset() + relativeOffset(buffer, n), physical(buffer, n));}private int relativeOffset(ByteBuffer buffer, int n) {return buffer.getInt(n * ENTRY_SIZE);}private int physical(ByteBuffer buffer, int n) {return buffer.getInt(n * ENTRY_SIZE + 4);}
    // ... existing code ...
    

    这是连接抽象框架和具体实现的桥梁。它根据给定的条目序号 n,从 ByteBuffer 中:

    1. 读取前4个字节作为相对偏移量 (relativeOffset)。
    2. 读取后4个字节作为物理位置 (physical)。
    3. 将相对偏移量加上 baseOffset() 转换回绝对偏移量。
    4. 最后,用绝对偏移量和物理位置创建一个 OffsetPosition 对象并返回。

lookup(long targetOffset)

// ... existing code ...public OffsetPosition lookup(long targetOffset) {return maybeLock(lock, () -> {ByteBuffer idx = mmap().duplicate();int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);if (slot == -1)return new OffsetPosition(baseOffset(), 0);elsereturn parseEntry(idx, slot);});}
// ... existing code ...

逻辑分析

  1. 它直接调用了从 AbstractIndex 继承的 largestLowerBoundSlotFor 方法。这个方法的作用是:找到小于或等于 targetOffset 的那个最大的偏移量所在的索引槽位(slot)。
  2. 这个调用自动享受了“热区/冷区”优化,性能非常高。
  3. 如果找不到(slot == -1),说明 targetOffset 比索引中最小的偏移量还要小,此时返回该日志段的起始位置 (baseOffset, 0)
  4. 如果找到了,就用 parseEntry 解析该槽位的数据,返回对应的 OffsetPosition

append: 唯一的写入方法。

// ... existing code ...public void append(long offset, int position) {lock.lock();try {if (isFull())throw new IllegalArgumentException("Attempt to append to a full index (size = " + entries() + ").");if (entries() == 0 || offset > lastOffset) {log.trace("Adding index entry {} => {} to {}", offset, position, file().getAbsolutePath());mmap().putInt(relativeOffset(offset));mmap().putInt(position);incrementEntries();lastOffset = offset;
// ... existing code ...} elsethrow new InvalidOffsetException("Attempt to append an offset " + offset + " to position " + entries() +" no larger than the last offset appended (" + lastOffset + ") to " + file().getAbsolutePath());} finally {lock.unlock();}}
// ... existing code ...

逻辑分析

  1. 该方法是线程安全的,通过 lock 保护。
  2. 它会检查索引是否已满,以及待追加的 offset 是否大于当前索引中最后的 lastOffset索引条目必须是按偏移量单调递增的
  3. 如果检查通过,它会将 offset 转换为相对偏移量,然后将4字节的相对偏移量和4字节的物理位置 position 写入 mmap
  4. 最后更新内部状态,如条目数 entries 和 lastOffset
  • truncateTo(long offset) 和 truncate(): 用于日志截断。当 Kafka 需要删除某个偏移量之后的数据时(例如日志清理策略或副本同步失败),会调用这些方法来同步截断索引文件,确保索引和日志文件的一致性。

总结

OffsetIndex 是一个设计精良、高度优化的类。它完美地展示了 Kafka 如何通过继承和组合来复用通用逻辑,同时通过专用的、紧凑的数据结构精巧的算法(如相对偏移量、热区查找)来满足特定的高性能需求。

简单来说,OffsetIndex 就是 Kafka 能够快速在海量数据中定位到任意一条消息的“目录”或“索引页”,是其实现高性能随机读取能力的关键所在。

TimeIndex

TimeIndex 是 Kafka 存储引擎中与 OffsetIndex 并列的另一个核心索引实现。它的主要职责是建立 时间戳(Timestamp)逻辑偏移量(Offset) 之间的映射关系。这个功能对于 Kafka 的许多高级特性至关重要,比如:

  • 按时间消费:允许消费者从指定的时间点开始消费消息(例如,消费过去一小时内的所有消息)。
  • 基于时间的日志保留策略:根据消息的时间戳来删除过期的日志段(例如,删除超过7天的数据)。

TimeIndex 对应于磁盘上的 .timeindex 文件。

TimeIndex 的核心是提供高效的时间戳查找。为了实现这一点,它采用了与 OffsetIndex 类似但又不同的紧凑文件格式。

文件格式分析:

类的注释中清晰地描述了其格式:

The physical format is a 8 bytes timestamp and a 4 bytes "relative" offset... A time index entry (TIMESTAMP, OFFSET) means that the biggest timestamp seen before OFFSET is TIMESTAMP.

  • 12字节条目(Entry): 每个索引条目固定为12个字节。这在代码中由常量定义:
    // ... existing code ...
    public class TimeIndex extends AbstractIndex {private static final Logger log = LoggerFactory.getLogger(TimeIndex.class);private static final int ENTRY_SIZE = 12;
    // ... existing code ...
    
  • 8字节时间戳 (Timestamp): 使用一个64位的长整型来存储时间戳,足以应对未来的需求。
  • 4字节相对偏移量 (Relative Offset): 与 OffsetIndex 一样,这里存储的也是相对于日志段 baseOffset 的相对偏移量,以节省空间。

关键语义:一个 (TIMESTAMP, OFFSET) 条目的含义是:在该日志段中,所有偏移量小于 OFFSET 的消息,其时间戳都小于或等于 TIMESTAMP。这个定义对于查找算法至关重要。

继承 AbstractIndex 并实现其抽象方法

TimeIndex 同样继承自 AbstractIndex,因此它也自动获得了内存映射、文件管理、并发控制和缓存友好查找等所有底层能力。它需要做的就是根据自己12字节的条目格式来实现父类的抽象方法。

  • entrySize():

    // ... existing code ...@Overrideprotected int entrySize() {return ENTRY_SIZE;}
    // ... existing code ...
    

    直接返回常量 12

  • parseEntry(ByteBuffer buffer, int n):

    // ... existing code ...@Overrideprotected TimestampOffset parseEntry(ByteBuffer buffer, int n) {return new TimestampOffset(timestamp(buffer, n), baseOffset() + relativeOffset(buffer, n));}private long timestamp(ByteBuffer buffer, int n) {return buffer.getLong(n * ENTRY_SIZE);}private int relativeOffset(ByteBuffer buffer, int n) {return buffer.getInt(n * ENTRY_SIZE + 8);}
    // ... existing code ...
    

    这个实现从 ByteBuffer 的指定位置 n

    1. 读取前8个字节作为时间戳。
    2. 读取后4个字节作为相对偏移量。
    3. 将相对偏移量加上 baseOffset() 转换回绝对偏移量。
    4. 最后,用时间戳和绝对偏移量创建一个 TimestampOffset 对象并返回。

lookup: 时间戳查找方法。

// ... existing code ...public TimestampOffset lookup(long targetTimestamp) {return maybeLock(lock, () -> {ByteBuffer idx = mmap().duplicate();int slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY);if (slot == -1)return new TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset());elsereturn parseEntry(idx, slot);});}
// ... existing code ...

逻辑分析

  1. 它调用了继承自 AbstractIndex 的 largestLowerBoundSlotFor 方法,并指定按 KEY(即时间戳)搜索。
  2. 该方法会利用“热区/冷区”优化,高效地找到索引中时间戳小于或等于 targetTimestamp 的那个最大的条目
  3. 返回这个条目对应的 TimestampOffset。这个结果告诉调用者,从返回的 offset 开始扫描 .log 文件,就有可能找到时间戳大于等于 targetTimestamp 的第一条消息。

maybeAppend: 唯一的写入方法。

// ... existing code ...public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {lock.lock();try {if (!skipFullCheck && isFull())throw new IllegalArgumentException("Attempt to append to a full time index (size = " + entries() + ").");
// ... existing code ...if (entries() != 0 && timestamp < lastEntry.timestamp)throw new IllegalStateException("Attempt to append a timestamp (" + timestamp + ") to slot " + entries()+ " no larger than the last timestamp appended (" + lastEntry.timestamp + ") to " + file().getAbsolutePath());// We only append to the time index when the timestamp is greater than the last inserted timestamp.if (timestamp > lastEntry.timestamp) {log.trace("Adding index entry {} => {} to {}.", timestamp, offset, file().getAbsolutePath());MappedByteBuffer mmap = mmap();mmap.putLong(timestamp);mmap.putInt(relativeOffset(offset));incrementEntries();this.lastEntry = new TimestampOffset(timestamp, offset);
// ... existing code ...}} finally {lock.unlock();}}
// ... existing code ...

逻辑分析

  1. 该方法是线程安全的,通过 lock 保护。
  2. 它有严格的校验:待追加的 timestamp 和 offset 都不能小于最后一条已存入的条目,这保证了索引的单调递增特性,这是二分查找正确性的前提。
  3. 一个关键点是 if (timestamp > lastEntry.timestamp):只有当新消息的时间戳严格大于上一条索引的时间戳时,才会追加新条目。如果时间戳相同,则不追加。这是一种优化,避免了在时间戳没有变化时(例如,Producer没有设置时间戳,使用LogAppendTime,而一批消息在同一毫秒内写入)产生冗余的索引项。
  • isFull():

    // ... existing code ...// We override the full check to reserve the last time index entry slot for the on roll call.@Overridepublic boolean isFull() {return entries() >= maxEntries() - 1;}
    // ... existing code ...
    

    这个方法重写了父类的实现。它故意保留了最后一个索引槽位。注释解释了原因:这是为日志段滚动(roll over)时准备的。当一个日志段即将被关闭时,Kafka需要确保该段最后一条消息的时间戳被准确记录下来,即使此时索引按正常标准已经“满了”。这个预留的槽位就是为了这个目的。

总结

TimeIndex 是 Kafka 实现高级时间相关功能(如按时间点查找、按时间保留数据)的基石。它与 OffsetIndex 协同工作,构成了 Kafka 高性能存储引擎的索引双雄。

它继承了 AbstractIndex 的通用高性能框架,并针对时间戳->偏移量的映射需求,定义了自己专属的12字节条目格式和读写逻辑。通过强制的单调递增规则和精巧的追加策略,TimeIndex 在保证数据正确性的同时,实现了极高的空间和时间效率。

TimeIndex类和具体索引条目

TimeIndex 类并不是只保存一项索引,而是作为一个管理者(Manager)或访问器(Accessor),负责管理和操作整个 .timeindex 文件,而这个文件里包含了成百上千条索引条目。

让我们一步步来分析:

可以把 TimeIndex 类想象成一个文件的“遥控器”或者“句柄”(Handle)。它本身不是数据集合,而是提供了操作数据集合(也就是 .timeindex 文件)的所有方法。

  • 文件 (.timeindex): 这是物理存储,是真正的数据仓库。它在磁盘上,里面按顺序存放着一条又一条的索引条目。
  • 类 (TimeIndex): 这是在内存中的一个对象,它“知道”如何去读、写、和管理那个物理文件。

当创建一个 TimeIndex 对象时,构造函数会打开对应的文件,并使用 内存映射(mmap) 技术将文件内容映射到内存中,以便进行高性能的读写。

// ... existing code ...
public class TimeIndex extends AbstractIndex {
// ... existing code ...@SuppressWarnings("this-escape")public TimeIndex(File file, long baseOffset, int maxIndexSize, boolean writable) throws IOException {// super(...) 负责打开文件、进行内存映射等底层操作super(file, baseOffset, maxIndexSize, writable);// 从文件中读取最后一个条目,并缓存起来this.lastEntry = lastEntryFromIndexFile();log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}",file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry.offset, mmap().position());}
// ... existing code ...

在构造函数中,super(...) 调用了父类 AbstractIndex 的逻辑,正是这一步完成了对 file 的打开和内存映射。

文件内容:一系列的索引条目

.timeindex 文件并不是只存一个 TimestampOffset。它的内部格式是一连串的12字节的条目:

[Timestamp 1 (8 bytes), Offset 1 (4 bytes)], [Timestamp 2 (8 bytes), Offset 2 (4 bytes)], [Timestamp 3 (8 bytes), Offset 3 (4 bytes)], ...

TimeIndex 类提供了 parseEntry 方法,用于从文件的任意位置解析出这样一条条的 TimestampOffset 对象。

// ... existing code ...@Overrideprotected TimestampOffset parseEntry(ByteBuffer buffer, int n) {// 从 buffer (也就是映射的内存) 的第 n 个条目位置,解析出一个 TimestampOffset 对象return new TimestampOffset(timestamp(buffer, n), baseOffset() + relativeOffset(buffer, n));}
// ... existing code ...

lastEntry 字段的作用:性能优化缓存

现在我们来解释最关键的一点:private volatile TimestampOffset lastEntry; 这个字段为什么存在?

它并不是说 TimeIndex 只存这一个条目,而是对文件中最后一个条目的内存缓存(Cache)

为什么需要这个缓存? 当 Kafka 往日志段中写入新消息时,需要判断是否要向 .timeindex 文件中追加新的索引条目。追加的规则之一是,新消息的时间戳必须大于索引中最后一条记录的时间戳。

// ... existing code ...public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {lock.lock();try {
// ...// 检查时间戳是否比上一个条目大if (entries() != 0 && timestamp < lastEntry.timestamp)throw new IllegalStateException("...");// 只有当新时间戳 > 上一个时间戳时,才写入if (timestamp > lastEntry.timestamp) {// ... 写入文件 ...// 更新内存中的缓存this.lastEntry = new TimestampOffset(timestamp, offset);}
// ...} finally {lock.unlock();}}
// ... existing code ...

如果没有 lastEntry 这个缓存,那么每次调用 maybeAppend 时,都需要去访问文件(即使是通过mmap)来读取最后12个字节,以获取上一个时间戳。通过 lastEntry 字段将这个值缓存在内存中,就可以极快地完成这个比较,这是一个非常重要的性能优化。volatile 关键字确保了多线程之间的可见性。

总结

  • TimeIndex 类 是一个管理器,它引用并操作一个文件
  • .timeindex 文件 是一个数据容器,它包含了一个索引条目序列
  • TimestampOffset 对象 是索引条目在内存中的逻辑表示
  • lastEntry 字段 是对文件中最后一个条目的性能缓存,而不是 TimeIndex 的全部内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/89959.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/89959.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

重学前端008 --- 响应式网页设计 CSS 无障碍 Quiz

文章目录meta 总结html 页面结构img 尺寸子选择器 >a 锚点仅屏幕阅读器可见li 元素的悬停设置小屏幕防止溢出meta 总结 <head><!-- 基础字符编码声明 --><meta charset"UTF-8"><!-- 视口设置&#xff0c;响应式设计必备 --><meta nam…

C# 调用CodeSoft模板打印标签,编辑模板覆盖根目录的文件,不能拷贝

C# 调用CodeSoft模板打印标签&#xff0c;编辑模板覆盖根目录的文件&#xff0c;不能拷贝&#xff0c;报文件已经打开。 原因&#xff1a;C#窗体关闭时&#xff0c;没有关闭LabelManager2.ApplicationClass labApp&#xff0c;别忘记写labApp1.Quit(); if (labApp1 ! null) {la…

Logback简单使用

Logback 日志框架介绍 正如你所知&#xff0c;开发者拥有大量日志工具可供选择。本节中&#xff0c;我们将学习一个非常流行的日志库 —— Logback。它是 Log4j 日志库的继任者&#xff0c;基于相似的理念构建。Logback 在同步和异步日志记录方面都非常快速&#xff0c;并提供了…

Python爬虫实战:研究langid.py库相关技术

一、引言 在当今全球化的网络环境下,互联网上的内容呈现出多语言的特点。对于许多自然语言处理 (NLP) 任务,如文本分类、情感分析和信息检索,准确识别文本的语言是首要步骤。网络爬虫作为获取互联网内容的重要工具,结合语言识别技术,可以为多语言信息处理提供丰富的数据来…

打车代驾 app 派单接单系统模块搭建

一、逻辑分析打车代驾 APP 的派单接单系统模块是整个应用的核心部分&#xff0c;它需要高效、准确地处理订单分配和司机接单流程&#xff0c;以确保用户能够快速得到服务&#xff0c;司机能够合理地接到订单。用户端下单逻辑&#xff1a;用户打开 APP&#xff0c;输入出发地、目…

Java Stream API性能优化:原理深度解析与实战指南

Java Stream API性能优化&#xff1a;原理深度解析与实战指南 技术背景与应用场景 随着大数据量处理和高并发场景的普及&#xff0c;传统的集合遍历方式在代码可读性和性能上逐渐显现瓶颈。Java 8引入的Stream API&#xff0c;通过声明式的流式编程极大提升了开发效率和可读性&…

Nginx配置proxy protocol代理获取真实ip

Nginx配置proxy protocol 文章目录Nginx配置proxy protocol前言一、PROXY Protocol协议二、配置方法代理服务器配置http模块代理​​Stream 模块​代理测试配置是否生效端口检查测试ip记录验证http验证tcp注意事项和理解误区应用程序机器配置总结前言 在现代开发中有很多场景需…

什么是商业智能BI数据分析的指标爆炸?

指标爆炸这个词大家可能都是第一次听说&#xff0c;指标怎么会爆炸呢&#xff1f;其实这个是我们很多年前在一些商业智能BI项目上总结出来的一种场景或者现象&#xff0c;就是过于的开放给业务人员在BI自助分析过程中创造了很多衍生性的分析指标&#xff0c;结果就造成了前端指…

Spring AI 系列之十八 - ChatModel

之前做个几个大模型的应用&#xff0c;都是使用Python语言&#xff0c;后来有一个项目使用了Java&#xff0c;并使用了Spring AI框架。随着Spring AI不断地完善&#xff0c;最近它发布了1.0正式版&#xff0c;意味着它已经能很好的作为企业级生产环境的使用。对于Java开发者来说…

Linux学习之Linux系统权限

在上一篇的内容中我们学习到了Linux系统命令相关的知识及其相关的扩展内容&#xff0c;本期我们将学习Linux基础的另一个重要部分&#xff1a;Linux系统权限管理 作者的个人gitee&#xff1a;楼田莉子 (riko-lou-tian) - Gitee.com 目录 权限概念及必要性 什么是权限 为什么要…

Web3.0 能为你带来哪些实质性的 改变与突破

如今各种大厂裁员消息层出不穷&#xff0c;今年又添飞书、剪映、微软、思科... 这有一张网友整理的去年互联网大厂裁员裁员信息表&#xff1a; 目前国内很多大厂都在裁员&#xff0c;非常现实、且越来越多 35 技术人&#xff0c;正在面临这样的问题&#xff0c;那么Web3.0 确实…

doker centos7安装1

1.什么是doker Docker 是一个开源的应用容器引擎&#xff0c;它允许开发者将应用程序及其依赖项打包到一个可移植的容器中&#xff0c;然后发布到任何支持 Docker 的操作系统上&#xff0c;实现 “一次构建&#xff0c;到处运行”。 容器是一种轻量级的虚拟化技术&#xff0c…

自动化面试题

1、什么是测试套件测试套件是多个测试用例的集合。2、搭建接口自动化框架中&#xff0c;你遇到最大的难点是什么&#xff0c;以及怎么解决的?测试数据动态管理难点:接口依赖动态参数(如Token、订单ID)&#xff0c;数据无法硬编码.解决方案:使用关联提取(如正则提取响应中的Tok…

【Linux】LVS(Linux virual server)环境搭建

一、LVS的运行原理1.1 LVS简介LVS:Linux Virtual Server&#xff0c;负载调度器&#xff0c;内核集成&#xff0c;章文嵩&#xff0c;阿里的四层SLB(Server LoadBalance)是基于LVSkeepalived实现LVS 官网: http://www.linuxvirtualserver.org/ LVS 相关术语 VS: Virtual Server…

算法竞赛备赛——【图论】求最短路径——Dijkstra

Dijkstra 用来计算从一个点到其他所有点的最短路径的算法&#xff0c;是一种单源最短路径算法。也就是说&#xff0c;只能计算起点只有一个的情况。Dijkstra的时间复杂度是O (|v|^2)&#xff0c;它不能处理存在负边权的情况。 邻接矩阵存图 #include<iostream> using …

影刀 RPA:批量修改 Word 文档格式,高效便捷省时省力

在日常办公和文档处理中&#xff0c;Word 文档格式的统一和规范是许多企业和个人用户的重要需求。无论是撰写报告、制作提案&#xff0c;还是整理资料&#xff0c;都需要确保文档格式的一致性。然而&#xff0c;手动修改多个 Word 文档的格式不仅耗时费力&#xff0c;还容易因疏…

GitLab 社区版 10.8.4 安装、汉化与使用教程

一、GitLab 安装 GitLab 提供了集成所需软件的 RPM 包&#xff0c;简化了安装流程。我们选择安装社区版&#xff08;CE&#xff09;10.8.4&#xff0c;可通过官方网站或国内镜像源&#xff08;如清华镜像&#xff09;获取安装包。 1. 准备工作 首先创建工具目录并进入&#…

[硬件电路-64]:模拟器件 -二极管在稳压电路中的应用

二极管在稳压电路中的应用主要基于其单向导电性和特定类型二极管&#xff08;如稳压二极管&#xff09;的电压稳定特性。以下是详细解释&#xff1a;一、普通二极管的稳压作用&#xff08;有限场景&#xff09;正向导通压降的利用&#xff1a;原理&#xff1a;普通二极管在正向…

【Linux】重生之从零开始学习运维之Nginx

安装apt/yum安装apt imstall nginx yum install nginxRocky源码编译安装基础编译环境yum install gcc make gcc-c glibc glibc-devel pcre pcre-devel openssl openssldevel systemd-devel zlib-devel yum install libxml2 libxml2-devel libxslt libxslt-devel php-gd gd-deve…

主流 MQ 的关键性能指标

常用消息队列&#xff08;MQ&#xff09;的“数量级”通常围绕吞吐量&#xff08;TPS&#xff0c;每秒处理消息数&#xff09;、消息堆积能力、延迟三个核心指标展开&#xff0c;不同MQ因设计目标&#xff08;高吞吐、低延迟、高可靠等&#xff09;不同&#xff0c;数量级差异显…