Paimon的部分更新以及DeleteVector实现

背景

本文基于 Paimon 0.9
出于对与Paimon内部的DeleteVctor的实现以及部分更新的实现进行的源码阅读。
关于 DeleteVector的介绍可以看这里

说明

对于Paimon来说无论是Spark中使用还是Flink使用,后面的逻辑都是一样的,所以我们以Spark为例来说。所以我们会参考类 org.apache.paimon.spark.SparkSource,
对于Flink可以参考org.apache.paimon.flink.FlinkTableFactory
如没特别说明,这里都是以主键表来进行说明。

paimon的部分字段更新

这里主要的场景更多的是多流或者多批写同一个表字段的场景,且每个流或批只更新某几个字段(同样的主键),具体的配置或说明参考Partial Update
这里涉及到的方法为 SparkTableWrite.write,最终会到MergeTreeWriter.write:

 @Overridepublic void write(KeyValue kv) throws Exception {long sequenceNumber = newSequenceNumber();boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {flushWriteBuffer(false, false);success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {throw new RuntimeException("Mem table is too small to hold a single element.");}}}
  • writeBuffer.put 主要是往buffer中写数据
    这里的writeBufferSortBufferWriteBuffer类实例。
    这里会 主键+sequenceNumber+valueKind + value 的形式写入数据
  • flushWriteBuffer 这里就会涉及到数据落盘以及部分更新的逻辑:
      writeBuffer.forEach(keyComparator,mergeFunction,changelogWriter == null ? null : changelogWriter::write,dataWriter::write);
    
    • mergeFunction 这里的函数就是会在MergeTreeWriter初始化,也就是会初始化为PartialUpdateMergeFunction
    • 对于forEach的实现会构建一个 MergeIterator,在这里面会调用 PartialUpdateMergeFunction.add方法
      这里就会涉及到部分更新的逻辑,主要就是:把按照 主键+sequenceNumber 排序好的数据传给PartialUpdateMergeFunction
      这样PartialUpdateMergeFunction只需要判断前后两个的数据的主键是否一致来进行更新。
      具体的更新逻辑见: Partial Update
      new MergeIterator(awConsumer, buffer.sortedIterator(), keyComparator, mergeFunction);
      
      这里的buffer.sortedIterator主要看SortBufferWriteBuffer构造方法(也就是为什么会按照主键+sequenceNumber排序):
       public SortBufferWriteBuffer(RowType keyType,RowType valueType,@Nullable FieldsComparator userDefinedSeqComparator,MemorySegmentPool memoryPool,boolean spillable,MemorySize maxDiskSize,int sortMaxFan,CompressOptions compression,IOManager ioManager) {...// key fieldsIntStream sortFields = IntStream.range(0, keyType.getFieldCount());// user define sequence fieldsif (userDefinedSeqComparator != null) {IntStream udsFields =IntStream.of(userDefinedSeqComparator.compareFields()).map(operand -> operand + keyType.getFieldCount() + 2);sortFields = IntStream.concat(sortFields, udsFields);}// sequence fieldsortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));int[] sortFieldArray = sortFields.toArray();// row typeList<DataType> fieldTypes = new ArrayList<>(keyType.getFieldTypes());fieldTypes.add(new BigIntType(false));fieldTypes.add(new TinyIntType(false));fieldTypes.addAll(valueType.getFieldTypes());NormalizedKeyComputer normalizedKeyComputer =CodeGenUtils.newNormalizedKeyComputer(fieldTypes, sortFieldArray);RecordComparator keyComparator =CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray);...InternalRowSerializer serializer =InternalSerializers.create(KeyValue.schema(keyType, valueType));BinaryInMemorySortBuffer inMemorySortBuffer =BinaryInMemorySortBuffer.createBuffer(normalizedKeyComputer, serializer, keyComparator, memoryPool);
      
      其中IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount())) 就会会把sequenceNumber这个字段带入到排序中去,
      也就是在buffer.sortedIterato方法中调用。
      如果有定义sequence.field,那这里面的字段也会参与排序,见:udsFields 字段

DeleteVector的实现

关于deleteVector的实现,可以参考Introduce deletion vectors for primary key table
大概的思想是: 基于Compaction + lookup的机制产生 DeleteVector:

  • 当一个记录不属于 level0层的话,就不会产生DelectVector
  • 当一个记录只属于需要进行compaction的level的话,就不会产生DeleteVector
  • 当一个记录只属于 level0层的话,就要去查询不包含 Compaction的层的文件数据,从而产生DeleteVector
    注意: deleteVector只支持主键表, 是属于bucket级别的,一个bucket一个DeleteVector。

DeleteVector的写

按照以上的说法,只有在Compaction的时候,才会产生DeleteVector,所以 我们直接到达 MergeTreeWriter.flushWriteBuffer,这里涉及到DeleteVector的数据流如下:

compactManager.triggerCompaction(forcedFullCompaction)||\/
submitCompaction||\/
MergeTreeCompactTask.doCompact||\/rewrite  ||\/
rewriteImpl ||\/
LookupMergeTreeCompactRewriter.rewrite ||\/
rewriteOrProduceChangelog||\/
createMergeWrapper||\/
iterator.next()||\/
RecordReaderIterator.next()||\/
advanceIfNeeded||\/
currentIterator.next() ||\/
SortMergeIterator.next()||\/
LookupChangelogMergeFunctionWrapper.add(winner)||\/
LookupChangelogMergeFunctionWrapper.getResult()
  • 这里MergeTreeCompactTask.doCompact写完之后,会有result.setDeletionFile(compactDfSupplier.get())
    compactDfSupplier 这里的源自submitCompaction方法中的compactDfSupplier构造:

     if (dvMaintainer != null) {compactDfSupplier =lazyGenDeletionFile? () -> CompactDeletionFile.lazyGeneration(dvMaintainer): () -> CompactDeletionFile.generateFiles(dvMaintainer);}
    

    而这里的deleteVector的产生来自LookupChangelogMergeFunctionWrapper.getResult(),见以下说明

  • 这里的LookupMergeTreeCompactRewriter.rewriteLookupMergeTreeCompactRewriter实例是在创建MergeTreeWriter

     CompactManager compactManager =createCompactManager(partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer)
    

    这里会调用createRewriter方法创建LookupMergeTreeCompactRewriter实例,
    其中会根据lookupStrategy来创建该实例:

     public LookupStrategy lookupStrategy() {return LookupStrategy.from(mergeEngine().equals(MergeEngine.FIRST_ROW),changelogProducer().equals(ChangelogProducer.LOOKUP),deletionVectorsEnabled(),options.get(FORCE_LOOKUP));
    
  • 这里 currentIterator.next() 是 通过调用currentIterator = SortMergeReaderWithLoserTree.readBatch获取的,而SortMergeReaderWithLoserTree 是通过readerForMergeTree方法获取的

  • 这里LookupChangelogMergeFunctionWrapper.getResult()才是重点

     @Overridepublic ChangelogResult getResult() {// 1. Compute the latest high level record and containLevel0 of candidatesLinkedList<KeyValue> candidates = mergeFunction.candidates();Iterator<KeyValue> descending = candidates.descendingIterator();KeyValue highLevel = null;boolean containLevel0 = false;while (descending.hasNext()) {KeyValue kv = descending.next();if (kv.level() > 0) {descending.remove();if (highLevel == null) {highLevel = kv;}} else {containLevel0 = true;}}// 2. Lookup if latest high level record is absentif (highLevel == null) {InternalRow lookupKey = candidates.get(0).key();T lookupResult = lookup.apply(lookupKey);if (lookupResult != null) {if (lookupStrategy.deletionVector) {PositionedKeyValue positionedKeyValue = (PositionedKeyValue) lookupResult;highLevel = positionedKeyValue.keyValue();deletionVectorsMaintainer.notifyNewDeletion(positionedKeyValue.fileName(), positionedKeyValue.rowPosition());} else {highLevel = (KeyValue) lookupResult;}}}// 3. Calculate resultKeyValue result = calculateResult(candidates, highLevel);// 4. Set changelog when there's level-0 recordsreusedResult.reset();if (containLevel0 && lookupStrategy.produceChangelog) {setChangelog(highLevel, result);}return reusedResult.setResult(result);} 
    
  • 这里主要说明 lookup.apply的方法,其中 lookup的 构造是在createLookupChangelogMergeFunctionWrapper构造中:

       @Overridepublic MergeFunctionWrapper<ChangelogResult> create(MergeFunctionFactory<KeyValue> mfFactory,int outputLevel,LookupLevels<T> lookupLevels,@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {return new LookupChangelogMergeFunctionWrapper<>(mfFactory,key -> {try {return lookupLevels.lookup(key, outputLevel + 1);} catch (IOException e) {throw new UncheckedIOException(e);}},valueEqualiser,changelogRowDeduplicate,lookupStrategy,deletionVectorsMaintainer,userDefinedSeqComparator);}}
    

    这里的lookupLevels.lookup 会最终调用createLookupFile 方法构造LookupFile 实例,
    其中会调用 valueProcessor.persistToDisk(kv, batch.returnedPosition()方法,持久化 行号到对应的文件,
    这样就能获取到对应的行号。

  • 获取到对应的结果 lookupResult 后
    调用 deletionVectorsMaintainer.notifyNewDeletion(positionedKeyValue.fileName(), positionedKeyValue.rowPosition()方法去构造
    DeletionVector.
    上面提到的result.setDeletionFile(compactDfSupplier.get())会调用 CompactDeletionFile.generateFiles(dvMaintainer) 方法
    从而调用maintainer.writeDeletionVectorsIndex方法,从而写如到DeleteVector文件中。

DeleteVector的读

DeleteVector的读取主要在以下方法中构造:PrimaryKeyFileStoreTable.newRead:
最终会调用RawFileSplitRead.createReader从而调用 ApplyDeletionVectorReader(fileRecordReader, deletionVector)方法构造ApplyDeletionVectorReader实例:

 public RecordIterator<InternalRow> readBatch() throws IOException {RecordIterator<InternalRow> batch = reader.readBatch();if (batch == null) {return null;}checkArgument(batch instanceof FileRecordIterator,"There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator");return new ApplyDeletionFileRecordIterator((FileRecordIterator<InternalRow>) batch, deletionVector);}

该处的readBatch方法会构造一个ApplyDeletionFileRecordIterator迭代器,可见在next()方法会对每一个记录调用deletionVector.isDeleted是否删除的判断.

 @Overridepublic InternalRow next() throws IOException {while (true) {InternalRow next = iterator.next();if (next == null) {return null;}if (!deletionVector.isDeleted(returnedPosition())) {return next;}}}

FAQ

写入文件的时候,怎么记录行号和主键的关系?

这里不会写入的时候记录行号,会在调用createLookupFile 在构建 LookupFile这个文件的时候(初始化),从parquet文件读取过来的时候,就会获取行号。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/90130.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/90130.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis 的事务机制是怎样的?

Redis 的事务机制 Redis支持事务机制,其主要目的是确保多个命令执行的原子性,即这些命令会作为一个不可分割的操作单元执行。 需要注意的是,Redis事务不支持回滚操作。从Redis 2.6.5版本开始,服务器会在命令累积阶段检测错误。在执行EXEC命令时,若发现错误则会拒绝执行事…

网安学习NO.17

1. VPN 概述定义&#xff1a;在公用网络&#xff08;如 Internet、帧中继、ATM 等&#xff09;中&#xff0c;通过技术手段虚拟出的一条企业内部专线&#xff0c;能像私有网络一样提供安全性、可靠性和可管理性。核心特征&#xff1a;利用公共网络构建&#xff0c;具备 “虚拟性…

MCU芯片AS32S601在卫星光纤放大器(EDFA)中的应用探索

摘要&#xff1a;本文聚焦于国科安芯推出的AS32S601型MCU芯片在卫星光纤放大器&#xff08;EDFA&#xff09;中的潜在应用&#xff0c;探讨其技术特性、抗辐射性能及适用性。通过分析其在单粒子效应脉冲激光试验中的表现&#xff0c;结合EDFA系统对控制芯片的要求&#xff0c;评…

Hexo - 免费搭建个人博客02 - 创建个人博客

导言我的博客&#xff1a;https://q164129345.github.io/ 开始一步一步地完成博客的创建。 一、初始化Hexo博客以上所示&#xff0c;运行以下指令在myCode文件夹里初始化一个hexo博客。 hexo init myblog二、安装依赖如上所示&#xff0c;完成依赖的安装。 cd myblog npm insta…

单片机-----基础知识整合

一、基础知识1&#xff09;单片机的组成&#xff1a;中央处理器CPU、随机存储器RAM、只读存储器ROM、定时器、多种I/O接口、中断系统等2&#xff09;STM32U575RIT6采用ARM Cortex-M33内核架构ARM是什么&#xff1f;①ARM是一家公司&#xff0c;ARM公司是一家芯片知识产权&#…

双流join 、 Paimon Partial Update 和 动态schema

背景 Paimon 通过其独特的 partial-update 合并引擎和底层的 LSM 存储结构&#xff0c;巧妙地将传统双流 Join 中对 Flink State 的高频随机读/写&#xff0c;转换为了对 Paimon 表的顺序写和后台的高效合并&#xff0c;从而一站式地解决了 Flink 作业状态过大、依赖外部 KV 系…

7.3.1 进程调度机制那些事儿

一&#xff1a;task_struct结构体分析 1、进程有两种特殊形式&#xff1a;没有用户虚拟地址空间的进程叫内核线程&#xff0c;共享用户虚拟地址空间的进程叫作用户线程。共享同一个用户虚拟地址空间的所有用户线程叫线程组。 C语言标准库进程 Linux内核进程 …

基于多种机器学习的水质污染及安全预测分析系统的设计与实现【随机森林、XGBoost、LightGBM、SMOTE、贝叶斯优化】

文章目录有需要本项目的代码或文档以及全部资源&#xff0c;或者部署调试可以私信博主项目介绍总结每文一语有需要本项目的代码或文档以及全部资源&#xff0c;或者部署调试可以私信博主 项目介绍 随着工业化和城市化的不断推进&#xff0c;水质污染问题逐渐成为影响生态环境…

Linux第三天Linux基础命令(二)

1.grep命令可以通过grep命令&#xff0c;从文件中通过关键字过滤文件行。grep [-n] 关键字 文件路径选项-n&#xff0c;可选&#xff0c;表示在结果中显示匹配的行的行号。参数&#xff0c;关键字&#xff0c;必填&#xff0c;表示过滤的关键字&#xff0c;带有空格或其它特殊符…

Linux Debian操作系统、Deepin深度操作系统手动分区方案参考

以下是Linux Debian操作系统、Deepin深度操作系统安装过程中手动分区的建议&#xff0c;按UEFI、swap、boot、根分区、home分区划分&#xff0c;以下是详细的分区配置参考建议&#xff1a; 一、手动分区方案&#xff08;UEFI模式&#xff09;分区名称分区类型大小建议挂载点文件…

jmeter如何做自动化接口测试?

全网最全流程&#xff01;JmeterAntAllureJenkins搭建属于你的接口自动化流水线&#xff0c;CI/CD直接起飞&#xff01;1.什么是jmeter&#xff1f; JMeter是100%完全由Java语言编写的&#xff0c;免费的开源软件&#xff0c;是非常优秀的性能测试和接口测试工具&#xff0c;支…

MyBatis整合SpringBoot终极指南

以下是一份系统化的 ​MyBatis 整合 Spring Boot 学习笔记&#xff0c;结合官方文档与最佳实践整理&#xff0c;涵盖配置、核心功能、实战示例及常见问题解决。 一、整合基础与依赖配置 1. ​核心依赖​ 在 pom.xml 中添加&#xff1a; <dependency><groupId>or…

企业微信ipad协议接口解决方案最新功能概览

支持最新版本企业微信&#xff0c;安全稳定0封号免费试用&#xff0c;技术支持&#xff1a;string wechat"Mrzhu0107"企微ipad协议接口最新功能升级如下&#xff1a;【初始化】初始化企业微信&#xff0c;设置消息回调地址&#xff0c;获取运行中的实例&#xff0c;根…

ansible 批量 scp 和 load 镜像

1、save 镜像脚本 在本地保存镜像到 ansible 代码目录的脚本。 1.1、使用说明: 保存单个镜像 save -i gcr.io/cadvisor/cadvisor:v0.52.1保存某个 namespace 下的所有镜像 save1.2、脚本内容 cat /usr/local/bin/save #!/bin/bash #set -e # 分隔符 str="-"# …

【C# in .NET】20. 探秘静态类:抽象与密封的结合体

探秘静态类:抽象与密封的结合体 一、静态类的底层本质:抽象与密封的结合体 静态类作为 C# 中特殊的类型形式,其底层实现融合了抽象类与密封类的特性,形成了不可实例化、不可继承的类型约束。 1. IL 层面的静态类标识 定义一个简单的静态类: public static class Stri…

【Vue3】ECharts图表案例

官方参考&#xff1a;Examples - Apache ECharts 1、创建工程 npm create vitelatest 或 npm init vuelatest 设置如下 2、下载依赖集运行项目 cd vue-echarts-demo npm install npm install echarts npm run dev 3、编写核心代码 创建src\components\BarView.vue文件…

二分查找----2.搜索二维矩阵

题目链接 /** 方案一: 每行都是递增的,对每行进行二分,逐行查找;效率不高,每次搜索只能控制列无法兼顾到行,行被固定存在不必要的搜索 方案二: 从右上或左下顶点出发,以右上为例,向左迭代列减小,向下迭代行增大;效率更高避免重复搜索 */ class Solution {/**方案一: 每行都是…

2025.7.23

flen&#xff08;&#xff09;这个函数计算到的文件大小为0&#xff0c;明天解决 原因是路径错误&#xff0c;写成了CONFIG_ROOT_PATH"/music/test2.mp3,但是也没报错&#xff0c;打开文件也成功&#xff0c;所以就没有怀疑到路径方面来

大致自定义文件I/O库函数的实现详解(了解即可)

目录 一、mystdio.h 代码思路分析 二、mystdio.c 1. 辅助函数 BuyFile 2. 文件打开函数 MyFopen 3. 文件关闭函数 MyFclose 4. 数据写入函数 MyFwrite 1、memcpy(file->outbuffer file->bufferlen, str, len); 2、按位与&#xff08;&&#xff09;运算的作…

Zipformer

Zipformer首先&#xff0c;Conv-Embed 将输入的 100Hz 的声学特征下采样为 50 Hz 的特征序列&#xff1b;然后&#xff0c;由 6 个连续的 encoder stack 分别在 50Hz、25Hz、12.5Hz、6.25Hz、12.5Hz 和 25Hz 的采样率下进行时域建模。除了第一个 stack 外&#xff0c;其他的 st…