kafka整合短信发送
本文介绍如何在 Spring 项目中整合 Kafka 实现短信发送:先引入 spring-kafka 依赖,再分别配置生产者与消费者,最后通过 KafkaTemplate 发送短信消息。基础环境搭建请参看 https://blog.csdn.net/zhang_zha_zha/article/details/75041047
·
关于基础搭建:请参看
https://blog.csdn.net/zhang_zha_zha/article/details/75041047
首先上依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
然后是Config:
需要注意的是类上方的注解(@Configuration 和 @EnableKafka):
生产者:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration that publishes a {@link KafkaTemplate} for sending
 * String-keyed, String-valued messages.
 *
 * <p>NOTE(review): only {@link #kafkaTemplate()} is annotated with {@code @Bean};
 * {@link #producerConfigs()} and {@link #producerFactory()} are plain methods, so
 * the objects they return are not registered with the container. Confirm this is
 * intentional.
 */
@Configuration
@EnableKafka
public class KafkaConfig
{
    /** Broker list, resolved from the {@code spring.kafka.bootstrap-servers} property. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    /**
     * Assembles the producer settings.
     *
     * @return a new mutable map of producer configuration keys to values
     */
    public Map<String, Object> producerConfigs()
    {
        Map<String, Object> settings = new HashMap<>();

        // Where to connect.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);

        // Retry, batching and buffering values.
        settings.put(ProducerConfig.RETRIES_CONFIG, 0);
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        // Keys and values are both written as plain strings.
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return settings;
    }

    /**
     * Creates a producer factory backed by {@link #producerConfigs()}.
     *
     * @return a new {@link DefaultKafkaProducerFactory}
     */
    public ProducerFactory<String, String> producerFactory()
    {
        Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Exposes the template that application code injects to send messages.
     *
     * @return a {@link KafkaTemplate} built on {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate()
    {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
然后是消费者:
import cn.istarfinancial.platformcommon.sms.chuanglan.biz.ChuangLanBiz;
import cn.istarfinancial.platformcommon.sms.chuanglan.dto.MessageInfo;
import cn.istarfinancial.platformcommon.sms.chuanglan.dto.SmsVariableResponse;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
 * Kafka listener that consumes SMS requests from the {@code platform-sms} topic
 * and forwards them to {@link ChuangLanBiz}.
 *
 * <p>Each record value is expected to be the JSON form of a {@link MessageInfo}.
 */
@Configuration
@EnableKafka
@Slf4j
public class SmsConsumer
{
    @Autowired
    private ChuangLanBiz chuangLanBiz;

    /**
     * Handles one record from the {@code platform-sms} topic.
     *
     * <p>Records with a {@code null} value, or whose value does not deserialize to a
     * {@link MessageInfo}, are skipped instead of failing with a
     * {@link NullPointerException}.
     *
     * @param record the consumed Kafka record
     */
    @KafkaListener(topics = {"platform-sms"})
    public void listen(ConsumerRecord<?, ?> record)
    {
        Object payload = record.value();
        if (payload == null)
        {
            // Nothing to send for an empty record.
            return;
        }

        MessageInfo messageInfo = JSON.parseObject(payload.toString(), MessageInfo.class);
        if (messageInfo == null)
        {
            // Guard: the parser may hand back null (e.g. for a JSON null payload);
            // dereferencing it below would otherwise throw.
            log.warn("Skipping platform-sms record: payload did not parse to a MessageInfo");
            return;
        }

        String params = getParams(new StringBuffer(), messageInfo.getParams()).toString();
        SmsVariableResponse smsVariableResponse =
                chuangLanBiz.getSmsVariable(messageInfo.getTemplet(), params);
        log.info("----sms 回执----{}", smsVariableResponse);
    }

    /**
     * Appends every element of {@code strs} to {@code stringBuffer}, each followed by
     * a comma, and then appends a terminating semicolon. For example
     * {@code {"a", "b"}} yields {@code "a,b,;"}.
     *
     * <p>A {@code null} array is treated like an empty one, so only {@code ";"} is
     * appended.
     *
     * @param stringBuffer the buffer to append to; it is modified in place
     * @param strs the values to append, may be {@code null}
     * @return the same {@code stringBuffer} instance
     */
    public StringBuffer getParams(StringBuffer stringBuffer, String[] strs)
    {
        if (strs != null)
        {
            for (String str : strs)
            {
                // Two appends avoid building a throwaway "str," string per element.
                stringBuffer.append(str).append(',');
            }
        }
        return stringBuffer.append(";");
    }
}
具体使用:
// Injected template used below to publish String key/value messages to Kafka.
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
/**
 * Sends an SMS request through Kafka by publishing {@code msg}, serialized as JSON,
 * to the {@code platform-sms} topic.
 *
 * <p>The value returned by {@code kafkaTemplate.send(...)} is not inspected here, so
 * {@code SUCCESS} only means the call returned without throwing.
 * NOTE(review): confirm whether callers need delivery confirmation.
 *
 * @param msg the SMS request to publish
 * @return a response carrying {@code ResponseEnum.SUCCESS} when the send call returned
 *         normally, or {@code ResponseEnum.ERROR} when serialization or sending threw
 */
@Override
public BaseResponse<Object> sendMsg(MessageInfo msg)
{
    try
    {
        // Serialize once, inside the try, so a serialization failure also maps to
        // the ERROR response and the logged text is exactly what gets sent.
        String payload = JSON.toJSONString(msg);
        log.info("message = {}", payload);
        kafkaTemplate.send("platform-sms", payload);
        return new BaseResponse<Object>(ResponseEnum.SUCCESS);
    }
    catch (Exception e)
    {
        // Pass the exception as the last argument so the stack trace is logged.
        log.error("短信消息发送失败,异常信息为:", e);
        return new BaseResponse<Object>(ResponseEnum.ERROR);
    }
}
更多推荐
已为社区贡献1条内容
所有评论(0)