Common Kafka operations: creating, deleting, producing to, and consuming topics; fixing the "Too many open files" error; resetting offsets to re-consume.
Topic commands
Create a topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
Delete a topic
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic push_input_counter
./bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic push_input_counter
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic sentinel_metric_dev
Step 1: bin/kafka-topics --zookeeper <zookeeper cluster> --delete --topic topicName
Step 2: open the ZooKeeper CLI and remove three paths: ① rmr /brokers/topics/market ② rmr /admin/delete_topics/market ③ rmr /config/topics/market
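The two manual steps above can be sketched as a small helper that prints the exact commands to run for a given topic (the topic name `market` is the example from the text; the ZooKeeper address is an assumption):

```shell
#!/bin/sh
# Print the commands for manually deleting a Kafka topic,
# including the ZooKeeper cleanup paths from step 2.
TOPIC="market"          # example topic name from the text
ZK="localhost:2181"     # assumed ZooKeeper address

echo "bin/kafka-topics.sh --zookeeper $ZK --delete --topic $TOPIC"
echo "rmr /brokers/topics/$TOPIC"
echo "rmr /admin/delete_topics/$TOPIC"
echo "rmr /config/topics/$TOPIC"
```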
Cluster command
kafka-topics.bat --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic test-p1 --replication-factor 3 --partitions 1 --create
When a single node in the Kafka cluster runs out of disk space, historical topic data must be cleaned up. Two approaches:
1) Stop the Kafka process and shorten the retention in server.properties so log data is kept for only one minute (e.g. log.retention.minutes=1)
or
without stopping the Kafka process, run:
kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=86400000
#test is the topic name
#retention.ms=86400000 keeps data for 24 hours
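retention.ms is expressed in plain milliseconds; a quick sanity check of the 24-hour figure above:

```shell
# retention.ms is in milliseconds:
# 24 hours * 60 minutes * 60 seconds * 1000 ms
RETENTION_MS=$((24 * 60 * 60 * 1000))
echo "$RETENTION_MS"   # 86400000, i.e. one day
```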
Topic parameters
This creates a topic with a custom maximum message size and flush setting:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
Parameters of an existing topic can be modified later; this example updates the maximum message size of my-topic:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000
To check the overrides set on a topic:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
To delete a custom parameter from a topic (legacy kafka-topics.sh syntax):
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --deleteConfig max.message.bytes
or with kafka-configs.sh:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
List all topics
kafka-topics.bat --zookeeper localhost:2181 -list
kafka-topics --zookeeper localhost:2181 --describe --topic test-partitions2
Check the maximum (or minimum) offset of a topic partition
kafka-run-class.bat kafka.tools.GetOffsetShell --topic test-partitions2 --time -1 --broker-list localhost:9092 --partitions 0
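GetOffsetShell prints one `topic:partition:offset` triple per line. A sketch of pulling out just the offset with awk, using a hard-coded sample line since real output depends on the cluster:

```shell
# Sample GetOffsetShell output line (real output depends on your cluster)
LINE="test-partitions2:0:175"
# Fields are topic:partition:offset; take the third field
OFFSET=$(echo "$LINE" | awk -F: '{print $3}')
echo "$OFFSET"   # 175
```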
Check the number of partitions of a topic
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test_out
Increase the number of partitions of a topic
kafka-topics --zookeeper localhost:2181 --alter --partitions 3 --topic test-partitions2
Check consumer progress (lag) on a topic
kafka-consumer-groups.sh --bootstrap-server 172.17.2.118:9092 --describe --group 1620
kafka-run-class.bat kafka.tools.ConsumerOffsetChecker --group test-consumer-group --zookeeper localhost:2181
On newer Kafka versions this command fails with "Could not find or load main class kafka.tools.ConsumerOffsetChecker";
use the following command instead:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group
Start a producer: kafka-console-producer
kafka-console-producer.bat --broker-list localhost:9092 --topic test-partitions2
Start a consumer: kafka-console-consumer
Specify a client id
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test-partitions2 --property client.id=1
Specify a consumer group (group.id=test-consumer-group)
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test-partition4 --consumer-property group.id=test-consumer-group
Consume from the beginning
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test-partitions2 --from-beginning
Groups commands
kafka-consumer-groups
kafka-consumer-groups.bat --zookeeper localhost:2181 --group test-consumer-group --describe
kafka-consumer-groups.bat --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 --new-consumer --list
Timestamp offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <address:port> --topic <string> --time <param>
configs commands
Set the retention (expiry) time
./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name testtopic --entity-type topics --add-config retention.ms=86400000
retention.ms=86400000 is one day; the unit is milliseconds.
Kafka throttling: kafka-configs
Configure quotas with kafka-configs
kafka-configs.bat --zookeeper localhost:2181 --alter --add-config producer_byte_rate=1048576,consumer_byte_rate=1024 --entity-type clients --entity-name 1
Producer benchmark: kafka-producer-perf-test
kafka-producer-perf-test.bat --topic test-partitions2 --num-records 10000 --record-size 100 --throughput 150 --producer-props bootstrap.servers=localhost:9092 client.id=1
avg latency: average latency
max latency: maximum latency
Consumer side
Consumer benchmark: kafka-consumer-perf-test
kafka-consumer-perf-test.bat --broker-list localhost:9092 --messages 500000 --topic test-partitions2
Offset operations
Query a topic's offset range
Query the minimum offset of a topic (--time -2):
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave6:9092 --topic videoplay --time -2
Query the maximum offset (--time -1):
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave6:9092 --topic videoplay --time -1
D:\apache-kafka1\kafka_2.11-0.11.0.2\bin\windows>kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka-1:9092 --topic log_p
log_p:0:175
Kafka partition reassignment
D:\kafka_2.11-0.11.0.2\kafka_2.11-0.11.0.2\bin\windows>kafka-reassign-partitions.bat --zookeeper localhost:2181 --topics-to-move-json-file topic.json --broker-list "0" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test-partitions2","partition":1,"replicas":[0]},{"topic":"test-partitions2","partition":0,"replicas":[0]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test-partitions2","partition":1,"replicas":[0]},{"topic":"test-partitions2","partition":0,"replicas":[0]}]}
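Before running --execute, the proposed assignment printed above must be saved into reassignment.json; the "data file is empty" error shown below comes from skipping this step. A minimal sketch (the JSON is copied from the proposed configuration above):

```shell
# Save the proposed partition assignment to reassignment.json.
# This JSON mirrors the "Proposed partition reassignment configuration" output.
cat > reassignment.json <<'EOF'
{"version":1,"partitions":[{"topic":"test-partitions2","partition":1,"replicas":[0]},{"topic":"test-partitions2","partition":0,"replicas":[0]}]}
EOF
# Sanity-check: the file must be non-empty and contain the version field
grep -q '"version":1' reassignment.json && echo "reassignment.json ready"
```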
D:\kafka_2.11-0.11.0.2\kafka_2.11-0.11.0.2\bin\windows>kafka-reassign-partitions.bat --zookeeper localhost:2181 --reassignment-json-file reassignment.json --execute
Partitions reassignment failed due to Partition reassignment data file is empty
kafka.common.AdminCommandFailedException: Partition reassignment data file is empty
at kafka.admin.ReassignPartitionsCommand$.parseAndValidate(ReassignPartitionsCommand.scala:188)
at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:158)
at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:154)
at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:51)
at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
D:\kafka_2.11-0.11.0.2\kafka_2.11-0.11.0.2\bin\windows>kafka-reassign-partitions.bat --zookeeper localhost:2181 --reassignment-json-file reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test-partitions2","partition":1,"replicas":[0]},{"topic":"test-partitions2","partition":0,"replicas":[0]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
D:\kafka_2.11-0.11.0.2\kafka_2.11-0.11.0.2\bin\windows>kafka-reassign-partitions.bat --zookeeper localhost:2181 --reassignment-json-file reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [test-partitions2,1] completed successfully
Reassignment of partition [test-partitions2,0] completed successfully
Notes
How do you set the maximum message size Kafka can accept?
For an SRE this is an easy question, but the maximum message size involves producer-side, consumer-side, broker-side, and topic-level parameters, and all of them must be set consistently so that messages can be produced and consumed normally.
Broker-side parameters: message.max.bytes; max.message.bytes (topic level); replica.fetch.max.bytes (otherwise followers will fail to replicate large messages)
Consumer-side parameter: fetch.message.max.bytes
The LAG column explained
Positive: data is piling up in Kafka; messages are written faster than they are consumed, so the log-end offset a minus the consumer offset b is positive (supply exceeds demand).
Negative: the log-end offset a is read first, and by the time the consumer offset b is read, b has already passed a, so a-b comes out negative; it means the consumers are processing data very quickly (supply falls short of demand).
0: producers and consumers are running at roughly the same rate; both are working normally.
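The lag arithmetic above, sketched with made-up offsets (the variable names are illustrative, not Kafka output fields):

```shell
# LAG = log-end-offset (where the producer is) - current-offset (where the consumer is)
LOG_END_OFFSET=175     # hypothetical latest offset in the partition
CURRENT_OFFSET=150     # hypothetical committed consumer offset
LAG=$((LOG_END_OFFSET - CURRENT_OFFSET))
echo "$LAG"   # 25: positive means messages are piling up
```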
Too many open files
ulimit -a | grep "open files"
Then raise the value via ulimit (note: ulimit is a shell builtin, so run it as a privileged user in the shell that starts Kafka, not through sudo):
ulimit -n 4096
Count open files system-wide, or for a single process:
lsof | wc -l
lsof -p 128023 | wc -l
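On Linux, the per-process count can also be read from /proc without lsof; a small sketch against the current shell's own PID:

```shell
# Count open file descriptors of the current shell via /proc (Linux only).
# $$ is the shell's own PID; substitute the Kafka broker's PID in practice.
FD_COUNT=$(ls /proc/$$/fd | wc -l)
echo "open fds: $FD_COUNT"
```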
Resetting offsets
Shift the current offset forward by 10:
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --shift-by 10 --topic sales_topic --execute
Shift the current offset back by 10:
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --shift-by -10 --topic sales_topic --execute
--to-datetime <string>
Reset offsets to the offset at a given datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-datetime 2020-11-01T00:00:00Z --topic sales_topic --execute
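A quick way to produce a timestamp in the expected format (a sketch using the POSIX date utility; the .000 milliseconds are appended literally since date has no sub-second field here):

```shell
# Build a --to-datetime argument in the 'YYYY-MM-DDTHH:mm:SS.sss' format
TS=$(date -u +"%Y-%m-%dT%H:%M:%S.000")
echo "$TS"
# e.g. kafka-consumer-groups.sh ... --reset-offsets --to-datetime "$TS" --topic sales_topic --execute
```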
--to-earliest
Reset offsets to the earliest (oldest) offset available in the topic.
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --topic sales_topic --execute
--to-latest
Reset offsets to the latest (most recent) offset available in the topic.
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-latest --topic sales_topic --execute