对librdkafka的C++封装
librdkafka是kafka消息系统的C/C++跨平台开源库,关于如何搭建kafka服务器,网上有很多介绍的资料。我在ubuntu16.04上搭建了一个 kafka_2.12-0.11.0.1 + zookeeper-3.4.10 服务器,并且在ubuntu上编译成功了librdkafka_0.11.0.orig.tar.gz一、测试kafka非集群服务器是否成功启动1.
librdkafka是kafka消息系统的C/C++跨平台开源库,关于如何搭建kafka服务器,网上有很多介绍的资料。
我在ubuntu16.04上搭建了一个 kafka_2.12-0.11.0.1 + zookeeper-3.4.10 服务器,并且在ubuntu上编译成功了librdkafka_0.11.0.orig.tar.gz
一、测试kafka非集群服务器是否成功启动
1. cd /home/guoke/kafka/zookeeper-3.4.10/bin
2. export ZOOKEEPER_HOME=/home/guoke/kafka/zookeeper-3.4.10
3. export PATH=$ZOOKEEPER_HOME/bin:$PATH
4. $ZOOKEEPER_HOME/bin/zkServer.sh start //启动zookeeper服务
5. cd /home/guoke/kafka/kafka_2.12-0.11.0.1
6. nohup bin/kafka-server-start.sh config/server.properties & //启动kafka服务
7. 另外打开一个命令终端运行 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
8. 另外打开一个命令终端运行 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
9. 在producer终端输入"hello world!"后,会在consumer终端显示出刚才的字符串,则表示kafka搭建成功。
二、在win7上利用librdkafka开发出producer和consumer,并通过ubuntu上的kafka进行消息中转。
就好比打电话一样,用户A拨打了用户B的手机号码,A说的话,先传到通信卫星上,然后通信卫星将此语音信号转发到用户B的手机上,在这个例子中,kafka就相当于通信卫星的作用,用户A就是producer生产者,用户B就是consumer消费者。
废话不多说,上代码。
------------------------下面是producer的代码----------------------------------------
//KafkaProducer.h
//对librdkafka进行封装的kafka消息生产者C++类
//386520874@qq.com & 2017.10.10
#pragma once
#include "librdkafka/rdkafka.h"
class CKafkaProducer
{
public:
rd_kafka_t * m_kafka_handle; //kafka消息生产者句柄
rd_kafka_topic_t * m_kafka_topic; //kafka消息主题名称
rd_kafka_conf_t * m_kafka_conf; //kafka消息配置
rd_kafka_topic_conf_t * m_kafka_topic_conf;
rd_kafka_topic_partition_list_t * m_kafka_topic_partition_list;
int m_partition;
public:
CKafkaProducer();
~CKafkaProducer();
int init(char *topic, char *brokers, int partition); //topic="my_test"; brokers="192.168.1.42:9092"; partition=0;
int sendMessage(char *str, int len); //向kafka服务器发送消息
static void err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque);
static void throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque);
static void offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque);
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque);
};
#include "KafkaProducer.h"
CKafkaProducer::CKafkaProducer()
{
m_kafka_handle = NULL;
m_kafka_topic = NULL;
m_kafka_conf = NULL;
m_kafka_topic_conf = NULL;
m_kafka_topic_partition_list = NULL;
m_partition = RD_KAFKA_PARTITION_UA;
}
CKafkaProducer::~CKafkaProducer()
{
rd_kafka_flush(m_kafka_handle, 10*1000); //wait for max 10 seconds
rd_kafka_topic_destroy(m_kafka_topic);
rd_kafka_destroy(m_kafka_handle);
rd_kafka_topic_partition_list_destroy(m_kafka_topic_partition_list);
}
int CKafkaProducer::init(char *topic, char *brokers, int partition)
{
int ret = 0;
rd_kafka_conf_res_t ret_conf = RD_KAFKA_CONF_OK;
char errstr[512] = {0};
m_kafka_conf = rd_kafka_conf_new();
rd_kafka_conf_set_error_cb(m_kafka_conf, err_cb);
rd_kafka_conf_set_throttle_cb(m_kafka_conf, throttle_cb);
rd_kafka_conf_set_offset_commit_cb(m_kafka_conf, offset_commit_cb);
rd_kafka_conf_set_stats_cb(m_kafka_conf, stats_cb);
//---------Producer config-------------------
ret_conf = rd_kafka_conf_set(m_kafka_conf, "queue.buffering.max.messages", "500000", errstr, sizeof(errstr));
if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 1; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}
ret_conf = rd_kafka_conf_set(m_kafka_conf, "message.send.max.retries", "3", errstr, sizeof(errstr));
if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 2; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}
ret_conf = rd_kafka_conf_set(m_kafka_conf, "retry.backoff.ms", "500", errstr, sizeof(errstr));
if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 3; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}
//---------Kafka topic config-------------------
m_kafka_topic_conf = rd_kafka_topic_conf_new();
ret_conf = rd_kafka_topic_conf_set(m_kafka_topic_conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 4; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}
m_kafka_topic_partition_list = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(m_kafka_topic_partition_list, topic, partition); //可以add一个以上的topic
m_partition = partition;
//---------Create Kafka handle-------------------
m_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, m_kafka_conf, errstr, sizeof(errstr));
if(m_kafka_handle == NULL)
{
printf("Error: Failed to create Kafka producer: %s\n", errstr);
return -1;
}
//---------Add broker(s)-------------------
if(brokers && rd_kafka_brokers_add(m_kafka_handle, brokers) < 1)
{
printf("Error: No valid brokers specified\n");
return -2;
}
m_kafka_topic = rd_kafka_topic_new(m_kafka_handle, topic, m_kafka_topic_conf); //Explicitly create topic to avoid per-msg lookups
return ret;
}
int CKafkaProducer::sendMessage(char *str, int len)
{
int ret = 0;
if(str == NULL){return -1;}
if(len <= 0){return -2;}
char * topic = m_kafka_topic_partition_list->elems[0].topic;
int partition = m_kafka_topic_partition_list->elems[0].partition;
// char * buf = (char *)malloc(len);
// memcpy(buf, str, len);
//------------向kafka服务器发送消息----------------
ret = rd_kafka_produce(m_kafka_topic, partition, RD_KAFKA_MSG_F_COPY | RD_KAFKA_MSG_F_FREE, str, len, NULL, 0, NULL);
if(ret == -1)
{
rd_kafka_resp_err_t err = rd_kafka_last_error();
if(err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
{
printf("Error: No such partition: %"PRId32"\n", partition);
}else
{
printf("Error: produce error: %s%s\n", rd_kafka_err2str(err), err == RD_KAFKA_RESP_ERR__QUEUE_FULL ? " (backpressure)" : "");
}
rd_kafka_poll(m_kafka_handle, 10); //Poll to handle delivery reports
ret = -2;
goto end;
}
int ret2 = rd_kafka_poll(m_kafka_handle, 0);
end:
//---------------------
// free(buf);
// buf = NULL;
return ret;
}
void CKafkaProducer::err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
printf("%% ERROR CALLBACK: %s: %s: %s\n", rd_kafka_name(rk), rd_kafka_err2str((rd_kafka_resp_err_t)err), reason);
}
void CKafkaProducer::throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque)
{
printf("%% THROTTLED %dms by %s (%"PRId32")\n", throttle_time_ms, broker_name, broker_id);
}
void CKafkaProducer::offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque)
{
int i;
int verbosity = 1;
if(err || verbosity >= 2)
{
printf("%% Offset commit of %d partition(s): %s\n", offsets->cnt, rd_kafka_err2str(err));
}
for(i = 0; i < offsets->cnt; i++)
{
rd_kafka_topic_partition_t * rktpar = &offsets->elems[i];
if(rktpar->err || verbosity >= 2)
{
printf("%% %s [%"PRId32"] @ %"PRId64": %s\n", rktpar->topic, rktpar->partition, rktpar->offset, rd_kafka_err2str(err));
}
}
}
int CKafkaProducer::stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
printf("%s\n", json);
return 0;
}
// producer main.cpp : 定义控制台应用程序的入口点。
//
#include "KafkaProducer.h"
int main(int argc, char *argv[])
{
CKafkaProducer kp;
char topic[] = "test";
char brokers[] = "192.168.2.73:9092";
int partition = 0;
char str_msg[] = "Hello Kafka!";
int ret = 0;
ret = kp.init(topic, brokers, partition);
if(ret != 0){printf("Error: kp.init(): ret=%d;\n", ret); return 0;}
ret = kp.sendMessage(str_msg, strlen(str_msg)); //向kafka服务器发送消息
if(ret != 0){printf("Error: kp.sendMessage(): ret=%d;\n", ret); return 0;}
return 0;
}
------------------------下面是 consumer的代码 ----------------------------------------
//KafkaConsumer.h
//对librdkafka进行封装的kafka消息消费者C++类
//386520874@qq.com & 2017.10.10
#pragma once
#include "librdkafka/rdkafka.h"
typedef void (* consumer_callback)(rd_kafka_message_t *rkmessage, void *opaque);
class CKafkaConsumer
{
public:
rd_kafka_t * m_kafka_handle; //kafka消息生产者句柄
rd_kafka_topic_t * m_kafka_topic; //kafka消息主题名称
rd_kafka_conf_t * m_kafka_conf; //kafka消息配置
rd_kafka_topic_conf_t * m_kafka_topic_conf;
rd_kafka_topic_partition_list_t * m_kafka_topic_partition_list;
rd_kafka_queue_t * m_kafka_queue;
consumer_callback m_consumer_callback; //消息回调函数
void * m_consumer_callback_param; //消息回调函数的参数
public:
CKafkaConsumer();
~CKafkaConsumer();
int init(char *topic, char *brokers, char *partitions, char *groupId, consumer_callback consumer_cb, void * param_cb); //topic="my_test"; brokers="192.168.1.42:9092"; partitions="0,1,2"; groupId="my_group";
int getMessage(); //从kafka服务器接收消息
static void err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque);
static void throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque);
static void offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque);
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque);
static void logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf);
static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque);
};
#include "KafkaConsumer.h"
CKafkaConsumer::CKafkaConsumer()
{
m_kafka_handle = NULL;
m_kafka_topic = NULL;
m_kafka_conf = NULL;
m_kafka_topic_conf = NULL;
m_kafka_topic_partition_list = NULL;
m_kafka_queue = NULL;
m_consumer_callback = NULL;
m_consumer_callback_param = NULL;
}
CKafkaConsumer::~CKafkaConsumer()
{
rd_kafka_flush(m_kafka_handle, 10*1000); //wait for max 10 seconds
rd_kafka_queue_destroy(m_kafka_queue);
rd_kafka_topic_destroy(m_kafka_topic);
rd_kafka_destroy(m_kafka_handle);
rd_kafka_topic_partition_list_destroy(m_kafka_topic_partition_list);
}
int CKafkaConsumer::init(char *topic, char *brokers, char *partitions, char *groupId, consumer_callback consumer_cb, void * param_cb)
{
int ret = 0;
rd_kafka_conf_res_t ret_conf = RD_KAFKA_CONF_OK;
char errstr[512] = {0};
if(topic == NULL){return -1;}
if(brokers == NULL){return -1;}
if(groupId == NULL){return -1;}
m_consumer_callback = consumer_cb;
m_consumer_callback_param = param_cb;
m_kafka_conf = rd_kafka_conf_new();
rd_kafka_conf_set_error_cb(m_kafka_conf, err_cb);
rd_kafka_conf_set_throttle_cb(m_kafka_conf, throttle_cb);
rd_kafka_conf_set_offset_commit_cb(m_kafka_conf, offset_commit_cb);
rd_kafka_conf_set_stats_cb(m_kafka_conf, stats_cb);
rd_kafka_conf_set_log_cb(m_kafka_conf, logger);
//---------Consumer config-------------------
ret_conf = rd_kafka_conf_set(m_kafka_conf, "queued.min.messages", "1000000", errstr, sizeof(errstr));
if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 1; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}
ret_conf = rd_kafka_conf_set(m_kafka_conf, "session.timeout.ms", "6000", errstr, sizeof(errstr));
if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 2; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}
// ret_conf = rd_kafka_conf_set(m_kafka_conf, "group.id", groupId, errstr, sizeof(errstr));
// if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 3; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}
//---------Kafka topic config-------------------
m_kafka_topic_conf = rd_kafka_topic_conf_new();
ret_conf = rd_kafka_topic_conf_set(m_kafka_topic_conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_topic_conf_set() failed 4; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}
m_kafka_topic_partition_list = rd_kafka_topic_partition_list_new(1);
//------------解析json字符串------------------------
int cnt = 0;
int len = strlen(partitions);
char * pTemp = new char[len + 1];
char * pTemp2 = pTemp;
sprintf(pTemp, "%s", partitions); //partitions="0,1,2";
while(*pTemp != '\0')
{
char * s = strstr(pTemp, ",");
if(s != NULL)
{
*s = '\0';
}
int partition = atoi(pTemp);
rd_kafka_topic_partition_list_add(m_kafka_topic_partition_list, topic, partition); //可以add一个以上的topic
if(s != NULL)
{
pTemp = s + 1;
}else
{
break;
}
}
if(pTemp2){delete [] pTemp2; pTemp2 = NULL;}
//---------Create Kafka handle-------------------
m_kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, m_kafka_conf, errstr, sizeof(errstr));
if(m_kafka_handle == NULL)
{
printf("Error: Failed to create Kafka producer: %s\n", errstr);
return -1;
}
rd_kafka_poll_set_consumer(m_kafka_handle); //Redirect rd_kafka_poll() to consumer_poll()
//---------Add broker(s)-------------------
if(brokers && rd_kafka_brokers_add(m_kafka_handle, brokers) < 1)
{
printf("Error: No valid brokers specified\n");
return -2;
}
// char * topic = m_kafka_topic_partition_list->elems[0].topic;
int partition = m_kafka_topic_partition_list->elems[0].partition;
int partition_cnt = m_kafka_topic_partition_list->cnt;
m_kafka_topic = rd_kafka_topic_new(m_kafka_handle, topic, m_kafka_topic_conf); //Explicitly create topic to avoid per-msg lookups
//-----------------------------------------
// int64_t seek_offset = RD_KAFKA_OFFSET_END; //RD_KAFKA_OFFSET_BEGINNING | RD_KAFKA_OFFSET_END | RD_KAFKA_OFFSET_STORED
// rd_kafka_resp_err_t err = rd_kafka_seek(m_kafka_topic, partition, seek_offset, 2000);
m_kafka_queue = rd_kafka_queue_new(m_kafka_handle);
return ret;
}
int CKafkaConsumer::getMessage()
{
int ret = 0;
char * topic = m_kafka_topic_partition_list->elems[0].topic;
int partition = m_kafka_topic_partition_list->elems[0].partition;
int partition_cnt = m_kafka_topic_partition_list->cnt;
int64_t start_offset = RD_KAFKA_OFFSET_END; //RD_KAFKA_OFFSET_BEGINNING | RD_KAFKA_OFFSET_END | RD_KAFKA_OFFSET_STORED
//------------从kafka服务器接收消息----------------
for(int i = 0; i < partition_cnt; i++)
{
int partition = m_kafka_topic_partition_list->elems[i].partition;
int r = rd_kafka_consume_start_queue(m_kafka_topic, partition, start_offset, m_kafka_queue);
if(r == -1)
{
printf("Error: creating queue: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
return -1;
}
}
while(1)
{
int r = rd_kafka_consume_callback_queue(m_kafka_queue, 1000, msg_consume, this); //Queue mode
if(r <= 0){rd_kafka_poll(m_kafka_handle, 1000); continue;}
rd_kafka_poll(m_kafka_handle, 0); //Poll to handle stats callbacks
// Sleep(1000);
// break;
}
//----------Stop consuming------------------------------
for(int i = 0; i < partition_cnt; i++)
{
int r = rd_kafka_consume_stop(m_kafka_topic, (int32_t)i);
if(r == -1)
{
printf("Error: in consume_stop: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
}
}
return ret;
}
void CKafkaConsumer::err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
printf("%% ERROR CALLBACK: %s: %s: %s\n", rd_kafka_name(rk), rd_kafka_err2str((rd_kafka_resp_err_t)err), reason);
}
void CKafkaConsumer::throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque)
{
printf("%% THROTTLED %dms by %s (%"PRId32")\n", throttle_time_ms, broker_name, broker_id);
}
void CKafkaConsumer::offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque)
{
int i;
int verbosity = 1;
if(err || verbosity >= 2)
{
printf("%% Offset commit of %d partition(s): %s\n", offsets->cnt, rd_kafka_err2str(err));
}
for(i = 0; i < offsets->cnt; i++)
{
rd_kafka_topic_partition_t * rktpar = &offsets->elems[i];
if(rktpar->err || verbosity >= 2)
{
printf("%% %s [%"PRId32"] @ %"PRId64": %s\n", rktpar->topic, rktpar->partition, rktpar->offset, rd_kafka_err2str(err));
}
}
}
int CKafkaConsumer::stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
printf("%s\n", json);
return 0;
}
void CKafkaConsumer::logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
fprintf(stdout, "RDKAFKA-%i-%s: %s: %s\n", level, fac, rd_kafka_name(rk), buf);
}
void CKafkaConsumer::msg_consume(rd_kafka_message_t *rkmessage, void *opaque)
{
CKafkaConsumer * p = (CKafkaConsumer *)opaque;
if(p && p->m_consumer_callback)
{
p->m_consumer_callback(rkmessage, p->m_consumer_callback_param);
return;
}
if(rkmessage->err)
{
if(rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
{
printf("[INFO] Consumer reached end of %s [%"PRId32"] message queue at offset %"PRId64"\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset);
return;
}
printf("Error: Consume error for topic \"%s\" [%"PRId32"] offset %"PRId64": %s\n", rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) : "", rkmessage->partition, rkmessage->offset, rd_kafka_message_errstr(rkmessage));
return;
}
if(rkmessage->key_len)
{
printf("Key: %d: %s\n", (int)rkmessage->key_len, (char *)rkmessage->key);
}
printf("%d: %s\n", (int)rkmessage->len, (char *)rkmessage->payload);
}
// consumer main.cpp : 定义控制台应用程序的入口点。
//
#include "KafkaConsumer.h"
static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque)
{
printf("[MSG] %d: %s\n", (int)rkmessage->len, (char *)rkmessage->payload);
}
int main(int argc, char *argv[])
{
CKafkaConsumer kc;
char topic[] = "test";
char brokers[] = "192.168.2.73:9092";
char partitions[] = "0";
char groupId[] = "my_group1";
consumer_callback consumer_cb = msg_consume; //注册消息回调函数,用户可以自定义此函数
void * param_cb = NULL; //param_cb=this;
int ret = 0;
ret = kc.init(topic, brokers, partitions, groupId, consumer_cb, param_cb);
if(ret != 0){printf("Error: kc.init(): ret=%d;\n", ret); return 0;}
ret = kc.getMessage(); //从kafka服务器接收消息
if(ret != 0){printf("Error: kc.getMessage(): ret=%d;\n", ret); return 0;}
return 0;
}
-----------------------测试结果-------------------------------
工程源码链接:http://download.csdn.net/download/jfu22/10014222
更多推荐
所有评论(0)