Using Flink CDC to Synchronize Oracle Database Data (Non-SQL)

Preface

Flink CDC is a stream-based data integration tool that aims to give users a more comprehensive set of programming interfaces (APIs). It lets users define database synchronization jobs as YAML configuration files, and it also exposes the Flink CDC Source Connector API. Flink CDC optimizes the job submission process and adds advanced features such as automatic schema evolution, data transformation, full database synchronization, and exactly-once semantics.
This article uses flink-connector-oracle-cdc to synchronize data from an Oracle database.


1. Enable Archive Logging

1) From a terminal on the database server, connect to the database as SYSDBA:

sqlplus / as sysdba

or:

sqlplus /nolog
CONNECT sys/password AS SYSDBA;

2) Check whether archive logging is enabled:

archive log list; 

("Database log mode: No Archive Mode" means archive logging is disabled.)
("Database log mode: Archive Mode" means archive logging is enabled.)
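For reference, typical output of archive log list looks roughly like the following (illustrative only; the exact wording and values vary by Oracle version and configuration):

Database log mode              No Archive Mode
Automatic archival             Disabled
Archive destination            USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     1
Current log sequence           3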
3) Enable archive logging:

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

Note:
Enabling archive logging requires a database restart.
Archive logs consume a large amount of disk space, so expired log files should be purged regularly.

4) Once the database is back up, run archive log list; again to confirm that archive logging is now enabled.

2. Create a Dedicated Flink CDC User

2.1 For a non-CDB Oracle database, execute the following SQL (see the tablespace note below):
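Both this statement and the CDB variant in section 2.2 assume that a tablespace named LOGMINER_TBS already exists. If it does not, create it first, as in this sketch adapted from the Flink CDC setup guide (the datafile path is an assumption and must match your environment):

CREATE TABLESPACE logminer_tbs
  DATAFILE '/opt/oracle/oradata/ORCL/logminer_tbs.dbf'  -- environment-specific path (assumption)
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;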

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE TO flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- not needed if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
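To sanity-check the grants afterwards, you can query the data dictionary as a DBA (a quick check of my own, not part of the official setup):

SELECT privilege FROM dba_sys_privs WHERE grantee = 'FLINKUSER';
SELECT granted_role FROM dba_role_privs WHERE grantee = 'FLINKUSER';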

2.2 For a CDB (multitenant) Oracle database, execute the following SQL. In a CDB, a common user normally needs the C## prefix (e.g. c##flinkuser), which is why the Java example later in this article connects with the c##flink_user schema.

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE TO flinkuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
GRANT LOGMINING TO flinkuser CONTAINER=ALL;
GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
-- not needed if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;

3. Enable Supplemental Logging at the Table or Database Level

-- Enable supplemental logging for a specific table:
ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Enable it for all tables in the database:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Enable minimal supplemental logging at the database level:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
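Afterwards you can confirm the database-level settings with a quick query against v$database (the columns report YES/NO):

SELECT supplemental_log_data_min, supplemental_log_data_all FROM v$database;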

4. Synchronizing Data with flink-connector-oracle-cdc

4.1 Add the Maven dependency

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <version>2.4.0</version>
</dependency>
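The example below also needs the core Flink DataStream dependencies on the classpath. A sketch, assuming Flink 1.17.x (the version number is an assumption; match it to the Flink version of your cluster):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.17.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.17.1</version>
    <scope>provided</scope>
</dependency>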

4.2 Main Java class

package test.datastream.cdc.oracle;

import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;

import java.util.Properties;

public class OracleCdcExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // render numeric values as strings to avoid precision loss
        properties.setProperty("decimal.handling.mode", "string");
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
//                .startupOptions(StartupOptions.latest()) // start from the latest offset
                .url("jdbc:oracle:thin:@localhost:1521:orcl")
                .port(1521)
                .database("ORCL") // monitored database
                .schemaList("c##flink_user") // monitored schema
                .tableList("c##flink_user.TEST2") // monitored table
                .username("c##flink_user")
                .password("flinkpw")
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // use parallelism 1 for the source to keep message ordering
        DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1);
        SingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());

        // buffer rows in 5-second tumbling windows for batch writing
        SingleOutputStreamOperator<Row[]> winStream = mapStream
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new CacheDataAllWindowFunction());
        // batch sink
        winStream.addSink(new DbCdcSinkFunction(null));
        env.execute();
    }
}
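The helper classes CacheDataAllWindowFunction and DbCdcSinkFunction come from the author's project and are not shown in this article. As a minimal sketch, assuming the window function does nothing more than buffer each window's rows into an array for batch writing, it could look like this:

package test.datastream.cdc.oracle.function;

import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch (assumption, not the original implementation):
 * collects all Rows of each tumbling window into an array so the
 * downstream sink can write them in one batch.
 */
public class CacheDataAllWindowFunction extends ProcessAllWindowFunction<Row, Row[], TimeWindow> {
    @Override
    public void process(Context context, Iterable<Row> elements, Collector<Row[]> out) {
        List<Row> buffer = new ArrayList<>();
        for (Row row : elements) {
            buffer.add(row);
        }
        if (!buffer.isEmpty()) {
            out.collect(buffer.toArray(new Row[0]));
        }
    }
}

DbCdcSinkFunction would correspondingly be a SinkFunction<Row[]> that writes each array to the target database, e.g. in a single JDBC batch, choosing INSERT/UPDATE/DELETE statements based on each Row's RowKind.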

4.3 Converting the CDC JSON to Row

package test.datastream.cdc.oracle.function;

import cn.com.victorysoft.common.configuration.VsConfiguration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * @desc Parses the CDC JSON and converts it to a Row.
 */
public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {
    // maps column names to their positions in the output Row
    private final Map<String, Integer> columnMap = new HashMap<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        columnMap.put("ID", 0);
        columnMap.put("NAME", 1);
        columnMap.put("DESCRIPTION", 2);
        columnMap.put("AGE", 3);
        columnMap.put("CREATE_TIME", 4);
        columnMap.put("SCORE", 5);
        columnMap.put("C_1", 6);
        columnMap.put("B_1", 7);
    }

    @Override
    public void flatMap(String s, Collector<Row> collector) throws Exception {
        System.out.println("receive: " + s);
        VsConfiguration conf = VsConfiguration.from(s);
        String op = conf.getString(CdcConstants.K_OP);
        VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
        VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);
        Row row;
        if (CdcConstants.OP_C.equals(op)) {
            // insert: use the "after" image
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        } else if (CdcConstants.OP_U.equals(op)) {
            // update: use the "after" image
            row = convertToRow(after);
            row.setKind(RowKind.UPDATE_AFTER);
        } else if (CdcConstants.OP_D.equals(op)) {
            // delete: use the "before" image
            row = convertToRow(before);
            row.setKind(RowKind.DELETE);
        } else {
            // "r" (snapshot read): use the "after" image
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        }
        collector.collect(row);
    }

    private Row convertToRow(VsConfiguration data) {
        Set<String> keys = data.getKeys();
        Row row = new Row(8);
        for (String key : keys) {
            Integer index = this.columnMap.get(key);
            if (index == null) {
                continue; // skip columns that are not in the mapping
            }
            Object value = data.get(key);
            if (key.equals("CREATE_TIME")) {
                // convert the numeric epoch value to java.sql.Timestamp
                value = long2Timestamp((Long) value);
            }
            row.setField(index, value);
        }
        return row;
    }

    private static Timestamp long2Timestamp(Long time) {
        // divides by 1000, i.e. assumes the incoming epoch value is in microseconds
        Timestamp timestamp = new Timestamp(time / 1000);
        System.out.println(timestamp);
        return timestamp;
    }
}
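For reference, the string that flatMap receives is a Debezium change-event envelope serialized by JsonDebeziumDeserializationSchema. A hand-written, abbreviated example of an insert event on the TEST2 table (all values are made up; with decimal.handling.mode=string the numeric columns arrive as strings, while CREATE_TIME arrives as a raw epoch number):

{
  "before": null,
  "after": {
    "ID": "1",
    "NAME": "tom",
    "DESCRIPTION": "demo row",
    "AGE": "20",
    "CREATE_TIME": 1690000000000000,
    "SCORE": "99.5",
    "C_1": "c",
    "B_1": "b"
  },
  "source": { "...": "..." },
  "op": "c",
  "ts_ms": 1690000000123
}

The op field is one of c / u / d / r, matching the branches in CdcString2RowMap above.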
