java代码kafka初始化producer和consumer
序列化kafka序列化消息是在生产端,序列化后,消息才能网络传输。而构造KafkaProducer代码如下:Properties props = new Properties();props.put("bootstrap.servers", "10.0.55.229:9092");props.put("key.serializer", "org.apache.kafka.common.serial
目录
consumer取消订阅的方式consumer.unsubscribe();
一、初始化producer对象(序列化消息)
kafka序列化消息是在生产端,序列化后,消息才能网络传输。而构造KafkaProducer代码如下:
Properties props = new Properties();
props.put("bootstrap.servers", "10.0.55.229:9092");//
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//props.put("value.serializer", "com.java.kafka.PersonUtilSerializer");
//Producer<String, Object> producer = new KafkaProducer<String, Object>(props);
kafkaProducer = new KafkaProducer<>(props);
int i=0;
while(true){
i++;
JsonObject p = new JsonObject();
p .put("id","1")
p .put("name","tester")
System.out.println(p.toString());
// 当指定发送消息的分区时,程序就不会根据key值再判断发往哪个分区了。
// record = new ProducerRecord<>(TOPIC, 0, String.valueOf(i), messageStr);
ProducerRecord<String, Object> record = new ProducerRecord<String,String>(TOPIC, p.toString());
producer.send(record);
}
producer.close();
}
ProducerRecord 构造方法,该方法的参数 topic 和 value 属性是必填项,其余属性(比如:分区号、时间戳、key、headers)是选填项。对应的 ProducerRecord 的构造方法也有多种:使用者可根据场景来选择合适的 ProducerRecord 。
属性key.serializer和value.serializer就是key和value指定的序列化方式。无论是key还是value序列化和反序列化实现都是一样的。
StringSerializer是内置的字符串序列化方式
生产者发送消息的三种方式
Kafka 生产者发送消息有三种方式,分别为:普通发送(发后即忘)、同步发送、异步发送。
kafka生产者其它详细知识:
Kafka基础(二):生产者相关知识汇总
https://blog.csdn.net/CREATE_17/article/details/93396981
二、初始化consumer对象(反序列化消息)
kafka反序列化消息是在消费端。由于网络传输过来的是byte[],只有反序列化后才能得到生产者发送的真实的消息内容。而构造KafkaConsumer代码如下:
Properties props = new Properties();
props.put("bootstrap.servers", "10.0.55.229:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "group1");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
// 订阅test1 topic 使用subscribe()方法订阅主题 (一般推荐这种方式)
consumer.subscribe(Collections.singletonList("test1"));
// 另一种订阅的方式 使用assign()方法订阅确定主题和分区
//consumer.assign(Collections.singletonList(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));
consumer.seekToBeginning(Arrays.asList(TOPIC_PARTITION));
while(true){
ConsumerRecords<String, String> records = consumer.poll(100);
System.err.println("print the size of records ,size="+records.count());
for(ConsumerRecord<String, String> record:records){
System.out.println(record);
}
}
属性key.deserializer和value.deserializer就是key和value指定的反序列化方式。
StringDeserializer是内置的字符串反序列化方式
consumer取消订阅的方式consumer.unsubscribe();
可以使用 unsubscribe() 方法来取消主题的订阅
使用方式:consumer.unsubscribe();
unsubscribe()方法即可以取消通过subscribe()方式实现的订阅,还可以取消通过assign()方式实现的订阅
consumer的两种不同订阅方式:subscribe() 和 assign()
使用自定义的序列化
写一个自定义的类,比如说我们要传递一个Person对象,那么我们就定义个Person对象的序列化和反序列化的类,并且实现Serializer接口,下面继续看,首先定义个Person类
public class Person implements Serializable{
private String id ;
private String name ;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person [id=" + id + ", name=" + name + "]";
}
}
接下来,我们自定义一个序列化的类: PersonUtilSerializer
public class PersonUtilSerializer implements Serializer<Person>{
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Person data) {
return JSON.toJSONBytes(data);
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
初始化producer实例对象
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.java.kafka.PersonUtilSerializer");
Producer<String, Object> producer = new KafkaProducer<String, Object>(props);
int i=0;
while(true){
i++;
Person p = new Person();
p.setId(i+"");
p.setName("zhangsan-"+i);
System.out.println(p.toString());
ProducerRecord<String, Object> record = new ProducerRecord<String,Object>(TOPIC, p);
producer.send(record);
}
producer.close();
}
自定义Serializer和Deserializer非常痛苦,而且上面还有很多异常情况没有处理,还有很多类型不支持,非常脆弱。复杂类型的支持更是一件痛苦的事情,不同版本之间的兼容性问题更是一个极大的挑战。由于Serializer和Deserializer影响到上下游系统,导致牵一发而动全身。自定义序列化&反序列化实现不是能力的体现,而是逗比的体现。所以强烈不建议自定义实现序列化&反序列化,推荐直接使用StringSerializer和StringDeserializer,然后使用json作为标准的数据传输格式。站在巨人的肩膀上,事半功倍。
Kafka中位移提交那些事儿
https://blog.csdn.net/weixin_45039616/article/details/107081148
更多推荐
所有评论(0)