kafka生产者和消费者(java示例)
kafka笔记1--集群搭建:https://blog.csdn.net/u010452388/article/details/84674454本文主要讲解kafka生产者和消费者的API使用,以及部分配置说明目录一 引入依赖二 生产者2.1 代码2.2 生产者配置说明2.3结果-生产者三 消费者3.1 代码3.2 消费者配置说明3.3 结果-消费者...
kafka笔记1--集群搭建:https://blog.csdn.net/u010452388/article/details/84674454
本文主要讲解kafka生产者和消费者的API使用,以及部分配置说明
目录
一 引入依赖
这里引入客户端依赖的时候尽量保持与服务端版本一致,不然会出现奇怪的错误,看下服务端版本
从上面的图可以看出,服务端版本为1.1.0版本,前面的2.12是Scala版本,所以客户端引入下面的依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
下图为生产者和消费者的工程结构图
二 生产者
2.1 代码
这里的生产者采用的是每隔1秒钟发送一条消息,总共发送19条
public class KafkaProducerDemo {
public static Properties kafkaProperties() {
Properties properties = new Properties();
/*设置集群kafka的ip地址和端口号*/
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.2.231:9092,192.168.2.232:9092");
/*发送的消息需要leader确认*/
properties.put(ProducerConfig.ACKS_CONFIG, "1");
/*用户id*/
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
/*对key进行序列化*/
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerSerializer");
/*对value进行序列化*/
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
public static void main(String[] args) throws InterruptedException {
/*创建一个kafka生产者*/
KafkaProducer<Integer, String> kafkaProducer =
new KafkaProducer<Integer, String>(kafkaProperties());
/*主题*/
String topic = "test";
/*循环发送数据*/
for (int i = 0; i < 20; i++) {
/*发送的消息*/
String message = "我是一条信息" + i;
/*发出消息*/
kafkaProducer.send(new ProducerRecord<>(topic, message));
System.out.println(message + "->已发送");
Thread.sleep(1000);
}
}
}
2.2 配置-生产者
官网生产者配置详情:http://kafka.apache.org/11/documentation.html#producerconfigs
下面是根据官网的配置进行翻译的,如果有翻译的不对的,可以留言沟通
属性名 | 描述 | 类型 | 默认值 | 有效值 | 重要程度 |
---|---|---|---|---|---|
key.serializer | 为key序列化的类,这个类要实现org.apache.kafka.common.serialization.Serializer 这个接口 | class | 高 | ||
value.serializer | 为value序列化的类,这个类要实现org.apache.kafka.common.serialization.Serializer 这个接口 | class | 高 | ||
acks | 在请求完成之前,生产者要求kafka集群的leader已经接收的确认数,这个可以控制发送记录的持久化能力(我们可以认为持久能力越高,也就是数据丢失率就越低)
| string | 1 | [all,-1,0,1] | 高 |
bootstrap.servers | 主机IP/端口成对的一组列表,用来与kafka集群建立初始化连接。这个列表的的形式为:host1:port1,host2:port2,...。因为这些服务端列表只是用来初始化连接并用来发现集群的成员的(可以动态的改变),所以这些列表不是必须包含所有服务器的集合(你应该超过1个,因为,一个服务器有可能宕机) | list | "" | 高 | |
buffer.memory | 总的缓存大小(单位:字节),生产者用来缓存发送的记录,此记录会等待被发送到服务端。如果生产的记录速度超过发送到服务端的速度,生产者可能会阻塞,阻塞的时间超过max.block.ms配置的值的话,就会抛出异常。 这个设置应该大致的符合生产者需要使用的总缓存空间,但不是一个硬性限制,因为不是所有的缓存空间都是给生产者用作缓存。一些额外的空间既要被用作压缩(如果compression是enabled的话)也要维持快速的请求。 | long | 33554432 | [0,...] | 高 |
compression.type | 为生产者数据提供的压缩类型。默认是none(也就是没有压缩)。有效值为none,gzip,snappy,or lz4.压缩是针对数据的批量压缩的,所以批量数据的效率将会影响压缩的效率(批量数据越多意味着越好的压缩) | string | none | 高 | |
retries | 设置为大于0的值的时候,如果客户端发送任何记录遇到临时的错误的话,客户端会重新发送。没有将max.in.flight.requests.per.connection配置成1的话,重新发送可能潜在的改变记录的发送顺序,因为如果两批记录发送到单一的分区(partition),第一批失败了,正在重新发送,但是第二批成功了,那么第二批的记录有可能出现在前面 | int | 0 | [0,...,2147483647] | 高 |
ssl.key.password | key存储文件的私钥密码。对于客户端来说,可选参数 | password | null | 高 | |
ssl.keystore.loca tion | key存储文件的位置。对于客户端来说,可选参数,以及可以被用作对客户端的双向认证 | string | null | 高 | |
ssl.keystore.pass word | key存储文件的存储密码。对客户端来说,可选参数,只有ssl.keystore.location配置了才需要此参数 | password | null | 高 | |
ssl.truststore.loc ation | 信任存储文件的位置 | string | null | 高 | |
ssl.truststore.pass word | 信任存储文件的密码,如果一个密码没有被设置可访问信任存储,这个密码也是有效的,但是完整性的检查是无效的 | password | null | 高 | |
batch.size | 每当有多个记录要发送到同样的分区的时候,生产者将尝试将记录批处理到一起以至于减少请求。这有助于客户端和服务端的性能。这个配置默认单位是字节。 记录大于这个大小的话,不会尝试去批处理。 小批量处理不怎么常见,并有可能减少吞吐量(批处理大小为0将禁止使用批处理)。 一个非常大的批量大小会使用很多内存,会造成浪费,在预计额外记录的情况下,因为我们总是分配指定的缓冲大小 | int | 16384 | [0,...] | 中 |
1.acks(默认为1)
"0":消息发送给broker以后,不需要确认(性能高,但是会出现数据丢失)
"1":只需要获得kafka集群中leader节点的确认即可返回(leader/follower)
all或者"-1" 需要ISR中的所有Replica(副本)进行确认(需要集群中所有节点确认)
2.batch.size(默认16KB) 调优的重要参数
producer对于同一个分区来说,会按照batch.size的大小进行统一收集,然后批量发送
就是说我们如果发送的消息,不会直接发出去,等达到batch.size之后,再发出去
3.linger.ms(默认0毫秒)
延迟发送,假如设置的值是1000的话,就是每隔1秒钟积累之前的信息,然后再发送
4.max.request.size(默认1M)
消息最大发送的字节数,超过默认值1M的话,就会拒绝,抛异常
2.3 结果展示
发送的结果如下图:
三 消费者
3.1 代码
public class KafkaConsumerDemo extends Thread {
private final KafkaConsumer<Integer,String> consumer;
public KafkaConsumerDemo(String topic) {
Properties props = new Properties();
/*设置集群kafka的ip地址和端口号*/
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.2.231:9092,192.168.2.232:9092");
/*设置消费者的group.id*/
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo1");
/*消费信息以后自动提交*/
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
/*控制消费者提交的频率*/
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
/*key反序列化*/
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
/*value反序列化*/
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
/*创建kafka消费者对象*/
consumer = new KafkaConsumer<Integer, String>(props);
/*订阅主题*/
consumer.subscribe(Collections.singleton(topic));
}
@Override
public void run() {
while (true) {
/*每隔一段时间到服务器拉取数据*/
ConsumerRecords<Integer, String> consumerRecords = consumer.poll(1000);
for (ConsumerRecord record : consumerRecords) {
System.out.println(record.value());
}
}
}
public static void main(String[] args) {
new KafkaConsumerDemo("test").start();
}
}
3.2 配置说明
1.group.id
同一个gropu.id中的消费者,只能有一个消费者可以消费到信息
但是不同的group.id都会去消费消息(消息是持久化的)
2.enable.auto.commit
如果位true的话,消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到
3.auto.commit.interval.ms
控制消费者提交的频率,默认单位是毫秒,一般配合enable.auto.commit
4.auto.offset.reset
这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来
消费指定的 topic 时,对于该参数的配置,会有不同的语义
auto.offset.reset=latest 情况下,新的消费者将会从其他消费者最后消费的offset 处开始消费 Topic 下的消息
auto.offset.reset= earliest 情况下,新的消费者会从该 topic 最早的消息开始消费
auto.offset.reset=none 情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。
5.max.poll.records
设置限制每次消费者poll返回的消息数,通过调整此值,可以减少poll的间隔
官网消费者配置详情:http://kafka.apache.org/11/documentation.html#consumerconfigs
3.3 结果展示
消费的结果如下图:
更多推荐
所有评论(0)