Erlang-源码阅读-scheduler

移步 https://ruby-china.org/topics...

August 8, 2019 · 1 min · jiezi

Thread-and-AbstractQueuedSynchronizer

Thread详解Java并发之AQS详解 Thread中join实现如下: public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } }}其中利用了Object的wait方法,调用的前提是已经获得join线程的锁,如果thread对象被锁住则会等待其被释放。 ...

June 30, 2019 · 2 min · jiezi

Java-Monitor管程

操作系统在面对线程间同步的时候,会支持例如semaphore信号量和mutex互斥量等同步原语,而monitor是在编程语言中被实现的,下面介绍一下java中monitor的实现原理: 以一个阻塞队列的实现来举例: 同时,java内置的synchronized关键字可以认为是MESA模型的简化版,其只能有一个条件变量,但编译器会自动添加加锁与解锁的代码。synchronized关键字可以修饰实例方法、类方法以及代码块,如果修饰的是代码块,需要制定关联的Object;如果修饰的是实例方法,那么其关联的对象实际上是this;如果修饰的是类方法,那么其关联的对象是this.class。这些关联的对象就是MESA模型里的条件变量。 管程:并发编程的万能钥匙

June 16, 2019 · 1 min · jiezi

Java并发核心浅谈二

回顾在上一篇 Java并发核心浅谈 我们大概了解到了Lock和synchronized的共同点,再简单总结下: Lock主要是自定义一个 counter,从而利用CAS对其实现原子操作,而synchronized是c++ hotspot实现的 monitor(具体的咱也没看,咱就不说)二者都可重入(递归,互调,循环),其本质都是维护一个可计数的 counter,在其它线程访问加锁对象时会判断 counter 是否为 0理论上讲二者都是阻塞式的,因为线程在拿锁时,如果拿不到,最终的结果只能等待(前提是线程的最终目的就是要获取锁)读写锁分离成两把锁了,所以不一样举个例子:线程 A 持有了某个对象的 monitor,其它线程在访问该对象时,发现 monitor 不为 0,所以只能阻塞挂起或者加入等待队列,等着线程 A 处理完退出后将 monitor 置为 0。在线程 A 处理任务期间,其它线程要么循环访问 monitor,要么一直阻塞等着线程 A 唤醒,再不济就真的如我所说,放弃锁的竞争,去处理别的任务。但是应该做不到去处理别的任务后,任务处理到一半,被线程 A 通知后再回去抢锁 公平锁与非公平锁不共享 counter // 非公平锁在第一次拿锁失败也会调用该方法 public final void acquire(int arg) { // 没拿到锁就加入队列 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // 非公平锁方法 final void lock() { // 走来就尝试获取锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); // 上面那个方法 } // 公平锁 Acquire 计数 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 拿到计数 int c = getState(); if (c == 0) { // 公平锁会先尝试排队 非公平锁少个 !hasQueuedPredecessors() 其它代码一样 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * @return {@code true} if there is a queued thread preceding the // 当前线程前有线程等待,则排队 * current thread, and {@code false} if the current thread * is at the head of the queue or the queue is empty // 队列为空不用排队 * @since 1.7 */ public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; // 当前线程处于头节点的下一个且不为空则不用排队 // 或该线程就是当前持有锁的线程,即重入锁,也不用排队 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } // 加入等待队列 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 获取失败会检查节点状态 // 然后 park 线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; // 线程取消加锁 /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; // 解除线程 park /** waitStatus value to indicate thread is waiting on condition */ // static final int CONDITION = -2; // 线程被阻塞 /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; // 广播 // 官方注释 /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus;读锁与写锁(共享锁与排他锁)读锁:共享 counter ...

June 16, 2019 · 7 min · jiezi

记python-logging非进程安全踩得坑

背景有两个python进程A和B在共用同一个logger,用的是TimedRotatingFileHandler,并且每天午夜进行文件rollover,保留15天的文件 现象偶尔会发现某一天的日志里记录的时间是后一天的,并且只有几行 原因虽然官方文档中说logging handler提供的类都是多线程安全的,但并不是多进程安全的,通过分析源码发现事实也确实如此。logging模块利用handler来负责日志文件的rollover,下面以TimedRotatingFileHandler为例来看下它的rollover是如何实现的: 所有打log的函数都会在Handler类中调用handle函数,然后调用emit函数: def handle(self, record): """ Conditionally emit the specified logging record. Emission depends on filters which may have been added to the handler. Wrap the actual emission of the record with acquisition/release of the I/O thread lock. Returns whether the filter passed the record for emission. """ rv = self.filter(record) if rv: self.acquire() try: self.emit(record) finally: self.release() return rv在TimedRotatingFileHandler的父类BaseRotatingHandler中重载了emit函数: def emit(self, record): """ Emit a record. Output the record to the file, catering for rollover as described in doRollover(). """ try: if self.shouldRollover(record): self.doRollover() logging.FileHandler.emit(self, record) except (KeyboardInterrupt, SystemExit): raise except: self.handleError(record)可以看到其中利用shouldRollover判断是否需要rollover,利用doRollover来执行rollover ...

May 22, 2019 · 7 min · jiezi

多线程安全的单例模式

单例模式被认为是最简单的设计模式,也经常被用到,下面以我在实际项目中用到的一个单例模式为例,看下如何利用经典的两次判空方法令其高效、安全得工作在多线程环境(见代码中注释)。 package core;import org.apache.ibatis.io.Resources;import org.apache.ibatis.session.SqlSessionFactory;import org.apache.ibatis.session.SqlSessionFactoryBuilder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.io.Reader;import java.util.Properties;public class SqlSessionFactorySingleton { private static Logger logger = LoggerFactory.getLogger("SqlSessionFactorySingleton"); private static final String MYBATIS_CONFIG_FILE = "mybatis-config.xml"; // 使用volatile关键字令A线程的修改对B线程立即可见 private static volatile SqlSessionFactory factory = null; // 屏蔽默认的公共构造函数 private SqlSessionFactorySingleton() { } public static SqlSessionFactory getInstance() { if (factory == null) { // 第一次判空 // 只有创建SqlSessionFactory实例时才需要同步,不直接在方法上加synchronized关键字可以避免在每次判断实例是否创建时加锁,极大得提高并发效率 synchronized (SqlSessionFactorySingleton.class) { // 如果A、B两个线程同时通过第一次判空,A获得锁,B等待,等A创建完SqlSessionFactory实例释放锁,B获得锁,此时B需要再次判断实例是否已创建来避免重复创建 if (factory == null) { // 第二次判空 String configFile = "config.properties"; try (Reader configReader = Resources.getResourceAsReader(configFile); Reader mybatisReader = Resources.getResourceAsReader(MYBATIS_CONFIG_FILE)) { Properties properties = new Properties(); properties.load(configReader); SqlSessionFactoryBuilder builder = new SqlSessionFactoryBuilder(); factory = builder.build(mybatisReader, properties); } catch (IOException e) { logger.error("Exception when reading {} and {}:", configFile, MYBATIS_CONFIG_FILE, e); } } } } return factory; }}

May 21, 2019 · 1 min · jiezi

Java并发之线程组ThreadGroup介绍

线程组介绍线程组的构造ThreadGroup方法介绍 查看线程组信息终止线程组中的所有线程总结Links 作者资源相关资源线程组介绍线程组(ThreadGroup)简单来说就是一个线程集合。线程组的出现是为了更方便地管理线程。 线程组是父子结构的,一个线程组可以集成其他线程组,同时也可以拥有其他子线程组。从结构上看,线程组是一个树形结构,每个线程都隶属于一个线程组,线程组又有父线程组,这样追溯下去,可以追溯到一个根线程组——System线程组。 <div align=center> </div> 下面介绍一下线程组树的结构: JVM创建的system线程组是用来处理JVM的系统任务的线程组,例如对象的销毁等。system线程组的直接子线程组是main线程组,这个线程组至少包含一个main线程,用于执行main方法。main线程组的子线程组就是应用程序创建的线程组。你可以在main方法中看到JVM创建的system线程组和main线程组: public static void main(String[] args) { ThreadGroup mainThreadGroup=Thread.currentThread().getThreadGroup(); ThreadGroup systenThreadGroup=mainThreadGroup.getParent(); System.out.println("systenThreadGroup name = "+systenThreadGroup.getName()); System.out.println("mainThreadGroup name = "+mainThreadGroup.getName()); }console输出: systenThreadGroup name = systemmainThreadGroup name = main一个线程可以访问其所属线程组的信息,但不能访问其所属线程组的父线程组或者其他线程组的信息。 线程组的构造java.lang.ThreadGroup提供了两个构造函数: ConstructorDescriptionThreadGroup(String name)根据线程组名称创建线程组,其父线程组为main线程组ThreadGroup(ThreadGroup parent, String name)根据线程组名称创建线程组,其父线程组为指定的parent线程组下面演示一下这两个构造函数的用法: public static void main(String[] args) { ThreadGroup subThreadGroup1 = new ThreadGroup("subThreadGroup1"); ThreadGroup subThreadGroup2 = new ThreadGroup(subThreadGroup1, "subThreadGroup2"); System.out.println("subThreadGroup1 parent name = " + subThreadGroup1.getParent().getName()); System.out.println("subThreadGroup2 parent name = " + subThreadGroup2.getParent().getName());}console输出: ...

April 29, 2019 · 3 min · jiezi

Go语言高阶:调度器系列(1)起源

如果把语言比喻为武侠小说中的武功,如果只是会用,也就是达到四五层,如果用的熟练也就六七层,如果能见招拆招也得八九层,如果你出神入化,立于不败之地十层。如果你想真正掌握一门语言的,怎么也得八层以上,需要你深入了解这门语言方方面面的细节。希望以后对Go语言的掌握能有八九层,怎么能不懂调度器!?Google、百度、微信搜索了许多Go语言调度的文章,这些文章上来就讲调度器是什么样的,它由哪些组成,它的运作原理,搞的我只能从这些零散的文章中形成调度器的“概貌”,这是我想要的结果,但这还不够。学习不仅要知其然,还要知其所以然。学习之前,先学知识点的历史,再学知识,这样你就明白,为什么它是当下这个样子。所以,我打算写一个goroutine调度器的系列文章,从历史背景讲起,循序渐进,希望大家能对goroutine调度器有一个全面的认识。这篇文章介绍调度器相关的历史背景,请慢慢翻阅。远古时代上面这个大家伙是ENIAC,它诞生在宾夕法尼亚大学,是世界第一台真正的通用计算机,和现代的计算机相比,它是相当的“笨重”,它的计算能力,跟现代人手普及的智能手机相比,简直是一个天上一个地下,ENIAC在地下,智能手机在天上。它上面没有操作系统,更别提进程、线程和协程了。进程时代后来,现代化的计算机有了操作系统,每个程序都是一个进程,但是操作系统在一段时间只能运行一个进程,直到这个进程运行完,才能运行下一个进程,这个时期可以成为单进程时代——串行时代。和ENIAC相比,单进程是有了几万倍的提度,但依然是太慢了,比如进程要读数据阻塞了,CPU就在哪浪费着,伟大的程序员们就想了,不能浪费啊,怎么才能充分的利用CPU呢?后来操作系统就具有了最早的并发能力:多进程并发,当一个进程阻塞的时候,切换到另外等待执行的进程,这样就能尽量把CPU利用起来,CPU就不浪费了。线程时代多进程真实个好东西,有了对进程的调度能力之后,伟大的程序员又发现,进程拥有太多资源,在创建、切换和销毁的时候,都会占用很长的时间,CPU虽然利用起来了,但CPU有很大的一部分都被用来进行进程调度了,怎么才能提高CPU的利用率呢?大家希望能有一种轻量级的进程,调度不怎么花时间,这样CPU就有更多的时间用在执行任务上。后来,操作系统支持了线程,线程在进程里面,线程运行所需要资源比进程少多了,跟进程比起来,切换简直是“不算事”。一个进程可以有多个线程,CPU在执行调度的时候切换的是线程,如果下一个线程也是当前进程的,就只有线程切换,“很快”就能完成,如果下一个线程不是当前的进程,就需要切换进程,这就得费点时间了。这个时代,CPU的调度切换的是进程和线程。多线程看起来很美好,但实际多线程编程却像一坨屎,一是由于线程的设计本身有点复杂,而是由于需要考虑很多底层细节,比如锁和冲突检测。协程多进程、多线程已经提高了系统的并发能力,但是在当今互联网高并发场景下,为每个任务都创建一个线程是不现实的,因为会消耗大量的内存(每个线程的内存占用级别为MB),线程多了之后调度也会消耗大量的CPU。伟大的程序员们有开始想了,如何才能充分利用CPU、内存等资源的情况下,实现更高的并发?既然线程的资源占用、调度在高并发的情况下,依然是比较大的,是否有一种东西,更加轻量?你可能知道:线程分为内核态线程和用户态线程,用户态线程需要绑定内核态线程,CPU并不能感知用户态线程的存在,它只知道它在运行1个线程,这个线程实际是内核态线程。用户态线程实际有个名字叫协程(co-routine),为了容易区分,我们使用协程指用户态线程,使用线程指内核态线程。协程跟线程是有区别的,线程由CPU调度是抢占式的,协程由用户态调度是协作式的,一个协程让出CPU后,才执行下一个协程。协程和线程有3种映射关系:N:1,N个协程绑定1个线程,优点就是协程在用户态线程即完成切换,不会陷入到内核态,这种切换非常的轻量快速。但也有很大的缺点,1个进程的所有协程都绑定在1个线程上,一是某个程序用不了硬件的多核加速能力,二是一旦某协程阻塞,造成线程阻塞,本进程的其他协程都无法执行了,根本就没有并发的能力了。1:1,1个协程绑定1个线程,这种最容易实现。协程的调度都由CPU完成了,不存在N:1缺点,但有一个缺点是协程的创建、删除和切换的代价都由CPU完成,有点略显昂贵了。M:N,M个协程绑定1个线程,是N:1和1:1类型的结合,克服了以上2种模型的缺点,但实现起来最为复杂。协程是个好东西,不少语言支持了协程,比如:Lua、Erlang、Java(C++即将支持),就算语言不支持,也有库支持协程,比如C语言的coroutine(风云大牛作品)、Kotlin的kotlinx.coroutines、Python的gevent。goroutineGo语言的诞生就是为了支持高并发,有2个支持高并发的模型:CSP和Actor。鉴于Occam和Erlang都选用了CSP(来自Go FAQ),并且效果不错,Go也选了CSP,但与前两者不同的是,Go把channel作为头等公民。就像前面说的多线程编程太不友好了,Go为了提供更容易使用的并发方法,使用了goroutine和channel。goroutine来自协程的概念,让一组可复用的函数运行在一组线程之上,即使有协程阻塞,该线程的其他协程也可以被runtime调度,转移到其他可运行的线程上。最关键的是,程序员看不到这些底层的细节,这就降低了编程的难度,提供了更容易的并发。Go中,协程被称为goroutine(Rob Pike说goroutine不是协程,因为他们并不完全相同),它非常轻量,一个goroutine只占几KB,并且这几KB就足够goroutine运行完,这就能在有限的内存空间内支持大量goroutine,支持了更多的并发。虽然一个goroutine的栈只占几KB,但实际是可伸缩的,如果需要更多内容,runtime会自动为goroutine分配。Go语言的老调度器终于来到了Go语言的调度器环节。调度器的任务是在用户态完成goroutine的调度,而调度器的实现好坏,对并发实际有很大的影响,并且Go的调度器就是M:N类型的,实现起来也是最复杂。现在的Go语言调度器是2012年重新设计的(设计方案),在这之前的调度器称为老调度器,老调度器的实现不太好,存在性能问题,所以用了4年左右就被替换掉了,老调度器大概是下面这个样子:最下面是操作系统,中间是runtime,runtime在Go中很重要,许多程序运行时的工作都由runtime完成,调度器就是runtime的一部分,虚线圈出来的为调度器,它有两个重要组成:M,代表线程,它要运行goroutine。Global G Queue,是全局goroutine队列,所有的goroutine都保存在这个队列中,goroutine用G进行代表。M想要执行、放回G都必须访问全局G队列,并且M有多个,即多线程访问同一资源需要加锁进行保证互斥/同步,所以全局G队列是有互斥锁进行保护的。老调度器有4个缺点:创建、销毁、调度G都需要每个M获取锁,这就形成了激烈的锁竞争。M转移G会造成延迟和额外的系统负载。比如当G中包含创建新协程的时候,M创建了G’,为了继续执行G,需要把G’交给M’执行,也造成了很差的局部性,因为G’和G是相关的,最好放在M上执行,而不是其他M’。M中的mcache是用来存放小对象的,mcache和栈都和M关联造成了大量的内存开销和差的局部性。系统调用导致频繁的线程阻塞和取消阻塞操作增加了系统开销。Go语言的新调度器面对以上老调度的问题,Go设计了新的调度器,设计文稿:https://golang.org/s/go11sched新调度器引入了:P:Processor,它包含了运行goroutine的资源,如果线程想运行goroutine,必须先获取P,P中还包含了可运行的G队列。work stealing:当M绑定的P没有可运行的G时,它可以从其他运行的M’那里偷取G。现在,调度器中3个重要的缩写你都接触到了,所有文章都用这几个缩写,请牢记:G: goroutineM: 工作线程P: 处理器,它包含了运行Go代码的资源,M必须和一个P关联才能运行G。这篇文章的目的不是介绍调度器的实现,而是调度器的一些理念,帮助你后面更好理解调度器的实现,所以我们回归到调度器设计思想上。调度器的有两大思想:复用线程:协程本身就是运行在一组线程之上,不需要频繁的创建、销毁线程,而是对线程的复用。在调度器中复用线程还有2个体现:1)work stealing,当本线程无可运行的G时,尝试从其他线程绑定的P偷取G,而不是销毁线程。2)hand off,当本线程因为G进行系统调用阻塞时,线程释放绑定的P,把P转移给其他空闲的线程执行。利用并行:GOMAXPROCS设置P的数量,当GOMAXPROCS大于1时,就最多有GOMAXPROCS个线程处于运行状态,这些线程可能分布在多个CPU核上同时运行,使得并发利用并行。另外,GOMAXPROCS也限制了并发的程度,比如GOMAXPROCS = 核数/2,则最多利用了一半的CPU核进行并行。调度器的两小策略:抢占:在coroutine中要等待一个协程主动让出CPU才执行下一个协程,在Go中,一个goroutine最多占用CPU 10ms,防止其他goroutine被饿死,这就是goroutine不同于coroutine的一个地方。全局G队列:在新的调度器中依然有全局G队列,但功能已经被弱化了,当M执行work stealing从其他P偷不到G时,它可以从全局G队列获取G。上面提到并行了,关于并发和并行再说一下:Go创始人Rob Pike一直在强调go是并发,不是并行,因为Go做的是在一段时间内完成几十万、甚至几百万的工作,而不是同一时间同时在做大量的工作。并发可以利用并行提高效率,调度器是有并行设计的。并行依赖多核技术,每个核上在某个时间只能执行一个线程,当我们的CPU有8个核时,我们能同时执行8个线程,这就是并行。结束语这篇文章的主要目的是为后面介绍Go语言调度器做铺垫,由远及近的方式简要介绍了多进程、多线程、协程、并发和并行有关的“史料”,希望你了解为什么Go采用了goroutine,又为何调度器如此重要。如果你等不急了,想了解Go调度器相关的原理,看下这些文章:设计方案:https://golang.org/s/go11sched代码中关于调度器的描述:https://golang.org/src/runtim…引用最多的调度器文章:https://morsmachine.dk/go-sch…kavya的PPT,目前看到的讲调度最好的PPT:https://speakerdeck.com/kavya…work stealing论文:http://supertech.csail.mit.ed…分析调度器的论文(就问你6不6,还有论文研究):http://www.cs.columbia.edu/~a…声明:关于老调度器的资料已经完全搜不到,根据新版调度器设计方案的描述,想象着写了老调度器这一章,可能存在错误。参考资料https://en.wikipedia.org/wiki…https://en.wikipedia.org/wiki...https://en.wikipedia.org/wiki...https://golang.org/doc/faq#go...https://golang.org/s/go11schedhttps://golang.org/src/runtim…如果这篇文章对你有帮助,请点个赞/喜欢,感谢。本文作者:大彬如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/03/10/golang-scheduler-1-history

March 10, 2019 · 1 min · jiezi

<<Java并发编程实践>>有感 ConcurrentLinkedQueue解读

ConcurrentLinkedQueue(上集)算法实现 CASCAS的优点当一个线程执行任务失败不影响其他线程的进行 最大限度的利用CPU资源 能提高程序的伸缩性 伸缩性:不修改任何代码 升级硬件就能带来性能上的提高 升级硬件带来的性能提高明显 就是伸缩性良好CAS的缺点代码复杂 影响阅读性 刚开始看ConcurrentLinkedQueue的时候 没有正确的思路,理解起来会比较费劲 我推荐直接用多线程同时执行的方式去理解 这样会比较好重要概念不变性所有item不为null的节点都能从head头节点开始通过succ()方法访问到head!=null 只要队列有值 保证真实的head永不为null head哪怕会自引用 迟早也会解除这种假状态可变性heatd.item 可能为null也可能不为null 因为cas活锁操作 每一行代码执行都不影响其他线程的访问相同的代码块tail尾节点的更新是滞后于head的 个人理解 在offer中 尾节点掉队后 通过head节点 (不变性1的保证) 成功访问最后一个p.next=null的节点快照snapshot是我自己的理解 因为对于多线程操作来说 当前引用对象 如offer()中 t=tail中的t; p=t中的p; q=p.next中的q都是一个快照 他获得一个对象的快照版本 然后在后续的操作中 使(t!=(t=tail))这样操作有意义重要方法offer()入队poll() 出队源码public boolean offer(E e) { checkNotNull(e); //NullPointException检查 final Node<E> newNode = new Node<E>(e); //包装成一个Node对象 for (Node<E> t = tail, p = t;;) {//获取当前尾节点 t=tail,p是真正的尾节点 p.next==null Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) {//方法1 CAS更新 自己想3个线程同时进行这个操作 // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become “live”. if (p != t) // hop two nodes at a time //方法2 延迟更新尾节点 下面说为什么 casTail(t, newNode); //方法3 成不成功无所谓 下面说 return true; } // Lost CAS race to another thread; re-read next } else if (p == q)// 方法4 学习offer方法时 可以暂时放弃这一步 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else //去找到真正的尾节点 此处和方法2 应是相互辉映的存在 // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; //方法5 } }解读offer()方法自顶向下 思考CAS中可能出现的情况 CAS是活锁 所谓活锁即是每一行代码运行时 允许其他线程访问相同的代码块 成功与失败并存 衍生了更多的条件判断 本人觉得CAS方法都应该从这个方法去理解 再自己画画时序图 (注意:理解offer()时,先把方法4排除,因为4方法出现自引用的情况 只有offer()和poll()交替执行时会出现 本文只介绍第一种情况)多线程操作第一种情况: 只有 offer()第二种情况: offer()和 poll()方法交替执行同时执行offer()(假设我们现在有3个线程)不变性:永远只有一个线程CAS成功 并且总会成功一个循环次数分析:Thread1 成功 循环一次退出 Thread2失败 再循环一次成功 Thread3失败 再循环两次成功 如果有n个线程同时执行 offer() 执行次数 最大为n次 最少为1次方法5中三目表达式解析: p=condition?result1:result2 我先说一下这里的意义 满足result1的场景为 :获取尾节点tail的快照已经过时了(其他线程更新了新的尾节点tail) 直接跳转到当前获得的最新尾节点的地方 满足result2的场景为:多线程同时操作offer() 执行1方法CAS成功后 未更新尾节点(未执行3方法:两种原因 1是未满足前置条件if判断 2是CAS更新失败) 直接找next节点方法2与方法5 是整个offer() 操作的点睛之笔 下面解释只有offer() 操作时假设:Thread 1执行完1方法成功 还未执行2方法 Thread2和Thread3进入5方法 ,也就是说Thread2和Thread3执行5方法发生在Thread1执行2方法之前 Thread2 and Thread3 invoke method5() before Thread1 invoke method2() 此时 Thread2.p =q,Thread3.p=q, 因为p==t成立 时序图如下,然后Thread1执行方法2 p==t 不执行tail尾节点的更新操作 由此可知 尾节点是延迟更新 一切为了更高效~~~ 图1 Thread 2 与 Thread3 此时再次执行 1 方法 见图1 他们此时的q.next==null 我们规定Thread2 CAS成功 Thread3失败了 成功后的时序图如下 我们假设 Thread3 invoke method5() after Thread2 invoke method2() Thread2执行方法2 在 Thread3执行方法5之前 图2 对于Thread2 进入2方法 p!=t 满足 执行 casTail(t, newNode) 更新尾节点的快照 如下图 图3 Thread2 工作完成 退出循环 对于Thread3 因为执行1方法失败 进入5方法 此时Thread3的tail快照t3 p = (p != t && t != (t = tail)) ? t : q; 按图3来翻译p=(p!=t3&&t3!=(t3=t2))?t2:q;p=t2;//直接去当前能获取到的尾节点!!!到这里 offer() 方法解决完成ConcurrentLinkedQueue核心总结tail和head都是 延迟更新的 但是tail更新在head更新后面 因为方法4中 需要依赖head节点 去找每一个存活的节点前面的叙述中 可以看到 offer() 方法内 核心操作 就是 p=condition?result1:result2偶数次offer() 操作更新一次tail 单线程的环境下与Michael-Scott 队列比较Michael-Scott队列 每次操作 都需要判断是否需要推动尾节点 采取CAS的操作 优点也是缺点Doug Lead老神仙的CAS 我这个菜鸟猜测 能不用CAS 就尽量不用 因为CAS存在竞争 提供以最少次数的更新达到最终正确的效果我们把offer()中的整个行为想象为跳台阶 result1的形式就像是 武侠小说中的越阶战斗!!!result2的形式就是一步一个脚印 每次平稳地去下一个台阶我们想象一下 offer()最优的情况 10个线程同时offer() 每一个执行1方法成功的线程都没有(执行2方法或则执行3方法失败) 没关系 尾节点的更新终会成功每一个失败的线程都是去当前节点的next节点 p.next进行插入操作 在第9个线程(相当于我们上文中的线程2) 当第10个线程操作时 虽然它很可怜 一直排到最后 但是尾节点更新一下就越过了9阶!!!(不太恰当的地方请大佬们指点) ConcurrrntLinkedQueue 优点能跃过一整段因为多线程在极短时间内offer()插入的节点 直接去尾节点 直接跨过去能抵达每一个相对于当前快照来说最新的next节点高并发时 tail 和 p 相互配合 尽力去离当前尾节点 最近的地方ConcurrentLinkedQueue 缺点CAS操作 虽然总会成功 但是竞争效率如果很低 不如用同步锁 采用CAS编写并发代码 都是大佬级别 难度高 不接地气(嘿嘿)循环可能会带来额外的资源开销 ...

January 25, 2019 · 2 min · jiezi

总结了才知道,原来channel有这么多用法!

这篇文章总结了channel的10种常用操作,以一个更高的视角看待channel,会给大家带来对channel更全面的认识。在介绍10种操作前,先简要介绍下channel的使用场景、基本操作和注意事项。channel的使用场景把channel用在数据流动的地方:消息传递、消息过滤信号广播事件订阅与广播请求、响应转发任务分发结果汇总并发控制同步与异步…channel的基本操作和注意事项channel存在3种状态:nil,未初始化的状态,只进行了声明,或者手动赋值为nilactive,正常的channel,可读或者可写closed,已关闭,千万不要误认为关闭channel后,channel的值是nilchannel可进行3种操作:读写关闭把这3种操作和3种channel状态可以组合出9种情况:对于nil通道的情况,也并非完全遵循上表,有1个特殊场景:当nil的通道在select的某个case中时,这个case会阻塞,但不会造成死锁。参考代码请看:https://dave.cheney.net/2014/…下面介绍使用channel的10种常用操作。1. 使用for range读channel场景:当需要不断从channel读取数据时原理:使用for-range读取channel,这样既安全又便利,当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的channel,造成读到数据为通道所存储的数据类型的零值。用法:for x := range ch{ fmt.Println(x)}2. 使用_,ok判断channel是否关闭场景:读channel,但不确定channel是否关闭时原理:读已关闭的channel会造成panic,如果不确定channel,需要使用ok进行检测。ok的结果和含义:true:读到数据,并且通道没有关闭。false:通道关闭,无数据读到。用法:if v, ok := <- ch; ok { fmt.Println(v)}3. 使用select处理多个channel场景:需要对多个通道进行同时处理,但只处理最先发生的channel时原理:select可以同时监控多个通道的情况,只处理未阻塞的case。当通道为nil时,对应的case永远为阻塞,无论读写。特殊关注:普通情况下,对nil的通道写操作是要panic的。用法:// 分配job时,如果收到关闭的通知则退出,不分配jobfunc (h *Handler) handle(job *Job) { select { case h.jobCh<-job: return case <-h.stopCh: return }}4. 使用channel的声明控制读写权限场景:协程对某个通道只读或只写时目的:A. 使代码更易读、更易维护,B. 防止只读协程对通道进行写数据,但通道已关闭,造成panic。用法:如果协程对某个channel只有写操作,则这个channel声明为只写。如果协程对某个channel只有读操作,则这个channe声明为只读。// 只有generator进行对outCh进行写操作,返回声明// <-chan int,可以防止其他协程乱用此通道,造成隐藏bugfunc generator(int n) <-chan int { outCh := make(chan int) go func(){ for i:=0;i<n;i++{ outCh<-i } }() return outCh}// consumer只读inCh的数据,声明为<-chan int// 可以防止它向inCh写数据func consumer(inCh <-chan int) { for x := range inCh { fmt.Println(x) }}5. 使用缓冲channel增强并发和异步场景:异步和并发原理:A. 有缓冲通道是异步的,无缓冲通道是同步的,B. 有缓冲通道可供多个协程同时处理,在一定程度可提高并发性。用法:// 无缓冲,同步ch1 := make(chan int)ch2 := make(chan int, 0)// 有缓冲,异步ch3 := make(chan int, 1)// 使用5个do协程同时处理输入数据func test() { inCh := generator(100) outCh := make(chan int, 10) for i := 0; i < 5; i++ { go do(inCh, outCh) } for r := range outCh { fmt.Println(r) }}func do(inCh <-chan int, outCh chan<- int) { for v := range inCh { outCh <- v * v }}6. 为操作加上超时场景:需要超时控制的操作原理:使用select和time.After,看操作和定时器哪个先返回,处理先完成的,就达到了超时控制的效果用法:func doWithTimeOut(timeout time.Duration) (int, error) { select { case ret := <-do(): return ret, nil case <-time.After(timeout): return 0, errors.New(“timeout”) }}func do() <-chan int { outCh := make(chan int) go func() { // do work }() return outCh}7. 使用time实现channel无阻塞读写场景:并不希望在channel的读写上浪费时间原理:是为操作加上超时的扩展,这里的操作是channel的读或写用法:func unBlockRead(ch chan int) (x int, err error) { select { case x = <-ch: return x, nil case <-time.After(time.Microsecond): return 0, errors.New(“read time out”) }}func unBlockWrite(ch chan int, x int) (err error) { select { case ch <- x: return nil case <-time.After(time.Microsecond): return errors.New(“read time out”) }}注:time.After等待可以替换为default,则是channel阻塞时,立即返回的效果8. 使用close(ch)关闭所有下游协程场景:退出时,显示通知所有协程退出原理:所有读ch的协程都会收到close(ch)的信号用法:func (h *Handler) Stop() { close(h.stopCh) // 可以使用WaitGroup等待所有协程退出}// 收到停止后,不再处理请求func (h *Handler) loop() error { for { select { case req := <-h.reqCh: go handle(req) case <-h.stopCh: return } }}9. 使用chan struct{}作为信号channel场景:使用channel传递信号,而不是传递数据时原理:没数据需要传递时,传递空struct用法:// 上例中的Handler.stopCh就是一个例子,stopCh并不需要传递任何数据// 只是要给所有协程发送退出的信号type Handler struct { stopCh chan struct{} reqCh chan *Request}10. 使用channel传递结构体的指针而非结构体场景:使用channel传递结构体数据时原理:channel本质上传递的是数据的拷贝,拷贝的数据越小传输效率越高,传递结构体指针,比传递结构体更高效用法:reqCh chan *Request// 好过reqCh chan Request你有哪些channel的奇淫巧技,说来看看?如果这篇文章对你有帮助,请点个赞/喜欢,感谢。本文作者:大彬如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/01/20/golang-channel-all-usage/ ...

January 21, 2019 · 2 min · jiezi

Golang并发模型:select进阶

最近公司工作有点多,Golang的select进阶就这样被拖沓啦,今天坚持把时间挤一挤,把吹的牛皮补上。前一篇文章《Golang并发模型:轻松入门select》介绍了select的作用和它的基本用法,这次介绍它的3个进阶特性。nil的通道永远阻塞如何跳出for-selectselect{}阻塞nil的通道永远阻塞当case上读一个通道时,如果这个通道是nil,则该case永远阻塞。这个功能有1个妙用,select通常处理的是多个通道,当某个读通道关闭了,但不想select再继续关注此case,继续处理其他case,把该通道设置为nil即可。下面是一个合并程序等待两个输入通道都关闭后才退出的例子,就使用了这个特性。func combine(inCh1, inCh2 <-chan int) <-chan int { // 输出通道 out := make(chan int) // 启动协程合并数据 go func() { defer close(out) for { select { case x, open := <-inCh1: if !open { inCh1 = nil continue } out<-x case x, open := <-inCh2: if !open { inCh2 = nil continue } out<-x } // 当ch1和ch2都关闭是才退出 if inCh1 == nil && inCh2 == nil { break } } }() return out}如何跳出for-selectbreak在select内的并不能跳出for-select循环。看下面的例子,consume函数从通道inCh不停读数据,期待在inCh关闭后退出for-select循环,但结果是永远没有退出。func consume(inCh <-chan int) { i := 0 for { fmt.Printf(“for: %d\n”, i) select { case x, open := <-inCh: if !open { break } fmt.Printf(“read: %d\n”, x) } i++ } fmt.Println(“combine-routine exit”)}运行结果:➜ go run x.gofor: 0read: 0for: 1read: 1for: 2read: 2for: 3gen exitfor: 4for: 5for: 6for: 7for: 8… // never stop既然break不能跳出for-select,那怎么办呢?给你3个锦囊:在满足条件的case内,使用return,如果有结尾工作,尝试交给defer。在select外for内使用break挑出循环,如combine函数。使用goto。select{}永远阻塞select{}的效果等价于创建了1个通道,直接从通道读数据:ch := make(chan int)<-ch但是,这个写起来多麻烦啊!没select{}简洁啊。但是,永远阻塞能有什么用呢!?当你开发一个并发程序的时候,main函数千万不能在子协程干完活前退出啊,不然所有的协程都被迫退出了,还怎么提供服务呢?比如,写了个Web服务程序,端口监听、后端处理等等都在子协程跑起来了,main函数这时候能退出吗?select应用场景最后,介绍下我常用的select场景:无阻塞的读、写通道。即使通道是带缓存的,也是存在阻塞的情况,使用select可以完美的解决阻塞读写,这篇文章我之前发在了个人博客,后面给大家介绍下。给某个请求/处理/操作,设置超时时间,一旦超时时间内无法完成,则停止处理。select本色:多通道处理并发系列文章推荐Golang并发模型:轻松入门流水线模型Golang并发模型:轻松入门流水线FAN模式Golang并发模型:并发协程的优雅退出Golang并发模型:轻松入门select如果这篇文章对你有帮助,请点个赞/喜欢,鼓励我持续分享,感谢。我的文章列表,点此可查看如果喜欢本文,随意转载,但请保留此原文链接。 ...

December 18, 2018 · 1 min · jiezi

Golang并发模型:轻松入门select

之前的文章都提到过,Golang的并发模型都来自生活,select也不例外。举个例子:我们都知道一句话,“吃饭睡觉打豆豆”,这一句话里包含了3件事:妈妈喊你吃饭,你去吃饭。时间到了,要睡觉。没事做,打豆豆。在Golang里,select就是干这个事的:到吃饭了去吃饭,该睡觉了就睡觉,没事干就打豆豆。结束发散,我们看下select的功能,以及它能做啥。select功能在多个通道上进行读或写操作,让函数可以处理多个事情,但1次只处理1个。以下特性也都必须熟记于心:每次执行select,都会只执行其中1个case或者执行default语句。当没有case或者default可以执行时,select则阻塞,等待直到有1个case可以执行。当有多个case可以执行时,则随机选择1个case执行。case后面跟的必须是读或者写通道的操作,否则编译出错。select长下面这个样子,由select和case组成,default不是必须的,如果没其他事可做,可以省略default。func main() { readCh := make(chan int, 1) writeCh := make(chan int, 1) y := 1 select { case x := <-readCh: fmt.Printf(“Read %d\n”, x) case writeCh <- y: fmt.Printf(“Write %d\n”, y) default: fmt.Println(“Do what you want”) }}我们创建了readCh和writeCh2个通道:readCh中没有数据,所以case x := <-readCh读不到数据,所以这个case不能执行。writeCh是带缓冲区的通道,它里面是空的,可以写入1个数据,所以case writeCh <- y可以执行。有case可以执行,所以default不会执行。这个测试的结果是$ go run example.goWrite 1用打豆豆实践select来,我们看看select怎么实现打豆豆:eat()函数会启动1个协程,该协程先睡几秒,事件不定,然后喊你吃饭,main()函数中的sleep是个定时器,每3秒喊你吃1次饭,select则处理3种情况:从eatCh中读到数据,代表有人喊我吃饭,我要吃饭了。从sleep.C中读到数据,代表闹钟时间到了,我要睡觉。default是,没人喊我吃饭,也不到时间睡觉,我就打豆豆。import ( “fmt” “time” “math/rand”)func eat() chan string { out := make(chan string) go func (){ rand.Seed(time.Now().UnixNano()) time.Sleep(time.Duration(rand.Intn(5)) * time.Second) out <- “Mom call you eating” close(out) }() return out}func main() { eatCh := eat() sleep := time.NewTimer(time.Second * 3) select { case s := <-eatCh: fmt.Println(s) case <- sleep.C: fmt.Println(“Time to sleep”) default: fmt.Println(“Beat DouDou”) }}由于前2个case都要等待一会,所以都不能执行,所以执行default,运行结果一直是打豆豆:$ go run x.goBeat DouDou现在我们不打豆豆了,你把default和下面的打印注释掉,多运行几次,有时候会吃饭,有时候会睡觉,比如这样:$ go run x.goMom call you eating$ go run x.goTime to sleep$ go run x.goTime to sleepselect很简单但功能很强大,它让golang的并发功能变的更强大。这篇文章写的啰嗦了点,重点是为下一篇文章做铺垫,下一篇我们将介绍下select的高级用法。select的应用场景很多,让我总结一下,放在下一篇文章中吧。并发系列文章推荐Golang并发模型:轻松入门流水线模型Golang并发模型:轻松入门流水线FAN模式Golang并发模型:并发协程的优雅退出Golang并发模型:轻松入门select如果这篇文章对你有帮助,请点个赞/喜欢,鼓励我持续分享,感谢。我的文章列表,点此可查看如果喜欢本文,随意转载,但请保留此原文链接。 ...

December 12, 2018 · 1 min · jiezi

一份针对于新手的多线程实践

前言前段时间在某个第三方平台看到我写作字数居然突破了 10W 字,难以想象高中 800 字作文我都得巧妙的利用换行来完成(懂的人肯定也干过????)。干了这行养成了一个习惯:能撸码验证的事情都自己验证一遍。于是在上周五通宵加班的空余时间写了一个工具:https://github.com/crossoverJie/NOWS利用 SpringBoot 只需要一行命令即可统计自己写了多少个字。java -jar nows-0.0.1-SNAPSHOT.jar /xx/Hexo/source/_posts传入需要扫描的文章目录即可输出结果(目前只支持 .md 结尾 Markdown 文件)当然结果看个乐就行(40 几万字),因为早期的博客我喜欢大篇的贴代码,还有一些英文单词也没有过滤,所以导致结果相差较大。如果仅仅只是中文文字统计肯定是准的,并且该工具内置灵活的扩展方式,使用者可以自定义统计策略,具体请看后文。其实这个工具挺简单的,代码量也少,没有多少可以值得拿出来讲的。但经过我回忆不管是面试还是和网友们交流都发现一个普遍的现象:大部分新手开发都会去看多线程、但几乎都没有相关的实践。甚至有些都不知道多线程拿来在实际开发中有什么用。为此我想基于这个简单的工具为这类朋友带来一个可实践、易理解的多线程案例。至少可以让你知道:为什么需要多线程?怎么实现一个多线程程序?多线程带来的问题及解决方案?单线程统计再谈多线程之前先来聊聊单线程如何实现。本次的需求也很简单,只是需要扫描一个目录读取下面的所有文件即可。所有我们的实现有以下几步:读取某个目录下的所有文件。将所有文件的路径保持到内存。遍历所有的文件挨个读取文本记录字数即可。先来看前两个如何实现,并且当扫描到目录时需要继续读取当前目录下的文件。这样的场景就非常适合递归: public List<String> getAllFile(String path){ File f = new File(path) ; File[] files = f.listFiles(); for (File file : files) { if (file.isDirectory()){ String directoryPath = file.getPath(); getAllFile(directoryPath); }else { String filePath = file.getPath(); if (!filePath.endsWith(".md")){ continue; } allFile.add(filePath) ; } } return allFile ; }}读取之后将文件的路径保持到一个集合中。需要注意的是这个递归次数需要控制下,避免出现栈溢出(StackOverflow)。最后读取文件内容则是使用 Java8 中的流来进行读取,这样代码可以更简洁:Stream<String> stringStream = Files.lines(Paths.get(path), StandardCharsets.UTF_8);List<String> collect = stringStream.collect(Collectors.toList());接下来便是读取字数,同时要过滤一些特殊文本(比如我想过滤掉所有的空格、换行、超链接等)。扩展能力简单处理可在上面的代码中遍历 collect 然后把其中需要过滤的内容替换为空就行。但每个人的想法可能都不一样。比如我只想过滤掉空格、换行、超链接就行了,但有些人需要去掉其中所有的英文单词,甚至换行还得留着(就像写作文一样可以充字数)。所有这就需要一个比较灵活的处理方式。看过上文《利用责任链模式设计一个拦截器》应该很容易想到这样的场景责任链模式再合适不过了。关于责任链模式具体的内容就不在详述了,感兴趣的可以查看上文。这里直接看实现吧:定义责任链的抽象接口及处理方法:public interface FilterProcess { /** * 处理文本 * @param msg * @return / String process(String msg) ;}处理空格和换行的实现:public class WrapFilterProcess implements FilterProcess{ @Override public String process(String msg) { msg = msg.replaceAll("\s", “”); return msg ; }}处理超链接的实现:public class HttpFilterProcess implements FilterProcess{ @Override public String process(String msg) { msg = msg.replaceAll("^((https|http|ftp|rtsp|mms)?:\/\/)[^\s]+",""); return msg ; }}这样在初始化时需要将这些处理 handle 都加入责任链中,同时提供一个 API 供客户端执行即可。这样一个简单的统计字数的工具就完成了。多线程模式在我本地一共就几十篇博客的条件下执行一次还是很快的,但如果我们的文件是几万、几十万甚至上百万呢。虽然功能可以实现,但可以想象这样的耗时绝对是成倍的增加。这时多线程就发挥优势了,由多个线程分别去读取文件最后汇总结果即可。这样实现的过程就变为:读取某个目录下的所有文件。将文件路径交由不同的线程自行处理。最终汇总结果。多线程带来的问题也不是使用多线程就万事大吉了,先来看看第一个问题:共享资源。简单来说就是怎么保证多线程和单线程统计的总字数是一致的。基于我本地的环境先看看单线程运行的结果:总计为:414142 字。接下来换为多线程的方式:List<String> allFile = scannerFile.getAllFile(strings[0]);logger.info(“allFile size=[{}]",allFile.size());for (String msg : allFile) { executorService.execute(new ScanNumTask(msg,filterProcessManager));}public class ScanNumTask implements Runnable { private static Logger logger = LoggerFactory.getLogger(ScanNumTask.class); private String path; private FilterProcessManager filterProcessManager; public ScanNumTask(String path, FilterProcessManager filterProcessManager) { this.path = path; this.filterProcessManager = filterProcessManager; } @Override public void run() { Stream<String> stringStream = null; try { stringStream = Files.lines(Paths.get(path), StandardCharsets.UTF_8); } catch (Exception e) { logger.error(“IOException”, e); } List<String> collect = stringStream.collect(Collectors.toList()); for (String msg : collect) { filterProcessManager.process(msg); } }}使用线程池管理线程,更多线程池相关的内容请看这里:《如何优雅的使用和理解线程池》执行结果:我们会发现无论执行多少次,这个值都会小于我们的预期值。来看看统计那里是怎么实现的。@Componentpublic class TotalWords { private long sum = 0 ; public void sum(int count){ sum += count; } public long total(){ return sum; }}可以看到就是对一个基本类型进行累加而已。那导致这个值比预期小的原因是什么呢?我想大部分人都会说:多线程运行时会导致有些线程把其他线程运算的值覆盖。但其实这只是导致这个问题的表象,根本原因还是没有讲清楚。内存可见性核心原因其实是由 Java 内存模型(JMM)的规定导致的。这里引用一段之前写的《你应该知道的 volatile 关键字》一段解释:由于 Java 内存模型(JMM)规定,所有的变量都存放在主内存中,而每个线程都有着自己的工作内存(高速缓存)。线程在工作时,需要将主内存中的数据拷贝到工作内存中。这样对数据的任何操作都是基于工作内存(效率提高),并且不能直接操作主内存以及其他线程工作内存中的数据,之后再将更新之后的数据刷新到主内存中。这里所提到的主内存可以简单认为是堆内存,而工作内存则可以认为是栈内存。如下图所示:所以在并发运行时可能会出现线程 B 所读取到的数据是线程 A 更新之前的数据。更多相关内容就不再展开了,感兴趣的朋友可以翻翻以前的博文。直接来说如何解决这个问题吧,JDK 其实已经帮我们想到了这些问题。在 java.util.concurrent 并发包下有许多你可能会使用到的并发工具。这里就非常适合 AtomicLong,它可以原子性的对数据进行修改。来看看修改后的实现:@Componentpublic class TotalWords { private AtomicLong sum = new AtomicLong() ; public void sum(int count){ sum.addAndGet(count) ; } public long total(){ return sum.get() ; }}只是使用了它的两个 API 而已。再来运行下程序会发现结果居然还是不对。甚至为 0 了。线程间通信这时又出现了一个新的问题,来看看获取总计数据是怎么实现的。List<String> allFile = scannerFile.getAllFile(strings[0]);logger.info(“allFile size=[{}]",allFile.size());for (String msg : allFile) { executorService.execute(new ScanNumTask(msg,filterProcessManager));}executorService.shutdown();long total = totalWords.total();long end = System.currentTimeMillis();logger.info(“total sum=[{}],[{}] ms”,total,end-start);不知道大家看出问题没有,其实是在最后打印总数时并不知道其他线程是否已经执行完毕了。因为 executorService.execute() 会直接返回,所以当打印获取数据时还没有一个线程执行完毕,也就导致了这样的结果。关于线程间通信之前我也写过相关的内容:《深入理解线程通信》大概的方式有以下几种:这里我们使用线程池的方式:在停用线程池后加上一个判断条件即可:executorService.shutdown();while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) { logger.info(“worker running”);}long total = totalWords.total();long end = System.currentTimeMillis();logger.info(“total sum=[{}],[{}] ms”,total,end-start);这样我们再次尝试,发现无论多少次结果都是正确的了:效率提升可能还会有朋友问,这样的方式也没见提升多少效率啊。这其实是由于我本地文件少,加上一个文件处理的耗时也比较短导致的。甚至线程数开的够多导致频繁的上下文切换还是让执行效率降低。为了模拟效率的提升,每处理一个文件我都让当前线程休眠 100 毫秒来模拟执行耗时。先看单线程运行需要耗时多久。总共耗时:[8404] ms接着在线程池大小为 4 的情况下耗时:总共耗时:[2350] ms可见效率提升还是非常明显的。更多思考这只是多线程其中的一个用法,相信看到这里的朋友应该多它的理解更进一步了。再给大家留个阅后练习,场景也是类似的:在 Redis 或者其他存储介质中存放有上千万的手机号码数据,每个号码都是唯一的,需要在最快的时间内把这些号码全部都遍历一遍。有想法感兴趣的朋友欢迎在文末留言参与讨论????????。总结希望看完的朋友心中能对文初的几个问题能有自己的答案:为什么需要多线程?怎么实现一个多线程程序?多线程带来的问题及解决方案?文中的代码都在此处。https://github.com/crossoverJie/NOWS你的点赞与转发是最大的支持。 ...

October 29, 2018 · 2 min · jiezi