关于c++:linux下C多线程并发之原子操作与无锁编程

44次阅读

共计 5991 个字符,预计需要花费 15 分钟才能阅读完成。

一、何为原子操作

原子操作:顾名思义就是不可分割的操作,该操作只存在未开始和已实现两种状态,不存在中间状态;

原子类型:原子库中定义的数据类型,对这些类型的所有操作都是原子的,包含通过原子类模板 std::atomic< T > 实例化的数据类型,也都是反对原子操作的。

二、如何应用原子类型

2.1 原子库 atomic 反对的原子操作

原子库 < atomic > 中提供了一些根本原子类型,也能够通过原子类模板实例化一个原子对象,上面列出一些根本原子类型及相应的特化模板如下:

对原子类型的拜访,最次要的就是读和写,但原子库提供的对应原子操作是 load()与 store(val)。原子类型反对的原子操作如下:

2.2 原子操作中的内存拜访模型

原子操作保障了对数据的拜访只有未开始和已实现两种状态,不会拜访到中间状态,但咱们拜访数据个别是须要特定程序的,比方想读取写入后的最新数据,原子操作函数是反对管制读写程序的,即带有一个数据同步内存模型参数 std::memory_order,用于对同一时间的读写操作进行排序。C++11 定义的 6 种类型如下:

  • memory_order_relaxed: 宽松操作,没有同步或程序制约,仅对此操作要求原子性;
  • memory_order_release & memory_order_acquire: 两个线程 A &B,A 线程 Release 后,B 线程 Acquire 能保障肯定读到的是最新被批改过的值;这种模型更弱小的中央在于它能保障产生在 A -Release 前的所有写操作,在 B -Acquire 后都能读到最新值;
  • memory_order_release & memory_order_consume: 上一个模型的同步是针对所有对象的,这种模型只针对依赖于该操作波及的对象:比方这个操作产生在变量 a 上,而 s = a + b; 那 s 依赖于 a,但 b 不依赖于 a; 当然这里也有循环依赖的问题,例如:t = s + 1,因为 s 依赖于 a,那 t 其实也是依赖于 a 的;
  • memory_order_seq_cst: 程序一致性模型,这是 C ++11 原子操作的默认模型;大略行为为对每一个变量都进行 Release-Acquire 操作,当然这也是一个最慢的同步模型;

内存拜访模型属于比拟底层的管制接口,如果对编译原理和 CPU 指令执行过程不理解的话,容易引入 bug。内存模型不是本章重点,这里不再开展介绍,后续的代码都应用默认的程序一致性模型或比拟稳当的 Release-Acquire 模型。

须要 C /C++ Linux 服务器架构师学习材料加群 563998835 获取(材料包含 C /C++,Linux,golang 技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg 等),收费分享

2.3 应用原子类型代替互斥锁编程

为便于比拟,间接基于前篇文章:线程同步之互斥锁中的示例程序进行批改,用原子库取代互斥库的代码如下:

//atomic1.cpp 应用原子库取代互斥库实现线程同步
#include <chrono>#include <atomic>#include <thread>#include <iostream> std::chrono::milliseconds interval(100);
std::atomic<bool> readyFlag(false);     // 原子布尔类型,取代互斥量
std::atomic<int> job_shared(0); // 两个线程都能批改 'job_shared', 将该变量特化为原子类型
int job_exclusive = 0; // 只有一个线程能批改 'job_exclusive', 不须要爱护
// 此线程只能批改 'job_shared'
void job_1(){       std::this_thread::sleep_for(5 * interval);
    job_shared.fetch_add(1);
    std::cout << "job_1 shared (" << job_shared.load() << ")n";
    readyFlag.store(true);      // 扭转布尔标记状态为真
}// 此线程能批改 'job_shared' 和 'job_exclusive'
void job_2(){    while (true) {    // 有限循环,直到可拜访并批改 'job_shared'
        if (readyFlag.load()) {     // 判断布尔标记状态是否为真,为真则批改‘job_shared’job_shared.fetch_add(1);
            std::cout << "job_2 shared (" << job_shared.load() << ")n";
            return;
        } else {      // 布尔标记为假, 则批改 'job_exclusive'
            ++job_exclusive;            std::cout << "job_2 exclusive (" << job_exclusive << ")n";
            std::this_thread::sleep_for(interval);        }    }}int main() {    std::thread thread_1(job_1);    std::thread thread_2(job_2);    thread_1.join();    thread_2.join();    getchar();    return 0;}

由示例程序能够看出,原子布尔类型能够实现互斥锁的局部性能,但在应用条件变量 condition variable 时,依然须要 mutex 爱护对 condition variable 的生产,即便 condition variable 是一个 atomic object。

2.4 应用原子类型实现自旋锁

自旋锁(spinlock)与互斥锁 (mutex) 相似,在任一时刻最多只能有一个持有者,但如果资源已被占用,互斥锁会让资源申请者进入睡眠状态,而自旋锁不会引起调用者睡眠,会始终循环判断该锁是否胜利获取。自旋锁是专为避免多处理器并发而引入的一种锁,它在内核中大量利用于中断解决等局部(对于单处理器来说,避免中断解决中的并发可简略采纳敞开中断的形式,即在标记寄存器中敞开 / 关上中断标记位,不须要自旋锁)。

对于多核处理器来说,检测到锁可用与设置锁状态两个动作须要实现为一个原子操作,如果分为两个原子操作,则可能一个线程在取得锁后设置锁前被其余线程抢到该锁,导致执行谬误。这就须要原子库提供对原子变量“读 - 批改 - 写 (Read-Modify-Write)”的原子操作,上文原子类型反对的操作中就提供了 RMW(Read-Modify-Write) 原子操作,比方 a.exchange(val)与 a.compare_exchange(expected,desired)。

规范库还专门提供了一个原子布尔类型 std::atomic_flag,不同于所有 std::atomic 的特化,它保障是免锁的,不提供 load()与 store(val)操作,但提供了 test_and_set()与 clear()操作,其中 test_and_set()就是反对 RMW 的原子操作,可用 std::atomic_flag 实现自旋锁的性能,代码如下:

//atomic2.cpp 应用原子布尔类型实现自旋锁的性能
#include <thread>
#include <vector>
#include <iostream>
#include <atomic>
 
std::atomic_flag lock = ATOMIC_FLAG_INIT;       // 初始化原子布尔类型
 
void f(int n)
{for (int cnt = 0; cnt < 100; ++cnt) {while (lock.test_and_set(std::memory_order_acquire))  // 取得锁
 ; // 自旋
 std::cout << n << "thread Output:" << cnt << 'n';
 lock.clear(std::memory_order_release);               // 开释锁
 }
}
 
int main()
{
 std::vector<std::thread> v;     // 实例化一个元素类型为 std::thread 的向量
 for (int n = 0; n < 10; ++n) {v.emplace_back(f, n);       // 以参数 (f,n) 为初值的元素放到向量开端,相当于启动新线程 f(n)
 }
 for (auto& t : v) {     // 遍历向量 v 中的元素,基于范畴的 for 循环,auto& 主动推导变量类型并援用指针指向的内容
 t.join();           // 阻塞主线程直至子线程执行结束}
 getchar();
 return 0;
}

自旋锁除了应用 atomic_flag 的 TAS(Test And Set)原子操作实现外,还能够应用一般的原子类型 std::atomic 实现:其中 a.exchange(val)是反对 TAS 原子操作的,a.compare_exchange(expected,desired)是反对 CAS(Compare And Swap)原子操作的,感兴趣能够本人实现进去。其中 CAS 原子操作是无锁编程的次要实现伎俩,咱们接着往下介绍无锁编程。

三、如何进行无锁编程

3.1 什么是无锁编程

在原子操作呈现之前,对共享数据的读写可能失去不确定的后果,所以多线程并发编程时要对应用锁机制对共享数据的拜访过程进行爱护。但锁的申请开释减少了访问共享资源的耗费,且可能引起线程阻塞、锁竞争、死锁、优先级反转、难以调试等问题。

当初有了原子操作的反对,对单个根底数据类型的读、写访问能够不必锁爱护了,但对于简单数据类型比方链表,有可能呈现多个外围在链表同一地位同时增删节点的状况,这将会导致操作失败或错序。所以咱们在对某节点操作前,须要先判断该节点的值是否跟预期的统一,如果统一则进行操作,不统一则更新期望值,这几步操作仍然须要实现为一个 RMW(Read-Modify-Write)原子操作,这就是后面提到的 CAS(Compare And Swap)原子操作,它是无锁编程中最罕用的操作。

既然无锁编程是为了解决锁机制带来的一些问题而呈现的,那么无锁编程就能够了解为不应用锁机制就可保障多线程间原子变量同步的编程。无锁 (lock-free) 的实现只是将多条指令合并成了一条指令造成一个逻辑齐备的最小单元,通过兼容 CPU 指令执行逻辑造成的一种多线程编程模型。

无锁编程是基于原子操作的,对根本原子类型的共享拜访由 load()与 store(val)即可保障其并发同步,对形象简单类型的共享拜访则须要更简单的 CAS 来保障其并发同步,并发拜访过程只是不应用锁机制了,但还是能够了解为有锁止行为的,其粒度很小,性能更高。对于某个无奈实现为一个原子操作的并发拜访过程还是须要借助锁机制来实现。

3.1 CAS 原子操作实现无锁编程

CAS 原子操作次要是通过函数 a.compare_exchange(expected,desired)实现的,其语义为“我认为 V 的值应该为 A,如果是,那么将 V 的值更新为 B,否则不批改并通知 V 的值理论为多少”,CAS 算法的实现伪码如下:

bool compare_exchange_strong(T& expected, T desired) 
{if( this->load() == expected ) {this->store(desired); 
        return true; 
    } else {expected = this->load();
        return false; 
    } }

上面尝试实现一个无锁栈,代码如下:

//atomic3.cpp 应用 CAS 操作实现一个无锁栈
#include <atomic>#include <iostream>template<typename T>class lock_free_stack{private:    struct node    {        T data;        node* next;
        node(const T& data) : data(data), next(nullptr) {}};    std::atomic<node*> head; public:    lock_free_stack(): head(nullptr) {}    void push(const T& data)    {node* new_node = new node(data);        do{new_node->next = head.load();   // 将 head 的以后值放入 new_node->next
        }while(!head.compare_exchange_strong(new_node->next, new_node));
        // 如果新元素 new_node 的 next 和栈顶 head 一样,证实在你之前没人操作它,应用新元素替换栈顶退出即可;// 如果不一样,证实在你之前曾经有人操作它,栈顶已产生扭转,该函数会自动更新新元素的 next 值为扭转后的栈顶;// 而后持续循环检测直到状态 1 成立退出;}    T pop()    {        node* node;        do{node = head.load();
        }while (node && !head.compare_exchange_strong(node, node->next));
        if(node) 
            return node->data;
    }}; int main(){    lock_free_stack<int> s;    s.push(1);
    s.push(2);
    s.push(3);
    std::cout << s.pop() << std::endl;    std::cout << s.pop() << std::endl;        getchar();    return 0;}

程序正文中曾经解释的很分明了,在将数据压栈前,先通过比拟原子类型 head 与新元素的 next 指向对象是否相等来判断 head 是否已被其余线程批改,依据判断后果抉择是持续操作还是更新冀望,而这一切都是在一个原子操作中实现的,保障了在不应用锁的状况下实现共享数据的并发同步。

CAS 看起来很厉害,但也有毛病,最驰名的就是 ABA 问题,假如一个变量 A,批改为 B 之后又批改为 A,CAS 的机制是无奈觉察的,但实际上曾经被批改过了。如果在根本类型上是没有问题的,然而如果是援用类型呢?这个对象中有多个变量,我怎么晓得有没有被改过?聪慧的你肯定想到了,加个版本号啊。每次批改就查看版本号,如果版本号变了,阐明改过,就算你还是 A,也不行。

下面的例子节点指针也属于援用类型,天然也存在 ABA 问题,比方在线程 2 执行 pop 操作,将 A,B 都删掉,而后创立一个新元素 push 进去,因为操作系统的内存分配机制会重复使用之前开释的内存,恰好 push 进去的内存地址和 A 一样,咱们记为 A’,这时候切换到线程 1,CAS 操作查看到 A 没变动胜利将 B 设为栈顶,但 B 是一个曾经被开释的内存块。该问题的解决方案就是下面说的通过打标签标识 A 和 A’为不同的指针,具体实现代码读者能够尝试实现

正文完
 0