关于java:Java高并发编程基础三大利器之Semaphore

34次阅读

共计 8145 个字符,预计需要花费 21 分钟才能阅读完成。

引言

最近能够进行个税申报了,还没有申报的同学能够连忙去试试哦。不过我反正是从上午到下午始终都没有胜利的进行申报,一进行申报
就返回“以后拜访人数过多,请稍后再试”。为什么有些人就可能申报胜利,有些人就间接返回失败。这很显著申报解决资源是无限的,
只能等他人解决完了在来解决你的,你如果运气好可能重试几次就轮到你了,如果运气不好可能重试一天也可能轮不到你。
我反正曾经是放弃了,等到夜深人静的时候再来试试。作为一个程序员咱们必定晓得这是个税申请 app 的限流操作,如果还有不懂什么
是限流操作的能够参考下这个文章《高并发零碎三大利器之限流》。
比方个税申报零碎每台机器只最多别离只能解决 1000 个申请,再多的申请就会把机器打挂。如果是多余的申请就把这些申请回绝掉。间接给你返回一句舒适提醒:“以后拜访人数过多,请稍后再试”, 如果要实现这个性能大家想想能够通过哪些办法算法来实现。

共享锁、独占锁

学习 semaphore 之前咱们必须要先理解下什么是共享锁。在上一篇文章《Java 高并发编程根底之 AQS》咱们介绍了偏心锁于非偏心锁的区别。

  • 共享锁:它是容许多个线程同时获取锁,并发的访问共享资源
  • 独占锁:也有人把它叫做“独享锁”,它是是独占的,排他的,只能被一个线程可持有,

当独占锁曾经被某个线程持有时,其余线程只能期待它被开释后,能力去争锁,并且同一时刻只有一个线程能争锁胜利。

什么是 Semaphore

在《Java 并发编程艺术 》(微信搜【java 金融】回复 电子书 能够收费获取 PDF 版本)这一书中是这么说的:

Semaphore(信号量)是用来管制同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源。很多年以来,我都感觉从字面上很难了解 Semaphore 所表白的含意,只能把它比作是管制流量的红绿灯,比方 XX 马路要限度流量,只容许同时有一百辆车在这条路上行使,其余的都必须在路口期待,所以前一百辆车会看到绿灯,能够开进这条马路,前面的车会看到红灯,不能驶入 XX 马路,然而如果前一百辆中有五辆车曾经来到了 XX 马路,那么前面就容许有 5 辆车驶入马路,这个例子里说的车就是线程,驶入马路就示意线程在执行,来到马路就示意线程执行实现,看见红灯就示意线程被阻塞,不能执行。

  • Semaphore机制是提供给线程抢占式获取许可,所以他能够实现偏心或者非偏心,相似于ReentrantLock

说了这么多咱们来个理论的例子看一看,比方咱们去停车场停车,停车场总共只有 5 个车位,然而当初有 8 辆汽车来停车,剩下的 3 辆汽车要么等其余汽车开走后进行停车,或者去找别的停车位?

/**
 * @author: 公众号【Java 金融】*/
public class SemaphoreTest {public static void main(String[] args) throws InterruptedException {
         // 初始化五个车位
        Semaphore semaphore = new Semaphore(5);
        // 等所有车子
        final CountDownLatch latch = new CountDownLatch(8);
        for (int i = 0; i < 8; i++) {
            int finalI = i;
            if (i == 5) {Thread.sleep(1000);
                new Thread(() -> {stopCarNotWait(semaphore, finalI);
                    latch.countDown();}).start();
                continue;
            }
            new Thread(() -> {stopCarWait(semaphore, finalI);
                latch.countDown();}).start();}
        latch.await();
        log("总共还剩:" + semaphore.availablePermits() + "个车位");
    }

    private static void stopCarWait(Semaphore semaphore, int finalI) {String format = String.format("车牌号 %d", finalI);
        try {semaphore.acquire(1);
            log(format + "找到车位了,去停车了");
            Thread.sleep(10000);
        } catch (Exception e) {e.printStackTrace();
        } finally {semaphore.release(1);
            log(format + "开走了");
        }
    }

    private static void stopCarNotWait(Semaphore semaphore, int finalI) {String format = String.format("车牌号 %d", finalI);
        try {if (semaphore.tryAcquire()) {log(format + "找到车位了,去停车了");
                Thread.sleep(10000);
                log(format + "开走了");
                semaphore.release();} else {log(format + "没有停车位了,不在这里等了去其余中央停车去了");
            }
        } catch (Exception e) {e.printStackTrace();
        }

    }

    public static void log(String content) {
        // 格式化
        DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        // 以后工夫
        LocalDateTime now = LocalDateTime.now();
        System.out.println(now.format(fmTime) + " "+content);
    }
}
2021-03-01 18:54:57  车牌号 0 找到车位了,去停车了
2021-03-01 18:54:57  车牌号 3 找到车位了,去停车了
2021-03-01 18:54:57  车牌号 2 找到车位了,去停车了
2021-03-01 18:54:57  车牌号 1 找到车位了,去停车了
2021-03-01 18:54:57  车牌号 4 找到车位了,去停车了
2021-03-01 18:54:58  车牌号 5 没有停车位了,不在这里等了去其余中央停车去了
2021-03-01 18:55:07  车牌号 7 找到车位了,去停车了
2021-03-01 18:55:07  车牌号 6 找到车位了,去停车了
2021-03-01 18:55:07  车牌号 2 开走了
2021-03-01 18:55:07  车牌号 0 开走了
2021-03-01 18:55:07  车牌号 3 开走了
2021-03-01 18:55:07  车牌号 4 开走了
2021-03-01 18:55:07  车牌号 1 开走了
2021-03-01 18:55:17  车牌号 7 开走了
2021-03-01 18:55:17  车牌号 6 开走了
2021-03-01 18:55:17  总共还剩:5 个车位

从输入后果咱们能够看到 车牌号 5 这辆车看见没有车位了,就不在这个中央傻傻的等了,而是去其余中央了,然而 车牌号 6 车牌号 7 别离须要等到车库开出两辆车空出两个车位后才停进去。这就体现了 Semaphoreacquire 办法如果没有获取到凭证它就会阻塞,而tryAcquire 办法如果没有获取到凭证不会阻塞的。

semaphore 在 dubbo 中的利用

Dubbo 中能够给 Provider 配置线程池大小来控制系统提供服务的最大并行度,默认是200

<dubbo:provider  threads="200"/>

比方我当初这个订单零碎有三个接口,别离为创单、勾销订单、批改订单。这三个接口加起来的并发是 200 然而创单接口是外围接口,我想让它多分点线程来执行
让它能够有最大 150 个线程,勾销订单和批改订单别离最大 25 个线程执行就能够了。dubbo提供了 executes 这一属性来实现这个性能

<dubbo:service interface="cn.javajr.service.CreateOrderService" executes="150"/>
<dubbo:service interface="cn.javajr.service.CancelOrderService" executes="25"/>
<dubbo:service interface="cn.javajr.service.EditOrderService" executes="25"/>

咱们能够看看 dubbo 外部是如何来 executes 的,具体实现是在 ExecuteLimitFilter 这个类咱们能够

 public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        if (max > 0) {RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
            // 如果以后应用的线程数量曾经大于等于设置的阈值,那么间接抛出异样
//            if (count.getActive() >= max) {// throw new RpcException("Failed to invoke method" + invocation.getMethodName() + "in provider" + url + ", cause: The service // using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
             
            executesLimit = count.getSemaphore(max);
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {throw new RpcException("Failed to invoke method" + invocation.getMethodName() + "in provider" + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        // 计数器 +1
        RpcStatus.beginCount(url, methodName);
        try {Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {throw (RuntimeException) t;
            } else {throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
           // 计数器 -1
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if(acquireResult) {executesLimit.release();
            }
        }
    }

从上述代码咱们也能够看出晚期这个是没有采纳 Semaphore 来实现的,而是间接采纳被正文的 if (count.getActive() >= max) 这个来来实现的,因为这个 count.getActive() >= max 和这个计数加 1 不是原子性的, 所以会有问题,具体 bug 号能够看 https://github.com/apache/dub…Semaphore 来修复非原子性问题。具体更具体的剖析能够参见代码的链接。不过当初最新版本(2.7.9)我看是采纳采纳自旋加上和 CAS 来实现的。

Semaphore

下面就是对 Semaphore 一个简略的应用以及 dubbo 中用到的例子, 说句实话 Semaphore 在工作中用的还是比拟少的,不过面试又有可能会被问到,所以还是有必要来一起学习一下它。咱们后面《Java 高并发编程根底之 AQS》通过 ReentrantLock 一起学习了下 AQS,其实 Semaphore 同样也是通过 AQS 来是实现的,咱们能够一起来对照下独占锁的办法,基本上都是有办法一一绝对应的。

这里有两点略微须要留神的中央:

  • 在独占锁模式中,咱们只有在获取了独占锁的节点开释锁时,才会唤醒后继节点,因为独占锁只能被一个线程持有,如果它还没有被开释,就没有必要去唤醒它的后继节点。
  • 在共享锁模式下,当一个节点获取到了共享锁,咱们在获取胜利后就能够唤醒后继节点了,而不须要等到该节点开释锁的时候,这是因为共享锁能够被多个线程同时持有,一个锁获取到了,则后继的节点都能够间接来获取。因而,在共享锁模式下,在获取锁和开释锁完结时,都会唤醒后继节点。

获取凭证

咱们同样还是通过非偏心锁的模式来老获取凭证
咱们能够看下 acquire 的外围办法

  public final void acquireSharedInterruptibly(int arg)
           throws InterruptedException {if (Thread.interrupted())
           throw new InterruptedException();
       if (tryAcquireShared(arg) < 0)
           doAcquireSharedInterruptibly(arg);
   }
    protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
   }
    
    // 次要看下这个办法,这个办法返回的值也就是 tryAcquireShared 返回的值,因为 tryAcquireShared->nonfairTryAcquireShared
    final int nonfairTryAcquireShared(int acquires) {
          // 自旋
          for (;;) {
               //Semaphore 用 AQS 的 state 变量的值代表可用许可数
               int available = getState();
               // 可用许可数减去本次须要获取的许可数即为残余许可数
               int remaining = available - acquires;
               // 如果残余许可数小于 0 或者 CAS 将以后可用许可数设置为残余许可数胜利,则返回胜利许可数
               if (remaining < 0 ||
                   compareAndSetState(available, remaining))
                   return remaining;
           }
  • tryAcquireShared 获取返回许可书小于 0 时阐明获取许可失败须要进入doAcquireSharedInterruptibly 这个办法去休眠。
  • tryAcquireShared 获取返回许可书小于 0 时阐明获取许可胜利间接完结。

doAcquireSharedInterruptibly

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {// 独占锁的 acquireQueued 调用的是 addWaiter(Node.EXCLUSIVE),// 而共享锁调用的是 addWaiter(Node.SHARED),表明了该节点处于共享模式
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {for (;;) {final Node p = node.predecessor();
                if (p == head) {int r = tryAcquireShared(arg);
                    if (r >= 0) {setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();}
        } finally {if (failed)
                cancelAcquire(node);
        }
    }

这个办法是不是跟咱们上篇文章讲的 AQS 的独占锁的 acquireQueued 很像,不过独占锁它是间接调用了用了 setHead(node) 办法,而共享锁调用的是 setHeadAndPropagate(node, r)
这个办法除了调用setHead 外面还调用了doReleaseShared(唤醒后继节点)

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();}
    }

其余的办法基本上是和 ReentrantLock 来实现的独占锁差不多,我置信大家对源码剖析感兴趣的应该也不多,其余更多细节问题还是须要本人亲自动手去看源码的。

总结

  • 当信号量 Semaphore 初始化设置许可证为 1 时,它也能够当作互斥锁应用。其中 0、1 就相当于它的状态,当 = 1 时示意其余线程能够获取,当 = 0 时,排他,即其余线程必须要期待。
  • SemaphoreJUC 包中的一个很简略的工具类,用来实现多线程下对于资源的同一时刻的拜访线程数限度
  • Semaphore中存在一个【许可】的概念,即拜访资源之前,先要取得许可,如果以后许可数量为0,那么线程阻塞,直到取得许可
  • Semaphore外部应用 AQS 实现,由形象外部类 Sync 继承了 AQS。因为Semaphore 天生就是共享的场景,所以其外部实际上相似于共享锁的实现
  • 共享锁的调用框架和独占锁很类似,它们最大的不同在于获取锁的逻辑——共享锁能够被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。
  • 因为共享锁同一时刻能够被多个线程持有,因而当头节点获取到共享锁时,能够立刻唤醒后继节点来争锁,而不用等到开释锁的时候。因而,共享锁触发唤醒后继节点的行为可能有两处,一处在以后节点胜利取得共享锁后,一处在以后节点开释共享锁后。
  • 采纳 semaphore 来进行限流的话会产生 突刺景象

指在肯定工夫内的一小段时间内就用完了所有资源,后大部分工夫中无资源可用。
比方在限流办法中的计算器算法,设置 1s 内的最大申请数为 100,在前 100ms 曾经永远了 100 个申请,则前面 900ms 将无奈解决申请,这就是突刺景象

完结

  • 因为本人满腹经纶,难免会有纰漏,如果你发现了谬误的中央,还望留言给我指出来, 我会对其加以修改。
  • 如果你感觉文章还不错,你的转发、分享、赞叹、点赞、留言就是对我最大的激励。
  • 感谢您的浏览, 非常欢送并感谢您的关注。

站在伟人的肩膀上摘苹果:
https://segmentfault.com/a/11…

正文完
 0