1 Star 8 Fork 2

咔呲/springboot集成redis实现发布订阅者模式

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
Loading...
README

spring-boot集成redis实现消息发布订阅模式,以及多个订阅模式的实现

1.需要的依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.配置redis以及连接池

# redis
spring:
  redis:
    host: 127.0.0.1
    port: 6379
#    password:
    database: 1
    timeout: 5000
    jedis:
      pool:
        max-active: 8
        max-wait: 1
        max-idle: 500
        min-idle: 0

3.创建消息的发布者和消息处理者类

消息发布者

@EnableScheduling//开启定时器功能
@Component
public class MessageSender {
  @Autowired
  private StringRedisTemplate stringRedisTemplate;

  /**
   * 间隔2秒,通过stringRedisTemplate对象向redis消息队列chat频道发布消息
   */
  @Scheduled(fixedDelay = 2000)
  public void sendMessage() {
    stringRedisTemplate.convertAndSend("chat", String.valueOf(Math.random()));
  }
}

消息处理器POJO

  • @Component: 自动注入,在 RedisConfig 中不用再写 @Bean 注入 MessageReceiver
@Component
public class MessageReceiver {

  /**
   * 接收消息方法
   */
  public void receiverMessage(String message) {
    System.out.println("MessageReceiver收到一条新消息:" + message);
  }
}

4.设置消息发布者、消息处理者POJO、redis消息监听容器以及redis监听器注入IOC容器

/**
 * redis配置
 *
 * @author 段誉
 * @create 2019-03-25 9:59
 */
@Configuration//相当于xml中的beans
public class RedisConfig {

  @Bean
  RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                          MessageListenerAdapter listenerAdapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    //订阅了一个叫chat的通道
    container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
    return container;
  }

  /**
   * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
   * @param receiver
   * @return
   */
  @Bean
  MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
    //给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
    //不填defaultListenerMethod默认调用handleMessage
    return new MyMessageListenerAdapter1(receiver, "receiverMessage");
  }

  /**
   * 读取内容的template
   */
  @Bean
  StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
    return new StringRedisTemplate(connectionFactory);
  }
}

5.启动项目查看控制台

MessageReceiver收到一条新消息:0.7372683821918483
MessageReceiver收到一条新消息:0.800095651812774

配置成功

6.如何配置多个监听通道

方式1:一个监听器订阅多个通道

1.监听容器配置

  @Bean
  RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                          MessageListenerAdapter listenerAdapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    //订阅了一个叫chat的通道
    container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
    
    container.addMessageListener(listenerAdapter, new PatternTopic("chat1"));
    return container;
  }

2.添加一个新的消息发布者

@EnableScheduling
@Component
public class MessageSender2 {
  @Autowired
  private StringRedisTemplate stringRedisTemplate;

  /**
   * 间隔2秒,通过stringRedisTemplate对象向redis消息队列chat频道发布消息
   */
  @Scheduled(fixedDelay = 2000)
  public void sendMessage() {
    stringRedisTemplate.convertAndSend("chat1", "来自chat1的消息" + Math.random());
  }
}

3.启动项目查看控制台打印

MessageReceiver收到一条新消息:0.9988032526665156
MessageReceiver收到一条新消息:来自chat1的消息0.5760191019007642
MessageReceiver收到一条新消息:0.37241454741448377
MessageReceiver收到一条新消息:来自chat1的消息0.639498468451238

说明配置成功

方式2:配置多个监听器监听不同的通道

1.注入一个新的bean,名字要和之前不一样,调用 MessageReceiverreceiverMessage2 方法。

@Bean
MessageListenerAdapter listenerAdapter2(MessageReceiver receiver) {
  return new MessageListenerAdapter(receiver, "receiverMessage2");
}
  • 这里也可以自己新注入一个新的 ReceiverMessageReceiver2
@Component
public class MessageReceiver2 {

  /**
   * 接收消息方法
   */
  public void receiverMessage(String message) {
    System.out.println("MessageReceiver收到一条新消息:" + message);
  }
}
@Bean
MessageListenerAdapter listenerAdapter2(MessageReceiver2 receiver) {
  return new MessageListenerAdapter(receiver, "receiverMessage");
}

2.配置监听容器,这里参数命名要和上边bean注入的方法名一致

@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                        MessageListenerAdapter listenerAdapter1,
                                        MessageListenerAdapter listenerAdapter2) {
  RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  //订阅了一个叫chat的通道
  container.addMessageListener(listenerAdapter1, new PatternTopic("chat"));
  container.addMessageListener(listenerAdapter2, new PatternTopic("chat1"));
  return container;
}

3.添加一个新的消息发布者

//开启定时器功能
@EnableScheduling
@Component
public class MessageSender {
  @Autowired
  private StringRedisTemplate stringRedisTemplate;

  /**
   * 间隔2秒,通过stringRedisTemplate对象向redis消息队列chat1频道发布消息
   */
  @Scheduled(fixedDelay = 2000)
  public void sendMessage1() {
    stringRedisTemplate.convertAndSend("chat1", String.valueOf(Math.random()));
  }

  @Scheduled(fixedDelay = 2000)
  public void sendMessage2() {
    stringRedisTemplate.convertAndSend("chat2", String.valueOf(System.currentTimeMillis()));
  }
}

4.启动项目打印日志如下

收到一条chat2新消息:1555564092816
收到一条chat1新消息:0.257614690680789
收到一条chat2新消息:1555564094830
收到一条chat1新消息:0.3996585101720651
收到一条chat2新消息:1555564096833
收到一条chat1新消息:0.6767629866464419
收到一条chat2新消息:1555564098836
收到一条chat1新消息:0.14106474791507906

配置成功

空文件

简介

取消

发行版

暂无发行版

贡献者 (1)

全部

近期动态

6年多前推送了新的提交到 master 分支,aa1002f...5b9de0e
6年多前推送了新的提交到 master 分支,cb5e451...aa1002f
6年多前推送了新的提交到 master 分支,9f6d7cb...cb5e451
6年多前推送了新的 master 分支
6年多前创建了仓库
加载更多
不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/fengzxia/springboot-redis-queue.git
git@gitee.com:fengzxia/springboot-redis-queue.git
fengzxia
springboot-redis-queue
springboot集成redis实现发布订阅者模式
master

搜索帮助