kafka压测
实时ETL流程测试文档编号版本号V1.0名称实时ETL流程测试文档总页数正文编写日期审批目录1. 测试目的本次测试主要对基于Flink的实时ETL系统各个环节跑通测试各个组件功能可用性测试数据一致性、实时性、完整性2. 测试方法采用python脚本模拟生成数据,通过Kafka作为消息队列,Flink完成实时抽取转换,输出数据到HDFS测试过程主要分为以下3
实时ETL流程测试
文档编号 | 版本号 | V1.0 | |
---|---|---|---|
名称 | 实时ETL流程测试文档 | ||
总页数 | 正文 | ||
编写日期 | 审批 |
目录
1. 测试目的
本次测试主要对基于Flink的实时ETL系统各个环节跑通
- 测试各个组件功能可用性
- 测试数据一致性、实时性、完整性
2. 测试方法
采用python脚本模拟生成数据,通过Kafka作为消息队列,Flink完成实时抽取转换,输出数据到HDFS
测试过程主要分为以下3阶段:
- 模拟实时生产数据
- 程序目录:scdh03 /tmp/pycharm_myrs
- 主要逻辑:每秒发送十条模拟数据到kafka
- 获取数据并转换输出
- Flink集群管理平台
- 测试项目地址:git:my-sc-slaughter-flink-test
- 程序入口:org.myrs.consumer.StreamingJob
- 主要逻辑:实时消费kafka数据,对数据进行转换处理,存入HDFS
- 数据存储
- HDFS存储路径:/data/ods/test/
- 存储规则:按小时划分目录,已log文件形式存储
- 查看方法:scdh01
hadoop fs -ls /data/ods/test
3. 测试结果
- 组件功能可用性
- Kafka接收发送消息正常,无积压,集群运行稳定无异常
- Flink数据采集、转换、输出正常,集群运行稳定无异常
- 数据验证
- 实时性:数据生产和落地无延迟
- 完整性:源数据和落地数据数据量一致,数据内容相同
实时大数据平台压测方案
1. 压测目的
本次性能测试在正式环境下单台服务器上Kafka处理消息能力及Flink承载能力进行压力测试。测试包括对Kafka写入消息和消费消息进行压力测试,根据不同量级的消息处理结果,评估Kafka的处理性能是否满足项目需求,Flink处理速度是否会产生背压
2. 测试范围及方法
测试使用测试脚本,通过Kafka发起大量写入请求。模拟不同数量级的消息写入和消费场景,查看Kafka处理不同数量级的消息数时的处理能力,包括每秒生成消息数、吞吐量、消息延迟时间
2.1 测试方法
Kafka消息写入创建的topic命名为myrs_consumer,Kafka消费读取的topic也是该topic,使用命令发起消费该topic的请求,针对不同的测试指标,本次我们采用固定其他值,动态变化测量值的方式来进行
2.2 准备工作
测试之前,先用linux命令去测试磁盘的读写速度,具体命令如下:
1.测试IO读
hdparm -t --direct /dev/scda3
2.测试IO写
sync;/usr/bin/time -p bash -c "(dd if=/dev/zero of=test.dd bs=1M count=20000)"
磁盘读在163m/s-206m/s之间,而写速度是163m/s。后续评测我们以该磁盘测试为基准来
2.3 测试环境
- 硬件资源:3台32C-CPU、32G内存、500G硬盘的虚拟机
- 操作系统:Centos7
- 程序版本:Kafka(2.11) Flink(1.12.5)
2.4 Kafka参数
zookeeper.version=3.4.5-cdh6.2.0--1
java.version=1.8.0_181
user.dir=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/kafka
Maximum Message Size=1000kb
Segment File Size=1G
Data Retention Time=7天
Data Directories=/var/local/kafka/data
Additional Broker Java Options:-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+DisableExplicitGC -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.host=127.0.0.1 -Dcom.sun.management.jmxremote.local.only=true -Djava.rmi.server.hostname=127.0.0.1
3. 测试过程
3.1 producer测试
- 测试目的:测试kafka producer吞吐量
- 测试脚本
./kafka-producer-perf-test.sh --topic myrs_consumer --num-records 100000000 --record-size 687 --producer-props bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 batch.size=10000 --throughput 30000
./kafka-producer-perf-test.sh --topic myrs_consumer --num-records 100000000 --record-size 687 --producer-props bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 batch.size=20000 --throughput 30000
./kafka-producer-perf-test.sh --topic myrs_consumer --num-records 100000000 --record-size 687 --producer-props bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 batch.size=40000 --throughput 30000
./kafka-producer-perf-test.sh --topic myrs_consumer --num-records 100000000 --record-size 687 --producer-props bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 batch.size=60000 --throughput 30000
./kafka-producer-perf-test.sh --topic myrs_consumer --num-records 100000000 --record-size 687 --producer-props bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 batch.size=80000 --throughput 30000
-
测试结果
-
测试结论
- 发现在消息未压缩的前提下,20000条一批次之后吞吐稳定在30000条/s,而数据量在19.65M/s
- 本次测试对数据的存储块大小未测,但在之前的测试中发现压缩以及解压的情况也是lz4算法最优,==lz4压缩最大时可以达到30w+/s的吞吐,而不压缩为12w/s,snappy最大为16w/s,gzip最大为5.8w/s==;故后续生产消息时建议采用lz4压缩,不仅可以节省磁盘,也可以大幅度增加我们的吞吐
3.2consumer测试
- 测试目的:测试consumer消费情况
./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576 --messages 100000 --threads 1 --hide-header --num-fetch-threads 1
./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576 --messages 100000 --threads 4 --hide-header --num-fetch-threads 1
./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576 --messages 100000 --threads 7 --hide-header --num-fetch-threads 1
./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576 --messages 100000 --threads 10 --hide-header --num-fetch-threads 1
- 测试结论:在threads为4时,消费速度最好达到24.1w/s,而后续慢慢平稳
4. 测试结果
- producer方面,在主从同步选取1时性能和稳定性适中,压缩方面,我们选择lz4压缩方式,而批大小我们可以选择100w左右,并发保持在60,消息的大小建议在4k左右较好,分区数在3-5个,副本数为3个既可以保证性能也能维持高可用;
- consumer的处理线程我们选择4个,抓取消息大小则设置在400w条左右,抓取线程设置为10个即可
- broker的参数方面,replica.fetcher设置为服务器core的个数时较好,io.threads 则设置为core个数的3倍,network.threads保持和core个数相等即可,interval.messages数设置为2w,interval.ms则设置为10000 ms
5. 测试异常
- Kafka集群宕机
- 现象:集群监控显示scdh2节点失去联系,查阅日志监控服务读取信息失败,提示空间不足
- 解决方案:对Kafka存储空间扩容,并增加监控告警,避免极端情况下大量数据写入影响集群运行
6. 性能优化
-
Kafka缓冲磁盘扩容
- kafka数据目录:scdh02 /var/local/kafka/data
- 默认数据目录大小57G,增加磁盘 /root/data/kafka/data
- kafka数据目录:scdh02 /var/local/kafka/data
-
Flink集群配置高可用
-
修改 /usr/app/flink-1.12.5/conf/flink-conf.yaml
jobmanager.rpc.address: scdh01
-
修改 /usr/app/flink-1.12.5/conf/workers
scdh02 scdh03
-
修改flink-conf.yaml配置文件,增加高可用配置
state.backend: filesystem state.checkpoints.dir: hdfs://scdh02:9000/flink-checkpoints state.savepoints.dir: hdfs://scdh02:9000/flink-savepoints high-availability: zookeeper high-availability.storageDir: hdfs://scdh02:9000/flink/ha/ high-availability.zookeeper.quorum: scdh01:2181,scdh02:2181,scdh03:2181
-
重启Flink集群
-
更多推荐
所有评论(0)