关于缓存设计:玩转Java并发工具精通JUC成为并发多面手构建高性能缓存

7次阅读

共计 24747 个字符,预计需要花费 62 分钟才能阅读完成。

引言

《玩转 Java 并发工具、精通 JUC、成为并发多面手》构建高性能缓存这部分的集体笔记。本节为单纯的实战,次要是把之前学习并发编程的知识点串起来。

挺有意思的一个 demo,能够疾速理解到一些并发编程的时候须要留神的一些问题。

目录

整个高性能构建的梳理思路如下:

    1. 应用最简略的 HashMap
    1. 剖析 HashMap 实现的问题

      • 高并发拜访反复计算性能问题
      • 复用性能较差的问题
    1. 装璜模式形象计算业务

      • 解决复用性能较差的问题
    1. 应用 Future 改写计算实现接口

      • 避免业务反复计算
      • 如何解决高并发拜访反复计算性能问题
    1. 减少泛型
    1. 缓存过期和减少随机性

      • 为什么须要缓存过期?

        • 避免缓存净化
      • 为什么要减少随机性?

        • 避免同一个时刻大量缓存过期减少零碎压力
    1. 整体测试

代码

一个简略的小 demo,能够间接拷贝上面的包所属的各个版本代码到本人的我的项目浏览即可:
https://gitee.com/lazyTimes/interview/tree/master/src/main/java/com/zxd/interview/mycache

一、构建步骤

1. 应用最简略的 HashMap

最根底的版本实现非常简单,这是咱们通常会想到的利用缓存实现计划,这里应用了 Lombok 的 @Slf4j 注解进行日志打印。

整个逻辑非常简单,首先通过计算方法匹配缓存,如果有就取缓存内容,否则就调用计算方法而后把后果缓存到 HashMap 当中。

/**  
 * 初版高速缓存实现  
 * 1. 应用简略的 HashMap  
 * 2. 在高速缓存中进行计算  
 *  
 * 裸露问题:* 1. 复用性能查  
 * 2. HashMap 线程不平安,有可能反复判断  
 */  
@Slf4j  
public class MyCacheVersion1 {private final Map<String, Integer> cache = new HashMap<>();  
  
    /**  
     * 依据参数计算结果,此参数对于同一个参数会计算同样的后果  
     * 1. 如果缓存存在后果,间接返回  
     * 2. 如果缓存不存在,则须要计算增加到 map 之后才返回  
     * @param userId  
     * @return  
     */  
    public Integer compute(String userId) throws InterruptedException {if(cache.containsKey(userId)){log.info("cached => {}", userId);  
            return cache.get(userId);  
        }  
        log.info("doCompute => {}", userId);  
        Integer result = doCompute(userId);  
        // 不存在缓存就退出  
        cache.put(userId, result);  
        return result;  
    }  
  
    private Integer doCompute(String userId) throws InterruptedException {TimeUnit.SECONDS.sleep(5);  
        return Integer.parseInt(userId);  
    }  
  
  
}

初版存在较多的问题,比较显著的问题是 compute 这个办法在多线程的环境是不平安的,咱们能够编写测试程序验证。

在测试程序中,咱们应用线程池构建 100 个线程解决 1000 个计算工作。从打印后果中咱们随机抽取其中一个数字很容易呈现计算两次的状况,比方上面的状况:

  
/**  
 * 对应以后版本的测试程序  
 * 1. HashMap 线程不平安  
 * 2. 此程序验证线程平安问题  
 */  
@Slf4j  
public class Test {public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(100);  
        MyCacheVersion1 objectObjectMyCacheVersion1 = new MyCacheVersion1();  
        Random random = new Random(100);  
        for (int i = 0; i < 1000; i++) {executorService.execute(() -> {int randomInt = random.nextInt(100);  
                try {Integer user = objectObjectMyCacheVersion1.compute(String.valueOf(randomInt));  
                } catch (InterruptedException e) {throw new RuntimeException(e);  
                }  
  
            });  
        }  
        executorService.shutdown();}/** 运行后果
     测试后果:能够看到高并发的状况下十分有可能呈现反复计算而后 cache 的状况  
     10:56:41.437 [pool-1-thread-53] INFO  c.z.i.m.version1.MyCacheVersion1 - doCompute => 59  
     10:56:41.447 [pool-1-thread-97] INFO  c.z.i.m.version1.MyCacheVersion1 - doCompute => 59     
     */
}

能够发现 doCompute => 59 算了屡次。

解决线程不平安问题

线程不平安问题最简略的解决形式就是办法串行:

public synchronized Integer compute(String userId) throws InterruptedException

如果退出synchronized,则整个线程的解决会串行执行,然而效率极低。

/** 运行后果  
 串行之后执行效率极低,根本无奈应用 
 11:14:15.851 [pool-1-thread-1] INFO  c.z.i.m.version1.MyCacheVersion1 - doCompute => 15  
 11:14:20.862 [pool-1-thread-100] INFO  c.z.i.m.version1.MyCacheVersion1 - doCompute => 52 11:14:25.874 [pool-1-thread-99] INFO  c.z.i.m.version1.MyCacheVersion1 - doCompute => 55 

 */

2. 剖析 HashMap 实现的问题

简略剖析 HashMap 的缓存实现,次要的问题如下:

  • 高并发拜访反复计算性能问题
  • 复用性能较差的问题

3. 装璜模式形象计算业务

咱们先解决复用性能较差的问题,,这里须要应用装璜模式进行改写。首先定义 ComputeAble<A, V> 接口,这个接口定义了形象计算行为。

/**  
 * 可计算接口  
 * 装璜接口  
 */  
public interface ComputeAble<A, V>{  
  
    /**  
     * 依据指定参数 A 进行计算,计算结果为 V  
     * @description 依据指定参数 A 进行计算,计算结果为 V  
     * @param arg 泛型参数  
     * @return V 返回计算后后果  
     * @author xander  
     * @date 2023/6/15 15:42  
     */    
     V doCompute(A arg) throws Exception;  
}

定义一个实现类实现计算接口。

/**  
 * 装璜模式改写接口  
 */  
public class ExpensiveCompute implements ComputeAble<String, Integer>{  
    @Override  
    public Integer doCompute(String arg) throws InterruptedException {TimeUnit.SECONDS.sleep(5);
        return Integer.parseInt(arg);  
    }  
}

第二个版本的缓存实现区别是应用了装璜模式封装计算方法,其余办法临时不做调整。

/**  
 * 第二版,应用装璜模式进行革新  
 * synchronized 同步加锁  
 * @author  
 * @version v1.0.0  
 * @Package : version2  
 * @Description : 应用装璜模式进行革新  
 * @Create on : 2023/6/15 16:29  
 **/@Slf4j  
public class MyCacheVersion2 {  
  
    /**  
     * 缓存  
     */  
    private final Map<String, Integer> cache = new HashMap<>();  
    /**  
     * 计算方法实现对象  
     */  
    private final static ComputeAble<String, Integer> COMPUTE = new ExpensiveCompute();  
  
    public synchronized Integer compute(String userId) throws Exception {if(cache.containsKey(userId)){log.info("cached => {}", userId);  
            return cache.get(userId);  
        }  
        log.info("doCompute => {}", userId);  
        Integer result = doCompute(userId);  
        // 不存在缓存就退出  
        cache.put(userId, result);  
        return result;  
    }  
  
    /**  
     * 计算方法由具体的类实现封装  
     * @param userId  
     * @return  
     * @throws InterruptedException  
     */
     private Integer doCompute(String userId) throws Exception {return COMPUTE.doCompute(userId);  
    }  
}

通过下面的代码理解到,MyCacheVersion2 缓存实现类的具体计算逻辑形象到具体的实现类当中,如果想要切换新的逻辑,能够改写 COMPUTE 的实现类。

4. 应用 Future 改写计算实现接口解决反复计算问题

咱们简略剖析 HashMap 的缓存实现,次要问题如下:

  • 高并发拜访反复计算性能问题
  • <s> 复用性能较差的问题 </s>,通过装璜模式改写。

复用较差的问题解决了,上面介绍高并发拜访反复计算问题解决方法,咱们一步步介绍改写过程。为了不便了解这里临时先用具体类型代替泛型。

  1. 首先把 Map 的 value 存储改为存储Future< V > 后果,并且把 HashMap 改为线程平安的 ConcurrentHashMap。
private final Map<String, Future<Integer>> concurrentHashMap = new ConcurrentHashMap<>();
  1. 构建 FutureTask 对象,这里应用 Lambda 表达式间接调用 doCompute 办法。
FutureTask<Integer> future = new FutureTask<>(() -> doCompute(userId));
  1. 在计算函数中,大抵的逻辑并没有扭转,然而须要留神上面的细节:
  2. future.get() 在获取到后果之前会进行阻塞。
  3. ConcurrentHashMap 在相似如果不存在就退出的复合操作状况下须要思考反复设置缓存的问题。
  4. putIfAbsent 能够做一些复合操作,如果设置缓存失败,阐明有其余线程做过同样的操作,此时就能够从新操作一次获取后果即可。
  5. putIfAbsent 不胜利为什么不间接获取后果,而是要再计算一次,这是为了避免缓存刚好获取获取到一个 null 的值。
  
/**  
 * 第三个版本  
 * 1. 优化多线程拜访反复计算问题  
 *  
 * * @author  
 * @version v1.0.0  
 * @Package : version3  
 * @Description : 第三个版本  
 * @Create on : 2023/6/15 16:47  
 **/@Slf4j  
public class MyCacheVersion3 {  
    /**  
     * 革新,并发不平安汇合改为并发平安汇合  
     * value 存储为 future 的值  
     */  
    private final Map<String, Future<Integer>> concurrentHashMap = new ConcurrentHashMap<>();  
  
    /**  
     * 计算实现类  
     */  
    private static final ComputeAble<String, Integer> COMPUTEABLE = new ExpensiveCompute();  
  
    /**  
     * 先应用具体类型实现,后续改为应用泛型实现  
     * 1. 应用 FutureTask 对于要计算的值进行封装,依据 FutureTask 个性,获取到后果之前单个线程会始终期待  
     * 2. 因为计算方法变动,所有的代码须要调整  
     * 3. concurrentHashMap.get() 在 if 判断的时候仍然存在非原子行为,所以在设置的时候应用 putIfAbsent 原子操作  
     *  
     * @param userId  
     * @return  
     * @throws InterruptedException  
     * @throws ExecutionException  
     */    public Integer compute(String userId) throws InterruptedException, ExecutionException {Future<Integer> result = concurrentHashMap.get(userId);  
        // 如果获取不到内容,阐明不在缓存当中  
        if(Objects.isNull(result)){  
            // 此时利用 callAble 线程工作指定工作获取,在获取到后果之前线程会阻塞  
            FutureTask<Integer> future = new FutureTask<>(() -> doCompute(userId));  
            // 把新的 future 笼罩之前获取的 future  
            result = future;  
            // 执行  
            future.run();  
            log.info("FutureTask 调用计算函数");  
            result = concurrentHashMap.putIfAbsent(userId, result);  
            // 如果返回 null,阐明这个记录被增加过了  
            if(Objects.isNull(result)){log.info("其余线程进行设置, 从新执行计算");  
                // 阐明其余线程曾经设置过值,这里从新跑一次计算方法即可间接获取  
                result = future;  
                // 再从新跑一次  
                future.run();  
                return result.get();}else{return result.get();  
            }  
        }  
        return result.get();}  
  
    /**  
     * 计算方法由具体的类实现封装  
     * @param userId  
     * @return  
     * @throws InterruptedException  
     */    private Integer doCompute(String userId) throws Exception {return COMPUTEABLE.doCompute(userId);  
    }  
}

5. 泛型接口改写

有了下面的实现根底,改为为泛型就容易很多了,泛型的写法实际上就是把之前的具体类型转为泛型即可。

这里的代码可能不残缺,倡议把结尾局部的仓库代码放到本地验证。

/**  
 * 第四个版本,办法改为泛型实现  
 * @author  
 * @version v1.0.0  
 * @Package : version3  
 * @Description : 第三个版本  
 * @Create on : 2023/6/15 16:47  
 **/@Slf4j  
public class MyCacheVersion4<A, V> {  
    /**  
     * 革新,并发不平安汇合改为并发平安汇合  
     * value 存储为 future 的值  
     */  
    private final Map<A, Future<V>> concurrentHashMap = new ConcurrentHashMap<>();  
  
    private final ComputeAble computeAble = new ExpensiveCompute<>();  
  
    /**  
     * 先应用具体类型实现,后续改为应用泛型实现  
     * 1. 应用 FutureTask 对于要计算的值进行封装,依据 FutureTask 个性,获取到后果之前单个线程会始终期待  
     * 2. 因为计算方法变动,所有的代码须要调整  
     * 3. concurrentHashMap.get() 在 if 判断的时候仍然存在非原子行为,所以在设置的时候应用 putIfAbsent 原子操作  
     * 4. 重构,应用泛型参数  
     * @param arg  
     * @return  
     * @throws InterruptedException  
     * @throws ExecutionException  
     */
     public V compute(A arg) throws InterruptedException, ExecutionException {Future<V> result = concurrentHashMap.get(arg);  
        // 如果获取不到内容,阐明不在缓存当中  
        if(Objects.isNull(result)){  
            // 此时利用 callAble 线程工作指定工作获取,在获取到后果之前线程会阻塞  
            FutureTask<V> future = new FutureTask<>(new Callable<V>() {  
                @Override  
                @SuppressWarnings("unchecked")  
                public V call() throws Exception {return (V) computeAble.doCompute(arg);  
                }  
            });  
            // 把新的 future 笼罩之前获取的 future  
            result = future;  
            // 执行  
            future.run();  
            log.info("FutureTask 调用计算函数");  
            result = concurrentHashMap.putIfAbsent(arg, result);  
            // 如果返回 null,阐明这个记录被增加过了  
            if(Objects.isNull(result)){log.info("其余线程进行设置, 从新执行计算");  
                // 阐明其余线程曾经设置过值,这里从新跑一次计算方法即可间接获取  
                result = future;  
                // 再从新跑一次  
                future.run();  
                return result.get();}else{return result.get();  
            }  
        }  
        return result.get();}  
}

6. 可能失败的计算导致缓存净化问题解决

察看另一个计算实现,当咱们的应用上面的形式会有什么样的成果?

/**  
 *  可能会呈现失败的计算方法  
 * @author Xander  
 * @version v1.0.0  
 * @Package : compute  
 * @Description : 可能会呈现失败的计算方法  
 * @Create on : 2023/6/19 10:40  
 **/public class MayFailCompute implements ComputeAble<String, Integer>{  
  
    /**  
     * 触发失败阈值  
     */  
    private static final int FAILURE_THRESHOLD = 50;  
  
    /**  
     * 随机数生成器  
     */  
    private static final Random RANDOM = new Random(100);  
  
    /**  
     * 有可能会呈现失败的计算方法  
     * @description 有可能会呈现失败的计算方法  
     * @param arg  
     * @return java.lang.Integer  
     * @author xander  
     * @date 2023/6/19 10:41  
     */    @Override  
    public Integer doCompute(String arg) throws Exception {if(RANDOM.nextInt() < FAILURE_THRESHOLD){throw new Exception("自定义异样");  
        }  
        TimeUnit.MILLISECONDS.sleep(5);  
        return Integer.parseInt(arg);  
    }  
}

因为一开始咱们就应用装璜模式改写过代码,所以要替换实现类非常简单:

private final ComputeAble computeAble = new MayFailCompute();

测试的后果毫不意外的呈现大量的失败。这样后果不合乎预期,尽管 50% 的失败率相当高,但实际上更多的是 从缓存中获取的后果就是异样信息 ,这种状况就是 缓存净化问题

为了解决缓存净化问题,咱们须要在 try/catch 中对于不同的状况进行不同的解决。在之前计算解决逻辑中一共会呈现上面三种状况:

  • CancellationException:线程被勾销抛出的异样。
  • InterruptedException:线程被中断时候抛出的异样。
  • ExecutionException:试图检索一个因抛出异样而停止的工作的后果时抛出的异样。

对于不同的异样要对应不同的解决态度:

  • CancellationExceptionInterruptedException 根本都是人为操作,这时候应该立刻终止工作。
  • 依据办法逻辑咱们晓得办法是有可能计算胜利的,只不过须要 多重试几次
  • while(true) 的退出能够让出错之后主动从新进行计算直到胜利为止,然而如果是人为勾销,就须要抛出异样并且手动结束任务。

咱们把下面的解决思路转化为代码,相干正文曾经退出,能够看上面的后果:

  
/**  
 * <pre>  
 * 第五个版本,当碰到会抛出异样的计算方法的状况这时候应该从新计算  
 * 对于不同的异样,也要对应不同的解决态度:*  
 * - CancellationException 和 InterruptedException 根本都是人为操作,这时候应该立刻终止工作。* - 依据办法逻辑,咱们能够晓得办法是有可能计算胜利的,只不过须要多重试几次。* - while(true) 的退出能够让出错之后主动从新进行计算直到胜利为止,然而如果是人为勾销,就须要抛出异样并且完结。* </pre>  
 * @author  
 * @version v1.0.0  
 * @Package : version3  
 * @Description : 第五个版本  
 * @Create on : 2023/6/15 16:47  
 **/@Slf4j  
public class MyCacheVersion5<A, V> {  
    /**  
     * 革新,并发不平安汇合改为并发平安汇合  
     * value 存储为 future 的值  
     */  
    private final Map<A, Future<V>> concurrentHashMap = new ConcurrentHashMap<>();  
  
    private final ComputeAble computeAble = new MayFailCompute();  
  
   public V compute(A arg) {return doCompute(arg);  
    }  
  
    private V doCompute(A arg) {  
        // 对于反复计算进行解决  
        while (true) {Future<V> result = concurrentHashMap.get(arg);  
            try {  
                // 如果获取不到内容,阐明不在缓存当中  
                if (Objects.isNull(result)) {  
                    // 此时利用 callAble 线程工作指定工作获取,在获取到后果之前线程会阻塞  
                    FutureTask<V> future = new FutureTask<>(new Callable<V>() {  
                        @Override  
                        @SuppressWarnings("unchecked")  
                        public V call() throws Exception {return (V) computeAble.doCompute(arg);  
  
                        }  
                    });  
                    // 把新的 future 笼罩之前获取的 future  
                    result = future;  
                    // 执行  
                    future.run();  
                    System.out.println("FutureTask 调用计算函数");  
                    result = concurrentHashMap.putIfAbsent(arg, result);  
                    // 如果返回 null,阐明这个记录被增加过了  
                    if (Objects.isNull(result)) {System.out.println("其余线程进行设置, 从新执行计算");  
                        // 阐明其余线程曾经设置过值,这里从新跑一次计算方法即可间接获取  
                        result = future;  
                        // 再从新跑一次  
                        future.run();  
                        return result.get();} else {return result.get();  
                    }  
                }  
                return result.get();} catch (CancellationException cancellationException) {log.warn("CancellationException result => {}", result);  
                // 线程在执行过程当中有可能被勾销  
                // 被勾销的时候不论如何解决,首先须要先从缓存中移除掉净化缓存  
                concurrentHashMap.remove(arg);  
                throw new RuntimeException(cancellationException);  
            } catch (InterruptedException e) {log.warn("InterruptedException result => {}", result);  
                // 线程被中断的异样解决  
                concurrentHashMap.remove(arg);  
                throw new RuntimeException(e);  
            } catch (ExecutionException e) {//            log.warn("ExecutionException result => {}", result);  
                log.info("移除缓存 Key => {},从新计算", arg);  
                concurrentHashMap.remove(arg);  
                // 不会抛出异样,而是从新在下一次循环中计算  
//            throw new RuntimeException(e);  
            /*            打印后果如下:FutureTask 调用计算函数  
            FutureTask 调用计算函数  
            result => 65            其余线程进行设置, 从新执行计算  
            result => 33            result => 65            result => 26            15:59:56.584 [pool-1-thread-30] INFO version5.MyCacheVersion5 - 移除缓存 Key => 75,从新计算  
            result => 75            15:59:56.584 [pool-1-thread-3] INFO version5.MyCacheVersion5 - 移除缓存 Key => 35,从新计算  
            15:59:56.584 [pool-1-thread-42] INFO version5.MyCacheVersion5 - 移除缓存 Key => 67,从新计算  
            15:59:56.584 [pool-1-thread-36] INFO version5.MyCacheVersion5 - 移除缓存 Key => 75,从新计算  
            15:59:56.584 [pool-1-thread-90] INFO version5.MyCacheVersion5 - 移除缓存 Key => 40,从新计算  
            15:59:56.585 [pool-1-thread-31] INFO version5.MyCacheVersion5 - 移除缓存 Key => 13,从新计算  
            15:59:56.586 [pool-1-thread-94] INFO version5.MyCacheVersion5 - 移除缓存 Key => 60,从新计算  
            Disconnected from the target VM, address: '127.0.0.1:11054', transport: 'socket'  
            Process finished with exit code 0  
            * */
        } catch (Exception e) {log.warn("Exception result => {}", result);  
                concurrentHashMap.remove(arg);  
                // 无奈解决的未知异样,间接抛出运行时异样不做任何解决。throw new RuntimeException(e);  
            }  
  
        }  
    }  
}

最初是测试局部。

  
/**  
 * 可能失败的计算导致缓存净化问题解决  
 * 1. 解决缓存净化问题。* 2. 异常情况尝试始终反复计算。*  
 * @author Xander  
 * @version v1.0.0  
 * @Package : version5  
 * @Description :  
 * @Create on : 2023/6/19 10:49  
 **/public class Test {public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(100);  
        MyCacheVersion5<String, Integer> myCacheVersion5 = new MyCacheVersion5<>();  
        Random random = new Random(100);  
        for (int i = 0; i < 1000; i++) {executorService.submit(() -> {int randomInt = random.nextInt(100);  
  
                Integer user = myCacheVersion5.compute(String.valueOf(randomInt));  
                System.out.println("result =>" + user);  
  
  
            });  
        }  
        executorService.shutdown();}/**  
     运行后果:短期内会有海量异样,这不合乎预期状况。根本原因是缓存不存在过期工夫,会存在有效的内容缓存计算  
     其余线程进行设置, 从新执行计算  
     其余线程进行设置, 从新执行计算  
     FutureTask 调用计算函数  
     其余线程进行设置, 从新执行计算  
     FutureTask 调用计算函数  
     其余线程进行设置, 从新执行计算  
     FutureTask 调用计算函数  
     其余线程进行设置, 从新执行计算  
     FutureTask 调用计算函数  
     其余线程进行设置, 从新执行计算  
     其余线程进行设置, 从新执行计算  
     FutureTask 调用计算函数  
     java.util.concurrent.ExecutionException: java.lang.Exception: 自定义异样  
     at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)  
     at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)     at interview/version5.MyCacheVersion5.compute(MyCacheVersion5.java:63)     at interview/version5.Test.lambda$main$0(Test.java:30)     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)     at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)     at java.base/java.lang.Thread.run(Thread.java:829)     Caused by: java.lang.Exception: 自定义异样  
     at interview/compute.MayFailCompute.doCompute(MayFailCompute.java:37)  
     at interview/compute.MayFailCompute.doCompute(MayFailCompute.java:14)     at interview/version5.MyCacheVersion5$1.call(MyCacheVersion5.java:47)     at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)     at interview/version5.MyCacheVersion5.compute(MyCacheVersion5.java:53)     ... 7 more     java.util.concurrent.ExecutionException: java.lang.Exception: 自定义异样  
     at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)  
     at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)     at interview/version5.MyCacheVersion5.compute(MyCacheVersion5.java:63)     at interview/version5.Test.lambda$main$0(Test.java:30)     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)     at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)     at java.base/java.lang.Thread.run(Thread.java:829)     Caused by: java.lang.Exception: 自定义异样  
     at interview/compute.MayFailCompute.doCompute(MayFailCompute.java:37)  
     at interview/compute.MayFailCompute.doCompute(MayFailCompute.java:14)     at interview/version5.MyCacheVersion5$1.call(MyCacheVersion5.java:47)     at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)     at interview/version5.MyCacheVersion5.compute(MyCacheVersion5.java:53)     ... 7 more     java.util.concurrent.ExecutionException: java.lang.Exception: 自定义异样  
     at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)  
     at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)     at interview/version5.MyCacheVersion5.compute(MyCacheVersion5.java:63)     at interview/version5.Test.lambda$main$0(Test.java:30)     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)     at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)     at java.base/java.lang.Thread.run(Thread.java:829)     Caused by: java.lang.Exception: 自定义异样  
     at interview/compute.MayFailCompute.doCompute(MayFailCompute.java:37)  
     at interview/compute.MayFailCompute.doCompute(MayFailCompute.java:14)     at interview/version5.MyCacheVersion5$1.call(MyCacheVersion5.java:47)     at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)     at interview/version5.MyCacheVersion5.compute(MyCacheVersion5.java:53)     ... 7 more  
     通过修复之后:16:09:07.705 [pool-1-thread-83] INFO version5.MyCacheVersion5 - 移除缓存 Key => 9,从新计算  
     FutureTask 调用计算函数  
     result => 37  
     16:09:07.705 [pool-1-thread-84] INFO version5.MyCacheVersion5 - 移除缓存 Key => 84,从新计算  
     16:09:07.705 [pool-1-thread-66] INFO version5.MyCacheVersion5 - 移除缓存 Key => 84,从新计算  
     FutureTask 调用计算函数  
     FutureTask 调用计算函数  
     其余线程进行设置, 从新执行计算  
     其余线程进行设置, 从新执行计算  
     FutureTask 调用计算函数  
     16:09:07.705 [pool-1-thread-91] INFO version5.MyCacheVersion5 - 移除缓存 Key => 9,从新计算  
     16:09:07.706 [pool-1-thread-2] INFO version5.MyCacheVersion5 - 移除缓存 Key => 84,从新计算  
     16:09:07.706 [pool-1-thread-3] INFO version5.MyCacheVersion5 - 移除缓存 Key => 40,从新计算  
     16:09:07.706 [pool-1-thread-5] INFO version5.MyCacheVersion5 - 移除缓存 Key => 84,从新计算  
     FutureTask 调用计算函数  
     其余线程进行设置, 从新执行计算  
     FutureTask 调用计算函数  
     FutureTask 调用计算函数  
     其余线程进行设置, 从新执行计算  
     16:09:07.706 [pool-1-thread-91] INFO version5.MyCacheVersion5 - 移除缓存 Key => 9,从新计算  
     其余线程进行设置, 从新执行计算  
     16:09:07.706 [pool-1-thread-3] INFO version5.MyCacheVersion5 - 移除缓存 Key => 40,从新计算  
     16:09:07.706 [pool-1-thread-5] INFO version5.MyCacheVersion5 - 移除缓存 Key => 84,从新计算  
     FutureTask 调用计算函数  
     FutureTask 调用计算函数  
     其余线程进行设置, 从新执行计算  
     FutureTask 调用计算函数  
     FutureTask 调用计算函数  
     */  
}

7. 缓存过期和减少随机性

下面解决了缓存净化的问题,上面咱们尝试为整个高性能缓存增加缓存过期工夫,同时为避免“缓存雪崩”,减少过期工夫随机性,为了不便了解,这里拆分两个小局部介绍如何解决。

缓存过期

实现缓存过期这里要用到 定时线程池 ScheduledExecutorService。咱们间接定一个带过期工夫的新办法解决。

  
public V compute(A arg, long expireTime) {if (expireTime > 0) {SCHEDULED_EXECUTOR_SERVICE.schedule(new Runnable() {  
            @Override  
            public void run() {  
                // 定期革除缓存的办法  
                expire(arg);  
            }  
  
            /**  
             * @description 留神须要同步办法,避免多线程反复增加定时工作  
             * @param arg  
             * @return void  
             * @author xander  
             * @date 2023/6/20 16:58  
             */            private synchronized void expire(A arg) {  
                // 查看以后 key 是否存在  
                Future<V> vFuture = concurrentHashMap.get(arg);  
                // 如果 value 存在,则须要进行  
                if(Objects.nonNull(vFuture)){  
                    // 如果工作被勾销,此时须要敞开对应的定时工作  
                    if(vFuture.isDone() ){log.warn("future 工作被勾销");  
                        vFuture.cancel(true);  
                    }  
                    log.warn("过期工夫到了,缓存被革除");  
                    concurrentHashMap.remove(arg);  
                }  
            }  
        }, expireTime, TimeUnit.MILLISECONDS);  
    }  
    return doCompute(arg);  
}

减少随机性

减少随机性的目标是避免缓存在同一个时刻大量生效这种状况。减少随机性的最简略办法就是在设置超时工夫的时候给一个随机 random 值。

  
public V compute(A arg, long expireTime, boolean isRandom){if(isRandom){return compute( arg,  expireTime);  
    }else{return compute( arg,  expireTime + RANDOM.nextInt(1000));  
    }  
}

8. 残缺代码

至此咱们整个高性能缓存构建实现,最终的成绩代码如下:

  
/**  
 * 高性能缓存第六版  
 *  
 * @author Xander  
 * @version v1.0.0  
 * @Package : version6  
 * @Description : 高性能缓存第六版  
 * @Create on : 2023/6/20 16:30  
 **/@Slf4j  
public class MyCacheVersion6<A, V> {private final Map<A, Future<V>> concurrentHashMap = new ConcurrentHashMap<>();  
    private static final Random RANDOM = new Random();;  
  
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(10);  
  
    private final ComputeAble computeAble = new MayFailCompute();  
  
  
    public V compute(A arg) {return doCompute(arg);  
    }  
  
    public V compute(A arg, long expireTime) {if (expireTime > 0) {SCHEDULED_EXECUTOR_SERVICE.schedule(new Runnable() {  
                @Override  
                public void run() {  
                    // 定期革除缓存的办法  
                    expire(arg);  
                }  
  
                /**  
                 * @description 留神须要同步办法,避免多线程反复增加定时工作  
                 * @param arg  
                 * @return void  
                 * @author xander  
                 * @date 2023/6/20 16:58  
                 */                private synchronized void expire(A arg) {  
                    // 查看以后 key 是否存在  
                    Future<V> vFuture = concurrentHashMap.get(arg);  
                    // 如果 value 存在,则须要进行  
                    if(Objects.nonNull(vFuture)){  
                        // 如果工作被勾销,此时须要敞开对应的定时工作  
                        if(vFuture.isDone() ){log.warn("future 工作被勾销");  
                            vFuture.cancel(true);  
                        }  
                        log.warn("过期工夫到了,缓存被革除");  
                        concurrentHashMap.remove(arg);  
                    }  
                }  
            }, expireTime, TimeUnit.MILLISECONDS);  
        }  
        return doCompute(arg);  
    }  
  
    public V compute(A arg, long expireTime, boolean isRandom){if(isRandom){return compute( arg,  expireTime);  
        }else{return compute( arg,  expireTime + RANDOM.nextInt(1000));  
        }  
    }  
  
  
    private V doCompute(A arg) {  
        // 对于反复计算进行解决  
        while (true) {Future<V> result = concurrentHashMap.get(arg);  
            try {  
                // 如果获取不到内容,阐明不在缓存当中  
                if (Objects.isNull(result)) {  
                    // 此时利用 callAble 线程工作指定工作获取,在获取到后果之前线程会阻塞  
                    FutureTask<V> future = new FutureTask<>(new Callable<V>() {  
                        @Override  
                        @SuppressWarnings("unchecked")  
                        public V call() throws Exception {return (V) computeAble.doCompute(arg);  
  
                        }  
                    });  
                    // 把新的 future 笼罩之前获取的 future  
                    result = future;  
                    // 执行  
                    future.run();  
                    System.out.println("FutureTask 调用计算函数");  
                    result = concurrentHashMap.putIfAbsent(arg, result);  
                    // 如果返回 null,阐明这个记录被增加过了  
                    if (Objects.isNull(result)) {System.out.println("其余线程进行设置, 从新执行计算");  
                        // 阐明其余线程曾经设置过值,这里从新跑一次计算方法即可间接获取  
                        result = future;  
                        // 再从新跑一次  
                        future.run();  
                        return result.get();} else {return result.get();  
                    }  
                }  
                return result.get();} catch (CancellationException cancellationException) {log.warn("CancellationException result => {}", result);  
                // 线程在执行过程当中有可能被勾销  
                // 被勾销的时候不论如何解决,首先须要先从缓存中移除掉净化缓存  
                concurrentHashMap.remove(arg);  
                throw new RuntimeException(cancellationException);  
            } catch (InterruptedException e) {log.warn("InterruptedException result => {}", result);  
                // 线程被中断的异样解决  
                concurrentHashMap.remove(arg);  
                throw new RuntimeException(e);  
            } catch (ExecutionException e) {//            log.warn("ExecutionException result => {}", result);  
                log.info("移除缓存 Key => {},从新计算", arg);  
                concurrentHashMap.remove(arg);  
                // 不会抛出异样,而是从新在下一次循环中计算  
//            throw new RuntimeException(e);  
            } catch (Exception e) {log.warn("Exception result => {}", result);  
                concurrentHashMap.remove(arg);  
                // 无奈解决的未知异样,间接抛出运行时异样不做任何解决。throw new RuntimeException(e);  
            }  
  
        }  
    }  
}

测试代码

/**  
 * 缓存过期功能测试  
 * @author Xander  
 * @version v1.0.0  
 * @Package : version6  
 * @Description : 缓存过期功能测试  
 * @Create on : 2023/6/20 17:06  
 **/
public class Test {public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(100);  
        MyCacheVersion6<String, Integer> myCacheVersion5 = new MyCacheVersion6<>();  
        Random random = new Random(100);  
        for (int i = 0; i < 1000; i++) {executorService.submit(() -> {int randomInt = random.nextInt(100);  
  
                Integer user = myCacheVersion5.compute(String.valueOf(randomInt));  
                System.out.println("result =>" + user);  
  
  
            });  
        }  
        executorService.shutdown();}  
}

二、测试缓存性能

测试缓存性能的点蕴含上面的局部:

  • 应用线程池测试高性能缓存的性能
  • 应用 CountDownLatch 压力测试
  • 线程安全类 ThreadSafeFormatter 验证 CountDownLatch

之前咱们的 Test 测试都是应用线程池的模式,这里不过多介绍,这里提一下如何应用 CountDownLatch 进行”压力测试“,以及应用 ThreadSafeFormatter 验证 CountDownLatch 的性能。

  
/**  
 * ThreadSafeFormatter * @author Xander  
 * @version v1.0.0  
 * @Package : com.zxd.interview.mycache.version7  
 * @Description : 线程平安 ThreadSafeFormatter  
 * @Create on : 2023/6/22 11:11  
 **/public class ThreadSafeFormatter {public static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {  
  
        /**  
         * 每个线程会调用本办法一次,用于初始化  
         * @description 每个线程会调用本办法一次,用于初始化  
         * @param  
         * @return java.text.SimpleDateFormat  
         * @author xander  
         * @date 2023/6/22 11:30  
         */        
         @Override  
        protected SimpleDateFormat initialValue() {return new SimpleDateFormat("mm:ss");  
        }  
  
        /**  
         * 首次调用本办法时,会调用 initialValue();前面的调用会返回第一次创立的值  
         * @description 首次调用本办法时,会调用 initialValue();前面的调用会返回第一次创立的值  
         * @param  
         * @return java.text.SimpleDateFormat  
         * @author xander  
         * @date 2023/6/22 11:30  
         */        
        @Override  
        public SimpleDateFormat get() {return super.get();  
        }  
    };  
}
  
/**  
 * 整体性能测试  
 * @author Xander  
 * @version v1.0.0  
 * @Package : com.zxd.interview.mycache.version6  
 * @Description : 整体性能测试  
 * @Create on : 2023/6/20 17:06  
 **/public class Test {public static void main(String[] args) throws InterruptedException {long beginTime = System.currentTimeMillis();  
        ExecutorService executorService = Executors.newFixedThreadPool(100);  
        MyCacheVersion6<String, Integer> myCacheVersion5 = new MyCacheVersion6<>();  
        Random random = new Random(200);  
        CountDownLatch countDownLatch = new CountDownLatch(1);  
        for (int i = 0; i < 100; i++) {executorService.submit(() -> {int randomInt = random.nextInt(100);  
  
                try {countDownLatch.await();  
                    // 从线程平安的汇合当中取出以后工夫  
                    SimpleDateFormat simpleDateFormat = ThreadSafeFormatter.dateFormatter.get();  
                    System.out.println("simpleDateFormat =>"+ simpleDateFormat.format(new Date()));  
                } catch (InterruptedException e) {e.printStackTrace();  
                }  
                Integer user = myCacheVersion5.compute(String.valueOf(randomInt), 5000);  
                System.out.println("result =>" + user);  
            });  
        }  
        // 假如此时所有的申请须要 5 秒工夫筹备。Thread.sleep(1000);  
        countDownLatch.countDown();  
        executorService.shutdown();  
        long endTime = System.currentTimeMillis();  
        // 如果线程池没有进行始终死循环  
        while(!executorService.isTerminated()){ }  
        System.out.println("最终工夫" + (endTime - beginTime));  
    }/**  
  
     10:59:34.521 [pool-2-thread-3] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - future 工作被勾销  
     10:59:34.522 [pool-2-thread-3] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - 过期工夫到了,缓存被革除  
     10:59:34.521 [pool-2-thread-4] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - future 工作被勾销  
     10:59:34.522 [pool-2-thread-4] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - 过期工夫到了,缓存被革除  
     10:59:34.521 [pool-2-thread-1] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - future 工作被勾销  
     10:59:34.522 [pool-2-thread-1] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - 过期工夫到了,缓存被革除  
     10:59:34.521 [pool-2-thread-8] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - future 工作被勾销  
     10:59:34.522 [pool-2-thread-8] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - 过期工夫到了,缓存被革除  
     10:59:34.521 [pool-2-thread-7] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - 过期工夫到了,缓存被革除  
     10:59:34.521 [pool-2-thread-2] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - future 工作被勾销  
     10:59:34.522 [pool-2-thread-2] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - 过期工夫到了,缓存被革除  
     10:59:34.522 [pool-2-thread-5] WARN com.zxd.interview.mycache.version6.MyCacheVersion6 - 过期工夫到了,缓存被革除  
  
     最终工夫 146  
  
     如果批改为多个工夫同时发动申请:最终工夫 1074 - 1000 主线程的睡眠工夫 = 74  
  
  
     能够看到工夫点都是在同一个分秒,能够人为 countDownlatch 是失效的  
     simpleDateFormat => 14:50  
     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50     simpleDateFormat => 14:50    
      */  
}
}

三、写在最初

这个小 demo 在本人实际上手写的时候还是有不少卡壳的中央,很多时候看懂不肯定能写进去,如果有疑难或者改良意见欢送探讨。

正文完
 0