1. Spring Boot setup itself is skipped here

A simple getting-started example: https://www.jianshu.com/p/d24bceea7665

2. pom file

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.2.2.RELEASE</version>
</dependency>

3. Spring Boot configuration file
I use YAML configuration; part of it is shown below:

    spring:
      kafka:
        bootstrap-servers: 192.168.133.128:9092
        producer:
          #value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        consumer:
          group-id: test1
          # Within a group, each message is consumed by only one consumer. With earliest,
          # a new group reads the partition from the beginning; none is also an option.
          auto-offset-reset: latest

See KafkaProperties for the full list of configurable properties.
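For example, a fuller producer/consumer section might look like the sketch below (the serializer classes shown are the standard Kafka string ones; the `retries` value is an illustrative assumption, adjust to your needs):

```yaml
spring:
  kafka:
    bootstrap-servers: 192.168.133.128:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 3
    consumer:
      group-id: test1
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```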

4. Consumer listener

package com.st.kafka.consumer;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;

@Component
public class Listener {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"test"}, id = "t2")
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("kafka key: {}", record.key());
        // parseArray avoids the unchecked cast that JSONObject.parse() would require
        List<String> list = JSON.parseArray(record.value().toString(), String.class);
        for (String item : list) {
            logger.info("kafka element: {}", item);
        }
        logger.info("kafka value: {}", record.value());
    }
}

5. A simple producer test

package com.st.kafka.controller;

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.fastjson.JSONObject;
import com.st.base.dto.BaseResult;
import com.st.base.util.ResultUtil;

import io.swagger.annotations.ApiOperation;

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @ApiOperation("Test the Kafka producer")
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public BaseResult sendKafka(@RequestParam String key, @RequestParam String message) {
        try {
            List<String> list = new ArrayList<String>();
            for (long i = 0; i < 1000; i++) {
                list.add(i + "");
            }

            logger.info("kafka message={}", list);
            String temp = JSONObject.toJSONString(list);
            // use the key passed in by the caller rather than a hard-coded one
            kafkaTemplate.send("test", key, temp);
            logger.info("kafka send succeeded.");
            return ResultUtil.success("SUCCESS", "kafka send succeeded");
        } catch (Exception e) {
            logger.error("kafka send failed", e);
            return ResultUtil.success("FAIL", "kafka send failed");
        }
    }

    @KafkaListener(id = "t1", topics = "test")
    public void listenT1(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info("{} - {} : {}", cr.topic(), cr.key(), cr.value());
    }
}
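For reference, the payload that /kafka/send publishes is just a JSON array of strings, which the Listener above parses back into a List. A minimal sketch of that serialization using only the standard library (the class and method names here are hypothetical, not part of the project):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class JsonArraySketch {

    // Builds the same JSON-array-of-strings shape the controller sends via fastjson.
    static String toJsonArray(List<String> items) {
        return items.stream()
                .map(s -> "\"" + s + "\"")
                .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        for (long i = 0; i < 3; i++) {
            list.add(i + "");
        }
        System.out.println(toJsonArray(list)); // prints ["0","1","2"]
    }
}
```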

Passing an object directly to kafkaTemplate.send throws an error; any pointers from readers on how to solve this would be appreciated.
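That error typically means the configured value serializer (a string/byte serializer by default) cannot handle arbitrary objects. One common fix, sketched below under the assumption that your spring-kafka version ships org.springframework.kafka.support.serializer.JsonSerializer, is to configure a JSON value serializer for the producer:

```yaml
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
```

With this in place (and a KafkaTemplate typed for object values), send("test", key, someObject) serializes the object to JSON automatically. The alternative, which the controller above already uses, is to keep the string serializer and call JSONObject.toJSONString(obj) yourself before sending.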
