转自:此岸舞 \
链接:www.cnblogs.com/flower-dance/p/13714006.html
什么是 JUC?
JUC 就是 java.util.concurrent 包, 这个包俗称 JUC, 外面都是解决并发问题的一些货色
该包的地位位于 java 上面的 rt.jar 包上面
4 大罕用并发工具类:
CountDownLatch
CyclicBarrier
Semaphore
ExChanger
CountDownLatch:
CountDownLatch, 俗称闭锁, 作用是相似加强版的 Join, 是让一组线程期待其余的线程实现工作当前才执行
就比方在启动框架服务的时候, 咱们主线程须要在环境线程初始化实现之后能力启动, 这时候咱们就能够实现应用 CountDownLatch 来实现
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
在源码中能够看到, 创立 CountDownLatch 时, 须要传入一个 int 类型的参数, 将决定在执行次扣减之后, 期待的线程被唤醒
通过这个类图就能够晓得其实 CountDownLatch 并没有多少货色
办法介绍:
CountDownLatch: 初始化办法
await: 期待办法, 同时带参数的是超时重载办法
countDown: 每执行一次, 计数器减一, 就是初始化传入的数字, 也代表着一个线程实现了工作
getCount: 获取以后值
toString: 这个就不用说了
外面的 Sync 是一个外部类, 里面的办法其实都是操作这个外部类的, 这个外部类继承了 AQS, 实现的规范办法,AQS 将在前面的章节写
主线程中创立 CountDownLatch(3), 而后主线程 await 阻塞, 而后线程 A,B,C 各自实现了工作, 调用了 countDown, 之后, 每个线程调用一次计数器就会减一, 初始是 3, 而后 A 线程调用后变成 2,B 线程调用后变成 1,C 线程调用后, 变成 0, 这时就会唤醒正在 await 的主线程, 而后主线程继续执行
说一千道一万, 不如代码写几行, 上代码:
休眠工具类, 之后的代码都会用到
package org.dance.tools;
import java.util.concurrent.TimeUnit;
/**
* 类阐明:线程休眠辅助工具类
*/
public class SleepTools {
/**
* 按秒休眠
* @param seconds 秒数
*/
public static final void second(int seconds) {
try {TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {}}
/**
* 按毫秒数休眠
* @param seconds 毫秒数
*/
public static final void ms(int seconds) {
try {TimeUnit.MILLISECONDS.sleep(seconds);
} catch (InterruptedException e) {}}
}
package org.dance.day2.util;
import org.dance.tools.SleepTools;
import java.util.concurrent.CountDownLatch;
/**
* CountDownLatch 的应用, 有五个线程,6 个扣除点
* 扣除实现后主线程和业务线程, 能力执行工作
* 扣除点个别都是大于等于须要初始化的线程的
* @author ZYGisComputer
*/
public class UseCountDownLatch {
/**
* 设置为 6 个扣除点
*/
static CountDownLatch countDownLatch = new CountDownLatch(6);
/**
* 初始化线程
*/
private static class InitThread implements Runnable {
@Override
public void run() {System.out.println("thread_" + Thread.currentThread().getId() + "ready init work .....");
// 执行扣减 扣减不代表完结
countDownLatch.countDown();
for (int i = 0; i < 2; i++) {System.out.println("thread_" + Thread.currentThread().getId() + ".....continue do its work");
}
}
}
/**
* 业务线程
*/
private static class BusiThread implements Runnable {
@Override
public void run() {
// 业务线程须要在等初始化结束后能力执行
try {countDownLatch.await();
for (int i = 0; i < 3; i++) {System.out.println("BusiThread" + Thread.currentThread().getId() + "do business-----");
}
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 创立独自的初始化线程
new Thread(){
@Override
public void run() {SleepTools.ms(1);
System.out.println("thread_" + Thread.currentThread().getId() + "ready init work step 1st.....");
// 扣减一次
countDownLatch.countDown();
System.out.println("begin stop 2nd.....");
SleepTools.ms(1);
System.out.println("thread_" + Thread.currentThread().getId() + "ready init work step 2nd.....");
// 扣减一次
countDownLatch.countDown();}
}.start();
// 启动业务线程
new Thread(new BusiThread()).start();
// 启动初始化线程
for (int i = 0; i <= 3; i++) {new Thread(new InitThread()).start();}
// 主线程进入期待
try {countDownLatch.await();
System.out.println("Main do ites work.....");
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
返回后果:
thread_13 ready init work .....
thread_13.....continue do its work
thread_13.....continue do its work
thread_14 ready init work .....
thread_14.....continue do its work
thread_14.....continue do its work
thread_15 ready init work .....
thread_15.....continue do its work
thread_11 ready init work step 1st.....
begin stop 2nd.....
thread_16 ready init work .....
thread_16.....continue do its work
thread_16.....continue do its work
thread_15.....continue do its work
thread_11 ready init work step 2nd.....
Main do ites work.....
BusiThread 12 do business-----
BusiThread 12 do business-----
BusiThread 12 do business-----
通过返回后果就能够很间接的看到业务线程是在初始化线程齐全跑完之后, 才开始执行的
CyclicBarrier:
CyclicBarrier, 俗称栅栏锁, 作用是让一组线程达到某个屏障, 被阻塞, 始终到组内的最初一个线程达到, 而后屏障凋谢, 接着, 所有的线程持续运行
这个感觉和 CountDownLatch 有点类似, 然而其实是不一样的, 所谓的差异, 将在上面详解
CyclicBarrier 的结构参数有两个
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties) {this(parties, null);
}
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
很显著能感觉进去, 下面的结构参数调用了上面的结构参数, 是一个构造方法重载
首先这个第一个参数也树 Int 类型的, 传入的是执行线程的个数, 这个数量和 CountDownLatch 不一样, 这个数量是须要和线程数量吻合的,CountDownLatch 则不一样,CountDownLatch 能够大于等于, 而 CyclicBarrier 只能等于, 而后是第二个参数, 第二个参数是 barrierAction, 这个参数是当屏障凋谢后, 执行的工作线程, 如果当屏障凋谢后须要执行什么工作, 能够写在这个线程中
主线程创立 CyclicBarrier(3,barrierAction), 而后由线程开始执行, 线程 A,B 执行实现后都调用了 await, 而后他们都在一个屏障前阻塞者, 须要期待线程 C 也, 执行实现, 调用 await 之后, 而后三个线程都达到屏障后, 屏障凋谢, 而后线程继续执行, 并且 barrierAction 在屏障凋谢的一瞬间也开始执行
上代码:
package org.dance.day2.util;
import org.dance.tools.SleepTools;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrier 的应用
*
* @author ZYGisComputer
*/
public class UseCyclicBarrier {
/**
* 寄存子线程工作后果的平安容器
*/
private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,new CollectThread());
/**
* 后果打印线程
* 用来演示 CyclicBarrier 的第二个参数,barrierAction
*/
private static class CollectThread implements Runnable {
@Override
public void run() {StringBuffer result = new StringBuffer();
for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {result.append("[" + workResult.getValue() + "]");
}
System.out.println("the result =" + result);
System.out.println("do other business.....");
}
}
/**
* 工作子线程
* 用于 CyclicBarrier 的一组线程
*/
private static class SubThread implements Runnable {
@Override
public void run() {
// 获取以后线程的 ID
long id = Thread.currentThread().getId();
// 放入统计容器中
resultMap.put(String.valueOf(id), id);
Random random = new Random();
try {if (random.nextBoolean()) {Thread.sleep(1000 + id);
System.out.println("Thread_"+id+"..... do something");
}
System.out.println(id+"is await");
cyclicBarrier.await();
Thread.sleep(1000+id);
System.out.println("Thread_"+id+".....do its business");
} catch (InterruptedException e) {e.printStackTrace();
} catch (BrokenBarrierException e) {e.printStackTrace();
}
}
}
public static void main(String[] args) {for (int i = 0; i <= 4; i++) {Thread thread = new Thread(new SubThread());
thread.start();}
}
}
返回后果:
11 is await
14 is await
15 is await
Thread_12..... do something
12 is await
Thread_13..... do something
13 is await
the result = [11][12][13][14][15]
do other business.....
Thread_11.....do its business
Thread_12.....do its business
Thread_13.....do its business
Thread_14.....do its business
Thread_15.....do its business
通过返回后果能够看出后面的 11 14 15 三个线程没有进入 if 语句块, 在执行到 await 的时候进入了期待, 而另外 12 13 两个线程进入到了 if 语句块当中, 多休眠了 1 秒多, 而后当 5 个线程同时达到 await 的时候, 屏障凋谢, 执行了 barrierAction 线程, 而后线程组继续执行
解释一下 CountDownLatch 和 CyclicBarrier 的却别吧!
首先就是 CountDownLatch 的结构参数传入的数量个别都是大于等于线程, 数量的, 因为他是有第三方管制的, 能够扣减屡次, 而后就是 CyclicBarrier 的结构参数第一个参数传入的数量肯定是等于线程的个数的, 因为他是由一组线程本身管制的
区别
CountDownLatch CyclicBarrier
管制 第三方管制 本身管制
传入数量 大于等于线程数量 等于线程数量
Semaphore:
Semaphore, 俗称信号量, 作用于管制同时拜访某个特定资源的线程数量, 用在流量管制
一说特定资源管制, 那么第一工夫就想到了数据库连贯..
之前用期待超时模式写了一个数据库连接池, 打算用这个 Semaphone 也写一个
/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
*/
public Semaphore(int permits) {sync = new NonfairSync(permits);
}
在源码中能够看到在构建 Semaphore 信号量的时候, 须要传入许可证的数量, 这个数量就是资源的最大容许的拜访的线程数
接下里用信号量实现一个数据库连接池
连贯对象
package org.dance.day2.util.pool;
import org.dance.tools.SleepTools;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
* 数据库连贯
* @author ZYGisComputer
*/
public class SqlConnection implements Connection {
/**
* 获取数据库连贯
* @return
*/
public static final Connection fetchConnection(){return new SqlConnection();
}
@Override
public void commit() throws SQLException {SleepTools.ms(70);
}
@Override
public Statement createStatement() throws SQLException {SleepTools.ms(1);
return null;
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {return null;}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {return null;}
@Override
public String nativeSQL(String sql) throws SQLException {return null;}
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException { }
@Override
public boolean getAutoCommit() throws SQLException {return false;}
@Override
public void rollback() throws SQLException {}
@Override
public void close() throws SQLException {}
@Override
public boolean isClosed() throws SQLException {return false;}
@Override
public DatabaseMetaData getMetaData() throws SQLException {return null;}
@Override
public void setReadOnly(boolean readOnly) throws SQLException { }
@Override
public boolean isReadOnly() throws SQLException {return false;}
@Override
public void setCatalog(String catalog) throws SQLException { }
@Override
public String getCatalog() throws SQLException {return null;}
@Override
public void setTransactionIsolation(int level) throws SQLException { }
@Override
public int getTransactionIsolation() throws SQLException {return 0;}
@Override
public SQLWarning getWarnings() throws SQLException {return null;}
@Override
public void clearWarnings() throws SQLException {}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {return null;}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {return null;}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {return null;}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {return null;}
@Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException { }
@Override
public void setHoldability(int holdability) throws SQLException { }
@Override
public int getHoldability() throws SQLException {return 0;}
@Override
public Savepoint setSavepoint() throws SQLException {return null;}
@Override
public Savepoint setSavepoint(String name) throws SQLException {return null;}
@Override
public void rollback(Savepoint savepoint) throws SQLException { }
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException { }
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {return null;}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {return null;}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {return null;}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {return null;}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {return null;}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {return null;}
@Override
public Clob createClob() throws SQLException {return null;}
@Override
public Blob createBlob() throws SQLException {return null;}
@Override
public NClob createNClob() throws SQLException {return null;}
@Override
public SQLXML createSQLXML() throws SQLException {return null;}
@Override
public boolean isValid(int timeout) throws SQLException {return false;}
@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException { }
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException { }
@Override
public String getClientInfo(String name) throws SQLException {return null;}
@Override
public Properties getClientInfo() throws SQLException {return null;}
@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {return null;}
@Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {return null;}
@Override
public void setSchema(String schema) throws SQLException { }
@Override
public String getSchema() throws SQLException {return null;}
@Override
public void abort(Executor executor) throws SQLException { }
@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { }
@Override
public int getNetworkTimeout() throws SQLException {return 0;}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {return null;}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {return false;}
}
连接池对象
package org.dance.day2.util.pool;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
/**
* 应用信号量管制数据库的链接和开释
*
* @author ZYGisComputer
*/
public class DBPoolSemaphore {
/**
* 池容量
*/
private final static int POOL_SIZE = 10;
/**
* useful 代表可用连贯
* useless 代表已用连贯
* 为什么要应用两个 Semaphore 呢? 是因为, 在连接池中不只有连贯自身是资源, 空位也是资源, 也须要记录
*/
private final Semaphore useful, useless;
/**
* 连接池
*/
private final static LinkedList<Connection> POOL = new LinkedList<>();
/**
* 应用动态块初始化池
*/
static {for (int i = 0; i < POOL_SIZE; i++) {POOL.addLast(SqlConnection.fetchConnection());
}
}
public DBPoolSemaphore() {
// 初始可用的许可证等于池容量
useful = new Semaphore(POOL_SIZE);
// 初始不可用的许可证容量为 0
useless = new Semaphore(0);
}
/**
* 获取数据库连贯
*
* @return 连贯对象
*/
public Connection takeConnection() throws InterruptedException {
// 可用许可证减一
useful.acquire();
Connection connection;
synchronized (POOL) {connection = POOL.removeFirst();
}
// 不可用许可证数量加一
useless.release();
return connection;
}
/**
* 开释链接
*
* @param connection 连贯对象
*/
public void returnConnection(Connection connection) throws InterruptedException {if(null!=connection){
// 打印日志
System.out.println("以后有"+useful.getQueueLength()+"个线程期待获取连贯,,"
+"可用连贯有"+useful.availablePermits()+"个");
// 不可用许可证减一
useless.acquire();
synchronized (POOL){POOL.addLast(connection);
}
// 可用许可证加一
useful.release();}
}
}
测试类:
package org.dance.day2.util.pool;
import org.dance.tools.SleepTools;
import java.sql.Connection;
import java.util.Random;
/**
* 测试 Semaphore
* @author ZYGisComputer
*/
public class UseSemaphore {
/**
* 连接池
*/
public static final DBPoolSemaphore pool = new DBPoolSemaphore();
private static class BusiThread extends Thread{
@Override
public void run() {
// 随机数工具类 为了让每个线程持有连贯的工夫不一样
Random random = new Random();
long start = System.currentTimeMillis();
try {Connection connection = pool.takeConnection();
System.out.println("Thread_"+Thread.currentThread().getId()+
"_获取数据库连贯耗时 ["+(System.currentTimeMillis()-start)+"]ms.");
// 模仿应用连贯查问数据
SleepTools.ms(100+random.nextInt(100));
System.out.println("查问数据实现偿还连贯");
pool.returnConnection(connection);
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
public static void main(String[] args) {for (int i = 0; i < 50; i++) {BusiThread busiThread = new BusiThread();
busiThread.start();}
}
}
测试返回后果:
Thread_11_获取数据库连贯耗时 [0]ms.
Thread_12_获取数据库连贯耗时 [0]ms.
Thread_13_获取数据库连贯耗时 [0]ms.
Thread_14_获取数据库连贯耗时 [0]ms.
Thread_15_获取数据库连贯耗时 [0]ms.
Thread_16_获取数据库连贯耗时 [0]ms.
Thread_17_获取数据库连贯耗时 [0]ms.
Thread_18_获取数据库连贯耗时 [0]ms.
Thread_19_获取数据库连贯耗时 [0]ms.
Thread_20_获取数据库连贯耗时 [0]ms.
查问数据实现偿还连贯
以后有 40 个线程期待获取连贯,, 可用连贯有 0 个
Thread_21_获取数据库连贯耗时 [112]ms.
查问数据实现偿还连贯
................... 查问数据实现偿还连贯
以后有 2 个线程期待获取连贯,, 可用连贯有 0 个
Thread_59_获取数据库连贯耗时 [637]ms.
查问数据实现偿还连贯
以后有 1 个线程期待获取连贯,, 可用连贯有 0 个
Thread_60_获取数据库连贯耗时 [660]ms.
查问数据实现偿还连贯
以后有 0 个线程期待获取连贯,, 可用连贯有 0 个
查问数据实现偿还连贯...................
以后有 0 个线程期待获取连贯,, 可用连贯有 8 个
查问数据实现偿还连贯
以后有 0 个线程期待获取连贯,, 可用连贯有 9 个
通过执行后果能够很明确的看到, 一上来就有 10 个线程获取到了连贯,, 而后前面的 40 个线程进入阻塞, 而后只有开释链接之后, 期待的线程就会有一个拿到, 而后越前面的线程期待的工夫就越长, 而后始终到所有的线程执行结束
最初打印的可用连贯有九个不是因为少了一个是因为在开释之前打印的, 不是谬误
从后果中能够看到, 咱们对连接池中的资源的到了管制, 这就是信号量的流量管制
Exchanger:
Exchanger, 俗称交换器, 用于在线程之间替换数据, 然而比拟受限, 因为只能两个线程之间替换数据
/**
* Creates a new Exchanger.
*/
public Exchanger() {participant = new Participant();
}
这个构造函数没有什么好说的, 也没有入参, 只有在创立的时候指定一下须要替换的数据的泛型即可, 上面看代码
package org.dance.day2.util;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;
/**
* 线程之间替换数据
* @author ZYGisComputer
*/
public class UseExchange {private static final Exchanger<Set<String>> exchanger = new Exchanger<>();
public static void main(String[] args) {new Thread(){
@Override
public void run() {Set<String> aSet = new HashSet<>();
aSet.add("A");
aSet.add("B");
aSet.add("C");
try {Set<String> exchange = exchanger.exchange(aSet);
for (String s : exchange) {System.out.println("aSet"+s);
}
} catch (InterruptedException e) {e.printStackTrace();
}
}
}.start();
new Thread(){
@Override
public void run() {Set<String> bSet = new HashSet<>();
bSet.add("1");
bSet.add("2");
bSet.add("3");
try {Set<String> exchange = exchanger.exchange(bSet);
for (String s : exchange) {System.out.println("bSet"+s);
}
} catch (InterruptedException e) {e.printStackTrace();
}
}
}.start();}
}
执行后果:
bSetAbSetBbSetCaSet1aSet2aSet3
通过执行后果能够清晰的看到, 两个线程中的数据产生了替换, 这就是 Exchanger 的线程数据交换了
以上就是 JUC 的 4 大罕用并发工具类了
近期热文举荐:
1.1,000+ 道 Java 面试题及答案整顿 (2022 最新版)
2. 劲爆!Java 协程要来了。。。
3.Spring Boot 2.x 教程,太全了!
4. 别再写满屏的爆爆爆炸类了,试试装璜器模式,这才是优雅的形式!!
5.《Java 开发手册(嵩山版)》最新公布,速速下载!
感觉不错,别忘了顺手点赞 + 转发哦!