Java-多线程中使用-JDK-自带工具类实现计数器

40次阅读

共计 5185 个字符,预计需要花费 13 分钟才能阅读完成。

前言

在实际开发过程中,经常遇到需要多线程并行的业务,最后需要进行将各个线程完成的任务进行汇总,但主线程一般会早于子线程结束,如果要想等各个子线程完成后再继续运行主线程,这时就需要对各个线程是否执行完成进行标识,JDK 并发包中就给开发者提供了几个不错的使用工具类。

接下来将通过 Thread#join 方法以及 CountDownLatch、CyclicBarrier 类进行上面案例方案的分析。

Thread#join 方法

使用 join() 方法的子线程对象正常执行 run() 中代码,但当前线程会被无超时阻塞,等待执行 join() 方法的线程销毁后,继续执行被阻塞的当前线程。join() 方法阻塞原理是该方法内使用 wait() 方法阻塞,源码如下所示:

子线程 join() 完成时会调用 notifyAll() 来通知当前线程继续执行接下来的代码。

假如现在有两个线程产生数据结果,最后将两个线程结果进行相加,如果直接将两个线程执行并进行汇总,如下实现代码:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * htpps://ytao.top
 * 
 * Created by YangTao on 2020/5/17 0017.
 */
public class JoinTest {public static void main(String[] args) throws InterruptedException {Map<String, Integer> map = new ConcurrentHashMap<>();

        Thread thread1 = new Thread(() -> {map.put("thread1", 1);
            System.out.println("run thread1");
        });

        Thread thread2 = new Thread(() -> {map.put("thread2", 2);
            System.out.println("run thread2");
        });


        thread1.start();
        thread2.start();

        System.out.println(map.get("thread1") + map.get("thread2"));

        System.out.println("end....");

    }
}

执行结果:

由于主线程的汇总计算可能早于子线程完成,所以这时获取子线程结果为空指针异常。

通过增加 join() 方法实现阻塞主线程,等待子线程完成后再进行汇总:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * htpps://ytao.top
 * 
 * Created by YangTao on 2020/5/17 0017.
 */
public class JoinTest {public static void main(String[] args) throws InterruptedException {Map<String, Integer> map = new ConcurrentHashMap<>();

        Thread thread1 = new Thread(() -> {map.put("thread1", 1);
            System.out.println("run thread1");
        });

        Thread thread2 = new Thread(() -> {map.put("thread2", 2);
            System.out.println("run thread2");
        });


        thread1.start();
        thread2.start();
        
        // 两个线程分别调用 join() 方法,使主线程被阻塞
        thread1.join();
        thread2.join();

        System.out.println(map.get("thread1") + map.get("thread2"));

        System.out.println("end....");

    }
}

执行结果为:

通过结果可以看到子线程汇总求和为 3。此时主线程在两个子线程销毁前都处于等待状态,直至两个销毁后主线程再执行汇总求和,所以两个线程产生的值都已存在。

同时,子线程 join() 方法可以使当前线程无期限等待,也可以设置最长等待时长 join(long) 方法,无论子线程是否执行完成,当前线程会继续执行后面代码。使用方法加入超时参数即可,其它与 join() 方法使用相同。

CountDownLatch

CountDownLatch 可以使一个或多个线程等待其他线程完成操作后再继续执行当前线程后面代码。

CountDownLatch 的使用:首先创建 CountDownLatch 对象,通过传入参数 int 构造 CountDownLatch 对象。该参数是值将要等待的执行点的数量。

CountDownLatch 中有几个方法:

  • getCount() 返回当前计数器数,即当前剩余的等待数量。官方解释说该方法通常用于调试和测试目的。
  • countDown 每调用一次,计数器便会进行减 1 操作,但计数器必须大于 0。
  • await 该方法会阻塞当前线程,直至计数器为 0 时,就会不再阻塞当前线程。同时也提供 await(long timeout, TimeUnit unit) 方法,可设置超时时间。

利用 CountDownLatch 实现汇总求和案例,实现代码如下:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
 * https://ytao.top
 *
 * Created by YangTao on 2020/5/17 0017.
 */
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {Map<String, Integer> map = new ConcurrentHashMap<>();

        CountDownLatch count = new CountDownLatch(2);

        Thread thread1 = new Thread(() -> {map.put("thread1", 1);
            System.out.println("run thread1");
            count.countDown();});

        Thread thread2 = new Thread(() -> {map.put("thread2", 2);
            System.out.println("run thread2");
            count.countDown();});


        thread1.start();
        thread2.start();

        // 一直阻塞当前线程,直至计数器为 0
        count.await();

        System.out.println(map.get("thread1") + map.get("thread2"));

        System.out.println("end.... getCount:" + count.getCount());
    }

}

执行结果如下:

上图中求和结果为 3,同时计数器为 0。

通过查看 CountDownLatch 源码,主要是通过一个继承 AbstractQueuedSynchronizer 类的内部类 Sync 来实现的,可知其实现原理为 AQS,这里不进行展开讲述。

CyclicBarrier

CyclicBarrier 是一个可循环使用的屏障。实现原理解释,就是在一个或多个线程运行中设置一个屏障,线程到达这个屏障时会被阻塞,直到最后一个线程到达时,被屏障阻塞的线程继续执行。

CyclicBarrier 构造方法有两个,CyclicBarrier(int count)CyclicBarrier(int count, Runnable barrierAction):

  • 单个 int 参数构造方法,表示构造到达屏障线程的数量。
  • 一个 int 和一个 Runnable 参数构造方法,前者参数表示到达屏障线程的数量,后者参数表示所有线程到达屏障后接下来要执行的代码;

CyclicBarrier 中方法:

方法 说明
await() 阻塞前线程,等待 trip.signal() 或 trip.signalAll() 方法唤醒
await(long, TimeUnit) 在 await() 上增加两个参数,等待超时时间 timeout,单位为 unit
breakBarrier() 放开屏障,设置标志,唤醒被屏障阻塞的线程
isBroken() 阻塞的线程是否被中断
reset() 重置 CyclicBarrier 对象
getNumberWaiting() 当前被阻塞线程的数量
getParties() 到达屏障的线程总数量,即创建时指定的数量

使用 CyclicBarrier 实现上面汇总:

package top.ytao.demo.thread.count;

import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * https://ytao.top
 *
 * Created by YangTao on 2020/5/17 0017.
 */
public class CyclicBarrierTest {public static void main(String[] args) throws BrokenBarrierException, InterruptedException {Map<String, Integer> map = new ConcurrentHashMap<>();

        CyclicBarrier barrier = new CyclicBarrier(2, new Thread(()->{
            // 所有线程到达屏障后,需要执行的代码
            System.out.println(map.get("thread1") + map.get("thread2"));
            System.out.println("CyclicBarrier end....");
        }));

        Thread thread1 = new Thread(() -> {map.put("thread1", 1);
            System.out.println("run thread1");
            try {barrier.await();
            } catch (InterruptedException e) {e.printStackTrace();
            } catch (BrokenBarrierException e) {e.printStackTrace();
            }

        });

        Thread thread2 = new Thread(() -> {map.put("thread2", 2);
            System.out.println("run thread2");
            try {barrier.await();
            } catch (InterruptedException e) {e.printStackTrace();
            } catch (BrokenBarrierException e) {e.printStackTrace();
            }
        });

        thread1.start();
        thread2.start();}
}

执行结果:

执行完两条子线程,并且在子线程里调用 barrier.await() 后,屏障被打开,最后执行 CyclicBarrier 的最后的代码逻辑。

通过上面 CyclicBarrier 的方法可知,CyclicBarrier 比 CountDownLatch 使用更加灵活,CyclicBarrier 的 reset() 方法可以重置计数器,而 CountDownLatch 则只能使用一次。同时,CyclicBarrier 拥有更多线程阻塞信息的方法提供使用,在使用过程中,提供更加灵活的使用方式。

总结

上面三种方式,均由 JDK 的并发包中提供的工具。在多线程协作任务中,对计数器场景问题的解决方案,实现 main 线程对 worker 线程的等待完成。在实际开发应用中,使用频率也是非常之高。

推荐阅读

Java 线程基础,从这篇开始

Java 线程通信之 wait/notify 机制

你必须会的 JDK 动态代理和 CGLIB 动态代理

Dubbo 扩展点加载机制:从 Java SPI 到 Dubbo SPI

Netty 中粘包 / 拆包处理

正文完
 0