介绍

类似于Maxwell,canal也是一个实时数据库变更订阅中间件,它是阿里用java开发的基于数据库增量日志解析、提供增量数据订阅消费的中间件。目前,Canal主要支持了MySQL的binlog解析,并利用CanalClient进行处理以获得相关数据,其工作原理也是将自己伪装成MySQL主从复制中的从服务器,参见Maxwell相关笔记

准备工作

打开MySQL的Binlog

编辑MySQL配置文件my.ini,开启binlog,且格式为row:

server-id=1
log-bin=mysql-bin
binlog-format=Row

而后重启MySQL。

创建测试数据库及表

创建数据库gmall-2021:
在这里插入图片描述
然后创建数据表:

mysql> use gmall-2021;

mysql> CREATE TABLE user_info(
   -> `id` VARCHAR(255),
   -> `name` VARCHAR(255),
   -> `sex` VARCHAR(255)
   -> );

Canal安装配置

下载jar包

下载地址:https://github.com/alibaba/canal/releases,我选择的是1.1.2版本,上传到虚拟机上,解压到canal目录中:

[root@scentos szc]# mkdir canal
[root@scentos szc]# tar -zxvf canal.deployer-1.1.2.tar.gz -C canal/

配置

要修改的文件有canal/conf/canal.properties和conf/example/instance.properties

canal/conf/canal.properties:

[root@scentos szc]# cd canal/
[root@scentos canal]# vim conf/canal.properties

要修改的关键内容如下:

.....
canal.ip = scentos
.....
canal.zkServers = scentos:2181
.....
# tcp, kafka, RocketMQ
canal.serverMode = kafka
.....
canal.destinations = example
....
canal.mq.servers = scentos:9092
....

首先定义canal所在服务器的ip地址,和要连接的Zookeeper服务器的位置,再把Canal的serverMode从默认的tcp改为kafka,最后将mq.servers修改成Kafka服务器的地址。

conf/example/instance.properties

注意我们这里的目标名为example,因此,接下来对conf/example/instance.properties进行修改:

[root@scentos canal]# vim conf/example/instance.properties

要修改的关键内容如下:

....
canal.instance.master.address=192.168.31.60:3306
....
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
....
canal.mq.topic=example_canal
....
# hash partition config
canal.mq.partitionsNum=1
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
....

首先修改目标MySQL的位置(canal.instance.master.address),再修改连接MySQL的用户名和密码。因为我们这里是向Kafka输出数据,因此要指定主题和分区数(canal.mq.partitionsNum),如果分区数>1,则要通过canal.mq.partitionHash字段指定参与分区的数据表字段。

启动

事先启动Zookeeper和Kafka,而后启动Canal:

[root@scentos canal]# bin/startup.sh

而后jps一下,看到CanalLauncher就说明启动成功:

[root@scentos canal]# jps
6470 CanalLauncher
4136 Kafka
4187 QuorumPeerMain
6508 Jps

使用

Kafka实时消费

启动Kafka的命令行客户端:

[root@scentos bin]# ./kafka-console-consumer.sh --bootstrap-server scentos:9092 --topic example_canal

向MySQL数据表里插入一条数据:

mysql> insert into user_info values('1', 'szc', 'male');
Query OK, 1 row affected (0.00 sec)

会看到Kafka命令行客户端的输出如下:

{"data":[{"id":"1","name":"szc","sex":"male"}],"database":"gmall-2021","es":1640850556000,"id":7,"isDdl":false,"mysqlType":{"id":"VARCHAR(255)","name":"VARCHAR(255)","sex":"VARCHAR(255)"},"old":null,"sql":"","sqlType":{"id":12,"name":12,"sex":12},"table":"user_info","ts":1640850556229,"type":"INSERT"}

MySQL修改数据:

mysql> update user_info set name = 'jason' where id = '1';

Kafka命令行客户端的输出如下:

{"data":[{"id":"1","name":"jason","sex":"male"}],"database":"gmall-2021","es":1640850945000,"id":8,"isDdl":false,"mysqlType":{"id":"VARCHAR(255)","name":"VARCHAR(255)","sex":"VARCHAR(255)"},"old":[{"name":"szc"}],"sql":"","sqlType":{"id":12,"name":12,"sex":12},"table":"user_info","ts":1640850945209,"type":"UPDATE"}

MySQL删除数据:

mysql> delete from user_info where id = '1';
Query OK, 1 row affected (0.00 sec)

Kafka命令行客户端的输出如下:

{"data":[{"id":"1","name":"jason","sex":"male"}],"database":"gmall-2021","es":1640851022000,"id":9,"isDdl":false,"mysqlType":{"id":"VARCHAR(255)","name":"VARCHAR(255)","sex":"VARCHAR(255)"},"old":null,"sql":"","sqlType":{"id":12,"name":12,"sex":12},"table":"user_info","ts":1640851022179,"type":"DELETE"}

TCP实时消费

首先要把canal/conf/canal.properties中的canal.serverMode改为tcp,再重启Canal服务器:

[root@scentos canal]# bin/stop.sh && bin/startup.sh

然后新建java工程,在pom.xml中添加依赖项:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

com.alibaba.otter的版本和Canal服务器的版本一致。然后新建CanalTest类,编写以下java代码:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalTest {
    public static void main(String[] args) throws Exception {
        CanalConnector connector = CanalConnectors
                .newSingleConnector(new InetSocketAddress("scentos", 11111),
                        "example", "", "");
        // 连接Canal服务器,参数列表:Canal服务器地址,要连接的实例,实例用户名和密码(一般没有)

        while (true) {
            connector.connect(); // 连接Canal服务器
            connector.subscribe("gmall-2021.*"); // 订阅gmall-2021数据库下的所有表

            Message message = connector.get(1000); // 尝试拿1000条数据
            List<CanalEntry.Entry> entries = message.getEntries(); // 获取实际拿到的数据列表
            if (entries.size() <= 0) {
                System.out.println("No data now.");
                Thread.sleep(1000);
            } else {
                for (CanalEntry.Entry entry : entries) {
                    String tableName = entry.getHeader().getTableName(); // 表名

                    CanalEntry.EntryType entryType = entry.getEntryType(); // 获取数据类型
                    if (entryType.equals(CanalEntry.EntryType.ROWDATA)) { // 对ROWDATA类型的进行解析
                        ByteString storeValue = entry.getStoreValue(); // 获取原生数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); // 反序列化解析原生数据
                        CanalEntry.EventType eventType = rowChange.getEventType(); // 事件类型
                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { // 遍历所有原生数据
                            JSONObject beforeData = new JSONObject(); // 收集before数据列
                            for (CanalEntry.Column beforeColumn : rowData.getBeforeColumnsList()) {
                                beforeData.put(beforeColumn.getName(), beforeColumn.getValue());
                            }

                            JSONObject afterData = new JSONObject(); // 收集after数据列
                            for (CanalEntry.Column afterColumn : rowData.getAfterColumnsList()) {
                                afterData.put(afterColumn.getName(), afterColumn.getValue());
                            }
                            System.out.println("表名:" + tableName + ", 事件类型:" + eventType
                                    + "\n旧数据:" + beforeData + ".\n新数据:" + afterData);
                        }
                    }
                }
            }
        }
    }
}

运行结果如下:
在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐