共计 1467 个字符,预计需要花费 4 分钟才能阅读完成。
Semaphore 信号量
在管程被提出来之前用的是信号量。
信号量模型
一个计数器,一个等待队列,三个方法。计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问他们,这三个方法分别是:init()、down()、up()
- init(): 设置计数器的初始值
- down(): 计数器的值减 1; 如果此时计数器的值小于 0, 则当前线程被阻塞, 否则当前线程继续执行
- up(): 计数器的值加 1; 如果此时计数器的值小于或等于 0, 则欢迎等待队列中的一个线程, 并将其从等待队列中移除。
这三个方法都是原子性的,原子性是由信号量模型的实现方保证的。在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这 三个方法都是原子操作。
白话计数器
- 计数器记录设置请求的并发数。
- 服务器接受一个请求计数器 -1。
- 当计数器等于 0 时请求进入等待队列等待。
- 当服务器处理完一个请求计数器 +1(上限时计数器的初始值,最大也只能是计数器的初始值)
亲手实现一个信号量
import java.util.Queue;
/**
* 信号量
**/
public class Semaphore {
// 计数器
int count;
// 等待队列
Queue<String> queue;
// 初始化操作
Semaphore(int count){this.count = count;}
void down(){
this.count--;
if (this.count<0){
// 将当前线程插入等待队列
// 阻塞当前线程
}
}
void up(){
this.count ++ ;
if (this.count>=0){
// 移出等待队列中的某个线程 T
// 唤醒线程 T
}
}
}
在 JavaSDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()。
实现一个限流器:
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
/**
*
*
**/
public class ObjPool<T,R> {
// 对象池
final List<T> pool;
// 信号量
final Semaphore sem;
/**
* 向线程池里添加数据
* @param size 对象数量
* @param t
*/
public ObjPool(int size, T t) {this.pool = new Vector<>();
for (int i=0; i<size;i++){pool.add(t);
}
sem = new Semaphore(size);
}
R exec(Function<T,R> func) throws Exception{
T t = null;
sem.acquire();// 计数器 -1
try {t = pool.remove(0);// 从队列中去除一个值
return func.apply(t);
}finally {pool.add(t);
sem.release();// 计数器 +1}
}
public static void main(String[] args) throws Exception{ObjPool<Long, String> pool = new ObjPool<>(10, 2L);
pool.exec(t -> {System.out.println(t);
return t.toString();});
}
}
码字不易如果对你有帮助请给个关注
爱技术爱生活 QQ 群: 894109590
正文完
发表至: java
2019-09-24