kafka的c/c++高性能客户端librdkafka简介/使用librdkafka的C++接口实现简单的生产者和消费者
Librdkafka是c语言实现的apachekafka的高性能客户端,为生产和使用kafka提供高效可靠的客户端,并且提供了c++接口性能:Librdkafka 是一款专为现代硬件使用而设计的高性能库,它尝试将内存复制保持在最小,可以让用户决定是需要高吞吐量还是低延迟的服务,性能调优的两个最重要的配置是:*batch.num.messages:在发送消息之前累积在本地队列中等待的消息的最小数量。
Librdkafka是c语言实现的apachekafka的高性能客户端,为生产和使用kafka提供高效可靠的客户端,并且提供了c++接口
性能:
Librdkafka 是一款专为现代硬件使用而设计的高性能库,它尝试将内存复制保持在最小,可以让用户决定是需要高吞吐量还是低延迟的服务,性能调优的两个最重要的配置是:
*batch.num.messages:在发送消息之前累积在本地队列中等待的消息的最小数量。
*queue.buffering.max.ms:等待batch.num.messages多长时间来填写到本地队列中。
使用:
源码中的rdkafka.h、CONFIGURATION.md有Librdkafka的API的说明
初始化:
应用程序需要实例化一个顶级对象rd_kafka_t作为基础容器,提供全局配置和共享状态,调用rd_kafka_new()创建。
还需要实例化一个或多个topics(`rd_kafka_topic_t`)来提供生产或消费,topic对象保存topic特定的配置,并在内部填充所有可用分区和leader brokers,通过调用`rd_kafka_topic_new()`创建。
`rd_kafka_t`和`rd_kafka_topic_t`自带一个可选的配置API,如果没有调用API,Librdkafka将会使用CONFIGURATION.md中的默认配置。
注意
1.应用程序可能会创建多个`rd_kafka_t`对象,并且它们不共享任何状态
2.一个`rd_kafka_topic_t`对象仅可以用于创建它的`rd_kafka_t`对象
配置
为了简化与Apache Kafka官方软件的集成,降低学习曲线,librdkafka实现了与Apache Kafka官方客户端相同的配置属性。
使用`rd_kafka_conf_set()` 和`rd_kafka_topic_conf_set()`在创建对象之前应用配置。
注意:
`rd_kafka.._conf_t`对象在传递给rd_kafka.._new()`之后不可重复使用,调用`rd_kafka.._new()`后,应用程序不需要free任何配置资源。
例子
-
rd_kafka_conf_t*conf;
-
char errstr[512];
-
conf = rd_kafka_conf_new();
-
rd_kafka_conf_set(conf, "compression.codec","snappy", errstr, sizeof(errstr));
-
rd_kafka_conf_set(conf, "batch.num.messages", "100",errstr, sizeof(errstr));
-
rd_kafka_new(RD_KAFKA_PRODUCER,conf);
线程和回调函数
librdkafka内部使用多个线程来充分利用硬件资源.
API是线程安全的,应用程序可以在任意时间调用其线程内的任意api函数.
poll-based的API用于向应用程序提供信号,应用程序定期调用` rd_kafka_poll() `,poll API将会调用如下的API:
*消息传递报告回调函数:消息传递成功或失败的信号,允许应用程序释放消息中使用的任何应用程序资源。
*错误回调函数:发出错误信号,这些错误通常具有信息性质,例如连接broker失败,应用程序通常不需要做任何处理,错误的类型通过` rd_kafka_resp_err_t `枚举值传递,包括远程的broke错误和本地错误。
可选回调不是通过poll触发的,可以通过任意线程调用:
*Logging callback :允许应用程序输出librdkafka生成的日志消息
*partitioner callback:应用提供的消息分区器,可在任意时刻、任意线程中调用,对于相同的键,可以调用多次
Brokers
Librdkafka需要至少一个brokers的初始化list,称作` bootstrap brokers `,通过"metadata.broker.list"配置属性或`rd_kafka_brokers_add()`来指定,用来连接所有bootstrapbrokers,并查询每个元数据的信息,其中包含brokers、topic、partitions和它们在kafka cluster中的leaders的完整列表,
Brokers的名字被指定为"host[:port]",端口可选(默认9092),host是主机名或ip地址,如果主机解析到多个地址,librdkafka将轮询每个尝试连接的地址,因此,可以使用包含所有brokers地址的DNS记录来提供可靠的bootstrap broker。
Producer API
使用`RD_KAFKA_PRODUCER`设置了`rd_kafka_t`对象,并设置了一个或多个`rd_kafka_topic_t`对象后,librdkafka已经准备好接收要发送给brokers的消息。
`rd_kafka_produce()`函数有如下参数:
*`rkt` - 需要produce的topic,之前通过`rd_kafka_topic_new()`函数创
*`partition` - 生产到的partition,如果设置为`RD_KAFKA_PARTITION_UA`(UnAssigned),那么配置的分区函数将会用来选择目标分区。
*`msgflags` - 0,或者是:
* `RD_KAFKA_MSG_F_COPY` - librdkafka会立刻生成payload的一份拷贝,当payload在非持久化内存中(例如堆)时使用。
* `RD_KAFKA_MSG_F_FREE` - librdkafka使用完payload后,会使用`free(3)`将其释放。
这两个指标是互斥的,如果既不需要copy也不需要free,那么这两个指标都不需要设置。
如果`RD_KAFKA_MSG_F_COPY`没有设置,将不会执行数据的复制,librdkafka将会hold住payload的指针直到消息成功传输或传输失败。
当librdkafka完成消息的传递,使应用程序重新获得payload内存的所有权后,传递报告回调函数将会被调用
如果设置了`RD_KAFKA_MSG_F_FREE`,传递报告回调函数不能对payload进行free
*`payload`,`len` - 消息的payload
*`key`,`keylen` - 可以用来进行消息分区的消息键
它将被传递到topic分区回调函数(如果存在的话),并在发送给broker的时候附加在消息上
*`msg_opaque` - 应用程序提供的一个可选的每条消息的不透明指针,在消息回调函数中提供,让应用程序引用一个特定的指针。
`rd_kafka_produce()`是一个非阻塞API,它会在内部队列中排列消息并立即返回。如果已排列的消息个数超过了"queue.buffering.max.messages"配置项,`rd_kafka_produce()`返回-1并将errno设置为`ENOBUFS`,从而提供了一种背压机制
Simple Consumer API
NOTE: 对于高级KafkaConsumer接口,查看rd_kafka_subscribe(rdkafka.h) 或者 KafkaConsumer (rdkafkacpp.h)。
使用`RD_KAFKA_CONSUMER`和`rd_kafka_topic_t`实例创建`rd_kafka_t`后,应用程序还必须通过调用`rd_kafka_consume_start()`来为给定的分区启动consumer。
`rd_kafka_consume_start()` 参数:
* `rkt` - 需要消费的topic,之前通过`rd_kafka_topic_new()`创建。
*`partition` - 从哪个分区消费
*`offset` - 开始消费的消息offset,这可能是绝对消息偏移或两个特殊偏移之一:
`RD_KAFKA_OFFSET_BEGINNING` :从partition队列的起始位置开始消费(最老的message)
`RD_KAFKA_OFFSET_END`:在下一个要生产到该partition上的消息处开始消费
`RD_KAFKA_OFFSET_STORED`:使用存储的offset
一个topic+partition的consumer启动后,librdkafka将会尝试通过反复从broker获取批次消息以保持本地队列中保存"queued.min.messages"条消息,然后这个本地消息队列将会通过三个不同的consume API传递给应用程序:
*`rd_kafka_consume()` - consume单条消息
*`rd_kafka_consume_batch()` - consume单条或多条消息
*`rd_kafka_consume_callback()` - consume本地队列中的所有消息,并给每条消息调用一个回调函数
这三个API按照性能升序排列,`rd_kafka_consume()`最慢,`rd_kafka_consume_callback()`最快。
使用`rd_kafka_message_t`类型标识一条已消费的消息,其成员为:
*`err` - 发回到应用程序的错误信号,如果不为0,那么`payload`成员将被认为是一条错误消息,`err`是错误码(`rd_kafka_resp_err_t`),如果为0,`payload`则包含消息数据。
*`rkt`,`partition` - 该消息的topic和partition
*`payload`,`len` - payload消息,或者是错误信息(err!=0)
*`key`,`key_len` - 生产者指定的可选消息key
*`offset` - Message offset
`payload`和`key`以及整个消息的内存,属于librdkafka,调用`rd_kafka_message_destroy()`后不可再次使用,librdkafka将为该消息集的所有消息payloads共享相同的消息集接收缓冲存储器,以避免过度复制,这意味着如果应用程序决定hang on单个rd_kafka_message_t,它将阻止从相同消息集中释放所有其他消息的备份内存。
当应用程序完成从topic+partition的消息消费后,需要调用`rd_kafka_consume_stop()`来停止这个consumer,这也将清除本地队列中的当前的消息。
Offset management
broker version >= 0.9.0结合使用高版本的KafkaConsumer接口,可实现基于Broker的offset管理(查看rdkafka.h或 rdkafkacpp.h)
还可以通过本地文件存储来实现Offset管理,通过如下的topic配置参数,offset被永久写在本地文件中:
* `auto.commit.enable`
* `auto.commit.interval.ms`
* `offset.store.path`
* `offset.store.sync.interval.ms`
目前还没有对ZooKeeper的偏移量管理的支持。
Consumer groups
当kafka broker 版本>= 0.9 ,librdkafka支持基于broker的consumer groups
Topics
Librdkafka支持自动创建topic,broker需要配置"auto.create.topics.enable=true"
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
一.编译librdkafka
环境:Fedora 20,32位
依赖:pthreads(必选),zlib(可选),libssl-dev(可选),libsasl2-dev(可选)
先查看自己的linux上是否安装了pthreads,指令如下:
# locate libpthread
因为我之前安过了,所以可以直接编译librdkafka,没有安的下个pthreads的源码——configure、make、make install。
开始编译librdkafka,指令如下:
-
# ./configure
-
# make
-
# make install
lib库会被默认安装到/usr/local/lib目录
头文件被默认安装到/usr/local/include/librdkafka目录
二.生产者
新建Qt控制台工程KafkaProducer,Pro文件如下:
-
#-------------------------------------------------
-
#
-
# Project created by QtCreator 2018-03-27T19:45:09
-
#
-
#-------------------------------------------------
-
QT -= gui core
-
TARGET = KafkaProducer
-
CONFIG += console
-
CONFIG -= app_bundle
-
TEMPLATE = app
-
SOURCES += main.cpp
-
INCLUDEPATH += /usr/local/include/librdkafka
-
LIBS += -L/usr/local/lib -lrdkafka
-
LIBS += -L/usr/local/lib -lrdkafka++
main.cpp文件如下:
-
#include <iostream>
-
#include <string>
-
#include <cstdlib>
-
#include <cstdio>
-
#include <csignal>
-
#include <cstring>
-
#include <getopt.h>
-
#include "rdkafkacpp.h"
-
static bool run = true;
-
static void sigterm (int sig) {
-
run = false;
-
}
-
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
-
public:
-
void dr_cb (RdKafka::Message &message) {
-
std::cout << "Message delivery for (" << message.len() << " bytes): " <<
-
message.errstr() << std::endl;
-
if (message.key())
-
std::cout << "Key: " << *(message.key()) << ";" << std::endl;
-
}
-
};
-
class ExampleEventCb : public RdKafka::EventCb {
-
public:
-
void event_cb (RdKafka::Event &event) {
-
switch (event.type())
-
{
-
case RdKafka::Event::EVENT_ERROR:
-
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
-
event.str() << std::endl;
-
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
-
run = false;
-
break;
-
case RdKafka::Event::EVENT_STATS:
-
std::cerr << "\"STATS\": " << event.str() << std::endl;
-
break;
-
case RdKafka::Event::EVENT_LOG:
-
fprintf(stderr, "LOG-%i-%s: %s\n",
-
event.severity(), event.fac().c_str(), event.str().c_str());
-
break;
-
default:
-
std::cerr << "EVENT " << event.type() <<
-
" (" << RdKafka::err2str(event.err()) << "): " <<
-
event.str() << std::endl;
-
break;
-
}
-
}
-
};
-
int main ()
-
{
-
std::string brokers = "localhost";
-
std::string errstr;
-
std::string topic_str="test";
-
int32_t partition = RdKafka::Topic::PARTITION_UA;
-
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
-
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
-
conf->set("bootstrap.servers", brokers, errstr);
-
ExampleEventCb ex_event_cb;
-
conf->set("event_cb", &ex_event_cb, errstr);
-
signal(SIGINT, sigterm);
-
signal(SIGTERM, sigterm);
-
ExampleDeliveryReportCb ex_dr_cb;
-
conf->set("dr_cb", &ex_dr_cb, errstr);
-
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
-
if (!producer) {
-
std::cerr << "Failed to create producer: " << errstr << std::endl;
-
exit(1);
-
}
-
std::cout << "% Created producer " << producer->name() << std::endl;
-
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
-
tconf, errstr);
-
if (!topic) {
-
std::cerr << "Failed to create topic: " << errstr << std::endl;
-
exit(1);
-
}
-
for (std::string line; run && std::getline(std::cin, line);) {
-
if (line.empty()) {
-
producer->poll(0);
-
continue;
-
}
-
RdKafka::ErrorCode resp =
-
producer->produce(topic, partition,
-
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
-
const_cast<char *>(line.c_str()), line.size(),
-
NULL, NULL);
-
if (resp != RdKafka::ERR_NO_ERROR)
-
std::cerr << "% Produce failed: " <<
-
RdKafka::err2str(resp) << std::endl;
-
else
-
std::cerr << "% Produced message (" << line.size() << " bytes)" <<
-
std::endl;
-
producer->poll(0);
-
}
-
run = true;
-
// 退出前处理完输出队列中的消息
-
while (run && producer->outq_len() > 0) {
-
std::cerr << "Waiting for " << producer->outq_len() << std::endl;
-
producer->poll(1000);
-
}
-
delete conf;
-
delete tconf;
-
delete topic;
-
delete producer;
-
RdKafka::wait_destroyed(5000);
-
return 0;
-
}
三.消费者
新建Qt控制台工程KafkaConsumer,Pro文件如下:
-
#-------------------------------------------------
-
#
-
# Project created by QtCreator 2018-03-28T16:27:54
-
#
-
#-------------------------------------------------
-
QT -= gui core
-
TARGET = KafkaConsumer
-
CONFIG += console
-
CONFIG -= app_bundle
-
TEMPLATE = app
-
SOURCES += main.cpp
-
INCLUDEPATH += /usr/local/include/librdkafka
-
LIBS += -L/usr/local/lib -lrdkafka
-
LIBS += -L/usr/local/lib -lrdkafka++
main.cpp文件如下:
-
#include <iostream>
-
#include <string>
-
#include <cstdlib>
-
#include <cstdio>
-
#include <csignal>
-
#include <cstring>
-
#include <sys/time.h>
-
#include <getopt.h>
-
#include <unistd.h>
-
#include "rdkafkacpp.h"
-
static bool run = true;
-
static bool exit_eof = true;
-
static int eof_cnt = 0;
-
static int partition_cnt = 0;
-
static int verbosity = 1;
-
static long msg_cnt = 0;
-
static int64_t msg_bytes = 0;
-
static void sigterm (int sig) {
-
run = false;
-
}
-
class ExampleEventCb : public RdKafka::EventCb {
-
public:
-
void event_cb (RdKafka::Event &event) {
-
switch (event.type())
-
{
-
case RdKafka::Event::EVENT_ERROR:
-
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
-
event.str() << std::endl;
-
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
-
run = false;
-
break;
-
case RdKafka::Event::EVENT_STATS:
-
std::cerr << "\"STATS\": " << event.str() << std::endl;
-
break;
-
case RdKafka::Event::EVENT_LOG:
-
fprintf(stderr, "LOG-%i-%s: %s\n",
-
event.severity(), event.fac().c_str(), event.str().c_str());
-
break;
-
case RdKafka::Event::EVENT_THROTTLE:
-
std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
-
event.broker_name() << " id " << (int)event.broker_id() << std::endl;
-
break;
-
default:
-
std::cerr << "EVENT " << event.type() <<
-
" (" << RdKafka::err2str(event.err()) << "): " <<
-
event.str() << std::endl;
-
break;
-
}
-
}
-
};
-
void msg_consume(RdKafka::Message* message, void* opaque) {
-
switch (message->err()) {
-
case RdKafka::ERR__TIMED_OUT:
-
//std::cerr << "RdKafka::ERR__TIMED_OUT"<<std::endl;
-
break;
-
case RdKafka::ERR_NO_ERROR:
-
/* Real message */
-
msg_cnt++;
-
msg_bytes += message->len();
-
if (verbosity >= 3)
-
std::cerr << "Read msg at offset " << message->offset() << std::endl;
-
RdKafka::MessageTimestamp ts;
-
ts = message->timestamp();
-
if (verbosity >= 2 &&
-
ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
-
std::string tsname = "?";
-
if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
-
tsname = "create time";
-
else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME)
-
tsname = "log append time";
-
std::cout << "Timestamp: " << tsname << " " << ts.timestamp << std::endl;
-
}
-
if (verbosity >= 2 && message->key()) {
-
std::cout << "Key: " << *message->key() << std::endl;
-
}
-
if (verbosity >= 1) {
-
printf("%.*s\n",
-
static_cast<int>(message->len()),
-
static_cast<const char *>(message->payload()));
-
}
-
break;
-
case RdKafka::ERR__PARTITION_EOF:
-
/* Last message */
-
if (exit_eof && ++eof_cnt == partition_cnt) {
-
std::cerr << "%% EOF reached for all " << partition_cnt <<
-
" partition(s)" << std::endl;
-
run = false;
-
}
-
break;
-
case RdKafka::ERR__UNKNOWN_TOPIC:
-
case RdKafka::ERR__UNKNOWN_PARTITION:
-
std::cerr << "Consume failed: " << message->errstr() << std::endl;
-
run = false;
-
break;
-
default:
-
/* Errors */
-
std::cerr << "Consume failed: " << message->errstr() << std::endl;
-
run = false;
-
}
-
}
-
class ExampleConsumeCb : public RdKafka::ConsumeCb {
-
public:
-
void consume_cb (RdKafka::Message &msg, void *opaque) {
-
msg_consume(&msg, opaque);
-
}
-
};
-
int main () {
-
std::string brokers = "localhost";
-
std::string errstr;
-
std::string topic_str="test";
-
std::vector<std::string> topics;
-
std::string group_id="101";
-
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
-
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
-
//group.id必须设置
-
if (conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) {
-
std::cerr << errstr << std::endl;
-
exit(1);
-
}
-
topics.push_back(topic_str);
-
//bootstrap.servers可以替换为metadata.broker.list
-
conf->set("bootstrap.servers", brokers, errstr);
-
ExampleConsumeCb ex_consume_cb;
-
conf->set("consume_cb", &ex_consume_cb, errstr);
-
ExampleEventCb ex_event_cb;
-
conf->set("event_cb", &ex_event_cb, errstr);
-
conf->set("default_topic_conf", tconf, errstr);
-
signal(SIGINT, sigterm);
-
signal(SIGTERM, sigterm);
-
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
-
if (!consumer) {
-
std::cerr << "Failed to create consumer: " << errstr << std::endl;
-
exit(1);
-
}
-
std::cout << "% Created consumer " << consumer->name() << std::endl;
-
RdKafka::ErrorCode err = consumer->subscribe(topics);
-
if (err) {
-
std::cerr << "Failed to subscribe to " << topics.size() << " topics: "
-
<< RdKafka::err2str(err) << std::endl;
-
exit(1);
-
}
-
while (run) {
-
//5000毫秒未订阅到消息,触发RdKafka::ERR__TIMED_OUT
-
RdKafka::Message *msg = consumer->consume(5000);
-
msg_consume(msg, NULL);
-
delete msg;
-
}
-
consumer->close();
-
delete conf;
-
delete tconf;
-
delete consumer;
-
std::cerr << "% Consumed " << msg_cnt << " messages ("
-
<< msg_bytes << " bytes)" << std::endl;
-
//应用退出之前等待rdkafka清理资源
-
RdKafka::wait_destroyed(5000);
-
return 0;
-
}
四.测试
先启动zookeeper服务和kafka服务,详见:kafka的编译和使用,然后再启动生产者和消费者。
生产者循环等待用户输入,输入后回车,消息就发布出去了,此时消费者显示订阅到的内容。
转载地址:
https://blog.csdn.net/lijinqi1987/article/details/76571757
https://blog.csdn.net/caoshangpa/article/details/79786100
更多推荐
所有评论(0)