java集成kafka 0.10版本
摘要：本文介绍如何在 Java 项目中通过 Maven 引入 Kafka 0.10.2.2 的依赖（kafka-clients），并给出消息生产者与消费者的示例代码。
·
一、添加maven依赖
<!-- Kafka Java client library; the example classes in this article import
     org.apache.kafka.clients.producer.* and org.apache.kafka.clients.consumer.* -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.2</version>
</dependency>
<!-- NOTE(review): the example code in this article only imports org.apache.kafka.clients.*,
     so this second artifact (kafka_2.11) is presumably not needed by it;
     confirm before removing. Keep both versions identical if it stays. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.2</version>
</dependency>
二、java代码
(1)生产者
package com.ldy.kafka.service;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Service;
/**
 * Kafka message producer for the 0.10.x client API.
 *
 * <p>Each {@code sendMsg} call creates a new {@link KafkaProducer} from the shared static
 * configuration, sends a single record and closes the producer again. Failures are printed
 * to standard error and are not propagated to the caller.
 *
 * @author lidongyang
 */
@Service
public class KafkaMsgProducer {
    /** Producer configuration shared by every send call; populated once in the static block. */
    private static final Properties props = new Properties();

    /** Comma-separated {@code host:port} list used as {@code bootstrap.servers}. */
    private static final String servers = "11.2.3.4:9092,11.2.3.5:9092,11.2.3.6:9092,12.2.3.7:9092,11.2.3.8:9092";

    static {
        props.put("bootstrap.servers", servers);
        props.put("acks", "all");
        // The key must be exactly "retries". The previous "retries " (trailing space) did not
        // match the producer setting, so the retry count configured here was never applied.
        props.put("retries", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    /**
     * Sends a message without a key.
     *
     * @param topic topic to send to
     * @param msg   message value
     */
    public static void sendMsg(String topic, String msg) {
        // ProducerRecord(topic, value) is the same record as ProducerRecord(topic, null, value).
        send(topic, null, msg);
    }

    /**
     * Sends a message in key/value form.
     *
     * @param topic topic to send to
     * @param key   message key
     * @param value message value
     */
    public static void sendMsg(String topic, String key, String value) {
        send(topic, key, value);
    }

    /**
     * Creates a producer, sends one record and always closes the producer afterwards.
     * Any exception raised while creating the producer, building the record or sending
     * is printed and swallowed, matching the best-effort contract of {@code sendMsg}.
     */
    private static void send(String topic, String key, String value) {
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>(topic, key, value));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }

    /** Demo entry point: sends ten messages to {@code test_topic}. */
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            sendMsg("test_topic", "content" + i);
        }
    }
}
(2)消费者
package com.ldy.kafka.service;
import java.util.Arrays;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
 * Kafka message consumer for the 0.10.x client API.
 *
 * <p>{@link #receiveMsg(String, String)} subscribes to a topic and prints every record it
 * polls to standard output until an exception ends the loop.
 *
 * @author lidongyang
 */
public class KafkaMsgConsumer {
    /** Base consumer configuration; never modified after the static block has run. */
    private static final Properties props = new Properties();

    /** Comma-separated {@code host:port} list used as {@code bootstrap.servers}. */
    private static final String servers = "11.2.3.4:9092,11.2.3.5:9092,11.2.3.6:9092,12.2.3.7:9092,11.2.3.8:9092";

    static {
        props.put("bootstrap.servers", servers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    /**
     * Subscribes to {@code topic} and prints the offset, key and value of each received
     * record. The method loops forever; it only returns after an exception, which is
     * printed to standard error.
     *
     * @param topic   topic to subscribe to
     * @param groupId consumer group id; when blank, no {@code group.id} is set
     */
    public static void receiveMsg(String topic, String groupId) {
        // Work on a per-call copy so that the group id of one call does not leak into
        // the shared static configuration used by later (or concurrent) calls.
        Properties consumerProps = new Properties();
        consumerProps.putAll(props);
        if (StringUtils.isNotBlank(groupId)) {
            consumerProps.put("group.id", groupId);
        }

        Consumer<String, String> consumer = null;
        try {
            consumer = new KafkaConsumer<String, String>(consumerProps);
            // Register the topic to consume.
            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                // Fetch the next batch of records (timeout argument: 100).
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset = " + record.offset());
                    System.out.println("key = " + record.key() + ",value=" + record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The loop above only exits through an exception; release the consumer then.
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /** Demo entry point: consumes {@code test_topic_ldy} as group {@code group1}. */
    public static void main(String[] args) {
        String topic = "test_topic_ldy";
        String groupId = "group1";
        receiveMsg(topic, groupId);
    }
}
更多推荐
已为社区贡献3条内容
所有评论(0)