目录

一、初始化producer对象(序列化消息)

生产者发送消息的三种方式

kafka生产者其它详细知识:

二、初始化consumer对象(反序列化消息)

consumer取消订阅的方式consumer.unsubscribe();

使用自定义的序列化


一、初始化producer对象(序列化消息)

kafka序列化消息是在生产端,序列化后,消息才能网络传输。而构造KafkaProducer代码如下:

Properties props = new Properties();
props.put("bootstrap.servers", "10.0.55.229:9092");//
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//props.put("value.serializer", "com.java.kafka.PersonUtilSerializer");
//Producer<String, Object> producer = new KafkaProducer<String, Object>(props);
kafkaProducer = new KafkaProducer<>(props);
	    int i=0;
	    while(true){
	    	i++;
	    	JsonObject  p = new JsonObject();
                p .put("id","1")
	    	p .put("name","tester")
	    	System.out.println(p.toString());
// 当指定发送消息的分区时,程序就不会根据key值再判断发往哪个分区了。
           // record = new ProducerRecord<>(TOPIC, 0, String.valueOf(i), messageStr);
	    	ProducerRecord<String, Object> record = new ProducerRecord<String,String>(TOPIC, p.toString());
	    	producer.send(record);
	    
	    }
	       producer.close();
	}

ProducerRecord 构造方法,该方法的参数 topic 和 value 属性是必填项,其余属性(比如:分区号、时间戳、key、headers)是选填项。对应的 ProducerRecord 的构造方法也有多种:使用者可根据场景来选择合适的 ProducerRecord 。

属性key.serializer和value.serializer就是key和value指定的序列化方式。无论是key还是value序列化和反序列化实现都是一样的。

StringSerializer是内置的字符串序列化方式

生产者发送消息的三种方式

Kafka 生产者发送消息有三种方式,分别为:普通发送(发后即忘)、同步发送、异步发送。

kafka生产者其它详细知识:

Kafka基础(二):生产者相关知识汇总

https://blog.csdn.net/CREATE_17/article/details/93396981

二、初始化consumer对象(反序列化消息)

kafka反序列化消息是在消费端。由于网络传输过来的是byte[],只有反序列化后才能得到生产者发送的真实的消息内容。而构造KafkaConsumer代码如下:

Properties props = new Properties();
props.put("bootstrap.servers", "10.0.55.229:9092");
props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "group1");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
// 订阅test1 topic   使用subscribe()方法订阅主题 (一般推荐这种方式)
consumer.subscribe(Collections.singletonList("test1"));
// 另一种订阅的方式 使用assign()方法订阅确定主题和分区
//consumer.assign(Collections.singletonList(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));
consumer.seekToBeginning(Arrays.asList(TOPIC_PARTITION));
	while(true){
             ConsumerRecords<String, String> records = consumer.poll(100);
	     System.err.println("print the size of records ,size="+records.count());
	     for(ConsumerRecord<String, String> record:records){
	        System.out.println(record);
	       }
}

属性key.deserializervalue.deserializer就是key和value指定的反序列化方式。

StringDeserializer是内置的字符串反序列化方式

consumer取消订阅的方式consumer.unsubscribe();

可以使用 unsubscribe() 方法来取消主题的订阅

使用方式:consumer.unsubscribe();

unsubscribe()方法即可以取消通过subscribe()方式实现的订阅,还可以取消通过assign()方式实现的订阅

consumer的两种不同订阅方式:subscribe()  和 assign()

使用自定义的序列化

写一个自定义的类,比如说我们要传递一个Person对象,那么我们就定义个Person对象的序列化和反序列化的类,并且实现Serializer接口,下面继续看,首先定义个Person类

public class Person implements Serializable{
 
	private String id ;
	private String name ;
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	@Override
	public String toString() {
		return "Person [id=" + id + ", name=" + name + "]";
	}
	
}

接下来,我们自定义一个序列化的类: PersonUtilSerializer

public class PersonUtilSerializer  implements Serializer<Person>{
 
	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		
	}
 
	@Override
	public byte[] serialize(String topic, Person data) {
		
		return JSON.toJSONBytes(data);
	}
 
	@Override
	public void close() {
		// TODO Auto-generated method stub
		
	}
 
}

初始化producer实例对象

props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.java.kafka.PersonUtilSerializer");
Producer<String, Object> producer = new KafkaProducer<String, Object>(props);
	    int i=0;
	    while(true){
	    	i++;
	    	Person p = new Person();
	    	p.setId(i+"");
	    	p.setName("zhangsan-"+i);
	    	System.out.println(p.toString());
	    	ProducerRecord<String, Object> record = new ProducerRecord<String,Object>(TOPIC, p);
	    	producer.send(record);
	    
	    }
	       producer.close();
	}

自定义Serializer和Deserializer非常痛苦,而且上面还有很多异常情况没有处理,还有很多类型不支持,非常脆弱。复杂类型的支持更是一件痛苦的事情,不同版本之间的兼容性问题更是一个极大的挑战。由于Serializer和Deserializer影响到上下游系统,导致牵一发而动全身。自定义序列化&反序列化实现不是能力的体现,而是逗比的体现。所以强烈不建议自定义实现序列化&反序列化,推荐直接使用StringSerializer和StringDeserializer,然后使用json作为标准的数据传输格式。站在巨人的肩膀上,事半功倍。

Kafka中位移提交那些事儿

https://blog.csdn.net/weixin_45039616/article/details/107081148

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐