共计 12963 个字符,预计需要花费 33 分钟才能阅读完成。
明天的文章将给大家分享 Java 并发编程相干的知识点,尽管相似的文章已有很多,但本文将以更贴近理论应用场景的形式进行论述。具体将对 Java 常见的并发编程形式和伎俩进行总结,以便能够从应用角度更好地感知 Java 并发编程带来的成果,从而为后续更深刻的了解 Java 并发机制进行铺垫。
Java 多线程概述
在 Java 中应用多线程是进步程序并发响应能力的重要伎俩,但同时它也是一把双刃剑;如果使用不当也很容易导致程序出错,并且还很难直观地找到问题。这是因为:1)、线程运行自身是由操作系统调度,具备肯定的随机性;2)、Java 共享内存模型在多线程环境下很容易产生线程平安问题;3)、不合理的封装依赖,极容易导致公布对象的不经意逸出。
所以,要用好多线程这把剑,就须要对 Java 内存模型、线程平安问题有较深的意识。但因为 Java 丰盛的生态,在理论研发工作中,须要咱们本人进行并发解决的场景大都被各类框架或组件给屏蔽了。这也是造成很多 Java 开发人员对并发编程意识淡薄的次要起因。
首先从 Java 内存模型的角度了解下应用多线程编程最外围的问题,具体如下图所示:
如上图所示,在 Java 内存模型中,对于用户程序来说用得最频繁的就是堆内存和栈内存,其中 堆内存次要寄存对象及数组 ,例如由 new() 产生的实例。而 栈内存则次要是存储运行办法时所需的局部变量、操作数及办法进口 等信息。
其中 堆内存是线程共享的,一个类被实例化后生成的对象、及对象中定义的成员变量能够被多个线程共享拜访,这种共享次要体现在多个线程同时执行、同一个对象实例的某个办法时,会将该办法中操作的对象成员变量别离以多个正本的形式拷贝到办法栈中进行操作,而不是间接批改堆内存中对象的成员变量值;线程操作实现后,会再次将批改后的变量值同步至堆内存中的主内存地址,并实现对其余线程的可见。
这个过程尽管看似行云流水,但在 JVM 中却至多须要 6 个原子步骤 能力实现,具体如下图所示:
如上图所示,在不思考对共享变量进行加锁的状况下,堆内存中一个对象的成员变量被线程批改大略须要以下 6 个步骤:
1、read(读取):从堆内存中的读取要操作的变量;
2、load(载入):将读取的变量拷贝到线程栈内存;
3、use(应用):将栈内存中的变量值传递给执行引擎;
4、assign(赋值):将从执行引擎失去的后果赋值给栈内存中变量;
5、store(存储):将变更后的栈内存中的变量值传递到主内存;
6、write(写入):变更主内存中的变量值,此时新值对所有线程可见;
由此可见,每个线程都能够按这几个步骤并行操作同一个共享变量。可想而知,如果没有任何同步措施,那么在多线程环境下,该共享变量的值将变得飘忽不定,很难失去最终正确的后果。而这就是所谓的 线程平安 问题,也 是咱们在应用多线程编程时,最须要关注的问题!
线程池的应用
在理论场景中,多线程的应用并不是单打独斗,线程作为贵重的系统资源,其创立和销毁都须要消耗肯定的系统资源;而无限度的创立线程资源,也会导致系统资源的耗尽。所以,为了重复使用线程资源、限度线程的创立行为,个别都会通过线程池来实现。以 Java Web 服务中应用最广的 Tomcat 服务器举例,为了并行处理网络申请就应用了线程池,源码示例如下:
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {if (socketWrapper == null) {return false;}
SocketProcessorBase<S> sc = null;
if (processorCache != null) {sc = processorCache.pop();
}
if (sc == null) {sc = createSocketProcessor(socketWrapper, event);
} else {sc.reset(socketWrapper, event);
}
// 这里通过线程池对线程执行进行治理
Executor executor = getExecutor();
if (dispatch && executor != null) {executor.execute(sc);
} else {sc.run();
}
} catch (RejectedExecutionException ree) {getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
上述代码为 Tomcat 源码应用线程池并发解决网络申请的示例,这里以 Tomcat 为例,次要是因为基于 Spring Boot、Spring MVC 开发的 Web 服务大都运行在 Tomcat 容器,而对于线程、线程池应用的复杂度都被屏蔽在中间件和框架中了,所以很多同学尽管写了不少 Java 代码,但在业务研发中额定应用线程的场景可能并不多,举这个例子的目标就是为了晋升下并发编程的意识!
在 Java 中应用线程池的次要形式是 Executor 框架,该框架作为 JUC 并发包的一部分,为 Java 程序提供了一个灵便的线程池实现。其逻辑档次如下图所示:
如图所示,应用 Executor 框架,既能够通过间接自定义配置、扩大 ThreadPoolExecutor 来创立一个线程池,也能够通过 Executors 类间接调用 “newSingleThreadExecutor()、newFixedThreadPool()、newCachedThreadPool()” 这三个办法来创立具备肯定性能特色的线程池。
除此之外,也能够通过自定义配置、扩大 ScheduledThreadPoolExecutor 来创立一个具备周期性、定时性能的线程池,例如线程 10s 后运行、线程每分钟运行一次等。同样,与 ThreadPoolExecutor 一样,如果不想自定义配置,也能够通过 Executors 类间接调用 “newScheduledThreadPool()、newSingleThreadScheduledExecutor()” 这两个办法来别离创立具备自动线程规模扩大能力和线程池中只容许有单个线程的特定线程池。
而 ForkJoinPool 是 jdk1.8 当前新增的一种线程池实现类型,相似于 Fork-Join 框架所反对的性能。这是一种能够将一个大工作拆分成多个工作队列,并具体调配给不同线程解决的机制,而要害的个性在于,通过 窃取算法,某个线程在执行完本队列工作后,能够窃取其余队列的工作进行执行,从而最大限度进步线程的利用效率。
在理论利用中,尽管能够通过 Executors 不便的创立单个线程、固定线程或具备主动膨胀能力的线程池,但个别还是倡议 间接通过 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 自定义配置 ,这次要是因为 Executors 默认创立的线程池,很多采纳的是 无界队列,例如 LinkedBlockingQueue,这样线程就能够被无限度的增加都线程池的工作执行队列,如果申请量过大容易造成 OOM。
接下来以一个理论的例子来演示通过 ThreadPoolExecutor 如何自定义配置一个业务线程池,具体如下:
1)、配置一个线程池类
public final class SingleBlockPoolExecutor {
/**
* 自定义配置线程池(线程池外围线程数、最大线程数、存活工夫设置、采纳的队列类型、线程工厂类、线程池回绝解决类)*/
private final ThreadPoolExecutor pool = new ThreadPoolExecutor(30, 100, 5, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(100), new BlockThreadFactory(), new BlockRejectedExecutionHandler());
public ThreadPoolExecutor getPool() {return pool;}
private SingleBlockPoolExecutor() {}
/**
* 定义线程工厂
*/
public static class BlockThreadFactory implements ThreadFactory {private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {Thread t = new Thread(r);
String threadName = SingleBlockPoolExecutor.class.getSimpleName() + "-" + count.addAndGet(1);
t.setName(threadName);
return t;
}
}
/**
* 定义线程池回绝机制解决类
*/
public static class BlockRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 被拒线程再次返回阻塞队列进行期待解决
executor.getQueue().put(r);
} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}
}
/**
* 在动态外部类中持有单例类的实例,并且可间接被初始化
*/
private static class Holder {private static SingleBlockPoolExecutor instance = new SingleBlockPoolExecutor();
}
/**
* 调用 getInstance 办法,事实上是取得 Holder 的 instance 动态属性
*
* @return
*/
public static SingleBlockPoolExecutor getInstance() {return Holder.instance;}
/**
* 线程池销毁办法
*/
public void destroy() {if (pool != null) {
// 线程池销毁
pool.shutdownNow();}
}
}
如上述代码所示,通过单例模式配置了一个线程池。在对 ThreadPoolExecutor 的配置中,须要设置“外围线程数、最大线程数、存活工夫设置、采纳的队列类型、线程工厂类、线程池回绝解决类”,这几个外围参数。
2)、定义零碎全局线程池治理类
public class AsyncManager {
/**
* 工作解决公共线程池
*/
public static final ExecutorService service = SingleBlockPoolExecutor.getInstance().getPool();
}
在利用中,除了框架定义的线程池外,如果自定义线程池,为了不便对立治理和应用,能够建设一个全局治理类,如上所示,该类通过动态变量的形式初始化了后面咱们所定义的线程池。
3)、业务中应用
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Override
public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) {
//1、同步解决外围业务逻辑
log.info("同步解决业务逻辑");
//2、通过线程池提交,异步解决非核心逻辑,例如日志埋点
AsyncManager.service.execute(() -> {System.out.println("线程 ->" + Thread.currentThread().getName() + ", 正在执行异步日志解决工作");
});
return CreateOrderBO.builder().result(true).build();}
}
如上代码所示,业务中须要通过线程池异步解决时,能够通过线程池治理类获取对应的线程池,并向其提交执行线程工作。
FutureTask 实现异步后果返回
在应用 Thread 或 Runnable 实现的线程解决中,个别是不能返回线程处理结果的。但如果心愿在调用线程异步解决实现后,可能取得线程异步解决的后果,那么就能够通过 FutureTask 框架 实现。示例代码如下:
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Override
public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) {
//Future 异步解决返回执行后果
// 定义接管线程执行后果的 FutureTask 对象
List<Future<Integer>> results = Collections.synchronizedList(new ArrayList<>());
// 实现 Callable 接口定义线程执行逻辑
results.add(AsyncManager.service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int a = 1, b = 2;
System.out.println("Callable 接口执行中");
return a + b;
}
}));
// 输入线程返回后果
for (Future<Integer> future : results) {
try {
// 这里获取后果,等待时间设置 200 毫秒
System.out.println("a+b=" + future.get(200, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
} catch (TimeoutException e) {e.printStackTrace();
}
}
// 判断线程是否执行结束,结束则获取执行后果
return CreateOrderBO.builder().result(true).build();}
}
如上述代码,如果心愿线程返回执行后果,那么能够通过实现 Callable 接口定义线程类,并通过 FutureTask 接管线程处理结果。不过在 理论应用时,须要留神线程临时未执行实现状况下的业务解决逻辑。
CountDownLatch 实现线程并行同步
在并发编程中,一个简单的业务逻辑能够通过多个线程并发执行来进步速度;但如果须要同步期待这些线程执行完后能力进行后续的逻辑,那么就能够通过 CountDownLatch 来实现对多个线程执行的同步汇聚。其逻辑示意图如下:
从原理上看 CountDownLatch 实际上是在其外部创立并保护了一个 volatile 类型的整数计数器,当调用 countDown() 办法时,会尝试将整数计数器 -1,当调用 wait()办法时,以后线程就会判断整数计数器是否为 0,如果为 0,则持续往下执行,如果不为 0,则使以后线程进入阻塞状态,直到某个线程将计数器设置为 0,才会唤醒在 await()办法中期待的线程继续执行。
常见的代码应用示例如下:
1)、创立执行具体业务逻辑的线程解决类
public class DataDealTask implements Runnable {
private List<Integer> list;
private CountDownLatch latch;
public DataDealTask(List<Integer> list, CountDownLatch latch) {
this.list = list;
this.latch = latch;
}
@Override
public void run() {
try {System.out.println("线程 ->" + Thread.currentThread().getName() + ",解决" + list.size());
} finally {
// 解决完计数器递加
latch.countDown();}
}
}
该线程解决类,在实例化时接管除了待处理数据参数外,还会接管 CountDownLatch 对象,在执行完线程逻辑,留神,无论胜利或失败,都须要调用 countDown()办法。
2)、具体的应用办法
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Override
public CreateOrderBO createOrder(CreateOrderDTO createOrderDTO) {
//CountDownLatch 的应用示例
// 模仿待处理数据生成
Integer[] array = {10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 101, 102};
List<Integer> list = new ArrayList<>();
Arrays.asList(array).stream().map(o -> list.add(o)).collect(Collectors.toList());
// 对数据进行分组解决(5 条记录为 1 组)
Map<String, List<?>> entityMap = this.groupListByAvg(list, 6);
// 依据数据分组数量,确定同步计数器的值
CountDownLatch latch = new CountDownLatch(entityMap.size());
Iterator<Entry<String, List<?>>> it = entityMap.entrySet().iterator();
try {
// 将分组数据分批提交给不同线程解决
while (it.hasNext()) {DataDealTask dataDealTask = new DataDealTask((List<Integer>) it.next().getValue(), latch);
AsyncManager.service.submit(dataDealTask);
}
// 期待分批解决线程解决实现
latch.await();} catch (InterruptedException e) {e.printStackTrace();
}
return CreateOrderBO.builder().result(true).build();}
}
如上所示代码,在业务逻辑中如果解决数据量多,则能够通过分组的形式并行处理,而期待所有线程解决实现后,再同步返回调用方。这种场景就能够通过 CountDownLatch 来实现同步!
CycliBarrier 栅栏实现线程阶段性同步
CountDownLatch 的性能次要是实现线程的一次性同步。而在理论的业务场景中也可能存在这样的状况,执行一个阶段性的工作,例如”阶段 1 -> 阶段 2 -> 阶段 3 -> 阶段 4 -> 阶段 5 ″。那么在并发解决这个阶段性工作时,就要在每个阶段设置栅栏,只有当所有线程执行到某个阶段点之后,能力持续推动下一个阶段工作的执行,其逻辑如图所示:
针对上述场景,就能够通过 CycliBarrier 来实现。而从实现上看,CyclicBarrier 应用了基于 ReentrantLock 的互斥锁实现;在 CyclicBarrier 的外部有一个计数器 count,当 count 不为 0 时,每个线程在达到同步点会先调用 await 办法将本人阻塞,并将计数器会减 1,直到计数器减为 0 的时候,所有因调用 await 办法而被阻塞的线程就会被唤醒继续执行。并进入下一轮阻塞,此时在 new CyclicBarrier(parties) 时设置的 parties 值,会被赋值给 count 从而实现复用。
例如,计算某个部门的员工工资,要求在所有员工工资都计算完之后能力进行下一步整合操作。其代码示例如下:
@Slf4j
@Service
public class SalaryStatisticServiceImpl implements SalaryStatisticService {
/**
* 模仿部门员工存储数据
*/
public static Map<String, List<EmployeeSalaryInfo>> employeeMap = Collections.synchronizedMap(new HashMap<>());
static {EmployeeSalaryInfo employeeA = new EmployeeSalaryInfo();
employeeA.setEmployeeNo("100");
employeeA.setBaseSalaryAmount(10000);
employeeA.setSubsidyAmount(3000);
EmployeeSalaryInfo employeeB = new EmployeeSalaryInfo();
employeeB.setEmployeeNo("101");
employeeB.setBaseSalaryAmount(30000);
employeeB.setSubsidyAmount(3000);
List<EmployeeSalaryInfo> list = new ArrayList<>();
list.add(employeeA);
list.add(employeeB);
employeeMap.put("10", list);
}
@Override
public StatisticReportBO statisticReport(StatisticReportDTO statisticReportDTO) {
// 查问部门下所有员工信息(模仿)List<EmployeeSalaryInfo> employeeSalaryInfos = employeeMap.get(statisticReportDTO.getDepartmentNo());
if (employeeSalaryInfos == null) {log.info("部门员工信息不存在");
return StatisticReportBO.builder().build();
}
// 定义统计总工资的平安变量
AtomicInteger totalSalary = new AtomicInteger();
// 开启栅栏(在各线程触发之后触发)
CyclicBarrier cyclicBarrier = new CyclicBarrier(employeeSalaryInfos.size(), new Runnable() {// 执行程序 -B1(随机)
// 该线程不会阻塞主线程
@Override
public void run() {log.info("汇总已别离计算出的两个员工的工资 ->" + totalSalary.get() + ", 执行程序 ->B");
}
});
// 执行程序 -A
for (EmployeeSalaryInfo e : employeeSalaryInfos) {AsyncManager.service.submit(new Callable<Integer>() {
@Override
public Integer call() {int totalAmount = e.getSubsidyAmount() + e.getBaseSalaryAmount();
log.info("计算出员工{}", e.getEmployeeNo() + "的工资 ->" + totalAmount + ", 执行程序 ->A");
// 汇总总工资
totalSalary.addAndGet(totalAmount);
try {
// 期待其余线程同步
cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();
} catch (BrokenBarrierException e) {e.printStackTrace();
}
return totalAmount;
}
});
}
// 执行程序 -A/B(之前或之后随机,totalSalary 值不能保障肯定会失去,所以 CyclicBarrier 更适宜无返回的可反复并行计算)// 封装响应参数
StatisticReportBO statisticReportBO = StatisticReportBO.builder().employeeCount(employeeSalaryInfos.size())
.departmentNo(statisticReportDTO.getDepartmentNo())
.salaryTotalAmount(totalSalary.get()).build();
log.info("封装接口响应参数,执行程序 ->A/B");
return statisticReportBO;
}
@Data
public static class EmployeeSalaryInfo {
/**
* 员工编号
*/
private String employeeNo;
/**
* 基本工资
*/
private Integer baseSalaryAmount;
/**
* 贴补金额
*/
private Integer subsidyAmount;
}
}
上述代码的执行后果如下:
[kPoolExecutor-1] c.w.c.s.impl.SalaryStatisticServiceImpl : 计算出员工 100 的工资 ->13000, 执行程序 -
[kPoolExecutor-2] c.w.c.s.impl.SalaryStatisticServiceImpl : 计算出员工 101 的工资 ->33000, 执行程序 -
[kPoolExecutor-2] c.w.c.s.impl.SalaryStatisticServiceImpl : 汇总已别离计算出的两个员工的工资 ->46000,
[nio-8080-exec-2] c.w.c.s.impl.SalaryStatisticServiceImpl : 封装接口响应参数,执行程序 ->A/B
从上述后果能够看出,受 CycliBarrier 管制的线程会期待其余线程执行实现后同步向后执行,并且 CycliBarrier 并不会阻塞主线程,所以最初响应参数封装代码可能在 CycliBarrier 汇总线程之前执行,也可能在其之后执行,应用时须要留神!
Semaphore(信号量)限度拜访资源的线程数
Semaphore 能够实现对某个共享资源拜访线程数的限度,实现限流性能。以停车场线程为例,代码如下:
@Service
@Slf4j
public class ParkServiceImpl implements ParkService {
/**
* 模仿停车场的车位数
*/
private static Semaphore semaphore = new Semaphore(2);
@Override
public AccessParkBO accessPark(AccessParkDTO accessParkDTO) {AsyncManager.service.execute(() -> {if (semaphore.availablePermits() == 0) {log.info(Thread.currentThread().getName() + ", 车牌号 ->" + accessParkDTO.getCarNo() + ", 车位有余请急躁期待");
} else {
try {
// 获取令牌尝试进入停车场
semaphore.acquire();
log.info(Thread.currentThread().getName() + ", 车牌号 ->" + accessParkDTO.getCarNo() + ", 胜利进入停车场");
// 模仿车辆在停车场停留的工夫(30 秒)Thread.sleep(30000);
// 开释令牌,腾出停车场车位
semaphore.release();
log.info(Thread.currentThread().getName() + ", 车牌号 ->" + accessParkDTO.getCarNo() + ", 驶出停车场");
} catch (InterruptedException e) {e.printStackTrace();
}
}
});
// 封装返回信息
return AccessParkBO.builder().carNo(accessParkDTO.getCarNo())
.currentPositionCount(semaphore.availablePermits())
.isPermitAccess(semaphore.availablePermits() > 0 ? true : false).build();}
}
上述代码模仿停车场有 2 车位,并且每辆车进入车场后会停留 30 秒,而后并行模仿 3 次停车申请,具体执行成果如下:
[kPoolExecutor-1] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-1, 车牌号 ->10, 胜利进入停车场 程序 ->A
[kPoolExecutor-2] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-2, 车牌号 ->20, 胜利进入停车场 程序 ->A
[kPoolExecutor-3] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-3, 车牌号 ->30, 车位有余请急躁期待 00, 执行程序 ->B
[kPoolExecutor-1] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-1, 车牌号 ->10, 驶出停车场
[kPoolExecutor-2] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-2, 车牌号 ->20, 驶出停车场
[kPoolExecutor-4] c.w.c.service.impl.ParkServiceImpl : SingleBlockPoolExecutor-4, 车牌号 ->30, 胜利进入停车场
能够看到因为通过 Semaphore 限度了可容许进入的线程数是 2 个,所以第三次申请会被回绝,直到前两次申请通过 .release() 办法开释证书后第 4 次申请才会被容许进入!
后记
本文从利用层面总结了,JVM 根本的内存模型以及线程对共享内存操作的原子形式,并着重介绍了 线程池、FutrueTask、CountDownLatch、CycliBarrier 以及 Semaphore这几种在 Java 并发编程中常常应用的 JUC 工具类。
写在最初
欢送大家关注我的公众号【惊涛骇浪如码】,海量 Java 相干文章,学习材料都会在外面更新,整顿的材料也会放在外面。
感觉写的还不错的就点个赞,加个关注呗!点关注,不迷路,继续更新!!!