代码拉取完成,页面将自动刷新
学习 redis stream 类型
mkstream
表示如果这个Stream不存在,则会自动创建出来。
192.168.121.135:6379> del stream-001
(integer) 1
192.168.121.135:6379> XGROUP CREATE stream-001 group-a $ mkstream
OK
192.168.121.135:6379> XGROUP CREATE stream-001 group-b $ mkstream
OK
redis
├── CycleGeneratorStreamMessageRunner.java # Stream消息生产者,每隔5s产生一个消息
├── RedisStreamApplication.java
├── config
│ └── RedisConfig.java # redis 配置
├── constan
│ └── Cosntants.java # 常量
├── entity
│ └── Book.java # 实体类
└── stream
├── consumer # 消费者
│ ├── group # 消费组
│ ├── group
│ │ ├── AsyncConsumeStreamListener.java # 异步消费消息
│ │ ├── CustomErrorHandler.java # 处理消费消息或读取消息过程中发生的异常
│ │ └── RedisStreamConfiguration.java # Stream 消费组消费消息
│ └── xread # 直接消费,此时Stream可以理解成普通的list,但是Stream中的消息在读取后不会消失
│ ├── XreadNonBlockConsumer01.java # 非阻塞消费者
│ └── XreadNonBlockConsumer02.java # 非阻塞消费者 消费者02和消费者01实现的功能一样,可以看到同一个消息2个消费者都可以消费到
└── producer
└── StreamProducer.java # 向Stream中发送消息
java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138)
at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecords(StreamObjectMapper.java:164)
at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:594)
at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:413)
at com.huan.study.redis.stream.consumer.xread.XreadNonBlockConsumer02.lambda$afterPropertiesSet$1(XreadNonBlockConsumer02.java:61)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
如果出现了上述异常,
RedisTemplate
的配置,此处可以考虑使用 redisTemplate.setHashValueSerializer(RedisSerializer.string())
redisTemplate.opsForStream()
的配置,这个构造方法中是不是填写了别的HashMapper实现提供一个可用的配置
关于上面的这个错误,我在Spring Data Redis的官方仓库提了一个 issue,得到官方的回复是,这是一个bug,后期会修复的。 官方回答
2.7 M3 这个版本官方已经修复了这个bug。 详情请求看这个issues https://github.com/spring-projects/spring-data-redis/issues/2198
spring.redis.timeout
配置的时间,否则会报超时错误。StreamMessageListenerContainer
可以同时支持消费者组消费和独立消费。StreamMessageListenerContainer
可以动态的增加或删除消费者。StreamReadRequest
对象此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。