1.引入依赖

        <!-- 这里kafka版本使用的是2.6.0 -->
        <kafka.version>2.6.0</kafka.version>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${kafka.version}</version>
        </dependency>

2.创建配置

配置application.yaml文件

spring:
  kafka:
    # kafka 服务地址
    bootstrap-servers: ip:9092
    # 消费者配置
    consumer:
      auto-commit-interval: 5000 #自动提交消费位移时间隔时间
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500 #批量消费每次最多消费多少条消息
      enable-auto-commit: true #开启自动提交消费位移
      auto-offset-reset: latest #其他earliest、none
      group-id: kafka.consumer.group #消费者组名称
      client-id: kafka.consumer.client.id #消费者客户端ID
      fetch-max-wait: 400 #最大等待时间
      fetch-min-size: 1 #最小消费字节数
      heartbeat-interval: 3000 #分组管理时心跳到消费者协调器之间的预计时间
      isolation-level: read_committed
      topic-name: huachun
    # 生产者配置
    producer:
      batch-size: 16384 #批次大小,默认16k
      acks: -1 #ACK应答级别,指定分区中必须要有多少个副本收到消息之后才会认为消息成功写入,默认为1只要分区的leader副本成功写入消息;0表示不需要等待任何服务端响应;-1或all需要等待ISR中所有副本都成功写入消息
      retries: 3 #重试次数
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      buffer-memory: 33554432 #缓冲区大小,默认32M
      client-id: kafka.producer.client.id #客户端ID
      compression-type: none #消息压缩方式,默认为none,另外有gzip、snappy、lz4
      properties:
        retry.backoff.ms: 100 #重试时间间隔,默认100
        linger.ms: 0 #默认为0,表示批量发送消息之前等待更多消息加入batch的时间
        max.request.size: 1048576 #默认1MB,表示发送消息最大值
        connections.max.idle.ms: 540000 #默认9分钟,表示多久后关闭限制的连接
        receive.buffer.bytes: 32768 #默认32KB,表示socket接收消息缓冲区的大小,为-1时使用操作系统默认值
        send.buffer.bytes: 131072 #默认128KB,表示socket发送消息缓冲区大小,为-1时使用操作系统默认值
        request.timeout.ms: 30000 #默认30000ms,表示等待请求响应的最长时间
      topic-name: huachun

3.创建生产者服务类

1.生产者接口

package com.hhmt.delivery.service;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.messaging.Message;

import java.util.concurrent.ExecutionException;

/**
 * 辉煌明天
 * FileName: ProducerService
 * Author:   huachun
 * email: huachun_w@163.com
 * Date:     2022/1/24 18:26
 * Description:
 */
public interface ProducerService {

    /**
     * 发送同步消息
     * @param topic
     * @param data
     * @throws ExecutionException
     * @throws InterruptedException
     */
    void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException;

    /**
     * 发送普通消息
     * @param topic
     * @param data
     */
    void sendMessage(String topic, String data);

    /**
     * 发送带附加信息的消息
     * @param record
     */
    void sendMessage(ProducerRecord<String, String> record);

    /**
     * 发送Message消息
     * @param message
     */
    void sendMessage(Message<String> message);

    /**
     * 发送带key的消息
     * @param topic
     * @param key
     * @param data
     */
    void sendMessage(String topic, String key, String data);

    /**
     * 发送带key和分区的消息
     * @param topic
     * @param partition
     * @param key
     * @param data
     */
    void sendMessage(String topic, Integer partition, String key, String data);

    /**
     * 发送有分区,当前时间,key的消息
     * @param topic
     * @param partition
     * @param timestamp
     * @param key
     * @param data
     */
    void sendMessage(String topic, Integer partition, Long timestamp, String key, String data);

}

 2.生产者实现类

package com.hhmt.delivery.service.impl;

import com.hhmt.delivery.service.ProducerService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.ExecutionException;

/**
 * 辉煌明天
 * FileName: ProducerServiceImpl
 * Author:   huachun
 * email: huachun_w@163.com
 * Date:     2022/1/24 18:27
 * Description:
 */
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);

    @Override
    public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException {
        SendResult<String, String> sendResult = kafkaTemplate.send(topic, data).get();
        RecordMetadata recordMetadata = sendResult.getRecordMetadata();
        logger.info("发送同步消息成功!发送的主题为:{}", recordMetadata.topic());
    }

    @Override
    public void sendMessage(String topic, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
    }

    @Override
    public void sendMessage(ProducerRecord<String, String> record) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

    @Override
    public void sendMessage(Message<String> message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

    @Override
    public void sendMessage(String topic, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
        future.addCallback(success -> logger.info("发送消息成功!"), failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
    }

    @Override
    public void sendMessage(String topic, Integer partition, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

    @Override
    public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, timestamp, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }
}

4.生产者发送消息测试

1.创建topic

bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic huachun --bootstrap-server localhost:9092

2.向指定topic发送消息

package com.hhmt.delivery;

import com.hhmt.delivery.service.ProducerService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

/**
 * 辉煌明天
 * FileName: TestKafka
 * Author:   huachun
 * email: huachun_w@163.com
 * Date:     2022/1/24 18:45
 * Description:
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestKafka {

    @Autowired
    private ProducerService producerService;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Value("${spring.kafka.producer.topic-name}")
    private String topicName;

    @Test
    public void cotextLoads() {
        producerService.sendMessage(topicName, "springboot");
    }

    @Test
    public void sendMessage1() {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, 0, System.currentTimeMillis(), "topic-key", "测试");
        producerRecord.headers().add("user", "zhangsan".getBytes());
        producerService.sendMessage(producerRecord);
    }

    @Test
    public void sendMessage2() {
        String event = "测试";
        Map<String, Object> map = new HashMap<>();
        map.put("user", "zhangsan");
        MessageHeaders headers = new MessageHeaders(map);
        Message<String> message = MessageBuilder.createMessage(event, headers);
        kafkaTemplate.setDefaultTopic(topicName);
        producerService.sendMessage(message);
    }

}

1.出现的问题

        此时发送kafka消息会出现一个问题

         定位后发现使用的是云主机,有内网IP和外网IP,虚拟机对外ip[暴露的ip]和真实ip[ifconfig显示的ip]可能只是映射关系,用户访问对外ip时,OpenStack会转发到对应的真实ip实现访问。但此时如果 Kafka server.properties配置中的listeners=PLAINTEXT://对外IP:9092中的ip配置为[对外ip]的时候无法启动,因为socket无法绑定监听

2.解决方案

        在kafka的server.properties中添加如下内容

listeners=PLAINTEXT://内网:9092
advertised.host.name=外网ip或者域名
advertised.listeners=PLAINTEXT://外网ip或者域名:9092

 重启kafka服务测试,消息发送成功...

参考原文:关于kafka的Cannot assign requested address_APTX4869_YXW的博客-CSDN博客

参考原文:kafka无法启动,Cannot assign requested address._MysticalYcc的博客-CSDN博客

5.消费者

1.消费指定topic消息

package com.hhmt.delivery;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * 辉煌明天
 * FileName: TestCustomerKafka
 * Author:   huachun
 * email: huachun_w@163.com
 * Date:     2022/1/25 10:42
 * Description:
 */
@Component
@Slf4j
public class TestCustomerKafka {

    @KafkaListener(topics = {"${spring.kafka.consumer.topic-name}"})
    public void listenerMessage(ConsumerRecord<String, String> record) {
        log.info("接收到kafka消息键为:{},消息值为:{},消息头为:{},消息分区为:{},消息主题为:{}", record.key(), record.value(), record.headers(), record.partition(), record.topic());
    }

}

2.测试消费消息

package com.hhmt.delivery.task;

import com.hhmt.delivery.service.ProducerService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 辉煌明天
 * FileName: KafkaTask
 * Author:   huachun
 * email: huachun_w@163.com
 * Date:     2022/1/25 11:07
 * Description:
 */
@Configuration
@EnableScheduling
public class KafkaTask {

    @Autowired
    private ProducerService producerService;

    @Value("${spring.kafka.producer.topic-name}")
    private String topicName;

    @Scheduled(cron = "*/1 * * * * *")
    public void cotextLoads() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        String date = sdf.format(new Date());

        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, 0, System.currentTimeMillis(), "topic-key" + date, "测试" + date);
        producerRecord.headers().add("user", "zhangsan".getBytes());
        producerService.sendMessage(producerRecord);
    }

}

说明:通过定时任务每秒钟发送一个消息,kafka消费者可以监听到。启动服务查看日志

在服务器使用客户端也可以收到发送的消息

bin/kafka-console-consumer.sh --topic huachun --from-beginning --bootstrap-server ip:9092

常见问题:

1.消息无法正常发送

提示: Failed to construct kafka producer

通过打断点发现 class com.fasterxml.jackson.databind.ser.std.StringSerializer is not an instance of org.apache.kafka.common.serialization.Serializer

此时发现配置中的序列化是有问题的,没有使用kafka序列化类

通过调整序列化类,问题解决,注意在配置类中的序列化和配置文件中的保持一致

import org.apache.kafka.common.serialization.StringSerializer

最开始使用的是配置文件配置kafka生产者,后来发现这个配置并没有生效,所以使用下面的配置类(为什么没生效有待研究,可能是不同版本之间的参数不同,导致没有引入正确的配置)

package com.hhmt.delivery.config;

import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author huachun
 * @Email huachun_w@163.com
 * @CreateTime 2024-04-25 10:40
 * @Description
 */
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

原文参考:Springboot整合kafka实现高效的消息传递和处理_kafka 發送接收消息方式-CSDN博客

2.发送自定义对象失败

异常信息:Can't convert value of class com.hhmt.delivery.pojo.model.DelayDataDTO to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

解决这个问题可以使用如下方法,将对象当做json发送,

kafkaTemplate.send(topic, delayDataDTO.toString());
package com.hhmt.delivery.pojo.model;

import com.alibaba.fastjson.JSON;
import lombok.Data;

/**
 * @Author huachun
 * @Email huachun_w@163.com
 * @CreateTime 2024-04-24 15:15
 * @Description
 */
@Data
public class DelayDataDTO {

    private String url;
    private Long startTime;
    private Long endTime;

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐