kafka 0.10.0 producer java代码实现
首先导入包:将kafka目录下libs中的jar包导入,或者用maven建立依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.0.0</version></dependency>,然后写好properties配置文件。
·
首先导入包
- 将kafka目录下的libs中的jar包导入
- 或者用maven建立依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
写好properties配置文件
以下为项目结构(配置文件放在classpath下的 conf/kafka.producer.properties):
# Kafka cluster addresses (comma-separated host:port list)
bootstrap.servers = 192.168.222.131:9092,192.168.222.130:9092,192.168.222.132:9092,192.168.222.133:9092
# Identifier this producer reports to the cluster
client.id = testProducer
# Serializer classes for record keys (Integer) and values (String)
key.serializer = org.apache.kafka.common.serialization.IntegerSerializer
value.serializer = org.apache.kafka.common.serialization.StringSerializer
然后上代码
package kafka.producer;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * Demo producer thread that publishes an unbounded sequence of records to one topic.
 *
 * <p>Each record uses the running message number as its key and {@code "Message_" + number}
 * as its value. Producer settings are read from the classpath resource
 * {@code conf/kafka.producer.properties}. The thread keeps sending until it is interrupted,
 * then closes the producer.
 */
public class ProducerTest extends Thread {
    /** Classpath location of the producer configuration. */
    private static final String CONFIG_RESOURCE = "conf/kafka.producer.properties";

    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Boolean isAsync;

    /**
     * Creates the producer from the classpath configuration.
     *
     * @param topic   topic every record is sent to
     * @param isAsync {@code true} to send asynchronously and report through {@link DemoCallBack};
     *                {@code false} to block on every send until it completes
     * @throws NullPointerException  if {@code isAsync} is {@code null}
     * @throws IllegalStateException if the configuration resource is missing or unreadable
     */
    public ProducerTest(String topic, Boolean isAsync) {
        // Reject null up front; otherwise the unboxing in run() would fail later,
        // inside the already-started thread and after the producer was created.
        if (isAsync == null) {
            throw new NullPointerException("isAsync");
        }
        this.topic = topic;
        this.isAsync = isAsync;
        this.producer = new KafkaProducer<>(loadProperties());
    }

    /** Loads the producer configuration, failing fast instead of continuing with partial settings. */
    private static Properties loadProperties() {
        Properties properties = new Properties();
        try (InputStream in =
                ProducerTest.class.getClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
            // getResourceAsStream signals "not found" with null rather than an exception.
            if (in == null) {
                throw new IllegalStateException(
                        "Producer configuration not found on classpath: " + CONFIG_RESOURCE);
            }
            properties.load(in);
        } catch (IOException e) {
            throw new IllegalStateException(
                    "Failed to read producer configuration: " + CONFIG_RESOURCE, e);
        }
        return properties;
    }

    /**
     * Sends messages numbered from 1 upwards until this thread is interrupted,
     * then closes the producer.
     */
    @Override
    public void run() {
        int messageNo = 1;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String messageStr = "Message_" + messageNo;
                long startTime = System.currentTimeMillis();
                ProducerRecord<Integer, String> record =
                        new ProducerRecord<>(topic, messageNo, messageStr);
                if (isAsync) { // Send asynchronously
                    producer.send(record, new DemoCallBack(startTime, messageNo, messageStr));
                } else { // Send synchronously
                    try {
                        producer.send(record).get();
                        System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                    } catch (InterruptedException e) {
                        // get() cleared the interrupt flag; restore it so the loop condition sees it.
                        Thread.currentThread().interrupt();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
                ++messageNo;
            }
        } finally {
            producer.close();
        }
    }
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
/**
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
* occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
*/
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}
测试代码
package kafka.producer;
public class Main {
public static void main(String[] args) {
ProducerTest test = new ProducerTest("tp_hw", true);
test.start();
}
}
运行结果
message(51215, Message_51215) sent to partition(0), offset(161830) in 3497 ms
message(51216, Message_51216) sent to partition(0), offset(161831) in 3497 ms
message(51217, Message_51217) sent to partition(0), offset(161832) in 3497 ms
message(51218, Message_51218) sent to partition(0), offset(161833) in 3497 ms
message(51219, Message_51219) sent to partition(0), offset(161834) in 3497 ms
message(51220, Message_51220) sent to partition(0), offset(161835) in 3497 ms
message(51221, Message_51221) sent to partition(0), offset(161836) in 3497 ms
message(51222, Message_51222) sent to partition(0), offset(161837) in 3497 ms
message(51223, Message_51223) sent to partition(0), offset(161838) in 3497 ms
message(51224, Message_51224) sent to partition(0), offset(161839) in 3497 ms
message(51225, Message_51225) sent to partition(0), offset(161840) in 3497 ms
message(51226, Message_51226) sent to partition(0), offset(161841) in 3497 ms
message(51227, Message_51227) sent to partition(0), offset(161842) in 3497 ms
message(51228, Message_51228) sent to partition(0), offset(161843) in 3497 ms
.............
更多推荐
已为社区贡献1条内容
所有评论(0)