Golang Kafka memory leak
The project needed to write incoming messages to Kafka, and the client used the popular sarama library. I started from sample code found online, and after going live we ran into a pile of problems. Even with open-source code you have to understand how the API works underneath; you can't just copy it wholesale.
1. Kafka producer
import "github.com/Shopify/sarama"
var Brokers = []string{
"10.120.9.123:9092",
"10.120.9.124:9092",
}
// 初始化kafka
func InitKafka() (sarama.AsyncProducer, sarama.Client, error) {
config := sarama.NewConfig()
config.Net.DialTimeout = 3 * time.Second // connect
config.Net.ReadTimeout = 5 * time.Second
config.Net.WriteTimeout = 5 * time.Second
config.Producer.Timeout = 3 * time.Second // Ack
// 创建连接
client, err := sarama.NewClient(Brokers, config)
if err != nil {
log.Printf("Create kafka error: %v\n", err)
return nil, nil, err
}
// 异步客户端
producer, err1 := sarama.NewAsyncProducerFromClient(client)
if err1 != nil {
log.Printf("Create produce error: %v\n", err1)
return nil, nil, err1
}
//log.Printf("Create kafka ok: %v", producer)
return producer, client, nil
}
// IndexHandler writes the pending messages (Msg and TOPIC are defined elsewhere) to Kafka.
func IndexHandler(w http.ResponseWriter, r *http.Request) {
    // initialize Kafka on every request
    producer, _, _ := InitKafka()
    defer func() {
        fmt.Fprint(w, strconv.Itoa(len(Msg)))
        if producer != nil {
            producer.Close() // release the connection resources
        }
    }()
    // batch-write to Kafka
    for _, v := range Msg {
        b, _ := json.Marshal(v)
        // send each message asynchronously
        producer.Input() <- &sarama.ProducerMessage{
            Topic: TOPIC,
            Key:   nil,
            Value: sarama.StringEncoder(string(b)),
        }
    }
}
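One caveat the handler above glosses over: with sarama's defaults (Producer.Return.Errors is true), the async producer reports failed sends on producer.Errors(), and something must consume that channel or it will eventually back up. A minimal sketch of draining it, placed wherever the producer is created so producer and log are in scope:

// Drain asynchronous send errors so the error channel never backs up.
// Assumes the default config, where Producer.Return.Errors is true.
go func() {
    for perr := range producer.Errors() {
        log.Printf("kafka send failed: %v\n", perr)
    }
}()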
// main
func main() {
    mux := http.NewServeMux()
    mux.HandleFunc("/interface.php", IndexHandler)
    server := &http.Server{
        Addr:         ":8000",
        ReadTimeout:  time.Second * 5,
        WriteTimeout: time.Second * 5,
        Handler:      mux,
    }
    server.ListenAndServe()
}
This is the standard sarama usage pattern: an async producer is created for every incoming POST request, and Close() is called after use to release the connection. After running for ten minutes the program crashed. The logs:
2019/12/15 00:51:22 server.go:3056: http: Accept error: accept tcp [::]:80: accept4: too many open files; retrying in 160ms
2019/12/15 00:51:23 server.go:3056: http: Accept error: accept tcp [::]:80: accept4: too many open files; retrying in 320ms
2019/12/15 00:51:23 kafkaserver.go:59: Create kafka error: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
2019/12/15 00:51:23 kafkaserver.go:144: err: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
The logs show that the process held too many connection fds, leaving no free fds available.
$ netstat -anp | grep 9092 | wc -l
180234
$ netstat -anp | grep 9092 | grep TIME | wc -l
0
The connection count kept climbing, with not a single connection in TIME_WAIT: connections were being created continuously but never closed. Which raises the question: is defer producer.Close() doing nothing at all?
2. Locating the leak with pprof
Add pprof instrumentation to capture the program's call stacks:
import _ "net/http/pprof"

// main
func main() {
    // enable sarama's internal logging
    sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
    mux := http.NewServeMux()
    mux.HandleFunc("/interface.php", IndexHandler)
    server := &http.Server{
        Addr:         ":8000",
        ReadTimeout:  time.Second * 5,
        WriteTimeout: time.Second * 5,
        Handler:      mux,
    }
    go func() {
        http.ListenAndServe(":8001", nil) // serve pprof on a separate port
    }()
    server.ListenAndServe()
}
2.1 Resource usage
A single GET request produced two Kafka connections, and both of them remained ESTABLISHED after the request finished.
$ netstat -anp |grep 9092
tcp 0 0 100.28.13.167:26874 100.12.92.233:9092 ESTABLISHED 10131/mykafka
tcp 0 0 100.28.13.167:26875 100.12.92.233:9092 ESTABLISHED 10131/mykafka
These connections are never released, until the host runs out of fds and prints the errors shown above.
2.2 Goroutines
$ curl 100.28.13.167:8001/debug/pprof/goroutine?debug=1
goroutine profile: total 8
2 @ 0x42f71b 0x42f7c3 0x40779e 0x4074cb 0x7077f9 0x747dea 0x741ea3 0x45cb31
# 0x7077f8 github.com/Shopify/sarama.(*Broker).responseReceiver+0xd8 /data/home/kanesun/go_work/src/github.com/Shopify/sarama/broker.go:818
# 0x747de9 github.com/Shopify/sarama.(*Broker).responseReceiver-fm+0x29 /data/home/kanesun/go_work/src/github.com/Shopify/sarama/broker.go:216
# 0x741ea2 github.com/Shopify/sarama.withRecover+0x42 /data/home/kanesun/go_work/src/github.com/Shopify/sarama/utils.go:45
1 @ 0x42f71b 0x43f44d 0x710d3b 0x747e2a 0x741ea3 0x45cb31
# 0x710d3a github.com/Shopify/sarama.(*client).backgroundMetadataUpdater+0x12a /data/home/kanesun/go_work/src/github.com/Shopify/sarama/client.go:721
# 0x747e29 github.com/Shopify/sarama.(*client).backgroundMetadataUpdater-fm+0x29 /data/home/kanesun/go_work/src/github.com/Shopify/sarama/client.go:172
# 0x741ea2 github.com/Shopify/sarama.withRecover+0x42 /data/home/kanesun/go_work/src/github.com/Shopify/sarama/utils.go:45
Each request leaves three goroutines resident in memory (the two responseReceivers and the backgroundMetadataUpdater above) that are never released. That is the memory leak.
Goroutine one starts a ticker and periodically refreshes the cluster metadata.
// goroutine one: client.go
func NewClient(addrs []string, conf *Config) (Client, error) {
    Logger.Println("Initializing new client")
    // ...
    go withRecover(client.backgroundMetadataUpdater) // L172: starts a ticker with a 10-minute interval
    // ...
}

func (client *client) backgroundMetadataUpdater() {
    defer close(client.closed)
    ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency) // defaults to 10 minutes
    defer ticker.Stop()
    for {
        select { // L721
        case <-ticker.C:
            if err := client.refreshMetadata(); err != nil {
                Logger.Println("Client background metadata update:", err)
            }
        case <-client.closer:
            return
        }
    }
}
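The refresh interval the ticker uses comes straight from the client config; if ten minutes is too coarse or too frequent for your cluster, it can be tuned when building the config (a sketch; the five-minute value is just an example):

config := sarama.NewConfig()
config.Metadata.RefreshFrequency = 5 * time.Minute // sarama's default is 10 minutes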
Goroutine two reads broker responses. Since there is no traffic to exchange, it blocks on its channel forever.
// goroutine two: broker.go
func (b *Broker) Open(conf *Config) error {
    // ...
    go withRecover(b.responseReceiver) // L216: starts the response-reading goroutine
    // ...
}

func (b *Broker) responseReceiver() {
    var dead error
    header := make([]byte, 8)
    for response := range b.responses { // L818: ranges over the channel, blocking when it is empty
        if dead != nil {
            response.errors <- dead
            continue
        }
        // ...
    }
}
The essence of a goroutine leak is a blocked channel: the goroutine can never proceed, so all the memory it references stays reachable and can never be freed, and memory leaks.
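A self-contained illustration of that failure mode, independent of sarama: each call below parks a goroutine on a channel that nobody ever writes to, so the goroutine and everything its stack references can never be collected.

package main

import (
    "fmt"
    "runtime"
    "time"
)

func leak() {
    ch := make(chan int) // unbuffered, and no one ever sends
    go func() {
        <-ch // blocks forever; this goroutine can never exit
    }()
}

func main() {
    for i := 0; i < 1000; i++ {
        leak()
    }
    time.Sleep(100 * time.Millisecond)
    fmt.Println("goroutines:", runtime.NumGoroutine()) // ~1001, and the count never shrinks
}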
So the root cause is that the client connection is never closed: producer.Close() has no effect. Let's trace the Close() call in the source:
// Close()
func (p *asyncProducer) Close() error {
    p.AsyncClose()
    // ...
}

// closing, too, spawns yet another goroutine
func (p *asyncProducer) AsyncClose() {
    go withRecover(p.shutdown)
}

func (p *asyncProducer) shutdown() {
    Logger.Println("Producer shutting down.")
    p.inFlight.Add(1)
    p.input <- &ProducerMessage{flags: shutdown}
    p.inFlight.Wait()

    err := p.client.Close() // delegates to the client's Close method
    if err != nil {
        Logger.Println("producer/shutdown failed to close the embedded client:", err)
    }

    close(p.input)
    close(p.retries)
    close(p.errors)
    close(p.successes)
}
shutdown() clearly calls client.Close(), so why is the connection still not released? Time to look at what this Client really is.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
    // For clients passed in by the client, ensure we don't
    // call Close() on it.
    cli := &nopCloserClient{client} // what is actually used is a wrapper type embedding Client
    return newAsyncProducer(cli)
}

type nopCloserClient struct {
    Client
}

// Close intercepts and purposely does not call the underlying
// client's Close() method.
func (ncc *nopCloserClient) Close() error { // Close is overridden to a no-op
    return nil
}
So p.client.Close() actually dispatches to nopCloserClient.Close(), which performs no close at all.
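This works because of plain Go interface embedding: a method defined on the outer struct shadows the embedded value's method of the same name. The same pattern in a standalone sketch:

package main

import "fmt"

type Closer interface{ Close() error }

type realCloser struct{}

func (realCloser) Close() error { fmt.Println("really closing"); return nil }

// nopCloser embeds a Closer but overrides Close with a no-op,
// just like sarama's nopCloserClient does with Client.
type nopCloser struct{ Closer }

func (nopCloser) Close() error { return nil }

func main() {
    var c Closer = nopCloser{realCloser{}}
    _ = c.Close() // prints nothing: the outer no-op Close shadows realCloser.Close
}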
3. Solution
We need to call client.Close() manually:
// initialize Kafka
producer, client, _ := InitKafka()
defer func() {
    if producer != nil {
        producer.Close() // close the producer
    }
    if client != nil {
        client.Close() // manually close the client connection
    }
}()
Recompile and rerun: once a request completes, the three leftover goroutines are gone. The fix checks out.
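Two further notes (my own suggestions, not part of the original troubleshooting): the nopCloserClient wrapper is only applied to clients passed in via NewAsyncProducerFromClient, so letting sarama create and own the client avoids the trap entirely, because producer.Close() then also releases the internal client. And in a real service the producer should be created once at startup rather than per request:

// Sketch: sarama builds and owns the client here, so no nopCloserClient wrapping applies.
config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer(Brokers, config)
if err != nil {
    log.Fatalf("create producer: %v", err)
}
defer producer.Close() // also closes the internally created client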
4. Kafka connection count
Why did each request create two Kafka connections? With the buggy code above, the total number of Kafka connections ends up being (number of requests) × 2.
// creates the broker connections
func NewClient(addrs []string, conf *Config) (Client, error) {
    Logger.Println("Initializing new client")
    if conf == nil {
        conf = NewConfig()
    }
    if err := conf.Validate(); err != nil {
        return nil, err
    }
    // ...
    if conf.Metadata.Full {
        // do an initial fetch of all cluster metadata by specifying an empty list of topics
        err := client.RefreshMetadata() // Kafka connection #1: refresh metadata, picking an available broker
        switch err {
        case nil:
            break
        case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
            // indicates that maybe part of the cluster is down, but is not fatal to creating the client
            Logger.Println(err)
        default:
            close(client.closed) // we haven't started the background updater yet, so we have to do this manually
            _ = client.Close()
            return nil, err
        }
    }

    go withRecover(client.backgroundMetadataUpdater) // Kafka connection #2: background metadata refresh
    Logger.Println("Successfully initialized new client")
    return client, nil
}
NewClient produces two connections: the first refreshes metadata once at initialization, and it also starts a goroutine that refreshes metadata periodically in the background. Both call RefreshMetadata().
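As a side note (my own observation, not from the original post), the eager full-cluster metadata fetch is gated on conf.Metadata.Full, which defaults to true; a service that only touches a few known topics can turn it off so metadata is fetched lazily per topic instead (a sketch; verify the behavior against your sarama version):

config := sarama.NewConfig()
// Skip the eager fetch of metadata for every topic at client creation;
// topic metadata is then fetched on demand for the topics actually used.
config.Metadata.Full = false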