一、何为原子操作

原子操作:顾名思义就是不可分割的操作,该操作只存在未开始和已实现两种状态,不存在中间状态;

原子类型:原子库中定义的数据类型,对这些类型的所有操作都是原子的,包含通过原子类模板std::atomic< T >实例化的数据类型,也都是反对原子操作的。

二、如何应用原子类型

2.1 原子库atomic反对的原子操作

原子库< atomic >中提供了一些根本原子类型,也能够通过原子类模板实例化一个原子对象,上面列出一些根本原子类型及相应的特化模板如下:

对原子类型的拜访,最次要的就是读和写,但原子库提供的对应原子操作是load()与store(val)。原子类型反对的原子操作如下:

2.2 原子操作中的内存拜访模型

原子操作保障了对数据的拜访只有未开始和已实现两种状态,不会拜访到中间状态,但咱们拜访数据个别是须要特定程序的,比方想读取写入后的最新数据,原子操作函数是反对管制读写程序的,即带有一个数据同步内存模型参数std::memory_order,用于对同一时间的读写操作进行排序。C++11定义的6种类型如下:

  • memory_order_relaxed: 宽松操作,没有同步或程序制约,仅对此操作要求原子性;
  • memory_order_release & memory_order_acquire: 两个线程A&B,A线程Release后,B线程Acquire能保障肯定读到的是最新被批改过的值;这种模型更弱小的中央在于它能保障产生在A-Release前的所有写操作,在B-Acquire后都能读到最新值;
  • memory_order_release & memory_order_consume: 上一个模型的同步是针对所有对象的,这种模型只针对依赖于该操作波及的对象:比方这个操作产生在变量a上,而s = a + b; 那s依赖于a,但b不依赖于a; 当然这里也有循环依赖的问题,例如:t = s + 1,因为s依赖于a,那t其实也是依赖于a的;
  • memory_order_seq_cst: 程序一致性模型,这是C++11原子操作的默认模型;大略行为为对每一个变量都进行Release-Acquire操作,当然这也是一个最慢的同步模型;

内存拜访模型属于比拟底层的管制接口,如果对编译原理和CPU指令执行过程不理解的话,容易引入bug。内存模型不是本章重点,这里不再开展介绍,后续的代码都应用默认的程序一致性模型或比拟稳当的Release-Acquire模型。

须要C/C++ Linux服务器架构师学习材料加群563998835获取(材料包含C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),收费分享

2.3 应用原子类型代替互斥锁编程

为便于比拟,间接基于前篇文章:线程同步之互斥锁中的示例程序进行批改,用原子库取代互斥库的代码如下:

//atomic1.cpp 应用原子库取代互斥库实现线程同步#include <chrono>#include <atomic>#include <thread>#include <iostream> std::chrono::milliseconds interval(100);std::atomic<bool> readyFlag(false);     //原子布尔类型,取代互斥量std::atomic<int> job_shared(0); //两个线程都能批改'job_shared',将该变量特化为原子类型int job_exclusive = 0; //只有一个线程能批改'job_exclusive',不须要爱护//此线程只能批改 'job_shared'void job_1(){       std::this_thread::sleep_for(5 * interval);    job_shared.fetch_add(1);    std::cout << "job_1 shared (" << job_shared.load() << ")n";    readyFlag.store(true);      //扭转布尔标记状态为真}// 此线程能批改'job_shared'和'job_exclusive'void job_2(){    while (true) {    //有限循环,直到可拜访并批改'job_shared'        if (readyFlag.load()) {     //判断布尔标记状态是否为真,为真则批改‘job_shared’            job_shared.fetch_add(1);            std::cout << "job_2 shared (" << job_shared.load() << ")n";            return;        } else {      //布尔标记为假,则批改'job_exclusive'            ++job_exclusive;            std::cout << "job_2 exclusive (" << job_exclusive << ")n";            std::this_thread::sleep_for(interval);        }    }}int main() {    std::thread thread_1(job_1);    std::thread thread_2(job_2);    thread_1.join();    thread_2.join();    getchar();    return 0;}

由示例程序能够看出,原子布尔类型能够实现互斥锁的局部性能,但在应用条件变量condition variable时,依然须要mutex爱护对condition variable的生产,即便condition variable是一个atomic object。

2.4 应用原子类型实现自旋锁

自旋锁(spinlock)与互斥锁(mutex)相似,在任一时刻最多只能有一个持有者,但如果资源已被占用,互斥锁会让资源申请者进入睡眠状态,而自旋锁不会引起调用者睡眠,会始终循环判断该锁是否胜利获取。自旋锁是专为避免多处理器并发而引入的一种锁,它在内核中大量利用于中断解决等局部(对于单处理器来说,避免中断解决中的并发可简略采纳敞开中断的形式,即在标记寄存器中敞开/关上中断标记位,不须要自旋锁)。

对于多核处理器来说,检测到锁可用与设置锁状态两个动作须要实现为一个原子操作,如果分为两个原子操作,则可能一个线程在取得锁后设置锁前被其余线程抢到该锁,导致执行谬误。这就须要原子库提供对原子变量“读-批改-写(Read-Modify-Write)”的原子操作,上文原子类型反对的操作中就提供了RMW(Read-Modify-Write)原子操作,比方a.exchange(val)与a.compare_exchange(expected,desired)。

规范库还专门提供了一个原子布尔类型std::atomic_flag,不同于所有 std::atomic 的特化,它保障是免锁的,不提供load()与store(val)操作,但提供了test_and_set()与clear()操作,其中test_and_set()就是反对RMW的原子操作,可用std::atomic_flag实现自旋锁的性能,代码如下:

//atomic2.cpp 应用原子布尔类型实现自旋锁的性能#include <thread>#include <vector>#include <iostream>#include <atomic> std::atomic_flag lock = ATOMIC_FLAG_INIT;       //初始化原子布尔类型 void f(int n){ for (int cnt = 0; cnt < 100; ++cnt) { while (lock.test_and_set(std::memory_order_acquire))  // 取得锁 ; // 自旋 std::cout << n << " thread Output: " << cnt << 'n'; lock.clear(std::memory_order_release);               // 开释锁 }} int main(){ std::vector<std::thread> v;     //实例化一个元素类型为std::thread的向量 for (int n = 0; n < 10; ++n) { v.emplace_back(f, n);       //以参数(f,n)为初值的元素放到向量开端,相当于启动新线程f(n) } for (auto& t : v) {     //遍历向量v中的元素,基于范畴的for循环,auto&主动推导变量类型并援用指针指向的内容 t.join();           //阻塞主线程直至子线程执行结束 } getchar(); return 0;}

自旋锁除了应用atomic_flag的TAS(Test And Set)原子操作实现外,还能够应用一般的原子类型std::atomic实现:其中a.exchange(val)是反对TAS原子操作的,a.compare_exchange(expected,desired)是反对CAS(Compare And Swap)原子操作的,感兴趣能够本人实现进去。其中CAS原子操作是无锁编程的次要实现伎俩,咱们接着往下介绍无锁编程。

三、如何进行无锁编程

3.1 什么是无锁编程

在原子操作呈现之前,对共享数据的读写可能失去不确定的后果,所以多线程并发编程时要对应用锁机制对共享数据的拜访过程进行爱护。但锁的申请开释减少了访问共享资源的耗费,且可能引起线程阻塞、锁竞争、死锁、优先级反转、难以调试等问题。

当初有了原子操作的反对,对单个根底数据类型的读、写访问能够不必锁爱护了,但对于简单数据类型比方链表,有可能呈现多个外围在链表同一地位同时增删节点的状况,这将会导致操作失败或错序。所以咱们在对某节点操作前,须要先判断该节点的值是否跟预期的统一,如果统一则进行操作,不统一则更新期望值,这几步操作仍然须要实现为一个RMW(Read-Modify-Write)原子操作,这就是后面提到的CAS(Compare And Swap)原子操作,它是无锁编程中最罕用的操作。

既然无锁编程是为了解决锁机制带来的一些问题而呈现的,那么无锁编程就能够了解为不应用锁机制就可保障多线程间原子变量同步的编程。无锁(lock-free)的实现只是将多条指令合并成了一条指令造成一个逻辑齐备的最小单元,通过兼容CPU指令执行逻辑造成的一种多线程编程模型。

无锁编程是基于原子操作的,对根本原子类型的共享拜访由load()与store(val)即可保障其并发同步,对形象简单类型的共享拜访则须要更简单的CAS来保障其并发同步,并发拜访过程只是不应用锁机制了,但还是能够了解为有锁止行为的,其粒度很小,性能更高。对于某个无奈实现为一个原子操作的并发拜访过程还是须要借助锁机制来实现。

3.1 CAS原子操作实现无锁编程

CAS原子操作次要是通过函数a.compare_exchange(expected,desired)实现的,其语义为“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不批改并通知V的值理论为多少”,CAS算法的实现伪码如下:

bool compare_exchange_strong(T& expected, T desired) {     if( this->load() == expected ) {         this->store(desired);         return true;     } else {        expected = this->load();        return false;     } }

上面尝试实现一个无锁栈,代码如下:

//atomic3.cpp 应用CAS操作实现一个无锁栈#include <atomic>#include <iostream>template<typename T>class lock_free_stack{private:    struct node    {        T data;        node* next;        node(const T& data) : data(data), next(nullptr) {}    };    std::atomic<node*> head; public:    lock_free_stack(): head(nullptr) {}    void push(const T& data)    {        node* new_node = new node(data);        do{            new_node->next = head.load();   //将 head 的以后值放入new_node->next        }while(!head.compare_exchange_strong(new_node->next, new_node));        // 如果新元素new_node的next和栈顶head一样,证实在你之前没人操作它,应用新元素替换栈顶退出即可;        // 如果不一样,证实在你之前曾经有人操作它,栈顶已产生扭转,该函数会自动更新新元素的next值为扭转后的栈顶;        // 而后持续循环检测直到状态1成立退出;    }    T pop()    {        node* node;        do{            node = head.load();        }while (node && !head.compare_exchange_strong(node, node->next));        if(node)             return node->data;    }}; int main(){    lock_free_stack<int> s;    s.push(1);    s.push(2);    s.push(3);    std::cout << s.pop() << std::endl;    std::cout << s.pop() << std::endl;        getchar();    return 0;}

程序正文中曾经解释的很分明了,在将数据压栈前,先通过比拟原子类型head与新元素的next指向对象是否相等来判断head是否已被其余线程批改,依据判断后果抉择是持续操作还是更新冀望,而这一切都是在一个原子操作中实现的,保障了在不应用锁的状况下实现共享数据的并发同步。

CAS 看起来很厉害,但也有毛病,最驰名的就是 ABA 问题,假如一个变量 A ,批改为 B之后又批改为 A,CAS 的机制是无奈觉察的,但实际上曾经被批改过了。如果在根本类型上是没有问题的,然而如果是援用类型呢?这个对象中有多个变量,我怎么晓得有没有被改过?聪慧的你肯定想到了,加个版本号啊。每次批改就查看版本号,如果版本号变了,阐明改过,就算你还是 A,也不行。

下面的例子节点指针也属于援用类型,天然也存在ABA问题,比方在线程2执行pop操作,将A,B都删掉,而后创立一个新元素push进去,因为操作系统的内存分配机制会重复使用之前开释的内存,恰好push进去的内存地址和A一样,咱们记为A’,这时候切换到线程1,CAS操作查看到A没变动胜利将B设为栈顶,但B是一个曾经被开释的内存块。该问题的解决方案就是下面说的通过打标签标识A和A’为不同的指针,具体实现代码读者能够尝试实现