共计 5802 个字符,预计需要花费 15 分钟才能阅读完成。
分布式锁是在分布式环境下(多个 JVM 过程)管制多个客户端对某一资源的同步拜访的一种实现,与之绝对应的是线程锁,线程锁管制的是同一个 JVM 过程内多个线程之间的同步。分布式锁的个别实现办法是在应用服务器之外通过一个共享的存储服务器存储锁资源,同一时刻只有一个客户端能占有锁资源来实现。通常有基于 Zookeeper,Redis,或数据库三种实现模式。本文介绍基于 Redis 的实现计划。
要求
基于 Redis 实现分布式锁须要满足如下几点要求:
- 在分布式集群中,被分布式锁管制的办法或代码段同一时刻只能被一个客户端下面的一个线程执行,也就是互斥
- 锁信息须要设置过期工夫,防止一个线程长期占有(比方在做解锁操作前异样退出)而导致死锁
- 加锁与解锁必须统一,谁加的锁,就由谁来解(或过期超时),一个客户端不能解开另一个客户端加的锁
- 加锁与解锁的过程必须保障原子性
实现
1. 加锁实现
基于 Redis 的分布式锁加锁操作个别应用 SETNX
命令,其含意是“将 key 的值设为 value,当且仅当 key 不存在。若给定的 key 曾经存在,则 SETNX 不做任何动作”。
在 Spring Boot 中,能够应用 StringRedisTemplate 来实现,如下,一行代码即可实现加锁过程。(下列代码给出两种调用模式——立刻返回加锁后果与给定超时工夫获取加锁后果)
/**
* 尝试获取锁(立刻返回)* @param key 锁的 redis key
* @param value 锁的 value
* @param expire 过期工夫 / 秒
* @return 是否获取胜利
*/
public boolean lock(String key, String value, long expire) {return stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS);
}
/**
* 尝试获取锁,并至少期待 timeout 时长
*
* @param key 锁的 redis key
* @param value 锁的 value
* @param expire 过期工夫 / 秒
* @param timeout 超时时长
* @param unit 工夫单位
* @return 是否获取胜利
*/
public boolean lock(String key, String value, long expire, long timeout, TimeUnit unit) {long waitMillis = unit.toMillis(timeout);
long waitAlready = 0;
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS) && waitAlready < waitMillis) {
try {Thread.sleep(waitMillisPer);
} catch (InterruptedException e) {log.error("Interrupted when trying to get a lock. key: {}", key, e);
}
waitAlready += waitMillisPer;
}
if (waitAlready < waitMillis) {return true;}
log.warn("<====== lock {} failed after waiting for {} ms", key, waitAlready);
return false;
}
上述实现如何满足后面提到的几点要求:
- 客户端互斥:能够将 expire 过期工夫设置为大于同步代码的执行工夫,比方同步代码块执行工夫为 1s,则可将 expire 设置为 3s 或 5s。防止同步代码执行过程中 expire 工夫到,其它客户端又能够获取锁执行同步代码块。
- 通过设置过期工夫 expire 来防止某个客户端长期占有锁。
- 通过 value 来管制谁加的锁,由谁解的逻辑,比方能够应用 requestId 作为 value,requestId 惟一标记一次申请。
- setIfAbsent 办法 底层通过调用 Redis 的
SETNX
命令,操作具备原子性。
谬误示例:
网上有如下实现,
public boolean lock(String key, String value, long expire) {boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
if(result) {stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS);
}
return result;
}
该实现的问题是如果在 result 为 true,但还没胜利设置 expire 时,程序异样退出了,将导致该锁始终被占用而导致死锁,不满足第二点要求。
2. 解锁实现
解锁也须要满足后面所述的四个要求,实现代码如下:
private static final String RELEASE_LOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
private static final Long RELEASE_LOCK_SUCCESS_RESULT = 1L;
/**
* 开释锁
* @param key 锁的 redis key
* @param value 锁的 value
*/
public boolean unLock(String key, String value) {DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_LOCK_LUA_SCRIPT, Long.class);
long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
return Objects.equals(result, RELEASE_LOCK_SUCCESS_RESULT);
}
这段实现应用一个 Lua 脚本来实现解锁操作,保障操作的原子性。传入的 value 值需与该线程加锁时的 value 统一,能够应用 requestId(具体实现上面给出)。
谬误示例:
public boolean unLock(String key, String value) {String oldValue = stringRedisTemplate.opsForValue().get(key);
if(value.equals(oldValue)) {stringRedisTemplate.delete(key);
}
}
该实现先获取锁的以后值,判断两值相等则删除。思考一种极其状况,如果在判断为 true 时,刚好该锁过期工夫到,另一个客户端加锁胜利,则接下来的 delete 将不管三七二十一将他人加的锁间接删掉了,不满足第三点要求。该示例次要是因为没有保障解锁操作的原子性导致。
3. 注解反对
为了方便使用,增加一个注解,能够放于办法上管制办法在分布式环境中的同步执行。
/**
* 标注在办法上的分布式锁注解
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLockable {String key();
String prefix() default "disLock:";
long expire() default 10L; // 默认 10s 过期}
增加一个切面来解析注解的解决,
/**
* 分布式锁注解解决切面
*/
@Aspect
@Slf4j
public class DistributedLockAspect {
private DistributedLock lock;
public DistributedLockAspect(DistributedLock lock) {this.lock = lock;}
/**
* 在办法上执行同步锁
*/
@Around(value = "@annotation(lockable)")
public Object distLock(ProceedingJoinPoint point, DistributedLockable lockable) throws Throwable {
boolean locked = false;
String key = lockable.prefix() + lockable.key();
try {locked = lock.lock(key, WebUtil.getRequestId(), lockable.expire());
if(locked) {return point.proceed();
} else {log.info("Did not get a lock for key {}", key);
return null;
}
} catch (Exception e) {throw e;} finally {if(locked) {if(!lock.unLock(key, WebUtil.getRequestId())){log.warn("Unlock {} failed, maybe locked by another client already.", lockable.key());
}
}
}
}
}
RequestId 的实现如下,通过注册一个 Filter,在申请开始时生成一个 uuid 存于 ThreadLocal 中,在申请返回时革除。
public class WebUtil {
public static final String REQ_ID_HEADER = "Req-Id";
private static final ThreadLocal<String> reqIdThreadLocal = new ThreadLocal<>();
public static void setRequestId(String requestId) {reqIdThreadLocal.set(requestId);
}
public static String getRequestId(){String requestId = reqIdThreadLocal.get();
if(requestId == null) {requestId = ObjectId.next();
reqIdThreadLocal.set(requestId);
}
return requestId;
}
public static void removeRequestId() {reqIdThreadLocal.remove();
}
}
public class RequestIdFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
String reqId = httpServletRequest.getHeader(WebUtil.REQ_ID_HEADER);
// 没有则生成一个
if (StringUtils.isEmpty(reqId)) {reqId = ObjectId.next();
}
WebUtil.setRequestId(reqId);
try {filterChain.doFilter(servletRequest, servletResponse);
} finally {WebUtil.removeRequestId();
}
}
}
// 在配置类中注册 Filter
/**
* 增加 RequestId
* @return
*/
@Bean
public FilterRegistrationBean requestIdFilter() {RequestIdFilter reqestIdFilter = new RequestIdFilter();
FilterRegistrationBean registrationBean = new FilterRegistrationBean();
registrationBean.setFilter(reqestIdFilter);
List<String> urlPatterns = Collections.singletonList("/*");
registrationBean.setUrlPatterns(urlPatterns);
registrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE + 1);
return registrationBean;
}
4. 应用注解
@DistributedLockable(key = "test", expire = 10)
public void test(){System.out.println("线程 -"+Thread.currentThread().getName()+"开始执行..." + LocalDateTime.now());
try {Thread.sleep(2000);
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println("线程 -"+Thread.currentThread().getName()+"完结执行..." + LocalDateTime.now());
}
总结
本文给出了基于 Redis 的分布式锁的实现计划与常见的谬误示例。要保障分布式锁的正确运行,需满足本文所提的四个要求,尤其留神保障加锁解锁操作的原子性,设置过期工夫,及对同一个锁的加锁解锁线程统一。原文地址:http://blog.jboost.cn/distrib…
[转载请注明出处]
作者:雨歌
欢送关注作者公众号:半路雨歌,查看更多技术干货文章