spring阻塞队列(Kafka入门使用)、发送系统消息的应用
BlockingQueue解决线程通信的问题阻塞方法:put、take生产者消费者模式生产者:产生数据的线程消费者:使用数据的线程实现类ArrayBlockingQueueLinkedBlockingQueueProityBlockingQueue、SynchronousQueue、DelayQueue样例代码package com.nowcoder.community;import org.ju
阻塞队列
- BlockingQueue
解决线程通信的问题
阻塞方法:put、take
- 生产者消费者模式
生产者:产生数据的线程
消费者:使用数据的线程
- 实现类
ArrayBlockingQueue
LinkedBlockingQueue
ProityBlockingQueue、SynchronousQueue、DelayQueue
- 样例代码
package com.nowcoder.community;
import org.junit.Test;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTests {
@Test
public void test() {
BlockingQueue queue=new ArrayBlockingQueue(10);//指定队列存放的数量
new Thread(new producer(queue)).start();//创建生产者线程
new Thread(new Consumer(queue)).start();//创建消费者线程
new Thread(new Consumer(queue)).start();//创建消费者线程
}
}
class producer implements Runnable{
private BlockingQueue<Integer> queue;
public producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for(int i=0;i<100;i++){
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName()+"生产:"+queue.size());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
class Consumer implements Runnable{
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true){
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName()+"消费:"+queue.size());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
效果:
Kafka入门
简介
kafka是一个分布式的流媒体平台
应用:消息系统、日志收集、用户行为追踪、流式处理
特点
高吞吐量、消息持久化、高可靠性、高扩展性
术语
Broker(kafka集群中包含一个或者多个服务实例,这种服务实例被称为Broker)
Zookeeper(zookeeper 存储了一些关于 consumer 和 broker 的信息)
Topic(每条发布到kafka集群的消息都有一个类别,这个类别就叫做Topic)
Partition(Partition是一个物理上的概念,每个Topic包含一个或者多个Partition)
Offset(任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量))
ReplicaLeader
Folloer Replica
命令
启动zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
启动服务
bin\windows\kafka-server-start.bat config\server.properties
创建主题
(默认端口9092)test为主题名
查看所有主题
向某个服务器某个主题发送消息
D:\newapp\kafka_2.13-3.1.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test
消费者读取数据
D:\newapp\kafka_2.13-3.1.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Spring整合kafka
1、 添加依赖
spring kafka
2、配置Kafka
配置server、consumer
3、访问Kafka
生产者:
kafkaTemplate.send(topic,data);
消费者:
@KafkaListener(topics={"test"})
public void handleMessage(ConsumerRecord record){}
4、kafka测试
package com.nowcoder.community;
import com.nowcoder.community.CommunityApplication;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTest {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka(){
kafkaProducer.sendMessage("test1","你好");
kafkaProducer.sendMessage("test1","在吗");
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 生产者是主动发出消息
*/
@Component
class KafkaProducer{
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic,String content){
kafkaTemplate.send(topic,content);
}
}
/**
* 消费者是被动执行
*/
@Component
class KafkaConsumer{
@KafkaListener(topics = {"test1"})
public void handleMessage(ConsumerRecord record){
System.out.println(record.value());
}
}
效果
发送系统消息
触发事件
评论后,发布通知
点赞后,发布通知
关注后,发布通知
处理事件
封装事件对象
开发事件的生产者
开发事件的消费者
更多推荐
所有评论(0)