MFC/QT通过librdkafka访问Kafka

创建VS动态库工程项目

创建工程

VS2019 创建动态库工程,如下图所示:
(截图缺失:原文此处为 VS2019 新建动态库工程的截图,未能随文导出)

设置头文件包含路径

设置 librdkafka头文件路径:
(截图缺失:原文此处为设置 librdkafka 头文件包含路径的截图,未能随文导出)

设置库文件包含路径

设置 librdkafka库文件路径:
(截图缺失:原文此处为设置 librdkafka 库文件路径的截图,未能随文导出)
设置依赖 lib 文件:
(截图缺失:原文此处为设置依赖 lib 文件的截图,未能随文导出)

封装librdkafka接口

Kafka 生产者接口

生产者接口主要实现向Kafka服务器发送消息。

#ifndef KAFKA_CLIENT_H
#define KAFKA_CLIENT_H

#include <map>
#include <vector>
#include <string>
#include <rdkafkacpp.h>


#ifdef KAFKACLIENT_DLL
	#define KAFKA_DLLIMEXPORT  __declspec(dllexport)
#else
	#define KAFKA_DLLIMEXPORT  __declspec(dllimport)
#endif

using namespace std;

/*-----------------------------------------------------------------------------------------------
 *                                   Kafka 生产者接口                        
 *----------------------------------------------------------------------------------------------*/
// Delivery-report callback: librdkafka invokes dr_cb() once per produced
// message to report whether the broker accepted or rejected it.
class KAFKA_DLLIMEXPORT KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb
{
public:
	void dr_cb(RdKafka::Message& message);
};

// Event callback: receives producer-side notifications from librdkafka
// (broker errors, statistics, log lines).
class KAFKA_DLLIMEXPORT KafkaProducerEventCallBack : public RdKafka::EventCb
{
public:
	void event_cb(RdKafka::Event& event);
};

// Kafka producer client: sends messages to one or more topics on a fixed
// partition through the librdkafka C++ API.
class KAFKA_DLLIMEXPORT KafkaProducerClient
{
public:
	// brokers    : comma-separated broker list ("host:port,host:port")
	// topics     : topic names this producer may send to
	// nPpartition: partition number to produce into
	KafkaProducerClient(const string &brokers, std::vector<string>& topics, int nPpartition = 0);
	virtual ~KafkaProducerClient();
	
	// Initialize: create the producer handle and a handle for every topic.
	// On failure errstr describes the error.
	bool Init(std::string& errstr);
	// Stop producing and release all librdkafka handles.
	void Stop();
	// Send one message to the given topic; errstr is set on failure.
	void Send(std::string topic,const string& msg, std::string& errstr);

private:
	// Running flag
	bool m_bRunFlag;
	// Partition number
	int m_nPpartition;
	// Broker list
	std::string m_strBroker;
	// Topic handles, keyed by topic name (filled in by Init())
	//RdKafka::Topic* m_pTopic;
	std::map<std::string, RdKafka::Topic*> m_mapTopic;
	// Producer handle
	RdKafka::Producer* m_pProducer;
	// Event callback
	KafkaProducerEventCallBack m_producerEventCallBack;
	// Delivery-report callback
	KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
};

Kafka 消费者接口

消费者接口主要实现从Kafka服务器读取消息。

#ifndef KAFKA_CLIENT_H
#define KAFKA_CLIENT_H

#include <map>
#include <vector>
#include <string>
#include <rdkafkacpp.h>


#ifdef KAFKACLIENT_DLL
	#define KAFKA_DLLIMEXPORT  __declspec(dllexport)
#else
	#define KAFKA_DLLIMEXPORT  __declspec(dllimport)
#endif

using namespace std;

/*-----------------------------------------------------------------------------------------------
 *                                   Kafka 消费者接口
 *----------------------------------------------------------------------------------------------*/
// Kafka consumer client: reads messages from one or more topics on a single
// partition using the librdkafka legacy (simple) consumer API.
class KAFKA_DLLIMEXPORT KafkaConsumerClient 
{
public:
	// brokers   : comma-separated broker list ("host:port,host:port")
	// topics    : topic names to consume from
	// groupid   : consumer group id
	// nPartition: partition number to consume from
	// offset    : start offset for consumption
	KafkaConsumerClient(const std::string& brokers, std::vector<string>& topics, std::string groupid, int32_t nPartition = 0, int64_t offset = 0);
	virtual ~KafkaConsumerClient();

	// Initialize: create the consumer and start it on every topic.
	// On failure errstr describes the error.
	bool Init(std::string& errstr);
	// Stop consuming and release all librdkafka handles.
	void Stop();
	// Fetch one message from the given topic into msg, retrying a few times;
	// blocks up to timeout_ms per attempt. errstr is set on failure.
	void StartFetchMsg(std::string topic,std::string& msg, std::string& errstr, int timeout_ms = 10);

private:
	void MsgConsume(RdKafka::Message* message, void* opaque);

private:
	// Running flag
	bool m_bRunFlag;
	// Partition number
	int32_t m_nPartition;
	// Offset of the last message successfully read
	int64_t m_nLastOffset;
	// Start offset
	int64_t m_nCurrentOffset;
	// Broker list
	std::string m_strBrokers;
	// Consumer group id
	std::string m_strGroupid;
	// Topic handles, keyed by topic name (filled in by Init())
	//RdKafka::Topic* m_pTopic;
	std::map<std::string, RdKafka::Topic*> m_mapTopic;
	// Consumer handle
	RdKafka::Consumer* m_pKafkaConsumer;
};

完整代码

CKafkaClient.h

#ifndef KAFKA_CLIENT_H
#define KAFKA_CLIENT_H

#include <map>
#include <vector>
#include <string>
#include <rdkafkacpp.h>


#ifdef KAFKACLIENT_DLL
	#define KAFKA_DLLIMEXPORT  __declspec(dllexport)
#else
	#define KAFKA_DLLIMEXPORT  __declspec(dllimport)
#endif

using namespace std;

/*-----------------------------------------------------------------------------------------------
 *                                   Kafka 生产者接口                        
 *----------------------------------------------------------------------------------------------*/
// Delivery-report callback: librdkafka invokes dr_cb() once per produced
// message to report whether the broker accepted or rejected it.
class KAFKA_DLLIMEXPORT KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb
{
public:
	void dr_cb(RdKafka::Message& message);
};

// Event callback: receives producer-side notifications from librdkafka
// (broker errors, statistics, log lines).
class KAFKA_DLLIMEXPORT KafkaProducerEventCallBack : public RdKafka::EventCb
{
public:
	void event_cb(RdKafka::Event& event);
};

// Kafka producer client: sends messages to one or more topics on a fixed
// partition through the librdkafka C++ API.
class KAFKA_DLLIMEXPORT KafkaProducerClient
{
public:
	// brokers    : comma-separated broker list ("host:port,host:port")
	// topics     : topic names this producer may send to
	// nPpartition: partition number to produce into
	KafkaProducerClient(const string &brokers, std::vector<string>& topics, int nPpartition = 0);
	virtual ~KafkaProducerClient();
	
	// Initialize: create the producer handle and a handle for every topic.
	// On failure errstr describes the error.
	bool Init(std::string& errstr);
	// Stop producing and release all librdkafka handles.
	void Stop();
	// Send one message to the given topic; errstr is set on failure.
	void Send(std::string topic,const string& msg, std::string& errstr);

private:
	// Running flag
	bool m_bRunFlag;
	// Partition number
	int m_nPpartition;
	// Broker list
	std::string m_strBroker;
	// Topic handles, keyed by topic name (filled in by Init())
	//RdKafka::Topic* m_pTopic;
	std::map<std::string, RdKafka::Topic*> m_mapTopic;
	// Producer handle
	RdKafka::Producer* m_pProducer;
	// Event callback
	KafkaProducerEventCallBack m_producerEventCallBack;
	// Delivery-report callback
	KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
};


/*-----------------------------------------------------------------------------------------------
 *                                   Kafka 消费者接口
 *----------------------------------------------------------------------------------------------*/
// Kafka consumer client: reads messages from one or more topics on a single
// partition using the librdkafka legacy (simple) consumer API.
class KAFKA_DLLIMEXPORT KafkaConsumerClient 
{
public:
	// brokers   : comma-separated broker list ("host:port,host:port")
	// topics    : topic names to consume from
	// groupid   : consumer group id
	// nPartition: partition number to consume from
	// offset    : start offset for consumption
	KafkaConsumerClient(const std::string& brokers, std::vector<string>& topics, std::string groupid, int32_t nPartition = 0, int64_t offset = 0);
	virtual ~KafkaConsumerClient();

	// Initialize: create the consumer and start it on every topic.
	// On failure errstr describes the error.
	bool Init(std::string& errstr);
	// Stop consuming and release all librdkafka handles.
	void Stop();
	// Fetch one message from the given topic into msg, retrying a few times;
	// blocks up to timeout_ms per attempt. errstr is set on failure.
	void StartFetchMsg(std::string topic,std::string& msg, std::string& errstr, int timeout_ms = 10);

private:
	void MsgConsume(RdKafka::Message* message, void* opaque);

private:
	// Running flag
	bool m_bRunFlag;
	// Partition number
	int32_t m_nPartition;
	// Offset of the last message successfully read
	int64_t m_nLastOffset;
	// Start offset
	int64_t m_nCurrentOffset;
	// Broker list
	std::string m_strBrokers;
	// Consumer group id
	std::string m_strGroupid;
	// Topic handles, keyed by topic name (filled in by Init())
	//RdKafka::Topic* m_pTopic;
	std::map<std::string, RdKafka::Topic*> m_mapTopic;
	// Consumer handle
	RdKafka::Consumer* m_pKafkaConsumer;
};

#endif // KAFKA_CLIENT_H

CKafkaClient.cpp

#include <stdafx.h>
#include <iostream>
#include "CKafkaClient.h"

/*-----------------------------------------------------------------------------------------------
 *                                   Kafka 生产者接口实现
 *----------------------------------------------------------------------------------------------*/

// Delivery report callback, invoked once per produced message.
// FIX: the original body was empty, silently discarding delivery failures;
// log them so lost messages become visible.
void KafkaProducerDeliveryReportCallBack::dr_cb(RdKafka::Message& message)
{
	if (message.err() != RdKafka::ERR_NO_ERROR)
	{
		std::cout << "Message delivery failed: " << message.errstr() << std::endl;
	}
}

// Event callback: handles broker-level notifications.
// FIX: the original EVENT_ERROR case fell through into EVENT_STATS whenever
// the error was not ALL_BROKERS_DOWN (missing break), and every error was
// swallowed silently. Errors are now logged and each case breaks explicitly.
void KafkaProducerEventCallBack::event_cb(RdKafka::Event& event)
{
	switch (event.type())
	{
	case RdKafka::Event::EVENT_ERROR:
		std::cout << "Kafka event error: " << RdKafka::err2str(event.err()) << std::endl;
		if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
		{
			std::cout << "All brokers are down" << std::endl;
		}
		break;
	case RdKafka::Event::EVENT_STATS:
		// Statistics events are currently ignored.
		break;
	case RdKafka::Event::EVENT_LOG:
		// Log events are currently ignored.
		break;
	default:
		break;
	}
}

// Constructor: records the broker list and partition, and pre-registers every
// requested topic (handles are created later in Init()).
// FIX: the original body executed `m_nPpartition = 0;`, discarding the
// partition the caller passed in; the member is now kept as supplied.
// (The `/*= 1*/` comment also disagreed with the header default of 0.)
KafkaProducerClient::KafkaProducerClient(const string &brokers, std::vector<string>& topics, int nPpartition /*= 0*/)
    : m_bRunFlag(true), m_nPpartition(nPpartition), m_strBroker(brokers), m_pProducer(nullptr)
{
    m_mapTopic.clear();
    for (auto itr = topics.begin(); itr != topics.end(); ++itr)
    {
        // Topic handle is created in Init(); register with a null placeholder.
        m_mapTopic.insert(std::pair<std::string, RdKafka::Topic*>(*itr, nullptr));
    }
}

// Destructor: releases the producer and topic handles via Stop().
KafkaProducerClient::~KafkaProducerClient()
{
    Stop();
}

bool KafkaProducerClient::Init(std::string& errstr)
{
    errstr.clear();

    //Create configuration objects
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    //Set configuration properties
    if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK)
    {
        std::cout << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
    }

    //Set delivery report callback 
    conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
    conf->set("event_cb", &m_producerEventCallBack, errstr);

    //Create producer using accumulated global configuration
    m_pProducer = RdKafka::Producer::create(conf, errstr);
    if (!m_pProducer) 
    {
        std::cout << "Failed to create producer: " << errstr << std::endl;
        return FALSE;
    }
    // Create topic handle
    auto itr = m_mapTopic.begin();
    for (;itr != m_mapTopic.end();++itr)
    {
        RdKafka::Topic* pTopic = RdKafka::Topic::create(m_pProducer, itr->first,tconf, errstr);
		if (!pTopic)
		{
			continue;
		}

        itr->second = pTopic;
    }
  
    return TRUE;
}
void KafkaProducerClient::Send(std::string topic, const string &msg, std::string& errstr)
{
    errstr.clear();

    if (!m_bRunFlag)
        return;

    auto itr = m_mapTopic.begin();
    for (; itr != m_mapTopic.end(); ++itr)
    {
        if (topic == itr->first)
        {
            //Produce message
            RdKafka::ErrorCode resp = m_pProducer->produce(itr->second, m_nPpartition,
                RdKafka::Producer::RK_MSG_COPY //Copy payload,
                const_cast<char*>(msg.c_str()), msg.size(),
                NULL, NULL);
            if (resp != RdKafka::ERR_NO_ERROR)
                errstr = err2str(resp); 
            else
                std::cout << "Produced message (" << msg.size() << " bytes)" << std::endl;

            m_pProducer->poll(0);

            // Wait for messages to be delivered 
            int iTimeOut = GetTickCount();
            while (m_bRunFlag && m_pProducer->outq_len() > 0)
            {
                if ((GetTickCount() - iTimeOut) > 200)
                {
                    break;
                }
                m_pProducer->poll(100);
            }
            break;
        }
    }

}

void KafkaProducerClient::Stop()
{
    m_mapTopic.clear();

    if (m_pProducer != nullptr) 
    {
        delete m_pProducer;
        m_pProducer = nullptr;
    }
}



/*-----------------------------------------------------------------------------------------------
 *                                   Kafka 消费者接口实现
 *----------------------------------------------------------------------------------------------*/
// Constructor: stores the connection parameters and registers every requested
// topic with a null handle; the actual librdkafka handles are created in Init().
KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, std::vector<string>& topics, std::string groupid, int32_t nPartition /*= 0*/, int64_t offset /*= 0*/)
    : m_bRunFlag(false),
    m_nPartition(nPartition),
    m_nLastOffset(0),
    m_nCurrentOffset(offset),
    m_strBrokers(brokers),
    m_strGroupid(groupid),
    m_pKafkaConsumer(nullptr)
{
    // Placeholder entries; Init() replaces the null handles.
    for (const std::string& topicName : topics)
    {
        m_mapTopic[topicName] = nullptr;
    }
}

// Destructor: stops consumption and releases all handles via Stop().
KafkaConsumerClient::~KafkaConsumerClient()
{
    Stop();
}

bool KafkaConsumerClient::Init(std::string& errstr)
{
    errstr.clear();

    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (!conf)
    {
        //RdKafka create global conf failed
        return FALSE;
    }
    //设置broker list
    if (conf->set("metadata.broker.list", m_strBrokers, errstr) != RdKafka::Conf::CONF_OK)
    {
        std::cout << "RdKafka conf set brokerlist failed ::" << errstr.c_str() << endl;
    }
    
    //设置consumer group
    if (conf->set("group.id", m_strGroupid, errstr) != RdKafka::Conf::CONF_OK)
    {
        std::cout << "RdKafka conf set group.id failed :" << errstr.c_str() << endl;
    }
    
    std::string strfetch_num = "10240000";
    
    //每次从单个分区中拉取消息的最大尺寸
    if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK)
    {
        std::cout << "RdKafka conf set max.partition failed :" << errstr.c_str() << endl;
    }
    
    //Create consumer using accumulated global configuration
    m_pKafkaConsumer = RdKafka::Consumer::create(conf, errstr);
    if (!m_pKafkaConsumer)
    {
        std::cout << "failed to ceate consumer" << endl;
    }
    std::cout << "% Created consumer " << m_pKafkaConsumer->name() << std::endl;
    delete conf;

    //创建kafka topic的配置
    auto tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!tconf)
    {
        //RdKafka create topic conf failed
        return FALSE;
    }
    if (tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK) {
        std::cout << "RdKafka conf set auto.offset.reset failed:" << errstr.c_str() << endl;
    }
    
    //Create topic handle
    auto itr = m_mapTopic.begin();
    for (;itr != m_mapTopic.end();++itr)
    {
        RdKafka::Topic* pTopic  = RdKafka::Topic::create(m_pKafkaConsumer, itr->first, tconf, errstr);
        if (!pTopic)
        {
            std::cout << "RdKafka create topic failed :" << errstr.c_str() << endl;
        }
        
        //Start consumer for topic+partition at start offset
        RdKafka::ErrorCode resp = m_pKafkaConsumer->start(pTopic, m_nPartition, m_nCurrentOffset);
        if (resp != RdKafka::ERR_NO_ERROR)
        {
            std::cout << "failed to start consumer : " << errstr.c_str() << endl;
        }

        itr->second = pTopic;
    }
    delete tconf;

    m_bRunFlag = TRUE;
    return TRUE;
}
// Classify one consumed message: record the offset of real messages, ignore
// timeouts and end-of-partition markers, and stop consuming on any other
// error (unknown topic/partition included).
void KafkaConsumerClient::MsgConsume(RdKafka::Message* message, void* opaque)
{
    const RdKafka::ErrorCode err = message->err();

    // Timeout and end-of-partition are expected conditions; nothing to do.
    if (err == RdKafka::ERR__TIMED_OUT || err == RdKafka::ERR__PARTITION_EOF)
    {
        return;
    }

    if (err == RdKafka::ERR_NO_ERROR)
    {
        // A real message arrived; show its key (if any) and remember the offset.
        if (message->key())
        {
            std::cout << "Key: " << *message->key() << std::endl;
        }
        m_nLastOffset = message->offset();
        return;
    }

    // Unknown topic/partition or any other error: consumption failed; stop.
    Stop();
}

// Fetch one message from `topic` into msg, retrying up to three times with
// timeout_ms per attempt. errstr is set when no message was obtained.
// FIXES vs the original:
//  - the payload was read with std::string(const char*): librdkafka payloads
//    are NOT NUL-terminated, so that could over-read past the buffer and
//    truncated at any embedded NUL — the message length now comes from len();
//  - a missing/uninitialized topic or consumer is reported instead of
//    silently doing nothing (or crashing);
//  - after an error path calls Stop() (which deletes the consumer), poll()
//    is no longer invoked on the destroyed consumer.
void KafkaConsumerClient::StartFetchMsg(std::string topic, std::string& msg, std::string& errstr, int timeout_ms)
{
    msg.clear();
    errstr.clear();

    if (m_pKafkaConsumer == nullptr)
    {
        errstr = "RX KafkaFetchMsg Error >>> consumer not initialized.";
        return;
    }

    auto itr = m_mapTopic.find(topic);
    if (itr == m_mapTopic.end() || itr->second == nullptr)
    {
        errstr = "RX KafkaFetchMsg Error >>> unknown topic.";
        return;
    }

    int iTryTimes = 3;
    while (m_bRunFlag && iTryTimes-- > 0)
    {
        RdKafka::Message* kmsg = m_pKafkaConsumer->consume(itr->second, m_nPartition, timeout_ms);

        switch (kmsg->err())
        {
        case RdKafka::ERR__TIMED_OUT:
            errstr = std::string("RX KafkaFetchMsg Error >>> ERR_RECV_TIMED_OUT.");
            break;
        case RdKafka::ERR_NO_ERROR:
            // Real message: copy exactly len() bytes of the payload.
            msg.assign(static_cast<const char*>(kmsg->payload()), kmsg->len());
            m_nLastOffset = kmsg->offset();
            iTryTimes = 0;  // success; leave the retry loop
            break;
        case RdKafka::ERR__PARTITION_EOF:
            errstr = std::string("RX KafkaFetchMsg Error >>> ERR_PARTITION_EOF.");
            break;
        case RdKafka::ERR__UNKNOWN_TOPIC:
            errstr = std::string("RX KafkaFetchMsg Error >>> ERR_UNKNOWN_TOPIC.");
            Stop();
            break;
        case RdKafka::ERR__UNKNOWN_PARTITION:
            errstr = std::string("RX KafkaFetchMsg Error >>> ERR_UNKNOWN_PARTITION.");
            Stop();
            break;
        default:
            // Any other error: consumption failed; stop.
            errstr = std::string("RX KafkaFetchMsg Error >>> ") + kmsg->errstr();
            Stop();
            break;
        }

        delete kmsg;
        if (m_pKafkaConsumer != nullptr)  // Stop() may have destroyed it
        {
            m_pKafkaConsumer->poll(0);
        }
    }
}

// Stop consumption on every topic, release all handles, and wait for
// librdkafka to decommission.
// FIXES vs the original:
//  - m_pKafkaConsumer was dereferenced unconditionally — a crash if Stop()
//    ran before Init() (e.g. from the destructor);
//  - the RdKafka::Topic* handles were leaked when the map was cleared.
void KafkaConsumerClient::Stop()
{
    m_bRunFlag = false;

    if (m_pKafkaConsumer != nullptr)
    {
        for (auto itr = m_mapTopic.begin(); itr != m_mapTopic.end(); ++itr)
        {
            if (itr->second != nullptr)
            {
                m_pKafkaConsumer->stop(itr->second, m_nPartition);
                m_pKafkaConsumer->poll(1000);
                delete itr->second;  // topics must be freed before the consumer
            }
        }
    }
    m_mapTopic.clear();

    if (m_pKafkaConsumer)
    {
        delete m_pKafkaConsumer;
        m_pKafkaConsumer = nullptr;
    }

    // Wait for RdKafka to decommission.
    RdKafka::wait_destroyed(1000);
}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐