乐趣区

关于并发:30行自己写并发工具类Semaphore-CyclicBarrier-CountDownLatch是什么体验

30 行本人写并发工具类 (Semaphore, CyclicBarrier, CountDownLatch) 是什么体验?

前言

在本篇文章当中首先给大家介绍三个工具 Semaphore, CyclicBarrier, CountDownLatch 该如何应用,而后认真分析这三个工具外部实现的原理,最初会跟大家一起用 ReentrantLock 实现这三个工具。

并发工具类的应用

CountDownLatch

CountDownLatch最次要的作用是容许一个或多个线程期待其余线程实现操作。比方咱们当初有一个工作,有 $N$ 个线程会往数组 data[N] 当中对应的地位依据不同的工作放入数据,在各个线程将数据放入之后,主线程须要将这个数组当中所有的数据进行求和计算,也就是说主线程在各个线程放入之前须要阻塞住!在这样的场景下,咱们就能够应用CountDownLatch

下面问题的代码:

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {public static int[] data = new int[10];

    public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(10);

        for (int i = 0; i < 10; i++) {
            int temp = i;
            new Thread(() -> {Random random = new Random();
                data[temp] = random.nextInt(100001);
                latch.countDown();}).start();}

        // 只有函数 latch.countDown() 至多被调用 10 次
        // 主线程才不会被阻塞
        // 这个 10 是在 CountDownLatch 初始化传递的 10
        latch.await();
        System.out.println("求和后果为:" + Arrays.stream(data).sum());
    }
}

在下面的代码当中,主线程通过调用 latch.await(); 将本人阻塞住,而后须要等他其余线程调用办法 latch.countDown() 只有这个办法被调用的次数等于在初始化时给 CountDownLatch 传递的参数时,主线程才会被开释。

CyclicBarrier

CyclicBarrier它要做的事件是,让一 组线程达到一个屏障(也能够叫同步点)时被阻塞,直到最初一个线程达到屏障时,屏障才会开门,所有被屏障拦挡的线程才会持续运行。咱们通常也将 CyclicBarrier 称作 路障

示例代码:

public class CycleBarrierDemo {public static void main(String[] args) {CyclicBarrier barrier = new CyclicBarrier(5);

        for (int i = 0; i < 5; i++) {new Thread(() -> {
                try {System.out.println(Thread.currentThread().getName() + "开始期待");
                    // 所有线程都会调用这行代码
                    // 在这行代码调用的线程个数有余 5
                    // 个的时候所有的线程都会阻塞在这里
                    // 只有到 5 的时候,这 5 个线程才会被放行
                    // 所以这行代码叫做同步点 
                    barrier.await();
                    // 如果有第六个线程执行这行代码时
                    // 第六个线程也会被阻塞 晓得第 10
                    // 线程执行这行代码 6-10 这 5 个线程
                    // 才会被放行
                } catch (InterruptedException e) {e.printStackTrace();
                } catch (BrokenBarrierException e) {e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "期待实现");
            }).start();}
    }
}

咱们在初始化 CyclicBarrier 对象时,传递的数字为 5,这个数字示意只有 5 个线程达到同步点的时候,那 5 个线程才会同时被放行,而如果到了 6 个线程的话,第一次没有被放行的线程必须等到下一次有5 个线程达到同步点 barrier.await() 时,才会放行 5 个线程。

  • 比方刚开始的时候 5 个线程的状态如下,同步点 还没有 5 个线程达到,因而不会放行。
  • 当有 5 个线程或者更多的线程达到 同步点 barrier.await() 的时候,才会放行 5 个线程,留神是 5 个线程,如果有多的线程必须等到下一次汇合 5 个线程才会进行又一次放行,也就是说每次只放行 5 个线程,这也是它叫做CyclicBarrier(循环路障)的起因(因为每次放行 5 个线程,放行完之后从新计数,直到又有 5 个新的线程到来,才再次放行)。

Semaphore

Semaphore信号量)艰深一点的来说就是管制能执行某一段代码的线程数量,他能够控制程序的并发量!

semaphore.acquire

$\mathcal{R}$

semaphore.release

比方在下面的 acquirerelease之间的代码 $\mathcal{R}$ 就是咱们须要管制的代码,咱们能够通过 信号量 管制在某一个时刻能有多少个线程执行代码 $\mathcal{R}$。在信号量外部有一个计数器,在咱们初始化的时候设置为 $N$,当有线程调用 acquire 函数时,计数器须要减一,调用 release 函数时计数器须要加一,只有当计数器大于 0 时,线程调用 acquire 时才可能进入代码块 $\mathcal{R}$,否则会被阻塞,只有线程调用 release 函数时,被阻塞的线程能力被唤醒,被唤醒的时候计数器会减一。

示例代码:

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {public static void main(String[] args) {Semaphore mySemaphore = new Semaphore(5);
        for (int i = 0; i < 10; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName() + "筹备进入临界区");
                try {mySemaphore.acquire();
                } catch (InterruptedException e) {e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "曾经进入临界区");
                try {TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "筹备来到临界区");
                mySemaphore.release();
                System.out.println(Thread.currentThread().getName() + "曾经来到临界区");
            }).start();}
    }
}

本人入手写并发工具类

在这一大节当中次要应用 ReentrantLock 实现下面咱们提到的三个并发工具类,因而你首先须要理解 ReentrantLock 这个工具。ReentrantLock中有两个次要的函数 lockunlock,次要用于临界区的爱护,在同一个时刻只能有一个线程进入被 lockunlock突围的代码块。除此之外你还须要理解 ReentrantLock.newCondition 函数,这个函数会返回一个条件变量 Condition,这个条件变量有三个次要的函数awaitsignalsignalAll,这三个函数的作用和成果跟 Object 类的 waitnotifynotifyAll一样,在浏览下文之前,大家首先须要理解他们的用法。

  • 哪个线程调用函数condition.await,那个线程就会被挂起。
  • 如果线程调用函数 conditon.signal,则会唤醒一个被condition.await 函数阻塞的线程。
  • 如果线程调用函数 conditon.signalAll,则会唤醒所有被condition.await 函数阻塞的线程。

CountDownLatch

咱们在应用 CountDownLatch 时,会有线程调用 CountDownLatchawait函数,其余线程会调用 CountDownLatchcountDown函数。在 CountDownLatch 外部会有一个计数器,计数器的值咱们在初始化的时候能够进行设置,线程每调用一次 countDown 函数计数器的值就会减一。

  • 如果在线程在调用 await 函数之前,计数器的值曾经小于或等于 0 时,调用 await 函数的线程就不会阻塞,间接放行。
  • 如果在线程在调用 await 函数之前,计数器的值大于 0 时,调用 await 函数的线程就会被阻塞,当有其余线程将计数器的值升高为 0 时,那么这个将计数器升高为 0 线程就须要应用 condition.signalAll() 函数将其余所有被 await 阻塞的函数唤醒。
  • 线程如果想阻塞本人的话能够应用函数 condition.await(),如果某个线程在进入临界区之后达到了唤醒其余线程的条件,咱们则能够应用函数condition.signalAll() 唤醒所有被函数 await 阻塞的线程。

下面的规定曾经将 CountDownLatch 的整体性能形容分明了,为了可能将代码解释分明,我将对应的文字解释放在了代码当中:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyCountDownLatch {private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private int curValue;

    public MyCountDownLatch(int targetValue) {
        // 咱们须要有一个变量去保留计数器的值
        this.curValue = targetValue;
    }

    public void countDown() {
        // curValue 是一个共享变量
        // 咱们须要用锁爱护起来
        // 因而每次只有一个线程进入 lock 爱护
        // 的代码区域
        lock.lock();
        try {
            // 每次执行 countDown 计数器都须要减一
            // 而且如果计数器等于 0 咱们须要唤醒哪些被
            // await 函数阻塞的线程
            curValue--;
            if (curValue <= 0)
                condition.signalAll();}catch (Exception ignored){}
        finally {lock.unlock();
        }
    }

    public void await() {lock.lock();
        try {
            // 如果 curValue 的值大于 0
            // 则阐明 countDown 调用次数还不够
            // 须要将线程挂起 否则间接放行
            if (curValue > 0)
                // 应用条件变量 condition 将线程挂起
                condition.await();}catch (Exception ignored){}
        finally {lock.unlock();
        }
    }
}

能够应用上面的代码测试咱们本人写的CountDownLatch

public static void main(String[] args) throws InterruptedException {MyCountDownLatch latch = new MyCountDownLatch(5);
    for (int i = 0; i < 3; i++) {new Thread(() -> {latch.countDown();
            System.out.println(Thread.currentThread().getName() + "countDown 执行实现");
        }).start();}

    for (int i = 0; i < 10; i++) {new Thread(() -> {
            try {TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            try {latch.await();
            } catch (Exception e) {e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() +  "latch 执行实现");
        }).start();}
}

CyclicBarrier

CyclicBarrier有一个路障(同步点),所有的线程达到路障之后都会被阻塞,当被阻塞的线程个数达到指定的数目的时候,就须要对指定数目的线程进行放行。

  • CyclicBarrier 当中会有一个数据 threadCount,示意在路障须要达到这个threadCount 个线程的时候才进行放行,而且须要放行 threadCount 个线程,这里咱们能够循环应用函数 condition.signal() 去唤醒指定个数的线程,从而将他们放行。如果线程须要将本人阻塞住,能够应用函数condition.await()
  • CyclicBarrier 当中须要有一个变量currentThreadNumber,用于记录以后被阻塞的线程的个数。
  • 用户还能够给 CyclicBarrier 传入一个 Runnable 对象,当放行的时候须要执行这个 Runnable 对象,你能够新开一个线程去执行这个 Runnable 对象,或者让唤醒其余线程的这个线程执行 Runnable 对象。

依据下面的 CyclicBarrier 要求,写出的代码如下(剖析和解释在正文当中):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyCyclicBarrier {private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private int threadCount;
    private int currentThreadNumber;
    private Runnable runnable;

    public MyBarrier(int count) {threadCount = count;}

    /**
     * 容许传入一个 runnable 对象
     * 当放行一批线程的时候就执行这个 runnable 函数
     * @param count
     * @param runnable
     */
    public MyBarrier(int count, Runnable runnable) {this(count);
        this.runnable = runnable;
    }
    
    public void await() {lock.lock();
        currentThreadNumber++;
        try {
            // 如果阻塞的线程数量不到 threadCount 须要进行阻塞
            // 如果到了须要由这个线程唤醒其余线程
            if (currentThreadNumber == threadCount) {
                // 放行之后须要从新进行计数
                // 因为放行之后 condition.await();
                // 阻塞的线程个数为 0
                currentThreadNumber = 0;
                if (runnable != null) {new Thread(runnable).start();}
                // 唤醒 threadCount - 1 个线程 因为以后这个线程
                // 曾经是在运行的状态 所以只须要唤醒 threadCount - 1
                // 个被阻塞的线程
                for (int i = 1; i < threadCount; i++)
                    condition.signal();}else {
                // 如果数目还没有达到则须要阻塞线程
                condition.await();}
        }catch (Exception ignored){}
        finally {lock.unlock();
        }
    }

}

上面是测试咱们本人写的 路障 的代码:

public static void main(String[] args) throws InterruptedException {MyCyclicBarrier barrier = new MyCyclicBarrier(5, () -> {System.out.println(Thread.currentThread().getName() + "开启一个新线程");
        for (int i = 0; i < 1; i++) {System.out.println(i);
        }
    });

    for (int i = 0; i < 5; i++) {new Thread(() -> {System.out.println(Thread.currentThread().getName() + "进入阻塞");
            try {TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            barrier.await();
            System.out.println(Thread.currentThread().getName() + "阻塞实现");
        }).start();}
}

Semaphore

Semaphore能够管制执行某一段临界区代码的线程数量,在 Semaphore 当中会有两个计数器 semCountcurCount

  • semCount示意能够执行临界区代码的线程的个数。
  • curCount示意正在执行临界区代码的线程的个数。

这个工具实现起来也并不简单,具体分析都在正文当中:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MySemaphore {private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private int semCount;
    private int curCount;

    public MySemaphore(int semCount) {this.semCount = semCount;}

    public void acquire() {lock.lock();
        try {
            // 正在执行临界区代码的线程个数加一
            curCount++;
            // 如果线程个数大于指定的可能执行的线程个数
            // 须要将以后这个线程阻塞起来
            // 否则间接放行
            if (curCount > semCount) {condition.await();
            }
        }catch (Exception ignored) {}
        finally {lock.unlock();
        }
    }

    public void release() {lock.lock();
        try {
            // 线程执行完临界区的代码
            // 将要来到临界区 因而 curCount 
            // 须要减一
            curCount--;
            // 如果有线程阻塞须要唤醒被阻塞的线程
            // 如果没有被阻塞的线程 这个函数执行之后
            // 对后果也不会产生影响 因而在这里不须要进行
            // if 判断
            condition.signal();
            // signal 函数只对在调用 signal 函数之前
            // 被 await 函数阻塞的线程产生影响 如果
            // 某个线程调用 await 函数在 signal 函数
            // 执行之后,那么后面那次 signal 函数调用
            // 不会影响前面这次 await 函数
        }catch (Exception ignored){}
        finally {lock.unlock();
        }
    }
}

应用上面的代码测试咱们本人写的MySemaphore

public static void main(String[] args) {MySemaphore mySemaphore = new MySemaphore(5);
    for (int i = 0; i < 10; i++) {new Thread(() -> {mySemaphore.acquire();
            System.out.println(Thread.currentThread().getName() + "曾经进入临界区");
            try {TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            mySemaphore.release();
            System.out.println(Thread.currentThread().getName() + "曾经来到临界区");
        }).start();}
}

总结

在本文当中次要给大家介绍了三个在并发当中罕用的工具类该如何应用,而后介绍了咱们本人实现三个工具类的细节,其实次要是利用 条件变量 实现的,因为它能够实现线程的阻塞和唤醒,其实只有大家理解 条件变量 的应用办法,和三种工具的需要大家也能够本人实现一遍。

以上就是本文所有的内容了,心愿大家有所播种,我是 LeHung,咱们下期再见!!!(记得 点赞珍藏 哦!)


更多精彩内容合集可拜访我的项目:https://github.com/Chang-LeHu…

关注公众号:一无是处的钻研僧,理解更多计算机(Java、Python、计算机系统根底、算法与数据结构)常识。

退出移动版