Kafka简单入门及阻塞队列了解
1.什么是阻塞队列;2.kafka安装及入门使用
文章目录
一、阻塞队列
BlockingQueue是接口;
下面有它的实现类,通过数组实现的、通过链表实现的、等等;
这里一个用一个小demo进行ArrayBlockingQueue的演示
public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
class Producer implements Runnable{
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
for(int i = 0; i < 100 ; i++){
try {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName() + " 生产:" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName() + " 消费:" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
二、Kafka
默认端口:9092
不是之前的私信功能,用户和用户发消息,而是服务器自动的给用户发送系统的消息或系统的通知。
消息持久化
:数据永久存到硬盘里的;上面的BlockingQueue是存到队列里,消费完以后出队就完了,没永久保存
kafka术语
:
Broke:kafka的服务器,kafka集群当中每一台服务器都称为一个Broke;
Zookeeper:并不是kafka的一个概念,而是一个独立的软件;比如Dubbo也会有用到zookeeper;用来管理其他的集群;使用kfka的时候可以单独安装一个Zookeeper或者使用内置的Zookeeper;
Topic:把生产者把消息发布到的位置/空间就是主题
Partition:分区,对Topic位置的一个分区;一个主题里可以分为多个分区,增强并发能力,每一个分区从前往后按顺序往里面追加写入数据;
Offset:消息在分区内存放的索引序列;
Leader Replica:主副本,对数据做备份,并且处理请求(比如向这个分区读取数据时,主副本可以做出响应);kafka是一个分布式的消息引擎,让数据更可靠;
Follower Replica:从副本,只是备份数据,不做响应;如果主副本挂了,这时集群就会选一个新的从副本作为Leader主副本;
消息队列实现的方式大致两种:
一种是点对点
的方式,上面写的BlockingQueue就是点对点的方式;就是生产者把数据放到一个队列里,消费者就从这个队列里取值,那消费者可能有多个,如果A消费者取到了一个数据,那么这个数据就出队了,那么B消费者取得就是另外的数据,不会冲突重复的;即每个数据只被一个消费者消费!
另一种是发布订阅
模式,生产者把消息放到了某一个位置,然后可以有多个消费者关注这个位置,订阅这个位置,然后读取消息;这个消息可以被多个消费者同时读到或者先后读到,kafka就是这一种模式,比较灵活方便;
1.安装Kafka
官网:http://kafka.apache.org
下载好对应版本压缩包后,直接解压打开即可!
1.1 zookeeper.properties
zookeeper是一个管理集群的工具,先对它做一些配置!
zookeeper运行的时候会响应一些数据,配置数据存放路径;
→
1.2 server.properties
配置kafka日志文件存放的位置:
→
这样子,kafka就已经安装好了!
1.3 测试kafka
1.3.1启动zookeeper
kafka依赖于zookeeper,所以先启动zookeeper;
启动成功!
1.3.2 启动kafka
这是还需要一个命令行窗口来启动kafka;
启动成功!
1.3.3⭐️使用kafka
再启动一个命令行,要是用kafka命令工具,所以要cd到对应目录之下;
然后:
1111
创建一个主体Topic,因为kafka作为消息队列采用的是“发布订阅”模式,不是点对点;要把消息发布到某一个主题下,所以先要创建一个主题;
(再强调一次Topic主题概念:第一个代表一个位置,第二个代表一种消息的类别分类:比如这是点赞的消息主题,这是关注的消息主题)
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testt
意思:打开kafka-topics.bat创建一个主题,端口号为默认的9092,副本数为1,分区为1,主题名为testt;
kafka-topics.bat --list --bootstrap-server localhost:9092
查看这个9092这个服务器的主题列表:
2222
主题创建好后,下面就要往主题上发送消息,发送消息是以生产者模式身份发送,所以要调用生产者的bat:
kafka-console-producer.bat 发送消息;
kafka-console-producer.bat --broker-list localhost:9092 --topic testt
意思是调用生产者,往9092服务器上的testt主题发送消息;
3333
再开一个命令窗口,代表消费者去读生产者的消息;
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic testt --from-beginning
意思是读9092服务上testt大主题的消息,from-beginning代表从从开始读,读所有的;
3333
此时对比两个窗口,在生产者输入yty,右边的消费者立马读到了yty,成功!
更多推荐
所有评论(0)