24 Star 323 Fork 221

huan1993/spring-cloud-parent

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

目的

学习 redis stream 类型

提前创建消费组

mkstream 表示如果这个Stream不存在,则会自动创建出来。

192.168.121.135:6379> del stream-001
(integer) 1
192.168.121.135:6379> XGROUP CREATE stream-001 group-a $ mkstream
OK
192.168.121.135:6379> XGROUP CREATE stream-001 group-b $ mkstream
OK

类解释

redis
├── CycleGeneratorStreamMessageRunner.java # Stream消息生产者,每隔5s产生一个消息
├── RedisStreamApplication.java
├── config
│   └── RedisConfig.java # redis 配置
├── constan
│   └── Cosntants.java # 常量
├── entity
│   └── Book.java # 实体类
└── stream
    ├── consumer # 消费者
    │   ├── group # 消费组
    │   ├── group
    │   │   ├── AsyncConsumeStreamListener.java # 异步消费消息
    │   │   ├── CustomErrorHandler.java # 处理消费消息或读取消息过程中发生的异常
    │   │   └── RedisStreamConfiguration.java # Stream 消费组消费消息
    │   └── xread # 直接消费,此时Stream可以理解成普通的list,但是Stream中的消息在读取后不会消失
    │       ├── XreadNonBlockConsumer01.java    # 非阻塞消费者
    │       └── XreadNonBlockConsumer02.java    # 非阻塞消费者 消费者02和消费者01实现的功能一样,可以看到同一个消息2个消费者都可以消费到
    └── producer
        └── StreamProducer.java # 向Stream中发送消息

RedisTemplate HashValue序列化器选择错误导致的异常

java.lang.IllegalArgumentException: Value must not be null!
	at org.springframework.util.Assert.notNull(Assert.java:201)
	at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
	at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
	at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138)
	at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecords(StreamObjectMapper.java:164)
	at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:594)
	at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:413)
	at com.huan.study.redis.stream.consumer.xread.XreadNonBlockConsumer02.lambda$afterPropertiesSet$1(XreadNonBlockConsumer02.java:61)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

如果出现了上述异常,

  1. 那么我们需要检查一下 RedisTemplate的配置,此处可以考虑使用 redisTemplate.setHashValueSerializer(RedisSerializer.string())
  2. 检查 redisTemplate.opsForStream() 的配置,这个构造方法中是不是填写了别的HashMapper实现

提供一个可用的配置

  1. RedisTemplate 的 setHashValueSerializer(RedisSerializer.string())
  2. redisTemplate.opsForStream() 构造方法不用填写别的HashMapper的实现,就使用默认的ObjectHashMapper

关于上面的这个错误,我在Spring Data Redis的官方仓库提了一个 issue,得到官方的回复是,这是一个bug,后期会修复的。 官方回答

2.7 M3 这个版本官方已经修复了这个bug。 详情请求看这个issues https://github.com/spring-projects/spring-data-redis/issues/2198

注意事项

  1. stream 中的 recordId 必须是单调递增的,可以让redis自动生成,也可以自己提供。
  2. xread 读取到消息后,需要将读取到的最后一个消息的recordId当作下一次读取的id,否则读取到的数据会有问题。
  3. xread 取消消息阻塞的时间需要小于spring.redis.timeout配置的时间,否则会报超时错误。
  4. StreamMessageListenerContainer 可以同时支持消费者组消费和独立消费。
  5. StreamMessageListenerContainer 可以动态的增加或删除消费者。
  6. 消费组消费时,如果不是自动ack,则需要手动ack。
  7. 如果需要对某个消费者进行个性化配置在调用register方法的时候传递StreamReadRequest对象
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/huan1993/spring-cloud-parent.git
git@gitee.com:huan1993/spring-cloud-parent.git
huan1993
spring-cloud-parent
spring-cloud-parent
master

搜索帮助