springboot+kafka
卡夫卡
·
本篇简述springboot集成kafka,基于kafka集群模式。(严格来说,kafka不是基于JMS规范的,但却经常被大家作为一款消息中间件来使用,可能源于kafka具有高吞吐量的性能)
准备工作:
- 搭建springboot脚手架并成功运行,可参考历史分享springboot+mybatis
- 搭建zookeeper集群(kafka集群依赖)
- 搭建kafka集群 (zk集群及kafka集群,后续会在运维章节另行讲述)
1. maven添加kafka依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. kafka 配置
2.1 yml
#spring
spring:
application:
name: demo-order
#kafka
kafka:
bootstrap-servers: 192.168.2.9:9092,192.168.2.10:9092,192.168.2.11:9092 # kafka brokers
consumer:
group-id: demo-order # 消息消费组
auto-offset-reset: earliest # 新的消费者将会从offset=0的位置从头开始消费;latest则相反
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
2.2 Kafka Producer
public interface KafkaTopic {
/** 订单相关消息 **/
String demo_order = "demo_order";
}
@Component
@Slf4j
public class KafkaProducer {
private static final int DEFAULT_PARTITIONS = 3;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public KafkaMsgResult send(String topic, String key, boolean isPartition, Object msg) {
String msgData = JSON.toJSONString(msg);
log.info("kafka send message======{}", msgData);
if (isPartition) {
int partition = this.getPartition(key, DEFAULT_PARTITIONS);
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, partition, key, msgData);
return checkProRecord(result, isPartition);
} else {
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, msgData);
return checkProRecord(result, isPartition);
}
}
public KafkaMsgResult send(String topic, String role, String message) {
String key = role + "-" + message.hashCode();
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, message);
return checkProRecord(result, false);
}
/**
* 根据key值获取分区索引
*
* @param key
* @param partitionNum
* @return
*/
private int getPartition(String key, int partitionNum) {
if (key == null) {
Random random = new Random();
return random.nextInt(partitionNum);
} else {
return Math.abs(key.hashCode()) % partitionNum;
}
}
/**
* 检查发送返回结果record
*
* @param result
* @return
*/
private KafkaMsgResult checkProRecord(ListenableFuture<SendResult<String, String>> result, boolean isPartition) {
if (result != null) {
try {
// 检查result结果集
SendResult<String, String> sendResult = result.get();
// 检查recordMetadata的offset数据,不检查producerRecord
long offsetIndex = sendResult.getRecordMetadata().offset();
if (offsetIndex >= 0) {
return KafkaMsgResult.SEND_SUCCESS;
} else {
if (isPartition) {
return KafkaMsgResult.NO_OFFSET;
}
}
} catch (InterruptedException | ExecutionException e) {
return KafkaMsgResult.SEND_ERROR;
}
}
return KafkaMsgResult.NO_RESULT;
}
@Getter
public enum KafkaMsgResult {
SEND_SUCCESS(0, "消息发送成功"),
SEND_ERROR(1, "消息发送失败"),
NO_RESULT(2, "无返回结果"),
NO_OFFSET(3, "未查到返回数据指针偏移量");
private int code;
private String msg;
KafkaMsgResult(int code, String msg) {
this.code = code;
this.msg = msg;
}
}
}
发送消息的工具类:
public final class OrderMsgUtil {
/**
* 发送订单已读消息
* @param producer
* @param orderReadMsg
* @return
*/
public static KafkaMsgResult sendOrderReadMsg(KafkaProducer producer, OrderReadMsg orderReadMsg){
BizMsg<OrderReadMsg> bizMsg = new BizMsg<>();
bizMsg.setMsgType(BizMsgType.ORDER);
bizMsg.setItemMsgType(BizMsgType.ORDER_READ);
bizMsg.setData(orderReadMsg);
String topic = KafkaTopic.demo_order;
String key = topic + bizMsg.getMsgType() + bizMsg.getItemMsgType();
return producer.send(topic, key, true, bizMsg);
}
/**
* 发送订单服务通知
* @param producer
* @param orderNoticeMsg
* @return
*/
public static KafkaMsgResult sendNoticeMsg(KafkaProducer producer, OrderNoticeMsg orderNoticeMsg){
BizMsg<OrderNoticeMsg> bizMsg = new BizMsg<>();
bizMsg.setMsgType(BizMsgType.ORDER);
bizMsg.setItemMsgType(BizMsgType.ORDER_SERVICE_NOTICE);
bizMsg.setData(orderNoticeMsg);
String topic = KafkaTopic.demo_order;
String key = topic + bizMsg.getMsgType() + bizMsg.getItemMsgType();
return producer.send(topic, key, true, bizMsg);
}
}
在需要异步处理,同时又不影响主业务事务最终一致性的情况下,触发消息:
@Service
public class OrderServiceImpl extends BaseServiceImpl implements OrderService {
@Autowired
private KafkaProducer kafkaProducer;
public Order order(){
Order order = new Order();
// todo something
// 发送订单服务通知
OrderMsgUtil.sendNoticeMsg(kafkaProducer, new OrderNoticeMsg(order));
return order;
}
}
2.3 Kafka Consumer
@Slf4j
@Component
public class KafkaOrderConsumer {
@Autowired
private OrderService orderService;
@KafkaListener(topics = {KafkaTopic.demo_order})
public void listen(ConsumerRecord<String, String> record) {
log.info("=============kafkaConsumer=>order receive msg:{}", record.toString());
String topic = record.topic();
String key = record.key();
String message = record.value();
long offset = record.offset();
int partition = record.partition();
log.info("-------------topic:"+topic);
log.info("-------------key:" + key);
log.info("-------------offset:" + offset);
log.info("-------------partition:" + partition);
if(StringUtils.isEmpty(message)){
log.warn("message is empty");
return;
}
// 订单消息分类处理
try {
BizMsg<?> bizMsg = new ObjectMapper().readValue(message, new TypeReference<BizMsg<?>>(){});
if(bizMsg.getMsgType() == BizMsgType.ORDER){
this.dealWithOrderMsg(bizMsg.getItemMsgType(), bizMsg.getData());
}
} catch (IOException e) {
log.warn("parse message error");
}
log.info("=============kafka Consume msg finished!");
}
/**
* 分类处理订单消息
* @param itemMsgType
* @param msg
*/
private void dealWithOrderMsg(int itemMsgType, Object msg){
switch (itemMsgType){
// 订单已读
case BizMsgType.ORDER_READ :
OrderReadMsg orderReadMsg = JSON.parseObject(JSON.toJSONString(msg), OrderReadMsg.class);
orderService.updateOrderRead(orderReadMsg);
break;
// 用户订单服务通知
case BizMsgType.ORDER_SERVICE_NOTICE:
OrderNoticeMsg noticeMsg = JSON.parseObject(JSON.toJSONString(msg), OrderNoticeMsg.class);
Order order = new Order();
BeanUtils.copyProperties(noticeMsg, order);
orderService.serviceNotice(order);
break;
}
}
}
3. kafka topic分区工作原理个人理解
更多推荐
已为社区贡献2条内容
所有评论(0)