基于canal和kafka同步,实现binlog同步ElasticSearch
文章目录前言elasticsearch 安装canal安装canal-adapter 安装及配置mysql 安装zk及kafaka安装查看效果注意事项前言中间件版本elasticsearch7.5.2canal1.1.4client-adapter1.1.5-alpha-1zookeeper3.4.13kafka2.6.0mysql5.7.31elasticsearch 安装{"settings"
·
前言
中间件 | 版本 |
---|---|
elasticsearch | 7.5.2 |
canal | 1.1.4 |
client-adapter | 1.1.5-alpha-1 |
zookeeper | 3.4.13 |
kafka | 2.6.0 |
mysql | 5.7.31 |
elasticsearch 安装
docker-compose.yaml
脚本
version: '2.2'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
container_name: es7_01
environment:
- cluster.name=dockeres
- node.name=es7_01
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
ulimits:
memlock:
soft: -1
hard: -1
#volumes:
# - ./data:/usr/share/elasticsearch/data
# - ./plugins:/usr/share/elasticsearch/plugins
ports:
- 9200:9200
networks:
- es7net
networks:
es7net:
driver: bridge
{"settings":{"number_of_shards":"12","number_of_replicas":"1","refresh_interval":"5s","similarity":{"similarity_halfh":{"type":"BM25","k1":1.2,"b":0.2}}},"mappings":{"properties":{"id":{"type":"long","index":true},"order_id":{"type":"keyword","index":true},"amount":{"type":"long","index":true},"create_time":{"type":"date","index":true}}}}
canal安装
可以查看前一个章节的安装方法。基于canal和kafka同步,实现binlog同步mysql
canal-adapter 安装及配置
- github地址 canal-adapter
主要修改有两处
order.yml
dataSourceKey: defaultDS
destination: test
groupId: test
esMapping:
_index: order
_id: _id
sql: "select concat(o.id,'_',o.order_id) as _id,o.id, o.order_id, o.amount from t_order o"
etlCondition: "where o.create_time>={}"
commitBatch: 3000
application.yml
server:
port: 8081
logging:
level:
org.springframework: INFO
com.alibaba.otter.canal.client.adapter.hbase: DEBUG
com.alibaba.otter.canal.client.adapter.es: DEBUG
com.alibaba.otter.canal.client.adapter.rdb: DEBUG
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mqServers: 192.168.56.120:9092 #or rocketmq
flatMessage: true
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
mode: kafka # tcp kafka rocketMQ
srcDataSources:
defaultDS:
url: jdbc:mysql://192.168.56.120:33065/test?useUnicode=true
username: root
password: root
canalAdapters:
- instance: test # canal instance Name or mq topic name
groups:
- groupId: test
outerAdapters:
- name: es7
hosts: #{ip}:9200
properties:
mode: rest
cluster.name: ddmc-es-re
mysql 安装
查看上篇文章
zk及kafaka安装
参考上篇文章,这里补充一点,如何查看版本号
- kafka 查看版本号
docker exec a2cb6ca0bb9f find / -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
kafka_2.13-2.6.0
这表示Scala 的版本为2.13,kafka的版本是2.6.0;
- zk查看版本号
docker exec bb6e6bdec415 pwd
/opt/zookeeper-3.4.13
查看效果
mysql中添加更新数据、即可看到es中数据更新
注意事项
- com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService#delete
翻到源代码时,复杂id会有问题,这个打算在以后博客中给出解决方案。
更多推荐
已为社区贡献4条内容
所有评论(0)