共计 9965 个字符,预计需要花费 25 分钟才能阅读完成。
最近遇到了一个麻烦的需要,咱们须要一个微服务利用同时拜访两个不同的 Redis 集群。个别咱们不会这么应用 Redis,然而这两个 Redis 原本是不同业务集群,当初须要一个微服务同时拜访。
其实咱们在理论业务开发的时候,可能还会遇到相似的场景。例如 Redis 读写拆散,这个也是 spring-data-redis 没有提供的性能,底层连接池例如 Lettuce 或者 Jedis 都提供了获取只读连贯的 API,然而缺点有两个:
- 下层 spring-data-redis 并没有封装这种接口
- 基于 redis 的架构实现的,哨兵模式须要配置 sentinel 的地址,集群模式须要感知集群拓扑,在云原生环境中,这些都默认被云提供商暗藏了,裸露到里面的只有一个个动静 VIP 域名。
因而,咱们须要在 spring-data-redis 的根底上 实现一个动静切换 Redis 连贯的机制。
spring-data-redis 的配置类为:org.springframework.boot.autoconfigure.data.redis.RedisProperties
,能够配置单个 Redis 实例或者 Redis 集群的连贯配置。依据这些配置,会生成对立的 Redis 连贯工厂 RedisConnectionFactory
spring-data-redis 外围接口与背地的连贯相干形象关系为:
通过这个图,咱们能够晓得,咱们实现一个能够动静返回不同 Redis 连贯的 RedisConnectionFactory
即可,并且依据 spring-data-redis 的主动装载源码能够晓得,框架内的所有 RedisConnectionFactory
是 @ConditionalOnMissingBean
的,即咱们能够应用咱们本人实现的 RedisConnectionFactory
进行替换。
我的项目地址:https://github.com/JoJoTec/sp…
咱们能够给 RedisProperties
配置外层封装一个多 Redis 连贯的配置,即MultiRedisProperties
:
@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "spring.redis")
public class MultiRedisProperties {
/**
* 默认连贯必须配置,配置 key 为 default
*/
public static final String DEFAULT = "default";
private boolean enableMulti = false;
private Map<String, RedisProperties> multi;
}
这个配置是在原有配置根底上的,也就是用户能够应用原有配置,也能够应用这种多 Redis 配置,就是须要配置 spring.redis.enable-multi=true
。multi 这个 Map 中放入的 key 是数据源名称,用户能够在应用 RedisTemplate 或者 ReactiveRedisTemplate 之前,通过这个数据源名称指定用哪个 Redis。
接下来咱们来实现 MultiRedisLettuceConnectionFactory
,即能够动静切换 Redis 连贯的 RedisConnectionFactory
,咱们的我的项目采纳的 Redis 客户端是 Lettuce:
public class MultiRedisLettuceConnectionFactory
implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
private final Map<String, LettuceConnectionFactory> connectionFactoryMap;
private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();
public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {this.connectionFactoryMap = connectionFactoryMap;}
public void setCurrentRedis(String currentRedis) {if (!connectionFactoryMap.containsKey(currentRedis)) {throw new RedisRelatedException("invalid currentRedis:" + currentRedis + ", it does not exists in configuration");
}
MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis);
}
@Override
public void destroy() throws Exception {connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy);
}
@Override
public void afterPropertiesSet() throws Exception {connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet);
}
private LettuceConnectionFactory currentLettuceConnectionFactory() {String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get();
if (StringUtils.isNotBlank(currentRedis)) {MultiRedisLettuceConnectionFactory.currentRedis.remove();
return connectionFactoryMap.get(currentRedis);
}
return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);
}
@Override
public ReactiveRedisConnection getReactiveConnection() {return currentLettuceConnectionFactory().getReactiveConnection();}
@Override
public ReactiveRedisClusterConnection getReactiveClusterConnection() {return currentLettuceConnectionFactory().getReactiveClusterConnection();}
@Override
public RedisConnection getConnection() {return currentLettuceConnectionFactory().getConnection();}
@Override
public RedisClusterConnection getClusterConnection() {return currentLettuceConnectionFactory().getClusterConnection();}
@Override
public boolean getConvertPipelineAndTxResults() {return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();}
@Override
public RedisSentinelConnection getSentinelConnection() {return currentLettuceConnectionFactory().getSentinelConnection();}
@Override
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);
}
}
逻辑非常简单,就是提供了设置 Redis 数据源的接口,并且放入了 ThreadLocal 中,并且仅对以后一次无效,读取后就清空。
而后,将 MultiRedisLettuceConnectionFactory 作为 Bean 注册到咱们的 ApplicationContext 中:
@ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)
@Configuration(proxyBeanMethods = false)
public class RedisCustomizedConfiguration {
/**
* @param builderCustomizers
* @param clientResources
* @param multiRedisProperties
* @return
* @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
*/
@Bean
public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(
ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
ClientResources clientResources,
MultiRedisProperties multiRedisProperties,
ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider
) {
// 读取配置
Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap();
Map<String, RedisProperties> multi = multiRedisProperties.getMulti();
multi.forEach((k, v) -> {
// 这个其实就是框架中原有的源码应用 RedisProperties 的形式,咱们其实就是在 RedisProperties 里面包装了一层而已
LettuceConnectionConfiguration lettuceConnectionConfiguration = new LettuceConnectionConfiguration(
v,
sentinelConfigurationProvider,
clusterConfigurationProvider
);
LettuceConnectionFactory lettuceConnectionFactory = lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources);
connectionFactoryMap.put(k, lettuceConnectionFactory);
});
return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);
}
}
咱们来测试下,应用 embedded-redis 来启动本地 redis,从而实现单元测试。咱们启动两个 Redis,在两个 Redis 中放入不同的 Key,验证是否存在,并且测试同步接口,多线程调用同步接口,和屡次异步接口无期待订阅从而测试有效性。:
import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Mono;
import redis.embedded.RedisServer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
"spring.redis.enable-multi=true",
"spring.redis.multi.default.host=127.0.0.1",
"spring.redis.multi.default.port=6379",
"spring.redis.multi.test.host=127.0.0.1",
"spring.redis.multi.test.port=6380",
})
public class MultiRedisTest {
// 启动两个 redis
private static RedisServer redisServer;
private static RedisServer redisServer2;
@BeforeAll
public static void setUp() throws Exception {System.out.println("start redis");
redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build();
redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build();
redisServer.start();
redisServer2.start();
System.out.println("redis started");
}
@AfterAll
public static void tearDown() throws Exception {System.out.println("stop redis");
redisServer.stop();
redisServer2.stop();
System.out.println("redis stopped");
}
@EnableAutoConfiguration
@Configuration
public static class App { }
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ReactiveStringRedisTemplate reactiveRedisTemplate;
@Autowired
private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory;
private void testMulti(String suffix) {
// 应用默认连贯,设置 "testDefault" + suffix, "testDefault" 键值对
redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault");
// 应用 test 连贯,设置 "testSecond" + suffix, "testDefault" 键值对
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond");
// 应用默认连贯,验证 "testDefault" + suffix 存在,"testSecond" + suffix 不存在
Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix));
Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix));
// 应用 test 连贯,验证 "testDefault" + suffix 不存在,"testSecond" + suffix 存在
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix));
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix));
}
// 单次验证
@Test
public void testMultiBlock() {testMulti("");
}
// 多线程验证
@Test
public void testMultiBlockMultiThread() throws InterruptedException {Thread thread[] = new Thread[50];
AtomicBoolean result = new AtomicBoolean(true);
for (int i = 0; i < thread.length; i++) {
int finalI = i;
thread[i] = new Thread(() -> {
try {testMulti("" + finalI);
} catch (Exception e) {e.printStackTrace();
result.set(false);
}
});
}
for (int i = 0; i < thread.length; i++) {thread[i].start();}
for (int i = 0; i < thread.length; i++) {thread[i].join();}
Assertions.assertTrue(result.get());
}
//reactive 接口验证
private Mono<Boolean> reactiveMulti(String suffix) {return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault")
.flatMap(b -> {multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond");
}).flatMap(b -> {return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
}).map(b -> {Assertions.assertTrue(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
}).map(b -> {Assertions.assertFalse(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
}).map(b -> {Assertions.assertFalse(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
}).map(b -> {Assertions.assertTrue(b);
return b;
});
}
// 屡次调用 reactive 验证,并且 subscribe,这自身就是多线程的
@Test
public void testMultiReactive() throws InterruptedException {for (int i = 0; i < 10000; i++) {reactiveMulti("" + i).subscribe(System.out::println);
}
TimeUnit.SECONDS.sleep(10);
}
}
运行测试,通过。
微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种 offer: