使用Flink CDC实现 Oracle数据库数据同步(非SQL)

文章目录

  • 前言
  • 一、开启归档日志
  • 二、创建flinkcdc专属用户
    • 2.1 对于Oracle 非CDB数据库,执行如下sql
    • 2.2 对于Oracle CDB数据库,执行如下sql
  • 三、指定oracle表、库级启用
  • 四、使用flink-connector-oracle-cdc实现数据库同步
    • 4.1 引入pom依赖
    • 4.1 Java主代码
    • 4.1 json转换为row


前言

Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式实现数据库同步,同时也提供了Flink CDC Source Connector API。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。
本文通过flink-connector-oracle-cdc来实现Oracle数据库的数据同步。


一、开启归档日志

1)数据库服务器终端,使用sysdba角色连接数据库

 sqlplus / as sysdba
或
sqlplus /nolog
CONNECT sys/password AS SYSDBA;

2)检查归档日志是否开启

archive log list;

(“Database log mode: No Archive Mode”,日志归档未开启)
(“Database log mode: Archive Mode”,日志归档已开启)
3)启用归档日志

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

注意:
启用归档日志需要重启数据库。
归档日志会占用大量的磁盘空间,应定期清除过期的日志文件
4)启动完成后重新执行 archive log list; 查看归档打开状态

二、创建flinkcdc专属用户

2.1 对于Oracle 非CDB数据库,执行如下sql

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
  GRANT CREATE SESSION TO flinkuser;
  GRANT SET CONTAINER TO flinkuser;
  GRANT SELECT ON V_$DATABASE to flinkuser;
  GRANT FLASHBACK ANY TABLE TO flinkuser;
  GRANT SELECT ANY TABLE TO flinkuser;
  GRANT SELECT_CATALOG_ROLE TO flinkuser;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
  GRANT SELECT ANY TRANSACTION TO flinkuser;
  GRANT LOGMINING TO flinkuser;
  GRANT ANALYZE ANY TO flinkuser;

  GRANT CREATE TABLE TO flinkuser;
  -- need not to execute if set scan.incremental.snapshot.enabled=true(default)
  GRANT LOCK ANY TABLE TO flinkuser;
  GRANT ALTER ANY TABLE TO flinkuser;
  GRANT CREATE SEQUENCE TO flinkuser;

  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

  GRANT SELECT ON V_$LOG TO flinkuser;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
  GRANT SELECT ON V_$LOGFILE TO flinkuser;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

2.2 对于Oracle CDB数据库,执行如下sql

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
  GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
  GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
  GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
  GRANT LOGMINING TO flinkuser CONTAINER=ALL;
  GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
  -- need not to execute if set scan.incremental.snapshot.enabled=true(default)
  GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;

  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;

  GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;

三、指定oracle表、库级启用

-- 指定表启用补充日志记录:
ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- 为数据库的所有表启用
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- 指定数据库启用补充日志记录
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

四、使用flink-connector-oracle-cdc实现数据库同步

4.1 引入pom依赖

 <dependency>
     <groupId>com.ververica</groupId>
     <artifactId>flink-connector-oracle-cdc</artifactId>
     <version>2.4.0</version>
 </dependency>

4.1 Java主代码

package test.datastream.cdc.oracle;


import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;

import java.util.Properties;

public class OracleCdcExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        //数字类型数据 转换为字符
        properties.setProperty("decimal.handling.mode", "string");
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
//                .startupOptions(StartupOptions.latest()) // 从最晚位点启动
                .url("jdbc:oracle:thin:@localhost:1521:orcl")
                .port(1521)
                .database("ORCL") // monitor XE database
                .schemaList("c##flink_user") // monitor inventory schema
                .tableList("c##flink_user.TEST2") // monitor products table
                .username("c##flink_user")
                .password("flinkpw")
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1);// use parallelism 1 for sink to keep message ordering
        SingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());

        SingleOutputStreamOperator<Row[]> winStream = mapStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new CacheDataAllWindowFunction());
		//批量同步
        winStream.addSink(new DbCdcSinkFunction(null));
        env.execute();
    }
}

4.1 json转换为row

package test.datastream.cdc.oracle.function;

import cn.com.victorysoft.common.configuration.VsConfiguration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * @desc cdc json解析,并转换为Row
 */
public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {
    private Map<String,Integer> columnMap =new HashMap<>();
    @Override
    public void open(Configuration parameters) throws Exception {
        columnMap.put("ID",0);
        columnMap.put("NAME",1);
        columnMap.put("DESCRIPTION",2);
        columnMap.put("AGE",3);
        columnMap.put("CREATE_TIME",4);
        columnMap.put("SCORE",5);
        columnMap.put("C_1",6);
        columnMap.put("B_1",7);
    }
    @Override
    public void flatMap(String s, Collector<Row> collector) throws Exception {
        System.out.println("receive: "+s);
        VsConfiguration conf=VsConfiguration.from(s);
        String op = conf.getString(CdcConstants.K_OP);
        VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
        VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);
        Row row =null;
        if(CdcConstants.OP_C.equals(op)){
            //插入,使用after数据
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        }else if(CdcConstants.OP_U.equals(op)){
            //更新,使用after数据
            row = convertToRow(after);
            row.setKind(RowKind.UPDATE_AFTER);
        }else if(CdcConstants.OP_D.equals(op)){
            //删除,使用before数据
            row = convertToRow(before);
            row.setKind(RowKind.DELETE);
        }else {
            //r 操作,使用after数据
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        }
        collector.collect(row);
    }

    private Row convertToRow(VsConfiguration data){
        Set<String> keys = data.getKeys();
        int size = keys.size();
        Row row=new Row(8);
        int i=0;
        for (String key:keys) {
            Integer index = this.columnMap.get(key);
            Object value=data.get(key);
            if(key.equals("CREATE_TIME")){
                //long日期转timestamp
                value=long2Timestamp((Long)value);
            }
            row.setField(index,value);
        }
        return row;
    }
    private static  java.sql.Timestamp long2Timestamp(Long time){
        Timestamp timestamp = new Timestamp(time/1000);
        System.out.println(timestamp);
        return timestamp;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/752461.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

没有思考过 Embedding,不足以谈 AI

在当今的人工智能&#xff08;AI&#xff09;领域&#xff0c;Embedding 是一个不可或缺的概念。如果你没有深入理解过 Embedding&#xff0c;那么就无法真正掌握 AI 的精髓。接下来&#xff0c;我们将深入探讨 Embedding 的基本概念。 1. Embedding的基本概念 1.1 什么是 Emb…

ET实现游戏中邮件系统逻辑思路(服务端)

ET是一个游戏框架&#xff0c;用的编程语言是C#&#xff0c;游戏引擎是Unity&#xff0c;框架作者&#xff1a;熊猫 ET社区 在游戏中我们通常都会看到有邮件系统&#xff0c;邮件系统的作用有给玩家通知、发放奖励等 下面小编使用ET框架带大家看一下邮件系统的一种实现方…

远程过程调用RPC实现原理

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f4a5;&#x1f4a5;个人主页&#xff1a;奋斗的小羊 &#x1f4a5;&#x1f4a5;所属专栏&#xff1a;C语言 &#x1f680;本系列文章为个人学习…

深度挖掘数据资产,洞察业务先机:利用先进的数据分析技术,精准把握市场趋势,洞悉客户需求,为业务决策提供有力支持,实现持续增长与创新

在当今日益激烈的商业竞争环境中&#xff0c;企业想要实现持续增长与创新&#xff0c;必须深入挖掘和有效运用自身的数据资产。数据不仅是企业运营过程中的副产品&#xff0c;更是洞察市场趋势、理解客户需求、优化业务决策的重要资源。本文将探讨如何通过利用先进的数据分析技…

多行业预约门店服务小程序源码系统 支持多门店预约 带完整的安装代码包以及搭建教程

系统概述 该系统基于先进的云计算和大数据技术&#xff0c;采用模块化设计&#xff0c;具有高度的可扩展性和可定制性。无论是餐饮、美容美发、健身房还是其他服务行业&#xff0c;都可以通过该系统轻松实现多门店预约功能。同时&#xff0c;我们还提供了丰富的接口和插件&…

stylelint 配置

1.vscode 安装插件Stylelint 2.项目安装插件 pnpm i stylelint stylelint-config-standard stylelint-config-recommended-scss stylelint-config-recommended-vue postcss postcss-html postcss-scss stylelint-config-recess-order stylelint-config-html -D 依赖 说明 备…

如何判断一个Repo是否是Private还是Internal?

Github的Repository分为三种类型&#xff0c;主要是用于决定谁可以访问、查看和克隆该仓库。GitHub 提供了几种不同的可见性选项&#xff0c;包括 Private、Public 和 Internal。 Private 只有仓库的拥有者和被明确邀请为协作者&#xff08;Collaborator&#xff09;的用户才能…

VMware虚拟机移植保姆级教程

文章目录 前言:一、打包与备份二、VMware移植1. 文件介绍2. 移植过程总结:前言: 前几日对电脑做了一个大的更新升级,不仅将硬件进行了升级,还对电脑的软件进行了升级也就是我从Win10今家庭版升级到Win11专业版啦,之前没有升级是因为数据量很多,怕升级后找不到自己需要的…

Windows和Linux C++判断磁盘空间是否充足

基本是由百度Ai写代码生成的&#xff0c;记录一下。实现此功能需要调用系统的API函数。 对于Windows&#xff0c;可调用函数GetDiskFreeSpaceEx&#xff0c;使用该函数需要包含头文件windows.h。该函数的原型&#xff1a; 它的四个参数&#xff1a; lpDirectoryName&#xff0…

基于SpringBoot养老院管理系统设计和实现(源码+LW+调试文档+讲解等)

&#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN作者、博客专家、全栈领域优质创作者&#xff0c;博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f31f;文末获取源码数据库&#x1f31f;感兴趣的可以先收藏起来&#xff0c;还…

基于改进天鹰优化算法(IAO)优化RBF神经网络数据回归预测 (IAO-RBF)的数据回归预测(多输入多输出)

改进天鹰优化算法(IAO)见&#xff1a;【智能优化算法】改进的AO算法(IAO)-CSDN博客 代码原理 基于改进天鹰优化算法&#xff08;IAO&#xff09;优化RBF神经网络数据回归预测&#xff08;IAO-RBF&#xff09;的多输入多输出&#xff08;MIMO&#xff09;数据回归预测&#xf…

CVPR24已开源:刷新监督学习SOTA,无监督多目标跟踪时代来临!

论文标题&#xff1a; Matching Anything by Segmenting Anything 论文作者&#xff1a; Siyuan Li, Lei Ke, Martin Danelljan, Luigi Piccinelli, Mattia Segu, Luc Van Gool, Fisher Yu 导读&#xff1a; 在计算机视觉的征途中&#xff0c;多目标跟踪&#xff08;MOT&…

Centos安装redis(附:图形化管理工具)

第一步&#xff1a;下载redis wget http://download.redis.io/releases/redis-6.2.7.tar.gz 第二步&#xff1a;解压 tar zxvf redis-6.2.7.tar.gz 第三步&#xff1a;安装依赖环境 yum -y install gcc-c第四步&#xff1a;安装依赖环境 make install第五步&#xff1a;修…

开源项目-商城管理系统

哈喽&#xff0c;大家好&#xff0c;今天主要给大家带来一个开源项目-商城管理系统 商城管理系统分前后端两部分。前端主要有商品展示&#xff0c;我的订单&#xff0c;个人中心等内容&#xff1b;后端的主要功能包括产品管理&#xff0c;门店管理&#xff0c;会员管理&#x…

C++之STL(十)

1、适配器 2、函数适配器 #include <iostream> using namespace std;#include <algorithm> #include <vector> #include <functional>bool isOdd(int n) {return n % 2 1; } int main() {int a[] {1, 2, 3, 4, 5};vector <int> v(a, a 5);cou…

外贸人该怎么进行客户分类,怎么找出那20%的重要客户?

更多外贸干货及开发见客户的方法&#xff0c;尽在微信【千千外贸干货】 我们往往只是知道这个规则&#xff0c;却不懂怎么去进行客户分类&#xff0c;怎么找出这20%的重要客户&#xff1f; 具体而言&#xff0c;有8个指标来衡量&#xff1a; 1 利润率高 不以盈利为目的的企业…

使用python基于经纬度获取高德地图定位地址【逆地址解析】

一、高德地图api申请 1. 高德开放平台注册&#xff0c;登录 进入网址&#xff1a;高德开放平台 | 高德地图API 注册 -- 支付宝扫码认证 -- 完善个人信息 -- 登录 2. 申请API &#xff08;1&#xff09;点击头像 -- 应用管理 -- 我的应用 -- 创建新应用 &#xff08;2&…

对于恒指你了解够多吗?

不少人进入股市选择投资哪种哪种期货&#xff0c;都是因为听别人说利润大&#xff0c;于是也不管三七二十一&#xff0c;就盲目的跟着投资了&#xff0c;认为所有的期货都应该应用一样的操作办法&#xff0c;随机应变就是了&#xff0c;其实不然&#xff0c;每种期货都有着自己…

springboot3.x的优势在哪里,我们是否要选择springboot3.x

Spring Boot 3.x的优势主要体现在以下几个方面&#xff0c;这些优势使得它成为了一个值得考虑的选择&#xff1a; Java 17支持&#xff1a;Spring Boot 3.x 支持 Java 17&#xff0c;这是一个长期支持&#xff08;LTS&#xff09;版本&#xff0c;带来了许多新特性和性能改进。…

从ChatGPT代码执行逃逸到LLMs应用安全思考

摘要 11月7日OpenAI发布会后&#xff0c;GPT-4的最新更新为用户带来了更加便捷的功能&#xff0c;包括Python代码解释器、网络内容浏览和图像生成能力。这些创新不仅开辟了人工智能应用的新境界&#xff0c;也展示了GPT-4在处理复杂任务方面的惊人能力。然而&#xff0c;与所有…