# spring-redis-mq **Repository Path**: zhangjun-1998/spring-redis-mq ## Basic Information - **Project Name**: spring-redis-mq - **Description**: 基于 Spring 和 Redis 的分布式消息队列(MessageQueue)实现 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-12-11 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # spring-redis-mq 基于Spring和Redis的分布式消息队列(MessageQueue) ### 使用方法 **创建项目** 由于这个库还没有提交到Maven的中央仓库,所以需要手动将其导入到你的私人仓库中。首先`fork`源码到本地后使用`mvn package`打包。 然后添加到本地仓库: ``` mvn install:install-file -DgroupId=com.scienjus -DartifactId=spring-redis-mq -Dversion=1.0-SNAPSHOT -Dpackaging=jar -Dfile=/path/to/jar/spring-redis-mq.jar ``` 所有依赖 Jar: ``` 4.1.8.RELEASE 2.7.3 1.8.7 2.2.1 redis.clients jedis ${jedis.version} org.springframework spring-context-support ${spring.version} org.springframework spring-tx ${spring.version} org.quartz-scheduler quartz ${quartz.version} org.springframework spring-aop ${spring.version} org.aspectj aspectjweaver ${aspectj.version} ``` **配置Spring Bean** 配置Jedis客户端: ``` @Bean public JedisPool jedisPool() { return new JedisPool("127.0.0.1", 6379); } ``` 配置消费者: ``` @Bean public Consumer consumer() { RedisConsumer consumer = new RedisConsumer(); consumer.setJedisPool(jedisPool()); return consumer; } ``` 配置生产者: ``` @Bean public Producer producer() { RedisProducer producer = new RedisProducer(); producer.setJedisPool(jedisPool()); return producer; } ``` 配置消费者定时扫描任务(仅当使用注解驱动的消费者时才需要配置): ``` @Bean(initMethod = "init") public SchedulerBeanFactory schedulerBeanFactory() { SchedulerBeanFactory schedulerBeanFactory = new SchedulerBeanFactory(); schedulerBeanFactory.setConsumer(consumer()); return schedulerBeanFactory; } ``` 注意一定要将`initMethod`设为`init`方法。 配置生产者自动推送任务(仅当使用注解驱动的生产者时才需要配置): ``` @Bean public ProducerWorker producerWorker() { ProducerWorker producerWorker = new ProducerWorker(); producerWorker.setProducer(producer()); return producerWorker; } ``` **创建生产者实例** 方法1:注入`producer`,调用`sendMessage`方法: ``` @Component public class SayHelloProducer { @Autowired private Producer producer; public void sayHello(String name) { producer.sendMessage("say_hello", new Message(name)); } } ``` 方法2:使用`@ToQueue`注解,`retrun`需要发送的对象(需要配置`producerWorker`): ``` @Producer public class UserServiceImpl implements UserService { @Autowired private UserDao userDao; @ToQueue(topic = "new_user") //添加新的用户后,将其发送到消息队列 public User insert(User user) { this.userDao.insert(user); return user; } } ``` **创建消费者实例** 方法1:注入`consumer`,调用`getMessage`方法(需要自己开线程循环获取): ``` @Component public class SayHelloConsumer { @Autowired private Consumer consumer; public void sayHello() { Message message; while ((message = consumer.getMessage("say_hello")) != null) { System.out.println("Hello ! " + message.getContent() + " !"); } } } ``` 方法2:为类添加`@Consumer`注解,为对应的方法添加`@OnMessage`注解(需要配置`schedulerBeanFactory`): ``` @Consumer public class SayHelloConsumer { @OnMessage(topic = "say_hello") public void onSayHello(String name) { System.out.println("Hello ! " + name + " !"); } } ``` **消费者的重试机制** 当`@OnMessage`方法的返回值类型为`boolean`类型,并且执行的结果为`false`时,系统认定此消息执行失败。 消息执行失败后,系统会将这个消息重新插入到消息队列中(顺序排在最后)。 通过`@ToQueue`的`expire`属性可以设置消息的生存时间(单位为秒),默认为永不过期。 当消息的生存时间超过后还没有消费成功,系统将会丢掉这个消息。 一个简单的例子: ``` //生产者 @Producer public class UserServiceImpl implements UserService { @Autowired private UserDao userDao; @ToQueue(topic = "new_user", expire = 24 * 3600) //添加新的用户后,将其发送到消息队列,消息的生存时间是24小时 public User insert(User user) { this.userDao.insert(user); return user; } } @Consumer public class NewUserConsumer { @OnMessage(value = "new_user") //如果邮件发送失败,需要尝试重新发送。 public boolean onNewUser(User user) { try { //发送邮件 MailSender.sendWelcomeMail(user.getEmail(), user.getNickname()); //发送成功,任务完成,返回true retrun true; } catch (Exception e) { //发送失败,尝试重试,返回false retrun false; } } } ``` 当然,如果一个消费方法永远不会失败(或是失败后不需要重试),可以直接设置为`void`方法。 PS:之前版本使用重试次数控制失败处理。但是系统修复需要的是时间,而重试次数很有可能会被浪费掉,因此改为了生存时间。 ### 待办事项 - [x] 消息处理失败的处理(重新插入队列,设置消息的生存周期) - [ ] 将队列分为两个阶段,等待投递和等待接收 - [ ] 监控页面 ### 帮助 我的邮箱:`i@scienjus.com` 由于 Redis 本身的限制,这个项目并不适合使用在生产环境中,在此推荐 Redis 作者开发的消息队列 Disque。一些介绍: - 该项目的地址:[Disque, an in-memory, distributed job queue][1] - 该项目的中文介绍:[Disque 使用教程][2] - Java 的客户端实现(Jedis 的作者开发):[Jedisque][3] - 与这个项目用法相同的Disque实现:[Spring Dsique][4] [1]: https://github.com/antirez/disque [2]: http://disquebook.com/ [3]: https://github.com/xetorthio/jedisque [4]: https://github.com/scienjus/spring-disque