乐趣区

关于后端:Spring-Data-Redis-Stream的使用

一、背景

Stream类型是 redis5之后新增的类型,在这篇文章中,咱们实现应用 Spring boot data redis 来生产 Redis Stream 中的数据。实现独立生产和生产组生产。

二、整合步骤

1、引入 jar 包

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
  </dependency>
</dependencies>

次要是上方的这个包,其余的不相干的包此处省略导入。

2、配置 RedisTemplate 依赖

@Configuration
public class RedisConfig {
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // 这个中央不可应用 json 序列化,如果应用的是 ObjectRecord 传输对象时,可能会有问题,会呈现一个 java.lang.IllegalArgumentException: Value must not be null! 谬误
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        return redisTemplate;
    }
}

留神:

此处须要留神 setHashValueSerializer 的序列化的形式,具体注意事项前期再说。

3、筹备一个实体对象

这个实体对象是须要发送到 Stream 中的对象。

@Getter
@Setter
@ToString
public class Book {
    private String title;
    private String author;
    
    public static Book create() {com.github.javafaker.Book fakerBook = Faker.instance().book();
        Book book = new Book();
        book.setTitle(fakerBook.title());
        book.setAuthor(fakerBook.author());
        return book;
    }
}

每次调用 create 办法时,会主动产生一个 Book 的对象,对象模仿数据是应用 javafaker 来模仿生成的。

4、编写一个常量类,配置 Stream 的名称

/**
 * 常量
 *
 */
public class Cosntants {public static final String STREAM_KEY_001 = "stream-001";}

5、编写一个生产者,向 Stream 中生产数据

1、编写一个生产者,向 Stream 中产生 ObjectRecord 类型的数据

/**
 * 音讯生产者
 
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class StreamProducer {
    
    private final RedisTemplate<String, Object> redisTemplate;
    
    public void sendRecord(String streamKey) {Book book = Book.create();
        log.info("产生一本书的信息:[{}]", book);
        
        ObjectRecord<String, Book> record = StreamRecords.newRecord()
                .in(streamKey)
                .ofObject(book)
                .withId(RecordId.autoGenerate());
        
        RecordId recordId = redisTemplate.opsForStream()
                .add(record);
        
        log.info("返回的 record-id:[{}]", recordId);
    }
}

2、每隔 5s 就生产一个数据到 Stream 中

/**
 * 周期性的向流中产生音讯
 */
@Component
@AllArgsConstructor
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {
    
    private final StreamProducer streamProducer;
    
    @Override
    public void run(ApplicationArguments args) {Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> streamProducer.sendRecord(STREAM_KEY_001),
                        0, 5, TimeUnit.SECONDS);
    }
}

三、独立生产

独立生产指的是脱离生产组的间接生产 Stream 中的音讯,是应用 xread办法读取流中的数据,流中的数据在读取后并不会被删除,还是存在的。如果多个程序同时应用 xread 读取,都是能够读取到音讯的。

1、实现从头开始生产 -xread 实现

此处实现的是从 Stream 的第一个音讯开始生产

package com.huan.study.redis.stream.consumer.xread;

import com.huan.study.redis.constan.Cosntants;
import com.huan.study.redis.entity.Book;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 脱离生产组 - 间接生产 Stream 中的数据,能够获取到 Stream 中所有的音讯
 */
@Component
@Slf4j
public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {
    
    private ThreadPoolExecutor threadPoolExecutor;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    
    private volatile boolean stop = false;
    
    @Override
    public void afterPropertiesSet() {
        
        // 初始化线程池
        threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("xread-nonblock-01");
            return thread;
        });
        
        StreamReadOptions streamReadOptions = StreamReadOptions.empty()
                // 如果没有数据,则阻塞 1s 阻塞工夫须要小于 `spring.redis.timeout` 配置的工夫
                .block(Duration.ofMillis(1000))
                // 始终阻塞直到获取数据,可能会报超时异样
                // .block(Duration.ofMillis(0))
                // 1 次获取 10 个数据
                .count(10);
        
        StringBuilder readOffset = new StringBuilder("0-0");
        threadPoolExecutor.execute(() -> {while (!stop) {
                // 应用 xread 读取数据时,须要记录下最初一次读取到 offset,而后当作下次读取的 offset,否则读取进去的数据会有问题
                List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
                        .read(Book.class, streamReadOptions, StreamOffset.create(Cosntants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
                if (CollectionUtils.isEmpty(objectRecords)) {log.warn("没有获取到数据");
                    continue;
                }
                for (ObjectRecord<String, Book> objectRecord : objectRecords) {log.info("获取到的数据信息 id:[{}] book:[{}]", objectRecord.getId(), objectRecord.getValue());
                    readOffset.setLength(0);
                    readOffset.append(objectRecord.getId());
                }
            }
        });
    }
    
    @Override
    public void destroy() throws Exception {
        stop = true;
        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
    }
}

留神:

下一次读取数据时,offset 是上一次最初获取到的 id 的值,否则可能会呈现漏数据。

2、StreamMessageListenerContainer 实现独立生产

见下方的生产组生产的代码

四、生产组生产

1、实现 StreamListener 接口

实现这个接口的目标是为了,生产 Stream 中的数据。须要留神在注册时应用的是 streamMessageListenerContainer.receiveAutoAck() 还是 streamMessageListenerContainer.receive() 办法,如果是第二个,则须要 手动 ack,手动 ack 的代码:redisTemplate.opsForStream().acknowledge("key","group","recordId");

/**
 * 通过监听器异步生产
 *
 * @author huan.fu 2021/11/10 - 下午 5:51
 */
@Slf4j
@Getter
@Setter
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Book>> {
    /**
     * 消费者类型:独立生产、生产组生产
     */
    private String consumerType;
    /**
     * 生产组
     */
    private String group;
    /**
     * 生产组中的某个消费者
     */
    private String consumerName;
    
    public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
        this.consumerType = consumerType;
        this.group = group;
        this.consumerName = consumerName;
    }
    
    private RedisTemplate<String, Object> redisTemplate;
    
    @Override
    public void onMessage(ObjectRecord<String, Book> message) {String stream = message.getStream();
        RecordId id = message.getId();
        Book value = message.getValue();
        if (StringUtils.isBlank(group)) {log.info("[{}]: 接管到一个音讯 stream:[{}],id:[{}],value:[{}]", consumerType, stream, id, value);
        } else {log.info("[{}] group:[{}] consumerName:[{}] 接管到一个音讯 stream:[{}],id:[{}],value:[{}]", consumerType,
                    group, consumerName, stream, id, value);
        }
        
        // 当是生产组生产时,如果不是主动 ack,则须要在这个中央手动 ack
        // redisTemplate.opsForStream()
        //         .acknowledge("key","group","recordId");
    }
}

2、获取生产或生产音讯过程中谬误的解决

/**
 * StreamPollTask 获取音讯或对应的 listener 生产音讯过程中产生了异样
 *
 * @author huan.fu 2021/11/11 - 下午 3:44
 */
@Slf4j
public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {log.error("产生了异样", t);
    }
}

3、生产组配置

/**
 * redis stream 生产组配置
 *
 * @author huan.fu 2021/11/11 - 下午 12:22
 */
@Configuration
public class RedisStreamConfiguration {
    
    @Resource
    private RedisConnectionFactory redisConnectionFactory;
    
    /**
     * 能够同时反对 独立生产 和 消费者组 生产
     * <p>
     * 能够反对动静的 减少和删除 消费者
     * <p>
     * 生产组须要事后创立进去
     *
     * @return StreamMessageListenerContainer
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条音讯
                        .batchSize(10)
                        // 运行 Stream 的 poll task
                        .executor(executor)
                        // 能够了解为 Stream Key 的序列化形式
                        .keySerializer(RedisSerializer.string())
                        // 能够了解为 Stream 前方的字段的 key 的序列化形式
                        .hashKeySerializer(RedisSerializer.string())
                        // 能够了解为 Stream 前方的字段的 value 的序列化形式
                        .hashValueSerializer(RedisSerializer.string())
                        // Stream 中没有音讯时,阻塞多长时间,须要比 `spring.redis.timeout` 的工夫小
                        .pollTimeout(Duration.ofSeconds(1))
                        // ObjectRecord 时,将 对象的 filed 和 value 转换成一个 Map 比方:将 Book 对象转换成 map
                        .objectMapper(new ObjectHashMapper())
                        // 获取音讯的过程或获取到音讯给具体的音讯者解决的过程中,产生了异样的解决
                        .errorHandler(new CustomErrorHandler())
                        // 将发送到 Stream 中的 Record 转换成 ObjectRecord,转换成具体的类型是这个中央指定的类型
                        .targetType(Book.class)
                        .build();
        
        StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);
        
        // 独立生产
        String streamKey = Cosntants.STREAM_KEY_001;
        streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
                new AsyncConsumeStreamListener("独立生产", null, null));
        
        // 生产组 A, 不主动 ack
        // 从生产组中没有调配给消费者的音讯开始生产
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("生产组生产", "group-a", "consumer-a"));
        // 从生产组中没有调配给消费者的音讯开始生产
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("生产组生产 A", "group-a", "consumer-b"));
        
        // 生产组 B, 主动 ack
        streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-b", "consumer-a"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("生产组生产 B", "group-b", "consumer-bb"));
        
        // 如果须要对某个消费者进行个性化配置在调用 register 办法的时候传递 `StreamReadRequest` 对象
        
        return streamMessageListenerContainer;
    }
}

留神:

提前建设好生产组

127.0.0.1:6379> xgroup create stream-001 group-a $
OK
127.0.0.1:6379> xgroup create stream-001 group-b $
OK

1、独有生产配置

 streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey), new AsyncConsumeStreamListener("独立生产", null, null));

不传递 Consumer 即可。

2、配置生产组 - 不主动 ack 音讯

streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("生产组生产 A", "group-a", "consumer-b"));

1、须要留神 ReadOffset 的取值。

2、须要留神 group 须要提前创立好。

3、配置生产组 - 主动 ack 音讯

streamMessageListenerContainer.receiveAutoAck()

五、序列化策略

Stream Property Serializer Description
key keySerializer used for Record#getStream()
field hashKeySerializer used for each map key in the payload
value hashValueSerializer used for each map value in the payload

六、ReadOffset策略

生产音讯时的 Read Offset 策略

Read offset Standalone Consumer Group
Latest Read latest message(读取最新的音讯) Read latest message(读取最新的音讯)
Specific Message Id Use last seen message as the next MessageId<br/>(读取大于指定的音讯 id 的音讯) Use last seen message as the next MessageId<br/>(读取大于指定的音讯 id 的音讯)
Last Consumed Use last seen message as the next MessageId<br/>(读取大于指定的音讯 id 的音讯) Last consumed message as per consumer group<br/>(读取还未调配给生产组中的生产组的音讯)

七、注意事项

1、读取音讯的超时工夫

当咱们应用 StreamReadOptions.empty().block(Duration.ofMillis(1000)) 配置阻塞工夫时,这个配置的阻塞工夫必须要比 spring.redis.timeout配置的工夫短,否则可能会报超时异样。

2、ObjectRecord 反序列化谬误

如果咱们在读取音讯时产生如下异样,那么排查思路如下:

java.lang.IllegalArgumentException: Value must not be null!
    at org.springframework.util.Assert.notNull(Assert.java:201)
    at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
    at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
    at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138)
    at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecords(StreamObjectMapper.java:164)
    at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:594)
    at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:413)
    at com.huan.study.redis.stream.consumer.xread.XreadNonBlockConsumer02.lambda$afterPropertiesSet$1(XreadNonBlockConsumer02.java:61)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1、检测 RedisTemplateHashValueSerializer 的序列化形式,最好不要应用 json 能够应用RedisSerializer.string()

2、查看 redisTemplate.opsForStream() 中配置的 HashMapper,默认是ObjectHashMapper 这个是把对象字段和值序列化成 byte[] 格局。

提供一个可用的配置

# RedisTemplate 的 hash value 应用 string 类型的序列化形式
redisTemplate.setHashValueSerializer(RedisSerializer.string());
# 这个办法 opsForStream()外面应用默认的 ObjectHashMapper
redisTemplate.opsForStream()

3、应用 xread 程序读取数据漏数据

如果咱们应用 xread 读取数据发现有写数据漏掉了,这个时候咱们须要查看第二次读取时配置的 StreamOffset 是否非法,这个值须要是上一次读取的最初一个值。

举例说明:

1、SteamOffset传递的是 $ 示意读取最新的一个数据。

2、解决上一步读取到的数据,此时另外的生产者又向 Stream 中插入了几个数据,这个时候读取到的数据还没有解决完。

3、再次读取 Stream 中的数据,还是传递的$,那么示意还是读取最新的数据。那么在上一步流入到 Stream 中的数据,这个消费者就读取不到了,因为它读取的是最新的数据。

4、StreamMessageListenerContainer的应用

1、能够动静的增加和删除消费者

2、能够进行生产组生产

3、能够间接独立生产

4、如果传输 ObjectRecord 的时候,须要留神一下序列化形式。参考下面的代码。

八、残缺代码

https://gitee.com/huan1993/spring-cloud-parent/tree/master/redis/redis-stream

九、参考文档

1、https://docs.spring.io/spring-data/redis/docs/2.5.5/reference/html/#redis.streams

退出移动版