本仓库主要是学习《C++并发编程实战》这本书的实战代码总结以及其他并发编程的项目汇总。
仓库的按照如下结构进行组织:
同步的《C++并发编程实战》阅读笔记:
多个或同时独立进行的活动
两种并发:硬件所支持的并发任务数量(决定因素)、任务切换
分离关注点:简化各个线程的内部逻辑,将线程间交互得以限定于代码中可明确辨识的切入点,而无须将不同任务的逻辑交错散置
这样做与CPU的内核数量无关,因为用线程分离关注点提升的是系统设计,而不是运算吞吐量
提升性能
带来的收益(性能提升、模块耦合度降低)对不起维护多线程代码的复杂的时候
为了提高系统性能,以可用的硬件并发资源作为依据调整运行线程的数目
低抽象损失
#include <iostream>
#include <thread>
void hello() {
std::cout << "Hello Concurrent World\n";
}
int main() {
std::thread t(hello);
t.join(); //主线程等待子线程退出
return 0;
}
//-lpthead 在Linux下使用thread库需要链接pthread库
当main
函数返回时,整个进程结束,程序就会退出。
如果我们默认构造std::thread
,那么它将不会和任何线程关联。
我们通过给std::thread
传入可调用对象以及他所需要的参数,来设置子线程的入口(entry point)。**函数对象会被拷贝到属于新线程的存储空间中,并在那里被调用。**由此可以推断thread
封装了这个可调用对象。
在传入这个可调用对象时要注意一个问题:如果我们想传入一个临时函数对象,我们可能会有这样的写法:
class Func {
void operator() () {}
};
int main() {
std::thread t(Func());
t.join();
//Member reference base type 'std::thread (Func (*)())' is not a structure or union
return 0;
}
第6行我们的意思是传入一个临时的Func函数对象供线程运行,但是却会被看作一个函数声明:参数是一个函数指针,函数指针指向的是一个返回Func,没有参数的函数;返回值是一个thread。为了解决这个问题我们有两种解决方案:
class Func {
void operator() () {}
};
int main() {
//std::thread t((Func()));
std::thread t{Func()};
t.join();
return 0;
}
Func()
外围添加一个圆括号为了确保子线程正常运行(不被其他线程影响),我们应该让线程函数自含。
默认情况下如果thread
析构,而且线程是joinable
的,那么会导致程序崩溃(调用std::terminate
);如果线程是detach
后的,那么即使线程类析构,线程还是会继续在后台运行
调用线程类的join
方法:主线程等待子线程结束,并回收子线程资源
detach
方法:设定子线程分离,系统会在子线程运行结束后自动回收子线程资源,不会导致僵尸线程
如果子线程启动后主线程函数发生异常导致无法执行join
函数,那么有可能导致僵尸线程。为了避免这种情况,我们应该用RAII的手法封装线程类,使得对线程的join
无论如何都能够正确执行:将join
放在析构函数中。
为了能够保证等待子线程join
,我们可以使用RAII的手法进行如下封装:
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
#include <random>
std::mutex io_mutex;
inline void print() {
std::cout << std::endl;
}
template<typename T, typename... Args>
void print(T&& t, Args&&... args) {
std::cout << t << " ";
print(std::forward<Args>(args)...);
}
inline int getRandom(int min_ = 0, int max_ = 10) {
static std::default_random_engine e(std::chrono::system_clock::now().time_since_epoch().count());
static std::uniform_int_distribution<> u;
u.param(decltype(u)::param_type(min_, max_));
return u(e);
}
class thread_guard {
std::thread &t_;
public:
explicit thread_guard(std::thread& t)
: t_(t) {}
~thread_guard() {
if (t_.joinable()) t_.join();
}
thread_guard(const thread_guard&) = delete;
thread_guard &operator= (const thread_guard&) = delete;
};
//启动子线程
void start_sub_thread() {
std::thread t{[]{
int cnt = getRandom();
print("sub thread:", std::this_thread::get_id(), "sleep", cnt, "s");
std::this_thread::sleep_for(std::chrono::seconds(getRandom()));
print("sub thread:", std::this_thread::get_id(), "awaked");
}};
//without thread guard: terminate called without an active exception
thread_guard threadGuard(t);
//do something
[]{}();
//t.join();
}
int main() {
start_sub_thread();
return 0;
}
调用std::thread
对象的detach
成员会令线程在后台运行,使得线程分离,无法等待它完结,其归属权和控制权都转移给C++运行时库,std::thread
对象不再关联实际的执行线程
我们在detach
前同样需要调用joinable
函数判断std::thread
是否与线程关联
我们可以用分离线程实现守护线程
传递参数的过程:线程具有内部存储空间,参数会按照默认方式先复制到该处,然后再以右值的形式传递给可调用对象。
传递给可调用对象的方式是使用bind
重新生成一个可调用对象,然后再调用该对象
这种实现原理意味着我们可以给thread
传递一个成员指针和对象指针,因为bind
会将其变成一个正确的可调用对象
string
类型,我们应该自己手动转换成string
类型,否则thread
构造函数会传入一个const char *
类型的变量,而传入一个指针很可能是有风险的(如果所在函数退出,就会出现未定义的行为)std::bind
封装函数或者std::ref
封装参数。如果我们直接传递一个变量,那么这个变量首先会被拷贝,然后再转换为右值传递给可调用对象,这可能会发生错误。可以使用unique_ptr
帮助我们管理动态内存
std::thread
只支持移动语义
我们不能给一个关联线程的thread
对象赋值,会导致调用std::terminate
函数终止程序
thread
如果符合RVO(Return Value Optimization)的条件:
那么就会触发拷贝消除。如果不执行拷贝,会将返回值转化为右值。因此我们可以在函数中返回thread
。
thread
因为thread
只支持移动语义,所以我们在传递给非引用形参时必须传递一个右值供其拷贝。
C++20中引入了std::jthread
。其作用与unique_thread
类似。
我们可以将std::thread
放在容器中统一进行管理。
std::thread::hardware_concurrency()
:表示程序在各次运行中可真正并发的线程数量(有可能返回0)如果我们想要得到子线程的返回值,可以通过传递引用的方式记录返回值,或者使用future
记住在传递引用时我们要使用std::ref
,这一点和bind
的使用是相同的(严重怀疑thread
内部就用的是bind
线程ID:std::thread::id
的获取方法:
std::thread
上调用get_id()
。如果没有关联任何线程,那么get_id()
会返回一个默认构造的std::thread::id
,表示线程不存在std::this_thread::get_id()
获得C++标准库允许我们任意判断两个线程ID是否相同,可以当作关联容器(包括无序容器,标准库有std::hash<std::thread::id>
的定义)的键值
线程ID不具备语义意义
race condition(条件竞争):条件竞争出现的根本原因是破坏了不变量。
恶性条件竞争普遍“挑剔”出现的时机:当程序在负载比较大的时候容易出现,但是在调试环境却无法察觉,而且很难重现。
防止恶性条件竞争的方法:
互斥mutex:mutually exclusion
头文件:<mutex>
std::mutex
创建互斥,lock()
加锁,unlock()
解锁
为了保证每条路径都能够记得unlock
,我们用RAII的时候封装std::mutex
模板类std::lock_guard<>
,在构造时加锁,析构时解锁
C++17引入了类模板参数推断,可以让我们省略类模板参数
正确的使用互斥锁的方式:将互斥锁和受保护的数据组成一个类,并将其设置成mutable
私有变量,将对数据的访问封装成成员方法。
我们在设计接口时必须小心谨慎:不得向锁所在的作用域之外传递指针和引用指向受保护的共享数据,无论是通过函数返回值还是函数参数。
但是简单的加锁是不能解决问题的,更多的引起条件竞争的原因在于功能性的设计:考虑一个多线程环境下的栈Stack
我们一般会在获取元素top()
之前判断是否为空empty()
。但是如果在top
和empty
之间有其他线程进行pop
导致栈为空,那么我们去top
的时候就会非法,一个解决方案是调用top
时进行判断,如果为空就抛出异常。
如果我们有两个线程都想要获取并弹出栈顶元素,他们获取到同一个栈顶元素后然后再连续弹出两次,这就会导致有一个元素没有被看到就弹出了
我们可能会思考将两种方法合并成一种方法,例如:弹出并返回栈顶元素。但是如果在拷贝已经被弹出的栈顶元素的时候抛出了异常,那么我们就无法得到弹出的那个元素
为了解决上面的第二个问题,一般的方法(包括智能指针实现的COW)都是没有作用的,我们只能谨慎设计接口,甚至弱化接口来让接口变得简单,可控。
例如对上面的例子,我们为了避免top
和pop
方法产生的条件竞态度,唯一的办法就是去除掉top
方法,只保留pop
方法,将top
方法和pop
方法合并
然而这种合并却会导致新的问题:如果对象的拷贝会抛出异常,有可能我们在合并后的pop
方法内首先用top
方法获取栈顶元素,然后弹出栈顶元素,再将栈顶元素拷贝返回时发生异常,此时就会出现数据丢失的问题:栈顶元素已经无法获取,但是又被弹出。
为了解决这个问题,书中给出了三种解决方案:
pop
方法,这样就不用担心弹出栈顶元素后出现的无法拷贝的情况。虽然可行,但是我的看法是这种做法很丑陋。shared_ptr
),不使用裸指针应该是不言而喻的:返回一个裸指针就要求使用者进行释放,而这个释放的时机很难把握,交给智能指针就不用担心这个问题。不得不说shared_ptr
简直是处理多线程问题的神兵利器,因为其计数的原子性,我们不用担心在某个线程还在使用该对象的时候将其析构。返回智能指针就没有上面担心的数据丢失的问题:程序不可能缺少一个shared_ptr
的空间,而且避免了拷贝的性能损耗。书中推荐将方法1和其他方法混合使用,但是我觉得方法1非常丑陋。
代码实现:
解决死锁问题的一般思路是:按相同的顺序对互斥加锁。
需要注意的是如果我们已经在某个std::mutex
对象上获取锁,那么再次试图从该湖上获取锁将导致未定义的行为。
为了保证对两个互斥按照相同的顺序加锁,我们可以使用标准库的std::lock
函数,他保证了传入的锁要么都上锁成功,要么都没有上锁,并抛出异常。
因为我们使用std::lock
对std::mutex
上锁,所以我们在使用std::lock_guard
管理其解锁的时候必须传入std::adopt_lock
来告诉std::lock_guard
我们的锁已经上锁,不用在构造函数中再进行上锁。std::lock
函数的语义是要么锁住所有互斥,要么 没有获取任何锁并抛出异常。关于std::lock
的详细解释如下,实际上是一种back-off-and-retry的做法:
Is std::lock() ill-defined, unimplementable, or useless?
C++17以后应该使用std::scoped_lock
来同时锁住多个互斥。
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
int main() {
std::mutex mtx1, mtx2;
/*
//C++ 11
std::lock(mtx1, mtx2);
std::lock_guard<std::mutex> lockGuard1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> lockGuard2(mtx2, std::adopt_lock);
*/
//C++17
std::scoped_lock scopedLock(mtx1, mtx2);
cout << "Lock sucessfully" << endl;
return 0;
}
死锁发生的情况变化万千,而且不一定和锁有关系:加入两个线程都在等待对方join
,那么也会发生死锁
假如已经有了一个锁,就不要尝试去获取其他锁
如果有必要获取多个锁,使用std::lock
或std::scoped_lock
来一次加锁
用户是不可信的,谁也不知道他要干什么
如果多个锁是绝对必要的而且无法使用std::lock
一次加锁,则只能规定每个线程都依从固定顺序获取这些锁。
例如我们有个多线程链表,为了保证不发生死锁,我们只能从一个方向遍历链表。这里的方向规定的是对元素加锁的顺序必须是固定的。
明确每个互斥位于哪个层级。若某个线程已经对低层级互斥加锁,则不准再对高层级互斥加锁。
我们可以让自定义的互斥锁与std::lock_guard
配合使用,只需要我们去实现lock()、unlock()、try_lock()
即可。
try_lock()
的含义:假如该锁已经上锁,则直接返回false
,不等待;如果没有上锁,就上锁。std::lock
就是根据try_lock()
实现的
class hierarchical_mutex {
std::mutex mutex_;
using value_type = unsigned long;
const value_type value_;
value_type pre_value_; //用来存储之前的cur_value,解锁后进行恢复,不用进行初始化
static thread_local value_type cur_value; //线程专属变量,表示当前线程层次锁的级别
void check_violation() {
if (value_ >= cur_value) {
//试图对更高级别的锁加锁,抛出错误
throw std::logic_error("mutex hierarchy violated");
}
}
void update_hierarchy() {
pre_value_ = cur_value;
cur_value = value_;
}
void rollback_hierarchy() {
if (value_ != cur_value) {
throw std::logic_error("mutex hierarchy violated");
}
cur_value = pre_value_;
}
public:
explicit hierarchical_mutex(value_type value)
: value_(value)
, mutex_() {}
void lock() {
check_violation();
mutex_.lock();
update_hierarchy();
}
void unlock() {
rollback_hierarchy();
mutex_.unlock();
}
bool tyr_lock() {
check_violation();
if (!mutex_.try_lock()) return false;
update_hierarchy();
return true;
}
};
thread_local hierarchical_mutex::value_type hierarchical_mutex::cur_value = ULONG_MAX; //初始化为无穷大,可以对任意锁加锁
int main() {
hierarchical_mutex mtx(100);
{
std::lock_guard<hierarchical_mutex> lk(mtx);
hierarchical_mutex mtx1(1000);
std::lock_guard<hierarchical_mutex> lk1(mtx1);
}
return 0;
}
std::unique_lock<>
灵活加锁std::unique_lock
对象不一定始终占有与之关联的互斥,因为这份灵活性,需要占用一定的空间用来存储并更新互斥信息,同时效率略慢。
其构造函数接收第二个参数:
std::adopt_lock
指明传入的互斥锁已经上锁std::defer_lock
指明不对传入的互斥锁上锁,以后再上锁std::unique_lock
在析构时保证所关联的互斥解锁
因为std::unique_lock
具有lock()、unlock()、try_lock()
方法,所以可以传递给std::lock
方法。
int main() {
std::mutex mtx1, mtx2;
std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock), lock2(mtx2, std::defer_lock);
std::lock(lock1, lock2);
return 0;
}
对于上面的用法,最好还是使用C++17支持的scoped_lock
std::unique_lock
还支持转移锁的归属权
如果可以用std::lock_guard
,最好还是用这个,效率略高
考虑到lock_guard
和unique_lock
都是以RAII的手法封装互斥锁来保证互斥锁的unlock
,而我们是不允许多次unlock
的,所以两者都不可拷贝的,然后后者是可以移动的,用以在函数之间转移互斥锁的控制权
std::mutex mtx;
std::unique_lock<std::mutex> get_lock() {
std::unique_lock<std::mutex> lock(mtx);
print("in get_lock");
return lock;
}
void process() {
auto lock = get_lock();
print("in process");
}
int main() {
process();
return 0;
}
因为std::unique_lock
与std::mutex
的接口一致,而且能够保证在其析构时解锁,所以如果我们需要中途解锁(这一点是std::lock_guard
做不到的,std::lock_guard
只能接收未加锁的互斥并对其加锁,然后在析构时解锁),我们应该使用std::unique_lock
替代
持锁期间应该避免任何耗时的操作,例如文件IO
如果只用单独一个互斥保护整个数据结构,不但可能加剧锁的争夺,还将难以缩短持锁时间。
对于像单例模式那样的场景来讲,假如我们的单例是只读的(例如用来进行数据库的查询操作),那么我们就仅仅需要在数据初始化的时候保护数据(因为只有初始化的时候是写操作)
template<typename T>
std::shared_ptr<T> singleton() {
static std::shared_ptr<T> p;
if (!p) {
p.reset(new T);
}
return p;
}
为了保护数据,我们考虑使用互斥锁进行保护
template<typename T>
std::shared_ptr<T> singleton() {
static std::shared_ptr<T> p;
static std::mutex mtx;
{
std::lock_guard<std::mutex> lock(mtx);
if (!p) {
p.reset(new T);
}
}
return p;
}
为了进一步缩小临界区,提出了双检查锁
template<typename T>
std::shared_ptr<T> singleton() {
static std::shared_ptr<T> p;
static std::mutex mtx;
if (!p) {
std::lock_guard<std::mutex> lock(mtx);
if (!p) {
T *tp = new T;
p.reset(tp);
}
}
return p;
}
但是双检查锁因为指令重排的问题同样不可以解决问题,C++11提供了std::once_flag
和std::call_once()
函数,专门用来处理这个问题。
令所有线程共同调用std::call_once()
函数,并且传入std::once_flag
存储同步信息。相比使用互斥,std::call_once()
的额外开销往往更低,特别是初始化已经完成的情况下
使用std::call_once()
完成的单例模式模板:
#include <memory>
#include <mutex>
template<typename T>
class SingleTon {
using Ptr = std::shared_ptr<T>;
static Ptr p;
static std::once_flag flag;
public:
template<typename ...Args>
static Ptr getInstance(Args&& ...args) {
auto init = [](Ptr &p, auto&& ...args1) {
p.reset(new T(std::forward<decltype(args1)>(args1)...));
};
std::call_once(flag, init, p, std::forward<Args>(args)...);
return p;
}
};
template<typename T>
std::shared_ptr<T> SingleTon<T>::p;
template<typename T>
std::once_flag SingleTon<T>::flag;
需要注意的是,std::call_once
必须使用在多线程环境下,要求程序链接pthread库。详见博客:
如我们所见,std::call_once
除了传入std::once_flag
外还可以传入可调用对象及其参数,同std::thread
一样。
std::once_flag
不可拷贝、不可移动
C++11规定对局部静态变量的初始化只会在单一线程上独立发生,在初始化完成前,其他线程不会越过静态数据的声明而继续运行
因此如果我们声明局部静态变量也可以实现上面的效果:
/*
template<typename T,typename ...Args>
T& get_singleton(Args&& ...args) {//错误的写法!!!因为每个get_instance模板函数的不同实例化指向的是不同的instance
static T instance(std::forward<Args>(args)...);
return instance;
}
template<typename T>
T& get_singleton() {//正确的写法
static T instance{};
return instance;
}
*/
template<typename T>
class SingleTon {
using Ptr = std::shared_ptr<T>;
public:
static Ptr getInstance() {
static Ptr p = std::make_shared<T>(); //对于静态变量的初始化只会进行一次
return p;
}
};
但是我还是认为使用std::call_once()
实现更加优雅.
~~因为我们可能不想要在变量未使用的时候占用内存(静态局部变量占用.bss段)。~~我忘记了模板函数只有在使用到的时候才被实例化,因此如果不进行实例化的话是不会占用额外内存的。
C++中模板类的静态成员_肥肥胖胖是太阳的博客-CSDN博客_模板类的静态成员
所以说使用局部静态变量的缺点是不能够传入用于构造的参数。
C++14支持std::shared_timed_mutex
C++17支持std::shared_mutex
前者相比后者支持的操作更多,但是后者相对性能更好。
std::lock_guard<std::shared_mutex>
和std::unique_lock<std::shared_mutex>
互斥访问std::shared_lock<std::shared_mutex>
实现共享访问(C++14),使用方式和std::unique_lock
相同多个线程可以同时共享访问std::shared_mutex
,但是如果在读锁上获取写锁,会使得写锁阻塞,直到所有读锁释放,如果在写锁上获取读锁,自然读锁阻塞。
假如一个线程A的函数需要读锁,其内部运行的某个函数也需要读锁,另一个线程B需要写锁。那么在线程A获取读锁后,线程B获取写锁就会阻塞,线程A继续获取读锁也会阻塞
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <shared_mutex>
using namespace std;
void print() {
cout << "\n";
}
template<typename T, typename... Args>
void print(T&& first, Args&& ...args) {
cout << first << " ";
print(std::forward<Args>(args)...);
}
std::shared_mutex mtx;
int step = 0;
std::mutex cond_mtx;
std::condition_variable cond;
void read() {
//step0: 读锁
shared_lock<std::shared_mutex> lock(mtx);
unique_lock<std::mutex> uniqueLock(cond_mtx);
print("read lock 1");
//通知step0结束
++step;
cond.notify_all();
//等待step1: 写锁 结束
cond.wait(uniqueLock, []{
return step == 2;
});
uniqueLock.unlock();
//step2: 再次读锁
shared_lock<std::shared_mutex> lock1(mtx);
print("read lock 2");
}
void write() {
//等待step0: 读锁 结束
unique_lock<std::mutex> uniqueLock(cond_mtx);
cond.wait(uniqueLock, []{
return step == 1;
});
uniqueLock.unlock();
//step1: 写锁
lock_guard<std::shared_mutex> lock(mtx);
uniqueLock.lock();
print("write lock");
//通知step1结束
++step;
cond.notify_all();
uniqueLock.unlock();
}
int main() {
std::thread t_read{read};
std::thread t_write{write};
t_read.join();
t_write.join();
return 0;
}
std::recursive_mutex
:允许同一线程对某个锁多次加锁,而且必须释放所有锁才可以让另一个线程上锁。
如果需要可重入的锁,那么很可能设计有问题。例如如果多个获取锁的函数需要互相调用,我们可以从这些函数中抽取出公共部分,作为私有函数,而这个私有函数被共有函数调用,并假定互斥已经锁住,无需加锁(xxxWithLock()
)。
为了实现线程同步,我们有几种方式:
设置一个被互斥保护的共享标志,等待线程轮询访问等待。
缺点:等待线程必须不断查验标志,浪费处理时间。如果等待线程互斥访问标志,限制其他线程修改标志
让等待线程定期休眠(std::this_thread::sleep_for()
),减少轮询次数
休眠时间难以预知:过长会导致延迟,过短影响效率,虚耗处理时间
使用线程同步机制:条件变量、future(首选项)
C++提供了两种条件变量的实现:std::condition_variable
和std::condition_variable_any
。前者只能和std::mutex
配合使用,后者只需要符合互斥的标准即可。因为std::condition_variable_any
更通用,所以可能产生额外的开销,如果没什么特殊需要,尽可能使用std::condition_variable
条件变量是非常重要的线程同步的手段(目前我认为是最重要的),因此对其的深入理解至关重要。
条件变量总是和互斥一起配合使用,互斥用于保护共享数据,条件变量用于
通知线程往往先通过互斥保护共享数据,对数据进行一定的修改后再发送通知(notify_one()、notify_all()
)。需要注意的是我们应尽可能在临界区内发送通知,从而避免可能出现的优先级翻转和条件变量失效问题。虽然临界区外通知可以让等待线程一旦被唤醒就能立即解锁互斥查看是否满足情况,但是在Pthread进行wait morphint后基本上两者没有性能上的差距。详细的分析可以参考博客:条件变量用例–解锁与signal的顺序问题。
notify_one()
理论上只会唤醒一个等待线程,适用于共享变量数量发生变化的情况,例如通知消息队列中的消息个数增加。
notify_all()
会唤醒所有等待该条件变量的线程,适用于共享变量状态发生变化的情况,例如通知所有工作线程开始计算。
等待线程先获得互斥,然后将锁和判定条件传递给wait
函数等待返回。
wait
函数首先会根据判断条件判断是否满足条件(返回true
)
如果满足条件,则直接返回(互斥依旧上锁)
如果不满足条件,则阻塞等待,并解锁互斥(让其他线程得以修改共享数据的状态)。直到被notify
函数唤醒,再次上锁,判断条件是否满足。这里的阻塞和解锁、唤醒和上锁都是原子的,就是为了避免两个动作分别执行出现的条件竞态。
需要理解的是上面的死锁的出现是有限定条件的(例如唤醒之间的依赖、条件满足的依赖),虽然大多数情况下没有这么严格的条件,但是工具本身需要避免这种危险的情况。
原子操作保证了重要的唤醒和条件满足都能够至少被一个等待线程看到。
可以看到wait
函数内部需要解锁互斥,所以就不能使用不提供unlock
函数的lock_guard
,而应该使用和互斥有相同接口的unique_lock
。
其实C++的线程库是对pthread库的封装,因此也可以像pthread库一样只传入互斥,解锁并等待通知,一旦接收到通知后再上锁,然后在一个while
循环中进行判断。
while (!pred()) {
cond_.wait(lk); //调用pthread_cond_wait
}
对于传入判定条件的版本,其实内部也是这样的一个封装罢了。
之所以说notify_one()
理论上只会唤醒一个等待线程是因为存在调用一次notify_one()
却唤醒了多个线程的可能性,甚至有时候没有调用notify
等待线程都被唤醒,称这种意外唤醒等待线程的情况为伪唤醒。按照C++标准的规定,这种伪唤醒出现的数量和频率都不确定,因此要求等待线程的判定函数不能有副作用(可重用),并且需要在唤醒后再次判断条件是否满足,如果不满足则需要重新等待。这也是为什么上面的代码使用while
进行条件判断而不是if
的原因。
C++新标准引入 了新的线程感知的内存模型,内存模型精确定义 了基础构建单元应该如何运转。
内存模型牵涉两个方面:基本结构和并发。基本结构关系到整个程序在内存中的布局,就C++而言,归根结底,基本结构就是对象和内存区域。
C++标准中将对象定义为某一存储范围,程序的数据全部都由对象构成。
不论对象属于什么类型,它都会存储在一个或多个内存区域中。每个内存区域要么是对象,属于标量(scalar type)类型,如unsigned short,要么是一串连续的位域(bit field)。
尽管相邻的位域分属不同对象,但照相算作同一内存区域。
为了保证多个线程对于同一内存的访问互相之间没有冲突,要求同一时刻只有一个线程能够访问:要么使用互斥,要么使用原子操作。
假设两个线程访问同一内存区域,却没有强制他们服从一定的访问次序,如果其中至少有一个是非原子化访问,并且至少有一个是写操作,就会出现数据竞争,导致未定义行为。
如果我们都使用原子操作访问共享内存,就可以保证避免未定义行为。但是我们仍然无法保证数据竞争,因为我们无法指定访问内存的次序。
每个对象的改动序列由所有线程的全部写操作构成。
在不同的线程上观察同一变量的改动序列,如果不同,说明出现了数据竞争和未定义行为。
为了保障同一变量的改动序列的一致性,要求禁止某些预测执行。
原子操作是不可分割的操作:在系统的任一线程内,原子操作要么还没有开始,要么已经完成。
标准原子类型的定义位于头文件中,这些类型的操作全都是原子化的。
标准原子类型全部都具备成员函数is_lock_free(),准许使用者判断该原子类型是否是无锁的。
从C++17开始,所有原子类型都含有一个编译期确定的静态常量成员变量is_always_lock_free,如果该类型在所有平台上都是无锁的,那么该值为true,否则为false。还有一些宏帮助我们去判断原子类型的无锁情况。
void test_atomic_lock_free() {
//g++ (Ubuntu 11.3.0-1ubuntu1~22.04) 11.3.0
print("atomic_bool", "always? :", atomic_bool::is_always_lock_free); //true
print("atomic_bool", ":", atomic_bool().is_lock_free()); //true
print("atomic_int", "always? :", atomic_int::is_always_lock_free); //true
print("atomic_int", ":", atomic_int().is_lock_free()); //true
print("atomic_char", "always? :", atomic_char::is_always_lock_free); //true
print("atomic_char", ":", atomic_char().is_lock_free()); //true
print("atomic<double>", "always? :", atomic<double>::is_always_lock_free); //true
print("atomic<double>", ":", atomic<double>().is_lock_free()); //true
print("atomic<int*>", "always? :", atomic<int*>::is_always_lock_free); //true
print("atomic<int*>", ":", atomic<int*>().is_lock_free()); //true
shared_ptr<int> p;
print("atomic_shared_ptr", ":", atomic_is_lock_free(&p)); //false
}
/*
atomic_bool always? : 1
atomic_bool : 1
atomic_int always? : 1
atomic_int : 1
atomic_char always? : 1
atomic_char : 1
atomic<double> always? : 1
atomic<double> : 1
atomic<int*> always? : 1
atomic<int*> : 1
atomic_shared_ptr : 0
*/
从测试程序的结果看,大多数原子类型都是无锁的,在我这个平台上,普通的智能指针的原子操作不是无锁的。
原子类型std::atomic_flag不提供is_lock_free()成员函数,它是简单的布尔标志,一定是采取无锁操作。利用这种简单的无锁布尔标志,我们就能实现一个简单的锁,进而基于该锁实现其他所有原子类型。它只支持两种操作:
test_and_set()
:查值并设1clear()
:清0其余的原子类型都是通过类模板std::atomic<>
特化得出的,功能更加齐全,但可能不属于无锁结构。他们还有对应的别名,std::atomic的别名就是atomic_T。
由于不具备拷贝构造函数和拷贝赋值操作符, 标准的原子类型对象无法复制,也无法赋值。原因是原子类型的操作都是原子化的,但是拷贝操作都涉及两个对象,无法原子化。
但是他们可以、接受内建类型赋值,也可以隐式地转换为内建类型,还可以直接经由成员函数处理。
**赋值操作符的返回值是存入的值,具名成员函数的返回值则是操作前的值。**虽然赋值操作符习惯上返回引用,但是如果想要通过该引用查看存入的值,就会使得赋值和读取操作之间可能被其他线程修改,无法读取正确的值。
对于原子类型的每一种操作,我们都可以提供额外的参数,用于设定内存次序语义。如果没有传入,则默认采用最严格的内存次序。在5.3节将对于内存次序进行更加详细的探讨。
std::atomic_flag必须由宏初始化为置零状态:std::atomic_flag flag = ATOMIC_FLAG_INIT
。它仅仅支持三种操作:销毁(析构函数)、置零(clear()
)、读取原有的值并置位(test_and_set()
)。我们可以使用其构建自旋锁(spin lock):当flag处于置零状态,表示互斥未上锁,当flag处于置位状态,表示互斥上锁。
#include <atomic>
namespace edward::multithread {
class SpinLockMutex {
std::atomic_flag flag_;
public:
SpinLockMutex(): flag_(ATOMIC_FLAG_INIT) {}
void lock() {
while (flag_.test_and_set(std::memory_order_acquire));
}
void unlock() {
flag_.clear(std::memory_order_release);
}
};
}
相比std::atomic_flag,std::atomic是一个功能更加齐全的布尔标志,支持通过布尔值赋值,读取(load
),写入(store
),读-改-写(exchange
),比较-交换(compare_exchange_weak
和compare_exchange_strong
)。其中比较-交换操作是原子类型的编程基石。
比较-交换操作传入两个参数,一个是期望值a,一个是修改值b。当调用比较-交换操作后:
compare_exchange_weak
和compare_exchange_strong
的区别在于:
compare_exchange_weak
可能存在原子类型x的值和期望值a相同,但是修改失败,这种情况下返回false,称这种现象为佯败(spurious failure),其原因在于有的处理器可能不能一条指令完成比较-交换操作,如果在执行的过程中发生了线程切换,就会导致失败。因此我们经常要配合循环使用compare_exchange_weak
bool expected = false;
extern std::atomic<bool> flag;
while (!flag.compare_exchange_weak(expected, true) && !expected);
compare_exchange_strong
可以确保当原子类型x的值和期望值a相同时,修改成功。我认为原因是其内部封装了compare_exchange_weak
循环。
书中说,当修改值b的计算比较方便时,两者没有什么区别,如果需要计算不方便,使用compare_exchange_strong
可以帮助我们保存修改值b。不过我觉得也可以自己保存,还是尽可能用compare_exchange_weak
吧。
比较-交换函数还可以传入两个内存次序,分别用作成功和失败,其中后者不能比前者更加严格,原因是后者只是读操作,而前者是读-写操作。
std::atomic不保证是无锁的,我们可以调用成员函数is_lock_free进行检查,或者通过静态成员变量或者宏查看其是否是无锁的。
同样支持检查无锁情况(is_lock_free
)、读出(load
)、写入(store
)、读-改-写(exchange
)、比较-交换(compare_exchange_weak
和compare_exchange_strong
)
相比原子布尔类型,原子指针提供了算术运算符重载(+=、-=、++、—)和对应的成员函数(fetch_add和fetch_sub)。运算符返回修改后的值,成员函数返回修改前的值
对于类型T,我们实例化出std::atomic的条件是:T必须具备平实拷贝赋值操作符:不得包含虚函数,也不可以从虚基类派生,必须由编译器隐式生成拷贝赋值操作符,且比较-交换操作所采用的比较是逐位比较,如果类型T自定义了比较运算符也不会起作用。会有这么多限制的原因是编译器往往没有能力为自定义类型T生成无锁原子结构,因此它必须在内部运用锁保护所有操作,为了避免调用用户自定义操作导致死锁,编译器对类型T有严格的限制。
如果类型T的大小不超过int或void*的体积,那么大多数硬件平台都可以为std::atomic生成无锁指令。
由于浮点类型的比较操作比较复杂,可能有多种表示形式,简单使用按位比较可能会使值相等的比较失败,所以尽可能少用。
同时为了兼容C语言,C++还对原子类型提供了非成员函数,其用法和成员函数相同。特殊的,C++标准库还针对std::shared_ptr
提供了非成员函数,使得能够原子化访问。并发技术规约还提供了std::experimental::atomic_shared_ptr<T>
,如果要使用原子共享指针,应该优先采用。
同步关系:对于一个原子类型的操作是自洽的,没有数据竞争和不变量的破坏
先行关系:时序上的先后
原子类型的操作服从6种内存次序:memory_order_relaxed、memory_order_consume、memory_order_acquire、memory_order_release、memory_order_acq_rel和memory_order_seq_cst。
其中memory_order_seq_cst是最严格的内存次序,各种原子类型的所有操作默认都遵从该次序。
这6种内存次序代表了3种模式:先后一致次序(memory_order_seq_cst)、获取-释放次序(memory_order_consume、memory_order_acquire、memory_order_release和memory_order_acq_rel)、宽松次序(memory_order_relaxed)
先后一致次序
原子类型的改动序列对所有线程都是相同的。这种内存次序易于理解、符合直觉、易于编程,代价是保序操作会导致性能损失,因为它必须在多处理器之间维持全局操作次序(维护缓存一致性)
宽松次序
如果采用宽松次序,那么原子类型上的操作不存在线程间同步关系。同一个线程内的操作服从先行关系。交换操作和比较-交换操作可以让我们获取该原子类型的最新状态(推测是因为有一次写操作),书中建议不要使用宽松次序
获取-释放次序
原子化载入(load)为获取操作(memory_order_acquire),原子化存储(store)为释放操作(memory_order_release),原子化读-改-写为获取、释放操作(memory_order_acq_rel)。
不同线程对同一个变量的释放和获取构成同步。释放操作所存储的值必须为获取操作所见才可以有效同步,因此我们会循环获取,直到获取到的值和释放的值相同,构成同步。
如果原子操作对先后一致的要求不是很严格,那么由成对的获取-释放操作实现同步,开销会远低于由保序操作实现的全局一致顺序。这种做法很消耗脑力,要求周密思考线程间那些违背一般情况的行为,从而保证不出错,让程序服从施加的次序正确运行。
C++17标准不建议我们采用memory_order_consume次序,这里不进行学习,如果日后有需要再去了解。
我认为书中这样讲解内存次序非常费力,读者理解起来也非常费力,原因在于没有从本质上揭示不同内存次序意味着什么。可以理解作者是为了避免介绍缓存一致性等内存引入的复杂的硬件知识。但是正如Charles在MIT6.172开始阐述的,如果要很好地理解某个知识,必须要从比这个知识低一个层级的视角去认识。
虽然我目前还没有系统地学习CPU缓存相关的知识,但是我推测这些不同的内存次序的差异在于是否触发CPU缓存同步。先后一致次序每次存储都会触发缓存同步,而宽松次序则完全不会触发,获取-释放次序会在释放(存储)和获取(载入)的时候触发缓存同步,通过控制缓存同步的粒度,提升性能。
**在释放-获取同步中,如果中间还存在其他“读-改-写”操作,同步关系依然成立。**还不是很理解背后的工作原理,推测是因为每次同步都会刷新cpu缓存,所以中间的其他操作也会被同步
#include "SpinLockMutex.h"
#include "utils.h"
#include <atomic>
#include <thread>
#include <vector>
#include <mutex>
using namespace std;
atomic<int> cnt = 0;
vector<int> arr;
edward::multithread::SpinLockMutex mtx;
constexpr int kMax = 20;
void populate_queue() {
for (int i = 0; i < kMax; ++i) arr.push_back(i);
cnt.store(kMax, std::memory_order_release);
}
void consume_queue(int n) {
int idx;
while (true) {
if ((idx = cnt.fetch_sub(1, std::memory_order_acquire)) <= 0) {
//this_thread::sleep_for(chrono::seconds(1));
} else {
lock_guard lk(mtx);
edward::print(n, ":", arr[idx - 1]);
}
}
}
int main() {
thread producer(populate_queue);
thread consumer1(consume_queue, 1);
thread consumer2(consume_queue, 2);
producer.join();
consumer1.join();
consumer2.join();
return 0;
}
运行结果:
1 : 19 2 : 18 1 : 17 2 : 16 1 : 15 2 : 14 1 : 13 2 : 12 1 : 11 2 : 10 1 : 9 2 : 8 1 : 7 2 : 6 1 : 5 2 : 4 1 : 3 2 : 2 1 : 1 2 : 0
如果不想通过原子操作施加内存次序(刷新CPU缓存),我们还可以通过栅栏(std::atomic_thread_fence(memory_order)
)进行,当线程运行至栅栏处,它便对其他原子操作的次序产生作用(也就是刷新CPU缓存)。
我们利用原子操作或栅栏强制施行内存次序,可以强制非原子操作服从内存次序。如同上面例子中的arr数组一样:如果按照控制流程,线程A中非原子操作甲在原子操作乙之前执行,线程B中的原子操作丙在甲的原子操作乙后执行,那么非原子操作甲同样在操作丙前执行。
要使得数据结构可以被多个线程并发访问:
我们称可以被多线程安全访问的特性叫做线程安全:多线程执行的操作无论异同,每个线程所见的数据结构都是自洽的;数据不会丢失或破坏,所有不变量始终成立,恶性条件竞争也不会出现。
除了要保证线程安全,我们设计并发数据结构的另一个目标是尽可能减少串行化(每个线程轮流串行访问受互斥保护的数据),提高并行度。
线程安全注意事项:
提高并行度注意事项:
在第三章实现了一个线程安全的栈容器,这里进行分析:
//
// Created by edward on 22-11-10.
//
#ifndef CONCURRENCE_STACK_H
#define CONCURRENCE_STACK_H
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
#include <vector>
class EmptyStackException : std::exception {
const char * what() const noexcept override {
return "Empty Stack!";
}
};
namespace edward::multithread {
/*!
* 线程安全的栈,底层使用std::stack进行实现
* @tparam T 栈中元素类型
* 使用mutex配合简单的lock_guard进行互斥保护,之所以不用COW是因为剩下的接口都是写接口,没有读接口,所以不需要
*/
template<typename T>
class Stack {
using Ptr = std::shared_ptr<T>;
using LockGuard = std::lock_guard<std::mutex>;
public:
//接口相对于std::stack简化了许多,简化的接口让我们得以确保互斥能针对完整操作而锁定
Stack() = default;
Stack(const Stack& rhs) {
LockGuard lock(rhs.mtx_);
data_ = rhs.data_;
}
Stack& operator= (const Stack&) = delete; //将赋值运算符删除 TODO:?
//有在考虑要不要让栈中保存shared_ptr,想了想没什么不同了
void push(T value) {
LockGuard lock(mtx_);
data_.push(std::move(value));
}
Ptr pop() {
LockGuard lock(mtx_);
if (data_.empty()) throw EmptyStackException();
//觉得要是元素支持移动还是不要这么做,好丑
Ptr const ret{std::make_shared<T>(data_.top())}; //先拷贝一份,然后再返回其指针,避免再次拷贝出现的异常导致数据丢失。
data_.pop();
return ret;
}
bool empty() const {
LockGuard lock(mtx_);
return data_.empty();
}
private:
std::stack<T, std::vector<T>> data_; //stack底层使用vector实现,默认是deque
mutable std::mutex mtx_; //必须是mutable的,用于保护const Stack的线程安全
};
}
#endif //CONCURRENCE_STACK_H
用上面的注意事项进行分析:
top
接口和pop
接口之间的数据竞争,取消top
接口archive
即使如此,上述实现仍然存在死锁的可能:如果容器保存的对象的构造函数中调用了栈容器的成员函数,就会导致加锁两次。这种情况无法避免,只能要求用户不能在对象的方法中调用栈容器的方法。
并且需要由用户保证:在构造完成之前,其他线程不得访问;在全部线程停止访问后才可销毁容器。
第四章通过使用锁和条件变量实现了一个线程安全的队列:
//
// component/Queue/Queue_v0.h
// Created by edward on 22-11-16.
//
#ifndef CONCURRENCE_QUEUE_V0_H
#define CONCURRENCE_QUEUE_V0_H
#include <mutex>
#include <condition_variable>
#include <queue>
namespace edward::multithread {
/*!
* 线程安全的队列,底层使用std::queue进行实现
* @tparam T 队列的元素类型
* 使用互斥和条件变量保证线程安全
*/
template<typename T>
class Queue {
public:
using Type = T;
Queue() = default;
~Queue() = default;
Queue(const Queue& other) {
std::lock_guard(other.mtx_);
queue_ = other.queue_;
}
Queue& operator= (const Queue& other) {
std::scoped_lock lk(mtx_, other.mtx_); //使用scoped_lock一次加锁,避免死锁
queue_ = other.queue_;
}
void swap(Queue &other) {
std::scoped_lock lk(mtx_, other.mtx_);
queue_.swap(other.queue_);
}
void push(T t) {
std::lock_guard lk(mtx_); //互斥保护数据
queue_.push(std::move(t));
cond_.notify_one(); //在临界区内notify
}
T pop() noexcept { //要求T的移动操作不抛出异常——如果T要抛出异常,那么应该考虑转换接口,类似Stack实现。详细内容见《C++并发编程实战》第二版 3.2节相关内容
std::unique_lock lk(mtx_);
cond_.wait(lk, [&](){
return !queue_.empty();
});
T frnt = std::move(queue_.front());
queue_.pop();
return frnt;
}
T tryPop() { //如果不想阻塞等待,就要处理可能发生的异常,书中是通过返回布尔值,但是我觉得那样有些丑
std::lock_guard lk(mtx_);
if (queue_.empty()) throw std::logic_error("pop a empty queue!!!");
T frnt = std::move(queue_.front());
queue_.pop();
return frnt;
}
bool empty() const {
std::lock_guard lk(mtx_);
return queue_.empty();
}
int size() const {
std::lock_guard lk(mtx_);
return queue_.size();
}
private:
mutable std::mutex mtx_;
mutable std::condition_variable cond_;
std::queue<T> queue_;
};
}
#endif //CONCURRENCE_QUEUE_V0_H
其中的push和tryPop方法和前面的栈容器类似,不同之处在于我这里的tryPop方法没有处理异常,觉得有RVO(返回值优化)的情况下大部分时间都不会出现异常。
pop方法使用条件变量等待队列不为空,并且在检查期间获取互斥,保证线程安全。
但是此时还是存在条件竞态:如果一个线程调用push方法,通知另一个线程pop,但是出现了异常(第54行),那么该对象就无法被正常获取,为了避免这种情况:
我们这里采用第三种方法,从根本上解决问题。
//
// component/Queue/Queue_v1.h
// Created by edward on 22-11-16.
//
#ifndef CONCURRENCE_QUEUE_V1_H
#define CONCURRENCE_QUEUE_V1_H
#include <mutex>
#include <condition_variable>
#include <queue>
namespace edward::multithread {
/*!
* 线程安全的队列,底层使用std::queue进行实现
* @tparam T 队列的元素类型
* 使用互斥和条件变量保证线程安全
*/
template<typename T>
class Queue {
public:
using Type = T;
using Ptr = std::shared_ptr<T>;
Queue() = default;
~Queue() = default;
Queue(const Queue& other) {
std::lock_guard(other.mtx_);
queue_ = other.queue_;
}
Queue& operator= (const Queue& other) {
std::scoped_lock lk(mtx_, other.mtx_); //使用scoped_lock一次加锁,避免死锁
queue_ = other.queue_;
}
void swap(Queue &other) {
std::scoped_lock lk(mtx_, other.mtx_);
queue_.swap(other.queue_);
}
void push(T t) {
Ptr p = std::make_shared<T>(std::move(t));
std::lock_guard lk(mtx_); //互斥保护数据
queue_.push(std::move(p));
cond_.notify_one(); //在临界区内notify
}
Ptr pop() noexcept { //要求T的移动操作不抛出异常——如果T要抛出异常,那么应该考虑转换接口,类似Stack实现。详细内容见《C++并发编程实战》第二版 3.2节相关内容
std::unique_lock lk(mtx_);
cond_.wait(lk, [&](){
return !queue_.empty();
});
Ptr frnt = std::move(queue_.front());
queue_.pop();
return frnt;
}
Ptr tryPop() { //如果不想阻塞等待,就要处理可能发生的异常,书中是通过返回布尔值,但是我觉得那样有些丑
std::lock_guard lk(mtx_);
if (queue_.empty()) throw std::logic_error("pop a empty queue!!!");
Ptr frnt = std::move(queue_.front());
queue_.pop();
return frnt;
}
bool empty() const {
std::lock_guard lk(mtx_);
return queue_.empty();
}
int size() const {
std::lock_guard lk(mtx_);
return queue_.size();
}
private:
mutable std::mutex mtx_;
mutable std::condition_variable cond_;
std::queue<Ptr> queue_;
};
}
#endif //CONCURRENCE_QUEUE_V1_H
使用智能指针进行存储的另一个好处是可以将内存分配放在临界区外,因为内存分配往往是比较耗时的操作,将其放在临界区外有利于增强性能。
虽然上面的实现已经解决了许多的问题,实现了并发编程的第一个目标:线程安全,但是其并发程度并不高,根本原因在于队列只有一个唯一的互斥保护整个数据结构,实际上同一个时刻只允许一个线程操作队列数据。为了提高并发度,我们必须掌控数据结构的实现细节,提供粒度更精细的锁。
简单的想法是使用链表实现队列,分别用互斥保护头节点和尾节点,一般的实现是用head指针指向头部元素,tail指针指向尾部元素,push的时候向tail指针后添加元素,修改tail指针;pop的时候返回head指向元素,修改head指针。
//
// Created by Administrator on 2022/12/20.
//
#ifndef CONCURRENCE_QUEUE_T0_H
#define CONCURRENCE_QUEUE_T0_H
#include <memory>
//使用普通链表实现的单线程队列
template<typename T>
class Queue {
using Type = T;
using Ptr = std::shared_ptr<T>;
struct Node {
Ptr data_;
std::unique_ptr<Node> next_;
Node(Ptr t): data_(std::move(t)) {}
};
using NodePtr = std::unique_ptr<Node>;
NodePtr head_;
Node* tail_ = nullptr;
public:
Queue(const Queue&) = delete;
Queue& operator= (const Queue&) = delete;
Ptr tryPop() {
if (!head_) return Ptr{};
Ptr const ret = head_->data_;
//head_ = head_->next_; 但是unique_ptr不能直接赋值,必须用一个临时节点接管head_,然后再赋值
NodePtr tmpHead = std::move(head_); //防止由于给head赋值释放掉head->next
head_ = std::move(tmpHead->next_);
if (!head_) {
tail_ = nullptr;
}
return ret;
}
void push(T t) {
NodePtr node{new Node{std::make_shared<T>(std::move(t))}};
Node *newTail = node.get();
if (tail_) {
tail_->next_ = std::move(node);
} else {
head_ = std::move(node);
}
tail_ = newTail;
}
};
#endif //CONCURRENCE_QUEUE_T0_H
由于只能通过将head和tail赋null来判断队列是否为空,所以push和pop的时候都会同时需要修改head和tail,而这样子就要求我们同时用互斥保护,这样就和用一个互斥保护没有什么区别了。或许可以通过条件判断在合适的时候加锁来避免同时加两把锁。但是当队列中仅有一个元素的时候,head_和tail_会指向相同的元素,这时虽然是两个不同的互斥,但是还是会有条件竞争。
不过说实话这些看起来很简单的代码如果单单靠眼睛盯着书本看,虽然当时觉得自己理解了,但是自己写起来才发现有很多细节,果然书上的代码要手敲一遍才能真正理解,否则看到后面越来越觉得云里雾里,难以理解。
上面的实现方式中,由于需要维护队列为空的信息,在push和pop中都同时需要对head和tail进行修改。如果我们使用一个哑节点(dummy node)就可以避免这种复杂的判断情况。不同于一般链表为了方便在链表头部添加元素在链表头部设置哑节点,这里为了方便在链表尾部添加元素在链表尾部设置哑节点,当head和tail都指向哑节点时说明队列为空。(不需要修改指针状态为null,只需要向后移动)
//
// Created by Administrator on 2022/12/20.
//
#ifndef CONCURRENCE_QUEUE_T1_H
#define CONCURRENCE_QUEUE_T1_H
#include <memory>
//使用普通链表实现的单线程队列
template<typename T>
class Queue {
using Type = T;
using Ptr = std::shared_ptr<T>;
struct Node {
Ptr data_;
std::unique_ptr<Node> next_;
Node() {}
Node(Ptr t): data_(std::move(t)) {}
};
using NodePtr = std::unique_ptr<Node>;
NodePtr head_;
Node* tail_;
public:
Queue(): head_(new Node), tail_(head_.get()) {}
Queue(const Queue&) = delete;
Queue& operator= (const Queue&) = delete;
Ptr tryPop() {
if (head_.get() == tail_) return Ptr{};
Ptr const ret = head_->data_;
//head_ = head_->next_; 但是unique_ptr不能直接赋值,必须用一个临时节点接管head_,然后再赋值
NodePtr tmpHead = std::move(head_); //防止由于给head赋值释放掉head->next
head_ = std::move(tmpHead->next_);
return ret;
}
void push(T t) {
auto newData = std::make_shared<T>(std::move(t));
NodePtr node{new Node};
tail_->data_ = newData;
Node* const newTail = node.get();
tail_->next_ = std::move(node); //unique_ptr只能移动赋值、构造,不能拷贝
tail_ = newTail;
}
};
#endif //CONCURRENCE_QUEUE_T1_H
通过添加哑节点的方式,我们避免了之前为了记录队列为空的状态复杂的分支判断,并且pop函数仅仅修改head指针,push函数仅仅修改tail指针。虽然pop函数还需要读取tail指针,但是并不会修改其状态。这样子同样避免了push和pop操作同一个节点的条件竞争问题,队列非空情况下,head指针和tail指针都不会处理同一个节点,队列为空的情况下,pop操作会直接返回,不会和push进行竞争。接下来我们进行合适的加锁令其变成线程安全的队列。
//
// Created by Administrator on 2022/12/21.
//
#ifndef CONCURRENCE_QUEUE_V2_H
#define CONCURRENCE_QUEUE_V2_H
#include <mutex>
#include <memory>
namespace edward::multithread {
//精细粒度锁的线程安全队列
template<typename T>
class Queue {
using Type = T;
using Ptr = std::shared_ptr<T>;
struct Node {
Ptr data_;
std::unique_ptr<Node> next_;
};
using NodePtr = std::unique_ptr<Node>;
NodePtr head_;
node* tail_;
std::mutex mtxHead_, mtxTail_;
public:
Queue() : head_(new Node), tail_(head_.get()) {}
Queue(const Queue&) = delete;
Queue& operator= (const Queue&) = delete;
void push(T t) {
Ptr data = std::make_shared<T>(std::move(t)); //将内存分配等操作放在临界区外
NodePtr dummyNode {new Node};
Node* const newTail = dummyNode.get();
std::lock_guard lk(mtxTail_); //C++17支持模板类的模板参数推断
tail_->data_ = data;
tail_->next_ = std::move(dummyNode);
tail_ = newTail;
}
Ptr tryPop() {
NodePtr tmpHead_; //在临界区外释放头节点
{
std::lock_guard lk(mtxHead_);
if (std::lock_guard lk1(mtxTail_); head_.get() == tail_) {
return Ptr{};
}
//head_ = head_->next_; 但是unique_ptr不能直接赋值,必须用一个临时节点接管head_,然后再赋值
tmpHead = std::move(head_); //防止由于给head赋值释放掉head->next
head_ = std::move(tmpHead->next_);
}
return tmpHead_->data_; //将函数返回放在临界区外
}
};
}
#endif //CONCURRENCE_QUEUE_V2_H
和书上代码6.6的实现大同小异,书中将tryPop函数中获取头节点的部分封装成了函数,我这里用局部作用域也能实现相同的效果。在pop函数中为了判断队列是否为空,在if语句中首先上锁,这种在if语句中声明变量的写法在C++17以后被支持。
那么上面的队列是否是线程安全?
上面的实现没有阻塞等待pop的部分,接下来我们配合条件变量实现这一部分,完成整个线程安全的队列
//
// Created by Administrator on 2022/12/21.
//
#ifndef CONCURRENCE_QUEUE_V2_H
#define CONCURRENCE_QUEUE_V2_H
#include <mutex>
#include <memory>
#include <condition_variable>
namespace edward::multithread {
//精细粒度锁的线程安全队列
template<typename T>
class Queue {
using Type = T;
using Ptr = std::shared_ptr<T>;
public:
Queue() : head_(new Node), tail_(head_.get()) {}
Queue(const Queue&) = delete;
Queue& operator= (const Queue&) = delete;
void push(T t) {
Ptr data = std::make_shared<T>(std::move(t)); //将内存分配等操作放在临界区外
NodePtr dummyNode {new Node};
Node* const newTail = dummyNode.get();
{
std::lock_guard lk(mtxTail_); //C++17支持模板类的模板参数推断
tail_->data_ = data;
tail_->next_ = std::move(dummyNode);
tail_ = newTail;
}
cond_.notify_one();
}
Ptr tryPop() {
NodePtr tmpHead; //在临界区外释放头节点
{
std::lock_guard lk(mtxHead_);
if (head_.get() == getTail()) {
return Ptr{};
}
//head_ = head_->next_; 但是unique_ptr不能直接赋值,必须用一个临时节点接管head_,然后再赋值
tmpHead = std::move(head_); //防止由于给head赋值释放掉head->next
head_ = std::move(tmpHead->next_);
}
return tmpHead->data_; //将函数返回放在临界区外
}
Ptr pop() {
NodePtr tmpHead;
{
std::unique_lock lk(mtxHead_);
cond_.wait(lk, [this]() {
head_.get() != getTail();
});
tmpHead = std::move(head_);
head_ = std::move(tmpHead->next_);
}
return tmpHead->data_;
}
bool empty() const {
std::lock_guard lk(mtxHead_);
return head_.get() == getTail();
}
private:
struct Node {
Ptr data_;
std::unique_ptr<Node> next_;
};
using NodePtr = std::unique_ptr<Node>;
NodePtr head_;
Node* tail_;
mutable std::mutex mtxHead_, mtxTail_;
mutable std::condition_variable cond_;
Node* getTail() const {
std::lock_guard lk(mtxTail_);
return tail_;
}
};
}
#endif //CONCURRENCE_QUEUE_V2_H
主要就是在push函数后通知,在pop函数中等待条件变量,等待退出条件是队列不为空。这里还是把尾指针状态的获取封装了一下,使得程序看起来更加简洁。
书里面觉得有一些过度封装了,很简单的代码却被层层封装,当然还因为有两个相似的接口需要复用。不过我觉得另一个传入引用参数的形式非常丑,觉得保留返回智能指针的接口就可以了。
这个队列是无限队列,只要存在空闲内存,各个线程还是能够持续往队列添加新数据。与之对应的是有限队列,其最大长度在创建之时就已经确定。一旦有限队列容量已满,再试图向其压入数据就会阻塞,直到有数据弹出而产生空闲空间。有限队列可用于多线程的工作分配,能够依据待执行任务的数量,确保工作在各个线程中均匀分配。相比无限队列,有限队列能够防止某些线程向队列添加任务的速度过快,远远超过处理任务的速度,从而导致某个线程的任务堆积,而其他线程的任务饥饿。
线程安全的链表需要支持的操作:
要想提高链表的并发程度,我们就得采用精细粒度的锁操作,让每个节点都具有自己的互斥。
//
// Created by edward on 23-2-17.
//
#ifndef CONCURRENCE_LIST_H
#define CONCURRENCE_LIST_H
#include <mutex>
#include <memory>
namespace edward::multithread {
template<typename T>
class List {
using Ptr = std::shared_ptr<T>;
struct Node;
using NodePtr = std::unique_ptr<Node>;
struct Node {
std::mutex mtx_;
Ptr data_;
NodePtr next_;
explicit Node(const Ptr &data = Ptr{}): data_(data) {}
};
NodePtr head_;
public:
List(): head_(new Node) {}
List(const List&) = delete;
List& operator= (const List&) = delete;
void add(T t) {
NodePtr newNode(new Node(std::make_shared<T>(std::move(t))));
std::lock_guard lk(head_->mtx_);
newNode->next_ = std::move(head_->next_);
head_->next_ = std::move(newNode);
}
template<typename Function>
void for_each(Function f) { //考虑到多线程环境下可能引用失效,不使用万能引用,而是简单传值
Node *cur = head_.get(), *next;
std::unique_lock<std::mutex> lk(head_->mtx_);
while (next = cur->next_.get()) {
std::unique_lock<std::mutex> nextLk(next->mtx_);
lk.unlock();
f(next->data_);
cur = next;
lk = std::move(nextLk);
}
}
template<typename Predicate>
Ptr find_first_if(Predicate p) {
Node *cur = head_.get(), *next;
std::unique_lock<std::mutex> lk(head_->mtx_);
while (next = cur->next_.get()) {
std::unique_lock<std::mutex> nextLk(next->mtx_);
lk.unlock();
if (p(next->data_)) {
return next->data_;
}
cur = next;
lk = std::move(nextLk);
}
return Ptr{};
}
template<typename Predicate>
void remove(Predicate p) {
Node* cur = head_.get(), *next;
std::unique_lock<std::mutex> lk(head_->mtx_);
while (next = cur->next_.get()) {
std::unique_lock<std::mutex> nextLk(next->mtx_);
if (p(next->data_)) {
NodePtr oldNext = std::move(cur->next_); //避免next节点的释放导致后面所有节点的释放
cur->next_ = std::move(next->next_);
nextLk.unlock(); //这个锁已经没有用了,但还是释放掉
} else {
lk.unlock(); //尽可能缩小临界区
cur = next;
lk = std::move(nextLk);
}
}
}
};
}
#endif //CONCURRENCE_LIST_H
为了简化结构,这里的链表是单向链表(不用考虑加锁不同顺序导致死锁)。不过其实觉得多线程环境下,节点之间的相对顺序已经没有什么意义,双向链表没有必要。
头节点设置为哑节点,每次添加数据都添加到头部,只需要获取头节点一个的互斥就可以安全添加数据。遍历链表的过程需要锁住相邻的两个互斥,一旦我们锁住了下一个节点,就可以安全释放当前节点的锁,并且迭代进行这一过程。删除操作不会立即释放当前节点的锁,当判断当前节点需要删除时,需要对当前节点的next指针改动。这里有一个细节是当我们判断该节点需要删除时,使用一个临时的智能指针保存节点,然后在改动完链表后及时解锁,因为该智能指针离开作用域就是释放内存,包括前面已经上锁的互斥,如果不及时解锁会引发为定义行为。
通过这样设计,我们实现了精细粒度的互斥操作增加并发程度。由于是单向链表,所有线程加锁的顺序都一致,这也会导致如果某个线程阻塞在某个互斥上(由于用户传入的操作),就会使得其他线程无法越过那个互斥。这一点只能够要求用户对节点都采用非阻塞的操作。
在设计无锁数据结构时,我们需要极为小心、谨慎,因为他们的正确实现相当不容易,而导致代码出错的情形可能难以复现。
阻塞型算法/数据结构:使用了互斥、条件变量或 future进行同步操作的算法或数据结构,执行会导致线程阻塞,直到其他线程释放互斥、通知条件变量或为future对象填充结果。
非阻塞型(nonblocking)算法/数据结构:没有使用阻塞型库函数调用的算法和数据结构。在实践中,我们需要对非阻塞数据结构进行分类:
无锁数据结构:
尝试使用链表实现栈容器(之所以不使用数组是因为在无锁情况下,不再是互斥修改,多个线程都在修改的情况下顺序性无法保证)
添加数据:1.创建新节点 → 2.新节点的next指向头节点 → 3.更新头节点为新节点
在没有互斥保护的情况下,步骤2和3之间可能存在条件竞争。无锁的做法是首先保证对头节点的修改是原子的(头节点是原子类型),其次每次通过compare_exchange_weak函数查看是否有其他线程修改头节点,如果没有修改,就更新头节点,如果已经修改,则修改新节点的next指针的,再次尝试修改头节点。
#include <memory>
#include <atomic>
namespace edward::multithread {
template<typename T>
class LockFreeStack {
public:
void push(T t) {
Node *node = new Node(t, head_.load());
while (!head_.compare_exchange_weak(node->next_, node));
}
private:
struct Node {
std::shared_ptr<T> data_;
Node *next_;
Node(T t, Node* next = nullptr)
: data_(std::make_shared<T>(std::move(t)))
, next_(next)
{}
};
std::atomic<Node*> head_;
};
}
弹出数据的步骤:1.读取head → 2.读取head→next → 3. head改为head→next → 4.返回data → 5.删除旧节点
#include <memory>
#include <atomic>
namespace edward::multithread {
template<typename T>
class LockFreeStack {
using Ptr = std::shared_ptr<T>;
public:
void push(T t) {
Node *node = new Node(std::make_shared<T>(std::move(t)), head_.load());
while (!head_.compare_exchange_weak(node->next_, node));
}
Ptr pop() {
Node* head = head_.load();
while(head && !head_.compare_exchange_weak(head, head->next_));
if (!head) return Ptr{}; //空栈
Ptr ret = head->data_;
//delete head;
return ret;
}
private:
struct Node {
Ptr data_;
Node *next_;
Node(Ptr data, Node* next = nullptr)
: data_(data)
, next_(next)
{}
};
std::atomic<Node*> head_ = nullptr;
};
}
使用智能指针避免处理类型T的拷贝异常(详细的讨论见第3章线程安全栈的设计)。无锁设计的核心步骤就是,首先准备好要修改的数据,然后使用比较-交换原子操作不断判断数据是否满足不变量,如果满足就立即更新。
此时的栈容器在逻辑上已经没有问题了,但是还存在没有释放pop出的节点所占用内存所导致的内存泄漏的问题。
之所以不能在移动头节点后立即释放旧的头节点,是因为如果有多个线程同时pop,那么他们可能尝试pop同一个头节点,其中一个线程完成了修改,另一个线程还在执行while语句,使用旧的头节点的head→next进行判断,虽然一旦运行一次比较-交换操作就会更新head,但是第一次运行会RE。push函数虽然也会并行访问,但是push并不会访问已经存在于栈容器节点上的内容。
为了避免这种风险,书中介绍了一种内存回收机制:使用链表管理所有未删除的节点,如果只有一个线程调用pop函数,那么它除了要负责删除当前节点,还需要删除所有未删除的节点,如果有多个线程调用pop函数,那么我们无法判断是否可以安全删除,只能将其添加在链表上。
#include <memory>
#include <atomic>
namespace edward::multithread {
template<typename T>
class LockFreeStack {
using Ptr = std::shared_ptr<T>;
public:
void push(T t) {
Node *node = new Node(std::make_shared<T>(std::move(t)), head_.load());
while (!head_.compare_exchange_weak(node->next_, node));
}
Ptr pop() {
++threadsInPop_;
Node* head = head_.load();
while(head && !head_.compare_exchange_weak(head, head->next_));
Ptr ret; //空栈返回空指针
//从节点提取数据,节点可能不立即删除,但是如果返回的数据一旦不需要(例如不保存返回值)就立即被删除
if (head) ret.swap(head->data_);
try_reclaim(head);
return ret;
}
private:
struct Node {
Ptr data_;
Node *next_;
Node(Ptr data, Node* next = nullptr)
: data_(data)
, next_(next)
{}
};
static void delete_nodes(Node *nodes) {
Node *node;
while (nodes) {
node = nodes;
nodes = nodes->next_;
delete node;
}
}
void try_reclaim(Node *oldHead) {
if (threadsInPop_ == 1) {
Node *deletedNodes = deletedNodes_.exchange(nullptr); //接手候删链表
if (!--threadsInPop_) {
delete_nodes(deletedNodes);
} else if (deletedNodes) {
chain_pending_nodes(deletedNodes);
}
delete oldHead;
} else {
chain_pending_node(oldHead);
--threadsInPop_;
}
}
//将deletedNodes链表接在nodes链表后面
void chain_pending_nodes(Node* nodes) {
Node *last = nodes;
while (last->next_) {
last = last->next_;
}
chain_pending_nodes(nodes, last);
}
void chain_pending_nodes(Node* first, Node* last) {
last->next_ = deletedNodes_;
while (!deletedNodes_.compare_exchange_weak(last->next_, first));
}
//将deletedNodes链表接在节点n后面
void chain_pending_node(Node *n) {
chain_pending_nodes(n, n);
}
std::atomic<Node*> head_ = nullptr; //链表头节点,用来模拟栈
std::atomic<int> threadsInPop_ = 0; //运行pop的线程个数
std::atomic<Node*> deletedNodes_ = nullptr; //候删节点链表
};
}
负责内存回收的代码主要是try_reclaim
函数,它会判断当前有多少个线程调用pop函数,决定是否能够安全删除。如果调用try_reclaim函数时只有一个线程调用pop函数,那么当前节点oldHead是肯定可以释放的,原因在于oldHead只能被pop函数的while语句执行通过前共享地访问,一旦线程通过了while语句,那么得到的肯定是不同的oldHead,此时在try_reclaim中进行判断只有当前线程调动pop函数,说明其他线程已经通过while语句,因此可以释放。
代码的难点在于try_reclaim函数中,如果判断当前只有一个线程在执行pop函数,获取到未删除节点链表后,为什么还要判断是否有其他线程在调用pop函数?原因是,我们获取的这个未删除节点链表中的节点有可能有其他线程的pop函数正在访问,为了避免释放节点后导致Runtime Error,我们必须再判断一次是否有其他线程调用pop函数,如果没有,那么我们就可以认定得到的这个未删除节点链表上的节点都是可以安全释放的,如果有其他线程,那么我们只能再把这些节点放回去。
其中的辅助函数chain_pending_nodes
是负责向未删除节点deletedNodes链表上添加节点的,其实现颇为精妙,关键的地方在于先准备好待添加节点链表的头部和尾部,然后将deletedNoeds添加在尾部,并更新deletedNodes为新链表,这里使用了compare_exchange_weak
实现这一原子操作。
整个代码比较复杂,需要仔细揣摩。该内存回收机制在低负荷时能够良好运行,当在高负荷情况下,几乎任何时候都有线程调用pop函数,导致无法触发内存回收,未删除节点链表将无休止地增加,导致事实上的内存泄漏。这要求我们寻求不同的方法来回收节点。
若某节点仍被其他线程指涉,而我们依然删除它,便成了“冒险”动作。删除目标节点后,别的线程还持有指涉它的引用,还通过这一引用对其进行访问,这会导致程序产生未定义的行为(多半是RE)。为了避免这种情况,假设当前线程要访问某对象,而它却被别的线程删除,那就让前者设置一指涉目标对象的风险指针,以同志其他线程删除该对象将产生实质风险。若程序不再需要那个对象,风险指针则被清零。
若能够安全且精准地辨识哪些节点正被访问,以及知晓它们何时不再为线程所访问,我们即可将其删除。引用计数针对各个节点分别维护一个计数器,随时知悉访问它的线程数目,在没有人使用时将其删除,这正是我们所熟悉的智能指针的解决方案。但是虽然智能指针shared_ptr
的实现有用到原子特性,但是对他的操作却不是原子的。
C++20针对std::atomic<>对std::shared_ptr进行特化,使得其在不具备平实拷贝语义的情况下能够正常使用。当前g++12开始支持std::atomic<std::shared_ptr<T>>
(头文件),为此,我们在系统中安装g++12,并使用其进行编译。
sudo apt install g++-12
#include <memory>
void test_atomic_lock_free() {
shared_ptr<int> p;
print("atomic_shared_ptr", ":", atomic_is_lock_free(&p)); //false
print("atomic<shared_ptr>", ":", std::atomic<std::shared_ptr<int>>().is_lock_free()); //false
}
然后我们就能完善代码使得其能够正确回收内存。
#include <memory>
#include <atomic>
namespace edward::multithread {
template<typename T>
class LockFreeStack {
using Ptr = std::shared_ptr<T>;
struct Node;
using NodePtr = std::shared_ptr<Node>;
public:
void push(T t) {
NodePtr node = std::make_shared<Node>(std::make_shared<T>(std::move(t)), head_.load());
while (!head_.compare_exchange_weak(node->next_, node));
}
Ptr pop() {
NodePtr head = head_.load();
while(head && !head_.compare_exchange_weak(head, head->next_));
Ptr ret; //空栈返回空指针
//从节点提取数据,节点可能不立即删除,但是如果返回的数据一旦不需要(例如不保存返回值)就立即被删除
if (head) {
head->next_.reset();
ret.swap(head->data_);
}
return ret;
}
private:
struct Node {
Ptr data_;
NodePtr next_;
Node(Ptr data, NodePtr next = nullptr)
: data_(data)
, next_(next)
{}
};
std::atomic<NodePtr> head_ = nullptr; //链表头节点,用来模拟栈
};
}
我们用带引用计数的智能指针shared_ptr来保存节点,这样就不同考虑节点的生存周期,它会在没有线程使用时自动析构。头节点仍然需要使用原子类型,多个线程都会对其进行修改。
我自己的实现与书中实现不同的地方有两点:
在获取到旧的头节点后,清空旧节点的数据域(使用swap交换而不是直接赋值),确保数据的生存周期不会被意外延长
节点的next指针使用普通智能指针而不是原子智能指针。我认为节点的next指针只会被正确使用一次,即使多个线程同时pop同一个节点,也只会是第一个线程用到正确的next指针,也就是说只会有一个线程写入(其他的线程都会在while循环中访问其他节点),我们仅仅需要保证节点有next指针不会导致RE即可。
根据我的测试,g++-12实现的原子智能指针不是无锁的,next指针使用原子类型无疑会影响性能
目前还没有搞清除内存模型的工作原理,日后再完成这部分的笔记。
//
// Created by edward on 23-2-13.
//
#ifndef CONCURRENCE_LOCKFREEQUEUE_H
#define CONCURRENCE_LOCKFREEQUEUE_H
#include <memory>
namespace edward::multithread {
template<typename T>
class LockFreeQueue {
using Ptr = std::shared_ptr<T>;
struct Node;
// using NodePtr = std::shared_ptr<Node>;
using NodePtr = Node*;
public:
LockFreeQueue(): head_(new Node), tail_(head_.load()) {}
~LockFreeQueue() {
//把尚存在队列的节点删除
NodePtr node = head_.load(), p;
while (node) {
p = node->next_;
delete node;
node = p;
}
}
void push(T t) {
// NodePtr newTail = std::make_shared<Node>();
Ptr data = std::make_shared<T>(std::move(t));
NodePtr newTail = new Node();
NodePtr oldTail = tail_.load();
oldTail->data_.swap(data);
oldTail->next_ = newTail;
tail_.store(newTail);
}
Ptr tryPop() {
NodePtr oldHead = head_.load();
if (oldHead == tail_.load()) return Ptr{}; //空队列
head_.store(oldHead->next_);
Ptr ret;
ret.swap(oldHead->data_);
delete oldHead; //已经没有人可以看到了
return ret;
}
Ptr pop() {
Ptr ret;
while (!(ret = tryPop())) { //自旋
std::this_thread::yield();
}
return ret;
}
bool empty() const {
return head_ == tail_;
}
private:
struct Node {
Ptr data_;
NodePtr next_;
Node() = default;
explicit Node(Ptr data, NodePtr next = NodePtr{}): data_(data), next_(next) {}
};
std::atomic<NodePtr> head_, tail_;
};
}
#endif //CONCURRENCE_LOCKFREEQUEUE_H
上面的实现是简单的SPSC队列(单一生产者、单一消费者队列),借鉴第6章设计多个互斥保护的线程安全队列的经验,我们只需要把头节点head和尾节点tail用原子类型进行同步,因为生产者和消费者调用的是不同的函数,所以我们只需要保证head和tail之间的同步关系。
多线程环境下,多个线程都会调用pop和push函数,pop函数比较好处理,因为我们只需要读取一次数据(如同无锁栈的pop函数一样,将链表指针向后移动,不同的地方在于需要不断判断是否为空栈)。
主要的难点在于push函数,为了判断队列是否为空,设置尾指针指向哑节点(第6章已经进行讨论,否则就需要同时修改头指针和尾指针,不利于并行),每次push把数据放在哑结点的数据域中,然后再将尾指针向后移动。
一方面, 要保证push和pop操作的先行关系,即每次push修改尾指针后数据应该已经就绪(先保存数据),另一方面多线程环境下我们要保证操作的原子性,就只能原子地完成存储数据、修改尾指针的过程,这是简单的CAS操作无法做到的。
为了实现线程安全的push操作,有两种解决方案:
允许空节点存在,每次push时存入两个节点,一个用来保存数据,一个用来设置哑结点。通过这种方式我们就能够将修改原子化,但是这样要求pop能够处理空节点,并且会浪费一倍的空间
要求对数据域的操作是原子的。这样就可以在每次push的时候先使用CAS修改数据域,一旦修改失败,说明其他线程已经push过了,那我们就重新获取尾指针,再次尝试对数据域的更新操作。当我们更新哑结点数据域但未更新尾指针时,其他线程调用pop就会阻塞在CSA操作处,直到我们更新尾指针。
需要注意的是这样我们就不能在pop时将头结点的数据域立即清空,可能会引发ABA问题。例如线程A和线程B同时调用push函数,线程A抢占成功,将自己的数据存入,线程B由于线程调度获取oldTail以后没有执行CAS就被挂起,线程C调用pop函数,此时的head指向oldTail(只有一个数据),更新头结点,读取数据后将数据域清空,此时线程B开始运行,它看到的oldTail已经被丢弃,已经被线程C访问过了,但是它并不知道,还尝试更新oldTail的数据域,结果更新成功(CAS操作仅判断数据域是否非空),但是数据保存失败,出现数据覆写现象,造成ABA问题。
我们可以将头结点的数据域设置为其他非空值,从而使得数据的生命周期不会因为节点的内存没有及时释放而意外延长。
//
// Created by edward on 23-2-13.
//
#ifndef CONCURRENCE_LOCKFREEQUEUE_H
#define CONCURRENCE_LOCKFREEQUEUE_H
#include <memory>
namespace edward::multithread {
template<typename T>
class LockFreeQueue {
using Ptr = std::shared_ptr<T>;
struct Node;
// using NodePtr = std::shared_ptr<Node>;
using NodePtr = Node*;
static Ptr tmpP;
public:
LockFreeQueue(): head_(new Node), tail_(head_.load()) {}
~LockFreeQueue() {
/*
//把尚存在队列的节点删除
NodePtr node = head_.load(), p;
while (node) {
p = node->next_;
delete node;
node = p;
}
*/
}
void push(T t) {
// NodePtr newTail = std::make_shared<Node>();
Ptr data = std::make_shared<T>(std::move(t));
NodePtr newTail = new Node();
NodePtr oldTail = tail_.load();
Ptr oldData = nullptr;
while (!oldTail->data_.compare_exchange_weak(oldData, data)) {
oldTail = tail_.load();
oldData.reset();
}
oldTail->next_ = newTail;
tail_.store(newTail);
}
Ptr tryPop() {
NodePtr oldHead = head_.load();
if (oldHead == tail_.load()) return Ptr{}; //空队列
while (!head_.compare_exchange_weak(oldHead, oldHead->next_)) {
if (oldHead == tail_.load()) return Ptr{}; //空队列
}
Ptr ret = oldHead->data_.load();
oldHead->data_ = tmpP;
// delete oldHead; //已经没有人可以看到了
return ret;
}
Ptr pop() {
Ptr ret;
while (!(ret = tryPop())) { //自旋
std::this_thread::yield();
}
return ret;
}
bool empty() const {
return head_.load() == tail_.load();
}
private:
struct Node {
std::atomic<Ptr> data_;
NodePtr next_;
Node() = default;
explicit Node(Ptr data, NodePtr next = NodePtr{}): data_(data), next_(next) {}
};
std::atomic<NodePtr> head_, tail_;
};
template<typename T>
LockFreeQueue<T>::Ptr LockFreeQueue<T>::tmpP = std::make_shared<T>();
}
#endif //CONCURRENCE_LOCKFREEQUEUE_H
正如上面所讲,我们不能立即释放节点的内存,即使节点的数据已经被读出,但是可能有其他线程看到的是旧的节点,等待着通过CAS操作失败进行更新。为了管理内存,我们尝试使用带引用计数的智能指针shared_ptr
管理节点内存,并使用C++20开始支持的std::atomic<std::shared_ptr<T>>
管理头尾指针,保证线程安全。
//
// Created by edward on 23-2-13.
//
#ifndef CONCURRENCE_LOCKFREEQUEUE_H
#define CONCURRENCE_LOCKFREEQUEUE_H
#include "utils.h"
#include <memory>
namespace edward::multithread {
template<typename T>
class LockFreeQueue {
using Ptr = std::shared_ptr<T>;
struct Node;
using NodePtr = std::shared_ptr<Node>;
// using NodePtr = Node*;
static Ptr tmpP;
public:
LockFreeQueue(): head_(std::make_shared<Node>()), tail_(head_.load()) {}
~LockFreeQueue() {
/*
//把尚存在队列的节点删除
NodePtr node = head_.load(), p;
while (node) {
p = node->next_;
delete node;
node = p;
}
*/
}
void push(T t) {
// NodePtr newTail = std::make_shared<Node>();
Ptr data = std::make_shared<T>(std::move(t));
NodePtr newTail = std::make_shared<Node>();
NodePtr oldTail = tail_.load();
Ptr oldData = nullptr;
while (!oldTail->data_.compare_exchange_weak(oldData, data)) {
oldTail = tail_.load();
oldData.reset();
}
oldTail->next_ = newTail;
tail_.store(newTail);
}
Ptr tryPop() {
NodePtr oldHead = head_.load();
if (oldHead == tail_.load()) return Ptr{}; //空队列
while (!head_.compare_exchange_weak(oldHead, oldHead->next_)) {
if (oldHead == tail_.load()) return Ptr{}; //空队列
}
Ptr ret = oldHead->data_.load();
oldHead->data_ = tmpP;
// delete oldHead; //已经没有人可以看到了
return ret;
}
Ptr pop() {
Ptr ret;
while (!(ret = tryPop())) { //自旋
std::this_thread::yield();
}
return ret;
}
bool empty() const {
return head_ == tail_;
}
private:
struct Node {
std::atomic<Ptr> data_;
NodePtr next_;
Node() {
// edward::print("ctor");
}
explicit Node(Ptr data, NodePtr next = NodePtr{}): data_(data), next_(next) {
// edward::print("ctor");
}
~Node() {
// edward::print("dtor");
}
};
std::atomic<NodePtr> head_, tail_;
};
template<typename T>
LockFreeQueue<T>::Ptr LockFreeQueue<T>::tmpP = std::make_shared<T>();
}
#endif //CONCURRENCE_LOCKFREEQUEUE_H
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。