解耦利器–事件驱动机制



背景

之前的课程中我们已经学习了使用go-micro创建微服务,并实现了服务的调用。我们具体的实现是实例化了client对象,并调用了对应服务的相关方法。这种方式可以实现系统功能,但有比较大的缺点。

我们通过举例来说明:在某个系统中存在用户服务(user service)、产品服务(product service)和消息服务(message service)。如果用户服务中要调用消息服务中的功能方法,则具体的实现方式可用下图所示方法表示:

在这里插入图片描述

按照正常的实现是在user service模块的程序中实例化message service的一个client,然后进行RPC调用,调用sendMessage来实现发送消息。

缺点

这种实现方式代码耦合度高,用户服务的模块中出现了消息服务模块的代码,不利于系统的扩展和功能的迭代开发。

发布/订阅机制

事件驱动

依然是上述的案例,用户服务在用户操作的过程中,需要调用消息服务的某个方法,假设为发送验证码消息的一个方法。为了使系统代码能够实现解耦,用户服务并不直接调用消息服务的具体的方法,而是将用户信息等相关数据发送到一个中间组件,该组件负责存储消息,而消息服务会按照特定的频率访问中间的消息存储组件,并取出其中的消息,然后执行发送验证码等操作。具体的示意图如下所示:

在这里插入图片描述

在上述的架构图中,我们可以看到,相较于之前的实现,多了一个中间的消息组件系统。

事件发布

只有当用户服务中的某个功能执行时,才会触发相应的事件,并将对应的用户数据等消息发送到消息队列组件中,这个过程我们称之为事件发布。

事件订阅

与事件发布对应的是事件订阅。我们增加消息队列组件的目的是实现模块程序的解耦,原来是程序调用端主动进行程序调用,现在需要由另外一方模块的程序到消息队列组件中主动获取需要相关数据并进行相关功能调用。这个主动获取的过程称之为订阅。

基于消息发布/订阅的消息系统有很多种框架的实现,常见的有:Kafka、RabbitMQ、ActiveMQ、Kestrel、NSQ等。

Broker

在我们介绍go-micro的时已经提到过,go-micro整个框架都是插件式的设计。没错,这里的发布/订阅也是通过接口设计来实现的。

定义
type Broker interface {
	Init(...Option) error
	Options() Options
	Address() string
	Connect() error
	Disconnect() error
	Publish(topic string, m *Message, opts ...PublishOption) error
	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
	String() string
}

如果我们要具体实现事件的发布和订阅功能,只需要安装对应支持的go-plugins插件实现就可以了。go-plugins里支持的消息队列方式有:kafka、nsq、rabbitmq、redis等。同时,go-micro本身支持三种broker,分别是http、nats、memory,默认的broker是http,在实际使用过程中往往使用第三方的插件来进行消息发布/订阅的实现。

在本课程中,我们演示RabbitMQ插件实现的事件订阅和发布机制。

安装go-plugins

在go-micro框架的学习过程中,需要频繁的用到相关的插件。因此,首先安装go-plugins插件库,在go-plugins插件库中,封装提供了go-micro框架中的插件机制的实现方案。

源码库

在github网站上能够找到对应的go-plugins插件库的源码,源码地址是:https://github.com/micro/go-plugins

安装
go get github.com/micro/go-plugins

可以通过上述的命令安装micro的插件库。

Broker实现

在已经安装和下载的go-plugins插件库中,我们可以看到有一个broker目录,其中就封装了go-micro框架的broker机制支持的解决方案。

我们在本案例中,以mqtt进行讲解。

kafka介绍及环境搭建

Kafka简介

Kafka最先由LinkedIn公司开发,之后成为Apache的顶级项目。Kafka是一个分布式的、分区化、可复制提交的日志服务。使用Kafka作为传统的消息系统实现标准的队列和消息的发布—订阅,例如搜索和内容提要(Content Feed)。比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案

Kafka安装

我们的举例在win10环境下进行,所以这里只列举win10环境kafka的安装:

win10环境部署kafka

运行kafka

其实高版本的kafka内置了zookeeper,按理我们可以直接使用kafka内置的zookeeper,但是高版本的kafka对win10的文件系统支持并不好,kafka内置的zookeeper总是启动失败,所以我另外下载了一个zookeeper。

使用时首先启动zookeeper,然后启动kafka即可:

kafka的简单使用

编程实现

接下来进行订阅和发布机制的编程的实现。

消息组件初始化

如果要想使用消息组件完成消息的发布和订阅,首先应该让消息组件正常工作。因此,需要先对消息组件进行初始化。我们可以在服务创建时,对消息组件进行初始化,并进行可选项配置,设置使用kafka作为消息组件。代码实现如下:

...
BrokerURLs := []string{
		0: "127.0.0.1:9093",
	}
	service := micro.NewService(
		micro.Name("student.client"),
		micro.Broker(kafka.NewBroker(func(o *broker.Options) {
			o.Addrs = BrokerURLs
		})),
	)
...

可以使用micro.Broker来指定特定的消息组件,并通过kafka.NewBroker初始化一个kafka实例对象,作为broker参数。需要注意的是,在我的机器中,kafka的默认监听端口我改成了9093。

消息订阅

因为是时间驱动机制,消息的发送方随时可能发布相关事件。因此需要消息的接收方先进行订阅操作,避免遗漏消息。go-micro框架中可以通过broker.Subscribe实现消息订阅。编程代码如下所示:

...
// 消息订阅
	_, err := pubSub.Subscribe("go.micro.srv.message", func(event broker.Event) error {
		var req *message.StudentRequest
		fmt.Println(string(event.Message().Body))

		if err := json.Unmarshal(event.Message().Body, &req); err != nil {
			return err
		}
		fmt.Println(" 接收到信息:", req)
		//去执行其他操作
		return nil
	})
...
消息发布

完成了消息的订阅,我们再来实现消息的发布。在客户端实现消息的发布。在go-micro框架中,可以使用broker.Publish来进行消息的发布,具体的代码如下:

...

	brok := service.Server().Options().Broker
	if err := brok.Connect(); err != nil {
		log.Fatal(" broker connection failed, error : ", err.Error())
	}

	//student := &message.Student{Name: "davie", Classes: "软件工程专业", Grade: 80, Phone: "12345678901"}
	student := &message.Student{Name: "tony", Classes: "网络工程专业", Grade: 95, Phone: "12345678902"}
	msgBody, err := json.Marshal(student)
	if err != nil {
		log.Fatal(err.Error())
	}

	msg := &broker.Message{
		Header: map[string]string{
			"name": student.Name,
		},
		Body: msgBody,
	}

	// 发布消息
	err = brok.Publish("go.micro.srv.message", msg)
	if err != nil {
		log.Fatalf(" 消息发布失败:%s\n", err.Error())
	} else {
		log.Print("消息发布成功")
	}

	
...

运行程序

启动kafka

kafka会在9093端口监听。

启动server程序

首先运行server端程序的main.go文件中的main函数。

启动client程序

server程序启动后,启动客户端程序client.go,可以输出正确日志:

服务端日志:
在这里插入图片描述客户端日志:
在这里插入图片描述


然后我们可以查看kafka的日志文件:

在这里插入图片描述
发现了这两条消费记录。

弊端

在服务端通过fmt.println日志,可以输出event.Message().Body)数据,其格式为:

{"name":"davie","classes":"软件工程专业","grade":80,"phone":"12345678901"}

我们可以看到在服务实例之间传输的数据格式是json格式。根据之前学习proto知识可以知道,在进行消息通信时,采用JSON格式进行数据传输,其效率比较低。

因此,这意味着,当我们在使用第三方消息组件进行消息发布/订阅时,会失去对protobuf的使用。这对追求高消息的开发者而言,是需要解决和改进的问题。因为使用protobuf可以直接在多个服务之间使用二进制流数据进行传输,要比json格式高效的多。

googlepubsub

在go-micro框架中内置的Broker插件中,有google提供的googlepubsub插件实现,位于代理层之上,同时还省略了使用第三方代理消息组件(如mqtt)。

参考资料:https://cloud.google.com/pubsub/。感兴趣的同学可以自己动手实现,此处我们作为拓展思路,不再进行实现。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐