AQS同步组件Semaphore

38次阅读

共计 2575 个字符,预计需要花费 7 分钟才能阅读完成。

Semaphore

什么是 Semaphore?

是用于控制某个资源同一时间被线程访问的个数,提供 acquire()和 release()方法,acquire 获取一个许可,如果没有获取的到就等待,release 是在操作完成后释放一个许可,Semaphore 维护了当前访问的个数,通过同步机制来控制同时访问的个数,在数据结构里链表中的节点是可以无限个的,而 Semaphore 里维护的是一个有大小的限链表。

Semaphore 的使用场景

Semaphore 用于仅能提供有限访问的资源, 比如数据库中的链接数只有 20 但是我们上层应用数可能远大于 20,如果同时都对数据库链接进行获取,那很定会因为链接获取不到而报错,所以我们就要对数据库链接的访问进行控制。

演示代码
@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {semaphore.acquire(); // 获取一个许可
                    test(threadNum);
                    semaphore.release(); // 释放一个许可} catch (Exception e) {log.error("exception", e);
                }
            });
        }
        exec.shutdown();}

    private static void test(int threadNum) throws Exception {log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

我们在执行 test(threadNum) 方式前后包裹上 acquire 和 release,这样其实我们就相当于一个单线程在执行。当执行 acquire 后就只能等待执行 release 后再执行新的线程,然后我们在 acquire() 和 release()都是没有传参也就是 1,每次只允许一个线程执行,如果我们改成

semaphore.acquire(3); // 获取多个许可
test(threadNum);
semaphore.release(3); // 释放多个许可 

那么我们就是每 3 个 3 个执行直到把线程池中的线程执行完。在打印的日志中我们也可以看到是每三个每三个打印。

假设我们的数据库允许获取连接是 3 剩余的获取线程我们不想要只想丢弃改如何实现?

Semaphore 提供了一个尝试获取许可的方法,tryAcquire()尝试获取许可成功就执行,尝试获取许可失败就丢弃线程。下面看代码

@Slf4j
public class SemaphoreExample3 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {if (semaphore.tryAcquire()) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可}
                } catch (Exception e) {log.error("exception", e);
                }
            });
        }
        exec.shutdown();}

    private static void test(int threadNum) throws Exception {log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

这段代码执行结果就只打印了 3 行日志,其他的线程就被丢弃了。tryAcquire() 共提供如下几种方法。

我们用一个例子来演示一下参数的方法的使用。

@Slf4j
public class SemaphoreExample4 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可}
                } catch (Exception e) {log.error("exception", e);
                }
            });
        }
        exec.shutdown();}

    private static void test(int threadNum) throws Exception {log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

这次我们使用的是一个 tryAcquire(5000, TimeUnit.MILLISECONDS)) 方法,这个方法的第一个参数是表示等待 5000 毫秒,第二参数是表示多长时间尝试一次,TimeUnit.MILLISECONDS 表示 1 毫秒。这时候我们会发现 20 个线程都执行了,为什么会这样呢?因为我们在执行时等待超时时间是 5 秒,每次执行就是 sleep 1 秒,所以可以获取成 tryAcquire 进而执行。

正文完
 0