关于基础搭建:请参看

https://blog.csdn.net/zhang_zha_zha/article/details/75041047

首先上依赖:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

然后是Config:

需要注意的是类上方的 @Configuration 和 @EnableKafka 注解:

生产者:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig
{

    // Kafka broker address list (e.g. "host1:9092,host2:9092"), injected from application config.
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    /**
     * Builds the producer configuration map.
     *
     * @return mutable map of producer settings keyed by {@link ProducerConfig} constants
     */
    public Map<String, Object> producerConfigs()
    {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // No retries: a failed send is not retried (at-most-once from the producer side).
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // Batch up to 16 KB per partition before sending.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Wait up to 1 ms for more records to fill a batch.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 32 MB total buffer memory for unsent records.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Producer factory backed by {@link #producerConfigs()}.
     *
     * @return factory producing String-keyed, String-valued Kafka producers
     */
    public ProducerFactory<String, String> producerFactory()
    {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Exposes the {@link KafkaTemplate} used to publish messages.
     *
     * @return template for sending String/String records
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate()
    {
        // Diamond operator — explicit type arguments are redundant here.
        return new KafkaTemplate<>(producerFactory());
    }
}

然后是消费者:

import cn.istarfinancial.platformcommon.sms.chuanglan.biz.ChuangLanBiz;
import cn.istarfinancial.platformcommon.sms.chuanglan.dto.MessageInfo;
import cn.istarfinancial.platformcommon.sms.chuanglan.dto.SmsVariableResponse;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

@Configuration
@EnableKafka
@Slf4j
public class SmsConsumer
{

    @Autowired
    private ChuangLanBiz chuangLanBiz;

    /**
     * Consumes SMS messages from the "platform-sms" topic, deserializes them into
     * {@link MessageInfo} and dispatches the SMS via {@link ChuangLanBiz}.
     *
     * @param record the consumed Kafka record; its value is the JSON-encoded MessageInfo
     */
    @KafkaListener(topics = {"platform-sms"})
    public void listen(ConsumerRecord<?, ?> record)
    {
        // A plain null check replaces the Optional.ofNullable/isPresent/get dance —
        // Optional is meant for return types, not local null checks.
        Object message = record.value();
        if (message != null)
        {
            MessageInfo messageInfo = JSON.parseObject(message.toString(), MessageInfo.class);
            String params = getParams(new StringBuffer(), messageInfo.getParams()).toString();
            SmsVariableResponse smsVariableResponse = chuangLanBiz.getSmsVariable(messageInfo.getTemplet(), params);
            // Parameterized logging — avoids eager string concatenation.
            log.info("----sms 回执----{}", smsVariableResponse);
        }
    }

    /**
     * Appends each element followed by ',' and terminates the buffer with ';'.
     * For example, {"a","b"} yields "a,b,;".
     *
     * @param stringBuffer buffer to append into (mutated and returned)
     * @param strs         parameter values to join
     * @return the same buffer, for chaining
     */
    public StringBuffer getParams(StringBuffer stringBuffer, String[] strs)
    {
        for (String str : strs)
        {
            // Chained appends instead of "str + \",\"" — no throwaway intermediate strings.
            stringBuffer.append(str).append(',');
        }
        return stringBuffer.append(';');
    }
}

具体使用:

 @Autowired private KafkaTemplate<String, String> kafkaTemplate;
    /**
     * 1.通过kafka发送短信
     */
    @Override
    public BaseResponse<Object> sendMsg(MessageInfo msg)
    {

        log.info("message = {}", JSON.toJSONString(msg));
        try
        {
            kafkaTemplate.send("platform-sms", JSON.toJSONString(msg));
            return new BaseResponse<Object>(ResponseEnum.SUCCESS);
        }
        catch (Exception e)
        {
            log.info("短信消息发送失败,异常信息为:" + e);
            return new BaseResponse<Object>(ResponseEnum.ERROR);
        }
    }

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐