Flink SQLServer CDC 环境配置与验证

一、SQL Server 数据库核心配置
1. 启用 CDC 功能(Change Data Capture)

SQL Server CDC 依赖数据库级别的 CDC 功能及表级别的捕获配置,需按以下步骤启用:

启用数据库 CDC

-- 以管理员身份连接数据库
USE master;
GO-- 检查数据库是否已启用CDC
IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = 'MyDB' AND is_cdc_enabled = 1)
BEGINEXEC sys.sp_cdc_enable_db;PRINT 'CDC已启用';
END
ELSEPRINT 'CDC已启用';
GO

启用表级 CDC(以dbo.Orders表为例)

USE MyDB;
GO-- 确保SQL Agent服务已启动(CDC依赖Agent作业)
EXEC sys.sp_cdc_enable_table@source_schema = N'dbo',          -- 表所属模式@source_name = N'Orders',         -- 表名@role_name = N'cdc_reader',       -- 授权角色(可设为NULL使用默认权限)@filegroup_name = N'MyDB_CT',     -- 存储变更表的文件组(需提前创建)@supports_net_changes = 0;        -- 是否支持净变更(0为不支持)
GO-- 验证CDC配置
EXEC sys.sp_cdc_help_change_data_capture;
GO

创建文件组(若不存在)

USE MyDB;
GO
IF NOT EXISTS (SELECT 1 FROM sys.filegroups WHERE name = N'MyDB_CT')
BEGINALTER DATABASE MyDB ADD FILEGROUP MyDB_CT;ALTER DATABASE MyDB ADD FILE (NAME = N'MyDB_CT', FILENAME = N'C:\Data\MyDB_CT.ndf') TO FILEGROUP MyDB_CT;
END
GO
2. 创建专用用户并授权
-- 创建用户
CREATE LOGIN flinkuser WITH PASSWORD = 'Flink@123';
CREATE USER flinkuser FOR LOGIN flinkuser;-- 授予数据库访问权限
ALTER ROLE db_owner ADD MEMBER flinkuser;  -- 生产环境建议细化权限
GRANT SELECT ON ALL TABLES IN SCHEMA dbo TO flinkuser;-- 授予CDC相关权限
GRANT VIEW SERVER STATE TO flinkuser;
GRANT SELECT ON sys.change_tables TO flinkuser;
GO
二、Flink 环境集成配置
1. 添加Maven依赖
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-sqlserver-cdc</artifactId><version>3.0.1</version><scope>provided</scope>
</dependency>
2. SQL Client部署
  1. 下载JAR包:flink-sql-connector-sqlserver-cdc-3.0.1.jar
  2. 将JAR包放入$FLINK_HOME/lib/目录后重启Flink集群。
三、Flink SQL 表定义与参数详解
1. 完整建表示例(含元数据列)
-- 配置checkpoint(可选)
SET 'execution.checkpointing.interval' = '5s';-- 创建SQL Server CDC表
CREATE TABLE sqlserver_orders (id INT,order_date DATE,purchaser INT,quantity INT,product_id INT,-- 元数据列:捕获变更信息db_name STRING METADATA FROM 'database_name' VIRTUAL,schema_name STRING METADATA FROM 'schema_name' VIRTUAL,table_name STRING METADATA FROM 'table_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,PRIMARY KEY(id) NOT ENFORCED
) WITH ('connector' = 'sqlserver-cdc','hostname' = '192.168.1.100','port' = '1433','username' = 'flinkuser','password' = 'Flink@123','database-name' = 'MyDB','table-name' = 'dbo.orders','server-time-zone' = 'Asia/Shanghai','scan.incremental.snapshot.enabled' = 'true'
);
2. 核心参数详解
参数名必选默认值类型说明
connectorString固定为sqlserver-cdc
hostnameStringSQL Server服务器IP或域名
usernameString连接数据库的用户名(需具备CDC读取权限)
passwordString连接数据库的密码
database-nameString数据库名称(如MyDB
table-nameString表名(格式:schema.table,如dbo.orders
port1433Integer数据库端口号
server-time-zoneUTCString数据库时区(如Asia/Shanghai),影响TIMESTAMP转换
scan.incremental.snapshot.enabledtrueBoolean启用增量快照(并行读取,需主键),默认开启
debezium.snapshot.modeinitialString快照模式:initial(结构+数据)、initial-only(仅快照)、latest-offset(仅结构)
四、环境验证与测试
1. 准备测试数据
-- 创建测试表(已启用CDC)
USE MyDB;
GO
CREATE TABLE dbo.orders (id INT PRIMARY KEY,order_date DATE,purchaser INT,quantity INT,product_id INT,update_time DATETIME
);-- 插入测试数据
INSERT INTO dbo.orders VALUES 
(1, '2023-01-01', 101, 5, 1001, GETDATE()),
(2, '2023-01-02', 102, 3, 1002, GETDATE());
GO
2. Flink SQL 验证
-- 查询CDC表(首次触发快照读取)
SELECT * FROM sqlserver_orders;-- 在SQL Server中更新数据
UPDATE dbo.orders SET quantity = 10 WHERE id = 1;
GO-- 观察Flink输出:应显示变更记录,op_ts为变更时间
3. DataStream API 验证(增量模式)
import org.apache.flink.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SqlServerCdcExample {public static void main(String[] args) throws Exception {// 配置SQL Server Source(增量快照模式)SqlServerSourceBuilder.SqlServerIncrementalSource<String> sourceBuilder = SqlServerSourceBuilder.sqlserverIncrementalSource().hostname("192.168.1.100").port(1433).databaseList("MyDB").tableList("dbo.orders").username("flinkuser").password("Flink@123").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).splitSize(1000) // 快照分片大小.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);env.fromSource(sourceBuilder,WatermarkStrategy.noWatermarks(),"SQL Server CDC Source").setParallelism(4) // 设置4并行度.print();env.execute("SQL Server CDC Test");}
}
五、常见问题与解决方案
  1. SQL Agent未运行

    ERROR: CDC作业无法启动,SQL Agent服务未运行
    
    • 解决方案:启动SQL Server Agent服务(可通过SQL Server配置管理器或命令行启动)。
  2. 权限不足

    ERROR: 用户无权访问CDC表
    
    • 解决方案:确认用户属于db_owner角色,或手动授予SELECT权限至sys.change_tables
  3. 增量快照失败(无主键表)

    ERROR: 表缺少主键,无法进行增量快照
    
    • 解决方案:为表添加主键,或手动指定分片键:
      'scan.incremental.snapshot.chunk.key-column' = 'id'
      
  4. 时区转换异常

    • 解决方案:显式设置server-time-zone参数:
      'server-time-zone' = 'Asia/Shanghai'
      
六、生产环境优化建议
  1. CDC清理策略

    • 配置CDC清理作业(定期删除旧变更数据):
      USE MyDB;
      GO
      EXEC sys.sp_cdc_cleanup_change_data; -- 清理旧变更记录
      
  2. 作业高可用

    • 使用SQL Server Always On Availability Groups时,Flink作业需连接主副本,并确保CDC配置在主库。
  3. 性能调优

    • 调整scan.incremental.snapshot.chunk.size(如设为10000)以平衡并行度和内存占用;
    • 对于大表,启用debezium.snapshot.fetch.size(如设为2048)优化快照读取性能。

通过以上步骤,可完成Flink SQL Server CDC的全流程配置与验证。生产环境中需特别注意SQL Agent的运行状态、CDC数据清理策略及增量快照的并行参数调优,以确保数据一致性和系统稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/88003.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/88003.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软考(软件设计师)存储管理—设备管理,磁盘调度

I/O软件的核心目标是管理硬件差异、提供统一接口、实现高效可靠的数据传输。 核心目标&#xff1a; 设备无关性&#xff1a; 应用程序无需关心具体硬件细节。错误处理&#xff1a; 处理硬件错误和传输异常。同步/异步传输&#xff1a; 支持阻塞&#xff08;等待完成&#xff09…

[C语言] C语言数学函数库概览

C语言数学函数库概览 文章目录 C语言数学函数库概览一、概述二、基本数学函数详解1. 平方根函数 sqrt(x)2. 幂函数 pow(x, y)3. 绝对值函数 fabs(x)4. 向上取整函数 ceil(x)5. 向下取整函数 floor(x) 三、三角函数与双曲函数详解1. 正弦函数 double sin(double x)2. 余弦函数 d…

【简单三步】Stable diffusion Webai本地部署无法加载模型并报openai/clip-vit-large-patch14错误的解决方法

问题描述 Stable diffusion Webai本地部署成功后&#xff0c;手动加载本地模型checkpoint时&#xff0c;始终无法加载进去&#xff0c;确定模型存放位置无误&#xff08;位于models\Stable-diffusion&#xff09;查看cmd窗口时&#xff0c;发现一个报错提示&#xff1a;Can’t …

Java 命令行参数详解:系统属性、JVM 选项与应用配置

Java 命令行参数详解&#xff1a;系统属性、JVM 选项与应用配置 在 Java 应用启动命令中&#xff0c;如&#xff1a; java -jar -Dserver.port8088 xdr-demo-1.0-SNAPSHOT-assembly.jar &-Dserver.port8088是一个 系统属性&#xff08;System Property&#xff09; 设置。…

【论文笔记】World Models for Autonomous Driving: An Initial Survey

原文链接&#xff1a;https://ieeexplore.ieee.org/abstract/document/10522953 1. 世界模型的发展 A. 世界模型的结构基础 世界模型包含4个关键组件&#xff0c;以模拟人类连贯的思考和决策过程。 a&#xff09;感知模块使用如变分自编码器&#xff08;VAE&#xff09;、掩…

Spring Cloud Config(微服务配置中心详解)

关键词&#xff1a;Spring Cloud Config、配置中心、远程仓库、动态刷新、加密解密 ✅ 摘要 在微服务架构中&#xff0c;随着服务数量的增加&#xff0c;统一管理各服务的配置信息变得尤为重要。传统的本地配置文件方式难以满足多环境、多实例、集中化的需求。 Spring Cloud …

【Note】《深入理解Linux内核》 第二十章:深入理解 Linux 程序执行机制

《深入理解Linux内核》 第二十章&#xff1a;深入理解 Linux 程序执行机制&#xff08;Program Execution&#xff09;关键词&#xff1a;exec 系列系统调用、可执行文件格式&#xff08;ELF&#xff09;、用户地址空间、内存映射、动态链接、栈初始化、入口点、共享库、内核态…

服务器如何配置防火墙规则以阻止恶意流量和DDoS攻击?

防火墙是保护服务器免受恶意流量和 DDoS 攻击的第一道防线。通过合理配置防火墙规则&#xff0c;可以有效阻止恶意访问、限制不必要的流量&#xff0c;并减少攻击对服务器的影响。以下是配置防火墙规则的全面指南&#xff0c;包括基础规则设置、防御 DDoS 攻击的高级策略和最佳…

持续性投入是成就自我价值的关键一环

概述 时间&#xff0c;的唯一公平之处就是给你我的长度是相同的&#xff0c;这也是它唯一公平&#xff0c;也是不公平的地方。 所谓的公平&#xff0c;就是不患寡而患不均中所说的平均。 所谓的不公平就是&#xff0c;相同时间内我们彼此对应的标价不同&#xff0c;延伸到后…

使用allegro在BoardGeometry的Silkscreen_Top层画出图案

目录 1. 图形及图形放置显示2. 绘制 1. 图形及图形放置显示 绘制完成图案&#xff1a; 导出后图案&#xff1a; 2. 绘制 图层选中&#xff1b; 画圆型&#xff1b; 半径3.5mm&#xff0c;原点生成&#xff1b; 在图案中挖空&#xff1b; 用指令走线&#xff1a; …

Kotlin 协程:Channel 与 Flow 深度对比及 Channel 使用指南

前言 在 Kotlin 协程的异步编程世界里&#xff0c;Channel 和 Flow 是处理数据流的重要工具&#xff0c;它们有着不同的设计理念与适用场景。本文将对比二者功能与应用场景&#xff0c;详细讲解 Channel 的使用步骤及注意事项 。 一、Channel 与 Flow 的特性对比 Channel 是协程…

MYsql主从复制部署

MySQL 主从复制是将主数据库的变更自动同步到从数据库的过程&#xff0c;常用语读写分离、高可用性和数据备份。 1.环境准备 确保主从服务器已安装相同版本的 MySQL&#xff0c;并能通过网络互相访问。 # 检查 MySQL 版本 mysql -V 2.配置主服务器 &#xff08;1&#xff0…

安灯呼叫看板如何实现汽车生产异常秒级响应

在汽车零部件工厂的静置车间&#xff0c;传统生产管理依赖人工巡检与纸质记录&#xff0c;存在效率低、信息滞后、异常响应慢等问题。某汽车厂曾因物料静置时间未及时监控&#xff0c;导致批次混料&#xff0c;损失超10万元。而安灯呼叫看板系统的引入&#xff0c;通过实时状态…

构造函数注入在spring boot 中怎么使用详解

我们来详细讲解一下在 Spring Boot 中如何使用构造函数注入&#xff0c;并通过一个完整的、可运行的例子来演示。 构造函数注入是 Spring 官方最推荐的依赖注入方式&#xff0c;因为它能保证对象的不可变性和依赖的完整性。 核心理念 在 Spring Boot 中使用构造函数注入非常简单…

2025.6.30-2025.7.06第26周:第一次参加头马演讲俱乐部

现在是周一早上6:23&#xff0c;我开始写上周的周总结。 3件超出预期的事 参加头马俱乐部绝对是最超出预期的&#xff0c;使得这个周末格外的快乐简历的第一版终于改完了&#xff0c;花了好长的时间&#xff0c;其中有一天心情还很荡&#xff0c;因为&#xff0c;我想&#x…

2025使用VM虚拟机安装配置Macos苹果系统下Flutter开发环境保姆级教程--下篇

其实如何安装VM,如何安装MACOS网上的教程很多,我只是结合我的体验重新整理了一次,接下来才进入本教程最核心的部分,Flutter开发环境的配置部分。、一.配置前准备 主要是准备相应的工具包,以及其他虚拟机设置1.工具包 工具包的版本也可以自行配置,我这主要是我使用的是F…

QSPI、OSPI与FSMC的区别与内存映射分析

QSPI、OSPI与FSMC的区别与内存映射分析 基本概念与区别 1. FSMC (灵活静态存储控制器) 接口类型&#xff1a;并行接口&#xff0c;通常8/16位数据总线总线标准&#xff1a;传统并行总线协议速度&#xff1a;相对较低&#xff0c;通常最高约100MHz应用场景&#xff1a;SRAM、NOR…

系统思考与心智模式探索

成长的真正障碍&#xff0c;不是能力的不足&#xff0c;而是看待问题的局限。 在复杂多变的商业环境中&#xff0c;我们往往习惯于解决“眼前”的问题&#xff0c;却忽视了深藏背后的系统性障碍。我们看到的只是表面的“症状”&#xff0c;而真正的根源&#xff0c;却往往隐藏…

物联网技术的关键技术与区块链发展趋势的深度融合分析

一、物联网技术的核心架构与关键技术 物联网技术体系由感知层、网络层、平台层、应用层和安全层构成&#xff0c;各层技术协同工作&#xff0c;实现物理世界与数字世界的深度融合。 感知层&#xff1a;物联网的“感官” 传感器技术&#xff1a;包括环境传感器&#xff08;温度…

针对Exhcnage Server的攻击防范措施

一、背景介绍最近&#xff0c;安全研究人员揭露了一个名为 NightEagle&#xff08;又名 APT-Q-95&#xff09; 的高级持续性威胁&#xff08;APT&#xff09;组织。这个组织被观察到利用 Microsoft Exchange 服务器中的零日漏洞链 进行攻击&#xff0c;其主要目标是中国政府、国…