Java/Kafka 实战:生产者发送 JSON 数据,消费者解析 JSON 数据。
发送自定义数据/对象请查看:发送自定义数据/自定义对象。一、生产者的使用:package com.kuxingseng.lbw.mq;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;import org.apache.kafka.
·
发送自定义数据/对象请查看:发送自定义数据/自定义对象(该方式耦合性强,不建议使用)。
一、生产者的使用:
package com.kuxingseng.lbw.mq;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
 * Demo Kafka producer that serializes a payload to a JSON string with Jackson and
 * publishes it to the {@code LBW_EPM} topic using the plain {@code StringSerializer}.
 *
 * <p>Each send creates a short-lived {@link KafkaProducer} and closes it once the record
 * has been handed off; closing waits for the in-flight record to complete, so
 * {@link #send()} effectively blocks until the broker answers or the send fails.
 */
public class MyProducer {
    private static final Logger logger = LoggerFactory.getLogger(MyProducer.class);
    private static final String TOPIC = "LBW_EPM";
    protected static String IP = "192.168.199.128";
    protected static String PORT = "9092";
    // Eagerly created singleton; must be declared after IP/PORT because the instance
    // initializer below reads them through getProperties().
    private static final MyProducer INSTANCE = new MyProducer();

    private final Properties properties = getProperties();
    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Returns the shared producer instance.
     *
     * @return the eagerly initialized singleton
     */
    public static MyProducer getInstance() {
        return INSTANCE;
    }

    /**
     * Sends the built-in demo payload (a one-element list holding a two-entry map)
     * as a JSON array string.
     */
    public void send() {
        List<Map<String, String>> payload = new ArrayList<>();
        Map<String, String> entry = new HashMap<>();
        entry.put("1112", "33333");
        entry.put("99999", "22222");
        payload.add(entry);
        send(payload);
    }

    /**
     * Serializes {@code payload} to JSON and publishes it to the topic.
     *
     * @param payload any object Jackson can serialize
     * @throws KafkaException if the payload cannot be converted to JSON; nothing is sent
     *                        in that case
     */
    public void send(Object payload) {
        final String json;
        try {
            json = mapper.writeValueAsString(payload);
        } catch (JsonProcessingException e) {
            // Fail before touching Kafka instead of passing a null record to send().
            throw new KafkaException("failed to serialize kafka payload to JSON", e);
        }

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, json);
        // try-with-resources guarantees the producer is closed even if send() throws.
        try (Producer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            kafkaProducer.send(record, this::logSendResult);
        }
    }

    /**
     * Completion callback for a send. It only logs: the callback is invoked by the
     * producer, so it must return quickly, and exceptions thrown from it would not
     * reach the caller of {@link #send()}. Retrying is left to the client via the
     * {@code retries} setting in {@link #getProperties()}.
     */
    private void logSendResult(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            logger.info("kafka send successful");
        } else if (e instanceof RetriableException) {
            logger.error("kafka send fail, client retries exhausted.", e);
        } else {
            logger.error("kafka server message error.", e);
        }
    }

    /**
     * Builds the producer configuration.
     *
     * @return properties with the broker address, String key/value serializers,
     *         {@code acks=1} and a bounded number of client-side retries
     */
    public static Properties getProperties() {
        Properties props = new Properties();
        // Cluster address; separate multiple brokers with a comma ",".
        props.put("bootstrap.servers", IP + ":" + PORT);
        // Key serializer: plain strings, using Kafka's built-in serializer.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // JSON is sent as a string, so the built-in StringSerializer is sufficient.
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // To send custom objects, plug in a custom serializer instead, e.g.
        // props.put("value.serializer", "com.kuxingseng.lbw.mq.KafkaEntityDataSerializer");
        // "acks" is the Java producer's key; "request.required.acks" belonged to the
        // legacy Scala producer and is not a known config for this client.
        props.put("acks", "1");
        // Let the client retry retriable failures itself.
        props.put("retries", "3");
        return props;
    }
}
二,消费者解析json字符串。
package com.kuxingseng.lbw.mq;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * Demo Kafka consumer that subscribes to the {@code LBW_EPM} topic, reads String
 * values and logs every value that parses as a JSON array.
 *
 * <p>The loop runs until the executing thread is interrupted.
 */
public class MyConsumer implements Runnable {
    private static final String TOPIC = "LBW_EPM";

    private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
    private final Properties props = getProperties();
    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Polls the topic in an endless loop, pausing 3 seconds between polls, and logs
     * each record value plus its parsed form when it is a JSON array. Values that are
     * not JSON arrays are skipped. On interruption the consumer is closed and the
     * thread's interrupt flag is restored.
     */
    @Override
    public void run() {
        // try-with-resources closes the consumer before the catch block runs, so the
        // interrupt flag is restored only after close() has finished.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to the topic.
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                Thread.sleep(3000);
                // Fetch records from the broker, waiting at most one second.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    String value = record.value();
                    logger.info("value:{}", value);
                    // Parse once and reuse the result for both the check and the log line.
                    List<?> parsed = parseJsonArray(value);
                    if (parsed != null) {
                        logger.info("{}", parsed);
                    }
                }
            }
        } catch (InterruptedException e) {
            logger.error("MyConsumer error:", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds the consumer configuration.
     *
     * @return properties with the broker address, the consumer group and String
     *         key/value deserializers
     */
    private static Properties getProperties() {
        Properties props = new Properties();
        // Kafka broker address.
        props.put("bootstrap.servers", "192.168.199.128:9092");
        // A consumer group id is required.
        props.put("group.id", "DEMO");
        // Deserializers for the record key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Strings / JSON strings can use the built-in deserializer; a custom one also works, e.g.
        // props.put("value.deserializer", "com.kuxingseng.lbw.mq.KafkaEntityDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /**
     * Parses {@code content} as a JSON array.
     *
     * @return the parsed list, or {@code null} when the content is null, is the JSON
     *         literal {@code null}, or is not a JSON array
     */
    private List<?> parseJsonArray(String content) {
        if (content == null) {
            return null;
        }
        try {
            return mapper.readValue(content, List.class);
        } catch (Exception e) {
            // Any failure to bind the content to a List means "not a JSON array".
            return null;
        }
    }

    /**
     * Checks whether the content is a JSON array.
     *
     * @param content text to check; may be null
     * @return {@code true} only if the content parses to a non-null list
     */
    public boolean isJsonArray(String content) {
        return parseJsonArray(content) != null;
    }

    /**
     * Checks whether the content is a JSON object.
     *
     * @param content text to check; may be null
     * @return {@code true} only if the content parses to a non-null map
     */
    public boolean isJson(String content) {
        if (content == null) {
            return false;
        }
        try {
            return mapper.readValue(content, Map.class) != null;
        } catch (Exception e) {
            // Any failure to bind the content to a Map means "not a JSON object".
            return false;
        }
    }
}
更多推荐
已为社区贡献4条内容
所有评论(0)