一、背景
Stream
类型是 redis5
之后新增的类型,在这篇文章中,咱们实现应用 Spring boot data redis
来生产 Redis Stream
中的数据。实现独立生产和生产组生产。
二、整合步骤
1、引入 jar 包
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies>
次要是上方的这个包,其余的不相干的包此处省略导入。
2、配置 RedisTemplate 依赖
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
// 这个中央不可应用 json 序列化,如果应用的是 ObjectRecord 传输对象时,可能会有问题,会呈现一个 java.lang.IllegalArgumentException: Value must not be null! 谬误
redisTemplate.setHashValueSerializer(RedisSerializer.string());
return redisTemplate;
}
}
留神:
此处须要留神 setHashValueSerializer
的序列化的形式,具体注意事项前期再说。
3、筹备一个实体对象
这个实体对象是须要发送到 Stream
中的对象。
@Getter
@Setter
@ToString
public class Book {
private String title;
private String author;
public static Book create() {com.github.javafaker.Book fakerBook = Faker.instance().book();
Book book = new Book();
book.setTitle(fakerBook.title());
book.setAuthor(fakerBook.author());
return book;
}
}
每次调用 create
办法时,会主动产生一个 Book
的对象,对象模仿数据是应用 javafaker
来模仿生成的。
4、编写一个常量类,配置 Stream 的名称
/**
* 常量
*
*/
public class Cosntants {public static final String STREAM_KEY_001 = "stream-001";}
5、编写一个生产者,向 Stream 中生产数据
1、编写一个生产者,向 Stream 中产生 ObjectRecord 类型的数据
/**
* 音讯生产者
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class StreamProducer {
private final RedisTemplate<String, Object> redisTemplate;
public void sendRecord(String streamKey) {Book book = Book.create();
log.info("产生一本书的信息:[{}]", book);
ObjectRecord<String, Book> record = StreamRecords.newRecord()
.in(streamKey)
.ofObject(book)
.withId(RecordId.autoGenerate());
RecordId recordId = redisTemplate.opsForStream()
.add(record);
log.info("返回的 record-id:[{}]", recordId);
}
}
2、每隔 5s 就生产一个数据到 Stream 中
/**
* 周期性的向流中产生音讯
*/
@Component
@AllArgsConstructor
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {
private final StreamProducer streamProducer;
@Override
public void run(ApplicationArguments args) {Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(() -> streamProducer.sendRecord(STREAM_KEY_001),
0, 5, TimeUnit.SECONDS);
}
}
三、独立生产
独立生产指的是脱离生产组的间接生产 Stream
中的音讯,是应用 xread
办法读取流中的数据,流中的数据在读取后并不会被删除,还是存在的。如果多个程序同时应用 xread
读取,都是能够读取到音讯的。
1、实现从头开始生产 -xread 实现
此处实现的是从 Stream 的第一个音讯开始生产
package com.huan.study.redis.stream.consumer.xread;
import com.huan.study.redis.constan.Cosntants;
import com.huan.study.redis.entity.Book;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 脱离生产组 - 间接生产 Stream 中的数据,能够获取到 Stream 中所有的音讯
*/
@Component
@Slf4j
public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {
private ThreadPoolExecutor threadPoolExecutor;
@Resource
private RedisTemplate<String, Object> redisTemplate;
private volatile boolean stop = false;
@Override
public void afterPropertiesSet() {
// 初始化线程池
threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("xread-nonblock-01");
return thread;
});
StreamReadOptions streamReadOptions = StreamReadOptions.empty()
// 如果没有数据,则阻塞 1s 阻塞工夫须要小于 `spring.redis.timeout` 配置的工夫
.block(Duration.ofMillis(1000))
// 始终阻塞直到获取数据,可能会报超时异样
// .block(Duration.ofMillis(0))
// 1 次获取 10 个数据
.count(10);
StringBuilder readOffset = new StringBuilder("0-0");
threadPoolExecutor.execute(() -> {while (!stop) {
// 应用 xread 读取数据时,须要记录下最初一次读取到 offset,而后当作下次读取的 offset,否则读取进去的数据会有问题
List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
.read(Book.class, streamReadOptions, StreamOffset.create(Cosntants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
if (CollectionUtils.isEmpty(objectRecords)) {log.warn("没有获取到数据");
continue;
}
for (ObjectRecord<String, Book> objectRecord : objectRecords) {log.info("获取到的数据信息 id:[{}] book:[{}]", objectRecord.getId(), objectRecord.getValue());
readOffset.setLength(0);
readOffset.append(objectRecord.getId());
}
}
});
}
@Override
public void destroy() throws Exception {
stop = true;
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
}
}
留神:
下一次读取数据时,offset 是上一次最初获取到的 id 的值,否则可能会呈现漏数据。
2、StreamMessageListenerContainer 实现独立生产
见下方的生产组生产的代码
四、生产组生产
1、实现 StreamListener 接口
实现这个接口的目标是为了,生产 Stream
中的数据。须要留神在注册时应用的是 streamMessageListenerContainer.receiveAutoAck()
还是 streamMessageListenerContainer.receive()
办法,如果是第二个,则须要 手动 ack
,手动 ack 的代码:redisTemplate.opsForStream().acknowledge("key","group","recordId");
/**
* 通过监听器异步生产
*
* @author huan.fu 2021/11/10 - 下午 5:51
*/
@Slf4j
@Getter
@Setter
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Book>> {
/**
* 消费者类型:独立生产、生产组生产
*/
private String consumerType;
/**
* 生产组
*/
private String group;
/**
* 生产组中的某个消费者
*/
private String consumerName;
public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
this.consumerType = consumerType;
this.group = group;
this.consumerName = consumerName;
}
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(ObjectRecord<String, Book> message) {String stream = message.getStream();
RecordId id = message.getId();
Book value = message.getValue();
if (StringUtils.isBlank(group)) {log.info("[{}]: 接管到一个音讯 stream:[{}],id:[{}],value:[{}]", consumerType, stream, id, value);
} else {log.info("[{}] group:[{}] consumerName:[{}] 接管到一个音讯 stream:[{}],id:[{}],value:[{}]", consumerType,
group, consumerName, stream, id, value);
}
// 当是生产组生产时,如果不是主动 ack,则须要在这个中央手动 ack
// redisTemplate.opsForStream()
// .acknowledge("key","group","recordId");
}
}
2、获取生产或生产音讯过程中谬误的解决
/**
* StreamPollTask 获取音讯或对应的 listener 生产音讯过程中产生了异样
*
* @author huan.fu 2021/11/11 - 下午 3:44
*/
@Slf4j
public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {log.error("产生了异样", t);
}
}
3、生产组配置
/**
* redis stream 生产组配置
*
* @author huan.fu 2021/11/11 - 下午 12:22
*/
@Configuration
public class RedisStreamConfiguration {
@Resource
private RedisConnectionFactory redisConnectionFactory;
/**
* 能够同时反对 独立生产 和 消费者组 生产
* <p>
* 能够反对动静的 减少和删除 消费者
* <p>
* 生产组须要事后创立进去
*
* @return StreamMessageListenerContainer
*/
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {AtomicInteger index = new AtomicInteger(1);
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {Thread thread = new Thread(r);
thread.setName("async-stream-consumer-" + index.getAndIncrement());
thread.setDaemon(true);
return thread;
});
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次最多获取多少条音讯
.batchSize(10)
// 运行 Stream 的 poll task
.executor(executor)
// 能够了解为 Stream Key 的序列化形式
.keySerializer(RedisSerializer.string())
// 能够了解为 Stream 前方的字段的 key 的序列化形式
.hashKeySerializer(RedisSerializer.string())
// 能够了解为 Stream 前方的字段的 value 的序列化形式
.hashValueSerializer(RedisSerializer.string())
// Stream 中没有音讯时,阻塞多长时间,须要比 `spring.redis.timeout` 的工夫小
.pollTimeout(Duration.ofSeconds(1))
// ObjectRecord 时,将 对象的 filed 和 value 转换成一个 Map 比方:将 Book 对象转换成 map
.objectMapper(new ObjectHashMapper())
// 获取音讯的过程或获取到音讯给具体的音讯者解决的过程中,产生了异样的解决
.errorHandler(new CustomErrorHandler())
// 将发送到 Stream 中的 Record 转换成 ObjectRecord,转换成具体的类型是这个中央指定的类型
.targetType(Book.class)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
// 独立生产
String streamKey = Cosntants.STREAM_KEY_001;
streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
new AsyncConsumeStreamListener("独立生产", null, null));
// 生产组 A, 不主动 ack
// 从生产组中没有调配给消费者的音讯开始生产
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("生产组生产", "group-a", "consumer-a"));
// 从生产组中没有调配给消费者的音讯开始生产
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("生产组生产 A", "group-a", "consumer-b"));
// 生产组 B, 主动 ack
streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-b", "consumer-a"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("生产组生产 B", "group-b", "consumer-bb"));
// 如果须要对某个消费者进行个性化配置在调用 register 办法的时候传递 `StreamReadRequest` 对象
return streamMessageListenerContainer;
}
}
留神:
提前建设好生产组
127.0.0.1:6379> xgroup create stream-001 group-a $
OK
127.0.0.1:6379> xgroup create stream-001 group-b $
OK
1、独有生产配置
streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey), new AsyncConsumeStreamListener("独立生产", null, null));
不传递 Consumer
即可。
2、配置生产组 - 不主动 ack 音讯
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("生产组生产 A", "group-a", "consumer-b"));
1、须要留神 ReadOffset
的取值。
2、须要留神 group
须要提前创立好。
3、配置生产组 - 主动 ack 音讯
streamMessageListenerContainer.receiveAutoAck()
五、序列化策略
Stream Property | Serializer | Description |
---|---|---|
key | keySerializer | used for Record#getStream() |
field | hashKeySerializer | used for each map key in the payload |
value | hashValueSerializer | used for each map value in the payload |
六、ReadOffset
策略
生产音讯时的 Read Offset 策略
Read offset | Standalone | Consumer Group |
---|---|---|
Latest | Read latest message(读取最新的音讯) | Read latest message(读取最新的音讯) |
Specific Message Id | Use last seen message as the next MessageId<br/>(读取大于指定的音讯 id 的音讯) | Use last seen message as the next MessageId<br/>(读取大于指定的音讯 id 的音讯) |
Last Consumed | Use last seen message as the next MessageId<br/>(读取大于指定的音讯 id 的音讯) | Last consumed message as per consumer group<br/>(读取还未调配给生产组中的生产组的音讯) |
七、注意事项
1、读取音讯的超时工夫
当咱们应用 StreamReadOptions.empty().block(Duration.ofMillis(1000))
配置阻塞工夫时,这个配置的阻塞工夫必须要比 spring.redis.timeout
配置的工夫短,否则可能会报超时异样。
2、ObjectRecord 反序列化谬误
如果咱们在读取音讯时产生如下异样,那么排查思路如下:
java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138)
at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecords(StreamObjectMapper.java:164)
at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:594)
at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:413)
at com.huan.study.redis.stream.consumer.xread.XreadNonBlockConsumer02.lambda$afterPropertiesSet$1(XreadNonBlockConsumer02.java:61)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
1、检测 RedisTemplate
的 HashValueSerializer
的序列化形式,最好不要应用 json
能够应用RedisSerializer.string()
。
2、查看 redisTemplate.opsForStream()
中配置的 HashMapper
,默认是ObjectHashMapper
这个是把对象字段和值序列化成 byte[]
格局。
提供一个可用的配置
# RedisTemplate 的 hash value 应用 string 类型的序列化形式
redisTemplate.setHashValueSerializer(RedisSerializer.string());
# 这个办法 opsForStream()外面应用默认的 ObjectHashMapper
redisTemplate.opsForStream()
3、应用 xread 程序读取数据漏数据
如果咱们应用 xread
读取数据发现有写数据漏掉了,这个时候咱们须要查看第二次读取时配置的 StreamOffset
是否非法,这个值须要是上一次读取的最初一个值。
举例说明:
1、SteamOffset
传递的是 $
示意读取最新的一个数据。
2、解决上一步读取到的数据,此时另外的生产者又向 Stream
中插入了几个数据,这个时候读取到的数据还没有解决完。
3、再次读取 Stream
中的数据,还是传递的$
,那么示意还是读取最新的数据。那么在上一步流入到 Stream 中的数据,这个消费者就读取不到了,因为它读取的是最新的数据。
4、StreamMessageListenerContainer
的应用
1、能够动静的增加和删除消费者
2、能够进行生产组生产
3、能够间接独立生产
4、如果传输 ObjectRecord 的时候,须要留神一下序列化形式。参考下面的代码。
八、残缺代码
https://gitee.com/huan1993/spring-cloud-parent/tree/master/redis/redis-stream
九、参考文档
1、https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams