实现的要求

  1. 应用lua加锁和开释锁
  2. 加锁开释锁须要id,保障只能开释本人加的锁
  3. 可重入锁,一个线程取得锁后还能够再加锁
  4. 自旋锁或称为阻塞锁

环境:

redis 6.0

spring boot data redis 2.3.4

可重入锁的实现原理

个别都是间接应用set key value nx px实现分布式锁,这种形式无奈实现可重入锁。这里应用redis hash实现可重入锁,hash的field的值记录重入次数。

Lock锁实现

Lock接口

package com.example.shop.service;import java.util.concurrent.TimeUnit;public interface Lock {    //非阻塞的,立刻返回    boolean tryLock(String uid);    //有超时工夫    boolean tryLock(String uid,long timeOut, TimeUnit timeUnit);    //阻塞的    void lock(String uid);    //开释锁    void unLock(String uid);}

RedisLock接口

package com.example.shop.service;public interface RedisLock {    //获取Lock实例    Lock getLock(String key);}

RedisLockImpl实现类

package com.example.shop.service;import lombok.extern.slf4j.Slf4j;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.core.script.DefaultRedisScript;import org.springframework.stereotype.Component;import java.io.Serializable;import java.util.List;import java.util.concurrent.TimeUnit;@Component@Slf4jpublic class RedisLockImpl implements RedisLock, Serializable {    private final RedisTemplate<String,String> redisTemplate;    public RedisLockImpl(RedisTemplate<String,String> redisTemplate){        this.redisTemplate=redisTemplate;    }    @Override    public Lock getLock(String key) {        return new LockImpl(key);    }    class LockImpl implements Lock{        private static final String lockStr = "local key = KEYS[1]\n" +                "local lockKey = ARGV[1]\n" +                "local lockCount = 1\n" +                "\n" +                "\n" +                "local val = redis.call('hget',key,lockKey)\n" +                "if val then\n" +                "    redis.call('hincrby',key,lockKey,1)\n" +                "    return 1\n"+                "end\n" +                "if redis.call('exists',key)==0 then\n" +                "    redis.call('hset',key,lockKey,1)\n"+                "    redis.call('expire',lockKey,5)\n" +                "    return 1\n" +                "end\n" +                "return 0";        private static final String unLockStr = "--开释锁\n" +                "local key = KEYS[1]\n" +                "local lockKey = ARGV[1]\n" +                "local value = redis.call('hget',key,lockKey)\n" +                "if value then\n" +                "    if tonumber(value)>1 then\n" +                "        redis.call('hincrby',key,lockKey,-1)\n" +                "    else\n" +                "        redis.call('del',key)\n" +                "    end\n" +                "end";        //key过期工夫        private static final long keyTimeOut = 30000;        private static final long SLEEP_TIME = 50;        private final DefaultRedisScript<Long> lockScript= new DefaultRedisScript<>(lockStr,Long.class);        private final DefaultRedisScript<Long> unLockScript = new DefaultRedisScript<>(unLockStr,Long.class);        private String key;        public LockImpl(String key){            this.key = key;        }        //尝试加锁,立刻返回        @Override        public  boolean tryLock(String uuid){            Long lc = redisTemplate.execute(lockScript, List.of(this.key),uuid,String.valueOf(keyTimeOut));            log.info(uuid+" 试图加锁");            return lc!=null&&lc==1;        }        //自旋锁        @Override        public  boolean tryLock(String uuid,long timeOut, TimeUnit timeUnit){            if (timeOut<0) throw new IllegalArgumentException("timeOut is illegal");            final long allTime = timeUnit.toMillis(timeOut);            long start = System.currentTimeMillis();            while (start+allTime>=System.currentTimeMillis()){                if (this.tryLock(uuid)) return true;                try{                    Thread.sleep(SLEEP_TIME);                }catch (Exception e){                    e.printStackTrace();                }            }            return false;        }        @Override        public  void lock(String uuid){            while (true){                if (this.tryLock(uuid)) return;                try{                    Thread.sleep(SLEEP_TIME);                }catch (Exception e){                    e.printStackTrace();                }            }        }        @Override        public void unLock(String uuid) {            redisTemplate.execute(unLockScript,List.of(this.key),uuid);            log.info(uuid+" 开释锁");        }    }}

实现上应用lua脚本,保障原子性,锁的key就是hset的key,加锁id为hash的一个key,加锁的次数为这个key的值,重入锁把这个值每次加1,开释锁每次减1,直到值为1时删除这个键。

开释锁时须要判断这个锁是不是他加的,不能呈现开释他人加的锁。

lua脚本调试

redis 3.2开始反对应用ldb调试lua脚本

lock.lua

local key = KEYS[1]local lockKey = ARGV[1]local lockCount = 1local val = redis.call('hget',key,lockKey)if val then    --可重入    redis.call('hincrby',key,lockKey,1)    return 1endif redis.call('exists',key)==0 then    redis.call('hset',key,lockKey,1)    redis.call('expire',lockKey,5)    return 1endreturn 0;

应用如下命令开始调试,逗号前为key,前面为参数

redis-cli --ldb --eval lock.lua order , consumer-1

进入debugger模式后

s 单步执行

p 打印变量值

b 增加断点

c 执行到下一个断点

简略测试

RedisService.java

package com.example.shop.service;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;@Component@Slf4jpublic class RedisService {    private final RedisLock redisLock;    private static int count = 10;    private Lock lock;    public RedisService(RedisLock redisLock) {        this.lock = redisLock.getLock("order");    }    public String buy(String uid) {        try {            lock.lock(uid);            if (count > 0) {                count--;                log.info("{}胜利购买到iphone,残余库存{}", uid, count);                return uid + ":胜利购买到iphone,残余库存:" + count;            } else {                return "库存有余";            }        } finally {            lock.unLock(uid);            log.info("unlock {}", uid);        }    }}

ShopApplication.java

package com.example.shop;import com.example.shop.service.RedisService;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RestController;@SpringBootApplication(scanBasePackages = {"com.example.shop"})@RestControllerpublic class ShopApplication {    private final RedisService redisService;    public ShopApplication(RedisService redisService) {        this.redisService = redisService;    }    public static void main(String[] args) {        SpringApplication.run(ShopApplication.class, args);    }    @PostMapping("/buy")    public String buy(String uid){        return redisService.buy(uid);    }}

模仿抢购10个商品。

python并发申请:

import requests as reqimport threadingdef start(name):    r = req.post('http://localhost:8080/buy',{'uid':name})     print(r.text)for i in range(15):    th = threading.Thread(target=start,args=["consumer-{}".format(i)])    th.start()

后果:

总结

redis实现分布式锁形式很多,最简略的形式是应用set key value nx px实现,间接一步实现。最重要的就是要谁加的锁谁开释,锁阻塞不是最好的实现形式,耗费大量网络和cpu资源,可重入锁须要应用hash数据结构实现。须要设置过期实现,防止出现死锁,还有就是锁超时问题,业务逻辑还没执行完,锁就被开释了,导致数据不统一,可在这个根底上退出主动续期性能。

相熟lua脚本很重要,lua在redis中是原子执行的。

分布式锁还能够应用zookeeper,etcd实现。要相熟分布式锁的利用场景,单体利用间接在办法或代码快加锁即可,服务集群,微服务,分布式系统须要利用分布式锁。

另外,还须要留神的就是redis主从复制对分布式锁带来的影响。