kafka不丢失数据与不重复消费数据
一、不丢失数据1.生产者数据不丢失同步模式:配置=1(只有Leader收到,-1所有副本成功,0不等待)。leader partition挂了,数据就会丢失。解决:设置为-1保证produce写入所有副本算成功 producer.type=sync request.required.acks=-1异步模式,当缓冲区满了,如果配置为0(没有收到确认,一...
一、不丢失数据
1.生产者数据不丢失
同步模式:配置=1(只有Leader收到,-1所有副本成功,0不等待)。leader partition挂了,数据就会丢失。
解决:设置为-1保证produce写入所有副本算成功
producer.type=sync
request.required.acks=-1
异步模式,当缓冲区满了,如果配置为0(没有收到确认,一满就丢弃),数据立刻丢弃
解决:不限制阻塞超时时间。就是一满生产者就阻塞
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
2.消费者数据不丢失
在获取到kafka的消息后正准备入库(未入库),但是消费者挂了,那么如果让kafka自动去维护offset,它就会认为这条数据已经被消费了,那么会造成数据丢失。
解决:
使用kafka的高级API,自己手动维护偏移量,当数据入库后进行偏移量的更新。(适用于基本数据源)
流计算,基本数据源不适用。高级数据源以kafka为例,由2种方式:receiver(开启WAL,失败可恢复)和director(checkpoint保证)
流处理中的几种可靠性语义:
1、At most once - 每条数据最多被处理一次(0次或1次),这种语义下会出现数据丢失的问题;
2、At least once - 每条数据最少被处理一次 (1次或更多),这个不会出现数据丢失,但是会出现数据重复;
3、Exactly once - 每条数据只会被处理一次,没有数据会丢失,并且没有数据会被多次处理,这种语义是大家最想要的,但是也是最难实现的。
但是开启WAL后,依旧存在数据丢失问题,即使按官方说的设置了WAL,依旧会有数据丢失,这是为什么?因为在任务中断时receiver也被强行终止了,将会造成数据丢失,提示如下:
ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
WARN BlockGenerator: Cannot stop BlockGenerator as its not in the Active state [state = StoppedAll]
WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.
在Streaming程序的最后添加代码,只有在确认所有receiver都关闭的情况下才终止程序。我们可以调用StreamingContext的stop方法,其原型如下:
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
可以如下使用:
sys.addShutdownHook({
ssc.stop(true,true)
)})
WAL带来的问题
WAL实现的是At-least-once语义。如果在写入到外部存储的数据还没有将offset更新到zookeeper就挂掉,这些数据将会被反复消费。同时,因为需要把数据写入到可靠的外部系统,这会牺牲系统的整个吞吐量。
Kafka direct API 的运行方式,将不再使用receiver来读取数据,也不用使用WAL机制。同时保证了exactly-once语义,不会在WAL中消费重复数据。不过需要自己完成将offset写入zk的过程。(建议使用)
3… 若是storm在消费,开启storm的ackfail机制;若不是storm,数据处理完更新offset,低级API手动控制offset
4. Kafka发送数据过快,导致服务器网卡流量暴增。或磁盘过忙,出现丢包。
1》 首先,对kafka进行限速,
2》 其次启用重试机制,使重试间隔变长。
3》 Kafka设置ack=all,即需要处于ISR(副本列表)的分区都确认,才算发送成功。 rops.put(“compression.type”, “gzip”);
props.put(“linger.ms”, “50”);
props.put(“acks”, “all”)表示至少成功发送一次;
props.put("retries ", 30);
props.put("reconnect.backoff.ms ", 20000);
props.put(“retry.backoff.ms”, 20000)
5.消费者速度很慢,导致一个session周期(0.1版本是默认30s)内未完成消费。导致心跳机制检测报告出问题。
导致消费了的数据未及时提交offset.配置由可能是自动提交
问题场景:1.offset为自动提交,正在消费数据,kill消费者线程,下次重复消费
2.设置自动提交,关闭kafka,close之前,调用consumer.unsubscribed()则由可能部分offset没有提交。
3.消费程序和业务逻辑在一个线程,导致offset提交超时
二、不重复消费数据
当第一个消费者进行消费kafka中的数据的时候,已经消费完成,由于某种原因突然宕掉,但是此时消费者的offset还未来得及更新,此时重新启动消费者便会重复消费数据。
解决:
1.幂等操作,重复消费不会产生问题
2.事务
dstream.foreachRDD {(rdd, time) =
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds,partitionId) //将uniqueID存入数据库中
//use this uniqueId to transationally commit the data in partitionIterator
对每个partitionID,产生一个uniqueID,.只有这个partition的数据被完全消费,才算成功,否则失败回滚。下次若重复执行,就skip
更多推荐
所有评论(0)