MFC/QT通过librdkafka读写Kafka
C++通过librdkafka接口访问Kafka,封装了对kafka消息的生产和消费接口,MFC和QT可以直接调用本接口实现kafka消息的读写操作。
·
MFC/QT通过librdkafka访问Kafka
创建VS动态库工程项目
创建工程
用 VS2019
创建动态库工程,如下图所示:
设置头文件包含路径
设置 librdkafka
头文件路径:
设置库文件包含路径
设置 librdkafka
库文件路径:
设置依赖 lib 文件:
封装librdkafka接口
Kafka 生产者接口
生产者接口主要实现向Kafka服务器发送消息。
#ifndef KAFKA_CLIENT_H
#define KAFKA_CLIENT_H
#include <map>
#include <vector>
#include <string>
#include <rdkafkacpp.h>
#ifdef KAFKACLIENT_DLL
#define KAFKA_DLLIMEXPORT __declspec(dllexport)
#else
#define KAFKA_DLLIMEXPORT __declspec(dllimport)
#endif
using namespace std;
/*-----------------------------------------------------------------------------------------------
* Kafka 生产者接口
*----------------------------------------------------------------------------------------------*/
class KAFKA_DLLIMEXPORT KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb
{
public:
void dr_cb(RdKafka::Message& message);
};
class KAFKA_DLLIMEXPORT KafkaProducerEventCallBack : public RdKafka::EventCb
{
public:
void event_cb(RdKafka::Event& event);
};
class KAFKA_DLLIMEXPORT KafkaProducerClient
{
public:
KafkaProducerClient(const string &brokers, std::vector<string>& topics, int nPpartition = 0);
virtual ~KafkaProducerClient();
//初始化
bool Init(std::string& errstr);
//停止生产
void Stop();
//发送消息
void Send(std::string topic,const string& msg, std::string& errstr);
private:
//标志位
bool m_bRunFlag;
//分区
int m_nPpartition;
//代理
std::string m_strBroker;
//主题实例
//RdKafka::Topic* m_pTopic;
std::map<std::string, RdKafka::Topic*> m_mapTopic;
//生产者
RdKafka::Producer* m_pProducer;
//事项回调
KafkaProducerEventCallBack m_producerEventCallBack;
//消息分发通知
KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
};
Kafka 消费者接口
消费者接口主要实现从Kafka服务器读取消息。
#ifndef KAFKA_CLIENT_H
#define KAFKA_CLIENT_H
#include <map>
#include <vector>
#include <string>
#include <rdkafkacpp.h>
#ifdef KAFKACLIENT_DLL
#define KAFKA_DLLIMEXPORT __declspec(dllexport)
#else
#define KAFKA_DLLIMEXPORT __declspec(dllimport)
#endif
using namespace std;
/*-----------------------------------------------------------------------------------------------
* Kafka 消费者接口
*----------------------------------------------------------------------------------------------*/
class KAFKA_DLLIMEXPORT KafkaConsumerClient
{
public:
KafkaConsumerClient(const std::string& brokers, std::vector<string>& topics, std::string groupid, int32_t nPartition = 0, int64_t offset = 0);
virtual ~KafkaConsumerClient();
//初始化
bool Init(std::string& errstr);
//停止消费
void Stop();
//开始获取消息
void StartFetchMsg(std::string topic,std::string& msg, std::string& errstr, int timeout_ms = 10);
private:
void MsgConsume(RdKafka::Message* message, void* opaque);
private:
//标志位
bool m_bRunFlag;
//分区号
int32_t m_nPartition;
//读取偏移
int64_t m_nLastOffset;
//当前偏移
int64_t m_nCurrentOffset;
//代理
std::string m_strBrokers;
//分组号
std::string m_strGroupid;
//主题实例
//RdKafka::Topic* m_pTopic;
std::map<std::string, RdKafka::Topic*> m_mapTopic;
//消费者
RdKafka::Consumer* m_pKafkaConsumer;
};
完整代码
CKafkaClient.h
#ifndef KAFKA_CLIENT_H
#define KAFKA_CLIENT_H
#include <map>
#include <vector>
#include <string>
#include <rdkafkacpp.h>
#ifdef KAFKACLIENT_DLL
#define KAFKA_DLLIMEXPORT __declspec(dllexport)
#else
#define KAFKA_DLLIMEXPORT __declspec(dllimport)
#endif
using namespace std;
/*-----------------------------------------------------------------------------------------------
* Kafka 生产者接口
*----------------------------------------------------------------------------------------------*/
class KAFKA_DLLIMEXPORT KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb
{
public:
void dr_cb(RdKafka::Message& message);
};
class KAFKA_DLLIMEXPORT KafkaProducerEventCallBack : public RdKafka::EventCb
{
public:
void event_cb(RdKafka::Event& event);
};
class KAFKA_DLLIMEXPORT KafkaProducerClient
{
public:
KafkaProducerClient(const string &brokers, std::vector<string>& topics, int nPpartition = 0);
virtual ~KafkaProducerClient();
//初始化
bool Init(std::string& errstr);
//停止生产
void Stop();
//发送消息
void Send(std::string topic,const string& msg, std::string& errstr);
private:
//标志位
bool m_bRunFlag;
//分区
int m_nPpartition;
//代理
std::string m_strBroker;
//主题实例
//RdKafka::Topic* m_pTopic;
std::map<std::string, RdKafka::Topic*> m_mapTopic;
//生产者
RdKafka::Producer* m_pProducer;
//事项回调
KafkaProducerEventCallBack m_producerEventCallBack;
//消息分发通知
KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
};
/*-----------------------------------------------------------------------------------------------
* Kafka 消费者接口
*----------------------------------------------------------------------------------------------*/
class KAFKA_DLLIMEXPORT KafkaConsumerClient
{
public:
KafkaConsumerClient(const std::string& brokers, std::vector<string>& topics, std::string groupid, int32_t nPartition = 0, int64_t offset = 0);
virtual ~KafkaConsumerClient();
//初始化
bool Init(std::string& errstr);
//停止消费
void Stop();
//开始获取消息
void StartFetchMsg(std::string topic,std::string& msg, std::string& errstr, int timeout_ms = 10);
private:
void MsgConsume(RdKafka::Message* message, void* opaque);
private:
//标志位
bool m_bRunFlag;
//分区号
int32_t m_nPartition;
//读取偏移
int64_t m_nLastOffset;
//当前偏移
int64_t m_nCurrentOffset;
//代理
std::string m_strBrokers;
//分组号
std::string m_strGroupid;
//主题实例
//RdKafka::Topic* m_pTopic;
std::map<std::string, RdKafka::Topic*> m_mapTopic;
//消费者
RdKafka::Consumer* m_pKafkaConsumer;
};
#endif // KAFKA_CLIENT_H
CKafkaClient.cpp
#include <stdafx.h>
#include <iostream>
#include "CKafkaClient.h"
/*-----------------------------------------------------------------------------------------------
* Kafka 生产者接口实现
*----------------------------------------------------------------------------------------------*/
void KafkaProducerDeliveryReportCallBack::dr_cb(RdKafka::Message& message)
{
}
void KafkaProducerEventCallBack::event_cb(RdKafka::Event& event)
{
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
break;
case RdKafka::Event::EVENT_STATS:
break;
case RdKafka::Event::EVENT_LOG:
break;
default:
break;
}
}
KafkaProducerClient::KafkaProducerClient(const string &brokers, std::vector<string>& topics, int nPpartition /*= 1*/)
: m_bRunFlag(TRUE), m_strBroker(brokers), m_nPpartition(nPpartition)
{
m_pProducer = NULL;
m_nPpartition = 0;
m_mapTopic.clear();
auto itr = topics.begin();
for (;itr != topics.end();++itr)
{
m_mapTopic.insert(std::pair<std::string,RdKafka::Topic*>(*itr,nullptr));
}
}
KafkaProducerClient::~KafkaProducerClient()
{
Stop();
}
bool KafkaProducerClient::Init(std::string& errstr)
{
errstr.clear();
//Create configuration objects
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
//Set configuration properties
if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK)
{
std::cout << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
}
//Set delivery report callback
conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
conf->set("event_cb", &m_producerEventCallBack, errstr);
//Create producer using accumulated global configuration
m_pProducer = RdKafka::Producer::create(conf, errstr);
if (!m_pProducer)
{
std::cout << "Failed to create producer: " << errstr << std::endl;
return FALSE;
}
// Create topic handle
auto itr = m_mapTopic.begin();
for (;itr != m_mapTopic.end();++itr)
{
RdKafka::Topic* pTopic = RdKafka::Topic::create(m_pProducer, itr->first,tconf, errstr);
if (!pTopic)
{
continue;
}
itr->second = pTopic;
}
return TRUE;
}
void KafkaProducerClient::Send(std::string topic, const string &msg, std::string& errstr)
{
errstr.clear();
if (!m_bRunFlag)
return;
auto itr = m_mapTopic.begin();
for (; itr != m_mapTopic.end(); ++itr)
{
if (topic == itr->first)
{
//Produce message
RdKafka::ErrorCode resp = m_pProducer->produce(itr->second, m_nPpartition,
RdKafka::Producer::RK_MSG_COPY //Copy payload,
const_cast<char*>(msg.c_str()), msg.size(),
NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR)
errstr = err2str(resp);
else
std::cout << "Produced message (" << msg.size() << " bytes)" << std::endl;
m_pProducer->poll(0);
// Wait for messages to be delivered
int iTimeOut = GetTickCount();
while (m_bRunFlag && m_pProducer->outq_len() > 0)
{
if ((GetTickCount() - iTimeOut) > 200)
{
break;
}
m_pProducer->poll(100);
}
break;
}
}
}
void KafkaProducerClient::Stop()
{
m_mapTopic.clear();
if (m_pProducer != nullptr)
{
delete m_pProducer;
m_pProducer = nullptr;
}
}
/*-----------------------------------------------------------------------------------------------
* Kafka 消费者接口实现
*----------------------------------------------------------------------------------------------*/
KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, std::vector<string>& topics, std::string groupid, int32_t nPartition /*= 0*/, int64_t offset /*= 0*/)
:m_strBrokers(brokers),
m_strGroupid(groupid),
m_nPartition(nPartition),
m_nCurrentOffset(offset)
{
m_nLastOffset = 0;
m_pKafkaConsumer = NULL;
m_bRunFlag = FALSE;
m_mapTopic.clear();
auto itr = topics.begin();
for (; itr != topics.end(); ++itr)
{
m_mapTopic.insert(std::pair<std::string, RdKafka::Topic*>(*itr, nullptr));
}
}
KafkaConsumerClient::~KafkaConsumerClient()
{
Stop();
}
bool KafkaConsumerClient::Init(std::string& errstr)
{
errstr.clear();
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if (!conf)
{
//RdKafka create global conf failed
return FALSE;
}
//设置broker list
if (conf->set("metadata.broker.list", m_strBrokers, errstr) != RdKafka::Conf::CONF_OK)
{
std::cout << "RdKafka conf set brokerlist failed ::" << errstr.c_str() << endl;
}
//设置consumer group
if (conf->set("group.id", m_strGroupid, errstr) != RdKafka::Conf::CONF_OK)
{
std::cout << "RdKafka conf set group.id failed :" << errstr.c_str() << endl;
}
std::string strfetch_num = "10240000";
//每次从单个分区中拉取消息的最大尺寸
if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK)
{
std::cout << "RdKafka conf set max.partition failed :" << errstr.c_str() << endl;
}
//Create consumer using accumulated global configuration
m_pKafkaConsumer = RdKafka::Consumer::create(conf, errstr);
if (!m_pKafkaConsumer)
{
std::cout << "failed to ceate consumer" << endl;
}
std::cout << "% Created consumer " << m_pKafkaConsumer->name() << std::endl;
delete conf;
//创建kafka topic的配置
auto tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (!tconf)
{
//RdKafka create topic conf failed
return FALSE;
}
if (tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK) {
std::cout << "RdKafka conf set auto.offset.reset failed:" << errstr.c_str() << endl;
}
//Create topic handle
auto itr = m_mapTopic.begin();
for (;itr != m_mapTopic.end();++itr)
{
RdKafka::Topic* pTopic = RdKafka::Topic::create(m_pKafkaConsumer, itr->first, tconf, errstr);
if (!pTopic)
{
std::cout << "RdKafka create topic failed :" << errstr.c_str() << endl;
}
//Start consumer for topic+partition at start offset
RdKafka::ErrorCode resp = m_pKafkaConsumer->start(pTopic, m_nPartition, m_nCurrentOffset);
if (resp != RdKafka::ERR_NO_ERROR)
{
std::cout << "failed to start consumer : " << errstr.c_str() << endl;
}
itr->second = pTopic;
}
delete tconf;
m_bRunFlag = TRUE;
return TRUE;
}
void KafkaConsumerClient::MsgConsume(RdKafka::Message* message, void* opaque)
{
switch (message->err())
{
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
//Real message
if (message->key())
{
std::cout << "Key: " << *message->key() << std::endl;
}
m_nLastOffset = message->offset();
break;
case RdKafka::ERR__PARTITION_EOF:
//Last message
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
//Consume failed
Stop();
break;
default:
//Errors,Consume failed
Stop();
break;
}
}
void KafkaConsumerClient::StartFetchMsg(std::string topic, std::string& msg, std::string& errstr, int timeout_ms)
{
msg.clear();
errstr.clear();
auto itr = m_mapTopic.begin();
for (;itr != m_mapTopic.end();++itr)
{
if (itr->first == topic)
{
RdKafka::Message* kmsg = NULL;
int iTryTimes = 3;
while (m_bRunFlag && iTryTimes-- > 0)
{
kmsg = m_pKafkaConsumer->consume(itr->second, m_nPartition, timeout_ms);
switch (kmsg->err())
{
case RdKafka::ERR__TIMED_OUT:
errstr = std::string("RX KafkaFetchMsg Error >>> ERR_RECV_TIMED_OUT.");
break;
case RdKafka::ERR_NO_ERROR:
//Real message
msg = std::string(static_cast<const char*>(kmsg->payload()));
m_nLastOffset = kmsg->offset();
iTryTimes = 0;
break;
case RdKafka::ERR__PARTITION_EOF:
errstr = std::string("RX KafkaFetchMsg Error >>> ERR_PARTITION_EOF.");
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
errstr = std::string("RX KafkaFetchMsg Error >>> ERR_UNKNOWN_TOPIC.");
Stop();
break;
case RdKafka::ERR__UNKNOWN_PARTITION:
errstr = std::string("RX KafkaFetchMsg Error >>> ERR_UNKNOWN_PARTITION.");
Stop();
break;
default:
//Errors ,Consume failed
errstr = std::string("RX KafkaFetchMsg Error >>> ") + kmsg->errstr();
Stop();
break;
}
delete kmsg;
m_pKafkaConsumer->poll(0);
}
break;
}
}
}
void KafkaConsumerClient::Stop()
{
m_bRunFlag = FALSE;
auto itr = m_mapTopic.begin();
for (;itr != m_mapTopic.end();++itr)
{
m_pKafkaConsumer->stop(itr->second, m_nPartition);
m_pKafkaConsumer->poll(1000);
}
m_mapTopic.clear();
if (m_pKafkaConsumer)
{
delete m_pKafkaConsumer;
m_pKafkaConsumer = NULL;
}
//Wait for RdKafka to decommission.
RdKafka::wait_destroyed(1000);
}
更多推荐
已为社区贡献3条内容
所有评论(0)