概念

​ 消费者组(Consumer Group):由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者

整体流程

在这里插入图片描述

流程说明:

  • 消费者组包括多个消费者,每个消费者只能消费分区中的一部分数据;
  • 当一个消费者组中的消费者读取一个分区中的数据时,其他消费者就不能再读取该分区中的数据;
  • 一个消费者组可以有多个消费者,每个消费者只能消费分配给该消费者组的某些主题的某些分区;
  • 同一个分区只会被一个消费者组中的一个消费者消费,不同消费者组之间可以重复消费
  • 当消费者组中的某个消费者宕机后,Kafka会将该消费者所消费的分区重新分配给其他消费者,从而实现消费者的高可用性;
  • 消费者组中的消费者可以动态加入和退出,Kafka会自动重新分配分区;
  • 在同一个消费者组内,消费者之间可以进行负载均衡,以此来提高消息的吞吐量和消费的效率;
  • 消费者组可以通过消费者组ID(groupid)来标识,一个消费者组ID可以同时消费多个主题;

配置参数说明

参数名称描述
bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。
key.deserializer和value.deserializer指定接收消息的key和value的反序列化类型。一定要写全类名。
group.id标记消费者所属的消费者组。
enable.auto.commit默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。
auto.offset.reset当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions__consumer_offsets的分区数,默认是50个分区。
heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。
session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes默认1个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records一次poll拉取数据返回消息的最大条数,默认是500条。

分区策略

  • Range
# 特点
确保每个消费者消费的分区数量是均衡的。
注意:Rangle范围分配策略是针对每个Topic的。
# 配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor。
# 算法公式
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前m个消费者消费n+1个,剩余消费者消费n个。

在这里插入图片描述

  • RoundRobin
# 特点
将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。
# 配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor。

在这里插入图片描述

  • Sticky
# 特点
在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
  • CooperativeSticky

可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range+ CooperativeSticky。

回调函数说明

事件回调

  • 设置回调
// 设置事件回调
m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{
    printf("Conf set(event_cb) failed, errorStr:%s\n", errorStr.c_str());
    return;
}
  • 回调处理
// 设置事件回调
class ConsumerEventCb : public RdKafka::EventCb 
{
public:
	void event_cb(RdKafka::Event &event) 
	{
		switch (event.type()) 
		{
		case RdKafka::Event::EVENT_ERROR:
			break;
		case RdKafka::Event::EVENT_STATS:
			break;
		case RdKafka::Event::EVENT_LOG:
			break;
		case RdKafka::Event::EVENT_THROTTLE:
			break;
		default:
			break;
		}
	}
};

消费者组再平衡回调

  • 设置回调
// 设置消费者组再平衡回调
m_rebalance_cb = new ConsumerRebalanceCb;
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode) 
{
    ELogError(("%s|Conf set(rebalance_cb) failed, errorStr:%s", GET_CODE_INFO(), errorStr.c_str()));
    break;
}
  • 回调处理
// 设置消费者组再平衡回调
// 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
class ConsumerRebalanceCb : public RdKafka::RebalanceCb 
{
private:
	// 打印当前获取的分区
	static void printTopicPartition(const std::vector<RdKafka::TopicPartition *>&partitions) 
	{
		for (unsigned int i = 0; i < partitions.size(); i++) 
		{
			printf("count:%d, topic:%s,partition:%d\n",
				i, 
				partitions[i]->topic().c_str(),
				partitions[i]->partition());
		}
	}

public:
	// 消费者组再平衡回调
	void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
		std::vector<RdKafka::TopicPartition *> &partitions) 
	{
		printf("RebalanceCb: %s", RdKafka::err2str(err).c_str());
		printTopicPartition(partitions);

		// 分区分配成功
		if (RdKafka::ERR__ASSIGN_PARTITIONS == err) 
		{
			// 消费者订阅这些分区
			consumer->assign(partitions);
			// 获取消费者组本次订阅的分区数量,可以属于不同的topic
			m_partitionCount = (int)partitions.size();
		} 
		else   // 分区分配失败
		{
			// 消费者取消订阅所有的分区
			consumer->unassign();
			// 消费者订阅分区的数量为0
			m_partitionCount = 0;
		}
	}

private:
	int m_partitionCount;    // 消费者组本次订阅的分区数量
};

流程(c++)

  • 配置消费者客户端;
  • 订阅主题和分区;
  • 拉取消息;
  • 处理消息;
  • 提交消费位移;

配置消费者客户端

int CKafkaConsumer::Create()
{
	std::string errorStr;
	RdKafka::Conf::ConfResult errorCode;

	do 
	{
		// 1、创建配置对象
		// 1.1、构造 consumer conf 对象
		m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
		if(nullptr == m_config)
		{
            printf("Create RdKafka Conf failed.\n");
			break;
		}

		// 必要参数1:指定 broker 地址列表
		errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(bootstrap.servers) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 必要参数2:设置消费者组 id
		errorCode = m_config->set("group.id", m_groupID, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(group.id) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 设置事件回调
		m_event_cb = new ConsumerEventCb;
		errorCode = m_config->set("event_cb", m_event_cb, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(event_cb) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 设置消费者组再平衡回调
		m_rebalance_cb = new ConsumerRebalanceCb;
		errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(rebalance_cb) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件
		errorCode = m_config->set("enable.partition.eof", "false", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(enable.partition.eof) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 每次最大拉取的数据大小
		errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(max.partition.fetch.bytes) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 设置分区分配策略:range、roundrobin、cooperative-sticky
		errorCode = m_config->set("partition.assignment.strategy", "range", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(partition.assignment.strategy) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 心跳探活超时时间---1s
		errorCode = m_config->set("session.timeout.ms", "6000", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(session.timeout.ms) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 心跳保活间隔
		errorCode = m_config->set("heartbeat.interval.ms", "2000", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(heartbeat.interval.ms) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 1.2、创建 topic conf 对象
		m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
		if (nullptr == m_topicConfig) 
		{
            printf("Create RdKafka Topic Conf failed.\n");
			break;
		}

		// 必要参数3:设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
		errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Topic Conf set(auto.offset.reset) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 默认 topic 配置,用于自动订阅 topics
		errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
		if (RdKafka::Conf::CONF_OK != errorCode) 
		{
            printf("Conf set(default_topic_conf) failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

		// 2、创建 Consumer 对象
		m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
		if (nullptr == m_consumer) 
		{
            printf("Create KafkaConsumer failed, errorStr:%s.\n",
                  errorStr.c_str());
			break;
		}

        printf("Created consumer success, consumerName:%s.\n",
                  m_consumer->name().c_str());
		return 0;
	} while (0);

	Destroy();
	return -1;
}

订阅主题和分区

std::vector<std::string> topicsVec;
topicsVec.push_back("zd_test_topic_one");
topicsVec.push_back("zd_test_topic_two");

RdKafka::ErrorCode errorCode = m_consumer->subscribe(topicsVec);
if (RdKafka::ERR_NO_ERROR != errorCode) 
{
    printf("Subscribe failed, errorStr:%s\n", RdKafka::err2str(errorCode).c_str());
    return;
}

拉取消息

// 可放到线程中处理

while (m_running) 
{
    RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
    if(NULL != msg)
    {
        // 消费消息
        ConsumeMsg_(msg, NULL);

        m_consumer->commitAsync(); 
        delete msg;
    }
}

处理消息

void KafkaConsumer::ConsumeMsg_(RdKafka::Message *msg, void *opaque)
{
	switch (msg->err()) 
	{
	case RdKafka::ERR__TIMED_OUT: // 超时
		break;
	case RdKafka::ERR_NO_ERROR:   // 有消息进来
		printf("Message in, topic:%s, partition:[%d], key:%s, payload:%s\n",
			msg->topic_name().c_str(), 
			msg->partition(), 
			msg->key()->c_str(), 
			(char *)msg->payload());
            
        // 消息处理
		break;
	default:
		break;
	}
}

提交消费位移

m_consumer->commitAsync(); 
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐