关于redis:Reactive-Spring实战-响应式Redis交互

38次阅读

共计 8186 个字符,预计需要花费 21 分钟才能阅读完成。

本文分享 Spring 中如何实现 Redis 响应式交互模式。

本文将模仿一个用户服务,并应用 Redis 作为数据存储服务器。
本文波及两个 java bean,用户与权利

public class User {
    private long id;
    private String name;
    // 标签
    private String label;
    // 收货地址经度
    private Double deliveryAddressLon;
    // 收货地址维度
    private Double deliveryAddressLat;
    // 最新签到日
    private String lastSigninDay;
    // 积分
    private Integer score;
    // 权利
    private List<Rights> rights;
    ...
}

public class Rights {
    private Long id;
    private Long userId;
    private String name;
    ...
}

启动

引入依赖

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>

增加 Redis 配置

spring.redis.host=192.168.56.102
spring.redis.port=6379
spring.redis.password=
spring.redis.timeout=5000

SpringBoot 启动

@SpringBootApplication
public class UserServiceReactive {public static void main(String[] args) {
        new SpringApplicationBuilder(UserServiceReactive.class)
                .web(WebApplicationType.REACTIVE).run(args);
    }
}

利用启动后,Spring 会主动生成 ReactiveRedisTemplate(它的底层框架是 Lettuce)。
ReactiveRedisTemplate 与 RedisTemplate 应用相似,但它提供的是异步的,响应式 Redis 交互方式。
这里再强调一下,响应式编程是异步的,ReactiveRedisTemplate 发送 Redis 申请后不会阻塞线程,以后线程能够去执行其余工作。
等到 Redis 响应数据返回后,ReactiveRedisTemplate 再调度线程解决响应数据。
响应式编程能够通过优雅的形式实现异步调用以及解决异步后果,正是它的最大的意义。

序列化

ReactiveRedisTemplate 默认应用的序列化是 Jdk 序列化,咱们能够配置为 json 序列化

@Bean
public RedisSerializationContext redisSerializationContext() {RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
    builder.key(StringRedisSerializer.UTF_8);
    builder.value(RedisSerializer.json());
    builder.hashKey(StringRedisSerializer.UTF_8);
    builder.hashValue(StringRedisSerializer.UTF_8);

    return builder.build();}

@Bean
public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {RedisSerializationContext serializationContext = redisSerializationContext();
    ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);
    return reactiveRedisTemplate;
}

builder.hashValue 办法指定 Redis 列表值的序列化形式,因为本文 Redis 列表值只寄存字符串,所以还是设置为 StringRedisSerializer.UTF_8。

根本数据类型

ReactiveRedisTemplate 反对 Redis 字符串,散列,列表,汇合,有序汇合等根本的数据类型。
本文应用散列保留用户信息,列表保留用户权利,其余根本数据类型的应用本文不开展。

public Mono<Boolean>  save(User user) {ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();
    Mono<Boolean>  userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));
    if(user.getRights() != null) {ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();
        opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {logger.info("add rights:{}", l);
        });
    }
    return userRs;
}

beanToMap 办法负责将 User 类转化为 map。

HyperLogLog

Redis HyperLogLog 构造能够统计一个汇合内不同元素的数量。
应用 HyperLogLog 统计每天登录的用户量

public Mono<Long>  login(User user) {ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();
    return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());
}

BitMap

Redis BitMap(位图)通过一个 Bit 位示意某个元素对应的值或者状态。因为 Bit 是计算机存储中最小的单位,应用它进行贮存将十分节俭空间。
应用 BitMap 记录用户本周是否有签到

public void addSignInFlag(long userId) {String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16);
    redisTemplate.opsForValue().setBit(key, userId & 0xffff , true)
    .subscribe(b -> logger.info("set:{},result:{}", key, b));
}

userId 高 48 位用于将用户划分到不同的 key,低 16 位作为位图偏移参数 offset。
offset 参数必须大于或等于 0,小于 2^32(bit 映射被限度在 512 MB 之内)。

Geo

Redis Geo 能够存储地理位置信息,并对地理位置进行计算。
如查找给定范畴内的仓库信息

public Flux getWarehouseInDist(User u, double dist) {ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();
    Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);
    RedisGeoCommands.GeoRadiusCommandArgs args =
            RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();
    return geo.radius("warehouse:address", circle, args);
}

warehouse:address这个汇合中须要先保留好仓库地理位置信息。
ReactiveGeoOperations#radius 办法能够查找汇合中地理位置在给定范畴内的元素,它中还反对增加元素到汇合,计算汇合中两个元素地理位置间隔等操作。

Lua

ReactiveRedisTemplate 也能够执行 Lua 脚本。
上面通过 Lua 脚本实现用户签到逻辑:如果用户明天未签到,容许签到,积分加 1,如果用户明天已签到,则拒接操作。

public Flux<String> addScore(long userId) {DefaultRedisScript<String> script = new DefaultRedisScript<>();
    script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));
    List<String> keys = new ArrayList<>();
    keys.add(String.valueOf(userId));
    keys.add(LocalDateTime.now().toString().substring(0, 10));
    return redisTemplate.execute(script, keys);
}

signin.lua 内容如下

local score=redis.call('hget','user:'..KEYS[1],'score')
local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay')
if(day==KEYS[2])
    then
    return '0'
else
    redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2])
    return '1'
end

Stream

Redis Stream 是 Redis 5.0 版本新减少的数据类型。该类型能够实现音讯队列,并提供音讯的长久化和主备复制性能,并且能够记住每一个客户端的拜访地位,还能保障音讯不失落。

Redis 借鉴了 kafka 的设计,一个 Stream 内能够存在多个生产组,一个生产组内能够存在多个消费者。
如果一个生产组内某个消费者生产了 Stream 中某条音讯,则这音讯不会被该生产组其余消费者生产到,当然,它还能够被其余生产组中某个消费者生产到。

上面定义一个 Stream 消费者,负责解决接管到的权利数据

@Component
public class RightsStreamConsumer implements ApplicationRunner, DisposableBean {private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container;
    // Stream 队列
    private static final String STREAM_KEY = "stream:user:rights";
    // 生产组
    private static final String STREAM_GROUP = "user-service";
    // 消费者
    private static final String STREAM_CONSUMER = "consumer-1";

    @Autowired
    @Qualifier("reactiveRedisTemplate")
    private ReactiveRedisTemplate redisTemplate;

    public void run(ApplicationArguments args) throws Exception {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(100) // 一批次拉取的最大 count 数
                        .executor(Executors.newSingleThreadExecutor())  // 线程池
                        .pollTimeout(Duration.ZERO) // 阻塞式轮询
                        .targetType(Rights.class) // 指标类型(音讯内容的类型).build();
        // 创立一个音讯监听容器
        container = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // prepareStreamAndGroup 查找 Stream 信息,如果不存在,则创立 Stream
        prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)
                .subscribe(stream -> {
            // 为 Stream 创立一个消费者,并绑定解决类
            container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                    StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                    new StreamMessageListener());
            container.start();});
    }

    @Override
    public void destroy() throws Exception {container.stop();
    }

    // 查找 Stream 信息,如果不存在,则创立 Stream
    private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) {
        // info 办法查问 Stream 信息,如果该 Stream 不存在,底层会报错,这时会调用 onErrorResume 办法。return ops.info(stream).onErrorResume(err -> {logger.warn("query stream err:{}", err.getMessage());
            // createGroup 办法创立 Stream
            return ops.createGroup(stream, group).flatMap(s -> ops.info(stream));
        });
    }

    // 音讯解决对象
    class  StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> {public void onMessage(ObjectRecord<String, Rights> message) {
            // 解决音讯
            RecordId id = message.getId();
            Rights rights = message.getValue();
            logger.info("receive id:{},rights:{}", id, rights);
            redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> {logger.info("add rights:{}", l);
            });
        }
    }
}

上面看一下如何发送信息

public Mono<RecordId> addRights(Rights r) {
    String streamKey = "stream:user:rights";//stream key
    ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);
    Mono<RecordId> mono = redisTemplate.opsForStream().add(record);
    return mono;
}

创立一个音讯记录对象 ObjectRecord,并通过 ReactiveStreamOperations 发送信息记录。

Sentinel、Cluster

ReactiveRedisTemplate 也反对 Redis Sentinel、Cluster 集群模式,只须要调整配置即可。
Sentinel 配置如下

spring.redis.sentinel.master=mymaster
spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379
spring.redis.sentinel.password=

spring.redis.sentinel.nodes配置的是 Sentinel 节点 IP 地址和端口,不是 Redis 实例节点 IP 地址和端口。

Cluster 配置如下

spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379
spring.redis.lettuce.cluster.refresh.period=10000
spring.redis.lettuce.cluster.refresh.adaptive=true

如 Redis Cluster 中 node2 是 node1 的从节点,Lettuce 中会缓存该信息,当 node1 宕机后,Redis Cluster 会将 node2 降级为主节点。但 Lettuce 不会主动将申请切换到 node2,因为它的缓冲没有刷新。
开启 spring.redis.lettuce.cluster.refresh.adaptive 配置,Lettuce 能够定时刷新 Redis Cluster 集群缓存信息,动静扭转客户端的节点状况,实现故障转移。

临时未发现 ReactiveRedisTemplate 实现 pipeline,事务的计划。

官网文档:https://docs.spring.io/spring…:reactive
文章残缺代码:https://gitee.com/binecy/bin-…

如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!

正文完
 0