列出所有topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
说明:其实就是去检查zk上节点的 /brokers/topics子节点,打印出来

创建topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic order_ledger_check_complete_test --partitions 4 --replication-factor 3
说明:线上环境我们会将自动创建topic禁用掉,改为手动创建(auto.create.topics.enable=false),parttitions和replication-factor是两个必备选项,第一个参数是消费并行度的一个重要参数,第二个极大提高了topic的可用性.备份因子默认是1,相当于没有备份,注意其值不能大于broker个数,否则会报错。同时还可以指定topic级别的配置参数,这种特定的配置会覆盖掉默认配置,并且存储在zookeeper的/config/topics/[topic_name]节点数据里。--alter  --config  --deleteConfig


删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --topic payment_completed --delete
说明:在0.8.2.1之前的版本一般不建议之行删除操作,因为有各种各样的bug存在,目前的版本稳定些,同时我们需要将配置参数打开(delete.topic.enable=true),删除操作其实是通过更改一个zk节点,由另外的删除线程异步做的topicdeletionmanager。

增加partitions:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10
说明:只能增加,不能减少。如果原有分散策略是hash的方式,将会受影响。发送端(默认10分钟会刷新本地存储元信息)和消费端都无需重启即可生效。


增加broker:
需要将安装包拷贝到对应服务器上,修改broker.id,不能与现有系统中broker id冲突,然后创建好对应的日志目录和数据目录等。注意的是,现有partition会继续保持到其他broker上面,新创建topic才可能分配到该新机器上面。如果需要保持整个kafka集群比较均衡,需要手动对现有数据进行迁移,尽量迁移非leader的partition,利用partition reassignment tools
1.--generate
也可以对候选的进行适当调整

[hadoop@i-o1oh kafka_2.10-0.8.2.1]$ cat topics-to-move.json

{"version":1,

 "partitions":[{"topic":"production_process_flow_test"},

               {"topic":"production_process_flow_dev"}

}


[hadoop@i-o1ohkafka_2.10-0.8.2.1]$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

Current partition replica assignment


{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_dev","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}

Proposed partition reassignment configuration


{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":2,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":1,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":3,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}

2.--execute 

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file a.json --execute

Current partition replica assignment


{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_dev","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}


Save this to use as the --reassignment-json-file option during rollback

Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"production_process_flow_test","partition":1,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":3,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":1,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":2,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":2,"replicas":[2,1]}]}

3.--verify

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file a.json --verify

Status of partition reassignment:

Reassignment of partition [production_process_flow_test,1] completed successfully

Reassignment of partition [production_process_flow_dev,3] completed successfully

Reassignment of partition [production_process_flow_test,3] completed successfully

Reassignment of partition [production_process_flow_dev,0] completed successfully

Reassignment of partition [production_process_flow_dev,1] completed successfully

Reassignment of partition [production_process_flow_test,0] completed successfully

Reassignment of partition [production_process_flow_test,2] completed successfully

Reassignment of partition [production_process_flow_dev,2] completed successfully

下线某台机器分两种情况:
1.数据完整:直接将数据目录拷贝到新机器上,同时保持kafka安装配置不变,启动起来就行

2.数据不完整:这种情况下比较麻烦,需要对所有topic进行describe,如果发现有replica在这台下线机器上,需要用partition reassignment tool进行迁移工作

增减replica:
只需要编辑 --reassignment-json-file,添加或者减少broker id即可。其他操作可以依赖partition reassignment tool 的--execute --verify.

消费消息
watch --interval=2 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic payment_completed_dev --from-beginning

查询消费信息
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group group_order_ledger --topic payment_completed_dev

写入消息性能测试
bin/kafka-producer-perf-test.sh --broker-list kafka1.weibo.com:9092 --batch-size 1 --message-size 1024 --messages 10000 --sync --topics topic_test

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐