关于后端:13分钟聊聊并发包中常用同步组件并手写一个自定义同步组件

37次阅读

共计 11397 个字符,预计需要花费 29 分钟才能阅读完成。

前言

上篇文章 10 分钟从源码级别搞懂 AQS(AbstractQueuedSynchronizer)说到 JUC 并发包中的同步组件大多应用 AQS 来实现

本篇文章通过 AQS 本人来实现一个同步组件,并从源码级别聊聊 JUC 并发包中的罕用同步组件

本篇文章须要的前置常识就是 AQS,如果不理解 AQS 的同学能够看上一篇文章哈~

浏览本篇文章大略须要 13 分钟

自定义同步组件

为了更容易了解其余同步组件,咱们先来应用 AQS 本人来实现一个罕用的可重入锁

AQS 模板办法流程是固定的,咱们次要只须要来实现它的尝试获取同步状态和尝试开释同步状态办法即可

首先咱们先规定要实现的可重入锁是独占式的

规定同步状态一开始为 0,当有线程获取锁胜利同步状态就为 1,当这个线程重入时就累加同步状态

规定开释同步状态时每次扣减 1 个同步状态,只有当同步状态扣减到 0 时,才是真正的开释独占锁

咱们应用一个外部类 Sync 来继承 AQS 并重写 tryAcquire 尝试获取同步状态、tryRelease 尝试开释同步状态、isHeldExclusively判断以后线程是否持有同步状态(期待、告诉时会用到该办法)

    static class Sync extends AbstractQueuedSynchronizer {
        /**
         * 判断以后线程是否持有同步状态
         *
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}
    }

在获取同步状态中

  1. 先判断是否有同步状态(即同步状态是否为 0),如果有同步状态就用 CAS 去获取(0->1),胜利就设置以后线程为获取同步状态的线程
  2. 如果没有同步状态(即同步状态不为 0),就查看获取同步状态的线程是否为以后线程,如果是以后线程则阐明此次是重入,累减轻入次数
  3. 其余状况阐明未获取到同步状态,返回 false 后续走 AQS 流程(构建节点退出 AQS)
        /**
         * 尝试获取同步状态
         *
         * @param arg 获取同步状态的数量
         * @return
         */
        @Override
        protected boolean tryAcquire(int arg) {
            //1. 获取同步状态
            int state = getState();
            //2. 如果有同步状态则 CAS 替换 0->1
            if (state == 0) {if (compareAndSetState(state, 1)) {
                    // 替换胜利 阐明获取到同步状态 设置以后获取同步状态线程
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
            } else if (getExclusiveOwnerThread() == Thread.currentThread()) {
                //3. 没有同步状态  查看获取同步资源的线程是否为以后线程  可重入  累减轻入次数
                setState(state + arg);
                return true;
            }

            // 其余状况就是没获取到同步状态
            return false;
        }

在开释同步状态中

只有当同步状态要改成 0 时才是真正开释,否则状况状况下就是重入扣减次数

        /**
         * 尝试开释同步状态
         *
         * @param arg 开释同步状态的数量
         * @return
         */
        @Override
        protected boolean tryRelease(int arg) {
            // 指标状态
            int targetState = getState() - arg;

            // 真正开释锁
            if (targetState == 0) {setExclusiveOwnerThread(null);
                setState(targetState);
                return true;
            }

            // 其余状况 扣减状态
            setState(targetState);
            return false;
        }

应用外部类实现 AQS 的办法后,咱们在自定义同步组件类中去实现 Lock 接口,并用外部类实现 AQS 的办法去实现 Lock 接口的办法

将要获取、开释的同步状态都设置成 1,对应响应中断、超时的办法就用 AQS 中对应的办法即可

public class MySynchronizedComponent implements Lock {public MySynchronizedComponent() {sync = new Sync();
    }

    private Sync sync;

    @Override
    public void lock() {sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {sync.release(1);
    }

    @Override
    public Condition newCondition() {return sync.new ConditionObject();
    }

}

实际上咱们只须要去实现尝试获取、开释同步状态办法就可能实现本人的同步组件,这就是应用 AQS 带来的益处

代码案例能够去 git 仓库获取,放在本文最初

ReentrantLock

ReentrantLock 是并发包中提供的可重入锁,它除了可能实现 synchronized 的性能外还能够响应中断、超时、实现偏心锁等,其底层也是通过 AQS 来实现的

ReentrantLock 的性能与 synchronized 相似,可重入的独占锁,用于保障并发场景下同步操作

应用时须要显示加锁、解锁,罕用格局如下:

reentrantLock.lock();
try{//....}finally {reentrantLock.unlock();
}

finally 中最先去解锁,并且加锁要放在 try 块的最外层,并保障加锁和 try 块之间不会抛出异样

加锁不放在 try 中是因为加锁实现未知可能抛出不受查看 unchecked 的异样,当加锁抛出异样时,后续 finally 块解锁也会抛出非法监视器的异样从而导致笼罩

加锁和 try 块之间如果抛出异样,那么就无奈执行解锁了

ReentrantLock 除了提供根本的同步性能,还提供响应中断、超时的 API,同学们能够私下去查看

相熟 ReentrantLock 实现的同学,可能看下面自定义同步组件的代码很相熟,其实就是参考 ReentrantLock 非偏心锁写的

ReentrantLock 中应用外部类 Sync 来继承 AQS,同时外部类 NonfairSync 和 FairSync 来继承 Sync 去实现非偏心、偏心的获取同步状态

非偏心锁尝试获取同步状态 流程相似就不过多形容

        final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

那偏心锁如何来实现获取同步状态呢?

其实看过上篇 AQS 文章的同学就晓得了,在上篇文章中曾经说过

只须要在尝试获取同步状态前加上一个条件:队列中是否有前置工作(即在队列中 FIFO 排队获取)

偏心锁也是这么去实现的,前置条件hasQueuedPredecessors

        protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

ReentrantReadWriteLock

性能与实现

ReentrantReadWriteLock 在 ReentrantLock 性能的根底上,提供读写锁的性能,让锁的粒度更细

在一些读多写少的场景下是容许同时读的,容许多个线程获取,其实想到了 AQS 的共享式,读锁也就是共享式

在读读的场景下,都是读锁 / 共享锁,不会进行阻塞

在读写、写读、写写的场景下,都会进行阻塞

比方要获取写锁时,须要期待读锁、写锁都解锁;要获取读锁时,须要期待写锁解锁

ReentrantReadWriteLock 在 ReentrantLock 的根底上减少 ReadLockWriteLock别离作为读锁和写锁

实际上读锁就是共享锁、写锁就是独占锁,在实现加锁、解锁的办法时别离调用共享式、独占式的获取、开释同步状态即可

在结构时,读写锁中理论应用的都是同一个 AQS

        public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }

        // 读锁结构
        protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}

        // 写锁结构
        protected WriteLock(ReentrantReadWriteLock lock) {sync = lock.sync;}

即同步状态会被读写锁共享,那么它们如何查看 / 批改本人的那局部同步状态呢?

在读写锁中,同步状态被一分为二,高 16 位的同步状态是读锁的,低 16 位的同步状态是写锁的

当线程获取写锁时,写状态 +1,因为写状态在低位,相当于同步状态 +1

当线程获取读锁时,读状态 +1,因为读状态在高位,相当于同步状态 +(1<<16)

写锁获取

写锁的获取实现在 sync.tryAcquire 中 sync 能够是偏心也能够是非偏心,实际上是独占式的获取

protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();
    // 失去同步状态 c
    int c = getState();
    // 失去写状态(同步状态低 16 位 与上 全 1)int w = exclusiveCount(c);
    if (c != 0) {
        // 同步状态不为 0, 写状态为 0, 阐明读状态不为 0, 读锁曾经被获取, 此时获取写锁失败
        // 同步状态不为 0, 写状态也不为 0, 查看以后线程是否是获取写锁的线程, 不是的话获取写锁失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        
        // 只有以后线程获取过写锁能力进入这里
        
        // 如果原来的写状态 + 这次重入的写状态 超过了 同步状态的 0~15 位 则抛出异样
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        
        // 设置同步状态 因为写状态在低 16 位所以不必左移 (重入累加)
        setState(c + acquires);
        return true;
    }
    
    // 同步状态为 0 无锁时 
    //writerShouldBlock 在非偏心锁下返回 false 在偏心锁下查看是否有前驱工作
    // 如果 CAS 失败则返回 false
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    
    //CAS 胜利则 设置以后线程为取得独占锁 (写锁) 的线程
    setExclusiveOwnerThread(current);
    return true;
}

查看源码能够晓得:

  1. 当有锁时(同步状态不为 0 状况),如果只有读锁(没有写锁),那么间接失败;如果只有写锁则查看以后线程是否为获取写锁的线程(重入状况)
  2. 当无锁时进行 CAS 获取写锁,胜利则设置获取写锁的线程,失败则返回

依据源码剖析能够晓得,写锁容许重入 ,并且 获取写锁时,如果有读锁会被阻塞

写锁开释

写锁的开释实现在 sync.tryRelease

protected final boolean tryRelease(int releases) {// 判断以后线程是不是获取写 (独占) 锁的线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    
    // 新状态
    int nextc = getState() - releases;
    // 如果新状态低 16 位为 0(没有写锁)就设置获取写锁的线程为空, 而后设置同步状态, 再返回
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    
    setState(nextc);
    return free;
}

开释其实也相似,只有当写状态为 0 时才是真正开释,其余状况都是扣减重入次数

读锁获取

读锁的获取也就是共享式的获取

        protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();
            // 同步状态
            int c = getState();
            
            //exclusiveCount 为获取写锁状态 低 16 位全与 1
            // 如果有写锁 并且 获取写锁的线程不是以后线程 则失败(阐明容许同一线程获取写锁再获取读锁)
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            
            // 获取读状态  (同步状态右移 16 位)
            int r = sharedCount(c);
            // 读没被阻塞 没超过最大值 且 CAS 胜利 记录信息 返回胜利
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {firstReaderHoldCount++;} else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

在读锁中 容许同一线程获取写锁再获取读锁

在某些场景下要先写数据再读数据,比方:

  1. 获取写锁
  2. 写数据
  3. 开释写锁
  4. 应用(读)数据

这样会导致开释完写锁后,其余线程能够获取写锁,从而导致第四步会呈现脏读

正确的用法应该在开释写锁前获取读锁:

  1. 获取写锁
  2. 写数据
  3. 获取读锁
  4. 开释写锁
  5. 读数据

这样其余线程获取写锁时因为都读锁会被阻塞,而其余线程须要读时又不会被阻塞

在读多写少的场景,读写锁粒度更细,读读不阻塞,并发性能更好

信号量

性能

信号量用于管制同时拜访资源的线程数量

线程拜访资源时须要先拿到信号量能力拜访,拜访完开释信号量,信号量容许同时 N 个线程获取

上面是管制同时只能有 2 个线程获取到信号量

        // 初始化信号量
        Semaphore semaphore = new Semaphore(2);
        
        // 每次只有两个线程可能获取到信号量执行
        ExecutorService executor =  Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {executor.execute(()->{
                try {semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"取得资源");

                    // 执行工作
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {e.printStackTrace();
                }finally {System.out.println(Thread.currentThread().getName()+"开释资源 ======");
                    semaphore.release();}
            });
        }

        executor.shutdown();

实现

相熟 AQS 的同学应该能够猜到信号量其实就是通过共享式实现的

信号量结构时提供初始化信号量的数量,实际上就是初始化同步状态,比方设置 2 个信号量就是设置同步状态为 2;还能够在结构中设置偏心、非偏心

在获取信号量时,应用响应中断的共享式,在非偏心状况下执行nonfairTryAcquireShared

        final int nonfairTryAcquireShared(int acquires) {for (;;) {
                // 获取同步状态
                int available = getState();
                // 指标同步状态 
                int remaining = available - acquires;
                // 没有信号量 或 CAS 胜利 都会返回指标同步状态 为正数时获取失败
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

在获取时实际上就是扣减要获取的信号量,可能多个线程同时获取信号量,应用 CAS+ 失败重试保障原子性,直到没有信号量或 CAS 胜利

在开释信号量时理论就是加上开释的信号量,可能多个线程同时开释信号量,因而开释时应用 CAS+ 失败重试保障原子性

        protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

CountDownLatch

CountDownLatch 相当于一个计数器,在结构时设置计数数量

性能

调用 countDown 办法会对数量进行自减

调用 await 办法时,如果还有数量没被扣减完,则会阻塞,直到数量都被扣减完

当一个线程执行 N 个工作,或者多个线程执行一个工作时,要期待它们执行完再进行下一步操作时,就能够应用 CountDownLatch

// 初始化 10
CountDownLatch countDownLatch = new CountDownLatch(10);
// 固定线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 1; i <= 10; i++) {
    final int index = i;
    executor.execute(() -> {System.out.println(Thread.currentThread() + "解决工作" + index);
        
        // 执行工作...
        
        // 数量 -1
        countDownLatch.countDown();});
}


// 计数量为 0 时才能够继续执行
countDownLatch.await();
System.out.println("解决完工作");

executor.shutdown();

实现

其实它的实现与信号量相似,也是通过共享式

在结构中设置初始值时,实际上就是在设置同步状态

当执行 countDown 扣减数量时,实际上就是在扣减同步状态,因为可能多线程同时执行,应用 CAS+ 失败重试保障扣减同步状态胜利

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

执行 await 时,理论就是判断同步状态是否为 0,不是则阐明有的线程还未执行完工作,阻塞期待

protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}

CyclicBarrier

cyclic Barrier 是一个可循环应用的屏障,它经常被用来和 countdownlatch 作比拟

它就像一个屏障,让线程执行完工作后遇到屏障阻塞,直到所有线程都执行完工作(都达到屏障),并且它是可重复使用的

        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {System.out.println("所有线程达到屏障后, 优先执行结构规定的 runnable");
        });

        Thread t1 = new Thread(() -> {
            // 执行工作
            task(cyclicBarrier);
        }, "t1");

        Thread t2 = new Thread(() -> {
            // 执行工作
            task(cyclicBarrier);
        }, "t2");

        Thread t3 = new Thread(() -> {
            // 执行工作
            task(cyclicBarrier);
        }, "t3");

        t1.start();
        t2.start();
        t3.start();

task 办法中会执行 await 阻塞直到所有线程达到屏障

private static void task(CyclicBarrier cyclicBarrier) {System.out.println(Thread.currentThread() + "执行工作...");

    try {TimeUnit.SECONDS.sleep(1);

        cyclicBarrier.await();
        System.out.println("所有线程都执行完," + Thread.currentThread() + "走出屏障");
    } catch (InterruptedException e) {e.printStackTrace();
    } catch (BrokenBarrierException e) {e.printStackTrace();
    }
}

cyclic barrier 会记录须要多少线程达到屏障,并且通过代来达到重复使用

应用 reentrant lock 在 await 中加锁、解锁,每当一个线程达到屏障(执行 await 时),都会进行自减,如果不为 0 会阻塞,自减到 0 时阐明所有线程达到屏障,唤醒其余线程,并更新新的代

Exchange

Exchanger 用于线程间的合作,能够用来替换变量

Exchanger<String> exchanger = new Exchanger();

new Thread(() -> {
    String A = "A";
    try {
        //B
        System.out.println(exchanger.exchange(A));
    } catch (InterruptedException e) {e.printStackTrace();
    }
}).start();

String B = "B";
try {
    //A
    String A = exchanger.exchange(B);
    System.out.println("A=" + A + "B=" + B);
} catch (InterruptedException e) {e.printStackTrace();
}

当一个线程先执行 exchange 时会期待另一个线程执行,等到另一个线程 exchange 时则唤醒期待的线程

总结

本篇文章围绕前置常识 AQS 原理,来实现自定义的同步组件,并对并发包中罕用同步组件的性能和原理进行阐明

继承 AQS 后,只须要实现尝试获取、开释同步状态等办法就能够自定义同步组件

ReentrantLock 是由 AQS 实现的独占式可重入锁,初始值同步状态为 0;获取锁时,如果是无锁则尝试 CAS 自增,胜利就获取了锁;如果有锁则判断获取锁的线程是不是以后线程,是则阐明是可重入锁自增次数;在开释锁时因为可重入的关系,只有自减为 0 才是真正开释锁

ReentrantLock 还提供响应中断、超时、偏心锁的其余性能,偏心锁实现只须要加上获取锁的前提:在 AQS 中 FIFO 排队,前驱节点为首节点

ReentrantReadWriteLock 提供共享的读锁和独占的写锁,将锁的情况更加细粒度,将同步状态高下 16 位拆分为读、写的状态,在读多写少的场景并发性能会更好;在获取写锁时,如果有读锁那么会阻塞,如果有写锁会查看是否为可重入;在获取读锁时,没有写锁就能够获取,如果写锁是以后线程也能够获取

信号量用于控制线程拜访资源,初始化自定义的信号量数量,线程拜访资源时先获取信号量,获取到信号量才可能拜访资源;应用共享式来实现,因为可能多个线程同时获取、开释信号量,在实现时都须要应用 CAS+ 失败重试保障原子性

CountDownLatch 用于计数,能够用于一个线程执行 N 个工作,也能够用于多个线程执行 1 个工作,当执行完工作应用 countdown 来对同步状态进行扣减,执行 await 办法时只有同步状态不为 0 就会阻塞线程,直到所有工作执行完(将同步状态扣减完)

CyclicBarrier 是可循环应用的屏障,用于多线程达到屏障后,须要期待其余线程都达到屏障才继续执行;应用 reentrant lock 和 代 来实现,调用 await 时自减,当计数为 0 时阐明所有线程达到屏障,唤醒其余阻塞的线程

Exchange 用于线程间的合作,可能替换线程间的变量

最初(不要白嫖,一键三连求求拉~)

本篇文章被支出专栏 由点到线,由线到面,深入浅出构建 Java 并发编程常识体系,感兴趣的同学能够继续关注喔

本篇文章笔记以及案例被支出 gitee-StudyJava、github-StudyJava 感兴趣的同学能够 stat 下继续关注喔~

案例地址:

Gitee-JavaConcurrentProgramming/src/main/java/C_AQSComponent

Github-JavaConcurrentProgramming/src/main/java/C_AQSComponent

有什么问题能够在评论区交换,如果感觉菜菜写的不错,能够点赞、关注、珍藏反对一下~

关注菜菜,分享更多干货,公众号:菜菜的后端私房菜

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0