一: Environment Preparation.

    1.1 Install Zookeeper and Kafka on a Linux cloud server. You can refer to my two earlier blog posts on Zookeeper and Kafka; follow them step by step and everything will work on the first pass.

    (Note) If the firewall is enabled, open the required ports: Kafka uses port 9092 and Zookeeper uses port 2181. On a Linux cloud server, also configure the security group so the ports are reachable from outside.
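    If your server uses firewalld (an assumption; substitute your distro's firewall tool), opening the two ports looks like this:

firewall-cmd --zone=public --add-port=9092/tcp --permanent   # Kafka broker
firewall-cmd --zone=public --add-port=2181/tcp --permanent   # Zookeeper
firewall-cmd --reload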

    1.2 Spring Boot framework dependencies.

         ①. Spring Boot framework version: 1.5.10.RELEASE.

         ②. The Kafka dependency (spring-kafka).

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.1.1.RELEASE</version>
</dependency>

   1.3 Start Zookeeper first and then Kafka, each in the correct way. Again, refer to my two blog posts above; they contain very detailed illustrated walkthroughs.
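   For quick reference, these are the standard startup scripts shipped with the Kafka distribution (run from the Kafka installation directory, Zookeeper before the broker):

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties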

  二: Kafka Configuration.

   2.1 The application.properties configuration file.

# Kafka broker address (the Linux server's IP and the Kafka port, e.g. 9092)
spring.kafka.bootstrap-servers=Linux-IP:PORT
# Producer: serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Number of retries when a send fails
spring.kafka.producer.retries=0
# Maximum size of a record batch, in bytes (not a message count)
spring.kafka.producer.batch-size=16384
# Total memory (in bytes) the producer may use to buffer records
spring.kafka.producer.buffer-memory=33554432
# Consumer: deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer group id
spring.kafka.consumer.group-id=test-producer-topic
# Where to start reading when there is no committed offset
spring.kafka.consumer.auto-offset-reset=earliest
# Commit offsets automatically, once per second
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000

    2.2 Configure the message producer.

          KafkaProducer: both the key and the value are of type String.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * author:
 * date: 2018-12-27
 * time: 17:34
 * description: Kafka producer configuration
 */
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(String message) {
        // Send the message with an explicit key ("test-key") to the topic
        kafkaTemplate.send("test-producer-topic", "test-key", message);
        return "Producer Message Success!";
    }
}
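    One optional improvement: send() is asynchronous, and in spring-kafka 1.x it returns a ListenableFuture, so you can attach a callback to log success or failure instead of assuming the send worked. A minimal sketch (the /sendAsync mapping and log messages are my own additions, not part of the original project):

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

    // Alternative send: register a callback on the returned ListenableFuture
    @RequestMapping("/sendAsync")
    public String sendAsync(String message) {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send("test-producer-topic", "test-key", message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // The broker-assigned partition and offset are in RecordMetadata
                System.out.println("Sent, offset=" + result.getRecordMetadata().offset());
            }
            @Override
            public void onFailure(Throwable ex) {
                System.err.println("Send failed: " + ex.getMessage());
            }
        });
        return "Producer Message Success!";
    }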

    (Screenshots: the location of the KafkaTemplate class; Spring Boot's Kafka auto-configuration; the KafkaProperties class.)
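What those screenshots show, in code terms: Spring Boot binds the spring.kafka.* properties into a KafkaProperties bean and auto-configures a ProducerFactory and KafkaTemplate from it. A rough, illustrative sketch of the equivalent manual configuration (not the actual auto-configuration source; the class name ManualKafkaConfig is mine):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ManualKafkaConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // Mirrors the spring.kafka.producer.* entries in application.properties
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Linux-IP:PORT");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}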

2.3 Configure the message consumer (message listening).

  KafkaConsumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * author:
 * date: 2018-12-27
 * time: 17:46
 * description: Kafka consumer
 */
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "test-producer-topic")
    public void listen(ConsumerRecord<?, ?> record) {
        // Print the metadata and payload of each received record
        // (the original format string was missing the final placeholder)
        System.out.printf("topic=%s, offset=%d, key=%s, value=%s, serializedKeySize=%d, serializedValueSize=%d%n",
                record.topic(), record.offset(), record.key(), record.value(),
                record.serializedKeySize(), record.serializedValueSize());
    }
}

The listener prints, for each message from the producer, its topic, offset, key, value, and the sizes serializedKeySize and serializedValueSize.

Start the project:

Visit http://localhost:8090/dockerboot/kafka/send?message=KafkaMessage

Watch the console:

The console reports the following error:

2018-12-27 20:10:42.877 [kafka-producer-network-thread | producer-1] ERROR org.springframework.kafka.support.LoggingProducerListener 76  -| Exception thrown when sending a message with key='test-key' and payload='KafkaMessage' to topic test-producer-topic:
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for test-producer-topic-0

Solution: stop Kafka and edit the config/server.properties file under the Kafka installation directory.
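The typical change for this TimeoutException when the broker runs on a cloud server (an assumption here; verify against your own setup) is to make the broker advertise an address that external clients can actually reach:

# config/server.properties (assumed fix: advertise a reachable address)
# Listen on all interfaces inside the server
listeners=PLAINTEXT://0.0.0.0:9092
# Advertise the public IP so external clients can connect
# (replace <your-public-IP> with the Linux server's public address)
advertised.listeners=PLAINTEXT://<your-public-IP>:9092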

Restart Kafka.

Access the endpoint again.

Console output:

Run it again and the offset of the message increases by 1. The listener does not fire just once: the data is stored in partitioned log segments, and the consumer keeps polling the topic for new records.

 
