Flink Oracle CDC Summary

Official documentation

https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/oracle-cdc/

Versions

  • Flink 1.15.3
  • CDC 2.3.0
  • Oracle 11g and 12c (the official docs say 19c is supported; I have not tested it)

JAR packages

https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.3.0/flink-sql-connector-oracle-cdc-2.3.0.jar

  • 2.1-3.0 : https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/
  • 3.1+ : https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-oracle-cdc/

Oracle installation

Install Oracle 11g with Docker
Install Oracle 12c with Docker

Enable archive log mode

-- 1. Shut down the database
SHUTDOWN IMMEDIATE;
-- 2. Start the database in MOUNT state
STARTUP MOUNT;
-- 3. Enable archive log mode
ALTER DATABASE ARCHIVELOG;
-- 4. Open the database
ALTER DATABASE OPEN;
-- 5. Verify the archive status (ARCHIVE LOG LIST is a SQL*Plus command)
ARCHIVE LOG LIST;
SP2-0718: illegal ARCHIVE LOG option
SQL> archive log list;
Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     20
Next log sequence to archive   22
Current log sequence           22
-- 6. You can also query log_mode on its own; the default is NOARCHIVELOG
select log_mode from v$database;
ARCHIVELOG
-- 7. Enable minimal supplemental logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- 8. Enable all-column supplemental logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Create a tablespace and user, and grant privileges

Just follow the official documentation here:

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/xe/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- not needed if scan.incremental.snapshot.enabled = true (the default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
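
After running the grants it can be worth sanity-checking what flinkuser actually ended up with. This is a hedged sketch, assuming you can query the DBA_* views (e.g. as SYS or SYSTEM; on 12c CDB/PDB setups the container and user naming may differ):

-- System privileges and roles granted to the CDC user
SELECT PRIVILEGE FROM DBA_SYS_PRIVS WHERE GRANTEE = 'FLINKUSER';
SELECT GRANTED_ROLE FROM DBA_ROLE_PRIVS WHERE GRANTEE = 'FLINKUSER';
-- Object grants, e.g. on the V_$ views and the DBMS_LOGMNR packages
SELECT OWNER, TABLE_NAME, PRIVILEGE FROM DBA_TAB_PRIVS WHERE GRANTEE = 'FLINKUSER';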

Create test tables

Source

CREATE TABLE FLINKUSER.CDC_SOURCE (
  ID INTEGER NOT NULL,
  NAME VARCHAR2(100),
  CONSTRAINT CDC_SOURCE_PK PRIMARY KEY (ID)
);
INSERT INTO FLINKUSER.CDC_SOURCE (ID, NAME) VALUES (1, '1');
INSERT INTO FLINKUSER.CDC_SOURCE (ID, NAME) VALUES (2, '2');
……
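
One thing that is easy to trip over (my own note, not part of the original steps): depending on the SQL client, the inserts above may sit in an uncommitted transaction, in which case neither the snapshot nor LogMiner will see them, so commit explicitly:

COMMIT;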

Sink

CREATE TABLE FLINKUSER.CDC_SINK (
  ID INTEGER NOT NULL,
  NAME VARCHAR2(100),
  CONSTRAINT CDC_SINK_PK PRIMARY KEY (ID)
);

CDC Oracle2Oracle

set yarn.application.name=cdc_oracle2oracle;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set taskmanager.numberOfTaskSlots=1;
set execution.checkpointing.interval=1000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_oracle2oracle;
set execution.target=yarn-per-job;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE oracle_cdc_source (
  ID int PRIMARY KEY NOT ENFORCED,
  NAME string
) WITH (
  'connector' = 'oracle-cdc',
  'url' = 'jdbc:oracle:thin:@192.168.44.128:1522:XE',
  'hostname' = '192.168.44.128',
  'port' = '1522',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'XE',
  'schema-name' = 'FLINKUSER',
  'table-name' = 'CDC_SOURCE',
  'debezium.log.mining.strategy' = 'online_catalog',
  'debezium.log.mining.continuous.mine' = 'true'
);

create table oracle_cdc_sink (
  ID int PRIMARY KEY NOT ENFORCED,
  NAME string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:oracle:thin:@192.168.44.128:1522:XE',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'table-name' = 'FLINKUSER.CDC_SINK',
  'sink.buffer-flush.max-rows' = '1000000'
);

insert into oracle_cdc_sink select * from oracle_cdc_source;

Notes:

  • Column names, database-name, schema-name, and table-name in the Oracle CDC SQL must all be uppercase; otherwise there will be problems.
  • Both the source and the sink table should have a primary key; otherwise the row counts will not match and the gap can be large (a quick check is sketched below). There seem to be options that support tables without a primary key, but I have not managed to verify them yet.
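
A simple way to spot the row-count mismatch mentioned above is to compare both sides directly; a minimal sketch using the test tables from this post:

SELECT
  (SELECT COUNT(*) FROM FLINKUSER.CDC_SOURCE) AS SOURCE_ROWS,
  (SELECT COUNT(*) FROM FLINKUSER.CDC_SINK)   AS SINK_ROWS
FROM DUAL;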

Exceptions

Exception 1

org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: oracle_cdc_source[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> Sink: oracle_cdc_sink[3]' (operator cbc357ccb763df2852fee8c4fc7d55f2).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:231)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:316)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:201)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:394)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:144)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
    at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:151)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:197)
    ... 8 more
Caused by: java.lang.RuntimeException: Failed to resolve Oracle database version
    at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:159)
    at io.debezium.connector.oracle.OracleConnection.<init>(OracleConnection.java:71)
    at com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection(OracleConnectionUtils.java:54)
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.openJdbcConnection(OracleDialect.java:90)
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:107)
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:51)
    at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:139)
    ... 9 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:oracle:thin:@192.168.44.128:1522:XE
# If the url option is not used, the message is instead:
# Caused by: java.sql.SQLException: No suitable driver found for jdbc:oracle:thin:@192.168.44.128:1522/XE
    at java.sql.DriverManager.getConnection(DriverManager.java:689)
    at java.sql.DriverManager.getConnection(DriverManager.java:208)
    at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$0(JdbcConnection.java:184)
    at io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:121)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:890)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:885)
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:643)
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:517)
    at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:129)
    ... 15 more
  • Cause: a JAR conflict with calcite-core-1.10.0.jar.
  • Fix: remove that JAR from the Flink lib directory.

Exception 2

Caused by: java.sql.SQLException: Invalid column type
    at oracle.jdbc.driver.Redirector$2.redirect(Redirector.java:261)
    at oracle.jdbc.driver.Representation.getObject(Representation.java:423)
    at oracle.jdbc.driver.Accessor.getObject(Accessor.java:986)
    at oracle.jdbc.driver.OracleStatement.getObject(OracleStatement.java:6521)
    at oracle.jdbc.driver.InsensitiveScrollableResultSet.getObject(InsensitiveScrollableResultSet.java:909)
    at io.debezium.connector.oracle.logminer.LogMinerHelper.lambda$getSystime$0(LogMinerHelper.java:207)
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:649)
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:517)
    at io.debezium.connector.oracle.logminer.LogMinerHelper.getSystime(LogMinerHelper.java:205)
    at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:202)
    ... 8 more

Cause: a JAR conflict with ojdbc7.jar (this variant shows up when the source table has only one row).
The JAR is not in the Flink lib directory, but the task logs showed it being loaded, so check whether it is shipped with the JDK:

find /usr/lib/jvm -name "ojdbc*"
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/jre/lib/ext/ojdbc7.jar

Removing (moving away) that JAR fixes it:

mv /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64/jre/lib/ext/ojdbc7.jar ~/

Exception 3

2025-06-23 09:26:52
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: oracle_cdc_source[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> Sink: oracle_cdc_sink[3]' (operator cbc357ccb763df2852fee8c4fc7d55f2).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:231)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:316)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:420)
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalAccessError: tried to access method oracle.sql.Datum.compareBytes([B[B)I from class com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter
    at com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter.isChunkEndGeMax(OracleChunkSplitter.java:283)
    at com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter.nextChunkEnd(OracleChunkSplitter.java:307)
    at com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter.splitUnevenlySizedChunks(OracleChunkSplitter.java:249)
    at com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter.splitTableIntoChunks(OracleChunkSplitter.java:181)
    at com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter.generateSplits(OracleChunkSplitter.java:80)
    at com.ververica.cdc.connectors.base.source.assigner.SnapshotSplitAssigner.getNext(SnapshotSplitAssigner.java:178)
    at com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner.getNext(HybridSplitAssigner.java:129)
    at com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:166)
    at com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:97)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$2(SourceCoordinator.java:230)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
    ... 8 more

Same root cause as Exception 2, a JAR conflict with ojdbc7.jar, but this variant appears when the source table has 2 or more rows.

Exception 4

Caused by: io.debezium.DebeziumException: Supplemental logging not properly configured.  Use: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA

Minimal supplemental logging is not enabled. Enable it:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

You can check whether it is enabled with the following SQL:

SELECT SUPPLEMENTAL_LOG_DATA_MIN,    -- minimal supplemental logging
       SUPPLEMENTAL_LOG_DATA_PK,     -- primary key supplemental logging
       SUPPLEMENTAL_LOG_DATA_UI,     -- unique index supplemental logging
       SUPPLEMENTAL_LOG_DATA_FK,     -- foreign key supplemental logging
       SUPPLEMENTAL_LOG_DATA_ALL     -- all-column supplemental logging
FROM V$DATABASE;
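
For interpreting the result, my understanding of the V$DATABASE columns (hedged, values may differ slightly between versions) is that after the two ALTER DATABASE statements you should see roughly:

-- Expected after enabling minimal + all-column supplemental logging:
--   SUPPLEMENTAL_LOG_DATA_MIN = YES (or IMPLICIT when enabled implicitly by other supplemental logging)
--   SUPPLEMENTAL_LOG_DATA_ALL = YES
SELECT SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_ALL FROM V$DATABASE;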

Exception 5

Caused by: io.debezium.DebeziumException: Supplemental logging not configured for table XE.FLINKUSER.CDC_SOURCE.  Use command: ALTER TABLE FLINKUSER.CDC_SOURCE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

All-column supplemental logging is not enabled. Enable it:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
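
Note that the error message itself suggests a table-level alternative. If you would rather not enable all-column logging database-wide, the per-table variant below should also satisfy the connector for that table (a sketch I have not benchmarked against the database-wide option; DBA_LOG_GROUPS is one way to double-check):

-- Table-level all-column supplemental logging, as suggested by the error message
ALTER TABLE FLINKUSER.CDC_SOURCE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Verify the log group exists for the table
SELECT TABLE_NAME, LOG_GROUP_TYPE, ALWAYS FROM DBA_LOG_GROUPS WHERE OWNER = 'FLINKUSER';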

Exception 6

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
    at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:151) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:197) ~[flink-dist-1.15.3.jar:1.15.3]
    ... 8 more
Caused by: java.lang.RuntimeException: Failed to resolve Oracle database version
    at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:159) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.OracleConnection.<init>(OracleConnection.java:71) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection(OracleConnectionUtils.java:54) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.openJdbcConnection(OracleDialect.java:90) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:107) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:51) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:139) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:197) ~[flink-dist-1.15.3.jar:1.15.3]
    ... 8 more
Caused by: java.sql.SQLRecoverableException: Listener refused the connection with the following error:
ORA-12514, TNS:listener does not currently know of service requested in connect descriptor
    at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:854) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:562) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at java.sql.DriverManager.getConnection(DriverManager.java:664) ~[?:1.8.0_242]
    at java.sql.DriverManager.getConnection(DriverManager.java:208) ~[?:1.8.0_242]
    at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$0(JdbcConnection.java:184) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:121) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:890) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:885) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:643) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:517) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:129) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at io.debezium.connector.oracle.OracleConnection.<init>(OracleConnection.java:71) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection(OracleConnectionUtils.java:54) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.openJdbcConnection(OracleDialect.java:90) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:107) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:51) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:139) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:197) ~[flink-dist-1.15.3.jar:1.15.3]
    ... 8 more
Caused by: oracle.net.ns.NetException: Listener refused the connection with the following error:
ORA-12514, TNS:listener does not currently know of service requested in connect descriptor
    at oracle.net.ns.NSProtocolNIO.negotiateConnection(NSProtocolNIO.java:284) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at oracle.net.ns.NSProtocol.connect(NSProtocol.java:340) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at oracle.jdbc.driver.T4CConnection.connect(T4CConnection.java:1596) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:588) ~[flink-sql-connector-oracle-cdc-2.3.0.jar:2.3.0]
    (remaining frames are identical to the previous cause)
    ... 8 more

Cause: database-name was set to the SID rather than the service name. Either change it to the service name, or use the url option (the earliest version of the official documentation did not mention the url option).
Root cause: when the Oracle CDC connector builds the JDBC URL from the configured options, it hard-codes the format jdbc:oracle:thin:@//<host>:<port>/<service_name>, while the SID and service-name URL formats differ:

  • SID-style URL: jdbc:oracle:thin:@<host>:<port>:<SID>
  • Service-name-style URL: jdbc:oracle:thin:@//<host>:<port>/<service_name>

To address this, CDC 2.3.0 added support for a custom url ("Support custom url for incremental snapshot source"): https://github.com/ververica/flink-cdc-connectors/commit/4d9c0e41e169bf6cd8196a318c65fc965e002f57 (see the sketch below for how the url option is used in this post).
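
For reference, the url option is exactly how the source table earlier in this post works around the problem. A minimal sketch based on that DDL (the host, port and SID come from my test environment):

CREATE TABLE oracle_cdc_source (
  ID int PRIMARY KEY NOT ENFORCED,
  NAME string
) WITH (
  'connector' = 'oracle-cdc',
  -- pass the SID-style URL explicitly instead of letting the connector build a service-name URL
  'url' = 'jdbc:oracle:thin:@192.168.44.128:1522:XE',
  'hostname' = '192.168.44.128',
  'port' = '1522',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'XE',
  'schema-name' = 'FLINKUSER',
  'table-name' = 'CDC_SOURCE'
);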

Exception 7

Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name

This exception occurs while reading incremental (change) data. The cause is that database-name was set in lowercase; change it to uppercase:

-- 'database-name' = 'xe',
'database-name' = 'XE',

Exception 8

Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.

This exception is a partitioned-table bug (partitioned tables are not supported); it only occurs for partitioned tables. Related links:

https://github.com/apache/flink-cdc/issues/1737
https://developer.aliyun.com/ask/531328
https://github.com/apache/flink-cdc/pull/2479
https://github.com/apache/seatunnel/pull/8265

Workaround: modify OracleConnectionUtils in the connector source code following the two PRs above, then rebuild and repackage the JAR.

Session count keeps growing and is never released

After the Flink job fails (with certain exceptions), the JDBC connections (sessions) are not released; the job keeps retrying the connection and keeps failing, so the session count grows sharply, exceeds the total available, and eventually makes Oracle unavailable.

The cause is what the Flink documentation on restart strategies and failure recovery describes: with the default configuration, a streaming job that fails will retry indefinitely. Limiting the number of restart attempts fixes it (a per-job variant is sketched after the config below):

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
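
The values above go into flink-conf.yaml. Since this post submits jobs from SQL with SET statements, the same limit can presumably also be applied per job in the same style; I have only verified the flink-conf.yaml route, so treat this as a sketch:

set restart-strategy=fixed-delay;
set restart-strategy.fixed-delay.attempts=3;
set restart-strategy.fixed-delay.delay=10s;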

I have not yet reproduced which specific exception triggers this behavior; I will update this section once it has been reproduced.

Exception 9

Caused by: Error : 1031, Position : 25, Sql = SELECT * FROM "FLINKUSER"."CDC_SOURCE" AS OF SCN 20645794535650, OriginalSql = SELECT * FROM "FLINKUSER"."CDC_SOURCE" AS OF SCN 20645794535650, Error Msg = ORA-01031: insufficient privileges

Insufficient privileges; grant them with:

GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;

Incremental data not being synchronized

On a project we ran into a case where incremental data stopped being synchronized without any error; it recovered on its own before a cause was found, and I have not been able to reproduce it yet.

Parameter tuning

For large tables, the parameters need tuning:

-- connector options for the source table
'debezium.log.mining.batch.size.min' = '200000',
'debezium.log.mining.batch.size.max' = '50000000',
'scan.incremental.snapshot.enabled' = 'false',  -- default is true; left at the default, reads are very slow (tens of rows per second)

-- job-level setting
SET execution.checkpointing.timeout = 600000s;
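
Put together, a sketch of where these options land for a large table, based on the source DDL earlier in this post (the batch-size values are the ones from this section and will likely need tuning per environment):

CREATE TABLE oracle_cdc_source (
  ID int PRIMARY KEY NOT ENFORCED,
  NAME string
) WITH (
  'connector' = 'oracle-cdc',
  'url' = 'jdbc:oracle:thin:@192.168.44.128:1522:XE',
  'hostname' = '192.168.44.128',
  'port' = '1522',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'XE',
  'schema-name' = 'FLINKUSER',
  'table-name' = 'CDC_SOURCE',
  -- LogMiner batch sizing for large tables
  'debezium.log.mining.batch.size.min' = '200000',
  'debezium.log.mining.batch.size.max' = '50000000',
  -- disable incremental snapshot (default true); left enabled, reads were only tens of rows per second here
  'scan.incremental.snapshot.enabled' = 'false'
);
-- allow long-running snapshot checkpoints
SET execution.checkpointing.timeout = 600000s;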

Reference: https://github.com/apache/flink-cdc/discussions/1430
