Prerequisites

Environment

Oracle 19c (source)                                10.0.2.15
Kafka kafka_2.13-2.8.0, binary install (target)    10.0.2.12

Installation packages

Oracle source side

191004_fbo_ggs_Linux_x64_shiphome.zip

Kafka target side

OGG_BigData_Linux_x64_19.1.0.0.5.tar.gz
kafka_2.13-2.8.0.tgz

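Oracle source-side steps

Install OGG

As root, create the OGG directory and unpack the installation package.

mkdir -p /ogg
# The archive can be unpacked anywhere; here it goes to /home/oracle/
unzip 191004_fbo_ggs_Linux_x64_shiphome.zip -d /home/oracle/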

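As root, give the oracle user ownership of the unpacked files and of the install directory (the installer runs as oracle and writes to /ogg).

chown -R oracle:oinstall /home/oracle/fbo_ggs_Linux_x64_shiphome/
chown -R oracle:oinstall /ogg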

Edit the installer response file

cat  /home/oracle/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp 

####################################################################
## Copyright(c) Oracle Corporation 2019. All rights reserved.     ##
##                                                                ##
## Specify values for the variables listed below to customize     ##
## your installation.                                             ##
##                                                                ##
## Each variable is associated with a comment. The comment        ##
## can help to populate the variables with the appropriate        ##
## values.                                                        ##
##                                                                ##
## IMPORTANT NOTE: This file should be secured to have read       ##
## permission only by the oracle user or an administrator who     ##
## own this installation to protect any sensitive input values.   ##
##                                                                ##
####################################################################

#-------------------------------------------------------------------------------
# Do not change the following system generated value. 
#-------------------------------------------------------------------------------
oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v19_1_0


################################################################################
##                                                                            ##
## Oracle GoldenGate installation option and details                          ##
##                                                                            ##
################################################################################

#-------------------------------------------------------------------------------
# Specify the installation option.
# Specify ORA19c for installing Oracle GoldenGate for Oracle Database 19c or
#         ORA18c for installing Oracle GoldenGate for Oracle Database 18c or
#         ORA12c for installing Oracle GoldenGate for Oracle Database 12c or
#         ORA11g for installing Oracle GoldenGate for Oracle Database 11g 
#-------------------------------------------------------------------------------
INSTALL_OPTION=ORA19c

#-------------------------------------------------------------------------------
# Specify a location to install Oracle GoldenGate
#-------------------------------------------------------------------------------
SOFTWARE_LOCATION=/ogg

#-------------------------------------------------------------------------------
# Specify true to start the manager after installation. 
#-------------------------------------------------------------------------------
START_MANAGER=true

#-------------------------------------------------------------------------------
# Specify a free port within the valid range for the manager process.
# Required only if START_MANAGER is true.
#-------------------------------------------------------------------------------
MANAGER_PORT=7809

#-------------------------------------------------------------------------------
# Specify the location of the Oracle Database.
# Required only if START_MANAGER is true.
#-------------------------------------------------------------------------------
DATABASE_LOCATION=/opt/oracle/product/19.3.0/db_1


################################################################################
##                                                                            ##
## Specify details to Create inventory for Oracle installs                    ##
## Required only for the first Oracle product install on a system.            ##
##                                                                            ##
################################################################################

#-------------------------------------------------------------------------------
# Specify the location which holds the install inventory files.
# This is an optional parameter if installing on
# Windows based Operating System.
#-------------------------------------------------------------------------------
INVENTORY_LOCATION=/opt/oraInventory

#-------------------------------------------------------------------------------
# Unix group to be set for the inventory directory.  
# This parameter is not applicable if installing on
# Windows based Operating System.
#-------------------------------------------------------------------------------
UNIX_GROUP_NAME=oinstall

Run the installer (as the oracle user)

[oracle@localhost Disk1]$ pwd
/home/oracle/fbo_ggs_Linux_x64_shiphome/Disk1
      
[oracle@localhost Disk1]$ ./runInstaller -silent -responseFile /home/oracle/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp

Set environment variables (as the oracle user)

[root@localhost oracle]# su - oracle
Last login: Mon May 24 16:10:01 CST 2021 on pts/1
[oracle@localhost ~]$ cat ~/.bash_profile
export OGG_HOME=/ogg
export PATH=$OGG_HOME:$PATH
export LD_LIBRARY_PATH=$OGG_HOME:$LD_LIBRARY_PATH

Verify the installation
Change to the OGG install directory and start ggsci:

ggsci
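Before launching ggsci it can help to confirm that the OGG home actually contains the binary. The function below is a minimal sketch, not part of OGG: check_ogg_home is a hypothetical helper name, and it only tests that the directory passed in (or $OGG_HOME) holds an executable file called ggsci.

```shell
# Hypothetical helper (not shipped with OGG): succeed only if the given
# directory, or $OGG_HOME when no argument is passed, contains an
# executable ggsci binary.
check_ogg_home() {
    local home="${1:-${OGG_HOME:-}}"
    if [ -z "$home" ]; then
        echo "OGG_HOME is not set" >&2
        return 1
    fi
    if [ ! -x "$home/ggsci" ]; then
        echo "no executable ggsci under $home" >&2
        return 1
    fi
    echo "ggsci found: $home/ggsci"
}
```

With the layout used in this walkthrough it would be called as: check_ogg_home /ogg && cd /ogg && ./ggsci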

Put Oracle into ARCHIVELOG mode

Switch to the oracle user and log in to the database

su - oracle
sqlplus / as sysdba

Check whether the database is already in ARCHIVELOG mode

SQL> archive log list 
Database log mode	       Archive Mode
Automatic archival	       Disabled
Archive destination	       USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     11
Next log sequence to archive   13
Current log sequence	       13

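Switch the database to ARCHIVELOG mode

-- Connect as SYSDBA
SQL> conn / as sysdba
-- Shut the database down immediately
SQL> shutdown immediate
-- Start the instance and mount the database without opening it
SQL> startup mount
-- Switch the database to ARCHIVELOG mode
SQL> alter database archivelog;
-- Open the database
SQL> alter database open;
-- Enable automatic archiving (on Oracle 10g and later this normally happens by itself once the database is in ARCHIVELOG mode, so this statement is usually unnecessary)
SQL> alter system archive log start;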

Confirm

SQL> archive log list 
Database log mode	       Archive Mode
Automatic archival	       Enabled
Archive destination	       USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     11
Next log sequence to archive   13
Current log sequence	       13
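The check above can be scripted by parsing the text that archive log list prints. This is a sketch under assumptions: archivelog_ok is a hypothetical function name, and it relies only on the two output lines shown above ("Database log mode" and "Automatic archival").

```shell
# Hypothetical helper: read "archive log list" output on stdin and succeed
# only when the database is in ARCHIVELOG mode with automatic archival on.
archivelog_ok() {
    awk '
        /^Database log mode/  { mode = ($0 ~ /No Archive Mode/) ? 0 : 1 }
        /^Automatic archival/ { auto = ($0 ~ /Enabled/) ? 1 : 0 }
        END { exit (mode && auto) ? 0 : 1 }
    '
}

# Possible usage on the database host, as the oracle user:
#   echo "archive log list" | sqlplus -s / as sysdba | archivelog_ok \
#       && echo "archivelog mode is on"
```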

Enable force logging and supplemental logging

Check whether they are already enabled

SQL> select force_logging, supplemental_log_data_min from v$database;

FORCE_LOGGING
--------------------------------------------------------------------------------
SUPPLEMENTAL_LOG_DATA_MI
------------------------
NO
NO

Enable the required logging

SQL> ALTER DATABASE FORCE LOGGING;

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

SQL> alter system switch logfile;

SQL> alter system set enable_goldengate_replication=true scope=both;

Confirm

SQL> select force_logging, supplemental_log_data_min from v$database;

FORCE_LOGGING
--------------------------------------------------------------------------------
SUPPLEMENTAL_LOG_DATA_MI
------------------------
YES
YES

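Create the Oracle replication user and grant privileges

Create the tablespace directory and set its ownership

mkdir -p /u01/app/oracle/oggdata/orcl
chown -R oracle:oinstall /u01/app/oracle/oggdata/orcl

Create the replication user and grant privileges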

SQL> create tablespace oggtbs datafile '/u01/app/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;

SQL> create user ogg identified by ogg default tablespace oggtbs;

SQL> grant dba to ogg;

Initialize OGG

[oracle@localhost ogg]$ cd /ogg/
[oracle@localhost ogg]$ ggsci 

Oracle GoldenGate Command Interpreter for Oracle
Version 19.1.0.0.4 OGGCORE_19.1.0.0.0_PLATFORMS_191017.1054_FBO
Linux, x64, 64bit (optimized), Oracle 19c on Oct 17 2019 21:16:29
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.



GGSCI (localhost.localdomain) 1> create subdirs

Create a test table in Oracle

SQL> create user test_ogg  identified by test_ogg default tablespace users;
SQL> grant dba to test_ogg;
SQL> conn test_ogg/test_ogg;
SQL> create table test_ogg(id int ,name varchar(20),primary key(id));

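Kafka target-side steps

Install the OGG client (OGG for Big Data)

Create the directory

mkdir -p /opt/ogg

Unpack the package into the install directory (tar's z option handles the gzip layer, so no separate gzip step is needed)

tar zxvf OGG_BigData_Linux_x64_19.1.0.0.5.tar.gz -C /opt/ogg/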

Create the oracle user and set ownership

useradd oracle
chown -R oracle:oracle /opt/ogg

Set environment variables (appended to /etc/profile)

PATH=$PATH:$HOME/bin
export OGG_HOME=/opt/ogg
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b03-1.el7.x86_64/jre
export LD_LIBRARY_PATH=/opt/ogg/dirprm:/opt/kafka/libs:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server:$OGG_HOME/lib
export PATH=$JAVA_HOME/bin:$OGG_HOME:$PATH
export CLASSPATH=/opt/ogg/dirprm:/opt/kafka/libs:/opt/ogg/ggjava/ggjava.jar

Apply the environment variables

source /etc/profile

Test that ggsci starts

ggsci

Initialize OGG

[root@localhost opt]# cd /opt/ogg/
[root@localhost ogg]# ggsci 

Oracle GoldenGate for Big Data
Version 19.1.0.0.5 (Build 007)

Oracle GoldenGate Command Interpreter
Version 19.1.0.0.200714 OGGCORE_19.1.0.0.0OGGBP_PLATFORMS_200628.2141
Linux, x64, 64bit (optimized), Generic on Jun 28 2020 23:01:58
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.



GGSCI (localhost.localdomain) 1> create subdirs

OGG source-side configuration

Configure the OGG global parameters

su oracle
[oracle@localhost ogg]$ cd /ogg/
[oracle@localhost ogg]$ ggsci 

Oracle GoldenGate Command Interpreter for Oracle
Version 19.1.0.0.4 OGGCORE_19.1.0.0.0_PLATFORMS_191017.1054_FBO
Linux, x64, 64bit (optimized), Oracle 19c on Oct 17 2019 21:16:29
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.


Log in to the database
GGSCI (localhost.localdomain) 1> dblogin userid ogg password ogg
Successfully logged into database.
GGSCI (localhost.localdomain as ogg@orcl) 2> edit param ./globals
oggschema ogg

Configure the mgr process

GGSCI (localhost.localdomain as ogg@orcl) 3> edit param mgr


PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

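Parameter notes:

PORT               the port the mgr process listens on (7809 is the default);
DYNAMICPORTLIST    the ports that mgr can hand out to dynamically started processes, such as the Collector that receives data from a remote pump; older releases limit the list to 256 ports;
AUTORESTART        restart policy: restart every EXTRACT process automatically after a failure, at most 5 times, 3 minutes apart;
PURGEOLDEXTRACTS   periodic cleanup of trail files: with usecheckpoints, files are purged only after all processes have finished reading them, and minkeepdays 3 keeps them for at least 3 days.

Add supplemental logging (trandata) for the test table

GGSCI (localhost.localdomain as ogg@orcl) 4> add trandata test_ogg.test_ogg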

2021-05-24 16:46:11  INFO    OGG-15131  Logging of supplemental redo log data is already enabled for table TEST_OGG.TEST_OGG.

2021-05-24 16:46:11  INFO    OGG-15135  TRANDATA for instantiation CSN has been added on table TEST_OGG.TEST_OGG.

2021-05-24 16:46:11  INFO    OGG-10471  ***** Oracle Goldengate support information on table TEST_OGG.TEST_OGG ***** 
Oracle Goldengate support native capture on table TEST_OGG.TEST_OGG.
Oracle Goldengate marked following column as key columns on table TEST_OGG.TEST_OGG: ID.

GGSCI (localhost.localdomain as ogg@orcl) 5> info trandata test_ogg.test_ogg

2021-05-24 16:46:18  INFO    OGG-10471  ***** Oracle Goldengate support information on table TEST_OGG.TEST_OGG ***** 
Oracle Goldengate support native capture on table TEST_OGG.TEST_OGG.
Oracle Goldengate marked following column as key columns on table TEST_OGG.TEST_OGG: ID.

Logging of supplemental redo log data is enabled for table TEST_OGG.TEST_OGG.

Columns supplementally logged for table TEST_OGG.TEST_OGG: "ID".

Prepared CSN for table TEST_OGG.TEST_OGG: 3094843

Configure the extract process

GGSCI (localhost.localdomain as ogg@orcl) 6> edit param extkafka


extract extkafka
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ogg,password ogg
exttrail /ogg/dirdat/to
table test_ogg.test_ogg;

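Parameter notes:

extract extkafka           the first line names the extract process;
dynamicresolution          resolve table metadata dynamically, when a table is first encountered, rather than all at startup;
SETENV                     sets environment variables, here the Oracle SID and the character set;
userid ogg,password ogg    the account OGG uses to connect to Oracle, here the replication user created earlier;
exttrail                   where trail files are stored and their name prefix; this is a directory on the source side, and the prefix must be exactly two characters (OGG appends the sequence number);
table                      the table(s) to capture; the * wildcard is supported and the entry must end with a semicolon.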
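The two-character rule for trail names is easy to get wrong when paths are typed by hand, so it can be checked before running add exttrail. The sketch below is an assumption-laden illustration: valid_trail_name is a hypothetical helper, and it accepts only ASCII letters and digits in the prefix, which is stricter than anything stated above.

```shell
# Hypothetical helper: succeed when the last path component of a trail
# name (for example "to" in /ogg/dirdat/to) is exactly two characters.
# Restricting the prefix to letters and digits is an assumption.
valid_trail_name() {
    local prefix="${1##*/}"
    case "$prefix" in
        [A-Za-z0-9][A-Za-z0-9])
            return 0 ;;
        *)
            echo "trail prefix '$prefix' must be exactly two characters" >&2
            return 1 ;;
    esac
}
```

For example, valid_trail_name /ogg/dirdat/to succeeds, while valid_trail_name /ogg/dirdat/trail prints an error and fails.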

Add the extract process:

GGSCI (localhost.localdomain as ogg@orcl) 7> add extract extkafka,tranlog,begin now
# Or start capture from a specific point in time
# GGSCI>ADD EXTRACT extkafka,TRANLOG,BEGIN 2022-03-25 17:00

# Optional: delete the Extract group:
# GGSCI >DELETE EXTRACT extkafka

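Add the trail file definition and bind it to the extract process

GGSCI (localhost.localdomain as ogg@orcl) 8> add exttrail /ogg/dirdat/to,extract extkafka
# Optional: delete the trail definition and its files:
# GGSCI >DELETE EXTTRAIL /ogg/dirdat/to
# rm /ogg/dirdat/to*

Note: /ogg/dirdat/ is a directory on the source side and must match the exttrail path in the extkafka parameter file.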

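Configure the pump process

The pump process ships the trail files to the target side.
A pump is itself an Extract process; only its configuration, and therefore its role, differs.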

GGSCI (localhost.localdomain as ogg@orcl) 9> edit param pukafka


extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost 10.0.2.12 mgrport 7809
rmttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;

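Parameter notes:

extract pukafka           the first line names the pump process;
passthru                  pass-through mode: the pump forwards trail records as-is without looking up table definitions in Oracle, which suits a pump that only ships data;
dynamicresolution         dynamic resolution of table metadata;
userid ogg,password ogg   the account OGG uses to connect to Oracle;
rmthost and mgrport       the address and listening port of the mgr service of the target-side (Kafka) OGG;
rmttrail                  the location and name prefix of the trail files on the target side.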

Bind the local trail file and the target-side trail file to the pump process:

GGSCI (localhost.localdomain as ogg@orcl) 10> add extract pukafka,exttrailsource /ogg/dirdat/to
# EXTTRAILSOURCE refers to the trail written by extkafka above
# Optional: delete the Extract group:
# GGSCI >DELETE EXTRACT pukafka

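The pump reads changes from the local trail and writes them to the remote trail path. One local (source-side) trail can be split into several remote trails, each applied by its own Replicat on the target, to increase throughput.

GGSCI (localhost.localdomain as ogg@orcl) 11> add rmttrail /opt/ogg/dirdat/to,extract pukafka
# Optional: delete the data pump group: DELETE EXTRACT pukafka
# Delete the remote trail definition and its files:
# DELETE RMTTRAIL /opt/ogg/dirdat/to
# rm /opt/ogg/dirdat/to*

Note: the path after exttrailsource in the first command is a source-side directory;
the path after rmttrail in the second command is a target-side directory.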

Configure the table definitions (defgen) file

GGSCI (localhost.localdomain as ogg@orcl) 12> edit param test_ogg


defsfile /ogg/dirdef/test_ogg.test_ogg
userid ogg,password ogg
table test_ogg.test_ogg;

Note: /ogg/dirdef/ is a source-side directory.
The table parameter takes the table name; for Oracle the format is user_name.table_name.

[oracle@localhost ogg]$ cd /ogg/
[oracle@localhost ogg]$ ./defgen paramfile dirprm/test_ogg.prm

Copy the generated /ogg/dirdef/test_ogg.test_ogg to the dirdef directory under the OGG home on the target side:

[oracle@localhost ogg]$ scp -r /ogg/dirdef/test_ogg.test_ogg root@10.0.2.12:/opt/ogg/dirdef/ 

OGG target-side configuration

Prerequisite: the Kafka service is running.

Configure the manager (mgr)

[root@localhost ogg]# cd /opt/ogg/
[root@localhost ogg]# ggsci 

Oracle GoldenGate for Big Data
Version 19.1.0.0.5 (Build 007)

Oracle GoldenGate Command Interpreter
Version 19.1.0.0.200714 OGGCORE_19.1.0.0.0OGGBP_PLATFORMS_200628.2141
Linux, x64, 64bit (optimized), Generic on Jun 28 2020 23:01:58
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.



GGSCI (localhost.localdomain) 1> edit param mgr


PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART REPLICAT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

Configure the checkpoint

A checkpoint is a recorded offset that lets replication resume from a known position; add the checkpoint table to the GLOBALS file.

GGSCI (localhost.localdomain) 2> edit  param  ./GLOBALS


CHECKPOINTTABLE test_ogg.checkpoint

Configure the Replicat process

GGSCI (localhost.localdomain) 3> edit param rekafka


REPLICAT rekafka
sourcedefs /opt/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

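Parameter notes:

REPLICAT rekafka     names the Replicat process;
sourcedefs           the table definitions file generated with defgen on the source server and copied to the target; the path here is the target-side path;
TARGETDB LIBFILE     loads the Java adapter library and points it at its configuration file, dirprm/kafka.props under the OGG home;
REPORTCOUNT          how often replication statistics are written to the report;
GROUPTRANSOPS        the number of operations grouped into one transaction when applying, which reduces I/O;
MAP                  the mapping between source and target tables.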

Configure kafka.props

cd /opt/ogg/dirprm/
vim kafka.props
[root@localhost ogg]# cd /opt/ogg/dirprm/
[root@localhost dirprm]# cat kafka.props 
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/opt/kafka_2.13-2.8.0/libs/*:/opt/ogg/:/opt/ogg/lib/*
[root@localhost ogg]# cd /opt/ogg/dirprm/
[root@localhost dirprm]# cat custom_kafka_producer.properties 
bootstrap.servers=10.0.2.12:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000

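Do not leave comments after a value on the same line. In a Java properties file everything after the = belongs to the value, so a trailing comment becomes part of it and OGG fails with an error. Keep comments on their own lines.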
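A quick way to catch this before starting the Replicat is to scan the properties files for values that end in a comment or in stray whitespace. The function below is a sketch: check_props is a hypothetical helper, and treating trailing whitespace as a problem is an assumption made to be on the safe side.

```shell
# Hypothetical helper: print every key=value line (with its line number)
# that ends in whitespace or carries a trailing "# comment", and fail if
# any such line exists. Full-line comments starting with # or ! are ignored.
check_props() {
    [ -r "$1" ] || { echo "cannot read $1" >&2; return 2; }
    if grep -nE '^[^#!][^=]*=.*([[:space:]]+#.*|[[:space:]]+)$' "$1"; then
        return 1
    fi
}
```

It could be run as check_props /opt/ogg/dirprm/kafka.props and again for custom_kafka_producer.properties. Note that a value which legitimately contains " #" would also be flagged.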

Add the trail file to the Replicat process

[root@localhost dirprm]# cd /opt/ogg/
[root@localhost ogg]# ggsci 

Oracle GoldenGate for Big Data
Version 19.1.0.0.5 (Build 007)

Oracle GoldenGate Command Interpreter
Version 19.1.0.0.200714 OGGCORE_19.1.0.0.0OGGBP_PLATFORMS_200628.2141
Linux, x64, 64bit (optimized), Generic on Jun 28 2020 23:01:58
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.



GGSCI (localhost.localdomain) 1> add replicat rekafka exttrail /opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint

Note: the directory after add replicat rekafka exttrail is a target-side directory.

Testing

Source side

Start the processes

[oracle@localhost ogg]$ cd /ogg/
[oracle@localhost ogg]$ ggsci 

Oracle GoldenGate Command Interpreter for Oracle
Version 19.1.0.0.4 OGGCORE_19.1.0.0.0_PLATFORMS_191017.1054_FBO
Linux, x64, 64bit (optimized), Oracle 19c on Oct 17 2019 21:16:29
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.
GGSCI (localhost.localdomain) 1> start mgr
GGSCI (localhost.localdomain) 2> start extkafka
GGSCI (localhost.localdomain) 3> start pukafka

Target side

Start the processes

[root@localhost ogg]# cd /opt/ogg/
[root@localhost ogg]# ggsci 

Oracle GoldenGate for Big Data
Version 19.1.0.0.5 (Build 007)

Oracle GoldenGate Command Interpreter
Version 19.1.0.0.200714 OGGCORE_19.1.0.0.0OGGBP_PLATFORMS_200628.2141
Linux, x64, 64bit (optimized), Generic on Jun 28 2020 23:01:58
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.



GGSCI (localhost.localdomain) 1> start mgr
GGSCI (localhost.localdomain) 2> start rekafka

Check that the source-side processes started

GGSCI (localhost.localdomain) 4> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                           
EXTRACT     RUNNING     EXTKAFKA    00:00:00      00:00:09    
EXTRACT     RUNNING     PUKAFKA     00:00:00      00:00:08    

Check that the target-side processes started

GGSCI (localhost.localdomain) 3> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                           
REPLICAT    RUNNING     REKAFKA     00:00:00      00:00:03    
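The status check can also be automated by parsing the info all table. The sketch below assumes the column layout shown above (program in column 1, status in column 2); all_running is a hypothetical helper name.

```shell
# Hypothetical helper: read "info all" output on stdin, print every
# MANAGER/EXTRACT/REPLICAT row whose status is not RUNNING, and fail if
# such a row exists or if no process rows were seen at all.
all_running() {
    awk '
        $1 == "MANAGER" || $1 == "EXTRACT" || $1 == "REPLICAT" {
            seen++
            if ($2 != "RUNNING") { print; bad++ }
        }
        END { exit (seen > 0 && bad == 0) ? 0 : 1 }
    '
}

# Possible usage from the OGG home, assuming ggsci accepts commands on stdin:
#   echo "info all" | ./ggsci | all_running || echo "some process is not running"
```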

Troubleshooting

View the process report.
At the GGSCI prompt, run:
view report <process name>

GGSCI (localhost.localdomain) 4> view report rekafka


***********************************************************************
                    Oracle GoldenGate for Big Data
                    Version 19.1.0.0.5 (Build 007)
                                   
                      Oracle GoldenGate Delivery
 Version 19.1.0.0.200714 OGGCORE_19.1.0.0.0OGGBP_PLATFORMS_200628.2141
    Linux, x64, 64bit (optimized), Generic on Jun 28 2020 23:07:55
 
Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.

                    Starting at 2021-05-24 15:48:53
***********************************************************************

Operating System Version:
Linux
Version #1 SMP Fri Mar 6 11:36:42 UTC 2015, Release 3.10.0-229.el7.x86_64
Node: localhost.localdomain
Machine: x86_64
                         soft limit   hard limit
Address Space Size   :    unlimited    unlimited
Heap Size            :    unlimited    unlimited
File Size            :    unlimited    unlimited
CPU Time             :    unlimited    unlimited

Process id: 3161

Description: 

***********************************************************************
**            Running with the following parameters                  **
***********************************************************************

2021-05-24 15:48:53  INFO    OGG-03059  Operating system character set identified as UTF-8.

2021-05-24 15:48:53  INFO    OGG-02695  ANSI SQL parameter syntax is used for parameter parsing.

2021-05-24 15:48:53  INFO    OGG-03528  The source database character set, as determined from the table definition file, is UTF-8.

Log in to the Oracle database

[oracle@localhost ogg]$ sqlplus / as sysdba

Change some data

conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;

Check the trail file on the source side

[oracle@localhost ogg]$ ll /ogg/dirdat/
total 8
-rw-r----- 1 oracle oinstall 1285 May 24 15:57 to000000000

Check the trail file on the target side

[root@localhost ogg]# ll /opt/ogg/dirdat/
total 4
-rw-r-----. 1 oracle oracle 2498 May 24 17:09 to000000000

Check that Kafka created the topic automatically

[root@localhost kafka_2.13-2.8.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
__consumer_offsets
test_ogg

Use a console consumer to check that replicated messages arrive

[root@localhost kafka_2.13-2.8.0]# bin/kafka-console-consumer.sh --bootstrap-server 10.0.2.12:9092 --topic test_ogg --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2021-05-24 15:59:41.000814","current_ts":"2021-05-24T15:59:48.395000","pos":"00000000000000001827","after":{"ID":2,"NAME":"test"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2021-05-24 15:59:49.001165","current_ts":"2021-05-24T15:59:55.782000","pos":"00000000000000001958","after":{"ID":3,"NAME":"test"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2021-05-24 15:59:53.002124","current_ts":"2021-05-24T15:59:57.815000","pos":"00000000000000002085","after":{"ID":4,"NAME":"test"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2021-05-24 17:09:39.002891","current_ts":"2021-05-24T17:09:46.082000","pos":"00000000000000002211","before":{"ID":1,"NAME":"test"},"after":{"ID":1,"NAME":"zhangsan"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"D","op_ts":"2021-05-24 17:09:39.002891","current_ts":"2021-05-24T17:09:46.091000","pos":"00000000000000002366","before":{"ID":1,"NAME":"zhangsan"}}
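To compare what arrived in Kafka with the DML that was run, the messages can be tallied by op_type (I for insert, U for update, D for delete). This is a sketch that relies only on the JSON field shown in the output above; count_ops is a hypothetical helper, and it uses plain text tools rather than a JSON parser.

```shell
# Hypothetical helper: read JSON-formatted OGG records on stdin (one per
# line) and print one "TYPE=count" line per op_type value, sorted by type.
count_ops() {
    grep -o '"op_type":"[A-Z]*"' \
        | cut -d'"' -f4 \
        | sort \
        | uniq -c \
        | awk '{ print $2 "=" $1 }'
}

# Possible usage; --timeout-ms makes the consumer exit once the topic is idle:
#   bin/kafka-console-consumer.sh --bootstrap-server 10.0.2.12:9092 \
#       --topic test_ogg --from-beginning --timeout-ms 10000 | count_ops
```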

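Common commands

# Stop a process

GGSCI >stop extract extkafka

# Check status:

GGSCI >info extkafka
GGSCI >stats extkafka
GGSCI >status extkafka

# If the process runs successfully, files should appear under the local trail path; if it fails, check the report for the cause:

GGSCI >view report extkafka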

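Notes

For large-volume extraction, a few parameters need tuning, both on the Kafka broker and on the OGG target side.

Kafka configuration file

server.properties
socket.send.buffer.bytes=5024000
socket.receive.buffer.bytes=5024000

OGG target-side parameters

custom_kafka_producer.properties
max.request.size=5024000
send.buffer.bytes=5024000

After the changes, restart Kafka and the OGG target-side processes.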
