一. 常用消息队列工具

  目前常用的消息队列大概有三种类型,RabbitMQ等AMQP系列, Kafka, Redis等kev value系列,它们的使用场景分别是: 
  1.RabbitMQ: 相对重量级高并发的情况,比如数据的异步处理 任务的串行执行等. 
  2.Kafka: 基于Pull的模式来处理,具体很高的吞吐量,一般用来进行 日志的存储和收集. 
  3.Redis: 轻量级高并发,实时性要求高的情况,比如缓存,秒杀,及时的数据分析

 

.二. SpringBoot基于Redis集成消息队

1.消息发布:使用redisTemplate.convertAndSend


@RestController
@RequestMapping
public class PublisherController {

    @Autowired
	private RedisTemplate<Object, Object> redisTemplate;

    @GetMapping(value = "pub/{msg}")
    public String pubMsg(@PathVariable String msg){
        redisTemplate.convertAndSend("demochannel",id);
        return msg;
    }
}

 

2.消息订阅

package com.cxm.consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;


@Configuration
public class RedisSubConfig {

	/**
	 * 创建连接工厂
	 *
	 * @param connectionFactory
	 * @param adapter
	 * @return
	 */
	@Bean
	public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
			MessageListenerAdapter adapter) {
		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		//监听对应的channel
		container.addMessageListener(adapter, new PatternTopic("demochannel"));
		return container;
	}

	/**
	 * 	绑定消息监听者和接收监听的方法
	 * @param message
	 * @return
	 */
	@Bean
	public MessageListenerAdapter adapter(RedisMessageListener message) {
		// onMessage 如果RedisMessage 中 没有实现接口,这个参数必须跟RedisMessage中的读取信息的方法名称一样
		return new MessageListenerAdapter(message, "onMessage");
	}

}

MessageListener

@Component
public class RedisMessageListener implements MessageListener {

	@Autowired
	private RedisTemplate<Object, Object> redisTemplate;
	
	@Override
	public void onMessage(Message message, byte[] pattern) {
		RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
		String msg = serializer.deserialize(message.getBody());
		System.out.println("接收到的消息是:" + msg);
	}

}

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐