抽取mysql binlog--maxwell部署
业务需求,抽取mysql数据到kafka,然后到es或hbase。相比canal,maxwell相对简单。版本1.14.41、前提1)mysql配置server_id,开启row格式的binlog2)kafka配置listeners最好使用ip,否则出现产生数据到kafka或到kafka后无法消费情况listeners=PLAINTEXT://192.168.104.104:9...
业务需求,抽取mysql数据到kafka,然后到es或hbase。相比canal,maxwell相对简单。
版本1.14.4
1、前提
1)mysql配置server_id,开启row格式的binlog
2)kafka配置listeners最好使用ip,否则出现产生数据到kafka或到kafka后无法消费情况
listeners=PLAINTEXT://192.168.104.104:9092
2、mysql授权
授权涉及maxwell进程访问业务库数据和maxwell数据
grant all privileges on maxwellali.* to 'maxwell'@'%' identified by '123456';
grant select,repliction client,replication slave on ali.* to 'maxwell'@'%';
flush privileges;
3、解压
tar zxf maxwell-1.14.4.tar.gz -C /usr/local/
ln -s /usr/local/maxwell-1.14.4 /usr/local/maxwell
4、maxwell配置文件,/usr/local/maxwell/config.properties.ali
# tl;dr config
daemon=true
log_level=info
client.id=10
# mysql login info
host=192.168.104.101
user=maxwell
password=123456
port=3306
schema_database=maxwellali
replica_server_id=2
# product
include_dbs=ali
producer=kafka
kafka.bootstrap.servers=192.168.104.104:9092
http_port=9001
http_diagnostic=true
http_path_prefix=/
metrics_type=jmx|http|datadog
5、log4j
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Properties>
<Property name="pattern">%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n</Property>
</Properties>
<Appenders>
<Console name="Console" target="SYSTEM_ERR">
<PatternLayout pattern="%d{HH:mm:ss,SSS} %-5level %logger{1} - %msg%n"/>
</Console>
<RollingFile name="rolling_file"
fileName="/tmp/maxwell/server.log"
filePattern="/tmp/maxwell/server_%d{yyyy-MM-dd}.log">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout>
<Pattern>${pattern}</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy interval="1"/>
<!--<SizeBasedTriggeringPolicy size="1 KB"/>-->
</Policies>
</RollingFile>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
<AppenderRef ref="rolling_file"></AppenderRef>
</Root>
<Logger name="snaq.db.ConnectionPool.RawMaxwellConnectionPool" level="off" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="snaq.db.ConnectionPool.MaxwellConnectionPool" level="off" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="snaq.db.ConnectionPool.ReplicationConnectionPool" level="off" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
</Loggers>
</Configuration>
6、启动
nohup /usr/local/maxwell/bin/maxwell --conf /usr/local/maxwell/config.properties.ali &
# 第一次启动可指定binlog postition
nohup /usr/local/maxwell/bin/maxwell --conf /usr/local/maxwell/config.properties.ali --init_position=mysql-bin.000004:4:0 &
说明:
1、抽取多个库可添加include_dbs,逗号分隔,这个时候所有db数据都存入一个topic。如果需要不同业务存入到不同的topic,则使用新的配置文件,启动新的maxwell进程即可。
2、maxwell模拟mysql slave,所以多个maxwell进程时,每个进程的client.id及replica_server_id保证不同
3、binlog如果断了,可能会maxwell失败,最好设置mysql的expire_logs_days=0
4、init_position参数mysql-bin.000004:4:0分别为:binlog文件名、position、mawell hearbeat
5、目标使用kafka,maxwell日志无法看到dml(insert update delete)信息。
更多推荐
所有评论(0)