techniques of thread in c++, not just only in posix linux
如果不看 Futures 库, 整个 C++ 并发库, 就跟 pthread 没有啥区别. 如果加上Futures库, 整个和Boost又没有太多差别. 总之, 整个C++并发库貌似都是在炒冷饭的样子, 不是移植boost, 就是封装底层线程库(例如pthread), 没办法还是仔细说说吧.
关于Pthread, 可以参考我的博文: posix-thread
关于Boost, 可以参考我的博文: Boost总结
本文会花非常大的力气总结 C++11 并发库
引子
C+11中的实现基本和Boost一致, 只不过命名空间不一样以及涉及的头文件不一样:
- atomic
- mutex
- thread
- condition_variable
- future
并且相关效率也不一样, 一般认为: lock, atomic, spinlock三者的效率相当, 但是mutex效率低. (这也是你看到为啥和libevent相比, 不管是asio还是多线程框架, 基本讨不到偏移, 可能就是因为mutex的关系).
《C++ Concurrency In Action》
真本书非常推荐, 如果没有时间看完一本书, 那么直接看我说精华, 也不错.
正文
简介
首先c++, 或者boost 为了我们方便的使用线程, 做了很多封装, 什么gud之类的, 反而影响了我们对于线程范畴内最本质内容的理解:
- 线程的控制(创建,销毁/分离)
- 线程的同步(信号量(PV),互斥量,竞争/冒险条件,文件锁,屏障)
- 线程的属性
- 线程的调试(多线程其实是不好调试的)
根据 Cppreference 的说法, C++11并发库包含以下内容:
- atomic
该头文主要声明了两个类, std::atomic 和 std::atomic_flag,另外还声明了一套 C 风格的原子类型和与 C 兼容的原子操作的函数.
这个和boost是一致的, 不适用锁就能同步的原子操作 - thread
该头文件主要声明了 std::thread 类, 另外std::this_thread
命名空间也在该头文件中. - mutex
该头文件主要声明了与互斥量(mutex)相关的类,包括 std::mutex 系列类,std::lock_guard, std::unique_lock, 以及其他的类型和函数 - condition_variable
该头文件主要声明了与条件变量相关的类,包括 std::condition_variable 和 std::condition_variable_any - future
该头文件主要声明了std::promise
,std::package_task
两个 Provider 类,以及std::future
和std::shared_future
两个 Future 类
另外还有一些与之相关的类型和函数,std::async()
函数就声明在此头文件中
其中最后一个future由于涉及到了异步任务, 相当于对于线程的封装(async). 异步任务可以参考我的文章 Asio
讲真, 从pthread切换过来, 没有感觉到幸福可能是因为pthread api用熟悉了, 但是不得不说c++并发库, 使得并发编程变得简单了.
直接上手一个demo:
可以看到老奸巨猾的c++11, 还是只给出了标准, 具体的实现, 要依赖系统平台的库.
也就是说, linux平台, 我们完全可以认为, c++ 并发库给出了统一的操作API, 实际上是封装了pthread操作(具体封装过程可以参考具体的头文件, 比如创建线程这个, 其实就是根据_type参数判断对应pthread_create()的哪种调用).
虽然只是封装, 但是, 确实精简了太多! 例如我要给线程执行函数传参, 看看下面是不是够简单的:
1 |
|
下面开始细说
Threads库
定义在 thread
头文件下, 该库就两大块儿
- thread类
- this_thread命名空间
thread类
主要说说他的构造器, 赋值函数(移动函数)等成员函数的使用.
构造器:
默认构造函数,创建一个空的 thread 执行对象
1
thread() noexcept;
初始化构造函数,创建一个 thread对象,该 thread对象可被 joinable,新产生的线程会调用 fn 函数,该函数的参数由 args 给出
1
2template <class Fn, class... Args>
explicit thread (Fn&& fn, Args&&... args);拷贝构造函数(被禁用),意味着 thread 不可被拷贝构造
1
thread (const thread&) = delete;
move 构造函数,move 构造函数,调用成功之后 x 不代表任何 thread 执行对象
1
thread (thread&& x) noexcept;
参考代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void f1(int n)
{
for (int i = 0; i < 5; ++i) {
std::cout << "Thread " << n << " executing\n";
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
void f2(int& n)
{
for (int i = 0; i < 5; ++i) {
std::cout << "Thread 2 executing\n";
++n;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
int main()
{
int n = 0;
std::thread t1; // t1 is not a thread
std::thread t2(f1, n + 1); // pass by value
std::thread t3(f2, std::ref(n)); // pass by reference
std::thread t4(std::move(t3)); // t4 is now running f2(). t3 is no longer a thread
t2.join();
t4.join();
std::cout << "Final value of n is " << n << '\n';
}
移动赋值函数
move 赋值操作,如果当前对象不可 joinable,传递一个右值引用(rhs)给 move 赋值操作;如果当前对象可被 joinable,则调用 terminate() .
1
thread& operator= (thread&& rhs) noexcept;
拷贝赋值操作被禁用,thread 对象不可被拷贝
1
thread& operator= (const thread&) = delete;
其他成员函数:
bool joinable() const noexcept; 检查某个线程是否可以被joinable (正在运行的可以被joinable)
代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void foo()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main()
{
std::thread t;
std::cout << "before starting, joinable: " << t.joinable() << '\n';
t = std::thread(foo);
std::cout << "after starting, joinable: " << t.joinable() << '\n';
t.join();
std::cout << "after joining, joinable: " << t.joinable() << '\n';
}
/* 运行结果:
before starting, joinable: 0
after starting, joinable: 1
after joining, joinable: 0
*/最好把joinable理解成正在运行
, 后面说成员数join的时候, 还会用到.- std::thread::id get_id() const noexcept; 获取线程id
native_handle_type native_handle(); 返回一个线程handler, 用于实时调度
由于系统对于实时调度的支持不同, 所以下面的代码不一定成功.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
std::mutex iomutex;
void f(int num)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
sched_param sch;
int policy;
pthread_getschedparam(pthread_self(), &policy, &sch);
std::lock_guard<std::mutex> lk(iomutex);
std::cout << "Thread " << num << " is executing at priority "
<< sch.sched_priority << '\n';
}
int main()
{
std::thread t1(f, 1), t2(f, 2);
sched_param sch;
int policy;
pthread_getschedparam(t1.native_handle(), &policy, &sch);
sch.sched_priority = 20;
if (pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch)) {
std::cout << "Failed to setschedparam: " << std::strerror(errno) << '\n';
}
t1.join(); t2.join();
}static unsigned int hardware_concurrency() noexcept; 返回并发数的参考
1
2
3
4
5
6
7
8
9
10
11
int main()
{
//注意这是个静态方法
unsigned int n = std::thread::hardware_concurrency();
//结果和你的逻辑处理器processor个数保持一致, cat /proc/cpuinfo
//每个processor 启动一个硬件线程
std::cout << n << " concurrent threads are supported.\n";
}void join(); 阻塞等待回收其他线程; 只有本线程
joinable is false
, 即当前不在运行, 才可以回收其他线程, 否则出现自己等待回收自己, 那就是死锁了.void detach(); 主要是设置分离的线程自己调用detach方法, 必须当前线程在运行, 即joinable才可以.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void independentThread()
{
std::cout << "Starting concurrent thread.\n";
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Exiting concurrent thread.\n";
}
void threadCaller()
{
std::cout << "Starting thread caller.\n";
std::thread t(independentThread);
t.detach();
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Exiting thread caller.\n";
}
int main()
{
threadCaller();
std::this_thread::sleep_for(std::chrono::seconds(5));
}void swap( thread& other ) noexcept; 交换线程对象所绑定的具体线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void foo()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
void bar()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main()
{
std::thread t1(foo);
std::thread t2(bar);
std::cout << "thread 1 id: " << t1.get_id() << std::endl;
std::cout << "thread 2 id: " << t2.get_id() << std::endl;
std::swap(t1, t2);
std::cout << "after std::swap(t1, t2):" << std::endl;
std::cout << "thread 1 id: " << t1.get_id() << std::endl;
std::cout << "thread 2 id: " << t2.get_id() << std::endl;
t1.swap(t2);
std::cout << "after t1.swap(t2):" << std::endl;
std::cout << "thread 1 id: " << t1.get_id() << std::endl;
std::cout << "thread 2 id: " << t2.get_id() << std::endl;
t1.join();
t2.join();
}
/*运行结果*/
thread 1 id: 1892
thread 2 id: 2584
after std::swap(t1, t2):
thread 1 id: 2584
thread 2 id: 1892
after t1.swap(t2):
thread 1 id: 1892
thread 2 id: 2584
this_thread
这个命名空间下, 主要说几个函数:
yield : void yield() noexcept; 让出cpu, 让其他线程执行.
是否阻塞让出cpu和操作系统的实现有关, 也就是说这只是个建议. For example, a first-in-first-out realtime scheduler (SCHED_FIFO in Linux) would suspend the current thread and put it on the back of the queue of the same-priority threads that are ready to run (and if there are no other threads at the same priority, yield has no effect). 样例代码可以看下面:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// "busy sleep" while suggesting that other threads run
// for a small amount of time
void little_sleep(std::chrono::microseconds us)
{
auto start = std::chrono::high_resolution_clock::now();
auto end = start + us;
do {
//使用的时候注意加上命名空间
std::this_thread::yield();
} while (std::chrono::high_resolution_clock::now() < end);
}
int main()
{
auto start = std::chrono::high_resolution_clock::now();
little_sleep(std::chrono::microseconds(100));
auto elapsed = std::chrono::high_resolution_clock::now() - start;
std::cout << "waited for "
<< std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count()
<< " microseconds\n";
}get_id : std::thread::id get_id() noexcept; 获取当前线程的id, 类型为 std::thread::id
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
std::mutex g_display_mutex;
void foo()
{
std::thread::id this_id = std::this_thread::get_id();
g_display_mutex.lock();
std::cout << "thread " << this_id << " sleeping...\n";
g_display_mutex.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main()
{
std::thread t1(foo);
std::thread t2(foo);
t1.join();
t2.join();
}sleep_for: 主动睡眠放弃cpu一定时间, 之后自动醒来.(由系统调度), 代码原型如下:
1
2template< class Rep, class Period > //Rep数字计数单位, Period时间单位
void sleep_for( const std::chrono::duration<Rep, Period>& sleep_duration );基本使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int main()
{
using namespace std::chrono_literals;
std::cout << "Hello waiter" << std::endl;
auto start = std::chrono::high_resolution_clock::now();
std::this_thread::sleep_for(2s);
auto end = std::chrono::high_resolution_clock::now();
// 注意持续的时间可以使用std::chrono::duration并提供单位
std::chrono::duration<double, std::milli> elapsed = end-start;
std::cout << "Waited " << elapsed.count() << " ms\n";
}sleep_until 睡眠到某个时间点
1
2template< class Clock, class Duration >
void sleep_until( const std::chrono::time_point<Clock,Duration>& sleep_time );
std::this_thread下的这几个函数还是经常用到的:
- yield
- get_id
- sleep_for
- sleep_until
Mutex库
On Linux, it uses pthreads underlying. So it can be thought of pthreads wrapped in C++ style objects. 但是讲到mutex锁, 它提供了一种全新的使用方式.
mutex库, 主要是围绕 mutex 以及 lock 展开 (当然也包括它们的变形和封装体). 此外把call_once相关的内容也放入了头文件 mutex
这里.
先看下, 以前再Boost是怎么用的:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
int globalVariable;
class Reader
{
public:
Reader(int waitTime) { _waitTime = waitTime;}
void operator() () {
for (int i=0; i < 10; i++) {
std::cout << "Reader Api: " << globalVariable << std::endl;
usleep(_waitTime);
}
return;
}
private:
int _waitTime;
};
class Writer
{
public:
Writer(int variable, int waitTime)
{
_writerVariable = variable;
_waitTime = waitTime;
}
void operator () () {
for (int i=0; i < 10; i++) {
usleep(_waitTime);
// Take lock and modify the global variable
boost::mutex::scoped_lock lock(_writerMutex);
globalVariable = _writerVariable;
_writerVariable++;
// since we have used scoped lock,
// it automatically unlocks on going out of scope
}
}
private:
int _writerVariable;
int _waitTime;
static boost::mutex _writerMutex;
};
boost::mutex
Writer::_writerMutex;
int main()
{
Reader reads(100);
Writer writes1(100, 200);
Writer writes2(200, 200);
boost::thread readerThread(reads);
boost::thread writerThread1(writes1);
usleep(100);
boost::thread writerThread2(writes2);
readerThread.join();
writerThread1.join();
writerThread2.join();
}
编译运行:1
2
3
4
5
6
7
8
9
10
11$ g++ -o boost_mutex boost_mutex.cpp -lboost_thread -lboost_system
$ ./boost_mutex
Reader Api: 100
Reader Api: 100
Reader Api: 200
Reader Api: 103
Reader Api: 104
Reader Api: 204
Reader Api: 205
Reader Api: 206
Reader Api: 207
但是加锁和解锁的过程, 用了一个 boost::mutex::scoped_lock lock(_writerMutex);
相当简单.
boost thread就说这么多, 下面接着说c++的mutex库.
mutex
首先要说的是, 在C++线程库, 很少有, 手动加锁然后解锁的这种传统流程(当然也可以这么做), 一般是使用手动加锁, 自动解锁的形式, 即 一般使用的是 lock_guard
, unique_lock
等包装类, 但是包装类也需要Mutex作为锁的基本. 所以还是要介绍一下 mutex 锁:
- mutex 基本的排它锁(类)
- timed_mutex 定时 Mutex (类)
- recursive_mutex 递归锁(可以重复上锁)
- recursive_timed_mutex 定时递归锁
当然也有共享锁:1
2
3#include <mutex>
shared_timed_mutex : provides shared mutual exclusion facility (C++14)
shared_mutex : provides shared mutual exclusion facility (C++17)
基本锁 mutex 类, 仅提供了传统的流程方法:
- lock : locks the mutex, blocks if the mutex is not available
- try_lock : tries to lock the mutex, returns if the mutex is not available
- unlock : unlocks the mutex
以try_lock为例, 案例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50#include <chrono>
#include <mutex>
#include <thread>
#include <iostream> // std::cout
std::chrono::milliseconds interval(100);
std::mutex mutex;
int job_shared = 0; //两个线程要修改的(共享)量
int job_exclusive = 0; // 两个线程要修改的量, 但不提供mutex保护
// this thread can modify both 'job_shared' and 'job_exclusive'
void job_1()
{
std::this_thread::sleep_for(interval); // let 'job_2' take a lock
while (true) { //使用 try_lock 离不开循环
// try to lock mutex to modify 'job_shared'
if (mutex.try_lock()) {
std::cout << "job shared (" << job_shared << ")\n";
mutex.unlock(); //能加上锁, 自然用完要解锁
return;
} else {
// can't get lock to modify 'job_shared'
// but there is some other work to do
++job_exclusive;
std::cout << "job exclusive (" << job_exclusive << ")\n";
std::this_thread::sleep_for(interval);
}
}
}
// this thread can modify only 'job_shared'
void job_2()
{
mutex.lock();
std::this_thread::sleep_for(5 * interval);
++job_shared;
mutex.unlock();
}
int main()
{
std::thread thread_1(job_1);
std::thread thread_2(job_2);
thread_1.join();
thread_2.join();
}
其他额锁, 定时锁和递归锁(重复锁), 主要方法和mutex类相似, 但是稍稍做了改进, 比如定时锁, 如果一直拿不到锁, 这个线程要不要这么傻等着? 要等多久? 要等到某个时刻? 而递归锁则是为同一个线程反复加锁提供了方便, 里三层外三层, 只要是同一个线程, 可以多次对同一个锁进行加锁(解锁次数也要匹配, 不然会报错std::system_error), 但是如果别的线程已经拿到了锁, 别说加三层, 一层也加不了, 所以递归锁方便的是本线程自己; 和其他线程的互斥性不变.
定时锁多了两个方法:
- try_lock_for (public member function)
tries to lock the mutex, returns if the mutex has been unavailable for the specified timeout duration - try_lock_until
tries to lock the mutex, returns if the mutex has been unavailable until specified time point has been reached
(注意它们的返回值, 成功返回true, 失败返回false; 如果已经有所了, 还在try_lock, 那么其行为是未定义的, 看具体的平台具体怎么实现吧)
例如, try_lock_for: Tries to lock the mutex. Blocks until specified timeout_duration has elapsed or the lock is acquired, whichever comes first. On successful lock acquisition returns true, otherwise returns false.
给一个写的不是太好的例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// control access to std::cout (非核心逻辑)
std::mutex cout_mutex;
//下面开辟的线程集合, 都在抢这个锁进行ostringstreams输出
std::timed_mutex mutex;
void job(int id)
{
using Ms = std::chrono::milliseconds;
std::ostringstream stream;
for (int i = 0; i < 3; ++i) {
if (mutex.try_lock_for(Ms(100))) {
stream << "success ";
std::this_thread::sleep_for(Ms(100));
mutex.unlock();
} else {
stream << "failed ";
}
std::this_thread::sleep_for(Ms(100));
}
std::lock_guard<std::mutex> lock(cout_mutex);
std::cout << "[" << id << "] " << stream.str() << "\n";
}
int main()
{ //线程集合
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back(job, i);
}
for (auto& i: threads) {
i.join();
}
}
再次强调一下:
一般很少会用一个Mutex自己去调用成员方法加锁的, 更多的使用全局加锁方法(针对多个锁, 同时或者顺序加锁) 或者 加锁的包装类比如lock_guard或者unique_lock等
全局加锁算法
本来上面的 mutex 类自身已经提供了lock 和 try_lock方法, 这里又针对不同锁(单个或者多个, 是否需要顺序加锁)提供了通用方法: (其实是对mutex.lock, unlock等的封装)
std::lock 阻塞加锁
locks specified mutexes, blocks if any are unavailable1
2template< class Lockable1, class Lockable2, class... LockableN >
void lock( Lockable1& lock1, Lockable2& lock2, LockableN&... lockn );std::try_lock 异步加锁
attempts to obtain ownership of mutexes via repeated calls to try_lock
这些加锁方法其实是去调用每种lockable对象, 即mutex自身的方法, 然后加锁, 并且不会死锁, 原因如下:
Locks the given Lockable objects lock1, lock2, …, lockn using a deadlock avoidance algorithm to avoid deadlock. The objects are locked by an unspecified series of calls to lock, try_lock, unlock. If a call to lock or unlock results in an exception, unlock is called for any locked objects before rethrowing.
简单解释就是, 好比你要连续加两个锁, 现在加完第一个锁, 正要加第二锁的时候, 被强占了cpu, 导致了第二所没有获取, 之后造成了死锁; 如果想要连续一次性加几个锁, 就要这个方法就对了.
但是通用加锁算法, 只提供了加锁, 没有提供解锁方法, 所以, 一般要配合 lock_guard
或者 unique_lock
这类加锁封装类(其实就是把锁作为资源, 用raii手法封装起来)来使用, 下面给出一个案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
struct Employee {
Employee(std::string id) : id(id) {}
std::string id;
std::vector<std::string> lunch_partners;
std::mutex m;
std::string output() const
{
std::string ret = "Employee " + id + " has lunch partners: ";
for( const auto& partner : lunch_partners )
ret += partner + " ";
return ret;
}
};
void send_mail(Employee &, Employee &)
{
// simulate a time-consuming messaging operation
std::this_thread::sleep_for(std::chrono::seconds(1));
}
// 线程执行方法
// 某个Employee往自己的 lunch_partners 列表中添加人员的时候, 不允许其他线程同时操作.
// assign_lunch_partners 把相关的双方互相添加联系, 所以两个Employee都要加锁
void assign_lunch_partner(Employee &e1, Employee &e2)
{
/* 非核心逻辑
static std::mutex io_mutex;
// 确保cout过程不被其他线程打断, 可以连续输出
{
std::lock_guard<std::mutex> lk(io_mutex);
std::cout << e1.id << " and " << e2.id << " are waiting for locks" << std::endl;
}
*/
// use std::lock to acquire two locks without worrying about
// other calls to assign_lunch_partner deadlocking us
{
//不用检查返回值, 因为加不上锁, 它就一直傻等
std::lock(e1.m, e2.m);
//adopt_lock表示已经加锁了, 调用lock_guard时不必再去加锁了
std::lock_guard<std::mutex> lk1(e1.m, std::adopt_lock);
std::lock_guard<std::mutex> lk2(e2.m, std::adopt_lock);
// Equivalent code (if unique_locks are needed, e.g. for condition variables)
// std::unique_lock<std::mutex> lk1(e1.m, std::defer_lock);
// std::unique_lock<std::mutex> lk2(e2.m, std::defer_lock);
// std::lock(lk1, lk2);
/* 非核心逻辑
//同样确保输出过程不被打断
{
std::lock_guard<std::mutex> lk(io_mutex);
std::cout << e1.id << " and " << e2.id << " got locks" << std::endl;
}
*/
e1.lunch_partners.push_back(e2.id);
e2.lunch_partners.push_back(e1.id);
}
send_mail(e1, e2);
send_mail(e2, e1);
}
int main()
{
Employee alice("alice"), bob("bob"), christina("christina"), dave("dave");
// assign in parallel threads because mailing users about lunch assignments
// takes a long time
std::vector<std::thread> threads;
threads.emplace_back(assign_lunch_partner, std::ref(alice), std::ref(bob));
threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(bob));
threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(alice));
threads.emplace_back(assign_lunch_partner, std::ref(dave), std::ref(bob));
for (auto &thread : threads) thread.join();
std::cout << alice.output() << '\n' << bob.output() << '\n'
<< christina.output() << '\n' << dave.output() << '\n';
}
但是如果不用封装类, lock_guard或者unique_lock, 就应该让每个lockable对象分别自己解锁.
std::try_lock也是类似的, 只不过相当于std::lock, 它是非阻塞的, 所以注意检查它的 返回值
:(拿不到锁, 它又不会傻等)1
2template< class Lockable1, class Lockable2, class... LockableN>
int try_lock( Lockable1& lock1, Lockable2& lock2, LockableN&... lockn);
按你给定的顺序加锁, 全部成功, 返回-1; 如果哪一个失败就返回哪一个的索引(参数的索引从0开始, 即0-based). 通常仅当所有的加锁成功, 即返回-1的时候才停止try_lock尝试.
可以看下面的代码: (多个锁的 try_lock 实验)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
int main()
{
/*一个锁保护一个变量, 两个线程, 分别增加foo_count, bar_count*/
int foo_count = 0; std::mutex foo_count_mutex;
int bar_count = 0; std::mutex bar_count_mutex;
int overall_count = 0; //上面两个锁都拿到才修改总数
//主线程, 即main函数所在线程控制是否结束 done 标志, 多余的锁, 见下面分析
bool done = false; //std::mutex done_mutex;
//线程执行函数(用来修改 foo_count, bar_count)
auto increment = [](int &counter, std::mutex &m, const char *desc) {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(m);
++counter;
std::cout << desc << ": " << counter << '\n';
lock.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
};
std::thread increment_foo(increment, std::ref(foo_count),
std::ref(foo_count_mutex), "foo");
std::thread increment_bar(increment, std::ref(bar_count),
std::ref(bar_count_mutex), "bar");
std::thread update_overall([&]() {
//本线程和main线程共同受到done标志的相应, 只是本线程只是检查, 并不修改.
// 所以下面弄什么 done_mutex其实是多余
//done_mutex.lock();
while (!done) {
//done_mutex.unlock();
int result = std::try_lock(foo_count_mutex, bar_count_mutex);
if (result == -1) { //两个锁我都拿到才修改overall_count
overall_count += foo_count + bar_count;
foo_count = 0;
bar_count = 0;
std::cout << "overall: " << overall_count << '\n';
foo_count_mutex.unlock();
bar_count_mutex.unlock();
}
std::this_thread::sleep_for(std::chrono::seconds(2));
//done_mutex.lock();
}
//done_mutex.unlock();
});
increment_foo.join(); //foo_count 增加到10完毕, 回收线程
increment_bar.join(); //bar_count 增加到10完毕, 回收线程
//done_mutex.lock();
done = true;
//done_mutex.unlock();
update_overall.join();
std::cout << "Done processing\n"
<< "foo: " << foo_count << '\n'
<< "bar: " << bar_count << '\n'
<< "overall: " << overall_count << '\n';
}
lock类结构
BasicLockable 类型的对象只需满足两种操作,lock 和 unlock,
Lockable 类型,在 BasicLockable 类型的基础上新增了 try_lock 操作,因此一个满足 Lockable 的对象应支持三种操作:lock,unlock 和 try_lock;
TimedLockable 类型,在 Lockable 类型的基础上又新增了 try_lock_for 和 try_lock_until 2种操作,因此一个满足 TimedLockable 的对象应支持五种操作:lock, unlock, try_lock, try_lock_for, try_lock_until
lock包装类
C++11中单个锁的包装类, 构造的时候m.lock, 析构的时候自动m.unlock (其实就是把锁作为资源, 用raii手法封装起来):
std::lock_guard
方便线程对互斥量上锁(RAII 手法)的包装类
implements a strictly scope-based mutex ownership wrapperstd::unique_lock
方便线程对互斥量上锁, 但提供了更好的上锁和解锁控制的包装类
implements movable mutex ownership wrapper
(严格来说 std::unique_lock应该归类为BasicLockable类型, 即和mutex归为一类)
C++14和C++17中的包装类:
- shared_lock
implements movable shared mutex ownership wrapper - scoped_lock
deadlock-avoiding RAII wrapper for multiple mutexes
(主要使用 lock_guard
和 unique_lock
, 如果是boost, 那么几个就随便用了)
以lock_guard为例:
- explicit lock_guard( mutex_type& m );
- lock_guard( mutex_type& m, std::adopt_lock_t t ); //adopt_lock_t类型表示加锁策略
- lock_guard( const lock_guard& ) = delete; //不允许lock_guard间相互赋值
(移动构造/移动拷贝也没有)
如果这个线程还再运行, 并且拥有 mutx m, 那么调用lock_guard()
就会尝试去拿锁(拿不到阻塞等待). 但是下面的情况是危险的:
- 不是递归mutex, 却要用lock_guard取重复加锁
- 当前线程不存在了(停止或者被停止运行了), 还想调用lock_guard()
- 当前线程不拥有该mutex变量
(该方法会抛出异常, 一般是m.lock()产生的)
简单的使用案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int g_i = 0;
std::mutex g_i_mutex; // protects g_i
void safe_increment()
{
std::lock_guard<std::mutex> lock(g_i_mutex);
++g_i;
std::cout << std::this_thread::get_id() << ": " << g_i << '\n';
// g_i_mutex is automatically released when lock
// goes out of scope
}
int main()
{
std::cout << __func__ << ": " << g_i << '\n';
std::thread t1(safe_increment);
std::thread t2(safe_increment);
t1.join();
t2.join();
std::cout << __func__ << ": " << g_i << '\n';
}
lock_guard 只是简单的包装, 它简化了mutex加锁和解锁的过程(但是不维护和管理锁的生命周期), 更加强大的是, std::unique_lock, shared_lock, scoped_lock; C++11中只支持到了unique_lock.
下面说说 unique_lock()
unique_lock 是对 mutex 集合的封装, 新创建的 unique_lock 对象负责传入的 Mutex 对象的上锁和解锁操作, 并且是独占方式. unique_lock支持:
- 延迟加锁(先声明锁, 之后真正用的时候在加锁, 比如使用std::lock())
- time-constrained attempts at locking (定义异步加锁)
- recursive locking
- transfer of lock ownership (但是不支持拷贝, 只支持移动)
- use with condition variables
和 lock_guard 一样, std::unique_lock 对象也能保证在其自身析构时它所管理的 Mutex 对象能够被正确地解锁(即使没有显式地调用 unlock 函数). 这也是一种简单而又安全的上锁和解锁方式, 尤其是在程序抛出异常后先前已被上锁的 Mutex 对象可以正确进行解锁操作, 极大地简化了程序员编写与 Mutex 相关的异常处理代码. (unique_lock 对象同样也不负责管理 Mutex 对象的生命周期)
下面是延迟锁定的简单使用案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
struct Box {
explicit Box(int num) : num_things{num} {}
int num_things;
std::mutex m;
};
void transfer(Box &from, Box &to, int num)
{
// don't actually take the locks yet
std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
// lock both unique_locks without deadlock
std::lock(lock1, lock2);
from.num_things -= num;
to.num_things += num;
// 'from.m' and 'to.m' mutexes unlocked in 'unique_lock' dtors
}
int main()
{
Box acc1(100);
Box acc2(50);
std::thread t1(transfer, std::ref(acc1), std::ref(acc2), 10);
std::thread t2(transfer, std::ref(acc2), std::ref(acc1), 5);
t1.join();
t2.join();
}
但 unique_lock 给程序员提供了更多的自由, unique_lock 作为一个TimedLockable对象, 支持5种主要操作: lock, unlock, try_lock, try_lock_for, try_lock_until. 主要构造器, 大致上如下:
- unique_lock() noexcept; //默认构造器
- explicit unique_lock(mutex_type& m); //对应basic mutex, 即含有lock和unlock操作
- unique_lock(mutex_type& m, try_to_lock_t tag); //在上面的基础上增加了try_lock方法
- unique_lock(mutex_type& m, defer_lock_t tag) noexcept; //延迟绑定mutex
- unique_lock(mutex_type& m, adopt_lock_t tag); //中途收养已经加锁的mutex
- template
unique_lock(mutex_type& m, const chrono::duration& rel_time); //带有计时器的mutex的封装, 相当于lock_for - template
unique_lock(mutex_type& m, const chrono::time_point& abs_time); //相当于lock_until - unique_lock(const unique_lock&) = delete; //禁止拷贝
- unique_lock(unique_lock&& x); //移动转移所有权
如果被赋值的对象之前已经获得了它所管理的 Mutex 对象的锁(即已经上锁), 则在移动赋值(move assignment)之前会调用 unlock 函数释放它所占有的锁
主要是用在, 创建 unique_lock 的时候不指定 mutex 的情况.1
2std::unique_lock<std::mutex> lck; // default-constructed
lck = std::unique_lock<std::mutex>(mtx); // move-assigned
其他成员函数, 根据 参考手册 的分类, 应该如下:
- 锁操作类
- lock
- try_lock, try_lock_for, try_lock_until
- unlock
- 所有权类
- swap 交换 unique_lock 所关联的 mutex
- release 释放所有权(返回指向它所管理的 Mutex 对象的指针并释放所有权)
- 判别类(只读)
- owns_lock(返回当前 std::unique_lock 对象是否获得了锁, 不仅仅是检查是否关联了 mutex ), 已经加锁则返回true
- operator bool (和上面作用一样, 用于判断条件语句中直接使用对象进行判断)
- mutex 直接返回相关联的 mutex 的指针
但是注意, 如果你要后面自己调用锁操作相关的成员方法, 主要是指加锁操作, (不手动调用unlock, 在作用域结束的时候也会自动调用), 那么初始化 unique_lock的时候, 必须制定为延迟绑定:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33//std::unique_lock<std::mutex> lck (mtx,std::defer_lock);
//线程执行方法
void print_thread_id (int id) {
std::unique_lock<std::mutex> lck (mtx,std::defer_lock);
// critical section (exclusive access to std::cout signaled by locking lck):
lck.lock();
std::cout << "thread #" << id << '\n';
lck.unlock(); //不手动调用也会自动调用
}
//线程执行方法
void print_star () {
std::unique_lock<std::mutex> lck(mtx,std::defer_lock);
// print '*' if successfully locked, 'x' otherwise:
if (lck.try_lock())
std::cout << '*';
else
std::cout << 'x';
}
//线程执行方法
void fireworks () {
std::unique_lock<std::timed_mutex> lck(mtx, std::defer_lock);
// waiting to get a lock: each thread prints "-" every 200ms:
while (!lck.try_lock_for(std::chrono::milliseconds(200))) {
std::cout << "-";
}
// got a lock! - wait for 1s, then this thread prints "*"
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "*\n";
}
当然, 如果你使用 std::try_to_lock
这种参数, 那么初始化的时候, 就会尝试加锁了:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
std::mutex mtx; // mutex for critical section
void print_star () {
std::unique_lock<std::mutex> lck(mtx,std::try_to_lock);
// print '*' if successfully locked, 'x' otherwise:
if (lck.owns_lock()) // 等价于 if(lck)
std::cout << '*';
else
std::cout << 'x';
}
int main ()
{
std::vector<std::thread> threads;
for (int i=0; i<10; ++i)
threads.emplace_back(std::thread(print_star));
for (auto& x: threads) x.join();
return 0;
}
仔细一看, unique_lock 比 lock_guard 灵活不少, 但是lock_guard设计目的明显而又简单.
lock_t
上面说 std::lock_guard, std::scoped_lock, std::unique_lock, and std::shared_lock 的时候, 已经涉及到这个结构体了, 代表上锁的策略( tag type used to specify locking strategy).
它们的用法不同:
- defer_lock_t do not acquire ownership of the mutex (延后获取, 真正加锁的时候才获取所有权; 构造只是声明以后的联系)
- try_to_lock_t try to acquire ownership of the mutex without blocking
- adopt_lock_t assume the calling thread already has ownership of the mutex
它们都可以作为参数传入给 unique_lock 或 lock_guard 的构造函数, 但是具体意义是不同的.
一个案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
struct bank_account {
explicit bank_account(int balance) : balance(balance) {}
int balance;
std::mutex m;
};
void transfer(bank_account &from, bank_account &to, int amount)
{
// lock both mutexes without deadlock
std::lock(from.m, to.m);
// make sure both already-locked mutexes are unlocked at the end of scope
std::lock_guard<std::mutex> lock1(from.m, std::adopt_lock);
std::lock_guard<std::mutex> lock2(to.m, std::adopt_lock);
// 上面的代码等价于(equivalent approach): //derfer_lock表明自己手动加锁
// std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
// std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
// std::lock(lock1, lock2);
from.balance -= amount;
to.balance += amount;
}
int main()
{
bank_account my_account(100);
bank_account your_account(50);
std::thread t1(transfer, std::ref(my_account), std::ref(your_account), 10);
std::thread t2(transfer, std::ref(your_account), std::ref(my_account), 5);
t1.join();
t2.join();
}
call_once
在pthreads中是这样用的1
2
3
4
pthread_once_t once_control = PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t *once_control, void (*init_routine) (void));
为了确保某些变量只在多线程环境中初始化一次, 要求 once_control
初始化指定的值, 而全部的这个flag是由库维护的保证了其互斥性. (c++中也可以用aotomic进行代替)
在C++这个并发库中, 类似的使用 std::call_once
来解决:
1 | template< class Function, class... Args > |
注意这个函数时会抛出异常的(当f运行出错时): std::system_error if any condition prevents calls to call_once from executing as specified any exception thrown by f .
按照c++标准的介绍, 应该是如果第一次调用没有成功的话,那么第二次还会继续调用,一次类推直到调用成功为止。(实际上, 根据不同的实现, 表现结果也有一点儿差别)
简单的使用案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static std::vector<std::string> staticData;
std::vector<std::string> initializeStaticData ()
{
std::vector<std::string> vec;
vec.push_back ("initialize");
return vec; //值拷贝
}
void foo()
{
static std::once_flag oc;
std::call_once(oc, [] { staticData = initializeStaticData ();});
}
在本例中是系统自动初始化它(实际上onc_flag类的内部状态在调用call_once时采取设置, 不用担心), 并且注意std::once_flag is neither copyable nor movable.
但是上面的案例是不规范的, 说过的, 它是会抛出异常的, 下面看一个可能抛出异常的案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
std::once_flag flag1, flag2;
void simple_do_once()
{
std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; });
}
void may_throw_function(bool do_throw)
{
if (do_throw) {
std::cout << "throw: call_once will retry\n"; // this may appear more than once
throw std::exception();
}
std::cout << "Didn't throw, call_once will not attempt again\n"; // guaranteed once
}
void do_once(bool do_throw)
{
std::cout << "pid = " << std::this_thread::get_id() << "\n";
try {
//可能抛出异常
std::call_once(flag2, may_throw_function, do_throw);
}
catch (...) {
//do nothing
std::cout << "unlock yes ?" << "\n";
}
}
int main()
{
//可惜只执行一次
// std::thread st1(simple_do_once);
// std::thread st2(simple_do_once);
// std::thread st3(simple_do_once);
// std::thread st4(simple_do_once);
// st1.join();
// st2.join();
// st3.join();
// st4.join();
//std::thread t1(do_once, true);
//std::thread t2(do_once, true);
std::thread t3(do_once, false); //直到不再抛出异常, 才算执行了一次
std::thread t4(do_once, true);
t1.join();
t2.join();
t3.join();
t4.join();
}
本质上来说, 异常和线程pthread的库在内核的futex同步机制上(内核互斥对象处理上), 貌似没有做的很好; 所以如果你不注释掉相应的行1
2//std::thread t1(do_once, true);
//std::thread t2(do_once, true);
就可能产生死锁, 运行类似下面的结果:1
2
3
4
5
6pid = pid = 140252808775424
pid = 140252800382720
throw: call_once will retry
140252817168128
unlock yes ?
pid = 140252791990016
查看线程堆栈, 发现3个线程在等待内核的互斥对象:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23Thread 4 (Thread 0x7fe23bfff700 (LWP 8588)):
#0 0x00007fe2452006ef in futex_wait (private=0, expected=1, futex_word=0xcb536311d8 <flag2>) at ../sysdeps/unix/sysv/linux/futex-internal.h:61
#1 futex_wait_simple (private=0, expected=1, futex_word=0xcb536311d8 <flag2>) at ../sysdeps/nptl/futex-internal.h:135
#2 __pthread_once_slow (once_control=0xcb536311d8 <flag2>, init_routine=0x7fe244f293c0 <__once_proxy>) at pthread_once.c:105
0x000000cb5342e3b1 in do_once (do_throw=true) at callonce.cpp:26
Thread 3 (Thread 0x7fe243514700 (LWP 8587)):
#0 0x00007fe2452006ef in futex_wait (private=0, expected=1, futex_word=0xcb536311d8 <flag2>) at ../sysdeps/unix/sysv/linux/futex-internal.h:61
#1 futex_wait_simple (private=0, expected=1, futex_word=0xcb536311d8 <flag2>) at ../sysdeps/nptl/futex-internal.h:135
#2 __pthread_once_slow (once_control=0xcb536311d8 <flag2>, init_routine=0x7fe244f293c0 <__once_proxy>) at pthread_once.c:105
0x000000cb5342e3b1 in do_once (do_throw=false) at callonce.cpp:26
Thread 2 (Thread 0x7fe243d65700 (LWP 8586)):
#0 0x00007fe2452006ef in futex_wait (private=0, expected=1, futex_word=0xcb536311d8 <flag2>) at ../sysdeps/unix/sysv/linux/futex-internal.h:61
#1 fusimple (private=0, expected=1, rnal.h:135
#2 __nce_slow (once_control=0xcb5363thread_once.c:105
0x342e3b1 in do_once (do_throw=true) at callonce.cpp:26
//还有一个已经跑完的,但是没有释放锁的线程
Threadd 0x7fe245605740 (LWP 8584)):
#0 0x51fa67d in pthread_join (threadid=140609777456896, thread_return=0x0) at pthread_join.c:90
#1 0x4f2a397 in std::thread::join() () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#2 0x342e48b in main () at callonce.cpp:51
但是如果 t3
最先执行, 那么最好不过了, 可以直接运行结束, 而不会产生死锁(你多运行几次)1
2
3
4
5= pid = 139844931725056
Didn't throw, call_once will not attempt again
139844923332352
pid = 139844940117760
pid = 139844948510464
如果你涉及到异常, call_once可以很好的用于初始化(拿到一个实例)一次, 例如:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15class X {
private:
mutable std::once_flag initDataFlag;
void initData ()
{
_data = "init";
}
std::string _data;
public:
std::string& getData () {
std::call_once (initDataFlag, &X::initData, this);
return _data;
}
};
如果返回相关实例的话, 这就是一个单例啊.
condition_variable库
该库的头文件如下:1
< condition_variable >
这个里面主要包括2个类和1个函数:
- condition_variable 类
- condition_variable_any 类
- notify_all_at_thread_exit 函数
额外的还有一个枚举类型 cv_status
配合mutex或者mutex的包装类使用, 个人感觉比 pthreads 要简单一些.
与 std::condition_variable 类似, 只不过 std::condition_variable_any 的 wait 函数可以接受任何 lockable 参数(它就是一个类模板), 而 std::condition_variable 只能接受 std::unique_lockstd::condition_variable
.
看下api, 大致也就和pthreads的 pthread_cond_*()
用法类似, 可能名字不太一样.
大致把成员函数分类一下:
- wait系列 (条件不满足&拿不到锁, 等待)
- wait
- wait_until
- wait_for
- notify系列
- notify_one
- notify_all
- 构造系列 (条件变量和锁绑定, 相互复制没有意义)
- condition_variable();
- condition_variable(const condition_variable&) = delete;
为什么条件变量要初始化要绑锁?
因为给下属放权很重要,cv判断条件是否满足从而决定是否竞争锁,这期间cv需要有主动放锁和主动加锁的权利.
std::condition_variable 对象通常使用 std::unique_lock
好吧, 我强调一下:
与条件变量搭配使用的锁必须是 unique_lock, 不能用 lock_guard.
直接看一下案例代码:
1 |
|
至于先放锁,再通知别人,还是先通知别人再放锁, 再pthread里面讨论过, 不在多说.
一般只有两个线程, 才会用到 notify_one
, 因为”通知”非彼即此.
(即使接到通知了也不一定能抢到锁, 还是要看操作系统的调度策略, 比如优先级高的先来之类的)
下面说说 wait 系列的函数.
- void wait (unique_lock
& lck); //无条件放锁阻塞等待 - template
void wait (unique_lock& lck, Predicate pred); //自带预判条件
它俩的区别是? 第二种情况设置了前置条件, 它相当于:1
2
3while (!pred()) {
cv.wait(lock);
}
注意 pred 是一个为此函数, 例如lambda表达式之类的 []{return i == 1;}
, 当然你也可以把它写成函数:1
2
3bool equalsOne() {
return i == 1;
}
下面有一个简单的案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
std::condition_variable cv;
std::mutex cv_m; // This mutex is used for three purposes:
// 1) to synchronize accesses to i
// 2) to synchronize accesses to std::cerr
// 3) for the condition variable cv
int i = 0;
void waits()
{
std::unique_lock<std::mutex> lk(cv_m);
std::cerr << "Waiting... \n";
cv.wait(lk, []{return i == 1;});
std::cerr << "...finished waiting. i == 1\n";
}
void signals()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(cv_m);
std::cerr << "Notifying...\n";
}
cv.notify_all();
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(cv_m);
i = 1;
std::cerr << "Notifying again...\n";
}
cv.notify_all();
}
int main()
{
std::thread t1(waits), t2(waits), t3(waits), t4(signals);
t1.join();
t2.join();
t3.join();
t4.join();
}
todo
Futures库
概述
当时学习 boost 的时候, 这一篇是直接一带而过的, 因为平常都没有怎么用到这个库.这个库和异步任务(简单理解成单独封装&运行在独立线程中的任务)有关, 和线程间传递数据(当然包括共享数据–即同步)有关. Futures本身又是指期货, 未来交易的一种约定&协议.
怎么样理解这个库呢? Wiki上面给了很好的解释:
In computer science, future, promise, delay, and deferred refer to constructs used for synchronizing program execution in some concurrent programming languages. They describe an object that acts as a proxy for a result that is initially unknown, usually because the computation of its value is yet incomplete.
联系实际编程实体, 其实就很好理解了: 线程本身运行是互不干扰的, 虽然它们可以互相的共享一些资源. 那么两个独立运行的实体, 怎么通信交流呢? 就通过约定或者协议来了, 交流呢就通过promise对象来, 约定我这个线程的执行结果存储在一个叫做 futures的实体里, 另外的线程体执行体, 你们要结果先同步等待着(这个同步可能被封装起来, 你看不到它们显示的在wait或者被mutex挡住), 之后有了值就去取(而不是在那傻等) 所以这个过程是异步过程(可能有个线程在同步等待, 比如主线程).
并且这种约定, 通常是运行之前就要约定好, 我这个线程里面跑的是个什么任务(package_task), 我运行期来之后, 别的线程要交流就通过 promise (我给promise写值), 这个代理对象, 我没有给它值之前, 别的需要从这个对象获取值的其他线程就同步等待, 而且只能通过与promise关联的futures获取, 之后我这个线程运行的任务结束了, 有需要结果, 但是又在运行中不能傻等在那儿等结果的线程, 可以异步的从futures得到结果(当然我结束了给一个回调通知也是可以的?).
如何匹配future/promise对呢? (建立相关的关联) 一个在我的线程, 另一个在别的啥线程中么?
既然 future 和 promise 可以被到处移动(不是拷贝), 那么可能性就挺多的.
最普遍的情况是父子线程配对形式, 父线程用future获取子线程promise返回的值. 在这种情况下, 使用async()是很优雅的方法.
如果还是没有听明白, 直接看代码吧, 这段代码是这么个意思:
一个double数组求和的任务, 我(主线程) 分别交给两个子线程去求和(一个求前半个数组的和, 一个求后半个数组的和), 之后分别返回后, 我主线程在完成最后的求和工作.
大致代码如下: (同步&协作)
1 | double accumulate_all(vector& v) |
packaged_task
提供了启动任务的简单方法(但是并没有单独开辟线程, 还是在本线程内执行的). 特别是它处理好了 future 和 promise 的关联关系, 同时提供了包装代码(就是put方法)以保证返回值/异常可以放到 promise 中.
总结:
C++11并发库提供了 future 和 promise 来简化任务线程间的传值(返回值)操作; 同时为启动任务提供了packaged_task以方便的封装(也可以通过 std::packaged_task 拿到future, 获取future_status, 但是想put或者set还是需要promise).
其中的关键点是允许2个任务间使用无(显式)锁的方式进行值传递(标准库帮你高效的封装好了).
async()基本实现思路: 当一个任务需要另外一个线程(启动它的线程)返回值时, 它把这个值放到promise中. 之后这个返回值会出现在和此 promise 关联的 future 中. 于是另外线程就能读到返回值, 从futures中读取(当然你最好先判断一下 futures_status 为 ready 状态).
provider
该部分主要说:
- std::promise
- std::package_task
以及 provider 相关的函数 std::async() 和 std::launch.
下面先说 promise.
promise的主要目的是提供一个”put”(也能”get”)操作以和 future 的 get() 对应, 在 promise 对象构造时可以和一个共享状态(通常是std::future)相关联, 并可以在相关联的共享状态(std::future)上保存一个类型为 T 的值. promise 对象是异步 Provider,它可以在某一时刻设置共享状态的值. 可以通过 get_future 来获取与该 promise 对象相关联的 future 对象, 并且用到其set方法:1
2
3
4
5
6
7std::future<T> get_future();
//根据声明时promise的参数不同, 有不同的setter
void set_value( const R& value );
void set_value( R&& value );
void set_value( R& value );
void set_value();
promise为future传递的结果类型有2种: (set_xxx)
- 传一个普通值 (set_value, set_value_at_thread_exit)
- 抛出一个异常 (set_exception, set_exception_at_thread_exit)
基本用法不同:1
2
3
4
5
6
7
8try {
X res;
// compute a value for res
p.set_value(res);
}
catch (...) { // oops: couldn't compute res
p.set_exception(std::current_exception());
}
关于 get_future:
返回的 future 对象可以访问由 promise 对象设置在共享状态上的值或者某个异常对象。只能从 promise 共享状态获取一个 future 对象。如果关联了 future 的promise不设置值或者异常, 那么 promise对象在析构时会自动地设置一个 future_error 异常(broken_promise). 设置promise的值&异常,此后 promise 的共享状态标志变为 ready.
下面有一个简单的例子: (主线程通过promise设置值, 让其他线程通过相关联的future去获取值)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void print_int(std::future<int>& fut) {
int x = fut.get(); // 获取共享状态的值.
std::cout << "value: " << x << '\n'; // 打印 value: 10.
}
int main ()
{
std::promise<int> prom; // 生成一个 std::promise<int> 对象.
std::future<int> fut = prom.get_future(); // 和 future 关联.
std::thread t(print_int, std::ref(fut)); // 将 future 交给另外一个线程t.
prom.set_value(10); // 设置共享状态的值, 此处和线程t保持同步.
t.join();
return 0;
}
设置异常的具体例子: (线程1从终端接收一个整数, 线程2将该整数打印出来, 如果线程1接收一个非整数, 则为 promise 设置一个异常(failbit), 线程2 在std::future::get 是抛出该异常)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void get_int(std::promise<int>& prom) {
int x;
std::cout << "Please, enter an integer value: ";
std::cin.exceptions (std::ios::failbit); // throw on failbit
try {
std::cin >> x; // sets failbit if input is not int
prom.set_value(x);
} catch (std::exception&) { //cin拿到的不是整数时
prom.set_exception(std::current_exception());
}
}
void print_int(std::future<int>& fut) {
try {
int x = fut.get();
std::cout << "value: " << x << '\n';
} catch (std::exception& e) {
std::cout << "[exception caught: " << e.what() << "]\n";
}
}
int main ()
{
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread th1(get_int, std::ref(prom));
std::thread th2(print_int, std::ref(fut));
th1.join();
th2.join();
return 0;
}
那么 void set_value()
是怎么回事?
如果不设置值的话(虽然还是会自动设置一个异常, 然后promise的状态还是ready状态), 那就起到通知(notify)等待从future拿到值的线程, 例如下面的例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73#include <thread>
#include <future>
#include <cctype>
#include <vector>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <sstream>
int main()
{
std::istringstream iss_numbers{"3 4 1 42 23 -23 93 2 -289 93"};
std::istringstream iss_letters{" a 23 b,e a2 k k?a;si,ksa c"};
//其他线程把上面流的内容放入 这些容器中
std::vector<int> numbers;
std::vector<char> letters;
std::promise<void> numbers_promise, letters_promise;
//主线程纯碎只是要获取其他线程是否完成的状态
auto numbers_ready = numbers_promise.get_future();
auto letter_ready = letters_promise.get_future();
//子线程用xxx_promise只是传递ready状态
std::thread value_reader([&]
{
// I/O operations.
std::copy(std::istream_iterator<int>{iss_numbers},
std::istream_iterator<int>{},
std::back_inserter(numbers));
//Notify for numbers.
numbers_promise.set_value(); //仅仅是只是传递ready状态
std::copy_if(std::istreambuf_iterator<char>{iss_letters},
std::istreambuf_iterator<char>{},
std::back_inserter(letters),
::isalpha);
//Notify for letters.
letters_promise.set_value();
});
//主线程numbers future 阻塞wait
numbers_ready.wait();
//numbers容器被填充完毕了
std::sort(numbers.begin(), numbers.end());
/* //非核心逻辑
if (letter_ready.wait_for(std::chrono::seconds(1)) ==
std::future_status::timeout)
{
//output the numbers while letters are being obtained.
for (int num : numbers) std::cout << num << ' ';
numbers.clear(); //Numbers were already printed.
}
*/
//阻塞等待
letter_ready.wait();
std::sort(letters.begin(), letters.end());
//If numbers were already printed, it does nothing.
for (int num : numbers) std::cout << num << ' ';
std::cout << '\n';
for (char let : letters) std::cout << let << ' ';
std::cout << '\n';
value_reader.join();
}
set_xxx_at_thread_exit
这类比较特殊, 拥有promise的线程, 真正set是在线程结束的时候才去set(延后设置), 可想而知, 对方等待的线程会一直等待结果.(但是重复调用set会报错). The state is made ready when the current thread exits, after all variables with thread-local storage duration have been destroyed. 下面有个简单的例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int main()
{
using namespace std::chrono_literals;
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread([&p] {
std::this_thread::sleep_for(1s);
p.set_value_at_thread_exit(9);
}).detach();
std::cout << "Waiting..." << std::flush;
f.wait();
std::cout << "Done!\nResult is: " << f.get() << '\n';
}
补充说明一下其构造方法中也是禁用了拷贝构造函数的, 只保留了移动构造:
- promise(); //默认构造函数,初始化一个空的共享状态。
- template
promise
(allocator_arg_t aa, const Alloc& alloc); //和默认类似, 但是可以自定指定分配器 - promise (const promise&) = delete; //禁止拷贝构造
- promise (promise&& x) noexcept; //移动构造
移动构造就是为了转移所有权, 例如:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
std::promise<int> prom; //下面接收不同的promise实例的所有权
void print_global_promise () {
std::future<int> fut = prom.get_future();
int x = fut.get();
std::cout << "value: " << x << '\n';
}
int main ()
{
std::thread th1(print_global_promise);
prom.set_value(10);
th1.join();
prom = std::promise<int>(); // prom 被move赋值为一个新的 promise 对象.
std::thread th2 (print_global_promise);
prom.set_value (20);
th2.join();
return 0;
}
下面记录一下 std::packaged_task
std::packaged_task 与 std::function 类似,只不过 std::packaged_task 将其包装的可调用对象的执行结果传递给一个 std::future 对象(该对象通常在另外一个线程中获取 std::packaged_task 任务的执行结果). 也就是说 std::packaged_task 包含了两个最基本要素:
- 被包装的任务(stored task),任务(task)是一个可调用的对象,如函数指针、成员函数指针或者函数对象
- 共享状态(shared state),用于保存任务的返回值,可以通过 std::future 对象来达到异步访问共享状态的效果
std::packaged_task 对象是异步 Provider, 它在某一时刻通过调用被包装的任务来设置共享状态的值.
通俗的说, 它和 std::function (function, lambda, bind , functor)一样, 可以作为一个可调用对象容器. 但不同的是它多了两样重要功能, 一是异步调用, 二是它保存返回值或者异常(别的线程是可以通过std::future对象获取到的)
std::packaged_task 的 operator()
函数 和 thread对象包装一个方法有什么不同:
A successful call to operator() synchronizes with a call to any member function of a std::future
实际上, 你可以像上面说promise的时候一样, 直接绕过 packaged_task, 而采用普通的启动线程的方式, 还是可以从promise关联的future中拿到结果.
但是直接执行调用:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// unique function to avoid disambiguating the std::pow overload set
int f(int x, int y) { return std::pow(x,y); }
void task_lambda()
{
std::cout << "thread id : " << std::this_thread::get_id() << std::endl;
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = task.get_future();
task(2, 9);
std::cout << "task_lambda:\t" << result.get() << '\n';
}
void task_bind()
{
std::cout << "thread id : " << std::this_thread::get_id() << std::endl;
std::packaged_task<int()> task(std::bind(f, 2, 11));
std::future<int> result = task.get_future();
task();
std::cout << "task_bind:\t" << result.get() << '\n';
}
void task_thread()
{
std::cout << "thread id : " << std::this_thread::get_id() << std::endl;
std::packaged_task<int(int,int)> task(f);
std::future<int> result = task.get_future();
std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "task_thread:\t" << result.get() << '\n';
}
int main()
{
std::cout << "main thread id : "
<< std::this_thread::get_id() << std::endl;
task_lambda();
task_bind();
task_thread();
return 0;
}
运行结果是:1
2
3
4
5
6
7main thread id : 140384054449984
thread id : 139700016576320
task_lambda: 512
thread id : 139700016576320
task_bind: 2048
thread id : 139700016576320
task_thread: 1024
是的, 可以看到其实没有开辟新的线程, 仅仅是进行了调用, 要想在另外的线程执行, 还是要借助 thread:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// count down taking a second for each value:
int countdown (int from, int to) {
for (int i=from; i!=to; --i) {
std::cout << i << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Finished!\n";
return from - to;
}
int main ()
{
std::packaged_task<int(int,int)> task(countdown); // 设置 packaged_task
std::future<int> ret = task.get_future(); // 获得与 packaged_task 共享状态相关联的 future 对象.
std::thread th(std::move(task), 10, 0); //创建一个新线程完成计数任务.
int value = ret.get(); // 等待任务完成并获取结果.
std::cout << "The countdown lasted for " << value << " seconds.\n";
th.join();
return 0;
}
用 packaged_task 直接拿到future, 而不是借助promise才能拿到;
(packaged_task 和 promise 同为 provide 类, 但是要设置值, 还是需要 promise 类对象)
实际上调用传参上面也有一点儿区别: 参考 cplusplus网站
- If the stored task is a function pointer or a function object, it is called forwarding the arguments to the call.
- If the stored task is a pointer to a non-static member function, it is called using the first argument as the object on which the member is called (this may either be an object, a reference, or a pointer to it), and the remaining arguments are forwarded as arguments for the member function.
- If it is a pointer to a non-static data member, it should be called with a single argument, and the function stores in the shared state a reference to that member of its argument (the argument may either be an object, a reference, or a pointer to it).
翻译一下就是:
- 如果被包装的任务是函数指针或者函数对象,调用 std::packaged_task::operator() 只是将参数传递给被包装的对象。
- 如果被包装的任务是指向类的非静态成员函数的指针,那么 std::packaged_task::operator() 的第一个参数应该指定为成员函数被调用的那个对象,剩余的参数作为该成员函数的参数。
- 如果被包装的任务是指向类的非静态成员变量,那么 std::packaged_task::operator() 只允许单个参数。
(其实和 std::function 类似)
关于 packaged_task 执行结果中, 还有两个重要的方法:
reset()
: resets the state abandoning any stored results of previous executionsmake_ready_at_thread_exit
: executes the function ensuring that the result is ready only once the current thread exits
案例如下:
(看到所在线程退出后, future 才进入ready状态)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void worker(std::future<void>& output)
{
std::packaged_task<void(bool&)> my_task{ [](bool& done) { done=true; } };
auto result = my_task.get_future();
bool done = false;
my_task.make_ready_at_thread_exit(done); // execute task right away
std::cout << "worker: done = " << std::boolalpha << done << std::endl;
auto status = result.wait_for(std::chrono::seconds(0));
if (status == std::future_status::timeout)
std::cout << "worker: result is not ready yet" << std::endl;
output = std::move(result);
}
int main()
{
std::future<void> result;
std::thread{worker, std::ref(result)}.join();
//检查 future的状态 : std::future_status 类型
auto status = result.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready)
std::cout << "main: result is ready" << std::endl;
}
而 reset 则是重置状态和存储的值, 实现方式其实是检查包装的任务:
即*this = packaged_task(std::move(f))
, 使用案例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int main()
{
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = task.get_future();
task(2, 9);
std::cout << "2^9 = " << result.get() << '\n';
task.reset();
result = task.get_future();
std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "2^10 = " << result.get() << '\n';
}
其他需要注意的是, packaged_task 是禁止了拷贝语义, 只有移动语义, 所以简单多用 std::move 进行再次包装:1
2packaged_task& operator=( const packaged_task& ) = delete;
packaged_task& operator=( packaged_task&& rhs ) noexcept;
移动之后, 原来的 packaged_task 对象也讲失去 state和包装的方法, 此时需要用valid
成员方法检测有效性:1
bool valid() const noexcept;
成员方法 和 std::swap
作用一样, 都是 交换 staes 和包装的方法:1
2
3
4
5void swap( packaged_task& other ) noexcept;
template< class Function, class... Args >
void swap( packaged_task<Function(Args...)> &lhs,
packaged_task<Function(Args...)> &rhs ) noexcept;
最后 packaged_task 只是封装了任务, 并没有说是不是同一线程执行, 或者另外的线程单独执行; 如果要想另外的线程单独执行, 其他线程异步获取结果, 可以再用一个封装, std::async. (std:async 通常配合 std::launch
枚举一起使用)
btw: std::async 和 std::launch 都定义在 future 头文件中.
future
该部分主要涉及:
- std::future
- std::shared_future
future 对象可以异步返回(拿到)共享状态的值, 或者在必要的情况下阻塞调用者(即拿到并使用future的线程)并等待共享状态标志变为 ready, 然后才能获取共享状态的值.
标准库中提供了3种future:
- 普通future
- shared_future(用于复杂场合)
- atomic_future (暂时不说)
其实普通 future 它已经完全够用了.
如果我们有一个future f,通过get()可以获得它的值:1
2// if necessary wait for the value to get computed
X v = f.get();
如果它的返回值还没有到达,调用线程会进行阻塞等待. 等待超时怎么办? get()会抛出异常的(从标准库或等待的线程那个线程中抛出).
如果我们不需要等待返回值(非阻塞方式),可以简单询问一下future,看返回值是否已经到达:1
2
3
4
5
6
7
8
9//换成while的话, 就是直接返回, 然后进行下一次询问
if (f.wait_for(0))
{
// there is a value to get()
// do something
} else
{
// do something else
}
一般从构造函数拿到的future都是无效的, 除非接收了有效future对象的所有权(即 move 拿到的别人的).1
2
3
4
5
6
7future() noexcept; //默认构造
future (future&& x) noexcept;
future (const future&) = delete;// 禁用拷贝语义
//实例:
std::future<int> fut; // 默认构造函数
fut = std::async(do_some_task); // move-赋值操作。
一般可以从三种途径获取:
- provider
- promise 的成员函数 get_future , 但是一般promise都是共享的(全局的), 两个线程都可见
- packaged_task 的成员函数 get_future , 因为启动 task的线程(姑且称为父线程), 它是拥有task的实例的, 这个线程想拿到异步调用的结果, 就完全可以从task对象要future对象.
- std::async 函数的返回值(这个就是对标准异步任务流程的封装, 可以简单理解成跑在别的线程里面的 packaged_task , 所以也是可以拿到future对象的)
如果不是默认构造&上面三种途径接收的到的(move), 那么在调用 get
或者 share
之前, 最好检查一下, 用valid
成员函数:1
2
3
4//如果这个future有保存shared status,
//那么就返回true, 表示可以用(get(), share())
//否则返回false
bool valid() const noexcept;
还有一个重要原因要检查 valid
, 可能是 share
, 即 std::future::share()
:
返回一个 std::shared_future 对象(本文后续内容将介绍 std::shared_future), 调用该函数之后, 该 std::future 对象本身已经不和任何共享状态相关联, 因此该 std::future 的状态不再是 valid 的了, valid() == false
那为什么需要 shared_future ? 因为原来的 std::future
当你get一次共享状态(status)值或者存储的异常之后, 它就失效了, 再解引用或者参考它的值的行为是未定义的, 通俗说, 就是你调用两次 std::future的get方法试试看, 行为未定义, 但是shared_future则不同, 看下面的代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//std::future, std::shared_future
int do_get_value() { return 10; }
int main ()
{
std::future<int> fut = std::async(do_get_value);
std::shared_future<int> shared_fut = fut.share();
// 共享的 future 对象可以被多次访问.
std::cout << "value: " << shared_fut.get() << '\n'; //10
std::cout << "its double: " << shared_fut.get()*2 << '\n'; //10
return 0;
}
所以啊, 没事儿还是要在使用之前检查一下future对象, 看看究竟还能不能再调用get
或者share
方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int do_get_value() { return 11; }
int main ()
{
// 由默认构造函数创建的 std::future 对象,
// 初始化时该 std::future 对象处于为 invalid 状态.
std::future<int> foo, bar;
foo = std::async(do_get_value); // move 赋值, foo 变为 valid.
bar = std::move(foo); // move 赋值, bar 变为 valid, 而 move 赋值以后 foo 变为 invalid.
if (foo.valid())
std::cout << "foo's value: " << foo.get() << '\n';
else
std::cout << "foo is not valid\n";
if (bar.valid())
std::cout << "bar's value: " << bar.get() << '\n';
else
std::cout << "bar is not valid\n";
return 0;
}
再次强调 : 在一个有效的 future 对象上调用 get 会阻塞当前的调用者(get函数内高效的封装了std::future::wait, 而wait是Blocks until the result becomes available.), 直到 Provider 设置了共享状态的值或异常(此时共享状态的标志变为 ready), std::future::get 将返回异步任务的值或异常(如果发生了异常).
不想一直阻塞等怎么办? 那就像上面一样, 用wait_for()异步询问(其实是, 我先等指定的时间, 时间到了, 我就询问, 不管有没有我都返回), 例如:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int main()
{
std::future<int> future = std::async(std::launch::async, [](){
std::this_thread::sleep_for(std::chrono::seconds(3));
return 8;
});
std::cout << "waiting...\n";
std::future_status status;
do {
status = future.wait_for(std::chrono::seconds(1));
if (status == std::future_status::deferred) {
std::cout << "deferred\n";
} else if (status == std::future_status::timeout) {
std::cout << "timeout\n";
} else if (status == std::future_status::ready) {
std::cout << "ready!\n";
}
} while (status != std::future_status::ready);
std::cout << "result is " << future.get() << '\n';
}
运行结果如下:1
2
3
4
5waiting...
timeout
timeout
ready!
result is 8
而如果你用的是 wait
, 那么就会阻塞等待结果(get函数封装了wait(), 当然如果get的时候调用已经完成, 可以拿到结果, 那么就不用阻塞了), 看下面:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int fib(int n)
{
if (n < 3) return 1;
else return fib(n-1) + fib(n-2);
}
int main()
{
std::future<int> f1 = std::async(std::launch::async, [](){
return fib(20);
});
std::future<int> f2 = std::async(std::launch::async, [](){
return fib(25);
});
std::cout << "waiting...\n";
f1.wait();
f2.wait();
std::cout << "f1: " << f1.get() << '\n';
std::cout << "f2: " << f2.get() << '\n';
}
运行结果如下:1
2
3waiting...
f1: 6765
f2: 75025
还有一个 wait_until呢? 它也不像 wait 那样一直傻等, 它等具体制定的时间到了, 就停止等待. 如果具体时间到之前, 结果就拿到了那最好(提前就返回了), 如果没有, 那么你就只能去判断他的返回值了, 和 wait_for 一样, 是判断 std::future_status
, 即:1
2
3
4
5
6
7if (status == std::future_status::deferred) {
std::cout << "deferred\n";
} else if (status == std::future_status::timeout) {
std::cout << "timeout\n";
} else if (status == std::future_status::ready) {
std::cout << "ready!\n";
}
具体含义是:
- future_status::deferred The function to calculate the result has not been started yet
- future_status::ready The result is ready
- future_status::timeout The timeout has expired
下面有个例子: (仅等待2秒, 让执行1秒的拿到结果, 执行5秒的超时而返回)
1 |
|
运行结果如下:1
2
3
4Waiting for 2 seconds...
f_completes: 9
f_times_out did not complete!
Done!
补上说一下 shared_future
:
std::shared_future 与 std::future 类似,但是 std::shared_future 可以拷贝、多个 std::shared_future 可以共享某个共享状态的最终结果(即共享状态的某个值或者异常, 表现为重复调用 get方法). shared_future 可以通过某个 std::future 对象隐式转换(参见 std::shared_future 的构造函数 shared_future (future<T>&& x) noexcept;
), 或者通过 std::future::share() 显示转换,无论哪种转换,被转换的那个 std::future 对象都会变为 not-valid.
它的构造函数如下:
- shared_future() noexcept; //default
- shared_future (const shared_future& x); //copy
- shared_future (shared_future&& x) noexcept; //move
- shared_future (future
&& x) noexcept; //move from future – 相当于隐式转换
辅助工具
这部分有异常, 枚举, 还有辅助函数, 主要涉及
- async 异步调用函数
- launch 异步调用函数的启动策略
- future_status 枚举
- future_errors 异常枚举
std::future_error 继承子 C++ 标准异常体系中的 logic_error
按顺序说说.
async 和 launch:
async的函数原型如下:1
2
3
4
5
6
7
8
9//没有指定policy (默认async策略)
template <class Fn, class... Args>
future<typename result_of<Fn(Args...)>::type>
async(Fn&& fn, Args&&... args);
//指定policy才运行
template <class Fn, class... Args>
future<typename result_of<Fn(Args...)>::type>
async(launch policy, Fn&& fn, Args&&... args);
std::async() 的 fn 和 args 参数用来指定异步任务及其参数, launch指定了 async的启动policy, 枚举定义如下:1
2
3
4
5enum class launch : /* unspecified */ {
async = /* unspecified */,
deferred = /* unspecified */,
/* implementation-defined */
};
policy 参数可以是launch::async,launch::deferred,以及两者的按位或( | ).
下面看一个案例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
double ThreadTask(int n) {
std::cout << std::this_thread::get_id()
<< " start computing..." << std::endl;
double ret = 0;
for (int i = 0; i <= n; i++) {
ret += std::sin(i);
}
std::cout << std::this_thread::get_id()
<< " finished computing..." << std::endl;
return ret;
}
int main(void)
{
std::cout << " main thread id :"
<< std::this_thread::get_id()
<< std::endl;
std::future<double> f(std::async(std::launch::async, ThreadTask, 100000000));
while(f.wait_until(std::chrono::system_clock::now() + std::chrono::seconds(1))
!= std::future_status::ready) {
std::cout << "task is running...\n";
}
while(f.wait_for(std::chrono::seconds(1))
!= std::future_status::ready) {
std::cout << "task is running...\n";
}
std::cout << f.get() << std::endl;
return EXIT_SUCCESS;
}
运行结果如下:1
2
3
4
5
6
7
8 main thread id :140714198243136
140714176349952 start computing...
task is running...
task is running...
task is running...
task is running...
140714176349952 finished computing...
1.71365
但是上面那个案例, 如果你把策略指定城 std::launch::deferred
那就完蛋了, 因为 std::launch::deferred
策略会延迟到下面这个循环1
2
3
4while(f.wait_for(std::chrono::seconds(1))
!= std::future_status::ready) {
std::cout << "task is running...\n";
}
的后面的 f.get()
语句是才去执行异步线程内的任务, 也就是说 main 线程这个循环永远出不去, 运行结果如下:1
2
3
4
5
6
7
8
9
10
11ing...
task is running...
task is running...
task is running...
task is running...
task is running...
task is running...
task is running...
task is running...
task is running...
//下面死循环了
使用 std::launch::deferred
策略, 就直接 get
不要 wait_for 或者 wait_until 去包装函数是否执行完毕了.
下面说说, staus 封装函数(task)的返回状态.
上面已经说了 future_status 的含义了, 再重复一下:
1 | if (status == std::future_status::deferred) { |
具体含义是:
- future_status::deferred The function to calculate the result has not been started yet ( 共享状态包含一个 deferred 函数, 即函数延迟执行)
- future_status::ready The result is ready (共享状态的标志已经变为 ready,即 Provider 在共享状态上设置了值或者异常)
- future_status::timeout The timeout has expired (超时,即在规定的时间内共享状态的标志没有变为 ready)
一般用future_status做逻辑判断, 但是错误处理就要依靠errc, error等类的枚举了.
下面还有三个和错误相关的内容:
future_errc (enum): identifies the future error codes
- broken_promise 取值0 与该 std::future 共享状态相关联的 std::promise 对象在设置值或者异常之前一被销毁
- future_already_retrieved 取值1 与该 std::future 对象相关联的共享状态的值已经被当前 Provider 获取了,即调用了 std::future::get 函数
- promise_already_satisfied 取值2 std::promise 对象已经对共享状态设置了某一值或者异常
- no_state 取值3 无共享状态
当操作promise或者future出错的时候, 会抛出异常, 用future_error类进行捕获.
future_error (class): reports an error related to futures or promises
这就是个异常类(其父类是logic_error), 可以通过其成员方法code
以及 what
获取异常详情.1
2
3
4
5
6
7
8
9
10
11
12
13
14#include <future>
#include <iostream>
int main()
{
std::future<int> empty;
try {
int n = empty.get(); // The behavior is undefined, but
// some implementations throw std::future_error
} catch (const std::future_error& e) {
std::cout << "Caught a future_error with code \"" << e.code()
<< "\"\nMessage: \"" << e.what() << "\"\n";
}
}
整理&上传 ING.
其他主题
很显然在c++11的时候, 并发库并没有完全支持boost线程库的内容, 而是有选择性的部分摘取, 下面提一下并发库中没有涉及到的内容:
- shared_mutex
读写锁(rwlock)
pthreads库倒是为读写锁专门开辟了API, Boost也是(shared_mutex), 但是C++标准貌似到C++17才会引入, 请参考我的文章 boost - atomic
原子操作, 也应该算作无锁编程
领域, 因为我用的不是很多, 所以没有谈, 可以参考我的文章 boost - thread_group
c++11的标准库直接省略了..1
2
3boost::thread_group threads;
threads.create_thread(boost::bind(Worker, boost::ref(counter)));
threads.create_thread(boost::bind(Worker, boost::ref(counter)));
*
尾巴
花了很大的经历, C++11并发支持库也就说的差不多了. 但是用熟悉了这些API是远远不够的, 有时间还是需要看看真正的大师是怎么在理解和处理线程. 这方面的好书并不多, 但是单独花一本书的内容去讲解并发&线程的, 通常都还不错.