librdkafka是kafka消息系统的C/C++跨平台开源库,关于如何搭建kafka服务器,网上有很多介绍的资料。

我在ubuntu16.04上搭建了一个 kafka_2.12-0.11.0.1 + zookeeper-3.4.10 服务器,并且在ubuntu上编译成功了librdkafka_0.11.0.orig.tar.gz


一、测试kafka非集群服务器是否成功启动

1. cd /home/guoke/kafka/zookeeper-3.4.10/bin

2. export ZOOKEEPER_HOME=/home/guoke/kafka/zookeeper-3.4.10

3. export PATH=$ZOOKEEPER_HOME/bin:$PATH

4. $ZOOKEEPER_HOME/bin/zkServer.sh start    //启动zookeeper服务

5. cd /home/guoke/kafka/kafka_2.12-0.11.0.1

6. nohup bin/kafka-server-start.sh config/server.properties &   //启动kafka服务

7. 另外打开一个命令终端运行 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

8. 另外打开一个命令终端运行 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

9. 在producer终端输入"hello world!"后,会在consumer终端显示出刚才的字符串,则表示kafka搭建成功。


二、在win7上利用librdkafka开发出producer和consumer,并通过ubuntu上的kafka进行消息中转。

      就好比打电话一样,用户A拨打了用户B的手机号码,A说的话,先传到通信卫星上,然后通信卫星将此语音信号转发到用户B的手机上,在这个例子中,kafka就相当于通信卫星的作用,用户A就是producer生产者,用户B就是consumer消费者。

    废话不多说,上代码。

------------------------下面是producer的代码----------------------------------------

//KafkaProducer.h
//对librdkafka进行封装的kafka消息生产者C++类
//386520874@qq.com & 2017.10.10

#pragma once

#include "librdkafka/rdkafka.h"


class CKafkaProducer
{
public:
	rd_kafka_t *                         m_kafka_handle;  //kafka消息生产者句柄
	rd_kafka_topic_t *                   m_kafka_topic;   //kafka消息主题名称
	rd_kafka_conf_t *                    m_kafka_conf;    //kafka消息配置
	rd_kafka_topic_conf_t *              m_kafka_topic_conf;
	rd_kafka_topic_partition_list_t *    m_kafka_topic_partition_list;

	int m_partition;

public:
	CKafkaProducer();
	~CKafkaProducer();

	int init(char *topic, char *brokers, int partition); //topic="my_test"; brokers="192.168.1.42:9092"; partition=0;
	int sendMessage(char *str, int len); //向kafka服务器发送消息

	static void err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque);
	static void throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque);
	static void offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque);
	static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque);
};

#include "KafkaProducer.h"


CKafkaProducer::CKafkaProducer()
{
	m_kafka_handle = NULL;
	m_kafka_topic = NULL;
	m_kafka_conf = NULL;
	m_kafka_topic_conf = NULL;
	m_kafka_topic_partition_list = NULL;

	m_partition = RD_KAFKA_PARTITION_UA;
}


CKafkaProducer::~CKafkaProducer()
{
	rd_kafka_flush(m_kafka_handle, 10*1000); //wait for max 10 seconds
	
	rd_kafka_topic_destroy(m_kafka_topic);
	rd_kafka_destroy(m_kafka_handle);
	rd_kafka_topic_partition_list_destroy(m_kafka_topic_partition_list);
}


int CKafkaProducer::init(char *topic, char *brokers, int partition)
{
	int ret = 0;
	rd_kafka_conf_res_t ret_conf = RD_KAFKA_CONF_OK;
	char errstr[512] = {0};

	m_kafka_conf = rd_kafka_conf_new();

	rd_kafka_conf_set_error_cb(m_kafka_conf, err_cb);
	rd_kafka_conf_set_throttle_cb(m_kafka_conf, throttle_cb);
	rd_kafka_conf_set_offset_commit_cb(m_kafka_conf, offset_commit_cb);
	rd_kafka_conf_set_stats_cb(m_kafka_conf, stats_cb);

	//---------Producer config-------------------
	ret_conf = rd_kafka_conf_set(m_kafka_conf, "queue.buffering.max.messages", "500000", errstr, sizeof(errstr));
	if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 1; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}

	ret_conf = rd_kafka_conf_set(m_kafka_conf, "message.send.max.retries", "3", errstr, sizeof(errstr));
	if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 2; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}

	ret_conf = rd_kafka_conf_set(m_kafka_conf, "retry.backoff.ms", "500", errstr, sizeof(errstr));
	if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 3; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}

	//---------Kafka topic config-------------------
	m_kafka_topic_conf = rd_kafka_topic_conf_new();

	ret_conf = rd_kafka_topic_conf_set(m_kafka_topic_conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
	if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 4; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}

	m_kafka_topic_partition_list = rd_kafka_topic_partition_list_new(1);

	rd_kafka_topic_partition_list_add(m_kafka_topic_partition_list, topic, partition); //可以add一个以上的topic

	m_partition = partition;

	//---------Create Kafka handle-------------------
	m_kafka_handle = rd_kafka_new(RD_KAFKA_PRODUCER, m_kafka_conf, errstr, sizeof(errstr));

	if(m_kafka_handle == NULL)
	{
		printf("Error: Failed to create Kafka producer: %s\n", errstr);
		return -1;
	}

	//---------Add broker(s)-------------------
	if(brokers && rd_kafka_brokers_add(m_kafka_handle, brokers) < 1)
	{
		printf("Error: No valid brokers specified\n");
		return -2;
	}

	m_kafka_topic = rd_kafka_topic_new(m_kafka_handle, topic, m_kafka_topic_conf); //Explicitly create topic to avoid per-msg lookups

	return ret;
}


int CKafkaProducer::sendMessage(char *str, int len)
{
	int ret = 0;

	if(str == NULL){return -1;}
	if(len <= 0){return -2;}

	char * topic = m_kafka_topic_partition_list->elems[0].topic;
	int partition = m_kafka_topic_partition_list->elems[0].partition;

//	char * buf = (char *)malloc(len);
//	memcpy(buf, str, len);

	//------------向kafka服务器发送消息----------------
	ret = rd_kafka_produce(m_kafka_topic, partition, RD_KAFKA_MSG_F_COPY | RD_KAFKA_MSG_F_FREE, str, len, NULL, 0, NULL);

	if(ret == -1)
	{
		rd_kafka_resp_err_t err = rd_kafka_last_error();

		if(err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
		{
			printf("Error: No such partition: %"PRId32"\n", partition);
		}else
		{
			printf("Error: produce error: %s%s\n", rd_kafka_err2str(err), err == RD_KAFKA_RESP_ERR__QUEUE_FULL ? " (backpressure)" : "");
		}

		rd_kafka_poll(m_kafka_handle, 10); //Poll to handle delivery reports

		ret = -2;
		goto end;
	}

	int ret2 = rd_kafka_poll(m_kafka_handle, 0);

end:
	//---------------------
//	free(buf);
//	buf = NULL;

	return ret;
}


void CKafkaProducer::err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
	printf("%% ERROR CALLBACK: %s: %s: %s\n", rd_kafka_name(rk), rd_kafka_err2str((rd_kafka_resp_err_t)err), reason);
}


void CKafkaProducer::throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque)
{
	printf("%% THROTTLED %dms by %s (%"PRId32")\n", throttle_time_ms, broker_name, broker_id);
}


void CKafkaProducer::offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque)
{
	int i;
	int verbosity = 1;

	if(err || verbosity >= 2)
	{
		printf("%% Offset commit of %d partition(s): %s\n", offsets->cnt, rd_kafka_err2str(err));
	}

	for(i = 0; i < offsets->cnt; i++)
	{
		rd_kafka_topic_partition_t * rktpar = &offsets->elems[i];

		if(rktpar->err || verbosity >= 2)
		{
			printf("%%  %s [%"PRId32"] @ %"PRId64": %s\n", rktpar->topic, rktpar->partition, rktpar->offset, rd_kafka_err2str(err));
		}
	}
}


int CKafkaProducer::stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
	printf("%s\n", json);
	return 0;
}

// producer main.cpp : 定义控制台应用程序的入口点。
//

#include "KafkaProducer.h"


int main(int argc, char *argv[])
{
	CKafkaProducer kp;

	char topic[] = "test";
	char brokers[] = "192.168.2.73:9092";
	int partition = 0;

	char str_msg[] = "Hello Kafka!";
	int ret = 0;

	ret = kp.init(topic, brokers, partition);
	if(ret != 0){printf("Error: kp.init(): ret=%d;\n", ret); return 0;}

	ret = kp.sendMessage(str_msg, strlen(str_msg)); //向kafka服务器发送消息
	if(ret != 0){printf("Error: kp.sendMessage(): ret=%d;\n", ret); return 0;}

	return 0;
}

------------------------下面是 consumer的代码 ----------------------------------------

//KafkaConsumer.h
//对librdkafka进行封装的kafka消息消费者C++类
//386520874@qq.com & 2017.10.10

#pragma once

#include "librdkafka/rdkafka.h"


typedef void (* consumer_callback)(rd_kafka_message_t *rkmessage, void *opaque);


class CKafkaConsumer
{
public:
	rd_kafka_t *                         m_kafka_handle;  //kafka消息生产者句柄
	rd_kafka_topic_t *                   m_kafka_topic;   //kafka消息主题名称
	rd_kafka_conf_t *                    m_kafka_conf;    //kafka消息配置
	rd_kafka_topic_conf_t *              m_kafka_topic_conf;
	rd_kafka_topic_partition_list_t *    m_kafka_topic_partition_list;
	rd_kafka_queue_t *                   m_kafka_queue;

	consumer_callback                    m_consumer_callback; //消息回调函数
	void *                               m_consumer_callback_param; //消息回调函数的参数

public:
	CKafkaConsumer();
	~CKafkaConsumer();

	int init(char *topic, char *brokers, char *partitions, char *groupId, consumer_callback consumer_cb, void * param_cb); //topic="my_test"; brokers="192.168.1.42:9092"; partitions="0,1,2"; groupId="my_group";
	int getMessage(); //从kafka服务器接收消息

	static void err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque);
	static void throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque);
	static void offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque);
	static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque);
	static void logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf);
	static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque);
};


#include "KafkaConsumer.h"


CKafkaConsumer::CKafkaConsumer()
{
	m_kafka_handle = NULL;
	m_kafka_topic = NULL;
	m_kafka_conf = NULL;
	m_kafka_topic_conf = NULL;
	m_kafka_topic_partition_list = NULL;
	m_kafka_queue = NULL;

	m_consumer_callback = NULL;
	m_consumer_callback_param = NULL;
}


CKafkaConsumer::~CKafkaConsumer()
{
	rd_kafka_flush(m_kafka_handle, 10*1000); //wait for max 10 seconds

	rd_kafka_queue_destroy(m_kafka_queue);
	rd_kafka_topic_destroy(m_kafka_topic);
	rd_kafka_destroy(m_kafka_handle);
	rd_kafka_topic_partition_list_destroy(m_kafka_topic_partition_list);
}


int CKafkaConsumer::init(char *topic, char *brokers, char *partitions, char *groupId, consumer_callback consumer_cb, void * param_cb)
{
	int ret = 0;
	rd_kafka_conf_res_t ret_conf = RD_KAFKA_CONF_OK;
	char errstr[512] = {0};

	if(topic == NULL){return -1;}
	if(brokers == NULL){return -1;}
	if(groupId == NULL){return -1;}

	m_consumer_callback = consumer_cb;
	m_consumer_callback_param = param_cb;

	m_kafka_conf = rd_kafka_conf_new();

	rd_kafka_conf_set_error_cb(m_kafka_conf, err_cb);
	rd_kafka_conf_set_throttle_cb(m_kafka_conf, throttle_cb);
	rd_kafka_conf_set_offset_commit_cb(m_kafka_conf, offset_commit_cb);
	rd_kafka_conf_set_stats_cb(m_kafka_conf, stats_cb);
	rd_kafka_conf_set_log_cb(m_kafka_conf, logger);

	//---------Consumer config-------------------
	ret_conf = rd_kafka_conf_set(m_kafka_conf, "queued.min.messages", "1000000", errstr, sizeof(errstr));
	if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 1; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}

	ret_conf = rd_kafka_conf_set(m_kafka_conf, "session.timeout.ms", "6000", errstr, sizeof(errstr));
	if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 2; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}
	
//	ret_conf = rd_kafka_conf_set(m_kafka_conf, "group.id", groupId, errstr, sizeof(errstr));
//	if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_conf_set() failed 3; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}
	
	//---------Kafka topic config-------------------
	m_kafka_topic_conf = rd_kafka_topic_conf_new();

	ret_conf = rd_kafka_topic_conf_set(m_kafka_topic_conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
	if(ret_conf != RD_KAFKA_CONF_OK){printf("Error: rd_kafka_topic_conf_set() failed 4; ret_conf=%d; errstr:%s\n", ret_conf, errstr); return -1;}

	m_kafka_topic_partition_list = rd_kafka_topic_partition_list_new(1);

	//------------解析json字符串------------------------
	int cnt = 0;
	int len = strlen(partitions);
	char * pTemp = new char[len + 1];
	char * pTemp2 = pTemp;
	sprintf(pTemp, "%s", partitions); //partitions="0,1,2";

	while(*pTemp != '\0')
	{
		char * s = strstr(pTemp, ",");
		if(s != NULL)
		{
			*s = '\0';
		}

		int partition = atoi(pTemp);
		rd_kafka_topic_partition_list_add(m_kafka_topic_partition_list, topic, partition); //可以add一个以上的topic
		
		if(s != NULL)
		{
			pTemp = s + 1;
		}else
		{
			break;
		}
	}

	if(pTemp2){delete [] pTemp2; pTemp2 = NULL;}

	//---------Create Kafka handle-------------------
	m_kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, m_kafka_conf, errstr, sizeof(errstr));

	if(m_kafka_handle == NULL)
	{
		printf("Error: Failed to create Kafka producer: %s\n", errstr);
		return -1;
	}
	
	rd_kafka_poll_set_consumer(m_kafka_handle); //Redirect rd_kafka_poll() to consumer_poll()

	//---------Add broker(s)-------------------
	if(brokers && rd_kafka_brokers_add(m_kafka_handle, brokers) < 1)
	{
		printf("Error: No valid brokers specified\n");
		return -2;
	}

//	char * topic = m_kafka_topic_partition_list->elems[0].topic;
	int partition = m_kafka_topic_partition_list->elems[0].partition;
	int partition_cnt = m_kafka_topic_partition_list->cnt;

	m_kafka_topic = rd_kafka_topic_new(m_kafka_handle, topic, m_kafka_topic_conf); //Explicitly create topic to avoid per-msg lookups

	//-----------------------------------------
//	int64_t seek_offset = RD_KAFKA_OFFSET_END; //RD_KAFKA_OFFSET_BEGINNING | RD_KAFKA_OFFSET_END | RD_KAFKA_OFFSET_STORED
//	rd_kafka_resp_err_t err = rd_kafka_seek(m_kafka_topic, partition, seek_offset, 2000);
	
	m_kafka_queue = rd_kafka_queue_new(m_kafka_handle);

	return ret;
}


int CKafkaConsumer::getMessage()
{
	int ret = 0;

	char * topic = m_kafka_topic_partition_list->elems[0].topic;
	int partition = m_kafka_topic_partition_list->elems[0].partition;
	int partition_cnt = m_kafka_topic_partition_list->cnt;

	int64_t start_offset = RD_KAFKA_OFFSET_END; //RD_KAFKA_OFFSET_BEGINNING | RD_KAFKA_OFFSET_END | RD_KAFKA_OFFSET_STORED

	//------------从kafka服务器接收消息----------------
	for(int i = 0; i < partition_cnt; i++)
	{
		int partition = m_kafka_topic_partition_list->elems[i].partition;

		int r = rd_kafka_consume_start_queue(m_kafka_topic, partition, start_offset, m_kafka_queue);

		if(r == -1)
		{
			printf("Error: creating queue: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
			return -1;
		}
	}

	while(1)
	{
		int r = rd_kafka_consume_callback_queue(m_kafka_queue, 1000, msg_consume, this); //Queue mode
		if(r <= 0){rd_kafka_poll(m_kafka_handle, 1000); continue;}

		rd_kafka_poll(m_kafka_handle, 0); //Poll to handle stats callbacks

//		Sleep(1000);
//		break;
	}
	
	//----------Stop consuming------------------------------
	for(int i = 0; i < partition_cnt; i++)
	{
		int r = rd_kafka_consume_stop(m_kafka_topic, (int32_t)i);
		if(r == -1)
		{
			printf("Error: in consume_stop: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
		}
	}

	return ret;
}


void CKafkaConsumer::err_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
	printf("%% ERROR CALLBACK: %s: %s: %s\n", rd_kafka_name(rk), rd_kafka_err2str((rd_kafka_resp_err_t)err), reason);
}


void CKafkaConsumer::throttle_cb(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque)
{
	printf("%% THROTTLED %dms by %s (%"PRId32")\n", throttle_time_ms, broker_name, broker_id);
}


void CKafkaConsumer::offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque)
{
	int i;
	int verbosity = 1;

	if(err || verbosity >= 2)
	{
		printf("%% Offset commit of %d partition(s): %s\n", offsets->cnt, rd_kafka_err2str(err));
	}

	for(i = 0; i < offsets->cnt; i++)
	{
		rd_kafka_topic_partition_t * rktpar = &offsets->elems[i];

		if(rktpar->err || verbosity >= 2)
		{
			printf("%%  %s [%"PRId32"] @ %"PRId64": %s\n", rktpar->topic, rktpar->partition, rktpar->offset, rd_kafka_err2str(err));
		}
	}
}


int CKafkaConsumer::stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
{
	printf("%s\n", json);
	return 0;
}


void CKafkaConsumer::logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
	fprintf(stdout, "RDKAFKA-%i-%s: %s: %s\n", level, fac, rd_kafka_name(rk), buf);
}


void CKafkaConsumer::msg_consume(rd_kafka_message_t *rkmessage, void *opaque)
{
	CKafkaConsumer * p = (CKafkaConsumer *)opaque;
	
	if(p && p->m_consumer_callback)
	{
		p->m_consumer_callback(rkmessage, p->m_consumer_callback_param);
		return;
	}

	if(rkmessage->err)
	{
		if(rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
		{
			printf("[INFO] Consumer reached end of %s [%"PRId32"] message queue at offset %"PRId64"\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset);
			return;
		}

		printf("Error: Consume error for topic \"%s\" [%"PRId32"] offset %"PRId64": %s\n", rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) : "", rkmessage->partition, rkmessage->offset, rd_kafka_message_errstr(rkmessage));

		return;
	}
	
	if(rkmessage->key_len)
	{
		printf("Key: %d: %s\n", (int)rkmessage->key_len, (char *)rkmessage->key);
	}

	printf("%d: %s\n", (int)rkmessage->len, (char *)rkmessage->payload);
}

// consumer main.cpp : 定义控制台应用程序的入口点。
//

#include "KafkaConsumer.h"


static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque)
{
	printf("[MSG] %d: %s\n", (int)rkmessage->len, (char *)rkmessage->payload);
}


int main(int argc, char *argv[])
{
	CKafkaConsumer kc;

	char topic[] = "test";
	char brokers[] = "192.168.2.73:9092";
	char partitions[] = "0";
	char groupId[] = "my_group1";
	consumer_callback consumer_cb = msg_consume; //注册消息回调函数,用户可以自定义此函数
	void * param_cb = NULL; //param_cb=this;
	int ret = 0;

	ret = kc.init(topic, brokers, partitions, groupId, consumer_cb, param_cb);
	if(ret != 0){printf("Error: kc.init(): ret=%d;\n", ret); return 0;}

	ret = kc.getMessage(); //从kafka服务器接收消息
	if(ret != 0){printf("Error: kc.getMessage(): ret=%d;\n", ret); return 0;}

	return 0;
}

-----------------------测试结果-------------------------------



工程源码链接:http://download.csdn.net/download/jfu22/10014222

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐