一、阻塞队列

在这里插入图片描述

BlockingQueue是接口;
下面有它的实现类,通过数组实现的、通过链表实现的、等等;
这里一个用一个小demo进行ArrayBlockingQueue的演示

public class BlockingQueueTests {
    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();

    }
}

class Producer implements Runnable{

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        for(int i = 0; i < 100 ; i++){
            try {
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName() + " 生产:" + queue.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

class Consumer implements Runnable{

    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(new Random().nextInt(1000));
            queue.take();
            System.out.println(Thread.currentThread().getName() + " 消费:" + queue.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

二、Kafka

默认端口:9092

不是之前的私信功能,用户和用户发消息,而是服务器自动的给用户发送系统的消息或系统的通知。

在这里插入图片描述

消息持久化:数据永久存到硬盘里的;上面的BlockingQueue是存到队列里,消费完以后出队就完了,没永久保存

kafka术语
Broke:kafka的服务器,kafka集群当中每一台服务器都称为一个Broke;
Zookeeper:并不是kafka的一个概念,而是一个独立的软件;比如Dubbo也会有用到zookeeper;用来管理其他的集群;使用kfka的时候可以单独安装一个Zookeeper或者使用内置的Zookeeper;
Topic:把生产者把消息发布到的位置/空间就是主题
Partition:分区,对Topic位置的一个分区;一个主题里可以分为多个分区,增强并发能力,每一个分区从前往后按顺序往里面追加写入数据;
Offset:消息在分区内存放的索引序列;
Leader Replica:主副本,对数据做备份,并且处理请求(比如向这个分区读取数据时,主副本可以做出响应);kafka是一个分布式的消息引擎,让数据更可靠;
Follower Replica:从副本,只是备份数据,不做响应;如果主副本挂了,这时集群就会选一个新的从副本作为Leader主副本;

消息队列实现的方式大致两种:
一种是点对点的方式,上面写的BlockingQueue就是点对点的方式;就是生产者把数据放到一个队列里,消费者就从这个队列里取值,那消费者可能有多个,如果A消费者取到了一个数据,那么这个数据就出队了,那么B消费者取得就是另外的数据,不会冲突重复的;即每个数据只被一个消费者消费!
另一种是发布订阅模式,生产者把消息放到了某一个位置,然后可以有多个消费者关注这个位置,订阅这个位置,然后读取消息;这个消息可以被多个消费者同时读到或者先后读到,kafka就是这一种模式,比较灵活方便;

1.安装Kafka

官网:http://kafka.apache.org

下载好对应版本压缩包后,直接解压打开即可!

1.1 zookeeper.properties

zookeeper是一个管理集群的工具,先对它做一些配置!

zookeeper运行的时候会响应一些数据,配置数据存放路径;
在这里插入图片描述

在这里插入图片描述

1.2 server.properties

配置kafka日志文件存放的位置:
在这里插入图片描述

在这里插入图片描述

这样子,kafka就已经安装好了!

1.3 测试kafka

1.3.1启动zookeeper

kafka依赖于zookeeper,所以先启动zookeeper;
在这里插入图片描述
启动成功!
在这里插入图片描述

1.3.2 启动kafka

这是还需要一个命令行窗口来启动kafka;
在这里插入图片描述启动成功!
在这里插入图片描述

1.3.3⭐️使用kafka

再启动一个命令行,要是用kafka命令工具,所以要cd到对应目录之下;
在这里插入图片描述
然后:
1111

创建一个主体Topic,因为kafka作为消息队列采用的是“发布订阅”模式,不是点对点;要把消息发布到某一个主题下,所以先要创建一个主题;
(再强调一次Topic主题概念:第一个代表一个位置,第二个代表一种消息的类别分类:比如这是点赞的消息主题,这是关注的消息主题)
在这里插入图片描述

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testt

意思:打开kafka-topics.bat创建一个主题,端口号为默认的9092,副本数为1,分区为1,主题名为testt;

kafka-topics.bat --list --bootstrap-server localhost:9092

查看这个9092这个服务器的主题列表:
在这里插入图片描述
2222

主题创建好后,下面就要往主题上发送消息,发送消息是以生产者模式身份发送,所以要调用生产者的bat:
kafka-console-producer.bat 发送消息;

kafka-console-producer.bat --broker-list localhost:9092 --topic testt

意思是调用生产者,往9092服务器上的testt主题发送消息;
在这里插入图片描述
3333

再开一个命令窗口,代表消费者去读生产者的消息;
在这里插入图片描述

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic testt --from-beginning

意思是读9092服务上testt大主题的消息,from-beginning代表从从开始读,读所有的;
在这里插入图片描述
3333
此时对比两个窗口,在生产者输入yty,右边的消费者立马读到了yty,成功!
在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐