Semaphore-原理简介和使用

43次阅读

共计 2878 个字符,预计需要花费 8 分钟才能阅读完成。

Semaphore 共享锁

简介

` 在多线程环境下用于协调各个线程, 以保证它们能够正确、合理的使用公共资源
信号量维护了一个许可集,我们在初始化 Semaphore 时需要为这个许可集传入一个数量值,
该数量值代表同一时间能访问共享资源的线程数量。
线程可以通过 acquire()方法获取到一个许可,然后对共享资源进行操作,
如果许可集已分配完了,那么线程将进入等待状态,
直到其他线程释放许可才有机会再获取许可,线程释放一个许可通过 release()方法完成
`

DEMO 了解其用法

` 上述示例说明:

在创建 Semaphore 时初始化 5 个许可,这也就意味着同一个时间点允许 5 个线程进行共享资源访问,
使用 acquire()方法为每个线程获取许可,并进行休眠 1 秒,如果 5 个许可已被分配完,
新到来的线程将进入等待状态。如果线程顺利完成操作将通过 release()方法释放许可,
我们执行代码,可以发现每隔 1 秒几乎同一时间出现 5 条线程访问
`

Semaphore 实现互斥锁

简介

` 在初始化信号量时传入 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。
这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可或零个可用的许可。
按此方式使用时,二进制信号量具有某种属性(与很多 Lock 实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)

`

DEMO 了解其用法

` 创建一个数量为 1 的互斥信号量 Semaphore,
然后并发执行 10 个线程,在线程中利用 Semaphore 控制线程的并发执行,
因为信号量数值只有 1,因此每次只能一条线程执行,其他线程进入等待状态
`

Semaphore 提供的方法

  • Semaphore(int permits) 创建具有给定的许可数和非公平的公平设置的 Semaphore
  • Semaphore(int permits, boolean fair) 创建具有给定的许可数和给定的公平设置的 Semaphore,true 即为公平锁
  • void acquireUninterruptibly() 从此信号量中获取许可,不可中断
  • int availablePermits() 返回此信号量中当前可用的许可数
  • int drainPermits() 获取并返回立即可用的所有许可
  • protected Collection<Thread> getQueuedThreads() 返回一个 collection,包含可能等待获取的线程
  • int getQueueLength() 返回正在等待获取的线程的估计数目
  • boolean hasQueuedThreads() 查询是否有线程正在等待获取
  • boolean isFair() 如果此信号量的公平设置为 true,则返回 true
  • boolean tryAcquire() 仅在调用时此信号量存在一个可用许可,才从信号量获取许可
  • boolean tryAcquire(long timeout, TimeUnit unit) 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可

内部原理分析

类图

`AQS 是基础组件,只负责核心并发操作,如加入或维护同步队列,控制同步状态,等,
而具体的加锁和解锁操作交由子类完成,
因此子类 Semaphore 共享锁的获取与释放需要自己实现,
这两个方法分别是获取锁的 tryAcquireShared(int arg)方法和释放锁的 tryReleaseShared(int arg)方法,这点从 Semaphore 的内部结构完全可以看出来
`

`Semaphore 的内部类公平锁 (FairSync) 和非公平锁 (NoFairSync) 各自实现不同的获取锁方法即 tryAcquireShared(int arg),
毕竟公平锁和非公平锁的获取稍后不同,
而释放锁 tryReleaseShared(int arg)的操作交由 Sync 实现,因为释放操作都是相同的,因此放在父类 Sync 中实现当然是最好的
`

源码分析

` 默认非公平锁
`

` 初始化信号量的时候 传入的 permits 参数 最终会赋值到 aqs 的 state 变量 
并对 state 进行 cas 操作

调用 Semaphore 的 acquire()方法后,
执行过程是这样的,
当一个线程请求到来时,如果 state 值代表的许可数足够使用,
那么请求线程将会获得同步状态即对共享资源的访问权,并更新 state 的值 (一般是对 state 值减 1),
但如果 state 值代表的许可数已为 0,则请求线程将无法获取同步状态,
线程将被加入到同步队列并阻塞,
直到其他线程释放同步状态 (一般是对 state 值加 1) 才可能获取对共享资源的访问权
`

` 先获取 state 的值,并执行减法操作,得到 remaining 值,
如果 remaining 大于等于 0,那么线程获取同步状态成功,可访问共享资源,并更新 state 的值,
如果 remaining 小于 0,那么线程获取同步状态失败,将被加入同步队列(通过 doAcquireSharedInterruptibly(arg))

采用无锁 (CAS) 并发的操作保证对 state 值修改的安全

`

`1、在方法中,由于当前线程没有获取同步状态,因此创建一个共享模式(Node.SHARED)的结点并通过 addWaiter(Node.SHARED)加入同步队列,

2、加入完成后,当前线程进入自旋状态,首先判断前驱结点是否为 head,

a、如果是,那么尝试获取同步状态并返回 r 值,如果 r 大于 0,则说明获取同步状态成功,将当前线程设置为 head 并传播,传播指的是,同步状态剩余的许可数值不为 0,通知后续结点继续获取同步状态,到此方法将会 return 结束,获取到同步状态的线程将会执行原定的任务。

b、如果前驱结点不为 head 或前驱结点为 head 并尝试获取同步状态失败,那么调用 shouldParkAfterFailedAcquire(p, node)方法判断前驱结点的 waitStatus 值是否为 SIGNAL 并调整同步队列中的 node 结点状态,如果返回 true,那么执行 parkAndCheckInterrupt()方法,将当前线程挂起并返回是否中断线程的 flag

`

`
在 AQS 中存在一个变量 state,当我们创建 Semaphore 对象传入许可数值时,
最终会赋值给 state,state 的数值代表同一个时刻可同时操作共享数据的线程数量,
每当一个线程请求 (如调用 Semaphored 的 acquire() 方法)获取同步状态成功,
state 的值将会减少 1,直到 state 为 0 时,表示已没有可用的许可数,
也就是对共享数据进行操作的线程数已达到最大值,其他后来线程将被阻塞,
此时 AQS 内部会将线程封装成共享模式的 Node 结点,加入同步队列中等待并开启自旋操作。
只有当持有对共享数据访问权限的线程执行完成任务并释放同步状态后,
同步队列中的对于的结点线程才有可能获取同步状态并被唤醒执行同步操作,注
意在同步队列中获取到同步状态的结点将被设置成 head 并清空相关线程数据(毕竟线程已在执行也就没有必要保存信息了),
AQS 通过这种方式便实现共享锁
`

正文完
 0