乐趣区

共享锁

共享锁和独占锁

  • 共享锁:允许多个线程同时获取锁,如 Semaphore,CountDownLatch,ReadLock 等。
  • 独占锁:每次只能一个线程持有锁,如 ReentrantLock,synchronized,WriteLock 等。

AQS 中实现共享锁

AQS 中提供了独占锁和共享锁。独占锁使用 acquire 和 release 方法实现;共享锁使用 acquireShared 和 releaseShared 实现。下面我们看看源码中共享锁具体怎么实现的,以 Semaphore 和 CountDownLatch 为例

Semaphore

Semaphore(信号量):控制同时访问的线程个数。可以设置一个阈值,阈值内的线程可以同时获取锁,超过阈值的线程则需要等待锁释放,有点类似于线程池。

举例:公司有五名员工要去上厕所,但是厕所只有三个坑位,则另外两个人需要等待前面的人上完以后才能进,代码如下

public static void main(String[] args) {ThreadFactory threadFactory = new ThreadNameFactory();
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),threadFactory);
    // 公司里厕所有 3 个坑位
    Semaphore semaphore = new Semaphore(3);
    // 5 个人去拉粑粑
    for (int i = 0; i < 5; i++) {People people = new People(i);
        threadPoolExecutor.execute(()->{
            try {System.out.println(people.getName() + "准备去上厕所");
                semaphore.acquire();
                System.out.println(people.getName() + "找到坑位,开始拉粑粑");
                Thread.sleep(3000);// 模拟拉粑粑
                System.out.println(people.getName() + "拉完粑粑,准备出厕所");
                semaphore.release();
                System.out.println(people.getName() + "走出厕所");
            } catch (InterruptedException e) {e.printStackTrace();
            }
        });
    }
    threadPoolExecutor.shutdown();}

Semaphore 通过 new Semaphore(3) 设置阈值

public Semaphore(int permits) {sync = new NonfairSync(permits);
}

这里可以看出信号量也是支持公平锁的,默认非公平锁。acquire() 获取锁,此方法调用的是 AQS 的 tryAcquireShared 方法,然后在自己 NonfairSync 中实现此方法。

static final class NonfairSync extends Sync {
    ......
    protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
    }
}
final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

release() 释放锁,此方法调用的是 AQS 的 tryReleaseShared 方法,然后在自己 NonfairSync 中实现此方法。

protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

如果通过 new Semaphore(1) 这样来初始化信号量,这就相当于独占锁,和 ReentrantLock 作用类似。

CountDownLatch

CountDownLatch(线程计数器):一个线程等待其他线程多个线程执行完再一起执行。使用场景:主线程依赖其他子线程的数据。异步转同步操作。

举例:计算阿里接口返回的数据和腾讯接口返回数据的总和。代码如下

public static void main(String[] args) throws Exception{CountDownLatch countDownLatch = new CountDownLatch(2);
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadNameFactory());
    final Integer[] ints = new Integer[2];
    threadPoolExecutor.execute(()->{
        try {System.out.println("请求阿里接口");
            Thread.sleep(2000);
            System.out.println("请求阿里接口结束");
            ints[0] = 10;
        } catch (InterruptedException e) {e.printStackTrace();
        } finally {countDownLatch.countDown();
        }
    });
    threadPoolExecutor.execute(()->{
        try {System.out.println("请求腾讯接口");
            Thread.sleep(3000);
            System.out.println("请求腾讯接口结束");
            ints[1] = 20;
        } catch (InterruptedException e) {e.printStackTrace();
        } finally {countDownLatch.countDown();
        }
    });
    System.out.println("等待两个接口请求完");
    countDownLatch.await();
    System.out.println("两个接口都已请求完");
    int result = ints[0] + ints[1];
    System.out.println(result);
    threadPoolExecutor.shutdown();}

new CountDownLatch(int count) 当需要几个子线程加入主线程时就传几个,如果传多了,主线程会一直等待。CountDownLatch 也是通过 AQS 来实现的,new CountDownLatch(int count) 初始化方法设置 AQS 中的 state 的初始值,每调用 countDown() 方法,state 都会减 1,当主线程调用 await() 方法时,会等待 state 等于 0 时才会继续往下执行。

总结

共享锁允许多个线程同时获取锁,并发访问共享资源,提高效率。


退出移动版