在Canal消息订阅端 binlog-server-new项目中,完成了接收canal消息,并发送到kafka

我们需要写一个Canal客户端,CanalClient,来监听处理数据。这里的CanalClient实现了Runnable接口

CanalBinaryListener是基于二进制的canal监听器,BinlogKafkaProducer实现CanalBinaryListener的onBinlog将接收到的消息发送到Kafka。

将BinlogKafkaProducer注册到CanalClient

  /**
    * 以Java 环境变量模式启动
    */
  def startServer(): Unit = {
    logger.info(s"启动服务 binlogServer...")

    val producerBrokerHost = SysEnvUtil.CANAL_KAFKA_HOST
    val topic = SysEnvUtil.CANAL_KAFKA_TOPIC

    val canalServerIp = SysEnvUtil.CANAL_SERVER_IP
    val canalServerPort = SysEnvUtil.CANAL_SERVER_PORT.toInt

    val destination = SysEnvUtil.CANAL_DESTINATION
    val username = SysEnvUtil.CANAL_USERNAME
    val password = SysEnvUtil.CANAL_PASSWORD

    val kafkaProducer = new BinlogKafkaProducer(producerBrokerHost, topic)
    kafkaProducer.init()


    val canalClient = new CanalClient(canalServerIp, canalServerPort, destination, username, password);
    canalClient.registerBinlogListener(kafkaProducer)

    val executorService = Executors.newFixedThreadPool(1)

    executorService.execute(canalClient)

    logger.info("启动服务 binlogService 成功...")


  }

启用一个线程池,运行CanalClient。run()中主要调用了work方法进行处理工作。
在初始化时,我们得到了一个SimpleCanalConnector,通过CanalConnector的getWithoutAck(BatchSize)方法,我们可以得到一个Message.

getWithoutAck(BatchSize)方法:

不指定 position 获取事件,该方法返回的条件:

尝试拿batchSize条记录,有多少取多少,不会阻塞等待

canal 会记住此 client 最新的position。

如果是第一次 fetch,则会从 canal 中保存的最老一条数据开始输出。

通过Message,我们可以得到唯一id,和具体的数据对象。对这些数据对象进行处理
,忽略事务开启、结束、query的binlog内容。然后处理数据对象,提交确认

    /**
     * 处理工作 work
     */
    private void work() {

        try {
            while (runing) {

                Message message = connector.getWithoutAck(BatchSize);

                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(Sleep);
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage(), e);
                    }

                } else {
                    if(logger.isDebugEnabled()) {
                        logger.debug("读取binlog日志 batchId: {}, size: {}, name: {}, offsets:{}", batchId, size,
                                message.getEntries().get(0).getHeader().getLogfileName(),
                                message.getEntries().get(0).getHeader().getLogfileOffset());
                    }
                    //处理消息
                    process(message.getEntries());
                }
                // 提交确认
                connector.ack(batchId);
            }

        } catch (Exception e) {
            connector.disconnect();
            logger.error("[CanalClient] [run] " + e.getMessage(), e);
        } finally {
            reconnect();
        }
    }

当CanalClient注册了监听器的时候,调用监听方法,异步回调模式发送数据对象到kafka.

    /**
     * 异步回调模式发送消息
     *
     * @param topic
     * @param message
     */
    public void send(String topic, byte[] message) {
        producer.send(new ProducerRecord<>(topic, message), (metadata, e) -> {
            if (e != null) {
                logger.error("[" + getClass().getSimpleName() + "]: 消息发送失败,cause: " + e.getMessage(), e);
            }
            logger.info("[binlog]:消息发送成功,topic:{}, offset:{}, partition:{}, time:{}",
                    metadata.topic(), metadata.offset(), metadata.partition(), metadata.timestamp());

        });
    }


    @Override
    public void onBinlog(CanalEntry.Entry entry) {
        send(topic, entry.toByteArray());
    }
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐