概要

Consumer group主要处理协调消费的问题。

为了应对变化,消费时按照时间段被切分成不同的generation,在同一时刻所有的group中的clients对应同一个generation,同一时刻只会有一个generation,新的generation需要所有旧generation产生的goroutine结束后才会创建(后面代码分析会做解释)。使用generation的另一个好处是防止旧generation拉取或者提交的数据,保证数据的一致性。

理论上讲,当group中的组员发生变化、监听的topics发生变化、同一topic中的partition数量发生变化时,会生成新的generation,重新分配组员处理的partitions。从代码上看,触发更新generation的操作可以是由客户端发起,也可以由服务端发起(个人觉得只由服务端发起应该更合理一些)。

当新一轮generation开始时,分为两个步骤,join操作和sync操作。Join是所有的group client均向coordinator发出请求,表示新generation的选举开始。Coordinator会指定一个client作为这一轮generation的leader,同时将消费topic的基础信息发送给她。Coordinator在接受到join请求后,并不会立即返回,它会等待一段时间,以便接收到该group下所有client发出的join请求,这么做的目的是需要返回给leader这次所有client的信息和分配策略,以便其做分配。Client发出join请求时,还需要带上支持的balancer协议,coordinator会选择一个所有clients均支持的协议返回给leader。做sync操作时,leader会把分配方案一起发送给coordinator,coordinator会将每个client需要处理的partition返回给client。与join一致,coordinator接收到sync请求时也不会立即返回,因为它要等待leader的分配方案到达。

代码

这部分的代码主要有两个核心类型,Consumer Group和Generation,我们逐一分析。

Consumer Group

ConsumerGroup的主要成员是Generation,这个类型的主要作用是提供在其类型上的方法,包括创建和离开generation。在创建过程中,又分为join操作和sync操作。因为在这些步骤中,均不需要提供和generation相关的信息,可以好好体会一下它与Generation的分工。

类型和配置参数


type ConsumerGroup struct {
	config ConsumerGroupConfig
	next   chan *Generation
	errs   chan error

	closeOnce sync.Once
	wg        sync.WaitGroup
	done      chan struct{} // 表示结束的channel
}
type ConsumerGroupConfig struct {
	// consumer group ID.根据它定位到coordinator所在的broker,不能为空
	ID string

	Brokers []string

	Dialer *Dialer // 前面章节介绍过

	Topics []string

	// rebalance时的分配策略
	GroupBalancers []GroupBalancer

	// 向coordinator发送心跳的间隔时间,失败时表示当前generation关闭,需要生成新的generation
	// Default: 3s
	HeartbeatInterval time.Duration

	// 确认topic对应的partition数目发生变化的间隔时间,partition数目变化时表示当前generation关闭,需要生成新的generation
	// Default: 5s
	PartitionWatchInterval time.Duration

	// 开关,控制是否监控partition数目发生变化
	WatchPartitionChanges bool

	// 如果coordinator未在该时间内接收到client发出的心跳,则可认为该client已经处于失效的状态,同时触发rebalance
	// Default: 30s
	SessionTimeout time.Duration

	// 等待所有client join的时间长度
	// Default: 30s
	RebalanceTimeout time.Duration

	// 当join请求失败后,下一次re-join请求的时间间隔
	// Default: 5s
	JoinGroupBackoff time.Duration

	//  Consumer Group在brokers中的存留时间
	RetentionTime time.Duration

	// 当partition未有过commit记录时采用的消费策略
	// FirstOffset or LastOffset.
	// Default: FirstOffset
	StartOffset int64
}

核心方法

1 获取新的generation

func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-cg.done:
		return nil, ErrGroupClosed
	case err := <-cg.errs:
		return nil, err
	case next := <-cg.next:
		return next, nil
	}
}

2 产生generation的后台goroutine

func (cg *ConsumerGroup) run() {
	var memberID string
	var err error
	for { // 循环,这样当上一个generation close后可以生成新的
		memberID, err = cg.nextGeneration(memberID) // 生成新的generation
		var backoff <-chan time.Time
		switch err {
		case nil: // no error,之前的generation正常退出
			continue
		case ErrGroupClosed: // cg 关闭
			_ = cg.leaveGroup(memberID) // 通知coordinator
			return
		case RebalanceInProgress: // 服务端逻辑
		default: // 其它错误
			_ = cg.leaveGroup(memberID)
			memberID = ""
			backoff = time.After(cg.config.JoinGroupBackoff)
		}
		
		select {
		case <-cg.done:
			return
		case cg.errs <- err:
		}
		
		if backoff != nil {
			select {
			case <-cg.done:
				return
			case <-backoff:
			}
		}
	}
}

2.1 生成新的generation

func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
	// 连接coordinator
	conn, err := cg.coordinator()
	if err != nil {
		。。。// 日志
	}
	defer conn.Close()

	var generationID int32
	var groupAssignments GroupMemberAssignments
	var assignments map[string][]int32

	memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID) // join操作,如果是leader则分配任务
	if err != nil {
		。。。 // 日志
	}

	// sync group
	assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments) // sync操作
	if err != nil {
		。。。//日志
	}

	// 获取本client需要处理partitions的offsets.
	var offsets map[string]map[int]int64
	offsets, err = cg.fetchOffsets(conn, assignments)
	if err != nil {
		。。。//日志
		return memberID, err
	}

	// 生成新的generation
	gen := Generation{
		ID:              generationID,
		GroupID:         cg.config.ID,
		MemberID:        memberID,
		Assignments:     cg.makeAssignments(assignments, offsets),
		conn:            conn,
		done:            make(chan struct{}),
		joined:          make(chan struct{}),
		retentionMillis: int64(cg.config.RetentionTime / time.Millisecond),
		log:             cg.withLogger,
		logError:        cg.withErrorLogger,
	}

	// 开启本generation的多个goroutine,一旦其中一个goroutine返回,表示本次generation结束
	gen.heartbeatLoop(cg.config.HeartbeatInterval) // 心跳
	if cg.config.WatchPartitionChanges {
		for _, topic := range cg.config.Topics {
			gen.partitionWatcher(cg.config.PartitionWatchInterval, topic) // 监控topic partitions的变化
		}
	}

	select {
	case <-cg.done:
		gen.close()
		return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
	case cg.next <- &gen: // 将新生成的generation放入channel中
	}

	select {
	case <-cg.done:
		gen.close()
		return memberID, ErrGroupClosed // ErrGroupClosed will trigger leave logic.
	case <-gen.done: // 等待本次generation结束
		gen.close()
		return memberID, nil
	}
}

2.2 join操作

func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
	request, err := cg.makeJoinGroupRequestV1(memberID) // 拼request
	response, err := conn.joinGroup(request) // 向coordinator发出请求
	memberID = response.MemberID // 获取member id
	generationID := response.GenerationID // 获取generation id

	var assignments GroupMemberAssignments
	if iAmLeader := response.MemberID == response.LeaderID; iAmLeader { // 被coordinator指定为本次generation的leader
		v, err := cg.assignTopicPartitions(conn, response) // 分配工作
		assignments = v
	}
	return memberID, generationID, assignments, nil
}

2.3 sync操作

func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
	request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments) // 拼请求
	response, err := conn.syncGroup(request) // 向coordinator发出sync请求
	assignments := groupAssignment{}
	reader := bufio.NewReader(bytes.NewReader(response.MemberAssignments))
	if _, err := (&assignments).readFrom(reader, len(response.MemberAssignments)); err != nil { // 读取本client需要处理的partitions
		return nil, err
	}
	return assignments.Topics, nil
}

2.4 离开本generation

func (cg *ConsumerGroup) leaveGroup(memberID string) error {
	if memberID == "" {
		return nil
	}

	// 重新连接,因为存在因连接问题造成的generation失效的情况,重新连接可以防止这种情况
	coordinator, err := cg.coordinator()
	_, err = coordinator.leaveGroup(leaveGroupRequestV0{ // 向coordinator发出离开的请求
		GroupID:  cg.config.ID,
		MemberID: memberID,
	})
	_ = coordinator.Close() // 清理工作
	return err
}

Generation

commit offset,heartbeat以及观察partitions数量的变化均同generation相关。试想一旦generation发生变化,则commit offset一定是不会被coordinator所接受的。

类型和配置

type Generation struct {
	ID int32
	GroupID string // customer group id
	MemberID string // 由coordinator分配
	Assignments map[string][]PartitionAssignment // 被分配的任务

	conn coordinator

	lock     sync.Mutex
	done     chan struct{} // 本次generation是否需要关闭
	closed   bool
	routines int // 开启goroutine的个数
	joined   chan struct{} // 是否所有开启的goroutine都退出
}

核心方法

1 结束一轮generation

func (g *Generation) close() {
	g.lock.Lock()
	if !g.closed {
		close(g.done) // 关闭channel
		g.closed = true
	}
	r := g.routines
	g.lock.Unlock()

	// 如果还有这轮活动开启的goroutine,等待结束
	if r > 0 {
		<-g.joined
	}
}

2 generation开启goroutine的方法

func (g *Generation) Start(fn func(ctx context.Context)) {
	g.lock.Lock()
	defer g.lock.Unlock()

	if g.closed { // 没看懂。。。
		go fn(genCtx{g})
		return
	}

	// 计数加1
	g.routines++

	go func() {
		fn(genCtx{g}) // 运行函数
		g.lock.Lock()
		
		if !g.closed {
			close(g.done) // 关闭channel,这样别的goroutine也能感知到
			g.closed = true
		}
		g.routines-- // 计数减1
		if g.routines == 0 {
			close(g.joined) // 通知close函数退出
		}
		g.lock.Unlock()
	}()
}

3 本轮generation的commit offset

func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
	if len(offsets) == 0 {
		return nil
	}
        // 按照请求格式的需要构造参数
	topics := make([]offsetCommitRequestV2Topic, 0, len(offsets))
	for topic, partitions := range offsets {
		t := offsetCommitRequestV2Topic{Topic: topic}
		for partition, offset := range partitions {
			t.Partitions = append(t.Partitions, offsetCommitRequestV2Partition{
				Partition: int32(partition),
				Offset:    offset,
			})
		}
		topics = append(topics, t)
	}

	request := offsetCommitRequestV2{ // 带上generation的参数,coordinator会做验证
		GroupID:       g.GroupID,
		GenerationID:  g.ID,
		MemberID:      g.MemberID,
		RetentionTime: g.retentionMillis,
		Topics:        topics,
	}

	_, err := g.conn.offsetCommit(request) // 向coordinator发送commit请求
	if err == nil {
		。。。 // 日志
	}

	return err
}

4 发送心跳

func (g *Generation) heartbeatLoop(interval time.Duration) {
	g.Start(func(ctx context.Context) {
		ticker := time.NewTicker(interval) // 设置ticker
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				_, err := g.conn.heartbeat(heartbeatRequestV0{ // 发送心跳请求,带上本轮generation的信息
					GroupID:      g.GroupID,
					GenerationID: g.ID,
					MemberID:     g.MemberID,
				})
				if err != nil { // 退出,进入下一轮generation
					return
				}
			}
		}
	})
}

5 观察partition的变化

func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
	g.Start(func(ctx context.Context) {
		ticker := time.NewTicker(interval)  // 设置ticker
		defer ticker.Stop()

		ops, err := g.conn.readPartitions(topic) // topic下的partitions信息
		oParts := len(ops)
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				ops, err := g.conn.readPartitions(topic)
				switch err {
				case nil, UnknownTopicOrPartition:
					if len(ops) != oParts { // partition发生变化,退出进入下一轮generation
						return
					}
				default:
					return
				}
			}
		}
	})
}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐