实时ETL流程测试

文档编号版本号V1.0
名称实时ETL流程测试文档
总页数正文
编写日期审批

目录

1. 测试目的

本次测试主要对基于Flink的实时ETL系统各个环节跑通

  • 测试各个组件功能可用性
  • 测试数据一致性、实时性、完整性

2. 测试方法

采用python脚本模拟生成数据,通过Kafka作为消息队列,Flink完成实时抽取转换,输出数据到HDFS

测试过程主要分为以下3阶段:

  1. 模拟实时生产数据
    1. 程序目录:scdh03 /tmp/pycharm_myrs
    2. 主要逻辑:每秒发送十条模拟数据到kafka
  2. 获取数据并转换输出
    1. Flink集群管理平台
    2. 测试项目地址:git:my-sc-slaughter-flink-test
      1. 程序入口:org.myrs.consumer.StreamingJob
    3. 主要逻辑:实时消费kafka数据,对数据进行转换处理,存入HDFS
  3. 数据存储
    1. HDFS存储路径:/data/ods/test/
    2. 存储规则:按小时划分目录,已log文件形式存储
    3. 查看方法:scdh01 hadoop fs -ls /data/ods/test

3. 测试结果

  1. 组件功能可用性
    1. Kafka接收发送消息正常,无积压,集群运行稳定无异常
    2. Flink数据采集、转换、输出正常,集群运行稳定无异常
  2. 数据验证
    1. 实时性:数据生产和落地无延迟
    2. 完整性:源数据和落地数据数据量一致,数据内容相同

实时大数据平台压测方案

1. 压测目的

本次性能测试在正式环境下单台服务器上Kafka处理消息能力及Flink承载能力进行压力测试。测试包括对Kafka写入消息和消费消息进行压力测试,根据不同量级的消息处理结果,评估Kafka的处理性能是否满足项目需求,Flink处理速度是否会产生背压

2. 测试范围及方法

测试使用测试脚本,通过Kafka发起大量写入请求。模拟不同数量级的消息写入和消费场景,查看Kafka处理不同数量级的消息数时的处理能力,包括每秒生成消息数、吞吐量、消息延迟时间

2.1 测试方法

Kafka消息写入创建的topic命名为myrs_consumer,Kafka消费读取的topic也是该topic,使用命令发起消费该topic的请求,针对不同的测试指标,本次我们采用固定其他值,动态变化测量值的方式来进行

2.2 准备工作

测试之前,先用linux命令去测试磁盘的读写速度,具体命令如下:

1.测试IO读
    hdparm -t --direct /dev/scda3
2.测试IO写
    sync;/usr/bin/time -p bash -c "(dd if=/dev/zero of=test.dd  bs=1M count=20000)"

磁盘读在163m/s-206m/s之间,而写速度是163m/s。后续评测我们以该磁盘测试为基准来

2.3 测试环境

  1. 硬件资源:3台32C-CPU、32G内存、500G硬盘的虚拟机
  2. 操作系统:Centos7
  3. 程序版本:Kafka(2.11) Flink(1.12.5)

2.4 Kafka参数

zookeeper.version=3.4.5-cdh6.2.0--1

java.version=1.8.0_181

user.dir=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/kafka

Maximum Message Size=1000kb

Segment File Size=1G

Data Retention Time=7天

Data Directories=/var/local/kafka/data

Additional Broker Java Options:-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+DisableExplicitGC -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.host=127.0.0.1 -Dcom.sun.management.jmxremote.local.only=true -Djava.rmi.server.hostname=127.0.0.1

3. 测试过程

3.1 producer测试

  • 测试目的:测试kafka producer吞吐量
  • 测试脚本
./kafka-producer-perf-test.sh  --topic myrs_consumer --num-records 100000000 --record-size 687  --producer-props   bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092  batch.size=10000   --throughput 30000
./kafka-producer-perf-test.sh  --topic myrs_consumer --num-records 100000000 --record-size 687  --producer-props   bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092  batch.size=20000   --throughput 30000
./kafka-producer-perf-test.sh  --topic myrs_consumer --num-records 100000000 --record-size 687  --producer-props   bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092  batch.size=40000   --throughput 30000
./kafka-producer-perf-test.sh  --topic myrs_consumer --num-records 100000000 --record-size 687  --producer-props   bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092  batch.size=60000   --throughput 30000
./kafka-producer-perf-test.sh  --topic myrs_consumer --num-records 100000000 --record-size 687  --producer-props   bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092  batch.size=80000   --throughput 30000
  • 测试结果

  • 测试结论

    • 发现在消息未压缩的前提下,20000条一批次之后吞吐稳定在30000条/s,而数据量在19.65M/s
    • 本次测试对数据的存储块大小未测,但在之前的测试中发现压缩以及解压的情况也是lz4算法最优,==lz4压缩最大时可以达到30w+/s的吞吐,而不压缩为12w/s,snappy最大为16w/s,gzip最大为5.8w/s==;故后续生产消息时建议采用lz4压缩,不仅可以节省磁盘,也可以大幅度增加我们的吞吐

3.2consumer测试

  • 测试目的:测试consumer消费情况
./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576  --messages 100000  --threads 1 --hide-header --num-fetch-threads 1
./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576  --messages 100000  --threads 4 --hide-header  --num-fetch-threads 1
./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576  --messages 100000  --threads 7 --hide-header  --num-fetch-threads 1
./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576  --messages 100000  --threads 10 --hide-header  --num-fetch-threads 1
  • 测试结论:在threads为4时,消费速度最好达到24.1w/s,而后续慢慢平稳

4. 测试结果

  • producer方面,在主从同步选取1时性能和稳定性适中,压缩方面,我们选择lz4压缩方式,而批大小我们可以选择100w左右,并发保持在60,消息的大小建议在4k左右较好,分区数在3-5个,副本数为3个既可以保证性能也能维持高可用;
  • consumer的处理线程我们选择4个,抓取消息大小则设置在400w条左右,抓取线程设置为10个即可
  • broker的参数方面,replica.fetcher设置为服务器core的个数时较好,io.threads 则设置为core个数的3倍,network.threads保持和core个数相等即可,interval.messages数设置为2w,interval.ms则设置为10000 ms

5. 测试异常

  1. Kafka集群宕机
    1. 现象:集群监控显示scdh2节点失去联系,查阅日志监控服务读取信息失败,提示空间不足
    2. 解决方案:对Kafka存储空间扩容,并增加监控告警,避免极端情况下大量数据写入影响集群运行

6. 性能优化

  1. Kafka缓冲磁盘扩容

    1. kafka数据目录:scdh02 /var/local/kafka/data
      • 默认数据目录大小57G,增加磁盘 /root/data/kafka/data
  2. Flink集群配置高可用

    1. 修改 /usr/app/flink-1.12.5/conf/flink-conf.yaml

      1. jobmanager.rpc.address: scdh01
    2. 修改 /usr/app/flink-1.12.5/conf/workers

      scdh02
      scdh03
    3. 修改flink-conf.yaml配置文件,增加高可用配置

      state.backend: filesystem
      state.checkpoints.dir: hdfs://scdh02:9000/flink-checkpoints
      state.savepoints.dir: hdfs://scdh02:9000/flink-savepoints
      
      high-availability: zookeeper
      high-availability.storageDir: hdfs://scdh02:9000/flink/ha/
      high-availability.zookeeper.quorum: scdh01:2181,scdh02:2181,scdh03:2181
    4. 重启Flink集群

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐