第二章
std::thread
的析构函数在线程对象没有调用join或detach的时候会调用std::terminate()
。
如果一个线程我们没有调用join
或detach
,也就是说并没有分离出去或阻塞直至新线程执行完毕并回收资源,(也就是joinable() == true
)。那么当这个线程被析构的时候,std::thread
的析构函数会调用std::terminate()
。这是非常不好的行为。因为是异常!
有四种情况在调用
thread
析构函数的时候是正常销毁:- 被默认构造
- 默认构造的线程对象不认为是激活的线程对象
- 被移动
- 已调用
join()
- 已调用
detach()
- 被默认构造
为何要这么设计(effective c++ 条款37部分)
因为剩下两种选项:隐式join
或隐式detach
会更糟糕。
如果隐式join,则析构函数会阻塞等待至线程结束。这听上去好像不错,但是会导致难以追踪的性能问题。
如果隐式detach,假设线程使用了某些局部变量,或者是使用的资源已被释放,会导致很多问题。同时可能此时被释放的内存被其他资源或程序占用,而一直运行的线程会无意间使用这部分本来就毫无相关的资源。
注意在将仿函数(函数对象)作为参数传入thread
对象的时候,避免语法歧义。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class obj{
//其他东西
};
int main(){
thread mythread(obj()); //错误
mythread.join();
thread mythread1((obj())); //可以
mythread1.join();
thread mythread2{obj()}; //可以
mythread2.join();
return 0;
}
传参的时候需要使用引用就需要用std::ref/cref
。因为默认一律按照拷贝/移动传值。
- 一个实参从主线程传递到子线程的线程函数中,需要经过两次传递。第1次发生在
std::thread
构造时,此次参数按值并以副本形式被保存在thread
对象内部。第2次发生在向线程函数传递时,此次传递是由子线程发起,并将之前std::thread
内部保存的副本以右值的形式(std::move())传入线程函数中的。 - 当我们使用了
ref
去完成引用传参的时候,其实我们会先创建一个std::ref
的临时对象,这里保存着我们要传入的那个值的引用。然后这个std::ref
再以副本的形式保存在thread
对象中。随后这个副本被move
到线程函数,由于std::ref
重载了operator T&()
,因此会隐式转换为对应的类型。因此看起来就像是真的按照引用个传递进来了一样。- 此处参考:https://en.cppreference.com/w/cpp/thread/thread/thread
如果线程函数的形参为T、const T&或T&&类型时,std::thread的构造函数可以接受左值或右值实参。因为不管是左值还是右值,在std::thread中均是以副本形式被保存,并在第2次向线程函数传参时以右值方式传入,而以上三种形参均可接受右值。
而如果线程函数的形参为T&时,不管是左值还是右值的T类型实参,都是无法直接经std::thread传递给形参为T&的线程函数,因为该实参数的副本会被std::move成右值并传递线程函数,但T&无法接受右值类型。因此,需要以std::ref形式传入。
当向线程函数传参时,可能发生隐式类型转换,这种转换是在子线程中进行的。需要注意,由于隐式转换会构造临时对象,并将该对象(是个右值)传入线程函数,因此线程函数的形参应该是可接受右值类型的T、const T&或T&&类型,但不能是T&类型。此外,如果源类型是指针或引用类型时,还要防止可能发生悬空指针和悬空引用的现象。
因为thread函数使用了完美转发,所以在某些只支持移动的参数中,必须显示使用move来转移所有权。比如智能指针。(异步操作API也一样)
- 参考资料:https://www.cnblogs.com/5iedu/p/11633683.html
传递成员函数做为线程执行函数注意语法
需要传入成员函数指针。语法参见杂记2(显式指定作用域并取地址)。并且要传入类对象地址做为类成员函数的隐藏参数- this指针做为第二个参数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class obj{
public:
void execute(){
cout << "start" << endl;
}
};
int main(){
obj classobj;
thread mythread(&obj::execute, &classobj); //需要传入成员函数指针。语法参见杂记2(显式指定作用域并取地址)。并且要传入类对象地址做为类成员函数的隐藏参数- this指针。
thread mythread(&obj::execute, ref(classobj)); //引用
thread mythread(&obj::execute, classobj); //对象
mythread.join();
return 0;
}
此处第二个参数传递引用,指针或者是对象本身都可以。具体情况需要具体分析。比如当禁用了X类的拷贝构造的时候,第三种按值传递就不可以。因为是拷贝。 最好是传递指针。因为类成员函数的第一个参数是this指针。所以thread的第二个参数应该最好是这个类对象的指针。 还有,如果按照值传递,会变成副本。可能会导致原对象被销毁。
线程对象不可以被复制。但是可以被移动
1
2
3
4
5
6
7
8
void some_function();
void some_other_function();
std::thread t1(some_function); // 1
std::thread t2=std::move(t1); // 2
t1=std::thread(some_other_function); // 3
std::thread t3; // 4
t3=std::move(t2); // 5
t1=std::move(t3); // 6 赋值操作将使程序崩溃
并发编程P32
前面几个都没问题。包括t1。因为在给t1赋值前,其所有权已经转移至t2。此时t1就空了。
赋值给一个已经有关联线程的std::thread,系统直接调用 std::terminate() 终止程序继续运行。
- 不能通过赋新值给 std::thread 对象的方式来”丢弃”一个线程。
- 如果这个容器是移动敏感的(比如,标准中的
std::vector<>
),那么移动操作同样适用于这些容器。
小心局部变量导致的悬空引用
比如我们有这样一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void func2(int& x){
while(true){
cout << x << endl;
Sleep(1000);
}
}
void func1(){
int x = 10;
thread th2(func2, ref(x)); //按照引用传递
th2.detach();
Sleep(1000);
cout <<"thread 1 finished" << endl;
}
int main(){
thread th1(func1);
th1.join();
Sleep(10000); //一定要sleep
}
- 在这个例子中,输出了两次
x
后,输出thread 1 finished
。这时候马上会有segmentation fault
。因为这时候x
已经被销毁因为线程1已经结束。所以这时候func2
引用的x
就是悬空引用。
避免应用被抛出的异常所终止。通常,在无异常的情况下使用join()时,需要在异常处理过程中调用join(),从而避免生命周期的问题。最好使用RAII
使用RAII包装线程对象避免在某些可能抛出异常的函数中忘记join
或detach
线程对象
- 一个简单的线程对象包装类。这里的语义是持有一个线程对象的引用。如果想写为值语义,也就是包装类本身持有线程对象,可以参考effective modern C++的条款37
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class thread_guard{
thread& t; //持有一个线程对象的引用
public:
explicit thread_guard (thread& rhs):t(rhs){}; //构造函数要explicit避免隐式类型转换
~thread_guard(){
if(t.joinable()){ //析构函数中如果线程是joinable的
cout << "析构" << endl;
t.join();
}
}
thread_guard(const thread&) = delete; //禁用拷贝构造和拷贝赋值
thread_guard& operator=(const thread&) = delete;
};
int main(){
thread my_thread(func);
thread_guard my_guard(my_thread); //使用RAII对象管理线程对象
Sleep(5000);
cout<<"abc"<<endl;
//安全join。
return 0;
}
在给thread调用的线程函数传递参数的时候,注意参数是否是不可拷贝的
1
2
3
4
5
6
7
8
9
10
11
12
void threadfunc(mutex &mylock){ //注意必须传引用
Sleep(1000);
mylock.unlock();
}
void testfunc(){
mylock.lock();
thread th(threadfunc, ref(mylock)); //注意必须传引用
mylock.lock();
mylock.unlock();
th.join();
}
因为mutex
是不可拷贝的。所以必须引用传参。
# 第三章
- 尽可能不要使用原始锁,而使用
lock_guard
。因为使用原始锁意味着必须在函数的每一个出口都手动解锁。 - 一个指针或引用,也会让这种保护形同虚设。切勿将受保护数据的指针或引用传递到互斥锁作用域之外。因为指针或引用可以在锁的作用域外面直接修改数据。锁是管不到那些的。所以切勿将受保护数据的指针或引用传递到互斥锁作用域之外,无论 是函数返回值,还是存储在外部可见内存,亦或是以参数的形式传递到用户提供的函数中去。
一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
class list_wrapper{
list<int> my_list;
mutex m;
void add_to_list(int x){
lock_guard<mutex> lcg(m);
my_list.push_front(x);
}
list<int>* get_list(){
return &my_list;
}
};
如上面所示,我们有一个函数会返回一个指向受保护数据(my_list
)的指针。然而有可能有别人通过这个指针直接修改我们的my_list
。这样做就完全绕过了我们的函数add_to_list
所添加的锁的保护。
- 避免死锁
- 方式之一就是让两个互斥量总以相同的顺序上锁。不要一个函数先锁A再锁B,另一个先锁B再锁A。
- 我们也可以使用
std::lock
或std::scoped_lock
(c++17起)来同时上锁多个互斥量。(由于设计的原因,不会发生死锁。因为一旦一个锁报异常就会把另一个锁也解锁 – 要么两个都锁,要么一个都不锁。)同时,我们可以用lock_guard
的可选参数std::adopt_lock
来接管已经上锁了的互斥量。 - 避免死锁的进阶指导:
- 避免嵌套锁。
- 避免在持有锁时调用外部代码。
- 使用固定顺序获取锁。
- 使用层次锁结构。当代码试图对互斥量上锁,而低层已持有该层锁时,不允许锁定。因此锁的顺序只能先锁层级高的锁再锁层级低的锁。
unique_lock
比lock_guard
更灵活,但是成本会更高。- 一般情况下,尽可能将持有锁的时间缩减到最小。
- 单例模式也可以使用
call_once
来解决线程安全问题。 - 可以使用读写锁来减少一定场合下普通锁的开销。c中可以使用
pthread_rwlock_t
,c++可以使用shared_lock
(c++17起)/shared_mutex
(c++14起)- 用
std::unique_lock
与std::lock_guard
管理排他性锁定。(写锁) - 用
std::shared_lock
管理共享锁定。(读锁)
- 用
第四章
- 同步操作可使用条件变量。此处不赘述用法。
- 剩下的都在杂记3.
thread源码解析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// 管理线程的类
class thread {
public:
class id; // 内部ID类
typedef void *native_handle_type;
thread() _NOEXCEPT { // 构造函数,专门为空线程设计。这就是为什么默认构造的线程对象并不是有效的
_Thr_set_null(_Thr); // 宏定义,原型为:#define _Thr_set_null(thr) (thr._Id = 0)
}
template<class _Fn, class... _Args>
explicit thread(_Fn&& _Fx, _Args&&... _Ax) { // 带参模板构造函数_Fx(_Ax...)
_Launch(&_Thr, _STD bind(_Decay_copy(_STD forward<_Fn>(_Fx)), _Decay_copy(_STD forward<_Args>(_Ax))...));
//一堆乱七八糟连带着完美转发和退化的。你看到copy了。所以说了thread是一种必拷贝传值的。引用的话得用ref包一下。
}
~thread() _NOEXCEPT { // 析构函数
if (joinable()) // 如果析构线程对象的时候,线程依旧是可结合的(没有调用join或detach)的话会调用terminate
_XSTD terminate(); // terminate会调用abort()来终止程序
}
thread(thread&& _Other) _NOEXCEPT : _Thr(_Other._Thr) { // 拷贝构造函数,调用move
_Thr_set_null(_Other._Thr);
}
thread& operator=(thread&& _Other) _NOEXCEPT { // 赋值函数,调用move
return (_Move_thread(_Other));
}
thread(const thread&) = delete; // 禁用 拷贝构造函数
thread& operator=(const thread&) = delete; // 禁用 赋值函数
void swap(thread& _Other) _NOEXCEPT { // 交换两线程
_STD swap(_Thr, _Other._Thr);
}
bool joinable() const _NOEXCEPT { // 若线程可结合程,返回 true;否则,返回flase
return (!_Thr_is_null(_Thr)); // 宏定义,原型为:#define _Thr_is_null(thr) (thr._Id == 0)
}
void join(); // 线程结合,阻塞的
void detach() { // 线程分离
if (!joinable()) // 若线程是不可结合的,则异常
_Throw_Cpp_error(_INVALID_ARGUMENT);
_Thrd_detachX(_Thr); //detach的核心
_Thr_set_null(_Thr); //设置线程id为0
}
id get_id() const _NOEXCEPT; // 获取线程唯一 id
static unsigned int hardware_concurrency() _NOEXCEPT { // 返回硬件线程上下文数量
return (::Concurrency::details::_GetConcurrency());
}
native_handle_type native_handle() { // 以 void* 形式返回线程的 Win32 句柄
return (_Thr._Hnd);
}
private:
thread& _Move_thread(thread& _Other) { // move from _Other
if (joinable())
_XSTD terminate();
_Thr = _Other._Thr;
_Thr_set_null(_Other._Thr);
return (*this);
}
_Thrd_t _Thr; // 私有成员变量,_Thrd_t是一个结构体
};
里面的_Thrd_t
结构体长这样:
1
2
3
4
5
6
7
8
_Thrd_t _Thr; //其实_Thrd_t 是类型的别名
typedef _Thrd_imp_t _Thrd_t; // 而_Thrd_imp_t是一个结构体
typedef struct { /* 线程 Win32 标识符 */
void *_Hnd; /* Win32 句柄 */
unsigned int _Id; // 线程id
} _Thrd_imp_t;
join
长这样:
1
2
3
4
5
6
7
8
9
10
11
12
13
inline void thread::join(){ // join thread
if (!joinable()) // 如果线程不可结合,调用join就报错。
_Throw_Cpp_error(_INVALID_ARGUMENT);
if (_Thr_is_null(_Thr)) // 如果是空线程,调用join继续报错。
_Throw_Cpp_error(_INVALID_ARGUMENT);
if (get_id() == _STD this_thread::get_id()) // 线程不能自己join自己。自己join自己就死锁了。
_Throw_Cpp_error(_RESOURCE_DEADLOCK_WOULD_OCCUR);
if (_Thrd_join(_Thr, 0) != _Thrd_success) // 线程结合(_Thrd_join()是join方法的核心),是阻塞的
_Throw_Cpp_error(_NO_SUCH_PROCESS); //如果结合失败就报错
_Thr_set_null(_Thr); // 设置线程id为0
}
所以我们看到这几种情况不可以join
:
- 默认构造的线程对象(空对象)
- 是非joinable的 (比如已经
join
或者detach
过的,move
过的和空的) - 自己和自己。
如果一个可结合的线程经过join
后(等线程执行完毕后),会将线程id置为0。
我们查看_Move_thread
函数可以发现,如果尝试给一个仍有效的线程对象再次用一个线程对象赋值,则会触发terminate
为什么析构一个非joinable的线程会报错?
- 在实例化了线程对象之后,它们的状态默认都是可结合的,如果现在直接调用它们的析构函数来析构它们,那么在析构的时候线程处于什么状态呢?是执行完了吗?还是正在执行呢?注意,如果一个在没有结合(join)的情况下,就算它先于主线程执行完毕,其id依然是不为0的。所以我们是不能确定其状态的,所以我们只能析构明确了id为0的线程。因为id为0的线程要么已经执行完毕,要么是空线程,要么是分离后的线程。
- 另外,一个线程分离(
detach
)后,该线程对象便不能控制该线程,而是交由系统接管。
学习,整理自这里
join和detach区别
join
在主线程和新线程中引入了一个同步点。- 主线程调用
join
后,会阻塞直到新线程完成并返回。只有当join
返回后,主线程才能回收被调线程资源,并继续运行。
- 主线程调用
- 当使用
detach
函数时,主线程继续运行,被调线程驻留后台运行。二者同时运行。主调线程无法再取得该被调线程的控制权。当主调线程结束时,由运行时库负责清理与被调线程相关的资源。(thread
对象已和底层执行线程分离)
我们对一个线程对象使用join
或detach
后,这个线程对象就不再是joinable
的了。这时候线程对象可以被安全销毁。
当detach
函数被调用后,执行的线程从线程对象中被分离,已不再被一个线程对象所表达–这是两个独立的事情。C++线程对象可以被销毁,同时OS执行的线程可以继续。如果程序想要知道执行的线程何时结束,就需要一些其它的机制。join
函数在那个thread对象上不能再被调用,因为它已经不再和一个执行的线程相关联。 去销毁一个仍然可以“joinable”的C++线程对象会被认为是一种错误。为了销毁一个C++线程对象,要么join
函数需要被调用(并结束),要么detach
函数被调用。如果一个C++线程对象当销毁时仍然可以被join
,异常会被抛出。
提防表面的线程安全
假设我们有这样一个表面线程安全的stack
包装器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
template<typename T>
class not_good_stack{
public:
stack<T> my_stk;
mutex m;
void push(const T& element){
lock_guard<mutex>lcg (m);
my_stk.push(element);
}
void pop(){
lock_guard<mutex>lcg (m);
my_stk.pop();
}
bool empty(){
lock_guard<mutex>lcg (m);
return my_stk.empty();
}
T& top(){
lock_guard<mutex>lcg (m);
return my_stk.top();
}
};
它很简单,因为只是给每一个标准库实现的函数外面加了个锁。而且现在看起来很安全。不会同时又push
又pop
。但是有一个问题。在empty
和pop
中间,top
和pop
中间都会有间接的线程不安全问题。
1
2
3
4
5
6
7
8
void race1(){
not_good_stack<int> mystk;
if(!mystk.empty()){
int value = mystk.top();
mystk.pop();
}
}
在这种情况下,假设我们现在stack
只有一个元素。有可能有两个线程并发的进入了该函数。
- 第一个函数检查是否为空,答案是不为空,所以进入
if
。 - 第二个函数此时刚好也进来了,检查是否为空。由于第一个函数没有来得及进行下一步,则此时依旧不为空。也进入
if
。
这时候就已经有问题了。可能第一个元素已经移除元素,这时候top
会出现错误。
甚至,因为两个线程必定会执行两次pop
。但是此时只有一个元素。所以会出现错误。
如果并不是只有一个元素,也会额外的多删除一个元素。也是错误。
如何解决
一般有两种方式。
- 第一种方式是针对整个函数
race1
加锁。 - 另一种方式是把有竞争关系的函数整合为一个。例如
empty
和pop
合起来
1
2
3
4
5
6
7
8
void pop(){
lock_guard<mutex>lcg (m);
if(my_stk.empty()){
cout << "already empty" <<endl;
return;
}
my_stk.pop();
}
信号量
- 这个
sem_init
最后的值意思是初始值是几。也不能完全理解为物品容量。信号量后续操作是单纯地对那个数字进行增减。而这个数字只有初始值而没有顶(所以会溢出)。wait
会让这个数字减掉1。如果相减后小于0了就阻塞(所以这个数字理论上可以到-1,因为我们如果现在是1,则拿一个资源就是0。因为现在我拿到了资源所以不会被阻塞,然后下一个线程进来了-1,发现结果变成了-1,就会阻塞)。 - 选择题中,这个值可以为负。负数就是目前有多少个进程/线程在等待该资源
- 这个值可能会溢出。也就是初始值假如5,可能变成6,溢出,这样会返回错误 一个错误代码是
EOVERFLOW
。但是一般操作系统会忽略。 post
会让睡眠的进程唤醒,如果相加后发现信号量值<=0,意味着有程序被阻塞,则会唤醒对应的线程或者是进程。如果>0则意味着没有进程睡眠
这也就是为什么信号量是先等待(-1),后加锁。因为wait
本身是阻塞的,如果小于0了就阻塞。如果先加锁,发现小于0了直接阻塞就没办法解锁了。这也是为什么条件变量要反过来。
如何理解信号量
- 信号量理解为需要控制的资源数量。比如经典的更衣室场景。我们不应该只限制一个人在更衣室内,假如更衣室的容量是50,则应该允许高达50人同时在更衣室内。所以使用信号量,让信号量的初始值为50是非常合理的选择,也满足语义。
- 所以如果初始值是1,那么这就是一种互斥锁。
- 同时,我们反复强调,只有当信号量数值小于等于0的时候才会引起阻塞,所以由于这个特殊性,信号量不需要broadcast。因为信号量数值增加(有人从更衣室出来)的方式只可以是一个一个增加,所以只需要挑选一个睡眠的进程(线程)唤醒即可。
例子
- 假设我们使用的是一个信号量,即消费者和生产者共享信号量。
- 一个信号量的时候就是,如果数字不为0,就该消费消费该生产生产。如果数字为0了,那么消费者就等着,等生产者生产完了通知后继续消费。
- 首先,初始化的时候,我们不能让消费者直接消费,所以初始化的值一定是0。(如果生产者消费者区分信号量,则生产者信号量初始值应为队列的最大值,消费者信号量初始值仍旧应为0。
- 生产者:
- 加锁
- 生产
- 解锁
- post [+1]
- 消费者:
- wait [-1] 一定要先等待。如果上来就锁了,因为wait是阻塞的如果是0就阻塞等待,那生产者也拿不到锁也没办法生产了。
- 加锁
- 消费
- 解锁
- 假设我们使用的是两个信号量,即消费者和生产者不共享信号量。(其实和条件变量差不多)
- 首先,初始化的时候,我们不能让消费者直接消费,所以初始化的值一定是0。但是生产者可以直接生产。所以初始化的值可以为队列最大值,比如8。
- 生产者:
- wait[-1] 注意这个时候是减掉的生产者自己的空位。也就是每生产一次,减掉一个。他最多生产8个,生产多了就停止等待让这个数字不为0。(消费者会+1)
- 加锁
- 生产
- 解锁
- post [+1]注意这个时候是添加的消费者的消费。让消费者的信号量不为1
- 消费者:
- wait[-1] 注意这个时候是减掉的消费者自己的空位。也就是记录有多少可以消费的
- 加锁
- 消费
- 解锁
- post[+1]注意这个是告诉生产者+1,也就是可生产的空位+1.
信号量可以有多种同步方式
- 实现一次临时的happens-before 语义
- 初始信号量为
0
, A; V(S)
- V就是
post
- V就是
P(S); B
P
就是wait
- 假设S只被使用一次,则保证A happens-before B。因为如果A没有执行完毕后把信号量的值增加1,则B无法执行,因为前面一直在等待信号量的值增加。
- 举个例子就是标准库的线程的
join
操作。如果使用这个方法,则每一个子线程会V(Sn)
。主线程会P(Sn)
。比如第一个线程会V(S1)
,第二个线程会V(S2)
。主线程就是P(S1); P(S2)
- 初始信号量为
- 实现计数型同步
- 初始信号量为
0
- 执行线程每完成一次工作,
V(S)
- 主线程等候,
P(S) * 工作数量
来检查是否完成全部工作。 - 举个例子就是标准库的线程的
join
操作。如果使用这个方法,则每一个子线程会V(S)。主线程会P(S)
。比如第一个线程会V(S)
,第二个线程会V(S)
。主线程就是P(S)*n
。
- 初始信号量为
条件变量
条件变量的语义是:wait until。
条件变量一定要先加锁而且必须是可以手动解锁的锁比如unique_lock
而不能使用lock_guard
。也可以用一个条件变量也可以用两个。核心是pthread_cond_wait
。原理是首先调用方抢锁,然后发现需要等待,所以调用方会被阻塞(睡眠并加入等待队列),然后互斥锁解锁,让其余线程抢锁。其余线程如果抢到锁执行完了任务,然后就可以调用notify
通知。通知后调用方的wait
会重新加锁并唤醒当前进程(之后wait
函数返回)。系统保证解锁并睡眠是原子操作。系统也保证加锁并唤醒是原子操作。
- 为了防止虚假唤醒,必须要使用
while
而不能使用wait。
因为首先,我们的生产者和消费者线程全部都是wait
同一个条件变量。我们在wait
函数返回之前,当前进程必须执行 拿到锁–>加锁并唤醒 这两步。加锁并唤醒是原子的,但是并不一定能拿得到这个锁。假如我们消费者1在等待,然后生产者生产完毕,通知消费者。假如这个时候消费者2进来了,直接就拿了锁(因为生产者释放锁到wait
函数拿锁这两步不是原子的。存在这种第三方插进来的情况)然后消费了生产的数据。然后释放锁。这时候我们消费者1才终于拿到锁了,但是这个时候抢到的锁在语义上可能并不是被原始生产者释放的锁,而是被其他消费者释放的锁。所以发现数据已经被消费了(或者是不满足某一条件),这样再去拿数据会有错误。所以必须用while
。也就是使用判别式。
再详细解释一下第三方插进来的情况。其实这样解释并不准确。因为我们提到过,我们的生产者和消费者线程全部都是wait
同一个条件变量。所以我们notify
不一定是notify
到具体哪一个生产者或者是哪个消费者。如果不使用while
再次判断的话,就会发生错误。提到的不满足某一条件,比如队列为空等等。举个例子,此时两个消费者都卡在wait
。生产者生产了一个,notify
。这时候notify
了一个消费者,消费者拿到了锁,消费过后队列为空。这时候再解锁,notify
一个生产者,表示可以生产了。但是此时可能notify
到了另一个消费者。因为消费者已经卡在wait
了(已经判断过条件了),它拿到了锁。然后再想消费但是队列为空。这时候就错了。
- 注意条件变量的信号丢失问题。因为如果没有
notify()
,则wait()
的线程永远不会唤醒。
看看例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::condition_variable cv;
std::mutex gMtx;
void Sender(){
std::unique_lock<std::mutex> lck(gMtx);
std::cout << "Ready Send notification." << std::endl;
lck.unlock(); //先解锁
cv.notify_one(); // 发送通知
}
void Receiver(){
std::unique_lock<std::mutex> lck(gMtx);
std::cout << "Wait for notification." << std::endl;
cv.wait(lck); // 等待通知并唤醒继续执行下面的指令
std::cout << "Process." << std::endl;
}
int main(){
std::thread sender(Sender);
std::thread receiver(Receiver);
sender.join();
receiver.join();
return 0;
}
线程随机启动导致的唤醒丢失,即:通信线程先启动并调用通知函数(notify_one
),但是接收线程还没有开始执行等待(wait
)函数,如果不再次调用函数通知,等待会一直持续下去。这个是最容易发现和验证的问题,上面的主线程中启动线程的顺序就会概率性出现唤醒丢失的问题。
解决方案也比较简单。也是搭配while
和判断式。其实和上面解决虚假唤醒的道理一致。只不过要注意使用while
std::condition_variable::wait
后面的判断式(谓词)的意思是,只要后面的谓词返回false
,则前面无论如何都不会解锁。尽管可能已经被通知到。
判断式法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
std::condition_variable cv;
std::mutex gMtx;
bool send = false;
void Sender(){
std::cout << "Ready Send notification." << std::endl;
std::unique_lock<std::mutex> lck(gMtx);
send = true;
lck.unlock(); //先解锁
cv.notify_one(); // 发送通知
}
void Receiver(){
std::cout << "Wait for notification." << std::endl;
std::unique_lock<std::mutex> lck(gMtx);
cv.wait(lck, [](){return send;}); // 等待通知并唤醒继续执行下面的指令
std::cout << "Process." << std::endl;
}
int main(){
std::thread sender(Sender);
std::thread receiver(Receiver);
sender.join();
receiver.join();
return 0;
}
- 上面的谓词可以等同于这种写法:
1
2
3
4
5
6
7
8
9
void Receiver1()
{
std::cout << "Wait for notification." << std::endl;
std::unique_lock<std::mutex> lck(gMtx);
while(send == false){
cv.wait(lck); // 等待通知并唤醒继续执行下面的指令
}
std::cout << "Process." << std::endl;
}
有没有发现和解决虚假唤醒的很像?都是while
内有一个判别式,然后循环判断等待。
- 这个方法可以解决唤醒丢失的原因:
- 我们的问题在于
sender
先发送了notify
,然后receiver
才开始wait
。导致丢失 - 所以在使用上述方法后,就算
sender
先发送了nofity
,但是此时send
已经被sender
变更为true
。所以此时就算唤醒receiver
的信号已经丢失,由于send
是true
,所以不会被阻塞。
- 我们的问题在于
- 二者的解决方式都是
while
+判别式。 - 为什么要先解锁,再通知?也就是先
unlock()
,再notify_one()
?
简要分析源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// 阻塞等待条件。进入该函数前,已经获得了互斥锁mutex
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
volatile pthread_t self = thread_self();
acquire(&cond->c_spinlock);// 加锁操作队列
enqueue(&cond->c_waiting, self); // 插入条件的等待队列
release(&cond->c_spinlock); // 操作完释放锁
pthread_mutex_unlock(mutex); // 释放互斥变量,否则别人无法操作资源,导致条件一直无法满足
suspend_with_cancellation(self); // 挂起,然后等待条件满足后被唤醒
//上面这行和下面这行不是原子的,所以虚假唤醒发生在这里。
pthread_mutex_lock(mutex); // 被唤醒后重新获取互斥锁
/* This is a cancellation point */
// 取消点,等待期间被取消了
if (self->p_canceled && self->p_cancelstate == PTHREAD_CANCEL_ENABLE) {
/* Remove ourselves from the waiting queue if we're still on it */
acquire(&cond->c_spinlock);
// 线程准备退出,从条件阻塞队列中移除
remove_from_queue(&cond->c_waiting, self);
release(&cond->c_spinlock);
pthread_exit(PTHREAD_CANCELED);
}
return 0;
}
static inline void suspend_with_cancellation(pthread_t self)
{
sigset_t mask;
sigjmp_buf jmpbuf;
// 获取当前的信号屏蔽码
sigprocmask(SIG_SETMASK, NULL, &mask); /* Get current signal mask */
// 清除PTHREAD_SIG_RESTART的信号掩码,即允许处理该信号
sigdelset(&mask, PTHREAD_SIG_RESTART); /* Unblock the restart signal */
/* No need to save the signal mask, we'll restore it ourselves */
/*
直接调用返回0,从siglongjump回来返回非0,这里支持线程挂起时,
收到restart信号被唤醒,或者在取消信号的处理函数中,通过siglongjmp返回这里
*/
if (sigsetjmp(jmpbuf, 0) == 0) {
self->p_cancel_jmp = &jmpbuf;
// 已经被取消并且是可取消的则直接返回,否则挂起等待唤醒
if (! (self->p_canceled && self->p_cancelstate == PTHREAD_CANCEL_ENABLE)) {
do {
// 挂起等待restart信号
sigsuspend(&mask); /* Wait for a signal */
} while (self->p_signal != PTHREAD_SIG_RESTART);
}
self->p_cancel_jmp = NULL;
} else {
// 从cancel信号的处理函数中的siglongjmp返回,重新设置信号掩码,屏蔽restart信号
sigaddset(&mask, PTHREAD_SIG_RESTART); /* Reblock the restart signal */
sigprocmask(SIG_SETMASK, &mask, NULL);
}
}
// 条件满足,唤醒线程
int pthread_cond_signal(pthread_cond_t *cond)
{
pthread_t th;
acquire(&cond->c_spinlock);
th = dequeue(&cond->c_waiting); // 取出一个被被阻塞的线程
release(&cond->c_spinlock);
if (th != NULL) restart(th); // 发送信号唤醒他
return 0;
}
// 给pid进程发送唤醒信号
static inline void restart(pthread_t th)
{
kill(th->p_pid, PTHREAD_SIG_RESTART);
}
整理自这里
生产者消费者模型中,到底使用几个条件变量?
针对条件变量和生产者消费者模型到底应该用几个条件变量的问题,比较看情况。有可能生产者和消费者各有一个,也可能同时使用一个。为什么呢?
- 这是一个非常不好解答的问题。因为这个和死锁息息相关。比如下面的例子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
std::condition_variable cvS;
std::condition_variable cvR;
std::condition_variable cv;
std::mutex gMtx;
int countm = 0;
void SenderS(){
std::unique_lock<std::mutex> lck(gMtx);
while(countm >= 1){
std::cout << std::this_thread::get_id() <<"exceedin S" << std::endl;
cv.wait(lck); //or cvS.wait(lck);
}
countm++;
std::cout<< std::this_thread::get_id() << "S"<< countm << std::endl;
lck.unlock();
cv.notify_one(); //or cvR.notify_one(); or cv.notify_all();
}
void ReceiverS(){
std::unique_lock<std::mutex> lck(gMtx);
while(countm <= 0){
std::cout << std::this_thread::get_id() <<"exceedin R" << std::endl;
cv.wait(lck); //or cvR.wait(lck);
}
countm--;
std::cout << std::this_thread::get_id() <<"R" << countm << std::endl;
lck.unlock();
cv.notify_one(); //or cvS.notify_one(); or cv.notify_all();
}
void Sender(){
while(1){
SenderS();
}
}
void Receiver(){
while(1){
ReceiverS();
}
}
int main(){
std::thread sender1(Sender);
std::thread receiver1(Receiver);
std::thread sender2(Sender);
std::thread receiver2(Receiver);
sender1.join();
receiver1.join();
sender2.join();
receiver2.join();
return 0;
}
- 上面的代码死锁了。为何会死锁?现在我们只使用1个条件变量
cv
,并且使用的是cv.notify_one()
- 在极限情况下,假设现在有一个
ReceiverS
执行了,卡在cv.wait
。然后这时候是第二个ReceiverS
执行,也卡在cv.wait
了。然后这时候只能从两个SenderS
中选。其中一个SenderS
成功执行,然后这时候非常不巧。notify
的还是一个SenderS
。然后卡在cv.wait
。然后最后只剩下一个SenderS
,又卡在cv.wait
了。此时所有线程都卡在了cv.wait
。死锁发生。
- 在极限情况下,假设现在有一个
- 但是为什么改成两个条件变量,也就是
cvS
和cvR
就没事呢?- 假设现在有一个
ReceiverS
执行了,卡在cvR.wait
。然后这时候是第二个ReceiverS
执行,也卡在cvR.wait
了。然后这时候只能从两个SenderS
中选。其中一个SenderS
成功执行,然后这时候notify
一定且只能通知到等待cvR
的线程,也就是ReceiverS
。就算现在唤醒的这个ReceiverS
没有成功抢锁,被一个SenderS
抢到了,SenderS
卡在cvS.wait
了。然后此时又没有成功抢锁,第二个SenderS
抢到了,SenderS
也卡在cvS.wait
了。不过没关系。这时候抢到锁的一定是仅剩的一个ReceiverS
。因为ReceiverS
此时已经被唤醒,只是没抢到锁,只要能抢到锁程序就可以继续。然后此时notify
一定且只能通知到等待cvS
的线程,也就是SenderS
。程序继续。
- 假设现在有一个
- 或者改成
cv.notify_all();
- 这样可以确保唤醒所有等候的线程。因为这种情况死锁的核心是非常不巧的
notify
到了自己类型的线程。也就是发送者通知发送者,接收者通知接收者。
- 这样可以确保唤醒所有等候的线程。因为这种情况死锁的核心是非常不巧的
所以说,具体使用一个或两个不是那么简单就可以决定的。这是我的提问。根据我的个人分析,如果使用一个条件变量,为了避免唤醒到错误的线程,则应该使用notify_all()
唤醒所有等候线程。如果使用两个条件变量,则使用notify_one()
唤醒对方的一个线程即可。
同时,具体的while
判断的wait
的条件也很重要。比如现在是两个接收者和两个发送者。如果不更改原有死锁代码,只把countm >= 1
更改为countm >= 2
便没有问题。但是,如果这时候又加了一个发送者,三个发送者的话此时又会死锁。所以为了根本性避免这个问题,就应该使用上一段提到的,自我总结的规则。因为一般来说,具体的while
判断的wait
的条件是不确定的,比如在任务队列场景下,具体应该大于几是可以更改的,同时,具体由几个线程也是可以更改的。但是。任务队列本身的逻辑不应该变动。
杂项
- 注意,线程对象在创建时就已经启动。而非调用
join
或detach
时启动,这里和异步API有些不同。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void func(){
cout<<"我的线程开始执行了"<<endl;
//...
cout<<"我的线程结束执行了"<<endl;
}
int main(){
thread my_thread(func);
Sleep(5000);
my_thread.join();//等待子线程执行结束
cout<<"abc"<<endl;
return 0;
}
/*
我的线程开始执行了
我的线程结束执行了
然后睡眠直到时间到达。开始继续执行join
abc
*/
C++11原语和C++20原语的比较。来自这里