阻塞队列

  • BlockingQueue

解决线程通信的问题

阻塞方法:put、take

  • 生产者消费者模式

生产者:产生数据的线程

消费者:使用数据的线程

  • 实现类

ArrayBlockingQueue

LinkedBlockingQueue

ProityBlockingQueue、SynchronousQueue、DelayQueue

  • 样例代码
package com.nowcoder.community;

import org.junit.Test;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTests {

    @Test
    public  void test() {
        BlockingQueue queue=new ArrayBlockingQueue(10);//指定队列存放的数量
        new Thread(new producer(queue)).start();//创建生产者线程
        new Thread(new Consumer(queue)).start();//创建消费者线程
        new Thread(new Consumer(queue)).start();//创建消费者线程
        }
    }

class producer implements  Runnable{
private BlockingQueue<Integer> queue;

    public producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        try {
            for(int i=0;i<100;i++){
            Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName()+"生产:"+queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
class Consumer implements Runnable{
private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true){
               Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName()+"消费:"+queue.size());
            }

        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

效果:

Kafka入门

简介

kafka是一个分布式的流媒体平台

应用:消息系统、日志收集、用户行为追踪、流式处理

特点

高吞吐量、消息持久化、高可靠性、高扩展性

术语

Broker(kafka集群中包含一个或者多个服务实例,这种服务实例被称为Broker)

Zookeeper(zookeeper 存储了一些关于 consumer 和 broker 的信息)

Topic(每条发布到kafka集群的消息都有一个类别,这个类别就叫做Topic)

Partition(Partition是一个物理上的概念,每个Topic包含一个或者多个Partition)

Offset(任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量))

ReplicaLeader 

Folloer Replica

命令

启动zookeeper 

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

 启动服务

bin\windows\kafka-server-start.bat config\server.properties

创建主题

(默认端口9092)test为主题名 

查看所有主题

 

向某个服务器某个主题发送消息 

D:\newapp\kafka_2.13-3.1.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test

 

消费者读取数据

 D:\newapp\kafka_2.13-3.1.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

Spring整合kafka

1、 添加依赖

spring kafka

2、配置Kafka

配置server、consumer

3、访问Kafka

生产者:

kafkaTemplate.send(topic,data);

消费者:

@KafkaListener(topics={"test"})

public void handleMessage(ConsumerRecord record){}

 4、kafka测试

package com.nowcoder.community;

import com.nowcoder.community.CommunityApplication;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTest {
    @Autowired
    private KafkaProducer kafkaProducer;
    @Test
    public void testKafka(){
        kafkaProducer.sendMessage("test1","你好");
        kafkaProducer.sendMessage("test1","在吗");
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

/**
 * 生产者是主动发出消息
 */
@Component
class KafkaProducer{
    @Autowired
    private KafkaTemplate kafkaTemplate;
    public void sendMessage(String topic,String content){
        kafkaTemplate.send(topic,content);

    }
}

/**
 * 消费者是被动执行
 */
@Component
class KafkaConsumer{
    @KafkaListener(topics = {"test1"})
    public void handleMessage(ConsumerRecord record){
        System.out.println(record.value());
    }
}

 效果

发送系统消息

触发事件

评论后,发布通知

点赞后,发布通知

关注后,发布通知

处理事件

封装事件对象

开发事件的生产者

开发事件的消费者

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐