乐趣区

关于java:redis实现可重入分布式锁

实现的要求

  1. 应用 lua 加锁和开释锁
  2. 加锁开释锁须要 id, 保障只能开释本人加的锁
  3. 可重入锁,一个线程取得锁后还能够再加锁
  4. 自旋锁或称为阻塞锁

环境:

redis 6.0

spring boot data redis 2.3.4

可重入锁的实现原理

个别都是间接应用 set key value nx px 实现分布式锁,这种形式无奈实现可重入锁。这里应用 redis hash 实现可重入锁,hash 的 field 的值记录重入次数。

Lock 锁实现

Lock 接口

package com.example.shop.service;

import java.util.concurrent.TimeUnit;

public interface Lock {
    // 非阻塞的,立刻返回
    boolean tryLock(String uid);
    // 有超时工夫
    boolean tryLock(String uid,long timeOut, TimeUnit timeUnit);
    // 阻塞的
    void lock(String uid);
    // 开释锁
    void unLock(String uid);
}

RedisLock 接口

package com.example.shop.service;

public interface RedisLock {
    // 获取 Lock 实例
    Lock getLock(String key);
}

RedisLockImpl 实现类

package com.example.shop.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class RedisLockImpl implements RedisLock, Serializable {

    private final RedisTemplate<String,String> redisTemplate;

    public RedisLockImpl(RedisTemplate<String,String> redisTemplate){this.redisTemplate=redisTemplate;}

    @Override
    public Lock getLock(String key) {return new LockImpl(key);
    }

    class LockImpl implements Lock{private static final String lockStr = "local key = KEYS[1]\n" +
                "local lockKey = ARGV[1]\n" +
                "local lockCount = 1\n" +
                "\n" +
                "\n" +
                "local val = redis.call('hget',key,lockKey)\n" +
                "if val then\n" +
                "redis.call('hincrby',key,lockKey,1)\n" +
                "return 1\n"+
                "end\n" +
                "if redis.call('exists',key)==0 then\n" +
                "redis.call('hset',key,lockKey,1)\n"+
                "redis.call('expire',lockKey,5)\n" +
                "return 1\n" +
                "end\n" +
                "return 0";

        private static final String unLockStr = "-- 开释锁 \n" +
                "local key = KEYS[1]\n" +
                "local lockKey = ARGV[1]\n" +
                "local value = redis.call('hget',key,lockKey)\n" +
                "if value then\n" +
                "if tonumber(value)>1 then\n" +
                "redis.call('hincrby',key,lockKey,-1)\n" +
                "else\n" +
                "redis.call('del',key)\n" +
                "end\n" +
                "end";

        //key 过期工夫
        private static final long keyTimeOut = 30000;

        private static final long SLEEP_TIME = 50;

        private final DefaultRedisScript<Long> lockScript= new DefaultRedisScript<>(lockStr,Long.class);

        private final DefaultRedisScript<Long> unLockScript = new DefaultRedisScript<>(unLockStr,Long.class);

        private String key;

        public LockImpl(String key){this.key = key;}

        // 尝试加锁,立刻返回
        @Override
        public  boolean tryLock(String uuid){Long lc = redisTemplate.execute(lockScript, List.of(this.key),uuid,String.valueOf(keyTimeOut));
            log.info(uuid+"试图加锁");
            return lc!=null&&lc==1;
        }

        // 自旋锁
        @Override
        public  boolean tryLock(String uuid,long timeOut, TimeUnit timeUnit){if (timeOut<0) throw new IllegalArgumentException("timeOut is illegal");
            final long allTime = timeUnit.toMillis(timeOut);
            long start = System.currentTimeMillis();
            while (start+allTime>=System.currentTimeMillis()){if (this.tryLock(uuid)) return true;
                try{Thread.sleep(SLEEP_TIME);
                }catch (Exception e){e.printStackTrace();
                }
            }
            return false;
        }

        @Override
        public  void lock(String uuid){while (true){if (this.tryLock(uuid)) return;
                try{Thread.sleep(SLEEP_TIME);
                }catch (Exception e){e.printStackTrace();
                }
            }
        }

        @Override
        public void unLock(String uuid) {redisTemplate.execute(unLockScript,List.of(this.key),uuid);
            log.info(uuid+"开释锁");
        }
    }
}

实现上应用 lua 脚本,保障原子性,锁的 key 就是 hset 的 key,加锁 id 为 hash 的一个 key,加锁的次数为这个 key 的值,重入锁把这个值每次加 1,开释锁每次减 1,直到值为 1 时删除这个键。

开释锁时须要判断这个锁是不是他加的,不能呈现开释他人加的锁。

lua 脚本调试

redis 3.2 开始反对应用 ldb 调试 lua 脚本

lock.lua

local key = KEYS[1]
local lockKey = ARGV[1]
local lockCount = 1
local val = redis.call('hget',key,lockKey)
if val then
    -- 可重入
    redis.call('hincrby',key,lockKey,1)
    return 1
end
if redis.call('exists',key)==0 then
    redis.call('hset',key,lockKey,1)
    redis.call('expire',lockKey,5)
    return 1
end
return 0;

应用如下命令开始调试,逗号前为 key,前面为参数

redis-cli --ldb --eval lock.lua order , consumer-1

进入 debugger 模式后

s 单步执行

p 打印变量值

b 增加断点

c 执行到下一个断点

简略测试

RedisService.java

package com.example.shop.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RedisService {

    private final RedisLock redisLock;

    private static int count = 10;

    private Lock lock;


    public RedisService(RedisLock redisLock) {this.lock = redisLock.getLock("order");
    }

    public String buy(String uid) {
        try {lock.lock(uid);
            if (count > 0) {
                count--;
                log.info("{} 胜利购买到 iphone, 残余库存 {}", uid, count);
                return uid + ":胜利购买到 iphone, 残余库存:" + count;
            } else {return "库存有余";}
        } finally {lock.unLock(uid);
            log.info("unlock {}", uid);

        }
    }
}

ShopApplication.java

package com.example.shop;

import com.example.shop.service.RedisService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication(scanBasePackages = {"com.example.shop"})
@RestController
public class ShopApplication {

    private final RedisService redisService;

    public ShopApplication(RedisService redisService) {this.redisService = redisService;}

    public static void main(String[] args) {SpringApplication.run(ShopApplication.class, args);
    }

    @PostMapping("/buy")
    public String buy(String uid){return redisService.buy(uid);
    }
}

模仿抢购 10 个商品。

python 并发申请:

import requests as req
import threading
def start(name):
    r = req.post('http://localhost:8080/buy',{'uid':name}) 
    print(r.text)

for i in range(15):

    th = threading.Thread(target=start,args=["consumer-{}".format(i)])
    th.start()

后果:

总结

​ redis 实现分布式锁形式很多,最简略的形式是应用 set key value nx px 实现,间接一步实现。最重要的就是要谁加的锁谁开释,锁阻塞不是最好的实现形式,耗费大量网络和 cpu 资源,可重入锁须要应用 hash 数据结构实现。须要设置过期实现,防止出现死锁,还有就是锁超时问题,业务逻辑还没执行完,锁就被开释了,导致数据不统一,可在这个根底上退出主动续期性能。

​ 相熟 lua 脚本很重要,lua 在 redis 中是原子执行的。

​ 分布式锁还能够应用 zookeeper,etcd 实现。要相熟分布式锁的利用场景,单体利用间接在办法或代码快加锁即可,服务集群,微服务,分布式系统须要利用分布式锁。

另外,还须要留神的就是 redis 主从复制对分布式锁带来的影响。

退出移动版