kafka发送数据与接收数据(发送数据-可不同系统使用)
kafka发送数据第一步添加jar包<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency><dependency><groupId>org.
·
kafka发送数据
(注: 这是在已搭建好的kafka中进行的,如果有kafka的IP地址你不用在自己电脑上搭建
第一步添加jar包
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
第二步 yml配置kafka的路径
注:你访问的kafka是否有权限,没有权限你是连不进去的会包超时错误。还有注意格式
spring:
kafka:
bootstrap-servers: localhost
第三步 写一个数据类 TopicMessageDTO
package com.jianxun.nash.mq.dto;
import lombok.Data;
/**
* @author Carl Lee
* @date 2020-01-04 9:03
*/
@Data
public class TopicMessageDTO<T> {
/**
* topic name
*/
private String topicName;
/**
* 数据类型
*/
private String messageType;
/**
* 数据产生的时间
*/
private String timestamp;
/**
* 数据更新代码 C:created U: Updated D:Deleted
*/
private String actionCode;
/**
* 业务逻辑key值
*/
private String keyValue;
/**
* 医院id
*/
private Integer hospitalId;
/**
* 需要传输的数据
*/
private T data;
}
第四步 kafka数据发送类 KafkaProducerService
package com.jianxun.nash.mq.service;
import com.alibaba.fastjson.JSON;
import com.jianxun.nash.common.enums.ExceptionMsg;
import com.jianxun.nash.common.exception.MyException;
import com.jianxun.nash.mq.dto.TopicMessageDTO;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class KafkaProducerService {
@Resource
private KafkaTemplate<String,Object> kafkaTemplate;
/**
* 异步发送kafka消息
* @param topicMessageDTO
* @return
*/
public void synchronousSend(TopicMessageDTO topicMessageDTO){
if(StringUtils.isNotEmpty(topicMessageDTO.getKeyValue())){
kafkaTemplate.send(topicMessageDTO.getTopicName()
,topicMessageDTO.getKeyValue(), JSON.toJSONString(topicMessageDTO));
}else {
kafkaTemplate.send(topicMessageDTO.getTopicName(), JSON.toJSONString(topicMessageDTO));
}
}
// 一般用这个方法
public boolean send(TopicMessageDTO topicMessageDTO){
try {
if(StringUtils.isNotEmpty(topicMessageDTO.getKeyValue())){
kafkaTemplate.send(topicMessageDTO.getTopicName()
,topicMessageDTO.getKeyValue(),JSON.toJSONString(topicMessageDTO)).get();
}else {
kafkaTemplate.send(topicMessageDTO.getTopicName(),JSON.toJSONString(topicMessageDTO)).get();
}
} catch (Exception e) {
throw new MyException(ExceptionMsg.KAFAK_SEND_ERROR.getCode(),ExceptionMsg.KAFAK_SEND_ERROR.getMsg());
}
return true;
}
}
第五步 数据发送 此方法可任意写根据自己的需求 方法写在service层
@Autowired
KafkaProducerService kafkaProducerService;
/**
* 送检订单推送到kafka
* @param orderList 要发送的数据 T泛型
* @param hospitalId 医院id
* @param submitType 发送的 数据类型
*/
private <T> void sendToKafka(List<T> orderList, Integer hospitalId,String submitType) {
TopicMessageDTO<List<T>> topicMessageDTO = new TopicMessageDTO<>();
// 获取kafka的标识 由 his_hospital_+ 医院id(hospitalId)
topicMessageDTO.setTopicName(TopicNameEnum.HIS_HOSPITAL.getTopicName()+hospitalId);
topicMessageDTO.setMessageType(submitType);
topicMessageDTO.setActionCode(DataActionCodeEnum.CREATED.getKey());
topicMessageDTO.setTimestamp(DateHelper.getCurrentDateTimeStr());
topicMessageDTO.setKeyValue(hospitalId+"");
topicMessageDTO.setHospitalId(hospitalId);
topicMessageDTO.setData(orderList);
kafkaProducerService.send(topicMessageDTO);
}
消费数据
更多推荐
所有评论(0)