作为一个c++程序,我相信很多人都会使用c++的librdkafkacpp的接口,这些接口通过C++封装,使用起来方便,不需要去了解底层的c接口的封装及实现,是最理想的使用方式。

那为啥还要使用C接口呢?

这就涉及到C++的ABI二进制兼容问题。

当我们编译一个C++动态库,受限于不同的编译器版本,无论是gcc还是VC,都存在着兼容问题。对于一个生产版本,如果在统一了编译工具链的情景下,是可以使用C++版本,但如果考虑到更好的ABI兼容,所以就选择了C库。其无论在VC还是gcc,都表现得很好。

C库kafka的消费模式

 三种消费模式的异同

librdKafka 提供了三种不同的消费模式,下面将讲解这三种模式的差异及如何使用。

// 一次读取一条数据
rd_kafka_consumer_poll();
rd_kafka_consume();

// 批量读取,一次可以读取多条数据
// 通过回调函数的批量读取,将废弃
rd_kafka_consume_callback();
rd_kafka_consume_batch();

// 通过Queue路由道特定的处理函数
rd_kafka_consume_callback_queue
rd_kafka_consume_batch_queue

1、这种模式,直接使用rd_kafka_consumer_poll()接口,其可以实现1次读取一条,这种场景对于数据量不大,可以很好的使用,而且其客户端API也实现了相关的消息确认和提交,大多数采用这种方式,这种方式的例程,我在前面的博文中有讲到,有兴趣的可以去看看

2、这种模式是批量读取的模式,这种模式下有两种接口,一种基于api的回调

rd_kafka_consume_callback,这个接口根据官方文档是即将被废弃的接口,所以不建议使用该接口,建议使用rd_kafka_consume_batch(),后面会给出这种模式下的使用例子。

3、使用Queue的方式,该方式也提供了两种模式,一种回调,一种直接消费,回调的方式应该在性能上是最高的。这种方式与第二种方式的不同是,他能让不同topic,不同patition的数据路由到一个队列,也就是对于一个需要处理多种数据的消费者。

使用批量读取的注意:

1、消息最好程序中自己做确认提交,这样在kafka的服务,才不会看到滞后的消息消费

2、注意消息的销毁,kafka在销毁对象时会持有引用技术,消费完的消息,要destoy

3、对于topic+partition的消费模式,最好不要在运行热新增分区或主题,否则客户端处理起来比较麻烦。如果必要这么做,要做好测试。保证数据不重复不丢消息。

代码实现

KafkaConsumer.h

#ifndef KAFKACONSUMER_H
#define KAFKACONSUMER_H

#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>

#include "rdkafka.h"

using namespace std;

struct TKafkaCfgInfo
{
    string m_strBrokers;
    string m_strGroupID;
    string m_strTopics;
    int m_nPartition;

    int m_nBatchReadSize;
    int m_nReadTimeout;
    bool m_bIsLogKafka;

    TKafkaCfgInfo()
        : m_nPartition(-1)
        , m_nBatchReadSize(-1)
        , m_nReadTimeout(-1)
        , m_bIsLogKafka(false)
    {}
};

class CKafkaConsumer
{
public:
    CKafkaConsumer();
    virtual ~CKafkaConsumer();

    // 初?始º?化¡¥Kafka配?置?
    int InitCfg(const TKafkaCfgInfo& tCfg);
    const TKafkaCfgInfo& GetCfg() { return m_tCfg; }

    // 拉¤-取¨?Kafka消?息¡é
    void pullMessage(rd_kafka_message_t** p, ssize_t& nCnt);

    void SetConsumerOk(rd_kafka_message_t* pMsg);

protected:
    rd_kafka_t* m_pkafka;
    rd_kafka_topic_t* m_pKafkaTopic;

    TKafkaCfgInfo m_tCfg;
};

#endif // KAFKACONSUMER_H

// KafkaConsumer.cpp

#include "KafkaConsumer.h"

#include <iostream>

using namespace std;

static void RebalanceCb(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque)
{
     测a试º?代䨲码?
#if 0
    if (partitions)
    {
        for (int i = 0; i < partitions->cnt; i++)
        {
            partitions->elems[i].offset = 0;
        }
    }
    printf("RebalanceCb \r\n");
#endif
    printf("RebalanceCb \n");
    rd_kafka_assign(rk, partitions);
}

static void  EventErrorCb(rd_kafka_t* rk, int err,
    const char* reason,
    void* opaque)
{
    printf("Kafka EventErrorCb(%d): %s\n", err, reason);
}

CKafkaConsumer::CKafkaConsumer()
    : m_pkafka(nullptr)
    , m_pKafkaTopic(nullptr)
{
}

int CKafkaConsumer::InitCfg(const TKafkaCfgInfo& tCfg)
{
    m_tCfg = tCfg;

    rd_kafka_conf_t* pConf = rd_kafka_conf_new();
    if (!pConf)
    {

        return -1;
    }

    char szErr[512] = { 0 };
    if (rd_kafka_conf_set(pConf, "bootstrap.servers", m_tCfg.m_strBrokers.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
    {
        rd_kafka_conf_destroy(pConf);
        return -1;
    }

    if (rd_kafka_conf_set(pConf, "group.id", m_tCfg.m_strGroupID.c_str(), szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
    {

        rd_kafka_conf_destroy(pConf);
        return -1;
    }

    // rd_kafka_conf_set_rebalance_cb(pConf, &RebalanceCb);
    rd_kafka_conf_set_error_cb(pConf, EventErrorCb);

    if (rd_kafka_conf_set(pConf, "enable.auto.commit", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
    {
        rd_kafka_conf_destroy(pConf);
        return -1;
    }

    if (rd_kafka_conf_set(pConf, "enable.auto.offset.store", "false", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
    {
        rd_kafka_conf_destroy(pConf);
        return -1;
    }

    // topic配?置?
    /*
    rd_kafka_topic_conf_t* pTopicConf = rd_kafka_topic_conf_new();
    /*if (rd_kafka_topic_conf_set(pTopicConf, "auto.offset.reset", "latest", szErr, sizeof(szErr)) != RD_KAFKA_CONF_OK)
    {
        rd_kafka_topic_conf_destroy(pTopicConf);
        return -1;
    }*/

    // 创ä¡ä建¡§kafka实º¦Ì例¤y
    m_pkafka = rd_kafka_new(RD_KAFKA_CONSUMER, pConf, szErr, sizeof(szErr));
    if (!m_pkafka)
    {
        return -1;
    }

    m_pKafkaTopic = rd_kafka_topic_new(m_pkafka, m_tCfg.m_strTopics.c_str(), nullptr);
    if (!m_pKafkaTopic)
    {
        return -1;
    }

    rd_kafka_consume_start(m_pKafkaTopic, m_tCfg.m_nPartition, RD_KAFKA_OFFSET_STORED);

    return 0;
}

void CKafkaConsumer::pullMessage(rd_kafka_message_t** pRet, ssize_t& nCnt)
{
    nCnt = rd_kafka_consume_batch(m_pKafkaTopic, m_tCfg.m_nPartition, m_tCfg.m_nReadTimeout, pRet, m_tCfg.m_nBatchReadSize);
    // rd_kafka_poll(m_pkafka, 0);
}

void CKafkaConsumer::SetConsumerOk(rd_kafka_message_t* pMsg)
{
    rd_kafka_resp_err_t errCode = rd_kafka_offset_store(m_pKafkaTopic, m_tCfg.m_nPartition, pMsg->offset);
    printf("rd_kafka_offset_store offset = %d\n", pMsg->offset);
    if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        return;
    }

    errCode = rd_kafka_commit_message(m_pkafka, pMsg, 0);
    if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        return;
    }
}

CKafkaConsumer::~CKafkaConsumer()
{
    if (m_pKafkaTopic)
    {
        rd_kafka_consume_stop(m_pKafkaTopic, m_tCfg.m_nPartition);
        rd_kafka_topic_destroy(m_pKafkaTopic);
        m_pKafkaTopic = nullptr;
    }

    if (m_pkafka)
    {
        // rd_kafka_consumer_close(m_pkafka); // rd_kafka_destroy调Ì¡Â用®?会¨¢调Ì¡Â用®?rd_kafka_consumer_close
        rd_kafka_destroy(m_pkafka);
        m_pkafka = nullptr;
    }
}

// Test.cpp


#include "stdafx.h"
#include "KafkaConsumer.h"
#include <assert.h>

int _tmain(int argc, _TCHAR* argv[])
{
    string brokers = "192.168.254.129:9092";
    string strTopic = "test";
    string group = "test1";
    int nPartition = 0;
    CKafkaConsumer consumer;
    TKafkaCfgInfo t;
    t.m_strBrokers = brokers;
    t.m_strGroupID = group;
    t.m_strTopics = strTopic;
    t.m_nPartition = nPartition;
    int nRet = consumer.InitCfg(t);
    if (nRet != 0)
    {
        assert(0);
        return 0;
    }

    int nBatchSize = 10;
    rd_kafka_message_t** pRet = new rd_kafka_message_t * [nBatchSize];

    while (1)
    {
        int nTimeout = 1000;
        ssize_t nCnt = 0;
        consumer.pullMessage(pRet, nCnt);

        if (nCnt < 0)
        {
            fprintf(stderr, "%% Error: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
            continue;
        }

        if (nCnt == 0)
        {
            // donothing
            continue;
        }

        for (int nIndex = 0; nIndex < nCnt; nIndex++)
        {
            rd_kafka_message_t* pMsg = pRet[nIndex];
            printf("%s\n", pMsg->payload);
            consumer.SetConsumerOk(pMsg);
            rd_kafka_message_destroy(pMsg);
        }
    }

    delete[] pRet;
    return 0;
}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐