常见的消息中间间有很多,比如ActiveMQ,RabbitMQ,Kafka等,这篇博客主要写一下用Redis实现MQ的功能。

        MQ实现的功能简单说就是将生产出来的消息来消费,具体的消费模式有点对点消费,发布\订阅模式的消费,Redis中也实现了点对点,发布\订阅这种方式,具体代码展示。

        点对点消费者:

public class RedisQueueConsumer {

    @Test
    public  void consumer() {

        Jedis jedis = new Jedis("xxx.xxx.xxx.xxx",6379);
        while (true)
        {
            //使用redis的rpop来获取对应的key的value
            String value = jedis.rpop("key");
            while(value != null)
            {
                System.out.println(value);
                break;
            }

        }

    }
}

        点对点生产者:

public class RedisQueueTest {

    @org.junit.Test
    public  void  producer()
    {
        Jedis jedis = new Jedis("xxx.xxx.xxx.xxx",6379);
        // 生产消息
        for (int i = 1;i <= 10;i++)
        {
            //主要使用redis中的lpush
            jedis.lpush("key", "这是第" + i + "个消息");
        }

        jedis.close();
    }

}

        先启动消费者,然后生产者生产消息,结果图:

        发布\订阅模式:

        先编写监听代码,继承JedisPubSub,然后重写里面的onMessage方法:


public class Subscriber  extends JedisPubSub {
    public Subscriber(){}
    @Override
    public void onMessage(String channel, String message) {
        System.out.println(message);
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        super.onSubscribe(channel, subscribedChannels);
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        super.onUnsubscribe(channel, subscribedChannels);
    }
}

        消费者:

public class RedisTopicConsumer {
    @Test
    public  void consumer() {

        Subscriber subscriber = new Subscriber();

        Jedis jedis = new Jedis("xxx.xxx.xxx.xx",6379);
        while (true)
        {
            //监听消息
            jedis.subscribe(subscriber,"key");
        }
    }
}

        生产者:

public class RedisTopicTest {
    @org.junit.Test
    public  void  producer()
    {
        Jedis jedis = new Jedis("xxx.xxx.xxx.xxx",6379);
        // 生产消息
        for (int i = 1;i <= 10;i++)
        {
            //取消息
            jedis.publish("key", "这是第" + i + "个消息");
        }

        jedis.close();
    }
}

       启动消费者,打开生产者生产消息,消息被订阅消费:

        整个过程,实际上就是利用Redis的功能,生产消息使用lpush入队,取消息就是rpop出队;生产消息publish发布消息到指定的频道,subscribe来订阅具体的消息。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐