Flink状态和容错-基础篇

1. 概念

flink的状态和容错绕不开3个概念,state backends和checkpoint、savepoint。本文重心即搞清楚这3部分内容。

容错机制是基于在状态快照的一种恢复方式。但是状态和容错要分开来看。

  • 什么是状态,为什么需要状态?

流计算和批计算在数据源上最大的区别是,流计算中的数据是无边界的,数据持续不断,而批计算中数据是有边界的,在计算时可以一次性将数据全部拿到。在流计算中无法拿到全部数据进行计算结果,因此需要将历史数据的处理结果记录下来,这个就是状态。通过状态实现历史数据和未来数据的结合处理。

举个例子,在对数据求和的场景中,第n条数据的求和结果是前n-1条数据的求和结果再加上第n条数据的值。因此必须记录前n-1条数据的累加和,才能在第n条数据到达时,得到前n条数据的累加和。
记录的前n-1条数据的累加和就是状态数据。

并不是全部的流计算场景都需要状态,如单词大小写转换的场景中,每条数据的处理仅和当前数据有关,因此不需要状态。

  • 什么是state backend?

state backend用于存储flink管理的状态。有多种实现方式,不同实现方式中数据结构和存储方式不同,对快照的支持也有所区别。

  • 什么是checkpoint?

checkpoint可以简单理解为游戏中的进度存档,在游戏中当玩家死亡或再次打开游戏时,可以从最近的游戏存档继续,而无需重头开始。checkpoint即作业在某个时刻的快照信息(状态的快照),如某个时刻数据源(消息队列)的消费位移,算子状态等。当发生故障恢复时,会从最新(最近)的一次checkpoint来恢复整个应用程序。

checkpoint是容错机制。checkpoint默认关闭,开启后会根据配置来持续自动保存。

  • 什么是savepoint?

checkpoint是根据配置项自动周期性进行的“存档,而savepoint则是需要手动触发的“存档”。savepoint是手动触发的checkpoint。

snapshot、checkpoint、savepoint在一些语境中是可以互换的,表示相同的含义。

  • state backend、checkpoint和savepoint的关系?

state backend表示的是状态,而checkpoint和savepoint表示的是状态在某个时刻的快照。state backend是状态存储。checkpoint和savepoint是容错机制。

2. checkpoint

2.1. 工作原理

checkpoint是由Jobmanager中的checkpoint coordinator来协调并在TaskManager执行的。当checkpoint开始时,所有的Source将会记录数据的偏移量,并将有编号的barrier插入到流中。barrier将流分划分了前一个checkpoint和下一个checkpoint两部分。当job graph中每个运算符收到其中一个barrier时,将开始记录状态。

当具有两个Input的运算符,默认情况下会执行barrier alignment。一个算子可能有多个Input,每个Input中都会携带barrier,根据运算符是否要等待全部Input中barrier将checkpoint分成aligned和unaligned(对齐和不对齐)的checkpoint。

state backend使用copy-on-write机制允许流处理不受阻碍得继续执行,同时对旧版本的状态进行异步快照,当快照被持久化后,旧版本的状态被清理。

2.2. 精确一次语义和端到端的精确一次?

  • 精确一次语义

精确一次语义的含义是,每个事件都会只影响flink管理的状态一次。并不是每个事件都只会处理一次。barrier alignment仅仅在精确一次语义提供保障,如果不需要精确一次语义,可以使用至少一次来获取一些性能。这具有禁用barrier alignment的效果。

  • 端到端的精确一次

端到端的精确一次语句是指来自Source中每个每个事件恰好影响sink一次。实现这个必须具备两个条件,Source必须是可重放的(如kafka)并且sink必须支持事务或幂等的。

2.3. checkpoint storage

checkpoint期间状态快照保留在哪里取决于所配置的checkpoint storage。

checkpoint storage提供了两种实现,基于分布式文件系统和基于JobManager jvm heap

  • 基于分布式文件系统,FileSystemCheckpointStorage
  • 基于Jobmanager jvm heap,JobManagerCheckpointStorage,默认方式。

JobManagerCheckpointStorage

使用方式

env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE));

MAX_MEM_STATE_SIZE的默认值5M,当快照超过此大小时checkpoint将失败,从而避免jobmanager OOM。无论配置的状态最大值是多少,状态都不能大于akka frame size(用于控制jogmanager和taskmanager之间发送消息的最大大小)。

FileSystemCheckpointStorage,配置了checkpoint路径后将会使用此方式,在执行checkpoint期间,状态快照将写入配置的文件系统和目录中的文件中。极少的元数据存储在JobManager内存中。

使用方式

// 全局配置 state.checkpoints.dir: hdfs:///checkpoints/
// 或在代码中为每个job配置
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
// 或
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs:///checkpoints-data/"));

目录结构

/user-defined-checkpoint-dir/{job-id}|+ --shared/ (这个目录表示多个checkpoint一部分的状态)+ --taskowned/ (该目录表示jobmanager绝对无法丢失的状态)+ --chk-1/ (其他目录表示单独属于一个checkpoint的状态)+ --chk-2/+ --chk-3/...  

2.4. retained checkpoint

默认情况下,flink仅仅保存最近的n个checkpoint并且取消作业或作业失败时删除它们,checkpoint结果仅用于在作业失败时自动使用其进行恢复作业。可以手动配置将checkpoint保留下来。这样在作业取消或失败时,可以手动使用保留下来的快照进行作业恢复。

使用方式

CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 可选项
// NO_EXTERNALIZED_CHECKPOINTS,默认值,禁用retained checkpoint
// DELETE_ON_CANCELLATION,作业被取消时ck将被删除。但当作业状态为JobStatus.FAILED时ck会保留
// RETAIN_ON_CANCELLATION,作业被取消时ck将保留(保留下来的数据需要手动删除)。但当作业状态为JobStatus.FAILED时ck会保留

2.5. 相关配置

  • checkpoint间隔
env.enableCheckpointing(1000);
  • checkpoint存储,设置存储checkpoint快照数据的位置,默认情况下使用jobmanager heap
// 当配置路径后,将使用FileSystemCheckpointStorage方式的checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("hdfs:///my/checkpoint/dir");
  • 外部化的checkpoint,即上文retained checkpoint
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  • 精确一次 至少一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  • 超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
  • 两次checkpoint的最小时间间隔,如果将该值设置为5s,则无论持续时间和间隔时间设置为何值,下一次checkpoint都将在上一个checkpoint完成后不早于5秒启动,这个值意味间隔时间将永远不会小于该时间,这个值还以意味着checkpoint的并发量为1。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  • 可以容忍的checkpoint连续失败的个数,默认为0。
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
  • checkpoint的并行数量,默认情况下flink将不会再一个checkpoint进行时触发另一个checkpoint。定义了最小间隔时间后则不能使用该选项。
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  • 非对齐的checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();
  • dag中包含有界数据源时的checkpoint,从1.14版本开始支持,1.15版本默认开启。
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);

2.6. checkpointing under backpressure

通常情况下,checkpoint时间由此过程中的同步和异步部分主导。但是在作业处于背压下时,checkpoint端到端时间主要影响因数是barriers传播到所有subtask/operators的时间。

3. savepoint

savepoint也是由checkpoint机制创建的。

savepoint由两部分组成,包含二进制文件的目录(通常较大)和元数据文件(相对较小)。二进制文件是状态快照的纯净文件,元数据文件中包含了二进制文件相对路径的指针。

通过state.savepoints.dir来指定savepoint文件路径。

env.setDefaultSavepointDir("hdfs:///flink/savepoints");

savepoint可以将整个文件目录移动或复制到其他地方使用, 而checkpoint由于包含一些绝对路径,无法使用移动或复制后文件。

3.1. savepoint和checkpoint

savepoint和checkpoint之间的不同类似于传统数据中备份与恢复日志的不同。savepoint 代表数据库中的备份,checkpoint 代表数据中的日志恢复。

checkpoint的主要意图是一种异常时的容错机制。生命周期由flink管理,无需用户交互。checkpoint被频繁触发和依赖于故障恢复,因此checkpoint主要设计目标是尽可能轻量级的创建和尽可能的尽快恢复。

尽管savepoint也是使用checkpoint机制来创建的。但是和checkpoint的概念是不同的。savepoint的设计意图是可移植性和操作灵活性,尤其是作业更改方面,用于有计划的手动操作。如升级flink版本,更改job graph等,使用savepoint对作业进行恢复。

4. state backend

flink状态存储在state backend。state backend有两种实现:基于rocksDB(本地磁盘)和基于jvm heap(内存)

  • 基于rocksDB,EmbeddedRocksDBStateBackend,访问和更新状态涉及序列化和反序列化,成本更高,相对内存而言较慢,但是可以允许巨量的状态。支持增量快照。
  • 基于Java heap,HashMapStateBackend,默认方式,访问和更新状态涉及在heap上读写对象。

HashMapStateBackend

状态将作为jvm heap上的对象进行存储。由于作为对象进行存储,因此对象重用(reuse Object)是不安全的。这种建议将 managed memory 设置为0。从而使JVM为用户代码分配最大内存。状态大小受限于集群可用的内容大小。

EmbeddedRocksDBStateBackend

状态存储在rocksDB数据库中,即TaskManager的本地磁盘中,数据存储为序列化的字节数组。状态大小受限于磁盘空间大小。对象重用是安全的。当前唯一支持增量checkpoint的实现方式。

使用方式

// state.backend 配置项,可选项hashmap (HashMapStateBackend), rocksdb (EmbeddedRocksDBStateBackend)
// 或
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

4.1. 旧版的state backend实现

从1.13版本开始,社区重新设计了state backend实现类,新实现类目的是为了帮助用户更好的理解状态存储和checkpoint存储的分离。并未影响flink的state backend和checkpoint进行运行时实现和特征。可以使用新api来迁移老版本的应用程序,且不会丢失状态和一致性。

  • MemoryStateBackend,等价于HashMapStateBackend and JobManagerCheckpointStorage.
  • FsStateBackend,等价于HashMapStateBackend and FileSystemCheckpointStorage.
  • RocksDBStateBackend,等价于EmbeddedRocksDBStateBackend and FileSystemCheckpointStorage.

4.2. 基于rocksDB的state backend

rocksDB的方式支持增量快照,增量快照是建立在先前旧快照基础上的。flink利用了rocksDB的内部的压缩机制对旧快照进行合并,因此增量快照的历史记录不会无限增长,旧的快照结果最终会被自动合并和修剪。

增量快照需要手动开启。开启增量快照后,在web UI上展示checkpoint data size仅仅表示增量数据的大小。

使用方式

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// true表示开启增量快照
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(backend);
4.2.1. 内存管理

rocksDB state backend的性能很大程度上取决于它可用的内存量。增加内存对提高性能有很大帮助。或者调整内存的功能。

默认情况下,RocksDB State Backend 使用 Flink 的managed memory预算作为 RocksDB 的缓冲区和缓存(state.backend.rocksdb.memory.managed:true)。

flink并不是直接将Manager memory的内存分配给rocksDB,而是将通过配置和限制来确保rocksDB的内存使用保持在Manager momery内存范围内。

flink在单个slot中的所有rocksDB实例之间共享缓存和写缓冲区。这种共享管理帮助限制rocksDB主要内存消耗组件的内存使用。

  • block cache:用于缓存从磁盘读取的数据块
  • index and bloom filters:用于加速数据检索,过滤不必要的读取
  • memtables:用于暂时存储写入的数据,直到他们被刷新到磁盘。

flink提供了额外的配置来调整rocksDB中写路径和读路径之间的内存分配

  • 写路径(memtable),如果发生频繁的刷新,则表示写缓存区内存不足,适当调整这部分内存。state.backend.rocksdb.memory.write-buffer-ratio,默认0.5,即50%
  • 读路径(index and bloom fliters,剩余缓存),如果发生频繁缓存未命中,则表示读操作的内存不足,适当调整这部分内存。state.backend.rocksdb.memory.high-prio-pool-ratio,默认0.1,即10%

在内存层面的调优,大多数情况下,增加Manager memory。

建议设置的配置。

state.backend.rocksdb.predefined-options,这个参数允许用户选择一组预定义的 RocksDB 配置,以优化不同的使用场景。具体来说,这个参数可以帮助用户在性能和资源使用之间进行权衡,而不需要手动调整大量的 RocksDB 配置项。

  • DEFAULT: 使用 RocksDB 的默认配置。
  • SPINNING_DISK_OPTIMIZED: 针对传统硬盘(HDD)进行优化。建议设置成此项
  • FLASH_SSD_OPTIMIZED: 针对固态硬盘(SSD)进行优化。
  • SPINNING_DISK_OPTIMIZED_HIGH_MEM: 针对高内存环境下的 HDD 进行优化。单个slot的状态达到GB,且托管内存充裕,设置为此最佳。
  • FLASH_SSD_OPTIMIZED_HIGH_MEM: 针对高内存环境下的 SSD 进行优化。

必要的RocksDB监控,观察是否有性能瓶颈,观察完毕后关闭它们

  • state.backend.rocksdb.metrics.block-cache-capacity,显示了为块缓存分配的总内存量,帮助了解块缓存的大小是否适合当前的需求。
  • state.backend.rocksdb.metrics.block-cache-usage,监视块缓存内存的使用情况,帮助了解在运行时使用了多少内存来缓存数据块。
  • state.backend.rocksdb.metrics.cur-size-all-mem-tables,监视以字节为单位的active和unflush的不可变 memtables 的大致大小。
  • state.backend.rocksdb.metrics.mem-table-flush-pending,监控 RocksDB 中挂起的 memtable 刷新次数。
  • state.backend.rocksdb.metrics.num-running-flushes,监控当前正在运行的flush次数。
  • state.backend.rocksdb.metrics.num-running-compactions,监控当前运行的压缩次数。
4.3. Timers

当state backend选则rocksDB时,定时器默认也存储在rocksDB中,这是更健壮和可扩展的选则。但是这样会有较高的成本,因此flink提供了基于JVM heap memory存储的定时器的选项(state.backend.rocksdb.timer-service.factory=heap)。当在无窗口、在ProcessFunction为使用定时器的场景下,将定时器存储在Heap中将会有更优的性能。但是可能会增加checkpoint时间,并且无法自然的扩展到内存之外。

当使用基于rocksDb的state backend和基于heap存储的定时器时,定时器不支持异步快照,其他状态仍会异步存储。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/86615.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/86615.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【若依学习记录】RuoYi后台手册——分页实现

目录 若依系统简介 前端调用实现 前端调用举例 后台逻辑实现 若依系统简介 RuoYi 是一个基于 Spring Boot、Apache Shiro、MyBatis 和 Thymeleaf 的后台管理系统,旨在降低技术难度,助力开发者聚焦业务核心,从而节省人力成本、缩短项目周…

从台式电脑硬件架构看前后端分离开发模式

在软件开发领域,前后端分离早已成为主流架构设计理念。它将系统的业务逻辑处理与用户界面展示解耦,提升开发效率与系统可维护性。有趣的是,我们日常生活中常见的台式电脑硬件架构,竟与这一理念有着异曲同工之妙。今天,就让我们从台式电脑的硬件组成出发,深入探讨其与前后…

可观测性的哲学

在现代系统架构中,“可观测性(Observability)”已不仅仅是一个工程实践,是一种关于“理解世界”的哲学姿态, 还是一种帮助架构演变的认知工具。从柏拉图的“洞穴寓言”出发,我们可以构建起一条从被动接受投影&#xff…

开疆智能CCLinkIE转ModbusTCP网关连接傲博机器人配置案例

本案例是通过CClinkIE转ModbusTCP网关,连接傲博机器人的配置案例 PLC配置 打开三菱PLC组态软件GXWORK3设置CClinkIE一侧的参数配置,首先设置PLC的IP地址 双击详细设置进入CClinkIE配置 添加通用从站IP地址以及占用点数 设置好分配的软元件,确…

Bash Shellshock

CVE-2014-6271(Bash Shellshock远程命令注入漏洞) 该服务启动后有路径http://your-ip:port/victim.cgi和http://your-ip:port/safe.cgi。其中safe.cgi是新版页面,victim是bash4.3生成的页面。 漏洞位置在User-Agent中victim.cgi: User-Agent: () { foo; }; echo C…

以软件系统开发为例,解释PMO 与IPD、CMMI、项目管理什么区别和联系

以「开发一套智能仓储管理系统(WMS)」为例,拆解软件项目经理视角下的IPD、CMMI、项目管理和PMO如何协同运作: 场景设定 项目目标:6个月内交付WMS系统,支持日均10万订单处理关键角色: 你&#x…

TDengine 3.3.5.0 新功能 —— 查看库文件占用空间、压缩率

1. 背景 TDengine 之前版本一直没有通过 SQL 命令查看数据库占用的磁盘空间大小,从 3.3.5.0 开始,增加了这个方便且实用的小功能,这里详细介绍下。 2. SQL 基本语法 select expr from information_schema.ins_disk_usage [where condtion]…

蚂蚁百宝箱体验:如何快速创建“旅游小助手”AI智能体

蚂蚁百宝箱作为站式智能体应用开发平台,致力于为AI开发者提供简单、高效、快捷的智能体创作体验。作为业内领先的AI应用开发平台,开发者可以根据自身的个性化需求,基于各式各样的大模型来创建一个属于自己的智能体应用。 蚂蚁百宝箱&#xf…

AI助力JMeter—从静态参数化到智能动态化的进化之路

Apache JMeter作为开源利器被广泛应用于Web系统、API接口、数据库及消息队列等多场景性能验证。而“变量的使用”作为测试脚本灵活性和可维护性的核心手段,决定了脚本的复用性、可扩展性和数据驱动能力。传统的变量管理手段已难以应对大规模复杂测试任务中“动态化、…

第十六届蓝桥杯C/C++程序设计研究生组国赛 国二

应该是最后一次参加蓝桥杯比赛了,很遗憾,还是没有拿到国一。 大二第一次参加蓝桥杯,印象最深刻的是居然不知道1s是1000ms,花了很多时间在这题,后面节奏都乱了,抗压能力也不行,身体也不适。最后…

OpenCV计算机视觉实战(12)——图像金字塔与特征缩放

OpenCV计算机视觉实战(12)——图像金字塔与特征缩放 0. 前言1. 高斯金字塔1.1 应用场景1.2 实现过程 2. 拉普拉斯金字塔2.1 应用场景2.2 实现过程 3. 图像融合实例3.1 应用场景3.2 实现过程 小结系列链接 0. 前言 图像金字塔技术通过对原始图像按不同分…

【案例】基于Python的生源数据可视化分析:从Excel处理到动态地图展示

文章目录 需求分析技术要点程序流程一些细节核心代码表格的一些操作 心得体会代码汇总 需求分析 请设计一个程序,要求能够统计分析分散在不同表格中的数万条信息,以信息中的身份证号码或生源地代码字段为目标字段,统计每一年全国各省份及本省…

设计模式 | 原型模式

原型模式通过克隆机制实现对象高效创建,是性能敏感场景的利器。本文结合C示例详解实现原理、深拷贝陷阱、应用场景,并与工厂模式对比分析。 为何需要原型模式? 当遇到以下场景时,传统构造方法面临挑战: 创建成本高&am…

Go 语言中的单元测试

1、如何编写单元测试 在任何生产级别的项目开发中,单元测试都扮演着至关重要的角色。尽管许多初创项目在早期可能忽略了它,但随着项目逐渐成熟并成为核心业务,为其编写健壮的单元测试是保障代码质量和项目稳定性的必然选择。本文将带您快速掌…

8. 接口专业测试报告生成pytest-html

pytest-html 终极指南:打造专业级接口测试报告 在接口自动化测试中,清晰的测试报告是质量保障的关键。本文将深入解析如何通过pytest-html插件生成专业级测试报告。 一、核心安装与基础使用 快速安装(国内镜像) pip install -i …

Day45 Tensorboard使用介绍

目录 一、tensorboard的发展历史和原理及基本操作 1.1 发展历史 1.2 tensorboard的原理 1.3 日志目录自动管理 1.4 记录标量数据(Scalar) 1.5 可视化模型结构(Graph) 1.6 可视化图像(Image) 1.7 记…

用AI给AR加“智慧”:揭秘增强现实智能互动的优化秘密

用AI给AR加“智慧”:揭秘增强现实智能互动的优化秘密 引子:增强现实,到底还能怎么更聪明? 还记得当年Pokmon GO火爆全球的场景吗?玩家们手机对准街头,虚拟小精灵活灵活现地跳出来,那就是增强现实(AR)最经典的应用之一。随着硬件发展和算法进步,AR正逐步从“炫酷玩具…

1 Studying《Computer Vision: Algorithms and Applications 2nd Edition》1-5

目录 Chapter 1 Introduction 1.1 什么是计算机视觉? 1.2 简史 1.3 书籍概述 1.4 样本教学大纲 1.5 符号说明 1.6 其他阅读材料 Chapter 2 Image formation 2.1 几何基本元素和变换 2.2 光度图像形成 2.3 数码相机 2.4 其他阅读材料 2.5 练习 Chapter…

Augment插件macOS

macOS苹果电脑vscode-augment免费额度续杯跑满 前言 在AI辅助编程日益普及的今天,Augment作为VS Code中的智能代码助手,为开发者提供了强大的代码生成和优化功能。然而,免费版本每月300次的使用限制往往让重度用户感到困扰。本文将详细介绍如…

OpenCV CUDA模块设备层-----创建一个“常量指针访问器” 的工具函数constantPtr()

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 在 CUDA 设备端模拟一个“指向常量值”的虚拟指针访问器,使得你可以像访问数组一样访问一个固定值。 这在某些核函数中非常有用&…