一、下载安装
kafka官网:http://kafka.apache.org/intro
(可选)看下官网的introduction
选择get started-quick start ,按照步骤来(官网的例子不是每个版本都能用,所以我这里稍作修改)
在这里插入图片描述

1.下载
tar -xzf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0
注:还需要有jdk环境
jdk下载:https://www.oracle.com/java/technologies/downloads/,
选择XXX-linux-XX.tar.gz的包下载
安装jdk
(1)tar -zxf 解压文件(压缩为gzip)
(2)配置环境变量,修改.bash_profile
ll -a 查看隐藏文件夹
修改命令:vim -> i ->esc ->shift + : ->wq/q!
在文件后添加

export JAVA_HOME=/root/jdk/jdk1.8.0_311
export PATH=$JAVA_HOME/bin:$PATH

(3)source .bash_profile使配置文件生效
(4)java -version测试安装是否成功

2.启动环境
注:高版本kafka内置zookeeper
cd 到安装目录下比如我的是/root/kafka/kafka_2.13-3.0.0
启动zookeeper端:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

启动kafka端:

bin/kafka-server-start.sh -daemon config/server.properties

lsof -i:2181 --检查zookeeper是否启动
lsof -i:9092 --检查kafka是否启动
–俩端口都在config/server.properties配置,这里是默认值
–若启动不成功,可查看logs/server.log查看错误原因

lsof命令还没安装的,可以通过下面命令安装一下。

curl -o /etc/yum.repos.d/CentOS-Base.repo https://www.xmpan.com/Centos-6-Vault-Aliyun.repo
yum install lsof

3.创建Topic (kafka端)
具体语法查看(10)

bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server 192.168.124.128:9092

若报错,需要配置config/server.properties的listeners host_name填你自己的ip
监听端口指定 listeners=PLAINTEXT://192.168.124.128:9092
对外部暴露端口 advertised.listeners=PLAINTEXT://192.168.124.128:9092
服务器端口对应暴露端口 listener.security.protocol.map=PLAINTEXT:PLAINTEXT
查看topic情况

bin/kafka-topics.sh --list --bootstrap-server 192.168.124.128:9092
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server 192.168.124.128:9092

4.往Topic中添加事件(生产者)

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server 192.168.124.128:9092

This is my first event
This is my second event

Ctrl+C退出保存

5.复制会话,打开另一session,读取事件 (消费者)
–生产者若不退出,可以持续的添加事件

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server 192.168.124.128:9092

此时再生产者输入,消费者会一逐条接收到消息
6.注入数据创建事件流

7.实现的kafka事件流
可以从API开始看,这里有详细的使用教程,包括改导入什么依赖
http://kafka.apache.org/documentation/#producerapi
kafka流的编写,主要需要创建Pipe.java、LineSplit.java、WordCount.java,示例一步步加强功能
https://kafka.apache.org/30/documentation/streams/tutorial

或者你们直接看我写的,参照官网自己码的代码
(1)添加相关topic(参照第3步):我这里用到了TextLinesTopic、streams-plaintext-input、streams-linesplit-output、streams-pipe-output、streams-wordcount-output等。
(2)在pom.xml添加相关依赖

<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>3.0.0</version>
 </dependency>
 <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-streams</artifactId>
     <version>3.0.0</version>
 </dependency>
 <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-streams-scala_2.13</artifactId>
     <version>3.0.0</version>
 </dependency>

(3)kafka事件流编写

package com.example.springb_web.utils.Kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        //它提供您的 Streams 应用程序的唯一标识符以将其与其他与同一个 Kafka 集群通信的应用程序
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        //指定用于建立与 Kafka 集群的初始连接的主机/端口对列表
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.128:9092");
        //记录键值对的默认序列化和反序列化库
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        final StreamsBuilder builder = new StreamsBuilder();
        //创建输入输出流input/output
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.to("streams-pipe-output");
        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final KafkaStreams streams = new KafkaStreams(topology, props);

        //添加CountdownLatch在终止该程序时关闭客户端
        final CountDownLatch latch = new CountDownLatch(1);

        //若虚拟机停止kafka如Ctrl+C,关闭kafka流
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            //调用 await()方法的任务将一直阻塞等待, 直到这个 CountDownLatch 对象的计数值减到 0 为止
            latch.await();
        } catch (Throwable e) {
            //0.正常退出,是指如果当前程序还有在执行的任务,则等待所有任务执行完成以后再退出;1.非正常退出,只要时间到了,立刻停止
            System.exit(1);
        }
        System.exit(0);
    }
}
package com.example.springb_web.utils.Kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

//效果把输入分词换行后输出。如生产者输入hello world,消费者为hello  world两次输出
public class LineSplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        //它提供您的 Streams 应用程序的唯一标识符以将其与其他与同一个 Kafka 集群通信的应用程序
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-lineSplit");
        //指定用于建立与 Kafka 集群的初始连接的主机/端口对列表
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.128:9092");
        //记录键值对的默认序列化和反序列化库
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        final StreamsBuilder builder = new StreamsBuilder();
        //创建输入输出流input/output,每一个流都是String类型化的键值对  需要创建streams-plaintext-input等topic
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        //* 将值字符串视为文本行,使用FlatMapValues将其拆分为单词
        /*KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split("\\W+"));
            }
        });*/
        //KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
        //source.to("streams-linesplit-output");
        //lamda表达式写法结合流式写法
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))).to("streams-linesplit-output");
        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final KafkaStreams streams = new KafkaStreams(topology, props);

        //添加CountdownLatch在终止该程序时关闭客户端
        final CountDownLatch latch = new CountDownLatch(1);

        //若虚拟机停止kafka如Ctrl+C,关闭kafka流
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            //调用 await()方法的任务将一直阻塞等待, 直到这个 CountDownLatch 对象的计数值减到 0 为止
            latch.await();
        } catch (Throwable e) {
            //0.正常退出,是指如果当前程序还有在执行的任务,则等待所有任务执行完成以后再退出;1.非正常退出,只要时间到了,立刻停止
            System.exit(1);
        }
        System.exit(0);
    }
}
package com.example.springb_web.utils.Kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/*分词并统计出现单词次数。调用方法不同于Pipe和lineSplit,需要将值反序列化设置为org.apache.kafka.common.serialization.LongDeserializer,不然无法看到正常的output
* bin/kafka-console-consumer.sh --bootstrap-server 192.168.124.128:9092\
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
*
* */
public class WordCount {

    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        //它提供您的 Streams 应用程序的唯一标识符以将其与其他与同一个 Kafka 集群通信的应用程序
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-lineSplit");
        //指定用于建立与 Kafka 集群的初始连接的主机/端口对列表
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.128:9092");
        //记录键值对的默认序列化和反序列化库
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        final StreamsBuilder builder = new StreamsBuilder();
        //创建输入输出流input/output,每一个流都是String类型化的键值对  需要创建streams-plaintext-input等topic
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                //生成一个新的分组流,然后可以由count运算符聚合
                .groupBy((key, value) -> value)
                //count运算符有一个Materialized参数,该参数指定应将运行计数存储在名为 的状态存储中counts-store。这个Countsstore 可以实时查询
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                //返回值不再是String类型,需要对Long类型提供重写的序列化方法
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        System.out.println(topology.describe());
        /*
         * Sub-topologies:
         *   Sub-topology: 0
         *     Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
         *     Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
         *     Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
         *     Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
         *     Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition) <-- KSTREAM-FILTER-0000000005
         *   Sub-topology: 1
         *     Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> KSTREAM-AGGREGATE-0000000003
         *     Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
         *     Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
         *     Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
         */

        KafkaStreams streams = new KafkaStreams(topology, props);
        //添加CountdownLatch在终止该程序时关闭客户端
        final CountDownLatch latch = new CountDownLatch(1);

        //若虚拟机停止kafka如Ctrl+C,关闭kafka流
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            //调用 await()方法的任务将一直阻塞等待, 直到这个 CountDownLatch 对象的计数值减到 0 为止
            latch.await();
        } catch (Throwable e) {
            //0.正常退出,是指如果当前程序还有在执行的任务,则等待所有任务执行完成以后再退出;1.非正常退出,只要时间到了,立刻停止
            System.exit(1);
        }
        System.exit(0);
    }

}

(4)生产者消费者你可以直接在linux上用第4、5步中的命令,也可以在java中自己创建相应的类,比如下面

package com.example.springb_web.utils.Kafka;


import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerTest {

        public static void main(String[] args) throws ExecutionException, InterruptedException {

            // 0.配置一系列参数
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.128:9092");//kafka集群,broker-list
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.RETRIES_CONFIG, 1);//重试次数
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//批次大小
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);//等待时间
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);//RecordAccumulator缓冲区大小
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

            // 1.创建一个生产者对象
            Producer<String, String> producer = new KafkaProducer<String, String>(props);

            // 2.调用send方法
            for (int i = 0; i < 10; i++) {
                //回调函数在Producer收到ack时异步调用
                /*producer.send(new ProducerRecord<String, String>("testKafka1", Integer.toString(i), Integer.toString(i)), new Callback() {

                      //回调函数在Producer收到ack时异步调用
                      @Override
                      public void onCompletion(RecordMetadata metadata, Exception exception) {
                          if (exception == null) {
                              System.out.println("消息发送成功->" + metadata.offset());
                          } else {
                              exception.printStackTrace();
                          }
                      }
                  });*/
                producer.send(new ProducerRecord<String, String>("TextLinesTopic", Integer.toString(i), Integer.toString(i)), (metadata, exception) -> {
                    if (exception == null) {
                        System.out.println("消息发送成功->" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });

            }
            // 3.关闭生产者
            producer.close();
        }

}

package com.example.springb_web.utils.Kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

//kafka消费消息的方式,kafka是使用pull(拉)模式从broker中读取数据,根据消费者的消费能力以适当的速率消费broker里的消息
//消费者消费数据的核心点在于offset的维护
public class KafkaConsumerTest {
    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.128:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //自动提交offset,默认为true,由于是根据时间来自动提交的,因此是出了问题之后完全不可控的,因此在实际生产中不常使用
        //如果先消费数据后提交offset,这时候如果在提交offset的时候挂掉了,后来恢复后,会重复消费那条offset的数据,这样会数据重复,但也就是保证了数据的最少一次性
        //如果先提交offset后消费数据,这时候如果在提交offset的时候挂掉了,后来恢复后,那部分offset虽然提交了,但其实是没有消费的,因此就照成了数据的丢失,但是不会重复,也就保证了数据的最多一次性(at most once)。
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //消费者组ID,只要group.id相同,就属于同一个消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groud18");

        //创建1个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //订阅主题topic
        //consumer.subscribe(Arrays.asList("WordsWithCountsTopic"));
        consumer.subscribe(Arrays.asList("TextLinesTopic"));
        while (true) {
            //如果当前没有数据可供消费,消费者会等待100ms之后再返回数据(批量)
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                //offset相当于序号,标记每一条数据的位置,consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费
                System.out.println("----------------------------topic = " + record.topic()
                        + " offset = " + record.offset() + " value = " + record.value());
            }
            consumer.commitAsync();
//            consumer.commitSync();
        }



    }
}

(5)验证测试
①首先验证生产者消费者,先启动KafkaConsumerTest,可以看到下面的日志一直在滚动,再启动KafkaProducerTest,可以看到消费者打出了消费日志(眼尖一点,如果没找到就打个断点看)
在这里插入图片描述
②Pipe
开启Pipe.java,生产者中topic改为streams-plaintext-input,消费者topic改为
streams-pipe-output ,为了省眼睛,我这里就用linux直接操作好了。。
生产者

bin/kafka-console-producer.sh --topic streams-plaintext-input --bootstrap-server 192.168.124.128:9092

消费者

bin/kafka-console-consumer.sh --topic streams-pipe-output --from-beginning --bootstrap-server 192.168.124.128:9092

可以看到消费者成功输出

③LineSplit(效果把输入分词换行后输出。如生产者输入hello world,消费者为hello world两次输出)
生产者同上,消费者将streams-pipe-output换成streams-linesplit-output,开启LineSplit,生产者输入hello world可以看到结果如下:
在这里插入图片描述
④Wordcount(分词并统计出现单词次数)
生产者同上,开启Wordcount,
这里注意,因为输出的结果不是String而且Long,需要将值反序列化设置为org.apache.kafka.common.serialization.LongDeserializer,不然无法看到正常的output。生产者输入hello world后,执行下面操作

bin/kafka-console-consumer.sh --bootstrap-server 192.168.124.128:9092\
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

可以看到结果如下,这里我重复输入了好几遍,所以统计的hello共有四次:
在这里插入图片描述
8.关闭kafka

9.相关参数解释
partions:主题分区数。kafka通过分区策略,将不同的分区分配在一个集群中的broker上,一般会分散在不同的broker上(如两个broker,副本因子为1,四个分区,则每个broker分配两分区),
当只有一个broker时,所有的分区就只分配到该Broker上。消息会通过负载均衡发布到不同的分区上,消费者会监测偏移量来获取哪个分区有新数据,从而从该分区上拉取消息数据。
分区数越多,在一定程度上会提升消息处理的吞吐量,因为kafka是基于文件进行读写,因此也需要打开更多的文件句柄,也会增加一定的性能开销。
如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机 I/O 这个时候对性能影响很大。所以一般来说 Kafka 不能有太多的 Partition。
replication-factor:副本因子,用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。
若partions 设置为10,replicationFactor设置为1. Broker为2.分区会均匀在broker。broker1分区为13579,broker2为246810;
若partions 设置为10,replicationFactor设置为2. Broker为2.每个broker都有副本存在。broker1和broker2副本均为1到10;
若partions 设置为3,replicationFactor设置为1. Broker为3.每个broker都有副本存在。broker1分区为1,broker2为2,broker2为3,当一个broker宕机了,该topic就无法使用了;
若partions 设置为3,replicationFactor设置为2. Broker为3.每个broker都有副本存在。broker1分区为12,broker2为23,broker2为13,当一个broker宕机了,该topic还能使用了。
可以理解平均每个broker分区数=partions*replication-factor/broker数

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐