一、Kafka mirror基本概念:维护现有的kafka集群,将源集群的消息数据同步到镜像集群。

       Kafka mirror的工作过程是创建一个mirror_consumer,从源集群中需要同步的主题消费消息数据,再通过创建的mirror_producer将mirror_consumer消费到的消息push到镜像集群。下图摘自《Apache Kafka源码剖析》。

二、Kafka mirror的实验过程

1.首先需要在本地实现两个kafka代理,以代表一个源集群和一个镜像集群。具体镜像的实现过程是,创建一个mirror-consumer来poll源集群的消息,然后通过创建的mirror-producer将消费到的消息push到镜像集群,从而实现源集群的数据同步到镜像集群。

2.创建两个kafka代理

       在不同目录下创建kafka代理,下载的kafka版本为0.10.2.1,zookeeper的默认端口号为2181,kafka的默认端口号为9092。所以需要将创建的另一个kafka代理中,目录下的config/zookeeper.properties中的端口号改为2182,另外需要修改集群数据的存放目录:          

 

zookeeper.connect=localhost:2182

dataDir=/tmp/zookeeper2

      将config/server.properties中的zookeeper的端口号改为2182,将kafka的端口号改为9093,将数据日志的目录修改为/tmp/kafka-logs2:

zookeeper.connect=localhost:2182

listeners=PLAINTEXT://:9093

log.dirs=/tmp/kafka-logs2

3.创建mirror-consumer和mirror-producer的配置文件

       在创建的mirror-consumer的配置文件中写入以下内容:

partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

#对所有topic的topic partition求并集,基于consumer进行RoundRobin轮询

bootstrap.servers=localhost:9092

#指定源 Kafka 集群的代理地址表列

group.id=mirror

#消费组名

       在创建的mirror-producer的配置文件中写入以下内容:

bootstrap.servers=localhost:9093

#指定目标集群的代理地址列表

producer.type=sync

acks=all

4.在源kafka集群目录下进行数据同步

       首先需要在源集群创建topic,名为my-topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic

       然后进行数据同步的工作,其中whitelist为my-topic,是将其列为白名单:

bin/kafka-mirror-maker.sh --consumer.config ./config/mirror-consumer.properties --producer.config ./config/mirror-producer.properties --whitelist my-topic

       另开一个终端,对my-topic进行生产消息,运行了下面指令后,就可以输入一些内容:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

       进入源集群和镜像集群的kafka日志目录,打开/tmp/kafka-logs和/tmp/kafka-logs2下的log文件,可以看到每当生产一些内容后,两个日志文件中的内容都会得到生产的内容。

       于是集群的数据同步实验完成了。

三、Kafka mirror测试调研

       正常的消息同步过程是,源集群有需要同步的指定的topic,而镜像集群一般没有同名的topic,所以在消息同步过程中,默认会在镜像集群中创建同名topic,并且partition的数量会默认相同,所以topic中partition的消息在源集群和镜像集群能正确映射。

       当镜像集群中已经存在同名topic,同步过程会是怎样的呢?

1.先在镜像集群创建topic1,partition数量为3,然后在源集群创建topic1,partition数量为1。

       同步了一部分消息后,在镜像集群中删除了topic的一句内容,之后的同步过程中,不会将缺少的再补全添加,只会添加现在生产的。

       关闭了源集群与镜像集群的连接,生产源集群中topic的消息,然后打开连接后,堆积的消息会自动更新至mirror。

       直接通过镜像集群中的生产者,在镜像集群中的topic中加入内容,成功加入,源集群的生产者正常生产正常添加至源topic,镜像同步正常,但镜像集群中生产的消息不会反向同步。

2.先在源集群创建topic2,partition数量为1,然后在镜像集群创建同topic2,partition数量为3。

       通过镜像集群的producer生产消息给镜像topic2,不会反向同步,镜像producer生产到镜像topic的分区和源producer同步消息到镜像topic2的分区的存储顺序均为2->1->0->2...,但起始不同,并且存储顺序互不影响。

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐