从零搭建开发脚手架 Spring Boot集成Kafka实现生产者消费者的多种方式
我们使用。和的代码:@Bean@Bean我们可以使用这个新的来发送Greeting消息:同样,让我们修改和以正确反序列化 Greeting 消息:@Bean// …@Beanspring-kafka JSON 序列化器和反序列化器使用Jackson库,它也是 spring-kafka 项目的可选 Maven 依赖项。所以,让我们将它添加到我们的_pom.xml 中_:监听器代码:JavaConfi
发布空闲消费者事件之间的时间(未收到数据)。
#idle-event-interval:
#是否在初始化期间记录容器配置(INFO 级别)。
#log-container-config:
检查无响应消费者的时间间隔。 如果未指定持续时间后缀,则将使用秒。
#monitor-interval:
乘数应用于“pollTimeout”以确定消费者是否无响应。
#no-poll-threshold:
#轮询消费者时使用的超时。
#poll-timeout:
#侦听器类型。默认single,可选batch
#type: single
方式一(不推荐): 自动创建主题
在配置文件里指定好kafka的topic之后,调用send方法或者KafkaListener指定topic会自动帮我们创建好topic,只是创建的topic默认是1个副本和1个分区的,这一般不能满足我们的要求,所以我们还需要在kafka的server.properties里增加或修改以下参数:
auto.create.topics.enable=true
num.partitions=3
default.replication.factor=3
之后,kafka自动帮我们创建的主题都会包含3个副本和3个分区。
方式二:可以提前运行命令行工具在 Kafka 中创建主题:
$ bin/kafka-topics.sh --create \
–zookeeper localhost:2181 \
–replication-factor 1 --partitions 1 \
–topic mytopic
方式三:随着Kafka中_AdminClient_的引入,我们现在可以以编程方式创建主题。
我们需要添加KafkaAdmin Spring bean,它将自动为NewTopic类型的所有 bean 添加主题:
@Configuration
public class KafkaTopicConfig {
@Value(value = “${kafka.bootstrapAddress}”)
private String bootstrapAddress;
@Bean// 使用 Spring Boot 时,KafkaAdmin会自动注册一个bean,因此您只需要NewTopic @Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
// 主题名称、分区数、副本数 或者使用TopicBuilder
return new NewTopic(“baeldung”, 1, (short) 1);
}
}
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
简单发布消息
我们可以使用_KafkaTemplate_类发送消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {
kafkaTemplate.send(topicName, msg);
}
带有回调发布消息
在发送API返回的ListenableFuture对象可以阻塞发送线程并获取发送消息的结果,线程将等待结果,但它会减慢生产者的速度。
Kafka 是一个快速的流处理平台。因此,最好异步处理结果,以便后续消息不会等待上一条消息的结果。
我们可以通过回调来做到这一点:
public void sendMessage(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
// 发送成功的处理
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println(“Sent message=[” + message +
“] with offset=[” + result.getRecordMetadata().offset() + “]”);
}
@Override
public void onFailure(Throwable ex) {
// 发送失败的处理
System.out.println(“Unable to send message=[”
- message + "] due to : " + ex.getMessage());
}
});
}
简单消费者
@KafkaListener(topics = “topicName”, groupId = “foo”)
public void listenGroupFoo(String message) {
System.out.println("Received Message in group foo: " + message);
}
我们可以为一个主题实现多个侦听器,每个侦听器都有不同的组 ID。此外,一个消费者可以监听来自不同主题的消息:
@KafkaListener(topics = “topic1, topic2”, groupId = “foo”)
Spring 还支持在侦听器中使用@Header注释检索一个或多个消息头:
@KafkaListener(topics = “topicName”)
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(“Received Message: " + message” + "from partition: " + partition);
}
@KafkaListener
可以接受的参数有:
-
data : 对于data值的类型其实并没有限定,根据KafkaTemplate所定义的类型来决定。 data为List集合的则是用作批量消费。
-
ConsumerRecord:具体消费数据类,包含Headers信息、分区信息、时间戳等
-
Acknowledgment:用作Ack机制的接口
-
Consumer:消费者类,使用该类我们可以手动提交偏移量、控制消费速率等功能
public void listen1(String data)
public void listen2(ConsumerRecord<K,V> data)
public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment)
public void listen4(ConsumerRecord<K,V> data, Acknowledgment acknowledgment, Consumer<K,V> consumer)
public void listen5(List data)
public void listen6(List<ConsumerRecord<K,V>> data)
public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment)
public void listen8(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment, Consumer<K,V> consumer)
消费特定分区的消息
对于具有多个分区的主题,@KafkaListener
可以显式订阅具有初始偏移量的主题的特定分区:
@KafkaListener(
topicPartitions = @TopicPartition(topic = “topicName”,
partitionOffsets = {
@PartitionOffset(partition = “0”, initialOffset = “0”),
@PartitionOffset(partition = “3”, initialOffset = “0”)}),
containerFactory = “partitionsKafkaListenerContainerFactory”)
public void listenToPartition(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(“Received Message: " + message” + "from partition: " + partition);
}
由于此侦听器中的initialOffset
已设置为 0,因此每次初始化此侦听器时,将重新使用来自分区 0 和 3 的所有先前消耗的消息。
如果我们不需要设置偏移量,我们可以使用@TopicPartition
注解的partitions
属性,只设置没有偏移量的分区:
@KafkaListener(topicPartitions
= @TopicPartition(topic = “topicName”, partitions = { “0”, “1” }))
为监听器添加消息过滤器
我们可以通过添加自定义过滤器来配置侦听器以使用特定类型的消息。这可以通过将RecordFilterStrategy设置到KafkaListenerContainerFactory来完成:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(record -> record.value().contains(“World”));
return factory;
}
然后我们可以配置一个监听器来使用这个容器工厂:
@KafkaListener(topics = “topicName”, containerFactory = “filterKafkaListenerContainerFactory”)
public void listenWithFilter(String message) {
System.out.println("Received Message in filtered listener: " + message);
}
在这个监听器中,所有匹配过滤器的消息都将被丢弃。
前面消息发送和接收都是字符串。但是,我们也可以发送和接收自定义 Java 对象。这需要在ProducerFactory
中配置适当的序列化器,在ConsumerFactory
中配置反序列化器。
让我们看一个简单的 bean 类*,*我们将其作为消息发送:
public class Greeting {
private String msg;
private String name;
}
发送自定义消息
我们使用JsonSerializer。
ProducerFactory
和KafkaTemplate
的代码:
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
我们可以使用这个新的KafkaTemplate
来发送Greeting
消息:
kafkaTemplate.send(topicName, new Greeting(“Hello”, “World”));
消费自定义消息
同样,让我们修改ConsumerFactory
和KafkaListenerContainerFactory
以正确反序列化 Greeting 消息:
@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
// …
return new DefaultKafkaConsumerFactory<>(props,new StringDeserializer(), new JsonDeserializer<>(Greeting.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}
spring-kafka JSON 序列化器和反序列化器使用Jackson库,它也是 spring-kafka 项目的可选 Maven 依赖项。
所以,让我们将它添加到我们的_pom.xml 中_:
com.fasterxml.jackson.core
jackson-databind
监听器代码:
@KafkaListener(
topics = “topicName”,
containerFactory = “greetingKafkaListenerContainerFactory”)
public void greetingListener(Greeting greeting) {
// process greeting message
}
@Configuration
public class Kafka_Config {
@Value(“${kafka.broker.list}”)
public String brokerList;
public static final String topic = “TOPIC_LIN_LIANG”;
public final String groupId = “group.01”;
public Properties customerConfigs() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动位移提交
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);//自动位移提交间隔时间
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);//消费组失效超时时间
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “latest”);//位移丢失和位移越界后的恢复起始位置
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
return props;
}
public Properties producerConfigs() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 20000000);//20M 消息缓存
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!
如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)
Java核心架构进阶知识点
面试成功其实都是必然发生的事情,因为在此之前我做足了充分的准备工作,不单单是纯粹的刷题,更多的还会去刷一些Java核心架构进阶知识点,比如:JVM、高并发、多线程、缓存、Spring相关、分布式、微服务、RPC、网络、设计模式、MQ、Redis、MySQL、设计模式、负载均衡、算法、数据结构、kafka、ZK、集群等。而这些也全被整理浓缩到了一份pdf——《Java核心架构进阶知识点整理》,全部都是精华中的精华,本着共赢的心态,好东西自然也是要分享的
内容颇多,篇幅却有限,这就不在过多的介绍了,大家可根据以上截图自行脑补
《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!
面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!**
如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)
Java核心架构进阶知识点
面试成功其实都是必然发生的事情,因为在此之前我做足了充分的准备工作,不单单是纯粹的刷题,更多的还会去刷一些Java核心架构进阶知识点,比如:JVM、高并发、多线程、缓存、Spring相关、分布式、微服务、RPC、网络、设计模式、MQ、Redis、MySQL、设计模式、负载均衡、算法、数据结构、kafka、ZK、集群等。而这些也全被整理浓缩到了一份pdf——《Java核心架构进阶知识点整理》,全部都是精华中的精华,本着共赢的心态,好东西自然也是要分享的
[外链图片转存中…(img-hS7U8eZI-1712811244353)]
[外链图片转存中…(img-GPehA97r-1712811244354)]
[外链图片转存中…(img-op0sn1Zx-1712811244354)]
内容颇多,篇幅却有限,这就不在过多的介绍了,大家可根据以上截图自行脑补
《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!
更多推荐
所有评论(0)