1.创建父子级项目结构,生产者springboot微服务与消费者springboot微服务。

2.生产者创建一个供与测试的实体Bean

package com.jk.bean;

import java.util.Date;

public class MessageBean {

    private Long id;    //id
    private String msg; //消息
    private Date sendTime;  //时间戳

    @Override
    public String toString() {
        return "MessageBean{" +
                "id=" + id +
                ", msg='" + msg + '\'' +
                ", sendTime=" + sendTime +
                '}';
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Date getSendTime() {
        return sendTime;
    }

    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}

3.生产者创建controller层,创建producer类

   创建一个Gson对象 提供java对象和json字符串之间进行转化

package com.jk.controller;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.jk.bean.MessageBean;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

@Component
@Slf4j
public class producer {

   @Autowired
   KafkaTemplate<String,String> kafkaTemplate;

   //创建一个Gson对象 提供java对象和json字符串之间进行转化
   private Gson gson = new GsonBuilder().create();


   public void send() {
   MessageBean message = new MessageBean();
   message.setId(System.currentTimeMillis());
   message.setMsg(UUID.randomUUID().toString());
   message.setSendTime(new Date());

     //topic-ideal为主题
      //send方法 发送消息
   kafkaTemplate.send("test", gson.toJson(message));
     }

}

4.controller层下创建测试启动类写一个测试方法:

package com.jk.controller;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerApplicationTests{
    @Autowired
    private producer kafkaProducer;
    @Test
    public void kafkaProducer(){
        this.kafkaProducer.send();
    }
    @Test
    public void contextLoads() {
    }
}

5.配置yml配置文件


spring:
  kafka:
     bootstrap-servers: 192.168.1.120:9092,192.168.1.120:9093,192.168.1.120:9094
     producer:

       retries: 0


       batch-size: 16384

       buffer-memory: 33554432

       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: org.apache.kafka.common.serialization.StringSerializer

bootstrap-servers: 是自己linux系统ip+kafka端口号,有几个写几个

6.消费者创建controller层,创建KafkaConsumer类接受消息

package com.jk.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

import static org.apache.kafka.common.requests.DeleteAclsResponse.log;

@Component
@Slf4j
public class KafkaConsumer{

    @KafkaListener(topics = {"test"})
   public void consumer(ConsumerRecord<?, ?> record){
   Optional<?> kafkaMessage = Optional.ofNullable(record.value());
   if (kafkaMessage.isPresent()) {
   Object message = kafkaMessage.get();
   log.info("----------------- record =" + record);
   log.info("------------------ message =" + message);
    }
    }
}

7.配置消费者yml配置文件,与生产者配置基本一致:

server:
  port: 9999
spring:
  kafka:
    bootstrap-servers: 192.168.1.120:9093,192.168.1.120:9094,192.168.1.120:9092
    consumer:
      group-id: ideal-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 20000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

8.启动生产者测试方法send

9.启动消费者xshell将会看到消息接收到了

完成.

附上测试项目下载地址:

本项目仅供下载学习:

链接:https://pan.baidu.com/s/1z9a2JDJKpLPL-0lP-4w3Cw 
提取码:vf3t 

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐