1、idea中开发kafka并发测试需要的扩展包

由于jmeter的jms模块并不支持kafka,因此需要扩展开发
引入jmeter

        <dependency>
            <groupId>org.apache.jmeter</groupId>
            <artifactId>ApacheJMeter_core</artifactId>
            <version>5.4.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.jmeter</groupId>
            <artifactId>ApacheJMeter_java</artifactId>
            <version>5.4.1</version>
            <scope>provided</scope>
        </dependency>

编写向kafka发送消息的核心逻辑,继承AbstractJavaSamplerClient

public class JmeterTest extends AbstractJavaSamplerClient {

    private static final Logger log = LoggerFactory.getLogger(JmeterTest.class);

    private KafkaProducer producer;

    private Integer dataSize;

    private Integer dataNum;

    private String topic;

    // SampleResult类封装了从入口样本返回的各种信息
    private SampleResult results;

    /**
     * 创建生产者实例
    */
    private void init() {
        if (producer == null) {
            // ...
            Map<String, Object> kafkaProperites = ConfigUtil.getProducerParams(openSsl, "jmeter-test");
            producer = new KafkaProducer(kafkaProperites);
        }
    }

    @Override
    public Arguments getDefaultParameters() {
        // 声明定义Arguments类
        Arguments params = new Arguments();
        // 添加一个新参数
        params.addArgument("dataSize", "4608");
        params.addArgument("dataNum", "1");
        params.addArgument("topic", "test_topic_1");
        return params;
    }

    @Override
    public void setupTest(JavaSamplerContext jsc) {
        // 定义SampleResult类
        results = new SampleResult();
        // 以String形式获取指定参数的值,或者如果未指定该值,则返回指定的默认值
        dataSize = jsc.getIntParameter("dataSize", 4608);
        dataNum = jsc.getIntParameter("dataNum", 1);
        topic = jsc.getParameter("topic", "test_topic_1");
    }


    @Override
    public SampleResult runTest(JavaSamplerContext javaSamplerContext) {
        init();
        ProducerRecord<String, byte[]> record = getRecord();
        results.sampleStart();
        for (int i = 0; i < dataNum; i++) {
            producer.send(record);
        }

        results.sampleEnd();
        //由于kafka的错误基本上都是警告,没法通过异常捕获知道,这里直接返回成功了,真正是否成功还得通过日志定位, 待优化
        results.setSuccessful(true);
        results.setResponseData("send " + dataNum + " records to " + topic, StandardCharsets.UTF_8.name());
        producer.close();
        return results;
    }
	// 构造dataSize大小的随机数据
    private ProducerRecord<String, byte[]> getRecord() {
        byte[] payload = null;
        Random random = new Random(0);
        if (dataSize != null) {
            payload = new byte[dataSize];
            for (int i = 0; i < payload.length; ++i)
                payload[i] = (byte) (random.nextInt(26) + 65);
        }

        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, payload);
        return record;
    }
}

2、在windows环境中用可视化页面配置

添加线程组,组下面添加取样器-》Java请求
在这里插入图片描述
配置java请求的自定义类路径
在这里插入图片描述
将配置好的线程组保存成jmx文件

3、上传至linux环境中执行

问题:执行jmeter报没有权限
在这里插入图片描述
给整个bin目录赋执行权限即可
在这里插入图片描述
然后执行sh ./jmeter.sh -v,显示jmeter的banner就ok啦
在这里插入图片描述

执行我们配好的jmx文件
sh jmeter.sh -n -t kafka-producer.jmx -l result.jtl
在这里插入图片描述

4、收集结果

回到windows系统,在jmeter中添加一个“聚合报告”,打开result.jtl
在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐