所有可配置的动态配置 请看最后面的 附件 部分

2.2 增删改 配置 --alter

–alter

删除配置: --delete-config k1=v1,k2=v2

添加/修改配置: --add-config k1,k2

选择类型: --entity-type (topics/clients/users/brokers/broker-

loggers)

类型名称: --entity-name

Topic添加/修改动态配置

--add-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999

Topic删除动态配置

--delete-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms

添加/删除配置同时执行

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=788888888 --delete-config log.retention.ms

其他配置同理,只需要类型改下--entity-type

类型有: (topics/clients/users/brokers/broker- loggers)

哪些配置可以修改 请看最后面的附件:ConfigCommand 的一些可选配置

默认配置

配置默认 --entity-default

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9090 --alter --entity-type brokers --entity-default --add-config log.segment.bytes=88888888

动态配置的默认配置是使用了节点 <defalut>;

在这里插入图片描述

该图转自https://www.cnblogs.com/lizherui/p/12271285.html

优先级 指定动态配置>默认动态配置>静态配置

3.副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions


请戳 【kafka运维】副本扩缩容、数据迁移、副本重分配、副本跨路径迁移

kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,干货!!!非常干!!!建议收藏)

4.Topic的发送kafka-console-producer.sh


4.1 生产无key消息

生产者

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

4.2 生产有key消息

加上属性--property parse.key=true

生产者

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true

默认消息key与消息value间使用“Tab键”进行分隔,所以消息key以及value中切勿使用转义字符(\t)


可选参数

| 参数 | 值类型 | 说明 | 有效值 |

| — | — | — | — |

| –bootstrap-server | String | 要连接的服务器必需(除非指定–broker-list) | 如:host1:prot1,host2:prot2 |

| –topic | String | (必需)接收消息的主题名称 | |

| –batch-size | Integer | 单个批处理中发送的消息数 | 200(默认值) |

| –compression-codec | String | 压缩编解码器 | none、gzip(默认值)snappy、lz4、zstd |

| –max-block-ms | Long | 在发送请求期间,生产者将阻止的最长时间 | 60000(默认值) |

| –max-memory-bytes | Long | 生产者用来缓冲等待发送到服务器的总内存 | 33554432(默认值) |

| –max-partition-memory-bytes | Long | 为分区分配的缓冲区大小 | 16384 |

| –message-send-max-retries | Integer | 最大的重试发送次数 | 3 |

| –metadata-expiry-ms | Long | 强制更新元数据的时间阈值(ms) | 300000 |

| –producer-property | String | 将自定义属性传递给生成器的机制 | 如:key=value |

| –producer.config | String | 生产者配置属性文件[–producer-property]优先于此配置 配置文件完整路径 | |

| –property | String | 自定义消息读取器 | parse.key=true/false key.separator=<key.separator>ignore.error=true/false |

| –request-required-acks | String | 生产者请求的确认方式 | 0、1(默认值)、all |

| –request-timeout-ms | Integer | 生产者请求的确认超时时间 | 1500(默认值) |

| –retry-backoff-ms | Integer | 生产者重试前,刷新元数据的等待时间阈值 | 100(默认值) |

| –socket-buffer-size | Integer | TCP接收缓冲大小 | 102400(默认值) |

| –timeout | Integer | 消息排队异步等待处理的时间阈值 | 1000(默认值) |

| –sync | 同步发送消息 | | |

| –version | 显示 Kafka 版本 | 不配合其他参数时,显示为本地Kafka版本 | |

| –help | 打印帮助信息 | | |

5. Topic的消费kafka-console-consumer.sh


1. 新客户端从头消费--from-beginning (注意这里是新客户端,如果之前已经消费过了是不会从头消费的)

下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2. 正则表达式匹配topic进行消费--whitelist

消费所有的topic

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’

消费所有的topic,并且还从头消费

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning

3.显示key进行消费--property print.key=true

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true

4. 指定分区消费--partition 指定起始偏移量消费--offset

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5. 给客户端命名--group

注意给客户端命名之后,如果之前有过消费,那么--from-beginning就不会再从头消费了

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group

6. 添加客户端属性--consumer-property

这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性--group test-group 那肯定不行

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group

7. 添加客户端属性--consumer.config

--consumer-property 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, --consumer-property 的优先级大于 --consumer.config

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties


| 参数 | 描述 | 例子 |

| — | — | — |

| --group | 指定消费者所属组的ID | |

| --topic | 被消费的topic | |

| --partition | 指定分区 ;除非指定–offset,否则从分区结束(latest)开始消费 | --partition 0 |

| --offset | 执行消费的起始offset位置 ;默认值: latest; /latest /earliest /偏移量 | --offset 10 |

| --whitelist | 正则表达式匹配topic;--topic就不用指定了; 匹配到的所有topic都会消费; 当然用了这个参数,--partition --offset等就不能使用了 | |

| --consumer-property | 将用户定义的属性以key=value的形式传递给使用者 | --consumer-propertygroup.id=test-consumer-group |

| --consumer.config | 消费者配置属性文件请注意,[consumer-property]优先于此配置 | --consumer.config config/consumer.properties |

| --property | 初始化消息格式化程序的属性 | print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer> |

| --from-beginning | 从存在的最早消息开始,而不是从最新消息开始,注意如果配置了客户端名称并且之前消费过,那就不会从头消费了 | |

| --max-messages | 消费的最大数据量,若不指定,则持续消费下去 | --max-messages 100 |

| --skip-message-on-error | 如果处理消息时出错,请跳过它而不是暂停 | |

| --isolation-level | 设置为read_committed以过滤掉未提交的事务性消息,设置为read_uncommitted以读取所有消息,默认值:read_uncommitted | |

| --formatter | kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter | |

6.kafka-leader-election Leader重新选举


6.1 指定Topic指定分区用重新PREFERRED:优先副本策略 进行Leader重选举

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0

6.2 所有Topic所有分区用重新PREFERRED:优先副本策略 进行Leader重选举

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred --all-topic-partitions

6.3 设置配置文件批量指定topic和分区进行Leader重选举

先配置leader-election.json文件

{

“partitions”: [

{

“topic”: “test_create_topic4”,

“partition”: 1

},

{

“topic”: “test_create_topic4”,

“partition”: 2

}

]

}

sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json


相关可选参数

| 参数 | 描述 | 例子 |

| — | — | — |

| --bootstrap-server 指定kafka服务 | 指定连接到的kafka服务 | –bootstrap-server localhost:9092 |

| --topic | 指定Topic,此参数跟--all-topic-partitionspath-to-json-file 三者互斥 | |

| --partition | 指定分区,跟--topic搭配使用 | |

| --election-type | 两个选举策略(PREFERRED:优先副本选举,如果第一个副本不在线的话会失败;UNCLEAN: 策略) | |

| --all-topic-partitions | 所有topic所有分区执行Leader重选举; 此参数跟--topicpath-to-json-file 三者互斥 | |

| --path-to-json-file | 配置文件批量选举,此参数跟--topicall-topic-partitions 三者互斥 | |

7. 持续批量推送消息kafka-verifiable-producer.sh


单次发送100条消息--max-messages 100

一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100

每秒发送最大吞吐量不超过消息 --throughput 100

推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100

发送的消息体带前缀--value-prefix

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666

注意--value-prefix 666必须是整数,发送的消息体的格式是加上一个 点号. 例如: 666.

其他参数:

--producer.config CONFIG_FILE 指定producer的配置文件

--acks ACKS 每次推送消息的ack值,默认是-1

8. 持续批量拉取消息kafka-verifiable-consumer


持续消费

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4

单次最大消费10条消息--max-messages 10

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10


相关可选参数

| 参数 | 描述 | 例子 |

| — | — | — |

| --bootstrap-server 指定kafka服务 | 指定连接到的kafka服务; | –bootstrap-server localhost:9092 |

| --topic | 指定消费的topic | |

| --group-id | 消费者id;不指定的话每次都是新的组id | |

| group-instance-id | 消费组实例ID,唯一值 | |

| --max-messages | 单次最大消费的消息数量 | |

| --enable-autocommit | 是否开启offset自动提交;默认为false | |

| --reset-policy | 当以前没有消费记录时,选择要拉取offset的策略,可以是earliest, latest,none。默认是earliest | |

| --assignment-strategy | consumer分配分区策略,默认是org.apache.kafka.clients.consumer.RangeAssignor | |

| --consumer.config | 指定consumer的配置文件 | |

9.生产者压力测试kafka-producer-perf-test.sh


1. 发送1024条消息--num-records 100并且每条消息大小为1KB--record-size 1024 最大吞吐量每秒10000条--throughput 100

sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024

你可以通过LogIKM查看分区是否增加了对应的数据大小

在这里插入图片描述

LogIKM 可以看到发送了1024条消息; 并且总数据量=1M; 1024条*1024byte = 1M;

2. 用指定消息文件--payload-file发送100条消息最大吞吐量每秒100条--throughput 100

  1. 先配置好消息文件batchmessage.txt

在这里插入图片描述

  1. 然后执行命令

发送的消息会从batchmessage.txt里面随机选择; 注意这里我们没有用参数--payload-delimeter指定分隔符,默认分隔符是\n换行;

bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 1024 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt

  1. 验证消息,可以通过 LogIKM 查看发送的消息

在这里插入图片描述


相关可选参数

| 参数 | 描述 | 例子 |

| — | — | — |

| --topic | 指定消费的topic | |

| --num-records | 发送多少条消息 | |

| --throughput | 每秒消息最大吞吐量 | |

| --producer-props | 生产者配置, k1=v1,k2=v2 | --producer-props bootstrap.servers= localhost:9092,client.id=test_client |

| --producer.config | 生产者配置文件 | --producer.config config/producer.propeties |

| --print-metrics | 在test结束的时候打印监控信息,默认false | --print-metrics true |

| --transactional-id | 指定事务 ID,测试并发事务的性能时需要,只有在 --transaction-duration-ms > 0 时生效,默认值为 performance-producer-default-transactional-id | |

| --transaction-duration-ms | 指定事务持续的最长时间,超过这段时间后就会调用 commitTransaction 来提交事务,只有指定了 > 0 的值才会开启事务,默认值为 0 | |

| --record-size | 一条消息的大小byte; 和 --payload-file 两个中必须指定一个,但不能同时指定 | |

| --payload-file | 指定消息的来源文件,只支持 UTF-8 编码的文本文件,文件的消息分隔符通过 --payload-delimeter指定,默认是用换行\nl来分割的,和 --record-size 两个中必须指定一个,但不能同时指定 ; 如果提供的消息 | |

| --payload-delimeter | 如果通过 --payload-file 指定了从文件中获取消息内容,那么这个参数的意义是指定文件的消息分隔符,默认值为 \n,即文件的每一行视为一条消息;如果未指定--payload-file则此参数不生效;发送消息的时候是随机送文件里面选择消息发送的; | |

10.消费者压力测试kafka-consumer-perf-test.sh


消费100条消息--messages 100

sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100


相关可选参数

| 参数 | 描述 | 例子 |

| — | — | — |

| --bootstrap-server | | |

| --consumer.config | 消费者配置文件 | |

| --date-format | 结果打印出来的时间格式化 | 默认:yyyy-MM-dd HH:mm:ss:SSS |

| --fetch-size | 单次请求获取数据的大小 | 默认1048576 |

| --topic | 指定消费的topic | |

| --from-latest | | |

| --group | 消费组ID | |

| --hide-header | 如果设置了,则不打印header信息 | |

| --messages | 需要消费的数量 | |

| --num-fetch-threads | feth 数据的线程数(废弃无效) | 默认:1 |

| --print-metrics | 结束的时候打印监控数据 | |

| --show-detailed-stats | 如果设置,则按照--report_interval配置的方式报告每个报告间隔的统计信息 | |

| --threads | 消费线程数;(废弃无效) | 默认 10 |

| --reporting-interval | 打印进度信息的时间间隔(以毫秒为单位) | |

11.删除指定分区的消息kafka-delete-records.sh


删除指定topic的某个分区的消息删除至offset为1024

先配置json文件offset-json-file.json

{“partitions”:

[{“topic”: “test1”, “partition”: 0,

“offset”: 1024}],

“version”:1

}

在执行命令

sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json

验证 通过 LogIKM 查看发送的消息

在这里插入图片描述

从这里可以看出来,配置"offset": 1024 的意思是从最开始的地方删除消息到 1024的offset; 是从最前面开始删除的

12. 查看Broker磁盘信息kafka-log-dirs.sh


查询指定topic磁盘信息--topic-list topic1,topic2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxx:9090 --describe --topic-list test2

查询指定Broker磁盘信息--broker-list 0 broker1,broker2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxxx:9090 --describe --topic-list test2 --broker-list 0

例如我一个3分区3副本的Topic的查出来的信息

logDir Broker中配置的log.dir

{

“version”: 1,

“brokers”: [{

“broker”: 0,

“logDirs”: [{

“logDir”: “/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0”,

“error”: null,

“partitions”: [{

“partition”: “test2-1”,

“size”: 0,

“offsetLag”: 0,

“isFuture”: false

}, {

“partition”: “test2-0”,

“size”: 0,

“offsetLag”: 0,

“isFuture”: false

}, {

“partition”: “test2-2”,

“size”: 0,

“offsetLag”: 0,

“isFuture”: false

}]

}]

}, {

“broker”: 1,

“logDirs”: [{

“logDir”: “/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1”,

“error”: null,

“partitions”: [{

“partition”: “test2-1”,

“size”: 0,

“offsetLag”: 0,

“isFuture”: false

}, {

“partition”: “test2-0”,

“size”: 0,

“offsetLag”: 0,

“isFuture”: false

}, {

“partition”: “test2-2”,

“size”: 0,

“offsetLag”: 0,

“isFuture”: false

}]

}]

}, {

“broker”: 2,

“logDirs”: [{

“logDir”: “/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2”,

“error”: null,

“partitions”: [{

“partition”: “test2-1”,

“size”: 0,

“offsetLag”: 0,

“isFuture”: false

}, {

“partition”: “test2-0”,

“size”: 0,

“offsetLag”: 0,

“isFuture”: false

}, {

“partition”: “test2-2”,

“size”: 0,

“offsetLag”: 0,

“isFuture”: false

}]

}]

}, {

“broker”: 3,

“logDirs”: [{

“logDir”: “/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3”,

“error”: null,

“partitions”: []

}]

}]

}

如果你觉得通过命令查询磁盘信息比较麻烦,你也可以通过 LogIKM 查看

在这里插入图片描述

12. 消费者组管理 kafka-consumer-groups.sh


1. 查看消费者列表--list

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list

在这里插入图片描述

先调用MetadataRequest拿到所有在线Broker列表

再给每个Broker发送ListGroupsRequest请求获取 消费者组数据

2. 查看消费者组详情--describe

DescribeGroupsRequest

查看消费组详情--group--all-groups

查看指定消费组详情--group

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group


查看所有消费组详情--all-groups

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups

查看该消费组 消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID等等信息

在这里插入图片描述

查询消费者成员信息--members

所有消费组成员信息

sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090

指定消费组成员信息

sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090

在这里插入图片描述

查询消费者状态信息--state

所有消费组状态信息

sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090

指定消费组状态信息

sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090

在这里插入图片描述

3. 删除消费者组--delete

DeleteGroupsRequest

删除消费组–delete

删除指定消费组--group

sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090

删除所有消费组--all-groups

sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090

PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常

Error: Deletion of some consumer groups failed:

  • Group ‘test2_consumer_group’ could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

4. 重置消费组的偏移量 --reset-offsets

能够执行成功的一个前提是 消费组这会是不可用状态;

下面的示例使用的参数是: --dry-run ;这个参数表示预执行,会打印出来将要处理的结果;

等你想真正执行的时候请换成参数--execute ;

下面示例 重置模式都是 --to-earliest 重置到最早的;

请根据需要参考下面 相关重置Offset的模式 换成其他模式;

重置指定消费组的偏移量 --group

重置指定消费组的所有Topic的偏移量--all-topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic

重置指定消费组的指定Topic的偏移量--topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2

重置所有消费组的偏移量 --all-group

重置所有消费组的所有Topic的偏移量--all-topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic

重置所有消费组中指定Topic的偏移量--topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2

--reset-offsets 后面需要接重置的模式

相关重置Offset的模式

| 参数 | 描述 | 例子 |

| — | — | — |

| --to-earliest : | 重置offset到最开始的那条offset(找到还未被删除最早的那个offset) | |

| --to-current: | 直接重置offset到当前的offset,也就是LOE | |

| --to-latest | 重置到最后一个offset | |

| --to-datetime: | 重置到指定时间的offset;格式为:YYYY-MM-DDTHH:mm:SS.sss; | --to-datetime "2021-6-26T00:00:00.000" |

| --to-offset | 重置到指定的offset,但是通常情况下,匹配到多个分区,这里是将匹配到的所有分区都重置到这一个值; 如果 1.目标最大offset<--to-offset, 这个时候重置为目标最大offset;2.目标最小offset>--to-offset ,则重置为最小; 3.否则的话才会重置为--to-offset的目标值; 一般不用这个 | --to-offset 3465 在这里插入图片描述 |

| --shift-by | 按照偏移量增加或者减少多少个offset;正的为往前增加;负的往后退;当然这里也是匹配所有的; | --shift-by 100--shift-by -100 |

| --from-file | 根据CVS文档来重置; 这里下面单独讲解 | |

--from-file着重讲解一下

上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的offset;不过**--from-file**可以让我们更灵活一点;

  1. 先配置cvs文档

test2,0,100

test2,1,200

test2,2,300

  1. 执行命令

sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv

5. 删除偏移量delete-offsets

能够执行成功的一个前提是 消费组这会是不可用状态;

偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费;

sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2


相关可选参数

| 参数 | 描述 | 例子 |

| — | — | — |

| --bootstrap-server | 指定连接到的kafka服务; | –bootstrap-server localhost:9092 |

| --list | 列出所有消费组名称 | --list |

| --describe | 查询消费者描述信息 | --describe |

| --group | 指定消费组 | |

| --all-groups | 指定所有消费组 | |

| --members | 查询消费组的成员信息 | |

| --state | 查询消费者的状态信息 | |

| --offsets | 在查询消费组描述信息的时候,这个参数会列出消息的偏移量信息; 默认就会有这个参数的; | |

| dry-run | 重置偏移量的时候,使用这个参数可以让你预先看到重置情况,这个时候还没有真正的执行,真正执行换成--excute;默认为dry-run | |

| --excute | 真正的执行重置偏移量的操作; | |

| --to-earliest | 将offset重置到最早 | |

| to-latest | 将offset重置到最近 | |

13.查看日志文件 kafka-dump-log.sh


| 参数 | 描述 | 例子 |

| — | — | — |

| --deep-iteration | | |

| --files <String: file1, file2, ...> | 必需; 读取的日志文件 | –files 0000009000.log |

| --key-decoder-class | 如果设置,则用于反序列化键。这类应实现kafka.serializer。解码器特性。自定义jar应该是在kafka/libs目录中提供 | |

| --max-message-size | 最大的数据量,默认:5242880 | |

| --offsets-decoder | if set, log data will be parsed as offset data from the __consumer_offsets topic. | |

| --print-data-log | 打印内容 | |

| --transaction-log-decoder | if set, log data will be parsed as transaction metadata from the __transaction_state topic | |

| --value-decoder-class [String] | if set, used to deserialize the messages. This class should implement kafka. serializer.Decoder trait. Custom jar should be available in kafka/libs directory. (default: kafka.serializer. StringDecoder) | |

| --verify-index-only | if set, just verify the index log without printing its content. | |

查询Log文件

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log

在这里插入图片描述

查询Log文件具体信息 --print-data-log

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.log --print-data-log

在这里插入图片描述

查询index文件具体信息

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.index

在这里插入图片描述

配置项为log.index.size.max.bytes; 来控制创建索引的大小;

查询timeindex文件

sh bin/kafka-dump-log.sh --files kafka-logs-0/test2-0/00000000000000000300.timeindex

在这里插入图片描述

附件


ConfigCommand 的一些可选配置


Topic相关可选配置

| key | value | 示例 |

| — | — | — |

| cleanup.policy | 清理策略 | |

| compression.type | 压缩类型(通常建议在produce端控制) | |

| delete.retention.ms | 压缩日志的保留时间 | |

| file.delete.delay.ms | topic删除被标记为–delete文件之后延迟多长时间删除正在的Log文件 | 60000 |

| flush.messages | 持久化message限制 | |

| flush.ms | 持久化频率 | |

| follower.replication.throttled.replicas | flowwer副本限流 格式:分区号:副本follower号,分区号:副本follower号 | 0:1,1:1 |

| index.interval.bytes | | |

| leader.replication.throttled.replicas | leader副本限流 格式:分区号:副本Leader号 | 0:0 |

| max.compaction.lag.ms | | |

| max.message.bytes | 最大的batch的message大小 | |

| message.downconversion.enable | message是否向下兼容 | |

| message.format.version | message格式版本 | |

| message.timestamp.difference.max.ms | | |

| message.timestamp.type | | |

| min.cleanable.dirty.ratio | | |

| min.compaction.lag.ms | | |

| min.insync.replicas | 最小的ISR | |

| preallocate | | |

| retention.bytes | 日志保留大小(通常按照时间限制) | |

| retention.ms | 日志保留时间 | |

| segment.bytes | segment的大小限制 | |

| segment.index.bytes | | |

| segment.jitter.ms | | |

| segment.ms | segment的切割时间 | |

| unclean.leader.election.enable | 是否允许非同步副本选主 | |

Broker相关可选配置

| key | value | 示例 |

| — | — | — |

| advertised.listeners | | |

| background.threads | | |

| compression.type | | |

| follower.replication.throttled.rate | | |

| leader.replication.throttled.rate | | |

| listener.security.protocol.map | | |

| listeners | | |

| log.cleaner.backoff.ms | | |

| log.cleaner.dedupe.buffer.size | | |

| log.cleaner.delete.retention.ms | | |

| log.cleaner.io.buffer.load.factor | | |

| log.cleaner.io.buffer.size | | |

| log.cleaner.io.max.bytes.per.second | | |

| log.cleaner.max.compaction.lag.ms | | |

| log.cleaner.min.cleanable.ratio | | |

| log.cleaner.min.compaction.lag.ms | | |

| log.cleaner.threads | | |

| log.cleanup.policy | | |

| log.flush.interval.messages | | |

| log.flush.interval.ms | | |

| log.index.interval.bytes | | |

| log.index.size.max.bytes | | |

| log.message.downconversion.enable | | |

| log.message.timestamp.difference.max.ms | | |

| log.message.timestamp.type | | |

| log.preallocate | | |

| log.retention.bytes | | |

| log.retention.ms | | |

| log.roll.jitter.ms | | |

| log.roll.ms | | |

| log.segment.bytes | | |

| log.segment.delete.delay.ms | | |

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Linux运维工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Linux运维全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
img
img
img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Linux运维知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加VX:vip1024b (备注Linux运维获取)
img

为了做好运维面试路上的助攻手,特整理了上百道 【运维技术栈面试题集锦】 ,让你面试不慌心不跳,高薪offer怀里抱!

这次整理的面试题,小到shell、MySQL,大到K8s等云原生技术栈,不仅适合运维新人入行面试需要,还适用于想提升进阶跳槽加薪的运维朋友。

本份面试集锦涵盖了

  • 174 道运维工程师面试题
  • 128道k8s面试题
  • 108道shell脚本面试题
  • 200道Linux面试题
  • 51道docker面试题
  • 35道Jenkis面试题
  • 78道MongoDB面试题
  • 17道ansible面试题
  • 60道dubbo面试题
  • 53道kafka面试
  • 18道mysql面试题
  • 40道nginx面试题
  • 77道redis面试题
  • 28道zookeeper

总计 1000+ 道面试题, 内容 又全含金量又高

  • 174道运维工程师面试题

1、什么是运维?

2、在工作中,运维人员经常需要跟运营人员打交道,请问运营人员是做什么工作的?

3、现在给你三百台服务器,你怎么对他们进行管理?

4、简述raid0 raid1raid5二种工作模式的工作原理及特点

5、LVS、Nginx、HAproxy有什么区别?工作中你怎么选择?

6、Squid、Varinsh和Nginx有什么区别,工作中你怎么选择?

7、Tomcat和Resin有什么区别,工作中你怎么选择?

8、什么是中间件?什么是jdk?

9、讲述一下Tomcat8005、8009、8080三个端口的含义?

10、什么叫CDN?

11、什么叫网站灰度发布?

12、简述DNS进行域名解析的过程?

13、RabbitMQ是什么东西?

14、讲一下Keepalived的工作原理?

15、讲述一下LVS三种模式的工作过程?

16、mysql的innodb如何定位锁问题,mysql如何减少主从复制延迟?

17、如何重置mysql root密码?

一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
img

zXSnAvC0-1712874814557)]
[外链图片转存中…(img-PGkYEMDI-1712874814557)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Linux运维知识点,真正体系化!

由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新

如果你觉得这些内容对你有帮助,可以添加VX:vip1024b (备注Linux运维获取)
[外链图片转存中…(img-Ia2nhmpX-1712874814558)]

为了做好运维面试路上的助攻手,特整理了上百道 【运维技术栈面试题集锦】 ,让你面试不慌心不跳,高薪offer怀里抱!

这次整理的面试题,小到shell、MySQL,大到K8s等云原生技术栈,不仅适合运维新人入行面试需要,还适用于想提升进阶跳槽加薪的运维朋友。

[外链图片转存中…(img-My7ou6dR-1712874814558)]

本份面试集锦涵盖了

  • 174 道运维工程师面试题
  • 128道k8s面试题
  • 108道shell脚本面试题
  • 200道Linux面试题
  • 51道docker面试题
  • 35道Jenkis面试题
  • 78道MongoDB面试题
  • 17道ansible面试题
  • 60道dubbo面试题
  • 53道kafka面试
  • 18道mysql面试题
  • 40道nginx面试题
  • 77道redis面试题
  • 28道zookeeper

总计 1000+ 道面试题, 内容 又全含金量又高

  • 174道运维工程师面试题

1、什么是运维?

2、在工作中,运维人员经常需要跟运营人员打交道,请问运营人员是做什么工作的?

3、现在给你三百台服务器,你怎么对他们进行管理?

4、简述raid0 raid1raid5二种工作模式的工作原理及特点

5、LVS、Nginx、HAproxy有什么区别?工作中你怎么选择?

6、Squid、Varinsh和Nginx有什么区别,工作中你怎么选择?

7、Tomcat和Resin有什么区别,工作中你怎么选择?

8、什么是中间件?什么是jdk?

9、讲述一下Tomcat8005、8009、8080三个端口的含义?

10、什么叫CDN?

11、什么叫网站灰度发布?

12、简述DNS进行域名解析的过程?

13、RabbitMQ是什么东西?

14、讲一下Keepalived的工作原理?

15、讲述一下LVS三种模式的工作过程?

16、mysql的innodb如何定位锁问题,mysql如何减少主从复制延迟?

17、如何重置mysql root密码?

一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
[外链图片转存中…(img-SHMXhgiK-1712874814558)]

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐