一、集群消费

类似于Kafka模式的普通消费者。一个消费者群组的一个消费者,消费一个主题中的一个Queue。这种模式的消费进度(Consumer Offset)存储会持久化到Broker。

public class BalanceComuser {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mark1");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("39.100.116.73:9876");
        // 订阅Topic
        consumer.setMaxReconsumeTimes(1);
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for(MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

二、广播模式

这种模式会打破集群模式的规则。集群模式中,消费者群组中每个消费者只能消费一个Queue。而广播模式中,消费者群组中的消费者会消费每个Queue上的消息。当然这种模式下也就无法保证消息的顺序消费了。

public class BroadcastComuser {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("B-test");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("39.100.116.73:9876");//106.55.246.66
        // 订阅Topic
        consumer.subscribe("TopicTest", "*");
        //广播模式消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 如果非第一次启动,那么按照上次消费的位置继续消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                             ConsumeConcurrentlyContext context) {
                try {
                    for(MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。

三、顺序消费

顺序消费时,消费者方法要使用MessageListenerOrderly作为监听事件的参数。

public class BalanceComuser {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mark1");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("39.100.116.73:9876");
        // 订阅Topic
        consumer.setMaxReconsumeTimes(1);
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for(MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

需要注意的是,consume 消费消息失败时, 不能返回 reconsume——later,这样会导致乱序 应该返回 suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。

四、消费者重要属性设置

consumerGroup:消费者群组
MessageModel:消费模式:集群模式、广播模式
ConsumeFromWhere:指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费。ConsumeFromTimestamp 模式下只会在订阅组(消费者群组)第一次启动的时候,过滤掉小于当前系统时间戳的消息,后续如果进程停掉或者崩溃,但是又生产了 新消息。下次启动消费者时,会继续消费停掉期间新生产的消息。后续行为和 ConsumeFromLastOffset 类似

consumeThreadMin:消费者最小线程数量

consumeThreadMax:消费者最大线程数量

pullInterval:推模式下任务间隔时间

pullBatchSize:推模式下任务拉取的条数,默认 32 条

maxReconsumeTimes:消息重试次数,-1 代表 16 次

consumeTimeout:消息消费超时时间

五、消息确认ACK

业务实现消费回调的时候,当且仅当此回调函数返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ 才会认为这批消息(默认是 1 条) 是消费完成的中途断电,抛出异常等都不会认为成功——即都会重新投递。
返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ 就会认为这批消息消费失败了。 如果业务的回调没有处理好而抛出异常,会认为是消费失败 ConsumeConcurrentlyStatus.RECONSUME_LATER 处理。 为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回 Broker(topic 不是原 topic 而是这个消费组的 RETRY topic),在延迟的某 个时间点(默认是 10 秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认 16 次),就会投 递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。 另外如果使用顺序消费的回调 MessageListenerOrderly 时,由于顺序消费是要前者消费成功才能继续消费,所以没有 RECONSUME_LATER 的这个状态, 只有 SUSPEND_CURRENT_QUEUE_A_MOMENT 来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

六、批量消费

批量消费代码和集群消费代码是一样的。一次性消费多少条数据,可以通过消费者属性来进行设置。

七、过滤消息

  • TAG过滤
    在RocketMQ生产者中,发送消息时,可以指定且只能指定一个TAG标签。在消费者端,订阅某个主题后,就可以一个或某几个TAG来筛选只收取带有这些TAG标签的数据。这样就做到了在某一主题下,再次过滤筛选消息的功能。
    在这里插入图片描述
    这里定义要过滤的标签即可。全部接受则写"*"
  • Sql过滤
    消费端除了使用TAG过滤外,还可以通过写SQL的形式,对某一主题下的数据做高级过滤。这就类似于Kafka或Datahub中流处理了。只不过这里直接定义在了消费端API中,而Kafka和datahub有单独的API。所以说,单纯用于MQ的话,RocketMQ比Kafka要更合适一些。而Kafka涉及到的生态圈比较多,如ES会和Kafka结合使用等,所以在实际生产中,很多情况都用了Kafka作为了MQ,也作为大数据流处理的工具。

只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,常用的语句如下:
基本语法:
数值比较:比如:>,>=,<,<=,BETWEEN,=;
字符比较:比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号:AND,OR,NOT;

常量支持类型为:
数值,比如:123,3.1415;
字符,比如:‘abc’,必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE

需要注意的是,消费者端想要用sql进行数据过滤,生产者发消息时必须使用putUserProperty 来设置消息的属性。
在这里插入图片描述

消费端就可以通过sql进行高级过滤了:
在这里插入图片描述
要使用sql查询,需要开启Sql92 功能。需要修改 Broker.conf 配置文件。 加入 enablePropertyFilter=true 然后重启 Broker 服务

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐