1.安装kafka
1)安装zk和kafka镜像

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

2)启动zk镜像生成容器

docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

3)启动kafka镜像生成容器

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=xxxx:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=xxxx:2181 配置zookeeper管理kafka的路径xxxx:2181

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:9092 把kafka的地址端口注册给zookeeper

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

x x x为ip地址在云服务器上如果想让外部网络的java端访问的话该地址为公网地址,并且需要在vi /etc/hosts中添加公网ip 才能访问,这里是个云服务环境使用的大坑。

4)#进入kafka终端

docker exec -it kafka /bin/bash

5)#进入kafka相应目录

cd opt/kafka_2.13-2.8.1/bin

6)#发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

7)#接收消息

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

按照步骤4–5新启动一个消费者实例去消费消息。

--from-beginning  :从最开始的消息进行消费。

2.SpringBoot集成Kafka
1)pom文件

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2)yaml配置文件

spring:
  kafka:
    bootstrap-servers: kafka部署服务的ip地址:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 5
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

3)生产者代码

@Component
public class KafkaProducer {
private static final Logger log=LoggerFactory.getLogger(KafkaProducer.class);

@Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    public void send2(String msg){
        kafkaTemplate.send("test3",msg);
    }

}

4)消费者代码

@Component
public class KafkaConsumer {
    private static final Logger log= LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(groupId = "3",topics = "test3")
    public void listen(String msg){
        log.info("接收消息:"+msg);
    }
	
	@KafkaListener(topics = "topic_test", groupId ="1")
    public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }
    }
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐