目录

kafka 迁移

一. 同一个集群内broker 迁移

原理

应用场景

实践

step1

step2

step3

二. 集群迁移

原理

应用场景

方案一:MirrorMaker

step1

step2

方案二: MirrorMaker2


kafka 迁移

kafka的迁移分为一个集群内数据迁移或者是不同集群内数据迁移,

一. 同一个集群内broker 迁移

原理

相当于在一个集群内,将数据进行了快速copy的机制.不过是非常快的.

应用场景

broker 迁移 主要使用的场景是broker 上线,下线,或者扩容等.基于同一套zookeeper的操作.

实践

step1

将需要新添加的broker 列表一并添加到kafka的集群中,这里就省略了, 怎么让kafka 集群添加新节点相对于比较简单,各种cdh管理工具都能做,网上也可以搜索.

step2

找到 集群中broker 编号和每条服务的对应关系

 

准备下线的机器

1001 datanode1

1002 datanode2

1003 datanode3

准备扩容并往上迁移的机器

1006 datanode4

1007 datanode5

1008 datanode6

 

 

step3

准备迁移脚本


 

cd $KAFKA_HOME/bin

mkdir data

cat> data/topics-to-move.json

...

将你需要的topic的名称按下列各式方进去,注意,可以放topic .

{"topics":

    [

{"topic":"lara_test"}],

  "version": 1

}

 

执行一下的命令生成 执行计划

./kafka-reassign-partitions.sh --zookeeper datanode1:2181 datanode2:2181 datanode3:2181 \

--topics-to-move-json-file data/topics-to-move.json \

--broker-list "1006,1007,1008" --generate

 

找到 proposed的下面生成的json文件,即为迁移的计划,将文件copy再次覆盖topics-to-move.json

cat> data/topics-to-move.json

...

 

执行迁移任务:

./kafka-reassign-partitions.sh --zookeeper datanode1:2181 datanode2:2181 datanode3:2181 \

--reassignment-json-file data/topics-to-move.json --execute

 

这时候需要时刻关注是否数据迁移成功,查看topic的总数

 

.kafka-reassign-partitions --zookeeper datanode1:2181 datanode2:2181 datanode3:2181 --reassignment-json-file topic-reassignment.json --verify

 

查看topic 所在哪些broker上.是否达到了目的 

 

./kafka-topics.sh --zookeeper datanode1:2181 datanode2:2181 datanode3:2181 --describe --topic lara_test

 

 

 

二. 集群迁移

原理

 该工具从源集群中消费并生产到目标群集。这种镜像的常见用例是在另一个数据中心提供副本。其实现原理是通过从源集群中消费消息,然后将消息生产到目标集群中,也就是普通的生产和消费消息。

应用场景

主要用于kafka 集群的变更. 将数据同步等操作. 相当于是实现了双打.等各项数据消费端在零误差的对接好了后,可以停掉就集群啦

方案一:MirrorMaker

step1

tips

(1)kafka-mirror-maker.sh --help 查看

  • abort.on.send.failure 默认为true,决定生产者写入失败时的处理机制
  • consumer.config 用于指定消费者的配置文件,配置文件里有两个必填的参数:bootstrap.servers 和 group.id
  • consumer.rebalance.listener 指定再均衡监听器
  • message.handler 指定消息的处理器。这个处理器会在消费者消费到消息之后且在生产者发送消息之前被调用
  • message.handler.args 指定消息处理器的参数,同message.handler一起使用
  • num.streams 指定消费线程的数量
  • offset.commit.interval.ms 指定消费位移提交间隔
  • producer.config 用于指定生产者的配置文件,配置文件里唯一必填的参数是 bootstrap.servers
  • rebalance.listener.args 指定再均衡监听器的参数,同consumer.rebalance.listener一起使用
  • whitelist 指定需要复制的源集群中的主题。这个参数可以指定一个正则表达式,比如a|b表示复制源集群中主题a和主题b的数据。为了方便使用,这里也允许将“|”替换为“,”

(2)服务端必须开启 auto.create.topics.enable=true


step2

cd $KAFKA_HOME/config

vim consumer.properties
 

bootstrap.servers=kafka1:9092

group.id=groupIdMirror

client.id=sourceMirror

partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAsignor  # 这个挺重要,关系到什么机制去Round

# offset does not exist any more on the server: latest, earliest, none 

auto.offset.reset=latest

 

vim producer.properties

 

bootstrap.servers=10.120.241.146:9092,10.120.241.82:9092,10.120.241.110:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly

#partitioner.class=

# (async) or (sync)

producer.type=sync

# th none, gzip, snappy, lz4, respectively

compression.codec=none

# message encoder

serializer.class=kafka.serializer.DefaultEncoder

 

开始迁移

./kafka-mirror-maker.sh --consumer.config ../config/consumer.properties \

--producer.config ../config/producer.properties \

--num.streams 6 --whitelist 'lara_test'

 

 

tips

  • num.streams 配置好合理的并行度.
  • producer.properties 中原则上不用管配置,但是遇到特殊机器有其他重要任务在running的时候,最好设置相关资源和批次的参数
  • offset是否接着相同的问题,group是否相同,我这里清测了,肯定不同, 除非用新版本可以考虑一下.
  • auto.offset.reset=latest是数据实时同步.
  • abort.on.send.failure=true ,生产者写入失败时,多次重试依然无法完成消息写入,则会直接停止kafka mirror maker进程,反之,这跳过当前批次,进入效益了

 

方案二: MirrorMaker2

基于kafka2.4 后可以实现,看起来相当的优秀,只是个人没有实操过,这里提供一个链接,欢迎感兴趣的同学亲测哦

https://baijiahao.baidu.com/s?id=1688735874830868124&wfr=spider&for=pc

https://blog.csdn.net/u010003835/article/details/86611070

 

提供几个基本命令有可能你用得着.

 

./kafka-consumer-groups.sh --all-topics --bootstrap-server localhost:9092 --list

查看lag值

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-78333

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group local-test

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐