使用Redis实现MQ
整个过程,实际上就是利用Redis的功能,生产消息使用lpush入队,取消息就是rpop出队;生产消息publish发布消息到指定的频道,subscribe来订阅具体的消息。MQ实现的功能简单说就是将生产出来的消息来消费,具体的消费模式有点对点消费,发布\订阅模式的消费,Redis中也实现了点对点,发布\订阅这种方式,具体代码展示。常见的消息中间间有很多,比如ActiveMQ,RabbitMQ,K
·
常见的消息中间间有很多,比如ActiveMQ,RabbitMQ,Kafka等,这篇博客主要写一下用Redis实现MQ的功能。
MQ实现的功能简单说就是将生产出来的消息来消费,具体的消费模式有点对点消费,发布\订阅模式的消费,Redis中也实现了点对点,发布\订阅这种方式,具体代码展示。
点对点消费者:
public class RedisQueueConsumer {
@Test
public void consumer() {
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx",6379);
while (true)
{
//使用redis的rpop来获取对应的key的value
String value = jedis.rpop("key");
while(value != null)
{
System.out.println(value);
break;
}
}
}
}
点对点生产者:
public class RedisQueueTest {
@org.junit.Test
public void producer()
{
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx",6379);
// 生产消息
for (int i = 1;i <= 10;i++)
{
//主要使用redis中的lpush
jedis.lpush("key", "这是第" + i + "个消息");
}
jedis.close();
}
}
先启动消费者,然后生产者生产消息,结果图:
发布\订阅模式:
先编写监听代码,继承JedisPubSub,然后重写里面的onMessage方法:
public class Subscriber extends JedisPubSub {
public Subscriber(){}
@Override
public void onMessage(String channel, String message) {
System.out.println(message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
super.onSubscribe(channel, subscribedChannels);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
super.onUnsubscribe(channel, subscribedChannels);
}
}
消费者:
public class RedisTopicConsumer {
@Test
public void consumer() {
Subscriber subscriber = new Subscriber();
Jedis jedis = new Jedis("xxx.xxx.xxx.xx",6379);
while (true)
{
//监听消息
jedis.subscribe(subscriber,"key");
}
}
}
生产者:
public class RedisTopicTest {
@org.junit.Test
public void producer()
{
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx",6379);
// 生产消息
for (int i = 1;i <= 10;i++)
{
//取消息
jedis.publish("key", "这是第" + i + "个消息");
}
jedis.close();
}
}
启动消费者,打开生产者生产消息,消息被订阅消费:
整个过程,实际上就是利用Redis的功能,生产消息使用lpush入队,取消息就是rpop出队;生产消息publish发布消息到指定的频道,subscribe来订阅具体的消息。
更多推荐
已为社区贡献1条内容
所有评论(0)