librdkafka poll等问题
文章目录1. librdkafka中producer的poll是干什么用的?2. 不调用poll会有什么后果?3. linger.ms 、batch.num.messages干什么用?1. librdkafka中producer的poll是干什么用的?由于producer是异步调用,消息是否发送成功,是通过回调而得知结果。poll就是调用回调函数用的,回调函数就是 dr_cb函数。poll函数的参
文章目录
1. librdkafka中producer的poll是干什么用的?
由于producer是异步调用,消息是否发送成功,是通过回调而得知结果。poll就是调用回调函数用的,回调函数就是 dr_cb 函数。
poll函数的参数:
0:非阻塞调用,立马返回;
n: 阻塞n 毫秒,期间成功发送的消息会调到dr_cb函数;
-1:一直阻塞等到消息为止。(例如linger.ms=5秒,batch.num.messages=1000,那么要么producer后达到5秒时间,要么producer生成了1000条数据,否则poll会一直阻塞)
2. 不调用poll会有什么后果?
导致事件队列满,生产者写数据失败,事件队列满后报错如下:
Produce failed: Local: Queue full
3. linger.ms 、batch.num.messages干什么用?
linger.ms = queue.buffering.max.ms
两者都是一个等待时间,等待什么呢?调用producer后,数据不会立马等上前往broker的列车,而是先等待,到达一定时间发车,或者达到一定消息数量发车。就像街上的黑车,他不会一直等着,但人少也不发车。
linger.ms就是他的等待时间;
batch.num.messages就是等待人数。
kafka比黑车控制的更精确,它还能根据体重发车(batch.size等参数)。
4. 打印librdkafka的默认配置参数
打印全局配置和topic配置
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
int pass;
for (pass = 0 ; pass < 2 ; pass++)
{
std::list<std::string> *dump;
if (pass == 0) {
dump = conf->dump();
std::cout << "# Global config" << std::endl;
} else {
dump = tconf->dump();
std::cout << "# Topic config" << std::endl;
}
for (std::list<std::string>::iterator it = dump->begin();it != dump->end(); )
{
std::cout << *it << " = ";
it++;
std::cout << *it << std::endl;
it++;
}
std::cout << std::endl;
}
5.一次librakafak写kafka效率问题排查历程
5.1 发现程序循环调用produce 1000次,获取发送条数后,每条调用一次poll,每次调用阻塞1毫秒,导致每发送成功1000数据阻塞1秒。
5.2发现此问题后改为每发送1000条数据后调用一次poll,但又有了新问题。poll调用次数太少,阻塞时间太短,导致报错 Produce failed: Local: Queue full
。
5.3 查询Queue full的解决办法,github的issues列表中有一条同样的问题: librdkafka issues。根据作者的建议,加统计回调stats_cb(这个回调函数其实就包含在event_cb中,根据事件类型区分,害我找半天),加参数 statistics.interval.ms
,否则默认统计信息是关闭的。
5.4 打印statistic信息后,可以比较清楚看到待发送队列中数据的bytes,条数等信息。根据打印信息,调节poll为10ms, 调大queue.buffering.max.kbytes 为2048000后,错误消失。
5.5 Local Queue full 错误再现 ,poll调整为50ms, linger.ms调整为100ms, queue.buffering.max.messages 调整为100万, 并在消息投递错误时重试两次。目前未见错误。
参数设置如下:
queue.buffering.max.ms = 100
queue.buffering.max.messages = 1000000
queue.buffering.max.kbytes = 2000000
更多推荐
所有评论(0)