(一)安装zookeeper(windows)

kafka需要用到zookeeper,所以需要先安装zookeeper

1.到官网下载最新版zookeeper,http://www.apache.org/dyn/closer.cgi/zookeeper/

2.解压到你喜欢的路径,我这里为:E:\zookeeper\zookeeper-3.4.10

3.复制conf目录下zoo_sample.cfg,粘贴改名为zoo.cfg,修改zoo.cfg中的dataDir的值为E:/data/zookeeper,并添加一行dataLogDir=E:/log/zookeeper

4.修改系统环境变量,在Path后添加    ;E:\zookeeper\zookeeper-3.4.10\bin

5.运行cmd命令窗口,输入zkServer回车,出现下图的就表示zookeeper启动成功,也表明安装成功了。


安装zookeeper(Linux)

1. Xshell等工具连接Linux服务器,切换到任意目录,下载zookeeper最新稳定版,下载地址http://mirrors.hust.edu.cn/apache/zookeeper/stable/,命令如下

cd /usr/soft

wget http://mirrors.hust.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz

2.解压

tar -xzvf zookeeper-3.4.10.tar.gz

3.切换到conf配置文件目录,复制zoo_sample.cfg为zoo.cfg可以按需修改配置文件内容

4.切换到bin目录,启动zookeeper,看到Starting zookeeper ... STARTED字样表示启动成功了

./zkServer.sh start


(二)安装kafka(windows)

1. 到官网下载最新版kafka,http://kafka.apache.org/downloads

2.解压到你喜欢的路径,我这里解压路径为:E:\kafka_2.12-0.10.2.0

3.修改E:\kafka_2.12-0.10.2.0\config目录下的server.properties中 log.dirs的值为E:/log/kafka

4.添加系统环境变量,在Path后添加   ;E:\kafka_2.12-0.10.2.0\bin\windows

5.启动kafka,在cmd命令行用cd命令切换到kafka根目录E:\kafka_2.12-0.10.2.0,输入命令

 .\bin\windows\kafka-server-start.bat .\config\server.properties

出现started (kafka.server.KafkaServer)字样表示启动成功


启动时若出现“wvim不是内部或外部命令...”错误提示,则需要在系统Path环境变量后添加
;C:\Windows\System32\wbem
6.运行cmd命令行,创建一个topic

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

7.再打开一个cmd,创建一个Producer

kafka-console-producer.bat --broker-list localhost:9092 --topic test

8.再打开一个cmd,创建一个Customer

kafka-console-consumer.bat --zookeeper localhost:2181 --topic test

9.在Producer窗口下输入信息进行测试 ,每输入一行回车后消息马上就会出现在Customer中,表明kafka已经安装测试成功



安装kafka(Linux)

1.下载kafka最新版https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz

2.解压,文件夹重命名

tar -xzvf kafka_2.12-0.10.2.0.tgz

mv kafka_2.12-0.10.2.0 kafka

3.切换目录到kafka目录下的bin目录,用vi命令修改kafka-server-start.sh中jvm内存大小,把

export KAFKA_HEAP_OPTS="-Xms1G -Xms1G" 修改为
export KAFKA_HEAP_OPTS="-Xms256M -Xms128M",当然如果你的内存够大可以不修改


4.切换到kafka根目录,启动kafka,启动成功如下图

bin/kafka-server-start.sh config/server.properties


5.创建topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
创建一个名为test的topic,只有一个副本,一个分区。
通过list命令查看刚刚创建的topic
bin/kafka-topics.sh -list -zookeeper 127.0.0.1:2181

6.启动producer并发送消息启动producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
启动之后就可以发送消息了

按Ctrl+C退出发送消息

7.启动consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
启动consumer之后就可以在console中看到producer发送的消息了
可以开启两个终端,一个发送消息,一个接受消息。


(三)kafka编程之Java接口

1.新建Maven工程,我这里用的是Eclipse;pom加入kafka依赖,如下:

<dependency>
    	<groupId>org.apache.kafka</groupId>
    	<artifactId>kafka_2.11</artifactId>
    	<version>0.10.2.0</version>
</dependency>

2.新建生产测试类TestProducer.java

import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         //The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
        //“所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
         props.put("acks", "all");
         //如果请求失败,生产者也会自动重试,即使设置成0 the producer can automatically retry.
         props.put("retries", 0);
         //The producer maintains buffers of unsent records for each partition. 
         props.put("batch.size", 16384);
         //默认立即发送,这里这是延时毫秒数
         props.put("linger.ms", 1);
         //生产者缓冲大小,当缓冲区耗尽后,额外的发送调用将被阻塞。时间超过max.block.ms将抛出TimeoutException
         props.put("buffer.memory", 33554432);
         //The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes.
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

         //创建kafka的生产者类
         Producer<String, String> producer = new KafkaProducer<String, String>(props);
         //生产者的主要方法
         // close();//Close this producer.
         //   close(long timeout, TimeUnit timeUnit); //This method waits up to timeout for the producer to complete the sending of all incomplete requests.
         //  flush() ;所有缓存记录被立刻发送
         for(int i = 0; i < 100; i++)
             producer.send(new ProducerRecord<String, String>("test",0, Integer.toString(i), Integer.toString(i)));
             producer.close();
    }
}


3.新建消费测试类TestCustomer.java

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        System.out.println("this is the group part test 1");
        //消费者的组id
        props.put("group.id", "GroupA");//这里是GroupA或者GroupB
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        //从poll(拉)的回话处理时长
        props.put("session.timeout.ms", "30000");
        //poll的数量限制
        //props.put("max.poll.records", "100");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //订阅主题列表topic
        consumer.subscribe(Arrays.asList("test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                // 正常这里应该使用线程池处理,不应该在这里处理
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()+"\n");

        }
    }

}

4.先运行(run/debug)TestCustomer再运行TestProducer,在TestCustomer的控制台看到下图的结果就表示消息发送并接收成功了



并且在之前启动的消费端的命令窗口也能看到接收到的数据:



dazu表示kakazhj

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐