kafka发送数据

(注: 这是在已搭建好的kafka中进行的,如果有kafka的IP地址你不用在自己电脑上搭建

第一步添加jar包
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
第二步 yml配置kafka的路径
注:你访问的kafka是否有权限,没有权限你是连不进去的会包超时错误。还有注意格式
spring:
   kafka:
     bootstrap-servers: localhost
第三步 写一个数据类 TopicMessageDTO
package com.jianxun.nash.mq.dto;

import lombok.Data;

/**
 * @author Carl Lee
 * @date 2020-01-04 9:03
 */
@Data
public class TopicMessageDTO<T> {
    /**
     * topic name
      */
    private String topicName;

    /**
     * 数据类型
      */
    private String messageType;

    /**
     * 数据产生的时间
      */
    private String timestamp;

    /**
     * 数据更新代码 C:created U: Updated D:Deleted
      */
    private String actionCode;

    /**
     * 业务逻辑key值
      */
    private String keyValue;

    /**
     * 医院id
     */
    private Integer hospitalId;

    /**
     * 需要传输的数据
     */
    private T data;

}

第四步 kafka数据发送类 KafkaProducerService
package com.jianxun.nash.mq.service;

import com.alibaba.fastjson.JSON;
import com.jianxun.nash.common.enums.ExceptionMsg;
import com.jianxun.nash.common.exception.MyException;
import com.jianxun.nash.mq.dto.TopicMessageDTO;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class KafkaProducerService {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    /**
     * 异步发送kafka消息
     * @param topicMessageDTO
     * @return
     */
    public void synchronousSend(TopicMessageDTO topicMessageDTO){
        if(StringUtils.isNotEmpty(topicMessageDTO.getKeyValue())){
            kafkaTemplate.send(topicMessageDTO.getTopicName()
                    ,topicMessageDTO.getKeyValue(), JSON.toJSONString(topicMessageDTO));
        }else {
            kafkaTemplate.send(topicMessageDTO.getTopicName(), JSON.toJSONString(topicMessageDTO));
        }
    }
   // 一般用这个方法
    public boolean send(TopicMessageDTO topicMessageDTO){
        try {
            if(StringUtils.isNotEmpty(topicMessageDTO.getKeyValue())){
                kafkaTemplate.send(topicMessageDTO.getTopicName()
                        ,topicMessageDTO.getKeyValue(),JSON.toJSONString(topicMessageDTO)).get();
            }else {
                kafkaTemplate.send(topicMessageDTO.getTopicName(),JSON.toJSONString(topicMessageDTO)).get();
            }
        } catch (Exception e) {
            throw new MyException(ExceptionMsg.KAFAK_SEND_ERROR.getCode(),ExceptionMsg.KAFAK_SEND_ERROR.getMsg());
        }
        return true;
    }
}
第五步 数据发送 此方法可任意写根据自己的需求 方法写在service层

@Autowired
    KafkaProducerService kafkaProducerService;
/**
     * 送检订单推送到kafka
     * @param orderList 要发送的数据 T泛型
     * @param hospitalId 医院id
     * @param submitType 发送的 数据类型
     */
    private <T> void sendToKafka(List<T> orderList, Integer hospitalId,String submitType) {
        TopicMessageDTO<List<T>> topicMessageDTO = new TopicMessageDTO<>();
        // 获取kafka的标识 由 his_hospital_+ 医院id(hospitalId)
        topicMessageDTO.setTopicName(TopicNameEnum.HIS_HOSPITAL.getTopicName()+hospitalId);
        topicMessageDTO.setMessageType(submitType);
        topicMessageDTO.setActionCode(DataActionCodeEnum.CREATED.getKey());
        topicMessageDTO.setTimestamp(DateHelper.getCurrentDateTimeStr());
        topicMessageDTO.setKeyValue(hospitalId+"");
        topicMessageDTO.setHospitalId(hospitalId);
        topicMessageDTO.setData(orderList);
        kafkaProducerService.send(topicMessageDTO);
    }

消费数据

消费kafka数据博客

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐