引言
最近能够进行个税申报了,还没有申报的同学能够连忙去试试哦。不过我反正是从上午到下午始终都没有胜利的进行申报,一进行申报
就返回“以后拜访人数过多,请稍后再试”。为什么有些人就可能申报胜利,有些人就间接返回失败。这很显著申报解决资源是无限的,
只能等他人解决完了在来解决你的,你如果运气好可能重试几次就轮到你了,如果运气不好可能重试一天也可能轮不到你。
我反正曾经是放弃了,等到夜深人静的时候再来试试。作为一个程序员咱们必定晓得这是个税申请app的限流操作,如果还有不懂什么
是限流操作的能够参考下这个文章《高并发零碎三大利器之限流》。
比方个税申报零碎每台机器只最多别离只能解决1000
个申请,再多的申请就会把机器打挂。如果是多余的申请就把这些申请回绝掉。间接给你返回一句舒适提醒:“以后拜访人数过多,请稍后再试”,如果要实现这个性能大家想想能够通过哪些办法算法来实现。
共享锁、独占锁
学习semaphore
之前咱们必须要先理解下什么是共享锁。在上一篇文章《Java高并发编程根底之AQS》咱们介绍了偏心锁于非偏心锁的区别。
- 共享锁:它是容许多个线程同时获取锁,并发的访问共享资源
- 独占锁:也有人把它叫做“独享锁”,它是是独占的,排他的,只能被一个线程可持有,
当独占锁曾经被某个线程持有时,其余线程只能期待它被开释后,能力去争锁,并且同一时刻只有一个线程能争锁胜利。
什么是Semaphore
在《Java并发编程艺术》(微信搜【java金融】回复电子书能够收费获取PDF版本)这一书中是这么说的:
Semaphore(信号量)是用来管制同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源。很多年以来,我都感觉从字面上很难了解Semaphore所表白的含意,只能把它比作是管制流量的红绿灯,比方XX马路要限度流量,只容许同时有一百辆车在这条路上行使,其余的都必须在路口期待,所以前一百辆车会看到绿灯,能够开进这条马路,前面的车会看到红灯,不能驶入XX马路,然而如果前一百辆中有五辆车曾经来到了XX马路,那么前面就容许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就示意线程在执行,来到马路就示意线程执行实现,看见红灯就示意线程被阻塞,不能执行。
Semaphore
机制是提供给线程抢占式获取许可,所以他能够实现偏心或者非偏心,相似于ReentrantLock
。
说了这么多咱们来个理论的例子看一看,比方咱们去停车场停车,停车场总共只有5
个车位,然而当初有8
辆汽车来停车,剩下的3
辆汽车要么等其余汽车开走后进行停车,或者去找别的停车位?
/** * @author: 公众号【Java金融】 */public class SemaphoreTest { public static void main(String[] args) throws InterruptedException { // 初始化五个车位 Semaphore semaphore = new Semaphore(5); // 等所有车子 final CountDownLatch latch = new CountDownLatch(8); for (int i = 0; i < 8; i++) { int finalI = i; if (i == 5) { Thread.sleep(1000); new Thread(() -> { stopCarNotWait(semaphore, finalI); latch.countDown(); }).start(); continue; } new Thread(() -> { stopCarWait(semaphore, finalI); latch.countDown(); }).start(); } latch.await(); log("总共还剩:" + semaphore.availablePermits() + "个车位"); } private static void stopCarWait(Semaphore semaphore, int finalI) { String format = String.format("车牌号%d", finalI); try { semaphore.acquire(1); log(format + "找到车位了,去停车了"); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(1); log(format + "开走了"); } } private static void stopCarNotWait(Semaphore semaphore, int finalI) { String format = String.format("车牌号%d", finalI); try { if (semaphore.tryAcquire()) { log(format + "找到车位了,去停车了"); Thread.sleep(10000); log(format + "开走了"); semaphore.release(); } else { log(format + "没有停车位了,不在这里等了去其余中央停车去了"); } } catch (Exception e) { e.printStackTrace(); } } public static void log(String content) { // 格式化 DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 以后工夫 LocalDateTime now = LocalDateTime.now(); System.out.println(now.format(fmTime) + " "+content); }}
2021-03-01 18:54:57 车牌号0找到车位了,去停车了2021-03-01 18:54:57 车牌号3找到车位了,去停车了2021-03-01 18:54:57 车牌号2找到车位了,去停车了2021-03-01 18:54:57 车牌号1找到车位了,去停车了2021-03-01 18:54:57 车牌号4找到车位了,去停车了2021-03-01 18:54:58 车牌号5没有停车位了,不在这里等了去其余中央停车去了2021-03-01 18:55:07 车牌号7找到车位了,去停车了2021-03-01 18:55:07 车牌号6找到车位了,去停车了2021-03-01 18:55:07 车牌号2开走了2021-03-01 18:55:07 车牌号0开走了2021-03-01 18:55:07 车牌号3开走了2021-03-01 18:55:07 车牌号4开走了2021-03-01 18:55:07 车牌号1开走了2021-03-01 18:55:17 车牌号7开走了2021-03-01 18:55:17 车牌号6开走了2021-03-01 18:55:17 总共还剩:5个车位
从输入后果咱们能够看到车牌号5
这辆车看见没有车位了,就不在这个中央傻傻的等了,而是去其余中央了,然而车牌号6
和车牌号7
别离须要等到车库开出两辆车空出两个车位后才停进去。这就体现了Semaphore
的acquire
办法如果没有获取到凭证它就会阻塞,而tryAcquire
办法如果没有获取到凭证不会阻塞的。
semaphore在dubbo中的利用
在Dubbo
中能够给Provider
配置线程池大小来控制系统提供服务的最大并行度,默认是200
。
<dubbo:provider threads="200"/>
比方我当初这个订单零碎有三个接口,别离为创单、勾销订单、批改订单。这三个接口加起来的并发是200然而创单接口是外围接口,我想让它多分点线程来执行
让它能够有最大150
个线程,勾销订单和批改订单别离最大25
个线程执行就能够了。dubbo
提供了executes
这一属性来实现这个性能
<dubbo:service interface="cn.javajr.service.CreateOrderService" executes="150"/><dubbo:service interface="cn.javajr.service.CancelOrderService" executes="25"/><dubbo:service interface="cn.javajr.service.EditOrderService" executes="25"/>
咱们能够看看dubbo
外部是如何来executes
的,具体实现是在ExecuteLimitFilter
这个类咱们能够
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); Semaphore executesLimit = null; boolean acquireResult = false; int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); if (max > 0) { RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // 如果以后应用的线程数量曾经大于等于设置的阈值,那么间接抛出异样// if (count.getActive() >= max) {// throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service // using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); /** * http://manzhizhen.iteye.com/blog/2386408 * use semaphore for concurrency control (to limit thread number) */ executesLimit = count.getSemaphore(max); if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); } } long begin = System.currentTimeMillis(); boolean isSuccess = true; // 计数器+1 RpcStatus.beginCount(url, methodName); try { Result result = invoker.invoke(invocation); return result; } catch (Throwable t) { isSuccess = false; if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } finally { // 计数器-1 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); if(acquireResult) { executesLimit.release(); } } }
从上述代码咱们也能够看出晚期这个是没有采纳Semaphore
来实现的,而是间接采纳被正文的 if (count.getActive() >= max)
这个来来实现的,因为这个count.getActive() >= max 和这个计数加1不是原子性的,所以会有问题,具体bug号能够看https://github.com/apache/dub...Semaphore
来修复非原子性问题。具体更具体的剖析能够参见代码的链接。不过当初最新版本(2.7.9)我看是采纳采纳自旋加上和CAS
来实现的。
Semaphore
下面就是对Semaphore
一个简略的应用以及dubbo
中用到的例子,说句实话Semaphore在工作中用的还是比拟少的,不过面试又有可能会被问到,所以还是有必要来一起学习一下它。咱们后面《Java高并发编程根底之AQS》通过ReentrantLock 一起学习了下AQS,其实Semaphore同样也是通过AQS来是实现的,咱们能够一起来对照下独占锁的办法,基本上都是有办法一一绝对应的。
这里有两点略微须要留神的中央:
- 在独占锁模式中,咱们只有在获取了独占锁的节点开释锁时,才会唤醒后继节点,因为独占锁只能被一个线程持有,如果它还没有被开释,就没有必要去唤醒它的后继节点。
- 在共享锁模式下,当一个节点获取到了共享锁,咱们在获取胜利后就能够唤醒后继节点了,而不须要等到该节点开释锁的时候,这是因为共享锁能够被多个线程同时持有,一个锁获取到了,则后继的节点都能够间接来获取。因而,在共享锁模式下,在获取锁和开释锁完结时,都会唤醒后继节点。
获取凭证
咱们同样还是通过非偏心锁的模式来老获取凭证
咱们能够看下acquire的外围办法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // 次要看下这个办法,这个办法返回的值也就是tryAcquireShared返回的值,因为tryAcquireShared->nonfairTryAcquireShared final int nonfairTryAcquireShared(int acquires) { //自旋 for (;;) { //Semaphore用AQS的state变量的值代表可用许可数 int available = getState(); //可用许可数减去本次须要获取的许可数即为残余许可数 int remaining = available - acquires; //如果残余许可数小于0或者CAS将以后可用许可数设置为残余许可数胜利,则返回胜利许可数 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; }
- 当
tryAcquireShared
获取返回许可书小于0时阐明获取许可失败须要进入doAcquireSharedInterruptibly
这个办法去休眠。 - 当
tryAcquireShared
获取返回许可书小于0时阐明获取许可胜利间接完结。
doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 独占锁的acquireQueued调用的是addWaiter(Node.EXCLUSIVE), // 而共享锁调用的是addWaiter(Node.SHARED),表明了该节点处于共享模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
这个办法是不是跟咱们上篇文章讲的AQS
的独占锁的acquireQueued
很像,不过独占锁它是间接调用了用了setHead(node)
办法,而共享锁调用的是setHeadAndPropagate(node, r)
这个办法除了调用setHead
外面还调用了doReleaseShared
(唤醒后继节点)
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
其余的办法基本上是和ReentrantLock
来实现的独占锁差不多,我置信大家对源码剖析感兴趣的应该也不多,其余更多细节问题还是须要本人亲自动手去看源码的。
总结
- 当信号量
Semaphore
初始化设置许可证为1 时,它也能够当作互斥锁应用。其中0、1就相当于它的状态,当=1时示意其余线程能够获取,当=0时,排他,即其余线程必须要期待。 Semaphore
是JUC
包中的一个很简略的工具类,用来实现多线程下对于资源的同一时刻的拜访线程数限度Semaphore
中存在一个【许可】的概念,即拜访资源之前,先要取得许可,如果以后许可数量为0
,那么线程阻塞,直到取得许可Semaphore
外部应用AQS
实现,由形象外部类Sync
继承了AQS
。因为Semaphore
天生就是共享的场景,所以其外部实际上相似于共享锁的实现- 共享锁的调用框架和独占锁很类似,它们最大的不同在于获取锁的逻辑——共享锁能够被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。
- 因为共享锁同一时刻能够被多个线程持有,因而当头节点获取到共享锁时,能够立刻唤醒后继节点来争锁,而不用等到开释锁的时候。因而,共享锁触发唤醒后继节点的行为可能有两处,一处在以后节点胜利取得共享锁后,一处在以后节点开释共享锁后。
- 采纳
semaphore
来进行限流的话会产生突刺景象。
指在肯定工夫内的一小段时间内就用完了所有资源,后大部分工夫中无资源可用。
比方在限流办法中的计算器算法,设置1s内的最大申请数为100,在前100ms曾经永远了100个申请,则前面900ms将无奈解决申请,这就是突刺景象
完结
- 因为本人满腹经纶,难免会有纰漏,如果你发现了谬误的中央,还望留言给我指出来,我会对其加以修改。
- 如果你感觉文章还不错,你的转发、分享、赞叹、点赞、留言就是对我最大的激励。
- 感谢您的浏览,非常欢送并感谢您的关注。
站在伟人的肩膀上摘苹果:
https://segmentfault.com/a/11...