如何线程安全地遍历List

遍历List的多种方式在讲如何线程安全地遍历 List 之前,先看看遍历一个 List 通常会采用哪些方式。 方式一:for(int i = 0; i < list.size(); i++) { System.out.println(list.get(i));}方式二:Iterator iterator = list.iterator();while(iterator.hasNext()) { System.out.println(iterator.next());}方式三:for(Object item : list) { System.out.println(item);}方式四(Java 8):list.forEach(new Consumer<Object>() { @Override public void accept(Object item) { System.out.println(item); }});方式五(Java 8 Lambda):list.forEach(item -> { System.out.println(item);});方式一的遍历方法对于 RandomAccess 接口的实现类(例如 ArrayList)来说是一种性能很好的遍历方式。但是对于 LinkedList 这样的基于链表实现的 List,通过 list.get(i) 获取元素的性能差。 方式二和方式三两种方式的本质是一样的,都是通过 Iterator 迭代器来实现的遍历,方式三是增强版的 for 循环,可以看作是方式二的简化形式。 方式四和方式五本质也是一样的,都是使用Java 8新增的 forEach 方法来遍历。方式五是方式四的一种简化形式,使用了Lambda表达式。 遍历List的同时操作List会发生什么?先用非线程安全的 ArrayList 做个试验,用一个线程通过增强的 for 循环遍历 List,遍历的同时另一个线程删除 List 中的一个元素,代码如下: ...

November 3, 2019 · 3 min · jiezi

Java设计模式优化单例模式

单例模式概述单例模式是一种对象创建模式,用于产生一个类的具体事例。使用单例模式可以确保整个系统中单例类只产生一个实例。有下面两大好处: 对于频繁创建的对象,节省初第一次实例化之后的创建时间。由于new操作的减少,会降低系统内存的使用频率。减轻GC压力,从而缩短GC停顿时间创建方式: 单例作为类的私有private属性单例类拥有私有private构造函数提供获取实例的public方法单例模式的角色: 角色作用单例类提供单例的工厂,返回类的单例实例使用者获取并使用单例类类基本结构: 单例模式的实现1.饿汉式public class HungerSingleton { //1.饿汉式 //私有构造器 private HungerSingleton() { System.out.println("create HungerSingleton"); } //私有单例属性 private static HungerSingleton instance = new HungerSingleton(); //获取单例的方法 public static HungerSingleton getInstance() { return instance; }}注意: 单例修饰符为static JVM加载单例类加载时,直接初始化单例。无法延时加载。如果此单例一直未被使用,单Singleton 因为调用静态方法被初始化则会造成内存的浪费。getInstance()使用static修饰,不用实例化可以直接使用Singleton.getInstance()获取单例。由于单例由JVM加载类的时候创建,所以不存在线程安全问题。2.简单懒汉式public class Singleton { //2.1简单懒汉式(线程不安全) //私有构造器 private Singleton() { System.out.println("create Singleton"); } //私有单例属性[初始化为null] private static Singleton instance = null; //获取单例的方法 public static Singleton getInstance() { if(instance == null) { //此处instance实例化 //首次调用单例时会进入 达成延时加载 instance = new Singleton(); } return instance; }}由于未使用 synchronized 关键字,所以当线程1调用单例工厂方法Singleton.getInstance() 且 instance 未初始化完成时,线程2调用此方法会将instance判断为null,也会将instance重新实例化赋值,此时则产生了多个实例!如需线程安全可以直接给getInstance方法上加synchronized关键字,如下:public class Singleton { //2.2简单懒汉式(线程安全) //私有构造器 private Singleton() { System.out.println("create Singleton"); } //私有单例属性[初始化为null] private static Singleton instance = null; //获取单例的方法 将此方法使用synchronized关键字同步 public static synchronized Singleton getInstance() { if(instance == null) { //此处instance实例化 //首次调用单例时会进入 达成延时加载 instance = new Singleton(); } return instance; }}面临的问题: ...

August 7, 2019 · 2 min · jiezi

从入门到放弃Java并发编程线程安全

概述并发编程,即多条线程在同一时间段内“同时”运行。 在多处理器系统已经普及的今天,多线程能发挥出其优势,如:一个8核cpu的服务器,如果只使用单线程的话,将有7个处理器被闲置,只能发挥出服务器八分之一的能力(忽略其它资源占用情况)。同时,使用多线程,可以简化我们对复杂任务的处理逻辑,降低业务模型的复杂程度。 因此并发编程对于提高服务器的资源利用率、提高系统吞吐量、降低编码难度等方面起着至关重要的作用。 以上是并发编程的优点,但是它同样引入了一个很重要的问题:线程安全。 什么是线程安全问题线程在并发执行时,因为cpu的调度等原因,线程会交替执行。如下图例子所示 public class SelfIncremental { private static int count; public static void main(String[] args) { Thread thread1 = new Thread(() -> { for (int i = 0; i< 10000; i++) { count++; System.out.println(count); } }); Thread thread2 = new Thread(() -> { for (int i = 0; i< 10000; i++) { count++; System.out.println(count); } }); thread1.start(); thread2.start(); }}执行完毕后count的值并不是每次都能等于20000,会出现小于20000的情况,原因是thread1和thread2可能会交替执行。 如图所示: t1时刻: thread1 读取到count=100t2时刻: thread2 读取到count=100t3时刻: thread1 对count+1t4时刻: thread2 对count+1t5时刻: thread1 将101写入countt5时刻: thread2 将101写入count因为count++ 不是一个原子操作,实际上会执行三步: ...

July 15, 2019 · 1 min · jiezi

Java并发23并发设计模式-两阶段终止模式优雅地终止线程

前面我们都是在讲如何创建线程,接下来我们说下如何终止线程。 java的线程小节中,我曾讲过:线程执行完或者出现异常就会进入终止状态。这样看,终止一个线程看上去很简单啊!一个线程执行完自己的任务,自己进入终止状态,这的确很简单。不过我们今天谈到的“优雅地终止线程”,不是自己终止自己,而是在一个线程 T1 中,终止线程 T2;这里所谓的“优雅”,指的是给 T2 一个机会料理后事,而不是被直接终止。 Java 语言的 Thread 类中曾经提供了一个 stop() 方法,用来终止线程,可是早已不建议使用了,原因是这个方法用是直接终止的线程,线程并没有机会料理后事。 如何理解两阶段终止模式前辈们经过认真对比分析,已经总结出了一套成熟的方案,叫做两阶段终止模式。顾名思义,就是将终止过程分成两个阶段,其中第一个阶段主要是线程 T1 向线程 T2发送终止指令,而第二阶段则是线程 T2响应终止指令 两阶段终止模式示意图### 那在 Java 语言里,终止指令是什么呢?这个要从 Java 线程的状态转换过程说起。我们在 java的线程小节中曾经提到过 Java 线程的状态转换图。 从这个图里你会发现,Java 线程进入终止状态的前提是线程进入 RUNNABLE 状态,而实际上线程也可能处在休眠状态,也就是说,我们要想终止一个线程,首先要把线程的状态从休眠状态转换到 RUNNABLE 状态。如何做到呢?这个要靠 Java Thread 类提供的interrupt() 方法,它可以将休眠状态的线程转换到 RUNNABLE 状态。 线程转换到 RUNNABLE 状态之后,我们如何再将其终止呢?RUNNABLE 状态转换到终止状态,优雅的方式是让 Java 线程自己执行完 run() 方法,所以一般我们采用的方法是设置一个标志位,然后线程会在合适的时机检查这个标志位,如果发现符合终止条件,则自动退出 run() 方法。这个过程其实就是我们前面提到的第二阶段:响应终止指令 综合上面这两点,我们能总结出终止指令,其实包括两方面内容:interrupt() 方法和线程终止的标志位。 用两阶段终止模式终止监控操作实际工作中,有些监控系统需要动态地采集一些数据,一般都是监控系统发送采集指令给被监控系统的监控代理,监控代理接收到指令之后,从监控目标收集数据,然后回传给监控系统,详细过程如下图所示。出于对性能的考虑(有些监控项对系统性能影响很大,所以不能一直持续监控),动态采集功能一般都会有终止操作。 动态采集功能示意图### 下面的示例代码是监控代理简化之后的实现,start() 方法会启动一个新的线程 rptThread 来执行监控数据采集和回传的功能,stop() 方法需要优雅地终止线程 rptThread,那 stop() 相关功能该如何实现呢? class Proxy { boolean started = false; // 采集线程 Thread rptThread; // 启动采集功能 synchronized void start(){ // 不允许同时启动多个采集线程 if (started) { return; } started = true; rptThread = new Thread(()->{ while (true) { // 省略采集、回传实现 report(); // 每隔两秒钟采集、回传一次数据 try { Thread.sleep(2000); } catch (InterruptedException e) { } } // 执行到此处说明线程马上终止 started = false; }); rptThread.start(); } // 终止采集功能 synchronized void stop(){ // 如何实现? }} 按照两阶段终止模式,我们首先需要做的就是将线程 rptThread 状态转换到 RUNNABLE,做法很简单,只需要在调用 rptThread.interrupt() 就可以了。线程 rptThread 的状态转换到 RUNNABLE 之后,如何优雅地终止呢?下面的示例代码中,我们选择的标志位是线程的中断状态:Thread.currentThread().isInterrupted() ,需要注意的是,我们在捕获 Thread.sleep() 的中断异常之后,通过 Thread.currentThread().interrupt() 重新设置了线程的中断状态,因为 JVM 的异常处理会清除线程的中断状态。 ...

July 15, 2019 · 2 min · jiezi

操作的原子性与线程安全

本案例来源于java zone社区,由于源代码里面存在一些自己开发的注解,我暂时没找到相关的文档,所以我做了一些修改。用的都是java SDK的API。关于概念: 原子性:即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。线程安全:就是多线程访问时,采用了加锁机制,当一个线程访问该类的某个数据时,进行保护,其他线程不能进行访问直到该线程读取完,其他线程才可使用。不会出现数据不一致或者数据污染。线程不安全:就是不提供数据访问保护,有可能出现多个线程先后更改数据造成所得到的数据是脏数据进入正题,如果可以从多个线程调用所有方法而没有外部同步,则类是线程安全的。为了实现这一点,线程安全方法必须是原子的,例如,其他线程只能看到方法之前或之后调用之间的状态。以下示例说明了为什么线程安全方法必须是原子的: public class TR extends FanLibrary { private volatile int i = 0; public void ss() { sleep(100);//为了更容易出现效果 i++; } @Before public void be() { output("before"); } @Test public void sdfa() throws InterruptedException { Thread first = new Thread(() -> { ss(); }); Thread second = new Thread(() -> { ss(); }); first.start(); second.start(); first.join(); second.join(); output(i); } @After public void ds() { output("after"); }}控制台输出,以下内容可能会出现,代码中sleep(100)的原因: ...

July 13, 2019 · 1 min · jiezi

Java并发19并发设计模式-ThreadLocal-线程本地存储模式

我们曾经重复说到,多个线程同时读写同一共享变量存在并发问题。前面两篇文章我们突破的是写,没有写操作自然没有并发问题了。其实还可以突破共享变量,没有共享变量也不会有并发问题。 那如何避免共享呢?思路其实很简单,并发编程领域,就是每个线程都拥有自己的变量,彼此之间不共享,也就没有并发问题了。 我们知道局部变量可以做到避免共享, 即线程封闭,其本质上就是避免共享。那还有没有其他方法可以做到呢?有的Java 语言提供的线程本地存储(ThreadLocal)就能够做到 ThreadLocal 的使用方法下面这个静态类 ThreadId 会为每个线程分配一个唯一的线程 Id,如果一个线程前后两次调用 ThreadId 的 get() 方法,两次 get() 方法的返回值是相同的。但如果是两个线程分别调用 ThreadId 的 get() 方法,那么两个线程看到的 get() 方法的返回值是不同的。若你是初次接触 ThreadLocal,可能会觉得奇怪,为什么相同线程调用 get() 方法结果就相同,而不同线程调用 get() 方法结果就不同呢? static class ThreadId { static final AtomicLong nextId=new AtomicLong(0); // 定义 ThreadLocal 变量 static final ThreadLocal<Long> tl=ThreadLocal.withInitial( ()->nextId.getAndIncrement()); // 此方法会为每个线程分配一个唯一的 Id static long get(){ return tl.get(); }}在详细解释 ThreadLocal 的工作原理之前,我们再看一个实际工作中可能遇到的例子来加深一下对 ThreadLocal 的理解。你可能知道 SimpleDateFormat 不是线程安全的,那如果需要在并发场景下使用它。 其实有一个办法就是用 ThreadLocal 来解决,下面的示例代码就是 ThreadLocal 解决方案的具体实现,这段代码与前面 ThreadId 的代码高度相似,同样地,不同线程调用 SafeDateFormat 的 get() 方法将返回不同的 SimpleDateFormat 对象实例,由于不同线程并不共享 SimpleDateFormat,所以就像局部变量一样,是线程安全的。 ...

July 9, 2019 · 2 min · jiezi

如何编写快速且线程安全的Python代码

概述如今我也是使用Python写代码好多年了,但是我却很少关心GIL的内部机制,导致在写Python多线程程序的时候。今天我们就来看看CPython的源代码,探索一下GIL的源码,了解为什么Python里要存在这个GIL,过程中我会给出一些示例来帮助大家更好的理解GIL。 GIL概览有如下代码: static PyThread_type_lock interpreter_lock = 0; /* This is the GIL */这行代码位于Python2.7源码ceval.c文件里。在类Unix操作系统中,PyThread_type_lock对应C语言里的mutex_t类型。在Python解释器开始运行时初始化这个变量 voidPyEval_InitThreads(void){ interpreter_lock = PyThread_allocate_lock(); PyThread_acquire_lock(interpreter_lock);}所有Python解释器里执行的c代码都必须获取这个锁,作者一开始为求简单,所以使用这种单线程的方式,后来每次想移除时,都发现代价太高了。 GIL对程序中的线程的影响很简单,你可以在手背上写下这个原则:“一个线程运行Python,而另外一个线程正在等待I / O.”Python代码可以使用threading.Lock或者其他同步对象,来释放CPU占用,让其他程序得以执行。 什么时候线程切换? 每当线程开始休眠或等待网络I / O时,另一个线程都有机会获取GIL并执行Python代码。CPython还具有抢先式多任务处理:如果一个线程在Python 2中不间断地运行1000个字节码指令,或者在Python 3中运行15毫秒,那么它就会放弃GIL而另一个线程可能会运行。 协作式多任务每当运行一个任务,比如网络I/O,持续的时间很长或者无法确定运行时间,这时可以放弃GIL,这样另一个线程就可以接受并运行Python。 这种行为称为协同多任务,它允许并发; 许多线程可以同时等待不同的事件。假设有两个链接socket的线程 def do_connect(): s = socket.socket() s.connect(('python.org', 80)) # drop the GILfor i in range(2): t = threading.Thread(target=do_connect) t.start()这两个线程中一次只有一个可以执行Python,但是一旦线程开始连接,它就会丢弃GIL,以便其他线程可以运行。这意味着两个线程都可以等待它们的套接字同时连接,他们可以在相同的时间内完成更多的工作。接下来,让我们打开Python的源码,来看看内部是如何实现的(位于socketmodule.c文件里): static PyObject *sock_connect(PySocketSockObject *s, PyObject *addro){ sock_addr_t addrbuf; int addrlen; int res; /* convert (host, port) tuple to C address */ getsockaddrarg(s, addro, SAS2SA(&addrbuf), &addrlen); Py_BEGIN_ALLOW_THREADS res = connect(s->sock_fd, addr, addrlen); Py_END_ALLOW_THREADS /* error handling and so on .... */}Py_BEGIN_ALLOW_THREADS宏指令用于释放GIL,他的定义很简单: ...

May 22, 2019 · 2 min · jiezi

ArrayList-线程安全性学习

引言最近学校的氛围比较活跃,考研的复习,不考研的都在写简历准备面试。 看了看,最近也没有好公司来办宣讲会,也就没了投简历的意向。最近看了看面试题,想着补一补基础,以后面几家Spring Cloud的企业,去和面试官交流交流。 Spring Cloud的学习与体会 最近看了《Spring Cloud微服务实战》一书,感觉受益匪浅,大有裨益。 高并发应用,必须是要启用Spring Cloud的。有了Spring Cloud,就不用再像之前一样,前端工程师团队,后端工程师团队,运维团队。而是按模块划分,订单模块团队,支付模块团队,每个团队里都是从前端到后台到运维的全栈工程师。 就像上次黄庭祥说的,ThinkPHP开发,他写学期管理;AngularJS开发,他又写学期管理;Angular开发,他还写学期管理。想到什么了么?肯定精通这个模块的业务逻辑啊? 如果培养出优秀的支付模块团队、优秀的安全模块团队、优秀的高并发优化团队,其实淘宝也不过如此。 相互的依赖,从原来的@Autowired转为服务器接口间的调用。每个模块都是一个Spring Cloud应用,各应用间通过互相调用、相互协作共同实现业务功能,同时,各应用模块可以采用不同的数据库,以发挥各数据库之所长。 然后后台分布式部署,到了并发的时候,给相应的模块加服务器负载均衡就是了。个人中心模块,不常用,两个服务器负载;订单模块,可能会并发,加个百十来个服务器负载均衡。当然,像618、双十一这样的场景,肯定不是加服务器就能解决的,我这里只是举个简单的例子。模块划分之后,可以有针对性地解决高并发问题。 不扯淡了,开始进入正题。 面试题再谈线程安全什么是线程安全? 我看到这道题就感觉怎么也说不出来,就是多线程的环境下运行,我这个应用也不炸,虽然是这个意思,但是也不能这样回答啊?一时之间,找不到相关的学术词汇回答此问题。 这是想了许久后,我自己总结出的回答: 程序在单线程环境下正常执行得到了正确的结果,在多个线程并发执行的环境条件下,仍然能得到像单线程一样正确的结果,这就是线程安全。 如果一个类(或对象),我们在使用时,无需考虑任何多线程相关的问题,就像单线程一样使用,且最后能得到正确的结果,那就说这个类(或对象)是线程安全的。 ArrayList线程安全吗?看了许多面试题,发现面试官都喜欢以一个小方面进行切入,然后无限扩展,直到把面试者问懵圈为止。 ArrayList线程安全吗? 虽然天天用ArrayList,但是真的没考虑过这个问题。其实,ArrayList线程不安全。 ArrayList是一个内部采用数组实现的线性表,它相比数组最大的优点就是使用时可以不用去像数组一样new的时候去考虑要容纳多少个元素。ArrayList默认构造一个容量为10的数组。 private static final int DEFAULT_CAPACITY = 10;如果容量不够了,ArrayList会自动扩容,扩容至原来的1.5倍。(右移一位,相当于除以2)。 int newCapacity = oldCapacity + (oldCapacity >> 1);ArrayList没有对多线程问题进行处理,举个add方法的例子就能证明它线程不安全。 elementData[size++] = e;别看这是一行,其实是执行了两步操作,赋值和自增。 线程A add一个元素,然后暂停执行,size还没自增,然后线程B再add元素,size没变,就直接把A add的元素覆盖了。 不安全为什么要使用?又回到了之前向晨澍请教的问题,线程安全,必然是有额外开销的。 所以List的三个接口ArrayList、LinkedList和Vector。 线程不安全的要比线程安全的执行效率高。所以我们常用的是线程不安全的ArrayList、LinkedList,而从来没有用过线程安全的Vector。 Vector自JDK1.0就存在,设计得不够完善,多线程情况下如果使用不当也会发生错误,不推荐使用。 如何解决线程不安全既然Vector不能用,那我就想要一个线程安全的List得怎么整呢? 调用Collections.synchronizedList方法,使ArrayList线程安全。 List<String> synchronizedList = Collections.synchronizedList(new ArrayList<>());返回SynchronizedList类的对象,经典的装饰器模式,对方法访问加了同步。 public void add(int index, E element) { synchronized (mutex) {list.add(index, element);}}public E remove(int index) { synchronized (mutex) {return list.remove(index);}}总结何处望神州?满眼风光北固楼。千古兴亡多少事?悠悠。不尽长江滚滚流。年少万兜鍪,坐断东南战未休。天下英雄谁敌手?曹刘。生子当如孙仲谋。 ...

May 11, 2019 · 1 min · jiezi

多线程、锁和线程同步方案

多线程多线程技术大家都很了解,而且在项目中也比较常用。比如开启一个子线程来处理一些耗时的计算,然后返回主线程刷新UI等。首先我们先简单的梳理一下常用到的多线程方案。具体的用法这里我就不说了,每一种方案大家可以去查一下,网上教程很多。常见的多线程方案我们比较常用的是GCD和NSOperation,当然还有NSThread,pthread。他们的具体区别我们不详细说,给出下面这一个表格,大家自行对比一下。容易混淆的术语提到多线程,有一个术语是经常能听到的,同步,异步,串行,并发。同步和异步的区别,就是是否有开启新的线程的能力。异步具备开启线程的能力,同步不具备开启线程的能力。注意,异步只是具备开始新线程的能力,具体开启与否还要跟队列的属性有关系。串行和并发,是指的任务的执行方式。并发是任务可以多个同时执行,串行之能是一个执行完成后在执行下一个。在面试的过程中可能被问到什么网情况下会出现死锁的问题,总结一下就是使用sync函数(同步)往当前的串行对列中添加任务时,会出现死锁。锁多线程的安全隐患多线程和安全问题是分不开的,因为在使用多个线程访问同一块数据的时候,如果同时有读写操作,就可能产生数据安全问题。所以这时候我们就用到了锁这个东西。其实使用锁也是为了在使用多线程的过程中保障数据安全,除了锁,然后一些其他的实现线程同步来保证数据安全的方案,我们一起来了解一下。线程同步方案下面这些是我们常用来实现线程同步方案的。OSSpinLockos_unfair_lockpthread_mutexNSLockNSRecursiveLockNSConditionNSConditinLockdispatch_semaphoredispatch_queue(DISPATCH_QUEUE_SERIAL)@synchronized可以看出来,实现线程同步的方案包括各种锁,还有信号量,串行队列。我们只挑其中不常用的来说一下使用方法。下面是我们模拟了存钱取钱的场景,下面是加锁之前的代码,运行之后肯定是有数据问题的。/** 存钱、取钱演示 /- (void)moneyTest { self.money = 100; dispatch_queue_t queue = dispatch_get_global_queue(0, 0); dispatch_async(queue, ^{ for (int i = 0; i < 10; i++) { [self __saveMoney]; } }); dispatch_async(queue, ^{ for (int i = 0; i < 10; i++) { [self __drawMoney]; } });}/* 存钱 /- (void)__saveMoney { int oldMoney = self.money; sleep(.2); oldMoney += 50; self.money = oldMoney; NSLog(@“存50,还剩%d元 - %@”, oldMoney, [NSThread currentThread]); }/* 取钱 /- (void)__drawMoney { int oldMoney = self.money; sleep(.2); oldMoney -= 20; self.money = oldMoney; NSLog(@“取20,还剩%d元 - %@”, oldMoney, [NSThread currentThread]); }加锁的代码,涉及到锁的初始化、加锁、解锁这么三部分。我们从OSSpinLock开始说。OSSpinLock自旋锁OSSpinLock叫做自旋锁。那什么叫自旋锁呢?其实我们可以从大类上面把锁分为两类,一类是自旋锁,一类是互斥锁。我们通过一个例子来区分这两类锁。如果线程A率先到达加锁的部分,并成功加锁,线程B到达的时候会因为已经被A加锁而等待。如果是自旋锁,线程B会通过执行一个循环来实现等待,我们不用管它循环执行了什么,只要知道他在那"转圈圈"等着就行。如果是互斥锁,那线程B在等待的时候会休眠。使用OSSpinLock需要导入头文件#import <libkern/OSAtomic.h>//声明一个锁@property (nonatomic, assign) OSSpinLock lock;// 锁的初始化self.lock = OS_SPINLOCK_INIT;在我们这个例子中,存钱取钱都是访问了money,所以我们要在存和取的操作中使用同一个锁。/* 存钱 /- (void)__saveMoney { OSSpinLockLock(&_lock); //….省去中间的逻辑代码 OSSpinLockUnlock(&_lock);}/* 取钱 */- (void)__drawMoney { OSSpinLockLock(&_lock); //….省去中间的逻辑代码 OSSpinLockUnlock(&_lock);}这就是简单的自旋锁的使用,我们发现在使用的过程中,Xcode一直提醒我们这个OSSpinLock被废弃了,让我们使用os_unfair_lock代替。OSSpinLock之所以会被废弃是因为它可能会产生一个优先级反转的问题。具体来说,如果一个低优先级的线程获得了锁并访问共享资源,那高优先级的线程只能忙等,从而占用大量的CPU。低优先级的线程无法和高优先级的线程竞争(CPU会给高优先级的线程分配更多的时间片),所以会导致低优先级的线程的任务一直完不成,从而无法释放锁。os_unfair_lock的用法跟OSSpinLock很像,就不单独说了。pthread_mutexDefault一看到这个pthread我们应该就能知道这是一种跨平台的方案了。首先还是来看用法。//声明一个锁@property (nonatomic, assign) pthread_mutex_t lock;//初始化pthread_mutex_init(pthread_mutex_t *restrict _Nonnull, const pthread_mutexattr_t *restrict _Nullable)我们可以看到在初始化锁的时候,第一个参数是锁的地址,第二个参数是一个pthread_mutexattr_t类型的地址,如果我们不传pthread_mutexattr_t,直接传一个NULL,相当于创建一个默认的互斥锁。//方式一pthread_mutex_init(mutex, NULL);//方式二// - 创建attrpthread_mutexattr_t attr;// - 初始化attrpthread_mutexattr_init(&attr);// - 设置attr类型pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_DEFAULT);// - 使用attr初始化锁pthread_mutex_init(&_lock, &attr);// - 销毁attrpthread_mutexattr_destroy(&attr);上面两个方式是一个效果,那为什么使用attr,那就说明除了default类型的还有其他类型,我们后面再说。在使用的时候用pthread_mutex_lock(&_lock); 和 pthread_mutex_unlock(&_lock);加锁解锁。NSLock就是对这种普通互斥锁的OC层面的封装。RECURSIVE 递归锁调用pthread_mutexattr_settype的时候如果类型传入PTHREAD_MUTEX_RECURSIVE,会创建一个递归锁。举个例子吧。// 伪代码-(void)test { lock; [self test]; unlock;}如果是普通的锁,当我们在test方法中,递归调用test,应该会出现死锁,因为被lock,在递归调用时无法调用,一直等待。但是如果锁是递归锁,他会允许同一个线程多次加锁和解锁,就可以解决这个问题了。NSRecursiveLock是对递归锁的封装。Condition 条件锁我们直接上这种锁的使用方法,- (void)otherTest{ [[[NSThread alloc] initWithTarget:self selector:@selector(__remove) object:nil] start]; [[[NSThread alloc] initWithTarget:self selector:@selector(__add) object:nil] start];}// 线程1// 删除数组中的元素- (void)__remove { pthread_mutex_lock(&_mutex); NSLog(@"__remove - begin"); if (self.data.count == 0) { // 等待 pthread_cond_wait(&_cond, &_mutex); } [self.data removeLastObject]; NSLog(@“删除了元素”); pthread_mutex_unlock(&_mutex);}// 线程2// 往数组中添加元素- (void)__add { pthread_mutex_lock(&_mutex); sleep(1); [self.data addObject:@“Test”]; NSLog(@“添加了元素”); // 信号 pthread_cond_signal(&_cond); // 广播// pthread_cond_broadcast(&_cond); pthread_mutex_unlock(&_mutex);}我们创建了两个线程,一个往数组中添加数据,一个删除数据,我们通过这个条件锁实现的效果就是在数组中还没有数据的时候等待,数组中添加了一个数据之后在进行删除。条件锁就是互斥锁+条件。我们声明一个条件并初始化。@property (assign, nonatomic) pthread_cond_t cond;//使用完后也要pthread_cond_destroy(&_cond);pthread_cond_init(&_cond, NULL);在__remove方法中if (self.data.count == 0) { // 等待 pthread_cond_wait(&_cond, &_mutex);}如果线程1率先拿到所并加锁,执行到上面代码这里发现数组中还没有数据,就执行pthread_cond_wait,此时线程1会暂时放开_mutex这个锁,并在这休眠等待。线程2在__add方法中最开始因为拿不到锁,所以等待,在线程1休眠放开锁之后拿到锁,加锁,并执行为数组添加数据的代码。添加完了之后会发个信号通知等待条件的线程,并解锁。 pthread_cond_signal(&_cond); pthread_mutex_unlock(&_mutex);线程2执行了pthread_cond_signal之后,线程1就收到了通知,退出休眠状态,继续执行下面的代码。这个地方可能有人会有疑问,是不是线程2应该先unlock再cond_dingnal,其实这个地方顺序没有太大差别,因为线程2执行了pthread_cond_signal之后,会继续执行unlock代码,线程1收到signal通知后会推出休眠状态,同时线程1需要再一次持有这个锁,就算此时线程2还没有unlock,线程1等到线程2 unlock 的时间间隔很短,等到线程2 unlock 后线程1会再去持有这个锁,并加锁。NSCondition就是OC层面的条件锁,内部把mutex互斥锁和条件封装到了一起。NSConditionLock其实也差不多,NSConditionLock可以指定具体的条件,这两个OC层面的类的用法大家可以自行上网搜索。dispatch_semaphore 信号量@property (strong, nonatomic) dispatch_semaphore_t semaphore;//初始化self.semaphore = dispatch_semaphore_create(5);在初始化一个信号的的过程中传入dispatch_semaphore_create的值,其实就代表了允许几个线程同时访问。再回到之前我们存钱取钱这个例子。self.moneySemaphore = dispatch_semaphore_create(1);我们一次只允许一个线程访问,所以在初始化的时候传1。下面就是使用方法。- (void)__drawMoney{ dispatch_semaphore_wait(self.moneySemaphore, DISPATCH_TIME_FOREVER); // … 省略代码 dispatch_semaphore_signal(self.moneySemaphore);}- (void)__saveMoney{ dispatch_semaphore_wait(self.moneySemaphore, DISPATCH_TIME_FOREVER); // … 省略代码 dispatch_semaphore_signal(self.moneySemaphore);}dispatch_semaphore_wait是怎么上锁的呢?如果信号量>0的时候,让信号量-1,并继续往下执行。如果信号量<=0的时候,休眠等待。就这么简单。dispatch_semaphore_signal让信号量+1。小提示在我们平时使用这种方法的时候,可以把信号量的代码提取出来定义一个宏。#define SemaphoreBegin \static dispatch_semaphore_t semaphore; \static dispatch_once_t onceToken; \dispatch_once(&onceToken, ^{ \ semaphore = dispatch_semaphore_create(1); }); \dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);#define SemaphoreEnd \dispatch_semaphore_signal(semaphore);读写安全方案上面我们讲到的线程同步方案都是每次只允许一个线程访问,在实际的情况中,读写的同步方案应该下面这样:每次只能有一个线程写可以有多个线程同时读读和写不能同时进行这就是多读单写,用于文件读写的操作。在我们的iOS中可以用下面这两种解决方案。pthread_rwlock 读写锁这个读写锁的用法很简单,跟之前的普通互斥锁都差不多,大家随便搜一下应该就能搜到,我就不拿出来写了,这里主要是提一下这种锁,大家以后有需要的时候可以用。dispatch_barrier_async 异步栅栏首先在使用这个函数的时候,我们要用自己创建的并发队列。如果传入的是一个串行队列或者全局的并发队列,那dispatch_barrier_async等同于dispatch_async的效果。self.queue = dispatch_queue_create(“rw_queue”, DISPATCH_QUEUE_CONCURRENT);dispatch_async(self.queue, ^{ [self read];}); dispatch_barrier_async(self.queue, ^{ [self write];});在读取数据的时候,使用dispatch_async往对列中添加任务,在写数据时,用dispatch_barrier_async添加任务。dispatch_barrier_async添加的任务会等前面所有的任务都执行完,他再执行,而且他执行的时候,不允许有别的任务同时执行。atomic我们都知道这个atomic是原子性的意思。他保证了属性setter和getter的原子性操作,相当于在set和get方法内部加锁。atomic修饰的属性是读/写安全的,但不是线程安全。假设有一个 atomic 的属性 “name”,如果线程 A 调用 [self setName:@“A”],线程 B 调用 [self setName:@“B”],线程 C 调用 [self name],那么所有这些不同线程上的操作都将依次顺序执行——也就是说,如果一个线程正在执行 getter/setter,其他线程就得等待。因此,属性 name 是读/写安全的。但是,如果有另一个线程 D 同时在调[name release],那可能就会crash,因为 release 不受 getter/setter 操作的限制。也就是说,这个属性只能说是读/写安全的,但并不是线程安全的,因为别的线程还能进行读写之外的其他操作。线程安全需要开发者自己来保证。 ...

April 1, 2019 · 2 min · jiezi

Netty中的Channel之数据冲刷与线程安全(writeAndFlush)

本文首发个人博客:猫叔的博客 | MySelfGitHub项目地址InChat一个轻量级、高效率的支持多端(应用与硬件Iot)的异步网络应用通讯框架前言本文预设读者已经了解了一定的Netty基础知识,并能够自己构建一个Netty的通信服务(包括客户端与服务端)。那么你一定使用到了Channel,这是Netty对传统JavaIO、NIO的链接封装实例。那么接下来让我们来了解一下关于Channel的数据冲刷与线程安全吧。数据冲刷的步骤1、获取一个链接实例@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取链接实例 Channel channel = ctx.channel();}我将案例放在初学者最熟悉的channelRead方法中,这是一个数据接收的方法,我们自实现Netty的消息处理接口时需要重写的方法。即客户端发送消息后,这个方法会被触发调用,所以我们在这个方法中进行本次内容的讲解。由上一段代码,其实目前还是很简单,我们借助ChannelHandlerContext(这是一个ChannelHandler与ChannelPipeline相交互并对接的一个对象。如下是源码的解释)来获取目前的链接实例Channel。/* Enables a {@link ChannelHandler} to interact with its {@link ChannelPipeline} * and other handlers. Among other things a handler can notify the next {@link ChannelHandler} in the * {@link ChannelPipeline} as well as modify the {@link ChannelPipeline} it belongs to dynamically. / public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker { //…… }2、创建一个持有数据的ByteBuf@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取链接实例 Channel channel = ctx.channel(); //创建一个持有数据的ByteBuf ByteBuf buf = Unpooled.copiedBuffer(“data”, CharsetUtil.UTF_8);}ByteBuf又是什么呢?它是Netty框架自己封装的一个字符底层对象,是一个对 byte[] 和 ByteBuffer NIO 的抽象类,更官网的说就是“零个或多个字节的随机和顺序可访问的序列。”,如下是源码的解释/* * A random and sequential accessible sequence of zero or more bytes (octets). * This interface provides an abstract view for one or more primitive byte * arrays ({@code byte[]}) and {@linkplain ByteBuffer NIO buffers}. / public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> { //…… }由上一段源码可以看出,ByteBuf是一个抽象类,所以我们不能通过 new 的形式来创建一个新的ByteBuf对象。那么我们可以通过Netty提供的一个 final 的工具类 Unpooled(你将其看作是一个创建ByteBuf的工具类就好了)。/* * Creates a new {@link ByteBuf} by allocating new space or by wrapping * or copying existing byte arrays, byte buffers and a string. / public final class Unpooled { //…… }这真是一个有趣的过程,那么接下来我们仅需要再看看 copiedBuffer 这个方法了。这个方法相对简单,就是我们将创建一个新的缓冲区,其内容是我们指定的 UTF-8字符集 编码指定的 “data” ,同时这个新的缓冲区的读索引和写索引分别是0和字符串的长度。3、冲刷数据@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取链接实例 Channel channel = ctx.channel(); //创建一个持有数据的ByteBuf ByteBuf buf = Unpooled.copiedBuffer(“data”, CharsetUtil.UTF_8); //数据冲刷 channel.writeAndFlush(buf);}我相信大部分人都是直接这么写的,因为我们经常理所当然的启动测试,并在客户端接受到了这个 “data” 消息。那么我们是否应该注意一下,这个数据冲刷会返回一个什么值,我们要如何才能在服务端知道,这次数据冲刷是成功还是失败呢?那么其实Netty框架已经考虑到了这个点,本次数据冲刷我们将得到一个 ChannelFuture 。@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取链接实例 Channel channel = ctx.channel(); //创建一个持有数据的ByteBuf ByteBuf buf = Unpooled.copiedBuffer(“data”, CharsetUtil.UTF_8); //数据冲刷 ChannelFuture cf = channel.writeAndFlush(buf);}是的,他就是 Channel 异步IO操作的结果,它是一个接口,并继承了Future<V>。(如下为源码的解释)/* * The result of an asynchronous {@link Channel} I/O operation. / public interface ChannelFuture extends Future<Void> { //…… }既然如此,那么我们可以明显的知道我们可以对其添加对应的监听。4、异步回调结果监听@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取链接实例 Channel channel = ctx.channel(); //创建一个持有数据的ByteBuf ByteBuf buf = Unpooled.copiedBuffer(“data”, CharsetUtil.UTF_8); //数据冲刷 ChannelFuture cf = channel.writeAndFlush(buf); //添加ChannelFutureListener以便在写操作完成后接收通知 cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { //写操作完成,并没有错误发生 if (future.isSuccess()){ System.out.println(“successful”); }else{ //记录错误 System.out.println(“error”); future.cause().printStackTrace(); } } });}好的,我们可以简单的从代码理解到,我们将通过对异步IO的结果监听,得到本次运行的结果。我想这才是一个相对完整的 数据冲刷(writeAndFlush)。测试线程安全的流程对于线程安全的测试,我们将模拟多个线程去执行数据冲刷操作,我们可以用到 Executor 。我们可以这样理解 Executor ,是一种省略了线程启用与调度的方式,你只需要传递一个 Runnable 给它即可,你不再需要去 start 一个线程。(如下是源码的解释)/* * An object that executes submitted {@link Runnable} tasks. This * interface provides a way of decoupling task submission from the * mechanics of how each task will be run, including details of thread * use, scheduling, etc. An {@code Executor} is normally used * instead of explicitly creating threads. For example, rather than * invoking {@code new Thread(new(RunnableTask())).start()} for each * of a set of tasks, you might use:… / public interface Executor { //…… }那么我们的测试代码,大致是这样的。final Channel channel = ctx.channel();//创建要写数据的ByteBuffinal ByteBuf buf = Unpooled.copiedBuffer(“data”,CharsetUtil.UTF_8).retain();//创建将数据写到Channel的RunnableRunnable writer = new Runnable() { @Override public void run() { ChannelFuture cf = channel.writeAndFlush(buf.duplicate()); cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { //写操作完成,并没有错误发生 if (future.isSuccess()){ System.out.println(“successful”); }else{ //记录错误 System.out.println(“error”); future.cause().printStackTrace(); } } }); }};//获取到线程池的Executor的引用Executor executor = Executors.newCachedThreadPool();//提交到某个线程中执行executor.execute(writer);//提交到另一个线程中执行executor.execute(writer);这里,我们需要注意的是:创建 ByteBuf 的时候,我们使用了 retain 这个方法,他是将我们生成的这个 ByteBuf 进行保留操作。在 ByteBuf 中有这样的一种区域: 非保留和保留派生缓冲区。这里有点复杂,我们可以简单的理解,如果调用了 retain 那么数据就存在派生缓冲区中,如果没有调用,则会在调用后,移除这一个字符数据。(如下是 ByteBuf 源码的解释)/<h4>Non-retained and retained derived buffers</h4> * * Note that the {@link #duplicate()}, {@link #slice()}, {@link #slice(int, int)} and {@link #readSlice(int)} does NOT * call {@link #retain()} on the returned derived buffer, and thus its reference count will NOT be increased. If you * need to create a derived buffer with increased reference count, consider using {@link #retainedDuplicate()}, * {@link #retainedSlice()}, {@link #retainedSlice(int, int)} and {@link #readRetainedSlice(int)} which may return * a buffer implementation that produces less garbage. */好的,我想你可以自己动手去测试一下,最好再看看源码,加深一下实现的原理印象。这里的线程池并不是现实线程安全,而是用来做测试多线程的,Netty的Channel实现是线程安全的,所以我们可以存储一个到Channel的引用,并且每当我们需要向远程节点写数据时,都可以使用它,即使当时许多线程都在使用它,消息也会被保证按顺序发送的。结语最后,介绍一下,个人的一个基于Netty的开源项目:InChat一个轻量级、高效率的支持多端(应用与硬件Iot)的异步网络应用通讯框架参考资料: 《Netty实战》 ...

December 24, 2018 · 3 min · jiezi