A simple tutorial on integrating Kafka with Spring Boot
I. Add the required JAR dependencies (Maven)
The project is built with Maven:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>springboot-kafka</artifactId>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
II. Setup steps
1. Add the configuration file
The application.properties configuration is as follows:
spring.application.name=springboot-kafka-02
server.port=8080
# Broker address(es) used to establish the initial connection
spring.kafka.bootstrap-servers=192.168.42.21:9092
# Serializer classes for the producer's key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Default batch size in bytes (16 KB); this is a size limit, not a record count
spring.kafka.producer.batch-size=16384
# Total send buffer of 32 MB
spring.kafka.producer.buffer-memory=33554432
# Deserializer classes for the consumer's key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer group id
spring.kafka.consumer.group-id=spring-kafka-consumer-02
# Whether consumer offsets are committed automatically
spring.kafka.consumer.enable-auto-commit=true
# Commit offsets to the broker every 100 ms
spring.kafka.consumer.auto-commit-interval=100
# If no committed offset exists for this consumer group, start from the earliest offset
spring.kafka.consumer.auto-offset-reset=earliest
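The key serializer and deserializer configured above have to agree on a byte layout. As a rough illustration, the sketch below reproduces the 4-byte big-endian encoding that Kafka's IntegerSerializer and IntegerDeserializer use, with plain JDK classes only. The class name IntKeyCodecSketch and its methods are hypothetical stand-ins, not part of Kafka or Spring.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-in: mirrors the 4-byte big-endian layout used for
// Integer keys, without depending on the Kafka client library.
final class IntKeyCodecSketch {

    // Encode an int key as 4 bytes, most significant byte first.
    static byte[] serialize(int key) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
    }

    // Decode 4 bytes back into the int key; any other length is rejected.
    static int deserialize(byte[] data) {
        if (data.length != Integer.BYTES) {
            throw new IllegalArgumentException("expected 4 bytes, got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(3);
        System.out.println(Arrays.toString(bytes)); // [0, 0, 0, 3]
        System.out.println(deserialize(bytes));     // 3
    }
}
```

If the consumer were configured with a deserializer for a different type (for example a String deserializer for Integer keys), the bytes would still arrive but would be interpreted incorrectly, which is why the two settings are kept as a matching pair.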
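2. Add the project package structure (the code below uses com.kafka.learn with the sub-packages config, controller, and consumer)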
3. Add the Spring Boot main class
package com.kafka.learn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}
4. Add the Kafka configuration class
package com.kafka.learn.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {

    /**
     * NewTopic arguments: topic name, number of partitions, replication factor.
     * The replication factor cannot exceed the number of brokers, so with a
     * single broker any value greater than 1 makes topic creation fail.
     * Kafka's replication feature is only usable in a cluster.
     */
    @Bean
    public NewTopic topic1() {
        return new NewTopic("topic1", 5, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic2", 3, (short) 1);
    }
}
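The rule described in the comment above (the replication factor must not exceed the number of brokers) can be expressed as a small check. This is only a sketch in plain Java; ReplicationCheckSketch, canCreate and totalReplicas are hypothetical names introduced for illustration, and the real validation is performed by the Kafka broker when the topic is created.

```java
// Hypothetical helper that models the broker-side constraint on topic creation.
final class ReplicationCheckSketch {

    // Each replica of a partition must live on a different broker,
    // so a topic is only creatable when 1 <= replicationFactor <= brokerCount.
    static boolean canCreate(short replicationFactor, int brokerCount) {
        return replicationFactor >= 1 && replicationFactor <= brokerCount;
    }

    // Total number of partition replicas the cluster has to host.
    static int totalReplicas(int partitions, short replicationFactor) {
        return partitions * replicationFactor;
    }

    public static void main(String[] args) {
        System.out.println(canCreate((short) 1, 1));     // true:  the single-broker setup used here
        System.out.println(canCreate((short) 2, 1));     // false: two replicas need two brokers
        System.out.println(totalReplicas(5, (short) 1)); // 5: topic1 has 5 partitions, 1 replica each
    }
}
```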
5. Add the Kafka message producers
Add the message-producing endpoints in the controller layer. There are two variants: synchronous sending and asynchronous sending.
Endpoint for the synchronous Kafka producer:
package com.kafka.learn.controller;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;

import java.util.concurrent.ExecutionException;

@RestController
@RequestMapping("/kafka/sync")
public class KafkaSyncProducerController {

    // Typed template, so no raw types or unchecked casts are needed below
    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping(value = "/send/{message}", method = RequestMethod.GET)
    public String sendSync(@PathVariable("message") String message) {
        // Arguments: topic, partition 0, key 1, value
        ListenableFuture<SendResult<Integer, String>> future = template.send(
                new ProducerRecord<Integer, String>("topic-spring-02", 0, 1, message));
        try {
            // Block until the broker responds
            SendResult<Integer, String> result = future.get();
            System.out.println(result.getRecordMetadata().topic()
                    + result.getRecordMetadata().partition()
                    + result.getRecordMetadata().offset());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return "failure";
        } catch (ExecutionException e) {
            // The send itself failed; do not report success
            e.printStackTrace();
            return "failure";
        }
        return "success";
    }

    @RequestMapping(value = "/{topic}/send", method = RequestMethod.GET)
    public void sendMessageToTopic(@PathVariable("topic") String topic,
                                   @RequestParam(value = "partition", defaultValue = "0") int partition) {
        // Prints "Start sending message to kafka: <topic>"; the text is kept as-is to match the test output
        System.out.println("开发发送消息给kafka:" + topic);
        // The partition number is also used as the record key
        template.send(topic, partition, partition, "你好,kafka");
    }
}
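The synchronous endpoint blocks on future.get() with no upper bound, so a slow or unreachable broker holds the request thread until the producer's own timeouts expire. Since the future returned by the template is a java.util.concurrent.Future, a bounded wait with get(timeout, unit) is one possible alternative. The sketch below shows that pattern with a JDK CompletableFuture standing in for the Kafka send result; BoundedWaitSketch, awaitStatus and the sample strings are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: a bounded blocking wait on a Future, with a
// CompletableFuture standing in for the future returned by a send call.
final class BoundedWaitSketch {

    // Wait at most timeoutMs for the result and map the outcome to a status string.
    static String awaitStatus(Future<String> future, long timeoutMs) {
        try {
            return "success: " + future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            // The asynchronous operation failed; the real error is the cause
            return "failure: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> acked = CompletableFuture.completedFuture("acknowledged");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed

        System.out.println(awaitStatus(acked, 100));   // success: acknowledged
        System.out.println(awaitStatus(failed, 100));  // failure: broker unavailable
        System.out.println(awaitStatus(pending, 100)); // timeout
    }
}
```

The choice of timeout is a trade-off: it should be long enough for a normal broker round trip but short enough that an HTTP caller is not left waiting indefinitely.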
Endpoint for the asynchronous Kafka producer:
package com.kafka.learn.controller;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka/async")
public class KafkaAsyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("/send/{message}")
    public String asyncSend(@PathVariable("message") String message) {
        // Arguments: topic, partition 0, key 3, value
        ProducerRecord<Integer, String> record = new ProducerRecord<>(
                "topic-spring-02",
                0,
                3,
                message
        );
        ListenableFuture<SendResult<Integer, String>> future = template.send(record);
        // Register a callback; the response is handled asynchronously
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("Send failed: " + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                System.out.println("Send succeeded: " +
                        result.getRecordMetadata().topic() + "\t"
                        + result.getRecordMetadata().partition() + "\t"
                        + result.getRecordMetadata().offset());
            }
        });
        // Returned immediately, before the broker has acknowledged the record
        return "success";
    }
}
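The asynchronous endpoint differs from the synchronous one in that the caller registers a callback and returns without waiting. The same control flow can be illustrated with the JDK alone: in the sketch below a CompletableFuture stands in for the ListenableFuture, and whenComplete plays the role of the onSuccess/onFailure pair. AsyncCallbackSketch, onResult and the sample metadata string are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the callback pattern, using only JDK classes.
final class AsyncCallbackSketch {

    // Attach success/failure handling without blocking; this method returns at once.
    static void onResult(CompletableFuture<String> future, List<String> log) {
        future.whenComplete((metadata, error) -> {
            if (error != null) {
                log.add("send failed: " + error.getMessage());
            } else {
                log.add("send succeeded: " + metadata);
            }
        });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> future = new CompletableFuture<>();

        onResult(future, log);
        System.out.println(log); // [] because nothing has completed yet

        // Simulate the acknowledgement arriving later
        future.complete("topic-spring-02 partition 0");
        System.out.println(log); // [send succeeded: topic-spring-02 partition 0]
    }
}
```

One consequence of this design is that the HTTP response "success" only means the record was handed to the producer; the actual outcome is known only inside the callback.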
6. Add the Kafka message consumer
package com.kafka.learn.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

    @KafkaListener(topics = "topic-spring-02")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        // Print topic, partition, offset, key and value, separated by tabs
        if (record != null) {
            System.out.println(
                    record.topic() + "\t"
                            + record.partition() + "\t"
                            + record.offset() + "\t"
                            + record.key() + "\t"
                            + record.value());
        }
    }

    /**
     * id: identifier of the listener container.
     * topicPartitions: the topics and partitions to listen to. Two topics are
     * assigned here: topic1 receives only partitions 0 and 3; topic2 receives
     * partitions 0 and 1, with partition 1 starting at initial offset 4.
     *
     * @param record the consumed record
     */
    @KafkaListener(id = "listen01",
            topicPartitions = {
                    @TopicPartition(topic = "topic1", partitions = { "0", "3" }),
                    @TopicPartition(topic = "topic2", partitions = "0",
                            partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4"))
            })
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println("topic" + record.topic());
        System.out.println("key:" + record.key());
        System.out.println("value:" + record.value());
    }
}
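To make the explicit assignment of the listen01 listener easier to reason about, the sketch below models it as a plain map from topic to assigned partitions and answers the question "would this listener receive a record sent to a given partition?". It is a simplified model in plain Java, not Spring Kafka code; ListenerAssignmentSketch and receives are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the partitions assigned to the "listen01" listener.
final class ListenerAssignmentSketch {

    // topic -> partitions the listener is explicitly assigned to
    static final Map<String, Set<Integer>> ASSIGNED = new HashMap<>();

    static {
        ASSIGNED.put("topic1", new HashSet<>(Arrays.asList(0, 3)));
        // Partition 0 comes from "partitions", partition 1 from "partitionOffsets"
        ASSIGNED.put("topic2", new HashSet<>(Arrays.asList(0, 1)));
    }

    // True when a record written to (topic, partition) reaches the listener.
    static boolean receives(String topic, int partition) {
        return ASSIGNED.getOrDefault(topic, Collections.<Integer>emptySet()).contains(partition);
    }

    public static void main(String[] args) {
        System.out.println(receives("topic1", 3)); // true
        System.out.println(receives("topic1", 4)); // false: partition 4 is not assigned
        System.out.println(receives("topic2", 0)); // true
    }
}
```

This matches the behaviour exercised in the tests that follow: a record sent to partition 4 of topic1 is written to Kafka but produces no consumer log, because no listener is assigned to that partition.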
III. Test results
Test synchronous sending of a Kafka message: endpoint --- http://localhost:8080/kafka/sync/send/你好,kafka
Backend output:
2021-11-12 10:54:20.143 INFO 20160 --- [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-12 10:54:20.144 INFO 20160 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-11-12 10:54:20.150 INFO 20160 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Completed initialization in 6 ms
2021-11-12 10:54:20.186 INFO 20160 --- [nio-8080-exec-2] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [192.168.42.21:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2021-11-12 10:54:20.203 INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2021-11-12 10:54:20.203 INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2021-11-12 10:54:20.203 INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1636685660203
2021-11-12 10:54:20.213 INFO 20160 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: BNUXbtH2RYmLyhUrarFpBw
topic-spring-02 0 1 1 你好,kafka
topic-spring-0201
Test asynchronous sending of a Kafka message: endpoint --- http://localhost:8080/kafka/async/send/你好async,kafka
Backend output:
开发发送消息给kafka:topic1
topictopic1
key:3
value:你好,kafka
Test sending to a partition that the consumer does not listen to: http://localhost:8080/kafka/sync/topic1/send?partition=4
Backend output (no consumer log):
开发发送消息给kafka:topic1
Test synchronous sending to topic2, which is consumed from explicitly assigned partitions:
endpoint --- http://localhost:8080/kafka/sync/topic2/send?partition=0
Backend output:
开发发送消息给kafka:topic2
topictopic2
key:0
value:你好,kafka
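Summary
Integrating Kafka with Spring Boot is straightforward. Most of the configuration already has sensible defaults provided by the Kafka-related projects, so setting a handful of required parameters is enough to complete the integration. If the business needs to constrain how messages are sent and consumed, an additional layer of wrapping is required, for example a custom serialization format, or a custom wrapper object on the producer side that restricts what may be sent.
This simple integration uses a single-node configuration. In a cluster, the relevant Kafka configuration parameters also need to be reviewed and tuned to your own business requirements.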