生产者生产消息概述

从创建一个ProducerRecord对象开始,此对象包含目标主题和要发送的内容,还可以指定键和分区。在发送ProducerRecord对象时,生产者首先要把键和值对象序列化成字节数组,这样才可以在网络上传输。

数据传送给分区器,如果数据指定了分区则使用此分区;如果没有指定分区,则通过ProducerRecord的键通过一定的算法来选择一个分区。这样生产者就知道将此消息往哪个主题的哪个分区发送了。

这条消息会被添加到一个记录批次里,这个批次里的所有消息会被发送到相同主题的同一分区。此时的消息是被放到一个缓冲区,然后有一个独立的线程负责把这些记录发送到相应的broker上

broker在收到这些消息时会返回一个响应。如果消息成功写入kafka,在返回的对象中会包含主题,分区信息以及记录在分区里的偏移量。如果写入消息失败,则返回错误,会根据错误类型和我们的配置,自动选择重试还是直接抛出异常。

创建生产者

要往kafka写入消息,首先要创建一个生产者对象,并设置一些属性。其中有三个属性是必选的:

  • bootstrap.servers:该属性指定broker的地址清单,地址格式:host:port,如果有多个地址用逗号隔开。清单里不需要包含所有的broker地址,生产者会从给定的broker里查找到其他的broker信息。不过建议至少要提供两个broker的信息,防止一个宕机了,生产者仍然可以连接到集群。
  • key.serializer:broker希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把java对象作为键和值发送给broker。不过生产者需要知道如何把这些java对象转换成字节数组。生产者就是根据此参数配置的值来把键转换成字节数组的。
  • value.serializer:与key.serializer类似,value.serializer指定的类会将值序列化。

发送消息方式

  • 同步发送
    我们使用send()方法发送消息,它会返回一个future对象,调用get()方法进行等待,就可以知道消息是否发送成功。如果服务器返回错误,get()方法会抛出异常。如果没有发生错误,我们会得到一个RecordMetadata对象,可以用它获取消息的偏移量。如果发送数据之前或者发送过程中发生了任何错误,比如broker返回了一个不允许重试的异常或者已经超过了重试次数,那么就会抛出异常。我们只是简单的把异常信息打印出来。
  • 异步发送
    我们使用send()方法发送消息,并指定一个回调函数,服务器在返回响应时调用该函数。
  • 发送并忘记
    把消息发送给服务器,但是并不关心它是否正常到达。

生产者配置

  • acks:指定必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对于消息丢失的可能性有重要影响,可取值:
    • 0:生产者在成功写入消息之前不会等待任何来自服务器的响应
    • 1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。这种方式有风险,如果首领节点服务器崩溃并且此时消息还未复制到跟随者副本,此时如果这个跟随者副本被选举为了新首领,消息就会丢失。
    • all:只有当所有的同步副本全部接收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器崩溃,整个集群仍然可以运行。不过它的延迟也最高,因为要等待不只一个服务器节点接收到消息。
  • buffer.memory:该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲将要发送到服务器的消息
      如果应用程序发送消息的速度超过了发送到服务器的速度,就会导致生产者空间不足。这个时候send()方法调用要么阻塞,要么抛出异常,取决于如何设置max.block.ms参数,标识在抛出异常之前可以阻塞一段时间。
  • compression.type:指定了消息被发送服务器之前使用何种压缩方式。默认情况下,消息发送是不压缩的,该参数可以设置为:snappy,gzip和lz4,
  • retries:此参数的值决定了生产者可以重发消息的次数,如果达到了这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,不过可以通过retry.backoff.ms来设置。
  • batch.size:该参数指定了一个批次可以使用的内存的大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。
  • linger.ms: 该参数制定了生产者在发送批次之前等待更多消息加入批次的时间
  • client.id:该参数可以是任意字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。
  • max.in.flight.requests.per.connection:该参数指定了生产者在收到服务器响应之前可以发送多个消息。
    它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设置成1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
  • timeout.ms,request.timeout.ms和metadata.fetch.timeout.ms
    • timeout.ms:指定了broker等待同步副本返回消息确认的时间,与asks的配置相匹配。如果在指定的时间内没有收到同步副本的确认,这broker就会返回一个错误。
    • request.timeout.ms:指定了生产者在发送数据时等待服务器返回响应时间。
    • metadata.fetch.timeout.ms:指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。
  • max.block.ms:该参数指定了在调用seed()方法或者使用partitionsFor()方法获取元数据时生产者的阻塞时间。
  • max.request.size:该参数用于控制生产者发送的请求大小。
    它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。注意此值与broker配置中可接收消息的最大值(message.max.bytes)相匹配,避免被broker拒绝。
      例如:假设此值为1MB,那么可以发送的单个最大消息为1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了1000个消息,每个消息的大小为1kb。

生产者的可靠性保证

为了保证kafka生产者在使用时的可靠性,我们需要从两个方面去考虑:

  • 根据可靠性需求配置恰当的acks值
  • 在参数配置和代码里正确的处理错误

发送确认

所谓发送确认就是给acks参数设置恰当的值,上面讲解生产者配置时有详细接到,这里不在赘述。

配置生产者者的重试参数

生产者需要处理的错误有两部分:一部分是生产者可以自动处理的错误,还有一部分是需要开发者手动处理的错误。
如果broker返回的错误可以通过重试来解决,那么生产者会自动处理这些错误。这句话的意思是broker返回的错误有些是可以重试的(例如:LEADER_NOT_AVAILABLE)被称为可重试错误;有些是不可以重试的(例如:INVALID_CONFIG)被称为不可重试错误。

一般情况,在遇到可重试错误时最好让生产者自动重试,这样可以大大减少开发者的关注成本。但是至于为生产者配置多少重试次数比较好,这个要根据你对放弃重试后的处理逻辑来定,没有统一的标准。要注意的是,重试可能带来的风险,例如:生产者因为网络问题没有收到broker的确认,但实际上消息已经写入成功,生产者会认为是网络临时故障,就重试发送该消息(因为他不知道消息已经成功写入)。这种情况下,broker就会收到两个相同的消息。重试和恰当的错误处理可以保证每个消息“至少被保存一次”,但目前的kafka版本还没发保证每个消息“只被保存一次”。这种重复的消息只能在消费者消费消息时做到消息的幂等处理。

代码中的异常处理

生产者内置的重试机制虽然可以处理大部分的错误,但是有些类型的错误还是需要开发者手动处理:

  • 不可重试的broker错误,例如:消息大小错误,认证错误
  • 消息发送之前发生的错误,例如:序列化的错误
  • 达到重试次数上限或者消息占用内存达到上限时的错误

但是至于怎么处理这类的错误,那还是要看我们的业务架构了,可以直接丢弃,也可以输出一个日志,也可以调用其他系统,也可以把消息保存到本地磁盘上。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐