springboot+redission消息队列应用探索
在分布式系统中必备的一个中间件就是消息队列,通过消息队列对服务与服务间进行异步处理、应用解耦、流量削峰,常用的消息中间件有rabbitmq、rocketmq、kafka等。消息队列是一种异步的服务间通信方式,适用于分布式或微服务架构中,消息在被处理之前一直存储在队列上。如消息队列被用于分离重量级处理、缓冲或批处理工作或缓解高峰期工作。3个节点producer:消息生产者,负载生产和发送消息到bro
在分布式系统中必备的一个中间件就是消息队列,通过消息队列对服务与服务间进行异步处理、应用解耦、流量削峰,常用的消息中间件有rabbitmq、rocketmq、kafka等。
消息队列是一种异步的服务间通信方式,适用于分布式或微服务架构中,消息在被处理之前一直存储在队列上。如消息队列被用于分离重量级处理、缓冲或批处理工作或缓解高峰期工作。
3个节点
producer:消息生产者,负载生产和发送消息到broker;
boroker:消息处理中心,负责消息存储、确认、重试等,一般其中会包含多个queue;
consumer:消息消费者,负责从broker中获取消息,并进行相应处理;
3个典型应用场景
异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;
应用解耦:发送方和接收方之间不需要了解双方,只需要约定消息,多应用间通过消息队列对同一消息进行处理,避免调用接口失败而导致整个过程失败;
限流削峰:应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉;
3个典型特性
消息有序性:消息是异步处理的,但消费者需要按照生产者发送消息的顺序来消费,避免出现后生产的消息被先消费;
重复消费处理:生产者可能因为网络问题出现消息重复生产导致消费者可能会消费到多条重复消息;
可靠性:当生产消息时消费者未消费,消息队列会保留消息,直到消费成功。
redis作为分布式系统中必备的缓存中间件,那么可以应用消息队列吗?
下面使用redis中list数据类型进行分析
redis的列表list是一种线性的有序结构,可以按照元素被推入列表中的顺序来存储元素,满足先进先出,元素可以是文字数据,也可以是二进制数据。
LPUSH:生产者使用 LPUSH key element[element...] 将消息插入到队列的头部,如果 key 不存在则会创建一个空的队列再插入消息。
RPOP:消费者使用 RPOP key 依次读取队列的消息,先进先出
LPUSH和RPOP可以生产消费,但是存在一个性能风险,生产者向队列中存储消息时,list并不会主动通知消费者及时消费。我们需要写一个while(true)死循环不停的调用RPOP,有新消息就处理;程序不断的轮询判断,会导致若没有新消息写入到队列时,消费者也要不停的调用RPOP占用cpu资源。
这时redis提供了BLPOP、BRPOP堵塞读取的命令,消费者在读取队列时,若没有数据则自动堵塞,直到有新的消息写入队列,才会继续读取新消息执行业务逻辑。
示例
1、pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.7</version>
</dependency>
2、application.properties
server.port=9001
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.port=6379
#spring.redis.ssl=false
3、队列服务
@Slf4j
@Service
public class QueueService {
private static final String REDIS_MQ = "redisMQ";
@Autowired
private RedissonClient redissonClient;
/**
* 发送消息到队列头部
*
* @param message
*/
public void sendMessage(String message) {
RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ);
try {
blockingDeque.putFirst(message);
log.info("将消息: {} 插入到队列。", message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 从队列尾部阻塞读取消息,若没有消息,线程就会阻塞等待新消息插入,防止 CPU 空转
*
*/
public void onMessage() {
RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ);
while (true) {
try {
String message = blockingDeque.takeLast();
log.info("从队列 {} 中读取到消息:{}.", REDIS_MQ, message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4、测试
@Autowired
private QueueService queueService;
@Test
public void testQueue() throws InterruptedException {
new Thread(() -> {
for (int i = 0; i < 1000; i++) {
queueService.sendMessage("消息" + i);
}
}).start();
new Thread(() -> {
queueService.onMessage();
}).start();
Thread.currentThread().join();
}
可见可以使用list数据结构来简单实现消息队列,但其应用场景还需结合具体需求。
redission的使用还需深入理解和应用提供的各种方法。
更多推荐
所有评论(0)