本文主要介绍Java中调用RabbitMQ的三种方式。三种方式实际上对应了三种不同的抽象级别:

首先,通过Java原生代码来访问RabbitMQ。在这种方式下,需要手动创建Connection,创建Channel,然后通过Channel对象可以显式的创建Exchange、Queue和Bind等等。这种方式的好处就是使得我们能够很显式地了解到整个RabbitMQ操作的生命周期,建议新手可以通过这种方式学习RabiitMQ的入门。

spring-boot-starter-amqp对RabbitMQ的使用进行了进一步的封装,通过这种方式使用集成到spring boot中的RabbitMQ时,我们不再关心Connect和Channel的创建,spring boot会替我们创建好。我们索要做的,只是通过注解的方式创建Exchange、Queue和Bind对象,并把他们交给spring ioc进行管理,然后spring boot又会自动生成这些对象对应的交换机、队列和绑定。

Java中操作RabbitMQ的最后一种方法是通过EDA(Event Driven Achitecture,事件驱动架构)框架的spring cloud stream。spring cloud stream对RabitMQ(准确的说应该是消息队列)封装的更加彻底,我们甚至不用关心使用的消息队列是RabbitMQ还是Kafka(spring cloud stream可以配置RabbitMQ和Kafak两种消息队列,并进行无缝切换)。在使用时spring cloud stream时,只需一个标签就能自动创建RabitMQ的Connection、Chanel,甚至你都不用关心Exchange、Queue和Bind这些在spring-boot-starter-amqp中还需要手动创建的对象,他们就被创建好了。spring cloud stream的强大之处就在于它的封装,但是不足之处也在于它的封装,封装的太强,必然增加了学习成本和调试难度,而且类似RabbitMQ和Kafka这种中间件的使用,一般在系统创建之处就一定确定,进行无缝切换就显得有些鸡肋了。

下面,我们就以代码的方式演示这三种调用RabbitMQ的方式:

一、Java原生代码调用RabbitMQ

1.1 交换机和队列的创建

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.rabbitMqHost);
factory.setPort(this.rabbitMqPort);
factory.setConnectionTimeout(this.rabbitMqTimeOut);
factory.setUsername(this.rabbitMqUsername);
factory.setPassword(this.rabbitMqPassword);
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
channel.queueDeclare("test-queue", true, false, false, null);
com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
channel.basicPublish("direct-exchange", "test-queue", null, msg.getBytes("UTF-8"));

上述的代码,创建了一个直连交换机、一个队列,并进行绑定,最后向交换机中发送了一个"Hello World"的字符串。

1~7行,创建了一个ConnectionFactory 对象并进行配置,配置的参数包括RabbitMQ的ip地址(host),端口号(port)、超时(connectionTimeout)等等。

第8行,通过ConnectionFactory 对象,创建了一个Connection 对象,此时已经完成了对RabbitMQ服务器的连接。如果我们通过RabitMQ Magement Web查看,可以看到这个链接。

第9行,创建用来通信的信道Channel。

第10行,声明和创建交换机。这里exchangeDeclare有五个参数。第1个参数指定了交换机的名称;第2个参数指定了交换机的类型:direct、topic或者fanout;第3个参数,指定交换机是否要持久化,如果设置为true,那么交换机的元数据要持久化到内存中;第4个参数,指定交换机在没有队列与其绑定时,是否删除,设置为false表示不删除;最后一个参数是Map<String, Object>类型,用来指定交换机其它一些结构化的参数,我在这里直接设置为null。

第11行,声明了一个名为test-queue的队列。queueDeclare有5个参数:第1个参数指定了队列的名称;第2个参数表示队列是否要持久化,但是需要注意,这里的持久化只是队列名称等这些队列元数据的持久化,不是队列中消息的持久化。第3个参数,表示队列是不是私有的,如果是私有的,只有创建它的应用程序才能从队列消费消息;第4个参数表示队列在没有消费者订阅时是否自动删除;第5个参数是队列的一些结构化信息,比如声明死信队列、磁盘队列会用到。

第12行,创建了一个bind对象,将交换机和队列进行绑定,queueBind的三个参数中:第1个参数指定了队列名称,第2个参数指定了交换机名称,第3个参数是路由键,在直连模式下为队列名称。

第13行,发送消息,在直连模式下需要指定:直连交换机名称(参数1);路由键(参数2,也就是目标队列名称);参数3类型为BasicProperties,可以为消息附带一些额外的附件,比如在使用RabbitMQ远程RPC调用模式发送消息时可以用到,这里直接设置为null。参数4就是要发送的消息转换成的二进制数组。

上面就是一个创建direct exchange和queue并发送消息的例子,如果要使用topic exchange或者fanout exchange只需要一些小小的改动即可。

比如创建topic exchange要明确指明交换机的类型为topic:

 channel.exchangeDeclare("topic-exchange", "topic", true, false, null);

绑定时指定主题为路由键:

 channel.queueBind("test-queue", "topic-exchange", "fruit");

发送消息时指定主题为路由键:

channel.basicPublish("topic-exchange", "fruit", null, msg.getBytes("UTF-8"));

再比如创建fanout exchange要明确指明交换机的类型为fanout:

channel.exchangeDeclare("fanout-exchange", "fanout", true, false, null);

绑定时指定路由键为空:

channel.queueBind("test-queue", "fanout-exchange", "");

发布消息时指定路由键为空:

channel.basicPublish("fanout-exchange", "", null, msg.getBytes("UTF-8"));

1.2 消费者订阅队列的消息

在上面的例子中,我们演示了创建direct、topic和fanout三种类型的exchange以及关联好了队列,现在,我们创建一个消费者来订阅队列里面的消息。首先实现Consumer接口:

public class TestConsumer implements Consumer {

    @Override
    public void handleConsumeOk(String s) {
        System.out.println(s);
    }

    @Override
    public void handleCancelOk(String s) {}

    @Override
    public void handleCancel(String s) throws IOException {}

    @Override
    public void handleShutdownSignal(String s, ShutdownSignalException e) {}

    @Override
    public void handleRecoverOk(String s) {}

    @Override
    public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
        String str = new String(bytes);
        System.out.println("接受到的字符串是:" + str);
    }
}
              

Consumer接口的方法有6个,在这里我们只用到了2个,handleConsumeOk在消费者获取到消息后调用,而handleDelivery是在调用handleConsumeOk后调用。我们业务的主要逻辑在handleDelivery中,因为在这个方法之中,我们可以获取到消息,并进行相应的处理。实现了自己的消费者,接下来需要用该消费者订阅队列:

// 创建连接和信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.rabbitMqHost);
factory.setPort(this.rabbitMqPort);
factory.setConnectionTimeout(this.rabbitMqTimeOut);
factory.setUsername(this.rabbitMqUsername);
factory.setPassword(this.rabbitMqPassword);
factory.setVirtualHost("/");
Connection connection = null;
try {
    connection = factory.newConnection();
    Channel channel = connection.createChannel();
    Consumer consumer = new TestConsumer();
    channel.basicConsume("test-queue", true, consumer);
    while (true) {
        Thread.sleep(3600000);
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (connection != null && connection.isOpen()) {
        connection.close();
    }
}

第1~12行,和生产者创建Connection的过程一致。

第13行,创建了一个TestConsumer对象。

第14行,订阅队列test-queue中的消息。basicConsume方法有三个参数:第1个参数是指明要订阅的通道的名称;第2个参数指明是否自动ack,如果是true,这个方法结束后会自动进行ack,如果是false,需要额外手动的ack;第3个参数就是装配的消费者。

第15~17行,没有业务上的功能,只是单纯不让程序结束。

当运行了消费者以后,就可以看到,消费者消费了队列中的消息。

消费者打印消息:

队列中的消息已经清空:

以上就是通过Java原生的代码调用RabbitMQ的例子,接下来,我们学习下另外一种调用RabbitMQ的方式,通过 spring-boot-starter-amqp调用。

二、 spring-boot-starter-amqp调用RabbitMQ

2.1 生产者

首先,先要创建spring boot代码工程,并且pom文件中引入spring-boot-starter-amqp的依赖。

 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

创建RabbitMQ的配置:

@Configuration
public class RabbitConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }
    //队列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("TestDirectQueue", true, false, false);
    }
    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("TestDirectExchange", true, false);
    }
    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectQueue
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectQueue");
    }
}

可以看到,在上面的代码中我们创建了connectionFactory、rabbitTemplate、TestDirectQueue、TestDirectExchange和bindingDirect,我想从bean name大家也都能猜到,这是在创建RabbitMq的连接工厂、交换机、队列和绑定等等。其中rabbitTemplate我们在上文中没有接触过,实际上这就是spring boot对RabbitMQ根据bean创建信道、交换机等基本组件的封装,利用模板方法模式,将创建过程进行了隐藏,也对消息的发布和订阅过程进行了隐藏。

完成了上述配置之后,运行程序,是不是就自动创建了这些交换机、通道等等呢?也许你会回答是,但是笔者在运行程序后,查看RabbitMQ的web,发现还是一片空白:

是我们没有创建成功?其实不然,是因为rabbitTemplate在创建这些组件时,是采用的懒加载模式,只有在发送消息之前,才会去真正创建这些交换机、通道等等。所以,接下来,我们创建一个工具类,并在单元测试中通过该工具类发送消息:

@Service
public class RbmqServiceImpl implements RbmqService {

    @Autowired
    RabbitTemplate rabbitTemplate;


    @Override
    public void sendMsg(String str) {
        Message message=new Message(str.getBytes());
        rabbitTemplate.send("TestDirectExchange","TestDirectQueue",message);
    }
}

在单元测试中测试消息的发送:

@Test
public void sendMsgTest() {
    rbmqService.sendMsg("Hello World");
}
              

此时,再去看RabbitMQ的管理Web,发现交换机和队列都创建完成,而且队列中也缓存了消息:

2.2 消费者

前面介绍了使用spring boot集成的RabbitMQ组件创建生产者的方法,下面我们介绍下,消费者的创建。

配置类RabbitConfig的定义,基本和生产者一致,这里不再赘述,我们重点介绍下使用@RabbitListener标签监听队列的方法。首先,还是创建一个工具类:

@Service
public class RbmqServiceImpl implements RbmqService {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "TestDirectQueue")
    @Override
    public void consumer(String msg) {
        System.out.println("接受到的消息是:" + msg);
    }
}

可以看到,通过RabbitListener标签,我们直接实现了订阅TestDirectQueue队列。此时运行程序,会接受并打印我们用生产者发送的消息:

@RabbitListener标签除了指定监听的队列之外,还可以创建交换机和队列,并进行绑定,然后再开启监听:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "TestTopicQueue", durable = "true"),
        exchange = @Exchange(value = "orderTopicExchange", type = ExchangeTypes.TOPIC),
        key = "TestTopicRouting")
)

以上就是使用spring-boot-starter-amqp集成RabbitMQ的方法,最后,我们学习下使用spring cloud stream来操作RabbitMQ。

三、spring cloud stream调用RabbitMQ

3.1 生产者

新建一个spring-boot工程,pom文件中引入以下依赖:

<dependencyManagement>
    <dependencies>
        <!-- spring-cloud-dependencies start-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Finchley.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- spring-cloud-dependencies end-->
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
</dependencies>

同样在配置文件application.yml中进行RabbitMQ链接信息的配置:

server:
  port: 8022
spring:
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #虚拟host 可以不设置,使用server默认host
    #virtual-host: JCcccHost

使用spring cloud stream方式调用RabbitMQ,首先要创建绑定接口:

 public interface OutputMessageBinding {
    /** Topic 名称*/
    String OUTPUT = "message-center";

    @Output(OUTPUT)
    MessageChannel output();
}

这里需要说明一下:

第5行通过Output标签指定了消息的输出exchange,这里会创建一个名称为message-center-out的exchange,而且类型为topic。通过Spring cloud stream创建的exchange默认的类型都是topic。

接下来,我们需要再次创建一个工具类:

@Service
@EnableBinding(OutputMessageBinding.class)
public class RbmqServiceImpl implements RbmqService {

    @Resource
    private OutputMessageBinding outputMessageBinding;

    @Override
    public void sendMsg(String msg) {
        outputMessageBinding.output().send(MessageBuilder.withPayload(msg).build());
    }
}

第2行中的EnableBinding就是为了激活绑定类OutputMessageBinding。OutputMessageBinding被激活之后会产生一个名称为outputMessageBinding的bean托管到IOC中,然后在第6行获取到了这个bean。在第10行中,获取outputMessageBinding的output对象进行消息的发送。

在单元测试中对发送信息的接口进行调用:

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class RbmqTest {

    @Autowired
    RbmqService rbmqService;

    @Test
    public void sendMsgTest() {
        rbmqService.sendMsg("Hello World");
    }
}

然后在web上可以看到新建的topic类型的exchange:

3.1 消费者

消费者的创建和使用过程其实和生产者比较类似,首先也是需要创建一个绑定接口:

public interface InputMessageBinding {

    String INPUT = "message-center";

    @Input(INPUT)
    SubscribableChannel input();
}

在消费者的绑定接口中,使用@Input标签用来表明该对象为消费者对象。接下来,同样也需要创建一个工具类并注入到IOC容器中:

@Service
@EnableBinding({InputMessageBinding.class})
public class RbmqServiceImpl implements RbmqService {

    @StreamListener(InputMessageBinding.INPUT)
    @Override
    public void consume(String msg) {
        System.out.println("接受到的消息是:" + msg);
    }
}

可以看到,在第8行,我们使用了StreamListener标签监听了创建的InputMessageBinding的INPUT字段,StreamListener会在内部进行处理,实际上监听的是名名为message-center+随机字符串的队列,而队列和message-center也自动进行了绑定:

看下上面红色圈出部分的队列和交换机的绑定,你会发现,绑定的路由键为'#',表示路由匹配任意规则,也就是说从名为message-center的exchange发出的消息都会路由到该队列上。

至此,Java中三种操作RabbitMQ的方式都已经介绍完毕。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐