Partitions and Memory Usage

replica.fetch.max.bytes 

每个partition都分配一个buffer给replica,如果分配1M,如果有1000个partition就需要1G的内存需求考虑有充足的内存

fetch.message.max.bytes

同样的考虑也适用于consumer,对于大文件需要确保有充足的内存接收,大文件可能需要较少的partition或者更多的内存


Partition Reassignment


当系统的负载过大需要增加broker时,kafka不会自动将数据加载到该broker

需要手动执行


创建想要移动的topic list


topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
 "version":1
}
保存为json文件,我们这里是topics-to-move.json


执行 --generate将产生推荐的列表


bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
    --topics-to-move-json-file topics-to-move.json 
    --broker-list "4" 
    --generate

执行完成屏幕将显示推荐的信息


Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,1]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,2]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

{"version":1,
 "partitions":[{"topic":"foo1","partition":3,"replicas":4},
               {"topic":"foo1","partition":1,"replicas":4},
               {"topic":"foo2","partition":2,"replicas":4}]
}

校验信息并且保存json文件,这里是expand-cluster-reassignment.json


执行动作

bin/kafka-reassign-partitions.sh \
    --zookeeper localhost:2181 \
    --reassignment-json-file expand-cluster-reassignment.json 
    --execute

最后验证结果


bin/kafka-reassign-partitions.sh \
    --zookeeper localhost:2181 \
    --reassignment-json-file expand-cluster-reassignment.json 
    --verify

Garbage Collection


监听gc log,gc暂停时间过长可能使zookeeper session超时,需要增加   zookeeper.session.timeout.ms

Handling Large Messages



大文件首先考虑以下几点,减少信息的大小

压缩
compression.codec 或者compressed.topics支持压缩信息

如果共享存储系统可用使用它比使用kafka发送大文件性能更快(NAS,HDFS,S3),将大文件发在共享存储,利用kafka发送消息

在producer端将大信息拆分成1k的小segment,利用partiton key发送到同一个partition,在consumer端可以还原它

如果仍然需要发送大文件到kafka,调整如下几项

message.max.bytes
broker可以接收的最大message,需要小于fetch.message.max.bytes否则consumer不能消息它

log.segment.bytes
kafka log文件的最大值,必须大于最大message的值


replica.fetch.max.bytes
必须大于message.max.bytes,保证replica可以复制成功否则将丢失数据




fetch.message.max.bytes
必须大于 message.max.bytes,保证可以消息


message.max.bytes < log.segment.bytes (保证可以保存)
message.max.bytes < replica.fetch.max.bytes (保证可以复制)
message.max.bytes <  fetch.message.max.bytes(保证可以消息)

Tuning Kafka Producers


kafka producer是将信息发入buffer,当buffer满时发送数据,然后重新填充buffer
它有两个非常重要的指标latency, throughput,对应kafka 参数就是batch.size,linger.ms

batch.size
当数据大小到达时发送数据到broker,你可以增加该值但是它可能永远不会满,但是它会在满足其他条件的情况下发送(超时)

linger.ms
利用时间来控制发送间隔,你可以调大它保证很好的throughput,但是latency会很大

这里说明一下因为我翻译的是cloudera的文档,可能再这里参数名和apache的不一样

所以这里以我的理解应该是这两个参数

cloudera的两个参数名分别是

queue.buffering.max.ms  -> linger.ms

send.buffer.bytes   -> batch.size

Tuning Kafka Brokers


kafka 每一个partition有一个leader,需要保证leader在broker之间的balance,配置auto.leader.rebalance.enable=true (更改)
cloudera推荐一个partiton单独一个物理磁盘,一个partiton一个consumer

Tuning Kafka Consumers


consumer的数量应该等于topic partiton的数量,同一个consumer group中的consumer会折分到不同的partition这样可以得到更好的性能

replica.high.watermark.checkpoint.interval.ms

调整该参数可以保证读取有更好的效率,它可以保存你读取的检查点,使你不会读取之前读过的数据,如果每个读取都保存chekpoint那么你永远不会丢消息,
但是throughput很低

Setting User Limits for Kafka


kafka 需要打开大量文件需要调linux limit

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐