乐趣区

关于java:Java同步组件之CountDownLatchSemaphore

Java 同步组件详情

  • CountDownLatch : 是闭锁, 通过一个计数来保障线程是否始终阻塞
  • Semaphore: 管制同一时间, 并发线程数量
  • CyclicBarrier: 字面意思是回环栅栏, 通过它能够实现让一组线程期待至某个状态之后再全副同时执行。
  • ReentrantLock: 是一个重入锁, 一个线程取得了锁之后依然能够重复加锁, 不会呈现本人阻塞本人的状况。
  • Condition: 配合ReentrantLock, 实现期待 / 告诉模型
  • FutureTask:FutureTask 实现了接口 Future,同 Future 一样,代表异步计算的后果。

CountDownLatch 同步辅助类

CountDownLatch类位于 java.util.concurrent 包, 利用它能够实现相似计数器的性能, 比方有一个工作 A, 它要期待其它 4 个工作执行结束后能力执行, 此时就能够应用 CountDownLatch 来实现这种性能。

假如计数器的值是 3, 线程 A 调用 await() 办法后, 以后线程就进入了期待状态, 之后其它线程中执行 CountDownLatch, 计数器就会减 1, 当计数器从 3 变成 0, 线程 A 继续执行,CountDownLatch 这个类能够阻塞以后线程, 保障线程在某种条件下, 继续执行。

结构器中的计数值 (count) 实际上就是闭锁须要期待的线程数量, 这个值只能被设置一次, 而且 CountDownLatch 没有提供任何机会批改这个计数值。

CountDownLatch 代码案例

package com.rumenz.task;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class CountDownLatchTest {public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch=new CountDownLatch(2);
        executorService.execute(()->{
            try{Thread.sleep(3000);
                System.out.println("工作一实现");

            }catch (Exception e){e.printStackTrace();
            }


            countDownLatch.countDown();});
        executorService.execute(()->{
            try{Thread.sleep(5000);
                System.out.println("工作二实现");

            }catch (Exception e){e.printStackTrace();
            }

            countDownLatch.countDown();});
        countDownLatch.await();
        // 所有子工作执行完后才会执行
        System.out.println("主线程开始工作.....");
        executorService.shutdown();}
}

工作一实现
工作二实现
主线程开始工作.....

CountDownlatch 指定工夫实现工作, 如果在规定工夫内实现, 则期待之前的期待线程 (countDownLatch.await()) 继续执行

countDownLatch.await(int timeout,TimeUnit timeUnit);设置,第一个参数没超时工夫,第二个参数为工夫单位。

package com.rumenz.task;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class CountDownLatchTest {public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch=new CountDownLatch(2);
        executorService.execute(()->{
            try{Thread.sleep(3000);
                System.out.println("工作一实现");

            }catch (Exception e){e.printStackTrace();
            }


            countDownLatch.countDown();});
        executorService.execute(()->{
            try{Thread.sleep(5000);
                System.out.println("工作二实现");

            }catch (Exception e){e.printStackTrace();
            }

            countDownLatch.countDown();});
        // 这里只等 3 秒
        countDownLatch.await(3, TimeUnit.SECONDS);
        // 所有子工作执行完后才会执行
        System.out.println("主线程开始工作.....");
        executorService.shutdown();}
}
// 工作一实现
// 主线程开始工作.....
// 工作二实现

Semaphore控制线程数量

Semaphore常常用于限度获取某种资源的线程数量, 其外部是基于 AQS 的共享模式,AQS 的状态能够示意许可证的数量, 许可证数量不够线程被挂起; 而一旦有一个线程开释资源, 那么可唤醒期待队列中的线程继续执行。

Semaphore翻译过去就是信号量,Semaphore能够阻塞过程并管制同时拜访的线程数, 通过 acquire() 获取一个许可, 如果没有就期待, 而 release() 开释一个许可,Semaphore有点相似锁。

CountDownLatchSemaphore 在应用时, 通过和线程池配合应用。
Semaphore适宜管制并发,CountDownLatch比拟适宜保障线程执行完后再执行其它解决, 因而模仿并发两者联合最好。

Semaohore利用场景

Semaphore适宜做流量管制, 特地是共享的无限资源, 比方数据库连贯, 如果有一个需要,要读取几万个文件的数据,因为都是 IO 密集型工作,咱们能够启动几十个线程并发的读取,然而如果读到内存后,还须要存储到数据库中,而数据库的连接数只有 10 个,这时咱们必须管制只有十个线程同时获取数据库连贯保留数据,否则会报错无奈获取数据库连贯。这个时候,咱们就能够应用 Semaphore 来做流控。

package com.rumenz.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


public class SemaphoreExample1 {
    private static Integer clientTotal=30;
    private static Integer threadTotal=3;

    public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore=new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            final Integer j=i;
            executorService.execute(()->{
                try{semaphore.acquire(); // 获取一个许可
                    update(j);
                    semaphore.release(); // 开释一个许可}catch (Exception e) {e.printStackTrace();
                }

            });
            
        }
        executorService.shutdown();}

    private static void update(Integer j) throws Exception {System.out.println(j);
        Thread.sleep(2000);

    }
}

每 2 秒打印 3 个数字。

package com.rumenz.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


public class SemaphoreExample1 {
    private static Integer clientTotal=30;
    private static Integer threadTotal=3;

    public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore=new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            final Integer j=i;
            executorService.execute(()->{
                try{semaphore.acquire(3); // 获取多个许可
                    update(j);
                    semaphore.release(3); // 开释多个许可

                }catch (Exception e) {e.printStackTrace();
                }

            });
            
        }
        executorService.shutdown();}

    private static void update(Integer j) throws Exception {System.out.println(j);
        Thread.sleep(2000);

    }
}

每 2 秒打印一个数字。

tryAcquire

尝试获取许可,如果获取不胜利,则放弃操作,tryAcquire 办法提供几个重载

  • tryAcquire() : boolean
  • tryAcquire(int permits) : boolean 尝试获取指定数量的许可
  • tryAcquire(int permits,long timeout,TimeUnit timeUnit) : boolean
  • tryAcquire(long timeout,TimeUnit timeUnit) : boolean 尝试获取许可的时候能够期待一段时间,在指定工夫内未获取到许可则放弃

Semaphore源码剖析

Semaphore 有两种模式,偏心模式和非偏心模式。偏心模式就是调用 acquire 的程序就是获取许可证的程序,遵循 FIFO;而非偏心模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证开释时失去了这个许可证,而后面还有期待的线程。

// 非偏心模式
public Semaphore(int permits) {sync = new NonfairSync(permits);
}
// fair=true 为偏心模式,false= 非偏心模式
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

public class Semaphore implements java.io.Serializable {
    /*
     * 只指定许可量,结构不偏心模式
     */
    public Semaphore(int permits) {sync = new NonfairSync(permits);
    }
   
    /*
     * 指定许可量,并指定模式
     */
    public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    
    //Semaphore 外部基于 AQS 的共享模式,所以实现都委托给了 Sync 类。abstract static class Sync extends AbstractQueuedSynchronizer {}
    
        /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            // 能够看到调用了 setState 办法,也就是说 AQS 中的资源就是许可证的数量。super(permits);
        }

        protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            // 能够看到调用了 setState 办法,也就是说 AQS 中的资源就是许可证的数量。super(permits);
        }

        protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    
}

关注微信公众号:【入门小站】, 解锁更多知识点

退出移动版