最近公司交易所的撮合系统遇到了一个非常极端的bug,程序在匹配完买卖订单并且操作完redis内的数据之后,准备发送kafka的时候程序被意外kill掉了,数据丢失,导致盘口信息更改完了,后续的程序无法处理!

最终结果变相的解决了

//就不写mian函数了,直接写我内部处理的
func StartMatching() {
	//获取所有交易市场
	MarketList := v1.GetAllMarket(utils.MainDb)
	if len(MarketList) == 0 {
		panic("获取交易区信息失败")
	}
	ctx, cancel := context.WithCancel(context.Background())
	//因为我是协程处理交易所的所有交易区,所以需要每个交易区都退出才能关闭,
	exitStatus := make([]chan bool, len(MarketList))//交易区退出判断条件
	quit := make(chan bool, 1)//主进程退出条件
	for k, v := range MarketList {
		exitStatus[k] = make(chan bool)
		//重点是传入了上下文以及退出状态通道
		go Matching(v.BuyName+"_"+v.SellName, gconv.String(v.Id), v.BuyCurrencyId, v.SellCurrencyId, ctx, exitStatus[k])
	}
	sigterm := make(chan os.Signal, 1)
	//监听退出信号量,不要使用 -9,这个信号量无法捕获
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
	<-sigterm                  //收到退出信号量
	cancel()                   //广播退出信号
	ExitHook(exitStatus, quit) //退出前的钩子
	<-quit//退出主服务
}
//退出前判断所有协程是否退出
func ExitHook(exitStatus []chan bool, quit chan bool) {
	number := 0
	statusLen := len(exitStatus)
	for index := 0; index < statusLen; index++ {
		if <-exitStatus[index] {
			number++
		}
	}
	if statusLen == number {
		glog.Println("工作协诚完全退出!")
		quit <- true
	} else {
		time.Sleep(100 * time.Millisecond)
		glog.Async().Println("工作协诚未完全退出!")
		ExitHook(exitStatus, quit)
	}
}

协程内的处理比较简单,只介绍个大概,自行处理

//我在发送消息的时候判断一下是否广播退出,然后更改exit通道即可
select {
	case <-ctx.Done():
		exit <- true
	default:
		break
	}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐