首页 c++并发编程实战-笔记
文章
取消

c++并发编程实战-笔记

第二章

std::thread的析构函数在线程对象没有调用join或detach的时候会调用std::terminate()

如果一个线程我们没有调用joindetach,也就是说并没有分离出去或阻塞直至新线程执行完毕并回收资源,(也就是joinable() == true)。那么当这个线程被析构的时候,std::thread的析构函数会调用std::terminate()。这是非常不好的行为。因为是异常!

  • 有四种情况在调用thread析构函数的时候是正常销毁:

    • 被默认构造
      • 默认构造的线程对象不认为是激活的线程对象
    • 被移动
    • 已调用join()
    • 已调用 detach()

为何要这么设计(effective c++ 条款37部分)

因为剩下两种选项:隐式join或隐式detach会更糟糕。

如果隐式join,则析构函数会阻塞等待至线程结束。这听上去好像不错,但是会导致难以追踪的性能问题。

如果隐式detach,假设线程使用了某些局部变量,或者是使用的资源已被释放,会导致很多问题。同时可能此时被释放的内存被其他资源或程序占用,而一直运行的线程会无意间使用这部分本来就毫无相关的资源。

注意在将仿函数(函数对象)作为参数传入thread对象的时候,避免语法歧义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class obj{
    //其他东西
};

int main(){
    thread mythread(obj()); //错误
    mythread.join();
    
    thread mythread1((obj())); //可以
    mythread1.join();
    
    thread mythread2{obj()}; //可以
    mythread2.join();
    return 0;
}

传参的时候需要使用引用就需要用std::ref/cref。因为默认一律按照拷贝/移动传值。

  • 一个实参从主线程传递到子线程的线程函数中,需要经过两次传递第1次发生在std::thread构造时,此次参数按值并以副本形式被保存在thread对象内部第2次发生在向线程函数传递时,此次传递是由子线程发起,并将之前std::thread内部保存的副本以右值的形式(std::move())传入线程函数中的。
  • 当我们使用了ref去完成引用传参的时候,其实我们会先创建一个std::ref的临时对象,这里保存着我们要传入的那个值的引用。然后这个std::ref再以副本的形式保存在thread对象中。随后这个副本被move到线程函数,由于std::ref重载了operator T&(),因此会隐式转换为对应的类型。因此看起来就像是真的按照引用个传递进来了一样。
    • 此处参考:https://en.cppreference.com/w/cpp/thread/thread/thread
  • 如果线程函数的形参为T、const T&或T&&类型时,std::thread的构造函数可以接受左值或右值实参。因为不管是左值还是右值,在std::thread中均是以副本形式被保存,并在第2次向线程函数传参时以右值方式传入,而以上三种形参均可接受右值。

  • 而如果线程函数的形参为T&时不管是左值还是右值的T类型实参,都是无法直接经std::thread传递给形参为T&的线程函数,因为该实参数的副本会被std::move成右值并传递线程函数,但T&无法接受右值类型。因此,需要以std::ref形式传入

  • 当向线程函数传参时,可能发生隐式类型转换,这种转换是在子线程中进行的。需要注意,由于隐式转换会构造临时对象,并将该对象(是个右值)传入线程函数,因此线程函数的形参应该是可接受右值类型的T、const T&或T&&类型,但不能是T&类型。此外,如果源类型是指针或引用类型时,还要防止可能发生悬空指针和悬空引用的现象。

  • 因为thread函数使用了完美转发,所以在某些只支持移动的参数中,必须显示使用move来转移所有权。比如智能指针。(异步操作API也一样)

  • 参考资料:https://www.cnblogs.com/5iedu/p/11633683.html

传递成员函数做为线程执行函数注意语法

需要传入成员函数指针。语法参见杂记2(显式指定作用域并取地址)。并且要传入类对象地址做为类成员函数的隐藏参数- this指针做为第二个参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class obj{
    public:
    void execute(){
        cout << "start" << endl;
    }
};



int main(){
    obj classobj;
    thread mythread(&obj::execute, &classobj); //需要传入成员函数指针。语法参见杂记2(显式指定作用域并取地址)。并且要传入类对象地址做为类成员函数的隐藏参数- this指针。
   
    thread mythread(&obj::execute, ref(classobj)); //引用
    
    thread mythread(&obj::execute, classobj); //对象
    mythread.join();

    return 0;
}

此处第二个参数传递引用,指针或者是对象本身都可以。具体情况需要具体分析。比如当禁用了X类的拷贝构造的时候,第三种按值传递就不可以。因为是拷贝。 最好是传递指针。因为类成员函数的第一个参数是this指针。所以thread的第二个参数应该最好是这个类对象的指针。 还有,如果按照值传递,会变成副本。可能会导致原对象被销毁。

线程对象不可以被复制。但是可以被移动

1
2
3
4
5
6
7
8
void some_function();
void some_other_function();
std::thread t1(some_function); // 1
std::thread t2=std::move(t1); // 2
t1=std::thread(some_other_function); // 3
std::thread t3; // 4
t3=std::move(t2); // 5
t1=std::move(t3); // 6 赋值操作将使程序崩溃

并发编程P32

前面几个都没问题。包括t1。因为在给t1赋值前,其所有权已经转移至t2。此时t1就空了。

  • 赋值给一个已经有关联线程的std::thread,系统直接调用 std::terminate() 终止程序继续运行。

  • 不能通过赋新值给 std::thread 对象的方式来”丢弃”一个线程。
  • 如果这个容器是移动敏感的(比如,标准中的 std::vector<> ),那么移动操作同样适用于这些容器。

小心局部变量导致的悬空引用

比如我们有这样一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void func2(int& x){
    while(true){
        cout << x << endl;
        Sleep(1000);
    }
}
void func1(){
    int x = 10;
    thread th2(func2, ref(x)); //按照引用传递
    th2.detach();
    Sleep(1000);
    cout <<"thread 1 finished" << endl;
}



int main(){
    thread th1(func1);
    th1.join();
    Sleep(10000); //一定要sleep
}
  • 在这个例子中,输出了两次x后,输出thread 1 finished。这时候马上会有segmentation fault。因为这时候x已经被销毁因为线程1已经结束。所以这时候func2引用的x就是悬空引用。

避免应用被抛出的异常所终止。通常,在无异常的情况下使用join()时,需要在异常处理过程中调用join(),从而避免生命周期的问题。最好使用RAII

使用RAII包装线程对象避免在某些可能抛出异常的函数中忘记joindetach线程对象

  • 一个简单的线程对象包装类。这里的语义是持有一个线程对象的引用。如果想写为值语义,也就是包装类本身持有线程对象,可以参考effective modern C++的条款37
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class thread_guard{
    thread& t; //持有一个线程对象的引用
    public:
        explicit thread_guard (thread& rhs):t(rhs){}; //构造函数要explicit避免隐式类型转换

        ~thread_guard(){
            if(t.joinable()){ //析构函数中如果线程是joinable的
                cout << "析构" << endl;
                t.join();
            }
        }
        thread_guard(const thread&) = delete; //禁用拷贝构造和拷贝赋值
        thread_guard& operator=(const thread&) = delete;

};

int main(){
    thread my_thread(func);

    thread_guard my_guard(my_thread); //使用RAII对象管理线程对象
    Sleep(5000);

    cout<<"abc"<<endl;
    //安全join。
    return 0;
}

在给thread调用的线程函数传递参数的时候,注意参数是否是不可拷贝的

1
2
3
4
5
6
7
8
9
10
11
12
void threadfunc(mutex &mylock){ //注意必须传引用
    Sleep(1000);
    mylock.unlock();
}
void testfunc(){
    mylock.lock();
    thread th(threadfunc, ref(mylock)); //注意必须传引用
    mylock.lock();
    mylock.unlock();
    th.join();

}

因为mutex是不可拷贝的。所以必须引用传参。

# 第三章

  • 尽可能不要使用原始锁,而使用lock_guard。因为使用原始锁意味着必须在函数的每一个出口都手动解锁。
  • 一个指针或引用,也会让这种保护形同虚设。切勿将受保护数据的指针或引用传递到互斥锁作用域之外。因为指针或引用可以在锁的作用域外面直接修改数据。锁是管不到那些的。所以切勿将受保护数据的指针或引用传递到互斥锁作用域之外,无论 是函数返回值,还是存储在外部可见内存,亦或是以参数的形式传递到用户提供的函数中去。

一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
class list_wrapper{
    list<int> my_list;
    mutex m;

    void add_to_list(int x){
        lock_guard<mutex> lcg(m);
        my_list.push_front(x);
    }

    list<int>* get_list(){
        return &my_list;
    }
};

如上面所示,我们有一个函数会返回一个指向受保护数据(my_list)的指针。然而有可能有别人通过这个指针直接修改我们的my_list。这样做就完全绕过了我们的函数add_to_list所添加的锁的保护。

  • 避免死锁
    • 方式之一就是让两个互斥量总以相同的顺序上锁。不要一个函数先锁A再锁B,另一个先锁B再锁A。
    • 我们也可以使用std::lockstd::scoped_lock(c++17起)来同时上锁多个互斥量。(由于设计的原因,不会发生死锁。因为一旦一个锁报异常就会把另一个锁也解锁 – 要么两个都锁,要么一个都不锁。)同时,我们可以用lock_guard 的可选参数std::adopt_lock来接管已经上锁了的互斥量。
    • 避免死锁的进阶指导:
      • 避免嵌套锁。
      • 避免在持有锁时调用外部代码。
      • 使用固定顺序获取锁。
      • 使用层次锁结构。当代码试图对互斥量上锁,而低层已持有该层锁时,不允许锁定。因此锁的顺序只能先锁层级高的锁再锁层级低的锁。
  • unique_locklock_guard更灵活,但是成本会更高。
  • 一般情况下,尽可能将持有锁的时间缩减到最小。
  • 单例模式也可以使用call_once来解决线程安全问题。
  • 可以使用读写锁来减少一定场合下普通锁的开销。c中可以使用pthread_rwlock_t,c++可以使用shared_lock(c++17起)/shared_mutex(c++14起)
    • std::unique_lockstd::lock_guard 管理排他性锁定。(写锁)
    • std::shared_lock 管理共享锁定。(读锁)

第四章

  • 同步操作可使用条件变量。此处不赘述用法。
  • 剩下的都在杂记3.

thread源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// 管理线程的类
class thread {
public:
	class id;                   // 内部ID类
	typedef void *native_handle_type;
 
	thread() _NOEXCEPT {	    // 构造函数,专门为空线程设计。这就是为什么默认构造的线程对象并不是有效的
		_Thr_set_null(_Thr);    // 宏定义,原型为:#define _Thr_set_null(thr) (thr._Id = 0)
	}
 
	template<class _Fn, class... _Args>
    explicit thread(_Fn&& _Fx, _Args&&... _Ax) {	// 带参模板构造函数_Fx(_Ax...)
		_Launch(&_Thr, _STD bind(_Decay_copy(_STD forward<_Fn>(_Fx)), _Decay_copy(_STD forward<_Args>(_Ax))...));
        //一堆乱七八糟连带着完美转发和退化的。你看到copy了。所以说了thread是一种必拷贝传值的。引用的话得用ref包一下。
    }
 
	~thread() _NOEXCEPT {	    // 析构函数
		if (joinable())         // 如果析构线程对象的时候,线程依旧是可结合的(没有调用join或detach)的话会调用terminate
			_XSTD terminate();  // terminate会调用abort()来终止程序
	}
 
	thread(thread&& _Other) _NOEXCEPT : _Thr(_Other._Thr) {	    // 拷贝构造函数,调用move
		_Thr_set_null(_Other._Thr);
	}
 
	thread& operator=(thread&& _Other) _NOEXCEPT {	// 赋值函数,调用move
		return (_Move_thread(_Other));
	}
 
	thread(const thread&) = delete;                 // 禁用 拷贝构造函数
	thread& operator=(const thread&) = delete;      // 禁用 赋值函数
 
	void swap(thread& _Other) _NOEXCEPT {	        // 交换两线程
		_STD swap(_Thr, _Other._Thr);
	}
 
	bool joinable() const _NOEXCEPT {	            // 若线程可结合程,返回 true;否则,返回flase
		return (!_Thr_is_null(_Thr));               // 宏定义,原型为:#define _Thr_is_null(thr) (thr._Id == 0)
	}
 
	void join();                                    // 线程结合,阻塞的
 
	void detach() {	                                // 线程分离
		if (!joinable())                            // 若线程是不可结合的,则异常
			_Throw_Cpp_error(_INVALID_ARGUMENT);
		_Thrd_detachX(_Thr);					//detach的核心
		_Thr_set_null(_Thr);					//设置线程id为0
	}
 
	id get_id() const _NOEXCEPT;                    // 获取线程唯一 id
 
	static unsigned int hardware_concurrency() _NOEXCEPT {	    // 返回硬件线程上下文数量
		return (::Concurrency::details::_GetConcurrency());
	}
 
	native_handle_type native_handle() {	        // 以 void* 形式返回线程的 Win32 句柄
		return (_Thr._Hnd);
	}
 
private:
	thread& _Move_thread(thread& _Other) {	        // move from _Other
		if (joinable())
			_XSTD terminate();
		_Thr = _Other._Thr;
		_Thr_set_null(_Other._Thr);
		return (*this);
	}
 
	_Thrd_t _Thr;            // 私有成员变量,_Thrd_t是一个结构体
};

里面的_Thrd_t结构体长这样:

1
2
3
4
5
6
7
8
_Thrd_t _Thr; //其实_Thrd_t 是类型的别名
 
typedef _Thrd_imp_t _Thrd_t;    // 而_Thrd_imp_t是一个结构体
 
typedef struct {	/* 线程 Win32 标识符 */
    void *_Hnd;	    /* Win32 句柄 */
    unsigned int _Id;    // 线程id
} _Thrd_imp_t;

join长这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
inline void thread::join(){	// join thread
	if (!joinable())            // 如果线程不可结合,调用join就报错。
		_Throw_Cpp_error(_INVALID_ARGUMENT);
	if (_Thr_is_null(_Thr))     // 如果是空线程,调用join继续报错。
		_Throw_Cpp_error(_INVALID_ARGUMENT);
	if (get_id() == _STD this_thread::get_id()) // 线程不能自己join自己。自己join自己就死锁了。
		_Throw_Cpp_error(_RESOURCE_DEADLOCK_WOULD_OCCUR);
	if (_Thrd_join(_Thr, 0) != _Thrd_success)   // 线程结合(_Thrd_join()是join方法的核心),是阻塞的
		_Throw_Cpp_error(_NO_SUCH_PROCESS); //如果结合失败就报错
	_Thr_set_null(_Thr);        // 设置线程id为0
}

所以我们看到这几种情况不可以join

  • 默认构造的线程对象(空对象)
  • 是非joinable的 (比如已经join或者detach过的,move过的和空的)
  • 自己和自己。

如果一个可结合的线程经过join后(等线程执行完毕后),会将线程id置为0。

我们查看_Move_thread函数可以发现,如果尝试给一个仍有效的线程对象再次用一个线程对象赋值,则会触发terminate

为什么析构一个非joinable的线程会报错?

  • 在实例化了线程对象之后,它们的状态默认都是可结合的,如果现在直接调用它们的析构函数来析构它们,那么在析构的时候线程处于什么状态呢?是执行完了吗?还是正在执行呢?注意,如果一个在没有结合(join)的情况下,就算它先于主线程执行完毕,其id依然是不为0的。所以我们是不能确定其状态的,所以我们只能析构明确了id为0的线程。因为id为0的线程要么已经执行完毕,要么是空线程,要么是分离后的线程。
  • 另外,一个线程分离(detach)后,该线程对象便不能控制该线程,而是交由系统接管。

学习,整理自这里

join和detach区别

  • join在主线程和新线程中引入了一个同步点。
    • 主线程调用join后,会阻塞直到新线程完成并返回。只有当join返回后,主线程才能回收被调线程资源,并继续运行。
  • 当使用detach函数时,主线程继续运行,被调线程驻留后台运行。二者同时运行。主调线程无法再取得该被调线程的控制权。当主调线程结束时,由运行时库负责清理与被调线程相关的资源。(thread对象已和底层执行线程分离)

我们对一个线程对象使用joindetach后,这个线程对象就不再是joinable的了。这时候线程对象可以被安全销毁。

detach函数被调用后,执行的线程从线程对象中被分离,已不再被一个线程对象所表达–这是两个独立的事情。C++线程对象可以被销毁,同时OS执行的线程可以继续。如果程序想要知道执行的线程何时结束,就需要一些其它的机制。join函数在那个thread对象上不能再被调用,因为它已经不再和一个执行的线程相关联。 去销毁一个仍然可以“joinable”的C++线程对象会被认为是一种错误。为了销毁一个C++线程对象,要么join函数需要被调用(并结束),要么detach函数被调用。如果一个C++线程对象当销毁时仍然可以被join,异常会被抛出。

提防表面的线程安全

假设我们有这样一个表面线程安全的stack包装器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
template<typename T>
class not_good_stack{
    public:
    stack<T> my_stk;
    mutex m;
        void push(const T& element){
            lock_guard<mutex>lcg (m);
            my_stk.push(element);
        }
        void pop(){
            lock_guard<mutex>lcg (m);
            my_stk.pop();
        }
        bool empty(){
            lock_guard<mutex>lcg (m);
            return my_stk.empty();
        }
        T& top(){
            lock_guard<mutex>lcg (m);
            return my_stk.top();
        }
};

它很简单,因为只是给每一个标准库实现的函数外面加了个锁。而且现在看起来很安全。不会同时又pushpop。但是有一个问题。在emptypop中间,toppop中间都会有间接的线程不安全问题。

1
2
3
4
5
6
7
8
void race1(){
    not_good_stack<int> mystk;

    if(!mystk.empty()){
        int value = mystk.top();
        mystk.pop();
    }  
}

在这种情况下,假设我们现在stack只有一个元素。有可能有两个线程并发的进入了该函数。

  • 第一个函数检查是否为空,答案是不为空,所以进入if
  • 第二个函数此时刚好也进来了,检查是否为空。由于第一个函数没有来得及进行下一步,则此时依旧不为空。也进入if

这时候就已经有问题了。可能第一个元素已经移除元素,这时候top会出现错误。

甚至,因为两个线程必定会执行两次pop。但是此时只有一个元素。所以会出现错误。

如果并不是只有一个元素,也会额外的多删除一个元素。也是错误。

如何解决

一般有两种方式。

  • 第一种方式是针对整个函数race1加锁。
  • 另一种方式是把有竞争关系的函数整合为一个。例如emptypop合起来
1
2
3
4
5
6
7
8
void pop(){
    lock_guard<mutex>lcg (m);
    if(my_stk.empty()){
        cout << "already empty" <<endl;
        return;
    }
    my_stk.pop();
}

信号量

  • 这个sem_init最后的值意思是初始值是几。也不能完全理解为物品容量。信号量后续操作是单纯地对那个数字进行增减。而这个数字只有初始值而没有顶(所以会溢出)。wait会让这个数字减掉1。如果相减后小于0了就阻塞(所以这个数字理论上可以到-1,因为我们如果现在是1,则拿一个资源就是0。因为现在我拿到了资源所以不会被阻塞,然后下一个线程进来了-1,发现结果变成了-1,就会阻塞)
  • 选择题中,这个值可以为负。负数就是目前有多少个进程/线程在等待该资源
  • 这个值可能会溢出。也就是初始值假如5,可能变成6,溢出,这样会返回错误 一个错误代码是EOVERFLOW。但是一般操作系统会忽略。
  • post会让睡眠的进程唤醒,如果相加后发现信号量值<=0,意味着有程序被阻塞,则会唤醒对应的线程或者是进程。如果>0则意味着没有进程睡眠

这也就是为什么信号量是先等待(-1),后加锁。因为wait本身是阻塞的,如果小于0了就阻塞。如果先加锁,发现小于0了直接阻塞就没办法解锁了。这也是为什么条件变量要反过来。

如何理解信号量

  • 信号量理解为需要控制的资源数量。比如经典的更衣室场景。我们不应该只限制一个人在更衣室内,假如更衣室的容量是50,则应该允许高达50人同时在更衣室内。所以使用信号量,让信号量的初始值为50是非常合理的选择,也满足语义。
  • 所以如果初始值是1,那么这就是一种互斥锁。
  • 同时,我们反复强调,只有当信号量数值小于等于0的时候才会引起阻塞,所以由于这个特殊性,信号量不需要broadcast。因为信号量数值增加(有人从更衣室出来)的方式只可以是一个一个增加,所以只需要挑选一个睡眠的进程(线程)唤醒即可。

例子

  • 假设我们使用的是一个信号量,即消费者和生产者共享信号量。
    • 一个信号量的时候就是,如果数字不为0,就该消费消费该生产生产。如果数字为0了,那么消费者就等着,等生产者生产完了通知后继续消费。
    • 首先,初始化的时候,我们不能让消费者直接消费,所以初始化的值一定是0。(如果生产者消费者区分信号量,则生产者信号量初始值应为队列的最大值,消费者信号量初始值仍旧应为0。
    • 生产者:
      • 加锁
      • 生产
      • 解锁
      • post [+1]
    • 消费者:
      • wait [-1] 一定要先等待。如果上来就锁了,因为wait是阻塞的如果是0就阻塞等待,那生产者也拿不到锁也没办法生产了。
      • 加锁
      • 消费
      • 解锁
  • 假设我们使用的是两个信号量,即消费者和生产者共享信号量。(其实和条件变量差不多)
    • 首先,初始化的时候,我们不能让消费者直接消费,所以初始化的值一定是0。但是生产者可以直接生产。所以初始化的值可以为队列最大值,比如8。
    • 生产者:
      • wait[-1] 注意这个时候是减掉的生产者自己的空位。也就是每生产一次,减掉一个。他最多生产8个,生产多了就停止等待让这个数字不为0。(消费者会+1)
      • 加锁
      • 生产
      • 解锁
      • post [+1]注意这个时候是添加的消费者的消费。让消费者的信号量不为1
    • 消费者:
      • wait[-1] 注意这个时候是减掉的消费者自己的空位。也就是记录有多少可以消费的
      • 加锁
      • 消费
      • 解锁
      • post[+1]注意这个是告诉生产者+1,也就是可生产的空位+1.

信号量可以有多种同步方式

  • 实现一次临时的happens-before 语义
    • 初始信号量为0,
    • A; V(S)
      • V就是post
    • P(S); B
      • P就是wait
    • 假设S只被使用一次,则保证A happens-before B。因为如果A没有执行完毕后把信号量的值增加1,则B无法执行,因为前面一直在等待信号量的值增加。
    • 举个例子就是标准库的线程的join操作。如果使用这个方法,则每一个子线程会V(Sn)。主线程会P(Sn)。比如第一个线程会V(S1),第二个线程会V(S2)。主线程就是P(S1); P(S2)

QQ截图20230508221734

  • 实现计数型同步
    • 初始信号量为0
    • 执行线程每完成一次工作,V(S)
    • 主线程等候,P(S) * 工作数量来检查是否完成全部工作。
    • 举个例子就是标准库的线程的join操作。如果使用这个方法,则每一个子线程会V(S)。主线程会P(S)。比如第一个线程会V(S),第二个线程会V(S)。主线程就是P(S)*n

QQ截图20230508221739

条件变量

条件变量的语义是:wait until

条件变量一定要先加锁而且必须是可以手动解锁的锁比如unique_lock而不能使用lock_guard。也可以用一个条件变量也可以用两个。核心是pthread_cond_wait。原理是首先调用方抢锁,然后发现需要等待,所以调用方会被阻塞(睡眠并加入等待队列),然后互斥锁解锁,让其余线程抢锁。其余线程如果抢到锁执行完了任务,然后就可以调用notify通知。通知后调用方的wait会重新加锁并唤醒当前进程(之后wait函数返回)。系统保证解锁并睡眠是原子操作。系统也保证加锁并唤醒是原子操作。

  • 为了防止虚假唤醒,必须要使用while而不能使用wait。

因为首先,我们的生产者和消费者线程全部都是wait同一个条件变量。我们在wait函数返回之前,当前进程必须执行 拿到锁–>加锁并唤醒 这两步。加锁并唤醒是原子的,但是并不一定能拿得到这个锁。假如我们消费者1在等待,然后生产者生产完毕,通知消费者。假如这个时候消费者2进来了,直接就拿了锁(因为生产者释放锁到wait函数拿锁这两步不是原子的。存在这种第三方插进来的情况)然后消费了生产的数据。然后释放锁。这时候我们消费者1才终于拿到锁了,但是这个时候抢到的锁在语义上可能并不是被原始生产者释放的锁,而是被其他消费者释放的锁。所以发现数据已经被消费了(或者是不满足某一条件),这样再去拿数据会有错误。所以必须用while。也就是使用判别式。

再详细解释一下第三方插进来的情况。其实这样解释并不准确。因为我们提到过,我们的生产者和消费者线程全部都是wait同一个条件变量。所以我们notify不一定是notify到具体哪一个生产者或者是哪个消费者。如果不使用while再次判断的话,就会发生错误。提到的不满足某一条件,比如队列为空等等。举个例子,此时两个消费者都卡在wait。生产者生产了一个,notify。这时候notify了一个消费者,消费者拿到了锁,消费过后队列为空。这时候再解锁,notify一个生产者,表示可以生产了。但是此时可能notify到了另一个消费者。因为消费者已经卡在wait了(已经判断过条件了),它拿到了锁。然后再想消费但是队列为空。这时候就错了。

QQ截图20230506004803

  • 注意条件变量的信号丢失问题。因为如果没有notify(),则wait()的线程永远不会唤醒。

看看例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::condition_variable cv;
std::mutex gMtx;

void Sender(){	
    std::unique_lock<std::mutex> lck(gMtx);
    std::cout << "Ready Send notification." << std::endl;
    lck.unlock(); //先解锁
    cv.notify_one();   // 发送通知
 }

void Receiver(){
    std::unique_lock<std::mutex> lck(gMtx);
    std::cout << "Wait for notification." << std::endl;
    cv.wait(lck);    // 等待通知并唤醒继续执行下面的指令
    std::cout << "Process." << std::endl;
}

int main(){
    std::thread sender(Sender);
    std::thread receiver(Receiver);
    sender.join();
    receiver.join();
    return 0;
}

线程随机启动导致的唤醒丢失,即:通信线程先启动并调用通知函数(notify_one),但是接收线程还没有开始执行等待(wait)函数,如果不再次调用函数通知,等待会一直持续下去。这个是最容易发现和验证的问题,上面的主线程中启动线程的顺序就会概率性出现唤醒丢失的问题。

解决方案也比较简单。也是搭配while和判断式。其实和上面解决虚假唤醒的道理一致。只不过要注意使用while

  • std::condition_variable::wait后面的判断式(谓词)的意思是,只要后面的谓词返回false,则前面无论如何都不会解锁。尽管可能已经被通知到。

判断式法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
std::condition_variable cv;
std::mutex gMtx;

bool send = false;
void Sender(){
    std::cout << "Ready Send notification." << std::endl;
    std::unique_lock<std::mutex> lck(gMtx);
    send = true;
    lck.unlock(); //先解锁
    cv.notify_one();   // 发送通知
}

void Receiver(){
    std::cout << "Wait for notification." << std::endl;
    std::unique_lock<std::mutex> lck(gMtx);
    cv.wait(lck, [](){return send;});    // 等待通知并唤醒继续执行下面的指令
    std::cout << "Process." << std::endl;
}

int main(){
    std::thread sender(Sender);
    std::thread receiver(Receiver);
    sender.join();
    receiver.join();
    return 0;
}
  • 上面的谓词可以等同于这种写法:
1
2
3
4
5
6
7
8
9
void Receiver1()
{
     std::cout << "Wait for notification." << std::endl;
     std::unique_lock<std::mutex> lck(gMtx);
     while(send == false){
        cv.wait(lck); // 等待通知并唤醒继续执行下面的指令
     }
     std::cout << "Process." << std::endl;
}

有没有发现和解决虚假唤醒的很像?都是while内有一个判别式,然后循环判断等待。

  • 这个方法可以解决唤醒丢失的原因:
    • 我们的问题在于sender先发送了notify,然后receiver才开始wait。导致丢失
    • 所以在使用上述方法后,就算sender先发送了nofity,但是此时send已经被sender变更为true。所以此时就算唤醒receiver的信号已经丢失,由于sendtrue,所以不会被阻塞。
  • 二者的解决方式都是while+判别式。
  • 为什么要先解锁,再通知?也就是先unlock(),再notify_one()
    • 语义上没什么区别,但是性能会有一点点提升。原因是如果先通知再解锁,在通知的那一刻,wait中的被通知线程会尝试获取锁,但是这时候通知线程还没有释放锁,所以被通知线程又会被阻塞一次然后睡眠,加入等待队列。如果是先释放锁,则不会发生这种情况。也就是它可能会直接获取锁成功。
      • 这时候可能会问,我要是先解锁不就有可能通知线程又抢了一次吗?没错,但是就算是后解锁也有可能发生这种情况。也就是所有情况下,都可能发生虚假唤醒。这种情况其实并不要紧。
    • 来自这里这里

简要分析源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// 阻塞等待条件。进入该函数前,已经获得了互斥锁mutex
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
    volatile pthread_t self = thread_self();
    
    acquire(&cond->c_spinlock);// 加锁操作队列

    enqueue(&cond->c_waiting, self); // 插入条件的等待队列

    release(&cond->c_spinlock); // 操作完释放锁
    
    pthread_mutex_unlock(mutex); // 释放互斥变量,否则别人无法操作资源,导致条件一直无法满足
    
    suspend_with_cancellation(self); // 挂起,然后等待条件满足后被唤醒
	//上面这行和下面这行不是原子的,所以虚假唤醒发生在这里。
    pthread_mutex_lock(mutex); // 被唤醒后重新获取互斥锁
    /* This is a cancellation point */
    // 取消点,等待期间被取消了
    if (self->p_canceled && self->p_cancelstate == PTHREAD_CANCEL_ENABLE) {
        /* Remove ourselves from the waiting queue if we're still on it */
        acquire(&cond->c_spinlock);
        // 线程准备退出,从条件阻塞队列中移除
        remove_from_queue(&cond->c_waiting, self);
        release(&cond->c_spinlock);
        pthread_exit(PTHREAD_CANCELED);
    }
    return 0;
}
static inline void suspend_with_cancellation(pthread_t self)
{
  sigset_t mask;
  sigjmp_buf jmpbuf;
  // 获取当前的信号屏蔽码
  sigprocmask(SIG_SETMASK, NULL, &mask); /* Get current signal mask */
  // 清除PTHREAD_SIG_RESTART的信号掩码,即允许处理该信号
  sigdelset(&mask, PTHREAD_SIG_RESTART); /* Unblock the restart signal */
  /* No need to save the signal mask, we'll restore it ourselves */
  /*
    直接调用返回0,从siglongjump回来返回非0,这里支持线程挂起时,
    收到restart信号被唤醒,或者在取消信号的处理函数中,通过siglongjmp返回这里
  */
  if (sigsetjmp(jmpbuf, 0) == 0) {
    self->p_cancel_jmp = &jmpbuf;
    // 已经被取消并且是可取消的则直接返回,否则挂起等待唤醒
    if (! (self->p_canceled && self->p_cancelstate == PTHREAD_CANCEL_ENABLE)) {
      do {
        // 挂起等待restart信号
        sigsuspend(&mask);               /* Wait for a signal */
      } while (self->p_signal != PTHREAD_SIG_RESTART);
    }
    self->p_cancel_jmp = NULL;
  } else {
    // 从cancel信号的处理函数中的siglongjmp返回,重新设置信号掩码,屏蔽restart信号
    sigaddset(&mask, PTHREAD_SIG_RESTART); /* Reblock the restart signal */
    sigprocmask(SIG_SETMASK, &mask, NULL);
  }
}
// 条件满足,唤醒线程
int pthread_cond_signal(pthread_cond_t *cond)
{
  pthread_t th;

  acquire(&cond->c_spinlock);
  th = dequeue(&cond->c_waiting); // 取出一个被被阻塞的线程
  release(&cond->c_spinlock);

  if (th != NULL) restart(th); // 发送信号唤醒他
  return 0;
}
// 给pid进程发送唤醒信号
static inline void restart(pthread_t th)
{
  kill(th->p_pid, PTHREAD_SIG_RESTART);
}

整理自这里

生产者消费者模型中,到底使用几个条件变量?

针对条件变量和生产者消费者模型到底应该用几个条件变量的问题,比较看情况。有可能生产者和消费者各有一个,也可能同时使用一个。为什么呢?

  • 这是一个非常不好解答的问题。因为这个和死锁息息相关。比如下面的例子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
std::condition_variable cvS;
std::condition_variable cvR;
std::condition_variable cv;
std::mutex gMtx;
int countm = 0;
void SenderS(){
    std::unique_lock<std::mutex> lck(gMtx);
    while(countm >= 1){
        std::cout << std::this_thread::get_id() <<"exceedin S" << std::endl;
        cv.wait(lck); //or cvS.wait(lck);
    }
    countm++;
    std::cout<< std::this_thread::get_id() << "S"<< countm << std::endl;
    lck.unlock();
    cv.notify_one();  //or cvR.notify_one(); or cv.notify_all();
}
void ReceiverS(){
    std::unique_lock<std::mutex> lck(gMtx);
    while(countm <= 0){
        std::cout << std::this_thread::get_id() <<"exceedin R" << std::endl;
        cv.wait(lck); //or cvR.wait(lck);
    }
    countm--;
    std::cout << std::this_thread::get_id() <<"R" << countm << std::endl;
    lck.unlock();
    cv.notify_one();  //or cvS.notify_one(); or cv.notify_all();
}
void Sender(){
    while(1){
        SenderS();
    }
}
void Receiver(){
    while(1){
        ReceiverS();
    }
}
int main(){
    std::thread sender1(Sender);
    std::thread receiver1(Receiver);
    std::thread sender2(Sender);
    std::thread receiver2(Receiver);
    sender1.join();
    receiver1.join();
    sender2.join();
    receiver2.join();
    return 0;
}
  • 上面的代码死锁了。为何会死锁?现在我们只使用1个条件变量cv,并且使用的是cv.notify_one()
    • 在极限情况下,假设现在有一个ReceiverS执行了,卡在cv.wait。然后这时候是第二个ReceiverS执行,也卡在cv.wait了。然后这时候只能从两个SenderS中选。其中一个SenderS成功执行,然后这时候非常不巧。notify的还是一个SenderS。然后卡在cv.wait。然后最后只剩下一个SenderS,又卡在cv.wait了。此时所有线程都卡在了cv.wait。死锁发生。
  • 但是为什么改成两个条件变量,也就是cvScvR就没事呢?
    • 假设现在有一个ReceiverS执行了,卡在cvR.wait。然后这时候是第二个ReceiverS执行,也卡在cvR.wait了。然后这时候只能从两个SenderS中选。其中一个SenderS成功执行,然后这时候notify一定且只能通知到等待cvR的线程,也就是ReceiverS。就算现在唤醒的这个ReceiverS没有成功抢锁,被一个SenderS抢到了,SenderS卡在cvS.wait了。然后此时又没有成功抢锁,第二个SenderS抢到了,SenderS也卡在cvS.wait了。不过没关系。这时候抢到锁的一定是仅剩的一个ReceiverS因为ReceiverS此时已经被唤醒,只是没抢到锁,只要能抢到锁程序就可以继续。然后此时notify一定且只能通知到等待cvS的线程,也就是SenderS。程序继续。
  • 或者改成cv.notify_all();
    • 这样可以确保唤醒所有等候的线程。因为这种情况死锁的核心是非常不巧的notify到了自己类型的线程。也就是发送者通知发送者,接收者通知接收者。

所以说,具体使用一个或两个不是那么简单就可以决定的。这是我的提问根据我的个人分析,如果使用一个条件变量,为了避免唤醒到错误的线程,则应该使用notify_all()唤醒所有等候线程。如果使用两个条件变量,则使用notify_one()唤醒对方的一个线程即可。

同时,具体的while判断的wait的条件也很重要。比如现在是两个接收者和两个发送者。如果不更改原有死锁代码,只把countm >= 1更改为countm >= 2便没有问题。但是,如果这时候又加了一个发送者,三个发送者的话此时又会死锁。所以为了根本性避免这个问题,就应该使用上一段提到的,自我总结的规则。因为一般来说,具体的while判断的wait的条件是不确定的,比如在任务队列场景下,具体应该大于几是可以更改的,同时,具体由几个线程也是可以更改的。但是。任务队列本身的逻辑不应该变动。

杂项

  • 注意,线程对象在创建时就已经启动。而非调用joindetach时启动,这里和异步API有些不同。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void func(){
    cout<<"我的线程开始执行了"<<endl;
    //...
    cout<<"我的线程结束执行了"<<endl;
}

int main(){
    thread my_thread(func);
    Sleep(5000);
    my_thread.join();//等待子线程执行结束

    cout<<"abc"<<endl;
    return 0;
}
/*
我的线程开始执行了
我的线程结束执行了

然后睡眠直到时间到达。开始继续执行join
abc

*/

C++11原语和C++20原语的比较。来自这里

QQ截图20230506032808

QQ截图20230506032738

本文由作者按照 CC BY 4.0 进行授权