kafka日常运维工作
一、创建/删除/重建topic1、创建:./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 10 --topicTOPIC_NAME2、删除:./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --t
一、创建/删除/重建topic
1、创建:./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 10 --topic TOPIC_NAME
2、删除:./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic TOPIC_NAME
3、查看状态和分区负载详情:./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TOPIC_NAME
二、设置过期时间
以下例子为保存7天
处理方法:
1、设置:./bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --alter --entity-name TOPIC_NAME --add-config retention.ms=604800000
2、删除:./bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --alter --entity-name TOPIC_NAME --delete-config retention.ms
三、删除某个offset之前的数据
说明:需求是删除某个offset之前的数据;一般来说,直接重建topic比较快,但是不排除不能重建topic的情况
处理方法:
1、创建json文件,以下例子是删除topic:test_product,分区:0,offset:10之前的数据,不包含offset 10,改操作不会重置offset
示例:{"partitions":[{"topic": "test_product", "partition": 0,"offset": 10}],"version":1}
2、执行命令:./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file cleanup.json
四、kafka磁盘报警
处理方法:
1、查看哪个topic数据量比较大,进入kafka数据目录,执行命令查看分区数大于1G的:du -sh * |grep G (注意:不一定是topic数据量大,也有可能是日志之类)
2、查看数据量最大的topic是否设置过期时间,如果未设置,和业务部门确认是否可以设置;如果已设置,是否需要缩短时间或者重建topic
3、如果以上方法都无法解决,让业务部门提单扩容磁盘。
五、kafka_exporter报警
处理方法:一般情况是CPU使用率过高导致拉取不到监控数据,也有可能是网络问题;查看监控面板,如果CPU使用率突然升高需要确认,如果CPU持续较高需要升级配置。
六、重置消费者offset
说明:有些场景可能希望修改消费者消费到的offset位置,以达到重新消费,或者跳过一部分消息的目的
处理方法:
1、将指定GROUP_NAME和topic的offset修改到NEW_OFFSET的位置,重启消费者后,消费中将从指定的offset处消费。
注意这里只能NEW_OFFSET只能设置一个值,也就是说,所有的分区都将使用这个值,如果分区消息负载不均衡,需要考虑是否适用。
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group GROUP_NAME --reset-offsets --execute --to-offset NEW_OFFSET --topic TOPIC_NAME
2、将指定GROUP_NAME和topic的offset修改到earliest或者latest位置,使得消费者从头或者从尾部消费。
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group GROUP_NAME --reset-offsets --execute --to-earliest --topic TOPIC_NAME(从头消费)
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group GROUP_NAME --reset-offsets --execute --to-latest --topic TOPIC_NAME(从最新消费)
七、查询topic的offset的范围
说明:一般很少用到,可以直接在kafdrop上面看,如果没有kafdrop的时候使用以下方法查询
处理方法:
查询offset的最小值(理解为消费者的offset)
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic TOPIC_NAME --time -2
查询offset的最大值(理解为生产者的offset)
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic TOPIC_NAME --time -1
八、导出某个topic的数据
说明:注意topic数据量大小和磁盘大小,最好不要重定向到根分区;如果需要下载先压缩再下载,压缩比很高
处理方法:从头开始消费重定向到文件
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC_NAME --from-beginning > TOPIC_NAME
九、增加topic分区数(只能加不能减)
# ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 12 --topic ec-applog-err
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
# 这里给了一个警告 : 如果为具有关键字的主题增加了分区,则分区逻辑或消息的顺序将受到影响。
# 如果我们的业务场景对消息的顺序有着严格的要求, 一定要谨慎添加分区! 建议将之前所有的消息全部消费完才执行添加分区操作
十、控制台生产者/消费者
生产者:./bin/kafka-console-producer--broker-list localhost:9092 --topic test1
消费者:./bin/kafka-console-consumer--zookeeper localhost:2181 --topic test1 --from-beginning
十一、停止/启动服务
启动:./bin/kafka-server-start.sh -daemon config/server.properties
停止:./bin/kafka-server-stop.sh
十二、kafka鉴权配置
配置文件示例server.properties
|
配置文件示例kafka_server_jaas.conf
|
修改启动脚本,添加bin/kafka-server-start.sh
export KAFKA_OPTS=-Djava.security.auth.login.config=$KAFKA_HOME/config/jaas.conf
授权脚本
|
更多推荐
所有评论(0)