RocketMQ(四)—消费者客户端详解
一、集群消费类似于Kafka模式的普通消费者。一个消费者群组的一个消费者,消费一个主题中的一个Queue。这种模式的消费进度(Consumer Offset)存储会持久化到Broker。public class BalanceComuser {public static void main(String[] args) throws Exception {// 实例化消息生产者,指定组名Defau
一、集群消费
类似于Kafka模式的普通消费者。一个消费者群组的一个消费者,消费一个主题中的一个Queue。这种模式的消费进度(Consumer Offset)存储会持久化到Broker。
public class BalanceComuser {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mark1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("39.100.116.73:9876");
// 订阅Topic
consumer.setMaxReconsumeTimes(1);
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
二、广播模式
这种模式会打破集群模式的规则。集群模式中,消费者群组中每个消费者只能消费一个Queue。而广播模式中,消费者群组中的消费者会消费每个Queue上的消息。当然这种模式下也就无法保证消息的顺序消费了。
public class BroadcastComuser {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("B-test");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("39.100.116.73:9876");//106.55.246.66
// 订阅Topic
consumer.subscribe("TopicTest", "*");
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
// 如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
三、顺序消费
顺序消费时,消费者方法要使用MessageListenerOrderly作为监听事件的参数。
public class BalanceComuser {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mark1");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("39.100.116.73:9876");
// 订阅Topic
consumer.setMaxReconsumeTimes(1);
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
需要注意的是,consume 消费消息失败时, 不能返回 reconsume——later,这样会导致乱序 应该返回 suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
四、消费者重要属性设置
consumerGroup:消费者群组
MessageModel:消费模式:集群模式、广播模式
ConsumeFromWhere:指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费。ConsumeFromTimestamp 模式下只会在订阅组(消费者群组)第一次启动的时候,过滤掉小于当前系统时间戳的消息,后续如果进程停掉或者崩溃,但是又生产了 新消息。下次启动消费者时,会继续消费停掉期间新生产的消息。后续行为和 ConsumeFromLastOffset 类似
consumeThreadMin:消费者最小线程数量
consumeThreadMax:消费者最大线程数量
pullInterval:推模式下任务间隔时间
pullBatchSize:推模式下任务拉取的条数,默认 32 条
maxReconsumeTimes:消息重试次数,-1 代表 16 次
consumeTimeout:消息消费超时时间
五、消息确认ACK
业务实现消费回调的时候,当且仅当此回调函数返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ 才会认为这批消息(默认是 1 条) 是消费完成的中途断电,抛出异常等都不会认为成功——即都会重新投递。
返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ 就会认为这批消息消费失败了。 如果业务的回调没有处理好而抛出异常,会认为是消费失败 ConsumeConcurrentlyStatus.RECONSUME_LATER 处理。 为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回 Broker(topic 不是原 topic 而是这个消费组的 RETRY topic),在延迟的某 个时间点(默认是 10 秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认 16 次),就会投 递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。 另外如果使用顺序消费的回调 MessageListenerOrderly 时,由于顺序消费是要前者消费成功才能继续消费,所以没有 RECONSUME_LATER 的这个状态, 只有 SUSPEND_CURRENT_QUEUE_A_MOMENT 来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。
六、批量消费
批量消费代码和集群消费代码是一样的。一次性消费多少条数据,可以通过消费者属性来进行设置。
七、过滤消息
- TAG过滤
在RocketMQ生产者中,发送消息时,可以指定且只能指定一个TAG标签。在消费者端,订阅某个主题后,就可以一个或某几个TAG来筛选只收取带有这些TAG标签的数据。这样就做到了在某一主题下,再次过滤筛选消息的功能。
这里定义要过滤的标签即可。全部接受则写"*" - Sql过滤
消费端除了使用TAG过滤外,还可以通过写SQL的形式,对某一主题下的数据做高级过滤。这就类似于Kafka或Datahub中流处理了。只不过这里直接定义在了消费端API中,而Kafka和datahub有单独的API。所以说,单纯用于MQ的话,RocketMQ比Kafka要更合适一些。而Kafka涉及到的生态圈比较多,如ES会和Kafka结合使用等,所以在实际生产中,很多情况都用了Kafka作为了MQ,也作为大数据流处理的工具。
只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,常用的语句如下:
基本语法:
数值比较:比如:>,>=,<,<=,BETWEEN,=;
字符比较:比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号:AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415;
字符,比如:‘abc’,必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE
需要注意的是,消费者端想要用sql进行数据过滤,生产者发消息时必须使用putUserProperty 来设置消息的属性。
消费端就可以通过sql进行高级过滤了:
要使用sql查询,需要开启Sql92 功能。需要修改 Broker.conf 配置文件。 加入 enablePropertyFilter=true 然后重启 Broker 服务
更多推荐
所有评论(0)