​【1】基本理论    
        OOP 中的接口,也可以理解为面向对象编程语言中的接口语法。接口的设计要尽量单一,不要让接口的实现类和调用者,依赖不需要的接口函数。

【2】功能描述
        假设我们的项目中用到了三个外部系统:Redis、MySQL、Kafka。每个系统都对应一系列配置信息,比如地址、端口、访问超时时间等。为了在内存中存储这些配置信息,供项目中的其他模块来使用,我们分别设计实现了三个 Configuration 类:RedisConfig、MysqlConfig、KafkaConfig。

    现在我们有如下两个功能需求:
    1.支持 Redis 和 Kafka 配置信息的热更新。
    但是,因为某些原因,我们并不希望对 MySQL 的配置信息进行热更新。

    2.我们希望能有一种更加方便的配置信息查看方式。
    比如在项目中开发一个内嵌的 SimpleHttpServer,输出项目的配置信息到一个固定的HTTP地  址,比如:http://127.0.0.1:9200/config/redis。我们只需要在浏览器中输入这个地址,就可以显示出系统的配置信息。

     不过,出于某些原因,我们只想暴露 MySQL 和 Redis 的配置信息,不想暴露 Kafka 的配置信息。
 
 【3】设计思路
         我们设计了两个功能非常单一的接口:Updater 和 Viewer。

         为了实现这样一个功能需求,我们设计实现了一个 ScheduledUpdater 类,以固定时间间隔(interVal)调用 RedisConfig、KafkaConfig 的 Update() 方法更新配置信息。ScheduledUpdater 只依赖 Updater 这个跟热更新相关的接口,不需要被强迫去依赖不需要的 Viewer 接口,满足接口隔离原则。

        同理,SimpleHttpSvr 只依赖跟查看信息相关的 Viewer 接口,不依赖不需要的 Updater 接口,也满足接口隔离原则。

【4】Demo实战

(说明:热更新和浏览器输出代码 都通过输出语句和注释说明的描述简单代替。)

package main

import (
	"fmt"
	"time"
)

//更新接口
type Updater interface {
	Update()
}

//查看接口
type Viewer interface {
	OutputInPlainText() string
	OutPut() map[string]string
}

//Redis 配置
type RedisConfig struct {
	host string
	port string
	auth string
}

func NewRedisConfig(host string, port string, auth string) *RedisConfig {
	return &RedisConfig{
		host: host,
		port: port,
		auth: auth,
	}
}

func (config *RedisConfig) Update() {
	fmt.Println("RedisConfig::update(), reload data from private redis config")
	//config.host,config.port,config.auth = readfile congfig...
	fmt.Printf("update config(%s,%s,%s) succ\n", config.host, config.port, config.auth)
}

func (config *RedisConfig) OutputInPlainText() string {
	str := "RedisConfig::outputInPlainText"
	fmt.Println(str)
	return str
}

func (config *RedisConfig) OutPut() map[string]string {
	mp := make(map[string]string)
	mp[config.host] = config.port
	return mp
}

//Kafka配置
type KafkaConfig struct {
	path  string
	topic string
}

func NewKafkaConfig(path string, topic string) *KafkaConfig {
	return &KafkaConfig{
		path:  path,
		topic: topic,
	}
}

func (config *KafkaConfig) Update() {
	fmt.Println("KafkaConfig::update(), reload data from private redis config")
	//config.path,config.topic = readfile congfig...
	fmt.Printf("update config(%s,%s) succ\n", config.path, config.topic)
}

//MySQL配置
type MySqlConfig struct {
	host     string
	port     string
	passWord string
	dbName   string
}

func NewMySqlConfig(host string, port string, passWord string, dbName string) *MySqlConfig {
	return &MySqlConfig{
		host:     host,
		port:     port,
		passWord: passWord,
		dbName:   dbName,
	}
}

func (config *MySqlConfig) OutputInPlainText() string {
	str := "MySqlConfig::outputInPlainText"
	fmt.Println(str)
	return str
}

func (config *MySqlConfig) OutPut() map[string]string {
	mp := make(map[string]string)
	mp[config.dbName] = config.host + "_" + config.passWord
	return mp
}

//定时更新器
type ScheduledUpdater struct {
	interVal int
	updater  Updater
}

func NewScheduledUpdater(interval int, updater Updater) *ScheduledUpdater {
	return &ScheduledUpdater{
		interVal: interval,
		updater:  updater,
	}
}

func (scheduledUpdater *ScheduledUpdater) Run() {
	ticker := time.NewTicker(time.Duration(scheduledUpdater.interVal) * time.Second)
	go func() {
		for t := range ticker.C {
			fmt.Printf("Tick at %v\n", t)
			scheduledUpdater.updater.Update()
		}
	}()
}

//简单HttpServer
type SimpleHttpSvr struct {
	host      string
	port      int
	mpViewers map[string][]Viewer
}

func NewSimpleHttpSvr(host string, port int) *SimpleHttpSvr {
	return &SimpleHttpSvr{
		host:      host,
		port:      port,
		mpViewers: make(map[string][]Viewer),
	}
}

func (svr *SimpleHttpSvr) AddViewer(urlDirectory string, viewer Viewer) {
	svr.mpViewers[urlDirectory] = append(svr.mpViewers[urlDirectory], viewer)
}

func (svr *SimpleHttpSvr) Run() {
	for dir, viewers := range svr.mpViewers {
		fmt.Printf("urlDir:%s\n", dir)
		for _, view := range viewers {
			view.OutputInPlainText()
			fmt.Printf("Ouput:%v\n", view.OutPut())
		}
	}
}

func main() {
	redisConfig := NewRedisConfig("192.168.1.1", "6379", "redis_123")
	kafkaConfig := NewKafkaConfig("Kafka_Test:9092", "OrderState")
	mysqlConfig1 := NewMySqlConfig("192.168.1.2", "6379", "pwd001", "price_db")
	mysqlConfig2 := NewMySqlConfig("192.168.1.3", "6379", "pwd002", "order_db")

	redisConfigUpdater := NewScheduledUpdater(30, redisConfig)
	redisConfigUpdater.Run()
	kafaConfigUpdater := NewScheduledUpdater(15, kafkaConfig)
	kafaConfigUpdater.Run()

	SimpleHttpSvr := NewSimpleHttpSvr("127.0.0.1", 9200)
	SimpleHttpSvr.AddViewer("/config/redis", redisConfig)
	SimpleHttpSvr.AddViewer("/config/mysql", mysqlConfig1)
	SimpleHttpSvr.AddViewer("/config/mysql", mysqlConfig2)
	SimpleHttpSvr.Run()

	select {}
}

运行结果:
//模拟 SimpleHttpSvr 展示 RedisConfig和 MySqlConfig

urlDir:/config/redis
RedisConfig::outputInPlainText
Ouput:map[192.168.1.1:6379]
urlDir:/config/mysql
MySqlConfig::outputInPlainText
Ouput:map[price_db:192.168.1.2_pwd001]
MySqlConfig::outputInPlainText
Ouput:map[order_db:192.168.1.3_pwd002]

//模拟 定时热更新 KafkaConfig和RedisConfig

Tick at 2022-03-19 17:30:27.022323599 +0800 CST m=+15.012814195
KafkaConfig::update(), reload data from private redis config
update config(Kafka_Test:9092,OrderState) succ
Tick at 2022-03-19 17:30:42.024135251 +0800 CST m=+30.014625841
KafkaConfig::update(), reload data from private redis config
update config(Kafka_Test:9092,OrderState) succ
Tick at 2022-03-19 17:30:42.024132902 +0800 CST m=+30.014623512
RedisConfig::update(), reload data from private redis config
update config(192.168.1.1,6379,redis_123) succ

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐