数据库CDC中间件学习之Canal
文章目录介绍准备工作打开MySQL的Binlog创建测试数据库及表Canal安装配置下载jar包配置canal/conf/canal.properties:conf/example/instance.properties启动使用Kafka实时消费TCP实时消费介绍类似于Maxwell,canal也是一个实时数据库变更订阅中间件,它是阿里用java开发的基于数据库增量日志解析、提供增量数据订阅消费的
文章目录
介绍
类似于Maxwell,canal也是一个实时数据库变更订阅中间件,它是阿里用java开发的基于数据库增量日志解析、提供增量数据订阅消费的中间件。目前,Canal主要支持了MySQL的binlog解析,并利用CanalClient进行处理以获得相关数据,其工作原理也是将自己伪装成MySQL主从复制中的从服务器,参见Maxwell相关笔记。
准备工作
打开MySQL的Binlog
编辑MySQL配置文件my.ini,开启binlog,且格式为row:
server-id=1
log-bin=mysql-bin
binlog-format=Row
而后重启MySQL。
创建测试数据库及表
创建数据库gmall-2021:
然后创建数据表:
mysql> use gmall-2021;
mysql> CREATE TABLE user_info(
-> `id` VARCHAR(255),
-> `name` VARCHAR(255),
-> `sex` VARCHAR(255)
-> );
Canal安装配置
下载jar包
下载地址:https://github.com/alibaba/canal/releases,我选择的是1.1.2版本,上传到虚拟机上,解压到canal目录中:
[root@scentos szc]# mkdir canal
[root@scentos szc]# tar -zxvf canal.deployer-1.1.2.tar.gz -C canal/
配置
要修改的文件有canal/conf/canal.properties和conf/example/instance.properties
canal/conf/canal.properties:
[root@scentos szc]# cd canal/
[root@scentos canal]# vim conf/canal.properties
要修改的关键内容如下:
.....
canal.ip = scentos
.....
canal.zkServers = scentos:2181
.....
# tcp, kafka, RocketMQ
canal.serverMode = kafka
.....
canal.destinations = example
....
canal.mq.servers = scentos:9092
....
首先定义canal所在服务器的ip地址,和要连接的Zookeeper服务器的位置,再把Canal的serverMode从默认的tcp改为kafka,最后将mq.servers修改成Kafka服务器的地址。
conf/example/instance.properties
注意我们这里的目标名为example,因此,接下来对conf/example/instance.properties进行修改:
[root@scentos canal]# vim conf/example/instance.properties
要修改的关键内容如下:
....
canal.instance.master.address=192.168.31.60:3306
....
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
....
canal.mq.topic=example_canal
....
# hash partition config
canal.mq.partitionsNum=1
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
....
首先修改目标MySQL的位置(canal.instance.master.address),再修改连接MySQL的用户名和密码。因为我们这里是向Kafka输出数据,因此要指定主题和分区数(canal.mq.partitionsNum),如果分区数>1,则要通过canal.mq.partitionHash字段指定参与分区的数据表字段。
启动
事先启动Zookeeper和Kafka,而后启动Canal:
[root@scentos canal]# bin/startup.sh
而后jps一下,看到CanalLauncher就说明启动成功:
[root@scentos canal]# jps
6470 CanalLauncher
4136 Kafka
4187 QuorumPeerMain
6508 Jps
使用
Kafka实时消费
启动Kafka的命令行客户端:
[root@scentos bin]# ./kafka-console-consumer.sh --bootstrap-server scentos:9092 --topic example_canal
向MySQL数据表里插入一条数据:
mysql> insert into user_info values('1', 'szc', 'male');
Query OK, 1 row affected (0.00 sec)
会看到Kafka命令行客户端的输出如下:
{"data":[{"id":"1","name":"szc","sex":"male"}],"database":"gmall-2021","es":1640850556000,"id":7,"isDdl":false,"mysqlType":{"id":"VARCHAR(255)","name":"VARCHAR(255)","sex":"VARCHAR(255)"},"old":null,"sql":"","sqlType":{"id":12,"name":12,"sex":12},"table":"user_info","ts":1640850556229,"type":"INSERT"}
MySQL修改数据:
mysql> update user_info set name = 'jason' where id = '1';
Kafka命令行客户端的输出如下:
{"data":[{"id":"1","name":"jason","sex":"male"}],"database":"gmall-2021","es":1640850945000,"id":8,"isDdl":false,"mysqlType":{"id":"VARCHAR(255)","name":"VARCHAR(255)","sex":"VARCHAR(255)"},"old":[{"name":"szc"}],"sql":"","sqlType":{"id":12,"name":12,"sex":12},"table":"user_info","ts":1640850945209,"type":"UPDATE"}
MySQL删除数据:
mysql> delete from user_info where id = '1';
Query OK, 1 row affected (0.00 sec)
Kafka命令行客户端的输出如下:
{"data":[{"id":"1","name":"jason","sex":"male"}],"database":"gmall-2021","es":1640851022000,"id":9,"isDdl":false,"mysqlType":{"id":"VARCHAR(255)","name":"VARCHAR(255)","sex":"VARCHAR(255)"},"old":null,"sql":"","sqlType":{"id":12,"name":12,"sex":12},"table":"user_info","ts":1640851022179,"type":"DELETE"}
TCP实时消费
首先要把canal/conf/canal.properties中的canal.serverMode改为tcp,再重启Canal服务器:
[root@scentos canal]# bin/stop.sh && bin/startup.sh
然后新建java工程,在pom.xml中添加依赖项:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
com.alibaba.otter的版本和Canal服务器的版本一致。然后新建CanalTest类,编写以下java代码:
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalTest {
public static void main(String[] args) throws Exception {
CanalConnector connector = CanalConnectors
.newSingleConnector(new InetSocketAddress("scentos", 11111),
"example", "", "");
// 连接Canal服务器,参数列表:Canal服务器地址,要连接的实例,实例用户名和密码(一般没有)
while (true) {
connector.connect(); // 连接Canal服务器
connector.subscribe("gmall-2021.*"); // 订阅gmall-2021数据库下的所有表
Message message = connector.get(1000); // 尝试拿1000条数据
List<CanalEntry.Entry> entries = message.getEntries(); // 获取实际拿到的数据列表
if (entries.size() <= 0) {
System.out.println("No data now.");
Thread.sleep(1000);
} else {
for (CanalEntry.Entry entry : entries) {
String tableName = entry.getHeader().getTableName(); // 表名
CanalEntry.EntryType entryType = entry.getEntryType(); // 获取数据类型
if (entryType.equals(CanalEntry.EntryType.ROWDATA)) { // 对ROWDATA类型的进行解析
ByteString storeValue = entry.getStoreValue(); // 获取原生数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); // 反序列化解析原生数据
CanalEntry.EventType eventType = rowChange.getEventType(); // 事件类型
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { // 遍历所有原生数据
JSONObject beforeData = new JSONObject(); // 收集before数据列
for (CanalEntry.Column beforeColumn : rowData.getBeforeColumnsList()) {
beforeData.put(beforeColumn.getName(), beforeColumn.getValue());
}
JSONObject afterData = new JSONObject(); // 收集after数据列
for (CanalEntry.Column afterColumn : rowData.getAfterColumnsList()) {
afterData.put(afterColumn.getName(), afterColumn.getValue());
}
System.out.println("表名:" + tableName + ", 事件类型:" + eventType
+ "\n旧数据:" + beforeData + ".\n新数据:" + afterData);
}
}
}
}
}
}
}
运行结果如下:
更多推荐
所有评论(0)