flink oracle cdc实时同步(超详细)

avatar
作者
猴君
阅读量:0

文章目录

01 引言

官方文档:https://github.com/ververica/flink-cdc-connectors/blob/release-master/docs/content/connectors/oracle-cdc.md

本文参照官方文档来记录Oracle CDC 的配置。

在本文开始前,需要先安装Oracle,有兴趣的同学可以参考博主之前写的《docker下安装oracle11g(一次安装成功)》

02 前提条件

如果要做oracle的实时同步,Oracle数据库配置必须满足如下:

  1. Oracle数据库启用日志归档
  2. 定义具有适当权限的Oracle用户
  3. 被捕获的表或数据库上必须启用增量日志记录

在官网的安装教程中,可以看到有两种数据库类型的配置,分别是:

类型描述版本
独立的数据库架构(Non-CDB database)是传统的独立数据库架构,每个数据库实例包含所有的对象和数据Oracle 11g以及之前的版本中使用
多租户数据库架构(CDB database)多租户架构的数据库,包含一个根容器和多个子容器(PDB),每个PDB可以看作是一个独立的数据库Oracle 12c开始

因为 之前安装 的是Oracle11g,所以本文以独立的数据库架构(Non-CDB database)的配置来讲解。

03 配置

注意以下执行的条件是基于之前安装好的oracle环境来执行的,详情参阅:《docker下安装oracle11g(一次安装成功)》

3.1 启用日志归档

Step1:进入容器:

docker exec -it oracle_11g bash 

Step2:以DBA的权限登录数据库:

sqlplus /nolog CONNECT sys/system AS SYSDBA 

Step3:启用日志归档:

-- 设置数据库恢复文件目标大小为10G alter system set db_recovery_file_dest_size = 10G;  -- 设置数据库恢复文件目标路径 alter system set db_recovery_file_dest = '/home/oracle/app/oracle/product/11.2.0' scope=spfile;  -- 立即关闭数据库 shutdown immediate;  -- 以mount模式启动数据库 startup mount;  -- 启用数据库归档日志模式 alter database archivelog;  -- 打开数据库,允许用户访问 alter database open; 

操作记录如下:
在这里插入图片描述
Step4:查看日志归档是否启用(如果显示“Archive Mode”表示已经启用)

archive log list; 

在这里插入图片描述
/home/oracle/app/oracle/product/11.2.0目录也能看到数据库恢复文件(按日期分目录):
在这里插入图片描述


备注:

  • 启用日志归档需要重启数据库。
  • 归档日志将占用大量的磁盘空间,因此需要定期清理过期的日志。

3.2 用户赋权

Step1创建表空间(创建表空间是为了提供一个独立、可控、可扩展的存储区域,以供CDC工具捕获和管理数据库的增量数据,这对于实时同步和数据变更追踪非常重要,并为数据流和数据仓库等应用提供可靠的数据源。)

-- 以DBA的权限登录数据库 sqlplus /nolog CONNECT sys/system AS SYSDBA -- 创建一个名为"logminer_tbs"的表空间 -- 指定表空间的数据文件路径为"/home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf",其中"/home/oracle/app/oracle/product/11.2.0"是数据文件存储的目录,"logminer_tbs.dbf"是数据文件的文件名 -- 设置表空间的初始大小为25MB -- 如果数据文件已经存在且可重用,将其重用,否则创建一个新的数据文件 -- 启用表空间的自动扩展功能,即当表空间空间不足时,自动增加数据文件的大小 -- 设置表空间的最大允许大小为无限,即表空间可以无限制地自动扩展 CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; 

在这里插入图片描述
可以看到在“/home/oracle/app/oracle/product/11.2.0”目录里已经创建了logminer_tbs.dbf文件:
在这里插入图片描述

Step2:创建用户并赋予权限

-- 创建一个名为"flinkuser"的用户,密码为"flinkpw",将其默认表空间设置为"LOGMINER_TBS",并在该表空间上设置无限配额。 CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;  -- 允许"flinkuser"用户创建会话,即允许该用户连接到数据库。 GRANT CREATE SESSION TO flinkuser;  -- (不支持Oracle 11g)允许"flinkuser"用户在多租户数据库(CDB)中设置容器。 -- GRANT SET CONTAINER TO flinkuser;  -- 允许"flinkuser"用户查询V_$DATABASE视图,该视图包含有关数据库实例的信息。 GRANT SELECT ON V_$DATABASE TO flinkuser;  -- 允许"flinkuser"用户执行任何表的闪回操作。 GRANT FLASHBACK ANY TABLE TO flinkuser;  -- 允许"flinkuser"用户查询任何表的数据。 GRANT SELECT ANY TABLE TO flinkuser;  -- 允许"flinkuser"用户拥有SELECT_CATALOG_ROLE角色,该角色允许查询数据字典和元数据。 GRANT SELECT_CATALOG_ROLE TO flinkuser;  -- 允许"flinkuser"用户拥有EXECUTE_CATALOG_ROLE角色,该角色允许执行一些数据字典中的过程和函数。 GRANT EXECUTE_CATALOG_ROLE TO flinkuser;  -- 允许"flinkuser"用户查询任何事务。 GRANT SELECT ANY TRANSACTION TO flinkuser;  -- (不支持Oracle 11g)允许"flinkuser"用户进行数据变更追踪(LogMiner)。 -- GRANT LOGMINING TO flinkuser;  -- 允许"flinkuser"用户创建表。 GRANT CREATE TABLE TO flinkuser;  -- 允许"flinkuser"用户锁定任何表。 GRANT LOCK ANY TABLE TO flinkuser;  -- 允许"flinkuser"用户修改任何表。 GRANT ALTER ANY TABLE TO flinkuser;  -- 允许"flinkuser"用户创建序列。 GRANT CREATE SEQUENCE TO flinkuser;  -- 允许"flinkuser"用户执行DBMS_LOGMNR包中的过程。 GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;  -- 允许"flinkuser"用户执行DBMS_LOGMNR_D包中的过程。 GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;  -- 允许"flinkuser"用户查询V_$LOG视图,该视图包含有关数据库日志文件的信息。 GRANT SELECT ON V_$LOG TO flinkuser;  -- 允许"flinkuser"用户查询V_$LOG_HISTORY视图,该视图包含有关数据库历史日志文件的信息。 GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;  -- 允许"flinkuser"用户查询V_$LOGMNR_LOGS视图,该视图包含有关LogMiner日志文件的信息。 GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;  -- 允许"flinkuser"用户查询V_$LOGMNR_CONTENTS视图,该视图包含LogMiner日志文件的内容。 GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;  -- 允许"flinkuser"用户查询V_$LOGMNR_PARAMETERS视图,该视图包含有关LogMiner的参数信息。 GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;  -- 允许"flinkuser"用户查询V_$LOGFILE视图,该视图包含有关数据库日志文件的信息。 GRANT SELECT ON V_$LOGFILE TO flinkuser;  -- 允许"flinkuser"用户查询V_$ARCHIVED_LOG视图,该视图包含已归档的数据库日志文件的信息。 GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;  -- 允许"flinkuser"用户查询V_$ARCHIVE_DEST_STATUS视图,该视图包含有关归档目标状态的信息。 GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;  

3.3 表或数据库上启用增量日志记录(supplemental log)

在讲解Oracle之前,很有必要先了解Oracle的逻辑结构。

3.3.1 Oracle 逻辑结构

Oracle数据库的物理结构与MySQL以及SQLServer有着很大的不同,在使用MySQL或SQLServer时,我们不需要去关心它们的逻辑结构和物理结构。

Oracle在逻辑结构中,分别是如下的结构:数据库实例 => 表空间 => 数据段(表) => 区 => 块

  • 数据库实例:前面的《docker下安装oracle11g(一次安装成功)》,在启动容器时,已经指定了Oracle的数据库实例的唯一ID,每个Oracle数据库实例都有一个唯一的SID,也就是说,安装的时候,已经创建好了一个“helowin”数据库实例了。在这里插入图片描述
  • 表空间:在 “用户赋权”的第1步骤,可以看出已经创建了“logminer_tbs”表空间(CREATE TABLESPACE logminer_tbs…)

相关的查询SQL:

-- 以DBA的权限登录数据库 sqlplus /nolog CONNECT sys/system AS SYSDBA  -- 查询数据库实例名称 SELECT NAME FROM V$DATABASE;  -- 查询所有表空间名称 SELECT TABLESPACE_NAME FROM DBA_TABLESPACES; 

结果如下:
在这里插入图片描述
ok,接下来就可以进入“LOGMINER_TBS”表空间去创建表了。

3.3.2 创建表

LOGMINER_TBS表空间下的flinkuser用户下创建customers表:

-- 切换至flinkuser用户 sqlplus /nolog CONNECT flinkuser/flinkpw  -- 创建customers表 CREATE TABLE customers (     customer_id NUMBER PRIMARY KEY,     customer_name VARCHAR2(50),     email VARCHAR2(100),     phone VARCHAR2(20) ) TABLESPACE LOGMINER_TBS; 

查看表是否创建成功:

-- 查看LOGMINER_TBS表空间下的所有表 select tablespace_name, table_name from user_tables where tablespace_name = 'LOGMINER_TBS'; 

可以看到表创建成功:
在这里插入图片描述

3.3.3 启用增量日志

切换至SYS用户,以DBA的权限登录数据库,为表和数据库启用增量日志:

-- 以DBA的权限登录数据库 sqlplus /nolog CONNECT sys/system AS SYSDBA  -- 为LOGMINER_TBS表空间下的customers表启用增强日志记录 ALTER TABLE FLINKUSER.CUSTOMERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS  -- 为数据库启用增强日志记录: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA; 

操作成功:
在这里插入图片描述

04 flink sql

注意:源表的字段定义、schema-name以及table-name都要大写,否则无法同步

-- 创建Oracle CDC源表table_source_oracle,从Oracle数据库中读取数据 CREATE TABLE table_source_oracle (         CUSTOMER_ID INT,         CUSTOMER_NAME STRING,         EMAIL STRING,         PHONE STRING,         PRIMARY KEY (CUSTOMER_ID) NOT ENFORCED         ) WITH ( 		'connector' = 'oracle-cdc', 		'hostname' = '10.194.183.120', 		'port' = '30026', 		'username' = 'flinkuser', 		'password' = 'flinkpw', 		'database-name' = 'helowin', 		'schema-name' = 'FLINKUSER', 		'table-name' = 'CUSTOMERS' )  -- 创建MySQL JDBC接收表table_sink_mysql,将数据写入到MySQL数据库 CREATE TABLE table_sink_mysql (     customer_id INT,     customer_name STRING,     email STRING,     phone STRING,     PRIMARY KEY (customer_id) NOT ENFORCED ) WITH (     'connector' = 'jdbc',     'url' = 'jdbc:mysql://10.194.183.120:30025/test',     'username' = 'root',     'password' = 'root',     'table-name' = 'customers' );  -- 将table_source_oracle表的数据插入到table_sink_mysql表中 INSERT INTO table_sink_mysql SELECT * FROM table_source_oracle; 

执行flinksql后,可以看到控制台没有报错,程序已经正常启动:
在这里插入图片描述

往customers插入一条数据:

-- 切换至flinkuser用户 sqlplus /nolog CONNECT flinkuser/flinkpw  -- 插入数据 INSERT INTO customers (customer_id, customer_name, email, phone) VALUES (1, 'Dumas', 'Dumas@example.com', '123-456-7890'); 

在这里插入图片描述

可以看到,已经写入成功了:
在这里插入图片描述

05 其它问题

问题解决参考:https://flink-learning.org.cn/article/detail/bf01dd4ff3ed8a11d6d38f365bc2a15d

虽然能做同步了,但是感觉还是有很多坑的,比如控制台会报错:
在这里插入图片描述
解决方式:在 create 语句中加上

'debezium.database.tablename.case.insensitive'='false' 

还有数据延迟较大,也是在create语句加上:

'debezium.log.mining.strategy'='online_catalog', 'debezium.log.mining.continuous.mine'='true' 

所以,最终的Flink SQL如下:

-- 创建Oracle CDC源表table_source_oracle,从Oracle数据库中读取数据 CREATE TABLE table_source_oracle (         CUSTOMER_ID INT,         CUSTOMER_NAME STRING,         EMAIL STRING,         PHONE STRING,         PRIMARY KEY (CUSTOMER_ID) NOT ENFORCED         ) WITH (         'connector' = 'oracle-cdc',         'hostname' = '10.194.183.120',         'port' = '30026',         'username' = 'flinkuser',         'password' = 'flinkpw',         'database-name' = 'HELOWIN',         'schema-name' = 'FLINKUSER',         'table-name' = 'CUSTOMERS',         'debezium.database.tablename.case.insensitive'='false',         'debezium.log.mining.strategy'='online_catalog',         'debezium.log.mining.continuous.mine'='true' );  -- 创建MySQL JDBC接收表table_sink_mysql,将数据写入到MySQL数据库 CREATE TABLE table_sink_mysql (     customer_id INT,     customer_name STRING,     email STRING,     phone STRING,     PRIMARY KEY (customer_id) NOT ENFORCED ) WITH (     'connector' = 'jdbc',     'url' = 'jdbc:mysql://10.194.183.120:30025/test',     'username' = 'root',     'password' = 'root',     'table-name' = 'customers' );  -- 将table_source_oracle表的数据插入到table_sink_mysql表中 INSERT INTO table_sink_mysql SELECT * FROM table_source_oracle; 

06 文末

本文主要讲解了Flink Oracle CDC实时同步的所有步骤,希望能帮助到大家,谢谢大家的阅读,本文完!

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!