Kafka 如何实现更高效消费者-批量读取(C++客户端实现)
Kafka 批量消费 批量读取
作为一个c++程序,我相信很多人都会使用c++的librdkafkacpp的接口,这些接口通过C++封装,使用起来方便,不需要去了解底层的c接口的封装及实现,是最理想的使用方式。
那为啥还要使用C接口呢?
这就涉及到C++的ABI二进制兼容问题。
当我们编译一个C++动态库,受限于不同的编译器版本,无论是gcc还是VC,都存在着兼容问题。对于一个生产版本,如果在统一了编译工具链的情景下,是可以使用C++版本,但如果考虑到更好的ABI兼容,所以就选择了C库。其无论在VC还是gcc,都表现得很好。
C库kafka的消费模式
三种消费模式的异同
librdKafka 提供了三种不同的消费模式,下面将讲解这三种模式的差异及如何使用。
// 一次读取一条数据
rd_kafka_consumer_poll();
rd_kafka_consume();
// 批量读取,一次可以读取多条数据
// 通过回调函数的批量读取,将废弃
rd_kafka_consume_callback();
rd_kafka_consume_batch();
// 通过Queue路由道特定的处理函数
rd_kafka_consume_callback_queue
rd_kafka_consume_batch_queue
1、这种模式,直接使用rd_kafka_consumer_poll()接口,其可以实现1次读取一条,这种场景对于数据量不大,可以很好的使用,而且其客户端API也实现了相关的消息确认和提交,大多数采用这种方式,这种方式的例程,我在前面的博文中有讲到,有兴趣的可以去看看
2、这种模式是批量读取的模式,这种模式下有两种接口,一种基于api的回调
rd_kafka_consume_callback,这个接口根据官方文档是即将被废弃的接口,所以不建议使用该接口,建议使用rd_kafka_consume_batch(),后面会给出这种模式下的使用例子。
3、使用Queue的方式,该方式也提供了两种模式,一种回调,一种直接消费,回调的方式应该在性能上是最高的。这种方式与第二种方式的不同是,他能让不同topic,不同patition的数据路由到一个队列,也就是对于一个需要处理多种数据的消费者。
使用批量读取的注意:
1、消息最好程序中自己做确认提交,这样在kafka的服务,才不会看到滞后的消息消费
2、注意消息的销毁,kafka在销毁对象时会持有引用技术,消费完的消息,要destoy
3、对于topic+partition的消费模式,最好不要在运行热新增分区或主题,否则客户端处理起来比较麻烦。如果必要这么做,要做好测试。保证数据不重复不丢消息。
代码实现
KafkaConsumer.h
#ifndef KAFKACONSUMER_H
#define KAFKACONSUMER_H
#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>
#include "rdkafka.h"
using namespace std;
struct TKafkaCfgInfo
{
string m_strBrokers;
string m_strGroupID;
string m_strTopics;
int m_nPartition;
int m_nBatchReadSize;
int m_nReadTimeout;
bool m_bIsLogKafka;
TKafkaCfgInfo()
: m_nPartition(-1)
, m_nBatchReadSize(-1)
, m_nReadTimeout(-1)
, m_bIsLogKafka(false)
{}
};
class CKafkaConsumer
{
public:
CKafkaConsumer();
virtual ~CKafkaConsumer();
// 初?始º?化¡¥Kafka配?置?
int InitCfg(const TKafkaCfgInfo& tCfg);
const TKafkaCfgInfo& GetCfg() { return m_tCfg; }
// 拉¤-取¨?Kafka消?息¡é
void pullMessage(rd_kafka_message_t** p, ssize_t& nCnt);
void SetConsumerOk(rd_kafka_message_t* pMsg);
protected:
rd_kafka_t* m_pkafka;
rd_kafka_topic_t* m_pKafkaTopic;
TKafkaCfgInfo m_tCfg;
};
#endif // KAFKACONSUMER_H
// KafkaConsumer.cpp
#include "KafkaConsumer.h"
#include <iostream>
using namespace std;
static void RebalanceCb(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque)
{
测a试º?代䨲码?
#if 0
if (partitions)
{
for (int i = 0; i < partitions->cnt; i++)
{
partitions->elems[i].offset = 0;
}
}
printf("RebalanceCb \r\n");
#endif
printf("RebalanceCb \n");
rd_kafka_assign(rk, partitions);
}
static void EventErrorCb(rd_kafka_t* rk, int err,
const char* reason,
void* opaque)
{
printf("Kafka EventErrorCb(%d): %s\n", err, reason);
}
CKafkaConsumer::CKafkaConsumer()
: m_pkafka(nullptr)
, m_pKafkaTopic(nullptr)
{
}
int CKafkaConsumer::InitCfg(const TKafkaCfgInfo& tCfg)
{
m_tCfg = tCfg;
rd_kafka_conf_t* pConf = rd_kafka_conf_new();
if (!pConf)
{
return -1;
}
char szErr[512] = { 0 };
if (rd_kafka_conf_set(pConf, "bootstrap.servers", m_tCfg.m_strBrokers.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_conf_destroy(pConf);
return -1;
}
if (rd_kafka_conf_set(pConf, "group.id", m_tCfg.m_strGroupID.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_conf_destroy(pConf);
return -1;
}
// rd_kafka_conf_set_rebalance_cb(pConf, &RebalanceCb);
rd_kafka_conf_set_error_cb(pConf, EventErrorCb);
if (rd_kafka_conf_set(pConf, "enable.auto.commit", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_conf_destroy(pConf);
return -1;
}
if (rd_kafka_conf_set(pConf, "enable.auto.offset.store", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_conf_destroy(pConf);
return -1;
}
// topic配?置?
/*
rd_kafka_topic_conf_t* pTopicConf = rd_kafka_topic_conf_new();
/*if (rd_kafka_topic_conf_set(pTopicConf, "auto.offset.reset", "latest", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_topic_conf_destroy(pTopicConf);
return -1;
}*/
// 创ä¡ä建¡§kafka实º¦Ì例¤y
m_pkafka = rd_kafka_new(RD_KAFKA_CONSUMER, pConf, szErr, sizeof(szErr));
if (!m_pkafka)
{
return -1;
}
m_pKafkaTopic = rd_kafka_topic_new(m_pkafka, m_tCfg.m_strTopics.c_str(), nullptr);
if (!m_pKafkaTopic)
{
return -1;
}
rd_kafka_consume_start(m_pKafkaTopic, m_tCfg.m_nPartition, RD_KAFKA_OFFSET_STORED);
return 0;
}
void CKafkaConsumer::pullMessage(rd_kafka_message_t** pRet, ssize_t& nCnt)
{
nCnt = rd_kafka_consume_batch(m_pKafkaTopic, m_tCfg.m_nPartition, m_tCfg.m_nReadTimeout, pRet, m_tCfg.m_nBatchReadSize);
// rd_kafka_poll(m_pkafka, 0);
}
void CKafkaConsumer::SetConsumerOk(rd_kafka_message_t* pMsg)
{
rd_kafka_resp_err_t errCode = rd_kafka_offset_store(m_pKafkaTopic, m_tCfg.m_nPartition, pMsg->offset);
printf("rd_kafka_offset_store offset = %d\n", pMsg->offset);
if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
{
return;
}
errCode = rd_kafka_commit_message(m_pkafka, pMsg, 0);
if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
{
return;
}
}
CKafkaConsumer::~CKafkaConsumer()
{
if (m_pKafkaTopic)
{
rd_kafka_consume_stop(m_pKafkaTopic, m_tCfg.m_nPartition);
rd_kafka_topic_destroy(m_pKafkaTopic);
m_pKafkaTopic = nullptr;
}
if (m_pkafka)
{
// rd_kafka_consumer_close(m_pkafka); // rd_kafka_destroy调Ì¡Â用®?会¨¢调Ì¡Â用®?rd_kafka_consumer_close
rd_kafka_destroy(m_pkafka);
m_pkafka = nullptr;
}
}
// Test.cpp
#include "stdafx.h"
#include "KafkaConsumer.h"
#include <assert.h>
int _tmain(int argc, _TCHAR* argv[])
{
string brokers = "192.168.254.129:9092";
string strTopic = "test";
string group = "test1";
int nPartition = 0;
CKafkaConsumer consumer;
TKafkaCfgInfo t;
t.m_strBrokers = brokers;
t.m_strGroupID = group;
t.m_strTopics = strTopic;
t.m_nPartition = nPartition;
int nRet = consumer.InitCfg(t);
if (nRet != 0)
{
assert(0);
return 0;
}
int nBatchSize = 10;
rd_kafka_message_t** pRet = new rd_kafka_message_t * [nBatchSize];
while (1)
{
int nTimeout = 1000;
ssize_t nCnt = 0;
consumer.pullMessage(pRet, nCnt);
if (nCnt < 0)
{
fprintf(stderr, "%% Error: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
continue;
}
if (nCnt == 0)
{
// donothing
continue;
}
for (int nIndex = 0; nIndex < nCnt; nIndex++)
{
rd_kafka_message_t* pMsg = pRet[nIndex];
printf("%s\n", pMsg->payload);
consumer.SetConsumerOk(pMsg);
rd_kafka_message_destroy(pMsg);
}
}
delete[] pRet;
return 0;
}
更多推荐
所有评论(0)