前言

首先说一下延迟队列这个东西,实际上实现他的方法有很多,kafka实现并不是一个最好的选择,例如redis的zset可以实现,rocketmq天然的可以实现,rabbitmq也可以实现。如果切换前几种方案成本高的情况下,那么就使用kafka实现,实际上kafka实现延迟队列也是借用了rocketmq的延迟队列思想,rocketmq的延迟时间是固定的几个,并不是自定义的,但是kafka可以实现自定义的延迟时间,但是不能过多,因为是依据topic实现的,接下来我使用go实现简单的kafka的延迟队列。

实现方案

1、首先创建两个topic、一个delayTopic、一个realTopic

2、生产者把消息先发送到delayTopic

3、延迟服务再把delayTopic里面的消息超过我们所设置的时间写入到realTopic

4、消费者再消费realTopic里面的数据即可

具体实现

1、生产者发送消息到延迟队列
msg := &sarama.ProducerMessage{
		Topic:     kafka.DelayTopic,
		Timestamp: time.Now(),
		Key:       sarama.StringEncoder("rta_key"),
		Value:     sarama.StringEncoder(riStr),
	}
	partition, offset, err := kafka.KafkaDelayQueue.SendMessage(msg)
2、延迟服务的消费者(消费延迟队列里面的数据到real队列)
const (
	DelayTime  = time.Minute * 5
	DelayTopic = "delayTopic"
	RealTopic  = "realTopic"
)

// KafkaDelayQueueProducer 延迟队列生产者,包含了生产者和延迟服务
type KafkaDelayQueueProducer struct {
	producer   sarama.SyncProducer // 生产者
	delayTopic string              // 延迟服务主题
}

// NewKafkaDelayQueueProducer 创建延迟队列生产者
// producer 生产者
// delayServiceConsumerGroup 延迟服务消费者组
// delayTime 延迟时间
// delayTopic 延迟服务主题
// realTopic 真实队列主题
func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup,
	delayTime time.Duration, delayTopic, realTopic string, log *log) *KafkaDelayQueueProducer {
	var (
		signals = make(chan os.Signal, 1)
	)
	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)
	// 启动延迟服务
	consumer := NewDelayServiceConsumer(producer, delayTime, realTopic, log)
	log.Info("[NewKafkaDelayQueueProducer] delay queue consumer start")
	go func() {
		for {
			if err := delayServiceConsumerGroup.Consume(context.Background(),
				[]string{delayTopic}, consumer); err != nil {
				log.Error("[NewKafkaDelayQueueProducer] delay queue consumer failed,err: ", zap.Error(err))
				break
			}
			time.Sleep(2 * time.Second)
			log.Info("[NewKafkaDelayQueueProducer] 检测消费函数是否一直执行")
			// 检查是否接收到中断信号,如果是则退出循环
			select {
			case sin := <-signals:
				consumer.Logger.Info("[NewKafkaDelayQueueProducer]get signal,", zap.Any("signal", sin))
				return
			default:
			}
		}
		log.Info("[NewKafkaDelayQueueProducer] consumer func exit")
	}()
	log.Info("[NewKafkaDelayQueueProducer] return KafkaDelayQueueProducer")

	return &KafkaDelayQueueProducer{
		producer:   producer,
		delayTopic: delayTopic,
	}
}

// SendMessage 发送消息
func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
	msg.Topic = q.delayTopic
	return q.producer.SendMessage(msg)
}

// DelayServiceConsumer 延迟服务消费者
type DelayServiceConsumer struct {
	producer  sarama.SyncProducer
	delay     time.Duration
	realTopic string
	Logger    *log.DomobLog
}

func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration,
	realTopic string, log *log.DomobLog) *DelayServiceConsumer {
	return &DelayServiceConsumer{
		producer:  producer,
		delay:     delay,
		realTopic: realTopic,
		Logger:    log,
	}
}

func (c *DelayServiceConsumer) ConsumeClaim(session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error {
	c.Logger.Info("[delaye ConsumerClaim] cc")
	for message := range claim.Messages() {
		// 如果消息已经超时,把消息发送到真实队列
		now := time.Now()
		c.Logger.Info("[delay ConsumeClaim] out",
			zap.Any("send real topic res", now.Sub(message.Timestamp) >= c.delay),
			zap.Any("message.Timestamp", message.Timestamp),
			zap.Any("c.delay", c.delay),
			zap.Any("claim.Messages len", len(claim.Messages())),
			zap.Any("sub:", now.Sub(message.Timestamp)),
			zap.Any("meskey:", message.Key),
			zap.Any("message:", string(message.Value)),
		)

		if now.Sub(message.Timestamp) >= c.delay {
			c.Logger.Info("[delay ConsumeClaim] jinlai", zap.Any("mes", string(message.Value)))
			_, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
				Topic:     c.realTopic,
				Timestamp: message.Timestamp,
				Key:       sarama.ByteEncoder(message.Key),
				Value:     sarama.ByteEncoder(message.Value),
			})
			if err != nil {
				c.Logger.Info("[delay ConsumeClaim] delay already send to real topic failed", zap.Error(err))

				return nil
			}
			if err == nil {
				session.MarkMessage(message, "")
				c.Logger.Info("[delay ConsumeClaim] delay already send to real topic success")
				continue
			}
		}
		// 否则休眠一秒
		time.Sleep(time.Second)
		return nil
	}

	c.Logger.Info("[delay ConsumeClaim] ph",
		zap.Any("partitiion", claim.Partition()),
		zap.Any("HighWaterMarkOffset", claim.HighWaterMarkOffset()))
	c.Logger.Info("[delay ConsumeClaim] delay consumer end")
	return nil
}

func (c *DelayServiceConsumer) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (c *DelayServiceConsumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

这个方法整体逻辑就是不断消费延迟队列里面的消息,判断消息时间是否大于现在,如果大于现在说明消息超时了,就把该消息发送到真实的队列里面去了,真实队列是一直在消费的。如果没超时的话就不会标记消息,还会重新消费,消费成功会标记该消息。

重点:我在测试的时候是一秒拉一次消息,但这个也不是太准时,不过最终结果差距不大,想知道具体怎么消费的可以自己debug

3、真实队列里面的消费逻辑

type ConsumerRta struct {
	Logger *log
}

func ConsumerToRequestRta(consumerGroup sarama.ConsumerGroup, lg *log) {
	var (
		signals = make(chan os.Signal, 1)
		wg = &sync.WaitGroup{}
	)
	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)
	wg.Add(1)
	// 启动消费者协程
	go func() {
		defer wg.Done()
		consumer := NewConsumerRta(lg)
		consumer.Logger.Info("[ConsumerToRequestRta] consumer group start")
		// 执行消费者组消费
		for {
			if err := consumerGroup.Consume(context.Background(), []string{kafka.RealTopic}, consumer); err != nil {
				consumer.Logger.Error("[ConsumerToRequestRta] Error from consumer group:", zap.Error(err))
				break
			}
			time.Sleep(2 * time.Second) // 等待一段时间后重试

			// 检查是否接收到中断信号,如果是则退出循环
			select {
			case sin := <-signals:
				consumer.Logger.Info("get signal,", zap.Any("signal", sin))
				return
			default:
			}
		}
	}()
	wg.Wait()
	lg.Info("[ConsumerToRequestRta] consumer end & exit")
}

func NewConsumerRta(lg *log) *ConsumerRta {
	return &ConsumerRta{
		Logger: lg,
	}
}



func (c *ConsumerRta) ConsumeClaim(session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// 消费逻辑
		session.MarkMessage(message, "")
		return nil
	}

	return nil
}

func (c *ConsumerRta) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (c *ConsumerRta) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

4、kafka配置
type KafkaConfig struct {
	BrokerList []string
	Topic      []string
	GroupId    []string
	Cfg        *sarama.Config
	PemPath    string
	KeyPath    string
	CaPemPath  string
}

var (
	Producer           sarama.SyncProducer
	ConsumerGroupReal  sarama.ConsumerGroup
	ConsumerGroupDelay sarama.ConsumerGroup
	KafkaDelayQueue    *KafkaDelayQueueProducer
)

func NewKafkaConfig(cfg KafkaConfig) (err error) {
	Producer, err = sarama.NewSyncProducer(cfg.BrokerList, cfg.Cfg)
	if err != nil {
		return err
	}
	ConsumerGroupReal, err = sarama.NewConsumerGroup(cfg.BrokerList, cfg.GroupId[0], cfg.Cfg)
	if err != nil {
		return err
	}
	ConsumerGroupDelay, err = sarama.NewConsumerGroup(cfg.BrokerList, cfg.GroupId[1], cfg.Cfg)
	if err != nil {
		return err
	}

	return nil
}

func GetKafkaDelayQueue(log *log) {
	KafkaDelayQueue = NewKafkaDelayQueueProducer(Producer, ConsumerGroupDelay, DelayTime, DelayTopic, RealTopic, log)
}

这个里面我没有怎么封装,可以自行封装,使用的是IBM的sarama客户端

总结

基本上就是以上三步实现,里面的一些log日志可以传递自己的log日志即可,使用的是消费者组消费的,添加上自己的topic和groupid即可

重点:以上实现延迟时间可能不是太精准,我使用的时候还是有点小小的误差,不过误差不大,强相关业务还是使用其他专业实现延迟队列mq,或使用自行方案

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐