引言
最近能够进行个税申报了,还没有申报的同学能够连忙去试试哦。不过我反正是从上午到下午始终都没有胜利的进行申报,一进行申报
就返回“以后拜访人数过多,请稍后再试”。为什么有些人就可能申报胜利,有些人就间接返回失败。这很显著申报解决资源是无限的,
只能等他人解决完了在来解决你的,你如果运气好可能重试几次就轮到你了,如果运气不好可能重试一天也可能轮不到你。
我反正曾经是放弃了,等到夜深人静的时候再来试试。作为一个程序员咱们必定晓得这是个税申请 app 的限流操作,如果还有不懂什么
是限流操作的能够参考下这个文章《高并发零碎三大利器之限流》。
比方个税申报零碎每台机器只最多别离只能解决 1000
个申请,再多的申请就会把机器打挂。如果是多余的申请就把这些申请回绝掉。间接给你返回一句舒适提醒:“以后拜访人数过多,请稍后再试”, 如果要实现这个性能大家想想能够通过哪些办法算法来实现。
共享锁、独占锁
学习 semaphore
之前咱们必须要先理解下什么是共享锁。在上一篇文章《Java 高并发编程根底之 AQS》咱们介绍了偏心锁于非偏心锁的区别。
- 共享锁:它是容许多个线程同时获取锁,并发的访问共享资源
- 独占锁:也有人把它叫做“独享锁”,它是是独占的,排他的,只能被一个线程可持有,
当独占锁曾经被某个线程持有时,其余线程只能期待它被开释后,能力去争锁,并且同一时刻只有一个线程能争锁胜利。
什么是 Semaphore
在《Java 并发编程艺术 》(微信搜【java 金融】回复 电子书 能够收费获取 PDF 版本)这一书中是这么说的:
Semaphore(信号量)是用来管制同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源。很多年以来,我都感觉从字面上很难了解 Semaphore 所表白的含意,只能把它比作是管制流量的红绿灯,比方 XX 马路要限度流量,只容许同时有一百辆车在这条路上行使,其余的都必须在路口期待,所以前一百辆车会看到绿灯,能够开进这条马路,前面的车会看到红灯,不能驶入 XX 马路,然而如果前一百辆中有五辆车曾经来到了 XX 马路,那么前面就容许有 5 辆车驶入马路,这个例子里说的车就是线程,驶入马路就示意线程在执行,来到马路就示意线程执行实现,看见红灯就示意线程被阻塞,不能执行。
Semaphore
机制是提供给线程抢占式获取许可,所以他能够实现偏心或者非偏心,相似于ReentrantLock
。
说了这么多咱们来个理论的例子看一看,比方咱们去停车场停车,停车场总共只有 5
个车位,然而当初有 8
辆汽车来停车,剩下的 3
辆汽车要么等其余汽车开走后进行停车,或者去找别的停车位?
/**
* @author: 公众号【Java 金融】*/
public class SemaphoreTest {public static void main(String[] args) throws InterruptedException {
// 初始化五个车位
Semaphore semaphore = new Semaphore(5);
// 等所有车子
final CountDownLatch latch = new CountDownLatch(8);
for (int i = 0; i < 8; i++) {
int finalI = i;
if (i == 5) {Thread.sleep(1000);
new Thread(() -> {stopCarNotWait(semaphore, finalI);
latch.countDown();}).start();
continue;
}
new Thread(() -> {stopCarWait(semaphore, finalI);
latch.countDown();}).start();}
latch.await();
log("总共还剩:" + semaphore.availablePermits() + "个车位");
}
private static void stopCarWait(Semaphore semaphore, int finalI) {String format = String.format("车牌号 %d", finalI);
try {semaphore.acquire(1);
log(format + "找到车位了,去停车了");
Thread.sleep(10000);
} catch (Exception e) {e.printStackTrace();
} finally {semaphore.release(1);
log(format + "开走了");
}
}
private static void stopCarNotWait(Semaphore semaphore, int finalI) {String format = String.format("车牌号 %d", finalI);
try {if (semaphore.tryAcquire()) {log(format + "找到车位了,去停车了");
Thread.sleep(10000);
log(format + "开走了");
semaphore.release();} else {log(format + "没有停车位了,不在这里等了去其余中央停车去了");
}
} catch (Exception e) {e.printStackTrace();
}
}
public static void log(String content) {
// 格式化
DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 以后工夫
LocalDateTime now = LocalDateTime.now();
System.out.println(now.format(fmTime) + " "+content);
}
}
2021-03-01 18:54:57 车牌号 0 找到车位了,去停车了
2021-03-01 18:54:57 车牌号 3 找到车位了,去停车了
2021-03-01 18:54:57 车牌号 2 找到车位了,去停车了
2021-03-01 18:54:57 车牌号 1 找到车位了,去停车了
2021-03-01 18:54:57 车牌号 4 找到车位了,去停车了
2021-03-01 18:54:58 车牌号 5 没有停车位了,不在这里等了去其余中央停车去了
2021-03-01 18:55:07 车牌号 7 找到车位了,去停车了
2021-03-01 18:55:07 车牌号 6 找到车位了,去停车了
2021-03-01 18:55:07 车牌号 2 开走了
2021-03-01 18:55:07 车牌号 0 开走了
2021-03-01 18:55:07 车牌号 3 开走了
2021-03-01 18:55:07 车牌号 4 开走了
2021-03-01 18:55:07 车牌号 1 开走了
2021-03-01 18:55:17 车牌号 7 开走了
2021-03-01 18:55:17 车牌号 6 开走了
2021-03-01 18:55:17 总共还剩:5 个车位
从输入后果咱们能够看到 车牌号 5
这辆车看见没有车位了,就不在这个中央傻傻的等了,而是去其余中央了,然而 车牌号 6
和 车牌号 7
别离须要等到车库开出两辆车空出两个车位后才停进去。这就体现了 Semaphore
的acquire
办法如果没有获取到凭证它就会阻塞,而tryAcquire
办法如果没有获取到凭证不会阻塞的。
semaphore 在 dubbo 中的利用
在 Dubbo
中能够给 Provider
配置线程池大小来控制系统提供服务的最大并行度,默认是200
。
<dubbo:provider threads="200"/>
比方我当初这个订单零碎有三个接口,别离为创单、勾销订单、批改订单。这三个接口加起来的并发是 200 然而创单接口是外围接口,我想让它多分点线程来执行
让它能够有最大 150
个线程,勾销订单和批改订单别离最大 25
个线程执行就能够了。dubbo
提供了 executes
这一属性来实现这个性能
<dubbo:service interface="cn.javajr.service.CreateOrderService" executes="150"/>
<dubbo:service interface="cn.javajr.service.CancelOrderService" executes="25"/>
<dubbo:service interface="cn.javajr.service.EditOrderService" executes="25"/>
咱们能够看看 dubbo
外部是如何来 executes
的,具体实现是在 ExecuteLimitFilter
这个类咱们能够
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
Semaphore executesLimit = null;
boolean acquireResult = false;
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
if (max > 0) {RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
// 如果以后应用的线程数量曾经大于等于设置的阈值,那么间接抛出异样
// if (count.getActive() >= max) {// throw new RpcException("Failed to invoke method" + invocation.getMethodName() + "in provider" + url + ", cause: The service // using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
/**
* http://manzhizhen.iteye.com/blog/2386408
* use semaphore for concurrency control (to limit thread number)
*/
executesLimit = count.getSemaphore(max);
if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {throw new RpcException("Failed to invoke method" + invocation.getMethodName() + "in provider" + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}
long begin = System.currentTimeMillis();
boolean isSuccess = true;
// 计数器 +1
RpcStatus.beginCount(url, methodName);
try {Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
isSuccess = false;
if (t instanceof RuntimeException) {throw (RuntimeException) t;
} else {throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
} finally {
// 计数器 -1
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
if(acquireResult) {executesLimit.release();
}
}
}
从上述代码咱们也能够看出晚期这个是没有采纳 Semaphore
来实现的,而是间接采纳被正文的 if (count.getActive() >= max)
这个来来实现的,因为这个 count.getActive() >= max 和这个计数加 1 不是原子性的, 所以会有问题,具体 bug 号能够看 https://github.com/apache/dub…Semaphore
来修复非原子性问题。具体更具体的剖析能够参见代码的链接。不过当初最新版本(2.7.9)我看是采纳采纳自旋加上和 CAS
来实现的。
Semaphore
下面就是对 Semaphore
一个简略的应用以及 dubbo
中用到的例子, 说句实话 Semaphore 在工作中用的还是比拟少的,不过面试又有可能会被问到,所以还是有必要来一起学习一下它。咱们后面《Java 高并发编程根底之 AQS》通过 ReentrantLock 一起学习了下 AQS,其实 Semaphore 同样也是通过 AQS 来是实现的,咱们能够一起来对照下独占锁的办法,基本上都是有办法一一绝对应的。
这里有两点略微须要留神的中央:
- 在独占锁模式中,咱们只有在获取了独占锁的节点开释锁时,才会唤醒后继节点,因为独占锁只能被一个线程持有,如果它还没有被开释,就没有必要去唤醒它的后继节点。
- 在共享锁模式下,当一个节点获取到了共享锁,咱们在获取胜利后就能够唤醒后继节点了,而不须要等到该节点开释锁的时候,这是因为共享锁能够被多个线程同时持有,一个锁获取到了,则后继的节点都能够间接来获取。因而,在共享锁模式下,在获取锁和开释锁完结时,都会唤醒后继节点。
获取凭证
咱们同样还是通过非偏心锁的模式来老获取凭证
咱们能够看下 acquire 的外围办法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}
// 次要看下这个办法,这个办法返回的值也就是 tryAcquireShared 返回的值,因为 tryAcquireShared->nonfairTryAcquireShared
final int nonfairTryAcquireShared(int acquires) {
// 自旋
for (;;) {
//Semaphore 用 AQS 的 state 变量的值代表可用许可数
int available = getState();
// 可用许可数减去本次须要获取的许可数即为残余许可数
int remaining = available - acquires;
// 如果残余许可数小于 0 或者 CAS 将以后可用许可数设置为残余许可数胜利,则返回胜利许可数
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
- 当
tryAcquireShared
获取返回许可书小于 0 时阐明获取许可失败须要进入doAcquireSharedInterruptibly
这个办法去休眠。 - 当
tryAcquireShared
获取返回许可书小于 0 时阐明获取许可胜利间接完结。
doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {// 独占锁的 acquireQueued 调用的是 addWaiter(Node.EXCLUSIVE),// 而共享锁调用的是 addWaiter(Node.SHARED),表明了该节点处于共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {for (;;) {final Node p = node.predecessor();
if (p == head) {int r = tryAcquireShared(arg);
if (r >= 0) {setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();}
} finally {if (failed)
cancelAcquire(node);
}
}
这个办法是不是跟咱们上篇文章讲的 AQS
的独占锁的 acquireQueued
很像,不过独占锁它是间接调用了用了 setHead(node)
办法,而共享锁调用的是 setHeadAndPropagate(node, r)
这个办法除了调用setHead
外面还调用了doReleaseShared
(唤醒后继节点)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();}
}
其余的办法基本上是和 ReentrantLock
来实现的独占锁差不多,我置信大家对源码剖析感兴趣的应该也不多,其余更多细节问题还是须要本人亲自动手去看源码的。
总结
- 当信号量
Semaphore
初始化设置许可证为 1 时,它也能够当作互斥锁应用。其中 0、1 就相当于它的状态,当 = 1 时示意其余线程能够获取,当 = 0 时,排他,即其余线程必须要期待。 Semaphore
是JUC
包中的一个很简略的工具类,用来实现多线程下对于资源的同一时刻的拜访线程数限度Semaphore
中存在一个【许可】的概念,即拜访资源之前,先要取得许可,如果以后许可数量为0
,那么线程阻塞,直到取得许可Semaphore
外部应用AQS
实现,由形象外部类Sync
继承了AQS
。因为Semaphore
天生就是共享的场景,所以其外部实际上相似于共享锁的实现- 共享锁的调用框架和独占锁很类似,它们最大的不同在于获取锁的逻辑——共享锁能够被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。
- 因为共享锁同一时刻能够被多个线程持有,因而当头节点获取到共享锁时,能够立刻唤醒后继节点来争锁,而不用等到开释锁的时候。因而,共享锁触发唤醒后继节点的行为可能有两处,一处在以后节点胜利取得共享锁后,一处在以后节点开释共享锁后。
- 采纳
semaphore
来进行限流的话会产生 突刺景象。
指在肯定工夫内的一小段时间内就用完了所有资源,后大部分工夫中无资源可用。
比方在限流办法中的计算器算法,设置 1s 内的最大申请数为 100,在前 100ms 曾经永远了 100 个申请,则前面 900ms 将无奈解决申请,这就是突刺景象
完结
- 因为本人满腹经纶,难免会有纰漏,如果你发现了谬误的中央,还望留言给我指出来, 我会对其加以修改。
- 如果你感觉文章还不错,你的转发、分享、赞叹、点赞、留言就是对我最大的激励。
- 感谢您的浏览, 非常欢送并感谢您的关注。
站在伟人的肩膀上摘苹果:
https://segmentfault.com/a/11…