1. Kafka全部数据清空

kafka全部数据清空的步骤为:

  1. 停止每台机器上的kafka;
  2. 删除kafka存储目录(server.properties文件log.dirs配置,默认为“/tmp/kafka-logs”)全部topic的数据目录;
  3. 删除zookeeper上与kafka相关的znode节点;
  4. 重启kafka、如果删除topic还在则需要重启zookeeper;

这里以192.168.187.201 node1、192.168.187.202 node2、192.168.187.203 node3三台机器作为kafka的集群。

注意:kafka版本为kafka_2.11-1.1.1

1.1 停止每台机器上的kafka

以root用户分别登录三台机器,使用命令jps 找出kafka的PID,再使用命令 kill kafka进程。

 节点node1

技术分享图片

节点node2

技术分享图片

节点node3

技术分享图片

1.2 删除kafka存储目录

在kafka安装目录的config文件夹下server.properties中查看存储目录为:

技术分享图片

删除该目录所有数据:

技术分享图片

技术分享图片

技术分享图片

1.3 删除zookeeper上与kafka相关的znode节点

zookeeper上面保存着kafka的所有topic及其消费信息,故需要删除与kafka相关的znode节点:

进入zookeeper的shell界面:

技术分享图片

查看与kafka相关的znode节点:

技术分享图片

在上面的znode节点中,除了zookeeper作为zk的安全保障措施,其他znode节点都得删除 

技术分享图片

1.4 重启kafka

分别在node1、node2、node3上面执行如下命令启动kafka:

/opt/app/kafka_2.11-1.1.1/bin/kafka-server-start.sh /opt/app/kafka_2.11-1.1.1/config/server.properties > /dev/null 2>&1 &

jps命令查看node1、node2、node3上面的启动情况:

技术分享图片

技术分享图片

技术分享图片

最后在查看kafka上面是否还有topic存在:

技术分享图片

可以看到topic及其相关数据已被清空删除

2. 某一topic数据清空

查看当前所有topic

 技术分享图片

技术分享图片

比如目前需要删除test这一topic,目前kafka_2.11-1.1.1以上版本默认delete.topic.enable=true,即是说使用命令

./kafka-topics.sh --zookeeper node1:2181 --delete --topic test

该命令将会在zookeeper中删除与test这一topic相关的znode节点(包括test详细信息、生产数据、消费数据的节点),并在kafka的存储目录/opt/data/kafka/kafka-logs/下把与test这一topic相关的存储数据目录标记为待删除,稍后会真正删除这些待删除的目录,如下:

技术分享图片

使用kafka-topics.sh查看test在zookeeper中相关znode节点信息是否已被删除

技术分享图片

在/opt/data/kafka/kafka-logs目录下查看test相关存储目录是否被标记删除

技术分享图片

在/opt/data/kafka/kafka-logs目录下查看test相关存储目录已被删除

技术分享图片

3. 思考

kafka全部数据清空步骤比较繁琐,借鉴某一topic数据清空的方式,可以通过使用kafka-topics.sh --delete命令逐个删除所有的topic,达到清空kafka全部topic数据的目的,不足的是topic“__consumer_offsets”无法删除,不过不碍事。

 

4. 过期数据清理

Kafka 作为一个高吞吐的消息中间件和传统的消息中间件一个很大的不同点就在于它的日志实际上是以日志的方式默认保存在/kafka-logs文件夹中的。虽然默认有7天清除的机制,但是在数据量大,而磁盘容量不足的情况下,经常出现无法写入的情况。如何调整Kafka的一些默认参数就显得比较关键了。这里笔者整理了一些常见的配置参数供大家参考:

分段策略属性

属性名含义默认值
og.roll.{hours,ms}日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment168(7day)
log.segment.bytes每个segment的最大容量。到达指定容量时,将强制生成一个新的segment1G(-1为不限制)
log.retention.check.interval.ms日志片段文件检查的周期时间60000

 

 

 

 

 

日志刷新策略

Kafka的日志实际上是开始是在缓存中的,然后根据策略定期一批一批写入到日志文件中去,以提高吞吐率。

属性名含义默认值
log.flush.interval.messages消息达到多少条时将数据写入到日志文件10000
log.flush.interval.ms当达到该时间时,强制执行一次flushnull
log.flush.scheduler.interval.ms周期性检查,是否需要将信息flush很大的值

日志保存清理策略

属性名含义默认值
log.cleanup.polict日志清理保存的策略只有delete和compact两种delete
log.retention.hours日志保存的时间,可以选择hours,minutes和ms168(7day)
log.retention.bytes删除前日志文件允许保存的最大值-1
log.segment.delete.delay.ms日志文件被真正删除前的保留时间60000
log.cleanup.interval.mins每隔一段时间多久调用一次清理的步骤10
log.retention.check.interval.ms周期性检查是否有日志符合删除的条件(新版本使用)300000

这里特别说明一下,日志的真正清除时间。当删除的条件满足以后,日志将被“删除”,但是这里的删除其实只是将该日志进行了“delete”标注,文件只是无法被索引到了而已。但是文件本身,仍然是存在的,只有当过了log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除。

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐