通过调用 librdkafka 库(C++ 接口)实现对 Kafka 的操作
消费者:
1 #include <vector> 2 #include <string> 3 #include <memory> 4 #include <getopt.h> 5 #include <csignal> 6 #include <iostream> 7 #include "librdkafka/rdkafkacpp.h" 8 9 class kafka_consumer_client{ 10 public: 11 kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset=-1); 12 //kafka_consumer_client(); 13 virtual ~kafka_consumer_client(); 14 15 bool initClient(); 16 bool consume(int timeout_ms); 17 void finalize(); 18 private: 19 void consumer(RdKafka::Message *msg, void *opt); 20 21 std::string brokers_; 22 std::string topics_; 23 std::string groupid_; 24 25 int64_t last_offset_ = 0; 26 RdKafka::Consumer *kafka_consumer_ = nullptr; 27 RdKafka::Topic *topic_ = nullptr; 28 int64_t offset_ = RdKafka::Topic::OFFSET_BEGINNING; 29 int32_t partition_ = 0; 30 31 };
1 #include "kafka_comsumer.h" 2 3 4 bool run_ = true; 5 6 static void sigterm (int sig) { 7 run_ = false; 8 } 9 10 kafka_consumer_client::kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset) 11 :brokers_(brokers), 12 topics_(topics), 13 groupid_(groupid), 14 offset_(offset){ 15 } 16 17 //kafka_consumer_client::kafka_consumer_client(){} 18 19 kafka_consumer_client::~kafka_consumer_client(){} 20 21 bool kafka_consumer_client::initClient(){ 22 RdKafka::Conf *conf = nullptr; 23 conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); 24 if(!conf){ 25 fprintf(stderr, "RdKafka create global conf failed\n"); 26 return false; 27 } 28 29 std::string errstr; 30 /*设置broker list*/ 31 if (conf->set("bootstrap.servers", brokers_, errstr) != RdKafka::Conf::CONF_OK){ 32 fprintf(stderr, "RdKafka conf set brokerlist failed : %s\n", errstr.c_str()); 33 } 34 35 /*设置consumer group*/ 36 if (conf->set("group.id", groupid_, errstr) != RdKafka::Conf::CONF_OK){ 37 fprintf(stderr, "RdKafka conf set group.id failed : %s\n", errstr.c_str()); 38 } 39 40 std::string strfetch_num = "10240000"; 41 /*每次从单个分区中拉取消息的最大尺寸*/ 42 if(conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){ 43 fprintf(stderr, "RdKafka conf set max.partition failed : %s\n", errstr.c_str()); 44 } 45 46 /*创建kafka consumer实例*/ 47 kafka_consumer_ = RdKafka::Consumer::create(conf, errstr); 48 if(!kafka_consumer_){ 49 fprintf(stderr, "failed to ceate consumer\n"); 50 } 51 delete conf; 52 53 RdKafka::Conf *tconf = nullptr; 54 /*创建kafka topic的配置*/ 55 tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); 56 if(!tconf){ 57 fprintf(stderr, "RdKafka create topic conf failed\n"); 58 return false; 59 } 60 61 /*kafka + zookeeper,当消息被消费时,会想zk提交当前groupId的consumer消费的offset信息, 62 当consumer再次启动将会从此offset开始继续消费.在consumter端配置文件中(或者是 63 ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset), 64 
有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者, 65 在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始 66 消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的 67 开始位置消费所有消息.*/ 68 if(tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK){ 69 fprintf(stderr, "RdKafka conf set auto.offset.reset failed : %s\n", errstr.c_str()); 70 } 71 72 topic_ = RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr); 73 if(!topic_){ 74 fprintf(stderr, "RdKafka create topic failed : %s\n", errstr.c_str()); 75 } 76 delete tconf; 77 78 RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_); 79 if (resp != RdKafka::ERR_NO_ERROR){ 80 fprintf(stderr, "failed to start consumer : %s\n", RdKafka::err2str(resp).c_str()); 81 } 82 83 return true; 84 } 85 86 void kafka_consumer_client::consumer(RdKafka::Message *message, void *opt){ 87 switch(message->err()){ 88 case RdKafka::ERR__TIMED_OUT: 89 break; 90 case RdKafka::ERR_NO_ERROR: 91 printf("%.*s\n", 92 static_cast<int>(message->len()), 93 static_cast <const char*>(message->payload())); 94 last_offset_ = message->offset(); 95 96 break; 97 case RdKafka::ERR__PARTITION_EOF: 98 std::cerr << "%% Reached the end of the queue, offset: " << last_offset_ << std::endl; 99 break; 100 case RdKafka::ERR__UNKNOWN_TOPIC: 101 case RdKafka::ERR__UNKNOWN_PARTITION: 102 std::cerr << "Consume failed: " << message->errstr() << std::endl; 103 run_ = false; 104 break; 105 default: 106 std::cerr << "Consume failed: " << message->errstr() << std::endl; 107 run_ = false; 108 break; 109 } 110 } 111 112 bool kafka_consumer_client::consume(int timeout_ms){ 113 RdKafka::Message *msg = nullptr; 114 115 while(run_){ 116 msg = kafka_consumer_->consume(topic_, partition_, timeout_ms); 117 consumer(msg, nullptr); 118 kafka_consumer_->poll(0); 119 delete msg; 120 } 121 122 kafka_consumer_->stop(topic_, partition_); 123 if(topic_){ 124 delete topic_; 125 topic_ = nullptr; 126 } 127 
if(kafka_consumer_){ 128 delete kafka_consumer_; 129 kafka_consumer_ = nullptr; 130 } 131 132 /*销毁kafka实例*/ 133 RdKafka::wait_destroyed(5000); 134 return true; 135 } 136
生产者:
1 #ifndef KAFKAPRODUCER_H 2 #define KAFKAPRODUCER_H 3 4 #include <iostream> 5 #include <string> 6 #include <cstdlib> 7 #include <cstdio> 8 #include <csignal> 9 #include <cstring> 10 #include <list> 11 #include <librdkafka/rdkafkacpp.h> 12 #include <vector> 13 #include <fstream> 14 15 using std::string; 16 using std::list; 17 using std::cout; 18 using std::endl; 19 using std::vector; 20 using std::fstream; 21 22 class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb { 23 public: 24 void dr_cb(RdKafka::Message &message) { 25 std::cout << "Message delivery for (" << message.len() << " bytes): " << 26 message.errstr() << std::endl; 27 if (message.key()) 28 std::cout << "Key: " << *(message.key()) << ";" << std::endl; 29 } 30 }; 31 class KafkaProducerEventCallBack : public RdKafka::EventCb { 32 public: 33 void event_cb(RdKafka::Event &event) { 34 switch (event.type()) 35 { 36 case RdKafka::Event::EVENT_ERROR: 37 std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << 38 event.str() << std::endl; 39 if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) 40 break; 41 case RdKafka::Event::EVENT_STATS: 42 std::cerr << "\"STATS\": " << event.str() << std::endl; 43 break; 44 case RdKafka::Event::EVENT_LOG: 45 fprintf(stderr, "LOG-%i-%s: %s\n", 46 event.severity(), event.fac().c_str(), event.str().c_str()); 47 break; 48 default: 49 std::cerr << "EVENT " << event.type() << 50 " (" << RdKafka::err2str(event.err()) << "): " << 51 event.str() << std::endl; 52 break; 53 } 54 } 55 }; 56 class KafkaProducer 57 { 58 public: 59 KafkaProducer(const string &brokers, const string &topics, int nPpartition = 0); 60 virtual ~KafkaProducer(); 61 bool Init(); 62 void Send(const string &msg); 63 void Stop(); 64 private: 65 RdKafka::Producer *m_pProducer = NULL; 66 RdKafka::Topic *m_pTopic = NULL; 67 KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack; 68 KafkaProducerEventCallBack m_producerEventCallBack; 69 std::string m_strTopics; 70 std::string 
m_strBroker; 71 bool m_bRun = false; 72 int m_nPpartition = 0; 73 }; 74 #endif // KAFKAPRODUCER_H
1 #include <iostream> 2 #include "kafkaproducer.h" 3 4 KafkaProducer::KafkaProducer(const string &brokers, const string &topics, int nPpartition /*= 1*/) 5 : m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition) 6 { 7 } 8 9 KafkaProducer::~KafkaProducer() 10 { 11 Stop(); 12 } 13 14 bool KafkaProducer::Init() 15 { 16 string errstr = ""; 17 18 /* 19 * Create configuration objects 20 */ 21 RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); 22 RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); 23 24 /*Set configuration properties,设置broker list*/ 25 if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){ 26 std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl; 27 } 28 /* Set delivery report callback */ 29 conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr); 30 conf->set("event_cb", &m_producerEventCallBack, errstr); 31 32 /* 33 * Create producer using accumulated global configuration. 34 */ 35 m_pProducer = RdKafka::Producer::create(conf, errstr); 36 if (!m_pProducer) { 37 std::cerr << "Failed to create producer: " << errstr << std::endl; 38 return false; 39 } 40 std::cout << "% Created producer " << m_pProducer->name() << std::endl; 41 /* 42 * Create topic handle. 
43 */ 44 m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics, 45 tconf, errstr); 46 if (!m_pTopic) { 47 std::cerr << "Failed to create topic: " << errstr << std::endl; 48 return false; 49 } 50 return true; 51 } 52 void KafkaProducer::Send(const string &msg) 53 { 54 if (!m_bRun) 55 return; 56 /* 57 * Produce message 58 */ 59 RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition, 60 RdKafka::Producer::RK_MSG_COPY /* Copy payload */, 61 const_cast<char *>(msg.c_str()), msg.size(), 62 NULL, NULL); 63 if (resp != RdKafka::ERR_NO_ERROR) 64 std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl; 65 else 66 std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl; 67 68 m_pProducer->poll(0); 69 70 /* Wait for messages to be delivered */ //firecat add 71 while (m_bRun && m_pProducer->outq_len() > 0) { 72 std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl; 73 m_pProducer->poll(1000); 74 } 75 } 76 77 void KafkaProducer::Stop() 78 { 79 delete m_pTopic; 80 delete m_pProducer; 81 } 82 83 84 85 int main() 86 { 87 //KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("localhost:9092", "test", 0); 88 89 KafkaProducer* Kafkapr_ = new KafkaProducer("localhost:9092", "test", 0); 90 Kafkapr_->Init(); 91 Kafkapr_->Send("hello world!"); 92 93 char str_msg[] = "Hello Kafka!"; 94 95 while (fgets(str_msg, sizeof(str_msg), stdin)) 96 { 97 size_t len = strlen(str_msg); 98 if (str_msg[len - 1] == '\n') 99 { 100 str_msg[--len] = '\0'; 101 } 102 103 if (strcmp(str_msg, "end") == 0) 104 { 105 break; 106 } 107 108 Kafkapr_->Send(str_msg); 109 } 110 111 return 0; 112 }
所有评论(0)