关于thread:线程池-ThreadPoolExecutor-详解

一 为什么要应用线程池对于操作系统而言,创立一个线程的代价是非常低廉的, 须要给它分配内存、列入调度,同时在线程切换时要执行内存换页,清空 CPU 缓存,切换回来时还要从新从内存中读取信息,毁坏了数据的局部性。因而在并发编程中,当线程创立过多时,会影响程序性能,甚至引起程序解体。而线程池属于池化管理模式,具备以下长处: 升高资源耗费:通过反复利用已创立的线程升高线程创立和销毁造成的性能耗费。进步响应速度:当工作达到时,工作能够不须要等到线程创立就能立刻执行。进步线程的可管理性:可能对线程进行统一分配、调优和监控。 二 线程池原理详解2.1 线程池外围组成线程池蕴含 3 个外围局部: 线程汇合:外围线程和工作线程阻塞队列:用于待执行工作排队回绝策略处理器:阻塞队列满后,对工作解决进行 2.2 Execute 原理当一个新工作提交至线程池之后,线程池的解决流程如下: 首先判断以后运行的线程数量是否小于 corePoolSize。如果是,则创立一个工作线程来执行工作;如果都在执行工作,则进入步骤 2。判断 BlockingQueue 是否曾经满了,若没满,则将工作放入 BlockingQueue;若满了,则进入步骤 3。判断以后运行的总线程数量是否小于 maximumPoolSize,如果是则创立一个新的工作线程来执行工作。否则交给 RejectedExecutionHandler 来解决工作。 当 ThreadPoolExecutor 创立新线程时,通过 CAS 来更新线程池的状态 ctl。 三 线程池的应用线程池的应用次要分为以下三个步骤: 3.1 创立线程池3.1.1 自定义线程池线程池的真正实现类是 ThreadPoolExecutor,其构造方法有如下 4 种:public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,         Executors.defaultThreadFactory(), defaultHandler);} public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,         threadFactory, defaultHandler);} public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          RejectedExecutionHandler handler) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,         Executors.defaultThreadFactory(), handler);} public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler) {    if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)        throw new IllegalArgumentException();    if (workQueue == null || threadFactory == null || handler == null)        throw new NullPointerException();    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;}复制代码上面具体来看构造函数须要传入的重点参数: ...

July 29, 2022 · 4 min · jiezi

关于thread:C线程池

title: C线程池categories: [C++]tags:[编程语言]date: 2021/06/28 <div align = 'right'>作者:hackett</div> <div align = 'right'>微信公众号:加班猿</div> C线程池1、筹备工作查看线程相干接口函数:线程创立int pthread_create(pthread_t thread, const pthread_attr_t attr,void (start_routine) (void ), void arg); 参数阐明: 1.参数thread指向寄存新创建线程的线程ID的地址 2.attr参数用于定制各种不同的线程属性,暂能够把它设置为NULL,以创立默认属性的线程。 3.start_routine是个函数指针,该函数返回类型是void,同时形式参数也是void。新创建的线程从start_routine函数的地址开始运行。该函数只有一个无类型指针参数arg.如果须要向start_routine函数传递的参数不止一个,那么须要把这些参数放到一个构造中,而后把这个构造的地址作为arg参数传入。 返回值: 线程创立胜利返回0,失败返回其余数值 线程退出void pthread_exit(void *retval); 参数阐明: retval是一个无类型指针,过程中的其余线程能够通过调用pthread_join函数拜访到这个指针。 线程期待int pthread_join(pthread_t thread, void **retval); 参数阐明: 调用这个函数的线程将始终阻塞,直到指定的线程调用pthread_exit. 如果对线程的返回值不感兴趣,能够把retval置为NULL。在这种状况下,调用pthread_join函数将期待指定的线程终止,但并不取得线程的终止状态。 线程勾销int pthread_cancel(pthread_t thread); 参数阐明: thread为线程的id 设置线程的cancle信号int pthread_setcancelstate(int state, int *oldstate) ; PTHREAD_CANCEL_ENABLE:线程可勾销。这是所有新线程的默认勾销状态,包含初始线程。线程的可勾销类型决定了可勾销线程何时响应勾销申请。 PTHREAD_CANCEL_DISABLE:线程不可勾销。如果收到一个勾销申请,它将被阻塞,直到可勾销启用。 清理线程void pthread_cleanup_push(void (*rtn)(void *), void *arg); 参数阐明: void(*rtn)(void *):线程清理函数 arg传递的参数 激活所有期待线程pthread_cond_broadcast(pthread_cond_t *cond); 查看互斥锁相干接口函数:创立互斥锁int pthread_mutex_init(pthread_mutex_t restrict mutex,const pthread_mutexattr_t restrict attr); ...

June 30, 2021 · 3 min · jiezi

关于thread:springboot中使用线程池实现异步调用

1.什么是线程池?线程池是一种多线程解决模式,解决过程中将工作增加到队列,而后在创立线程后主动启动这些工作。线程池线程都是后盾线程。每个线程都应用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中闲暇(如正在期待某个事件),则线程池将插入另一个辅助线程来使所有处理器放弃忙碌。如果所有线程池线程都始终保持忙碌,但队列中蕴含挂起的工作,则线程池将在一段时间后创立另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程能够排队,但他们要等到其余线程实现后才启动。 2.常见的线程池2.1 newCachedThreadPool创立一个可缓存线程池,如果线程池长度超过解决须要,可灵便回收闲暇线程,若无可回收,则新建线程。这种类型的线程池特点是:工作线程的创立数量简直没有限度(其实也有限度的,数目为Interger. MAX_VALUE), 这样可灵便的往线程池中增加线程。如果长时间没有往线程池中提交工作,即如果工作线程闲暇了指定的工夫(默认为1分钟),则该工作线程将主动终止。终止后,如果你又提交了新的工作,则线程池从新创立一个工作线程。在应用CachedThreadPool时,肯定要留神管制工作的数量,否则,因为大量线程同时运行,很有会造成零碎瘫痪。 2.2 newFixedThreadPool创立一个指定工作线程数量的线程池。每当提交一个工作就创立一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的工作存入到池队列中。FixedThreadPool是一个典型且优良的线程池,它具备线程池进步程序效率和节俭创立线程时所耗的开销的长处。然而,在线程池闲暇时,即线程池中没有可运行工作时,它不会开释工作线程,还会占用肯定的系统资源。 2.3 newSingleThreadExecutor创立一个单线程化的Executor,即只创立惟一的工作者线程来执行工作,它只会用惟一的工作线程来执行工作,保障所有工作依照指定程序(FIFO, LIFO, 优先级)执行。如果这个线程异样完结,会有另一个取代它,保障程序执行。单工作线程最大的特点是可保障程序地执行各个工作,并且在任意给定的工夫不会有多个线程是流动的。 2.4 newScheduleThreadPool创立一个定长的线程池,而且反对定时的以及周期性的工作执行,反对定时及周期性工作执行。 3.5 下图可见newScheduleThreadPool线程池是ThreadPoolExecutor的子类,所以本章将围绕ThreadPoolExecutor线程池去做详解。 ThreadPoolExecutor详解进入ThreadPoolExecutor类中,能够看到,创立这个线程池,有四种形式,且全是有参结构。 查看全参结构corePoolSize:外围线程池大小maximumPoolSize:容许线程池同时并行的线程数量keepAliveTime:当线程数大于内核数时,这是多余的闲暇线程将在终止之前期待新工作的最长工夫unit:TimeUnit类型,这没什么好说workQueue:在执行工作之前用于保留工作的队列,此队列将仅保留execute办法提交的Runnable工作。threadFactory:执行程序创立新线程时要应用的工厂handler:当线期待队列中的数量超过既定容量,所须要解决策略 综上所述,咱们做异步的话,不须要回绝策略,所以,咱们抉择没有回绝策略参数的构造方法去创立。 1.初始化线程池@Componentpublic class ThreadPoolStarter { @Value("${thread-pool.corePoolSize:2}") private Integer corePoolSize; @Value("${thread-pool.maxPoolSize:3}") private Integer maximumPoolSize; @Value("${thread-pool.queueSize:10}") private Integer queueSize; @Value("${thread-pool.keepAliveTime:600}") private Long keepAliveTime; private TimeUnit unit = TimeUnit.SECONDS; @Bean("workerPool") public ThreadPoolExecutor threadPool() { // 线程工厂 ThreadFactory threadFactory = new CustomizableThreadFactory("worker-pool-"); // 初始化线程池 return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>(queueSize), threadFactory); }}2.创立线程类实现runnable接口/** * @description * @author: yzr * @date: 2021-06-02 11:12 **/public class ThreadRunner implements Runnable { @Override public void run() { try { log.info("我要睡眠了"); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("我的业务办法"); }}3.测试,注入线程池,并把线程交由线程池去解决。@SpringBootTest(classes = CasbinApplication.class)@Slf4jpublic class SpringBootTestTest { @Autowired private ThreadPoolExecutor threadPoolExecutor; @Autowired ThreadRunner threadRunner; @Test public void test() { log.info("开始了"); threadPoolExecutor.execute(threadRunner); log.info("完结了"); }}4.执行并查看后果由此可见咱们的异步调用胜利了 ...

June 2, 2021 · 1 min · jiezi

关于thread:4种解决线程安全问题的方式

前言线程平安问题,在做高并发的零碎的时候,是程序员常常须要思考的中央。怎么无效的避免线程平安问题,保证数据的准确性?怎么正当的最大化的利用系统资源等,这些问题都须要充沛的了解并运行线程。当然对于多线程的问题在面试的时候也是呈现频率比拟高的。上面就来学习一下吧! 线程先来看看什么是过程和线程? 过程是资源(CPU、内存等)调配的根本单位,它是程序执行时的一个实例。程序运行时零碎就会创立一个过程,并为它分配资源,而后把该过程放入过程就绪队列,过程调度器选中它的时候就会为它调配CPU工夫,程序开始真正运行。就比如说,咱们开发的一个单体我的项目,运行它,就会产生一个过程。 线程是程序执行时的最小单位,它是过程的一个执行流,是CPU调度和分派的根本单位,一个过程能够由很多个线程组成,线程间共享过程的所有资源,每个线程有本人的堆栈和局部变量。线程由CPU独立调度执行,在多CPU环境下就容许多个线程同时运行。同样多线程也能够实现并发操作,每个申请调配一个线程来解决。在这里强调一点就是:计算机中的线程和应用程序中的线程不是同一个概念。 总之一句话形容就是:过程是资源分配的最小单位,线程是程序执行的最小单位。 什么是线程平安什么是线程平安呢?什么样的状况会造成线程平安问题呢?怎么解决线程平安呢?这些问题都是在下文中所要讲述的。 线程平安:当多个线程拜访一个对象时,如果不必思考这些线程在运行时环境下的调度和交替执行,也不须要进行额定的同步,或者在调用方进行任何其余的协调操作,调用这个对象的行为都能够取得正确的后果,那这个对象就是线程平安的。 那什么时候会造成线程平安问题呢?当多个线程同时去拜访一个对象时,就可能会呈现线程平安问题。那么怎么解决呢?请往下看! 解决线程平安在这里提供4种办法来解决线程平安问题,也是最罕用的4种办法。前提是我的项目在一个服务器中,如果是分布式我的项目可能就会用到散布锁了,这个就放到前面文章来详谈了。 讲4种办法前,还是先来理解一下乐观锁和乐观锁吧! 乐观锁,顾名思义它是乐观的。讲得艰深点就是,认为本人在应用数据的时候,肯定有别的线程来批改数据,因而在获取数据的时候先加锁,确保数据不会被线程批改。形象了解就是总感觉有刁民想害朕。 而乐观锁就比拟乐观了,认为在应用数据时,不会有别的线程来批改数据,就不会加锁,只是在更新数据的时候去判断之前有没有别的线程来更新了数据。具体用法在上面解说。 当初来看有那4种办法吧! 办法一:应用synchronized关键字,一个体现为原生语法层面的互斥锁,它是一种乐观锁,应用它的时候咱们个别须要一个监听对象 并且监听对象必须是惟一的,通常就是以后类的字节码对象。它是JVM级别的,不会造成死锁的状况。应用synchronized能够拿来润饰类,静态方法,一般办法和代码块。比方:Hashtable类就是应用synchronized来润饰办法的。put办法局部源码: public synchronized V put(K key, V value) { // Make sure the value is not null if (value == null) { throw new NullPointerException(); } 而ConcurrentHashMap类中就是应用synchronized来锁代码块的。putVal办法局部源码: else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1;synchronized关键字底层实现次要是通过monitorenter 与monitorexit计数 ,如果计数器不为0,阐明资源被占用,其余线程就不能拜访了,然而可重入的除外。说到这,就来讲讲什么是可重入的。这里其实就是指的可重入锁:指的是同一线程外层函数取得锁之后,内层递归函数依然有获取该锁的代码,但不受影响,执行对象中所有同步办法不必再次取得锁。防止了频繁的持有开释操作,这样既晋升了效率,又防止了死锁。 ...

November 29, 2020 · 2 min · jiezi

关于thread:第三阶段-Day20-购物车模块实现-添加拦截器-添加用户权限校检-实现订单模块

购物车删除操作=========== 1.1 页面剖析 1.2 编辑CartController `/** * 购物车删除操作 * url地址: http://www.jt.com/cart/delete/562379.html * 参数: 获取itemId * 返回值: 重定向到购物车的展示页面 */ @RequestMapping("/delete/{itemId}") public String deleteCarts(@PathVariable Long itemId){ Long userId = 7L; cartService.deleteCarts(userId,itemId); return "redirect:/cart/show.html"; }` 1.3 编辑CartService `@Override public void deleteCarts(Long userId, Long itemId) { QueryWrapper<Cart> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("user_id", userId); queryWrapper.eq("item_id", itemId); cartMapper.delete(queryWrapper); }` 京淘权限实现========== 2.1 业务需要当用户进行敏感操作时,必须要求用户先登录之后才能够拜访后端服务器. 例如京东商城…应用技术:1.AOP2.拦截器 :拦挡用户的申请 2.2 定义京淘拦截器2.2.1 SpringMVC调用原理图2.2.2 SpringMVC拦截器工作原理 2.2.3 配置拦截器`@Component //spring容器治理对象public class UserInterceptor implements HandlerInterceptor { @Autowired private JedisCluster jedisCluster; //Spring版本升级 4 必须实现所有的办法 spring 5 只须要重写指定的办法即可. /** * 需要: 拦挡/cart结尾的所有的申请进行拦挡.,并且校验用户是否登录..... * 拦截器抉择: preHandler * 如何判断用户是否登录: 1.查看cookie信息 2.查看Redis中是否有记录. * true : 申请应该放行 * false: 申请应该拦挡 则配合重定向的语法实现页面跳转到登录页面 使得程序流转起来 */ @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { //1.判断用户是否登录 查看cookie是否有值 String ticket = CookieUtil.getCookieValue(request,"JT_TICKET"); //2.校验ticket if(!StringUtils.isEmpty(ticket)){ //3.判断redis中是否有值. if(jedisCluster.exists(ticket)){ //4.动静获取json信息 String userJSON = jedisCluster.get(ticket); User user = ObjectMapperUtil.toObj(userJSON,User.class); request.setAttribute("JT_USER",user); return true; } } response.sendRedirect("/user/login.html"); return false; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { //销毁数据 request.removeAttribute("JT_USER"); }}` 2.2.4 动静获取UserId ...

November 9, 2020 · 4 min · jiezi

Java并发22并发设计模式-ThreadPerMessage-与-Worker-Thread-模式

我们曾经把并发编程领域的问题总结为三个核心问题:分工、同步和互斥。其中,同步和互斥相关问题更多地源自微观,而分工问题则是源自宏观。我们解决问题,往往都是从宏观入手,同样,解决并发编程问题,首要问题也是解决宏观的分工问题。 并发编程领域里,解决分工问题也有一系列的设计模式,比较常用的主要有 Thread-Per-Message 模式、Worker Thread 模式、生产者 - 消费者模式等等。今天我们重点介绍 Thread-Per-Message 模式。 如何理解 Thread-Per-Message 模式比如写一个 HTTP Server,很显然只能在主线程中接收请求,而不能处理 HTTP 请求,因为如果在主线程中处理 HTTP 请求的话,那同一时间只能处理一个请求,太慢了!怎么办呢?可以利用代办的思路,创建一个子线程,委托子线程去处理 HTTP 请求。 这种委托他人办理的方式,在并发编程领域被总结为一种设计模式,叫做Thread-Per-Message 模式,简言之就是为每个任务分配一个独立的线程。这是一种最简单的分工方法,实现起来也非常简单。 用 Thread 实现 Thread-Per-Message 模式Thread-Per-Message 模式的一个最经典的应用场景是网络编程里服务端的实现,服务端为每个客户端请求创建一个独立的线程,当线程处理完请求后,自动销毁,这是一种最简单的并发处理网络请求的方法。 下面我们就以 echo 程序的服务端为例,介绍如何实现 Thread-Per-Message 模式。 final ServerSocketChannel ssc = ServerSocketChannel.open().bind( new InetSocketAddress(8080));// 处理请求 try { while (true) { // 接收请求 SocketChannel sc = ssc.accept(); // 每个请求都创建一个线程 new Thread(()->{ try { // 读 Socket ByteBuffer rb = ByteBuffer .allocateDirect(1024); sc.read(rb); // 模拟处理请求 Thread.sleep(2000); // 写 Socket ByteBuffer wb = (ByteBuffer)rb.flip(); sc.write(wb); // 关闭 Socket sc.close(); }catch(Exception e){ throw new UncheckedIOException(e); } }).start(); }} finally { ssc.close();} 如果你熟悉网络编程,相信你一定会提出一个很尖锐的问题:上面这个 echo 服务的实现方案是不具备可行性的。原因在于 Java 中的线程是一个重量级的对象,创建成本很高,一方面创建线程比较耗时,另一方面线程占用的内存也比较大。所以,为每个请求创建一个新的线程并不适合高并发场景。 ...

July 12, 2019 · 2 min · jiezi

What一个-Dubbo-服务启动要两个小时

前言前几天在测试环境碰到一个非常奇怪的与 dubbo 相关的问题,事后我在网上搜索了一圈并没有发现类似的帖子或文章,于是便有了这篇。 希望对还未碰到或正在碰到的朋友有所帮助。 现象现象是这样的,有一天测试在测试环境重新部署一个 dubbo 应用的时候发现应用“启动不起来”。 但过几个小时候之后又能自己慢慢恢复,并能够对外提供 dubbo 服务。 但其实经过我后续排查发现刚开始其实并不是启动不起来,而是启动速度非常缓慢,所以当应用长时间启动后才会对外提供服务。 而这个速度慢到居然要花费 2 个小时。 导致的一个结果是测试完全不敢在测试环境发版验证了,每验证一个功能修复一个 bug 就得等上两个小时,这谁受得了????。 而且经过多次观察,确实每次都是花费两小时左右应用才能启动起来。尝试解决最后测试顶不住了,只能让我这个“事故报告撰写专家”来看看。 当我得知这个问题的现象时其实完全没当一回事: 都不用想,这不就是主线程阻塞了嘛,先看看是否在初始化的时候数据库、Zookeeper 之类的连不上导致阻塞了-------来之多次事故处理的经验告诉我。于是我把这事打回给测试让他先找运维排查下,不到万不得已不要影响我 Touch fish????。 第二天一早看到测试同学的微信头像跳动时我都已经准备接受又一句 “膜拜大佬????” 的回复时,却收到 “网络一切正常,没人动过,再不解决就要罢工了????”。 好吧,忽悠不过去了。 首先这类问题的排查方向应该不会错,就是主线程阻塞了,至于是啥导致的阻塞就不能像之前那样瞎猜了。 我将应用重启后用 jstack pid 将线程快照打印到终端,直接拉到最后看看 main 线程到底在干啥。 前几次的快照都是很正常: 加载 Spring ---->连接 Zookeeper ---> 连接 Redis,都是依次执行下来没有阻塞。 隔了一段后应用确实还没起来,我再次 jstack 后得到如下信息: 翻源码我一直等了十几分钟再多次 jstack 得到的快照得到的信息都是一样的。 如图所示可见主线程是卡在了 dubbo 的某个方法 ServiceConfig.java 的 303 行中。 于是我找到此处的源码: 简单来说这里的逻辑就是要获取本机的 IP 将其注册到 Zookeeper 中用于其他服务调用。 ...

July 5, 2019 · 1 min · jiezi

一个线程罢工的诡异事件

背景事情(事故)是这样的,突然收到报警,线上某个应用里业务逻辑没有执行,导致的结果是数据库里的某些数据没有更新。虽然是前人写的代码,但作为 Bug maker&killer 只能咬着牙上了。<!–more–>因为之前没有接触过出问题这块的逻辑,所以简单理了下如图:有一个生产线程一直源源不断的往队列写数据。消费线程也一直不停的取出数据后写入后续的业务线程池。业务线程池里的线程会对每个任务进行入库操作。整个过程还是比较清晰的,就是一个典型的生产者消费者模型。尝试定位接下来便是尝试定位这个问题,首先例行检查了以下几项:是否内存有内存溢出?应用 GC 是否有异常?通过日志以及监控发现以上两项都是正常的。紧接着便 dump 了线程快照查看业务线程池中的线程都在干啥。结果发现所有业务线程池都处于 waiting 状态,队列也是空的。同时生产者使用的队列却已经满了,没有任何消费迹象。结合上面的流程图不难发现应该是消费队列的 Consumer 出问题了,导致上游的队列不能消费,下有的业务线程池没事可做。review 代码于是查看了消费代码的业务逻辑,同时也发现消费线程是一个单线程。结合之前的线程快照,我发现这个消费线程也是处于 waiting 状态,和后面的业务线程池一模一样。他做的事情基本上就是对消息解析,之后丢到后面的业务线程池中,没有发现什么特别的地方。但是由于里面的分支特别多(switch case),看着有点头疼;所以我与写这个业务代码的同学沟通后他告诉我确实也只是入口处解析了一下数据,后续所有的业务逻辑都是丢到线程池中处理的,于是我便带着这个前提去排查了(埋下了伏笔)。因为这里消费的队列其实是一个 disruptor 队列;它和我们常用的 BlockQueue 不太一样,不是由开发者自定义一个消费逻辑进行处理的;而是在初始化队列时直接丢一个线程池进去,它会在内部使用这个线程池进行消费,同时回调一个方法,在这个方法里我们写自己的消费逻辑。所以对于开发者而言,这个消费逻辑其实是一个黑盒。于是在我反复 review 了消费代码中的数据解析逻辑发现不太可能出现问题后,便开始疯狂怀疑是不是 disruptor 自身的问题导致这个消费线程罢工了。再翻了一阵 disruptor 的源码后依旧没发现什么问题后我咨询对 disruptor 较熟的@咖啡拿铁,在他的帮助下在本地模拟出来和生产一样的情况。本地模拟本地也是创建了一个单线程的线程池,分别执行了两个任务。第一个任务没啥好说的,就是简单的打印。第二个任务会对一个数进行累加,加到 10 之后就抛出一个未捕获的异常。接着我们来运行一下。发现当任务中抛出一个没有捕获的异常时,线程池中的线程就会处于 waiting 状态,同时所有的堆栈都和生产相符。细心的朋友会发现正常运行的线程名称和异常后处于 waiting 状态的线程名称是不一样的,这个后续分析。解决问题当加入异常捕获后又如何呢?程序肯定会正常运行。同时会发现所有的任务都是由一个线程完成的。虽说就是加了一行代码,但我们还是要搞清楚这里面的门门道道。源码分析于是只有直接 debug 线程池的源码最快了;通过刚才的异常堆栈我们进入到 ThreadPoolExecutor.java:1142 处。发现线程池已经帮我们做了异常捕获,但依然会往上抛。在 finally 块中会执行 processWorkerExit(w, completedAbruptly) 方法。看过之前《如何优雅的使用和理解线程池》的朋友应该还会有印象。线程池中的任务都会被包装为一个内部 Worker 对象执行。processWorkerExit 可以简单的理解为是把当前运行的线程销毁(workers.remove(w))、同时新增(addWorker())一个 Worker 对象接着处理;就像是哪个零件坏掉后重新换了一个新的接着工作,但是旧零件负责的任务就没有了。接下来看看 addWorker() 做了什么事情:只看这次比较关心的部分;添加成功后会直接执行他的 start() 的方法。由于 Worker 实现了 Runnable 接口,所以本质上就是调用了 runWorker() 方法。在 runWorker() 其实就是上文 ThreadPoolExecutor 抛出异常时的那个方法。它会从队列里一直不停的获取待执行的任务,也就是 getTask();在 getTask 也能看出它会一直从内置的队列取出任务。而一旦队列是空的,它就会 waiting 在 workQueue.take(),也就是我们从堆栈中发现的 1067 行代码。线程名字的变化上文还提到了异常后的线程名称发生了改变,其实在 addWorker() 方法中可以看到 new Worker()时就会重新命名线程的名称,默认就是把后缀的计数+1。这样一切都能解释得通了,真相只有一个:在单个线程的线程池中一但抛出了未被捕获的异常时,线程池会回收当前的线程并创建一个新的 Worker;它也会一直不断的从队列里获取任务来执行,但由于这是一个消费线程,根本没有生产者往里边丢任务,所以它会一直 waiting 在从队列里获取任务处,所以也就造成了线上的队列没有消费,业务线程池没有执行的问题。总结所以之后线上的那个问题加上异常捕获之后也变得正常了,但我还是有点纳闷的是:既然后续所有的任务都是在线程池中执行的,也就是纯异步了,那即便是出现异常也不会抛到消费线程中啊。这不是把我之前储备的知识点推翻了嘛?不信邪!之后我让运维给了加上异常捕获后的线上错误日志。结果发现在上文提到的众多 switch case 中,最后一个竟然是直接操作的数据库,导致一个非空字段报错了????!!这事也给我个教训,还是得眼见为实啊。虽然这个问题改动很小解决了,但复盘整个过程还是有许多需要改进的:消费队列的线程名称竟然和业务线程的前缀一样,导致我光找它就花了许多时间,命名必须得调整。开发规范,防御式编程大家需要养成习惯。未知的技术栈需要谨慎,比如 disruptor,之前的团队应该只是看了个高性能的介绍就直接使用,并没有深究其原理;导致出现问题后对它拿不准。实例代码:https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/thread/ThreadExceptionTest.java你的点赞与分享是对我最大的支持 ...

March 13, 2019 · 1 min · jiezi

线程池?面试?看这篇就够了!

<!– more –>本文原创地址,我的博客:https://jsbintask.cn/2019/03/10/jdk/jdk8-threadpool/(食用效果最佳),转载请注明出处!前言在实际工作中,线程是一个我们经常要打交道的角色,它可以帮我们灵活利用资源,提升程序运行效率。但是我们今天不是探讨线程!我们今天来聊聊另一个与线程息息相关的角色:线程池.本篇文章的目的就是全方位的解析线程池的作用,以及jdk中的接口,实现以及原理,另外对于某些重要概念,将从源码的角度探讨。tip:本文较长,建议先码后看。线程池介绍首先我们看一段创建线程并且运行的常用代码:for (int i = 0; i < 100; i++) { new Thread(() -> { System.out.println(“run thread->” + Thread.currentThread().getName()); //to do something, send email, message, io operator, network… }).start();}上面的代码很容易理解,我们为了异步,或者效率考虑,将某些耗时操作放入一个新线程去运行,但是这样的代码却存在这样的问题:创建销毁线程资源消耗; 我们使用线程的目的本是出于效率考虑,可以为了创建这些线程却消耗了额外的时间,资源,对于线程的销毁同样需要系统资源。cpu资源有限,上述代码创建线程过多,造成有的任务不能即时完成,响应时间过长。线程无法管理,无节制地创建线程对于有限的资源来说似乎成了“得不偿失”的一种作用。手动创建执行线程存在以上问题,而线程池就是用来解决这些问题的。怎么解决呢?我们可以先粗略的定义一下线程池:线程池是一组已经创建好的,一直在等待任务执行的线程的集合。因为线程池中线程是已经创建好的,所以对于任务的执行不会消耗掉额外的资源,线程池中线程个数由我们自定义添加,可相对于资源,资源任务做出调整,对于某些任务,如果线程池尚未执行,可手动取消,线程任务变得能够管理!所以,线程池的作用如下:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。提高线程的可管理性。jdk线程池详解上面我们已经知道了线程池的作用,而对于这样一个好用,重要的工具,jdk当然已经为我们提供了实现,这也是本篇文章的重点。在jdk中,关于线程池的接口,类都定义在juc(java.util.concurrent)包中,这是jdk专门为我们提供用于并发编程的包,当然,本篇文章我们只介绍与线程池有关的接口和类,首先我们看下重点要学习的接口和类:如图所示,我们将一一讲解这6个类的作用并且分析。Executor首先我们需要了解就是Executor接口,它有一个方法,定义如下:Executor自jdk1.5引入,这个接口只有一个方法execute声明,它的作用以及定义如下:接收一个任务(Runnable)并且执行。注意:同步执行还是异步执行均可! 由它的定义我们就知道,它是一个线程池最基本的作用。但是在实际使用中,我们常常使用的是另外一个功能更多的子类ExecutorService。ExecutorService这个接口继承自Executor,它的方法定义就丰富多了,可以关闭,提交Future任务,批量提交任务,获取执行结果等,我们一一讲解下每个方法作用声明:void shutdown(): “优雅地”关闭线程池,为什么是“优雅地”呢?因为这个线程池在关闭前会先等待线程池中已经有的任务执行完成,一般会配合方法awaitTermination一起使用,调用该方法后,线程池中不能再加入新的任务。List<Runnable> shutdownNow();: “尝试”终止正在执行的线程,返回在正在等待的任务列表,调用这个方法后,会调用正在执行线程的interrupt()方法,所以如果正在执行的线程如果调用了sleep,join,await等方法,会抛出InterruptedException异常。boolean awaitTermination(long timeout, TimeUnit unit): 该方法是一个阻塞方法,参数分别为时间和时间单位。这个方法一般配合上面两个方法之后调用。如果先调用shutdown方法,所有任务执行完成返回true,超时返回false,如果先调用的是shutdownNow方法,正在执行的任务全部完成true,超时返回false。boolean isTerminated();: 调用方法1或者2后,如果所有人物全部执行完毕则返回true,也就是说,就算所有任务执行完毕,但是不是先调用1或者2,也会返回false。<T> Future<T> submit(Callable<T> task);: 提交一个能够返回结果的Callable任务,返回任务结果抽象对象是Future,调用Future.get()方法可以阻塞等待获取执行结果,例如:result = exec.submit(aCallable).get();,提交一个任务并且一直阻塞知道该任务执行完成获取到返回结果。<T> Future<T> submit(Runnable task, T result);: 提交一个Runnable任务,执行成功后调用Future.get()方法返回的是result(这是什么骚操作?)。Future<?> submit(Runnable task);:和6不同的是调用Future.get()方法返回的是null(这又是什么操作?)。<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks): 提交一组任务,并且返回每个任务执行结果的抽象对象List<Future<T>>,Future作用同上,值得注意的是:当调用其中任一Future.isDone()(判断任务是否完成,正常,异常终止都算)方法时,必须等到所有任务都完成时才返回true,简单说:全部任务完成才算完成。<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): 同方法8,多了一个时间参数,不同的是:如果超时,Future.isDone()同样返回true。<T> T invokeAny(Collection<? extends Callable<T>> tasks):这个看名字和上面对比就容易理解了,返回第一个正常完成的任务地执行结果,后面没有完成的任务将被取消。<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):同10相比,多了一个超时参数。不同的是:在超时时间内,一个任务都没有完成,将抛出TimeoutException。到现在,我们已经知道了一个线程池基本的所有方法,知道了每个方法的作用,接下来我们就来看看具体实现,首先我们研究下ExecutorService的具体实现抽象类:AbstractExecutorService。AbstractExecutorServiceAbstractExecutorService是一个抽象类,继承自ExecutorService,它实现了ExecutorService接口的submit, invokeAll, invokeAny方法,主要用于将ExecutorService的公共实现封装,方便子类更加方便使用,接下来我们看看具体实现:1. submit方法:public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}判空利用task构建一个Future的子类RunnableFuture,最后返回执行这个任务(execute方法声明在Executor接口中,所以也是交由子类实现)。execute方法交由子类实现了,这里我们主要分析newTaskFor方法,看它是如何构建Future对象的:首先,RunnableFuture接口定义如下:public interface RunnableFuture<V> extends Runnable, Future<V> { void run();}他就是Future和Runnable的组合,它的实现是FutureTask:2. invokeAll方法:public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; // ① try { for (Callable<T> t : tasks) { // ② RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); // ③ if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; // ④ return futures; } finally { if (!done) // ⑤ for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); }}声明一个flag判断所有任务是否全部完成调用newTaskFor方法构建RunnableFuture对象,循环调用execute方法添加每一个任务。遍历每个任务结果,判断是否执行完成,没有完成调用 get()阻塞方法等待完成。所有任务全部完成,将flag设置成true。出现异常,还有任务没有完成,所有任务取消:Future.cancel()(实际是调用执行线程的interrupt方法。上面代码分析和我们一开始讲解ExecutorService的invokeAll一致。3. invokeAny方法invokeAny实际调用doInvokeAny:private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); ExecutorCompletionService<T> ecs = // ① new ExecutorCompletionService<T>(this); try { ExecutionException ee = null; final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable<T>> it = tasks.iterator(); futures.add(ecs.submit(it.next())); // ② –ntasks; int active = 1; for (;;) { Future<T> f = ecs.poll(); // ③ if (f == null) { if (ntasks > 0) { –ntasks; futures.add(ecs.submit(it.next())); ++active; } else if (active == 0) break; else if (timed) { f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } else // ④ f = ecs.take(); } if (f != null) { // ⑤ –active; try { return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } if (ee == null) ee = new ExecutionException(); throw ee; } finally { for (int i = 0, size = futures.size(); i < size; i++) // ⑥ futures.get(i).cancel(true); } }声明一个ExecutorCompletionService ecs,这个对象实际是一个任务执行结果阻塞队列和线程池的结合,所以它可以加入任务,执行任务,将任务执行结果加入阻塞队列。向ecs添加tasks中的第一个任务并且执行。从ecs的阻塞队列中取出第一个(队头),如果为null(不为null跳到注释⑤),说明一个任务都还没执行完成,继续添加任务。如果所有任务都被添加了,阻塞等待任务的执行结果,知道有任一任务执行完成。如果取到了某个任务的执行结果,直接返回。取消所有还没执行的任务。上面代码分析和我们一开始讲解ExecutorService的invokeAny一致。 到现在,我们已经分析完了AbstractExecutorService中的公共的方法,接下来就该研究最终的具体实现了:ThreadPoolExecutorThreadPoolExecutorThreadPoolExecutor继承自AbstractExecutorService,它是线程池的具体实现:我们首先分析下构造方法:public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue&lt;Runnable&gt; workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)。corePoolSize:核心线程数,maximumPoolSize:线程池最大允许线程数,workQueue:任务队列,threadFactory:线程创建工厂,handler: 任务拒绝策,keepAliveTime, unit:等待时长,它们的具体作用如下:提交一个task(Runnable)后(执行execute方法),检查总线程数是否小于corePoolSize,小于等于则使用threadFactory直接创建一个线程执行任务,大于则再次检查线程数量是否等于maximumPoolSize,等于则直接执行handler拒绝策略,小于则判断workQueue是否已经满了,没满则将任务加入等待线程执行,满了则使用threadFactory创建新线程执行队头任务。通过流程图我们知道每个参数作用,这里值得注意的是,如果我们将某些参数特殊化,则可以得到特殊的线程池:corePoolSize=maximuPoolSize,我们可以创建一个线程池线程数量固定的任务。maximumPoolSize设置的足够大(Integer.MAX_VALUE),可以无限制的加入任务。workQueue设置的足够大,线程池中的数量不会超过corePoolSize,此时maximumPoolSize参数无用。corePoolSize=0,线程池一旦空闲(超过时间),线程都将被回收。我们上面知道,如果多余的空闲线程空闲时间超过keepAliveTimeunit,这些线程将被回收。我们可以通过方法allowCoreThreadTimeOut使这个参数对线程池中所有线程都有效果。workQueue一般有三种实现:SynchronousQueue,这是一个空队列,不会保存提交的task(添加操作必须等待另外的移除操作)。ArrayBlockingQueue,数组实现的丢列,可以指定队列的长度。LinkedBlockingQueue, 链表实现的队列,所以理论上可以无限大,也可以指定链表长度。而RejectedExecutionHandler一般由四种实现:AbortPolicy, 直接抛出RejectedExecutionException,这是线程池中的默认实现DiscardPolicy,什么都不做DiscardOldestPolicy,丢弃workQueue队头任务,加入新任务CallerRunsPolicy,直接在调用者的线程执行任务最后,我们再分析下ThreadPoolExecutor核心方法execute:public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // ① if (workerCountOf(c) < corePoolSize) { // ② if (addWorker(command, true)) return; c = ctl.get(); // ③ } if (isRunning(c) && workQueue.offer(command)) { // ④ int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // ⑤ reject(command); else if (workerCountOf(recheck) == 0) // ⑥ addWorker(null, false); } else if (!addWorker(command, false)) // ⑦ reject(command); }获取线程池中的线程数量线程池中线程数量小于corePoolSize,直接调用addWorker添加新线程执行任务返回。因为多线程的关系,上一步可能调用addWorker失败(其它线程创建了,数以数量已经超过了),重启获取线程数量。向workQueue添加添加任务,如果添加成功,double获取线程数量,添加失败,走到步骤⑦double检查后发现线程池已经关闭或者数量超出,回滚已经添加的任务(remove(command))并且执行拒绝策略。double检查通过,添加一个新线程。再次添加线程,失败则调用拒绝策略。好了,到现在jdk中的线程池核心的实现,策略,分析我们已经分析完成了。接下来我我们就来看看关于线程池的另外的一些扩展,也就是图上的剩下的接口和类:ScheduledExecutorServiceScheduledExecutorService继承自ExecutorService,ExecutorService的分析上面我们已经知道了,我们来看看它扩展了哪些方法:这个接口作为线程池的定义主要增加了可以定时执行任务(执行一次)和定期执行任务(重复执行),我们来一一简述下每个方法的作用。public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);: 这个方法用于定时执行任务command,延迟的时间为delayunit,它返回一个ScheduledFuture对象用于获取执行结果或者剩余延时,调用Future.get()方法将阻塞当前线程最后返回null。public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);:同上,不同的是,调用Future.get()方法将返回执行的结果,而不是null。public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,TimeUnit unit);: 重复执行任务command,第一次执行时间为initialDelay延迟后,以后的执行时间将在initialDelay + period * n,unit代表时间单位,值得注意的是,如果某次执行出现异常,后面该任务就不会再执行。或者通过返回对象Future手动取消,后面也将不再执行。public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit);: 效果同上,不同点:如果command耗时为 y,则上面的计算公式为initialDelay + period * n + y,也就是说,它的定时时间会加上任务耗时,而上面的方法则是一个固定的频率,不会算上任务执行时间!这是它扩展的四个方法,其中需要注意的是scheduleAtFixedRate和scheduleWithFixedDelay的细微差别,最后,我们来看下它的实现类:ScheduledThreadPoolExecutorScheduledThreadPoolExecutorScheduledThreadPoolExecutor继承自ThreadPoolExecutor类,实现了ScheduledExecutorService接口,上面均已经分析。它的构造器如下:看起来比它的父类构造器简洁,主要因为它的任务队列workQueue是默认的(DelayedWorkQueue),并且最大的线程数为最大值。接着我们看下DelayedWorkQueue实现:它内部使用数组维护了一个二叉树,提高了任务查找时间,而之所以ScheduledThreadPoolExecutor能够实现延时的关键也在于DelayedWorkQueue的take()方法: public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // ① RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don’t retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }工作线程调用take方法获取剩余任务。检查这个任务是否已经到了执行时间。未到执行时间,await等待。自己唤醒,进入循环再次计算时间。好了,到目前为止jdk中关于线程池的6个核心类已经全部分析完毕了。接下来还有最后一个小问题,我们手动创建线程池参数也太了,不管是ThreadPoolExecutor还是ScheduledThreadPoolExecutor,这对于用户来说似乎并不太友好,当然,jdk已经想到了这个问题,所以,我们最后再介绍一个创建这些线程池的工具类:Executors:Executors它的主要工具方法如下:比起手动创建,它帮我们加了很多默认值,用起来当然就方便多了,比如说newFixedThreadPool创建一个线程数固定的线程池,其实就是核心线程数等于最大线程数,和我们一开始分析的结果一样。值得注意的是:为了我们的程序安全可控性考虑,我们应该尽量考虑手动创建线程池,知晓每一个参数的作用,降低不稳定性!总结本次,我们首先从代码出发,分析了线程池给我们带来的好处以及直接使用线程的弊端,接着引出了jdk中的已经实现了的线程池。然后重点分析了jdk中关于线程池的六个最重要的接口和类,并且从源码角度讲解了关键点实现,最后,处于方便考虑,我们还知道jdk给我们留了一个创建线程池的工具类,简化了手动创建线程池的步骤。真正做到了知其然,知其所以然。关注我,这里只有干货! ...

March 12, 2019 · 4 min · jiezi

用多线程去处理 123,456,789 三个字符串,然后以147,258,369输出

import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;/** * 用多线程去处理 “123”,“456”,“789” 三个字符串,然后以"147",“258”,“369"输出 * */public class ThreadSample { public static void main(String[] args) throws InterruptedException, ExecutionException { String str1 = “123”,str2 = “456”,str3 = “789”; ProcessThread thread3 = new ProcessThread(str3, null); ProcessThread thread2 = new ProcessThread(str2, thread3); ProcessThread thread1 = new ProcessThread(str1, thread2); for (int i = 0; i < str1.length(); i++) { thread1.setIndex(i); FutureTask<String> future = new FutureTask<String>(thread1); new Thread(future).start(); String outStr = future.get(); System.out.println(outStr); } }}输出:147258369import java.util.concurrent.Callable;import java.util.concurrent.FutureTask;public class ProcessThread implements Callable<String>{ private String value; private ProcessThread next; private Integer index; public String call() throws Exception { if(this.next!=null){ this.next.setIndex(this.index); //开启下一个线程 FutureTask<String> future = new FutureTask<String>(this.next); new Thread(future).start(); String nextString = future.get(); return value.charAt(this.index)+nextString; } return String.valueOf(value.charAt(this.index)); } public ProcessThread(String value, ProcessThread next) { this.value = value; this.next = next; } //set/get 省略} ...

March 5, 2019 · 1 min · jiezi

深入解读MySQL8.0 新特性 :Crash Safe DDL

前言在MySQL8.0之前的版本中,由于架构的原因,mysql在server层使用统一的frm文件来存储表元数据信息,这个信息能够被不同的存储引擎识别。而实际上innodb本身也存储有元数据信息。这给ddl带来了一定的挑战,因为这种架构无法做到ddl的原子化,我们在线上经常能够看到数据目录下遗留的临时文件,或者类似server层和innodb层列个数不一致之类的错误。甚至某些ddl可能还遗留元数据在innodb内,而丢失了frm,导致无法重建表…..(我们为了解决这个问题,实现了一个叫drop table force的功能,去强制做清理….)(以下所有的讨论都假定使用InnoDB存储引擎)到了8.0版本,我们知道所有的元数据已经统一用InnoDB来进行管理,这就给实现原子ddl带来了可能,几乎所有的对innodb表,存储过程,触发器,视图或者UDF的操作,都能做到原子化:- 元数据修改,binlog以及innodb的操作都放在一个事务中- 增加了一个内部隐藏的系统表mysql.innodb_ddl_log,ddl操作被记录到这个表中,注意对该表的操作产生的redo会fsync到磁盘上,而不会考虑innodb_flush_log_at_trx_commit的配置。当崩溃重启时,会根据事务是否提交来决定通过这张表的记录去回滚或者执行ddl操作- 增加了一个post-ddl的阶段,这也是ddl的最后一个阶段,会去:1. 真正的物理删除或重命名文件; 2. 删除innodb_ddl_log中的记录项; 3.对于一些ddl操作还会去更新其动态元数据信息(存储在mysql.innodb_dynamic_metadata,例如corrupt flag, auto_inc值等)- 一个正常运行的ddl结束后,其ddl log也应该被清理,如果这中间崩溃了,重启时会去尝试重放:1.如果已经走到最后一个ddl阶段的(commit之后),就replay ddl log,把ddl完成掉;2. 如果处于某个中间态,则回滚ddl由于引入了atomic ddl, 有些ddl操作的行为也发生了变化:- DROP TABLE: 在之前的版本中,一个drop table语句中如果要删多个表,比如t1,t2, t2不存在时,t1会被删除。但在8.0中,t1和t2都不会被删除,而是抛出错误。因此要注意5.7->8.0的复制问题 (DROP VIEW, CREATE USER也有类似的问题)- DROP DATABASE: 修改元数据和ddl_log先提交事务,而真正的物理删除数据文件放在最后,因此如果在删除文件时崩溃,重启时会根据ddl_log继续执行drop database测试:MySQL很贴心的加了一个选项innodb_print_ddl_logs,打开后我们可以从错误日志看到对应的ddl log,下面我们通过这个来看下一些典型ddl的过程root@(none) 11:12:19>SET GLOBAL innodb_print_ddl_logs = 1; Query OK, 0 rows affected (0.00 sec)root@(none) 11:12:22>SET GLOBAL log_error_verbosity = 3; Query OK, 0 rows affected (0.00 sec)CREATE DATABASEmysql> CREATE DATABASE test;Query OK, 1 row affected (0.02 sec)创建数据库语句没有写log_ddl,可能觉得这不是高频操作,如果创建database的过程中失败了,重启后可能需要手动删除目录。CREATE TABLEmysql> USE test;Database changedmysql> CREATE TABLE t1 (a INT PRIMARY KEY, b INT);Query OK, 0 rows affected (0.06 sec)[InnoDB] DDL log insert : [DDL record: DELETE SPACE, id=428, thread_id=7, space_id=76, old_file_path=./test/t1.ibd][InnoDB] DDL log delete : by id 428[InnoDB] DDL log insert : [DDL record: REMOVE CACHE, id=429, thread_id=7, table_id=1102, new_file_path=test/t1][InnoDB] DDL log delete : by id 429[InnoDB] DDL log insert : [DDL record: FREE, id=430, thread_id=7, space_id=76, index_id=190, page_no=4][InnoDB] DDL log delete : by id 430[InnoDB] DDL log post ddl : begin for thread id : 7InnoDB] DDL log post ddl : end for thread id : 7从日志来看有三类操作,实际上描述了如果操作失败需要进行的三项逆向操作:删除数据文件,释放内存中的数据词典信息,删除索引btree。在创建表之前,这些数据被写入到ddl_log中,在创建完表并commit后,再从ddl log中删除这些记录。另外上述日志中还有DDL log delete日志,其实在每次写入ddl log时是单独事务提交的,但在提交之后,会使用当前事务执行一条delete操作,直到操作结束了才会提交。加列(instant)mysql> ALTER TABLE t1 ADD COLUMN c INT;Query OK, 0 rows affected (0.08 sec)Records: 0 Duplicates: 0 Warnings: 0[InnoDB] DDL log post ddl : begin for thread id : 7[InnoDB] DDL log post ddl : end for thread id : 7注意这里执行的是Instant ddl, 这是8.0.13新支持的特性,加列操作可以只修改元数据,因此从ddl log中无需记录数据删列mysql> ALTER TABLE t1 DROP COLUMN c;Query OK, 0 rows affected (2.77 sec)Records: 0 Duplicates: 0 Warnings: 0[InnoDB] DDL log insert : [DDL record: DELETE SPACE, id=487, thread_id=7, space_id=83, old_file_path=./test/#sql-ib1108-1917598001.ibd][InnoDB] DDL log delete : by id 487[InnoDB] DDL log insert : [DDL record: REMOVE CACHE, id=488, thread_id=7, table_id=1109, new_file_path=test/#sql-ib1108-1917598001][InnoDB] DDL log delete : by id 488[InnoDB] DDL log insert : [DDL record: FREE, id=489, thread_id=7, space_id=83, index_id=200, page_no=4][InnoDB] DDL log delete : by id 489[InnoDB] DDL log insert : [DDL record: DROP, id=490, thread_id=7, table_id=1108][InnoDB] DDL log insert : [DDL record: RENAME SPACE, id=491, thread_id=7, space_id=82, old_file_path=./test/#sql-ib1109-1917598002.ibd, new_file_path=./test/t1.ibd][InnoDB] DDL log delete : by id 491[InnoDB] DDL log insert : [DDL record: RENAME TABLE, id=492, thread_id=7, table_id=1108, old_file_path=test/#sql-ib1109-1917598002, new_file_path=test/t1][InnoDB] DDL log delete : by id 492[InnoDB] DDL log insert : [DDL record: RENAME SPACE, id=493, thread_id=7, space_id=83, old_file_path=./test/t1.ibd, new_file_path=./test/#sql-ib1108-1917598001.ibd][InnoDB] DDL log delete : by id 493[InnoDB] DDL log insert : [DDL record: RENAME TABLE, id=494, thread_id=7, table_id=1109, old_file_path=test/t1, new_file_path=test/#sql-ib1108-1917598001][InnoDB] DDL log delete : by id 494[InnoDB] DDL log insert : [DDL record: DROP, id=495, thread_id=7, table_id=1108][InnoDB] DDL log insert : [DDL record: DELETE SPACE, id=496, thread_id=7, space_id=82, old_file_path=./test/#sql-ib1109-1917598002.ibd][InnoDB] DDL log post ddl : begin for thread id : 7[InnoDB] DDL log replay : [DDL record: DELETE SPACE, id=496, thread_id=7, space_id=82, old_file_path=./test/#sql-ib1109-1917598002.ibd][InnoDB] DDL log replay : [DDL record: DROP, id=495, thread_id=7, table_id=1108][InnoDB] DDL log replay : [DDL record: DROP, id=490, thread_id=7, table_id=1108][InnoDB] DDL log post ddl : end for thread id : 7这是个典型的三阶段ddl的过程:分为prepare, perform 以及commit三个阶段:Prepare: 这个阶段会修改元数据,创建临时ibd文件#sql-ib1108-1917598001.ibd, 如果发生异常崩溃,我们需要能把这个临时文件删除掉, 因此和create table类似,也为这个idb写了三条日志:delete space, remove cache,以及free btreePerform: 执行操作,将数据拷贝到上述ibd文件中,(同时处理online dmllog), 这部分不涉及log ddl操作Commit: 更新数据词典信息并提交事务, 这里会写几条日志:DROP : table_id=1108RENAME SPACE: #sql-ib1109-1917598002.ibd文件被rename成t1.ibdRENAME TABLE: #sql-ib1109-1917598002被rename成t1RENAME SPACE: t1.ibd 被rename成#sql-ib1108-1917598001.ibdRENAME TABLE: t1表被rename成#sql-ib1108-1917598001DROP TABLE: table_id=1108DELETE SPACE: 删除#sql-ib1109-1917598002.ibd实际上这一步写的ddl log描述了commit阶段操作的逆向过程:将t1.ibd rename成#sql-ib1109-1917598002, 并将sql-ib1108-1917598001 rename成t1表,最后删除旧表。其中删除旧表的操作这里不执行,而是到post-ddl阶段执行Post-ddl: 在事务提交后,执行最后的操作:replay ddl log, 删除旧文件,清理mysql.innodb_dynamic_metadata中相关信息DELETE SPACE: #sql-ib1109-1917598002.ibdDROP: table_id=1108DROP: table_id=1108加索引mysql> ALTER TABLE t1 ADD KEY(b);Query OK, 0 rows affected (0.14 sec)Records: 0 Duplicates: 0 Warnings: 0[InnoDB] DDL log insert : [DDL record: FREE, id=431, thread_id=7, space_id=76, index_id=191, page_no=5][InnoDB] DDL log delete : by id 431[InnoDB] DDL log post ddl : begin for thread id : 7[InnoDB] DDL log post ddl : end for thread id : 7创建索引采用inplace创建的方式,没有临时文件,但如果异常发生的话,依然需要在发生异常时清理临时索引, 因此增加了一条FREE log,用于异常发生时能够删除临时索引.TRUNCATE TABLEmysql> TRUNCATE TABLE t1;Query OK, 0 rows affected (0.13 sec)[InnoDB] DDL log insert : [DDL record: RENAME SPACE, id=439, thread_id=7, space_id=77, old_file_path=./test/#sql-ib1103-1917597994.ibd, new_file_path=./test/t1.ibd][InnoDB] DDL log delete : by id 439[InnoDB] DDL log insert : [DDL record: DROP, id=440, thread_id=7, table_id=1103][InnoDB] DDL log insert : [DDL record: DELETE SPACE, id=441, thread_id=7, space_id=77, old_file_path=./test/#sql-ib1103-1917597994.ibd][InnoDB] DDL log insert : [DDL record: DELETE SPACE, id=442, thread_id=7, space_id=78, old_file_path=./test/t1.ibd][InnoDB] DDL log delete : by id 442[InnoDB] DDL log insert : [DDL record: REMOVE CACHE, id=443, thread_id=7, table_id=1104, new_file_path=test/t1][InnoDB] DDL log delete : by id 443[InnoDB] DDL log insert : [DDL record: FREE, id=444, thread_id=7, space_id=78, index_id=194, page_no=4][InnoDB] DDL log delete : by id 444[InnoDB] DDL log insert : [DDL record: FREE, id=445, thread_id=7, space_id=78, index_id=195, page_no=5][InnoDB] DDL log delete : by id 445[InnoDB] DDL log post ddl : begin for thread id : 7[InnoDB] DDL log replay : [DDL record: DELETE SPACE, id=441, thread_id=7, space_id=77, old_file_path=./test/#sql-ib1103-1917597994.ibd][InnoDB] DDL log replay : [DDL record: DROP, id=440, thread_id=7, table_id=1103][InnoDB] DDL log post ddl : end for thread id : 7Truncate table是个比较有意思的话题,在早期5.6及之前的版本中, 是通过删除旧表创建新表的方式来进行的,5.7之后为了保证原子性,改成了原地truncate文件,同时增加了一个truncate log文件,如果在truncate过程中崩溃,可以通过这个文件在崩溃恢复时重新truncate。到了8.0版本,又恢复成了删除旧表,创建新表的方式,与之前不同的是,8.0版本在崩溃时可以回滚到旧数据,而不是再次执行。以上述为例,主要包括几个步骤:将表t1.ibd rename成#sql-ib1103-1917597994.ibd创建新文件t1.ibdpost-ddl: 将老文件#sql-ib1103-1917597994.ibd删除RENAME TABLEmysql> RENAME TABLE t1 TO t2;Query OK, 0 rows affected (0.06 sec)DDL LOG:[InnoDB] DDL log insert : [DDL record: RENAME SPACE, id=450, thread_id=7, space_id=78, old_file_path=./test/t2.ibd, new_file_path=./test/t1.ibd][InnoDB] DDL log delete : by id 450[InnoDB] DDL log insert : [DDL record: RENAME TABLE, id=451, thread_id=7, table_id=1104, old_file_path=test/t2, new_file_path=test/t1][InnoDB] DDL log delete : by id 451[InnoDB] DDL log post ddl : begin for thread id : 7[InnoDB] DDL log post ddl : end for thread id : 7这个就比较简单了,只需要记录rename space 和rename table的逆操作即可. post-ddl不需要做实际的操作DROP TABLEDROP TABLE t2[InnoDB] DDL log insert : [DDL record: DROP, id=595, thread_id=7, table_id=1119][InnoDB] DDL log insert : [DDL record: DELETE SPACE, id=596, thread_id=7, space_id=93, old_file_path=./test/t2.ibd][InnoDB] DDL log post ddl : begin for thread id : 7[InnoDB] DDL log replay : [DDL record: DELETE SPACE, id=596, thread_id=7, space_id=93, old_file_path=./test/t2.ibd][InnoDB] DDL log replay : [DDL record: DROP, id=595, thread_id=7, table_id=1119][InnoDB] DDL log post ddl : end for thread id : 7先在ddl log中记录下需要删除的数据,再提交后,再最后post-ddl阶段执行真正的删除表对象和文件操作代码实现:主要实现代码集中在文件storage/innobase/log/log0ddl.cc中,包含了向log_ddl表中插入记录以及replay的逻辑。隐藏的innodb_log_ddl表结构如下 def->add_field(0, “id”, “id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT”); def->add_field(1, “thread_id”, “thread_id BIGINT UNSIGNED NOT NULL”); def->add_field(2, “type”, “type INT UNSIGNED NOT NULL”); def->add_field(3, “space_id”, “space_id INT UNSIGNED”); def->add_field(4, “page_no”, “page_no INT UNSIGNED”); def->add_field(5, “index_id”, “index_id BIGINT UNSIGNED”); def->add_field(6, “table_id”, “table_id BIGINT UNSIGNED”); def->add_field(7, “old_file_path”, “old_file_path VARCHAR(512) COLLATE UTF8_BIN”); def->add_field(8, “new_file_path”, “new_file_path VARCHAR(512) COLLATE UTF8_BIN”); def->add_index(0, “index_pk”, “PRIMARY KEY(id)”); def->add_index(1, “index_k_thread_id”, “KEY(thread_id)”);记录类型根据不同的操作类型,可以分为如下几类:FREE_TREE_LOG目的是释放索引btree,入口函数log_DDL::write_free_tree_log,在创建索引和删除表时会调用到对于drop table中涉及的删索引操作,log ddl的插入操作放到父事务中,一起要么提交要么回滚对于创建索引的case, log ddl就需要单独提交,父事务将记录标记删除,这样后面如果ddl回滚了,也能将残留的index删掉。DELETE_SPACE_LOG入口函数:Log_DDL::write_delete_space_log用于记录删除tablespace操作,同样分为两种情况:drop table/tablespace, 写入的记录随父事务一起提交,并在post-ddl阶段replay创建tablespace, 写入的记录单独提交,并被父事务标记删除,如果父事务回滚,就通过replay删除参与的tablespaceRENAME_SPACE_LOG入口函数:Log_DDL::write_rename_space_log用于记录rename操作,例如如果我们把表t1 rename成t2,在其中就记录了逆向操作t2 rename to t1.在函数Fil_shard::space_rename()中,总是先写ddl log, 再做真正的rename操作. 写日志的过程同样是独立事务提交,父事务做未提交的删除操作DROP_LOG入口函数: Log_DDL::write_drop_log用于记录删除表对象操作,这里不涉及文件层操作,写ddl log在父事务中执行RENAME_TABLE_LOG入口函数: Log_DDL::write_rename_table_log用于记录rename table对象的逆操作,和rename space类似,也是独立事务提交ddl log, 父事务标记删除REMOVE_CACHE_LOG入口函数: Log_DDL::write_remove_cache_log用于处理内存表对象的清理,独立事务提交,父事务标记删除ALTER_ENCRYPT_TABLESPACE_LOG入口函数: Log_DDL::write_alter_encrypt_space_log用于记录对tablespace加密属性的修改,独立事务提交. 在写完ddl log后修改tablespace page0 中的加密标记综上,在ddl的过程中可能会提交多次事务,大概分为三类:独立事务写ddl log并提交,父事务标记删除, 如果父事务提交了,ddl log也被顺便删除了,如果父事务回滚了,那就要根据ddl log做逆操作来回滚ddl独立事务写ddl log 并提交, (目前只有ALTER_ENCRYPT_TABLESPACE_LOG)使用父事务写ddl log,在ddl结束时提交。需要在post-ddl阶段处理post_ddl如上所述,有些ddl log是随着父事务一起提交的,有些则在post-ddl阶段再执行, post_ddl发生在父事提交或回滚之后: 若事务回滚,根据ddl log做逆操作,若事务提交,在post-ddl阶段做最后真正不可逆操作(例如删除文件)入口函数: Log_DDL::post_ddl –>Log_DDL::replay_by_thread_id根据执行ddl的线程thread id通过innodb_log_ddl表上的二级索引,找到log id,再到聚集索引上找到其对应的记录项,然后再replay这些操作,完成ddl后,清理对应记录崩溃恢复在崩溃恢复结束后,会调用ha_post_recover接口函数,进而调用innodb内的函数Log_DDL::recover(), 同样的replay其中的记录,并在结束后删除记录。但ALTER_ENCRYPT_TABLESPACE_LOG类型并不是在这一步删除,而是加入到一个数组ts_encrypt_ddl_records中,在之后调用resume_alter_encrypt_tablespace来恢复操作,参考文档1. 官方文档2. wl#9536: support crash safe ddl本文作者:zhaiwx_yinfeng阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

February 26, 2019 · 5 min · jiezi

浅析State-Thread

State-Thread(以下简称st),是一个由C语言编写的小巧、简洁却高效的开源协程库。这个库基于单线程运作、不强制占用用户线程,给予了开发者最大程度的轻量级和较低的侵入性。本篇文章中,网易云信音视频研发大神将为大家简要分析State-Thread,欢迎大家积极留言,和我们共同讨论。在开始这个话题之前,我们先来聊一聊协程。什么是协程?协程是一种程序组件。通常我们把协程理解为是一种程序自己实现调度、用于提高运行效率、降低开发复杂度的东西。提高运行效率很好理解,因为在程序层自己完成了部分的调度,降低了对系统调度的依赖,减少了大量的中断和换页操作。而降低了开发复杂度,则是指对于开发者而言,可以使用同步的方式去进行代码开发(不需要考虑异步模型的诸多回调),也不需要考虑多线程模型的线程调度和诸多的临界资源问题。很多语言都拥有协程,例如python或者golang。而对于c/c++而言,通常实现协程的常见方式,通常是依赖于glibc提供的setjump&longjump或者基于汇编语言,当然还有基于语义实现(protothread)。linux上使用协程库的方式,通常也会分为替换函数和更为暴力的替换so来实现。当然而各种方式有各自的优劣。而st选用的汇编语言实现setjump&longjump和要求用户调用st_打头的函数来嵌入程序。所以st具备了跨平台的能力,以及让开发者们更开心的“与允许调用者自行选择切换时机”的能力。st究竟是如何实现了这一切?首先我们先看看st的整体工作流程:在宏观的来看,ST的结构主要分成:vp_schedule。主要是负责了一个调度的能力。有点类似于linux内核当中的schedule()函数。每次当这个函数被调用的时候,都会完成一次线程的切换。各种Queue。用于保存各种状态下等待被调度协程(st_thread)Timer。用于记录各种超时和sleep。poll。用于监听各种io事件,会根据系统能力不同而进行切换(kqueue、epoll、poll、select)。st_thread。用于保存各种协程的信息。其中比较重要的是schedule模块和thread模块两者。这两者实现了一个完整的协程切换和调度。属于st的核心。而schedule部分通常是开发者们最需要关心的部分。接下来我们会深入到代码层,看一下具体在这个过程里做了些什么。通常对于st而言,所有暴露给用户的除了init函数,就是一系列的st_xxx函数了。那么先看看init函数。int st_init(void){ _st_thread_t thread;if (_st_active_count) { / Already initialized / return 0; }/ We can ignore return value here / st_set_eventsys(ST_EVENTSYS_DEFAULT);if (_st_io_init() < 0) return -1;memset(&_st_this_vp, 0, sizeof(_st_vp_t));ST_INIT_CLIST(&_ST_RUNQ); ST_INIT_CLIST(&_ST_IOQ); ST_INIT_CLIST(&_ST_ZOMBIEQ);if ((_st_eventsys->init)() < 0) return -1;_st_this_vp.pagesize = getpagesize(); _st_this_vp.last_clock = st_utime();/Create idle thread/ _st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0); if (!_st_this_vp.idle_thread) return -1; _st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD; _st_active_count–; _ST_DEL_RUNQ(_st_this_vp.idle_thread);/Initialize primordial thread/ thread = (_st_thread_t ) calloc(1, sizeof(_st_thread_t) + (ST_KEYS_MAX sizeof(void ))); if (!thread) return -1; thread->private_data = (void **) (thread + 1); thread->state = _ST_ST_RUNNING; thread->flags = _ST_FL_PRIMORDIAL; _ST_SET_CURRENT_THREAD(thread); _st_active_count++;return 0;}这段函数一共做了3事情,创建了一个idle_thread, 初始化了_ST_RUNQ、_ST_IOQ、_ST_ZOMBIEQ三个队列,把当前调用者初始化成原始函数(通常st_init会在main里面调用,所以这个原始的thread相当于是主线程)。idle_thread函数,其实就是整个IO和定时器相关的本体函数了。st会在每一次_ST_RUNQ运行完成后,调用idle_thread来获取可读写的io和定时器。这个我们后续再说。那么,st_xxx一般会分成io类和延迟类(sleep)。两者入口其实是同一个,只不过在io类的会多调用一层。我们这里选择st_send为代表。int st_sendmsg(_st_netfd_t fd, const struct msghdr msg, int flags, st_utime_t timeout){ int n;while ((n = sendmsg(fd->osfd, msg, flags)) < 0) { if (errno == EINTR) continue; if (!_IO_NOT_READY_ERROR) return -1; / Wait until the socket becomes writable / if (st_netfd_poll(fd, POLLOUT, timeout) < 0) return -1; }return n;}本质上所有的st函数都是以异步接口+ st_netfd_poll来实现的。在st_netfd_poll以内,会去调用st_poll,而st_poll本质上会调用并且切换线程。int st_netfd_poll(_st_netfd_t fd, int how, st_utime_t timeout){ struct pollfd pd; int n;pd.fd = fd->osfd; pd.events = (short) how; pd.revents = 0;if ((n = st_poll(&pd, 1, timeout)) < 0) return -1; if (n == 0) { / Timed out / errno = ETIME; return -1; } if (pd.revents & POLLNVAL) { errno = EBADF; return -1; }return 0;}int st_poll(struct pollfd pds, int npds, st_utime_t timeout){ struct pollfd pd; struct pollfd epd = pds + npds; _st_pollq_t pq; _st_thread_t me = _ST_CURRENT_THREAD(); int n;if (me->flags & _ST_FL_INTERRUPT) { me->flags &= ~_ST_FL_INTERRUPT; errno = EINTR; return -1; }if ((_st_eventsys->pollset_add)(pds, npds) < 0) return -1;pq.pds = pds; pq.npds = npds; pq.thread = me; pq.on_ioq = 1; _ST_ADD_IOQ(pq); if (timeout != ST_UTIME_NO_TIMEOUT) _ST_ADD_SLEEPQ(me, timeout); me->state = _ST_ST_IO_WAIT;_ST_SWITCH_CONTEXT(me);n = 0; if (pq.on_ioq) { / If we timed out, the pollq might still be on the ioq. Remove it / _ST_DEL_IOQ(pq); (_st_eventsys->pollset_del)(pds, npds); } else { / Count the number of ready descriptors / for (pd = pds; pd < epd; pd++) { if (pd->revents) n++; } }if (me->flags & _ST_FL_INTERRUPT) { me->flags &= ~_ST_FL_INTERRUPT; errno = EINTR; return -1; }return n;}那么到此为止,st_poll中就出现了我们最关心的调度部分了。当一个线程进行调度的时候一般都是poll_add(如果是io操作),add_queue, _ST_SWITCH_CONTEXT完成一次调度。根据不同的类型,会add到不同的queue。例如需要超时,则会add到IOQ和SLEEPQ。而_ST_SWITCH_CONTEXT,则是最关键的切换线程操作了。_ST_SWITCH_CONTEXT其实是一个宏,它的本质是调用了MD_SETJMP和_st_vp_schedule().define _ST_SWITCH_CONTEXT(_thread) \ST_BEGIN_MACRO \ ST_SWITCH_OUT_CB(_thread); \ if (!MD_SETJMP((_thread)->context)) { \ _st_vp_schedule(); \ } \ ST_DEBUG_ITERATE_THREADS(); \ ST_SWITCH_IN_CB(_thread); \ ST_END_MACRO这个函数其实就是一个完成的线程切换了。在st里线程的切换会使用MD_SETJMP->_st_vp_schedule->MD_LONGJMP。MD_SETJMP和MD_LONGJMP其实就是st使用汇编自己写的setjmp和longjmp函数(glibc),效果也是几乎等效的。(因为st本身会做平台适配,所以我们以x86-64的汇编为例)elif defined(amd64) || defined(x86_64)/Internal __jmp_buf layout/define JB_RBX 0define JB_RBP 1define JB_R12 2define JB_R13 3define JB_R14 4define JB_R15 5define JB_RSP 6define JB_PC 7.file “md.S” .text/ _st_md_cxt_save(__jmp_buf env) /.globl _st_md_cxt_save .type _st_md_cxt_save, @function .align 16_st_md_cxt_save: /Save registers./ movq %rbx, (JB_RBX8)(%rdi) movq %rbp, (JB_RBP8)(%rdi) movq %r12, (JB_R128)(%rdi) movq %r13, (JB_R138)(%rdi) movq %r14, (JB_R148)(%rdi) movq %r15, (JB_R158)(%rdi) / Save SP / leaq 8(%rsp), %rdx movq %rdx, (JB_RSP8)(%rdi) / Save PC we are returning to / movq (%rsp), %rax movq %rax, (JB_PC8)(%rdi) xorq %rax, %rax ret .size _st_md_cxt_save, .-_st_md_cxt_save/// _st_md_cxt_restore(__jmp_buf env, int val) /.globl _st_md_cxt_restore .type _st_md_cxt_restore, @function .align 16_st_md_cxt_restore: /Restore registers./ movq (JB_RBX8)(%rdi), %rbx movq (JB_RBP8)(%rdi), %rbp movq (JB_R128)(%rdi), %r12 movq (JB_R138)(%rdi), %r13 movq (JB_R148)(%rdi), %r14 movq (JB_R158)(%rdi), %r15 / Set return value / test %esi, %esi mov $01, %eax cmove %eax, %esi mov %esi, %eax movq (JB_PC8)(%rdi), %rdx movq (JB_RSP8)(%rdi), %rsp / Jump to saved PC / jmpq *%rdx .size _st_md_cxt_restore, .-_st_md_cxt_restore//MD_SETJMP的时候,会使用汇编把所有寄存器的信息保留下来,而MD_LONGJMP则会把所有的寄存器信息重新加载出来。两者配合使用的时候,可以完成一次函数间的跳转。那么我们已经看到了MD_SETJMP的调用,MD_LONGJMP调用在哪儿呢?让我们继续看下去,在最一开始,我们就提及过_st_vp_schedule()这个核心函数。void _st_vp_schedule(void){ _st_thread_t *thread;if (_ST_RUNQ.next != &_ST_RUNQ) { / Pull thread off of the run queue / thread = _ST_THREAD_PTR(_ST_RUNQ.next); _ST_DEL_RUNQ(thread); } else { / If there are no threads to run, switch to the idle thread / thread = _st_this_vp.idle_thread; } ST_ASSERT(thread->state == _ST_ST_RUNNABLE);/ Resume the thread / thread->state = _ST_ST_RUNNING; _ST_RESTORE_CONTEXT(thread);}这个函数其实非常简单,基本工作原理可以认为是执行以下几步: 1.查看当前RUNQ是否有可以调用的,如果有,则RUNQ pop一个thread。 2. 如果没有,则运行idle_thread。 3. 调用_ST_RESTORE_CONTEXT。那么_ST_RESTORE_CONTEXT做了什么呢?define _ST_RESTORE_CONTEXT(_thread) \ST_BEGIN_MACRO \ _ST_SET_CURRENT_THREAD(_thread); \ MD_LONGJMP((_thread)->context, 1); \ ST_END_MACRO简单来说,_ST_RESTORE_CONTEXT就是调用了我们之前所没有看到的MD_LONGJMP。所以,我们可以简单地认为,在携程需要schedule的时候,会先把自身当前的栈通过MD_SETJMP保存起来,当线程被schedule再次调度出来的时候,则会使用MD_SETJMP来还原栈,完成一次协程切换。然后我们来看看idle_thread做了什么。虽然这个协程名字叫做idle,但是其实做了很多的事情。void _st_idle_thread_start(void arg){ _st_thread_t *me = _ST_CURRENT_THREAD();while (_st_active_count > 0) { / Idle vp till I/O is ready or the smallest timeout expired / _ST_VP_IDLE();/ Check sleep queue for expired threads / _st_vp_check_clock();me->state = _ST_ST_RUNNABLE; _ST_SWITCH_CONTEXT(me); }/ No more threads / exit(0);/ NOTREACHED / return NULL;}总的来说,idle_thread做了两件事情。1. _ST_VP_IDLE() 2. _st_vp_check_clock()。_st_vp_check_clock很好理解,就是检查定时器是否超时,如果超时了,则设置超时标记之后,放回RUNQ。而_ST_VP_IDLE,其实就是查看io是否已经ready了。例如linux的话,则会调用epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist,_st_epoll_data->evtlist_size, timeout)去查看是否有可响应的io。timeout值会根据当前空闲情况进行变化,通常来说会是一个极小的值。那么看到这里,整体的线程调度已经全部走完了。(详见前面最一开始的流程图)总体流程总结来说基本上是func() -> st_xxxx() -> AddQ -> MD_SETJMP -> schedule() -> MD_LONG -> func()。所以对于st而言,所以的调度,是基于用户调用。那么如果用户一直不调用st_xxx()(例如计算密集性服务),st也就无法进行协程切换,那么其他协程也就产生极大的阻塞了。这也是为什么st并不太合适计算密集型的原因(其实单线程框架大多都不合适计算密集型)想要阅读更多技术干货文章,欢迎关注网易云信博客。了解网易云信,来自网易核心架构的通信与视频云服务。 ...

January 21, 2019 · 3 min · jiezi

线程池的C实现

一. 概述相信大家一直有听过线程池, 但是不一定都知道这到底是个什么东西,是如何实现的;1.1 为什么要使用线程池?因为线程的创建和销毁开销较大, 在单位时间内处理大量任务时再创建线程进行处理时间来不及.可以控制线程数目, 保证不会过度消耗内存.1.2 线程池适合应用的场合当一个服务器接受到大量短小线程的请求时, 使用线程池技术是非常合适的, 它可以大大减少线程的创建和销毁次数, 提高服务器的工作效率. 但是线程要求的运行时间比较长, 则不适用.二. 功能说明2.1 线程池比喻Setup1: 一个医院,每天面对成千上万的病人,处理方式是:来一个病人找来一个医生处理,处理完了医生也走了。当看病时间较短的时候,医生来去的时间,显得尤为费时了Setup2: 医院引进了线程池的概念。设置门诊,把医生全派出去坐诊,病人来看病先挂号排队,医生根据病人队列顺序依次处理各个病人,这样就省去医生来来去去的时间了。但是,很多时候病人不多,医生却很多导致很多医生空闲浪费水电资源撒Setup3: 医院引进了可伸缩性线程池的概念,如阶段二,但是门诊一开始只派出了部分医生,但是增加了一个领导,病人依旧是排队看病,领导负责协调整个医院的医生。当病人很多医生忙不过来的时候,领导就去多叫几个医生来帮忙;当病人不多医生太多的时候,领导就叫一些医生回家休息去免得浪费医院资源2.2 线程池功能线程池一般有以下三个功能:创建线程池销毁线程池添加新任务以上是对外的三个接口方法;本次实现的线程池对外有四个接口:struct sl_thread_pool *sl_thread_pool_create(unsigned int core_td_num, unsigned int max_td_num, int alive_time);void sl_thread_pool_destory(struct sl_thread_pool *pool);void sl_thread_pool_destory_now(struct sl_thread_pool *pool);int sl_thread_pool_push_task(struct sl_thread_pool *pool, void *(*task_fun)(void *arg), void *arg);在销毁线程的时候我做了个功能细化,分为两种: 一种是立即销毁线程池, 一种是执行完任务队列中的任务再销毁线程池,两种方式都是为阻塞方式;2.3 API 介绍2.3.1 创建线程池struct sl_thread_pool *sl_thread_pool_create(unsigned int core_td_num, unsigned int max_td_num, int alive_time);core_td_num: 初始化线程数 max_td_num: 最大线程数目(线程数量是动态分配) alive_time: 线程空闲时存活的时间,单位:毫秒 return: 返回线程池句柄该接口主要是用于创建一个线程池, 笔者写的该线程池可以动态的伸缩所以加入了最大线程数限制和存活时间.2.3.2 销毁线程池void sl_thread_pool_destory(struct sl_thread_pool *pool);调用该接口时,线程池不会立马被注销而是处理完任务队列中的所有任务才注销;void sl_thread_pool_destory_now(struct sl_thread_pool *pool);调用该接口时,立即注销线程池;2.3.3 添加新任务int sl_thread_pool_push_task(struct sl_thread_pool *pool, void *(*task_fun)(void *arg), void arg);向线程池中添加一个新任务, 形参task_fun为任务的函数指针, arg为函数指针的参数;三. 实现原理笔者写的该线程池有两个重要的链表:一个是线程链表,一个是任务链表,还有一个重要的线程:manager线程,用于管理线程的销毁和创建;3.1 线程池创建struct sl_thread_pool sl_thread_pool_create(unsigned int core_td_num, unsigned int max_td_num, int alive_time){ struct sl_thread_pool pstp = NULL; struct sl_thread thread = NULL; int create_ret = -1; pstp = (struct sl_thread_pool)malloc(sizeof(struct sl_thread_pool)); ① if (pstp == NULL) { ERR("%s: malloc error for creat pool", FUNCTION); goto malloc_pool_err; } create_ret = sl_create_thread_key(destructor_fun); ② if (create_ret != 0) { ERR("%s: create thread key error", FUNCTION); goto create_key_err; } / 创建manager/ create_manager_looper(pstp); thread = sl_thread_create(sl_thread_manager_do, pstp); ③ if (thread == NULL) { ERR("%s: malloc error for create pool", FUNCTION); goto create_manager_err; } else { pstp->thread_manager = thread; pthread_setname_np(thread->thread_id, “manager_thread”); } / 初始化线程池链表 / list_head_init(&pstp->thread_head); list_head_init(&pstp->task_queue.task_head); / 初始化线程池计数 / pstp->core_threads_num = core_td_num; pstp->max_threads_num = max_td_num; pstp->keep_alive_time = alive_time; pstp->alive_threads_num = 0; pstp->destory = 0; pthread_mutex_init(&pstp->thread_mutex, NULL); // pthread_cond_init(&pstp->thread_run_signal, NULL); / 初始化工作锁 / pthread_mutex_init(&pstp->task_queue.task_mutex, NULL); / 初始化工作队列同步条件 / pthread_cond_init(&pstp->task_queue.task_ready_signal, NULL); / 创建核心线程 / for (int i = 0; i < pstp->core_threads_num; i++) { ④ thread = sl_thread_create(sl_thread_do, pstp); if (thread != NULL) { list_add(&pstp->thread_head, &thread->thread_list); pthread_setname_np(thread->thread_id, “core_thread”); } else { i–; } } / 等待核心线程创建完成 / while (pstp->alive_threads_num != pstp->core_threads_num); return pstp;create_manager_err: pthread_key_delete(g_key);create_key_err: free(pstp);malloc_pool_err: return NULL;}①: 为线程池分配空间.②: 创建线程私有数据.③: 创建manager线程.④: 创建核心线程,这一定数量的线程是不会被释放的.线程池的数据结构如下:struct sl_thread_pool { struct list_head thread_head; / 线程链表 / struct sl_task_queue task_queue; / 任务链表 / unsigned int core_threads_num; / 初始化需要创建的线程数 / unsigned int max_threads_num; / 创建线程最大上限数 / unsigned int alive_threads_num; / 当前创建线程数量 / pthread_mutex_t thread_mutex; pthread_cond_t thread_run_signal; / 线程run信号 / int keep_alive_time; / 空闲线程保持存活时间 unit: ms */ struct sl_thread thread_manager; / 领导 */ unsigned int destory;};3.2 线程管理static void *sl_thread_manager_do(void *arg){ struct sl_thread_pool *pstp = (struct sl_thread_pool *)arg; int next_poll_time = -1; int keep_alive_time = -1; if (pstp == NULL) { ERR("%s: pool is NULL", FUNCTION); return NULL; } do { usleep(100); } while(pstp->thread_manager == NULL); while (pstp->thread_manager->thread_status != THREAD_QUIT) { keep_alive_time = poll_event(pstp, next_poll_time); next_poll_time = get_next_poll_time(keep_alive_time); } INFO(“sl_thread_manager_do quit”); return NULL;}manager线程主要是epoll来轮询事件,然后做出相应的处理;主要的事件有三个:线程挂起事件(空闲)新增任务事件注销事件static int poll_event(struct sl_thread_pool pool, int time_out){ … struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int event_count = epoll_wait(g_epoll_fd, eventItems, EPOLL_MAX_EVENTS, time_out); … // Check for poll timeout. if (event_count == 0) { ① list_for_each(plh, &pstp->thread_head) { pst = sl_list_entry(plh, struct sl_thread, thread_list); DEBUG("%s: pstp->alive_threads_num = %d, %ld thread status %s", FUNCTION, pstp->alive_threads_num, pst->thread_id, get_status(pst->thread_status)); if (pstp->alive_threads_num > pstp->core_threads_num) { if (pst->thread_status == THREAD_SUPPEND) { pst->thread_status = THREAD_QUIT; sl_notify_all(&pstp->task_queue); delete_when_each(plh); pthread_join(pst->thread_id, NULL); free(pst); keep_time = 50; // 50ms再检测一次 break; } } else { keep_time = -1; break; } } return keep_time; } // despatch for poll event for (int i = 0; i < event_count; i++) { fd = eventItems[i].data.fd; epoll_events = eventItems[i].events; if ((fd == g_wake_read_pip_fd) && (epoll_events & EPOLLIN)) { / thread和task同时来临只处理thread / ret_event = sl_get_event(); switch(ret_event) { case EVENT_THREAD: ② DEBUG(“EVENT_THREAD”); if (pstp->alive_threads_num > pstp->core_threads_num) { keep_time = pstp->keep_alive_time; } else { keep_time = -1; } break; case EVENT_TASK: ③ DEBUG(“EVENT_TASK”); / 判断当前线程的消息和当前运行线程比例 / pstq = &pstp->task_queue; if(pstq->num_tasks_alive >= (pstp->alive_threads_num * 2) && (pstp->alive_threads_num <= pstp->max_threads_num)) { / 创建线程 / pst = sl_thread_create(sl_thread_do, pstp); if (pst != NULL) { list_add(&pstp->thread_head, &pst->thread_list); pthread_setname_np(pst->thread_id, “other_thread”); } } break; case EVENT_SHUTDOWN: ④ DEBUG(“EVENT_SHUTDOWN”); / 执行完任务对列中的任务才shutdown */ pstp->core_threads_num = 0; pool->destory = 1; break; default: break; } } } return keep_time;}①: wait超时的处理,一般进入超时状态都是准备注销线程, 线程空闲时则注销. ②: 线程状态变化处理,判断当前线程是否多余核心线程,如果是则设置存活时间为下一轮的wait超时时间. ③: 发送任务事件后,主要是判断当前任务数量,线程池是否处理的过来,否则创建新线程. ④: 注销事件,核心线程数设置为0,等待任务链表中的任务处理完再注销;事件的轮询主要是借助epoll监控管道的变化实现,想了解的可以详细看下代码;3.3 任务的执行static void *sl_thread_do(void *arg){ struct sl_thread_pool *pstp = (struct sl_thread_pool *)arg; struct sl_thread_task *pstt = NULL; struct sl_task_queue *pstq = NULL; if (pstp == NULL) { ERR("%s: pool is NULL", FUNCTION); return NULL; } pstq = &pstp->task_queue; pthread_mutex_lock(&pstp->thread_mutex); pstp->alive_threads_num++; pthread_mutex_unlock(&pstp->thread_mutex); sl_save_thread_self(pstp); while (sl_get_thread_self()->thread_status != THREAD_QUIT) { pstt = sl_task_pull(pstq); ① if (pstt != NULL) { sl_update_thread_status(THREAD_WORKING); pstt->task_fun(&pstt->arg); ② free(pstt); } } pthread_mutex_lock(&pstp->thread_mutex); pstp->alive_threads_num–; pthread_mutex_unlock(&pstp->thread_mutex); sl_update_thread_status(THREAD_IDLE); sl_clear_thread_self(); ③ INFO(“thread_run_task %ld quit, currten threads count %d, currten tasks count %d\n”, pthread_self(), pstp->alive_threads_num, pstq->num_tasks_alive); return NULL;}①: 从任务对列中取出一个任务, 没有则休眠; ②: 执行任务 ③: 清除私有数据中存放的值这在说明一点,用线程的私有数据进行存储, 主要是为了更新线程的状态方便;3.4 任务添加int sl_thread_pool_push_task(struct sl_thread_pool *pool, void *(*task_fun)(void *arg), void *arg){ struct sl_task_queue *pstq = NULL; struct sl_thread_task pstt = NULL; if (pool == NULL || task_fun == NULL || pool->destory == 1) { ERR("%s: pool or task_fun is NULL or is destory status", FUNCTION); return -1; } pstq = &pool->task_queue; pstt = (struct sl_thread_task)malloc(sizeof(struct sl_thread_task)); if (pstt == NULL) { ERR("%s: malloc error for creat a task", FUNCTION); return -1; } pstt->task_fun = task_fun; pstt->arg = arg; return sl_task_push(pstq, pstt);}该接口主要分配了一个空间初始化传进来的任务,往下看:static int sl_task_push(struct sl_task_queue *_stq, struct sl_thread_task *new_task){ struct sl_task_queue *pstq = _stq; struct sl_thread_task *pstt = new_task; if (pstq == NULL || pstt == NULL) { ERR("%s: pstq or pstt is NULL", FUNCTION); return -1; } pthread_mutex_lock(&pstq->task_mutex); list_add(&pstq->task_head, &pstt->task_list); pstq->num_tasks_alive++; pthread_mutex_unlock(&pstq->task_mutex); sl_notify_one(pstq); sl_update_task_queue_info(); return pstq->num_tasks_alive;}将刚才保存的任务添加进任务对列并发送通知;四. 总结笔者写这个线程池,主要涉及到这个点有: 同步变量, 锁, 线程私有数据, 管道, epoll和双向队列;代码已经放到我的github上了: thread pool ...

January 2, 2019 · 4 min · jiezi

简说Java线程的那几个启动方式

本文首发于 猫叔的博客,转载请申明出处前言并发是一件很美妙的事情,线程的调度与使用会让你除了业务代码外,有新的世界观,无论你是否参与但是这对于你未来的成长帮助很大。所以,让我们来好好看看在Java中启动线程的那几个方式与介绍。Thread对于 Thread 我想这个基本上大家都认识的,在Java源码是这样说: java 虚拟机允许应用程序同时运行多个执行线程。 而这个的 Thread 就是程序的执行线程。如何使用它呢,其实在这个类中的源码已经给我们写好了,甚至是下面的 Runnable 的使用方式。(如下是Thread源码)/** * A <i>thread</i> is a thread of execution in a program. The Java * Virtual Machine allows an application to have multiple threads of * execution running concurrently. * <hr><blockquote><pre> * class PrimeThread extends Thread { * long minPrime; * PrimeThread(long minPrime) { * this.minPrime = minPrime; * } * * public void run() { * // compute primes larger than minPrime * &nbsp;.&nbsp;.&nbsp;. * } * } * </pre></blockquote><hr> * <p> * The following code would then create a thread and start it running: * <blockquote><pre> * PrimeThread p = new PrimeThread(143); * p.start(); * </pre></blockquote> * <p> * <hr><blockquote><pre> * class PrimeRun implements Runnable { * long minPrime; * PrimeRun(long minPrime) { * this.minPrime = minPrime; * } * * public void run() { * // compute primes larger than minPrime * &nbsp;.&nbsp;.&nbsp;. * } * } * </pre></blockquote><hr> * <p> * The following code would then create a thread and start it running: * <blockquote><pre> * PrimeRun p = new PrimeRun(143); * new Thread(p).start(); * </pre></blockquote> * <p> /public class Thread implements Runnable { //…}阅读源码的信息其实是最全的 ,我截取了部分的注释信息,起码我们现在可以无压力的使用这个两个方式来启动自己的线程。如果我们还要传递参数的话,那么我们设定一个自己的构造函数也是可以,如下方式:public class MyThread extends Thread { public MyThread(String name) { super(name); } @Override public void run() { System.out.println(“一个子线程 BY " + getName()); }}这时读者应该发现,这个构造函数中的 name ,居然在 Thread 中也是有的,其实在Java中的线程都会自己的名称,如果我们不给其定义名称的话,java也会自己给其命名。/** Allocates a new {@code Thread} object. This constructor has the same* effect as {@linkplain #Thread(ThreadGroup,Runnable,String) Thread}* {@code (null, null, name)}.** @param name* the name of the new thread*/public Thread(String name) { init(null, null, name, 0);}而我们最核心,也是大家最在意的应该就是如何启动并执行我们的线程了,是的,这个大家都知道的,就是这个 run 方法了。同时大家如果了解过了 Runnable ,我想大家都会知道这个 run 方法,其实是 Runnable 的方法,而我们本节的 Thread 也是实现了这个接口。这里,大家可能会好奇,不是应该是 start 这个方法吗?那么让我们看看 start 的源码。/** * Causes this thread to begin execution; the Java Virtual Machin * calls the <code>run</code> method of this thread. /public synchronized void start() { //…}通过 start 方法,我们可以了解到,就如同源码的启动模板中那样,官网希望,对于线程的启动,使用者是通过 start 的方式来启动线程,因为这个方法会让Java虚拟机会调用这个线程的 run 方法。其结果就是,一个线程去运行 start 方法,而另一个线程则取运行 run 方法。同时对于这样线程,Java官方也说了,线程是不允许多次启动的,这是不合法的。所以如果我们执行下面的代码,就会报 java.lang.IllegalThreadStateException 异常。MyThread myThread = new MyThread(“Thread”);myThread.start();myThread.start();但是,如果是这样的代码呢?MyThread myThread = new MyThread(“Thread”);myThread.run();myThread.run();myThread.start();//运行结果一个子线程 BY Thread一个子线程 BY Thread一个子线程 BY Thread这是不合理的,如果大家有兴趣,可以去试试并动手测试下,最好开调试模式。下面我们再看看,连 Thread 都要实现,且核心的 run 方法出处的 Runnable 。Runnable比起 Thread 我希望大家跟多的使用 Runnable 这个接口实现的方式,对于好坏对比会在总结篇说下。我想大家看 Runnable 的源码会更加容易与容易接受,毕竟它有一个 run 方法。(如下为其源码)/* * The <code>Runnable</code> interface should be implemented by any * class whose instances are intended to be executed by a thread. The * class must define a method of no arguments called <code>run</code>. /@FunctionalInterfacepublic interface Runnable { /* * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object’s * <code>run</code> method to be called in that separately executing * thread. / public abstract void run();}首先,所有打算执行线程的类均可实现这个 Runnable 接口,且必须实现 run 方法。它将为各个类提供一个协议,就像 Thread 一样,其实当我们的类实现了 Runnable 的接口后,我们的类与 Thread 是同级,只是可能仅有 run 方法,而没有 Thread 提供的跟丰富的功能方法。而对于 run 方法,则是所有实现了 Runnable 接口的类,在调用 start 后,将使其单独执行 run 方法。那么我们可以写出这样的测试代码。MyThreadRunnable myThreadRunnable = new MyThreadRunnable(“Runnabel”);myThreadRunnable.run();new Thread(myThreadRunnable).start();Thread thread = new Thread(myThreadRunnable);thread.start();thread.start();//运行效果Exception in thread “main” java.lang.IllegalThreadStateException at java.lang.Thread.start(Thread.java:705) at com.github.myself.runner.RunnableApplication.main(RunnableApplication.java:14)这是一个子线程 BY Runnabel这是一个子线程 BY Runnabel这是一个子线程 BY Runnabel同样的,线程是不允许多次启动的,这是不合法的。同时,这时我们也看出了使用 Thread 与 Runnable 的区别,当我们要多次启用一个相同的功能时。我想 Runnable 更适合你。但是,用了这两个方式,我们要如何知道线程的运行结果呢???FutureTask这个可能很少人(初学者)用到,不过这个现在是我最感兴趣的。它很有趣。其实还有一个小兄弟,那就是 Callable。 它们是一对搭档。如果上面的内容,你已经细细品味过,那么你应该已经发现 Callable 了。没错,他就在 Runnable 的源码中出现过。/* * @author Arthur van Hoff * @see java.lang.Thread * @see java.util.concurrent.Callable * @since JDK1.0 / @FunctionalInterfacepublic interface Runnable {}那么我们先去看看这个 Callable 吧。(如下为其源码)/* * A task that returns a result and may throw an exception. * Implementors define a single method with no arguments called * {@code call}. /@FunctionalInterfacepublic interface Callable<V> { /* * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result / V call() throws Exception;}其实,这是一个与 Runnable 基本相同的接口,当时它可以返回执行结果与检查异常,其计算结果将由 call() 方法返回。那么其实我们现在可以写出一个实现的类。public class MyCallable implements Callable { private String name; public MyCallable(String name) { this.name = name; } @Override public Object call() throws Exception { System.out.println(“这是一个子线程 BY " + name); return “successs”; }}关于更深入的探讨,我将留到下一篇文章中。好了,我想我们应该来看看 FutureTask 这个类的相关信息了。/* * A cancellable asynchronous computation. This class provides a base * implementation of {@link Future}, with methods to start and cancel * a computation, query to see if the computation is complete, and * retrieve the result of the computation. The result can only be * retrieved when the computation has completed; the {@code get} * methods will block if the computation has not yet completed. Once * the computation has completed, the computation cannot be restarted * or cancelled (unless the computation is invoked using * {@link #runAndReset}). / public class FutureTask<V> implements RunnableFuture<V> { //… }源码写的很清楚,这是一个可以取消的异步计算,提供了查询、计算、查看结果等的方法,同时我们还可以使用 runAndRest 来让我们可以重新启动计算。在查看其构造函数的时候,很高兴,我们看到了我们的 Callable 接口。/* * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}. * * @param callable the callable task * @throws NullPointerException if the callable is null */public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable}即我们将创建一个未来任务,来执行 Callable 的实现类。那么我们现在可以写出这样的代码了。final FutureTask fun = new FutureTask(new MyCallable(“Future”));那么接下来我们就可以运行我们的任务了吗?是的,我知道了 run() 方法,但是却没有 start 方法。官方既然说有结果,那么我找到了 get 方法。同时我尝试着写了一下测试代码。public static void main(String[] args) { MyCallable myCallable = new MyCallable(“Callable”); final FutureTask fun = new FutureTask(myCallable); fun.run(); try { Object result = fun.get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }}运行效果,是正常的,这好像是那么回事。//运行效果这是一个子线程 BY Callablesuccesss可是,在我尝试着加多一些代码的时候,却发现了一些奇妙的东西 。我加多了一行 fun.run(); 代码,同时在 MyCallable 类中,将方法加一个时间线程去等待3s。结果是: 结果只输出了一次,同时 get 方法需要等运行3s后才有返回。这并不是我希望看到的。但是,起码我们可以知道,这次即使我们多次运行使用 run 方法,但是这个线程也只运行了一次。这是一个好消息。同时,我们也拿到了任务的结果,当时我们的进程被阻塞了,我们需要去等我们的任务执行完成。最后,在一番小研究后,以下的代码终于完成了我们预期的期望。public static void main(String[] args) { MyCallable myCallable = new MyCallable(“Callable”); ExecutorService executorService = Executors.newCachedThreadPool(); final FutureTask fun = new FutureTask(myCallable); executorService.execute(fun);// fun.run(); //阻塞进程 System.out.println(”–继续执行”); try { Object result = fun.get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }}我们使用线程池去运行我们的 FutureTask 同时使用 get 方法去获取运行后的结果。结果是友好的,进程并不会被阻塞。关于更深入的探讨,我将留到下一篇文章中。总结一波好了,现在应该来整理以下了。Thread 需要我们继承实现,这是比较局限的,因为Java的 继承资源 是有限的,同时如果多次执行任务,还需要 多次创建任务类。Runnable 以接口的形式让我们实现,较为方便,同时多次执行任务也无需创建多个任务类,当时仅有一个 run 方法。以上两个方法都 无法获取任务执行结果 ,FutureTask可以获取任务结果。同时还有更多的新特性方便我们使用···公众号:Java猫说现架构设计(码农)兼创业技术顾问,不羁平庸,热爱开源,杂谈程序人生与不定期干货。 ...

December 30, 2018 · 5 min · jiezi

Java 中断异常的正确处理方式

处理InterruptedException这个故事可能很熟悉:你正在写一个测试程序,你需要暂停某个线程一段时间,所以你调用 Thread.sleep()。然后编译器或 IDE 就会抱怨说 InterruptedException 没有抛出声明或捕获。什么是 InterruptedException,你为什么要处理它?最常见的响应 InterruptedException 做法是吞下它 - 捕获它并且什么也不做(或者记录它,也没好多少) - 正如我们将在清单4中看到的那样。不幸的是,这种方法抛弃了关于中断发生的重要信息,这可能会损害应用程序取消活动或响应及时关闭的能力。阻塞方法当一个方法抛出 InterruptedException 时,意味着几件事情: 除了它可以抛出一个特定的检查异常, 它还告诉你它是一种阻塞方法,它会尝试解除阻塞并提前返回。阻塞方法不同于仅需要很长时间才能运行完成的普通方法。普通方法的完成仅取决于你要求它做多少事以及是否有足够的计算资源(CPU周期和内存)。另一方面,阻塞方法的完成还取决于某些外部事件,例如计时器到期,I/O 完成或另一个线程的操作(释放锁,设置标志或放置任务到工作队列)。普通方法可以在完成工作后立即结束,但阻塞方法不太好预测,因为它们依赖于外部事件。因为如果他们正在等待永远不会在事件,发生堵塞的方法有可能永远不结束,常用在阻塞可取消的操作。对于长时间运行的非阻塞方法,通常也是可以取消的。可取消操作是可以在通常自行完成之前从外部强制移动到完成状态的操作。 Thread提供的Thread.sleep() 和 Object.wait() 方法中断机制是一种取消线程继续阻塞的机制; 它允许一个线程请求另一个线程提前停止它正在做的事情。当一个方法抛出时 InterruptedException,它告诉你如果执行方法的线程被中断,它将尝试停止它正在做的事情提前返回, 并通过抛出 InterruptedException 表明它的提早返回。表现良好的阻塞库方法应该响应中断并抛出 InterruptedException 异常, 以便它们可以应用在可取消的活动中而不会妨碍程序的响应性。线程中断每个线程都有一个与之关联的布尔属性,表示其中断状态。中断状态最初为假; 当某个线程被其他线程通过调用中断 Thread.interrupt() 时, 会发生以下两种情况之一: 如果该线程正在执行低级别的中断阻塞方法 Thread.sleep(),Thread.join()或 Object.wait()等,它取消阻塞并抛出 InterruptedException。除此以外,interrupt() 仅设置线程的中断状态。在中断的线程中运行的代码可以稍后轮询中断的状态以查看是否已经请求停止它正在做的事情; 中断状态可以通过 Thread.isInterrupted() 读取,并且可以在命名不佳的单个操作Thread.interrupted()中读取和清除 。中断是一种合作机制。当一个线程中断另一个线程时,被中断的线程不一定会立即停止它正在做的事情。相反,中断是一种礼貌地要求另一个线程在方便的时候停止它正在做什么的方式。有些方法,比如Thread.sleep()认真对待这个请求,但方法不一定要注意中断请求。不阻塞但仍可能需要很长时间才能执行完成的方法可以通过轮询中断状态来尊重中断请求,并在中断时提前返回。你可以自由地忽略中断请求,但这样做可能会影响响应速度。中断的合作性质的一个好处是它为安全地构建可取消的活动提供了更大的灵活性。我们很少想立即停止活动; 如果活动在更新期间被取消,程序数据结构可能会处于不一致状态。中断允许可取消活动清理正在进行的任何工作,恢复不变量,通知其他活动取消事件,然后终止。处理InterruptedException如果 throw InterruptedException 意味着这个方法是一个阻塞方法,那么调用一个阻塞方法意味着你的方法也是一个阻塞方法,你应该有一个处理策略 InterruptedException。通常最简单的策略是你自己也抛出 InterruptedException 异常,如清单1 中的 putTask() 和 getTask() 方法所示。这样做会使你的方法响应中断,并且通常只需要添加 InterruptedException 到 throws 子句。清单1.通过不捕获它来向调用者传播InterruptedExceptionpublic class TaskQueue { private static final int MAX_TASKS = 1000; private BlockingQueue<Task> queue = new LinkedBlockingQueue<Task>(MAX_TASKS); public void putTask(Task r) throws InterruptedException { queue.put(r); } public Task getTask() throws InterruptedException { return queue.take(); }}有时在传播异常之前需要进行一些清理。在这种情况下,你可以捕获 InterruptedException,执行清理,然后重新抛出异常。清单2是一种用于匹配在线游戏服务中的玩家的机制,说明了这种技术。该 matchPlayers() 方法等待两个玩家到达然后开始新游戏。如果在一个玩家到达之后但在第二个玩家到达之前它被中断,则在重新投掷之前将该玩家放回队列 InterruptedException,以便玩家的游戏请求不会丢失。清单2.在重新抛出 InterruptedException 之前执行特定于任务的清理public class PlayerMatcher { private PlayerSource players; public PlayerMatcher(PlayerSource players) { this.players = players; } public void matchPlayers() <strong>throws InterruptedException</strong> { Player playerOne, playerTwo; try { while (true) { playerOne = playerTwo = null; // 等待两个玩家到来以便开始游戏 playerOne = players.waitForPlayer(); // 会抛出中断异常 playerTwo = players.waitForPlayer(); // 会抛出中断异常 startNewGame(playerOne, playerTwo); } } catch (InterruptedException e) { // 如一个玩家中断了, 将这个玩家放回队列 if (playerOne != null) players.addFirst(playerOne); // 然后传播异常 throw e; } }}不要吞下中断有时抛出 InterruptedException 不是一种选择,例如当通过 Runnable 调用可中断方法定义的任务时。在这种情况下,你不能重新抛出 InterruptedException,但你也不想做任何事情。当阻塞方法检测到中断和抛出时 InterruptedException,它会清除中断状态。如果你抓住 InterruptedException 但不能重新抛出它,你应该保留中断发生的证据,以便调用堆栈上的代码可以了解中断并在需要时响应它。此任务通过调用 interrupt()实现“重新中断”当前线程,如清单3所示。至少,无论何时捕获 InterruptedException 并且不重新抛出它,都要在返回之前重新中断当前线程。清单3.捕获InterruptedException后恢复中断状态public class TaskRunner implements Runnable { private BlockingQueue<Task> queue; public TaskRunner(BlockingQueue<Task> queue) { this.queue = queue; } public void run() { try { while (true) { Task task = queue.take(10, TimeUnit.SECONDS); task.execute(); } } catch (InterruptedException e) { //重要: 恢复中断状态 Thread.currentThread().interrupt(); } }}你可以做的最糟糕的事情 InterruptedException 就是吞下它 - 抓住它,既不重新抛出它也不重新确定线程的中断状态。处理你没有规划的异常的标准方法 - 捕获它并记录它 - 也算作吞噬中断,因为调用堆栈上的代码将无法找到它。(记录 InterruptedException 也很愚蠢,因为当人类读取日志时,对它做任何事都为时已晚。)清单4显示了吞下中断的常见模式:清单4.吞下中断 - 不要这样做// 不要这么做!public class TaskRunner implements Runnable { private BlockingQueue<Task> queue; public TaskRunner(BlockingQueue<Task> queue) { this.queue = queue; } public void run() { try { while (true) { Task task = queue.take(10, TimeUnit.SECONDS); task.execute(); } } catch (InterruptedException swallowed) { /* DON’T DO THIS - RESTORE THE INTERRUPTED STATUS INSTEAD / / 不要这么做 - 要让线程中断 / } }}如果你不能重新抛出 InterruptedException,无论你是否计划对中断请求执行操作,你仍然希望重新中断当前线程,因为单个中断请求可能有多个“收件人”。标准线程池(ThreadPoolExecutor)工作线程实现响应中断,因此中断线程池中运行的任务可能具有取消任务和通知执行线程线程池正在关闭的效果。如果作业吞下中断请求,则工作线程可能不会知道请求了中断,这可能会延迟应用程序或服务关闭。实施可取消的任务语言规范中没有任何内容给出任何特定语义的中断,但在较大的程序中,除了取消之外,很难保持中断的任何语义。根据活动,用户可以通过 GUI 或通过 JMX 或 Web 服务等网络机制请求取消。它也可以由程序逻辑请求。例如,如果 Web 爬虫检测到磁盘已满,则可能会自动关闭自身,或者并行算法可能会启动多个线程来搜索解决方案空间的不同区域,并在其中一个找到解决方案后取消它们。仅仅因为一个任务是取消并不意味着它需要一个中断请求响应立即。对于在循环中执行代码的任务,通常每次循环迭代仅检查一次中断。根据循环执行的时间长短,在任务代码通知线程中断之前可能需要一些时间(通过使用 Thread.isInterrupted()或通过调用阻塞方法轮询中断状态)。如果任务需要更具响应性,则可以更频繁地轮询中断状态。阻止方法通常在进入时立即轮询中断状态,InterruptedException 如果设置为提高响应性则抛出 。吞下一个中断是可以接受的,当你知道线程即将退出时。这种情况只发生在调用可中断方法的类是一个 Thread,而不是 Runnable 一般或通用库代码的一部分时,如清单5所示。它创建一个枚举素数的线程,直到它被中断并允许线程退出中断。寻求主要的循环在两个地方检查中断:一次是通过轮询 isInterrupted() while 循环的头部中的方法,一次是在调用阻塞 BlockingQueue.put() 方法时。清单5.如果你知道线程即将退出,则可以吞下中断public class PrimeProducer extends Thread { private final BlockingQueue<BigInteger> queue; PrimeProducer(BlockingQueue<BigInteger> queue) { this.queue = queue; } public void run() { try { BigInteger p = BigInteger.ONE; while (!Thread.currentThread().isInterrupted()) queue.put(p = p.nextProbablePrime()); } catch (InterruptedException consumed) { / Allow thread to exit / / 允许线程退出 */ } } public void cancel() { interrupt(); }}不间断阻塞并非所有阻止方法都抛出 InterruptedException。输入和输出流类可能会阻止等待 I/O 完成,但它们不会抛出InterruptedException,并且如果它们被中断,它们不会提前返回。但是,在套接字 I/O 的情况下,如果一个线程关闭了套接字,那么阻塞其他线程中该套接字上的 I/O 操作将在早期完成SocketException。非阻塞 I/O 类 java.nio 也不支持可中断 I/O,但可以通过关闭通道或请求唤醒来类似地取消阻塞操作 Selector。同样,尝试获取内在锁(输入一个 synchronized 块)不能被中断,但 ReentrantLock 支持可中断的采集模式。不可取消的任务有些任务只是拒绝被打断,使它们无法取消。但是,即使是不可取消的任务也应该尝试保留中断状态,以但在调用堆栈上层的代码在非可取消任务完成后想要对发生的中断进行响应。清单6显示了一个等待阻塞队列直到某个项可用的方法,无论它是否被中断。为了成为一个好公民,它在完成后恢复最终块中的中断状态,以免剥夺呼叫者的中断请求。它无法提前恢复中断状态,因为它会导致无限循环 - BlockingQueue.take(), 完成后则可以在进入时立即轮询中断状态, 如果发现中断状态设置,则可以抛出InterruptedException。清单6. 在返回之前恢复中断状态的非可执行任务public Task getNextTask(BlockingQueue<Task> queue) { boolean interrupted = false; try { while (true) { try { return queue.take(); } catch (InterruptedException e) { interrupted = true; // 失败了再试 } } } finally { if (interrupted) Thread.currentThread().interrupt(); }}摘要你可以使用 Java 平台提供的协作中断机制来构建灵活的取消策略。作业可以决定它们是否可以取消,它们希望如何响应中断,如果立即返回会影响应用程序的完整性,它们可以推迟中断以执行特定于任务的清理。即使你想完全忽略代码中断,也要确保在捕获 InterruptedException 并且不重新抛出代码时恢复中断状态 ,以便调用它的代码能够发现中断。 ...

December 22, 2018 · 2 min · jiezi

Java 几种线程状态之间的相互关系

Java Thread 可能处在以下几种状态Java Doc 里通过一个枚举类型 Enum<Thread.State> 来定义。线程可以处于以下状态之一:NEW 尚未启动的线程处于此状态。RUNNABLE 在Java虚拟机中执行的线程处于此状态。BLOCKED 被阻塞等待监视器锁定的线程处于此状态。WAITING 无限期等待另一个线程执行特定操作的线程处于此状态。TIMED_WAITING 正在等待另一个线程执行最多指定等待时间的操作的线程处于此状态。TERMINATED 已退出的线程处于此状态。线程在给定时间点只能处于一种状态。这些状态是虚拟机状态,不反映任何操作系统线程状态。通过示例而不是Java doc中给出的正式定义,可以很容易地理解任何令人困惑的概念。如果它们是现实生活中的例子,它可能更具有相关性。我想分享一些可能有助于理解这些线程状态的现实例子。由http://fastthread.io生成的传递图,显示哪些线程阻塞了哪些线程BLOCKED 阻塞Java doc正式将BLOCKED状态定义为:“阻塞等待监视器锁的线程处于此状态。”现实生活中的例子:今天你要去面试。这是您梦寐以求的工作,这是您过去几年一直瞄准的目标。你早上醒来,准备好了,穿上你最好的衣服,在镜子前面看起来很敏锐。现在你走出你的车库,意识到你的妻子已经开车了。在这种情况下,你只有一辆车,那么会发生什么?在现实生活中,可能会发生斗争:-)。在这里你被阻止,因为你的妻子已经开车了。你将无法参加面试。这是BLOCKED状态。用技术术语解释它,你是线程T1,你的妻子是线程T2,锁是汽车。T1在锁(即汽车)上被阻挡,因为T2已经获得了这个锁。提示:线程在等待监视器锁进入同步块/方法或在调用Object#wait()方法后重新输入同步块/方法时,将进入BLOCKED状态。WAITING 等候Java doc正式将WAITING状态定义为:“无限期等待另一个线程执行特定操作的线程处于此状态。”现实生活中的例子:让我们说几分钟后你的妻子带着车回家了。现在你意识到面试的时间已经到了,而且到达那里还有很长的路要走。因此,您将所有动力都放在汽车的油门踏板上。当允许的速度限制仅为60英里/小时时,您以100英里/小时的速度行驶。你不走运,交通警察看到你超过限速,他把你拉到路边。现在你进入WAITING状态,我的朋友。你停止开车,坐在车里闲逛,直到警察调查你,然后让你走。基本上,在他让你离开之前,你会陷入等待状态。用技术术语解释它,你是线程T1,警察是线程T2。你释放锁(即你停止开车),然后进入等待状态。直到警察(即T2)让你离开,你将陷入这种等待状态。提示:线程在调用以下方法之一时将进入WAITING状态:Object#wait() 未指定超时时间Thread#join() 未指定超时时间LockSupport#park()在对象上调用Object.wait()的线程处于WAITING状态,直到另一个线程调用该对象上的Object.notify()或Object.notifyAll()。调用Thread.join()的线程处于WAITING状态,以使指定的线程终止。TIMED_WAITING 指定时间的等待Java doc正式将TIMED_WAITING状态定义为:“等待另一个线程在指定的等待时间内执行操作的线程处于此状态。”现实生活中的例子: 尽管所有的戏剧性,你在采访中表现得非常好,给每个人留下了深刻的印象并得到了这份高薪工作。(恭喜!)你回到家里,告诉你的邻居关于这份新工作,以及你对此感到非常兴奋。你的朋友说他也在同一栋办公楼工作。他建议你们两个应该一起开车。你认为这是一个好主意。所以在工作的第一天,你去他家。你把车停在他家门口。你等了10分钟,但你的邻居仍然没有出来。你继续开始上班,因为你不想在第一天被推迟。现在这是TIMED_WAITING。用技术术语解释它,你是线程T1,你的邻居是线程T2。你释放锁定(即停止驾驶汽车)并等待长达 10分钟。如果你的邻居T2在10分钟内没出来,你就开始再次开车了。提示:线程在调用以下方法之一时将进入TIMED_WAITING状态:Thread#sleep()Object#wait() 指定超时时间Thread#join() 指定超时时间LockSupport#parkNanos()LockSupport#parkUntil()结论当有人在分析线程转储时,理解这些不同的线程状态是至关重要的。处于RUNNABLE,BLOCKED,WAITING和TIMED_WATING状态的线程数是多少?哪些线程被阻止?谁阻止了他们?用于锁定的对象是什么?这些是在线程转储中要分析的一些重要指标。这些详细的线程转储分析可以通过在线工具轻松完成,例如:http://fastthread.io/

December 20, 2018 · 1 min · jiezi

深入分析AQS实现原理

简单解释一下J.U.C,是JDK中提供的并发工具包,java.util.concurrent。里面提供了很多并发编程中很常用的实用工具类,比如atomic原子操作、比如lock同步锁、fork/join等。从Lock作为切入点我想以lock作为切入点来讲解AQS,毕竟同步锁是解决线程安全问题的通用手段,也是我们工作中用得比较多的方式。Lock APILock是一个接口,方法定义如下void lock() // 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放void lockInterruptibly() // 和 lock()方法相似, 但阻塞的线程可中断,抛出 java.lang.InterruptedException异常boolean tryLock() // 非阻塞获取锁;尝试获取锁,如果成功返回trueboolean tryLock(long timeout, TimeUnit timeUnit) //带有超时时间的获取锁方法void unlock() // 释放锁Lock的实现实现Lock接口的类有很多,以下为几个常见的锁实现ReentrantLock:表示重入锁,它是唯一一个实现了Lock接口的类。重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数ReentrantReadWriteLock:重入读写锁,它实现了ReadWriteLock接口,在这个类中维护了两个锁,一个是ReadLock,一个是WriteLock,他们都分别实现了Lock接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是:读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。StampedLock: stampedLock是JDK8引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock是一种乐观的读策略,使得乐观锁完全不会阻塞写线程ReentrantLock的简单实用如何在实际应用中使用ReentrantLock呢?我们通过一个简单的demo来演示一下public class Demo { private static int count=0; static Lock lock=new ReentrantLock(); public static void inc(){ lock.lock(); try { Thread.sleep(1); count++; } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } }这段代码主要做一件事,就是通过一个静态的incr()方法对共享变量count做连续递增,在没有加同步锁的情况下多线程访问这个方法一定会存在线程安全问题。所以用到了ReentrantLock来实现同步锁,并且在finally语句块中释放锁。那么我来引出一个问题,大家思考一下多个线程通过lock竞争锁时,当竞争失败的锁是如何实现等待以及被唤醒的呢?什么是AQSaqs全称为AbstractQueuedSynchronizer,它提供了一个FIFO队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件,常见的有:ReentrantLock、CountDownLatch等。AQS是一个抽象类,主要是通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。可以这么说,只要搞懂了AQS,那么J.U.C中绝大部分的api都能轻松掌握。AQS的两种功能从使用层面来说,AQS的功能分为两种:独占和共享独占锁,每次只能有一个线程持有锁,比如前面给大家演示的ReentrantLock就是以独占方式实现的互斥锁共享锁,允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLockReentrantLock的类图仍然以ReentrantLock为例,来分析AQS在重入锁中的使用。毕竟单纯分析AQS没有太多的含义。先理解这个类图,可以方便我们理解AQS的原理AQS的内部实现AQS的实现依赖内部的同步队列,也就是FIFO的双向队列,如果当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息构造成一个Node加入到同步队列中,同时再阻塞该线程。当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。AQS队列内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。每个Node其实是由线程封装,当线程争抢锁失败后会封装成Node加入到ASQ队列中去Node类的组成如下static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; //前驱节点 volatile Node next; //后继节点 volatile Thread thread;//当前线程 Node nextWaiter; //存储在condition队列中的后继节点 //是否为共享锁 final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } //将线程构造成一个Node,添加到等待队列 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } //这个方法会在Condition队列使用,后续单独写一篇文章分析condition Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }释放锁以及添加线程对于队列的变化添加节点当出现锁竞争以及释放锁的时候,AQS同步队列中的节点会发生变化,首先看一下添加节点的场景。这里会涉及到两个变化新的线程封装成Node节点追加到同步队列中,设置prev节点以及修改当前节点的前置节点的next节点指向自己通过CAS讲tail重新指向新的尾部节点释放锁移除节点head节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下这个过程也是涉及到两个变化修改head节点指向下一个获得锁的节点新的获得锁的节点,将prev的指针指向null这里有一个小的变化,就是设置head节点不需要用CAS,原因是设置head节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要CAS保证,只需要把head节点设置为原首节点的后继节点,并且断开原head节点的next引用即可AQS的源码分析清楚了AQS的基本架构以后,我们来分析一下AQS的源码,仍然以ReentrantLock为模型。ReentrantLock的时序图调用ReentrantLock中的lock()方法,源码的调用过程我使用了时序图来展现从图上可以看出来,当锁获取失败时,会调用addWaiter()方法将当前线程封装成Node节点加入到AQS队列,基于这个思路,我们来分析AQS的源码实现分析源码ReentrantLock.lock()public void lock() { sync.lock();}这个是获取锁的入口,调用sync这个类里面的方法,sync是什么呢?abstract static class Sync extends AbstractQueuedSynchronizersync是一个静态内部类,它继承了AQS这个抽象类,前面说过AQS是一个同步工具,主要用来实现同步控制。我们在利用这个工具的时候,会继承它来实现同步控制功能。通过进一步分析,发现Sync这个类有两个具体的实现,分别是NofairSync(非公平锁),FailSync(公平锁).公平锁 表示所有线程严格按照FIFO来获取锁非公平锁 表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁公平锁和非公平锁的实现上的差异,我会在文章后面做一个解释,接下来的分析仍然以非公平锁作为主要分析逻辑。NonfairSync.lockfinal void lock() { if (compareAndSetState(0, 1)) //通过cas操作来修改state状态,表示争抢锁的操作 setExclusiveOwnerThread(Thread.currentThread());//设置当前获得锁状态的线程 else acquire(1); //尝试去获取锁}这段代码简单解释一下由于这里是非公平锁,所以调用lock方法时,先去通过cas去抢占锁如果抢占锁成功,保存获得锁成功的当前线程抢占锁失败,调用acquire来走锁竞争逻辑compareAndSetStatecompareAndSetState的代码实现逻辑如下// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}这段代码其实逻辑很简单,就是通过cas乐观锁的方式来做比较并替换。上面这段代码的意思是,如果当前内存中的state的值和预期值expect相等,则替换为update。更新成功返回true,否则返回false.这个操作是原子的,不会出现线程安全问题,这里面涉及到Unsafe这个类的操作,一级涉及到state这个属性的意义。state当state=0时,表示无锁状态当state>0时,表示已经有线程获得了锁,也就是state=1,但是因为ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁private volatile int state;需要注意的是:不同的AQS实现,state所表达的含义是不一样的。UnsafeUnsafe类是在sun.misc包下,不属于Java标准。但是很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Hadoop、Kafka等;Unsafe可认为是Java中留下的后门,提供了一些低层次操作,如直接内存访问、线程调度等public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);这个是一个native方法, 第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的headOffset的值),第三个参数为期待的值,第四个为更新后的值整个方法的作用是如果当前时刻的值等于预期值var4相等,则更新为新的期望值 var5,如果更新成功,则返回true,否则返回false;acquireacquire是AQS中的方法,如果CAS操作未能成功,说明state已经不为0,此时继续acquire(1)操作,这里大家思考一下,acquire方法中的1的参数是用来做什么呢?如果没猜中,往前面回顾一下state这个概念 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }这个方法的主要逻辑是通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部acquireQueued,将Node作为参数,通过自旋去尝试获取锁。如果大家看过我写的Synchronized源码分析的文章,就应该能够明白自旋存在的意义NonfairSync.tryAcquire这个方法的作用是尝试获取锁,如果成功返回true,不成功返回false它是重写AQS类中的tryAcquire方法,并且大家仔细看一下AQS中tryAcquire方法的定义,并没有实现,而是抛出异常。按照一般的思维模式,既然是一个不实现的模版方法,那应该定义成abstract,让子类来实现呀?大家想想为什么protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires);}nonfairTryAcquiretryAcquire(1)在NonfairSync中的实现代码如下ffinal boolean nonfairTryAcquire(int acquires) { //获得当前执行的线程 final Thread current = Thread.currentThread(); int c = getState(); //获得state的值 if (c == 0) { //state=0说明当前是无锁状态 //通过cas操作来替换state的值改为1,大家想想为什么要用cas呢? //理由是,在多线程环境中,直接修改state=1会存在线程安全问题,你猜到了吗? if (compareAndSetState(0, acquires)) { //保存当前获得锁的线程 setExclusiveOwnerThread(current); return true; } } //这段逻辑就很简单了。如果是同一个线程来获得锁,则直接增加重入次数 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //增加重入次数 if (nextc < 0) // overflow throw new Error(“Maximum lock count exceeded”); setState(nextc); return true; } return false;}获取当前线程,判断当前的锁的状态如果state=0表示当前是无锁状态,通过cas更新state状态的值如果当前线程是属于重入,则增加重入次数addWaiter当tryAcquire方法获取锁失败以后,则会先调用addWaiter将当前线程封装成Node,然后添加到AQS队列private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE //将当前线程封装成Node,并且mode为独占锁 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法 Node pred = tail; if (pred != null) { //tail不为空的情况,说明队列中存在节点数据 node.prev = pred; //讲当前线程的Node的prev节点指向tail if (compareAndSetTail(pred, node)) {//通过cas讲node添加到AQS队列 pred.next = node;//cas成功,把旧的tail的next指针指向新的tail return node; } } enq(node); //tail=null,将node添加到同步队列中 return node; }将当前线程封装成Node判断当前链表中的tail节点是否为空,如果不为空,则通过cas操作把当前线程的node添加到AQS队列如果为空或者cas失败,调用enq将节点添加到AQS队列enqenq就是通过自旋操作把当前节点加入到队列中private Node enq(final Node node) { //自旋,不做过多解释,不清楚的关注公众号[架构师修炼宝典] for (;;) { Node t = tail; //如果是第一次添加到队列,那么tail=null if (t == null) { // Must initialize //CAS的方式创建一个空的Node作为头结点 if (compareAndSetHead(new Node())) //此时队列中只一个头结点,所以tail也指向它 tail = head; } else {//进行第二次循环时,tail不为null,进入else区域。将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node node.prev = t; if (compareAndSetTail(t, node)) {//t此时指向tail,所以可以CAS成功,将tail重新指向Node。此时t为更新前的tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点 t.next = node; return t; } } } }假如有两个线程t1,t2同时进入enq方法,t==null表示队列是首次使用,需要先初始化另外一个线程cas失败,则进入下次循环,通过cas操作将node添加到队尾到目前为止,通过addwaiter方法构造了一个AQS队列,并且将线程添加到了队列的节点中acquireQueued将添加到队列中的Node作为参数传入acquireQueued方法,这里面会做抢占锁的操作final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出NullPointException if (p == head && tryAcquire(arg)) {// 如果前驱为head才有资格进行锁的抢夺 setHead(node); // 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点//凡是head节点,head.thread与head.prev永远为null, 但是head.next不为null p.next = null; // help GC failed = false; //获取锁成功 return interrupted; }//如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志 interrupted = true; } } finally { if (failed) // 如果抛出异常则取消锁的获取,进行出队(sync queue)操作 cancelAcquire(node); }}获取当前节点的prev节点如果prev节点为head节点,那么它就有资格去争抢锁,调用tryAcquire抢占锁抢占锁成功以后,把获得锁的节点设置为head,并且移除原来的初始化head节点如果获得锁失败,则根据waitStatus决定是否需要挂起线程最后,通过cancelAcquire取消获得锁的操作前面的逻辑都很好理解,主要看一下shouldParkAfterFailedAcquire这个方法和parkAndCheckInterrupt的作用shouldParkAfterFailedAcquire从上面的分析可以看出,只有队列的第二个节点可以有机会争用锁,如果成功获取锁,则此节点晋升为头节点。对于第三个及以后的节点,if (p == head)条件不成立,首先进行shouldParkAfterFailedAcquire(p, node)操作 shouldParkAfterFailedAcquire方法是判断一个争用锁的线程是否应该被阻塞。它首先判断一个节点的前置节点的状态是否为Node.SIGNAL,如果是,是说明此节点已经将状态设置-如果锁释放,则应当通知它,所以它可以安全的阻塞了,返回true。private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前继节点的状态 if (ws == Node.SIGNAL)//如果是SIGNAL状态,意味着当前线程需要被unpark唤醒 return true;如果前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个操作实际是把队列中CANCELLED的节点剔除掉。 if (ws > 0) {// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点’的前继节点”。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don’t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}parkAndCheckInterrupt如果shouldParkAfterFailedAcquire返回了true,则会执行:parkAndCheckInterrupt()方法,它是通过LockSupport.park(this)将当前线程挂起到WATING状态,它需要等待一个中断、unpark方法来唤醒它,通过这样一种FIFO的机制的等待,来实现了Lock的操作。private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted();}LockSupportLockSupport类是Java6引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:public native void unpark(Thread jthread); public native void park(boolean isAbsolute, long time); unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。permit相当于0/1的开关,默认是0,调用一次unpark就加1变成了1.调用一次park会消费permit,又会变成0。 如果再调用一次park会阻塞,因为permit已经是0了。直到permit变成1.这时调用unpark会把permit设置为1.每个线程都有一个相关的permit,permit最多只有一个,重复调用unpark不会累积锁的释放ReentrantLock.unlock加锁的过程分析完以后,再来分析一下释放锁的过程,调用release方法,这个方法里面做两件事,1,释放锁 ;2,唤醒park的线程public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}tryRelease这个动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是1),如果结果状态为0,就将排它锁的Owner设置为null,以使得其它的线程有机会进行执行。在排它锁中,加锁的时候状态会增加1(当然可以自己修改这个值),在解锁的时候减掉1,同一个锁,在可以重入后,可能会被叠加为2、3、4这些值,只有unlock()的次数与lock()的次数对应才会将Owner线程设置为空,而且也只有这种情况下才会返回true。protected final boolean tryRelease(int releases) { int c = getState() - releases; // 这里是将锁的数量减1 if (Thread.currentThread() != getExclusiveOwnerThread())// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 由于重入的关系,不是每次释放锁c都等于0, // 直到最后一次释放锁时,才会把当前线程释放 free = true; setExclusiveOwnerThread(null); } setState(c); return free;}unparkSuccessor在方法unparkSuccessor(Node)中,就意味着真正要释放锁了,它传入的是head节点(head节点是占用锁的节点),当前线程被释放之后,需要唤醒下一个节点的线程private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) {//判断后继节点是否为空或者是否是取消状态, s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) //然后从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点, 至于为什么从尾部开始向前遍历,因为在doAcquireInterruptibly.cancelAcquire方法的处理过程中只设置了next的变化,没有设置prev的变化,在最后有这样一行代码:node.next = node,如果这时执行了unparkSuccessor方法,并且向后遍历的话,就成了死循环了,所以这时只有prev是稳定的 s = t; }//内部首先会发生的动作是获取head节点的next节点,如果获取到的节点不为空,则直接通过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入循环进一步尝试tryAcquire()方法来获取锁 if (s != null) LockSupport.unpark(s.thread); //释放许可}总结通过这篇文章基本将AQS队列的实现过程做了比较清晰的分析,主要是基于非公平锁的独占锁实现。在获得同步锁时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。 ...

December 14, 2018 · 4 min · jiezi

一份针对于新手的多线程实践

前言前段时间在某个第三方平台看到我写作字数居然突破了 10W 字,难以想象高中 800 字作文我都得巧妙的利用换行来完成(懂的人肯定也干过????)。干了这行养成了一个习惯:能撸码验证的事情都自己验证一遍。于是在上周五通宵加班的空余时间写了一个工具:https://github.com/crossoverJie/NOWS利用 SpringBoot 只需要一行命令即可统计自己写了多少个字。java -jar nows-0.0.1-SNAPSHOT.jar /xx/Hexo/source/_posts传入需要扫描的文章目录即可输出结果(目前只支持 .md 结尾 Markdown 文件)当然结果看个乐就行(40 几万字),因为早期的博客我喜欢大篇的贴代码,还有一些英文单词也没有过滤,所以导致结果相差较大。如果仅仅只是中文文字统计肯定是准的,并且该工具内置灵活的扩展方式,使用者可以自定义统计策略,具体请看后文。其实这个工具挺简单的,代码量也少,没有多少可以值得拿出来讲的。但经过我回忆不管是面试还是和网友们交流都发现一个普遍的现象:大部分新手开发都会去看多线程、但几乎都没有相关的实践。甚至有些都不知道多线程拿来在实际开发中有什么用。为此我想基于这个简单的工具为这类朋友带来一个可实践、易理解的多线程案例。至少可以让你知道:为什么需要多线程?怎么实现一个多线程程序?多线程带来的问题及解决方案?单线程统计再谈多线程之前先来聊聊单线程如何实现。本次的需求也很简单,只是需要扫描一个目录读取下面的所有文件即可。所有我们的实现有以下几步:读取某个目录下的所有文件。将所有文件的路径保持到内存。遍历所有的文件挨个读取文本记录字数即可。先来看前两个如何实现,并且当扫描到目录时需要继续读取当前目录下的文件。这样的场景就非常适合递归: public List<String> getAllFile(String path){ File f = new File(path) ; File[] files = f.listFiles(); for (File file : files) { if (file.isDirectory()){ String directoryPath = file.getPath(); getAllFile(directoryPath); }else { String filePath = file.getPath(); if (!filePath.endsWith(".md")){ continue; } allFile.add(filePath) ; } } return allFile ; }}读取之后将文件的路径保持到一个集合中。需要注意的是这个递归次数需要控制下,避免出现栈溢出(StackOverflow)。最后读取文件内容则是使用 Java8 中的流来进行读取,这样代码可以更简洁:Stream<String> stringStream = Files.lines(Paths.get(path), StandardCharsets.UTF_8);List<String> collect = stringStream.collect(Collectors.toList());接下来便是读取字数,同时要过滤一些特殊文本(比如我想过滤掉所有的空格、换行、超链接等)。扩展能力简单处理可在上面的代码中遍历 collect 然后把其中需要过滤的内容替换为空就行。但每个人的想法可能都不一样。比如我只想过滤掉空格、换行、超链接就行了,但有些人需要去掉其中所有的英文单词,甚至换行还得留着(就像写作文一样可以充字数)。所有这就需要一个比较灵活的处理方式。看过上文《利用责任链模式设计一个拦截器》应该很容易想到这样的场景责任链模式再合适不过了。关于责任链模式具体的内容就不在详述了,感兴趣的可以查看上文。这里直接看实现吧:定义责任链的抽象接口及处理方法:public interface FilterProcess { /** * 处理文本 * @param msg * @return / String process(String msg) ;}处理空格和换行的实现:public class WrapFilterProcess implements FilterProcess{ @Override public String process(String msg) { msg = msg.replaceAll("\s", “”); return msg ; }}处理超链接的实现:public class HttpFilterProcess implements FilterProcess{ @Override public String process(String msg) { msg = msg.replaceAll("^((https|http|ftp|rtsp|mms)?:\/\/)[^\s]+",""); return msg ; }}这样在初始化时需要将这些处理 handle 都加入责任链中,同时提供一个 API 供客户端执行即可。这样一个简单的统计字数的工具就完成了。多线程模式在我本地一共就几十篇博客的条件下执行一次还是很快的,但如果我们的文件是几万、几十万甚至上百万呢。虽然功能可以实现,但可以想象这样的耗时绝对是成倍的增加。这时多线程就发挥优势了,由多个线程分别去读取文件最后汇总结果即可。这样实现的过程就变为:读取某个目录下的所有文件。将文件路径交由不同的线程自行处理。最终汇总结果。多线程带来的问题也不是使用多线程就万事大吉了,先来看看第一个问题:共享资源。简单来说就是怎么保证多线程和单线程统计的总字数是一致的。基于我本地的环境先看看单线程运行的结果:总计为:414142 字。接下来换为多线程的方式:List<String> allFile = scannerFile.getAllFile(strings[0]);logger.info(“allFile size=[{}]",allFile.size());for (String msg : allFile) { executorService.execute(new ScanNumTask(msg,filterProcessManager));}public class ScanNumTask implements Runnable { private static Logger logger = LoggerFactory.getLogger(ScanNumTask.class); private String path; private FilterProcessManager filterProcessManager; public ScanNumTask(String path, FilterProcessManager filterProcessManager) { this.path = path; this.filterProcessManager = filterProcessManager; } @Override public void run() { Stream<String> stringStream = null; try { stringStream = Files.lines(Paths.get(path), StandardCharsets.UTF_8); } catch (Exception e) { logger.error(“IOException”, e); } List<String> collect = stringStream.collect(Collectors.toList()); for (String msg : collect) { filterProcessManager.process(msg); } }}使用线程池管理线程,更多线程池相关的内容请看这里:《如何优雅的使用和理解线程池》执行结果:我们会发现无论执行多少次,这个值都会小于我们的预期值。来看看统计那里是怎么实现的。@Componentpublic class TotalWords { private long sum = 0 ; public void sum(int count){ sum += count; } public long total(){ return sum; }}可以看到就是对一个基本类型进行累加而已。那导致这个值比预期小的原因是什么呢?我想大部分人都会说:多线程运行时会导致有些线程把其他线程运算的值覆盖。但其实这只是导致这个问题的表象,根本原因还是没有讲清楚。内存可见性核心原因其实是由 Java 内存模型(JMM)的规定导致的。这里引用一段之前写的《你应该知道的 volatile 关键字》一段解释:由于 Java 内存模型(JMM)规定,所有的变量都存放在主内存中,而每个线程都有着自己的工作内存(高速缓存)。线程在工作时,需要将主内存中的数据拷贝到工作内存中。这样对数据的任何操作都是基于工作内存(效率提高),并且不能直接操作主内存以及其他线程工作内存中的数据,之后再将更新之后的数据刷新到主内存中。这里所提到的主内存可以简单认为是堆内存,而工作内存则可以认为是栈内存。如下图所示:所以在并发运行时可能会出现线程 B 所读取到的数据是线程 A 更新之前的数据。更多相关内容就不再展开了,感兴趣的朋友可以翻翻以前的博文。直接来说如何解决这个问题吧,JDK 其实已经帮我们想到了这些问题。在 java.util.concurrent 并发包下有许多你可能会使用到的并发工具。这里就非常适合 AtomicLong,它可以原子性的对数据进行修改。来看看修改后的实现:@Componentpublic class TotalWords { private AtomicLong sum = new AtomicLong() ; public void sum(int count){ sum.addAndGet(count) ; } public long total(){ return sum.get() ; }}只是使用了它的两个 API 而已。再来运行下程序会发现结果居然还是不对。甚至为 0 了。线程间通信这时又出现了一个新的问题,来看看获取总计数据是怎么实现的。List<String> allFile = scannerFile.getAllFile(strings[0]);logger.info(“allFile size=[{}]",allFile.size());for (String msg : allFile) { executorService.execute(new ScanNumTask(msg,filterProcessManager));}executorService.shutdown();long total = totalWords.total();long end = System.currentTimeMillis();logger.info(“total sum=[{}],[{}] ms”,total,end-start);不知道大家看出问题没有,其实是在最后打印总数时并不知道其他线程是否已经执行完毕了。因为 executorService.execute() 会直接返回,所以当打印获取数据时还没有一个线程执行完毕,也就导致了这样的结果。关于线程间通信之前我也写过相关的内容:《深入理解线程通信》大概的方式有以下几种:这里我们使用线程池的方式:在停用线程池后加上一个判断条件即可:executorService.shutdown();while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) { logger.info(“worker running”);}long total = totalWords.total();long end = System.currentTimeMillis();logger.info(“total sum=[{}],[{}] ms”,total,end-start);这样我们再次尝试,发现无论多少次结果都是正确的了:效率提升可能还会有朋友问,这样的方式也没见提升多少效率啊。这其实是由于我本地文件少,加上一个文件处理的耗时也比较短导致的。甚至线程数开的够多导致频繁的上下文切换还是让执行效率降低。为了模拟效率的提升,每处理一个文件我都让当前线程休眠 100 毫秒来模拟执行耗时。先看单线程运行需要耗时多久。总共耗时:[8404] ms接着在线程池大小为 4 的情况下耗时:总共耗时:[2350] ms可见效率提升还是非常明显的。更多思考这只是多线程其中的一个用法,相信看到这里的朋友应该多它的理解更进一步了。再给大家留个阅后练习,场景也是类似的:在 Redis 或者其他存储介质中存放有上千万的手机号码数据,每个号码都是唯一的,需要在最快的时间内把这些号码全部都遍历一遍。有想法感兴趣的朋友欢迎在文末留言参与讨论????????。总结希望看完的朋友心中能对文初的几个问题能有自己的答案:为什么需要多线程?怎么实现一个多线程程序?多线程带来的问题及解决方案?文中的代码都在此处。https://github.com/crossoverJie/NOWS你的点赞与转发是最大的支持。 ...

October 29, 2018 · 2 min · jiezi