1. Import the dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
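No version is given because it is managed by the Spring Boot dependency BOM. A minimal sketch of the parent declaration this assumes (a Spring Boot 2.x parent, since the producer code in step 3 uses the ListenableFuture-based callback API that Spring Kafka 3.0 removed):

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <!-- assumption: any Spring Boot 2.x release works here; 3.x changes the send() return type -->
    <version>2.7.18</version>
</parent>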

2. Add the configuration in application.yml:

spring:
  application:
    name: demo

  kafka:
    bootstrap-servers: 43.136.16.123:9092,43.136.16.123:9093,43.136.16.123:9094
    producer: # producer settings
      retries: 0 # number of retries on send failure
      acks: 1 # ack level: how many partition replicas must confirm the write before the broker acks the producer (0, 1, or all/-1)
      batch-size: 16384 # batch size in bytes
      buffer-memory: 33554432 # producer-side buffer size in bytes
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #      value-serializer: com.itheima.demo.config.MySerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer: # consumer settings
      group-id: javagroup # default consumer group ID
      enable-auto-commit: true # whether offsets are committed automatically
      auto-commit-interval: 100 # auto-commit interval in ms (how long after a message is received its offset is committed)

      # earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning
      # latest: if a partition has a committed offset, resume from it; otherwise consume only newly produced records
      # none: if every partition has a committed offset, resume from them; if any partition lacks one, throw an exception
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #      value-deserializer: com.itheima.demo.config.MyDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
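The commented-out com.itheima.demo.config.MySerializer hints at a custom value serializer. As a sketch of what such a class might look like (this implementation is not from the original article; it is an assumption that serializes values to JSON with Jackson, which Spring Boot ships by default):

package com.itheima.demo.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class MySerializer implements Serializer<Object> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Object data) {
        if (data == null) {
            return null; // Kafka treats a null value as a tombstone
        }
        try {
            // Serialize the payload to JSON bytes
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing value for topic " + topic, e);
        }
    }
}

A matching MyDeserializer would implement org.apache.kafka.common.serialization.Deserializer and call objectMapper.readValue on the received byte array.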

3. Write the Kafka message producer:

package com.study.kafka.controller;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Simple producer
     * @param msg the message to send
     */
    @GetMapping("/kafka/test/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        kafkaTemplate.send("topic3", msg);
    }


    /**
     * Producer with a callback (lambda style)
     * @param callbackMessage the message to send
     */
    @GetMapping("/kafka/callbackOne/{message}")
    public void sendMessage2(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic3", callbackMessage).addCallback(success -> {
            // Topic the message was sent to
            String topic = success.getRecordMetadata().topic();

            // Partition the message was sent to
            int partition = success.getRecordMetadata().partition();
            // Offset of the message within the partition
            long offset = success.getRecordMetadata().offset();
            System.out.println("Message sent successfully: " + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("Failed to send message: " + failure.getMessage());
        });
    }

    /**
     * Producer with a callback (explicit ListenableFutureCallback)
     * @param callbackMessage the message to send
     */
    @GetMapping("/kafka/callbackTwo/{message}")
    public void sendMessage3(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic3", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:"+ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
    }
}
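The callback style above relies on ListenableFuture, which KafkaTemplate.send() returned through Spring Kafka 2.x. In Spring Kafka 3.0+ (Spring Boot 3.x), send() returns a CompletableFuture instead, so the equivalent callback, as a sketch, becomes:

kafkaTemplate.send("topic3", callbackMessage).whenComplete((result, ex) -> {
    if (ex != null) {
        // The send failed; ex carries the cause
        System.out.println("Failed to send message: " + ex.getMessage());
    } else {
        // The send succeeded; result carries the record metadata
        System.out.println("Message sent successfully: " + result.getRecordMetadata().topic() + "-"
                + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
    }
});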


4. Write the Kafka message consumer:

package com.study.kafka.controller;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    /**
     * Listener that consumes messages
     * @param record the consumed record
     */
    @KafkaListener(topics = {"topic3"})
    public void onMessage1(ConsumerRecord<?, ?> record){

        /**
         * Print which topic and partition the message came from, plus its content:
         * record.topic()     the topic
         * record.partition() the partition
         * record.value()     the message payload
         */
        System.out.println("Simple consumption: " + record.topic() + "-" + record.partition() + "-" + record.value());
    }
}
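Spring can also inject record details through annotations instead of a raw ConsumerRecord. A sketch of an alternative listener added inside the KafkaConsumer class above (the method name is illustrative; KafkaHeaders.RECEIVED_PARTITION_ID is the Spring Kafka 2.x constant, renamed RECEIVED_PARTITION in 3.0):

    @KafkaListener(topics = "topic3", groupId = "javagroup")
    public void onMessage2(@Payload String message,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        // Same information as onMessage1, but bound as method parameters
        System.out.println("Annotated consumption: " + topic + "-" + partition + "-" + message);
    }

This variant needs the extra imports org.springframework.kafka.support.KafkaHeaders, org.springframework.messaging.handler.annotation.Header, and org.springframework.messaging.handler.annotation.Payload.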

5. Create the topic:

From the Kafka root directory, run the following command to create the topic:

bin/kafka-topics.sh --create --zookeeper 10.0.12.16:2181,10.0.12.16:2182,10.0.12.16:2183 --replication-factor 2 --partitions 2 --topic topic3
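Note that the --zookeeper flag only exists in Kafka releases before 3.0, where it was removed. On Kafka 2.2+ the same topic can be created by talking to a broker directly (the broker address below is taken from application.yml, assuming it is the same cluster):

bin/kafka-topics.sh --create --bootstrap-server 43.136.16.123:9092 --replication-factor 2 --partitions 2 --topic topic3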

6. List the topics:

bin/kafka-topics.sh --list --zookeeper 10.0.12.16:2181,10.0.12.16:2182,10.0.12.16:2183
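Likewise, on Kafka 2.2+ (same broker-address assumption as above):

bin/kafka-topics.sh --list --bootstrap-server 43.136.16.123:9092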


7. Test:

Visit in a browser: http://localhost:8080/kafka/test/HelloKafka
Result: the consumer's console output shows that the message was successfully consumed.
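The same request can be issued from the command line (port 8080 is the Spring Boot default, assuming server.port was not changed):

curl http://localhost:8080/kafka/test/HelloKafka

With the listener from step 4, the application log should then print something like the following (the partition number varies):

Simple consumption: topic3-0-HelloKafka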

For a more detailed walkthrough, see: https://blog.csdn.net/yuanlong122716/article/details/105160545
