二、canal推送数据到kafka中
canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:kafka: https://github.com/apache/kafkaRocketMQ : https://github.com/apache/rocketmq配置推送kafka:1.修改instance 配置文件 vi conf/example/insta
·
canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:
- kafka: https://github.com/apache/kafka
- RocketMQ : https://github.com/apache/rocketmq
配置推送kafka:
1.修改instance 配置文件 vi conf/example/instance.properties
# 按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config 推送的kafka的topic名称
canal.mq.topic=example
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
2.修改canal 配置文件vi /usr/local/canal/conf/canal.properties
# canal server链接zookeeper集群的链接信息
canal.zkServers = 127.0.0.1:2181
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
##################################################
######### Kafka 相关配置 #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
3.启动canal
kafak如果未开启创建topic的话,要先进行创建使用的topic,然后进行观测kafka的消费情况
4.如果是配置了instance直接推送数据到kafka,那么此实例不能再被其他的客户端连接,进行订阅,强行连接那么客户端会报如下错
2021-09-18 10:53:26 [ERROR][org.springframework.boot.SpringApplication][826]-> Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'cannalClient' defined in file [E:\yyy\examproject\canal\target\classes\com\example\canal\deploy\CannalClient.class]: Invocation of init method failed; nested exception is com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused: connect
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1796)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:595)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:879)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
at com.example.canal.CanalApplication.main(CanalApplication.java:10)
Caused by: com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused: connect
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:198)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:115)
at com.example.canal.deploy.CannalClient.afterPropertiesSet(CannalClient.java:44)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1855)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1792)
... 16 common frames omitted
Caused by: java.net.ConnectException: Connection refused: connect
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:454)
at sun.nio.ch.Net.connect(Net.java:446)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:150)
... 20 common frames omitted
更多推荐
已为社区贡献1条内容
所有评论(0)