共计 9715 个字符,预计需要花费 25 分钟才能阅读完成。
Java 并发设计模式
一、Thread Local Storage 模式
1. ThreadLocal 的使用
Thread Local Storage 表示线程本地存储模式。
大多数并发问题都是由于变量的共享导致的,多个线程同时读写同一变量便会出现原子性,可见性等问题。局部变量是线程安全的,本质上也是由于各个线程各自拥有自己的变量,避免了变量的共享。
Java 中使用了 ThreadLocal 来实现避免变量共享的方案。ThreadLocal 保证在线程访问变量时,会创建一个这个变量的副本,这样每个线程都有自己的变量值,没有共享,从而避免了线程不安全的问题。
下面是 ThreadLocal 的一个简单使用示例:
public class ThreadLocalTest {
private static final ThreadLocal<SimpleDateFormat> threadLocal =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
public static SimpleDateFormat safeDateFormat() {return threadLocal.get();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<SimpleDateFormat> task1 = new FutureTask<>(ThreadLocalTest::safeDateFormat);
FutureTask<SimpleDateFormat> task2 = new FutureTask<>(ThreadLocalTest::safeDateFormat);
Thread t1 = new Thread(task1);
Thread t2 = new Thread(task2);
t1.start();
t2.start();
System.out.println(task1.get() == task2.get());// 返回 false,表示两个对象不相等
}
}
程序中构造了一个线程安全的 SimpleDateFormat,两个线程取到的是不同的示例对象,这样就保证了线程安全。
2. ThreadLocal 原理浅析
线程 Thread 类内部有两个 ThreadLocalMap 类型的变量:
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
/*
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
其中第二个变量的用途是创建可继承父线程变量的子线程,只不过这并不常用,主要介绍第一个。
ThreadLocalMap 是一个用于存储 ThreadLocal 的特殊 HashMap,map 中 key 就是 ThreadLocal,value 是线程变量值。只不过这个 map 并不被 ThreadLocal 持有,而是被 Thread 持有。
当调用 ThreadLocal 类中的 set 方法时,就会创建 Thread 中的 threadLocals 属性。
//ThreadLocal 的 set 方法
public void set(T value) {Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);// 获取 Thread 中的 ThreadLocalMap
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
可以看到,最终的 ThreadLocal 对象和变量值并不是创建在 ThreadLocal 内部,而是 Thread 中的 ThreadLocalMap,ThreadLocal 在这里只是充当了代理的作用。
3. ThreadLocal 内存泄漏问题
存储数据的 TheadLocalMap 被 Thread 持有,而不是 ThreadLocal,主要的原因便是 ThreadLocal 的生命周期比 Thread 要长,如果 ThreadLocal 对象一直存在,那么 map 中的线程就不能被回收,容易导致内存泄漏。
而 Thread 持有 ThreadLocalMap,并且 ThreadLocalMap 对 ThreadLocal 的引用还是弱引用,这样当线程被回收时,map 也能够被回收,更加安全。
但是 Java 的这种设计并没有完全避免内存泄漏问题。如果线程池中的线程存活时间过长,那么其持有的 ThreadLocalMap 一直不会被释放。ThreadLocalMap 中的 Entry 对其 value 是强引用的(对 ThreadLocal 是弱引用),这样就算 ThreadLocalMap 的生命周期结束了,但是 value 值并没有被回收。
解决的办法便是手动释放 ThreadLocalMap 中对 value 的强引用,可以使用 TheadLocal 的 remove 方法。在 finally 语句块中执行。例如下面这个简单的示例:
public class ThreadLocalTest {private final ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
public void test(){
// 设置变量值
threadLocal.set(10);
try {System.out.println(threadLocal.get());
}
finally {
// 释放
threadLocal.remove();}
}
}
二、Immutability 模式
1. 不可变的概念
Immutability,即不变模式。可以理解为只要对象一经创建,其状态是不能够被改变的,无法进行写操作。
要实现 Immuatability 模式很简单, 将一个类本身及其所有的属性都设为 final,并且方法都是只读的,需要注意的是,如果类的属性也是引用类型,那么其对应的类也要满足不可变的特性 。final 应该都很熟悉了,用它来修饰类和方法,分别表示类不可继承、属性不可改变。
Java 中具备不可变性的类型包括:
- String
- final 修饰的基本数据类型
- Integer、Long、Double 等基本数据类型的包装类
- Collections 中的不可变集合
具备不可变性的类,如果需要有类似修改这样的功能,那么它不会像普通的对象一样改变自己的属性,而是创建新的对象。
下面是 String 的字符串连接方法 concat() 的源码,仔细观察,可以看到最后方法返回的时候,创建了一个新的 Sring 对象:
public String concat(String str) {int otherLen = str.length();
if (otherLen == 0) {return this;}
int len = value.length;
char buf[] = Arrays.copyOf(value, len + otherLen);
str.getChars(buf, len);
// 创建新的对象
return new String(buf, true);
}
而 Collections 工具可以将集合变为不可变的,完全禁止写、修改等操作。示例如下:
//Collections 中构建不可变集合的方法
Collections.unmodifiableList();
Collections.unmodifiableSet();
Collections.unmodifiableMap();
Collections.unmodifiableSortedSet();
Collections.unmodifiableSortedMap();
---
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
// 构建不可变集合
List<Integer> unmodifiableList = Collections.unmodifiableList(list);
unmodifiableList.remove(1);// 抛出异常
2. 对象池
对于一个不可变性的类,如果频繁的对其进行修改操作,那么一直会创建性新的对象,这样就比较浪费内存空间了,一种解决办法便是利用对象池。
原理也很简单,新建对象的时候,去对象池看是否存在对象,如果存在则直接利用,如果不存在才会创建新的对象,创建之后再将对象放到对象池中。
以长整型的包装类 Long 为例,它缓存了 -128 到 127 的数据,如果创建的是这个区间的对象,那么会直接使用缓存中的对象。例如 Long 中的 valueOf 方法就用到了这个缓存,然后直接返回:
public static Long valueOf(long l) {
final int offset = 128;
// 在这个区间则直接使用缓存中的对象
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
三、Guarded Suspension 模式
1. Guarded Suspension 实现
Guarded Suspension 意为保护性暂停。一个典型的使用场景是:当客户端线程 T 发送请求后,服务端这时有大量的请求需要处理,这时候就需要排队,线程 T 进入等待状态,直到服务端处理完请求并且返回结果。
Guarded Suspension 的实现很简单,有一个对象 GuardedObject,其内部有一个属性,即被保护的对象,还有两个方法,客户端调用 get() 方法,如果未获取到结果,则进入等待状态,即“保护性暂停”;还有一个 notice() 通知方法,当服务端处理完请求后,调用这个方法,并且唤醒等待中的线程。示意图如下:
示例代码如下:
public class GuardedObject<T> {
private T obj;
private final Lock lock = new ReentrantLock();
private final Condition finished = lock.newCondition();
// 调用方线程获取结果
T get(){lock.lock();
try {while ( 未获取到结果){finished.await();
}
} catch (InterruptedException e) {e.printStackTrace();
}
finally {lock.unlock();
}
return obj;
}
// 执行完后通知
void notice(T obj){lock.lock();
try {
this.obj = obj;
finished.signalAll();}
finally {lock.unlock();
}
}
}
从代码中可以看到,Guarded Suspension 模式本质上就是一种等待 - 通知机制,只不过使用这种模式,在解决实际的问题的时候,需要根据情况进行程序功能的扩展。
2. 使用示例
还是上面提到的那个例子,当客户端发送请求后,需要等待服务端的响应结果,这时候就可以使用 Guarded Suspension 来实现,下面是代码示例:
public class SendRequest<T> {
// 相当于消息队列
private final BlockingQueue<Request> queue = new ArrayBlockingQueue<>(5);
// 客户端发送请求
void send(Request request) throws InterruptedException {
// 将消息存放至队列中
queue.put(request);
// 创建 Guarded Suspension 模式的对象
GuardedObject<Request> guardedObject = GuardedObject.create(request.id);
// 循环等待,获取结果
Request res = guardedObject.get(Objects::nonNull);
}
// 服务端处理请求
void handle() throws InterruptedException {
// 从队列中获取请求
Request request = queue.take();
// 调用请求对应的 GuardedObject,并处理请求
GuardedObject.handleRequest(request.id, request);
}
// 请求类
private static class Request{
private int id;
private String content;
}
}
需要注意的是,这里并不是直接使用 new GuardedObject()
的方式来创建对象,这是因为需要找到每个请求和对象之间的对应关系,所以 GuardedObject 内部使用了一个 map 来保存对象,key 是对应的请求 id。
GuardedObject 类代码如下:
public class GuardedObject<T> {
private T obj;
private final Lock lock = new ReentrantLock();
private final Condition finished = lock.newCondition();
private static final ConcurrentHashMap<Integer, GuardedObject> map = new ConcurrentHashMap<>();
// 创建对象
public static GuardedObject create(int id){GuardedObject guardedObject = new GuardedObject();
// 保存对象和请求的对应关系
map.put(id, guardedObject);
return guardedObject;
}
// 处理请求
public static void handleRequest(int id, Object obj){GuardedObject guardedObject = map.remove(id);
if (guardedObject != null){
// 具体的处理逻辑省略
// 处理完后通知
guardedObject.notice(obj);
}
}
// 调用方线程获取结果
T get(Predicate<T> p){lock.lock();
try {while (!p.test(obj)){finished.await();
}
} catch (InterruptedException e) {e.printStackTrace();
}
finally {lock.unlock();
}
return obj;
}
// 执行完后通知
void notice(T obj){lock.lock();
try {
this.obj = obj;
finished.signalAll();}
finally {lock.unlock();
}
}
}
四、Balking 模式
Balking 模式的典型应用场景是,业务逻辑依赖于某个条件变量的状态,因此这种模式又可以理解为多线程版本的 if。
public class BalkingTest {
private boolean flag = false;
public void execute(){if (!flag){return;}
// 具体的执行操作省略
flag = false;
}
public void test(){
// 省略业务代码若干
flag = true;
}
}
例如上面这个例子,一段业务逻辑会改变 flag 的值,另一个方法会根据 flag 的值来决定是否继续执行。
这个程序并不是线程安全的,解决的办法也很简单,就是加互斥锁,然后可以将改变 flag 值的逻辑单独拿出来,如下:
public class BalkingTest {
private boolean flag = false;
public synchronized void execute(){if (!flag){return;}
// 具体的执行操作省略
flag = false;
}
public void test(){
// 省略业务代码若干
change();}
public synchronized void change(){flag = true;}
}
Balking 模式一般可以使用互斥锁来实现,并且可以将对条件变量的改变的逻辑和业务逻辑进行分离,这样能够减小锁的粒度,提升性能。Balking 模式大多应用于需要快速失败的场景,即当条件变量不满足,则直接失败。这也是它和 Guarded Suspension 模式的区别,因为 Guarded Suspension 模式在条件不满足的时候,会一直等待条件满足。
五、Worker – Thread 模式
Worker Thread 模式,对应到现实世界,类似工厂中的工人做任务,当有任务的时候,工人取出任务执行。
解决的办法是使用线程池,并且使用一个阻塞队列来存储任务,线程池中的线程从队列中取出任务执行。线程池的使用需要注意几点:
- 任务队列尽量使用有界队列,避免任务过多造成 OOM。
- 应该明确指定拒绝策略,可以根据实际情况实现 RejectedExecutionHandler 接口自定义拒绝策略。
- 应该给线程指定一个有意义的名字,最好和业务相关。
- 为不同的任务创建不同的线程池,这样能够有效的避免死锁问题。
六、Two – Phase Termination 模式
1. 两阶段终止概念
Two – Phase Termination,即两阶段终止,主要是为解决如何正确的终止一个线程,这里说的是一个线程终止另一个线程,而不是线程终止自己。
Java 中的线程提供了一个 stop() 方法用来终止线程,这不过这个方法会直接将线程杀死,风险太高,并且这个方法已经被标记为废弃,不建议使用了。
两阶段终止,即将线程的结束分为了两个阶段,第一个阶段是一个线程 T1 向另一个线程 T2 发送终止指令,第二个阶段是线程 T2 响应终止指令。
根据 Java 的线程状态,线程如果要进入 TERMINATED 状态则必须先进入 RUNNABLE 状态,而处于 RUNNABLE 状态的线程有可能转换到休眠状态。
Java 的线程提供了 interrupt() 方法,这个方法的作用便是将线程的状态从休眠状态转换到 RUNNABLE 状态。
切换到 RUNNABLE 状态之后,线程有两种方式可以终止,一是执行完 run() 方法,自动进入终止状态;二是设置一个标志,线程如果检测到这个标志,则退出 run() 方法,这就是两阶段终止的响应终止指令。
2. 程序示例
下面是一个简单的使用 interrupt() 方法和中断标志位来终止线程的示例:
public class Test {public static void main(String[] args) {Thread thread = new Thread(() -> {
// 检测到中断则退出
while (!Thread.currentThread().isInterrupted()){
try {Thread.sleep(3000);
} catch (InterruptedException e) {e.printStackTrace();
// 重新设置中断标志
Thread.currentThread().interrupt();
}
System.out.println("I am roseduan");
}
});
thread.start();
thread.interrupt();}
}
程序要每隔三秒打印语句,但是线程启动之后就直接调用了 interrupt() 方法,所以线程直接退出了。需要注意的是这里在捕获异常之后,需要重新设置线程的中断状态,因为 JVM 的异常处理会清除线程的中断状态。
在实际的生产中,并不推荐使用这种方式,因为在 Thread 内部可能会调用其他的方法,而其他的方法并不能够保证正确的处理了线程中断,解决的办法便是自定义一个线程的中断标志,如下所示:
public class Test {
// 自定义中断标志
private volatile boolean isTerminated = false;
private Thread thread;
public synchronized void start(){thread = new Thread(() -> {
// 检测到中断则退出
while (!isTerminated) {
try {Thread.sleep(2000);
} catch (InterruptedException e) {e.printStackTrace();
// 重新设置中断状态
Thread.currentThread().interrupt();
}
System.out.println("I am roseduan");
}
isTerminated = false;
});
thread.start();}
// 线程终止方法
public synchronized void stop(){
isTerminated = true;
thread.interrupt();}
}
3. 终止线程池
Java 中并不太会显式的创建和终止一个线程,使用更多的是线程池。
Java 中的线程池提供了两个方法来终止,分别是 shutdown() 和 shutdownNow(),两个方法的区别如下:
- shutdown():拒绝新的任务,等待正在执行的和已经在阻塞队列中的任务执行完后,再关闭线程池
- shutdownNow():直接关闭线程池,拒绝新的任务,并且中断正在执行的任务,已经在阻塞队列中的任务也不会被执行了。
七、Producer – Consumer 模式
这是较为常用的生产者 – 消费者模式,Java 中的线程池就使用了这种模式,线程的使用方是生产者,提供任务,线程池本身是消费者,取出并执行任务。
生产者 – 消费者模式使用了一个任务队列,生产者将任务添加到队列中,消费者从队列中取出任务执行。
这样的设计的目的有三个:
- 解耦,生产者和消费者之间没有直接的关联,而是通过队列进行通信。
- 其次可以实现异步,例如生产者可以不用管消费者的行为,直接将任务添加到队列中。消费者也可以不在乎生产者,直接从队列中取任务。
- 最后,可以平衡生产者和消费者之间的速度差异。
下面是一个简单的生产者 – 消费者程序示例:
public class ProducerConsumerTest {private BlockingQueue<Task> queue = new LinkedBlockingQueue<>(100);
public void produce() {queue.add(new Task());
}
public void consume() {Task task = queue.poll();
while (task != null){task.execute();
task = queue.poll();}
System.out.println("没有任务了");
}
public static void main(String[] args) throws InterruptedException {Test test = new Test();
// 生产者线程,创建 10 个任务
Thread producer = new Thread(() -> {for (int i = 0; i < 10; i++) {test.produce();
}
});
producer.start();
producer.join();
// 消费者线程
Thread consumer = new Thread(test::consume);
consumer.start();}
}
class Task{public void execute(){System.out.println("执行任务");
}
}