接收canal消息发送到kafka
Canal消息订阅端http://pms.today36524.td/central-tool/binlog-server-newGithub:https://github.com/alibaba/canalhttps://github.com/alibaba/canal/wiki/简介在binlog-server-new项目中,完成了接收canal消息,并发送到kafka我们需要...
在Canal消息订阅端 binlog-server-new项目中,完成了接收canal消息,并发送到kafka
我们需要写一个Canal客户端,CanalClient,来监听处理数据。这里的CanalClient实现了Runnable接口
CanalBinaryListener是基于二进制的canal监听器,BinlogKafkaProducer实现CanalBinaryListener的onBinlog将接收到的消息发送到Kafka。
将BinlogKafkaProducer注册到CanalClient
/**
* 以Java 环境变量模式启动
*/
def startServer(): Unit = {
logger.info(s"启动服务 binlogServer...")
val producerBrokerHost = SysEnvUtil.CANAL_KAFKA_HOST
val topic = SysEnvUtil.CANAL_KAFKA_TOPIC
val canalServerIp = SysEnvUtil.CANAL_SERVER_IP
val canalServerPort = SysEnvUtil.CANAL_SERVER_PORT.toInt
val destination = SysEnvUtil.CANAL_DESTINATION
val username = SysEnvUtil.CANAL_USERNAME
val password = SysEnvUtil.CANAL_PASSWORD
val kafkaProducer = new BinlogKafkaProducer(producerBrokerHost, topic)
kafkaProducer.init()
val canalClient = new CanalClient(canalServerIp, canalServerPort, destination, username, password);
canalClient.registerBinlogListener(kafkaProducer)
val executorService = Executors.newFixedThreadPool(1)
executorService.execute(canalClient)
logger.info("启动服务 binlogService 成功...")
}
启用一个线程池,运行CanalClient。run()中主要调用了work方法进行处理工作。
在初始化时,我们得到了一个SimpleCanalConnector,通过CanalConnector的getWithoutAck(BatchSize)方法,我们可以得到一个Message.
getWithoutAck(BatchSize)方法:
不指定 position 获取事件,该方法返回的条件:
尝试拿batchSize条记录,有多少取多少,不会阻塞等待
canal 会记住此 client 最新的position。
如果是第一次 fetch,则会从 canal 中保存的最老一条数据开始输出。
通过Message,我们可以得到唯一id,和具体的数据对象。对这些数据对象进行处理
,忽略事务开启、结束、query的binlog内容。然后处理数据对象,提交确认
/**
* 处理工作 work
*/
private void work() {
try {
while (runing) {
Message message = connector.getWithoutAck(BatchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(Sleep);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
} else {
if(logger.isDebugEnabled()) {
logger.debug("读取binlog日志 batchId: {}, size: {}, name: {}, offsets:{}", batchId, size,
message.getEntries().get(0).getHeader().getLogfileName(),
message.getEntries().get(0).getHeader().getLogfileOffset());
}
//处理消息
process(message.getEntries());
}
// 提交确认
connector.ack(batchId);
}
} catch (Exception e) {
connector.disconnect();
logger.error("[CanalClient] [run] " + e.getMessage(), e);
} finally {
reconnect();
}
}
当CanalClient注册了监听器的时候,调用监听方法,异步回调模式发送数据对象到kafka.
/**
* 异步回调模式发送消息
*
* @param topic
* @param message
*/
public void send(String topic, byte[] message) {
producer.send(new ProducerRecord<>(topic, message), (metadata, e) -> {
if (e != null) {
logger.error("[" + getClass().getSimpleName() + "]: 消息发送失败,cause: " + e.getMessage(), e);
}
logger.info("[binlog]:消息发送成功,topic:{}, offset:{}, partition:{}, time:{}",
metadata.topic(), metadata.offset(), metadata.partition(), metadata.timestamp());
});
}
@Override
public void onBinlog(CanalEntry.Entry entry) {
send(topic, entry.toByteArray());
}
更多推荐
所有评论(0)