通过调用 librdkafka 库实现对 Kafka 的操作。

消费者:

 1 #include <vector>
 2 #include <string>
 3 #include <memory>
 4 #include <getopt.h>
 5 #include <csignal>
 6 #include <iostream>
 7 #include "librdkafka/rdkafkacpp.h"
 8  
// Thin wrapper around the legacy librdkafka C++ "simple consumer" API:
// configure a consumer, start one (topic, partition) at a given offset,
// and print each received message (see the matching .cpp).
class kafka_consumer_client{
public:
    // brokers: comma-separated "host:port" list; topics: single topic name;
    // groupid: consumer group id; offset: start offset for the partition.
    // NOTE(review): the -1 default overrides the OFFSET_BEGINNING member
    // default below — in librdkafka -1 is OFFSET_END; confirm which start
    // position is actually intended.
    kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset=-1);
    //kafka_consumer_client();
    virtual ~kafka_consumer_client();

    // Creates the consumer/topic handles and starts consuming.
    // Must succeed before consume() is called.
    bool initClient();
    // Blocking poll-and-print loop; timeout_ms is the per-poll wait
    // forwarded to RdKafka::Consumer::consume().
    bool consume(int timeout_ms);
    void finalize();  // NOTE(review): declared but no definition visible here
private:
    // Per-message handler used by consume(): prints the payload or
    // reports/handles the message's error code.
    void consumer(RdKafka::Message *msg, void *opt);

    std::string brokers_;   // broker list, e.g. "host1:9092,host2:9092"
    std::string topics_;    // topic name (single topic despite the plural)
    std::string groupid_;   // consumer group id

    int64_t last_offset_ = 0;                       // offset of last good message
    RdKafka::Consumer *kafka_consumer_ = nullptr;   // owned; released in consume()
    RdKafka::Topic    *topic_          = nullptr;   // owned; released in consume()
    int64_t           offset_          = RdKafka::Topic::OFFSET_BEGINNING;
    int32_t           partition_       = 0;         // initialized to 0; no setter visible

};
kafka_comsumer.h
  1 #include "kafka_comsumer.h"
  2  
  3  
  4 bool run_ = true;
  5  
  6 static void sigterm (int sig) {
  7   run_ = false;
  8 }
  9  
 10 kafka_consumer_client::kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset)
 11 :brokers_(brokers),
 12  topics_(topics),
 13  groupid_(groupid),
 14  offset_(offset){
 15  }
 16  
 17 //kafka_consumer_client::kafka_consumer_client(){}
 18  
 19 kafka_consumer_client::~kafka_consumer_client(){}
 20  
 21 bool kafka_consumer_client::initClient(){
 22     RdKafka::Conf *conf = nullptr;
 23     conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 24     if(!conf){
 25         fprintf(stderr, "RdKafka create global conf failed\n");
 26         return false;
 27     }
 28  
 29     std::string errstr;
 30     /*设置broker list*/
 31     if (conf->set("bootstrap.servers", brokers_, errstr) != RdKafka::Conf::CONF_OK){
 32         fprintf(stderr, "RdKafka conf set brokerlist failed : %s\n", errstr.c_str());
 33     }
 34  
 35     /*设置consumer group*/
 36     if (conf->set("group.id", groupid_, errstr) != RdKafka::Conf::CONF_OK){
 37         fprintf(stderr, "RdKafka conf set group.id failed : %s\n", errstr.c_str());
 38     }
 39  
 40     std::string strfetch_num = "10240000";
 41     /*每次从单个分区中拉取消息的最大尺寸*/
 42     if(conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){
 43         fprintf(stderr, "RdKafka conf set max.partition failed : %s\n", errstr.c_str());
 44     }
 45  
 46     /*创建kafka consumer实例*/
 47     kafka_consumer_ = RdKafka::Consumer::create(conf, errstr);
 48     if(!kafka_consumer_){
 49         fprintf(stderr, "failed to ceate consumer\n");
 50     }
 51     delete conf;
 52  
 53     RdKafka::Conf *tconf = nullptr;
 54     /*创建kafka topic的配置*/
 55     tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
 56     if(!tconf){
 57         fprintf(stderr, "RdKafka create topic conf failed\n");
 58         return false;
 59     }
 60  
 61     /*kafka + zookeeper,当消息被消费时,会想zk提交当前groupId的consumer消费的offset信息,
 62     当consumer再次启动将会从此offset开始继续消费.在consumter端配置文件中(或者是
 63     ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),
 64     有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,
 65     在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始
 66     消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的
 67     开始位置消费所有消息.*/
 68     if(tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK){
 69         fprintf(stderr, "RdKafka conf set auto.offset.reset failed : %s\n", errstr.c_str());
 70     }
 71  
 72     topic_ = RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr);
 73     if(!topic_){
 74         fprintf(stderr, "RdKafka create topic failed : %s\n", errstr.c_str());
 75     }
 76     delete tconf;
 77  
 78     RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);
 79     if (resp != RdKafka::ERR_NO_ERROR){
 80         fprintf(stderr, "failed to start consumer : %s\n", RdKafka::err2str(resp).c_str());
 81     }
 82  
 83     return true;
 84 }
 85  
 86 void kafka_consumer_client::consumer(RdKafka::Message *message, void *opt){
 87     switch(message->err()){
 88         case RdKafka::ERR__TIMED_OUT:
 89             break;
 90         case RdKafka::ERR_NO_ERROR:
 91             printf("%.*s\n", 
 92                 static_cast<int>(message->len()),
 93              static_cast <const char*>(message->payload()));
 94             last_offset_ = message->offset();
 95           
 96              break;
 97         case RdKafka::ERR__PARTITION_EOF:
 98             std::cerr << "%% Reached the end of the queue, offset: " << last_offset_ << std::endl;
 99             break;
100         case RdKafka::ERR__UNKNOWN_TOPIC:
101         case RdKafka::ERR__UNKNOWN_PARTITION:
102             std::cerr << "Consume failed: " << message->errstr() << std::endl;
103             run_ = false;
104             break;
105         default:
106             std::cerr << "Consume failed: " << message->errstr() << std::endl;
107             run_ = false;
108             break;
109     }
110 }
111  
112 bool kafka_consumer_client::consume(int timeout_ms){
113     RdKafka::Message *msg = nullptr;
114  
115     while(run_){
116         msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
117         consumer(msg, nullptr);
118         kafka_consumer_->poll(0);
119         delete msg;
120     }
121  
122     kafka_consumer_->stop(topic_, partition_);
123     if(topic_){
124         delete topic_;
125         topic_ = nullptr;
126     }
127     if(kafka_consumer_){
128         delete kafka_consumer_;
129         kafka_consumer_ = nullptr;
130     }
131  
132     /*销毁kafka实例*/
133     RdKafka::wait_destroyed(5000);
134     return true;
135 }
136  
kafka_comsumer.cpp

生产者:

 1 #ifndef KAFKAPRODUCER_H
 2 #define KAFKAPRODUCER_H
 3  
 4 #include <iostream>
 5 #include <string>
 6 #include <cstdlib>
 7 #include <cstdio>
 8 #include <csignal>
 9 #include <cstring>
10 #include <list>
11 #include <librdkafka/rdkafkacpp.h>
12 #include <vector>
13 #include <fstream>
14  
15 using std::string;
16 using std::list;
17 using std::cout;
18 using std::endl;
19 using std::vector;
20 using std::fstream;
21  
22 class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb {
23 public:
24     void dr_cb(RdKafka::Message &message) {
25         std::cout << "Message delivery for (" << message.len() << " bytes): " <<
26             message.errstr() << std::endl;
27         if (message.key())
28             std::cout << "Key: " << *(message.key()) << ";" << std::endl;
29     }
30 };
31 class KafkaProducerEventCallBack : public RdKafka::EventCb {
32 public:
33     void event_cb(RdKafka::Event &event) {
34         switch (event.type())
35         {
36         case RdKafka::Event::EVENT_ERROR:
37             std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
38                 event.str() << std::endl;
39             if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
40             break;
41         case RdKafka::Event::EVENT_STATS:
42             std::cerr << "\"STATS\": " << event.str() << std::endl;
43             break;
44         case RdKafka::Event::EVENT_LOG:
45             fprintf(stderr, "LOG-%i-%s: %s\n",
46                 event.severity(), event.fac().c_str(), event.str().c_str());
47             break;
48         default:
49             std::cerr << "EVENT " << event.type() <<
50                 " (" << RdKafka::err2str(event.err()) << "): " <<
51                 event.str() << std::endl;
52             break;
53         }
54     }
55 };
56 class KafkaProducer
57 {
58 public:
59     KafkaProducer(const string &brokers, const string &topics, int nPpartition = 0);
60     virtual ~KafkaProducer();
61     bool Init();
62     void Send(const string &msg);
63     void Stop();
64 private:
65     RdKafka::Producer *m_pProducer = NULL;
66     RdKafka::Topic *m_pTopic = NULL;
67     KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
68     KafkaProducerEventCallBack m_producerEventCallBack;
69     std::string m_strTopics;
70     std::string m_strBroker;
71     bool m_bRun = false;
72     int m_nPpartition = 0;
73 };
74 #endif // KAFKAPRODUCER_H
kafkaproducer.h
  1 #include <iostream>
  2 #include "kafkaproducer.h"
  3  
  4 KafkaProducer::KafkaProducer(const string &brokers, const string &topics, int nPpartition /*= 1*/)
  5     : m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition)
  6 {
  7 }
  8  
  9 KafkaProducer::~KafkaProducer()
 10 {
 11     Stop();
 12 }
 13  
 14 bool KafkaProducer::Init()
 15 {
 16     string errstr = "";
 17  
 18     /*
 19      * Create configuration objects
 20      */
 21     RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 22     RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
 23  
 24     /*Set configuration properties,设置broker list*/
 25     if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){
 26         std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
 27     }
 28     /* Set delivery report callback */
 29     conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
 30     conf->set("event_cb", &m_producerEventCallBack, errstr);
 31  
 32     /*
 33      * Create producer using accumulated global configuration.
 34     */
 35     m_pProducer = RdKafka::Producer::create(conf, errstr);
 36     if (!m_pProducer) {
 37         std::cerr << "Failed to create producer: " << errstr << std::endl;
 38         return false;
 39     }
 40     std::cout << "% Created producer " << m_pProducer->name() << std::endl;
 41     /*
 42      * Create topic handle.
 43     */
 44     m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics,
 45                                       tconf, errstr);
 46     if (!m_pTopic) {
 47         std::cerr << "Failed to create topic: " << errstr << std::endl;
 48         return false;
 49     }
 50     return true;
 51 }
 52 void KafkaProducer::Send(const string &msg)
 53 {
 54     if (!m_bRun)
 55         return;
 56     /*
 57      * Produce message
 58     */
 59     RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition,
 60                                                    RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
 61                                                    const_cast<char *>(msg.c_str()), msg.size(),
 62                                                    NULL, NULL);
 63     if (resp != RdKafka::ERR_NO_ERROR)
 64         std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
 65     else
 66         std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl;
 67  
 68     m_pProducer->poll(0);
 69  
 70     /* Wait for messages to be delivered */  //firecat add
 71     while (m_bRun && m_pProducer->outq_len() > 0) {
 72         std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl;
 73         m_pProducer->poll(1000);
 74     }
 75 }
 76  
 77 void KafkaProducer::Stop()
 78 {
 79     delete m_pTopic;
 80     delete m_pProducer;
 81 }
 82 
 83 
 84  
 85 int main()
 86 {
 87     //KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("localhost:9092", "test", 0);
 88  
 89     KafkaProducer* Kafkapr_ = new KafkaProducer("localhost:9092", "test", 0);
 90     Kafkapr_->Init();
 91     Kafkapr_->Send("hello world!");
 92  
 93     char str_msg[] = "Hello Kafka!";
 94  
 95     while (fgets(str_msg, sizeof(str_msg), stdin))
 96     {
 97         size_t len = strlen(str_msg);
 98         if (str_msg[len - 1] == '\n')
 99         {
100             str_msg[--len] = '\0';
101         }
102  
103         if (strcmp(str_msg, "end") == 0)
104         {
105             break;
106         }
107  
108         Kafkapr_->Send(str_msg);
109     }
110  
111     return 0;
112 }
kafkaproducer.cpp

 

转载于:https://www.cnblogs.com/swyroc/p/10678379.html

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐