本文分享Spring中如何实现Redis响应式交互模式。
本文将模仿一个用户服务,并应用Redis作为数据存储服务器。
本文波及两个java bean,用户与权利
public class User { private long id; private String name; // 标签 private String label; // 收货地址经度 private Double deliveryAddressLon; // 收货地址维度 private Double deliveryAddressLat; // 最新签到日 private String lastSigninDay; // 积分 private Integer score; // 权利 private List<Rights> rights; ...}public class Rights { private Long id; private Long userId; private String name; ...}
启动
引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
增加Redis配置
spring.redis.host=192.168.56.102spring.redis.port=6379spring.redis.password=spring.redis.timeout=5000
SpringBoot启动
@SpringBootApplicationpublic class UserServiceReactive { public static void main(String[] args) { new SpringApplicationBuilder( UserServiceReactive.class) .web(WebApplicationType.REACTIVE).run(args); }}
利用启动后,Spring会主动生成ReactiveRedisTemplate(它的底层框架是Lettuce)。
ReactiveRedisTemplate与RedisTemplate应用相似,但它提供的是异步的,响应式Redis交互方式。
这里再强调一下,响应式编程是异步的,ReactiveRedisTemplate发送Redis申请后不会阻塞线程,以后线程能够去执行其余工作。
等到Redis响应数据返回后,ReactiveRedisTemplate再调度线程解决响应数据。
响应式编程能够通过优雅的形式实现异步调用以及解决异步后果,正是它的最大的意义。
序列化
ReactiveRedisTemplate默认应用的序列化是Jdk序列化,咱们能够配置为json序列化
@Beanpublic RedisSerializationContext redisSerializationContext() { RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext(); builder.key(StringRedisSerializer.UTF_8); builder.value(RedisSerializer.json()); builder.hashKey(StringRedisSerializer.UTF_8); builder.hashValue(StringRedisSerializer.UTF_8); return builder.build();}@Beanpublic ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) { RedisSerializationContext serializationContext = redisSerializationContext(); ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext); return reactiveRedisTemplate;}
builder.hashValue办法指定Redis列表值的序列化形式,因为本文Redis列表值只寄存字符串,所以还是设置为StringRedisSerializer.UTF_8。
根本数据类型
ReactiveRedisTemplate反对Redis字符串,散列,列表,汇合,有序汇合等根本的数据类型。
本文应用散列保留用户信息,列表保留用户权利,其余根本数据类型的应用本文不开展。
public Mono<Boolean> save(User user) { ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash(); Mono<Boolean> userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user)); if(user.getRights() != null) { ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList(); opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> { logger.info("add rights:{}", l); }); } return userRs;}
beanToMap办法负责将User类转化为map。
HyperLogLog
Redis HyperLogLog构造能够统计一个汇合内不同元素的数量。
应用HyperLogLog统计每天登录的用户量
public Mono<Long> login(User user) { ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog(); return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());}
BitMap
Redis BitMap(位图)通过一个Bit位示意某个元素对应的值或者状态。因为Bit是计算机存储中最小的单位,应用它进行贮存将十分节俭空间。
应用BitMap记录用户本周是否有签到
public void addSignInFlag(long userId) { String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16); redisTemplate.opsForValue().setBit( key, userId & 0xffff , true) .subscribe(b -> logger.info("set:{},result:{}", key, b));}
userId高48位用于将用户划分到不同的key,低16位作为位图偏移参数offset。
offset参数必须大于或等于0,小于2^32(bit 映射被限度在 512 MB 之内)。
Geo
Redis Geo能够存储地理位置信息,并对地理位置进行计算。
如查找给定范畴内的仓库信息
public Flux getWarehouseInDist(User u, double dist) { ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo(); Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist); RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending(); return geo.radius("warehouse:address", circle, args);}
warehouse:address
这个汇合中须要先保留好仓库地理位置信息。
ReactiveGeoOperations#radius办法能够查找汇合中地理位置在给定范畴内的元素,它中还反对增加元素到汇合,计算汇合中两个元素地理位置间隔等操作。
Lua
ReactiveRedisTemplate也能够执行Lua脚本。
上面通过Lua脚本实现用户签到逻辑:如果用户明天未签到,容许签到,积分加1,如果用户明天已签到,则拒接操作。
public Flux<String> addScore(long userId) { DefaultRedisScript<String> script = new DefaultRedisScript<>(); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua"))); List<String> keys = new ArrayList<>(); keys.add(String.valueOf(userId)); keys.add(LocalDateTime.now().toString().substring(0, 10)); return redisTemplate.execute(script, keys);}
signin.lua内容如下
local score=redis.call('hget','user:'..KEYS[1],'score')local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay')if(day==KEYS[2]) then return '0'else redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2]) return '1'end
Stream
Redis Stream 是 Redis 5.0 版本新减少的数据类型。该类型能够实现音讯队列,并提供音讯的长久化和主备复制性能,并且能够记住每一个客户端的拜访地位,还能保障音讯不失落。
Redis借鉴了kafka的设计,一个Stream内能够存在多个生产组,一个生产组内能够存在多个消费者。
如果一个生产组内某个消费者生产了Stream中某条音讯,则这音讯不会被该生产组其余消费者生产到,当然,它还能够被其余生产组中某个消费者生产到。
上面定义一个Stream消费者,负责解决接管到的权利数据
@Componentpublic class RightsStreamConsumer implements ApplicationRunner, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class); @Autowired private RedisConnectionFactory redisConnectionFactory; private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container; // Stream队列 private static final String STREAM_KEY = "stream:user:rights"; // 生产组 private static final String STREAM_GROUP = "user-service"; // 消费者 private static final String STREAM_CONSUMER = "consumer-1"; @Autowired @Qualifier("reactiveRedisTemplate") private ReactiveRedisTemplate redisTemplate; public void run(ApplicationArguments args) throws Exception { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(100) //一批次拉取的最大count数 .executor(Executors.newSingleThreadExecutor()) //线程池 .pollTimeout(Duration.ZERO) //阻塞式轮询 .targetType(Rights.class) //指标类型(音讯内容的类型) .build(); // 创立一个音讯监听容器 container = StreamMessageListenerContainer.create(redisConnectionFactory, options); // prepareStreamAndGroup查找Stream信息,如果不存在,则创立Stream prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP) .subscribe(stream -> { // 为Stream创立一个消费者,并绑定解决类 container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER), StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()), new StreamMessageListener()); container.start(); }); } @Override public void destroy() throws Exception { container.stop(); } // 查找Stream信息,如果不存在,则创立Stream private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) { // info办法查问Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume办法。 return ops.info(stream).onErrorResume(err -> { logger.warn("query stream err:{}", err.getMessage()); // createGroup办法创立Stream return ops.createGroup(stream, group).flatMap(s -> ops.info(stream)); }); } // 音讯解决对象 class StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> { public void onMessage(ObjectRecord<String, Rights> message) { // 解决音讯 RecordId id = message.getId(); Rights rights = message.getValue(); logger.info("receive id:{},rights:{}", id, rights); redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> { logger.info("add rights:{}", l); }); } }}
上面看一下如何发送信息
public Mono<RecordId> addRights(Rights r) { String streamKey = "stream:user:rights";//stream key ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r); Mono<RecordId> mono = redisTemplate.opsForStream().add(record); return mono;}
创立一个音讯记录对象ObjectRecord,并通过ReactiveStreamOperations发送信息记录。
Sentinel、Cluster
ReactiveRedisTemplate也反对Redis Sentinel、Cluster集群模式,只须要调整配置即可。
Sentinel配置如下
spring.redis.sentinel.master=mymasterspring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379spring.redis.sentinel.password=
spring.redis.sentinel.nodes
配置的是Sentinel节点IP地址和端口,不是Redis实例节点IP地址和端口。
Cluster配置如下
spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379spring.redis.lettuce.cluster.refresh.period=10000spring.redis.lettuce.cluster.refresh.adaptive=true
如Redis Cluster中node2是node1的从节点,Lettuce中会缓存该信息,当node1宕机后,Redis Cluster会将node2降级为主节点。但Lettuce不会主动将申请切换到node2,因为它的缓冲没有刷新。
开启spring.redis.lettuce.cluster.refresh.adaptive
配置,Lettuce能够定时刷新Redis Cluster集群缓存信息,动静扭转客户端的节点状况,实现故障转移。
临时未发现ReactiveRedisTemplate实现pipeline,事务的计划。
官网文档:https://docs.spring.io/spring...:reactive
文章残缺代码:https://gitee.com/binecy/bin-...
如果您感觉本文不错,欢送关注我的微信公众号,系列文章继续更新中。您的关注是我保持的能源!