Kafka传递自定义对象
1、搭好相应的环境(ZK+kafka),保证kafka能正常的发送接收消息2、新建一个工具类,负责对象字节数组的相互转换,传输数据用package com.kafka.util;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.IOException;import
·
1、搭好相应的环境(ZK+kafka),保证kafka能正常的发送接收消息
2、新建一个工具类,负责对象字节数组的相互转换,传输数据用
package com.kafka.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
public class BeanUtils {
private BeanUtils(){}
/**
* 对象转字节数组
* @param obj
* @return
*/
public static byte[] ObjectToBytes(Object obj){
byte[] bytes = null;
ByteArrayOutputStream bo = null;
ObjectOutputStream oo = null;
try {
bo = new ByteArrayOutputStream();
oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
bytes = bo.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
if(bo!=null){
bo.close();
}
if(oo!=null){
oo.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return bytes;
}
/**
* 字节数组转对象
* @param bytes
* @return
*/
public static Object BytesToObject(byte[] bytes){
Object obj = null;
ByteArrayInputStream bi = null;
ObjectInputStream oi = null;
try {
bi =new ByteArrayInputStream(bytes);
oi =new ObjectInputStream(bi);
obj = oi.readObject();
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if(bi!=null){
bi.close();
}
if(oi!=null){
oi.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return obj;
}
}
3、定义一个实体(vo,pojo)类。需要继承Serializable接口
package com.kafka.vo;
import java.io.Serializable;
import java.util.Date;
@SuppressWarnings("serial")
public class Member implements Serializable{
private String name;
private int age;
private Date birthday;
private double soruce;
public Member() {
super();
}
public Member(String name, int age, Date birthday, double soruce) {
super();
this.name = name;
this.age = age;
this.birthday = birthday;
this.soruce = soruce;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public Date getBirthday() {
return birthday;
}
public void setBirthday(Date birthday) {
this.birthday = birthday;
}
public double getSoruce() {
return soruce;
}
public void setSoruce(double soruce) {
this.soruce = soruce;
}
@Override
public String toString() {
return "Member [name=" + name + ", age=" + age + ", birthday=" + birthday + ", soruce=" + soruce + "]";
}
}
4、自定义一个Encoder继承kafka的kafka.serializer.Encoder的接口
package com.kafka;
import com.kafka.util.BeanUtils;
import com.kafka.vo.Member;
import kafka.utils.VerifiableProperties;
public class ObjectEncoder implements kafka.serializer.Encoder<Member>{
public ObjectEncoder(VerifiableProperties verifiableProperties){
}
@Override
public byte[] toBytes(Member member) { //填写你需要传输的对象
return BeanUtils.ObjectToBytes(member);
}
}
5、Producer的封装发送对象
package com.kafka;
import java.util.Date;
import java.util.Properties;
import com.kafka.vo.Member;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
@SuppressWarnings("deprecation")
public class KafkaProducerObject {
public static void main(String[] args) {
String topic = "test"; // 定义要操作的主题
Properties pro = new Properties(); // 定义相应的属性保存
pro.setProperty("zookeeper.connect", "192.168.19.128:2181"); //这里根据实际情况填写你的zk连接地址
pro.setProperty("metadata.broker.list", "192.168.19.128:9092"); //根据自己的配置填写连接地址
pro.setProperty("serializer.class", ObjectEncoder.class.getName()); //填写刚刚自定义的Encoder类
Producer<Integer, Object> prod = new Producer<Integer, Object>(new ProducerConfig(pro));
prod.send(new KeyedMessage<Integer, Object>(topic, new Member("姓名",12,new Date(),12.1))); //测试发送对象数据
}
}
6、Consumer接收消息
package com.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.kafka.util.BeanUtils;
import com.kafka.vo.Member;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class KafkaConsumerObject {
public static void main(String[] args) {
String topic = "test"; // 定义要操作的主题
Properties pro = new Properties(); // 定义相应的属性保存
pro.setProperty("zookeeper.connect", "192.168.19.128:2181"); //这里根据实际情况填写你的zk连接地址
pro.setProperty("metadata.broker.list", "testlinux:9092"); //根据自己的配置填写连接地址
pro.setProperty("group.id", "group1");
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(pro)); // 需要定义一个主题的映射的存储集合
Map<String,Integer> topicMap = new HashMap<String,Integer>() ;
topicMap.put(topic, 1) ; // 设置要读取数据的主题
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicMap) ; // 现在只有一个主题,所以此处只接收第一个主题的数据即可
KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0) ; // 第一个主题
ConsumerIterator<byte[], byte[]> iter = stream.iterator() ;
while(iter.hasNext()) {
// String msg = new String(iter.next().message()) ;
Member vo = (Member)BeanUtils.BytesToObject(iter.next().message()); //接收消息,并将字节数组转换为对象
System.out.println("接收到消息:" + vo);
}
}
}
打印内容如下:
Producer
INFO | Fetching metadata from broker BrokerEndPoint(0,192.168.19.128,9092) with correlation id 0 for 1 topic(s) Set(test)
INFO | Connected to 192.168.19.128:9092 for producing
INFO | Disconnecting from 192.168.19.128:9092
INFO | Connected to 192.168.19.128:9092 for producing
Consumer:
INFO | Fetching metadata from broker BrokerEndPoint(0,192.168.19.128,9092) with correlation id 0 for 1 topic(s) Set(test)
INFO | Connected to 192.168.19.128:9092 for producing
INFO | Disconnecting from 192.168.19.128:9092
INFO | [ConsumerFetcherThread-group1_zhrtDev-1483671510979-7a70fd79-0-0], Starting
INFO | [ConsumerFetcherManager-1483671511081] Added fetcher for partitions ArrayBuffer([test-0, initOffset 8 to broker BrokerEndPoint(0,192.168.19.128,9092)] )
接收到消息:Member [name=姓名, age=12, birthday=Fri Jan 06 10:58:17 CST 2017, soruce=12.1]
更多推荐
已为社区贡献1条内容
所有评论(0)