首先导入包

  1. 将kafka目录下的libs中的jar包导入
  2. 用maven建立
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>

写好properties配置文件
一下为项目结构
这里写图片描述

#kafka集群地址
bootstrap.servers = 192.168.222.131:9092,192.168.222.130:9092,192.168.222.132:9092,192.168.222.133:9092
client.id = testProducer
key.serializer = org.apache.kafka.common.serialization.IntegerSerializer
value.serializer = org.apache.kafka.common.serialization.StringSerializer

然后上代码

package kafka.producer;

import java.io.IOException;
import java.util.Properties;

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerTest extends Thread{
     private final KafkaProducer<Integer, String> producer;
        private final String topic;
        private final Boolean isAsync;

/*isAsync同步、异步*/
        public ProducerTest(String topic, Boolean isAsync) {
            Properties properties = new Properties();
            /*加载配置文件*/
            try {
properties.load(ProducerTest.class.getClassLoader().getResourceAsStream("conf/kafka.producer.properties"));
            } catch (IOException e) {

                e.printStackTrace();
            }
            producer = new KafkaProducer<>(properties);
            this.topic = topic;
            this.isAsync = isAsync;
        }

        public void run() {
            int messageNo = 1;
            while (true) {
                String messageStr = "Message_" + messageNo;
                long startTime = System.currentTimeMillis();
                if (isAsync) { // Send asynchronously
                    producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
                } else { // Send synchronously
                    try {
                        producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                        System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
                ++messageNo;
            }
        }
    }

    class DemoCallBack implements Callback {

        private final long startTime;
        private final int key;
        private final String message;

        public DemoCallBack(long startTime, int key, String message) {
            this.startTime = startTime;
            this.key = key;
            this.message = message;
        }

        /**
         * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
         *                  occurred.
         * @param exception The exception thrown during processing of this record. Null if no error occurred.
         */
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                        "), " +
                        "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
            } else {
                exception.printStackTrace();
            }
        }


}

测试代码

package kafka.producer;

public class Main {

    public static void main(String[] args) {
        ProducerTest test = new ProducerTest("tp_hw", true);
        test.start();
    }
}

运行结果

message(51215, Message_51215) sent to partition(0), offset(161830) in 3497 ms
message(51216, Message_51216) sent to partition(0), offset(161831) in 3497 ms
message(51217, Message_51217) sent to partition(0), offset(161832) in 3497 ms
message(51218, Message_51218) sent to partition(0), offset(161833) in 3497 ms
message(51219, Message_51219) sent to partition(0), offset(161834) in 3497 ms
message(51220, Message_51220) sent to partition(0), offset(161835) in 3497 ms
message(51221, Message_51221) sent to partition(0), offset(161836) in 3497 ms
message(51222, Message_51222) sent to partition(0), offset(161837) in 3497 ms
message(51223, Message_51223) sent to partition(0), offset(161838) in 3497 ms
message(51224, Message_51224) sent to partition(0), offset(161839) in 3497 ms
message(51225, Message_51225) sent to partition(0), offset(161840) in 3497 ms
message(51226, Message_51226) sent to partition(0), offset(161841) in 3497 ms
message(51227, Message_51227) sent to partition(0), offset(161842) in 3497 ms
message(51228, Message_51228) sent to partition(0), offset(161843) in 3497 ms

.............
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐