关于java:Java深入学习并发原理总结

75次阅读

共计 72609 个字符,预计需要花费 182 分钟才能阅读完成。

Java 并发多线程根底总结

线程池

线程池的简介

线程池就是首先创立一些线程,它们的汇合称为线程池。应用线程池能够很好地进步性能,线程池在系统启动时即创立大量闲暇的线程,程序将一个工作传给线程池,线程池就会启动一条线程来执行这个工作,执行完结当前,该线程并不会死亡,而是再次返回线程池中成为闲暇状态,期待执行下一个工作。

为什么要应用线程池

如果不应用线程池,每一个工作都会新开一个线程解决。

为了缩小创立和销毁线程的次数,让每个线程能够屡次应用,可依据零碎状况调整执行的线程数量,避免耗费过多内存,所以咱们能够应用线程池。

线程池的益处

  • 放慢响应速度
  • 正当利用 CPU 和内存
  • 对立治理

线程池的工作机制

  1. 线程池刚创立的时候没有任何线程,当来了新的申请的时候才会创立 外围线程 去解决对应的申请。
  2. 当解决实现之后,外围线程并不会回收。
  3. 在外围线程达到指定的数量之前,每一个申请都会在线程池中创立一个新的外围线程。
  4. 当外围线程全都被占用的时候,新来的申请会放入工作队列中。工作队列实质上是一个 阻塞队列
  5. 当工作队列被占满,再来的新申请会交给长期线程来解决。
  6. 长期线程在应用实现之后会持续存活一段时间,直到没有申请解决才会被销毁。

线程池参数详解

线程池构造函数的参数

参数名 类型 含意
corePoolSize int 外围线程数
maxPoolSize int 最大线程数
keepAliveTime long 放弃存活工夫
workQueue BlockingQueue 工作存储队列
threadFactory ThreadFactory 当线程池须要新的线程时,应用 ThreadFactory 来创立新的线程
Handler RejectedExecutionHandler 因为线程池无奈承受所提交的工作所给出的回绝策略
  • corePoolSize:指的是外围线程数,线程池初始化实现后,默认状况下,线程池并没有任何线程,线程池会期待工作到来时,再创立新的线程去执行工作。
  • maxPoolSize:线程池有可能会在外围线程数上,额定减少一些线程,然而这些新减少的线程有一个下限,最大不能超过 maxPoolSize。

    • 如果线程数小于 corePoolSize,即便其余工作线程处于闲暇状态,也会创立一个新的线程来运行工作。
    • 如果线程数大于等于 corePoolSize 但少于 maxPoolSize,则将工作放进工作队列中。
    • 如果队列已满,并且线程数小于 maxPoolSize,则创立一个新线程来运行工作。
    • 如果队列已满,并且线程数曾经大于等于 maxPoolSize,则应用回绝策略来回绝该工作。
  • keepAliveTime:一个线程如果处于闲暇状态,并且以后的线程数量大于 corePoolSize,那么在指定工夫后,这个闲暇线程会被销毁,这里的指定工夫由 keepAliveTime 来设定。
  • workQueue:新工作被提交后,会先进入到此工作队列中,任务调度时再从队列中取出工作。jdk 中提供了四种工作队列:

    • ArrayBlockingQueue:基于数组的 有界阻塞队列,按 FIFO 排序。新工作进来后,会放到该队列的队尾,有界的数组能够避免资源耗尽问题。当线程池中线程数量达到 corePoolSize 后,再有新工作进来,则会将工作放入该队列的队尾,期待被调度。如果队列曾经是满的,则创立一个新线程,如果线程数量曾经达到 maxPoolSize,则会执行回绝策略。
    • LinkedBlockingQueue:基于链表的 无界阻塞队列(其实最大容量为 Interger.MAX),依照 FIFO 排序。因为该队列的近似无界性,当线程池中线程数量达到 corePoolSize 后,再有新工作进来,会始终存入该队列,而不会去创立新线程直到 maxPoolSize,因而应用该工作队列时,参数 maxPoolSize 其实是不起作用的。
    • SynchronousQueue:一个 不缓存工作的阻塞队列,生产者放入一个工作必须等到消费者取出这个工作。也就是说新工作进来时,不会缓存,而是间接被调度执行该工作,如果没有可用线程,则创立新线程,如果线程数量达到 maxPoolSize,则执行回绝策略。
    • PriorityBlockingQueue:具备优先级的 无界阻塞队列,优先级通过参数 Comparator 实现。
    • delayQueue:具备优先级的 延时无界阻塞队列
    • LinkedTransferQueue:基于链表的 无界阻塞队列
    • LinkedBlockingDeque:基于链表的 双端阻塞队列
  • threadFactory:创立一个新线程时应用的工厂,能够用来设定线程名、是否为 daemon 线程等等
  • handler:当工作队列中的工作已达到最大限度,并且线程池中的线程数量也达到最大限度,这时如果有新工作提交进来,就会执行回绝策略。

增加线程的流程

线程池用法演示

  • newFixedThreadPool:固定大小线程池
public class ThreadPoolTest implements Runnable {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(5);// 外围线程数
        for (int i = 0; i < 1000; i++) {executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();}

    @Override
    public void run() {System.out.println(Thread.currentThread().getName());
    }
}

运行后果:

pool-1-thread-1
pool-1-thread-3
pool-1-thread-4
pool-1-thread-2
pool-1-thread-5
pool-1-thread-5
...

咱们能够看到,打印进去的最多的线程也就是五个。

咱们看一下源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
  • 第一个参数:corePoolSize,外围线程数:5
  • 第二个参数:maxPoolSize,最大线程数:5
  • 第三个参数:keepAliveTime,最大存活工夫:0
  • 第四个参数:存活工夫单位,单位毫秒
  • 第五个参数:workQueue,阻塞队列应用的是 LinkedBlockingQueue,也就是无界队列

最初 new ThreadPoolExecutor(),咱们看下这个办法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

将咱们的参数传递过来后,线程工厂应用的是默认的线程工厂,和默认的回绝策略处理器。

因为咱们应用的是无界阻塞队列,所以相当于 maxPoolSize 没有用途。如果工作特地多,外围线程解决不过去的话,就会始终将工作放入到 LinkedBlockingQuene 中,可能会导致 OOM。

演示 OOM:

//-Xms5m -Xmx5m
public class ThreadPoolTest implements Runnable {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(1);
        for (int i = 0; i < Integer.MAX_VALUE; i++) {executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();}

    @Override
    public void run() {
        try {TimeUnit.HOURS.sleep(1);
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

运行后果:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.thread.ThreadPoolTest.main(ThreadPoolTest.java:13)
  • newFixedThreadPool:单个外围线程的线程池
public class ThreadPoolTest implements Runnable {public static void main(String[] args) {ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();}

    @Override
    public void run() {System.out.println(Thread.currentThread().getName());
    }
}

运行后果:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

咱们看下源码:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newSingleThreadExecutor 其实和 newFixedThreadPool 差距不大,只是将外围线程数和最大线程数都设置为了 1,同样也是应用的 LinkedBlockingQueue,也可能会导致 OOM。

  • newCachedThreadPool:可缓存的线程池
public class ThreadPoolTest implements Runnable {public static void main(String[] args) {ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {executorService.execute(new ThreadPoolTest());
        }
        executorService.shutdown();}

    @Override
    public void run() {System.out.println(Thread.currentThread().getName());
    }
}

运行后果:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-5
pool-1-thread-3
pool-1-thread-9
pool-1-thread-6
pool-1-thread-10
pool-1-thread-6
pool-1-thread-11
pool-1-thread-10
pool-1-thread-12
pool-1-thread-12
pool-1-thread-10
pool-1-thread-15
pool-1-thread-13

咱们看下源码:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

能够看进去,它外围线程数为 0,最大线程数量为 int 的最大值,存活工夫为 60 秒,应用的是 SynchronousQueue,也就是不存储工作的阻塞队列。

SynchronousQueue 确实不会导致 OOM,然而!咱们的线程池能够寄存 2147483647 个线程。在内存不够的状况下仍然会报出 OOM!

  • newFixedThreadPool:反对定时及周期性工作执行的线程池
public class ThreadPoolTest implements Runnable {public static void main(String[] args) {ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        //executorService.schedule(new ThreadPoolTest(),5, TimeUnit.SECONDS); // 延时运行
        executorService.scheduleAtFixedRate(new ThreadPoolTest(),1,3,TimeUnit.SECONDS);// 反复运行
    }

    @Override
    public void run() {System.out.println(Thread.currentThread().getName());
    }
}

schedule()办法参数:工作,多久后运行、工夫单位

scheduleAtFixedRate()办法参数:工作、第一次执行工夫:1、每隔多久运行一次:3、工夫单位

咱们看一下源码:

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

咱们看到 DelayedWordkQueue 继承了 AbstractCollection 接口,实现了 BlockingQueue,所以和 ArrayBlockingQueue 以及 LinkedBlockingQueue 是兄弟关系。
DelayedWorkQueue 定义了一个 DelayQueue,所以 DelayedWorkQueue 的实现是依赖 DelayQueue 的。

DelayQueue:Delayed 元素的一个无界阻塞队列,只有在提早期满时能力从中提取元素。该队列的头部是提早期满后保留工夫最长的 Delayed 元素。如果提早都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS)办法返回一个小于等于 0 的值时,将产生到期。即便无奈应用 take 或 poll 移除未到期的元素,也不会将这些元素作为失常元素看待。例如,size 办法同时返回到期和未到期元素的计数。此队列不容许应用 null 元素。

BlockingQueue 外围办法

办法类型 抛出异样 非凡值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
查看 element() peek() 不可用 不可用
  • 抛出异样

当阻塞队列满时,再往队列里 add 插入元素会抛出 IllegalStateException:Queue full

当阻塞队列空时,再往队列里 remove 移除元素会抛出 NoSuchElementException

  • 非凡值

插入方法,胜利 true 失败 false。

移除办法,胜利返回元素,没有元素就返回 null。

  • 阻塞

当阻塞队列满时,生产者线程持续往队列里 put 元素,队列就会始终阻塞生产线程直到 put 数据 or 响应退出。

当阻塞队列空时,消费者线程试图从队列里 take 元素,队列就会始终阻塞消费者线程直到队列可用。

  • 超时退出

当阻塞队列满时,队列会阻塞生产者线程肯定工夫,超出工夫后生产者线程就会推出。

正确的创立线程池的办法

Executors 存在什么问题?

在阿里巴巴 Java 开发手册中提到,应用 Executors 创立线程池可能会导致 OOM(OutOfMemory , 内存溢出)。

咱们之前也曾经演示了 OOM 的状况,咱们看下如何正确创立线程池。

防止应用 Executors 创立线程池,次要是防止应用其中的默认实现,那么咱们能够本人间接调用 ThreadPoolExecutor 的构造函数来本人创立线程池。在创立的同时,给 BlockQueue 指定容量就能够了。

private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(10))

具体咱们须要依据不同的业务场景、本人设置线程池的参数、想应用某种队列、想应用本人的线程工厂、想指定某种回绝策略等等,来实现更适合的线程池。

进行线程池的正确办法

第一种:shutdown

调用线程池的此办法后,不再承受新的工作,如果有新的工作减少则会抛出异样,待所有工作都执行敞开后,进行敞开。

public class ThreadPoolTest implements Runnable {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {if (i == 5) {executorService.shutdown();
            }
            executorService.execute(new ThreadPoolTest());
        }
    }

    @Override
    public void run() {
        try {TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

运行后果:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.thread.ThreadPoolTest@3764951d rejected from java.util.concurrent.ThreadPoolExecutor@4b1210ee[Shutting down, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.thread.ThreadPoolTest.main(ThreadPoolTest.java:16)
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
pool-1-thread-4
pool-1-thread-1

第二种:isShutdown

当调用 shutdown 之后,此值为 true。并不是所有工作都执行结束才是 true。

第三种:isTerminated

线程池所有工作是否曾经敞开, 包含正在执行和队列中的工作都完结了则返回 true。

public class ThreadPoolTest implements Runnable {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {if (i >= 3) {executorService.shutdown();
                System.out.println(executorService.isTerminated());
            }else{executorService.execute(new ThreadPoolTest());
            }
        }
        try {TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        System.out.println("最初线程池状态是否敞开:"+executorService.isTerminated());
    }

    @Override
    public void run() {
        try {TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

运行后果:

false
false
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
最初线程池状态是否敞开:true

第四种:awaitTermination

检测阻塞期待一段时间后,如果线程池工作都执行完了,返回 true,否则 false。

第五种:shutdownNow

立即敞开所有线程。该办法会返回所未实现办法的汇合。

public class ThreadPoolTest implements Runnable {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {if (i >= 3) {Collection<Runnable> runnables = executorService.shutdownNow();
                runnables.forEach(System.out::println);
            }else{executorService.execute(new ThreadPoolTest());
            }
        }
    }

    @Override
    public void run() {
        try {TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {System.out.println("我被中断了!");
        }
        System.out.println(Thread.currentThread().getName());
    }
}

运行后果:

我被中断了!pool-1-thread-1
我被中断了!pool-1-thread-2
com.thread.ThreadPoolTest@4e50df2e

回绝策略解析

拒接机会

  1. 当 executor 敞开时,提交 新工作 会被 回绝
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.thread.ThreadPoolTest@2b193f2d rejected from java.util.concurrent.ThreadPoolExecutor@355da254[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.thread.ThreadPoolTest.main(ThreadPoolTest.java:15)
  1. 当 executor 对最大线程和工作队列容量应用无限边界并且 曾经饱和 时。

四种回绝策略

  • CallerRunsPolicy:在调用者线程中间接执行被回绝工作的 run 办法,除非线程池曾经 shutdown,则间接摈弃工作。
  • AbortPolicy:间接抛弃工作,并抛出 RejectedExecutionException 异样。(默认回绝策略)
  • DiscardPolicy:间接抛弃工作,什么都不做。
  • DiscardOldestPolicy:该策略下,摈弃进入队列最早的那个工作,而后尝试把这次回绝的工作放入队列。

Executor 家族解析

Executor、ExecutorService、ThreadPoolExecutor、Executors 之间的关系

  1. Executor
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

Executor 外面只有一个 execute(Runnable command)回调接口。用于执行已提交的 Runnable 工作对象。

  1. ExecutorService
public interface ExecutorService extends Executor {

ExecutorService 接口是继承 Executor 接口,减少了一些对于中断的办法。

办法 invokeAny 和 invokeAll 是批量执行的最罕用模式,它们执行工作 collection,而后期待至多一个,
或全副工作实现(可应用 ExecutorCompletionService 类来编写这些办法的自定义变体)。

submit 办法是提交一个返回值的工作用于执行,返回一个示意工作的未决后果的 Future。该 Future 的 get 办法在胜利实现时将会返回该工作的后果。

  1. ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {

ThreadPoolExecutor 是 ExecutorService 的一个实现类,它应用可能的几个池线程之一执行每个提交的工作,通常应用 Executors 工厂办法配置。
线程池能够解决两个不同问题:因为缩小了每个工作调用的开销,它们通常能够在执行大量异步工作时提供加强的性能,并且还能够提供绑定和治理资源(包含执行工作集时应用的线程)的办法。

  1. Executors
public class Executors {

Executors 是一个工具类,能够用于不便的创立线程池。

线程池实现线程复用的原理

咱们间接看 execute 办法源码:

public void execute(Runnable command) {if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 如果正在运行的外围线程数小于外围线程总数
        if (workerCountOf(c) < corePoolSize) {
            // 减少一个外围线程来执行工作
            if (addWorker(command, true))
                return;
            c = ctl.get();}
        if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

咱们看一下 addWorker 办法:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {mainLock.unlock();
                }
                if (workerAdded) {t.start();
                    workerStarted = true;
                }
            }
        } finally {if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
}

首先判断以后线程池的状态,如果曾经状态不是 shutdown 或者 running,或者曾经为 shutdown 然而工作队列曾经为空,那么这个时候间接返回增加工作失败。接下来是对线程池线程数量的判断,依据调用时的 core 的值来判断是跟 corePoolSize 还是 maximumPoolSize 判断。

在确认了线程池状态以及线程池中工作线程数量之后,才真正开始增加工作线程。

新建设一个 worker 类(线程池的外部类,具体的工作线程),将要执行的具体线程做为构造方法中的参数传递进去,接下来将其退出线程池的工作线程容器 workers,并且更新工作线程最大量,最初调用 worker 工作线程的 start()办法,就实现了工作线程的建设与启动。

接下来咱们能够看最重要的,也就是咱们之前建设完 Worker 类之后立马调用的 run()办法了

public void run() {runWorker(this);
}
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {while (task != null || (task = getTask()) != null) {w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {beforeExecute(wt, task);
                Throwable thrown = null;
                try {task.run();
                } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                } finally {afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();}
        }
        completedAbruptly = false;
    } finally {processWorkerExit(w, completedAbruptly);
    }
}

接下来可见,咱们所须要的工作,间接在工作线程中间接以 run()形式以非线程的形式所调用,这里也就是咱们所须要的工作真正执行的中央。

在执行结束后,工作线程的使命并没有真正宣告段落。在 while 局部 worker 仍旧会通过 getTask()办法试图获得新的工作。

上面是 getTask()的实现:

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?

    for (;;) {int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {timedOut = false;}
    }
}

首先仍旧会判断线程池的状态是否是 running 还是 shutdown 以及 stop 状态下队列是否仍旧有须要期待执行的工作。

如果状态没有问题,则会跟据 allowCoreThreadTimeOut 和 corePoolSize 的值通过对后面这两个属性解释的形式来抉择从工作队列中取得工作的形式(是否设置 timeout)。

其中的 timedOut 保障了确认前一次试图取工作时超时产生的记录,以确保工作线程的回收。

在 runWorker()办法的最初调用了 processWorkerExit 来执行工作线程的回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
}

先确保曾经从新更新了线程池中工作线程的数量,之后从线程池中的工作线程容器移去当前工作线程,并且将实现的工作总数加到线程池的工作总数当中。

之后尝试设置线程池状态为 TERMINATED。

如果线程池的线程数量小于外围线程时, 则减少一个线程来持续解决工作队列中工作。

execute 执行流程图

线程池状态

  • RUNNING:承受新的工作并解决排队工作
  • SHUTDOWN:不承受新的工作,但解决排队工作
  • STOP:不承受新工作,也不解决排队工作,并中断正在执行的工作
  • TIDYING:所有工作都已终止并 workerCount 为 0 时,并执行 terminate()办法
  • TERMINATED:terminate()运行实现

源码:

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

应用线程池的留神点

  • 防止工作 沉积
  • 防止线程数 适度减少
  • 排查 线程泄露

ThreadLocal 详解

什么是 ThreadLocal

ThreadLocal 提供一个线程(Thread)局部变量,拜访到某个变量的每一个线程都领有本人的局部变量。说白了,ThreadLocal 就是想在多线程环境上来保障成员变量的平安。

ThreadLocal 的用处

  • 用处一:每个线程须要独享的对象
  • 用处二:每个线程内 须要保留全局变量(例如在拦截器中获取的用户信息),能够让不同办法间接应用,防止参数传递 的麻烦

用处一:每个线程须要一个独享的对象

每个 Thread 内有 本人 的实例正本,不共享

比方:教材只有一本,一起做笔记有线程平安的问题,复印 后就能够解决这个问题。

需要:咱们想打印出两个线程不同的工夫

public class ThreadLocalTest {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            int finalI = i;
            executorService.execute(() -> System.out.println(getDate(finalI + 100)));
        }
        executorService.shutdown();}

    public static String getDate(int seconds) {Date date = new Date(1000 * seconds);
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return simpleDateFormat.format(date);
    }
}

运行后果:

1970-01-01 08:01:41
1970-01-01 08:01:40

看起来是咱们想要的后果。

然而如果咱们想打印 1000 条不同的工夫,须要用到很多线程,咱们就会创立销毁 1000 个 SimpleDateFormat 对象,无疑是节约内存的写法。

既然这样,那咱们就把 SimpleDateFormat 创立为类变量试试看。

public class ThreadLocalTest {static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            executorService.execute(() -> System.out.println(getDate(finalI + 100)));
        }
        executorService.shutdown();}

    public static String getDate(int seconds) {Date date = new Date(1000 * seconds);
        return simpleDateFormat.format(date);
    }
}

运行后果:

能够看到这样会引发线程平安的问题。

当然,咱们也能够进行加锁来解决这个问题,然而会引发效率问题。

正确计划应用 ThreadLocal 来解决这个问题

public class ThreadLocalTest {static ThreadLocal<SimpleDateFormat> threadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            executorService.execute(() -> System.out.println(getDate(finalI + 100)));
        }
        executorService.shutdown();
        threadLocal.remove();}

    public static String getDate(int seconds) {Date date = new Date(1000 * seconds);
        return threadLocal.get().format(date);
    }
}

用处二:以后用户信息须要被线程内所有办法共享

比拟繁琐的计划就是作为参数层层专递。

如果应用 map 也须要保障线程平安问题,所以须要加锁或者应用 ConcurrentHashMap,但都对性能有影响

所以咱们应用 ThreadLocal 来实现。

public class ThreadLocalTest {public static void main(String[] args) {new Service1().precess();}
}

class Service1 {static ThreadLocal<User> threadLocal = new ThreadLocal<>();

    public void precess() {User user = new User("Jack");
        threadLocal.set(user);
        new Service2().precess();
        threadLocal.remove();}
}

class Service2 {public void precess() {System.out.println("Service2 拿到" + Service1.threadLocal.get().name);
        new Service3().precess();
    }
}

class Service3 {public void precess() {System.out.println("Service3 拿到" + Service1.threadLocal.get().name);
    }
}

class User {
    public String name;

    public User(String name) {this.name = name;}
}

运行后果:

Service2 拿到 Jack
Service3 拿到 Jack

ThreadLocal 的两个作用

  1. 让某个须要用到的对象在 线程间隔离
  2. 在任何办法中都能够轻松获取到该对象。

场景一:initialValue:在 ThreadLocal第一次 get的时候把对象给初始化进去,对象的初始化机会能够 由咱们管制

场景二:set:如果须要保留到 ThreadLocal 中的对象生成机会 不禁咱们管制 ,咱们能够应用 ThreadLocal.set() 间接放到 ThreadLocal 中去,以便后续应用。

应用 ThreadLocal 带来的益处

  • 达到 线程平安
  • 不须要加锁 ,进步执行 效率
  • 更高效的 利用内存 节俭开销
  • 免去传参 的繁琐

ThreadLocal 原理

Thread、ThreadLocal、ThreadLocalMap 之间的关系

在 Thread 类中蕴含一个成员变量

 /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;

一个 ThreadLocalMap 又能够蕴含无数个 ThreadLocal。

图解如下:

ThreadLocal 重要办法介绍

  • T initialValue():初始化,咱们看一下办法原理

咱们先看下 get 办法:

public T get() {Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {@SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();}

能够看到在 ThreadLocalMap 为 null 的时候咱们调用了 setInitialValue()办法

private T setInitialValue() {T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
}

在 initialValue 办法没有被重写的时候返回的是 null,因为咱们曾经重写了,所以它会将咱们的 value 放到 ThreadLocalMap 中的 ThreadLocal 对象中。

通常,每个线程最多调用 一次 此办法,如果曾经调用了 remove()办法后,再调用 get(),就会再次触发 initialValue()办法。

  • set(T t):为这个线程设置一个新值
  • T get():失去这个线程对应的 value,如果是第一次调用 get,则会调用 InitialValue()来获取值。
  • void remove():删除对应这个线程的值

ThreadLocal 重要办法解析

get 办法解析:

咱们先看源码:

public T get() {Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {@SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();}

方才曾经讲过 map 为 null 的状况,咱们看下如果不为 null 是如何获取到值的。

首先在 map.getEntry(this)中咱们从不为 null 的 ThreadLocalMap 中 getEntry 也就是咱们的 key,this 就是咱们以后的 ThreadLocal 对象,失去的 e 也就是咱们的键值对,而后.value 来返回咱们的后果。

set 办法解析:

public void set(T value) {
        // 获取以后线程对象
        Thread t = Thread.currentThread();
        // 获取以后线程对象的 ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        // 如果不为 null 就 set 进去,k 为以后 ThreadLocal,v 就是咱们传入的对象
        if (map != null)
            map.set(this, value);
        else
        // 为 null 就去创立 ThreadLocalMap 并 set 以后 k、v
            createMap(t, value);
}

remove 办法解析:

public void remove() {ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
}

remove 办法比较简单,就是拿到 ThreadLocalMap 而后删除掉 k 等于以后对象的 ThreadLocal。

ThreadLocalMap 类

ThreadLocalMap,也就是 Thread.threadlocals

ThreadLocalMap 类是每个线程 Thread 类外面的变量,外面最重要的一个键值对数组Entry[] table,能够认为是一个 map。

  • k:以后 ThreadLocal
  • v:理论存储的成员变量
        /**
         * The table, resized as necessary.
         * table.length MUST always be a power of two.
         */
    private Entry[] table;

如果产生哈希抵触

ThreadLocalMap 和 HashMap 有所不同,HashMap(jdk8)采纳的是链表 + 红黑树

而 ThreadLocalMap 采纳的是线性探测法,如果发生冲突,就 持续寻找下一个空地位,而不是应用链表。

ThreadLocal 留神点

  1. 内存透露

在 ThreadLocal 中有一个动态外部类也就是 ThreadLocalMap。

ThreadLocalMap 中的 Entry 是继承了 WeakReference 也就是 弱援用

弱援用的特点就是在垃圾回收器线程扫描它所管辖的内存区域的过程中,一旦发现了只具备弱援用的对象,不论以后内存空间足够与否,都会回收它的内存。

然而咱们发现上面一句 value = v; 又蕴含了强援用。

失常状况下,当线程终止,保留在 ThreadLocal 中的 value 就会被垃圾回收,因为没有任何强援用了。

然而如果线程不终止(比方线程须要放弃很久),那么 key 对应的 value 就不能被回收,因为有以下调用链:

Thread –> ThreadLocalMap –> Entry(key 为 null)–> Value

因为 value 和 Thread 之间还保留这个强援用链路,所以导致value 无奈被回收,就可能回呈现 OOM。

JDK 曾经思考到这个问题,所以在 set、remove、rehash 办法中会扫描 key 为 null,如果 key 为 null 则会把 value 也设置为 null。

然而如果 ThreadLocal 不被应用,那么 set、remove、rehash 办法也不会被调用,如果同时线程并没有进行,则调用链会始终存在,就会导致 value 的内存透露。

所以咱们须要在应用完 ThreadLocal 后被动应用 remove()办法来防止内存透露。

  1. 如果 set 进去的是一个 static 对象,则还是会有并发拜访的问题
  2. 子线程拜访问题

咱们来看一下什么是子线程拜访问题。

public class ThreadLocalTest {public static void main(String[] args) {ThreadLocal<String> threadLocal = new ThreadLocal<>();
        threadLocal.set("Hello");
        new Thread(() -> {System.out.println(threadLocal.get());
        }, "Thread01").start();}
}

运行后果:

null

咱们看一下为什么是 null,咱们间接跟进到 get 办法中:

能够很分明的看到,咱们在 get 的时候拿到以后线程是 Thead01,而咱们 set 进去的是 main 线程,所以咱们拿到的 ThreadLocalMap 是 null。

而后咱们调用 setInitialValue()办法

private T setInitialValue() {T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
}

在第一句调用了 initialValue()办法:

protected T initialValue() {return null;}

这下咱们就明确了,咱们返回了个 null,并且在 Thead01 子线程中创立了一个 ThreadLocalMap,value 为 null。

咱们看另一个例子:

public class ThreadLocalTest {public static void main(String[] args) {ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> "Hello");
        new Thread(() -> {System.out.println(threadLocal.get());
            // System.out.println(threadLocal1.get());
        }, "Thread01").start();}
}

运行后果:

Hello

我置信大家曾经明确为什么能获取到 Hello 了。

咱们看源码:

static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {

        private final Supplier<? extends T> supplier;

        SuppliedThreadLocal(Supplier<? extends T> supplier) {this.supplier = Objects.requireNonNull(supplier);
        }

        @Override
        protected T initialValue() {return supplier.get();
        }
}

因为在 withInitial 外面咱们继承了 ThreadLocal 并且重写了 initialValue 办法,所以咱们取得到了 Hello。

然而,这样做咱们在子线程中,相当于是又创立了一个 ThreadLocalMap 将 value 存了进去。

InheritableThreadLocal 解析

咱们方才曾经看到了在子线程中无法访问到父线程 ThreadLocal 类型变量的值。

咱们试试 InheritableThreadLocal 类

public class ThreadLocalTest {public static void main(String[] args) {ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
        threadLocal.set("hello");
        new Thread(() -> {System.out.println(threadLocal.get());
        }, "Thread01").start();}
}

运行后果:

hello

然而,InheritableThreadLocal 为什么可能读取进去?

在 Thread 类中,inheritableThreadLocals,他的类型同 Thread 外部的 threadLocals 变量。

咱们看一下这个类源码:

public class InheritableThreadLocal<T> extends ThreadLocal<T> {

    // 该函数在父线程创立子线程,向子线程复制 InheritableThreadLocal 变量时应用
    protected T childValue(T parentValue) {return parentValue;}

    /**
     * 因为重写了 getMap,操作 InheritableThreadLocal 时,* 将只影响 Thread 类中的 inheritableThreadLocals 变量,* 与 threadLocals 变量不再有关系
     */
    ThreadLocalMap getMap(Thread t) {return t.inheritableThreadLocals;}

    /**
     * 相似于 getMap,操作 InheritableThreadLocal 时,* 将只影响 Thread 类中的 inheritableThreadLocals 变量,* 与 threadLocals 变量不再有关系
     */
    void createMap(Thread t, T firstValue) {t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

InheritableThreadLocal 继承了 ThreadLocal 并且重写了三个办法。

咱们这个时候回过头看 Thread 类的初始化 init 办法

private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {

如果 parent 的 inheritableThreadLocals 不是 null,那么就会将以后线程的 inheritableThreadLocals 设置为 parent 的 inheritableThreadLocals

parent 是什么?之前也说过了,就是创立这个线程的线程,也就是平时说的父线程。

所以说 借助于 inheritableThreadLocals,能够实现,创立线程向被创立线程之间数据传递

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {return new ThreadLocalMap(parentMap);
}

逻辑很清晰,创立了一个 ThreadLocalMap

简略了解:这个创立的 ThreadLocalMap 就是依据入参的 ThreadLocalMap,拷贝创立一份。

总结

其实就是从父线程(以后创立线程)中复制的一份,而后后续的数据读取解析,则是通过 inheritableThreadLocals 变量,和外部的那个 threadLocals 没有什么关系。

Lock 接口

什么是 Lock

锁是一种工具,用于管制对 共享资源 的拜访。

Lock 和 synchronized,这两个都是最常见的锁,它们都能够达到 线程平安 的目标,然而在应用上和性能上有较大不同。

Lock 并不是用来 代替synchronized 的,而是在应用 synchronized 不适宜或者不足以满足要求的时候,来提供更高级更灵便的性能。

Lock 接口最常见的实现类是ReentrantLock

通常状况下,Lock 只容许一个线程来拜访这个共享资源。不过有的时候,一些非凡的实现也能够容许 并发拜访,比方 ReadWriteLock 外面的ReadLock

为什么须要 Lock

首先咱们先看一下为什么 synchronized 不够用?

  1. 效率低:锁的开释状况少,视图获取锁时不能设定超时、不能中断一个正在试图获取锁的线程。
  2. 不够灵便:加锁和开释的机会繁多,每个锁仅有繁多的条件(某个对象)。
  3. 无奈晓得是否胜利取得到锁

Lock 次要办法介绍

在 Lock 中申明了 四个 办法来获取锁

  • lock()
  • tryLock()
  • tryLock(long time,TImeUnit unit)
  • lockInterruptibly()

lock()

lock()就是最一般的获取锁,如果锁曾经被其它线程获取,则进行期待。

lock 不会像 synchronized 一样在 异样时主动开释锁

因而最佳实际是在finally 中开释锁,以保障产生异样时锁肯定会被开释。

public class LockTest {private static Lock lock = new ReentrantLock();

    public static void main(String[] args) {lock.lock();
        try {// 业务逻辑} catch (Exception e) {e.printStackTrace();
        } finally {lock.unlock();
        }

    }

}

顺便说一下为什么不能在 try 内写上 lock.lock();

阿里巴巴标准手册:在应用阻塞期待获取锁的形式中,必须在 try 代码块之外,并且在加锁办法与 try 代码块之间没有任何可能抛出异样的办法调用,防止加锁胜利后,在 finally 中无奈解锁。

  • 阐明一:如果在 lock 办法与 try 代码块之间的办法调用抛出异样,那么无奈解锁,造成其它线程无奈胜利获取锁。
  • 阐明二:如果 lock 办法在 try 代码块之内,可能因为其它办法抛出异样,导致在 finally 代码块中,unlock 对未加锁的对象解锁,它会调用 AQS 的 tryRelease 办法(取决于具体实现类),抛出 IllegalMonitorStateException 异样。
  • 阐明三:在 Lock 对象的 lock 办法实现中可能抛出 unchecked 异样,产生的结果与阐明二雷同。

lock 办法不能被中断,这会带来很大隐患:一旦陷入 死锁 ,lock() 就会陷入永恒期待。

tryLock()

tryLock 用来 尝试获取锁,如果以后锁没有被其余线程占用,则获取胜利,返回 true,否则返回 false,代表获取锁失败。

相比于 lock,这样的办法显然性能更弱小了,咱们能够依据是否能获取到锁来 决定后续程序的行为

该办法会 立刻返回,并不会在拿不到锁时阻塞。

tryLock(long time,TimeUnit unit)

该办法就是在该时间段内尝试获取锁,如果超过工夫就放弃。

lockInterruptibly()

相当于是把 tryLock 的工夫设为有限,在期待锁的过程中,线程能够被 中断

可见性保障

Lock 同样也是遵循 Happens-before 准则。

Lock 的加锁解锁和 synchronized 有同样的 内存语义 ,也就是说,下一个线程 加锁后能够看到所有前一个线程解锁前产生的所有操作

锁的分类图

这些分类并 不是互斥 的,也就是多个类型能够 并存:有可能一个锁,同时属于两种类型。

乐观锁和乐观锁

为什么会诞生非互斥同步锁————互斥同步锁的劣势

  • 阻塞和唤醒带来的性能劣势
  • 永恒 阻塞:如果持有锁的线程被永恒阻塞,比方遇到了有限循环、死锁等活跃性问题,那么期待线该程开释锁的那几个悲催线程,将永远得不到执行。

什么是乐观锁和乐观锁

乐观锁:顾名思义,乐观锁是基于一种乐观的态度类来避免所有数据抵触,它是以一种预防的姿势在批改数据之前把数据锁住,而后再对数据进行读写,在它开释锁之前任何人都不能对其数据进行操作,直到后面一个人把锁开释后下一个人数据加锁才可对数据进行加锁,而后才能够对数据进行操作,个别数据库自身锁的机制都是基于乐观锁的机制实现的。

典型例子:synchronized、Lock 接口

乐观锁:乐观锁是对于数据抵触放弃一种乐观态度,操作数据时不会对操作的数据进行加锁(这使得多个工作能够并行的对数据进行操作),只有到数据提交的时候才通过一种机制来验证数据是否存在抵触,个别应用 CAS 算法来实现的。

典型例子:Atomic 原子类、并发容器等

开销比照

乐观锁的原始开销要高于乐观锁,然而特点是 一劳永逸,临界区持锁工夫就算越来越差,也不会对互斥锁的开销产生影响。

相同,尽管乐观锁一开始开销比拟小,然而如果自旋工夫很长,或者不停重试,那么 耗费的资源也会越来越多

两种锁各自的应用场景:各有千秋

  • 乐观锁:实用于并发写入多的状况,实用于临界区持锁工夫比拟长的状况,乐观锁能够防止大量的无用自旋等耗费。
  • 乐观锁:实用于读多写少的场景,不加锁能够让读取性能大幅提高。

可重入锁和不可重入锁

可重入锁 就是一个类的 A、B 两个办法,A、B 都有领有同一把锁,当 A 办法调用时,取得锁,在 A 办法的锁还没有被开释时,调用 B 办法时,B 办法也取得该锁。

不可重入锁 就是一个类的 A、B 两个办法,A、B 都有领有同一把锁,当 A 办法调用时,取得锁,在 A 办法的锁还没有被开释时,调用 B 办法时,B 办法也取得不了该锁,必须等 A 办法开释掉这个锁。

synchronized 和 ReentrantLock 都是 可重入锁

上面应用 ReentrantLock 证实可重入锁的例子:

public class LockTest {private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {methodA();
    }

    public static void methodA() {System.out.println("未取得锁之前,count 为:" + lock.getHoldCount());
        lock.lock();
        try {System.out.println("取得 A 的锁,count 为:" + lock.getHoldCount());
            methodB();} finally {lock.unlock();
            System.out.println("开释 A 的锁,count 为:" + lock.getHoldCount());
        }
    }

    public static void methodB() {lock.lock();
        try {System.out.println("取得 B 的锁,count 为:" + lock.getHoldCount());
        } finally {lock.unlock();
            System.out.println("开释 B 的锁,count 为:" + lock.getHoldCount());
        }
    }

}

运行后果:

未取得锁之前,count 为:0
取得 A 的锁,count 为:1
取得 B 的锁,count 为:2
开释 B 的锁,count 为:1
开释 A 的锁,count 为:0

证实了 ReentrantLock 是可重入锁,在 holdCount = 0 的时候就会开释该锁。


public void unlock() {sync.release(1);
}

public final boolean release(int arg) {if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

protected final boolean tryRelease(int releases) {int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
}

在 unlock()办法中咱们看到以后状态 – 1,如果 c == 0 就阐明开释该锁,不然就只批改锁的状态 state。

可重入锁

不可重入锁

偏心锁和非偏心锁

什么是偏心和非偏心?

偏心指的是依照线程的申请程序,来调配锁;非偏心指的是,不齐全依照申请的程序,在肯定状况下,能够插队。

为什么要有非偏心锁

假如线程 A 持有一把锁,线程 B 申请这把锁,因为线程 A 曾经持有这把锁了,所以线程 B 会陷入期待,在期待的时候线程 B 会被挂起,也就是进入阻塞状态,那么当线程 A 开释锁的时候,本该轮到线程 B 昏迷获取锁,但如果此时忽然有一个线程 C 插队申请这把锁,那么依据非偏心的策略,会把这把锁给线程 C,这是因为唤醒线程 B 是须要很大开销的,很有可能在唤醒之前,线程 C 曾经拿到了这把锁并且执行完工作开释了这把锁。

相比于期待唤醒线程 B 的漫长过程,插队的行为会让线程 C 自身跳过陷入阻塞的过程,如果在锁代码中执行的内容不多的话,线程 C 就能够很快实现工作,并且在线程 B 被齐全唤醒之前,就把这个锁交出去,这样是一个双赢的场面,对于线程 C 而言,不须要期待进步了它的效率,而对于线程 B 而言,它取得锁的工夫并没有推延,因为等它被唤醒的时候,线程 C 早就开释锁了,因为线程 C 的执行速度相比于线程 B 的唤醒速度,是很快的,所以 Java 设计非偏心锁,是为了 进步整体的运行效率 防止唤醒带来的空档期

代码案例偏心锁

public class LockTest {public static void main(String[] args) {PrintQueue printQueue = new PrintQueue();
        Thread thread[] = new Thread[10];
        for (int i = 0; i < 10; i++) {thread[i] = new Thread(new Job(printQueue));
        }
        for (int i = 0; i < 10; i++) {thread[i].start();
            try {TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
    }
}

class Job implements Runnable {

    PrintQueue printQueue;

    public Job(PrintQueue printQueue) {this.printQueue = printQueue;}

    @Override
    public void run() {System.out.println(Thread.currentThread().getName() + "开始打印");
        printQueue.printJob(new Object());
        System.out.println(Thread.currentThread().getName() + "打印结束");
    }
}

class PrintQueue {private Lock queueLock = new ReentrantLock(true);

    public void printJob(Object document) {queueLock.lock();
        try {int duration = new Random().nextInt(10) + 1;
            System.out.println(Thread.currentThread().getName() + "正在打印,须要" + duration);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {e.printStackTrace();
        } finally {queueLock.unlock();
        }

        queueLock.lock();
        try {int duration = new Random().nextInt(10) + 1;
            System.out.println(Thread.currentThread().getName() + "正在打印,须要" + duration + "秒");
            Thread.sleep(duration * 1000);
        } catch (InterruptedException e) {e.printStackTrace();
        } finally {queueLock.unlock();
        }
    }
}

运行后果:

Thread- 0 开始打印
Thread- 0 正在打印,须要 3
Thread- 1 开始打印
Thread- 2 开始打印
Thread- 3 开始打印
Thread- 4 开始打印
Thread- 5 开始打印
Thread- 6 开始打印
Thread- 1 正在打印,须要 4
Thread- 2 正在打印,须要 7
Thread- 3 正在打印,须要 7
Thread- 4 正在打印,须要 6
Thread- 5 正在打印,须要 5
Thread- 6 正在打印,须要 5
Thread- 0 正在打印,须要 8 秒
Thread- 0 打印结束
Thread- 1 正在打印,须要 2 秒
Thread- 1 打印结束
Thread- 2 正在打印,须要 3 秒
Thread- 2 打印结束
Thread- 3 正在打印,须要 4 秒
Thread- 3 打印结束
Thread- 4 正在打印,须要 2 秒
Thread- 4 打印结束
...

测试非偏心锁只须要将参数改为 false 即可。true 代表偏心锁

private Lock queueLock = new ReentrantLock(true);

源码剖析

偏心锁:

protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //  和非偏心锁相比,这里多了一个判断:队列中是否有线程在期待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
}

非偏心锁:

static final class NonfairSync extends Sync {final void lock() {
        //  和偏心锁相比,这里会间接先进行一次 CAS,胜利就返回了
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    public final void acquire(int arg) {if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}
    protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
    }
}

留神

如果应用 tryLock()办法,它是不恪守设定的偏心准则,如果有线程执行 tryLock()的时候,一旦有线程开释了锁,那么这个正在执行 tryLock()的线程会立刻取得锁,即便之前曾经有人在排队了。

总结

劣势 劣势
偏心锁 在平等状况下,每个线程在期待一段时间后都会取得执行的机会 更慢,吞吐量小
不偏心锁 更快,吞吐量大 可能会导致在阻塞队列中的线程长期处于饥饿状态

非偏心锁和偏心锁的两处不同:

非偏心锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候凑巧锁没有被占用,那么间接就获取到锁返回了。

非偏心锁在 CAS 失败后,和偏心锁一样都会进入到 tryAcquire 办法,在 tryAcquire 办法中,如果发现锁这个时候被开释了(state == 0),非偏心锁会间接 CAS 抢锁,然而偏心锁会判断期待队列是否有线程处于期待状态,如果有则不去抢锁,乖乖排到前面。

偏心锁和非偏心锁就这两点区别,如果这两次 CAS 都不胜利,那么前面非偏心锁和偏心锁是一样的,都要进入到阻塞队列期待唤醒。

相对来说,非偏心锁会有更好的性能,因为它的吞吐量比拟大。当然,非偏心锁让获取锁的工夫变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。

共享锁和排它锁

什么是共享锁和排它锁

排它锁:又称为独占锁、共享锁

共享锁:又称为读锁,取得共享锁之后,能够查看但无奈批改和删除数据,其余线程此时也能够取得到共享锁,也能够查看但 无奈批改和删除 数据。

共享锁和排它锁的典型是读写锁 ReentrantReadWriteLock,其中 读锁是共享锁,写锁是排它锁

在没有读写锁之前,咱们假如应用 ReentrantLock,尽管保障了线程平安,然而也 节约了肯定的资源 多个读操作同时进行,并没有线程平安问题

在读的中央应用读锁,写的中央应用写锁,灵便管制,如果没有写锁的状况下,读是无阻塞的,大大提高效率。

读写锁的规定

  • 多个线程只申请读锁,都能够申请到。
  • 如果有一个线程占用了读锁,则此时其余线程如果申请写锁,则申请写锁的线程会期待开释读锁。
  • 如果有一个线程占用了写锁,则此时其余线程如果申请读锁,则申请读锁的线程会期待开释写锁。

ReentrantReadWriteLock 具体用法

public class LockTest {private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    private static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    public static void main(String[] args) {new Thread(() -> write()).start();
        new Thread(() -> read()).start();
        new Thread(() -> read()).start();
        new Thread(() -> write()).start();
        new Thread(() -> read()).start();}

    private static void read() {readLock.lock();
        try {System.out.println(Thread.currentThread().getName() + "开始学习《Thinking in Java》");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {e.printStackTrace();
        } finally {System.out.println(Thread.currentThread().getName() + "太难了!我不学了!");
            readLock.unlock();}
    }

    private static void write() {writeLock.lock();
        try {System.out.println(Thread.currentThread().getName() + "开始印刷《Thinking in Java》");
        } finally {System.out.println(Thread.currentThread().getName() + "印刷实现");
            writeLock.unlock();}
    }
}

运行后果:

Thread- 0 开始印刷《Thinking in Java》Thread- 0 印刷实现
Thread- 1 开始学习《Thinking in Java》Thread- 1 太难了!我不学了!Thread- 2 开始学习《Thinking in Java》Thread- 2 太难了!我不学了!Thread- 3 开始印刷《Thinking in Java》Thread- 3 印刷实现
Thread- 4 开始学习《Thinking in Java》Thread- 4 太难了!我不学了!

读锁插队策略

假如线程 1 和线程 2 在读取,线程 3 想要写入,然而拿不到锁,于是进入期待队列,线程 4 不在队列中,当初想要读取。

此时有两种策略

  1. 读能够插队,效率高

然而这样可能会导致前面一堆读线程过去,始终轮不到线程 3 来写。导致写入饥饿。

  1. 防止饥饿

一个个排队,这样就不会导致饥饿,ReentrantReadWriteLock 就是采纳第二种策略。

更确切的说就是:在非偏心锁状况下,容许写锁插队,也容许读锁插队,然而 读锁插队的前提是队列中的头节点不能是想获取写锁的线程。

偏心锁源码:

非偏心锁源码:

锁的升降级

升降级是指读锁降级为写锁,写锁降级为度锁。在 ReentrantReadWriteLock 读写锁中,只反对写锁降级为读锁,而不反对读锁降级为写锁。

代码演示:

public class LockTest {private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
    private static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    private static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    public static void main(String[] args) {new Thread(() -> write()).start();
        new Thread(() -> read()).start();}

    private static void read() {readLock.lock();
        try {System.out.println(Thread.currentThread().getName() + "开始学习《Thinking in Java》");
            writeLock.lock();
            System.out.println(Thread.currentThread().getName() + "取得到了写锁");
        } finally {writeLock.unlock();
            System.out.println(Thread.currentThread().getName() + "太难了!我不学了!");
            readLock.unlock();}
    }

    private static void write() {writeLock.lock();
        try {System.out.println(Thread.currentThread().getName() + "开始印刷《Thinking in Java》");
            readLock.lock();
            System.out.println(Thread.currentThread().getName() + "在写锁中获取到了读锁");
        } finally {readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "印刷实现");
            writeLock.unlock();}
    }
}

运行后果:

Thread- 0 开始印刷《Thinking in Java》Thread- 0 在写锁中获取到了读锁
Thread- 0 印刷实现
Thread- 1 开始学习《Thinking in Java》

咱们能够看到在写锁中胜利取得到了读锁,而在 读锁中被始终阻塞。阐明不反对锁降级!

为什么 ReentrantReadWriteLock 不反对锁降级

次要是防止死锁,例如两个线程 A 和 B 都在读,A 降级要求 B 开释读锁,B 降级要求 A 开释读锁,相互期待造成死循环。如果能严格保障每次都只有一个线程降级那也是能够的。

总结

  1. 读写锁特点特点:读锁是共享锁,写锁是排他锁,读锁和写锁不能同时存在
  2. 插队策略:为了避免线程饥饿,读锁不能插队
  3. 降级策略:只能降级,不能降级
  4. ReentrantReadWriteLock 适宜于 读多写少 的场合,能够进步并发效率,而 ReentrantLock 适宜一般场合

自旋锁和阻塞锁

阻塞或者唤醒一个 Java 线程须要操作系统切换 CPU 状态来实现,这种状态转换须要消耗处理器工夫。

如果同步代码块中的内容过于简略,状态转换耗费的工夫有可能比用户代码执行的工夫还要长

在许多场景中,同步资源的锁定工夫很短,为了这一小段时间去切换线程,线程挂起和复原现场的话费可能会让零碎 得失相当

如果物理机器有多个处理器,可能让两个或以上的线程同时并行,咱们就能够让前面那个申请锁的线程不放弃 CPU 的执行工夫,看看持有锁是否会在短时间内开释锁。

而为了让以后线程 ” 稍等一下 ”,咱们须要让以后线程进行自旋,如果在自旋过程中后面锁定的线程开释了锁,那么以后线程就能够间接获取同步资源,防止了 资源耗费 ,这就是 自旋锁

阻塞锁就是如果没拿到锁,会间接阻塞以后线程,直到被唤醒。

自旋锁的毛病

如果锁被占用工夫很长,那么自旋的线程就会白白浪费处理器资源。

代码演示

public class LockTest {private AtomicReference<Thread> sign = new AtomicReference<>();

    public void lock() {Thread current = Thread.currentThread();
        while (!sign.compareAndSet(null, current)) {System.out.println("自旋获取失败,再次尝试");
        }
    }

    public void unlock() {Thread current = Thread.currentThread();
        sign.compareAndSet(current, null);
    }

    public static void main(String[] args) {LockTest spinLock = new LockTest();
        Runnable runnable = () -> {System.out.println(Thread.currentThread().getName() + "开始尝试获取自旋锁");
            spinLock.lock();
            System.out.println(Thread.currentThread().getName() + "获取到了自旋锁");
            try {Thread.sleep(300);
            } catch (InterruptedException e) {e.printStackTrace();
            } finally {spinLock.unlock();
                System.out.println(Thread.currentThread().getName() + "开释了自旋锁");
            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();}

}

运行后果:

Thread- 0 开始尝试获取自旋锁
Thread- 1 开始尝试获取自旋锁
Thread- 0 获取到了自旋锁
Thread- 0 开释了自旋锁
Thread- 1 获取到了自旋锁
Thread- 1 开释了自旋锁

在 while 中会进行大量的循环判断,能够尝试打印语句看看。

后续会讲述 Atomic 原子类是如何应用 CAS 算法来实现自旋。

自旋锁的应用场景

  • 自选锁个别用于多核的服务器,在 并发度不是特地高 的状况下,比阻塞锁的效率高。
  • 另外,自旋锁实用于临界区比拟短小的状况,否则如果 临界区很大(一旦拿到锁,很久才开释)那也是不适合的。

可中断锁和不可中断锁

在 Java 中,synchronized 是不可中断锁,而 Lock 是可中断锁,因为 tryLock(time)和 lockinterruptibly 都能响应中断。

synchronized 原理以及锁优化

同步代码块:

monitorenter 指令插入到同步代码块的开始地位,monitorexit 指令插入到同步代码块的完结地位,JVM 须要保障每一个 monitorenter 都有一个 monitorexit 与之绝对应。任何对象都有一个 monitor 与之相关联,当且一个 monitor 被持有之后,他将处于锁定状态。线程执行到 monitorenter 指令时,将会尝试获取对象所对应的 monitor 所有权,即尝试获取对象的锁。

同步办法:

synchronized 办法则会被翻译成一般的办法调用和返回指令如:invokevirtual、areturn 指令,在 JVM 字节码层面并没有任何特地的指令来实现被 synchronized 润饰的办法,而是在 Class 文件的办法表中将该办法的 accessflags 字段中的 synchronized 标记地位 1,示意该办法是同步办法并应用调用该办法的对象或该办法所属的 Class 在 JVM 的外部对象示意 Klass 做为锁对象。

synchronized 锁 和 对象头非亲非故。所以咱们先理解一下对象在堆中的构造:

咱们须要先理解两个重要的概念:Java 对象头、Monitor。

Java 对象头

synchronized 用的锁是存在 Java 对象头里的,那么什么是 Java 对象头呢?

Hotspot 虚拟机的对象头次要包含两局部数据:Mark Word(标记字段)Klass Pointer(类型指针)。其中 Klass Point 是是对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例,Mark Word 用于存储对象本身的运行时数据,它是实现轻量级锁和偏差锁的要害。然而如果对象是数组类型,则须要三个机器码,因为 JVM 虚拟机能够通过 Java 对象的元数据信息确定 Java 对象的大小,然而无奈从数组的元数据来确认数组的大小,所以用一块来记录数组长度。

Mark Word

Mark Word 用于存储对象本身的运行时数据,如哈希码(HashCode)、GC 分代年龄、锁状态标记、线程持有的锁、偏差线程 ID、偏差工夫戳等等,占用内存大小与虚拟机位长统一。

Klass Word

存储指向对象所属类(元数据)的指针,JVM 通过这个确定这个对象属于哪个类。

Monitor

什么是 Monitor?

咱们能够把它了解为一个同步工具,也能够形容为一种同步机制,它通常被形容为一个对象。

与所有皆对象一样,所有的 Java 对象是天生的 Monitor,每一个 Java 对象都有成为 Monitor 的潜质,因为在 Java 的设计中,每一个 Java 对象都带了一把看不见的锁,它叫做外部锁或者 Monitor 锁。

Monitor 是线程公有的数据结构,每一个线程都有一个可用 Monitor Record 列表,同时还有一个全局的可用列表。每一个被锁住的对象都会和一个 Monitor 关联(对象头的 Mark Word 中的 Lock Word 指向 Monitor 的起始地址),同时 Monitor 中有一个 Owner 字段寄存领有该锁的线程的惟一标识,示意该锁被这个线程占用。

Monitor 是由 ObjectMonitor 实现的,源码是 C++ 来实现的。次要构造如下:

  ObjectMonitor() {
        _header       = NULL;
        _count        = 0;   // 记录个数
        _waiters      = 0,   // 期待线程数
        _recursions   = 0;  //  重入次数
        _object       = NULL;
        _owner        = NULL;  // 以后持有锁的线程
        _WaitSet      = NULL;  // 调用了 wait 办法的线程被阻塞 放在这里
        _WaitSetLock  = 0 ;    // 爱护期待队列,简略的自旋
        _Responsible  = NULL ;
        _succ         = NULL ;
        _cxq          = NULL ;
        FreeNext      = NULL ;
        _EntryList    = NULL ; // 期待锁 处于 block 的线程 有资格成为候选资源的线程
        _SpinFreq     = 0 ;
        _SpinClock    = 0 ;
        OwnerIsThread = 0 ;
      }

咱们晓得 synchronized 是重量级锁,效率很低。不过在 JDK 1.6 中对 synchronized 的实现进行了各种优化,使得它显得不是那么重了。

锁优化

JDK1.6 对锁的实现引入了大量的优化,如 自旋锁 适应性自旋锁 锁打消 锁粗化 偏差锁 轻量级锁 等技术来缩小锁操作的开销。

锁次要存在四中状态,顺次是:无锁状态 偏差锁状态 轻量级锁状态 重量级锁状态 。他们会随着竞争的强烈而逐步降级。留神 锁能够降级不可降级,这种策略是为了进步取得锁和开释锁的效率。

适应自旋锁

所谓自适应就意味着自旋的次数不再是固定的,它是由前一次在同一个锁上的自旋工夫及锁的拥有者的状态来决定。

线程如果自旋胜利了,那么下次自旋的次数会更加多,因为虚拟机认为既然上次胜利了,那么此次自旋也很有可能会再次胜利,那么它就会容许自旋期待继续的次数更多。反之,如果对于某个锁,很少有自旋可能胜利的,那么在当前要锁的时候自旋的次数会缩小甚至省略掉自旋过程,免得节约处理器资源。

锁打消

锁打消是产生在编译器级别的一种锁优化形式。

有时候咱们写的代码齐全不须要加锁,却执行了加锁操作。

比方 StringBuffer 的 append()办法,Vector 的 add()办法。

如果 JVM 显著检测到没有产生办法逃逸,就会将外部的锁打消。

锁粗化

通常状况下,为了保障多线程间的无效并发,会要求每个线程持有锁的工夫尽可能短,然而在某些状况下,一个程序对同一个锁不间断、高频地申请、同步与开释,会消耗掉肯定的系统资源,因为锁的申请、同步与开释自身会带来性能损耗,这样高频的锁申请就反而不利于零碎性能的优化了,尽管单次同步操作的工夫可能很短。锁粗化就是通知咱们任何事件都有个度,有些状况下咱们反而心愿把很屡次锁的申请合并成一个申请,以升高短时间内大量锁申请、同步、开释带来的性能损耗。

public void doSomethingMethod(){synchronized(lock){//do some thing}
    synchronized(lock){//do other thing}
}

偏差锁

如果应用锁的线程都只有一个,那么,保护轻量级锁都是节约的。偏差锁的指标是,缩小无竞争且只有一个线程应用锁的状况下,应用轻量级锁产生的性能耗费。轻量级锁每次申请、开释锁都至多须要一次 CAS,但偏差锁只有初始化时须要一次 CAS。

“偏差”的意思是,偏差锁假设未来只有第一个申请锁的线程会应用锁(不会有任何线程再来申请锁),因而,只须要在 Mark Word 中 CAS 记录 owner,如果记录胜利,则偏差锁获取胜利,记录锁状态为偏差锁,当前以后线程等于 owner 就能够零老本的间接取得锁;否则,阐明有其余线程竞争,收缩为 轻量级锁

偏差锁无奈应用自旋锁优化,因为一旦有其余线程申请锁,就毁坏了偏差锁的假设。

轻量级锁

轻量级锁的指标是,缩小无理论竞争状况下,应用重量级锁产生的性能耗费,包含零碎调用引起的内核态与用户态切换、线程阻塞造成的线程切换等。

顾名思义,轻量级锁是绝对于重量级锁而言的。应用轻量级锁时,不须要申请互斥量,仅仅将 Mark Word 中的局部字节 CAS 更新指向线程栈中的 Lock Record,如果更新胜利,则轻量级锁获取胜利,记录锁状态为轻量级锁;否则,阐明曾经有线程取得了轻量级锁,目前产生了锁竞争(不适宜持续应用轻量级锁),接下来收缩为 重量级锁

当然,因为轻量级锁人造瞄准不存在锁竞争的场景,如果存在锁竞争但不强烈,依然能够用自旋锁优化,自旋失败后再收缩为重量级锁。

重量级锁

内置锁在 Java 中被形象为监视器锁(monitor)。在 JDK 1.6 之前,监视器锁能够认为间接对应底层操作系统中的互斥量(mutex)。这种同步形式的老本十分高,包含零碎调用引起的内核态与用户态切换、线程阻塞造成的线程切换等。因而,起初称这种锁为“重量级锁”。

synchronized 锁的降级过程

锁降级是单向的: 无锁 -> 偏差锁 -> 轻量级锁 -> 重量级锁


图片援用 blog.dreamtobe.cn

原子类

什么是原子类,有什么用

  • 首先原子,就是不可分割。一个操作是不可中断的,即便在多线程环境下也能够保障。
  • java.util.concurrent.atomic 包。
  • 原子类的作用和锁类似,是为了保障并发状况下 线程平安。不过原子类相比于锁,有肯定的劣势。
  • 粒度更细:原子变量能够把竞争范畴放大到变量级别,这是咱们能够取得到最细粒度的状况,通常锁的粒度都要大于变量的粒度。
  • 效率更高:通常,应用原子类的效率比应用锁要高。

六类原子类纵览

Atomic 根本类型原子类

以 AtomicInteger 为例,罕用办法:

  • int get():获取以后值
  • int getAndSet(int i):获取以后值,并设置新值
  • int getAndIncrement():获取以后的值,并自增
  • int getAndDecrement():获取以后值,并自减
  • int getAndAdd(int delta):获取以后值,并在以后值上减少预期值
  • boolean compareAndSet(int expect,int update):如果输出的数值等于预期值,则以原子的形式将该值设置为输出值(update)。

代码演示:

public class AtomicIntegerTest implements Runnable {private static AtomicInteger atomicInteger = new AtomicInteger();
    private static int i = 0;

    public static void main(String[] args) throws InterruptedException {AtomicIntegerTest test = new AtomicIntegerTest();
        Thread thread = new Thread(test);
        Thread thread1 = new Thread(test);
        thread.start();
        thread1.start();
        thread.join();
        thread1.join();
        System.out.println("原子类后果为:" + atomicInteger.get());
        System.out.println("一般 int 后果为:" + i);
    }

    @Override
    public void run() {for (int j = 0; j < 10000; j++) {atomicInteger.getAndIncrement();
            i++;
        }
    }
}

运行后果:

原子类后果为:20000
一般 int 后果为:18647

Atomic 数组原子类

间接代码演示:

public class AtomicArrTest {public static AtomicIntegerArray integerArray = new AtomicIntegerArray(1000);

    public static void main(String[] args) throws InterruptedException {

        // 自减
        Runnable runnable1 = () -> {for (int i = 0; i < integerArray.length(); i++) {integerArray.getAndDecrement(i);
            }
        };

        // 自加
        Runnable runnable2 = () -> {for (int i = 0; i < integerArray.length(); i++) {integerArray.getAndIncrement(i);
            }
        };

        Thread[] threads1 = new Thread[100];
        Thread[] threads2 = new Thread[100];
        for (int i = 0; i < 100; i++) {threads1[i] = new Thread(runnable1);
            threads2[i] = new Thread(runnable2);
            threads1[i].start();
            threads2[i].start();}

        // 期待线程运行完结
        for (int i = 0; i < 100; i++) {threads1[i].join();
            threads2[i].join();}

        for (int i = 0; i < integerArray.length(); i++) {if (integerArray.get(i) != 0) {System.out.println("原子类型不平安!产生不等于 0 的谬误" + i);
            }
        }
        System.out.println("运行完结");
    }

}

运行后果:

运行完结

能够发现后果并没有一加一减或者一减一加不等于 0 的谬误。

Atomic Reference 援用类型原子类

AtomicReference 和 AtomicInteger 十分相似,不同之处就在于 AtomicInteger 是对整数的封装,而 AtomicReference 则对应一般的对象援用。也就是它能够保障你在批改对象援用时的线程安全性。

AtomicReference 是作用是对”对象”进行原子操作。提供了一种读和写都是原子性的对象援用变量。

代码演示:

public class AtomicReferenceTest {public static void main(String[] args) throws InterruptedException {AtomicReference<Integer> ref = new AtomicReference<>(new Integer(1000));
        Runnable runnable = () -> {for (; ;) {Integer num = ref.get();
                if (ref.compareAndSet(num, num + 1)) {//cas
                    break;
                }
            }
        };
        List<Thread> list = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {Thread t = new Thread(runnable, "Thread-" + i);
            list.add(t);
            t.start();}
        for (Thread t : list) {t.join();
        }
        System.out.println(ref.get()); // 输入后果:2000
    }

}

把一般变量降级为具备原子性能

能够应用 AtomicIntegerFieldUpdater 对一般变量进行 降级

那为什么不间接在一开始就进行申明为原子变量呢?

因为在有的时候,比方咱们只有在某一时刻须要原子操作,存在大量并发的状况。而在大部分时候都没有并发问题的话,就没有必要始终都进行原子操作。

代码演示

public class AtomicIntegerFieldUpdaterTest implements Runnable {private static Candidate tom = new Candidate();
    private static Candidate peter = new Candidate();
    private static AtomicIntegerFieldUpdater<Candidate> candidateUpdater;

    public static class Candidate {volatile int score;}

    public static void main(String[] args) throws InterruptedException {candidateUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
        AtomicIntegerFieldUpdaterTest test = new AtomicIntegerFieldUpdaterTest();
        Thread thread1 = new Thread(test);
        Thread thread2 = new Thread(test);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println("一般变量:" + tom.score);
        System.out.println("原子变量:" + peter.score);
    }

    @Override
    public void run() {for (int i = 0; i < 10000; i++) {
            tom.score++;
            candidateUpdater.getAndIncrement(peter);
        }
    }
}

留神点

AtomicIntegerFieldUpdater 不反对 static,以及修饰符不可见范畴。

Adder 累加器

Adder 是 Java 8 中引入的一个类。

高并发下 LongAdder 比 AtomicLong效率高 ,不过实质还是 空间换工夫

竞争强烈的时候,LongAdder 把不同线程对应到不同的 Cell 上进行批改,升高了抵触概率,是 多段锁 的理念,进步了并发效率。

代码演示 AtomicLong 耗时

public class AtomicLongTest {public static void main(String[] args) {AtomicLong counter = new AtomicLong(0);
        ExecutorService service = Executors.newFixedThreadPool(20);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {service.submit(new Task(counter));
        }
        service.shutdown();
        while (!service.isTerminated()) { }
        long end = System.currentTimeMillis();
        System.out.println(counter.get());
        System.out.println("AtomicLong 耗时:" + (end - start));
    }

    private static class Task implements Runnable {

        private AtomicLong counter;

        public Task(AtomicLong counter) {this.counter = counter;}

        @Override
        public void run() {for (int i = 0; i < 10000; i++) {counter.incrementAndGet();
            }
        }
    }

}

运行后果:

100000000
AtomicLong 耗时:1624

代码演示 AtomicLong 耗时

public class LongAdderTest {public static void main(String[] args) {LongAdder counter = new LongAdder();
        ExecutorService service = Executors.newFixedThreadPool(20);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {service.submit(new Task(counter));
        }
        service.shutdown();
        while (!service.isTerminated()) { }
        long end = System.currentTimeMillis();
        System.out.println(counter.sum());
        System.out.println("LongAdder 耗时:" + (end - start));
    }

    private static class Task implements Runnable {

        private LongAdder counter;

        public Task(LongAdder counter) {this.counter = counter;}

        @Override
        public void run() {for (int i = 0; i < 10000; i++) {counter.increment();
            }
        }
    }

}

运行后果:

100000000
LongAdder 耗时:464

能够看到差距十分大,咱们看一下为什么 AtomicLong 在并发下执行工夫这么长。

AtomicLong 的弊病

因为每一次加法,都要进行 flush 和 refresh 导致消耗资源。

在线程 1 进行了批改操作后,就要立刻刷新到主存,而后其余线程再去进行更新。

LongAdder 的改良

LongAdder 的实现原理是,在 每个线程外部都有一个本人的计数器,仅在本人外部计数,这样就不会被其余线程的计数器烦扰。

如图示,第一个线程计数器的值也就是 ctr‘为 1 的时候,可能线程 2 的 str‘’曾经是 3 了,它们之间并不存在竞争关系,所以在 加和的过程中,不须要同步,也不须要 flush 和 refresh。

LongAdder 引入了分段累加的概念,外部有一个 base 变量和一个 Cell[]数组独特参加计数:

base 变量:竞争不强烈,间接累加到变量上。

Cell[] 数组:竞争强烈,各个线程扩散累加到本人的槽 Cell[i] 中。

sum 办法源码

 public long sum() {Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {// 如果没有用到 cell 间接返回
            for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
                    sum += a.value;// 逐渐累加
            }
        }
        return sum;
}

总结

  1. 在低争用的状况下,两者差距不大,然而在竞争强烈的状况下,LongAdder 吞吐量要高,然而要耗费更多的空间。
  2. LongAdder 适宜的场景是统计求和的场景,而且 LongAdder 只提供了 add 办法,而 AtomicLong 还具备 CAS 办法。

Accumulator 累加器

Accumulator 和 Adder 相似,就是一个更通用版本的 Adder。

代码演示

public class LongAccumulatorTest {public static void main(String[] args) {LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 100);
        ExecutorService executor = Executors.newFixedThreadPool(8);
        IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

        executor.shutdown();
        while (!executor.isTerminated()) { }
        System.out.println(accumulator.getThenReset());
    }
}

运行后果:

145

CAS

CAS 是 compare and swap 的缩写,也就是咱们所说的比拟并替换。cas 是一种基于锁的操作,而且是乐观锁。

举例就是我认为 V 的值应该是张三,如果是的话我就把它改为李四,如果不是张三,就阐明被人就改过了,那我就不批改了。

CAS 中有三个操作数:内存值 V,预期值 A,要批改的值 B,当V == A 时,则批改为 B 。否则什么都不做,返回当初的 V 值。

CAS 源码解析

例如 AtomicInteger 原子类加载了 Unsafe 工具,用来 间接操作内存数据

用 volatile 润饰 value 字段,保障可见性。

就以 getAndAdd 办法举例,咱们看下源码:

public final int getAndAdd(int delta) {return unsafe.getAndAddInt(this, valueOffset, delta);
    }

底层调用了 unsafe 类的办法:

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
}

外面应用了 do while 循环,如果 a,b 线程同时执行这个办法,a 线程拿到值 1 后 cpu 执行工夫到了挂起,b 开始执行,也拿到 1,然而没有挂起,接着将值变成了 2。

这个时候 a 线程复原执行,去比拟的时候发现手上的 1 和内存外面的值 2 不等,这个时候就要进行下一个循环。

compareAndSwapInt 是 native 办法,Unsafe 类提供了 硬件级别的原子操作,而咱们传入的 valueOffset 就是依据内存偏移地址获取数据原值,这样就能够通过 unsafe 来实现 CAS。

总结

  • CAS 存在 ABA 问题。

ABA 问题就是在主内存中本来是 A 起初有另外一个线程批改为了 B 后又改回了 A,第一个线程回来看后还是 A 认为没有变动,实际上曾经有了变动。

如何解决 ABA 问题?

AtomicStampedReference 减少版本号,进行版本号判断。

  • 自旋工夫可能过长。

并发容器

并发容器概览

  • ConcurrentHashMap:线程平安的 HashMap
  • CopyOnWriteArrayList:线程平安的 List
  • BlockingQueue:这是一个接口,示意阻塞队列
  • ConcurrentLinkedQueue:高效的非阻塞并发队列,应用链表实现,是一个线程平安的 LinkedList
  • ConcurrentSkipListMap:是一个 Map,应用跳表的数据结构进行疾速查找

古老的同步容器

Vector 和 HashTable

并发性能较差,要害办法都是应用 synchronized 润饰的办法级别。

public synchronized V put(K key, V value) {
        // Make sure the value is not null
        if (value == null) {throw new NullPointerException();
        }

        // Makes sure the key is not already in the hashtable.
        Entry<?,?> tab[] = table;
        int hash = key.hashCode();
        int index = (hash & 0x7FFFFFFF) % tab.length;
        @SuppressWarnings("unchecked")
        Entry<K,V> entry = (Entry<K,V>)tab[index];
        for(; entry != null ; entry = entry.next) {if ((entry.hash == hash) && entry.key.equals(key)) {
                V old = entry.value;
                entry.value = value;
                return old;
            }
        }

        addEntry(hash, key, value, index);
        return null;
    }

HashMap 和 ArrayList

尽管这两个类不是线程平安的,然而咱们能够应用 Collections.synchronizedList()和 Collections.synchronizedMap()使其变为线程平安的。

关上源码能够看到是应用的 同步代码块 的形式:

ConcurrentHashMap

Map 家族概览:

HashMap 对于并发的特点

  1. 非线程平安
  2. 迭代时不容许批改
  3. 只读的并发是平安的
  4. 如果要用在并发的话,应用 Collections.synchronizedMap(new HashMap())

Java1.7 中 ConcurrentHashMap 构造

java 1.7 中 ConcurrentHashMap 最外层是多个 segment 每个 segment 的底层数据结构和 HashMap 相似,依然是数组和链表组成的 拉链法

每个 segment 中蕴含 独立的 ReentrantLock 锁,每个 segment 之间互不影响,进步了并发效率。

ConcurrentHashMap 默认有 16 个 segment,所以最多反对 16 个线程同时并发写入。这个值能够在初始化时填入,一旦初始化后,是不能扩容的。

Java8 中 ConcurrentHashMap 构造

put 办法解析

public V put(K key, V value) {return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();
    // 失去 hash 值
    int hash = spread(key.hashCode());
    // 用于记录相应链表的长度
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果数组为空,进行数组初始化
        if (tab == null || (n = tab.length) == 0)
            // 初始化数组
            tab = initTable();

        // 找该 hash 值对应的数组下标,失去第一个节点 f
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果数组该地位为空,// 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就完结了
            // 如果 CAS 失败,那就是有并发操作,进入下一次循环
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;
        }
        // 阐明正在扩容
        else if ((fh = f.hash) == MOVED)
            // 数据迁徙
            tab = helpTransfer(tab, f);

        else { // f 是该地位的头结点,而且不为空

            V oldVal = null;
            // 获取数组该地位的头结点的监视器锁
            synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) { // 头结点的 hash 值大于 0,阐明是链表
                        // 用于累加,记录链表的长度
                        binCount = 1;
                        // 遍历链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果发现了 "相等" 的 key,判断是否要进行值笼罩,而后也就能够 break 了
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 到了链表的最末端,将这个新值放到链表的最初面
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 红黑树
                        Node<K,V> p;
                        binCount = 2;
                        // 调用红黑树的插值办法插入新节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }

            if (binCount != 0) {
                // 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 这个办法和 HashMap 中略微有一点点不同,那就是它不是肯定会进行红黑树转换,// 如果以后数组的长度小于 64,那么会抉择进行数组扩容,而不是转换为红黑树
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }

    addCount(1L, binCount);
    return null;
}

get 办法剖析

  1. 计算 hash 值
  2. 依据 hash 值找到数组对应地位: (n – 1) & h
  3. 依据该地位处结点性质进行相应查找

如果该地位为 null,那么间接返回 null

如果该地位处的节点刚好就是咱们须要的,返回该节点的值即可

如果该地位节点的 hash 值小于 0,阐明正在扩容,或者是红黑树,而后通过 find 办法去寻找

如果以上 3 条都不满足,那就是链表,进行遍历比对

CopyOnWriteArrayList

Vector 和 SynchronizedList 的锁粒度太大,并发效率较低 ,并且 迭代时无奈编辑

另外 CopyOnWriteSet 是用来代替同步 Set。

实用场景

读多写少

读操作能够尽可能的快,而写即便慢一些也没关系。

在很多利用场景中,读操作可能会远远多于写操作。比方,有些零碎级别的信息,往往只须要加载或者批改很少的次数,然而会被零碎内所有模块频繁的拜访。对于这种场景,咱们最心愿看到的就是读操作能够尽可能的快,而写即便慢一些也没关系。

读写规定

之前的读写锁:读读共享、写写互斥、读写互斥。

读写锁规定的降级:读取是齐全不须要加锁的 ,并且更强的是, 写入也不会阻塞读取操作 ,只有 写入和写入 之间须要同步期待。

代码演示

首先咱们看一下应用 ArrayList 带来的批改问题。

对 Vector、ArrayList 在迭代的时候如果同时对其进行批改就会抛出 java.util.ConcurrentModificationException 异样

public class Test {public static void main(String[] args) {List<String> list = new CopyOnWriteArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Iterator<String> iterator = list.iterator();
        while (iterator.hasNext()) {String next = iterator.next();
            System.out.println(list);
            if (next.equals("3")) {list.remove("5");
            }
            if (next.equals("4")) {list.add("new");
            }
        }
    }

}

运行后果:

[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
Exception in thread "main" java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
    at java.util.ArrayList$Itr.next(ArrayList.java:859)
    at test.Test.main(Test.java:25)

咱们看一下源码:

final void checkForComodification() {if (modCount != expectedModCount)
                throw new ConcurrentModificationException();}

在创立迭代器的时候会把对象的 modCount 的值传递给迭代器的 expectedModCount。

每次 next 的时候判断是否统一如果不统一则抛出异样。

应用 CopyOnWriteArrayList

public class Test {public static void main(String[] args) {List<String> list = new CopyOnWriteArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Iterator<String> iterator = list.iterator();
        while (iterator.hasNext()) {String next = iterator.next();
            System.out.println(list);
            if (next.equals("3")) {list.remove("5");
            }
            if (next.equals("4")) {list.add("new");
            }
        }
    }

}

运行后果:

[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4]
[1, 2, 3, 4, new]

源码解析

先看一下 add 办法

public boolean add(E e) {
        //1. 取得独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {//2. 取得 Object[]数组
            Object[] elements = getArray();
            //3. 取得 elements 的长度
            int len = elements.length;
            //4. 复制到新的数组
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            //5. 将 add 的元素增加到新元素
            newElements[len] = e;
            //6. 替换之前的数据
            setArray(newElements);
            return true;
        } finally {
            //7. 开释独占锁
            lock.unlock();}
}

CopyOnWriteArrayList 应用了 ReentrantLock 独占锁,保障同时只有一个线程对汇合进行批改操作。

数据是存储在 CopyOnWriteArrayList 中的 array 数组中的。

在增加元素的时候,并不是间接往 array 外面 add 元素,而是复制进去了一个新的数组,并且复制进去的数组的长度是【旧数组的长度 +1】,再把旧的数组替换成新的数组。

get 办法

public E get(int index) {return get(getArray(), index);
}

get 办法没有加锁,很简略,间接获取元素。然而不保证数据是最新的,也就是 弱一致性

set 办法

public E set(int index, E element) {
        // 取得独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 取得 Object 数组
            Object[] elements = getArray();
            // 依据下标, 取得旧的元素
            E oldValue = get(elements, index);
            // 如果旧的元素不等于新的元素
            if (oldValue != element) {
                // 取得旧数组的长度
                int len = elements.length;
                // 复制出新的数组
                Object[] newElements = Arrays.copyOf(elements, len);
                // 批改
                newElements[index] = element;
                // 替换
                setArray(newElements);
            } else {
                // 为了保障 volatile 语义,即便没有批改,也要替换成新的数组
                setArray(elements);
            }
            return oldValue;
        } finally {
            // 开释独占锁
            lock.unlock();}
    }

还是应用 lock 加锁,而后复制一个 arr 正本进行批改,之后笼罩。

remove 办法

public E remove(int index) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {Object[] elements = getArray();
            int len = elements.length;
            E oldValue = get(elements, index);
            int numMoved = len - index - 1;
            if (numMoved == 0)
                setArray(Arrays.copyOf(elements, len - 1));
            else {Object[] newElements = new Object[len - 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index + 1, newElements, index,
                                 numMoved);
                setArray(newElements);
            }
            return oldValue;
        } finally {lock.unlock();
        }
    }

能够看到,remove 办法和 add,set 办法是一样的,第一步还是先获取独占锁,来保障线程安全性,如果要删除的元素是最初一个,则复制出一个长度为【旧数组的长度 -1】的新数组,随之替换,这样就奇妙的把最初一个元素给删除了,如果要删除的元素不是最初一个,则分两次复制,随之替换。

CopyOnWrite 的毛病

CopyOnWrite 容器有很多长处,然而同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候须要留神一下。

内存占用问题:因为 CopyOnWrite 的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象。

数据一致性问题:CopyOnWrite 容器只能保证数据的最终一致性,不能保证数据的实时一致性。

阻塞队列简介

什么是阻塞队列?

阻塞队列(BlockingQueue)是一个反对两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会期待队列变为非空。当队列满时,存储元素的线程会期待队列可用。阻塞队列罕用于生产者和消费者的场景,生产者是往队列里增加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者寄存元素的容器,而消费者也只从容器里拿元素。

罕用的队列次要有以下两种:

先进先出(FIFO):先插入队列的元素也最先出队列,相似于排队的性能。

后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先解决最近产生的事件。

外围办法

办法类型 抛出异样 非凡值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
查看 element() peek() 不可用 不可用

ArrayBlockingQueue

ArrayBlockingQueue 是一个阻塞式的队列,继承自 AbstractBlockingQueue, 间接的实现了 Queue 接口和 Collection 接口。底层以数组的模式保留数据(实际上可看作一个循环数组)。并且是一个基于数组的阻塞队列。

ArrayBlockingQueue 是一个有界队列,有界也就意味着,它不可能存储有限多数量的对象。所以在创立 ArrayBlockingQueue 时,必须要给它指定一个队列的大小。

并且还能够指定是否偏心,如果保障偏心的话,那么期待了最长工夫的线程会被优先解决,不过会带来性能损耗。

代码示例

有 10 个面试者,一共只有一个面试官,大厅里有 3 个地位,每个面试工夫是 10 秒,模仿面试场景。

public class ArrayBlockingQueueTest {public static void main(String[] args) {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        Interviewer r1 = new Interviewer(queue);
        Consumer r2 = new Consumer(queue);
        new Thread(r1).start();
        new Thread(r2).start();}
}

class Interviewer implements Runnable {

    BlockingQueue<String> queue;

    public Interviewer(BlockingQueue queue) {this.queue = queue;}

    @Override
    public void run() {System.out.println("10 个候选人都来啦");
        for (int i = 0; i < 10; i++) {
            String candidate = "Candidate" + i;
            try {queue.put(candidate);
                System.out.println("安顿好了" + candidate);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        try {queue.put("stop");
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {

    BlockingQueue<String> queue;

    public Consumer(BlockingQueue queue) {this.queue = queue;}

    @Override
    public void run() {
        try {Thread.sleep(1000);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        String msg;
        try {while (!(msg = queue.take()).equals("stop")) {System.out.println(msg + "到了");
            }
            System.out.println("所有候选人都完结了");
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

运行后果:

10 个候选人都来啦
安顿好了 Candidate0
安顿好了 Candidate1
安顿好了 Candidate2
Candidate0 到了
安顿好了 Candidate3
Candidate1 到了
Candidate2 到了
安顿好了 Candidate4
Candidate3 到了
Candidate4 到了
Candidate5 到了
安顿好了 Candidate5
安顿好了 Candidate6
Candidate6 到了
安顿好了 Candidate7
Candidate7 到了
安顿好了 Candidate8
Candidate8 到了
安顿好了 Candidate9
Candidate9 到了
所有候选人都完结了

源码解析

ArrayBlockingQueue 进队操作采纳了加锁的形式保障并发平安。

public void put(E e) throws InterruptedException {
    // 非空判断
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lockInterruptibly();
    try {while (count == items.length) {
            // 始终阻塞,晓得队列非满时,被唤醒
            notFull.await();}
        // 进队
        enqueue(e);
    } finally {lock.unlock();
    }
}

LinkedBlockingQueue

LinkedBlockingQueue 不同于 ArrayBlockingQueue,它如果不指定容量,默认为 Integer.MAX_VALUE,也就是无界队列。所以为了防止队列过大造成机器负载或者内存爆满的状况呈现,咱们在应用的时候倡议手动传入一个队列的大小。

源码剖析

/**
 * 节点类,用于存储数据
 */
static class Node<E> {
    E item;
    Node<E> next;

    Node(E x) {item = x;}
}

/** 阻塞队列的大小,默认为 Integer.MAX_VALUE */
private final int capacity;

/** 以后阻塞队列中的元素个数 */
private final AtomicInteger count = new AtomicInteger();

/**
 * 阻塞队列的头结点
 */
transient Node<E> head;

/**
 * 阻塞队列的尾节点
 */
private transient Node<E> last;

/** 获取并移除元素时应用的锁,如 take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** notEmpty 条件对象,当队列没有数据时用于挂起执行删除的线程 */
private final Condition notEmpty = takeLock.newCondition();

/** 增加元素时应用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** notFull 条件对象,当队列数据已满时用于挂起执行增加的线程 */
private final Condition notFull = putLock.newCondition();

从下面的属性咱们晓得,每个增加到 LinkedBlockingQueue 队列中的数据都将被封装成 Node 节点,增加的链表队列中,其中 head 和 last 别离指向队列的头结点和尾结点。与 ArrayBlockingQueue 不同的是,LinkedBlockingQueue 外部别离应用了 takeLock 和 putLock 对并发进行管制,也就是说,增加和删除操作并不是互斥操作,能够同时进行,这样也就能够大大提高吞吐量。

put 办法

public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 获取锁
    putLock.lockInterruptibly();
    try {
        // 判断队列是否已满,如果已满阻塞期待
        while (count.get() == capacity) {notFull.await();
        }
        // 把 node 放入队列中
        enqueue(node);
        c = count.getAndIncrement();
        // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行增加操作
        if (c + 1 < capacity)
            notFull.signal();} finally {putLock.unlock();
    }
    // 如果队列中有一条数据,唤醒生产线程进行生产
    if (c == 0)
        signalNotEmpty();}
  • 队列已满,阻塞期待。
  • 队列未满,创立一个 node 节点放入队列中,如果放完当前队列还有残余空间,持续唤醒下一个增加线程进行增加。如果放之前队列中没有元素,放完当前要唤醒生产线程进行生产。

PriorityBlockingQueue

PriorityBlockingQueue 是一个反对优先级的无界阻塞队列,直到系统资源耗尽。默认状况下元素采纳天然程序升序排列。也能够自定义类实现 compareTo()办法来指定元素排序规定,或者初始化 PriorityBlockingQueue 时,指定结构参数 Comparator 来对元素进行排序。但须要留神的是不能保障同优先级元素的程序。PriorityBlockingQueue 也是基于 最小二叉堆 实现,应用基于 CAS 实现的自旋锁来管制队列的动静扩容,保障了 扩容操作不会阻塞 take 操作 的执行。

SynchronousQueue

SynchronousQueue 是一个外部只能蕴含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。同样,如果线程尝试获取元素并且以后不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。

SynchronousQueue 没有 peek 等函数,因为 peek 的含意是取出头结点,然而 SynchronousQueue 容量是 0,所以没有头结点。

SynchronousQueue 是线程池 Executors.newCachedThreadPool()应用的阻塞队列。

DelayQueue

DelayQueue 是一个没有边界 BlockingQueue 实现,退出其中的元素必须实现 Delayed 接口。当生产者线程调用 put 之类的办法退出元素时,会触发 Delayed 接口中的 compareTo 办法进行排序,也就是说队列中元素的程序是按到期工夫排序的,而非它们进入队列的程序。排在队列头部的元素是最早到期的,越往后到期工夫赿晚。底层基于后面说过的 PriorityBlockingQueue 实现的。

ConcurrentLikedQueue

是一个实用于高并发场景下的队列,通过无锁的形式,底层应用 CAS,实现了高并发状态下的高性能,通常 ConcurrentLikedQueue 性能好于 BlockingQueue。

它是一个基于连贯节点的无界限程平安队列。该队列的元素遵循先进先出的准则。头是最先退出的,尾是最近退出的,该队列不容许 null 元素。

管制并发流程

什么是管制并发流程

管制并发流程的工具类,作用就是帮忙咱们更容易的让线程之间单干,相互配合,来满足业务逻辑。

比方线程 A 期待线程 B 执行实现后再执行某段代码。

罕用的管制并发流程的工具类

作用 阐明
Semaphore 信号量,能够通过管制 ” 许可证 ” 的数量,来保障线程之间的配合 线程只有在拿到 ” 许可证 ” 后能力持续运行,更加灵便
CyclicBarrier 线程会期待,直到足够多线程达到了当时规定的数目,一旦达到触发条件,就能够进行下一步操作 是用于线程间互相期待处理结果就绪的状况
Phaser 和 CyclicBarrier 相似,但计数可变 java1.7 中退出
CountDownLatch 和 CyclicBarrier 相似,数量递加到 0 时候触发 不能够重复使用
Exchanger 让两个对象在适合时候替换对象 实用于在两个线程工作同一个类的不同实例时,替换数据
Condition 能够控制线程的期待和唤醒 是 Object.wait()升级版

CountDownLatch

什么是 CountDownLatch

CountDownLatch 这个类使一个线程期待其余线程各自执行结束后再执行。
是通过一个计数器来实现的,传入须要倒数的值。每当一个线程执行结束后,计数器的值就减 1,当计数器的值为 0 时,示意所有线程都执行结束,而后在闭锁上期待的线程就能够复原工作了。

次要办法介绍

  • CountDownLatch(int count):仅有一个 构造函数,参数为 count 须要倒数的值。
  • await():调用 await()办法的线程会被挂起,他会期待直到 count 为 0 才会继续执行。
  • countDown():将 count 值减 1. 直到为 0 时,其余期待的线程就会被唤醒。

用法一:一个线程期待多个线程都执行结束,再持续本人的工作

public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(5);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Runnable r = () -> {
            try {TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "曾经上车!");
            countDownLatch.countDown();};
        for (int i = 0; i < 5; i++) {executorService.execute(r);
        }
        System.out.println("期待大家上车");
        countDownLatch.await();
        System.out.println("5 集体都曾经上车,能够登程咯");
        executorService.shutdown();}
}

运行后果:

期待大家上车
pool-1-thread-2 曾经上车!pool-1-thread-1 曾经上车!pool-1-thread-3 曾经上车!pool-1-thread-4 曾经上车!pool-1-thread-5 曾经上车!5 集体都曾经上车,能够登程咯

用处二:多个线程同时期待完结后一起工作

public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Runnable r = () -> {
            try {System.out.println(Thread.currentThread().getName() + "曾经就绪!");
                countDownLatch.await();
                System.out.println(Thread.currentThread().getName() + "开始跑步!");
            } catch (InterruptedException e) {e.printStackTrace();
            }
        };
        for (int i = 0; i < 5; i++) {executorService.execute(r);
        }
        TimeUnit.SECONDS.sleep(1);
        System.out.println("信号枪!biu!");
        countDownLatch.countDown();
        executorService.shutdown();}
}

运行后果:

pool-1-thread-1 曾经就绪!pool-1-thread-2 曾经就绪!pool-1-thread-3 曾经就绪!pool-1-thread-4 曾经就绪!pool-1-thread-5 曾经就绪!信号枪!biu!pool-1-thread-1 开始跑步!pool-1-thread-2 开始跑步!pool-1-thread-5 开始跑步!pool-1-thread-4 开始跑步!pool-1-thread-3 开始跑步!

留神点

CountDownLatch 是 不可能重用 的,如果须要从新计数能够应用 CyclicBarrier,或者创立新的 CountDownLatch 实例。

Semaphore 信号量

Semaphore 能够用来 限度 和治理数量 优先资源 的应用状况。

信号量的作用是保护一个 许可证 的计数,线程能够获取许可证,那信号量残余的许可证就减一,线程也能够开释一个许可证,那就会加一,当信号量所领有的许可证为 0 的时候,则须要期待,直到有线程开释了许可证。

次要办法介绍

  • new Semaphore(int permits,boolean fair):第一个参数为许可证数量,第二个是否为偏心策略,即期待的线程放到 FIFO 队列中。
  • acquire():获取一个许可证,如果没有则期待,容许被中断。
  • acquireUninterruptibly():取一个许可证,如果没有则期待,不容许被中断。
  • tryAcquire():看看目前有没有闲暇的许可证,有就获取,无则干别的事,不阻塞。
  • tryAcquire(long timeout):如果在 timeout 时间段内拿不到,就做别的事。
  • release():偿还许可证。

代码演示:每次只有三个人的做工作

public class SemaphoreTest {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(10);
        Semaphore semaphore = new Semaphore(3, true);
        Runnable r = () -> {
            try {semaphore.acquire(); //acquire 外面能够传入数值,比方传入 3 也就是一下能够拿三个许可,同时开释时候也要传入对应的值
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println("拿到许可证!开始做工作!");
            try {TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println("工作完结!开释许可证!");
            semaphore.release();};
        for (int i = 0; i < 1000; i++) {executorService.submit(r);
        }
        executorService.shutdown();}

}

Condition

Condition 作用

当线程 1 须要期待某个条件的时候,它就去执行 condition.await()办法,一旦执行了 await()办法,线程就进入 阻塞 状态。

而后假如线程 2 执行 condition.signal() 办法,这时 JVM 就会从被阻塞的线程中找到那些被 condition.await()中的线程,这样线程 1 就会受到可执行信号,状态就变成Runnable

signalAll()和 signal()的区别

signalAll()会唤起 所有 的正在期待的线程。

然而 signal()是 偏心 的,只会唤起等待时间最长的线程。

Condition 根本应用

public class ConditionTest {private static Lock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    public static void main(String[] args) {ConditionTest test = new ConditionTest();
        new Thread(() -> {
            try {TimeUnit.SECONDS.sleep(1);
                test.methodB();} catch (InterruptedException e) {e.printStackTrace();
            }
        }).start();
        test.methodA();}

    private void methodA() {lock.lock();
        try {System.out.println("开始阻塞");
            condition.await();
            System.out.println("我被唤醒了!");
        } catch (InterruptedException e) {e.printStackTrace();
        } finally {lock.unlock();
        }
    }

    private void methodB() {lock.lock();
        try {condition.signal();
        } finally {lock.unlock();
        }
    }
}

运行后果:

开始阻塞
我被唤醒了!

应用 Condition 实现生产者消费者

public class ConditionTest {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {ConditionTest test = new ConditionTest();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
        producer.start();
        consumer.start();}

    class Consumer extends Thread {

        @Override
        public void run() {consume();
        }

        private void consume() {while (true) {lock.lock();
                try {while (queue.size() == 0) {System.out.println("队列空,期待数据");
                        try {notEmpty.await();
                        } catch (InterruptedException e) {e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("从队列里取走了一个数据,队列残余" + queue.size() + "个元素");
                } finally {lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {produce();
        }

        private void produce() {while (true) {lock.lock();
                try {while (queue.size() == queueSize) {System.out.println("队列满,期待有空余");
                        try {notFull.await();
                        } catch (InterruptedException e) {e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("向队列插入了一个元素,队列残余空间" + (queueSize - queue.size()));
                } finally {lock.unlock();
                }
            }
        }
    }
}

Condition 留神点

实际上,Condition 就是用来代替 Object.wait/nofity 的,所以用法上和性质上简直一样。

await()办法会主动开释 Lock 锁,和 Object.wait 一样,不须要本人手动开释锁。

调用 await()的时候,必须持有锁,否则会抛出异样。

CyclicBarrier 循环栅栏

CyclicBarrier 和 CountDownLatch 很像,都能阻塞一组线程。

当有大量的线程相互配合,别离计算不同工作,最初对立汇总时候,咱们能够应用 CyclicBarrier,CyclicBarrier 能够结构一个集结点,当某一个线程结束后,就会达到集结点期待,等所有线程都到了之后,栅栏就会被撤销,而后所有线程对立登程,继续执行剩下的工作。

代码演示

public class CyclicBarrierTest {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("所有人都到场了,大家对立登程!"));
        for (int i = 0; i < 10; i++) {new Thread(new Task(i, cyclicBarrier)).start();}
    }

    static class Task implements Runnable {
        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {System.out.println("线程" + id + "当初返回集合地点");
            try {Thread.sleep((long) (Math.random() * 10000));
                System.out.println("线程" + id + "到了集合地点,开始期待其他人达到");
                cyclicBarrier.await();
                System.out.println("线程" + id + "登程了");
            } catch (InterruptedException e) {e.printStackTrace();
            } catch (BrokenBarrierException e) {e.printStackTrace();
            }
        }
    }
}

运行后果:

线程 0 当初返回集合地点
线程 1 当初返回集合地点
线程 2 当初返回集合地点
线程 3 当初返回集合地点
线程 4 当初返回集合地点
线程 5 当初返回集合地点
线程 6 当初返回集合地点
线程 7 当初返回集合地点
线程 8 当初返回集合地点
线程 9 当初返回集合地点
线程 9 到了集合地点,开始期待其他人达到
线程 8 到了集合地点,开始期待其他人达到
线程 6 到了集合地点,开始期待其他人达到
线程 5 到了集合地点,开始期待其他人达到
线程 0 到了集合地点,开始期待其他人达到
所有人都到场了,大家对立登程!线程 0 登程了
线程 9 登程了
线程 6 登程了
线程 8 登程了
线程 5 登程了
线程 1 到了集合地点,开始期待其他人达到
线程 4 到了集合地点,开始期待其他人达到
线程 2 到了集合地点,开始期待其他人达到
线程 3 到了集合地点,开始期待其他人达到
线程 7 到了集合地点,开始期待其他人达到
所有人都到场了,大家对立登程!线程 7 登程了
线程 1 登程了
线程 2 登程了
线程 3 登程了
线程 4 登程了

CyclicBarrier 和 CountDownLatch 的区别

作用不同:CyclicBarrier 要期待固定线程数量都到了栅栏地位能力继续执行;而 CountDownLatch 只须要期待数字为 0,也就是说 CountDownLatch 用于事件,然而 CyclicBarrier 用于线程。

可重用性不同:CountDownLatch 在达到 0 后关上门闩,就不能在应用了,除非用新的实例,而 CyclicBarrier 能够重复使用。

AQS

AQS 全名:AbstractQueuedSynchronizer,是并发容器 java.lang.concurrent 下 locks 包内的一个类。它实现了一个 FIFO 的队列。底层实现的数据结构是一个 双向链表

AQS 核心思想是,如果被申请的共享资源闲暇,则将以后申请资源的线程设置为无效的工作线程,并且将共享资源设置为锁定状态。如果被申请的共享资源被占用,那么就须要一套线程阻塞期待以及被唤醒时锁调配的机制,这个机制 AQS 是用 CLH 队列锁实现的,行将临时获取不到锁的线程退出到队列中。

AQS 外部保护了一个 CLH 队列来治理锁。线程会首先尝试获取锁,如果失败就将以后线程及期待状态等信息包装成一个 node 节点退出到同步队列 sync queue 里。接着会一直的循环尝试获取锁,条件是以后节点为 head 的间接后继才会尝试。如果失败就会阻塞本人直到本人被唤醒。而当持有锁的线程开释锁的时候,会唤醒队列中的后继线程。

CLH(Craig,Landin,and Hagersten)队列是一个虚构的双向队列(虚构的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条申请共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的调配。

AQS 外部外围局部

AQS 最外围的三大部分:

  • state
  • 控制线程抢锁和配合的 FIFO 队列
  • 冀望合作工具类去实现的 获取 / 开释 等重要办法

state 状态

/**
* The synchronization state.
*/
private volatile int state;

这个 state 具体含意,会依据具体实现类不同而不同,比方在 Semaphore 里,它示意 残余的许可证数量 ,而在 CountDownLatch 中,示意 还须要倒数的数量

state 是 volatile 润饰的,会被并发的批改,所以所有批改 state 的办法都须要 保障线程平安,比方 getState、setState 以及 compareAndSetState 操作来读取和更新这个状态。这些办法都依赖与 atomic 包的反对。

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

控制线程抢锁和配合的 FIFO 队列

这个队列用来寄存 期待的线程,AQS 就是排队管理器,当多个线程争用同一把锁时,必须有排队机制将没有拿到线程的锁串在一起,当锁开释的时候,管理器就会筛选一个适合的线程来占有开释的锁。

AQS 会保护一个期待的线程队列,把线程都放到队列中。

冀望合作工具类去实现的 获取 / 开释 等重要办法

这里的获取和开释办法,是利用 AQS 的写作工具类中最重要的办法,是由合作类本人去实现的,并且 含意各不相同

获取办法

  • 会依赖 state 变量,常常会阻塞
  • 在 Semaphore 中,获取就是 acquire 办法,作用就是获取一个许可证
  • 在 CountDownLatch 中,获取就是 await 办法,作用就是期待直到 0 完结

开释办法

  • 开释操作不会阻塞
  • 在 Semaphore 中,开释就是 release 办法,作用就是开释一个许可证
  • 在 CountDownLatch 中,获取就是 CountDown 办法,作用就是缩小一个数

并且子类还须要重写 tryAcquire 和 tryRelease 办法。

AQS 源码剖析

AQS 用法

第一步:写一个类,想好 合作的逻辑 ,实现 获取 / 开释 办法。

第二步:外部写一个Sync 类继承 AbstractQueuedSynchronizer

第三步:依据是否独占来 重写 tryAcquire/tryRelease 或 tryAcquireShared(int acquires)和 tryReleaseShared(int releases)等办法,在之前写的获取 / 开释办法中调用 AQS 的 acquire 或者 shared 办法。

AQS 在 CountDownLatch 中的利用

  • 外部类 Sync 继承了 AQS

首先咱们看一下构造方法

底层创立了一个 Sync 对象。

public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
}

getCount 办法中也只是返回了 state 的值。

public long getCount() {return sync.getCount();
}

int getCount() {return getState();
}

await 办法解析:

public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())// 判断以后线程是否中断
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)  //tryAcquireShared 次要判断以后状态是否 ==0,如果返回 1 能够间接放行,否则返回 -1 进入队列
            doAcquireSharedInterruptibly(arg);
    }
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 退出到 node 节点中,SHARED 示意共享模式
        boolean failed = true;
        try {for (;;) {final Node p = node.predecessor();
                if (p == head) {int r = tryAcquireShared(arg);
                    if (r >= 0) {setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())  // 阻塞以后线程
                    throw new InterruptedException();}
        } finally {if (failed)
                cancelAcquire(node);
        }
}

countDown 办法解析:

public void countDown() {sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared(); // 如果返回 true,则会调用此办法唤醒所有期待中的线程。return true;
        }
        return false;
    }
 protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();
                if (c == 0) // == 0 阐明曾经开释
                    return false;
                int nextc = c-1; // 将 state - 1
                if (compareAndSetState(c, nextc)) // 通过 CAS 更新 state
                    return nextc == 0; // 如果 == 0 阐明门闸关上
            }
}

Future 和 Callable

根本用法

首先看一下 Runnable 的缺点

  • 没有返回值
  • 无奈抛出异样

Callable 接口

  • 相似于 Runnable,被其余线程执行的工作
  • 实现 call 办法
  • 有返回值
  • 能够抛出异样
@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}

Future 类

在并发编程中,咱们常常用到非阻塞的模型,在之前的多线程的三种实现中,不论是继承 Thread 类还是实现 Runnable 接口,都无奈保障获取到之前的执行后果。通过实现 Callable 接口,并用 Future 能够来接管多线程的执行后果。

Future 示意一个可能还没有实现的异步工作的后果,针对这个后果能够增加 Callable 以便在工作执行胜利或失败后作出相应的
操作。

Future 接口定义了次要的 5 个接口办法,有 RunnableFuture 和 SchedualFuture 继承这个接口,以及 CompleteFuture 和 ForkJoinTask 继承这个接口。

Callable 和 Future 的关系

  • 咱们能够用 Future.get()办法来获取 Callable 接口返回的执行后果,还能够通过 Future.isDone()来判断工作是否以及执行完了,以及勾销这个工作,限时获取工作的后果等。
  • 在 call()未执行结束之前,调用 get()的线程会被 阻塞 ,晓得 call() 办法返回了后果后,才会失去后果,而后线程切换至 Runnable 状态。

所以 Future 是一个 存储器,它存储了 call()这个工作的后果 ,而这个工作的执行工夫是无奈提前确定的,因为这齐全取决于 call() 办法执行的状况。

次要办法介绍

  • get():获取后果,get 办法的行为取决于 Callable 工作的状态,只有以下五种状况:
  1. 工作失常实现,get 办法立刻返回后果
  2. 工作没有实现,get 办法会阻塞到工作实现
  3. 工作执行中抛出异样,get 办法就会抛出 ExecutionException:这里抛出的异样是 call()执行时产生的异样,不管外面 call()抛出的是什么异样
  4. 工作被勾销,get 办法抛出 CancellationException
  5. 工作超时,get 办法能够传入超时工夫,如果工夫到了还没获取到后果,get 办法就会抛出 TimeoutException
  • get(long timeout,TimeUnit unit):有超时的获取
  • cancel():勾销工作的执行
  • isDone():判断线程是否执行结束
  • isCancelled():判断是否被勾销

根本应用

在阻塞一秒后获取到返回值。

public class FutureTest {public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(10);
        Callable<Integer> callable = () -> {TimeUnit.SECONDS.sleep(1);
            return 10;
        };
        Future<Integer> future = service.submit(callable);
        try {System.out.println(future.get());
        } catch (InterruptedException e) {e.printStackTrace();
        } catch (ExecutionException e) {e.printStackTrace();
        }
        service.shutdown();}
}

运行后果:

10

异样捕捉演示

不论外面产生什么异样,咱们只能捕捉到 ExecutionException 异样。

public class FutureTest {public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(20);
        Future<Integer> future = service.submit(new CallableTask());
        try {System.out.println(future.isDone()); // 并不关怀是否抛出异样
            future.get();} catch (InterruptedException e) {e.printStackTrace();
            System.out.println("InterruptedException 异样");
        } catch (ExecutionException e) {e.printStackTrace();
            System.out.println("ExecutionException 异样");
        }finally {service.shutdown();
        }
    }

    static class CallableTask implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {throw new IllegalArgumentException("Callable 抛出异样");
        }
    }
}

运行后果:

true
ExecutionException 异样
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Callable 抛出异样
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.concurrent.FutureTest.main(FutureTest.java:12)
Caused by: java.lang.IllegalArgumentException: Callable 抛出异样
    at com.concurrent.FutureTest$CallableTask.call(FutureTest.java:27)
    at com.concurrent.FutureTest$CallableTask.call(FutureTest.java:24)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

cancel 办法:勾销工作的执行

  1. 如果这个工作还没开始执行,那么这种状况最简略,工作被失常的勾销,将来也不会被执行,办法返回 true。
  2. 如果工作曾经实现,或者曾经勾销,返回 false。
  3. 如果曾经开始了,那么不会勾销该工作,而是依据咱们填入的参数 MayInterruptIfRunning 做判断。如果传入 true 则收回中断信号,false 则不发送。

FutureTask

咱们也能够应用 FutureTask 来获取 Future 的工作后果,FutureTask 能够把 Callable 转化成 Future 和 Runnable,它同时实现了二者的接口。

把 Callable 实例当做参数,生成 FutureTask 对象,而后把这个对象当做一个 Runnable 对象,用线程池或另起线程去执行 Runnable 对象,最初通过 FutureTask 获取方才执行的后果。

代码演示

public class FutureTest {public static void main(String[] args) {Task task = new Task();
        FutureTask<Integer> integerFutureTask = new FutureTask<>(task);
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(integerFutureTask);
        try {System.out.println("task 运行后果:"+integerFutureTask.get());
        } catch (InterruptedException e) {e.printStackTrace();
        } catch (ExecutionException e) {e.printStackTrace();
        } finally {service.shutdown();
        }
    }
}

class Task implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {System.out.println("子线程正在计算");
        Thread.sleep(3000);
        int sum = 0;
        for (int i = 0; i <= 100; i++) {sum += i;}
        return sum;
    }
}

运行后果:

子线程正在计算
task 运行后果:5050

FutureTask 留神点

  • Future 的生命周期不能后退,一旦实现后,就停留在实现状态。
  • 当 for 循环批量获取 future 的后果时,容易产生一部分线程慢的状况,get 办法调用时应应用 timeout 限度。也能够应用 CompletableFuture 工具类,它的作用是哪个线程先实现就先获取哪个后果。

正文完
 0