一、什么是 MQ

消息队列(Message Queue,简称 MQ)技术是应用间交换信息的一种技术。

消息队列主要解决应用耦合异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。
目前主流的 MQ Kafka、RabbitMQ、RocketMQ、ActiveMQ
选型根据需求(数据量大的选RokectMQ(基于JMS)、Kafka,轻微使用RabbitMQ(基于AMQP)就可以了)
在这里插入图片描述

二、RabbitMQ消息模型

rabbitmq-tutorials官方教程

  • 克隆下来,找到在这里插入图片描述
  • 使用mvn工程运行,但是里面是空的,所以复制一份java的代码进来
    在这里插入图片描述
  • 运行教程前先启动rabbitMQ服务,未安装参考我之前的RabbitMQ安装成功记录

2.1简单点对点模式

生产者将消息发送到“hello”队列。消费者负责监听该队列接收消息,如果队列中有消息,就消费掉。
应用场景:聊天
在这里插入图片描述
注意:生产者、消费者和代理不必驻留在同一主机上;事实上,在大多数应用程序中它们都没有。应用程序也可以既是生产者又是消费者。

  • 实例运行
    在这里插入图片描述
  • 发送者
    在这里插入图片描述
  • 接收者
    在这里插入图片描述

2.2工作队列(又名:任务队列)模式

生产者与消费者一对多,消费者们同时监听同一个队列,互相争抢,谁先拿到谁负责消费消息(需要加锁保证一条消息只能被一个消费者使用)
应用场景:红包;资源任务调度
在这里插入图片描述

  • 实例运行
    在这里插入图片描述
  • 新建两次任务(一人完成一个任务)
    在这里插入图片描述
    在这里插入图片描述

2.3publish/subscribe发布订阅(广播)模式

生产者与消费者一对多,X代表交换机,消息产生者将消息放入交换机,交换机发布订阅把消息发送到消息队列中(具体怎么发送取决于Exchange的类型。),对应消息队列的消费者拿到消息进行消费。
应用场景:邮件群发,群聊天,广播
在这里插入图片描述
Exchange(交换机):只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

  • 官方实例
    在这里插入图片描述
    在这里插入图片描述

  • 可以看到发布与接收,各定义一个队列

/**
     * 单独发布消息
     * @throws Exception
     */
    static void publishMessagesIndividually() throws Exception {
        try (Connection connection = createConnection()) {
            Channel ch = connection.createChannel();

            String queue = UUID.randomUUID().toString();
            ch.queueDeclare(queue, false, false, true, null);

            ch.confirmSelect();
            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                ch.basicPublish("", queue, null, body.getBytes());
                ch.waitForConfirmsOrDie(5_000);
            }
            long end = System.nanoTime();
            System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        }
    }
/**
     * 批量发布消息
     * @throws Exception
     */
    static void publishMessagesInBatch() throws Exception {
        try (Connection connection = createConnection()) {
            Channel ch = connection.createChannel();

            String queue = UUID.randomUUID().toString();
            ch.queueDeclare(queue, false, false, true, null);

            ch.confirmSelect();

            int batchSize = 100;
            int outstandingMessageCount = 0;

            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                ch.basicPublish("", queue, null, body.getBytes());
                outstandingMessageCount++;

                if (outstandingMessageCount == batchSize) {
                    ch.waitForConfirmsOrDie(5_000);
                    outstandingMessageCount = 0;
                }
            }

            if (outstandingMessageCount > 0) {
                ch.waitForConfirmsOrDie(5_000);
            }
            long end = System.nanoTime();
            System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        }
    }

    /**
     * 异步处理发布确认
     * @throws Exception
     */
    static void handlePublishConfirmsAsynchronously() throws Exception {
        try (Connection connection = createConnection()) {
            Channel ch = connection.createChannel();

            //队列声明
            String queue = UUID.randomUUID().toString();
            ch.queueDeclare(queue, false, false, true, null);

            ch.confirmSelect();

            ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

            //清洁未完成确认
            ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
                            sequenceNumber, true
                    );
                    confirmed.clear();
                } else {
                    outstandingConfirms.remove(sequenceNumber);
                }
            };

            //确认监听器
            ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
                String body = outstandingConfirms.get(sequenceNumber);
                System.err.format(
                        "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
                        body, sequenceNumber, multiple
                );
                //处理未完成的
                cleanOutstandingConfirms.handle(sequenceNumber, multiple);
            });

            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
                ch.basicPublish("", queue, null, body.getBytes());
            }

            if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
                throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
            }

            long end = System.nanoTime();
            System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        }
    }
  • 实例运行
    在这里插入图片描述
    在这里插入图片描述

这里可以将这个图反向,并在P和X中间加一个队列就是了
在这里插入图片描述

剩下的就笼统为路由模式(通过交换机来实现)
广播发送 Fanout (交换机与批量队列字符串相等)
定向路由 Direct(交换机与队列字符串相等)
通配符 Topic(交换机与队列通配符匹配)

  • Topic实例
    在这里插入图片描述
    在这里插入图片描述

未匹配,不接收

在这里插入图片描述
在这里插入图片描述

匹配接收(routingKey为o.1,bindingKey为o.*)

  • ReceiveLogsTopic
private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        String[] arr = {"o.*"};
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (arr.length < 1) {
            System.err.println("Usage: Exchange.ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for (String bindingKey : arr) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
  • EmitLogTopic
private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        String[] arr = {"o.1"};
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = getRouting(arr);
            String message = getMessage(arr);

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }

    private static String getRouting(String[] strings) {
        if (strings.length < 1)
            return "anonymous.info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length < startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }

消息模型理解先到这,\(^o^)/~

代码放置在gitee,仓库地址:https://gitee.com/lwstudy/study

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐