前言

关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了。

市面上的消息队列产品有很多,比如老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,还有 ZeroMQ ,去年底阿里巴巴捐赠给 Apache 的 RocketMQ ,连 redis 这样的 NoSQL 数据库也支持 MQ 功能。总之这块知名的产品就有十几种,就我自己的使用经验和兴趣只打算谈谈 RabbitMQ、Kafka 和 ActiveMQ ,本文先讲 RabbitMQ ,在此之前先看下消息队列的相关概念。

1.什么是消息队列

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在

2.为何用消息队列

从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?

以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。

以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。

RabbitMQ特点

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  1. 可靠性(Reliability)
    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
  2. 灵活的路由(Flexible Routing)
    在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
  3. 消息集群(Clustering)
    多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
  4. 高可用(Highly Available Queues)
    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  5. 多种协议(Multi-protocol)
    RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
  6. 多语言客户端(Many Clients)
    RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
  7. 管理界面(Management UI)
    RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
  8. 跟踪机制(Tracing)
    如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
  9. 插件机制(Plugin System)
    RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

RabbitMQ 中的概念模型

1.消息模型

所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

2.RabbitMQ 基本概念

上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念需要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:

  1. Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  2. Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  3. Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  4. Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  5. Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  6. Connection
    网络连接,比如一个TCP连接。

  7. Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

  8. Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  9. Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

  10. Broker
    表示消息队列服务器实体。

3.AMQP 中的消息路由

AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

4.Exchange 类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

1.direct

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

2.fanout

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

3.topic

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配不多不少一个单词。

RabbitMQ 安装

1.mac安装

brew update
brew install rabbitmq

这样 RabbitMQ 就安装好了,安装过程中会自动其所依赖的 Erlang 。

2.centos7安装

安装erlang
#1.安装依赖环境
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC-devel
#2.下载Erlang
wget http://erlang.org/download/otp_src_21.3.tar.gz
#3解压
tar -zxvf otp_src_21.3.tar.gz
#4.进入解压后的Erlang目录
cd otp_src_21.3
#5.构建
./otp_build autoconf
注意:如果出现 ./otp_build: line 319: autoconf: command not found ,需要
yum install -y autoconf
#6.配置安装
./configure
make
make install
#7.配置环境变量
vim /etc/profile
末尾添加
export ERLANG_HOME=/usr/local/lib/erlang
export PATH=$PATH:$ERLANG_HOME/bin
#8.重新加载环境变量
source /etc/profile
#9.查看信息
erl

安装erlang 所遇到的问题及解决办法

  1. no java compile found

    yum install unixODBC.x86_64 unixODBC-devel.x86_64
    
  2. odbc: ODBC library - link check failed

    yum install unixODBC.x86_64 unixODBC-devel.x86_64
    
  3. wx: Can not link the wx driver, wx will NOT be useable

    这条警告可以忽略

  4. documentation : fop is missing. Using fakefop to generate placeholder PDF files.

    $ yum install fop.noarch
    
  5. wx not found

    需要erlang依赖wxWidgets

    #安装wxWidgets的依赖gtk,>2.0版本或者>3.0版本都可以
    $ yum install gtk2-devel.x86_64
    #或者
    $ yum install gtk3-devel.x86_64
    

注意:centos 7安装最新版的rabbitmq-server-3.8.14-1.el7.noarch.rpm需要erlang >= 21.3

安装RabbitMQ

1.下载rabbitmq

将rabbitmq-server-generic-unix-3.8.6.tar.xz到以下目录

cd /usr/local 

2.解压

tar -xvf rabbitmq-server-generic-unix-3.8.6.tar.xz

3.配置

#重命名
mv rabbitmq_server-3.8.6/ rabbitmq
vi /etc/profile
#在文件后添加
#set rabbitmq environment
export PATH=$PATH:/usr/local/rabbitmq/sbin
#使文件生效
source /etc/profile
#查看配置
echo $PATH

4.启动服务

#启动rabbitmq,-detached代表后台守护进程方式启动。
rabbitmq-server -detached 

注意:启动后可能提示Warning: PID file not written; -detached was passed.但是通过浏览器是可以访问的

如果启动失败,查看是否端口:5672被占用:

netstat -lnp|grep 5672 #检查端口被哪个进程占用
ps 762 #查看进程的详细信息,加入进程号是762
kill -9 762 #杀掉编号为762的进程(请根据实际情况输入)

5.查看状态

rabbitmqctl status

6.配置网页插件

  • 创建目录

    mkdir /etc/rabbitmq
    
  • 在目录下启用插件

    rabbitmq-plugins enable rabbitmq_management
    

7.开放防火墙端口

//永久的添加该端口。去掉--permanent则表示临时。
firewall-cmd --permanent --zone=public --add-port=5672/tcp
firewall-cmd --permanent --zone=public --add-port=15672/tcp
//重新加载配置,使得修改有效。
firewall-cmd --reload 
//查看开启的端口,出现5672/15672这开启正确
firewall-cmd --permanent --zone=public --list-ports 

8.配置账户

默认网页是不允许访问的,需要增加一个用户修改一下权限,代码如下:

rabbitmqctl add_user username password //添加用户,后面两个参数分别是用户名和密码
rabbitmqctl set_permissions -p / username ".*" ".*" ".*" //添加权限
rabbitmqctl set_user_tags username administrator //修改用户角色

#eg:
[root@localhost sbin]#rabbitmqctl add_user admin admin
[root@localhost sbin]#rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
[root@localhost sbin]#rabbitmqctl set_user_tags admin administrator 

9.访问

浏览器输入:http://ip:15672 端口默认为15672

出现RabbitMQ登陆页面表示访问成功。

10.相关命令

  1. 启动服务:rabbitmq-server -detached
  2. 查看状态:rabbitmqctl status
  3. 关闭服务:rabbitmqctl stop
  4. 列出角色:rabbitmqctl list_users
开机自动重启设置

1.在/etc/init.d 目录下新建一个 rabbitmq文件

vi rabbitmq 文件内容如下

#!/bin/bash
#
# chkconfig: 2345 80 05
# description: rabbitmq 
# processname: rabbitmq
 
#RabbitMQ安装目录
RABBITMQ_HOME=/usr/local/rabbitmq
export RABBITMQ_HOME
 
case "$1" in
    start)
    echo "Starting RabbitMQ ..."
	cd $RABBIT_HOME/sbin
	rabbitmq-server
    ;;
stop)
    echo "Stopping RabbitMQ ..."
	cd $RABBIT_HOME/sbin
    rabbitmqctl stop
    ;;
status)
    echo "Status RabbitMQ ..."
	cd $RABBIT_HOME/sbin
    rabbitmqctl status
    ;;
restart)
    echo "Restarting RabbitMQ ..."
	cd $RABBIT_HOME/sbin
    rabbitmq-server
    ;;
 
*)
    echo "Usage: $prog {start|stop|status|restart}"
    ;;
esac
exit 0
 

2.对rabbitmq授予可执行权限

[root@localhost init.d]# chmod 777 rabbitmq

3.添加rabbitmq服务到系统服务中

[root@localhost init.d]# chkconfig --add rabbitmq

4.设置自启动

[root@localhost init.d]# chkconfig rabbitmq on

5.查看自启动项是否设置成功

[root@localhost init.d]# chkconfig --list rabbitmq

6.开启rabbit服务

rabbitmq-server -detached 

7.测试开机重启

[root@localhost init.d]#reboot

[root@localhost ~]# ps -elf|grep rabbitmq

Erlang和RabbitMQ卸载

1.卸载erlang

yum list | grep erlang
yum -y remove erlang-*
rm -rf /usr/lib64/erlang

2.卸载rabbitmq

yum list | grep rabbitmq
yum -y remove rabbitmq-server.noarch
find / -name rabbit*
rm -rf 依次删除find出的文件

php使用RabiitMQ

1.安装rabbitmq-c

1.1下载amqp安装包https://github.com/alanxz/rabbitmq-c我选择的是最新版本rabbitmq-c-0.10.0.tar.gz

tar -zxvf rabbitmq-c-0.10.0.tar.gz   //解压
cd rabbitmq-c-0.10.0 
cmake -DCMAKE_INSTALL_PREFIX=/usr/local/rabbitmq-c
//如果没有安装cmke  yum -y install cmake进行安装
make
make install
//注意:如果不进行下面这一步,安装amqp时可能会出现问题。
cp -r /home/vagrant/rabbitmq-c-0.10.0/lrabbitmq /usr/local/rabbitmq-c/lib

2.php扩展amqp的安装

下载地址https://pecl.php.net/package/amqp 我安装的是amqp-1.10.2

tar zxf amqp-1.10.2.tgz
cd amqp-1.10.2
#注意:phpize php-config 或者 find / -name php-config 查看正确php-config 位置。librabbitmq-dir 填写rabbit-c正确安装位置
./configure --with-php-config=/usr/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c
Make
make install
#Installing shared extensions:     /usr/lib64/php/modules/ 看到这个提示说明安装完成
vim /etc/php.ini
#在vim /etc/php.ini 下添加
extension=amqp.so
#重新启动php-fpm
systemctl restart php-fpm
#查看安装扩展
php-m

安装时出现的问题

  1. ./configure: 没有那个文件或目录

    改成cmke安装了,需要安装cmake

yum -y install cmake
cmake -DCMAKE_INSTALL_PREFIX=/usr/local/rabbitmq-c
make
make install
  1. make安装amqp时出现 /usr/bin/ld: cannot find -lrabbitmq

经过检查发现安装在/usr/local/rabbitmq-c目录中缺少lib文件

则需要

cp -r /home/vagrant/rabbitmq-c-0.10.0/lrabbitmq /usr/local/rabbitmq-c/lib

3.基本代码详解

  • 生产者

    producter.php

    <?php
    
    //定义交换机
    define('EXEHANGE_NAME','RMQ_EN');
    
    //定义路由
    define('ROUTE_KEY_NAME','RMQ_RKN');
    
    //定义队列
    define('QUEUE_NAME','RMQ_QN');
    
    try {
        $arr = [
            'host'     => '127.0.0.1',
            'port'     => 5672,
            'user'     => 'guest',
            'password' => 'guest',
            'vhost'    => '/',
        ];
        //构造函数
        $con = new AMQPConnection($arr);
        if(!$con->connect()) {
            var_dump('连接失败1');
        }
        //先通道声明--传入连接的套接字--构造函数 通过通道连接创建消息通道
        $channel = new AMQPChannel($con);
    
        //交换机声明--传入声明的通道-- 构造函数 通过通道连接交换机
        $exchange = new AMQPExchange($channel);
    
        //设置交换机名
        $exchange->setName(EXEHANGE_NAME);
    
        $exchange->setType(AMQP_EX_TYPE_DIRECT);
    
        //发送消息 参数一:要发送的消息内容,参数二:路由
    
        for ($i = 0; $i < 20; $i++) {
            $message = date('Y-m-d H:i:s',time()).'--'.mt_rand(100,1000)*$i.'---消息'.$i;
            $exchange->publish($message,ROUTE_KEY_NAME);
        }
    } catch (Exception $e) {
        var_dump('连接失败');
    }
    
    
    ?>
    
  • 消费者

    customer.php

    <?php
    //定义交换机
    define('EXEHANGE_NAME','RMQ_EN');
    
    //定义路由
    define('ROUTE_KEY_NAME','RMQ_RKN');
    
    //定义队列
    define('QUEUE_NAME','RMQ_QN');
    
    try {
        $arr = [
            'host'     => '127.0.0.1',
            'port'     => 5672,
            'user'     => 'guest',
            'password' => 'guest',
            'vhost'    => '/',
        ];
        //构造函数   AMQPConnection
        $con = new AMQPConnection($arr);
        if(!$con->connect()) {
            var_dump('连接失败1');
        }
    
        //先通道声明--传入连接的套接字--构造函数 通过通道连接创建消息通道
        $channel = new AMQPChannel($con);
    
    //交换机声明--传入声明的通道-- 构造函数 通过通道连接交换机
        $exchange = new AMQPExchange($channel);
    
    //设置交换机名
        $exchange->setName(EXEHANGE_NAME);//设置通道名称
    
    //设置连接方式--直连 [直连,主题,广播]
        $exchange->setType(AMQP_EX_TYPE_DIRECT);
    
    //消息持久化
        $exchange->setFlags(AMQP_DURABLE);
    
    //声明
        $exchange->declareExchange();
    
    //声明队列,绑定交换机和路由
        $queue = new AMQPQueue($channel);
    
    //设置队列名字
        $queue->setName(QUEUE_NAME);
    
    //消息持久化
        $queue->setFlags(AMQP_DURABLE);
    
    //声明
        $queue->declareQueue();
    
    //绑定获取数据 参数一:交换机名  参数二:路由
        $queue->bind(EXEHANGE_NAME,ROUTE_KEY_NAME);
    
    //消费,没有数据时,阻塞监听获取数据
        $queue->consume(function($event,$queue){
            $body = $event->getBody();
            var_dump($body);
    
            $queue->ack($event->getDeliveryTag());
        });
    
    } catch (Exception $e) {
        var_dump('连接失败2');
    }
    ?>
    
    
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐