用户不仅能通过命令行的形式操作Kafka服务,Kafka还提供了许多编程语言的客户端工具,用户在开发独立项目时,通过调用Kafka API 来操作Kafka集群,其核心API主要有以下5种。

(1)Producer API:构建应用程序发送数据流到Kafka集群中的主题。(2)Consumer API:构建应用程序从Kafka集群中的主题读取数据流。(3)StreamsAPI:构建流处理程序的库,能够处理流式数据。

(4)Connect API:实现连接器,用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。

(5)AdminClientAPI:构建集群管理工具,查看Kafka集群组件信息。

Kafka作为流数据处理平台,本身功能强大,技术难度较高,有兴趣的读者可以通过官网深入学习。本章将介绍常用 Producer API 以及 Consumer API 来辅助学习 Spark实时计算框架。

在开发生产者客户端时,ProducerAPI提供了KafkaProducer类,该类的实例化对象用来代表一个生产者进程,生产者发送消息时,并不是直接发送给服务端,而是先在客户端中把消息存入队列中,然后由一个发送线程从队列中消费消息,并以批量的方式发送消息给服务端,关于KafkaProducer类常用的方法如下图所示。

生产者客户端用来向Kafka集群中发送消息,消费者客户端则是从Kafka集群中消费消息。作为分布式消息系统,Kafka支支持多个生产者和多个消费者,生产者可以将消息发布到集群中不同节点的不同分区上,消费费者也可以消费集群中多个节点的多个分区上的消息,

消费者应用程序是由KafkaConsumer 对象代表的一个消费者客户端进程,KafkaConsumer

类常用的方法如下图所示。

 接下来,以实例演示的分式介绍Kafka的Java API操作方式。

1、创建工程,添加以下依赖:

创建一个Maven工程,在pom.xml文件中添加Kafka依赖包。

<dependencies>

    <dependency>

        <groupId>org.apache.kafka</groupId>

        <artifactId>kafka-clients</artifactId>

        <version>2.0.0</version>

    </dependency>

</dependencies>

 

2、编写生产者客户端,代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProduceTest {

    public static void main(String[] args) {

        Properties props = new Properties();

        //指定Kafka集群的IP地址和端口号

        props.put("bootstrap.servers",

                "master:9092,slave1:9092,slave2:9092");

        //指定等待所有副本节点的应答

        props.put("acks","all");

        //指定消息发送最大尝试次数

        props.put("retries",0);

        //指定一批消息处理大小

        props.put("batch.size",16384);

        //指定请求延时

        props.put("linger.ms",1);

        //指定缓存区内存大小

        props.put("buffer.memory",33554432);

        //设置key序列化

        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //设置value序列化

        props.put("value.serializer",

                "org.apache.kafka.common.serialization.StringSerializer");

        //生产数据

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);

        for (int i =0; i < 50; i++){

            producer.send(new ProducerRecord<String, String>

                    ("whhtopic",Integer.toString(i),"hello world-" + i));

        }

            producer.close();

    }

}

(1)bootstrap.servers:设置Kafka集群的IP地址和端口号。

2)acks:消息确认机制,该值设置为all,这种策略会保证只要有一个备份存活就不会丢失数据,这种方案是最安全可靠的,但同时效率也会降低。

(3)retries:如果当前请求失败,则生产者可以自动重新连接,但是设置retries=0参数,则意味请求失败不会重复连接,这样可以避免消息重复发送的可能。

(4)batch.size:生产者为每个分区维护了未发送数据的内存缓冲区,该缓冲区设置的越大,吞吐量和效率就越高,但也会浪费更多的内存。

(5)linger.ms:指定请求延时,意味着如果在缓冲区没有被填满的情况下,会增加1ms的延迟,等待更多的数据进入缓冲区从而增加内存利用率。在默认情况下,即使缓冲区中有其他未使用的空间,也可以立即发送缓冲区。

(6)buffer.memory:指定缓冲区大小。

(7)key.serializer、value.serializer:数据在网络中传输需要进行序列化。

第27~32行代码,作用是模拟消息源,向名为itcasttopic的主题中发送消息数据。向 Kafka集群发送消息数据时,只需要调用KafkaProducer类的send(()方法,该方法是异步的,调用时,它会将消息数据添加到待处理消息数据发送的缓冲区中,最终以批处理的方式处理消息数据,从而提高效率。send()方法中有3个参数,第1个参数是指定发送主题,第2个参数是设置消息的Key,第3个参数是消息的Value。

3、编写消费者客户端

        通过Kafka API 创建KafkaConsumer对象,用来消费Kafka集群中名为whhtopic主题的消息数据。在工程下创建KafkaConsumerTest.java文件,代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProduceTest {

    public static void main(String[] args) {

        Properties props = new Properties();

        //指定Kafka集群的IP地址和端口号

        props.put("bootstrap.servers",

                "master:9092,slave1:9092,slave2:9092");

        //指定等待所有副本节点的应答

        props.put("acks","all");

        //指定消息发送最大尝试次数

        props.put("retries",0);

        //指定一批消息处理大小

        props.put("batch.size",16384);

        //指定请求延时

        props.put("linger.ms",1);

        //指定缓存区内存大小

        props.put("buffer.memory",33554432);

        //设置key序列化

        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //设置value序列化

        props.put("value.serializer",

                "org.apache.kafka.common.serialization.StringSerializer");

        //生产数据

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);

        for (int i =0; i < 50; i++){

            producer.send(new ProducerRecord<String, String>

                    ("whhtopic",Integer.toString(i),"hello world-" + i));

        }

            producer.close();

    }

}

生产者客户端和消费者客户端编写完之后,就先运行生产者的代码,再运行消费者代码,此时消费者客户端并没有数据,需要再次运行生产者,运行结果如下:

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐