kafka 迁移最佳实践
kafka 迁移kafka的迁移分为一个集群内数据迁移或者是不同集群内数据迁移,一. 同一个集群内broker 迁移原理相当于在一个集群内,将数据进行了快速copy的机制.不过是非常快的.应用场景broker 迁移 主要使用的场景是broker 上线,下线,或者扩容等.基于同一套zookeeper的操作.实践step1将需要新添加的broker 列表一并添加到kafka的集群中,这里就省略了, 怎
目录
kafka 迁移
kafka的迁移分为一个集群内数据迁移或者是不同集群内数据迁移,
一. 同一个集群内broker 迁移
原理
相当于在一个集群内,将数据进行了快速copy的机制.不过是非常快的.
应用场景
broker 迁移 主要使用的场景是broker 上线,下线,或者扩容等.基于同一套zookeeper的操作.
实践
step1
将需要新添加的broker 列表一并添加到kafka的集群中,这里就省略了, 怎么让kafka 集群添加新节点相对于比较简单,各种cdh管理工具都能做,网上也可以搜索.
step2
找到 集群中broker 编号和每条服务的对应关系
准备下线的机器
1001 datanode1
1002 datanode2
1003 datanode3
准备扩容并往上迁移的机器
1006 datanode4
1007 datanode5
1008 datanode6
step3
准备迁移脚本
cd $KAFKA_HOME/bin
mkdir data
cat> data/topics-to-move.json
...
将你需要的topic的名称按下列各式方进去,注意,可以放topic .
{"topics":
[
{"topic":"lara_test"}],
"version": 1
}
执行一下的命令生成 执行计划
./kafka-reassign-partitions.sh --zookeeper datanode1:2181 datanode2:2181 datanode3:2181 \
--topics-to-move-json-file data/topics-to-move.json \
--broker-list "1006,1007,1008" --generate
找到 proposed的下面生成的json文件,即为迁移的计划,将文件copy再次覆盖topics-to-move.json
cat> data/topics-to-move.json
...
执行迁移任务:
./kafka-reassign-partitions.sh --zookeeper datanode1:2181 datanode2:2181 datanode3:2181 \
--reassignment-json-file data/topics-to-move.json --execute
这时候需要时刻关注是否数据迁移成功,查看topic的总数
.kafka-reassign-partitions --zookeeper datanode1:2181 datanode2:2181 datanode3:2181 --reassignment-json-file topic-reassignment.json --verify
查看topic 所在哪些broker上.是否达到了目的
./kafka-topics.sh --zookeeper datanode1:2181 datanode2:2181 datanode3:2181 --describe --topic lara_test
二. 集群迁移
原理
该工具从源集群中消费并生产到目标群集。这种镜像的常见用例是在另一个数据中心提供副本。其实现原理是通过从源集群中消费消息,然后将消息生产到目标集群中,也就是普通的生产和消费消息。
应用场景
主要用于kafka 集群的变更. 将数据同步等操作. 相当于是实现了双打.等各项数据消费端在零误差的对接好了后,可以停掉就集群啦
方案一:MirrorMaker
step1
tips
(1)kafka-mirror-maker.sh --help 查看
- abort.on.send.failure 默认为true,决定生产者写入失败时的处理机制
- consumer.config 用于指定消费者的配置文件,配置文件里有两个必填的参数:bootstrap.servers 和 group.id
- consumer.rebalance.listener 指定再均衡监听器
- message.handler 指定消息的处理器。这个处理器会在消费者消费到消息之后且在生产者发送消息之前被调用
- message.handler.args 指定消息处理器的参数,同message.handler一起使用
- num.streams 指定消费线程的数量
- offset.commit.interval.ms 指定消费位移提交间隔
- producer.config 用于指定生产者的配置文件,配置文件里唯一必填的参数是 bootstrap.servers
- rebalance.listener.args 指定再均衡监听器的参数,同consumer.rebalance.listener一起使用
- whitelist 指定需要复制的源集群中的主题。这个参数可以指定一个正则表达式,比如a|b表示复制源集群中主题a和主题b的数据。为了方便使用,这里也允许将“|”替换为“,”
(2)服务端必须开启 auto.create.topics.enable=true
step2
cd $KAFKA_HOME/config
vim consumer.properties
bootstrap.servers=kafka1:9092
group.id=groupIdMirror
client.id=sourceMirror
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAsignor # 这个挺重要,关系到什么机制去Round
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=latest
vim producer.properties
bootstrap.servers=10.120.241.146:9092,10.120.241.82:9092,10.120.241.110:9092
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# (async) or (sync)
producer.type=sync
# th none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder
serializer.class=kafka.serializer.DefaultEncoder
开始迁移
./kafka-mirror-maker.sh --consumer.config ../config/consumer.properties \
--producer.config ../config/producer.properties \
--num.streams 6 --whitelist 'lara_test'
tips
- num.streams 配置好合理的并行度.
- producer.properties 中原则上不用管配置,但是遇到特殊机器有其他重要任务在running的时候,最好设置相关资源和批次的参数
- offset是否接着相同的问题,group是否相同,我这里清测了,肯定不同, 除非用新版本可以考虑一下.
- auto.offset.reset=latest是数据实时同步.
- abort.on.send.failure=true ,生产者写入失败时,多次重试依然无法完成消息写入,则会直接停止kafka mirror maker进程,反之,这跳过当前批次,进入效益了
方案二: MirrorMaker2
基于kafka2.4 后可以实现,看起来相当的优秀,只是个人没有实操过,这里提供一个链接,欢迎感兴趣的同学亲测哦
https://baijiahao.baidu.com/s?id=1688735874830868124&wfr=spider&for=pc
https://blog.csdn.net/u010003835/article/details/86611070
提供几个基本命令有可能你用得着.
./kafka-consumer-groups.sh --all-topics --bootstrap-server localhost:9092 --list
查看lag值
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-78333
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group local-test
更多推荐
所有评论(0)