SpringBoot整合kafka(九)
SpringBoot整合kafka
·
SpringBoot是JavaEE开发中非常常用的框架,既可以用来实现kafka的生产者,也可以用来实现kafka的消费者
1. 导入jar包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. 配置kafka
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.0.120:9092 # 连接kafka
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value序列化
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # key反序列化
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # value反序列化
      enable-auto-commit: true # 自动提交
      # group-id: testTopicGroup # 指定消费者组,也可以在方法注解中加组信息
# 下面的生产者/消费者代码通过 ${test.topic} 和 ${test.key} 读取主题和消息键(示例值,可按需修改)
test:
  topic: testTopic
  key: testKey
3. 生产者代码
/**
 * Demo Kafka producer.
 *
 * <p>On every scheduler tick (every 2 seconds) it submits a burst of 100 send tasks to a
 * fixed pool of 10 worker threads. Each task publishes one message of the form
 * {@code "<index>=><timestamp>"} to the configured topic under the configured key.
 *
 * <p>The class implements {@link AutoCloseable} so that the worker pool is shut down when
 * the bean is destroyed (Spring's destroy-method inference calls a public {@code close()}
 * on {@code AutoCloseable} beans); otherwise the pool's non-daemon threads would leak.
 */
@Component
public class KafkaProducer implements AutoCloseable {

    /** Number of messages published per scheduler tick. */
    private static final int MESSAGES_PER_TICK = 100;

    /** Thread-safe formatter for the timestamp embedded in each message (millisecond precision). */
    private static final java.time.format.DateTimeFormatter TIMESTAMP_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");

    // Kafka template injected by Spring
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /** Destination topic, read from the {@code test.topic} property. */
    @Value("${test.topic}")
    private String topic;

    /** Message key used for every record, read from the {@code test.key} property. */
    @Value("${test.key}")
    private String key;

    /** Worker threads that perform the actual sends. */
    private final ExecutorService pool = Executors.newFixedThreadPool(10);

    /**
     * Publishes one burst of messages; triggered by the scheduler every 2 seconds.
     *
     * <p>Sends are dispatched to the worker pool, so this method returns as soon as all
     * tasks are queued. A failure thrown while submitting a send is reported on stderr
     * instead of being silently dropped with the discarded future.
     */
    @Scheduled(cron = "0/2 * * * * ?")
    public void send() {
        for (int i = 0; i < MESSAGES_PER_TICK; i++) {
            final int index = i; // lambdas can only capture effectively-final locals
            CompletableFuture
                    .runAsync(() -> kafkaTemplate.send(topic, key, index + "=>" + date()), pool)
                    .exceptionally(ex -> {
                        System.err.println("Failed to send message " + index + " to topic " + topic + ": " + ex);
                        return null;
                    });
        }
    }

    /**
     * Stops accepting new send tasks and lets already queued ones finish, releasing the
     * worker threads.
     */
    @Override
    public void close() {
        pool.shutdown();
    }

    /**
     * Returns the current local date-time formatted as {@code yyyy-MM-dd HH:mm:ss:SSS}.
     */
    private String date() {
        return java.time.LocalDateTime.now().format(TIMESTAMP_FORMAT);
    }
}
4. 消费者代码
/**
 * Demo Kafka consumer: listens on the topic named by the {@code test.topic} property as a
 * member of consumer group {@code testTopicGroup} and prints every received record.
 *
 * <p>NOTE(review): {@code @Configuration} is normally reserved for classes that declare
 * {@code @Bean} methods; a plain listener holder like this one would conventionally be a
 * {@code @Component}. Kept as in the original so the bean registration is unchanged.
 */
@Configuration
public class KafkaConsumer {

    /** Thread-safe formatter for the receive timestamp printed with each record (second precision). */
    private static final java.time.format.DateTimeFormatter TIMESTAMP_FORMAT =
            java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Prints the partition, topic, key and value of a consumed record to stdout, prefixed
     * with the local time of receipt.
     *
     * @param record the record delivered by the listener container
     */
    @KafkaListener(topics = "${test.topic}", groupId = "testTopicGroup")
    public void listener(ConsumerRecord<String, String> record) {
        int partition = record.partition();
        String key = record.key();
        String topic = record.topic();
        String value = record.value();
        // Label fixed from "主体" to "主题" (topic); the original was a typo.
        System.out.println(date() + ": 分区:" + partition + " | 主题:" + topic + " | 键:" + key + " | 值:" + value);
    }

    /**
     * Returns the current local date-time formatted as {@code yyyy-MM-dd HH:mm:ss}.
     */
    private String date() {
        return java.time.LocalDateTime.now().format(TIMESTAMP_FORMAT);
    }
}
更多推荐
已为社区贡献5条内容
所有评论(0)