This tutorial installs everything on a local virtual machine: a CentOS 7 environment built with local VMware, accessed over MobaXterm.

I. ZooKeeper Installation

1. Download: Apache ZooKeeper (from the official Apache download page)


2. Install

# 1. Unpack ZooKeeper and move it to the install directory
tar zxvf apache-zookeeper-3.8.0-bin.tar.gz
mv apache-zookeeper-3.8.0-bin /usr/local/zookeeper
# 2. Create the config file (copy the sample) and point dataDir at a persistent
#    data directory, e.g. /mnt/data/zookeeper (see the snippet after this block)
cd /usr/local/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
# 3. Start the server
cd ..
bin/zkServer.sh start conf/zoo.cfg
# 4. Check the server status
bin/zkServer.sh status
# 5. Connect with the local client
bin/zkCli.sh
# 6. Connect to a remote ZooKeeper server
bin/zkCli.sh -server ip:port
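
The only change step 2 usually requires in zoo.cfg is the data directory. A minimal edit, using the /mnt/data/zookeeper path from the comment above (any persistent path works):

# zoo.cfg: move the data directory off /tmp
dataDir=/mnt/data/zookeeper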


3. Configuration file notes

# The number of milliseconds of each tick
# basic time unit (tick) in milliseconds
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
# timeout for a follower's initial connection and sync with the leader, in ticks:
# tickTime * initLimit (here 2000 ms * 10 = 20 s)
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
# timeout for a follower to stay in sync with the leader, in ticks:
# tickTime * syncLimit (here 2000 ms * 5 = 10 s)
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# ZooKeeper data directory (change this from /tmp to a persistent path)
dataDir=/tmp/zookeeper
# the port at which the clients will connect
# port ZooKeeper listens on for clients
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
# maximum number of concurrent connections from a single client
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
# number of snapshots to retain in dataDir; older ones are purged
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
# interval of the auto-purge task in hours; the default 0 disables purging (enabling example after this block)
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
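
To actually enable the snapshot cleanup mentioned above, both autopurge settings need to be uncommented. The values below are just an example pairing (keep three snapshots, purge once a day):

autopurge.snapRetainCount=3
autopurge.purgeInterval=24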

II. Kafka Installation

1. Download: Apache Kafka (from the official Apache download page)


2. Install

# 1. Unpack and move
tar zxvf kafka_2.13-3.2.0.tgz
mv kafka_2.13-3.2.0 /usr/local/kafka
# 2. Edit the broker config
cd /usr/local/kafka/
vi config/server.properties
# 3. Start the broker as a daemon
bin/kafka-server-start.sh -daemon config/server.properties
# 4. Connect the ZooKeeper client (from /usr/local/zookeeper) and check the registered broker
bin/zkCli.sh
ls /brokers/ids
# 5. Stop Kafka
bin/kafka-server-stop.sh

3. Configuration file notes

# broker.id must be unique within the Kafka cluster
broker.id=0
# address and port this broker listens on
listeners=PLAINTEXT://192.168.184.128:9092
# directory where Kafka stores its message logs
log.dirs=/mnt/data/kafka-logs
# ZooKeeper connection string
zookeeper.connect=192.168.184.128:2181

4. Check local Java processes

jps
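
If both services are up, jps should list the ZooKeeper and Kafka main classes, roughly like the sketch below (PIDs will differ on your machine):

2345 QuorumPeerMain
3456 Kafka
4567 Jps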

5. Create a topic

# 1. Create a topic with a single partition and a replication factor of 1
bin/kafka-topics.sh --bootstrap-server 192.168.184.128:9092 --create --replication-factor 1 --partitions 1 --topic moon
# 2. List topics
bin/kafka-topics.sh --bootstrap-server 192.168.184.128:9092 --list
# 3. Delete a topic
bin/kafka-topics.sh --bootstrap-server 192.168.184.128:9092 --delete --topic moon
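
To confirm what was created, you can also describe the topic right away; the same --describe flag is used again in the cluster section below:

bin/kafka-topics.sh --bootstrap-server 192.168.184.128:9092 --describe --topic moon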


6. Produce and consume messages

# 1. Console producer (each line typed is sent as a message; exit with Ctrl+C)
bin/kafka-console-producer.sh --broker-list 192.168.184.128:9092 --topic moon
>i am msg
>this is a fish
# 2. Console consumer (by default it only consumes messages produced after it starts)
bin/kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092 --topic moon
# 3. Consume from the beginning of the topic
bin/kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092 --from-beginning --topic moon
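
The console consumer can also track offsets under a named consumer group, like the Java client later in this post. A sketch using the --group flag (the group name here is arbitrary):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092 --group testGroup --topic moon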

7. Kafka cluster

Demonstrated by running multiple broker instances on the same host.

# 1. Copy the broker config file (in /usr/local/kafka/config)
cp server.properties server_1.properties
cp server.properties server_2.properties
# 2. Edit each copy: set broker.id to 1 and 2 respectively,
#    change the log directory to /mnt/data/kafka-logs-1 and /mnt/data/kafka-logs-2,
#    and the listener port to 9093 and 9094 (see the sketch after this block)
vi server_1.properties
vi server_2.properties
# 3. Start the two extra brokers (from /usr/local/kafka)
bin/kafka-server-start.sh -daemon config/server_1.properties
bin/kafka-server-start.sh -daemon config/server_2.properties
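
The edits in step 2 amount to three lines per file. A sketch of server_1.properties after editing (the listener address follows the single-broker config earlier; the paths are the ones from the comments above):

broker.id=1
listeners=PLAINTEXT://192.168.184.128:9093
log.dirs=/mnt/data/kafka-logs-1

server_2.properties is the same with broker.id=2, port 9094, and /mnt/data/kafka-logs-2.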

Check the brokers registered in ZooKeeper

bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
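
With the original broker and the two new ones all running, the returned list should contain the three broker ids configured above, i.e. roughly:

[0, 1, 2]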

Topic creation and inspection

# 1. Create a topic with 2 partitions and a replication factor of 3
bin/kafka-topics.sh --bootstrap-server 192.168.184.128:9092 --create --replication-factor 3 --partitions 2 --topic sun
# 2. Describe the topic (shows, per partition, the leader broker, the replica list and the in-sync replicas)
bin/kafka-topics.sh --bootstrap-server 192.168.184.128:9092 --describe --topic sun
# 3. Produce to the cluster
bin/kafka-console-producer.sh --broker-list 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --topic sun
# 4. Consume from the cluster
bin/kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --from-beginning --topic sun


III. Accessing Kafka from a Java Client (disable the server firewall first)

1. Dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.5.graal</version>
</dependency>

2. Producer

package com.example.kafkatest.kafka;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Description: Kafka producer
 *
 * @Author: 
 * @Date: 2022-06-05 0:41
 * @version: V1.0.0
 */
public class KafkaProducer {


    private final static String TOPIC_NAME = "moon";

    /**
     * ACKS_CONFIG - message durability
     * acks=0      the producer does not wait for any broker acknowledgement before sending the next record
     * acks=1      the producer waits until the leader has written the record to its local log; followers are not required
     * acks=-1/all the producer waits until min.insync.replicas replicas have written the record;
     *             usually only enabled for financial-grade durability requirements
     *
     * RETRIES_CONFIG - retries
     * default retry interval is 100 ms; retries improve delivery reliability, but network jitter can cause
     * duplicate sends, so the consumer side should be idempotent
     *
     * RETRY_BACKOFF_MS_CONFIG - retry interval
     *
     * BUFFER_MEMORY_CONFIG - local send buffer used to improve producer throughput; default 33554432 (32 MB)
     *
     * BATCH_SIZE_CONFIG - a local sender thread drains the buffer and sends records to the broker in batches;
     * this sets the batch size, default 16384 (16 KB): a full batch is sent immediately
     *
     * LINGER_MS_CONFIG - default 0, i.e. send immediately; commonly set to ~10 ms: if a batch fills within
     * 10 ms it is sent at once, otherwise whatever has accumulated is sent after 10 ms to bound latency
     *
     * @param args
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.184.128:9092,192.168.184.128:9093");

        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // serialize the record key from String to bytes
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // serialize the record value from String to bytes
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

        int msgNum = 5;
        final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
        for (int i = 1; i <= msgNum; i++) {
            MsgCon order = new MsgCon(i, 100 + i, 1, 1000.00);
            // send to an explicit partition
            /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , 0, order.getOrderId().toString(), JSON.toJSONString(order));*/
            // no partition specified: the producer picks one as hash(key) % partitionCount
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME
                    , order.getOrderId().toString(), JSON.toJSONString(order));

            // synchronous send: block until the broker acknowledges the record
            /*RecordMetadata metadata = producer.send(producerRecord).get();
            System.out.println("sync send result: " + "topic-" + metadata.topic() + "|partition-"
                    + metadata.partition() + "|offset-" + metadata.offset());*/

            // asynchronous send with a completion callback
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("message send failed...");
                    }
                    if (metadata != null) {
                        System.out.println("async send result: " + "topic-" + metadata.topic() + "|partition-"
                                + metadata.partition() + "|offset-" + metadata.offset());
                    }
                    countDownLatch.countDown();
                }
            });
        }

        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.close();
    }


}
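
The producer references a MsgCon order object that is not shown in this post. A minimal sketch of it, where the field names are assumptions and only the constructor shape and getOrderId() are taken from the code above:

package com.example.kafkatest.kafka;

/**
 * Minimal sketch of the MsgCon payload used by the producer above.
 * Field names are assumptions; only the constructor arity and getOrderId() come from the producer code.
 */
public class MsgCon {
    private Integer orderId;
    private Integer productId;
    private Integer productNum;
    private Double payAmount;

    public MsgCon(Integer orderId, Integer productId, Integer productNum, Double payAmount) {
        this.orderId = orderId;
        this.productId = productId;
        this.productNum = productNum;
        this.payAmount = payAmount;
    }

    // fastjson serializes via getters, so expose one per field
    public Integer getOrderId() { return orderId; }
    public Integer getProductId() { return productId; }
    public Integer getProductNum() { return productNum; }
    public Double getPayAmount() { return payAmount; }
}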

3. Consumer

package com.example.kafkatest.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Description: Kafka consumer
 *
 * @Author: 
 * @Date: 2022-06-05 0:47
 * @version: V1.0.0
 */
public class KafkaConsumers {

    private final static String TOPIC_NAME = "moon";
    private final static String CONSUMER_GROUP_NAME = "testGroup";

    /**
     * AUTO_OFFSET_RESET_CONFIG - where to start when the group has no committed offset
     * latest (default): only consume messages produced after this consumer starts
     * earliest: consume from the beginning the first time, then continue from the committed offset
     *
     * HEARTBEAT_INTERVAL_MS_CONFIG - how often the consumer sends heartbeats to the broker
     *
     * SESSION_TIMEOUT_MS_CONFIG - consumer failure detection
     * if the broker receives no heartbeat within this window, the consumer is considered dead, removed from the
     * group, and its partitions are reassigned to other consumers (default 45 s in Kafka 3.x clients, formerly 10 s)
     *
     *
     *
     * @param args
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.184.128:9092,192.168.184.128:9093");
        // consumer group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        // whether to auto-commit offsets (the default is true)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // auto-commit interval
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // offset reset policy
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // heartbeat interval
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        // session timeout used for failure detection
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        // max records returned per poll; raise it if the consumer processes records quickly, lower it otherwise
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        // if the gap between two polls exceeds this, the broker considers the consumer too slow,
        // kicks it out of the group and reassigns its partitions to other consumers
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        // consume a specific partition instead of subscribing to the whole topic
        //consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

        // poll in a loop: a single poll right after subscribing often returns nothing,
        // because the group rebalance may not have finished yet
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("received: partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
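
The consumer above relies on auto-commit. A common variation is to disable auto-commit and commit manually once the records have been processed; a minimal sketch reusing the same props and consumer objects (not part of the original post):

        // disable auto-commit so offsets only advance after processing succeeds
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                // process the record here before committing
            }
            if (!records.isEmpty()) {
                // synchronously commit the offsets returned by this poll
                consumer.commitSync();
            }
        }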


IV. Spring Boot Integration with Kafka (disable the server firewall first)

1. Dependency

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.8.6</version>
</dependency>

2. Configuration file

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 192.168.184.128:9092
    producer:
      retries: 3 # number of retries
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # commit each offset immediately after the listener calls Acknowledgment.acknowledge()
      ack-mode: manual_immediate

3. Producer

package com.example.kafkatest.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

/**
 * Description: Producer controller
 *
 * @Author: 
 * @Date: 2022-06-05 14:28
 * @version: V1.0.0
 */
@RestController
@RequestMapping("/send")
public class Send {

    private static final String TOPIC = "moon";

    private KafkaTemplate<String,String> kafkaTemplate;

    @Autowired
    public void setKafkaTemplate(KafkaTemplate<String,String> kafkaTemplate){
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/producer")
    public void send(@RequestParam("msg") String msg){
        kafkaTemplate.send(TOPIC,0,"KEY",msg);
    }
}
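
With the application running, the producer endpoint can be exercised with a simple request (URL assumes the mapping above and the port 8080 configured earlier). The two listeners in the next subsection belong to different consumer groups, so both should log each message:

curl "http://localhost:8080/send/producer?msg=hello-kafka"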


4. Consumer

package com.example.kafkatest.controller;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Description: Consumer listeners
 *
 * @Author: 
 * @Date: 2022-06-05 14:40
 * @version: V1.0.0
 */
@Component
public class SendConsumer {

    @KafkaListener(topics = "moon",groupId = "group_1")
    public void listen_1(ConsumerRecord<String,String> record, Acknowledgment ack){
        String msg = record.value();
        System.out.println("listen_1:" + msg);
        ack.acknowledge();
    }

    @KafkaListener(topics = "moon",groupId = "group_2")
    public void listen_2(ConsumerRecord<String,String> record, Acknowledgment ack){
        String msg = record.value();
        System.out.println("listen_2:" + msg);
        ack.acknowledge();
    }

}

