一、不丢失数据
1.生产者数据不丢失
同步模式:配置=1(只有Leader收到,-1所有副本成功,0不等待)。leader partition挂了,数据就会丢失。
解决:设置为-1保证produce写入所有副本算成功
          producer.type=sync
          request.required.acks=-1
异步模式,当缓冲区满了,如果配置为0(没有收到确认,一满就丢弃),数据立刻丢弃
解决:不限制阻塞超时时间。就是一满生产者就阻塞
          producer.type=async
          request.required.acks=1
          queue.buffering.max.ms=5000
          queue.buffering.max.messages=10000
          queue.enqueue.timeout.ms = -1
          batch.num.messages=200
2.消费者数据不丢失
在获取到kafka的消息后正准备入库(未入库),但是消费者挂了,那么如果让kafka自动去维护offset,它就会认为这条数据已经被消费了,那么会造成数据丢失。
解决:
使用kafka的高级API,自己手动维护偏移量,当数据入库后进行偏移量的更新。(适用于基本数据源)

流计算,基本数据源不适用。高级数据源以kafka为例,由2种方式:receiver(开启WAL,失败可恢复)和director(checkpoint保证)
流处理中的几种可靠性语义:
  1、At most once - 每条数据最多被处理一次(0次或1次),这种语义下会出现数据丢失的问题;
  2、At least once - 每条数据最少被处理一次 (1次或更多),这个不会出现数据丢失,但是会出现数据重复;
  3、Exactly once - 每条数据只会被处理一次,没有数据会丢失,并且没有数据会被多次处理,这种语义是大家最想要的,但是也是最难实现的。
但是开启WAL后,依旧存在数据丢失问题,即使按官方说的设置了WAL,依旧会有数据丢失,这是为什么?因为在任务中断时receiver也被强行终止了,将会造成数据丢失,提示如下:

ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
WARN BlockGenerator: Cannot stop BlockGenerator as its not in the Active state [state = StoppedAll]
WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.

在Streaming程序的最后添加代码,只有在确认所有receiver都关闭的情况下才终止程序。我们可以调用StreamingContext的stop方法,其原型如下:

def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit

可以如下使用:

sys.addShutdownHook({
  ssc.stop(true,true)
)})

WAL带来的问题
WAL实现的是At-least-once语义。如果在写入到外部存储的数据还没有将offset更新到zookeeper就挂掉,这些数据将会被反复消费。同时,因为需要把数据写入到可靠的外部系统,这会牺牲系统的整个吞吐量。
 Kafka direct API 的运行方式,将不再使用receiver来读取数据,也不用使用WAL机制。同时保证了exactly-once语义,不会在WAL中消费重复数据。不过需要自己完成将offset写入zk的过程。(建议使用)
 
3… 若是storm在消费,开启storm的ackfail机制;若不是storm,数据处理完更新offset,低级API手动控制offset
4. Kafka发送数据过快,导致服务器网卡流量暴增。或磁盘过忙,出现丢包。
      1》 首先,对kafka进行限速,
      2》 其次启用重试机制,使重试间隔变长。
      3》 Kafka设置ack=all,即需要处于ISR(副本列表)的分区都确认,才算发送成功。 rops.put(“compression.type”, “gzip”);
       props.put(“linger.ms”, “50”);
       props.put(“acks”, “all”)表示至少成功发送一次;
       props.put("retries ", 30);
       props.put("reconnect.backoff.ms ", 20000);
       props.put(“retry.backoff.ms”, 20000)

5.消费者速度很慢,导致一个session周期(0.1版本是默认30s)内未完成消费。导致心跳机制检测报告出问题。

导致消费了的数据未及时提交offset.配置由可能是自动提交

问题场景:1.offset为自动提交,正在消费数据,kill消费者线程,下次重复消费

2.设置自动提交,关闭kafka,close之前,调用consumer.unsubscribed()则由可能部分offset没有提交。

3.消费程序和业务逻辑在一个线程,导致offset提交超时
      
二、不重复消费数据

当第一个消费者进行消费kafka中的数据的时候,已经消费完成,由于某种原因突然宕掉,但是此时消费者的offset还未来得及更新,此时重新启动消费者便会重复消费数据。
解决:
1.幂等操作,重复消费不会产生问题
2.事务

 dstream.foreachRDD {(rdd, time) =

  rdd.foreachPartition { partitionIterator =>

    val partitionId = TaskContext.get.partitionId()

    val uniqueId = generateUniqueId(time.milliseconds,partitionId) //将uniqueID存入数据库中

    //use this uniqueId to transationally commit the data in partitionIterator

对每个partitionID,产生一个uniqueID,.只有这个partition的数据被完全消费,才算成功,否则失败回滚。下次若重复执行,就skip

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐