MQ,缓存,分布式锁
MOM:消息中间件MQ:消息队列实现方式:AMQP:高级消息队列协议,本质协议,规定数据格式;任何语言可以实现;多种消息模型(5种)JMS:java消息服务,本质接口;必须是java语言实现;只有2种消息模型(点对点 发布订阅)主要作用:1.异步 2.解耦 3.削峰填谷主流的MQ:ActiveMQ:RabbitMQRocketMQkafkaZeroMQ五种消息模型:1.简单模型(Simple)2.
·
MOM:消息中间件
MQ:消息队列
实现方式:
AMQP:高级消息队列协议,本质协议,规定数据格式;任何语言可以实现;多种消息模型(5种)
JMS:java消息服务,本质接口;必须是java语言实现;只有2种消息模型(点对点 发布订阅)
主要作用:
1.异步 2.解耦 3.削峰填谷
主流的MQ:
ActiveMQ:
RabbitMQ
RocketMQ
kafka
ZeroMQ
五种消息模型:
1.简单模型(Simple)
2.工作模型(work)
3.发布订阅(Fanout)
4.发布订阅(Direct 路由模型)
5.发布订阅(Topic 通配模型)
面试题:
防止消息堆积?
1.工作模型搭建消费者集群
2.多线程消费消息 concurrency
避免消息丢失?
1.生产者确认:
spring.rabbitmq.publisher-confirm-type=simple/CORRELATED
spring.rabbitmq.publisher-returns=true
@Configuration
public class RabbitConfig{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
this.rabbitTemplate.setConfirmCallback(ConfirmCallback); // 不管有没有到达该交换机都会执行该回调
this.rabbitTemplate.setReturnsCallback(ReturnsCallback); // 消息未到达队列执行该回调
}
}
2.消息持久化:交换机持久化(springAMQP中默认就是持久化) 队列持久化 消息持久化(默认就是持久化的)
3.消费者确认
spring.rabbitmq.listener.simple.acknowledge-mode=none/auto/manul
channl.basicAck(message.getMessageProperties.getDeliveryTag(), false);
channl.basicNack(message.getMessageProperties.getDeliveryTag(), false, true);
channl.basicReject(message.getMessageProperties.getDeliveryTag(), false);
死信队列
Dead letter:死信消息
转发死信消息的交换机就是死信交换机
存放死信消息的队列就是死信队列
死信消息产生的3个场景:
1.basicNack或者basicReject拒绝了消息又不重新入队
2.队列中的消息达到TTL
3.队列已满依然入队
给业务队列绑定死信交换机:
x-dead-letter-exchange: DLX
x-dead-letter-rounting-key:DLK
延时队列:
和普通的队列有何区别:
1.队列会指定消息的生存时间:x-message-ttl:毫秒值的生存时间
2.队列没有消费者
绑定死信队列方式同上
基础配置:
spring.rabbitmq.host=rabbit地址
spring.rabbitmq.virtual-host=虚拟机主机
spring.rabbitmq.username=用户名
spring.rabbitmq.password=密码
spring.rabbitmq.listener.simple.prefetch=1
spring.rabbitmq.listener.simple.concurrency=4
缓存:
场景:
1.读的并发非常高
2.写的频率非常低
目的:
1.保护数据库
2.提高系统并发能力
3.保护系统关键环节
缓存逻辑:
1.先查询缓存,缓存中可以命中,直接返回
2.缓存中没有,查询数据库,并放入缓存
缓存数据一致性问题:
1.双写模式
2.失效模式
3.双删模式
缓存的读问题:
1.穿透:大量的请求同时访问不存在的数据,由于数据不存在所以没有对应的缓存数据,请求直达数据库。解决方案:数据即使为null也要缓存
2.雪崩:由于缓存时间一样,导致缓存大面积的同时过期,此时如果大量请求过来,就会直达数据库。解决方案:给缓存时间添加一个随机值
3.击穿:由于一个热点数据缓存过期,此时大量请求同时访问该数据,请求直达数据库。解决方案:加锁(分布式锁) 布隆过滤器
分布式锁:
分布式锁的特点:
1.独占排他
2.防止死锁的发生:获取到锁之后服务立马宕机(过期时间) 如果A、B两个都需要分布式锁,而A又调用了B(可重入锁)
3.保证原子性:获取锁和过期时间之间(set k v ex 30 nx) 判断和释放锁(lua)
4.解铃还须系铃人:不能释放别人的锁(判断是否自己的锁)
5.可重入锁(可选):ThreadLocal hash
6.自动续期
7.redis集群情况下也可能会导致锁失效
分布式锁的主流实现:
1.基于mysql数据库实现:
2.基于redis实现:1.执行setnx 2.执行成功的获取到锁执行业务,执行完成释放锁 3.没有获取到锁的重试即可
3.基于zookeeper实现
zk实现分布式锁的安全性是最高的,实现比较麻烦
mysqsl实现分布式锁性能是最低的,性能最高的是redis
1.防止死锁的发生:一个请求到达testLock获取锁成功,接着首页服务宕机,导致redis中锁无法释放,产生死锁问题
解决方案:给锁加过期时间
指令:set k v ex 30 nx
代码:this.redisTemplate.opsForValue().setIfAbsent("lock", "111", 3, TimeUnit.SECONDS);
可重入锁获取锁:
// exists为0:第一次获取锁,执行hincrby lock uuid 1可以获取到锁
// exists不为0,hexists lock uuid:如果锁已经被占用,如果该锁是被当前线程占用,则可重入:hincrby lock uuid 1
// 获取锁之后防止死锁的发生,必须设置过期时间
if(exists lock == 0 or hexists lock uuid == 1){
hincrby lock uuid 1;
expire lock 30;
}
可重入锁释放锁:
if(hexists lock uuid == 0){
return nil; // 尝试解锁别人的锁
}
if(hincrby lock uuid -1 > 0){
return 0; // 推出了一次
} else {
del lock;
return 1; // 已经释放了锁
}
自动续期伪代码:
if(hexists lock uuid == 1){
expire lock 30
}
redisson入门:
1.引入redisson的依赖
2.java配置初始化RedissonClient客户端
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://172.16.116.100:6379");
return Redisson.create(config);
}
}
3.获取锁加锁解锁
RLock lock = redissonClient.getLock("lock")
lock.lock()/unlock();
更多推荐
已为社区贡献1条内容
所有评论(0)