关于java:Redis高并发架构实战学习笔记

37次阅读

共计 12777 个字符,预计需要花费 32 分钟才能阅读完成。

(1)先来一个小案例作为切入点

/*
这里记为代码一
*/
@RestController
public class IndexController {
    
    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;    // 组件 spring-boot-starter-data-redis

    @RequestMapping("/deduct_stock")
    public String deductStock(){int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   // 能够了解为 jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     // 能够了解为 jedis.set(key,value)
            System.out.println("扣减胜利,残余库存:" + realStock);
        }else{System.out.println("扣减失败,库存有余");
        }

        return "end";
    }
}

而后在 redis 中搞一个库存为 200

当初很显著,代码一 存在线程平安问题,会有可能读到都是 200,而后都减 1 后设置为 199,就不对了。
很多同学都会想到加一把锁

(2)synchronized

/*
代码二
*/
public String deductStock(){synchronized (this){int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   // 能够了解为 jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     // 能够了解为 jedis.set(key,value)
            System.out.println("扣减胜利,残余库存:" + realStock);
        }else{System.out.println("扣减失败,库存有余");
        }
    }

    return "end";
}

这样确实是只能有一个线程执行操作,的确是线程平安了。然而它只能在单机环境下运行,只能锁住一个 tomcat,分布式的时候就不行了。

(3)分布式锁

这时,应该思考分布式锁。SETNX(SET if Not eXists)。和 set 的区别是:
set tuling A
set tuling B
后果会是 B
setnx tuling A
setnx tuling B
后果会是 A

/*
代码三
*/
public String deductStock(){

    String lockKey = "product_101";
    // 如果返回 false,阐明 redis 中有这个 key 了,不做任何操作。如果返回 true 阐明执行这个命令之前没有这个 key,并设置胜利了
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     // 就了解为 jedis.setnx(key,value)
    if(!result){return "error_code";    // 给前端错误码,以后零碎忙碌,请稍后再试}

    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   // 能够了解为 jedis.get("stock")
    if(stock > 0){
        int realStock = stock - 1;
        stringRedisTemplate.opsForValue().set("Stock", realStock + "");     // 能够了解为 jedis.set(key,value)
        System.out.println("扣减胜利,残余库存:" + realStock);
    }else{System.out.println("扣减失败,库存有余");
    }

    stringRedisTemplate.delete(lockKey);

    return "end";
}


redis 那边是单线程操作的,会排队,只有排队头的能够设置胜利,前面的设置不胜利,这样入门级的分布式锁设计完了。大家想想还有没有问题?
这个时候还是存在问题,当获取到锁的线程有异样,导致没法删除 key,就会导致其余线程获取不到锁,就算能捕捉异样,但如果是零碎挂了呢,运维重启呢

/*
代码四
*/
public String deductStock(){

    String lockKey = "product_101";
    // 如果返回 false,阐明 redis 中有这个 key 了,不做任何操作。如果返回 true 阐明执行这个命令之前没有这个 key,并设置胜利了
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     // 就了解为 jedis.setnx(key,value)
    if(!result){return "error_code";    // 给前端错误码,以后零碎忙碌,请稍后再试}

    try{int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   // 能够了解为 jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     // 能够了解为 jedis.set(key,value)
            System.out.println("扣减胜利,残余库存:" + realStock);
        }else{System.out.println("扣减失败,库存有余");
        }
    }finally {stringRedisTemplate.delete(lockKey);
    }

    return "end";
}

示例 代码四 还是存在问题,大家先想想解决办法。

(4)锁超时

这样的话,能够加一个超时工夫来解决,给 key 一个超时工夫,即便零碎挂了,一段时间之后,其余机器还是能失常拜访

/*
代码五
*/
public String deductStock(){

    String lockKey = "product_101";
    // 如果返回 false,阐明 redis 中有这个 key 了,不做任何操作。如果返回 true 阐明执行这个命令之前没有这个 key,并设置胜利了
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     // 就了解为 jedis.setnx(key,value)
    stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
    
    if(!result){return "error_code";    // 给前端错误码,以后零碎忙碌,请稍后再试}

    try{int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   // 能够了解为 jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     // 能够了解为 jedis.set(key,value)
            System.out.println("扣减胜利,残余库存:" + realStock);
        }else{System.out.println("扣减失败,库存有余");
        }
    }finally {stringRedisTemplate.delete(lockKey);
    }

    return "end";
}

大家想想 代码五 还有问题吗?

(5)加锁操作原子性

假如设置了 key 之后,正筹备设置超时工夫,但零碎挂了,那还是回到之前的问题了,得保障原子性。应该应用 setIfAbsent 的其余重载办法,有一个是能够同时设置超时工夫的

/*
代码六
*/
public String deductStock(){

    String lockKey = "product_101";
    // 如果返回 false,阐明 redis 中有这个 key 了,不做任何操作。如果返回 true 阐明执行这个命令之前没有这个 key,并设置胜利了
//        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     // 就了解为 jedis.setnx(key,value)
//        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling", 10, TimeUnit.SECONDS);

    if(!result){return "error_code";    // 给前端错误码,以后零碎忙碌,请稍后再试}

    try{int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   // 能够了解为 jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     // 能够了解为 jedis.set(key,value)
            System.out.println("扣减胜利,残余库存:" + realStock);
        }else{System.out.println("扣减失败,库存有余");
        }
    }finally {stringRedisTemplate.delete(lockKey);
    }

    return "end";
}

大家思考一下,代码六 还有没有问题?
遇到高并发的时候,通常执行会比较慢,慢执行啊,两头 sql 语句执行很慢这样,假如执行完这个办法须要 15 秒,当线程执行了 10 秒的时候,因为设置了超时工夫是 10 秒,并且是高并发场景,这个时候 key 就删除了,另外的线程就获取了锁

这样就相当于锁永恒生效。尽管把过期工夫放大是能够防止,但还是无奈彻底解决问题。
实质是本人加的锁被他人解掉了,所以解决就是锁只能本人解锁

/*
代码七
*/
public String deductStock(){

    String lockKey = "product_101";
    // 如果返回 false,阐明 redis 中有这个 key 了,不做任何操作。如果返回 true 阐明执行这个命令之前没有这个 key,并设置胜利了
//        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     // 就了解为 jedis.setnx(key,value)
//        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

    String clientId = UUID.randomUUID().toString();

    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);

    if(!result){return "error_code";    // 给前端错误码,以后零碎忙碌,请稍后再试}

    try{int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   // 能够了解为 jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     // 能够了解为 jedis.set(key,value)
            System.out.println("扣减胜利,残余库存:" + realStock);
        }else{System.out.println("扣减失败,库存有余");
        }
    }finally {if(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){stringRedisTemplate.delete(lockKey);
        }
    }

    return "end";
}

代码七 按下面的例子,锁是本人过期的,这代码只是能保障线程 1 无奈删除线程 2 的锁,但线程 1 和线程 2 还是同时在跑啊。这个工夫还有问题,然而先不论,先放放,因为不是想要引申的内容,要持续思考这个代码还有除工夫外的什么其余问题?
就是 finally 中的两行代码非原子,写并发代码和写高并发代码时的区别,应该要习惯性的在代码之间空几行,表明这里执行有时间差,非原子。
假如执行判断完 clientId 的确是等于以后线程的 value,假如这时刚好是 9.9 秒,忽然产生卡顿,但这个 if 判断曾经是 true 了,正筹备 delete 的时候,卡顿了,这时曾经过了 10 秒,线程 2 曾经获取了锁,而后线程 1 执行 delete,又出问题了,依然是线程 1 删除了线程 2 的锁。
怎么解决?

(6)锁续命

锁续命 :通常是这样解决的,有一个分线程定时工作,用来监测线程还是否持有锁,还持有的就缩短锁的过期工夫,例如锁超时是 30 秒,那么分线程每 10 秒判断一下,线程还是否持有锁,还持有就更新过期工夫,不能说是缩短,是按以后工夫又从新设置 30 秒过期,当不持有了,定时工作就完结,分线程也完结。
redisson:操作 redis 的客户端,有很多分布式性能,其中就有分布式锁。想起了吧? 代码一 中就曾经引入了 redisson

/*
代码八
*/
public String deductStock(){

    String lockKey = "product_101";
    // 如果返回 false,阐明 redis 中有这个 key 了,不做任何操作。如果返回 true 阐明执行这个命令之前没有这个 key,并设置胜利了
//        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     // 就了解为 jedis.setnx(key,value)
//        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

    /*String clientId = UUID.randomUUID().toString();
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
    if(!result){return "error_code";    // 给前端错误码,以后零碎忙碌,请稍后再试}*/

    RLock redissonLock = redisson.getLock(lockKey);

    try{
        // 加锁
        redissonLock.lock();    // 了解为执行了 setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS)
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   // 能够了解为 jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     // 能够了解为 jedis.set(key,value)
            System.out.println("扣减胜利,残余库存:" + realStock);
        }else{System.out.println("扣减失败,库存有余");
        }
    }finally {redissonLock.unlock();
        /*if(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){stringRedisTemplate.delete(lockKey);
        }*/
    }

    return "end";
}

redisson 加锁外围 lua 脚本

KEYS[1]:product_101
ARGV[2]:getLockName(threadId)
ARGV[1]:internalLockLeaseTime(初始化是 30 秒)
能够看到第 250 行和 251 行,就相当于 代码五 中的

Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     // 就了解为 jedis.setnx(key,value)
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

而这两行代码是不具备原子性的,线程不平安。Lua 脚本能够保障原子性

锁续命:

/*
https://github.com/redisson/redisson/blob/redisson-3.6.5/redisson/src/main/java/org/redisson/RedissonLock.java
redisson-3.6.5 RedissonLock.java,其余版本会不太不一样,但原理应该不变吧
*/
    private void scheduleExpirationRenewal(final long threadId) {if (expirationRenewalMap.containsKey(getEntryName())) {return;}

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
                            "redis.call('pexpire', KEYS[1], ARGV[1]);" +
                            "return 1;" +
                        "end;" +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {log.error("Can't update lock "+ getName() +" expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();
        }
    }

它提早 internalLockLeaseTime / 3 秒执行 run 办法,为它从新设置 expire 为 internalLockLeaseTime
commandExecutor.evalWriteAsync返回了一个 future,而后future 又增加监听器,最初执行以后办法scheduleExpirationRenewal(threadId);,就是始终反复续命,又再提早调用,相当于定时工作。

到目前为止,根本就没有什么坑了,redisson 曾经是填了很多坑,能够放心使用 代码八 进行实现。
然而,还有点问题,假如有多个申请在执行 redissonLock.lock()加锁,只能有一个线程在解决,其余都得等着,零碎就会很慢,存在性能问题,该怎么优化能做到双十一能用的级别?

(7)zookeeper

redis 个别都是有主从架构的,根本不会是单机应用

redis 主节点马上通知客户端加锁胜利,线程 1 就执行业务代码逻辑,而后 redis 筹备把 key 同步给从节点时候,后果主节点挂了,某个从节点选举成为新的 Master 主节点,来了个线程 3 拜访新的主节点加锁,线程 3 就发现没有 product_101 这个 key,又能够加锁胜利了,线程 1 业务逻辑还没执行结束,线程 3 就开始执行,就又呈现了问题

主从架构锁生效的问题,能够用 zookeeper 来实现分布式锁,和 redis 相似,是树形构造。redis 更多的实现是 AP 架构,zookeeper 更多的实现是 CAP 架构。
zookeeper 的话,当要写一个 key,不是就立刻返回胜利的,会先把 key 同步给集群的其余节点,子节点会返回同步胜利的信息,主节点会判断是否曾经有超过半数的子节点都同步胜利,这时才通知客户端胜利了,是为了保障一致性,就义了及时响应,但它能保障那些曾经同步了子节点能力胜利 leader,redis 就没有这个机制,也就是线程 3 再来申请 leader 的时候,必然会有 key,加锁就不胜利,解决了上述问题。
但如果不应用 zookeeper,就是要应用 redis 来解决呢?(因为 redis 的并发比 zookeeper 高不少)如果要高并发,就用 redis,就有上述主从锁问题,如果要保障健壮性就用 zookeeper,但就义了并发数。

(8)Redlock

硬是要应用 redis 的话,看看 Redlock

redis 没有主从关系,是对等的,往每个节点发送加锁命令,只有超过半数的节点返回胜利才认为客户端加锁胜利,和 zookeeper 原理相似。但这种形式不举荐,原来是一个 redis 节点,当初搞多个,要半数加锁胜利,对咱们加锁性能受肯定影响,这样的话,还不如用 zookeeper,因为 redlock 还有不少问题。

@RequestMapping("/redlock")
public String redlock(){

    String lockKey = "product_101";
    RLock lock1 = redisson.getLock(lockKey);
    RLock lock2 = redisson.getLock(lockKey);
    RLock lock3 = redisson.getLock(lockKey);

    // 依据多个 RLock 对象构件 RedissonRedLock
    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);

    try{
        /*
        * waitTimeout 尝试获取锁的最大等待时间,超过这个数,则认为获取锁失败
        * leaseTime   锁的持有工夫,超过这个工夫锁会主动失败(值应设置为大于业务解决的工夫,确保在锁有效期内业务能解决完)* */
        boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
        if(res){// 胜利获取锁,解决业务}
    }catch (Exception e) {throw  new RuntimeException("lock fail");
    }finally {
        // 无论如何,最初都要解锁
        redLock.unlock();}

    return "end";
}

回到 代码八,redissonLock.lock(); 会导致其余线程期待,也就是分布式锁把并行申请变串行化执行了。那么如何晋升分布式锁性能?

(9)分段锁

模拟 ConcurrentHashMap,分段锁。
假如 product_101 的数量是 200,那么能够分十段,
product_101_1=20
product_101_2=20
product_101_3=20
……
product_101_10=20
200 个库存分 10 个 key 存到 redis 中去,让每个线程去减不同的段位的库存,如果不够减的话就减一下个段位,实现的话有点难,但能够了解这个思维,就不再去扩大了。

(10)缓存数据库双写不统一

接下来说 redis 作为缓存应用的时候,常见问题有:缓存无底洞、缓存穿透、缓存雪崩、缓存生效、热点 key 歪斜、热点 key 重建、缓存数据库双写不统一。
这里针对缓存数据库双写不统一的问题说一下。
什么是缓存数据库双写不统一?

看上去线程 1 写数据库,而后更新缓存,线程 2 写数据库,而后更新缓存,没有什么问题,但如果线程 1 操作较慢(小卡顿)呢?

有些人就会说,通常不会间接更新缓存,而是把缓存删掉,即更新就删缓存,读数据的时候再设置缓存,确实这样是比拟好,因为每次写完就更新缓存的话,如果不读缓存,相当于白更新。

但这样还是有问题

还有什么办法解决?

(10)解决双写不统一的办法

提早双删 :删缓存删两次,删除之后 sleep(一段时间) 后再删一次
但这种办法只能说是缩小,并不能解决问题,并且还让所有的写申请都得 sleep 一段时间

内存队列:用 hash 运算把操作路由到某个队列中程序执行。是能够解决,但简单,写不好很可能有性能问题或是 bug

还有没有其余解决办法?

问题的实质就是操作过程中不是原子性,如果(写数据库 - 删除缓存)是不可分割的操作,(查缓存 - 查数据库 - 更新缓存)是不可分割的操作,即在操作前加分布式锁,操作完后解锁,所有线程的操作为队列,把多个并发执行的线程串行化

间接这样上锁,性能必定是有问题的,怎么优化?

(11)读多写少的状况

间接上分布式锁会有问题,应用读写锁
读写锁 :读操作加读锁,写操作加写锁,读操作不互斥,写锁跟读锁、写锁跟写锁互斥。
因为很多零碎都是读多写少的状况,所以能够进步性能

/*
代码九
*/
@RequestMapping("/get_stock")
public String getStock(@RequestParam("clientId") Long clientId) throws InterruptedException{

    String lockKey = "product_101";

    RReadWriteLock readWriteLock = redisson.getReadWriteLock(lockKey);
    RLock rLock = readWriteLock.readLock();

    rLock.lock();
    System.out.println("获取读锁胜利:client="+clientId);
    String stock = stringRedisTemplate.opsForValue().get("stock");
    if(StringUtils.isEmpty(stock)){System.out.println("查询数据库库存为 10。。。");
        Thread.sleep(5000);
        stringRedisTemplate.opsForValue().set("stock", 10);
    }
    rLock.unlock();
    System.out.println("开释读锁胜利:client="+clientId);

    return "end";
}

@RequestMapping("/update_stock")
public String updateStock(@RequestParam("clientId") Long clientId) throws InterruptedException{

    String lockKey = "product_101";

    RReadWriteLock readWriteLock = redisson.getReadWriteLock(lockKey);
    RLock writeLock = readWriteLock.writeLock();

    writeLock.lock();
    System.out.println("获取写锁胜利:client="+clientId);
    System.out.println("批改商品 101 的数据库库存为 6。。。");
    stringRedisTemplate.delete("stock");
    Thread.sleep(5000);
    writeLock.unlock();
    System.out.println("开释写锁胜利:client="+clientId);

    return "end";
}

原理就是 lua 脚本为每个 key 设置一个 mode 的值来记录是 read 还是 write。
RedissonWriteLock.java

但如果读多写也多的状况呢,怎么解决?
不采纳下面的办法,依然是给缓存过期工夫,而后操作的时候间接操作数据库。例如在页面上看到的库存,其实很多时候都是和数据库的值不统一的,就是为了实现高并发,又要用数据库又要用缓存,只能就义一致性,就义一致性其实关系并不大,想一想,假如统一的话,退出购物车、下订单,两头是有时间差的,这个时候可能就没有了库存了,对用户来说是不统一,但对程序来说,程序以及保障了统一,只是意义不大,所以就义一致性来进步性能。假如过期工夫是一分钟,那在这一分钟内可能是不统一,但如果一分钟后库存不变,又读取更新了缓存,这个时候就变统一了,只须要确保在下单的时候是用 db 的数据即可。

(12)读多写多的状况

如果是读多写多,又要保障缓存数据库一致性,怎么办?
对读多写多的场景,就不应该用缓存,间接操作数据库就好了,对吧。
也有办法既应用缓存,又应答读多写多的场景,中间件 canal。前面就学不着了,须要报课。。。。当前再看看


原本想附上视频地址的,但被驳回了说是广告。。。这里我说一下以上是从诸葛老师那里学习的,仅作集体记录

正文完
 0