MQ如何保证消息不丢失。rabbitMq、kafka、RocketMq
MQ如何保证消息不丢失。rabbitMq、kafka、RocketMq
1.mq原则
MQ传输过程中,消息数据不能多,也不能少,不能多是说消息不能重复消费,这个我们下一章解决;不能少,就是说不能丢失数据。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的,本章详细介绍不能少的问题。
一 RabbitMQ介绍基本概念
1 消息队列
消息队列属于进程间通信的一种方式,使用消息队列可以通过异步方式处理数据,借此可以提高系统性能。我们可以把消息当作存放数据的容器,消息的消费者可以从队列中获取数据,进行处理。常见的消息队列有:ActiveMQ,RabbitMQ,Kafka,RocketMQ等。
2 RabbitMQ
消息队列中间件
开发语言 Erlang
基于AMQP协议(Advanced Message Queue Protocol)
3 基本概念
Broker:消息队列的服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
Queue:消息队列载体,每个消息都会被投入到一个或多个队列
Binding:绑定,它主要是把exchange和queue按照路由规则绑定起来
Routing Key:路由关键字,exchange根据这个关键字进行消息投递
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
producer:消息生产者,投递消息的程序
consumer:消息消费者,接收消息的程序
channel:消息通道,在客户端的每个连接里,可以建立多个channel,每个channel代表一个会话任务
2.丢失数据场景
丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了。下面从rabbitmq和kafka分别说一下,丢失数据的场景,
(1)rabbitmq
A:生产者弄丢了数据
生产者将数据发送到rabbitmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了。
B:rabbitmq自己丢了数据
如果没有开启rabbitmq的持久化,那么rabbitmq一旦重启,那么数据就丢了。所以必须开启持久化将消息持久化到磁盘,这样就算rabbitmq挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,rabbitmq还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。
C:消费端弄丢了数据
主要是因为消费者消费时,刚消费到,还没有处理,结果消费者就挂了,这样你重启之后,rabbitmq就认为你已经消费过了,然后就丢了数据。
(2)kafka
A:生产者弄丢了数据 生产者没有设置相应的策略,发送过程中丢失数据。
B:kafka弄丢了数据
比较常见的一个场景,就是kafka的某个broker宕机了,然后重新选举partition的leader时。如果此时follower还没来得及同步数据,leader就挂了,然后某个follower成为了leader,他就少了一部分数据。
C:消费者弄丢了数据
消费者消费到了这个数据,然后消费后自动提交了offset,让kafka知道你已经消费了这个消息,当你准备处理这个消息时,自己挂掉了,那么这条消息就丢了。
3.如何防止消息丢失
(1)rabbitmq
A:生产者丢失消息
①:可以选择使用rabbitmq提供事物功能,就是生产者在发送数据之前开启事物,然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会受到异常报错,这时就可以回滚事物,然后尝试重新发送;如果收到了消息,那么就可以提交事物。
channel.txSelect();//开启事物
try{
//发送消息
}catch(Exection e){
channel.txRollback();//回滚事物
//重新提交
}
缺点:rabbitmq事物已开启,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。
②:可以开启confirm模式。在生产者那里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。
//开启confirm
channel.confirm();
//发送成功回调
public void ack(String messageId){
}
// 发送失败回调
public void nack(String messageId){
//重发该消息
}
二者不同
事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后rabbitmq会回调告知成功与否。
一般在生产者这块避免丢失,都是用confirm机制。
B:rabbitmq自己弄丢了数据
设置消息持久化到磁盘。设置持久化有两个步骤:
①创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里面的数据。
②发送消息的时候将消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时rabbitmq就会将消息持久化到磁盘上。
必须要同时开启这两个才可以。而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前rabbitmq挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
C:消费者弄丢了数据
使用rabbitmq提供的ack机制,首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。
(2)kafka
A:消费端弄丢了数据
关闭自动提交offset,在自己处理完毕之后手动提交offset,这样就不会丢失数据。
B:kafka弄丢了数据
一般要求设置4个参数来保证消息不丢失: ①给topic设置
replication.factor参数:这个值必须大于1,表示要求每个partition必须至少有2个副本。②在kafka服务端设置min.isync.replicas参数:这个值必须大于1,表示
要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,这样才能保证leader挂了之后还有一个follower。③在生产者端设置acks=all:表示 要求每条每条数据,必须是写入所有replica副本之后,才能认为是写入成功了
④在生产者端设置retries=MAX(很大的一个值,表示无限重试):表示 这个是要求一旦写入事败,就无限重试
C:生产者弄丢了数据
如果按照上面设置了ack=all,则一定不会丢失数据,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
RocketMQ
Producer保证消息不丢失
1、RocketMQ发送消息有三种模式,即同步发送,异步发送、单向发送。
同步发送消息时会同步阻塞等待Broker返回发送结果,如果发送失败不会收到发送结果SendResult,这种是最可靠的发送方式。
异步发送消息可以在回调方法中得知发送结果。 单向发送是消息发送完之后就不管了,不管发送成功没成功,是最不可靠的一种方式。
/**
* @description: 单向发送
* 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
*/
public void sendMq() {
for (int i = 0; i < 10; i++) {
rocketMQTemplate.convertAndSend("xiaojie-test", "测试发送消息》》》》》》》》》" + i);
}
}
/***********************************************************************************/
/**
* @description: 同步发送
* 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
*/
public void sync() {
SendResult sendResult = rocketMQTemplate.syncSend("xiaojie-test", "sync发送消息。。。。。。。。。。");
log.info("发送结果{}", sendResult);
}
/***********************************************************************************/
/**
* @description: 异步发送
* 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
*/
public void async() {
String msg = "异步发送消息。。。。。。。。。。";
log.info(">msg:<<" + msg);
rocketMQTemplate.asyncSend("xiaojie-test", msg, new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
log.info("异步发送成功{}", var1);
}
@Override
public void onException(Throwable var1) {
//发送失败可以执行重试
log.info("异步发送失败{}", var1);
}
});
}
2、生产者的重试机制
mq为生产者提供了失败重试机制,同步发送和异步发送默认都是失败重试两次当然可以修改重试次数,如果多次还是失败,那么可以采取记录这条信息,然后人工采取补偿机制。
Broker保证消息不丢失
1、刷盘策略
RocketMq持久化消息有两种策略即同步刷盘和异步刷盘。默认情况下是异步刷盘,此模式下当生产者把消息发送到broker,消息存到内存之后就认为消息发送成功了,就会返回给生产者消息发送成功的结果。但是如果消息还没持久化到硬盘,服务器宕机了,那么消息就会丢失。同步刷盘是当Broker接收到消息并且持久化到硬盘之后才会返回消息发送成功的结果,这样就会保证消息不会丢失,但是同步刷盘相对于异步刷盘来说效率上有所降低,大概降低10%,具体情况根据业务需求设定吧。
修改配置文件中刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
2、集群模式
rocketmq的集群模式保证可rocketMQ高可用。利用多master多slave节点保证rocketmq的高可用。
#主从复制方式ASYNC_MASTER异步复制,SYNC_MASTER同步复制
brokerRole=SYNC_MASTER
#刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
flushDiskType=SYNC_FLUSH
此模式是broker保证消息不丢失的配置,主从复制同步复制,刷盘模式同步刷盘,但是这种模式下性能会有所降低。集群搭建方式如下
Consumer保证消息不丢失
1、手动ack
/**
* @description: 消费端确认消息消费成功的消费者
*/
@Component
@Slf4j
public class MqConsumerAck implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg:msgs){
log.info("接收到的消息是>>>>>>>{}",new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
2、消费者消费失败重试机制
消费者消费失败会自动重试,如果消费失败没有手动ack则会自动重试15次。
RabbitMQ
Producer保证消息不丢失
1、rabbitMQ引入了事务机制和确认机制(confirm)
事务机制开启之后,相当于同步执行,必然会降低系统的性能,因此一般我们不采用这种方式。
确实机制,是当mq收到生产者发送的消息时,会返回一个ack告知生产者,收到了这条消息,如果没有收到,那就采取重试机制后者其他方式补偿。
事务模式
public static void main(String[] args) {
try {
System.out.println("生产者启动成功..");
// 1.创建连接
connection = MyConnection.getConnection();
// 2.创建通道
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "测试事务机制保证消息发送可靠性。。。。";
channel.txSelect(); //开启事务
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
//发生异常时,mq中并没有新的消息入队列
//int i=1/0;
//没有发生异常,提交事务
channel.txCommit();
System.out.println("生产者发送消息成功:" + msg);
} catch (Exception e) {
e.printStackTrace();
//发生异常则回滚事务
try {
if (channel != null) {
channel.txRollback();
}
} catch (IOException ioException) {
ioException.printStackTrace();
}
} finally {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
confirm模式
#开启生产者确认模式
publisher-confirm-type: correlated
# 打开消息返回,如果投递失败,会返回消息
publisher-returns: true
#publisher-confirm-type有3种取值
#NONE值是禁用发布确认模式,是默认值
#CORRELATED值是发布消息成功到交换器后会触发回调方法
#SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法
回调函数方法类
@Component
@Slf4j
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("correlation>>>>>>>{},ack>>>>>>>>>{},cause>>>>>>>>{}", correlationData, ack, cause);
if (ack) {
//确认收到消息
} else {
//收到消息失败,可以自定义重试机制,或者将失败的存起来,进行补偿
}
}
/*
* @param returnedMessage
* 消息是否从Exchange路由到Queue, 只有消息从Exchange路由到Queue失败才会回调这个方法
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("被退回信息是》》》》》》{}", returnedMessage.getMessage());
log.info("replyCode》》》》》》{}", returnedMessage.getReplyCode());
log.info("replyText》》》》》》{}", returnedMessage.getReplyText());
log.info("exchange》》》》》》{}", returnedMessage.getExchange());
log.info("routingKey>>>>>>>{}", returnedMessage.getRoutingKey());
}
}
2、重试机制
rabbitmq同样为生产者设置了重试机制默认是3次,同样可以修改重试次数,超过了最大重试次数限制采取人工补偿机制。
Broker保证消息不丢失
1、rabbitMq持久化机制
消息到达mq之后,mq宕机了,然后消息又没有进行持久化,这时消息就会丢失。开启mq的持久化机制,消息队列,交换机、消息都要开启持久化。
开启持久化操作请参考 RabbitMq确认机制&SpringBoot整合RabbitMQ_熟透的蜗牛的博客-CSDN博客
2、使用镜像集群
rabbitmq集群搭建请参考消息中间件介绍&RabitMQ环境搭建(Linux)_熟透的蜗牛的博客-CSDN博客
3、如果队列满了,多余的消息发送到Broker时可以使用死信队列保证消息不会被丢弃
Consumer保证消息不丢失**
1、开启消费端的手动ack
manual-手动
ack auto 自动
none 不使用ack
手动ack代码
@Component
@Slf4j
public class SnailConsumer {
@RabbitListener(queues = "snail_direct_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
// 获取消息Id
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
log.info("获取到的消息>>>>>>>{},消息id>>>>>>{}", msg, messageId);
try {
int result = 1 / 0;
System.out.println("result" + result);
// // 手动ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动签收
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//拒绝消费消息(丢失消息) 给死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
2、同样可以使用消费者的重试机制,重试超过最大次数还没成功则采取人工补偿机制。
Kafka
Producer保证消息不丢失
1、producer的ack机制
kafka的生产者确认机制有三种取值分别为0、1、-1(all)
acks = 0
如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障)。
acks = 1
这意味着leader会将记录写入其本地日志,但无需等待所有follwer服务器的完全确认即可做出回应,在这种情况下,当leader还没有将数据同步到Follwer宕机,存在丢失数据的可能性。
acks = -1代表所有的所有的分区副本备份完成,不会丢失数据这是最强有力的保证。但是这种模式往往效率相对较低。
2、producer重试机制
Broker保证消息不丢失
kafka的broker使用副本机制保证数据的可靠性。每个broker中的partition我们一般都会设置有replication(副本)的个数,生产者写入的时候首先根据分发策略(有partition按partition,有key按key,都没有轮询)写入到leader中,follower(副本)再跟leader同步数据,这样有了备份,也可以保证消息数据的不丢失。
Consumer保证消息不丢失
1、手动ack
/*
ack
* @手动提交ack
* containerFactory 手动提交消息ack
* errorHandler 消费端异常处理器
*/
@KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "xiaojie-topic",
errorHandler = "consumerAwareListenerErrorHandler"
)
public void onMessageManual(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
for (int i=0;i<record.size();i++){
System.out.println(record.get(i).value());
}
ack.acknowledge();//直接提交offset
}
2、offset commit
消费者通过offset commit
来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。kafka并不像其他消息队列,消费完消息之后,会将数据从队列中删除,而是维护了一个日志文件,通过时间和储存大小进行日志删除策略。如果offset没有提交,程序启动之后,会从上次消费的位置继续消费,有可能存在重复消费的情况。
Offset Reset 三种模式
earliest(最早):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest(最新的):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。
none(没有):topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
引用:完整代码请看
更多推荐
所有评论(0)