本篇简述springboot集成kafka,基于kafka集群模式。(严格来说,kafka不是基于JMS规范的,但却经常被大家作为一款消息中间件来使用,可能源于kafka具有高吞吐量的性能)

准备工作:

  1. 搭建springboot脚手架并成功运行,可参考历史分享springboot+mybatis
  2. 搭建zookeeper集群(kafka集群依赖)
  3. 搭建kafka集群 (zk集群及kafka集群,后续会在运维章节另行讲述)

1. maven添加kafka依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. kafka 配置

2.1 yml

#spring
spring:
    application:
    	  name: demo-order

	  #kafka
    kafka:
        bootstrap-servers: 192.168.2.9:9092,192.168.2.10:9092,192.168.2.11:9092 # kafka brokers
        consumer:
              group-id: demo-order       # 消息消费组
              auto-offset-reset: earliest  # 新的消费者将会从offset=0的位置从头开始消费;latest则相反
              key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
              key-serializer: org.apache.kafka.common.serialization.StringSerializer
              value-serializer: org.apache.kafka.common.serialization.StringSerializer

2.2 Kafka Producer

public interface KafkaTopic {

    /** 订单相关消息 **/
    String demo_order = "demo_order";
}

@Component
@Slf4j
public class KafkaProducer {

    private static final int DEFAULT_PARTITIONS = 3;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public KafkaMsgResult send(String topic, String key, boolean isPartition, Object msg) {
        String msgData = JSON.toJSONString(msg);
        log.info("kafka send message======{}", msgData);

        if (isPartition) {
            int partition = this.getPartition(key, DEFAULT_PARTITIONS);
            ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, partition, key, msgData);
            return checkProRecord(result, isPartition);
        } else {
            ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, msgData);
            return checkProRecord(result, isPartition);
        }
    }

    public KafkaMsgResult send(String topic, String role, String message) {
        String key = role + "-" + message.hashCode();
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, message);
        return checkProRecord(result, false);
    }

    /**
     * 根据key值获取分区索引
     *
     * @param key
     * @param partitionNum
     * @return
     */
    private int getPartition(String key, int partitionNum) {
        if (key == null) {
            Random random = new Random();
            return random.nextInt(partitionNum);
        } else {
            return Math.abs(key.hashCode()) % partitionNum;
        }
    }

    /**
     * 检查发送返回结果record
     *
     * @param result
     * @return
     */
    private KafkaMsgResult checkProRecord(ListenableFuture<SendResult<String, String>> result, boolean isPartition) {
        if (result != null) {
            try {
                // 检查result结果集
                SendResult<String, String> sendResult = result.get();
                // 检查recordMetadata的offset数据,不检查producerRecord
                long offsetIndex = sendResult.getRecordMetadata().offset();
                if (offsetIndex >= 0) {
                    return KafkaMsgResult.SEND_SUCCESS;
                } else {
                    if (isPartition) {
                        return KafkaMsgResult.NO_OFFSET;
                    }
                }
            } catch (InterruptedException | ExecutionException e) {
                return KafkaMsgResult.SEND_ERROR;
            }
        }

        return KafkaMsgResult.NO_RESULT;
    }

    @Getter
    public enum KafkaMsgResult {

        SEND_SUCCESS(0, "消息发送成功"),
        SEND_ERROR(1, "消息发送失败"),
        NO_RESULT(2, "无返回结果"),
        NO_OFFSET(3, "未查到返回数据指针偏移量");

        private int code;

        private String msg;

        KafkaMsgResult(int code, String msg) {
            this.code = code;
            this.msg = msg;
        }

    }
    
}

发送消息的工具类:

public final class OrderMsgUtil {

    /**
     * 发送订单已读消息
     * @param producer
     * @param orderReadMsg
     * @return
     */
    public static KafkaMsgResult sendOrderReadMsg(KafkaProducer producer, OrderReadMsg orderReadMsg){
        BizMsg<OrderReadMsg> bizMsg = new BizMsg<>();
        bizMsg.setMsgType(BizMsgType.ORDER);
        bizMsg.setItemMsgType(BizMsgType.ORDER_READ);
        bizMsg.setData(orderReadMsg);

        String topic = KafkaTopic.demo_order;
        String key = topic + bizMsg.getMsgType() + bizMsg.getItemMsgType();
        return producer.send(topic, key, true, bizMsg);
    }
  
    /**
     * 发送订单服务通知
     * @param producer
     * @param orderNoticeMsg
     * @return
     */
    public static KafkaMsgResult sendNoticeMsg(KafkaProducer producer, OrderNoticeMsg orderNoticeMsg){
        BizMsg<OrderNoticeMsg> bizMsg = new BizMsg<>();
        bizMsg.setMsgType(BizMsgType.ORDER);
        bizMsg.setItemMsgType(BizMsgType.ORDER_SERVICE_NOTICE);
        bizMsg.setData(orderNoticeMsg);

        String topic = KafkaTopic.demo_order;
        String key = topic + bizMsg.getMsgType() + bizMsg.getItemMsgType();
        return producer.send(topic, key, true, bizMsg);
    }
  
}

在需要异步处理,同时又不影响主业务事务最终一致性的情况下,触发消息:

@Service
public class OrderServiceImpl extends BaseServiceImpl implements OrderService {
     @Autowired
     private KafkaProducer kafkaProducer;
  
     public Order order(){
          Order order = new Order();
          // todo something
       
          // 发送订单服务通知
     			OrderMsgUtil.sendNoticeMsg(kafkaProducer, new OrderNoticeMsg(order));
       
          return order;
     }    
}

2.3 Kafka Consumer

@Slf4j
@Component
public class KafkaOrderConsumer {

    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = {KafkaTopic.demo_order})
    public void listen(ConsumerRecord<String, String> record) {
        log.info("=============kafkaConsumer=>order receive msg:{}", record.toString());

        String topic = record.topic();
        String key = record.key();
        String message = record.value();
        long offset = record.offset();
        int partition = record.partition();

        log.info("-------------topic:"+topic);
        log.info("-------------key:" + key);
        log.info("-------------offset:" + offset);
        log.info("-------------partition:" + partition);

        if(StringUtils.isEmpty(message)){
            log.warn("message is empty");
            return;
        }

        // 订单消息分类处理
        try {
            BizMsg<?> bizMsg = new ObjectMapper().readValue(message, new TypeReference<BizMsg<?>>(){});
            if(bizMsg.getMsgType() == BizMsgType.ORDER){
                this.dealWithOrderMsg(bizMsg.getItemMsgType(), bizMsg.getData());
            }
        } catch (IOException e) {
            log.warn("parse message error");
        }

        log.info("=============kafka Consume msg finished!");
    }

    /**
     * 分类处理订单消息
     * @param itemMsgType
     * @param msg
     */
    private void dealWithOrderMsg(int itemMsgType, Object msg){
        switch (itemMsgType){
            // 订单已读
            case BizMsgType.ORDER_READ :
                OrderReadMsg orderReadMsg = JSON.parseObject(JSON.toJSONString(msg), OrderReadMsg.class);
                orderService.updateOrderRead(orderReadMsg);
                break;

            // 用户订单服务通知
            case BizMsgType.ORDER_SERVICE_NOTICE:
                OrderNoticeMsg noticeMsg = JSON.parseObject(JSON.toJSONString(msg), OrderNoticeMsg.class);
                Order order = new Order();
                BeanUtils.copyProperties(noticeMsg, order);
                orderService.serviceNotice(order);
                break;
        }
    }

}

3. kafka topic分区工作原理个人理解

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐