引言

最近能够进行个税申报了,还没有申报的同学能够连忙去试试哦。不过我反正是从上午到下午始终都没有胜利的进行申报,一进行申报
就返回“以后拜访人数过多,请稍后再试”。为什么有些人就可能申报胜利,有些人就间接返回失败。这很显著申报解决资源是无限的,
只能等他人解决完了在来解决你的,你如果运气好可能重试几次就轮到你了,如果运气不好可能重试一天也可能轮不到你。
我反正曾经是放弃了,等到夜深人静的时候再来试试。作为一个程序员咱们必定晓得这是个税申请app的限流操作,如果还有不懂什么
是限流操作的能够参考下这个文章《高并发零碎三大利器之限流》。
比方个税申报零碎每台机器只最多别离只能解决1000个申请,再多的申请就会把机器打挂。如果是多余的申请就把这些申请回绝掉。间接给你返回一句舒适提醒:“以后拜访人数过多,请稍后再试”,如果要实现这个性能大家想想能够通过哪些办法算法来实现。

共享锁、独占锁

学习semaphore之前咱们必须要先理解下什么是共享锁。在上一篇文章《Java高并发编程根底之AQS》咱们介绍了偏心锁于非偏心锁的区别。

  • 共享锁:它是容许多个线程同时获取锁,并发的访问共享资源
  • 独占锁:也有人把它叫做“独享锁”,它是是独占的,排他的,只能被一个线程可持有,

当独占锁曾经被某个线程持有时,其余线程只能期待它被开释后,能力去争锁,并且同一时刻只有一个线程能争锁胜利。

什么是Semaphore

在《Java并发编程艺术》(微信搜【java金融】回复电子书能够收费获取PDF版本)这一书中是这么说的:

Semaphore(信号量)是用来管制同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源。很多年以来,我都感觉从字面上很难了解Semaphore所表白的含意,只能把它比作是管制流量的红绿灯,比方XX马路要限度流量,只容许同时有一百辆车在这条路上行使,其余的都必须在路口期待,所以前一百辆车会看到绿灯,能够开进这条马路,前面的车会看到红灯,不能驶入XX马路,然而如果前一百辆中有五辆车曾经来到了XX马路,那么前面就容许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就示意线程在执行,来到马路就示意线程执行实现,看见红灯就示意线程被阻塞,不能执行。
  • Semaphore机制是提供给线程抢占式获取许可,所以他能够实现偏心或者非偏心,相似于ReentrantLock

说了这么多咱们来个理论的例子看一看,比方咱们去停车场停车,停车场总共只有5个车位,然而当初有8辆汽车来停车,剩下的3辆汽车要么等其余汽车开走后进行停车,或者去找别的停车位?

/** * @author: 公众号【Java金融】 */public class SemaphoreTest {    public static void main(String[] args) throws InterruptedException {         // 初始化五个车位        Semaphore semaphore = new Semaphore(5);        // 等所有车子        final CountDownLatch latch = new CountDownLatch(8);        for (int i = 0; i < 8; i++) {            int finalI = i;            if (i == 5) {                Thread.sleep(1000);                new Thread(() -> {                    stopCarNotWait(semaphore, finalI);                    latch.countDown();                }).start();                continue;            }            new Thread(() -> {                stopCarWait(semaphore, finalI);                latch.countDown();            }).start();        }        latch.await();        log("总共还剩:" + semaphore.availablePermits() + "个车位");    }    private static void stopCarWait(Semaphore semaphore, int finalI) {        String format = String.format("车牌号%d", finalI);        try {            semaphore.acquire(1);            log(format + "找到车位了,去停车了");            Thread.sleep(10000);        } catch (Exception e) {            e.printStackTrace();        } finally {            semaphore.release(1);            log(format + "开走了");        }    }    private static void stopCarNotWait(Semaphore semaphore, int finalI) {         String format = String.format("车牌号%d", finalI);        try {            if (semaphore.tryAcquire()) {                log(format + "找到车位了,去停车了");                Thread.sleep(10000);                log(format + "开走了");                semaphore.release();            } else {                log(format + "没有停车位了,不在这里等了去其余中央停车去了");            }        } catch (Exception e) {            e.printStackTrace();        }    }    public static void log(String content) {        // 格式化        DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");        // 以后工夫        LocalDateTime now = LocalDateTime.now();        System.out.println(now.format(fmTime) + "  "+content);    }}
2021-03-01 18:54:57  车牌号0找到车位了,去停车了2021-03-01 18:54:57  车牌号3找到车位了,去停车了2021-03-01 18:54:57  车牌号2找到车位了,去停车了2021-03-01 18:54:57  车牌号1找到车位了,去停车了2021-03-01 18:54:57  车牌号4找到车位了,去停车了2021-03-01 18:54:58  车牌号5没有停车位了,不在这里等了去其余中央停车去了2021-03-01 18:55:07  车牌号7找到车位了,去停车了2021-03-01 18:55:07  车牌号6找到车位了,去停车了2021-03-01 18:55:07  车牌号2开走了2021-03-01 18:55:07  车牌号0开走了2021-03-01 18:55:07  车牌号3开走了2021-03-01 18:55:07  车牌号4开走了2021-03-01 18:55:07  车牌号1开走了2021-03-01 18:55:17  车牌号7开走了2021-03-01 18:55:17  车牌号6开走了2021-03-01 18:55:17  总共还剩:5个车位

从输入后果咱们能够看到车牌号5这辆车看见没有车位了,就不在这个中央傻傻的等了,而是去其余中央了,然而车牌号6车牌号7别离须要等到车库开出两辆车空出两个车位后才停进去。这就体现了Semaphoreacquire 办法如果没有获取到凭证它就会阻塞,而tryAcquire办法如果没有获取到凭证不会阻塞的。

semaphore在dubbo中的利用

Dubbo中能够给Provider配置线程池大小来控制系统提供服务的最大并行度,默认是200

<dubbo:provider  threads="200"/>

比方我当初这个订单零碎有三个接口,别离为创单、勾销订单、批改订单。这三个接口加起来的并发是200然而创单接口是外围接口,我想让它多分点线程来执行
让它能够有最大150个线程,勾销订单和批改订单别离最大25个线程执行就能够了。dubbo提供了executes这一属性来实现这个性能

<dubbo:service interface="cn.javajr.service.CreateOrderService" executes="150"/><dubbo:service interface="cn.javajr.service.CancelOrderService" executes="25"/><dubbo:service interface="cn.javajr.service.EditOrderService" executes="25"/>

咱们能够看看dubbo外部是如何来executes的,具体实现是在ExecuteLimitFilter这个类咱们能够

 public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {        URL url = invoker.getUrl();        String methodName = invocation.getMethodName();        Semaphore executesLimit = null;        boolean acquireResult = false;        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);        if (max > 0) {            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());            // 如果以后应用的线程数量曾经大于等于设置的阈值,那么间接抛出异样//            if (count.getActive() >= max) {// throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service // using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");            /**             * http://manzhizhen.iteye.com/blog/2386408             * use semaphore for concurrency control (to limit thread number)             */                         executesLimit = count.getSemaphore(max);            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");            }        }        long begin = System.currentTimeMillis();        boolean isSuccess = true;        // 计数器+1        RpcStatus.beginCount(url, methodName);        try {            Result result = invoker.invoke(invocation);            return result;        } catch (Throwable t) {            isSuccess = false;            if (t instanceof RuntimeException) {                throw (RuntimeException) t;            } else {                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);            }        } finally {           // 计数器-1            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);            if(acquireResult) {                executesLimit.release();            }        }    }

从上述代码咱们也能够看出晚期这个是没有采纳Semaphore来实现的,而是间接采纳被正文的 if (count.getActive() >= max) 这个来来实现的,因为这个count.getActive() >= max 和这个计数加1不是原子性的,所以会有问题,具体bug号能够看https://github.com/apache/dub...Semaphore来修复非原子性问题。具体更具体的剖析能够参见代码的链接。不过当初最新版本(2.7.9)我看是采纳采纳自旋加上和CAS来实现的。

Semaphore

下面就是对Semaphore一个简略的应用以及dubbo中用到的例子,说句实话Semaphore在工作中用的还是比拟少的,不过面试又有可能会被问到,所以还是有必要来一起学习一下它。咱们后面《Java高并发编程根底之AQS》通过ReentrantLock 一起学习了下AQS,其实Semaphore同样也是通过AQS来是实现的,咱们能够一起来对照下独占锁的办法,基本上都是有办法一一绝对应的。

这里有两点略微须要留神的中央:

  • 在独占锁模式中,咱们只有在获取了独占锁的节点开释锁时,才会唤醒后继节点,因为独占锁只能被一个线程持有,如果它还没有被开释,就没有必要去唤醒它的后继节点。
  • 在共享锁模式下,当一个节点获取到了共享锁,咱们在获取胜利后就能够唤醒后继节点了,而不须要等到该节点开释锁的时候,这是因为共享锁能够被多个线程同时持有,一个锁获取到了,则后继的节点都能够间接来获取。因而,在共享锁模式下,在获取锁和开释锁完结时,都会唤醒后继节点。

获取凭证

咱们同样还是通过非偏心锁的模式来老获取凭证
咱们能够看下acquire的外围办法

  public final void acquireSharedInterruptibly(int arg)           throws InterruptedException {       if (Thread.interrupted())           throw new InterruptedException();       if (tryAcquireShared(arg) < 0)           doAcquireSharedInterruptibly(arg);   }    protected int tryAcquireShared(int acquires) {            return nonfairTryAcquireShared(acquires);   }        // 次要看下这个办法,这个办法返回的值也就是tryAcquireShared返回的值,因为tryAcquireShared->nonfairTryAcquireShared    final int nonfairTryAcquireShared(int acquires) {          //自旋          for (;;) {               //Semaphore用AQS的state变量的值代表可用许可数               int available = getState();               //可用许可数减去本次须要获取的许可数即为残余许可数               int remaining = available - acquires;               //如果残余许可数小于0或者CAS将以后可用许可数设置为残余许可数胜利,则返回胜利许可数               if (remaining < 0 ||                   compareAndSetState(available, remaining))                   return remaining;           }
  • tryAcquireShared 获取返回许可书小于0时阐明获取许可失败须要进入doAcquireSharedInterruptibly这个办法去休眠。
  • tryAcquireShared 获取返回许可书小于0时阐明获取许可胜利间接完结。

doAcquireSharedInterruptibly

 private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {        // 独占锁的acquireQueued调用的是addWaiter(Node.EXCLUSIVE),        // 而共享锁调用的是addWaiter(Node.SHARED),表明了该节点处于共享模式        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            for (;;) {                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        failed = false;                        return;                    }                }                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();            }        } finally {            if (failed)                cancelAcquire(node);        }    }

这个办法是不是跟咱们上篇文章讲的AQS的独占锁的acquireQueued很像,不过独占锁它是间接调用了用了setHead(node)办法,而共享锁调用的是setHeadAndPropagate(node, r)
这个办法除了调用setHead 外面还调用了doReleaseShared(唤醒后继节点)

    private void setHeadAndPropagate(Node node, int propagate) {        Node h = head; // Record old head for check below        setHead(node);        if (propagate > 0 || h == null || h.waitStatus < 0 ||            (h = head) == null || h.waitStatus < 0) {            Node s = node.next;            if (s == null || s.isShared())                doReleaseShared();        }    }

其余的办法基本上是和ReentrantLock来实现的独占锁差不多,我置信大家对源码剖析感兴趣的应该也不多,其余更多细节问题还是须要本人亲自动手去看源码的。

总结

  • 当信号量Semaphore初始化设置许可证为1 时,它也能够当作互斥锁应用。其中0、1就相当于它的状态,当=1时示意其余线程能够获取,当=0时,排他,即其余线程必须要期待。
  • SemaphoreJUC包中的一个很简略的工具类,用来实现多线程下对于资源的同一时刻的拜访线程数限度
  • Semaphore中存在一个【许可】的概念,即拜访资源之前,先要取得许可,如果以后许可数量为0,那么线程阻塞,直到取得许可
  • Semaphore外部应用AQS实现,由形象外部类Sync继承了AQS。因为Semaphore天生就是共享的场景,所以其外部实际上相似于共享锁的实现
  • 共享锁的调用框架和独占锁很类似,它们最大的不同在于获取锁的逻辑——共享锁能够被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。
  • 因为共享锁同一时刻能够被多个线程持有,因而当头节点获取到共享锁时,能够立刻唤醒后继节点来争锁,而不用等到开释锁的时候。因而,共享锁触发唤醒后继节点的行为可能有两处,一处在以后节点胜利取得共享锁后,一处在以后节点开释共享锁后。
  • 采纳semaphore来进行限流的话会产生突刺景象
指在肯定工夫内的一小段时间内就用完了所有资源,后大部分工夫中无资源可用。
比方在限流办法中的计算器算法,设置1s内的最大申请数为100,在前100ms曾经永远了100个申请,则前面900ms将无奈解决申请,这就是突刺景象

完结

  • 因为本人满腹经纶,难免会有纰漏,如果你发现了谬误的中央,还望留言给我指出来,我会对其加以修改。
  • 如果你感觉文章还不错,你的转发、分享、赞叹、点赞、留言就是对我最大的激励。
  • 感谢您的浏览,非常欢送并感谢您的关注。

站在伟人的肩膀上摘苹果:
https://segmentfault.com/a/11...