一、创建/删除/重建topic

1、创建:./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 10 --topic TOPIC_NAME

2、删除:./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic TOPIC_NAME

3、查看状态和分区负载详情:./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic TOPIC_NAME

二、设置过期时间

以下例子为保存7天

处理方法

1、设置:./bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --alter --entity-name TOPIC_NAME --add-config retention.ms=604800000

2、删除:./bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --alter --entity-name TOPIC_NAME --delete-config retention.ms

三、删除某个offset之前的数据

说明:需求是删除某个offset之前的数据;一般来说,直接重建topic比较快,但是不排除不能重建topic的情况

处理方法

1、创建json文件,以下例子是删除topic:test_product,分区:0,offset:10之前的数据,不包含offset 10,改操作不会重置offset

示例:{"partitions":[{"topic": "test_product", "partition": 0,"offset": 10}],"version":1}

2、执行命令:./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file cleanup.json

四、kafka磁盘报警

处理方法

1、查看哪个topic数据量比较大,进入kafka数据目录,执行命令查看分区数大于1G的:du -sh * |grep G (注意:不一定是topic数据量大,也有可能是日志之类)

2、查看数据量最大的topic是否设置过期时间,如果未设置,和业务部门确认是否可以设置;如果已设置,是否需要缩短时间或者重建topic

3、如果以上方法都无法解决,让业务部门提单扩容磁盘。

五、kafka_exporter报警

处理方法:一般情况是CPU使用率过高导致拉取不到监控数据,也有可能是网络问题;查看监控面板,如果CPU使用率突然升高需要确认,如果CPU持续较高需要升级配置。

六、重置消费者offset

说明:有些场景可能希望修改消费者消费到的offset位置,以达到重新消费,或者跳过一部分消息的目的

处理方法

1、将指定GROUP_NAME和topic的offset修改到NEW_OFFSET的位置,重启消费者后,消费中将从指定的offset处消费。

注意这里只能NEW_OFFSET只能设置一个值,也就是说,所有的分区都将使用这个值,如果分区消息负载不均衡,需要考虑是否适用。

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group GROUP_NAME --reset-offsets --execute --to-offset NEW_OFFSET --topic TOPIC_NAME

2、将指定GROUP_NAME和topic的offset修改到earliest或者latest位置,使得消费者从头或者从尾部消费。

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group GROUP_NAME --reset-offsets --execute --to-earliest --topic TOPIC_NAME(从头消费)

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group GROUP_NAME --reset-offsets --execute --to-latest --topic TOPIC_NAME(从最新消费)

七、查询topic的offset的范围

说明:一般很少用到,可以直接在kafdrop上面看,如果没有kafdrop的时候使用以下方法查询

处理方法:

查询offset的最小值(理解为消费者的offset)

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic TOPIC_NAME --time -2

查询offset的最大值(理解为生产者的offset)

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic TOPIC_NAME --time -1

八、导出某个topic的数据

说明:注意topic数据量大小和磁盘大小,最好不要重定向到根分区;如果需要下载先压缩再下载,压缩比很高

处理方法:从头开始消费重定向到文件

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC_NAME  --from-beginning > TOPIC_NAME 

九、增加topic分区数(只能加不能减)

# ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 12 --topic ec-applog-err
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

# 这里给了一个警告 : 如果为具有关键字的主题增加了分区,则分区逻辑或消息的顺序将受到影响。
# 如果我们的业务场景对消息的顺序有着严格的要求, 一定要谨慎添加分区! 建议将之前所有的消息全部消费完才执行添加分区操作

十、控制台生产者/消费者

生产者:./bin/kafka-console-producer--broker-list localhost:9092 --topic test1

消费者:./bin/kafka-console-consumer--zookeeper localhost:2181 --topic test1 --from-beginning 

十一、停止/启动服务

启动:./bin/kafka-server-start.sh -daemon config/server.properties

停止:./bin/kafka-server-stop.sh

十二、kafka鉴权配置

配置文件示例server.properties

# -----------  security begin -----------

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

super.users=User:admin

sasl.enabled.mechanisms=SCRAM-SHA-512

sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

security.inter.broker.protocol=SASL_PLAINTEXT

listeners=SASL_PLAINTEXT://192.168.1.213:9092

advertised.listeners=SASL_PLAINTEXT://192.168.1.213:9092

# -----------  security end -----------

配置文件示例kafka_server_jaas.conf

KafkaServer {

   org.apache.kafka.common.security.scram.ScramLoginModule required

   username="admin"

   password="admin-secret";

};

修改启动脚本,添加bin/kafka-server-start.sh

export KAFKA_OPTS=-Djava.security.auth.login.config=$KAFKA_HOME/config/jaas.conf

授权脚本

[root@data-kafka-auth-prod01 scripts]# cat chmod_read.sh

#!/bin/sh

#通过传入用户名、用户消费组作为参数、即可批量赋权

cat topic | while read line

do

  kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:$1 --operation Read --group $2 --topic $line

done

[root@data-kafka-auth-prod01 scripts]# cat chmod_write.sh

#!/bin/sh

#通过传入用户名、即可批量赋写权限

cat topic | while read line

do

  kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:$1 --operation Write --topic $line

done

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐