业务需求,抽取mysql数据到kafka,然后到es或hbase。相比canal,maxwell相对简单。

版本1.14.4

1、前提

1)mysql配置server_id,开启row格式的binlog

2)kafka配置listeners最好使用ip,否则出现产生数据到kafka或到kafka后无法消费情况

listeners=PLAINTEXT://192.168.104.104:9092

2、mysql授权

授权涉及maxwell进程访问业务库数据和maxwell数据
grant all privileges on maxwellali.* to 'maxwell'@'%' identified by '123456';
grant select,repliction client,replication slave on ali.* to 'maxwell'@'%';
flush privileges;

3、解压

tar zxf maxwell-1.14.4.tar.gz -C /usr/local/
ln -s /usr/local/maxwell-1.14.4 /usr/local/maxwell

4、maxwell配置文件,/usr/local/maxwell/config.properties.ali

# tl;dr config
daemon=true
log_level=info
client.id=10

# mysql login info
host=192.168.104.101
user=maxwell
password=123456
port=3306
schema_database=maxwellali
replica_server_id=2

# product
include_dbs=ali
producer=kafka
kafka.bootstrap.servers=192.168.104.104:9092

http_port=9001
http_diagnostic=true
http_path_prefix=/
metrics_type=jmx|http|datadog

5、log4j

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Properties>
<Property name="pattern">%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n</Property>
</Properties>
<Appenders>
<Console name="Console" target="SYSTEM_ERR">
<PatternLayout pattern="%d{HH:mm:ss,SSS} %-5level %logger{1} - %msg%n"/>
</Console>
<RollingFile name="rolling_file"
fileName="/tmp/maxwell/server.log"
filePattern="/tmp/maxwell/server_%d{yyyy-MM-dd}.log">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout>
<Pattern>${pattern}</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<!--<SizeBasedTriggeringPolicy size="1 KB"/>-->
</Policies>
</RollingFile>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
<AppenderRef ref="rolling_file"></AppenderRef>
</Root>
<Logger name="snaq.db.ConnectionPool.RawMaxwellConnectionPool" level="off" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="snaq.db.ConnectionPool.MaxwellConnectionPool" level="off" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="snaq.db.ConnectionPool.ReplicationConnectionPool" level="off" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
</Loggers>
</Configuration>

6、启动

nohup /usr/local/maxwell/bin/maxwell --conf /usr/local/maxwell/config.properties.ali &
# 第一次启动可指定binlog postition
nohup /usr/local/maxwell/bin/maxwell --conf /usr/local/maxwell/config.properties.ali --init_position=mysql-bin.000004:4:0 &

说明:

1、抽取多个库可添加include_dbs,逗号分隔,这个时候所有db数据都存入一个topic。如果需要不同业务存入到不同的topic,则使用新的配置文件,启动新的maxwell进程即可。

2、maxwell模拟mysql slave,所以多个maxwell进程时,每个进程的client.id及replica_server_id保证不同

3、binlog如果断了,可能会maxwell失败,最好设置mysql的expire_logs_days=0

4、init_position参数mysql-bin.000004:4:0分别为:binlog文件名、position、mawell hearbeat

5、目标使用kafka,maxwell日志无法看到dml(insert update delete)信息。

 

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐