The project needed to write incoming messages to Kafka, and the client used the popular sarama library. We started from sample code found online, and after going live we hit a pile of problems. The lesson: with open-source code you still need to understand how the API works under the hood; naive copy-and-paste is not enough.

 

1. The Kafka producer

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

var Brokers = []string{
	"10.120.9.123:9092",
	"10.120.9.124:9092",
}

// InitKafka creates a kafka client and an async producer on top of it.
func InitKafka() (sarama.AsyncProducer, sarama.Client, error) {
	config := sarama.NewConfig()
	config.Net.DialTimeout = 3 * time.Second // connect timeout
	config.Net.ReadTimeout = 5 * time.Second
	config.Net.WriteTimeout = 5 * time.Second
	config.Producer.Timeout = 3 * time.Second // ack timeout

	// create the client (this is what establishes the broker connections)
	client, err := sarama.NewClient(Brokers, config)
	if err != nil {
		log.Printf("Create kafka error: %v\n", err)
		return nil, nil, err
	}

	// async producer on top of the client
	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		log.Printf("Create producer error: %v\n", err)
		return nil, nil, err
	}

	return producer, client, nil
}


func IndexHandler(w http.ResponseWriter, r *http.Request) {
	// create a kafka producer per request -- as we'll see, this is the root of the problem
	producer, _, _ := InitKafka()
	defer func() {
		fmt.Fprint(w, strconv.Itoa(len(Msg)))

		if producer != nil {
			producer.Close() // release connection resources -- or so we thought
		}
	}()

	if producer == nil {
		return
	}

	// write the batch to kafka asynchronously
	for _, v := range Msg {
		b, _ := json.Marshal(v)

		producer.Input() <- &sarama.ProducerMessage{
			Topic: TOPIC,
			Key:   nil,
			Value: sarama.StringEncoder(b),
		}
	}
}
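
One caveat about the handler above, independent of the leak: sarama's AsyncProducer delivers failed sends on its Errors() channel, and with the default config.Producer.Return.Errors = true something must read that channel or the producer can eventually block. A minimal drain goroutine, sketched here, started once right after InitKafka():

// drain delivery errors so the async producer never blocks on its
// errors channel (required when Producer.Return.Errors is true, the default)
go func() {
	for err := range producer.Errors() {
		log.Printf("kafka produce failed: %v", err)
	}
}()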

// main
func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/interface.php", IndexHandler)

	server := &http.Server{
		Addr:         ":8000",
		ReadTimeout:  time.Second * 5,
		WriteTimeout: time.Second * 5,
		Handler:      mux,
	}

	log.Fatal(server.ListenAndServe())
}

The code follows the standard sarama pattern: an async producer is created for every incoming POST request, and Close() is called afterwards to release the connections. Ten minutes after going live, the process crashed. The logs:

2019/12/15 00:51:22 server.go:3056: http: Accept error: accept tcp [::]:80: accept4: too many open files; retrying in 160ms
2019/12/15 00:51:23 server.go:3056: http: Accept error: accept tcp [::]:80: accept4: too many open files; retrying in 320ms
2019/12/15 00:51:23 kafkaserver.go:59: Create kafka error: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
2019/12/15 00:51:23 kafkaserver.go:144: err: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

The logs show too many open connection fds: the process has run out of free file descriptors.

$ netstat -anp | grep 9092 | wc -l
180234
$ netstat -anp | grep 9092 | grep TIME | wc -l
0

The connection count keeps growing and none of the connections are in TIME_WAIT, which means connections are continuously created but never closed. Which raises the question: is the deferred producer.Close() effectively a no-op?
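
For completeness, the fd count can also be watched from inside the process on Linux; a throwaway sketch:

import "os"

// CountFDs returns the number of file descriptors currently open by this
// process, by listing /proc/self/fd (Linux only).
func CountFDs() (int, error) {
	entries, err := os.ReadDir("/proc/self/fd")
	if err != nil {
		return 0, err
	}
	return len(entries), nil
}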

 

2. Locating the leak with pprof

Add pprof instrumentation to capture the program's call stacks:

import _ "net/http/pprof"

/// main
func main() {
        // 启用sarama日志
        sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)

        mux := http.NewServeMux()
        mux.HandleFunc("/interface.php", IndexHandler)

        server := &http.Server{
                Addr:         ":8000",
                ReadTimeout:  time.Second * 5,
                WriteTimeout: time.Second * 5,
                Handler:      mux,
        }

        go func(){
                http.ListenAndServe(":8001", nil)    // 启动pprof监控
        }()

        server.ListenAndServe()
}

2.1 Resource usage

A single GET request produced two kafka connections, and after the request finished they both stayed in ESTABLISHED state.

$ netstat -anp |grep 9092
tcp        0      0 100.28.13.167:26874     100.12.92.233:9092      ESTABLISHED 10131/mykafka
tcp        0      0 100.28.13.167:26875     100.12.92.233:9092      ESTABLISHED 10131/mykafka 

Those connections are never released; eventually every fd on the host is exhausted and the errors above start to appear.

 

2.2 Goroutines

$ curl 100.28.13.167:8001/debug/pprof/goroutine?debug=1

goroutine profile: total 8
2 @ 0x42f71b 0x42f7c3 0x40779e 0x4074cb 0x7077f9 0x747dea 0x741ea3 0x45cb31
#       0x7077f8        github.com/Shopify/sarama.(*Broker).responseReceiver+0xd8       /data/home/kanesun/go_work/src/github.com/Shopify/sarama/broker.go:818
#       0x747de9        github.com/Shopify/sarama.(*Broker).responseReceiver-fm+0x29    /data/home/kanesun/go_work/src/github.com/Shopify/sarama/broker.go:216
#       0x741ea2        github.com/Shopify/sarama.withRecover+0x42                      /data/home/kanesun/go_work/src/github.com/Shopify/sarama/utils.go:45

1 @ 0x42f71b 0x43f44d 0x710d3b 0x747e2a 0x741ea3 0x45cb31
#       0x710d3a        github.com/Shopify/sarama.(*client).backgroundMetadataUpdater+0x12a     /data/home/kanesun/go_work/src/github.com/Shopify/sarama/client.go:721
#       0x747e29        github.com/Shopify/sarama.(*client).backgroundMetadataUpdater-fm+0x29   /data/home/kanesun/go_work/src/github.com/Shopify/sarama/client.go:172
#       0x741ea2        github.com/Shopify/sarama.withRecover+0x42                              /data/home/kanesun/go_work/src/github.com/Shopify/sarama/utils.go:45

After the request, 3 goroutines remain resident in memory and are never released; everything they reference leaks with them.
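
A cheap way to confirm this growth without reading full profiles is to log the goroutine count around each request; a throwaway sketch:

import "runtime"

// LeakProbe logs the current goroutine count. Called before and after a
// request, it shows the count climbing by 3 per request and never dropping.
func LeakProbe(tag string) {
	log.Printf("[%s] goroutines=%d", tag, runtime.NumGoroutine())
}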

 

Goroutine one starts a ticker and periodically refreshes the cluster metadata.

// goroutine one: client.go
func NewClient(addrs []string, conf *Config) (Client, error) {
	Logger.Println("Initializing new client")
	// ... (construction elided)
	go withRecover(client.backgroundMetadataUpdater) // client.go:172: start the 10-minute metadata ticker
}

func (client *client) backgroundMetadataUpdater() {
	defer close(client.closed)

	ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency) // defaults to 10 minutes
	defer ticker.Stop()

	for {
		select { // client.go:721
		case <-ticker.C:
			if err := client.refreshMetadata(); err != nil {
				Logger.Println("Client background metadata update:", err)
			}
		case <-client.closer:
			return
		}
	}
}
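
The 10-minute interval is just the default of config.Metadata.RefreshFrequency and can be tuned when the config is built, for example:

config := sarama.NewConfig()
config.Metadata.RefreshFrequency = 5 * time.Minute // shorten the background metadata refresh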

Goroutine two (one responseReceiver per broker, hence two in the profile) reads responses from a channel; with no traffic and nothing closing the channel, it blocks forever.

// goroutines two and three: broker.go (one responseReceiver per broker)
func (b *Broker) Open(conf *Config) error {
	// ... (connection setup elided)
	go withRecover(b.responseReceiver) // broker.go:216: start the response-reading goroutine
}

func (b *Broker) responseReceiver() {
	var dead error
	header := make([]byte, 8)

	for response := range b.responses { // broker.go:818: blocks until the channel is closed
		if dead != nil {
			response.errors <- dead
			continue
		}
		// ... (response handling elided)
	}
}

The essence of a goroutine leak is a blocked channel: the goroutine can never make progress, so all the memory it references stays reachable and can never be freed.
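
A minimal standalone illustration of the pattern (nothing sarama-specific):

package main

import (
	"fmt"
	"runtime"
	"time"
)

// leak starts a goroutine that ranges over a channel that is never written
// to and never closed -- exactly the responseReceiver situation above.
// The goroutine, and everything it references, lives forever.
func leak() {
	ch := make(chan int)
	go func() {
		for v := range ch { // blocks forever: nobody sends, nobody closes
			fmt.Println(v)
		}
	}()
}

func main() {
	for i := 0; i < 1000; i++ {
		leak()
	}
	time.Sleep(100 * time.Millisecond)
	fmt.Println("goroutines:", runtime.NumGoroutine()) // ~1001, none reclaimed
}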

The root problem, then, is that the client connection is never closed even though producer.Close() is called. Let's trace Close() through the source:

// Close()
func (p *asyncProducer) Close() error {
	p.AsyncClose()
	// ... (draining of the successes/errors channels elided)
}

// sarama's answer to everything: start another goroutine
func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	err := p.client.Close() // close the underlying client
	if err != nil {
		Logger.Println("producer/shutdown failed to close the embedded client:", err)
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

shutdown() plainly calls client.Close(), so why are the connections still open? Let's look at what kind of Client this actually is.

func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// For clients passed in by the client, ensure we don't
	// call Close() on it.
	cli := &nopCloserClient{client} // the producer actually holds a wrapper, not our Client
	return newAsyncProducer(cli)
}

type nopCloserClient struct {
	Client
}

// Close intercepts and purposely does not call the underlying
// client's Close() method.
func (ncc *nopCloserClient) Close() error { // Close is overridden to do nothing
	return nil
}

So p.client.Close() actually executes nopCloserClient.Close(), which closes nothing: sarama deliberately refuses to close a client it did not create.
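
This is plain Go method shadowing via embedding; a minimal standalone example of the same trick:

package main

import "fmt"

type Closer interface{ Close() error }

type realCloser struct{}

func (realCloser) Close() error {
	fmt.Println("really closing")
	return nil
}

// nopCloser embeds the interface; every method is forwarded to the embedded
// value, except Close, which is shadowed with a no-op.
type nopCloser struct{ Closer }

func (nopCloser) Close() error { return nil }

func main() {
	var c Closer = nopCloser{realCloser{}}
	_ = c.Close() // prints nothing: the shadowing no-op wins
}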

 

3. The fix

Since sarama refuses to close a caller-supplied client, we must call client.Close() ourselves:

// initialize kafka
producer, client, _ := InitKafka()
defer func() {
	if producer != nil {
		producer.Close() // shut down the producer
	}

	if client != nil {
		client.Close() // manually close the client and its broker connections
	}
}()

Recompile and run again: after each request completes, the 3 leftover goroutines are gone and the test passes.
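
An alternative worth knowing: sarama.NewAsyncProducer(addrs, config) creates a client that the producer itself owns, and that client is not wrapped in nopCloserClient, so a single producer.Close() releases everything. A minimal sketch using the same Brokers as above:

// InitProducer lets the producer own its client; producer.Close() then
// tears down the broker connections and the background goroutines.
func InitProducer() (sarama.AsyncProducer, error) {
	config := sarama.NewConfig()
	config.Net.DialTimeout = 3 * time.Second

	return sarama.NewAsyncProducer(Brokers, config)
}

Better still, for a server like this one, create the producer once at startup and share it across requests, instead of paying connection setup and a metadata fetch on every request.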

 

4. Kafka connection count

Why did each request produce two kafka connections? With the broken code above, the total number of kafka connections ends up being 2 × the number of requests.

// NewClient: where the broker connections come from
func NewClient(addrs []string, conf *Config) (Client, error) {
	Logger.Println("Initializing new client")

	if conf == nil {
		conf = NewConfig()
	}

	if err := conf.Validate(); err != nil {
		return nil, err
	}

	// ... (client construction elided)

	if conf.Metadata.Full {
		// do an initial fetch of all cluster metadata by specifying an empty list of topics
		err := client.RefreshMetadata() // kafka connection #1: fetch metadata, pick a usable broker
		switch err {
		case nil:
			break
		case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
			// indicates that maybe part of the cluster is down, but is not fatal to creating the client
			Logger.Println(err)
		default:
			close(client.closed) // we haven't started the background updater yet, so we have to do this manually
			_ = client.Close()
			return nil, err
		}
	}
	go withRecover(client.backgroundMetadataUpdater) // kafka connection #2: periodic metadata refresh

	Logger.Println("Successfully initialized new client")
	return client, nil
}

NewClient accounts for both connections: the first comes from the initial metadata refresh at construction time, and it also starts the goroutine that refreshes metadata in the background. Both paths call RefreshMetadata().
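
This is easy to verify in isolation; a quick sketch that creates a bare client and nothing else (run netstat while it sleeps):

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	brokers := []string{"10.120.9.123:9092"} // same cluster as above

	client, err := sarama.NewClient(brokers, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}

	// while we sleep, `netstat -anp | grep 9092` should show the
	// ESTABLISHED connections created by NewClient
	time.Sleep(30 * time.Second)

	// Close() tears down the broker connections and signals client.closer,
	// which stops backgroundMetadataUpdater
	client.Close()
}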

 

 

 
