# C++并发编程 **Repository Path**: xxrl2018/c-concurrent-programming ## Basic Information - **Project Name**: C++并发编程 - **Description**: C++并发编程学习及实战总结 - **Primary Language**: C++ - **License**: GPL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2023-03-01 - **Last Updated**: 2023-03-01 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # C++并发编程 本仓库主要是学习《C++并发编程实战》这本书的实战代码总结以及其他并发编程的项目汇总。 仓库的按照如下结构进行组织: - component: 下是各个数据结构的线程安全实现 - activity: 各种线程活动函数 - test: 测试函数 - learn: 学习《C++并发编程实战》的示例代码 - lib: 存放了常用的库,包括自己编写的库函数 - exercise:并发编程小作业 ------------------------ 同步的《C++并发编程实战》阅读笔记: # 《C++并发编程实战》 # 第1章 你好,C++并发世界 ## 什么是并发 多个或同时独立进行的活动 两种并发:硬件所支持的并发任务数量(决定因素)、任务切换 - 多进程并发 - 优点:安全 - 缺点:设置复杂,速度慢,固定开销大,进程启动花费事件,消耗操作系统资源(管控进程) - 多线程并发(C++并发实现) - 优点:额外开销低,速度快 - 缺点:共享内存存在隐患 ## 并发与并行的区别 - 并行强调性能提升(例如并行算法) - 并发强调分离关注点,降低模块耦合,提高响应能力(例如系统并发能力) ## 使用并发技术的原因 1. 分离关注点:简化各个线程的内部逻辑,将线程间交互得以限定于代码中可明确辨识的切入点,而无须将不同任务的逻辑交错散置 这样做与CPU的内核数量无关,因为用线程分离关注点提升的是系统设计,而不是运算吞吐量 2. 提升性能 1. 任务并行:将任务分解,并行解决。易于采用这种并行方式的算法叫做尴尬并行(天然并行、方便并发)算法 2. 数据并行:将数据分解,并行解决 ## 什么时候避免并发 带来的收益(性能提升、模块耦合度降低)对不起维护多线程代码的复杂的时候 - 使用多线程的性能增幅可能不如预期:线程启动存在固有开销 - 线程是有限的。如果运行太多线程可能导致系统整体变慢 为了提高系统性能,以**可用的硬件并发资源**作为依据调整运行线程的数目 ## C++线程库的效率 低抽象损失 ## 简单示例 ```cpp #include #include void hello() { std::cout << "Hello Concurrent World\n"; } int main() { std::thread t(hello); t.join(); //主线程等待子线程退出 return 0; } //-lpthead 在Linux下使用thread库需要链接pthread库 ``` # 第2章 线程管控 ## 创建线程 当`main`函数返回时,整个进程结束,程序就会退出。 ### 构造 如果我们默认构造`std::thread`,那么它将不会和任何线程关联。 我们通过给`std::thread`传入可调用对象以及他所需要的参数,来设置子线程的入口(entry point)。**函数对象会被拷贝到属于新线程的存储空间中,并在那里被调用。**由此可以推断`thread`封装了这个可调用对象。 在传入这个可调用对象时要注意一个问题:如果我们想传入一个临时函数对象,我们可能会有这样的写法: ```cpp class Func { void operator() () {} }; int main() { std::thread t(Func()); t.join(); //Member reference base type 'std::thread (Func (*)())' is not a structure or union return 0; } ``` 第6行我们的意思是传入一个临时的Func函数对象供线程运行,但是却会被看作一个函数声明:参数是一个函数指针,函数指针指向的是一个返回Func,没有参数的函数;返回值是一个thread。为了解决这个问题我们有两种解决方案: ```cpp class Func { void operator() () {} }; int main() { //std::thread t((Func())); std::thread t{Func()}; t.join(); return 0; } ``` 1. 在`Func()`外围添加一个圆括号 2. 使用花括号进行列表初始化(推荐) 为了确保子线程正常运行(不被其他线程影响),我们应该让线程函数自含。 ### 析构 默认情况下如果`thread`析构,而且线程是`joinable`的,那么会导致程序崩溃(调用`std::terminate`);如果线程是`detach`后的,那么即使线程类析构,线程还是会继续在后台运行 ## 等待线程完成 调用线程类的`join`方法:主线程等待子线程结束,并回收子线程资源 `detach`方法:设定子线程分离,系统会在子线程运行结束后自动回收子线程资源,不会导致僵尸线程 ## 在异常情况下等待线程 如果子线程启动后主线程函数发生异常导致无法执行`join`函数,那么有可能导致僵尸线程。为了避免这种情况,我们应该用RAII的手法封装线程类,使得对线程的`join`无论如何都能够正确执行:将`join`放在析构函数中。 为了能够保证等待子线程`join`,我们可以使用RAII的手法进行如下封装: ```cpp #include #include #include #include #include std::mutex io_mutex; inline void print() { std::cout << std::endl; } template void print(T&& t, Args&&... args) { std::cout << t << " "; print(std::forward(args)...); } inline int getRandom(int min_ = 0, int max_ = 10) { static std::default_random_engine e(std::chrono::system_clock::now().time_since_epoch().count()); static std::uniform_int_distribution<> u; u.param(decltype(u)::param_type(min_, max_)); return u(e); } class thread_guard { std::thread &t_; public: explicit thread_guard(std::thread& t) : t_(t) {} ~thread_guard() { if (t_.joinable()) t_.join(); } thread_guard(const thread_guard&) = delete; thread_guard &operator= (const thread_guard&) = delete; }; //启动子线程 void start_sub_thread() { std::thread t{[]{ int cnt = getRandom(); print("sub thread:", std::this_thread::get_id(), "sleep", cnt, "s"); std::this_thread::sleep_for(std::chrono::seconds(getRandom())); print("sub thread:", std::this_thread::get_id(), "awaked"); }}; //without thread guard: terminate called without an active exception thread_guard threadGuard(t); //do something []{}(); //t.join(); } int main() { start_sub_thread(); return 0; } ``` ## 在后台运行线程 调用`std::thread`对象的`detach`成员会令线程在后台运行,使得线程分离,无法等待它完结,其归属权和控制权都转移给C++运行时库,`std::thread`对象不再关联实际的执行线程 我们在`detach`前同样需要调用`joinable`函数判断`std::thread`是否与线程关联 我们可以用分离线程实现守护线程 ## 向线程函数传递参数 传递参数的过程:**线程具有内部存储空间,参数会按照默认方式先复制到该处,然后再以右值的形式传递给可调用对象。** 传递给可调用对象的方式是使用`bind`重新生成一个可调用对象,然后再调用该对象 这种实现原理意味着我们可以给`thread`传递一个成员指针和对象指针,因为`bind`会将其变成一个正确的可调用对象 - 如果我们确实想要传递参数的拷贝,请确保参数的类型正确。例如我们如果想传递一个字符串常量给一个`string`类型,我们应该自己手动转换成`string`类型,否则`thread`构造函数会传入一个`const char *`类型的变量,而传入一个指针很可能是有风险的(如果所在函数退出,就会出现未定义的行为) - 如果我们确实想要传递一个引用,我们应该使用`std::bind`封装函数或者`std::ref`封装参数。如果我们直接传递一个变量,那么这个变量首先会被拷贝,然后再转换为右值传递给可调用对象,这可能会发生错误。 可以使用`unique_ptr`帮助我们管理动态内存 ## 移交线程归属权 `std::thread`只支持移动语义 我们不能给一个关联线程的`thread`对象赋值,会导致调用`std::terminate`函数终止程序 ### 返回一个`thread` 如果符合RVO(Return Value Optimization)的条件: - 返回值和要返回的局部对象类型相同 那么就会触发拷贝消除。如果不执行拷贝,会将返回值转化为右值。因此我们可以在函数中返回`thread`。 ### 传入一个`thread` 因为`thread`只支持移动语义,所以我们在传递给非引用形参时必须传递一个右值供其拷贝。 C++20中引入了`std::jthread`。其作用与`unique_thread`类似。 [noexcept](https://www.notion.so/noexcept-e579a8b20d294f348021e1d040570bbf) 我们可以将`std::thread`放在容器中统一进行管理。 ## 在运行时选择线程数量 - `std::thread::hardware_concurrency()`:表示程序在各次运行中可真正并发的线程数量(有可能返回0) 如果我们想要得到子线程的返回值,可以通过传递引用的方式记录返回值,或者使用`future` 记住在传递引用时我们要使用`std::ref`,这一点和`bind`的使用是相同的(严重怀疑`thread`内部就用的是`bind` ## 识别线程 线程ID:`std::thread::id`的获取方法: - 在与线程关联的`std::thread`上调用`get_id()`。如果没有关联任何线程,那么`get_id()`会返回一个默认构造的`std::thread::id`,表示线程不存在 - 当前线程的ID可以通过`std::this_thread::get_id()`获得 C++标准库允许我们任意判断两个线程ID是否相同,可以当作关联容器(包括无序容器,标准库有`std::hash`的定义)的键值 线程ID不具备语义意义 # 第3章 在线程间共享数据 race condition(条件竞争):条件竞争出现的根本原因是**破坏了不变量。** 恶性条件竞争普遍“挑剔”出现的时机:当程序在负载比较大的时候容易出现,但是在调试环境却无法察觉,而且很难重现。 防止恶性条件竞争的方法: 1. 互斥 2. 无锁编程:修改数据结构的设计及其不变量,由一连串不可拆分的改动完成数据变更,每个改动都维持不变量不被破坏。要求对C++内存模型十分熟稔,以及区分每个线程可以看到内存中的什么东西1 3. 软件事务内存STM,C++目前没有直接支持。不过我觉得用上面的方法可以实现事务,这不是一个语言的责任吧 ## 用互斥保护共享数据 互斥mutex:mutually exclusion 头文件:`` `std::mutex`创建互斥,`lock()`加锁,`unlock()`解锁 为了保证每条路径都能够记得`unlock`,我们用RAII的时候封装`std::mutex` 模板类`std::lock_guard<>`,在构造时加锁,析构时解锁 C++17引入了类模板参数推断,可以让我们省略类模板参数 正确的使用互斥锁的方式:**将互斥锁和受保护的数据组成一个类,并将其设置成`mutable`私有变量,将对数据的访问封装成成员方法。** 我们在设计接口时必须小心谨慎:**不得向锁所在的作用域之外传递指针和引用指向受保护的共享数据,无论是通过函数返回值还是函数参数。** ### 简单加锁是不够的 但是简单的加锁是不能解决问题的,更多的引起条件竞争的原因在于功能性的设计:考虑一个多线程环境下的栈`Stack` - 我们一般会在获取元素`top()`之前判断是否为空`empty()`。但是如果在`top`和`empty`之间有其他线程进行`pop`导致栈为空,那么我们去`top`的时候就会非法,一个解决方案是调用`top`时进行判断,如果为空就抛出异常。 - 如果我们有两个线程都想要获取并弹出栈顶元素,他们获取到同一个栈顶元素后然后再连续弹出两次,这就会导致有一个元素没有被看到就弹出了 我们可能会思考将两种方法合并成一种方法,例如:弹出并返回栈顶元素。但是如果在拷贝已经被弹出的栈顶元素的时候抛出了异常,那么我们就无法得到弹出的那个元素 ### 接口的设计很重要 为了解决上面的第二个问题,一般的方法(包括智能指针实现的COW)都是没有作用的,我们只能谨慎设计接口,甚至弱化接口来让接口变得简单,可控。 例如对上面的例子,我们为了避免`top`和`pop`方法产生的条件竞态度,唯一的办法就是去除掉`top`方法,只保留`pop`方法,将`top`方法和`pop`方法合并 然而这种合并却会导致新的问题:如果对象的拷贝会抛出异常,有可能我们在合并后的`pop`方法内首先用`top`方法获取栈顶元素,然后弹出栈顶元素,再将栈顶元素拷贝返回时发生异常,此时就会出现数据丢失的问题:栈顶元素已经无法获取,但是又被弹出。 为了解决这个问题,书中给出了三种解决方案: 1. 传入用于拷贝的内存:先在外部申请用来保存栈顶元素的内存,然后再将其传入`pop`方法,这样就不用担心弹出栈顶元素后出现的无法拷贝的情况。虽然可行,但是我的看法是这种做法很丑陋。 2. 要求对象可移动,避免出现无法拷贝的情况。因为栈顶元素已经从栈中弹出了,我们就可以返回其右值,而大多数对象的移动构造都是不会抛出异常的。我觉得这也是很符合直觉的,不过书中的说法是这样子没有移动构造并且拷贝构造会抛异常的对象就无法存入栈中,也不能说是一个非常好的做法。 3. 栈返回对象的智能指针(`shared_ptr`),不使用裸指针应该是不言而喻的:返回一个裸指针就要求使用者进行释放,而这个释放的时机很难把握,交给智能指针就不用担心这个问题。不得不说`shared_ptr`简直是处理多线程问题的神兵利器,因为其计数的原子性,我们不用担心在某个线程还在使用该对象的时候将其析构。返回智能指针就没有上面担心的数据丢失的问题:程序不可能缺少一个`shared_ptr`的空间,而且避免了拷贝的性能损耗。 书中推荐将方法1和其他方法混合使用,但是我觉得方法1非常丑陋。 代码实现: [](https://gitee.com/EdwardElric233/c-concurrent-programming/blob/master/component/Stack.h) ## 死锁 解决死锁问题的一般思路是:按相同的顺序对互斥加锁。 需要注意的是如果我们已经在某个`std::mutex`对象上获取锁,那么再次试图从该湖上获取锁将导致未定义的行为。 为了保证对两个互斥按照相同的顺序加锁,我们可以使用标准库的`std::lock`函数,他保证了传入的锁要么都上锁成功,要么都没有上锁,并抛出异常。 因为我们使用`std::lock`对`std::mutex`上锁,所以我们在使用`std::lock_guard`管理其解锁的时候必须传入`std::adopt_lock`来告诉`std::lock_guard`我们的锁已经上锁,不用在构造函数中再进行上锁。`std::lock`函数的语义是要么锁住所有互斥,要么 没有获取任何锁并抛出异常。关于`std::lock`的详细解释如下,实际上是一种back-off-and-retry的做法: [Is std::lock() ill-defined, unimplementable, or useless?](https://stackoverflow.com/questions/18520983/is-stdlock-ill-defined-unimplementable-or-useless) C++17以后应该使用`std::scoped_lock`来同时锁住多个互斥。 ```cpp #include #include #include using namespace std; int main() { std::mutex mtx1, mtx2; /* //C++ 11 std::lock(mtx1, mtx2); std::lock_guard lockGuard1(mtx1, std::adopt_lock); std::lock_guard lockGuard2(mtx2, std::adopt_lock); */ //C++17 std::scoped_lock scopedLock(mtx1, mtx2); cout << "Lock sucessfully" << endl; return 0; } ``` ## 防范死锁的补充准则 死锁发生的情况变化万千,而且不一定和锁有关系:加入两个线程都在等待对方`join`,那么也会发生死锁 ### 避免嵌套锁 假如已经有了一个锁,就不要尝试去获取其他锁 如果有必要获取多个锁,使用`std::lock`或`std::scoped_lock`来一次加锁 ### 一旦持有锁,避免调用由用户提供的用户接口 用户是不可信的,谁也不知道他要干什么 ### 依从固定顺序获取锁 如果多个锁是绝对必要的而且无法使用`std::lock`一次加锁,则只能规定每个线程都依从固定顺序获取这些锁。 例如我们有个多线程链表,为了保证不发生死锁,我们只能从一个方向遍历链表。这里的方向规定的是对元素加锁的顺序必须是固定的。 ### 按层级加锁 明确每个互斥位于哪个层级。若某个线程已经对低层级互斥加锁,则不准再对高层级互斥加锁。 我们可以让自定义的互斥锁与`std::lock_guard`配合使用,只需要我们去实现`lock()、unlock()、try_lock()`即可。 `try_lock()`的含义:假如该锁已经上锁,则直接返回`false`,不等待;如果没有上锁,就上锁。`std::lock`就是根据`try_lock()`实现的 ```cpp class hierarchical_mutex { std::mutex mutex_; using value_type = unsigned long; const value_type value_; value_type pre_value_; //用来存储之前的cur_value,解锁后进行恢复,不用进行初始化 static thread_local value_type cur_value; //线程专属变量,表示当前线程层次锁的级别 void check_violation() { if (value_ >= cur_value) { //试图对更高级别的锁加锁,抛出错误 throw std::logic_error("mutex hierarchy violated"); } } void update_hierarchy() { pre_value_ = cur_value; cur_value = value_; } void rollback_hierarchy() { if (value_ != cur_value) { throw std::logic_error("mutex hierarchy violated"); } cur_value = pre_value_; } public: explicit hierarchical_mutex(value_type value) : value_(value) , mutex_() {} void lock() { check_violation(); mutex_.lock(); update_hierarchy(); } void unlock() { rollback_hierarchy(); mutex_.unlock(); } bool tyr_lock() { check_violation(); if (!mutex_.try_lock()) return false; update_hierarchy(); return true; } }; thread_local hierarchical_mutex::value_type hierarchical_mutex::cur_value = ULONG_MAX; //初始化为无穷大,可以对任意锁加锁 int main() { hierarchical_mutex mtx(100); { std::lock_guard lk(mtx); hierarchical_mutex mtx1(1000); std::lock_guard lk1(mtx1); } return 0; } ``` ## 运用`std::unique_lock<>`灵活加锁 `std::unique_lock`对象不一定始终占有与之关联的互斥,因为这份灵活性,需要占用一定的空间用来存储并更新互斥信息,同时效率略慢。 其构造函数接收第二个参数: - `std::adopt_lock`指明传入的互斥锁已经上锁 - `std::defer_lock`指明不对传入的互斥锁上锁,以后再上锁 `std::unique_lock`在析构时保证所关联的互斥解锁 因为`std::unique_lock`具有`lock()、unlock()、try_lock()`方法,所以可以传递给`std::lock`方法。 ```cpp int main() { std::mutex mtx1, mtx2; std::unique_lock lock1(mtx1, std::defer_lock), lock2(mtx2, std::defer_lock); std::lock(lock1, lock2); return 0; } ``` 对于上面的用法,最好还是使用C++17支持的`scoped_lock` `std::unique_lock`还支持转移锁的归属权 如果可以用`std::lock_guard`,最好还是用这个,效率略高 ## 在不同作用域之间转移互斥归属权 考虑到`lock_guard`和`unique_lock`都是以RAII的手法封装互斥锁来保证互斥锁的`unlock`,而我们是不允许多次`unlock`的,所以两者都不可拷贝的,然后后者是可以移动的,用以在函数之间转移互斥锁的控制权 ```cpp std::mutex mtx; std::unique_lock get_lock() { std::unique_lock lock(mtx); print("in get_lock"); return lock; } void process() { auto lock = get_lock(); print("in process"); } int main() { process(); return 0; } ``` 因为`std::unique_lock`与`std::mutex`的接口一致,而且能够保证在其析构时解锁,所以如果我们需要中途解锁(这一点是`std::lock_guard`做不到的,`std::lock_guard`只能接收未加锁的互斥并对其加锁,然后在析构时解锁),我们应该使用`std::unique_lock`替代 ## 按适合的粒度加锁 持锁期间应该避免任何耗时的操作,例如文件IO 如果只用单独一个互斥保护整个数据结构,不但可能加剧锁的争夺,还将难以缩短持锁时间。 ## 在初始化过程中保护数据 对于像单例模式那样的场景来讲,假如我们的单例是只读的(例如用来进行数据库的查询操作),那么我们就仅仅需要在数据初始化的时候保护数据(因为只有初始化的时候是写操作) ```cpp template std::shared_ptr singleton() { static std::shared_ptr p; if (!p) { p.reset(new T); } return p; } ``` 为了保护数据,我们考虑使用互斥锁进行保护 ```cpp template std::shared_ptr singleton() { static std::shared_ptr p; static std::mutex mtx; { std::lock_guard lock(mtx); if (!p) { p.reset(new T); } } return p; } ``` 为了进一步缩小临界区,提出了双检查锁 ```cpp template std::shared_ptr singleton() { static std::shared_ptr p; static std::mutex mtx; if (!p) { std::lock_guard lock(mtx); if (!p) { T *tp = new T; p.reset(tp); } } return p; } ``` 但是双检查锁因为指令重排的问题同样不可以解决问题,C++11提供了`std::once_flag`和`std::call_once()`函数,专门用来处理这个问题。 令所有线程共同调用`std::call_once()`函数,并且传入`std::once_flag`存储同步信息。相比使用互斥,`std::call_once()`的额外开销往往更低,特别是初始化已经完成的情况下 使用`std::call_once()`完成的单例模式模板: ```cpp #include #include template class SingleTon { using Ptr = std::shared_ptr; static Ptr p; static std::once_flag flag; public: template static Ptr getInstance(Args&& ...args) { auto init = [](Ptr &p, auto&& ...args1) { p.reset(new T(std::forward(args1)...)); }; std::call_once(flag, init, p, std::forward(args)...); return p; } }; template std::shared_ptr SingleTon::p; template std::once_flag SingleTon::flag; ``` 需要注意的是,`std::call_once`必须使用在多线程环境下,要求程序链接pthread库。详见博客: [C++ 单例模式 call_once : terminate called after throwing an instance of 'std::system_error'_月本_诚的博客-CSDN博客](https://blog.csdn.net/T_T233333333/article/details/124492256) 如我们所见,`std::call_once`除了传入`std::once_flag`外还可以传入可调用对象及其参数,同`std::thread`一样。 `std::once_flag`不可拷贝、不可移动 **C++11规定对局部静态变量的初始化只会在单一线程上独立发生,在初始化完成前,其他线程不会越过静态数据的声明而继续运行** 因此如果我们声明局部静态变量也可以实现上面的效果: ```cpp /* template T& get_singleton(Args&& ...args) {//错误的写法!!!因为每个get_instance模板函数的不同实例化指向的是不同的instance static T instance(std::forward(args)...); return instance; } template T& get_singleton() {//正确的写法 static T instance{}; return instance; } */ template class SingleTon { using Ptr = std::shared_ptr; public: static Ptr getInstance() { static Ptr p = std::make_shared(); //对于静态变量的初始化只会进行一次 return p; } }; ``` 但是我还是认为使用`std::call_once()`实现更加优雅. ~~因为我们可能不想要在变量未使用的时候占用内存(静态局部变量占用.bss段)。~~我忘记了模板函数只有在使用到的时候才被实例化,因此如果不进行实例化的话是不会占用额外内存的。 [C++中模板类的静态成员_肥肥胖胖是太阳的博客-CSDN博客_模板类的静态成员](https://blog.csdn.net/weixin_45590473/article/details/107371803) 所以说使用局部静态变量的缺点是不能够传入用于构造的参数。 ## 读写锁 C++14支持`std::shared_timed_mutex` C++17支持`std::shared_mutex` 前者相比后者支持的操作更多,但是后者相对性能更好。 - 使用`std::lock_guard`和`std::unique_lock`互斥访问 - 使用`std::shared_lock`实现共享访问(C++14),使用方式和`std::unique_lock`相同 多个线程可以同时共享访问`std::shared_mutex`,但是如果在读锁上获取写锁,会使得写锁阻塞,直到所有读锁释放,如果在写锁上获取读锁,自然读锁阻塞。 假如一个线程A的函数需要读锁,其内部运行的某个函数也需要读锁,另一个线程B需要写锁。那么在线程A获取读锁后,线程B获取写锁就会阻塞,线程A继续获取读锁也会阻塞 ```cpp #include #include #include #include #include using namespace std; void print() { cout << "\n"; } template void print(T&& first, Args&& ...args) { cout << first << " "; print(std::forward(args)...); } std::shared_mutex mtx; int step = 0; std::mutex cond_mtx; std::condition_variable cond; void read() { //step0: 读锁 shared_lock lock(mtx); unique_lock uniqueLock(cond_mtx); print("read lock 1"); //通知step0结束 ++step; cond.notify_all(); //等待step1: 写锁 结束 cond.wait(uniqueLock, []{ return step == 2; }); uniqueLock.unlock(); //step2: 再次读锁 shared_lock lock1(mtx); print("read lock 2"); } void write() { //等待step0: 读锁 结束 unique_lock uniqueLock(cond_mtx); cond.wait(uniqueLock, []{ return step == 1; }); uniqueLock.unlock(); //step1: 写锁 lock_guard lock(mtx); uniqueLock.lock(); print("write lock"); //通知step1结束 ++step; cond.notify_all(); uniqueLock.unlock(); } int main() { std::thread t_read{read}; std::thread t_write{write}; t_read.join(); t_write.join(); return 0; } ``` ## 递归加锁 `std::recursive_mutex`:允许同一线程对某个锁多次加锁,而且必须释放所有锁才可以让另一个线程上锁。 如果需要可重入的锁,那么很可能设计有问题。例如如果多个获取锁的函数需要互相调用,我们可以从这些函数中抽取出公共部分,作为私有函数,而这个私有函数被共有函数调用,并假定互斥已经锁住,无需加锁(`xxxWithLock()`)。 # 第4章 并发操作的同步 为了实现线程同步,我们有几种方式: 1. 设置一个被互斥保护的共享标志,等待线程轮询访问等待。 缺点:等待线程必须不断查验标志,浪费处理时间。如果等待线程互斥访问标志,限制其他线程修改标志 2. 让等待线程定期休眠(`std::this_thread::sleep_for()`),减少轮询次数 休眠时间难以预知:过长会导致延迟,过短影响效率,虚耗处理时间 3. 使用线程同步机制:条件变量、future(首选项) ## 4.1 条件变量 C++提供了两种条件变量的实现:`std::condition_variable`和`std::condition_variable_any`。前者只能和`std::mutex`配合使用,后者只需要符合互斥的标准即可。因为`std::condition_variable_any`更通用,所以可能产生额外的开销,如果没什么特殊需要,尽可能使用`std::condition_variable` **条件变量是非常重要的线程同步的手段**(目前我认为是最重要的),因此对其的深入理解至关重要。 - 条件变量总是和互斥一起配合使用,互斥用于保护共享数据,条件变量用于 1. 通知(通知线程) 2. 判断**该**共享数据是否满足条件(等待线程) - 通知线程往往先通过互斥保护共享数据,对数据进行一定的修改后再发送通知(`notify_one()、notify_all()`)。需要注意的是我们**应尽可能在临界区内发送通知**,从而避免可能出现的优先级翻转和条件变量失效问题。虽然临界区外通知可以让等待线程一旦被唤醒就能立即解锁互斥查看是否满足情况,但是在Pthread进行wait morphint后基本上两者没有性能上的差距。详细的分析可以参考博客:[条件变量用例–解锁与signal的顺序问题](https://blog.csdn.net/weixin_30374009/article/details/94981070)。 - `notify_one()`理论上只会唤醒一个等待线程,适用于共享变量**数量**发生变化的情况,例如通知消息队列中的消息个数增加。 - `notify_all()`会唤醒所有等待该条件变量的线程,适用于共享变量**状态**发生变化的情况,例如通知所有工作线程开始计算。 - 等待线程先获得互斥,然后将锁和判定条件传递给`wait`函数等待返回。 - `wait`函数首先会根据判断条件判断是否满足条件(返回`true`) - 如果满足条件,则直接返回(互斥依旧上锁) - 如果不满足条件,则阻塞等待,并解锁互斥(让其他线程得以修改共享数据的状态)。直到被`notify`函数唤醒,再次上锁,判断条件是否满足。这里的阻塞和解锁、唤醒和上锁都是原子的,就是为了避免两个动作分别执行出现的条件竞态。 1. 解锁和阻塞是原子的:lock → !pred() → unlock → sleep;如果变量的改变以及唤醒事件发生在unlock和sleep中间,那么你不会检测到,也就是错过了这次唤醒。假如下次唤醒依赖于此次唤醒的成功(也就是说不会主动唤醒第二次),那么将发生死锁。 2. 唤醒和上锁是原子的:wakeup → lock → !pred :如果条件在wakeup和lock之间从满足变成了不满足(不是因为其他等待线程修改,而是因为负责唤醒的线程自己再次修改了条件),那么此次唤醒将失败。假如后面条件的再次满足依赖于此次条件满足成功(也就是说条件不会再主动满足),那么将发生死锁。 需要理解的是上面的死锁的出现是有限定条件的(例如唤醒之间的依赖、条件满足的依赖),虽然大多数情况下没有这么严格的条件,但是工具本身需要避免这种危险的情况。 原子操作保证了重要的唤醒和条件满足都能够至少被一个等待线程看到。 - 可以看到`wait`函数内部需要解锁互斥,所以就不能使用不提供`unlock`函数的`lock_guard`,而应该使用和互斥有相同接口的`unique_lock`。 - 其实C++的线程库是对pthread库的封装,因此也可以像pthread库一样只传入互斥,解锁并等待通知,一旦接收到通知后再上锁,然后在一个`while`循环中进行判断。 ```cpp while (!pred()) { cond_.wait(lk); //调用pthread_cond_wait } ``` 对于传入判定条件的版本,其实内部也是这样的一个封装罢了。 - 之所以说`notify_one()`理论上只会唤醒一个等待线程是因为存在调用一次`notify_one()`却唤醒了多个线程的可能性,甚至有时候没有调用`notify`等待线程都被唤醒,称这种意外唤醒等待线程的情况为**伪唤醒**。按照C++标准的规定,这种伪唤醒出现的数量和频率都不确定,因此要求等待线程的判定函数不能有副作用(可重用),并且需要在唤醒后再次判断条件是否满足,如果不满足则需要重新等待。这也是为什么上面的代码使用`while`进行条件判断而不是`if`的原因。 [生产者消费者模型](https://www.notion.so/48e6478ce9c346dd9432a0d118ba2b61) ## 4.2 使用future等待一次性事件发生 # 第5章 C++内存模型和原子操作 C++新标准引入 了新的线程感知的内存模型,内存模型精确定义 了基础构建单元应该如何运转。 ## 5.1 内存模型基础 内存模型牵涉两个方面:基本结构和并发。基本结构关系到整个程序在内存中的布局,就C++而言,归根结底,基本结构就是对象和内存区域。 C++标准中将对象定义为某一存储范围,程序的数据全部都由对象构成。 不论对象属于什么类型,它都会存储在一个或多个内存区域中。每个内存区域要么是对象,属于标量(scalar type)类型,如unsigned short,要么是一串连续的位域(bit field)。 尽管相邻的位域分属不同对象,但照相算作同一内存区域。 为了保证多个线程对于同一内存的访问互相之间没有冲突,要求同一时刻只有一个线程能够访问:要么使用互斥,要么使用原子操作。 **假设两个线程访问同一内存区域,却没有强制他们服从一定的访问次序,如果其中至少有一个是非原子化访问,并且至少有一个是写操作,就会出现数据竞争,导致未定义行为。** 如果我们都使用原子操作访问共享内存,就可以保证避免未定义行为。但是我们仍然无法保证数据竞争,因为我们无法指定访问内存的次序。 每个对象的改动序列由所有线程的全部写操作构成。 在不同的线程上观察同一变量的改动序列,如果不同,说明出现了数据竞争和未定义行为。 为了保障同一变量的改动序列的一致性,要求禁止某些预测执行。 ## 5.2 C++中的原子操作及其类别 原子操作是不可分割的操作:在系统的任一线程内,原子操作要么还没有开始,要么已经完成。 ### 5.2.1 标准原子类型 标准原子类型的定义位于头文件中,这些类型的操作全都是原子化的。 标准原子类型全部都具备成员函数is_lock_free(),准许使用者判断该原子类型是否是无锁的。 从C++17开始,所有原子类型都含有一个编译期确定的静态常量成员变量is_always_lock_free,如果该类型在所有平台上都是无锁的,那么该值为true,否则为false。还有一些宏帮助我们去判断原子类型的无锁情况。 ```cpp void test_atomic_lock_free() { //g++ (Ubuntu 11.3.0-1ubuntu1~22.04) 11.3.0 print("atomic_bool", "always? :", atomic_bool::is_always_lock_free); //true print("atomic_bool", ":", atomic_bool().is_lock_free()); //true print("atomic_int", "always? :", atomic_int::is_always_lock_free); //true print("atomic_int", ":", atomic_int().is_lock_free()); //true print("atomic_char", "always? :", atomic_char::is_always_lock_free); //true print("atomic_char", ":", atomic_char().is_lock_free()); //true print("atomic", "always? :", atomic::is_always_lock_free); //true print("atomic", ":", atomic().is_lock_free()); //true print("atomic", "always? :", atomic::is_always_lock_free); //true print("atomic", ":", atomic().is_lock_free()); //true shared_ptr p; print("atomic_shared_ptr", ":", atomic_is_lock_free(&p)); //false } /* atomic_bool always? : 1 atomic_bool : 1 atomic_int always? : 1 atomic_int : 1 atomic_char always? : 1 atomic_char : 1 atomic always? : 1 atomic : 1 atomic always? : 1 atomic : 1 atomic_shared_ptr : 0 */ ``` 从测试程序的结果看,大多数原子类型都是无锁的,在我这个平台上,普通的智能指针的原子操作不是无锁的。 原子类型std::atomic_flag不提供is_lock_free()成员函数,它是简单的布尔标志,一定是采取无锁操作。利用这种简单的无锁布尔标志,我们就能实现一个简单的锁,进而基于该锁实现其他所有原子类型。它只支持两种操作: - `test_and_set()`:查值并设1 - `clear()`:清0 其余的原子类型都是通过类模板`std::atomic<>`特化得出的,功能更加齐全,但可能不属于无锁结构。他们还有对应的别名,std::atomic的别名就是atomic_T。 由于不具备拷贝构造函数和拷贝赋值操作符, **标准的原子类型对象无法复制,也无法赋值**。原因是原子类型的操作都是原子化的,但是拷贝操作都涉及两个对象,无法原子化。 但是他们可以、接受内建类型赋值,也可以隐式地转换为内建类型,还可以直接经由成员函数处理。 **赋值操作符的返回值是存入的值,具名成员函数的返回值则是操作前的值。**虽然赋值操作符习惯上返回引用,但是如果想要通过该引用查看存入的值,就会使得赋值和读取操作之间可能被其他线程修改,无法读取正确的值。 对于原子类型的每一种操作,我们都可以提供额外的参数,用于设定内存次序语义。如果没有传入,则默认采用最严格的内存次序。在5.3节将对于内存次序进行更加详细的探讨。 ### 5.2.2 操作std::atomic_flag std::atomic_flag必须由宏初始化为置零状态:`std::atomic_flag flag = ATOMIC_FLAG_INIT`。它仅仅支持三种操作:销毁(析构函数)、置零(`clear()`)、读取原有的值并置位(`test_and_set()`)。我们可以使用其构建自旋锁(spin lock):当flag处于置零状态,表示互斥未上锁,当flag处于置位状态,表示互斥上锁。 ```cpp #include namespace edward::multithread { class SpinLockMutex { std::atomic_flag flag_; public: SpinLockMutex(): flag_(ATOMIC_FLAG_INIT) {} void lock() { while (flag_.test_and_set(std::memory_order_acquire)); } void unlock() { flag_.clear(std::memory_order_release); } }; } ``` ### 5.2.3 操作std::atomic 相比std::atomic_flag,std::atomic是一个功能更加齐全的布尔标志,支持通过布尔值赋值,读取(`load`),写入(`store`),读-改-写(`exchange`),比较-交换(`compare_exchange_weak`和`compare_exchange_strong`)。其中**比较-交换操作是原子类型的编程基石**。 比较-交换操作传入两个参数,一个是期望值a,一个是修改值b。当调用比较-交换操作后: - 如果当前原子类型x的值和期望值a相同,就修改x的值为b,返回true - 如果当前原子类型x的值和期望值a不同,则修改 `compare_exchange_weak`和`compare_exchange_strong`的区别在于: - `compare_exchange_weak`可能存在原子类型x的值和期望值a相同,但是修改失败,这种情况下返回false,称这种现象为佯败(spurious failure),其原因在于有的处理器可能不能一条指令完成比较-交换操作,如果在执行的过程中发生了线程切换,就会导致失败。因此我们经常要配合循环使用`compare_exchange_weak` ```cpp bool expected = false; extern std::atomic flag; while (!flag.compare_exchange_weak(expected, true) && !expected); ``` - `compare_exchange_strong`可以确保当原子类型x的值和期望值a相同时,修改成功。我认为原因是其内部封装了`compare_exchange_weak`循环。 书中说,当修改值b的计算比较方便时,两者没有什么区别,如果需要计算不方便,使用`compare_exchange_strong`可以帮助我们保存修改值b。不过我觉得也可以自己保存,还是尽可能用`compare_exchange_weak`吧。 比较-交换函数还可以传入两个内存次序,分别用作成功和失败,其中后者不能比前者更加严格,原因是后者只是读操作,而前者是读-写操作。 std::atomic不保证是无锁的,我们可以调用成员函数is_lock_free进行检查,或者通过静态成员变量或者宏查看其是否是无锁的。 ### 5.2.4 操作标准整数原子类型 同样支持检查无锁情况(`is_lock_free`)、读出(`load`)、写入(`store`)、读-改-写(`exchange`)、比较-交换(`compare_exchange_weak`和`compare_exchange_strong`) 相比原子布尔类型,原子指针提供了算术运算符重载(+=、-=、++、—)和对应的成员函数(fetch_add和fetch_sub)。运算符返回修改后的值,成员函数返回修改前的值 ### 5.2.5 泛化的std::atomic<>类模板 对于类型T,我们实例化出std::atomic的条件是:T必须具备平实拷贝赋值操作符:不得包含虚函数,也不可以从虚基类派生,必须由编译器隐式生成拷贝赋值操作符,且比较-交换操作所采用的比较是**逐位**比较,如果类型T自定义了比较运算符也不会起作用。会有这么多限制的原因是编译器往往没有能力为自定义类型T生成无锁原子结构,因此它必须在内部运用锁保护所有操作,为了避免调用用户自定义操作导致死锁,编译器对类型T有严格的限制。 如果类型T的大小不超过int或void*的体积,那么大多数硬件平台都可以为std::atomic生成无锁指令。 由于浮点类型的比较操作比较复杂,可能有多种表示形式,简单使用按位比较可能会使值相等的比较失败,所以尽可能少用。 同时为了兼容C语言,C++还对原子类型提供了非成员函数,其用法和成员函数相同。特殊的,C++标准库还针对`std::shared_ptr`提供了非成员函数,使得能够原子化访问。并发技术规约还提供了`std::experimental::atomic_shared_ptr`,如果要使用原子共享指针,应该优先采用。 ## 5.3 同步操作和强制次序 同步关系:对于一个原子类型的操作是自洽的,没有数据竞争和不变量的破坏 先行关系:时序上的先后 原子类型的操作服从6种内存次序:memory_order_relaxed、memory_order_consume、memory_order_acquire、memory_order_release、memory_order_acq_rel和memory_order_seq_cst。 其中memory_order_seq_cst是最严格的内存次序,各种原子类型的所有操作默认都遵从该次序。 这6种内存次序代表了3种模式:先后一致次序(memory_order_seq_cst)、获取-释放次序(memory_order_consume、memory_order_acquire、memory_order_release和memory_order_acq_rel)、宽松次序(memory_order_relaxed) 1. 先后一致次序 原子类型的改动序列对所有线程都是相同的。这种内存次序易于理解、符合直觉、易于编程,代价是保序操作会导致性能损失,因为它必须在多处理器之间维持全局操作次序(维护缓存一致性) 2. 宽松次序 如果采用宽松次序,那么原子类型上的操作不存在线程间同步关系。**同一个线程内的操作服从先行关系**。交换操作和比较-交换操作可以让我们获取该原子类型的最新状态(推测是因为有一次写操作),书中建议不要使用宽松次序 3. 获取-释放次序 原子化载入(load)为获取操作(memory_order_acquire),原子化存储(store)为释放操作(memory_order_release),原子化读-改-写为获取、释放操作(memory_order_acq_rel)。 不同线程对同一个变量的释放和获取构成同步。释放操作所存储的值必须为获取操作所见才可以有效同步,因此我们会**循环获取**,直到获取到的值和释放的值相同,构成同步。 如果原子操作对先后一致的要求不是很严格,那么由成对的获取-释放操作实现同步,开销会远低于由保序操作实现的全局一致顺序。这种做法很消耗脑力,要求周密思考线程间那些违背一般情况的行为,从而保证不出错,让程序服从施加的次序正确运行。 C++17标准不建议我们采用memory_order_consume次序,这里不进行学习,如果日后有需要再去了解。 我认为书中这样讲解内存次序非常费力,读者理解起来也非常费力,原因在于没有从本质上揭示不同内存次序意味着什么。可以理解作者是为了避免介绍缓存一致性等内存引入的复杂的硬件知识。但是正如Charles在MIT6.172开始阐述的,如果要很好地理解某个知识,必须要从比这个知识低一个层级的视角去认识。 虽然我目前还没有系统地学习CPU缓存相关的知识,但是我推测这些不同的内存次序的差异在于是否触发CPU缓存同步。先后一致次序每次存储都会触发缓存同步,而宽松次序则完全不会触发,获取-释放次序会在释放(存储)和获取(载入)的时候触发缓存同步,通过控制缓存同步的粒度,提升性能。 **在释放-获取同步中,如果中间还存在其他“读-改-写”操作,同步关系依然成立。**还不是很理解背后的工作原理,推测是因为每次同步都会刷新cpu缓存,所以中间的其他操作也会被同步 ```cpp #include "SpinLockMutex.h" #include "utils.h" #include #include #include #include using namespace std; atomic cnt = 0; vector arr; edward::multithread::SpinLockMutex mtx; constexpr int kMax = 20; void populate_queue() { for (int i = 0; i < kMax; ++i) arr.push_back(i); cnt.store(kMax, std::memory_order_release); } void consume_queue(int n) { int idx; while (true) { if ((idx = cnt.fetch_sub(1, std::memory_order_acquire)) <= 0) { //this_thread::sleep_for(chrono::seconds(1)); } else { lock_guard lk(mtx); edward::print(n, ":", arr[idx - 1]); } } } int main() { thread producer(populate_queue); thread consumer1(consume_queue, 1); thread consumer2(consume_queue, 2); producer.join(); consumer1.join(); consumer2.join(); return 0; } ``` 运行结果: > 1 : 19 2 : 18 1 : 17 2 : 16 1 : 15 2 : 14 1 : 13 2 : 12 1 : 11 2 : 10 1 : 9 2 : 8 1 : 7 2 : 6 1 : 5 2 : 4 1 : 3 2 : 2 1 : 1 2 : 0 > 如果不想通过原子操作施加内存次序(刷新CPU缓存),我们还可以通过栅栏(`std::atomic_thread_fence(memory_order)`)进行,当线程运行至栅栏处,它便对其他原子操作的次序产生作用(也就是刷新CPU缓存)。 我们利用原子操作或栅栏强制施行内存次序,**可以强制非原子操作服从内存次序**。如同上面例子中的arr数组一样:如果按照控制流程,线程A中非原子操作甲在原子操作乙之前执行,线程B中的原子操作丙在甲的原子操作乙后执行,那么非原子操作甲同样在操作丙前执行。 # 第6章 设计基于锁的并发数据结构 要使得数据结构可以被多个线程并发访问: - 要么只读不可变化,很多函数式编程语言都具有这样的特点(例如SML) - 要么设计精良,保证线程之间正确同步,多个改动并行无碍 ## 目标 我们称可以被多线程安全访问的特性叫做**线程安全**:多线程执行的操作无论异同,每个线程所见的数据结构都是自洽的;数据不会丢失或破坏,所有不变量始终成立,恶性条件竞争也不会出现。 除了要保证线程安全,我们设计并发数据结构的另一个目标是尽可能**减少串行化**(每个线程轮流串行访问受互斥保护的数据),**提高并行度**。 ## 注意事项 线程安全注意事项: - 谨慎设计数据结构的接口以避免固有的条件竞争,数据结构提供的操作应该完整、独立,而非零散的分解步骤 - 即使程序抛出异常,也要保证不变量不被破坏 - 避免在临界区外访问数据,避免嵌套锁→避免死锁 - **数据结构的使用场景** 提高并行度注意事项: - 尽量缩小临界区的大小 - 不同接口尽量使用不同的互斥(在保证线程安全的前提下) ## 6.1 线程安全的栈 在第三章实现了一个线程安全的栈容器,这里进行分析: ```cpp // // Created by edward on 22-11-10. // #ifndef CONCURRENCE_STACK_H #define CONCURRENCE_STACK_H #include #include #include #include #include class EmptyStackException : std::exception { const char * what() const noexcept override { return "Empty Stack!"; } }; namespace edward::multithread { /*! * 线程安全的栈,底层使用std::stack进行实现 * @tparam T 栈中元素类型 * 使用mutex配合简单的lock_guard进行互斥保护,之所以不用COW是因为剩下的接口都是写接口,没有读接口,所以不需要 */ template class Stack { using Ptr = std::shared_ptr; using LockGuard = std::lock_guard; public: //接口相对于std::stack简化了许多,简化的接口让我们得以确保互斥能针对完整操作而锁定 Stack() = default; Stack(const Stack& rhs) { LockGuard lock(rhs.mtx_); data_ = rhs.data_; } Stack& operator= (const Stack&) = delete; //将赋值运算符删除 TODO:? //有在考虑要不要让栈中保存shared_ptr,想了想没什么不同了 void push(T value) { LockGuard lock(mtx_); data_.push(std::move(value)); } Ptr pop() { LockGuard lock(mtx_); if (data_.empty()) throw EmptyStackException(); //觉得要是元素支持移动还是不要这么做,好丑 Ptr const ret{std::make_shared(data_.top())}; //先拷贝一份,然后再返回其指针,避免再次拷贝出现的异常导致数据丢失。 data_.pop(); return ret; } bool empty() const { LockGuard lock(mtx_); return data_.empty(); } private: std::stack> data_; //stack底层使用vector实现,默认是deque mutable std::mutex mtx_; //必须是mutable的,用于保护const Stack的线程安全 }; } #endif //CONCURRENCE_STACK_H ``` 用上面的注意事项进行分析: - 每个操作内部都首先进行互斥,保证了任何时刻仅有唯一的线程访问数据→线程安全 - 为了避免`top`接口和`pop`接口之间的数据竞争,取消`top`接口 - 使用RAII保证解锁的std::lock_guard保证绝不遗漏数据的解锁操作 - push操作有可能遇到内存分配不足的异常,由std::stack<>保证安全 - pop操作有可能抛出异常(在创建shared_ptr的时候),由C++标准库保证不会发生内存泄漏,而且此时未改动栈容器,没有破坏不变量 archive 即使如此,上述实现仍然存在死锁的可能:如果容器保存的对象的构造函数中调用了栈容器的成员函数,就会导致加锁两次。这种情况无法避免,只能要求用户不能在对象的方法中调用栈容器的方法。 并且需要由用户保证:在构造完成之前,其他线程不得访问;在全部线程停止访问后才可销毁容器。 ## 6.2 线程安全的队列 ### 6.2.1 单个互斥保护的队列 第四章通过使用锁和条件变量实现了一个线程安全的队列: ```cpp // // component/Queue/Queue_v0.h // Created by edward on 22-11-16. // #ifndef CONCURRENCE_QUEUE_V0_H #define CONCURRENCE_QUEUE_V0_H #include #include #include namespace edward::multithread { /*! * 线程安全的队列,底层使用std::queue进行实现 * @tparam T 队列的元素类型 * 使用互斥和条件变量保证线程安全 */ template class Queue { public: using Type = T; Queue() = default; ~Queue() = default; Queue(const Queue& other) { std::lock_guard(other.mtx_); queue_ = other.queue_; } Queue& operator= (const Queue& other) { std::scoped_lock lk(mtx_, other.mtx_); //使用scoped_lock一次加锁,避免死锁 queue_ = other.queue_; } void swap(Queue &other) { std::scoped_lock lk(mtx_, other.mtx_); queue_.swap(other.queue_); } void push(T t) { std::lock_guard lk(mtx_); //互斥保护数据 queue_.push(std::move(t)); cond_.notify_one(); //在临界区内notify } T pop() noexcept { //要求T的移动操作不抛出异常——如果T要抛出异常,那么应该考虑转换接口,类似Stack实现。详细内容见《C++并发编程实战》第二版 3.2节相关内容 std::unique_lock lk(mtx_); cond_.wait(lk, [&](){ return !queue_.empty(); }); T frnt = std::move(queue_.front()); queue_.pop(); return frnt; } T tryPop() { //如果不想阻塞等待,就要处理可能发生的异常,书中是通过返回布尔值,但是我觉得那样有些丑 std::lock_guard lk(mtx_); if (queue_.empty()) throw std::logic_error("pop a empty queue!!!"); T frnt = std::move(queue_.front()); queue_.pop(); return frnt; } bool empty() const { std::lock_guard lk(mtx_); return queue_.empty(); } int size() const { std::lock_guard lk(mtx_); return queue_.size(); } private: mutable std::mutex mtx_; mutable std::condition_variable cond_; std::queue queue_; }; } #endif //CONCURRENCE_QUEUE_V0_H ``` 其中的push和tryPop方法和前面的栈容器类似,不同之处在于我这里的tryPop方法没有处理异常,觉得有RVO(返回值优化)的情况下大部分时间都不会出现异常。 pop方法使用条件变量等待队列不为空,并且在检查期间获取互斥,保证线程安全。 但是此时还是存在条件竞态:如果一个线程调用push方法,通知另一个线程pop,但是出现了异常(第54行),那么该对象就无法被正常获取,为了避免这种情况: 1. push方法中使用notify_all进行通知,这样子会出现惊群现象 2. pop方法捕获异常后再次通知(notify_one)其他线程pop 3. 避免pop方法出现异常,方法就是在队列中保存(智能)指针而不是对象本身,这样就不会因为拷贝/移动出现异常 我们这里采用第三种方法,从根本上解决问题。 ```cpp // // component/Queue/Queue_v1.h // Created by edward on 22-11-16. // #ifndef CONCURRENCE_QUEUE_V1_H #define CONCURRENCE_QUEUE_V1_H #include #include #include namespace edward::multithread { /*! * 线程安全的队列,底层使用std::queue进行实现 * @tparam T 队列的元素类型 * 使用互斥和条件变量保证线程安全 */ template class Queue { public: using Type = T; using Ptr = std::shared_ptr; Queue() = default; ~Queue() = default; Queue(const Queue& other) { std::lock_guard(other.mtx_); queue_ = other.queue_; } Queue& operator= (const Queue& other) { std::scoped_lock lk(mtx_, other.mtx_); //使用scoped_lock一次加锁,避免死锁 queue_ = other.queue_; } void swap(Queue &other) { std::scoped_lock lk(mtx_, other.mtx_); queue_.swap(other.queue_); } void push(T t) { Ptr p = std::make_shared(std::move(t)); std::lock_guard lk(mtx_); //互斥保护数据 queue_.push(std::move(p)); cond_.notify_one(); //在临界区内notify } Ptr pop() noexcept { //要求T的移动操作不抛出异常——如果T要抛出异常,那么应该考虑转换接口,类似Stack实现。详细内容见《C++并发编程实战》第二版 3.2节相关内容 std::unique_lock lk(mtx_); cond_.wait(lk, [&](){ return !queue_.empty(); }); Ptr frnt = std::move(queue_.front()); queue_.pop(); return frnt; } Ptr tryPop() { //如果不想阻塞等待,就要处理可能发生的异常,书中是通过返回布尔值,但是我觉得那样有些丑 std::lock_guard lk(mtx_); if (queue_.empty()) throw std::logic_error("pop a empty queue!!!"); Ptr frnt = std::move(queue_.front()); queue_.pop(); return frnt; } bool empty() const { std::lock_guard lk(mtx_); return queue_.empty(); } int size() const { std::lock_guard lk(mtx_); return queue_.size(); } private: mutable std::mutex mtx_; mutable std::condition_variable cond_; std::queue queue_; }; } #endif //CONCURRENCE_QUEUE_V1_H ``` 使用智能指针进行存储的另一个好处是可以将内存分配放在临界区外,因为内存分配往往是比较耗时的操作,将其放在临界区外有利于增强性能。 虽然上面的实现已经解决了许多的问题,实现了并发编程的第一个目标:线程安全,但是其并发程度并不高,根本原因在于队列只有一个唯一的互斥保护整个数据结构,实际上同一个时刻只允许一个线程操作队列数据。为了提高并发度,我们必须掌控数据结构的实现细节,提供粒度更精细的锁。 ### 6.2.2 多个互斥保护的队列 简单的想法是使用链表实现队列,分别用互斥保护头节点和尾节点,一般的实现是用head指针指向头部元素,tail指针指向尾部元素,push的时候向tail指针后添加元素,修改tail指针;pop的时候返回head指向元素,修改head指针。 ```cpp // // Created by Administrator on 2022/12/20. // #ifndef CONCURRENCE_QUEUE_T0_H #define CONCURRENCE_QUEUE_T0_H #include //使用普通链表实现的单线程队列 template class Queue { using Type = T; using Ptr = std::shared_ptr; struct Node { Ptr data_; std::unique_ptr next_; Node(Ptr t): data_(std::move(t)) {} }; using NodePtr = std::unique_ptr; NodePtr head_; Node* tail_ = nullptr; public: Queue(const Queue&) = delete; Queue& operator= (const Queue&) = delete; Ptr tryPop() { if (!head_) return Ptr{}; Ptr const ret = head_->data_; //head_ = head_->next_; 但是unique_ptr不能直接赋值,必须用一个临时节点接管head_,然后再赋值 NodePtr tmpHead = std::move(head_); //防止由于给head赋值释放掉head->next head_ = std::move(tmpHead->next_); if (!head_) { tail_ = nullptr; } return ret; } void push(T t) { NodePtr node{new Node{std::make_shared(std::move(t))}}; Node *newTail = node.get(); if (tail_) { tail_->next_ = std::move(node); } else { head_ = std::move(node); } tail_ = newTail; } }; #endif //CONCURRENCE_QUEUE_T0_H ``` 由于只能通过将head和tail赋null来判断队列是否为空,所以push和pop的时候都会同时需要修改head和tail,而这样子就要求我们同时用互斥保护,这样就和用一个互斥保护没有什么区别了。或许可以通过条件判断在合适的时候加锁来避免同时加两把锁。但是当队列中仅有一个元素的时候,head_和tail_会指向相同的元素,这时虽然是两个不同的互斥,但是还是会有条件竞争。 不过说实话这些看起来很简单的代码如果单单靠眼睛盯着书本看,虽然当时觉得自己理解了,但是自己写起来才发现有很多细节,果然书上的代码要手敲一遍才能真正理解,否则看到后面越来越觉得云里雾里,难以理解。 上面的实现方式中,由于需要维护队列为空的信息,在push和pop中都同时需要对head和tail进行修改。如果我们使用一个哑节点(dummy node)就可以避免这种复杂的判断情况。不同于一般链表为了方便在链表头部添加元素在链表头部设置哑节点,这里为了方便在链表尾部添加元素在链表尾部设置哑节点,当head和tail都指向哑节点时说明队列为空。(不需要修改指针状态为null,只需要向后移动) ```cpp // // Created by Administrator on 2022/12/20. // #ifndef CONCURRENCE_QUEUE_T1_H #define CONCURRENCE_QUEUE_T1_H #include //使用普通链表实现的单线程队列 template class Queue { using Type = T; using Ptr = std::shared_ptr; struct Node { Ptr data_; std::unique_ptr next_; Node() {} Node(Ptr t): data_(std::move(t)) {} }; using NodePtr = std::unique_ptr; NodePtr head_; Node* tail_; public: Queue(): head_(new Node), tail_(head_.get()) {} Queue(const Queue&) = delete; Queue& operator= (const Queue&) = delete; Ptr tryPop() { if (head_.get() == tail_) return Ptr{}; Ptr const ret = head_->data_; //head_ = head_->next_; 但是unique_ptr不能直接赋值,必须用一个临时节点接管head_,然后再赋值 NodePtr tmpHead = std::move(head_); //防止由于给head赋值释放掉head->next head_ = std::move(tmpHead->next_); return ret; } void push(T t) { auto newData = std::make_shared(std::move(t)); NodePtr node{new Node}; tail_->data_ = newData; Node* const newTail = node.get(); tail_->next_ = std::move(node); //unique_ptr只能移动赋值、构造,不能拷贝 tail_ = newTail; } }; #endif //CONCURRENCE_QUEUE_T1_H ``` 通过添加哑节点的方式,我们避免了之前为了记录队列为空的状态复杂的分支判断,并且pop函数仅仅修改head指针,push函数仅仅修改tail指针。虽然pop函数还需要读取tail指针,但是并不会修改其状态。这样子同样避免了push和pop操作同一个节点的条件竞争问题,队列非空情况下,head指针和tail指针都不会处理同一个节点,队列为空的情况下,pop操作会直接返回,不会和push进行竞争。接下来我们进行合适的加锁令其变成线程安全的队列。 ```cpp // // Created by Administrator on 2022/12/21. // #ifndef CONCURRENCE_QUEUE_V2_H #define CONCURRENCE_QUEUE_V2_H #include #include namespace edward::multithread { //精细粒度锁的线程安全队列 template class Queue { using Type = T; using Ptr = std::shared_ptr; struct Node { Ptr data_; std::unique_ptr next_; }; using NodePtr = std::unique_ptr; NodePtr head_; node* tail_; std::mutex mtxHead_, mtxTail_; public: Queue() : head_(new Node), tail_(head_.get()) {} Queue(const Queue&) = delete; Queue& operator= (const Queue&) = delete; void push(T t) { Ptr data = std::make_shared(std::move(t)); //将内存分配等操作放在临界区外 NodePtr dummyNode {new Node}; Node* const newTail = dummyNode.get(); std::lock_guard lk(mtxTail_); //C++17支持模板类的模板参数推断 tail_->data_ = data; tail_->next_ = std::move(dummyNode); tail_ = newTail; } Ptr tryPop() { NodePtr tmpHead_; //在临界区外释放头节点 { std::lock_guard lk(mtxHead_); if (std::lock_guard lk1(mtxTail_); head_.get() == tail_) { return Ptr{}; } //head_ = head_->next_; 但是unique_ptr不能直接赋值,必须用一个临时节点接管head_,然后再赋值 tmpHead = std::move(head_); //防止由于给head赋值释放掉head->next head_ = std::move(tmpHead->next_); } return tmpHead_->data_; //将函数返回放在临界区外 } }; } #endif //CONCURRENCE_QUEUE_V2_H ``` 和书上代码6.6的实现大同小异,书中将tryPop函数中获取头节点的部分封装成了函数,我这里用局部作用域也能实现相同的效果。在pop函数中为了判断队列是否为空,在if语句中首先上锁,这种在if语句中声明变量的写法在C++17以后被支持。 那么上面的队列是否是线程安全? 1. 是否破坏步变量?在pop函数中,为了判断队列是否为空,我们要获取tail指针的值,需要对tail指针进行加锁,这个操作是必要的,避免了潜在的数据竞争。如果不上锁,不同线程可能会看到不同内存状态的tail指针。而且我们必须首先锁上head指针后,才能上锁获取tail指针的状态,因为head指针和tail指针都在向后移动,如果我们在锁上head指针之前获取tail指针的状态A后其他线程移动head指针到A的后面,那么就无法判断队列是否为空(永远不相等),就会破坏不变量。(真晦涩,书里面写的绕来绕去的,把我都看懵了) 2. 异常处理很简单,由于使用了智能指针,并且在临界区外,不会破坏不变量。 3. 并行度也很高,临界区内没有多余的操作(书中有详细的讨论) 上面的实现没有阻塞等待pop的部分,接下来我们配合条件变量实现这一部分,完成整个线程安全的队列 ```cpp // // Created by Administrator on 2022/12/21. // #ifndef CONCURRENCE_QUEUE_V2_H #define CONCURRENCE_QUEUE_V2_H #include #include #include namespace edward::multithread { //精细粒度锁的线程安全队列 template class Queue { using Type = T; using Ptr = std::shared_ptr; public: Queue() : head_(new Node), tail_(head_.get()) {} Queue(const Queue&) = delete; Queue& operator= (const Queue&) = delete; void push(T t) { Ptr data = std::make_shared(std::move(t)); //将内存分配等操作放在临界区外 NodePtr dummyNode {new Node}; Node* const newTail = dummyNode.get(); { std::lock_guard lk(mtxTail_); //C++17支持模板类的模板参数推断 tail_->data_ = data; tail_->next_ = std::move(dummyNode); tail_ = newTail; } cond_.notify_one(); } Ptr tryPop() { NodePtr tmpHead; //在临界区外释放头节点 { std::lock_guard lk(mtxHead_); if (head_.get() == getTail()) { return Ptr{}; } //head_ = head_->next_; 但是unique_ptr不能直接赋值,必须用一个临时节点接管head_,然后再赋值 tmpHead = std::move(head_); //防止由于给head赋值释放掉head->next head_ = std::move(tmpHead->next_); } return tmpHead->data_; //将函数返回放在临界区外 } Ptr pop() { NodePtr tmpHead; { std::unique_lock lk(mtxHead_); cond_.wait(lk, [this]() { head_.get() != getTail(); }); tmpHead = std::move(head_); head_ = std::move(tmpHead->next_); } return tmpHead->data_; } bool empty() const { std::lock_guard lk(mtxHead_); return head_.get() == getTail(); } private: struct Node { Ptr data_; std::unique_ptr next_; }; using NodePtr = std::unique_ptr; NodePtr head_; Node* tail_; mutable std::mutex mtxHead_, mtxTail_; mutable std::condition_variable cond_; Node* getTail() const { std::lock_guard lk(mtxTail_); return tail_; } }; } #endif //CONCURRENCE_QUEUE_V2_H ``` 主要就是在push函数后通知,在pop函数中等待条件变量,等待退出条件是队列不为空。这里还是把尾指针状态的获取封装了一下,使得程序看起来更加简洁。 书里面觉得有一些过度封装了,很简单的代码却被层层封装,当然还因为有两个相似的接口需要复用。不过我觉得另一个传入引用参数的形式非常丑,觉得保留返回智能指针的接口就可以了。 这个队列是无限队列,只要存在空闲内存,各个线程还是能够持续往队列添加新数据。与之对应的是有限队列,其最大长度在创建之时就已经确定。一旦有限队列容量已满,再试图向其压入数据就会阻塞,直到有数据弹出而产生空闲空间。有限队列可用于多线程的工作分配,能够依据待执行任务的数量,确保工作在各个线程中均匀分配。相比无限队列,有限队列能够防止某些线程向队列添加任务的速度过快,远远超过处理任务的速度,从而导致某个线程的任务堆积,而其他线程的任务饥饿。 ## 6.3 线程安全的链表 线程安全的链表需要支持的操作: - 添加数据 - 移除数据 - 查找数据 - 更新数据 要想提高链表的并发程度,我们就得采用精细粒度的锁操作,让每个节点都具有自己的互斥。 ```cpp // // Created by edward on 23-2-17. // #ifndef CONCURRENCE_LIST_H #define CONCURRENCE_LIST_H #include #include namespace edward::multithread { template class List { using Ptr = std::shared_ptr; struct Node; using NodePtr = std::unique_ptr; struct Node { std::mutex mtx_; Ptr data_; NodePtr next_; explicit Node(const Ptr &data = Ptr{}): data_(data) {} }; NodePtr head_; public: List(): head_(new Node) {} List(const List&) = delete; List& operator= (const List&) = delete; void add(T t) { NodePtr newNode(new Node(std::make_shared(std::move(t)))); std::lock_guard lk(head_->mtx_); newNode->next_ = std::move(head_->next_); head_->next_ = std::move(newNode); } template void for_each(Function f) { //考虑到多线程环境下可能引用失效,不使用万能引用,而是简单传值 Node *cur = head_.get(), *next; std::unique_lock lk(head_->mtx_); while (next = cur->next_.get()) { std::unique_lock nextLk(next->mtx_); lk.unlock(); f(next->data_); cur = next; lk = std::move(nextLk); } } template Ptr find_first_if(Predicate p) { Node *cur = head_.get(), *next; std::unique_lock lk(head_->mtx_); while (next = cur->next_.get()) { std::unique_lock nextLk(next->mtx_); lk.unlock(); if (p(next->data_)) { return next->data_; } cur = next; lk = std::move(nextLk); } return Ptr{}; } template void remove(Predicate p) { Node* cur = head_.get(), *next; std::unique_lock lk(head_->mtx_); while (next = cur->next_.get()) { std::unique_lock nextLk(next->mtx_); if (p(next->data_)) { NodePtr oldNext = std::move(cur->next_); //避免next节点的释放导致后面所有节点的释放 cur->next_ = std::move(next->next_); nextLk.unlock(); //这个锁已经没有用了,但还是释放掉 } else { lk.unlock(); //尽可能缩小临界区 cur = next; lk = std::move(nextLk); } } } }; } #endif //CONCURRENCE_LIST_H ``` 为了简化结构,这里的链表是单向链表(不用考虑加锁不同顺序导致死锁)。不过其实觉得多线程环境下,节点之间的相对顺序已经没有什么意义,双向链表没有必要。 头节点设置为哑节点,每次添加数据都添加到头部,只需要获取头节点一个的互斥就可以安全添加数据。遍历链表的过程需要锁住相邻的两个互斥,一旦我们锁住了下一个节点,就可以安全释放当前节点的锁,并且迭代进行这一过程。删除操作不会立即释放当前节点的锁,当判断当前节点需要删除时,需要对当前节点的next指针改动。这里有一个细节是当我们判断该节点需要删除时,使用一个临时的智能指针保存节点,然后在改动完链表后及时解锁,因为该智能指针离开作用域就是释放内存,包括前面已经上锁的互斥,如果不及时解锁会引发为定义行为。 通过这样设计,我们实现了精细粒度的互斥操作增加并发程度。由于是单向链表,所有线程加锁的顺序都一致,这也会导致如果某个线程阻塞在某个互斥上(由于用户传入的操作),就会使得其他线程无法越过那个互斥。这一点只能够要求用户对节点都采用非阻塞的操作。 # 第7章 设计无锁数据结构 在设计无锁数据结构时,我们需要极为小心、谨慎,因为他们的正确实现相当不容易,而导致代码出错的情形可能难以复现。 阻塞型算法/数据结构:使用了互斥、条件变量或 future进行同步操作的算法或数据结构,执行会导致线程阻塞,直到其他线程释放互斥、通知条件变量或为future对象填充结果。 非阻塞型(nonblocking)算法/数据结构:没有使用阻塞型库函数调用的算法和数据结构。在实践中,我们需要对非阻塞数据结构进行分类: - 无阻碍(obstruction-free):其他线程暂停就可以完成 - 无锁(lock-free):有限步骤内可以完成 - 免等(wait-free):不用等待其他线程 无锁数据结构: - 优点:最大限度实现并发 - 缺点:非常复杂,容易出错 - 无锁数据结构不会导致死锁,但有可能导致活锁:多个线程对同一数据的修改使得其他线程反复重新尝试,导致程序无法向前运行 ## 7.1 线程安全的无锁栈 尝试使用链表实现栈容器(之所以不使用数组是因为在无锁情况下,不再是互斥修改,多个线程都在修改的情况下顺序性无法保证) 添加数据:1.创建新节点 → 2.新节点的next指向头节点 → 3.更新头节点为新节点 在没有互斥保护的情况下,步骤2和3之间可能存在条件竞争。无锁的做法是首先保证对头节点的修改是原子的(头节点是原子类型),其次每次通过compare_exchange_weak函数查看是否有其他线程修改头节点,如果没有修改,就更新头节点,如果已经修改,则修改新节点的next指针的,再次尝试修改头节点。 ```cpp #include #include namespace edward::multithread { template class LockFreeStack { public: void push(T t) { Node *node = new Node(t, head_.load()); while (!head_.compare_exchange_weak(node->next_, node)); } private: struct Node { std::shared_ptr data_; Node *next_; Node(T t, Node* next = nullptr) : data_(std::make_shared(std::move(t))) , next_(next) {} }; std::atomic head_; }; } ``` 弹出数据的步骤:1.读取head → 2.读取head→next → 3. head改为head→next → 4.返回data → 5.删除旧节点 ```cpp #include #include namespace edward::multithread { template class LockFreeStack { using Ptr = std::shared_ptr; public: void push(T t) { Node *node = new Node(std::make_shared(std::move(t)), head_.load()); while (!head_.compare_exchange_weak(node->next_, node)); } Ptr pop() { Node* head = head_.load(); while(head && !head_.compare_exchange_weak(head, head->next_)); if (!head) return Ptr{}; //空栈 Ptr ret = head->data_; //delete head; return ret; } private: struct Node { Ptr data_; Node *next_; Node(Ptr data, Node* next = nullptr) : data_(data) , next_(next) {} }; std::atomic head_ = nullptr; }; } ``` 使用智能指针避免处理类型T的拷贝异常(详细的讨论见第3章线程安全栈的设计)。无锁设计的核心步骤就是,首先准备好要修改的数据,然后使用比较-交换原子操作不断判断数据是否满足不变量,如果满足就立即更新。 此时的栈容器在逻辑上已经没有问题了,但是还存在没有释放pop出的节点所占用内存所导致的内存泄漏的问题。 之所以不能在移动头节点后立即释放旧的头节点,是因为如果有多个线程同时pop,那么他们可能尝试pop同一个头节点,其中一个线程完成了修改,另一个线程还在执行while语句,使用旧的头节点的head→next进行判断,虽然一旦运行一次比较-交换操作就会更新head,但是第一次运行会RE。push函数虽然也会并行访问,但是push并不会访问已经存在于栈容器节点上的内容。 ### 7.1.1 内存回收 为了避免这种风险,书中介绍了一种内存回收机制:使用链表管理所有未删除的节点,如果只有一个线程调用pop函数,那么它除了要负责删除当前节点,还需要删除所有未删除的节点,如果有多个线程调用pop函数,那么我们无法判断是否可以安全删除,只能将其添加在链表上。 ```cpp #include #include namespace edward::multithread { template class LockFreeStack { using Ptr = std::shared_ptr; public: void push(T t) { Node *node = new Node(std::make_shared(std::move(t)), head_.load()); while (!head_.compare_exchange_weak(node->next_, node)); } Ptr pop() { ++threadsInPop_; Node* head = head_.load(); while(head && !head_.compare_exchange_weak(head, head->next_)); Ptr ret; //空栈返回空指针 //从节点提取数据,节点可能不立即删除,但是如果返回的数据一旦不需要(例如不保存返回值)就立即被删除 if (head) ret.swap(head->data_); try_reclaim(head); return ret; } private: struct Node { Ptr data_; Node *next_; Node(Ptr data, Node* next = nullptr) : data_(data) , next_(next) {} }; static void delete_nodes(Node *nodes) { Node *node; while (nodes) { node = nodes; nodes = nodes->next_; delete node; } } void try_reclaim(Node *oldHead) { if (threadsInPop_ == 1) { Node *deletedNodes = deletedNodes_.exchange(nullptr); //接手候删链表 if (!--threadsInPop_) { delete_nodes(deletedNodes); } else if (deletedNodes) { chain_pending_nodes(deletedNodes); } delete oldHead; } else { chain_pending_node(oldHead); --threadsInPop_; } } //将deletedNodes链表接在nodes链表后面 void chain_pending_nodes(Node* nodes) { Node *last = nodes; while (last->next_) { last = last->next_; } chain_pending_nodes(nodes, last); } void chain_pending_nodes(Node* first, Node* last) { last->next_ = deletedNodes_; while (!deletedNodes_.compare_exchange_weak(last->next_, first)); } //将deletedNodes链表接在节点n后面 void chain_pending_node(Node *n) { chain_pending_nodes(n, n); } std::atomic head_ = nullptr; //链表头节点,用来模拟栈 std::atomic threadsInPop_ = 0; //运行pop的线程个数 std::atomic deletedNodes_ = nullptr; //候删节点链表 }; } ``` 负责内存回收的代码主要是`try_reclaim`函数,它会判断当前有多少个线程调用pop函数,决定是否能够安全删除。如果调用try_reclaim函数时只有一个线程调用pop函数,那么当前节点oldHead是肯定可以释放的,原因在于**oldHead只能被pop函数的while语句执行通过前共享地访问**,一旦线程通过了while语句,那么得到的肯定是不同的oldHead,此时在try_reclaim中进行判断只有当前线程调动pop函数,说明其他线程已经通过while语句,因此可以释放。 代码的难点在于try_reclaim函数中,如果判断当前只有一个线程在执行pop函数,获取到未删除节点链表后,为什么还要判断是否有其他线程在调用pop函数?原因是,我们获取的这个未删除节点链表中的节点有可能有其他线程的pop函数正在访问,为了避免释放节点后导致Runtime Error,我们必须再判断一次是否有其他线程调用pop函数,如果没有,那么我们就可以认定得到的这个未删除节点链表上的节点都是可以安全释放的,如果有其他线程,那么我们只能再把这些节点放回去。 其中的辅助函数`chain_pending_nodes`是负责向未删除节点deletedNodes链表上添加节点的,其实现颇为精妙,关键的地方在于先准备好待添加节点链表的头部和尾部,然后将deletedNoeds添加在尾部,并更新deletedNodes为新链表,这里使用了`compare_exchange_weak`实现这一原子操作。 整个代码比较复杂,需要仔细揣摩。该内存回收机制在低负荷时能够良好运行,当在高负荷情况下,几乎任何时候都有线程调用pop函数,导致无法触发内存回收,未删除节点链表将无休止地增加,导致事实上的内存泄漏。这要求我们寻求不同的方法来回收节点。 ### 7.1.2 风险指针回收 若某节点仍被其他线程指涉,而我们依然删除它,便成了“冒险”动作。删除目标节点后,别的线程还持有指涉它的引用,还通过这一引用对其进行访问,这会导致程序产生未定义的行为(多半是RE)。为了避免这种情况,假设当前线程要访问某对象,而它却被别的线程删除,那就让前者设置一指涉目标对象的风险指针,以同志其他线程删除该对象将产生实质风险。若程序不再需要那个对象,风险指针则被清零。 ### 7.1.3 引用计数检测回收 若能够安全且精准地辨识哪些节点正被访问,以及知晓它们何时不再为线程所访问,我们即可将其删除。引用计数针对各个节点分别维护一个计数器,随时知悉访问它的线程数目,在没有人使用时将其删除,这正是我们所熟悉的智能指针的解决方案。但是虽然智能指针`shared_ptr`的实现有用到原子特性,但是对他的操作却不是原子的。 C++20针对std::atomic<>对std::shared_ptr进行特化,使得其在不具备平实拷贝语义的情况下能够正常使用。当前g++12开始支持`std::atomic>`(头文件),为此,我们在系统中安装g++12,并使用其进行编译。 ```bash sudo apt install g++-12 ``` ```cpp #include void test_atomic_lock_free() { shared_ptr p; print("atomic_shared_ptr", ":", atomic_is_lock_free(&p)); //false print("atomic", ":", std::atomic>().is_lock_free()); //false } ``` 然后我们就能完善代码使得其能够正确回收内存。 ```cpp #include #include namespace edward::multithread { template class LockFreeStack { using Ptr = std::shared_ptr; struct Node; using NodePtr = std::shared_ptr; public: void push(T t) { NodePtr node = std::make_shared(std::make_shared(std::move(t)), head_.load()); while (!head_.compare_exchange_weak(node->next_, node)); } Ptr pop() { NodePtr head = head_.load(); while(head && !head_.compare_exchange_weak(head, head->next_)); Ptr ret; //空栈返回空指针 //从节点提取数据,节点可能不立即删除,但是如果返回的数据一旦不需要(例如不保存返回值)就立即被删除 if (head) { head->next_.reset(); ret.swap(head->data_); } return ret; } private: struct Node { Ptr data_; NodePtr next_; Node(Ptr data, NodePtr next = nullptr) : data_(data) , next_(next) {} }; std::atomic head_ = nullptr; //链表头节点,用来模拟栈 }; } ``` 我们用带引用计数的智能指针shared_ptr来保存节点,这样就不同考虑节点的生存周期,它会在没有线程使用时自动析构。头节点仍然需要使用原子类型,多个线程都会对其进行修改。 我自己的实现与书中实现不同的地方有两点: 1. 在获取到旧的头节点后,清空旧节点的数据域(使用swap交换而不是直接赋值),确保数据的生存周期不会被意外延长 2. 节点的next指针使用普通智能指针而不是原子智能指针。我认为节点的next指针只会被正确使用一次,即使多个线程同时pop同一个节点,也只会是第一个线程用到正确的next指针,也就是说只会有一个线程写入(其他的线程都会在while循环中访问其他节点),我们仅仅需要保证节点有next指针不会导致RE即可。 根据我的测试,g++-12实现的原子智能指针不是无锁的,next指针使用原子类型无疑会影响性能 ### 7.1.4 施加内存模型 目前还没有搞清除内存模型的工作原理,日后再完成这部分的笔记。 ## 7.2 线程安全的无锁队列 ```cpp // // Created by edward on 23-2-13. // #ifndef CONCURRENCE_LOCKFREEQUEUE_H #define CONCURRENCE_LOCKFREEQUEUE_H #include namespace edward::multithread { template class LockFreeQueue { using Ptr = std::shared_ptr; struct Node; // using NodePtr = std::shared_ptr; using NodePtr = Node*; public: LockFreeQueue(): head_(new Node), tail_(head_.load()) {} ~LockFreeQueue() { //把尚存在队列的节点删除 NodePtr node = head_.load(), p; while (node) { p = node->next_; delete node; node = p; } } void push(T t) { // NodePtr newTail = std::make_shared(); Ptr data = std::make_shared(std::move(t)); NodePtr newTail = new Node(); NodePtr oldTail = tail_.load(); oldTail->data_.swap(data); oldTail->next_ = newTail; tail_.store(newTail); } Ptr tryPop() { NodePtr oldHead = head_.load(); if (oldHead == tail_.load()) return Ptr{}; //空队列 head_.store(oldHead->next_); Ptr ret; ret.swap(oldHead->data_); delete oldHead; //已经没有人可以看到了 return ret; } Ptr pop() { Ptr ret; while (!(ret = tryPop())) { //自旋 std::this_thread::yield(); } return ret; } bool empty() const { return head_ == tail_; } private: struct Node { Ptr data_; NodePtr next_; Node() = default; explicit Node(Ptr data, NodePtr next = NodePtr{}): data_(data), next_(next) {} }; std::atomic head_, tail_; }; } #endif //CONCURRENCE_LOCKFREEQUEUE_H ``` 上面的实现是简单的SPSC队列(单一生产者、单一消费者队列),借鉴第6章设计多个互斥保护的线程安全队列的经验,我们只需要把头节点head和尾节点tail用原子类型进行同步,因为生产者和消费者调用的是不同的函数,所以我们只需要保证head和tail之间的同步关系。 多线程环境下,多个线程都会调用pop和push函数,pop函数比较好处理,因为我们只需要读取一次数据(如同无锁栈的pop函数一样,将链表指针向后移动,不同的地方在于需要不断判断是否为空栈)。 主要的难点在于push函数,为了判断队列是否为空,设置尾指针指向哑节点(第6章已经进行讨论,否则就需要同时修改头指针和尾指针,不利于并行),每次push把数据放在哑结点的数据域中,然后再将尾指针向后移动。 一方面, 要保证push和pop操作的先行关系,即每次push修改尾指针后数据应该已经就绪(先保存数据),另一方面多线程环境下我们要保证操作的原子性,就只能原子地完成存储数据、修改尾指针的过程,这是简单的CAS操作无法做到的。 为了实现线程安全的push操作,有两种解决方案: - 允许空节点存在,每次push时存入两个节点,一个用来保存数据,一个用来设置哑结点。通过这种方式我们就能够将修改原子化,但是这样要求pop能够处理空节点,并且会浪费一倍的空间 - 要求对数据域的操作是原子的。这样就可以在每次push的时候先使用CAS修改数据域,一旦修改失败,说明其他线程已经push过了,那我们就重新获取尾指针,再次尝试对数据域的更新操作。当我们更新哑结点数据域但未更新尾指针时,其他线程调用pop就会阻塞在CSA操作处,直到我们更新尾指针。 需要注意的是这样我们就不能在pop时将头结点的数据域立即清空,可能会引发ABA问题。例如线程A和线程B同时调用push函数,线程A抢占成功,将自己的数据存入,线程B由于线程调度获取oldTail以后没有执行CAS就被挂起,线程C调用pop函数,此时的head指向oldTail(只有一个数据),更新头结点,读取数据后将数据域清空,此时线程B开始运行,它看到的oldTail已经被丢弃,已经被线程C访问过了,但是它并不知道,还尝试更新oldTail的数据域,结果更新成功(CAS操作仅判断数据域是否非空),但是数据保存失败,出现数据覆写现象,造成ABA问题。 我们可以将头结点的数据域设置为其他非空值,从而使得数据的生命周期不会因为节点的内存没有及时释放而意外延长。 ```cpp // // Created by edward on 23-2-13. // #ifndef CONCURRENCE_LOCKFREEQUEUE_H #define CONCURRENCE_LOCKFREEQUEUE_H #include namespace edward::multithread { template class LockFreeQueue { using Ptr = std::shared_ptr; struct Node; // using NodePtr = std::shared_ptr; using NodePtr = Node*; static Ptr tmpP; public: LockFreeQueue(): head_(new Node), tail_(head_.load()) {} ~LockFreeQueue() { /* //把尚存在队列的节点删除 NodePtr node = head_.load(), p; while (node) { p = node->next_; delete node; node = p; } */ } void push(T t) { // NodePtr newTail = std::make_shared(); Ptr data = std::make_shared(std::move(t)); NodePtr newTail = new Node(); NodePtr oldTail = tail_.load(); Ptr oldData = nullptr; while (!oldTail->data_.compare_exchange_weak(oldData, data)) { oldTail = tail_.load(); oldData.reset(); } oldTail->next_ = newTail; tail_.store(newTail); } Ptr tryPop() { NodePtr oldHead = head_.load(); if (oldHead == tail_.load()) return Ptr{}; //空队列 while (!head_.compare_exchange_weak(oldHead, oldHead->next_)) { if (oldHead == tail_.load()) return Ptr{}; //空队列 } Ptr ret = oldHead->data_.load(); oldHead->data_ = tmpP; // delete oldHead; //已经没有人可以看到了 return ret; } Ptr pop() { Ptr ret; while (!(ret = tryPop())) { //自旋 std::this_thread::yield(); } return ret; } bool empty() const { return head_.load() == tail_.load(); } private: struct Node { std::atomic data_; NodePtr next_; Node() = default; explicit Node(Ptr data, NodePtr next = NodePtr{}): data_(data), next_(next) {} }; std::atomic head_, tail_; }; template LockFreeQueue::Ptr LockFreeQueue::tmpP = std::make_shared(); } #endif //CONCURRENCE_LOCKFREEQUEUE_H ``` 正如上面所讲,我们不能立即释放节点的内存,即使节点的数据已经被读出,但是可能有其他线程看到的是旧的节点,等待着通过CAS操作失败进行更新。为了管理内存,我们尝试使用带引用计数的智能指针`shared_ptr`管理节点内存,并使用C++20开始支持的`std::atomic>`管理头尾指针,保证线程安全。 ```cpp // // Created by edward on 23-2-13. // #ifndef CONCURRENCE_LOCKFREEQUEUE_H #define CONCURRENCE_LOCKFREEQUEUE_H #include "utils.h" #include namespace edward::multithread { template class LockFreeQueue { using Ptr = std::shared_ptr; struct Node; using NodePtr = std::shared_ptr; // using NodePtr = Node*; static Ptr tmpP; public: LockFreeQueue(): head_(std::make_shared()), tail_(head_.load()) {} ~LockFreeQueue() { /* //把尚存在队列的节点删除 NodePtr node = head_.load(), p; while (node) { p = node->next_; delete node; node = p; } */ } void push(T t) { // NodePtr newTail = std::make_shared(); Ptr data = std::make_shared(std::move(t)); NodePtr newTail = std::make_shared(); NodePtr oldTail = tail_.load(); Ptr oldData = nullptr; while (!oldTail->data_.compare_exchange_weak(oldData, data)) { oldTail = tail_.load(); oldData.reset(); } oldTail->next_ = newTail; tail_.store(newTail); } Ptr tryPop() { NodePtr oldHead = head_.load(); if (oldHead == tail_.load()) return Ptr{}; //空队列 while (!head_.compare_exchange_weak(oldHead, oldHead->next_)) { if (oldHead == tail_.load()) return Ptr{}; //空队列 } Ptr ret = oldHead->data_.load(); oldHead->data_ = tmpP; // delete oldHead; //已经没有人可以看到了 return ret; } Ptr pop() { Ptr ret; while (!(ret = tryPop())) { //自旋 std::this_thread::yield(); } return ret; } bool empty() const { return head_ == tail_; } private: struct Node { std::atomic data_; NodePtr next_; Node() { // edward::print("ctor"); } explicit Node(Ptr data, NodePtr next = NodePtr{}): data_(data), next_(next) { // edward::print("ctor"); } ~Node() { // edward::print("dtor"); } }; std::atomic head_, tail_; }; template LockFreeQueue::Ptr LockFreeQueue::tmpP = std::make_shared(); } #endif //CONCURRENCE_LOCKFREEQUEUE_H ``` ## 7.3 实现无锁数据结构的原则 1. 在原型设计中使用std::memory_order_seq_cst次序 2. 使用无锁的内存回收方案 3. 防范ABA问题 4. 找出忙等循环,协助其他线程