# annote **Repository Path**: anliang11/annote ## Basic Information - **Project Name**: annote - **Description**: 学习笔记 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2020-08-29 - **Last Updated**: 2021-09-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 利用ogg实现oracle到kafka的增量数据实时同步 #### 前言 > ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json。 下面是源端和目标端的一些配置信息: | -- | 版本 | OGG版本 | IP | 别名 | | :--------- | :--: | -----------: | :-------------: | :---------: | | 源端 | OracleRelease 11.2.0.1.0 | Oracle GoldenGate 12.3.0.1.4 for Oracle on Linux x86-64 | 192.168.226.138 | oraclepc | | 目标端 | kafka_2.11-0.10.2.2 | Oracle GoldenGate for Big Data 12.3.0.1.0 on Linux x86-64 | 192.168.226.139 | streamsets | #### 效果 ###### 源端 - 源端事务型操作 ![这里写图片描述](files/images/源端数据库插入修改删除操作.png) - 目标端kafka流数据 ![这里写图片描述](files/images/目标端kafka消息输出结果.png) #### 1、下载 **下载地址:** - [GodenGate for Oracle](http://www.oracle.com/splash/www/index.html?nexturl=http://download.oracle.com/otn/goldengate/12301/123014_fbo_ggs_Linux_x64_shiphome.zip) - [GodenGate for BigData](https://download.oracle.com/otn/goldengate/123010/123010_ggs_Adapters_Linux_x64.zip?AuthParam=1598656891_bc5d7149964b0f9171e75e115333c8fa) #### 2、源端配置 _注意:源端是安装了oracle的机器,oracle环境变量之前都配置好了_ ###### 2.1 解压\安装 **先建立ogg目录** ```$xslt mkdir -p /opt/ogg chown -R oracle:oinstall /opt/ogg (使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功) unzip 123014_fbo_ggs_Linux_x64_shiphome.zip ``` **解压后得到OGG安装文件夹** ```$xslt cd /opt/ogg/fbo_ggs_Linux_x64_shiphome ``` **修改静默安装配置** **修改/opt/ogg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp文件内容如下:** ```sbtshell #不要修改这个值 oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2 #根据系统是12c还是11g选择ORA12c或者ORA11g INSTALL_OPTION=ORA11g #写上goldengate的安装目录 SOFTWARE_LOCATION=/opt/ogg/ #是否在配置完成后自动启动mgr进程,是就选true,否就选false START_MANAGER=false #在start_manager为true时添加,选择mgr启动端口号 MANAGER_PORT= #在start_manager为true时添加,写上$ORACLE_HOME的值 DATABASE_LOCATION=$ORACLE_HOME #如果在安装goldengate前没有安装过其他Oracle产品再填写以下两个参数 INVENTORY_LOCATION=/data/oracle/inventory UNIX_GROUP_NAME=oinstall ``` ###### 启动静默安装 ```sbtshell cd /opt/ogg/fbo_ggs_Linux_x64_shiphome/Disk1 ./runInstaller -silent -responseFile /opt/ogg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp ``` 之后开始安装 这时候会提示一个安装日志 ```sbtshell You can find the log of this install session at: /oracle/app/oraInventory/logs/installActions2020-08-29_11-26-16AM.log 可以使用 tail -100f /oracle/app/oraInventory/logs/silentInstall2020-08-29_11-26-16AM.log来查看安装情况 安装完成后,安装命令执行页面会输出以下内容: The installation of Oracle GoldenGate Core was successful. Please check '/data/oracle/oraInventory/logs/silentInstall2020-08-29_11-26-16AM.log' for more details. Successfully Setup Software. ``` ###### 2.2 配置ogg环境变量 > 为了简单方便起见,在/etc/profile里配置的,建议在生产中配置oracle的环境变量文件/home/oracle/.bash_profile里配置,为了怕出问题,我把OGG_HOME等环境变量在/etc/profile配置了一份。 ```sbtshell vim /etc/profile ``` ```sbtshell export OGG_HOME=/opt/ogg export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib export PATH=$OGG_HOME:$PATH ``` 使之生效 ```sbtshell source /etc/profile ``` 测试一下ogg命令 ```sbtshell ggsci ``` ###### 2.3 oracle打开归档模式 ```sbtshell su oracle source ~/.bash_profile sqlplus / as sysdba ``` 执行下面的命令查看当前是否为归档模式 ```sbtshell archive log list ``` ```sbtshell SQL> archive log list Database log mode No Archive Mode Automatic archival Disabled Archive destination USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12 Current log sequence 14 ``` 若为Disabled,手动打开即可 ```sbtshell conn / as sysdba (以DBA身份连接数据库) shutdown immediate (立即关闭数据库) startup mount (启动实例并加载数据库,但不打开) alter database archivelog; (更改数据库为归档模式) alter database open; (打开数据库) alter system archive log start; (启用自动归档) ``` 再执行一下 ```sbtshell archive log list ``` ```sbtshell Database log mode Archive Mode Automatic archival Enabled Archive destination USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12 Next log sequence to archive 14 Current log sequence 14 ``` 可以看到为Enabled,则成功打开归档模式。 ###### 2.4 Oracle打开日志相关 OGG基于辅助日志等进行实时传输,故需要打开相关日志确保可获取事务内容,通过下面的命令查看该状态 ```hsqldb select force_logging, supplemental_log_data_min from v$database; ``` ```sbtshell FORCE_ SUPPLEMENTAL_LOG ------ ---------------- NO NO ``` 若为NO,则需要通过命令修改 ```sbtshell alter database force logging; alter database add supplemental log data; ``` 再查看一下为YES即可 ```sbtshell SQL> select force_logging, supplemental_log_data_min from v$database; FORCE_ SUPPLEMENTAL_LOG ------ ---------------- YES YES ``` ###### 2.5 oracle创建复制用户 首先root用户建立相关文件夹,并赋予权限 ```sbtshell mkdir -p /data/oracle/tablespace chown -R oracle:oinstall /data/oracle/tablespace ``` 然后执行下面sql ```sbtshell SQL> create tablespace oggtbs datafile '/data/oracle/tablespace/oggtbs01.dbf' size 1000M autoextend on; Tablespace created. SQL> create user ogg identified by ogg default tablespace oggtbs; User created. SQL> grant dba to ogg; Grant succeeded. ``` 2.6 OGG初始化 > 这里特别注意源端和目标端的ogg目录要保持一致 ```sbtshell ggsci create subdirs ``` ```sbtshell ggsci GGSCI (oraclepc) 1> create subdirs Creating subdirectories under current directory /root Parameter files /opt/ogg/dirprm: created Report files /opt/ogg/dirrpt: created Checkpoint files /opt/ogg/dirchk: created Process status files /opt/ogg/dirpcs: created SQL script files /opt/ogg/dirsql: created Database definitions files /opt/ogg/dirdef: created Extract data files /opt/ogg/dirdat: created Temporary files /opt/ogg/dirtmp: created Stdout files /opt/ogg/dirout: created GGSCI (oraclepc) 2> ``` ###### 2.7 Oracle创建测试表 创建一个用户,在该用户下新建测试表,用户名、密码、表名均为 test_ogg。 ```hsqldb create user test_ogg identified by test_ogg default tablespace users; grant dba to test_ogg; conn test_ogg/test_ogg; create table test_ogg(id int ,name varchar(20),primary key(id)); ``` #### 3、目标端(kafka)配置 ```sbtshell mkdir -p /opt/ogg unzip 123111_ggs_Adapters_Linux_x64.zip tar xf ggs_Adapters_Linux_x64.tar -C /opt/ogg/ ``` ###### 3.2 环境变量 ```sbtshell vim /etc/profile ``` ```sbtshell export OGG_HOME=/opt/ogg export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib export PATH=$OGG_HOME:$PATH ``` ```sbtshell source /etc/profile ``` 同样测试一下ogg命令 ```sbtshell ggsci ``` ###### 3.3 初始化目录 ```sbtshell create subdirs ``` #### 4、OGG源端配置 ###### 4.1 配置OGG的全局变量 先切换到oracle用户下 ```sbtshell su oracle cd /opt/ogg ggsci ``` ```sbtshell GGSCI (oraclepc) 1> dblogin userid ogg password ogg Successfully logged into database. GGSCI (oraclepc) 2> edit param ./globals ``` 然后和用vim编辑一样添加 ```sbtshell oggschema ogg ``` ###### 4.2 配置管理器mgr ```sbtshell GGSCI (oraclepc) 3> edit param mgr PORT 7809 DYNAMICPORTLIST 7810-7909 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3 ``` > 说明:PORT即mgr的默认监听端口;DYNAMICPORTLIST动态端口列表,当指定的mgr端口不可用时,会在这个端口列表中选择一个,最大指定范围为256个;AUTORESTART重启参数设置表示重启所有EXTRACT进程,最多5次,每次间隔3分钟;PURGEOLDEXTRACTS即TRAIL文件的定期清理 ###### 4.3 添加复制表 ```sbtshell GGSCI (oraclepc) 4> add trandata test_ogg.test_ogg Logging of supplemental redo data enabled for table TEST_OGG.TEST_OGG. GGSCI (oraclepc) 5> info trandata test_ogg.test_ogg Logging of supplemental redo log data is enabled for table TEST_OGG.TEST_OGG. Columns supplementally logged for table TEST_OGG.TEST_OGG: ID ``` ###### 4.4 配置extract进程 ```sbtshell GGSCI (oraclepc) 6> edit param extkafka extract extkafka dynamicresolution SETENV (ORACLE_SID = "orcl") SETENV (NLS_LANG = "american_america.AL32UTF8") userid ogg,password ogg exttrail /opt/ogg/dirdat/to table test_ogg.test_ogg; ``` >说明:第一行指定extract进程名称;dynamicresolution动态解析;SETENV设置环境变量,这里分别设置了Oracle数据库以及字符集;userid ggs,password ggs即OGG连接Oracle数据库的帐号密码,这里使用2.5中特意创建的复制帐号;exttrail定义trail文件的保存位置以及文件名,注意这里文件名只能是2个字母,其余部分OGG会补齐;table即复制表的表名,支持*通配,必须以;结尾 添加extract进程: ```sbtshell GGSCI (oraclepc) 16> add extract extkafka,tranlog,begin now EXTRACT added. ``` 添加trail文件的定义与extract进程绑定: ```sbtshell GGSCI (oraclepc) 17> add exttrail /opt/ogg/dirdat/to,extract extkafka EXTTRAIL added. ``` ###### 4.5 配置pump进程 pump进程本质上来说也是一个extract,只不过他的作用仅仅是把trail文件传递到目标端,配置过程和extract进程类似,只是逻辑上称之为pump进程 ```sbtshell GGSCI (oraclepc) 18> edit param pukafka extract pukafka passthru dynamicresolution userid ogg,password ogg rmthost 192.168.226.139 mgrport 7809 rmttrail /opt/ogg/dirdat/to table test_ogg.test_ogg; ``` > 说明:第一行指定extract进程名称;passthru即禁止OGG与Oracle交互,我们这里使用pump逻辑传输,故禁止即可;dynamicresolution动态解析;userid ogg,password ogg即OGG连接Oracle数据库的帐号密码rmthost和mgrhost即目标端(kafka)OGG的mgr服务的地址以及监听端口;rmttrail即目标端trail文件存储位置以及名称。 分别将本地trail文件和目标端的trail文件绑定到extract进程: ```sbtshell GGSCI (oraclepc) 1> add extract pukafka,exttrailsource /opt/ogg/dirdat/to EXTRACT added. GGSCI (oraclepc) 2> add rmttrail /opt/ogg/dirdat/to,extract pukafka RMTTRAIL added. ``` ###### 4.6 配置define文件 ```sbtshell Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射,在OGG命令行执行: ``` ```sbtshell GGSCI (oraclepc) 3> edit param test_ogg defsfile /opt/ogg/dirdef/test_ogg.test_ogg userid ogg,password ogg table test_ogg.test_ogg; ``` 在OGG主目录下执行(oracle用户): ```sbtshell ./defgen paramfile dirprm/test_ogg.prm *********************************************************************** Oracle GoldenGate Table Definition Generator for Oracle Version 12.3.0.1.4 OGGCORE_12.3.0.1.0_PLATFORMS_180415.0359 Linux, x64, 64bit (optimized), Oracle 11g on Apr 15 2018 10:11:00 Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved. Starting at 2020-08-29 06:05:24 *********************************************************************** Operating System Version: Linux Version #1 SMP Fri Apr 20 16:44:24 UTC 2018, Release 3.10.0-862.el7.x86_64 Node: oraclepc Machine: x86_64 soft limit hard limit Address Space Size : unlimited unlimited Heap Size : unlimited unlimited File Size : unlimited unlimited CPU Time : unlimited unlimited Process id: 6931 *********************************************************************** ** Running with the following parameters ** *********************************************************************** defsfile /opt/ogg/dirdef/test_ogg.test_ogg userid ogg,password *** table test_ogg.test_ogg; Retrieving definition for TEST_OGG.TEST_OGG. Definitions generated for 1 table in /opt/ogg/dirdef/test_ogg.test_ogg. ``` 将生成的/opt/ogg/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里: ```sbtshell scp -r /opt/ogg/dirdef/test_ogg.test_ogg root@192.168.226.139:/opt/ogg/dirdef/ ``` #### 5、OGG目标端配置 ###### 5.1 开启kafka服务 略 ###### 5.2 配置管理器mgr ```sbtshell GGSCI (streamsets) 1> edit param mgr PORT 7809 DYNAMICPORTLIST 7810-7909 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3 ``` ###### 5.3 配置checkpoint checkpoint即复制可追溯的一个偏移量记录,在全局配置里添加checkpoint表即可。 ```sbtshell edit param ./GLOBALS CHECKPOINTTABLE test_ogg.checkpoint ``` ###### 5.4 配置replicate进程 ```sbtshell GGSCI (streamsets) 4> edit param rekafka REPLICAT rekafka sourcedefs /opt/ogg/dirdef/test_ogg.test_ogg TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg; ``` > 说明:REPLICATE rekafka定义rep进程名称;sourcedefs即在4.6中在源服务器上做的表映射文件;TARGETDB LIBFILE即定义kafka一些适配性的库文件以及配置文件,配置文件位于OGG主目录下的dirprm/kafka.props;REPORTCOUNT即复制任务的报告生成频率;GROUPTRANSOPS为以事务传输时,事务合并的单位,减少IO操作;MAP即源端与目标端的映射关系 ###### 5.5 配置kafka.props ```sbtshell cd /opt/ogg/dirprm/ vim kafka.props ``` ```sbtshell gg.handlerlist=kafkahandler //handler类型 gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相关配置 gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名称,无需手动创建 gg.handler.kafkahandler.format=json //传输文件的格式,支持json,xml等 gg.handler.kafkahandler.mode=op //OGG for Big Data中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次 gg.classpath=dirprm/:/opt/kafka/libs/*:/opt/ogg/:/opt/ogg/lib/* ``` ```sbtshell vim custom_kafka_producer.properties ``` ```sbtshell bootstrap.servers=192.168.226.139:9092 //kafkabroker的地址 acks=1 # compression.type=gzip //压缩类型 reconnect.backoff.ms=1000 //重连延时 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000 ``` 其中需要将后面的注释去掉,ogg不识别注释,如果不去掉会报错 ###### 5.6 添加trail文件到replicate进程 ```sbtshell GGSCI (streamsets) 2> add replicat rekafka exttrail /opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint REPLICAT added. ``` #### 6、测试 ###### 6.1 启动所有进程 在源端和目标端的OGG命令行下使用start 进程名的形式启动所有进程。 启动顺序按照源mgr——目标mgr——源extract——源pump——目标replicate来完成。 全部需要在ogg目录下执行ggsci目录进入ogg命令行。 源端依次是 ```sbtshell start mgr start extkafka start pukafka ``` 目标端 ```sbtshell start mgr start rekafka ``` 可以通过info all 或者info 进程名 查看状态,所有的进程都为RUNNING才算成功 源端 ```sbtshell GGSCI (oraclepc) 5> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING EXTRACT RUNNING EXTKAFKA 04:50:21 00:00:03 EXTRACT RUNNING PUKAFKA 00:00:00 00:00:03 ``` 目标端 ```sbtshell GGSCI (streamsets) 3> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING REPLICAT RUNNING REKAFKA 00:00:00 00:00:01 ``` ###### 6.2 异常解决 如果有不是RUNNING可通过查看日志的方法检查解决问题,ogg命令行,以rekafka进程为例 ```sbtshell GGSCI (streamsets) 2> view report rekafka ``` ###### 6.3 测试同步更新效果 现在源端执行sql语句 ```hsqldb conn test_ogg/test_ogg insert into test_ogg values(1,'test'); commit; update test_ogg set name='zhangsan' where id=1; commit; delete test_ogg where id=1; commit; ``` 查看源端trail文件状态 ```sbtshell [oracle@oraclepc tablespace]$ ls -l /opt/ogg/dirdat/to* -rw-r----- 1 oracle oinstall 2353 8月 29 06:41 /opt/ogg/dirdat/to000000000 ``` 查看目标端trail文件状态 ```sbtshell [root@streamsets ogg]# ls -l /opt/ogg/dirdat/to* -rw-r-----. 1 root root 2392 Aug 28 15:41 /opt/ogg/dirdat/to000000000 ``` 查看kafka是否自动建立对应的主题 ```sbtshell [root@streamsets ogg]# kafka-topics.sh --list --zookeeper localhost:2181 __consumer_offsets test1 test_ogg ``` 在列表中显示有test_ogg则表示没问题 通过消费者看是否有同步消息 ```sbtshell [root@streamsets ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning {"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-08-28 15:32:26.998384","current_ts":"2020-08-28T15:32:32.331000","pos":"00000000000000001854","after":{"ID":1,"NAME":"test"}} {"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2020-08-28 15:32:37.998918","current_ts":"2020-08-28T15:32:43.582000","pos":"00000000000000001984","before":{"ID":1,"NAME":"test"},"after":{"ID":1,"NAME":"zhangsan"}} {"table":"TEST_OGG.TEST_OGG","op_type":"D","op_ts":"2020-08-28 15:32:45.997888","current_ts":"2020-08-28T15:32:50.601000","pos":"00000000000000002138","before":{"ID":1,"NAME":"zhangsan"}} {"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-08-28 15:41:41.997974","current_ts":"2020-08-28T15:41:48.445000","pos":"00000000000000002268","after":{"ID":2,"NAME":"an"}} ```