springboot整合kafka,原理不多说了,道理大家都懂,直接上代码了。

整合前,kafka需要启动,我现在使用的是虚拟机里面的kafka

1.maven配置

<!--kafka支持-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.kafka配置文件

#kafka相关配置
spring.kafka.bootstrap-servers=192.168.43.128:9092
#设置一个默认组
spring.kafka.consumer.group-id=0
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288

kafka的端口号为什么是9093,等下说

3.kafka生产者


/**
 * kafka消息产生者
 */
@RestController
public class KafkaSentController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 发送消息到kafka
     * 主题Topic
     */
    @RequestMapping("/kafka")
    public void sendChannelMess(String message){
        kafkaTemplate.send("Topic",message);
    }



}

4.kafka消费者consumern

@Component
public class KafkaConsumer {
    /**
     * 监听Topic主题,有消息就读取
     * @param message
     */
    @KafkaListener(topics = {"Topic"})
    public void receiveMessage(String message){
        //收到通道的消息之后执行秒杀操作
        System.out.println(message);
    }
}

5启动springboot项目

就会发现一直连不上kafka,这个是kafka只允许在一个服务器上面使用,如果是其他服务器调用kafka服务器的话,就调不通

这个需要对kafka配置文件做处理  进入kafka  config文件,编辑server.proper 


# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=INSIDE://127.0.0.1:9092,OUTSIDE://127.0.0.1:9093
inter.broker.listener.name = INSIDE

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=INSIDE://127.0.0.1:9092,OUTSIDE://127.0.0.1:9093

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT,PLAINTEXT:PLAINTEXT

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

这个时间将springboot的配置文件  kafka端口号改成9093,再启动项目就不报错了

server.port=23451
#kafka相关配置
spring.kafka.bootstrap-servers=192.168.43.128:9093
#设置一个默认组
spring.kafka.consumer.group-id=0
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量发送消息的数量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288

6.访问,验证结果

http://192.168.43.129:23451/kafka?message=111123456578767564534231111111111111111111111111111111111111111111111111111243546532132434323434434343

 

下面打印结果跟传参一样,springboot集成kafka成功

源码地址:https://github.com/chengda1992/mxlgslcd.git

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐