搭建简单的springboot2项目

1.maven添 

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

pom.xml配置添加

kafka:
  bootstrap-servers: 服务地址
  consumer:
    group-id: group

2.添加RestController接口用于发送message

 

@RestController
public class TestController {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(String name) {
        kafkaTemplate.send("newtopic", name);
        return name;
    }

}

3添加监听接口用于记录日志

@Component
public class MyConsumer {

    @KafkaListener(topics = "mytopic")
    public void listen(ConsumerRecord<?,String> record) {
        String value = record.value();
        System.out.println("kafka监听的值是——————"+value);
        System.out.println(record);
    }
}

为了测试,我们需要启动多个项目来测试,简单点复制改下名字(记得改下pom.xml中端口设置)

修改demo1的pom.xml

kafka:
  bootstrap-servers: 服务地址
  producer:
    group-id: newgroup
  consumer:
    group-id: mygroup

修改demo2的pom.xml

kafka:
  bootstrap-servers: 服务地址
  producer:
    group-id: newgroup
  consumer:
    group-id: mygroup

完成了,同时启动,调用demo接口

查看日志,demo1有记录,demo2无,多试试几次,发现group_id相同的demo1demo2在收到topic时只能有一个接收到

证明分组概念正确,2同理测试去掉分组,发现demo1,demo2都能取到数据,测试成功

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐