MQ的学习(RabbitMQ---消息模型)
一、什么是 MQ消息队列(Message Queue,简称 MQ)技术是应用间交换信息的一种技术。消息队列主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前主流的 MQ :Kafka、RabbitMQ、RocketMQ、ActiveMQ选型根据需求(数据量大的选RokectMQ(基于JMS)、Kafka,轻微使用Rabbi
一、什么是 MQ
消息队列(
Message Queue
,简称MQ
)技术是应用间交换信息的一种技术。消息队列主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。
目前主流的MQ
:Kafka、RabbitMQ、RocketMQ、ActiveMQ
选型根据需求(数据量大的选RokectMQ
(基于JMS)、Kafka
,轻微使用RabbitMQ
(基于AMQP)就可以了)
二、RabbitMQ消息模型
- 克隆下来,找到
- 使用mvn工程运行,但是里面是空的,所以复制一份java的代码进来
- 运行教程前先启动
rabbitMQ
服务,未安装参考我之前的RabbitMQ安装成功记录
2.1简单点对点模式
生产者将消息发送到“hello”队列。消费者负责监听该队列接收消息,如果队列中有消息,就消费掉。
应用场景:聊天
注意:生产者、消费者和代理不必驻留在同一主机上;事实上,在大多数应用程序中它们都没有。应用程序也可以既是生产者又是消费者。
- 实例运行
- 发送者
- 接收者
2.2工作队列(又名:任务队列)模式
生产者与消费者一对多,消费者们同时监听同一个队列,互相争抢,谁先拿到谁负责消费消息(需要加锁保证一条消息只能被一个消费者使用)
应用场景:红包;资源任务调度
- 实例运行
- 新建两次任务(一人完成一个任务)
2.3publish/subscribe发布订阅(广播)模式
生产者与消费者一对多,
X
代表交换机,消息产生者将消息放入交换机,交换机发布订阅把消息发送到消息队列中(具体怎么发送取决于Exchange
的类型。),对应消息队列的消费者拿到消息进行消费。
应用场景:邮件群发,群聊天,广播
Exchange(交换机)
:只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange
绑定,或者没有符合路由规则的队列,那么消息会丢失!
-
官方实例
-
可以看到发布与接收,各定义一个队列
/**
* 单独发布消息
* @throws Exception
*/
static void publishMessagesIndividually() throws Exception {
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
ch.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
}
/**
* 批量发布消息
* @throws Exception
*/
static void publishMessagesInBatch() throws Exception {
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
int batchSize = 100;
int outstandingMessageCount = 0;
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
ch.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
ch.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
}
/**
* 异步处理发布确认
* @throws Exception
*/
static void handlePublishConfirmsAsynchronously() throws Exception {
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
//队列声明
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
//清洁未完成确认
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};
//确认监听器
ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
//处理未完成的
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
ch.basicPublish("", queue, null, body.getBytes());
}
if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
}
long end = System.nanoTime();
System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
}
- 实例运行
这里可以将这个图反向,并在P和X中间加一个队列就是了
剩下的就笼统为
路由模式
(通过交换机来实现)
广播发送Fanout
(交换机与批量
队列字符串相等)
定向路由Direct
(交换机与队列字符串相等)
通配符Topic
(交换机与队列通配符
匹配)
Topic
实例
未匹配,不接收
匹配接收(
routingKey
为o.1,bindingKey
为o.*)
ReceiveLogsTopic
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
String[] arr = {"o.*"};
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (arr.length < 1) {
System.err.println("Usage: Exchange.ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : arr) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
EmitLogTopic
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
String[] arr = {"o.1"};
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(arr);
String message = getMessage(arr);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
private static String getRouting(String[] strings) {
if (strings.length < 1)
return "anonymous.info";
return strings[0];
}
private static String getMessage(String[] strings) {
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0) return "";
if (length < startIndex) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
消息模型理解先到这,
\(^o^)/~
代码放置在gitee
,仓库地址:https://gitee.com/lwstudy/study
更多推荐
所有评论(0)