LocalTableQuery
LocalTableQuery
是 Paimon 中实现本地化、带缓存的表查询的核心引擎。它的主要应用场景是 Flink 中的 Lookup Join。当 Flink 作业需要根据一个流中的 Key 去关联一个 Paimon 维表时,LocalTableQuery
可以在 Flink 的 TaskManager 节点上,通过将 Paimon 表的远程数据文件拉取到本地并建立索引,来实现高性能的低延迟点查。
LocalTableQuery
实现了 TableQuery
接口,其核心职责可以概括为:
- 管理表数据视图:在内存中维护一个按分区(Partition)和桶(Bucket)组织的表数据文件视图。
- 提供点查(Lookup)能力:对外提供
lookup(partition, bucket, key)
方法,用于根据主键查询单条数据。 - 动态更新数据:提供
refreshFiles
方法,可以增量地更新内存中的文件视图,以响应 Paimon 表的数据变更(新的 Commit)。 - 整合本地缓存机制:它是
LookupLevels
和LookupFile
的顶层封装和协调者,负责创建和管理这些底层的缓存查询组件。 - 配置与初始化:负责初始化所有进行本地查询所需要的组件,如文件读取器、本地 IO 管理器、缓存策略等。
关键成员变量
LocalTableQuery
的成员变量揭示了它的内部结构和依赖。
// ... existing code ...
public class LocalTableQuery implements TableQuery {private final Map<BinaryRow, Map<Integer, LookupLevels<KeyValue>>> tableView;private final CoreOptions options;private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;private final LookupStoreFactory lookupStoreFactory;private final int startLevel;private IOManager ioManager;@Nullable private Cache<String, LookupFile> lookupFileCache;private final RowType rowType;private final RowType partitionType;@Nullable private Filter<InternalRow> cacheRowFilter;
// ... existing code ...
tableView
: 核心数据结构。这是一个嵌套的 Map (Map<Partition, Map<Bucket, LookupLevels>>
),它在内存中构建了整个表的逻辑视图。通过它,可以快速定位到任意分区任意桶的数据查询处理器LookupLevels
。options
: 表的配置项,决定了缓存策略、Merge-Engine 行为等。keyComparatorSupplier
: 主键比较器的提供者。readerFactoryBuilder
: 用于构建读取远程数据文件(如 Parquet)的KeyValueFileReaderFactory
。它封装了文件格式、数据类型等信息。lookupStoreFactory
: 用于创建本地查找文件(LookupFile
)的工厂。startLevel
: 查询起始的 Level。对于partial-update
或aggregation
等需要合并历史数据的场景,需要从 Level 1 开始查(needLookup()
为 true);对于deduplicate
这种新数据直接覆盖旧数据的场景,可以从 Level 0 开始查,性能更高。ioManager
: 本地磁盘 I/O 管理器,负责创建本地缓存文件所需的临时目录和文件。lookupFileCache
: 共享的缓存实例。这是一个Caffeine
缓存,用于存储LookupFile
对象。这个缓存可以在同一个 TaskManager 的多个LocalTableQuery
实例间共享,避免对同一个远程文件重复创建本地缓存。cacheRowFilter
: 一个可选的过滤器,可以在创建本地缓存文件时,预先过滤掉不需要的数据,从而减小本地缓存文件的大小。
构造函数 LocalTableQuery(FileStoreTable table)
负责从 FileStoreTable
中提取所有必要信息,并初始化各个工厂和配置。它会检查表的类型(必须是带主键的 KeyValueFileStore
),并根据表的配置(merge-engine
, sequence.field
等)来决定 startLevel
,这是对查询行为的一个重要优化。
refreshFiles(...)
// ... existing code ...public void refreshFiles(BinaryRow partition,int bucket,List<DataFileMeta> beforeFiles,List<DataFileMeta> dataFiles) {LookupLevels<KeyValue> lookupLevels =tableView.computeIfAbsent(partition, k -> new HashMap<>()).get(bucket);if (lookupLevels == null) {Preconditions.checkArgument(beforeFiles.isEmpty(),"The before file should be empty for the initial phase.");newLookupLevels(partition, bucket, dataFiles);} else {lookupLevels.getLevels().update(beforeFiles, dataFiles);}}
// ... existing code ...
这是实现数据动态更新的关键。当 Paimon 表有新的数据提交时,外部调用者(如 PrimaryKeyPartialLookupTable
)会通过 StreamTableScan
感知到文件的变化(beforeFiles
是被删除的文件,dataFiles
是新增的文件),然后调用此方法。
- 如果
lookupLevels
不存在(即第一次加载该分区/桶的数据),则调用newLookupLevels
创建一个全新的查询处理器。 - 如果
lookupLevels
已存在,则调用其update
方法,增量地更新内部的文件列表,同时清理掉被删除文件(beforeFiles
)对应的本地缓存。
newLookupLevels(...)
这个私有方法是真正创建查询处理器的地方。当需要为一个新的分区/桶建立查询能力时,此方法被调用。
// ... existing code ...private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta> dataFiles) {Levels levels = new Levels(keyComparatorSupplier.get(), dataFiles, options.numLevels());// ...KeyValueFileReaderFactory factory =readerFactoryBuilder.build(partition, bucket, DeletionVector.emptyFactory());// ...if (lookupFileCache == null) {lookupFileCache =LookupFile.createCache(options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));}LookupLevels<KeyValue> lookupLevels =new LookupLevels<>(levels,// ... 传入各种工厂和配置 ...lookupFileCache);tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels);}
// ... existing code ...
它执行了以下关键步骤:
- 初始化
Levels
: 将文件列表组织成 LSM-Tree 的分层结构。 - 构建文件读取工厂
factory
: 准备好读取远程文件的能力。 - 创建共享缓存
lookupFileCache
: 如果是第一次创建,则根据配置初始化Caffeine
缓存。 - 实例化
LookupLevels
: 将上面准备好的所有组件(Levels
、比较器、各种工厂、共享缓存等)组装起来,创建一个LookupLevels
实例。 - 存入
tableView
: 将新创建的lookupLevels
实例放入tableView
中,以备后续查询使用。
lookup(...)
// ... existing code ...@Nullable@Overridepublic synchronized InternalRow lookup(BinaryRow partition, int bucket, InternalRow key)throws IOException {Map<Integer, LookupLevels<KeyValue>> buckets = tableView.get(partition);if (buckets == null || buckets.isEmpty()) {return null;}LookupLevels<KeyValue> lookupLevels = buckets.get(bucket);if (lookupLevels == null) {return null;}KeyValue kv = lookupLevels.lookup(key, startLevel);if (kv == null || kv.valueKind().isRetract()) {return null;} else {return kv.value();}}
// ... existing code ...
这是对外提供查询服务的入口。
- 从
tableView
中根据partition
和bucket
找到对应的LookupLevels
实例。 - 调用
lookupLevels.lookup(key, startLevel)
,执行真正的、带缓存的查找逻辑。 - 对返回的
KeyValue
进行处理,如果数据不存在,或者是一条删除标记(Retract
),则返回null
,否则返回其value
。 synchronized
关键字表明当前实现是线程安全的,但注释也指出了未来的优化方向是移除它以支持多线程并发查询。
close()
close()
方法负责释放资源。它会遍历 tableView
中所有的 LookupLevels
实例并调用它们的 close()
方法。这会触发 LookupLevels
清理它自己创建的缓存项。最后,它会清空整个共享缓存 lookupFileCache
,确保所有本地文件都被删除,所有资源都被释放。
总结
LocalTableQuery
是一个上层的协调和管理类。它本身不执行具体的 I/O 或查找算法,而是通过巧妙地组织和管理 LookupLevels
、LookupFile
、KeyValueFileReaderFactory
等底层组件,构建了一个完整的、动态的、高性能的本地化查询服务。它将 Paimon 表的静态文件视图,转化为了一个可以响应数据更新、并提供低延迟点查能力的“活”的查询引擎,是支撑 Flink Lookup Join 功能的基石。
谁主动监控元数据变化?
LocalTableQuery.refreshFiles(...)
本身是一个被动的方法,它只负责接收文件更新信息并应用到内存视图中。监控文件变化的职责由更上层的、与计算引擎(如 Flink)紧密集成的类来承担。
在 Flink Lookup Join 的场景下,主要的监控和调用者是 PrimaryKeyPartialLookupTable
。
我们来梳理一下这个调用链:
监控者:PrimaryKeyPartialLookupTable
这个类内部有一个 LookupTable
实现,其中包含一个 refresh()
方法。这个方法是监控和刷新逻辑的核心。
// ... existing code ...@Overridepublic void refresh() {while (true) {// 1. 规划一次增量扫描,获取文件的变更List<Split> splits = scan.plan().splits();log(splits);// 2. 如果没有新的变更,说明数据已是最新,退出循环if (splits.isEmpty()) {return;}// 3. 遍历所有的变更(每个 split 代表一个 bucket 的变更)for (Split split : splits) {BinaryRow partition = ((DataSplit) split).partition();int bucket = ((DataSplit) split).bucket();List<DataFileMeta> before = ((DataSplit) split).beforeFiles();List<DataFileMeta> after = ((DataSplit) split).dataFiles();// 4. 将文件变更信息传递给 LocalTableQuerytableQuery.refreshFiles(partition, bucket, before, after);}}}
// ... existing code ...
分析:
- 谁在监控? 核心是
scan.plan().splits()
这行代码。这里的scan
是一个StreamTableScan
实例。它被配置为增量模式,每次调用plan()
时,它会从上一次结束的位置开始,扫描 Paimon 的 Manifest 文件,找出从上次扫描到现在发生变化的文件。 - 监控到了什么?
scan.plan()
的结果是一系列的DataSplit
。每个DataSplit
都封装了一个 Bucket 内的数据文件变化,其中:dataFiles()
(即after
): 本次扫描发现的新增文件。beforeFiles()
(即before
): 因为 Compaction 操作而被替换掉的旧文件。
- 如何触发更新?
refresh()
方法拿到这些DataSplit
后,从中解析出分区、Bucket、新增文件和过期文件列表,然后调用我们关注的tableQuery.refreshFiles(...)
方法,将这些元数据变更信息传递下去。
触发时机:FileStoreLookupFunction
那么,PrimaryKeyPartialLookupTable.refresh()
方法又是在什么时候被调用的呢?这通常由 Flink 的 RichAsyncFunction
实现,即 FileStoreLookupFunction
来控制。
FileStoreLookupFunction.java
// ... existing code ...void tryRefresh() throws Exception {// ... (省略黑名单和动态分区检查) ...// 3. refresh lookup table// 检查是否到达了刷新间隔if (shouldRefreshLookupTable()) {// 调用上一节的 refresh 方法lookupTable.refresh();// 更新下一次刷新时间nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();}}
// ... existing code ...
分析:
FileStoreLookupFunction
会在处理数据(asyncInvoke
)的间隙或者通过定时器,周期性地调用 tryRefresh()
方法。这个方法会根据用户配置的刷新间隔(lookup.refresh-interval
)来决定是否需要调用 lookupTable.refresh()
,从而启动一轮新的文件变更监控。
总结
整个监控和刷新流程如下:
- 定时触发 (The "When"):
FileStoreLookupFunction
作为 Flink 的一个算子的核心逻辑,根据配置的刷新间隔,定时触发刷新操作。 - 调用刷新 (The "Trigger"):
FileStoreLookupFunction
调用PrimaryKeyPartialLookupTable.refresh()
方法。 - 监控变更 (The "Monitor"):
refresh()
方法内部通过StreamTableScan.plan()
扫描 Paimon 的 Manifest 文件,获取到自上次扫描以来的文件增量变化(新增和被替换的文件)。 - 应用更新 (The "Action"):
refresh()
方法将获取到的文件变更信息,最终调用LocalTableQuery.refreshFiles()
,将更新应用到本地查询引擎的内存视图中。
所以,LocalTableQuery
只是一个“执行者”,真正的“监控者”是 StreamTableScan
,而“调度者”则是 PrimaryKeyPartialLookupTable
和 FileStoreLookupFunction
。这是一个典型的分层设计,各司其职,非常清晰。
RemoteTableQuery 和 LocalTableQuery 的区别
这两个类都实现了 TableQuery
接口,提供了对 Paimon 表进行 Key-Value 方式点查(Lookup)的能力,但它们的实现机制和适用场景截然不同,代表了 Paimon 在 Lookup Join 场景下的两种核心策略:本地缓存模式和远程服务模式。
核心区别:架构与数据流
1. LocalTableQuery (本地缓存模式)
去中心化。查询逻辑和数据缓存完全发生在 Flink 的 TaskManager 进程内部。每个需要进行 Lookup Join 的 Flink Task 都会在自己的本地磁盘上创建和管理一份数据文件的缓存。
数据流:
- 拉取 (Pull):
Flink Task 启动时,LocalTableQuery
会根据需要查询的数据,主动从远程存储(如 HDFS、S3)将 Paimon 的 SST 文件(如 Parquet)整个拉取到 TaskManager 的本地磁盘。 - 转换与索引:
拉取到本地后,它会将 SST 文件转换为一种更适合点查的格式(如哈希索引文件),并建立布隆过滤器等索引结构。这个过程由LookupStoreFactory
和LookupFile
负责。 - 查询:
后续的lookup
请求直接在本地磁盘上的索引文件进行,速度极快,不涉及网络 IO。 - 刷新:
通过refreshFiles
机制,定期从远程存储拉取最新的增量文件,并更新本地缓存。
代码体现:
持有 LookupLevels
、LookupFileCache
等大量与本地缓存、文件管理相关的组件。构造函数和 newLookupLevels
方法中充满了创建本地文件、管理本地 IO (IOManager
)、配置缓存策略的逻辑。
// LocalTableQuery.java
public class LocalTableQuery implements TableQuery {private final Map<BinaryRow, Map<Integer, LookupLevels<KeyValue>>> tableView;// ...private IOManager ioManager;@Nullable private Cache<String, LookupFile> lookupFileCache;// ... existing code ...
}
RemoteTableQuery (远程服务模式)
客户端-服务器 (Client-Server)。它依赖一个独立部署的、专门用于提供查询服务的集群(Query Service)。RemoteTableQuery
本身只是一个轻量级的客户端。
数据流:
- 服务发现:
RemoteTableQuery
启动时,通过ServiceManager
去发现和定位远程 Query Service 的地址。 - RPC 调用:
当lookup
请求发生时,RemoteTableQuery
将要查询的partition
、bucket
、key
等信息打包,通过网络 RPC 调用发送给远程的 Query Service。 - 远程处理:
Query Service 接收到请求后,在其内部执行真正的查询逻辑(其内部可能也使用了类似LocalTableQuery
的机制来加速查询),然后将结果返回给客户端。 - 接收结果:
RemoteTableQuery
接收到网络返回的结果并将其返回给上层调用者。
代码体现:
核心成员是 KvQueryClient
,一个专门用于和服务端进行 RPC 通信的客户端。lookup
方法的核心逻辑是调用 client.getValues(...)
,这是一个异步的网络请求。完全没有本地文件、缓存、IO 管理相关的逻辑。它不关心数据存储在哪里,只关心服务端的地址。
// RemoteTableQuery.java
public class RemoteTableQuery implements TableQuery {private final FileStoreTable table;private final KvQueryClient client;private final InternalRowSerializer keySerializer;// ...public RemoteTableQuery(Table table) {this.table = (FileStoreTable) table;ServiceManager manager = this.table.store().newServiceManager();this.client = new KvQueryClient(new QueryLocationImpl(manager), 1);// ...}@Nullable@Overridepublic InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {BinaryRow row;try {row =client.getValues(partition,bucket,new BinaryRow[] {keySerializer.toBinaryRow(key)}).get()[0];// ...}// ... existing code ...
}
优缺点与适用场景对比
特性 | LocalTableQuery (本地缓存模式) | RemoteTableQuery (远程服务模式) |
---|---|---|
性能 | 极高。一旦缓存预热完成,所有查询都是本地磁盘/内存操作,无网络延迟。 | 较高,但有网络开销。每次查询都涉及一次 RPC 网络往返,延迟相对较高。 |
资源消耗 | 高。每个 Flink TaskManager 都需要消耗额外的磁盘空间来存储缓存文件,并消耗 CPU 和内存来构建和维护索引。 | 低。Flink TaskManager 端非常轻量,只作为客户端,不消耗额外磁盘,内存和 CPU 占用极少。资源消耗集中在独立的 Query Service 集群。 |
部署复杂度 | 低。无需额外部署,逻辑内嵌在 Flink 作业中。 | 高。需要独立部署和维护一套 Paimon Query Service 集群,并确保其高可用。 |
数据一致性 | 最终一致性。依赖 Flink 作业的刷新间隔(lookup.refresh-interval )来同步数据,存在一定的延迟。 | 强一致性(取决于 Query Service 的实现)。Query Service 可以更及时地感知数据更新,提供更实时的数据视图。 |
扩展性 | 受限于 Flink TaskManager。如果维表数据量巨大,可能会撑爆 TaskManager 的本地磁盘。 | 良好。可以通过水平扩展 Query Service 集群来应对高并发和大数据量的查询需求,与 Flink 作业本身解耦。 |
适用场景 | 1. 维表数据量可控,能够完全放入 TaskManager 本地磁盘。 2. 对查询延迟要求极高的场景。 3. 部署架构简单,不希望引入额外组件。 | 1. 超大规模维表,无法在每个 TaskManager 中缓存全量数据。 2. 多作业共享同一张维表,通过服务化避免重复缓存。 3. 希望将查询服务与计算任务解耦,独立管理和扩展。 |
总结
LocalTableQuery
和 RemoteTableQuery
是 Paimon 为满足不同业务需求而设计的两种 Lookup Join 方案。
- LocalTableQuery 是一个性能优先、简单直接的方案。它用资源换性能,通过在计算节点本地缓存数据,将网络 IO 转化为本地 IO,以达到最低的查询延迟。
- RemoteTableQuery 是一个架构优先、灵活可扩展的方案。它通过引入独立的查询服务,实现了计算与存储的分离,让 Flink 作业本身变得轻量,同时为超大规模数据和多租户共享场景提供了可能。
在实际选择时,需要根据维表的大小、查询的 QPS、对延迟的容忍度以及运维复杂度等因素进行综合权衡。
Query Service
启动 Query Service 通常通过 Flink SQL CALL
一个系统过程,或者执行一个 Flink Action。
QueryServiceProcedure.java
: 这个类实现了CALL sys.query_service(...)
的逻辑。它会获取 Flink 的StreamExecutionEnvironment
,然后调用QueryService.build
。// ... existing code ... public String[] call(ProcedureContext procedureContext, String tableId, Integer parallelism)throws Exception {Table table = catalog.getTable(Identifier.fromString(tableId));StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();QueryService.build(env, table, parallelism);return execute(env, IDENTIFIER); } // ... existing code ...
QueryService.java
: 这个类负责构建 Flink 作业的数据流(DataStream)。它会创建一个QueryExecutorOperator
。// ... existing code ... public static void build(StreamExecutionEnvironment env, Table table, int parallelism) {// ...QueryExecutorOperator executorOperator = new QueryExecutorOperator(table);DataStreamSink<?> sink =stream.transform("Executor",InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()),executorOperator).setParallelism(parallelism);// ... } // ... existing code ...
Query Service 的核心 QueryExecutorOperator
在做什么?
这正是揭示谜底的关键。QueryExecutorOperator
是一个 Flink 的算子,在它的 open()
方法中,它做了两件核心事情:
- 创建
LocalTableQuery
: 它实例化了一个LocalTableQuery
,这和普通的 Lookup Join 作业中的本地缓存模式完全一样。 - 启动
KvQueryServer
: 它启动了一个 RPC 服务器,这个服务器持有上面创建的LocalTableQuery
实例,并监听一个网络端口,等待客户端的查询请求。
// ... existing code ...
public class QueryExecutorOperator extends AbstractStreamOperator<InternalRow>implements OneInputStreamOperator<InternalRow, InternalRow> {
// ...private transient LocalTableQuery tableQuery;private transient KvQueryServer server;
// ...@Overridepublic void open() throws Exception {super.open();// 1. 创建一个 LocalTableQuery 实例,用于本地缓存和查询this.tableQuery =table.newLocalTableQuery().withIOManager(new IOManagerImpl(ioManager.getSpillingDirectories()));// 2. 启动一个 RPC 服务,该服务使用上面的 tableQuery 来响应查询this.server =new KvQueryServer(// ...tableQuery,// ...);server.start();}
// ...
}
当一个 RemoteTableQuery
客户端发起请求时,KvQueryServer
接收到请求,然后调用其内部持有的 tableQuery.lookup(...)
方法,执行查询并将结果返回给客户端。
为什么不直接用 LocalTableQuery
,而要启动一个服务?
既然底层都是 LocalTableQuery
,为什么还要引入 RemoteTableQuery
和 Query Service 这种 C/S 架构呢?这正是 Paimon 设计的精妙之处,它解决了 LocalTableQuery
模式的一些固有痛点:
资源隔离与解耦:
- Local 模式: Lookup 的缓存(磁盘、内存)和计算(CPU)都发生在业务 Flink 作业的 TaskManager 中。如果维表很大,或者更新频繁,会严重抢占业务作业的资源,可能导致作业不稳定。
- Service 模式: 查询负载被完全剥离到一个独立的 Flink 作业中。业务作业和查询服务作业可以独立部署、独立配置资源,互不干扰。
缓存共享与资源复用 (最核心的优势):
- Local 模式: 如果有 10 个不同的 Flink 作业都需要关联同一张大的维表,那么每个作业都会在自己的 TaskManager 上维护一套完整且独立的本地缓存。这造成了巨大的资源浪费(10 倍的磁盘空间、10 倍的网络拉取带宽、10 倍的索引构建 CPU 消耗)。
- Service 模式: 只需要部署一套 Query Service。所有 10 个业务作业都通过轻量级的
RemoteTableQuery
客户端去请求这一套共享的服务。缓存只存在一份,被所有作业共享,极大地节约了资源。
独立的生命周期与弹性伸缩:
- Query Service 可以作为基础设施长期运行,其生命周期与任何一个业务作业都无关。
- 可以根据查询的 QPS 压力,独立地对 Query Service 作业进行扩缩容(调整其
parallelism
),而无需触碰任何业务作业。
总结
用一个形象的比喻来总结:
LocalTableQuery
就像一个内嵌式数据库引擎(比如 SQLite),它直接在您的应用程序(Flink 作业)内部运行,简单直接,但资源耦合,无法共享。- Paimon Query Service 则更像是一个独立的数据库服务器(比如 MySQL 或 PostgreSQL)。虽然它底层可能也用了相似的存储和索引技术(即
LocalTableQuery
),但通过 C/S 架构,它提供了资源隔离、缓存共享和独立管理的强大能力。
Paimon Query Service 正是通过将 LocalTableQuery
的能力服务化,从而解决了大规模、多租户场景下的维表关联痛点。