本文采用Maxwell来实现实时解析mysql的binlog日志发送至kafka
1、开启mysql binlog
    环境中mysql是docker容器,所以需要进入容器修改mysql配置.
    docker exec -it ef07dab4da9d bash
    然后进入/etc/mysql/ 修改my.cnf
    增加如下内容
    server-id=1
    log-bin=master
    binlog_format=row
    重启docker容器 ,确认binlog是否开启
    show variables like '%log_bin%';
    log_bin属性值为ON,则以开启成功
2、配置并启动Maxwell
Maxwell的 github地址为 https://github.com/zendesk/maxwell。可去下载最新版。
wget https://github.com/zendesk/maxwell/releases/download/v1.10.7/maxwell-1.10.7.tar.gz 
tar -zxf maxwell-1.10.7.tar.gz 
bin/maxwell --user='root' --password='123456'--host='10.99.22.24' --producer=kafka --kafka.bootstrap.servers=bigdata001:9092
host为 mysql地址,bigdata001是我机器地址
3、启动kafka

我这里kafka是已经安装好的,直接启动就可以了,如果没有可以按照官网的步骤安装kafka,启动kafka之前先要启动 zookeeper
bin/zkServer.sh start
然后开启kafka
bin/kafka-server-start.sh config/server.properties
创建topic 
bin/kafka-topics.sh--create --zookeeper bigdata001:2181 --replication-factor 1 --partitions 1 --topic maxwell 
启动producer
bin/kafka-console-producer.sh--broker-list bigdata001:9092 --topic maxwell
启动consumer
 bin/kafka-console-consumer.sh--zookeeper bigdata001:2181 --topic maxwell --from-beginning
4、测试

改变数据库可看到如下效果
这里写图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐