转自:此岸舞\
链接:www.cnblogs.com/flower-dance/p/13714006.html

什么是JUC?

JUC就是java.util.concurrent包,这个包俗称JUC,外面都是解决并发问题的一些货色

该包的地位位于java上面的rt.jar包上面

4大罕用并发工具类:

CountDownLatch

CyclicBarrier

Semaphore

ExChanger

CountDownLatch:

CountDownLatch,俗称闭锁,作用是相似加强版的Join,是让一组线程期待其余的线程实现工作当前才执行

就比方在启动框架服务的时候,咱们主线程须要在环境线程初始化实现之后能力启动,这时候咱们就能够实现应用CountDownLatch来实现

/** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked *        before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */public CountDownLatch(int count) {    if (count < 0) throw new IllegalArgumentException("count < 0");    this.sync = new Sync(count);}

在源码中能够看到,创立CountDownLatch时,须要传入一个int类型的参数,将决定在执行次扣减之后,期待的线程被唤醒

通过这个类图就能够晓得其实CountDownLatch并没有多少货色

办法介绍:

CountDownLatch:初始化办法

await:期待办法,同时带参数的是超时重载办法

countDown:每执行一次,计数器减一,就是初始化传入的数字,也代表着一个线程实现了工作

getCount:获取以后值

toString:这个就不用说了

外面的Sync是一个外部类,里面的办法其实都是操作这个外部类的,这个外部类继承了AQS,实现的规范办法,AQS将在前面的章节写

主线程中创立CountDownLatch(3),而后主线程await阻塞,而后线程A,B,C各自实现了工作,调用了countDown,之后,每个线程调用一次计数器就会减一,初始是3,而后A线程调用后变成2,B线程调用后变成1,C线程调用后,变成0,这时就会唤醒正在await的主线程,而后主线程继续执行

说一千道一万,不如代码写几行,上代码:

休眠工具类,之后的代码都会用到

package org.dance.tools;import java.util.concurrent.TimeUnit;/** * 类阐明:线程休眠辅助工具类 */public class SleepTools {    /**     * 按秒休眠     * @param seconds 秒数     */    public static final void second(int seconds) {        try {            TimeUnit.SECONDS.sleep(seconds);        } catch (InterruptedException e) {        }    }    /**     * 按毫秒数休眠     * @param seconds 毫秒数     */    public static final void ms(int seconds) {        try {            TimeUnit.MILLISECONDS.sleep(seconds);        } catch (InterruptedException e) {        }    }}package org.dance.day2.util;import org.dance.tools.SleepTools;import java.util.concurrent.CountDownLatch;/** * CountDownLatch的应用,有五个线程,6个扣除点 * 扣除实现后主线程和业务线程,能力执行工作 *  扣除点个别都是大于等于须要初始化的线程的 * @author ZYGisComputer */public class UseCountDownLatch {    /**     * 设置为6个扣除点     */    static CountDownLatch countDownLatch = new CountDownLatch(6);    /**     * 初始化线程     */    private static class InitThread implements Runnable {        @Override        public void run() {            System.out.println("thread_" + Thread.currentThread().getId() + " ready init work .....");            // 执行扣减 扣减不代表完结            countDownLatch.countDown();            for (int i = 0; i < 2; i++) {                System.out.println("thread_" + Thread.currentThread().getId() + ".....continue do its work");            }        }    }    /**     * 业务线程     */    private static class BusiThread implements Runnable {        @Override        public void run() {            // 业务线程须要在等初始化结束后能力执行            try {                countDownLatch.await();                for (int i = 0; i < 3; i++) {                    System.out.println("BusiThread " + Thread.currentThread().getId() + " do business-----");                }            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }    public static void main(String[] args) {        // 创立独自的初始化线程        new Thread(){            @Override            public void run() {                SleepTools.ms(1);                System.out.println("thread_" + Thread.currentThread().getId() + " ready init work step 1st.....");                // 扣减一次                countDownLatch.countDown();                System.out.println("begin stop 2nd.....");                SleepTools.ms(1);                System.out.println("thread_" + Thread.currentThread().getId() + " ready init work step 2nd.....");                // 扣减一次                countDownLatch.countDown();            }        }.start();        // 启动业务线程        new Thread(new BusiThread()).start();        // 启动初始化线程        for (int i = 0; i <= 3; i++) {            new Thread(new InitThread()).start();        }        // 主线程进入期待        try {            countDownLatch.await();            System.out.println("Main do ites work.....");        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

返回后果:

thread_13 ready init work .....thread_13.....continue do its workthread_13.....continue do its workthread_14 ready init work .....thread_14.....continue do its workthread_14.....continue do its workthread_15 ready init work .....thread_15.....continue do its workthread_11 ready init work step 1st.....begin stop 2nd.....thread_16 ready init work .....thread_16.....continue do its workthread_16.....continue do its workthread_15.....continue do its workthread_11 ready init work step 2nd.....Main do ites work.....BusiThread 12 do business-----BusiThread 12 do business-----BusiThread 12 do business-----

通过返回后果就能够很间接的看到业务线程是在初始化线程齐全跑完之后,才开始执行的

CyclicBarrier:

CyclicBarrier,俗称栅栏锁,作用是让一组线程达到某个屏障,被阻塞,始终到组内的最初一个线程达到,而后屏障凋谢,接着,所有的线程持续运行

这个感觉和CountDownLatch有点类似,然而其实是不一样的,所谓的差异,将在上面详解

CyclicBarrier的结构参数有两个

/** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and * does not perform a predefined action when the barrier is tripped. * * @param parties the number of threads that must invoke {@link #await} *        before the barrier is tripped * @throws IllegalArgumentException if {@code parties} is less than 1 */public CyclicBarrier(int parties) {    this(parties, null);}/** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} *        before the barrier is tripped * @param barrierAction the command to execute when the barrier is *        tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */public CyclicBarrier(int parties, Runnable barrierAction) {    if (parties <= 0) throw new IllegalArgumentException();    this.parties = parties;    this.count = parties;    this.barrierCommand = barrierAction;}

很显著能感觉进去,下面的结构参数调用了上面的结构参数,是一个构造方法重载

首先这个第一个参数也树Int类型的,传入的是执行线程的个数,这个数量和CountDownLatch不一样,这个数量是须要和线程数量吻合的,CountDownLatch则不一样,CountDownLatch能够大于等于,而CyclicBarrier只能等于,而后是第二个参数,第二个参数是barrierAction,这个参数是当屏障凋谢后,执行的工作线程,如果当屏障凋谢后须要执行什么工作,能够写在这个线程中

主线程创立CyclicBarrier(3,barrierAction),而后由线程开始执行,线程A,B执行实现后都调用了await,而后他们都在一个屏障前阻塞者,须要期待线程C也,执行实现,调用await之后,而后三个线程都达到屏障后,屏障凋谢,而后线程继续执行,并且barrierAction在屏障凋谢的一瞬间也开始执行

上代码:

package org.dance.day2.util;import org.dance.tools.SleepTools;import java.util.Map;import java.util.Random;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CyclicBarrier;/** * CyclicBarrier的应用 * * @author ZYGisComputer */public class UseCyclicBarrier {    /**     * 寄存子线程工作后果的平安容器     */    private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,new CollectThread());    /**     * 后果打印线程     * 用来演示CyclicBarrier的第二个参数,barrierAction     */    private static class CollectThread implements Runnable {        @Override        public void run() {            StringBuffer result = new StringBuffer();            for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {                result.append("[" + workResult.getValue() + "]");            }            System.out.println("the result = " + result);            System.out.println("do other business.....");        }    }    /**     * 工作子线程     * 用于CyclicBarrier的一组线程     */    private static class SubThread implements Runnable {        @Override        public void run() {            // 获取以后线程的ID            long id = Thread.currentThread().getId();            // 放入统计容器中            resultMap.put(String.valueOf(id), id);            Random random = new Random();            try {                if (random.nextBoolean()) {                    Thread.sleep(1000 + id);                    System.out.println("Thread_"+id+"..... do something");                }                System.out.println(id+" is await");                cyclicBarrier.await();                Thread.sleep(1000+id);                System.out.println("Thread_"+id+".....do its business");            } catch (InterruptedException e) {                e.printStackTrace();            } catch (BrokenBarrierException e) {                e.printStackTrace();            }        }    }    public static void main(String[] args) {        for (int i = 0; i <= 4; i++) {            Thread thread = new Thread(new SubThread());            thread.start();        }    }}

返回后果:

11 is await14 is await15 is awaitThread_12..... do something12 is awaitThread_13..... do something13 is awaitthe result = [11][12][13][14][15]do other business.....Thread_11.....do its businessThread_12.....do its businessThread_13.....do its businessThread_14.....do its businessThread_15.....do its business

通过返回后果能够看出后面的11 14 15三个线程没有进入if语句块,在执行到await的时候进入了期待,而另外12 13两个线程进入到了if语句块当中,多休眠了1秒多,而后当5个线程同时达到await的时候,屏障凋谢,执行了barrierAction线程,而后线程组继续执行

解释一下CountDownLatch和CyclicBarrier的却别吧!

首先就是CountDownLatch的结构参数传入的数量个别都是大于等于线程,数量的,因为他是有第三方管制的,能够扣减屡次,而后就是CyclicBarrier的结构参数第一个参数传入的数量肯定是等于线程的个数的,因为他是由一组线程本身管制的

区别

CountDownLatch  CyclicBarrier

管制   第三方管制     本身管制

传入数量  大于等于线程数量 等于线程数量

Semaphore:

Semaphore,俗称信号量,作用于管制同时拜访某个特定资源的线程数量,用在流量管制

一说特定资源管制,那么第一工夫就想到了数据库连贯..

之前用期待超时模式写了一个数据库连接池,打算用这个Semaphone也写一个

/** * Creates a {@code Semaphore} with the given number of * permits and nonfair fairness setting. * * @param permits the initial number of permits available. *        This value may be negative, in which case releases *        must occur before any acquires will be granted. */public Semaphore(int permits) {    sync = new NonfairSync(permits);}

在源码中能够看到在构建Semaphore信号量的时候,须要传入许可证的数量,这个数量就是资源的最大容许的拜访的线程数

接下里用信号量实现一个数据库连接池

连贯对象

package org.dance.day2.util.pool;import org.dance.tools.SleepTools;import java.sql.*;import java.util.Map;import java.util.Properties;import java.util.concurrent.Executor;/** * 数据库连贯 * @author ZYGisComputer */public class SqlConnection implements Connection {    /**     * 获取数据库连贯     * @return     */    public static final Connection fetchConnection(){        return new SqlConnection();    }    @Override    public void commit() throws SQLException {        SleepTools.ms(70);    }    @Override    public Statement createStatement() throws SQLException {        SleepTools.ms(1);        return null;    }    @Override    public PreparedStatement prepareStatement(String sql) throws SQLException {        return null;    }    @Override    public CallableStatement prepareCall(String sql) throws SQLException {        return null;    }    @Override    public String nativeSQL(String sql) throws SQLException {        return null;    }    @Override    public void setAutoCommit(boolean autoCommit) throws SQLException {    }    @Override    public boolean getAutoCommit() throws SQLException {        return false;    }    @Override    public void rollback() throws SQLException {    }    @Override    public void close() throws SQLException {    }    @Override    public boolean isClosed() throws SQLException {        return false;    }    @Override    public DatabaseMetaData getMetaData() throws SQLException {        return null;    }    @Override    public void setReadOnly(boolean readOnly) throws SQLException {    }    @Override    public boolean isReadOnly() throws SQLException {        return false;    }    @Override    public void setCatalog(String catalog) throws SQLException {    }    @Override    public String getCatalog() throws SQLException {        return null;    }    @Override    public void setTransactionIsolation(int level) throws SQLException {    }    @Override    public int getTransactionIsolation() throws SQLException {        return 0;    }    @Override    public SQLWarning getWarnings() throws SQLException {        return null;    }    @Override    public void clearWarnings() throws SQLException {    }    @Override    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {        return null;    }    @Override    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {        return null;    }    @Override    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {        return null;    }    @Override    public Map<String, Class<?>> getTypeMap() throws SQLException {        return null;    }    @Override    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {    }    @Override    public void setHoldability(int holdability) throws SQLException {    }    @Override    public int getHoldability() throws SQLException {        return 0;    }    @Override    public Savepoint setSavepoint() throws SQLException {        return null;    }    @Override    public Savepoint setSavepoint(String name) throws SQLException {        return null;    }    @Override    public void rollback(Savepoint savepoint) throws SQLException {    }    @Override    public void releaseSavepoint(Savepoint savepoint) throws SQLException {    }    @Override    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {        return null;    }    @Override    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {        return null;    }    @Override    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {        return null;    }    @Override    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {        return null;    }    @Override    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {        return null;    }    @Override    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {        return null;    }    @Override    public Clob createClob() throws SQLException {        return null;    }    @Override    public Blob createBlob() throws SQLException {        return null;    }    @Override    public NClob createNClob() throws SQLException {        return null;    }    @Override    public SQLXML createSQLXML() throws SQLException {        return null;    }    @Override    public boolean isValid(int timeout) throws SQLException {        return false;    }    @Override    public void setClientInfo(String name, String value) throws SQLClientInfoException {    }    @Override    public void setClientInfo(Properties properties) throws SQLClientInfoException {    }    @Override    public String getClientInfo(String name) throws SQLException {        return null;    }    @Override    public Properties getClientInfo() throws SQLException {        return null;    }    @Override    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {        return null;    }    @Override    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {        return null;    }    @Override    public void setSchema(String schema) throws SQLException {    }    @Override    public String getSchema() throws SQLException {        return null;    }    @Override    public void abort(Executor executor) throws SQLException {    }    @Override    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {    }    @Override    public int getNetworkTimeout() throws SQLException {        return 0;    }    @Override    public <T> T unwrap(Class<T> iface) throws SQLException {        return null;    }    @Override    public boolean isWrapperFor(Class<?> iface) throws SQLException {        return false;    }}

连接池对象

package org.dance.day2.util.pool;import java.sql.Connection;import java.util.ArrayList;import java.util.HashSet;import java.util.Iterator;import java.util.LinkedList;import java.util.concurrent.Semaphore;/** * 应用信号量管制数据库的链接和开释 * * @author ZYGisComputer */public class DBPoolSemaphore {    /**     * 池容量     */    private final static int POOL_SIZE = 10;    /**     * useful 代表可用连贯     * useless 代表已用连贯     *  为什么要应用两个Semaphore呢?是因为,在连接池中不只有连贯自身是资源,空位也是资源,也须要记录     */    private final Semaphore useful, useless;    /**     * 连接池     */    private final static LinkedList<Connection> POOL = new LinkedList<>();    /**     * 应用动态块初始化池     */    static {        for (int i = 0; i < POOL_SIZE; i++) {            POOL.addLast(SqlConnection.fetchConnection());        }    }    public DBPoolSemaphore() {        // 初始可用的许可证等于池容量        useful = new Semaphore(POOL_SIZE);        // 初始不可用的许可证容量为0        useless = new Semaphore(0);    }    /**     * 获取数据库连贯     *     * @return 连贯对象     */    public Connection takeConnection() throws InterruptedException {        // 可用许可证减一        useful.acquire();        Connection connection;        synchronized (POOL) {            connection = POOL.removeFirst();        }        // 不可用许可证数量加一        useless.release();        return connection;    }    /**     * 开释链接     *     * @param connection 连贯对象     */    public void returnConnection(Connection connection) throws InterruptedException {        if(null!=connection){            // 打印日志            System.out.println("以后有"+useful.getQueueLength()+"个线程期待获取连贯,,"                    +"可用连贯有"+useful.availablePermits()+"个");            // 不可用许可证减一            useless.acquire();            synchronized (POOL){                POOL.addLast(connection);            }            // 可用许可证加一            useful.release();        }    }}

测试类:

package org.dance.day2.util.pool;import org.dance.tools.SleepTools;import java.sql.Connection;import java.util.Random;/** * 测试Semaphore * @author ZYGisComputer */public class UseSemaphore {    /**     * 连接池     */    public static final DBPoolSemaphore pool = new DBPoolSemaphore();    private static class BusiThread extends Thread{        @Override        public void run() {            // 随机数工具类 为了让每个线程持有连贯的工夫不一样            Random random = new Random();            long start = System.currentTimeMillis();            try {                Connection connection = pool.takeConnection();                System.out.println("Thread_"+Thread.currentThread().getId()+                        "_获取数据库连贯耗时["+(System.currentTimeMillis()-start)+"]ms.");                // 模仿应用连贯查问数据                SleepTools.ms(100+random.nextInt(100));                System.out.println("查问数据实现偿还连贯");                pool.returnConnection(connection);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }    public static void main(String[] args) {        for (int i = 0; i < 50; i++) {            BusiThread busiThread = new BusiThread();            busiThread.start();        }    }}

测试返回后果:

Thread_11_获取数据库连贯耗时[0]ms.Thread_12_获取数据库连贯耗时[0]ms.Thread_13_获取数据库连贯耗时[0]ms.Thread_14_获取数据库连贯耗时[0]ms.Thread_15_获取数据库连贯耗时[0]ms.Thread_16_获取数据库连贯耗时[0]ms.Thread_17_获取数据库连贯耗时[0]ms.Thread_18_获取数据库连贯耗时[0]ms.Thread_19_获取数据库连贯耗时[0]ms.Thread_20_获取数据库连贯耗时[0]ms.查问数据实现偿还连贯以后有40个线程期待获取连贯,,可用连贯有0个Thread_21_获取数据库连贯耗时[112]ms.查问数据实现偿还连贯...................查问数据实现偿还连贯以后有2个线程期待获取连贯,,可用连贯有0个Thread_59_获取数据库连贯耗时[637]ms.查问数据实现偿还连贯以后有1个线程期待获取连贯,,可用连贯有0个Thread_60_获取数据库连贯耗时[660]ms.查问数据实现偿还连贯以后有0个线程期待获取连贯,,可用连贯有0个查问数据实现偿还连贯...................以后有0个线程期待获取连贯,,可用连贯有8个查问数据实现偿还连贯以后有0个线程期待获取连贯,,可用连贯有9个

通过执行后果能够很明确的看到,一上来就有10个线程获取到了连贯,,而后前面的40个线程进入阻塞,而后只有开释链接之后,期待的线程就会有一个拿到,而后越前面的线程期待的工夫就越长,而后始终到所有的线程执行结束

最初打印的可用连贯有九个不是因为少了一个是因为在开释之前打印的,不是谬误

从后果中能够看到,咱们对连接池中的资源的到了管制,这就是信号量的流量管制

Exchanger:

Exchanger,俗称交换器,用于在线程之间替换数据,然而比拟受限,因为只能两个线程之间替换数据

/** * Creates a new Exchanger. */public Exchanger() {    participant = new Participant();}

这个构造函数没有什么好说的,也没有入参,只有在创立的时候指定一下须要替换的数据的泛型即可,上面看代码

package org.dance.day2.util;import java.util.HashSet;import java.util.Set;import java.util.concurrent.Exchanger;/** * 线程之间替换数据 * @author ZYGisComputer */public class UseExchange {    private static final Exchanger<Set<String>> exchanger = new Exchanger<>();    public static void main(String[] args) {        new Thread(){            @Override            public void run() {                Set<String> aSet = new HashSet<>();                aSet.add("A");                aSet.add("B");                aSet.add("C");                try {                    Set<String> exchange = exchanger.exchange(aSet);                    for (String s : exchange) {                        System.out.println("aSet"+s);                    }                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }.start();        new Thread(){            @Override            public void run() {                Set<String> bSet = new HashSet<>();                bSet.add("1");                bSet.add("2");                bSet.add("3");                try {                    Set<String> exchange = exchanger.exchange(bSet);                    for (String s : exchange) {                        System.out.println("bSet"+s);                    }                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }.start();    }}

执行后果:

bSetAbSetBbSetCaSet1aSet2aSet3

通过执行后果能够清晰的看到,两个线程中的数据产生了替换,这就是Exchanger的线程数据交换了

以上就是JUC的4大罕用并发工具类了

近期热文举荐:

1.1,000+ 道 Java面试题及答案整顿(2022最新版)

2.劲爆!Java 协程要来了。。。

3.Spring Boot 2.x 教程,太全了!

4.别再写满屏的爆爆爆炸类了,试试装璜器模式,这才是优雅的形式!!

5.《Java开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞+转发哦!