技术: C++11 并发库

techniques of thread in c++, not just only in posix linux

如果不看 Futures 库, 整个 C++ 并发库, 就跟 pthread 没有啥区别. 如果加上Futures库, 整个和Boost又没有太多差别. 总之, 整个C++并发库貌似都是在炒冷饭的样子, 不是移植boost, 就是封装底层线程库(例如pthread), 没办法还是仔细说说吧.

关于Pthread, 可以参考我的博文: posix-thread
关于Boost, 可以参考我的博文: Boost总结

本文会花非常大的力气总结 C++11 并发库

引子

C+11中的实现基本和Boost一致, 只不过命名空间不一样以及涉及的头文件不一样:

  • atomic
  • mutex
  • thread
  • condition_variable
  • future

并且相关效率也不一样, 一般认为: lock, atomic, spinlock三者的效率相当, 但是mutex效率低. (这也是你看到为啥和libevent相比, 不管是asio还是多线程框架, 基本讨不到偏移, 可能就是因为mutex的关系).

《C++ Concurrency In Action》 真本书非常推荐, 如果没有时间看完一本书, 那么直接看我说精华, 也不错.

正文

简介

首先c++, 或者boost 为了我们方便的使用线程, 做了很多封装, 什么gud之类的, 反而影响了我们对于线程范畴内最本质内容的理解:

  • 线程的控制(创建,销毁/分离)
  • 线程的同步(信号量(PV),互斥量,竞争/冒险条件,文件锁,屏障)
  • 线程的属性
  • 线程的调试(多线程其实是不好调试的)

根据 Cppreference 的说法, C++11并发库包含以下内容:

  • atomic
    该头文主要声明了两个类, std::atomic 和 std::atomic_flag,另外还声明了一套 C 风格的原子类型和与 C 兼容的原子操作的函数.
    这个和boost是一致的, 不适用锁就能同步的原子操作
  • thread
    该头文件主要声明了 std::thread 类, 另外 std::this_thread 命名空间也在该头文件中.
  • mutex
    该头文件主要声明了与互斥量(mutex)相关的类,包括 std::mutex 系列类,std::lock_guard, std::unique_lock, 以及其他的类型和函数
  • condition_variable
    该头文件主要声明了与条件变量相关的类,包括 std::condition_variable 和 std::condition_variable_any
  • future
    该头文件主要声明了 std::promise, std::package_task 两个 Provider 类,以及 std::futurestd::shared_future 两个 Future 类
    另外还有一些与之相关的类型和函数,std::async() 函数就声明在此头文件中

其中最后一个future由于涉及到了异步任务, 相当于对于线程的封装(async). 异步任务可以参考我的文章 Asio

讲真, 从pthread切换过来, 没有感觉到幸福可能是因为pthread api用熟悉了, 但是不得不说c++并发库, 使得并发编程变得简单了.

直接上手一个demo:

可以看到老奸巨猾的c++11, 还是只给出了标准, 具体的实现, 要依赖系统平台的库.

也就是说, linux平台, 我们完全可以认为, c++ 并发库给出了统一的操作API, 实际上是封装了pthread操作(具体封装过程可以参考具体的头文件, 比如创建线程这个, 其实就是根据_type参数判断对应pthread_create()的哪种调用).

虽然只是封装, 但是, 确实精简了太多! 例如我要给线程执行函数传参, 看看下面是不是够简单的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <thread>
#include <iostream>

int main()
{
int n1 = 500;
int n2 = 600;
std::thread t([&](int addNum){
n1 += addNum;
n2 += addNum;
},500);
t.join();
std::cout << n1 << ' ' << n2 << std::endl;
}

下面开始细说


Threads库

定义在 thread 头文件下, 该库就两大块儿

  • thread类
  • this_thread命名空间

thread类

主要说说他的构造器, 赋值函数(移动函数)等成员函数的使用.

构造器:

  • 默认构造函数,创建一个空的 thread 执行对象

    1
    thread() noexcept;
  • 初始化构造函数,创建一个 thread对象,该 thread对象可被 joinable,新产生的线程会调用 fn 函数,该函数的参数由 args 给出

    1
    2
    template <class Fn, class... Args>
    explicit thread (Fn&& fn, Args&&... args);
  • 拷贝构造函数(被禁用),意味着 thread 不可被拷贝构造

    1
    thread (const thread&) = delete;
  • move 构造函数,move 构造函数,调用成功之后 x 不代表任何 thread 执行对象

    1
    thread (thread&& x) noexcept;

参考代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <iostream>
#include <thread>
#include <functional>

void f1(int n)
{
for (int i = 0; i < 5; ++i) {
std::cout << "Thread " << n << " executing\n";
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

void f2(int& n)
{
for (int i = 0; i < 5; ++i) {
std::cout << "Thread 2 executing\n";
++n;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

int main()
{
int n = 0;
std::thread t1; // t1 is not a thread
std::thread t2(f1, n + 1); // pass by value
std::thread t3(f2, std::ref(n)); // pass by reference
std::thread t4(std::move(t3)); // t4 is now running f2(). t3 is no longer a thread
t2.join();
t4.join();
std::cout << "Final value of n is " << n << '\n';
}

移动赋值函数

  • move 赋值操作,如果当前对象不可 joinable,传递一个右值引用(rhs)给 move 赋值操作;如果当前对象可被 joinable,则调用 terminate() .

    1
    thread& operator= (thread&& rhs) noexcept;
  • 拷贝赋值操作被禁用,thread 对象不可被拷贝

    1
    thread& operator= (const thread&) = delete;

其他成员函数:

  • bool joinable() const noexcept; 检查某个线程是否可以被joinable (正在运行的可以被joinable)
    代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #include <iostream>
    #include <thread>
    #include <chrono>

    void foo()
    {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    int main()
    {
    std::thread t;
    std::cout << "before starting, joinable: " << t.joinable() << '\n';

    t = std::thread(foo);
    std::cout << "after starting, joinable: " << t.joinable() << '\n';

    t.join();
    std::cout << "after joining, joinable: " << t.joinable() << '\n';
    }
    /* 运行结果:
    before starting, joinable: 0
    after starting, joinable: 1
    after joining, joinable: 0
    */

    最好把joinable理解成正在运行 , 后面说成员数join的时候, 还会用到.

  • std::thread::id get_id() const noexcept; 获取线程id
  • native_handle_type native_handle(); 返回一个线程handler, 用于实时调度
    由于系统对于实时调度的支持不同, 所以下面的代码不一定成功.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    #include <thread>
    #include <mutex>
    #include <iostream>
    #include <chrono>
    #include <cstring>
    #include <pthread.h>

    std::mutex iomutex;
    void f(int num)
    {
    std::this_thread::sleep_for(std::chrono::seconds(1));

    sched_param sch;
    int policy;
    pthread_getschedparam(pthread_self(), &policy, &sch);
    std::lock_guard<std::mutex> lk(iomutex);
    std::cout << "Thread " << num << " is executing at priority "
    << sch.sched_priority << '\n';
    }

    int main()
    {
    std::thread t1(f, 1), t2(f, 2);

    sched_param sch;
    int policy;
    pthread_getschedparam(t1.native_handle(), &policy, &sch);
    sch.sched_priority = 20;
    if (pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch)) {
    std::cout << "Failed to setschedparam: " << std::strerror(errno) << '\n';
    }

    t1.join(); t2.join();
    }
  • static unsigned int hardware_concurrency() noexcept; 返回并发数的参考

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    #include <iostream>
    #include <thread>

    int main()
    {
    //注意这是个静态方法
    unsigned int n = std::thread::hardware_concurrency();
    //结果和你的逻辑处理器processor个数保持一致, cat /proc/cpuinfo
    //每个processor 启动一个硬件线程
    std::cout << n << " concurrent threads are supported.\n";
    }
  • void join(); 阻塞等待回收其他线程; 只有本线程 joinable is false , 即当前不在运行, 才可以回收其他线程, 否则出现自己等待回收自己, 那就是死锁了.

  • void detach(); 主要是设置分离的线程自己调用detach方法, 必须当前线程在运行, 即joinable才可以.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #include <iostream>
    #include <chrono>
    #include <thread>

    void independentThread()
    {
    std::cout << "Starting concurrent thread.\n";
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Exiting concurrent thread.\n";
    }

    void threadCaller()
    {
    std::cout << "Starting thread caller.\n";
    std::thread t(independentThread);
    t.detach();
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Exiting thread caller.\n";
    }

    int main()
    {
    threadCaller();
    std::this_thread::sleep_for(std::chrono::seconds(5));
    }
  • void swap( thread& other ) noexcept; 交换线程对象所绑定的具体线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    #include <iostream>
    #include <thread>
    #include <chrono>

    void foo()
    {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    void bar()
    {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    int main()
    {
    std::thread t1(foo);
    std::thread t2(bar);

    std::cout << "thread 1 id: " << t1.get_id() << std::endl;
    std::cout << "thread 2 id: " << t2.get_id() << std::endl;

    std::swap(t1, t2);

    std::cout << "after std::swap(t1, t2):" << std::endl;
    std::cout << "thread 1 id: " << t1.get_id() << std::endl;
    std::cout << "thread 2 id: " << t2.get_id() << std::endl;

    t1.swap(t2);

    std::cout << "after t1.swap(t2):" << std::endl;
    std::cout << "thread 1 id: " << t1.get_id() << std::endl;
    std::cout << "thread 2 id: " << t2.get_id() << std::endl;

    t1.join();
    t2.join();
    }

    /*运行结果*/
    thread 1 id: 1892
    thread 2 id: 2584
    after std::swap(t1, t2):
    thread 1 id: 2584
    thread 2 id: 1892
    after t1.swap(t2):
    thread 1 id: 1892
    thread 2 id: 2584

this_thread

这个命名空间下, 主要说几个函数:

  • yield : void yield() noexcept; 让出cpu, 让其他线程执行.
    是否阻塞让出cpu和操作系统的实现有关, 也就是说这只是个建议. For example, a first-in-first-out realtime scheduler (SCHED_FIFO in Linux) would suspend the current thread and put it on the back of the queue of the same-priority threads that are ready to run (and if there are no other threads at the same priority, yield has no effect). 样例代码可以看下面:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    #include <iostream>
    #include <chrono>
    #include <thread>

    // "busy sleep" while suggesting that other threads run
    // for a small amount of time
    void little_sleep(std::chrono::microseconds us)
    {
    auto start = std::chrono::high_resolution_clock::now();
    auto end = start + us;
    do {
    //使用的时候注意加上命名空间
    std::this_thread::yield();
    } while (std::chrono::high_resolution_clock::now() < end);
    }

    int main()
    {
    auto start = std::chrono::high_resolution_clock::now();

    little_sleep(std::chrono::microseconds(100));

    auto elapsed = std::chrono::high_resolution_clock::now() - start;
    std::cout << "waited for "
    << std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count()
    << " microseconds\n";
    }
  • get_id : std::thread::id get_id() noexcept; 获取当前线程的id, 类型为 std::thread::id

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    #include <iostream>
    #include <thread>
    #include <chrono>
    #include <mutex>

    std::mutex g_display_mutex;

    void foo()
    {
    std::thread::id this_id = std::this_thread::get_id();

    g_display_mutex.lock();
    std::cout << "thread " << this_id << " sleeping...\n";
    g_display_mutex.unlock();

    std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    int main()
    {
    std::thread t1(foo);
    std::thread t2(foo);

    t1.join();
    t2.join();
    }
  • sleep_for: 主动睡眠放弃cpu一定时间, 之后自动醒来.(由系统调度), 代码原型如下:

    1
    2
    template< class Rep, class Period > //Rep数字计数单位, Period时间单位
    void sleep_for( const std::chrono::duration<Rep, Period>& sleep_duration );

    基本使用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    #include <iostream>
    #include <chrono>
    #include <thread>

    int main()
    {
    using namespace std::chrono_literals;
    std::cout << "Hello waiter" << std::endl;

    auto start = std::chrono::high_resolution_clock::now();
    std::this_thread::sleep_for(2s);
    auto end = std::chrono::high_resolution_clock::now();
    // 注意持续的时间可以使用std::chrono::duration并提供单位
    std::chrono::duration<double, std::milli> elapsed = end-start;
    std::cout << "Waited " << elapsed.count() << " ms\n";
    }
  • sleep_until 睡眠到某个时间点

    1
    2
    template< class Clock, class Duration >
    void sleep_until( const std::chrono::time_point<Clock,Duration>& sleep_time );

std::this_thread下的这几个函数还是经常用到的:

  • yield
  • get_id
  • sleep_for
  • sleep_until

Mutex库

On Linux, it uses pthreads underlying. So it can be thought of pthreads wrapped in C++ style objects. 但是讲到mutex锁, 它提供了一种全新的使用方式.
mutex库, 主要是围绕 mutex 以及 lock 展开 (当然也包括它们的变形和封装体). 此外把call_once相关的内容也放入了头文件 mutex 这里.

先看下, 以前再Boost是怎么用的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <iostream>
#include <boost/thread.hpp>

int globalVariable;

class Reader
{
public:
Reader(int waitTime) { _waitTime = waitTime;}
void operator() () {
for (int i=0; i < 10; i++) {
std::cout << "Reader Api: " << globalVariable << std::endl;
usleep(_waitTime);
}
return;
}
private:
int _waitTime;
};

class Writer
{
public:
Writer(int variable, int waitTime)
{
_writerVariable = variable;
_waitTime = waitTime;
}
void operator () () {
for (int i=0; i < 10; i++) {
usleep(_waitTime);
// Take lock and modify the global variable
boost::mutex::scoped_lock lock(_writerMutex);
globalVariable = _writerVariable;
_writerVariable++;
// since we have used scoped lock,
// it automatically unlocks on going out of scope
}
}
private:
int _writerVariable;
int _waitTime;
static boost::mutex _writerMutex;
};

boost::mutex
Writer::_writerMutex;

int main()
{
Reader reads(100);
Writer writes1(100, 200);
Writer writes2(200, 200);

boost::thread readerThread(reads);
boost::thread writerThread1(writes1);
usleep(100);
boost::thread writerThread2(writes2);

readerThread.join();
writerThread1.join();
writerThread2.join();
}

编译运行:

1
2
3
4
5
6
7
8
9
10
11
$ g++ -o boost_mutex boost_mutex.cpp -lboost_thread -lboost_system
$ ./boost_mutex
Reader Api: 100
Reader Api: 100
Reader Api: 200
Reader Api: 103
Reader Api: 104
Reader Api: 204
Reader Api: 205
Reader Api: 206
Reader Api: 207

但是加锁和解锁的过程, 用了一个 boost::mutex::scoped_lock lock(_writerMutex); 相当简单.

boost thread就说这么多, 下面接着说c++的mutex库.

mutex

首先要说的是, 在C++线程库, 很少有, 手动加锁然后解锁的这种传统流程(当然也可以这么做), 一般是使用手动加锁, 自动解锁的形式, 即 一般使用的是 lock_guard, unique_lock 等包装类, 但是包装类也需要Mutex作为锁的基本. 所以还是要介绍一下 mutex 锁:

  • mutex 基本的排它锁(类)
  • timed_mutex 定时 Mutex (类)
  • recursive_mutex 递归锁(可以重复上锁)
  • recursive_timed_mutex 定时递归锁

当然也有共享锁:

1
2
3
#include <mutex>
shared_timed_mutex : provides shared mutual exclusion facility (C++14)
shared_mutex : provides shared mutual exclusion facility (C++17)

基本锁 mutex 类, 仅提供了传统的流程方法:

  • lock : locks the mutex, blocks if the mutex is not available
  • try_lock : tries to lock the mutex, returns if the mutex is not available
  • unlock : unlocks the mutex

以try_lock为例, 案例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <chrono>
#include <mutex>
#include <thread>
#include <iostream> // std::cout

std::chrono::milliseconds interval(100);

std::mutex mutex;
int job_shared = 0; //两个线程要修改的(共享)量

int job_exclusive = 0; // 两个线程要修改的量, 但不提供mutex保护

// this thread can modify both 'job_shared' and 'job_exclusive'
void job_1()
{
std::this_thread::sleep_for(interval); // let 'job_2' take a lock

while (true) { //使用 try_lock 离不开循环
// try to lock mutex to modify 'job_shared'
if (mutex.try_lock()) {
std::cout << "job shared (" << job_shared << ")\n";
mutex.unlock(); //能加上锁, 自然用完要解锁
return;
} else {
// can't get lock to modify 'job_shared'
// but there is some other work to do
++job_exclusive;
std::cout << "job exclusive (" << job_exclusive << ")\n";
std::this_thread::sleep_for(interval);
}
}
}

// this thread can modify only 'job_shared'
void job_2()
{
mutex.lock();
std::this_thread::sleep_for(5 * interval);
++job_shared;
mutex.unlock();
}

int main()
{
std::thread thread_1(job_1);
std::thread thread_2(job_2);

thread_1.join();
thread_2.join();
}

其他额锁, 定时锁和递归锁(重复锁), 主要方法和mutex类相似, 但是稍稍做了改进, 比如定时锁, 如果一直拿不到锁, 这个线程要不要这么傻等着? 要等多久? 要等到某个时刻? 而递归锁则是为同一个线程反复加锁提供了方便, 里三层外三层, 只要是同一个线程, 可以多次对同一个锁进行加锁(解锁次数也要匹配, 不然会报错std::system_error), 但是如果别的线程已经拿到了锁, 别说加三层, 一层也加不了, 所以递归锁方便的是本线程自己; 和其他线程的互斥性不变.

定时锁多了两个方法:

  • try_lock_for (public member function)
    tries to lock the mutex, returns if the mutex has been unavailable for the specified timeout duration
  • try_lock_until
    tries to lock the mutex, returns if the mutex has been unavailable until specified time point has been reached

(注意它们的返回值, 成功返回true, 失败返回false; 如果已经有所了, 还在try_lock, 那么其行为是未定义的, 看具体的平台具体怎么实现吧)
例如, try_lock_for: Tries to lock the mutex. Blocks until specified timeout_duration has elapsed or the lock is acquired, whichever comes first. On successful lock acquisition returns true, otherwise returns false.

给一个写的不是太好的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <sstream>

// control access to std::cout (非核心逻辑)
std::mutex cout_mutex;

//下面开辟的线程集合, 都在抢这个锁进行ostringstreams输出
std::timed_mutex mutex;

void job(int id)
{
using Ms = std::chrono::milliseconds;
std::ostringstream stream;

for (int i = 0; i < 3; ++i) {
if (mutex.try_lock_for(Ms(100))) {
stream << "success ";
std::this_thread::sleep_for(Ms(100));
mutex.unlock();
} else {
stream << "failed ";
}
std::this_thread::sleep_for(Ms(100));
}

std::lock_guard<std::mutex> lock(cout_mutex);
std::cout << "[" << id << "] " << stream.str() << "\n";
}

int main()
{ //线程集合
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back(job, i);
}

for (auto& i: threads) {
i.join();
}
}

再次强调一下:

一般很少会用一个Mutex自己去调用成员方法加锁的, 更多的使用全局加锁方法(针对多个锁, 同时或者顺序加锁) 或者 加锁的包装类比如lock_guard或者unique_lock等

全局加锁算法

本来上面的 mutex 类自身已经提供了lock 和 try_lock方法, 这里又针对不同锁(单个或者多个, 是否需要顺序加锁)提供了通用方法: (其实是对mutex.lock, unlock等的封装)

  • std::lock 阻塞加锁
    locks specified mutexes, blocks if any are unavailable

    1
    2
    template< class Lockable1, class Lockable2, class... LockableN >
    void lock( Lockable1& lock1, Lockable2& lock2, LockableN&... lockn );
  • std::try_lock 异步加锁
    attempts to obtain ownership of mutexes via repeated calls to try_lock

这些加锁方法其实是去调用每种lockable对象, 即mutex自身的方法, 然后加锁, 并且不会死锁, 原因如下:

Locks the given Lockable objects lock1, lock2, …, lockn using a deadlock avoidance algorithm to avoid deadlock. The objects are locked by an unspecified series of calls to lock, try_lock, unlock. If a call to lock or unlock results in an exception, unlock is called for any locked objects before rethrowing.

简单解释就是, 好比你要连续加两个锁, 现在加完第一个锁, 正要加第二锁的时候, 被强占了cpu, 导致了第二所没有获取, 之后造成了死锁; 如果想要连续一次性加几个锁, 就要这个方法就对了.

但是通用加锁算法, 只提供了加锁, 没有提供解锁方法, 所以, 一般要配合 lock_guard 或者 unique_lock 这类加锁封装类(其实就是把锁作为资源, 用raii手法封装起来)来使用, 下面给出一个案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <functional>
#include <chrono>
#include <string>

struct Employee {
Employee(std::string id) : id(id) {}
std::string id;
std::vector<std::string> lunch_partners;
std::mutex m;
std::string output() const
{
std::string ret = "Employee " + id + " has lunch partners: ";
for( const auto& partner : lunch_partners )
ret += partner + " ";
return ret;
}
};

void send_mail(Employee &, Employee &)
{
// simulate a time-consuming messaging operation
std::this_thread::sleep_for(std::chrono::seconds(1));
}

// 线程执行方法
// 某个Employee往自己的 lunch_partners 列表中添加人员的时候, 不允许其他线程同时操作.
// assign_lunch_partners 把相关的双方互相添加联系, 所以两个Employee都要加锁
void assign_lunch_partner(Employee &e1, Employee &e2)
{
/* 非核心逻辑
static std::mutex io_mutex;


// 确保cout过程不被其他线程打断, 可以连续输出
{
std::lock_guard<std::mutex> lk(io_mutex);
std::cout << e1.id << " and " << e2.id << " are waiting for locks" << std::endl;
}
*/
// use std::lock to acquire two locks without worrying about
// other calls to assign_lunch_partner deadlocking us
{
//不用检查返回值, 因为加不上锁, 它就一直傻等
std::lock(e1.m, e2.m);
//adopt_lock表示已经加锁了, 调用lock_guard时不必再去加锁了
std::lock_guard<std::mutex> lk1(e1.m, std::adopt_lock);
std::lock_guard<std::mutex> lk2(e2.m, std::adopt_lock);



// Equivalent code (if unique_locks are needed, e.g. for condition variables)
// std::unique_lock<std::mutex> lk1(e1.m, std::defer_lock);
// std::unique_lock<std::mutex> lk2(e2.m, std::defer_lock);
// std::lock(lk1, lk2);


/* 非核心逻辑
//同样确保输出过程不被打断
{
std::lock_guard<std::mutex> lk(io_mutex);
std::cout << e1.id << " and " << e2.id << " got locks" << std::endl;
}

*/
e1.lunch_partners.push_back(e2.id);
e2.lunch_partners.push_back(e1.id);
}
send_mail(e1, e2);
send_mail(e2, e1);
}

int main()
{
Employee alice("alice"), bob("bob"), christina("christina"), dave("dave");

// assign in parallel threads because mailing users about lunch assignments
// takes a long time
std::vector<std::thread> threads;
threads.emplace_back(assign_lunch_partner, std::ref(alice), std::ref(bob));
threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(bob));
threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(alice));
threads.emplace_back(assign_lunch_partner, std::ref(dave), std::ref(bob));

for (auto &thread : threads) thread.join();
std::cout << alice.output() << '\n' << bob.output() << '\n'
<< christina.output() << '\n' << dave.output() << '\n';
}

但是如果不用封装类, lock_guard或者unique_lock, 就应该让每个lockable对象分别自己解锁.

std::try_lock也是类似的, 只不过相当于std::lock, 它是非阻塞的, 所以注意检查它的 返回值 :(拿不到锁, 它又不会傻等)

1
2
template< class Lockable1, class Lockable2, class... LockableN>
int try_lock( Lockable1& lock1, Lockable2& lock2, LockableN&... lockn);

按你给定的顺序加锁, 全部成功, 返回-1; 如果哪一个失败就返回哪一个的索引(参数的索引从0开始, 即0-based). 通常仅当所有的加锁成功, 即返回-1的时候才停止try_lock尝试.

可以看下面的代码: (多个锁的 try_lock 实验)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <mutex>
#include <vector>
#include <thread>
#include <iostream>
#include <functional>
#include <chrono>

int main()
{
/*一个锁保护一个变量, 两个线程, 分别增加foo_count, bar_count*/
int foo_count = 0; std::mutex foo_count_mutex;
int bar_count = 0; std::mutex bar_count_mutex;
int overall_count = 0; //上面两个锁都拿到才修改总数

//主线程, 即main函数所在线程控制是否结束 done 标志, 多余的锁, 见下面分析
bool done = false; //std::mutex done_mutex;


//线程执行函数(用来修改 foo_count, bar_count)
auto increment = [](int &counter, std::mutex &m, const char *desc) {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(m);
++counter;
std::cout << desc << ": " << counter << '\n';
lock.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
};

std::thread increment_foo(increment, std::ref(foo_count),
std::ref(foo_count_mutex), "foo");
std::thread increment_bar(increment, std::ref(bar_count),
std::ref(bar_count_mutex), "bar");

std::thread update_overall([&]() {

//本线程和main线程共同受到done标志的相应, 只是本线程只是检查, 并不修改.
// 所以下面弄什么 done_mutex其实是多余
//done_mutex.lock();

while (!done) {
//done_mutex.unlock();
int result = std::try_lock(foo_count_mutex, bar_count_mutex);
if (result == -1) { //两个锁我都拿到才修改overall_count
overall_count += foo_count + bar_count;
foo_count = 0;
bar_count = 0;
std::cout << "overall: " << overall_count << '\n';
foo_count_mutex.unlock();
bar_count_mutex.unlock();
}
std::this_thread::sleep_for(std::chrono::seconds(2));
//done_mutex.lock();
}

//done_mutex.unlock();
});

increment_foo.join(); //foo_count 增加到10完毕, 回收线程
increment_bar.join(); //bar_count 增加到10完毕, 回收线程

//done_mutex.lock();
done = true;
//done_mutex.unlock();

update_overall.join();

std::cout << "Done processing\n"
<< "foo: " << foo_count << '\n'
<< "bar: " << bar_count << '\n'
<< "overall: " << overall_count << '\n';
}

lock类结构

BasicLockable 类型的对象只需满足两种操作,lock 和 unlock,
Lockable 类型,在 BasicLockable 类型的基础上新增了 try_lock 操作,因此一个满足 Lockable 的对象应支持三种操作:lock,unlock 和 try_lock;
TimedLockable 类型,在 Lockable 类型的基础上又新增了 try_lock_for 和 try_lock_until 2种操作,因此一个满足 TimedLockable 的对象应支持五种操作:lock, unlock, try_lock, try_lock_for, try_lock_until

lock包装类

C++11中单个锁的包装类, 构造的时候m.lock, 析构的时候自动m.unlock (其实就是把锁作为资源, 用raii手法封装起来):

  • std::lock_guard 方便线程对互斥量上锁(RAII 手法)的包装类
    implements a strictly scope-based mutex ownership wrapper
  • std::unique_lock 方便线程对互斥量上锁, 但提供了更好的上锁和解锁控制的包装类
    implements movable mutex ownership wrapper

(严格来说 std::unique_lock应该归类为BasicLockable类型, 即和mutex归为一类)

C++14和C++17中的包装类:

  • shared_lock
    implements movable shared mutex ownership wrapper
  • scoped_lock
    deadlock-avoiding RAII wrapper for multiple mutexes

(主要使用 lock_guardunique_lock, 如果是boost, 那么几个就随便用了)

以lock_guard为例:

  • explicit lock_guard( mutex_type& m );
  • lock_guard( mutex_type& m, std::adopt_lock_t t ); //adopt_lock_t类型表示加锁策略
  • lock_guard( const lock_guard& ) = delete; //不允许lock_guard间相互赋值

(移动构造/移动拷贝也没有)

如果这个线程还再运行, 并且拥有 mutx m, 那么调用lock_guard()就会尝试去拿锁(拿不到阻塞等待). 但是下面的情况是危险的:

  • 不是递归mutex, 却要用lock_guard取重复加锁
  • 当前线程不存在了(停止或者被停止运行了), 还想调用lock_guard()
  • 当前线程不拥有该mutex变量

(该方法会抛出异常, 一般是m.lock()产生的)

简单的使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <thread>
#include <mutex>
#include <iostream>

int g_i = 0;
std::mutex g_i_mutex; // protects g_i

void safe_increment()
{
std::lock_guard<std::mutex> lock(g_i_mutex);
++g_i;

std::cout << std::this_thread::get_id() << ": " << g_i << '\n';

// g_i_mutex is automatically released when lock
// goes out of scope
}

int main()
{
std::cout << __func__ << ": " << g_i << '\n';

std::thread t1(safe_increment);
std::thread t2(safe_increment);

t1.join();
t2.join();

std::cout << __func__ << ": " << g_i << '\n';
}

lock_guard 只是简单的包装, 它简化了mutex加锁和解锁的过程(但是不维护和管理锁的生命周期), 更加强大的是, std::unique_lock, shared_lock, scoped_lock; C++11中只支持到了unique_lock.

下面说说 unique_lock()

unique_lock 是对 mutex 集合的封装, 新创建的 unique_lock 对象负责传入的 Mutex 对象的上锁和解锁操作, 并且是独占方式. unique_lock支持:

  • 延迟加锁(先声明锁, 之后真正用的时候在加锁, 比如使用std::lock())
  • time-constrained attempts at locking (定义异步加锁)
  • recursive locking
  • transfer of lock ownership (但是不支持拷贝, 只支持移动)
  • use with condition variables

和 lock_guard 一样, std::unique_lock 对象也能保证在其自身析构时它所管理的 Mutex 对象能够被正确地解锁(即使没有显式地调用 unlock 函数). 这也是一种简单而又安全的上锁和解锁方式, 尤其是在程序抛出异常后先前已被上锁的 Mutex 对象可以正确进行解锁操作, 极大地简化了程序员编写与 Mutex 相关的异常处理代码. (unique_lock 对象同样也不负责管理 Mutex 对象的生命周期)

下面是延迟锁定的简单使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <mutex>
#include <thread>
#include <chrono>

struct Box {
explicit Box(int num) : num_things{num} {}

int num_things;
std::mutex m;
};

void transfer(Box &from, Box &to, int num)
{
// don't actually take the locks yet
std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);

// lock both unique_locks without deadlock
std::lock(lock1, lock2);

from.num_things -= num;
to.num_things += num;

// 'from.m' and 'to.m' mutexes unlocked in 'unique_lock' dtors
}

int main()
{
Box acc1(100);
Box acc2(50);

std::thread t1(transfer, std::ref(acc1), std::ref(acc2), 10);
std::thread t2(transfer, std::ref(acc2), std::ref(acc1), 5);

t1.join();
t2.join();
}

但 unique_lock 给程序员提供了更多的自由, unique_lock 作为一个TimedLockable对象, 支持5种主要操作: lock, unlock, try_lock, try_lock_for, try_lock_until. 主要构造器, 大致上如下:

  • unique_lock() noexcept; //默认构造器
  • explicit unique_lock(mutex_type& m); //对应basic mutex, 即含有lock和unlock操作
  • unique_lock(mutex_type& m, try_to_lock_t tag); //在上面的基础上增加了try_lock方法
  • unique_lock(mutex_type& m, defer_lock_t tag) noexcept; //延迟绑定mutex
  • unique_lock(mutex_type& m, adopt_lock_t tag); //中途收养已经加锁的mutex
  • template
    unique_lock(mutex_type& m, const chrono::duration& rel_time); //带有计时器的mutex的封装, 相当于lock_for
  • template
    unique_lock(mutex_type& m, const chrono::time_point& abs_time); //相当于lock_until
  • unique_lock(const unique_lock&) = delete; //禁止拷贝
  • unique_lock(unique_lock&& x); //移动转移所有权
    如果被赋值的对象之前已经获得了它所管理的 Mutex 对象的锁(即已经上锁), 则在移动赋值(move assignment)之前会调用 unlock 函数释放它所占有的锁
    主要是用在, 创建 unique_lock 的时候不指定 mutex 的情况.
    1
    2
    std::unique_lock<std::mutex> lck;         // default-constructed
    lck = std::unique_lock<std::mutex>(mtx); // move-assigned

其他成员函数, 根据 参考手册 的分类, 应该如下:

  • 锁操作类
    • lock
    • try_lock, try_lock_for, try_lock_until
    • unlock
  • 所有权类
    • swap 交换 unique_lock 所关联的 mutex
    • release 释放所有权(返回指向它所管理的 Mutex 对象的指针并释放所有权)
  • 判别类(只读)
    • owns_lock(返回当前 std::unique_lock 对象是否获得了锁, 不仅仅是检查是否关联了 mutex ), 已经加锁则返回true
    • operator bool (和上面作用一样, 用于判断条件语句中直接使用对象进行判断)
    • mutex 直接返回相关联的 mutex 的指针

但是注意, 如果你要后面自己调用锁操作相关的成员方法, 主要是指加锁操作, (不手动调用unlock, 在作用域结束的时候也会自动调用), 那么初始化 unique_lock的时候, 必须制定为延迟绑定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//std::unique_lock<std::mutex> lck (mtx,std::defer_lock);

//线程执行方法
void print_thread_id (int id) {
std::unique_lock<std::mutex> lck (mtx,std::defer_lock);
// critical section (exclusive access to std::cout signaled by locking lck):
lck.lock();
std::cout << "thread #" << id << '\n';
lck.unlock(); //不手动调用也会自动调用
}

//线程执行方法
void print_star () {
std::unique_lock<std::mutex> lck(mtx,std::defer_lock);
// print '*' if successfully locked, 'x' otherwise:
if (lck.try_lock())
std::cout << '*';
else
std::cout << 'x';
}


//线程执行方法
void fireworks () {
std::unique_lock<std::timed_mutex> lck(mtx, std::defer_lock);
// waiting to get a lock: each thread prints "-" every 200ms:
while (!lck.try_lock_for(std::chrono::milliseconds(200))) {
std::cout << "-";
}
// got a lock! - wait for 1s, then this thread prints "*"
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "*\n";
}

当然, 如果你使用 std::try_to_lock 这种参数, 那么初始化的时候, 就会尝试加锁了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>       // std::cout
#include <vector> // std::vector
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock, std::try_to_lock

std::mutex mtx; // mutex for critical section

void print_star () {
std::unique_lock<std::mutex> lck(mtx,std::try_to_lock);
// print '*' if successfully locked, 'x' otherwise:
if (lck.owns_lock()) // 等价于 if(lck)
std::cout << '*';
else
std::cout << 'x';
}

int main ()
{
std::vector<std::thread> threads;
for (int i=0; i<10; ++i)
threads.emplace_back(std::thread(print_star));

for (auto& x: threads) x.join();

return 0;
}

仔细一看, unique_lock 比 lock_guard 灵活不少, 但是lock_guard设计目的明显而又简单.

lock_t

上面说 std::lock_guard, std::scoped_lock, std::unique_lock, and std::shared_lock 的时候, 已经涉及到这个结构体了, 代表上锁的策略( tag type used to specify locking strategy).

它们的用法不同:

  • defer_lock_t do not acquire ownership of the mutex (延后获取, 真正加锁的时候才获取所有权; 构造只是声明以后的联系)
  • try_to_lock_t try to acquire ownership of the mutex without blocking
  • adopt_lock_t assume the calling thread already has ownership of the mutex

它们都可以作为参数传入给 unique_lock 或 lock_guard 的构造函数, 但是具体意义是不同的.

一个案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <mutex>
#include <thread>

struct bank_account {
explicit bank_account(int balance) : balance(balance) {}
int balance;
std::mutex m;
};

void transfer(bank_account &from, bank_account &to, int amount)
{
// lock both mutexes without deadlock
std::lock(from.m, to.m);

// make sure both already-locked mutexes are unlocked at the end of scope
std::lock_guard<std::mutex> lock1(from.m, std::adopt_lock);
std::lock_guard<std::mutex> lock2(to.m, std::adopt_lock);

// 上面的代码等价于(equivalent approach): //derfer_lock表明自己手动加锁
// std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
// std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
// std::lock(lock1, lock2);

from.balance -= amount;
to.balance += amount;
}

int main()
{
bank_account my_account(100);
bank_account your_account(50);

std::thread t1(transfer, std::ref(my_account), std::ref(your_account), 10);
std::thread t2(transfer, std::ref(your_account), std::ref(my_account), 5);

t1.join();
t2.join();
}

call_once

在pthreads中是这样用的

1
2
3
4
#include <pthread.h>

pthread_once_t once_control = PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t *once_control, void (*init_routine) (void));

为了确保某些变量只在多线程环境中初始化一次, 要求 once_control 初始化指定的值, 而全部的这个flag是由库维护的保证了其互斥性. (c++中也可以用aotomic进行代替)

在C++这个并发库中, 类似的使用 std::call_once 来解决:

1
2
template< class Function, class... Args >  
void call_once ( std::once_flag& flag, Function&& f, Args&& args... );

注意这个函数时会抛出异常的(当f运行出错时): std::system_error if any condition prevents calls to call_once from executing as specified any exception thrown by f .
按照c++标准的介绍, 应该是如果第一次调用没有成功的话,那么第二次还会继续调用,一次类推直到调用成功为止。(实际上, 根据不同的实现, 表现结果也有一点儿差别)

简单的使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <mutex>

static std::vector<std::string> staticData;
std::vector<std::string> initializeStaticData ()
{
std::vector<std::string> vec;
vec.push_back ("initialize");

return vec; //值拷贝
}

void foo()
{
static std::once_flag oc;
std::call_once(oc, [] { staticData = initializeStaticData ();});
}

在本例中是系统自动初始化它(实际上onc_flag类的内部状态在调用call_once时采取设置, 不用担心), 并且注意std::once_flag is neither copyable nor movable.

但是上面的案例是不规范的, 说过的, 它是会抛出异常的, 下面看一个可能抛出异常的案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <iostream>
#include <thread>
#include <mutex>

std::once_flag flag1, flag2;

void simple_do_once()
{
std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; });
}

void may_throw_function(bool do_throw)
{
if (do_throw) {
std::cout << "throw: call_once will retry\n"; // this may appear more than once
throw std::exception();
}
std::cout << "Didn't throw, call_once will not attempt again\n"; // guaranteed once
}

void do_once(bool do_throw)
{
std::cout << "pid = " << std::this_thread::get_id() << "\n";
try {
//可能抛出异常
std::call_once(flag2, may_throw_function, do_throw);
}
catch (...) {
//do nothing
std::cout << "unlock yes ?" << "\n";
}
}

int main()
{
//可惜只执行一次
// std::thread st1(simple_do_once);
// std::thread st2(simple_do_once);
// std::thread st3(simple_do_once);
// std::thread st4(simple_do_once);
// st1.join();
// st2.join();
// st3.join();
// st4.join();

//std::thread t1(do_once, true);
//std::thread t2(do_once, true);
std::thread t3(do_once, false); //直到不再抛出异常, 才算执行了一次
std::thread t4(do_once, true);
t1.join();
t2.join();
t3.join();
t4.join();
}

本质上来说, 异常和线程pthread的库在内核的futex同步机制上(内核互斥对象处理上), 貌似没有做的很好; 所以如果你不注释掉相应的行

1
2
//std::thread t1(do_once, true);
//std::thread t2(do_once, true);

就可能产生死锁, 运行类似下面的结果:

1
2
3
4
5
6
pid = pid = 140252808775424
pid = 140252800382720
throw: call_once will retry
140252817168128
unlock yes ?
pid = 140252791990016

查看线程堆栈, 发现3个线程在等待内核的互斥对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Thread 4 (Thread 0x7fe23bfff700 (LWP 8588)):
#0 0x00007fe2452006ef in futex_wait (private=0, expected=1, futex_word=0xcb536311d8 <flag2>) at ../sysdeps/unix/sysv/linux/futex-internal.h:61
#1 futex_wait_simple (private=0, expected=1, futex_word=0xcb536311d8 <flag2>) at ../sysdeps/nptl/futex-internal.h:135
#2 __pthread_once_slow (once_control=0xcb536311d8 <flag2>, init_routine=0x7fe244f293c0 <__once_proxy>) at pthread_once.c:105
0x000000cb5342e3b1 in do_once (do_throw=true) at callonce.cpp:26

Thread 3 (Thread 0x7fe243514700 (LWP 8587)):
#0 0x00007fe2452006ef in futex_wait (private=0, expected=1, futex_word=0xcb536311d8 <flag2>) at ../sysdeps/unix/sysv/linux/futex-internal.h:61
#1 futex_wait_simple (private=0, expected=1, futex_word=0xcb536311d8 <flag2>) at ../sysdeps/nptl/futex-internal.h:135
#2 __pthread_once_slow (once_control=0xcb536311d8 <flag2>, init_routine=0x7fe244f293c0 <__once_proxy>) at pthread_once.c:105
0x000000cb5342e3b1 in do_once (do_throw=false) at callonce.cpp:26

Thread 2 (Thread 0x7fe243d65700 (LWP 8586)):
#0 0x00007fe2452006ef in futex_wait (private=0, expected=1, futex_word=0xcb536311d8 <flag2>) at ../sysdeps/unix/sysv/linux/futex-internal.h:61
#1 fusimple (private=0, expected=1, rnal.h:135
#2 __nce_slow (once_control=0xcb5363thread_once.c:105
0x342e3b1 in do_once (do_throw=true) at callonce.cpp:26

//还有一个已经跑完的,但是没有释放锁的线程
Threadd 0x7fe245605740 (LWP 8584)):
#0 0x51fa67d in pthread_join (threadid=140609777456896, thread_return=0x0) at pthread_join.c:90
#1 0x4f2a397 in std::thread::join() () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#2 0x342e48b in main () at callonce.cpp:51

但是如果 t3 最先执行, 那么最好不过了, 可以直接运行结束, 而不会产生死锁(你多运行几次)

1
2
3
4
5
= pid = 139844931725056
Didn't throw, call_once will not attempt again
139844923332352
pid = 139844940117760
pid = 139844948510464

如果你涉及到异常, call_once可以很好的用于初始化(拿到一个实例)一次, 例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class X {  
private:
mutable std::once_flag initDataFlag;
void initData ()
{
_data = "init";
}

std::string _data;
public:
std::string& getData () {
std::call_once (initDataFlag, &X::initData, this);
return _data;
}
};

如果返回相关实例的话, 这就是一个单例啊.

condition_variable库

该库的头文件如下:

1
< condition_variable >

这个里面主要包括2个类和1个函数:

  • condition_variable 类
  • condition_variable_any 类
  • notify_all_at_thread_exit 函数

额外的还有一个枚举类型 cv_status

配合mutex或者mutex的包装类使用, 个人感觉比 pthreads 要简单一些.

与 std::condition_variable 类似, 只不过 std::condition_variable_any 的 wait 函数可以接受任何 lockable 参数(它就是一个类模板), 而 std::condition_variable 只能接受 std::unique_lock 类型的参数, 除此以外, 和 std::condition_variable 几乎完全一样. 下面重点说 std::condition_variable .

看下api, 大致也就和pthreads的 pthread_cond_*() 用法类似, 可能名字不太一样.

大致把成员函数分类一下:

  • wait系列 (条件不满足&拿不到锁, 等待)
    • wait
    • wait_until
    • wait_for
  • notify系列
    • notify_one
    • notify_all
  • 构造系列 (条件变量和锁绑定, 相互复制没有意义)
    • condition_variable();
    • condition_variable(const condition_variable&) = delete;

为什么条件变量要初始化要绑锁?
因为给下属放权很重要,cv判断条件是否满足从而决定是否竞争锁,这期间cv需要有主动放锁和主动加锁的权利.

std::condition_variable 对象通常使用 std::unique_lock 来等待, 等待前先加锁. 等待时, 如果条件不满足, wait 会原子性地解锁并把线程挂起.

好吧, 我强调一下:

与条件变量搭配使用的锁必须是 unique_lock, 不能用 lock_guard.

直接看一下案例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <iostream>                // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx; // 全局互斥锁.
std::condition_variable cv; // 全局条件变量.
bool ready = false; // 全局标志位.

//线程执行函数
void do_print_id(int id)
{
std::unique_lock <std::mutex> lck(mtx);
while (!ready) {
// 条件不满足, 放锁, 等待重新加锁
// 该线程执行阻塞在这条语句
cv.wait(lck);
}
//被别的线程唤醒并抢到了锁
std::cout << "thread " << id << '\n';
}



int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(do_print_id, i);

std::cout << "10 threads ready to race...\n";

{
std::unique_lock <std::mutex> lck(mtx);
ready = true; // 设置全局标志位为 true.
cv.notify_all(); // 唤醒所有线程.
}

//cv.notify_one(); //请先通知再放锁
for (auto & th:threads) {
th.join();
}

return 0;
}

至于先放锁,再通知别人,还是先通知别人再放锁, 再pthread里面讨论过, 不在多说.
一般只有两个线程, 才会用到 notify_one , 因为”通知”非彼即此.

(即使接到通知了也不一定能抢到锁, 还是要看操作系统的调度策略, 比如优先级高的先来之类的)

下面说说 wait 系列的函数.

  • void wait (unique_lock& lck); //无条件放锁阻塞等待
  • template
    void wait (unique_lock& lck, Predicate pred); //自带预判条件

它俩的区别是? 第二种情况设置了前置条件, 它相当于:

1
2
3
while (!pred()) {
cv.wait(lock);
}

注意 pred 是一个为此函数, 例如lambda表达式之类的 []{return i == 1;} , 当然你也可以把它写成函数:

1
2
3
bool equalsOne() {
return i == 1;
}

下面有一个简单的案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>

std::condition_variable cv;
std::mutex cv_m; // This mutex is used for three purposes:
// 1) to synchronize accesses to i
// 2) to synchronize accesses to std::cerr
// 3) for the condition variable cv
int i = 0;

void waits()
{
std::unique_lock<std::mutex> lk(cv_m);
std::cerr << "Waiting... \n";
cv.wait(lk, []{return i == 1;});
std::cerr << "...finished waiting. i == 1\n";
}

void signals()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(cv_m);
std::cerr << "Notifying...\n";
}
cv.notify_all();

std::this_thread::sleep_for(std::chrono::seconds(1));

{
std::lock_guard<std::mutex> lk(cv_m);
i = 1;
std::cerr << "Notifying again...\n";
}
cv.notify_all();
}

int main()
{
std::thread t1(waits), t2(waits), t3(waits), t4(signals);
t1.join();
t2.join();
t3.join();
t4.join();
}

todo

Futures库

概述

当时学习 boost 的时候, 这一篇是直接一带而过的, 因为平常都没有怎么用到这个库.这个库和异步任务(简单理解成单独封装&运行在独立线程中的任务)有关, 和线程间传递数据(当然包括共享数据–即同步)有关. Futures本身又是指期货, 未来交易的一种约定&协议.

怎么样理解这个库呢? Wiki上面给了很好的解释:

In computer science, future, promise, delay, and deferred refer to constructs used for synchronizing program execution in some concurrent programming languages. They describe an object that acts as a proxy for a result that is initially unknown, usually because the computation of its value is yet incomplete.

联系实际编程实体, 其实就很好理解了: 线程本身运行是互不干扰的, 虽然它们可以互相的共享一些资源. 那么两个独立运行的实体, 怎么通信交流呢? 就通过约定或者协议来了, 交流呢就通过promise对象来, 约定我这个线程的执行结果存储在一个叫做 futures的实体里, 另外的线程体执行体, 你们要结果先同步等待着(这个同步可能被封装起来, 你看不到它们显示的在wait或者被mutex挡住), 之后有了值就去取(而不是在那傻等) 所以这个过程是异步过程(可能有个线程在同步等待, 比如主线程).

并且这种约定, 通常是运行之前就要约定好, 我这个线程里面跑的是个什么任务(package_task), 我运行期来之后, 别的线程要交流就通过 promise (我给promise写值), 这个代理对象, 我没有给它值之前, 别的需要从这个对象获取值的其他线程就同步等待, 而且只能通过与promise关联的futures获取, 之后我这个线程运行的任务结束了, 有需要结果, 但是又在运行中不能傻等在那儿等结果的线程, 可以异步的从futures得到结果(当然我结束了给一个回调通知也是可以的?).

如何匹配future/promise对呢? (建立相关的关联) 一个在我的线程, 另一个在别的啥线程中么?
既然 future 和 promise 可以被到处移动(不是拷贝), 那么可能性就挺多的.
最普遍的情况是父子线程配对形式, 父线程用future获取子线程promise返回的值. 在这种情况下, 使用async()是很优雅的方法.

如果还是没有听明白, 直接看代码吧, 这段代码是这么个意思:

一个double数组求和的任务, 我(主线程) 分别交给两个子线程去求和(一个求前半个数组的和, 一个求后半个数组的和), 之后分别返回后, 我主线程在完成最后的求和工作.

大致代码如下: (同步&协作)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
double accumulate_all(vector& v)
{
// package the tasks:
//用accumulate算法计算 doubles 数据的和
//注意 task 内部已经处理好了写入promise的操作
packaged_task pt0{std::accumulate};
packaged_task pt1{std::accumulate};

auto f0 = pt0.get_future(); // get hold of the futures
auto f1 = pt1.get_future();

//把任务分解到两个线程; 分别完成后, 在主线程完成最终求和计算
pt0(&v[0],&v[v.size()/2],0); // start the threads
pt1(&[v.size()/2],&v[size()],0); // packageed_task 封装了线程

return f0.get()+f1.get(); // get the results
}

packaged_task 提供了启动任务的简单方法(但是并没有单独开辟线程, 还是在本线程内执行的). 特别是它处理好了 future 和 promise 的关联关系, 同时提供了包装代码(就是put方法)以保证返回值/异常可以放到 promise 中.

总结:
C++11并发库提供了 future 和 promise 来简化任务线程间的传值(返回值)操作; 同时为启动任务提供了packaged_task以方便的封装(也可以通过 std::packaged_task 拿到future, 获取future_status, 但是想put或者set还是需要promise).

其中的关键点是允许2个任务间使用无(显式)锁的方式进行值传递(标准库帮你高效的封装好了).

async()基本实现思路: 当一个任务需要另外一个线程(启动它的线程)返回值时, 它把这个值放到promise中. 之后这个返回值会出现在和此 promise 关联的 future 中. 于是另外线程就能读到返回值, 从futures中读取(当然你最好先判断一下 futures_status 为 ready 状态).

provider

该部分主要说:

  • std::promise
  • std::package_task

以及 provider 相关的函数 std::async() 和 std::launch.

下面先说 promise.

promise的主要目的是提供一个”put”(也能”get”)操作以和 future 的 get() 对应, 在 promise 对象构造时可以和一个共享状态(通常是std::future)相关联, 并可以在相关联的共享状态(std::future)上保存一个类型为 T 的值. promise 对象是异步 Provider,它可以在某一时刻设置共享状态的值. 可以通过 get_future 来获取与该 promise 对象相关联的 future 对象, 并且用到其set方法:

1
2
3
4
5
6
7
std::future<T> get_future();

//根据声明时promise的参数不同, 有不同的setter
void set_value( const R& value );
void set_value( R&& value );
void set_value( R& value );
void set_value();

promise为future传递的结果类型有2种: (set_xxx)

  • 传一个普通值 (set_value, set_value_at_thread_exit)
  • 抛出一个异常 (set_exception, set_exception_at_thread_exit)

基本用法不同:

1
2
3
4
5
6
7
8
try {
X res;
// compute a value for res
p.set_value(res);
}
catch (...) { // oops: couldn't compute res
p.set_exception(std::current_exception());
}

关于 get_future:
返回的 future 对象可以访问由 promise 对象设置在共享状态上的值或者某个异常对象。只能从 promise 共享状态获取一个 future 对象。如果关联了 future 的promise不设置值或者异常, 那么 promise对象在析构时会自动地设置一个 future_error 异常(broken_promise). 设置promise的值&异常,此后 promise 的共享状态标志变为 ready.

下面有一个简单的例子: (主线程通过promise设置值, 让其他线程通过相关联的future去获取值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>       // std::cout
#include <functional> // std::ref
#include <thread> // std::thread
#include <future> // std::promise, std::future

void print_int(std::future<int>& fut) {
int x = fut.get(); // 获取共享状态的值.
std::cout << "value: " << x << '\n'; // 打印 value: 10.
}

int main ()
{
std::promise<int> prom; // 生成一个 std::promise<int> 对象.
std::future<int> fut = prom.get_future(); // 和 future 关联.
std::thread t(print_int, std::ref(fut)); // 将 future 交给另外一个线程t.
prom.set_value(10); // 设置共享状态的值, 此处和线程t保持同步.
t.join();
return 0;
}

设置异常的具体例子: (线程1从终端接收一个整数, 线程2将该整数打印出来, 如果线程1接收一个非整数, 则为 promise 设置一个异常(failbit), 线程2 在std::future::get 是抛出该异常)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <iostream>       // std::cin, std::cout, std::ios
#include <functional> // std::ref
#include <thread> // std::thread
#include <future> // std::promise, std::future
#include <exception> // std::exception, std::current_exception

void get_int(std::promise<int>& prom) {
int x;
std::cout << "Please, enter an integer value: ";
std::cin.exceptions (std::ios::failbit); // throw on failbit
try {
std::cin >> x; // sets failbit if input is not int
prom.set_value(x);
} catch (std::exception&) { //cin拿到的不是整数时
prom.set_exception(std::current_exception());
}
}

void print_int(std::future<int>& fut) {
try {
int x = fut.get();
std::cout << "value: " << x << '\n';
} catch (std::exception& e) {
std::cout << "[exception caught: " << e.what() << "]\n";
}
}

int main ()
{
std::promise<int> prom;
std::future<int> fut = prom.get_future();

std::thread th1(get_int, std::ref(prom));
std::thread th2(print_int, std::ref(fut));

th1.join();
th2.join();
return 0;
}

那么 void set_value() 是怎么回事?
如果不设置值的话(虽然还是会自动设置一个异常, 然后promise的状态还是ready状态), 那就起到通知(notify)等待从future拿到值的线程, 例如下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#include <thread>
#include <future>
#include <cctype>
#include <vector>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <sstream>

int main()
{
std::istringstream iss_numbers{"3 4 1 42 23 -23 93 2 -289 93"};
std::istringstream iss_letters{" a 23 b,e a2 k k?a;si,ksa c"};

//其他线程把上面流的内容放入 这些容器中
std::vector<int> numbers;
std::vector<char> letters;


std::promise<void> numbers_promise, letters_promise;

//主线程纯碎只是要获取其他线程是否完成的状态
auto numbers_ready = numbers_promise.get_future();
auto letter_ready = letters_promise.get_future();

//子线程用xxx_promise只是传递ready状态
std::thread value_reader([&]
{
// I/O operations.
std::copy(std::istream_iterator<int>{iss_numbers},
std::istream_iterator<int>{},
std::back_inserter(numbers));

//Notify for numbers.
numbers_promise.set_value(); //仅仅是只是传递ready状态

std::copy_if(std::istreambuf_iterator<char>{iss_letters},
std::istreambuf_iterator<char>{},
std::back_inserter(letters),
::isalpha);

//Notify for letters.
letters_promise.set_value();
});

//主线程numbers future 阻塞wait
numbers_ready.wait();
//numbers容器被填充完毕了
std::sort(numbers.begin(), numbers.end());

/* //非核心逻辑
if (letter_ready.wait_for(std::chrono::seconds(1)) ==
std::future_status::timeout)
{
//output the numbers while letters are being obtained.
for (int num : numbers) std::cout << num << ' ';
numbers.clear(); //Numbers were already printed.
}
*/

//阻塞等待
letter_ready.wait();
std::sort(letters.begin(), letters.end());

//If numbers were already printed, it does nothing.
for (int num : numbers) std::cout << num << ' ';
std::cout << '\n';

for (char let : letters) std::cout << let << ' ';
std::cout << '\n';

value_reader.join();
}

set_xxx_at_thread_exit 这类比较特殊, 拥有promise的线程, 真正set是在线程结束的时候才去set(延后设置), 可想而知, 对方等待的线程会一直等待结果.(但是重复调用set会报错). The state is made ready when the current thread exits, after all variables with thread-local storage duration have been destroyed. 下面有个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <future>
#include <thread>

int main()
{
using namespace std::chrono_literals;
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread([&p] {
std::this_thread::sleep_for(1s);
p.set_value_at_thread_exit(9);
}).detach();

std::cout << "Waiting..." << std::flush;
f.wait();
std::cout << "Done!\nResult is: " << f.get() << '\n';
}

补充说明一下其构造方法中也是禁用了拷贝构造函数的, 只保留了移动构造:

  • promise(); //默认构造函数,初始化一个空的共享状态。
  • template promise
    (allocator_arg_t aa, const Alloc& alloc); //和默认类似, 但是可以自定指定分配器
  • promise (const promise&) = delete; //禁止拷贝构造
  • promise (promise&& x) noexcept; //移动构造

移动构造就是为了转移所有权, 例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>       // std::cout
#include <thread> // std::thread
#include <future> // std::promise, std::future

std::promise<int> prom; //下面接收不同的promise实例的所有权

void print_global_promise () {
std::future<int> fut = prom.get_future();
int x = fut.get();
std::cout << "value: " << x << '\n';
}

int main ()
{
std::thread th1(print_global_promise);
prom.set_value(10);
th1.join();

prom = std::promise<int>(); // prom 被move赋值为一个新的 promise 对象.

std::thread th2 (print_global_promise);
prom.set_value (20);
th2.join();

return 0;
}

下面记录一下 std::packaged_task

std::packaged_task 与 std::function 类似,只不过 std::packaged_task 将其包装的可调用对象的执行结果传递给一个 std::future 对象(该对象通常在另外一个线程中获取 std::packaged_task 任务的执行结果). 也就是说 std::packaged_task 包含了两个最基本要素:

  • 被包装的任务(stored task),任务(task)是一个可调用的对象,如函数指针、成员函数指针或者函数对象
  • 共享状态(shared state),用于保存任务的返回值,可以通过 std::future 对象来达到异步访问共享状态的效果

std::packaged_task 对象是异步 Provider, 它在某一时刻通过调用被包装的任务来设置共享状态的值.

通俗的说, 它和 std::function (function, lambda, bind , functor)一样, 可以作为一个可调用对象容器. 但不同的是它多了两样重要功能, 一是异步调用, 二是它保存返回值或者异常(别的线程是可以通过std::future对象获取到的)

std::packaged_task 的 operator() 函数 和 thread对象包装一个方法有什么不同:

A successful call to operator() synchronizes with a call to any member function of a std::future

实际上, 你可以像上面说promise的时候一样, 直接绕过 packaged_task, 而采用普通的启动线程的方式, 还是可以从promise关联的future中拿到结果.

但是直接执行调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>

// unique function to avoid disambiguating the std::pow overload set
int f(int x, int y) { return std::pow(x,y); }

void task_lambda()
{
std::cout << "thread id : " << std::this_thread::get_id() << std::endl;
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = task.get_future();

task(2, 9);

std::cout << "task_lambda:\t" << result.get() << '\n';
}

void task_bind()
{
std::cout << "thread id : " << std::this_thread::get_id() << std::endl;
std::packaged_task<int()> task(std::bind(f, 2, 11));
std::future<int> result = task.get_future();

task();

std::cout << "task_bind:\t" << result.get() << '\n';
}

void task_thread()
{
std::cout << "thread id : " << std::this_thread::get_id() << std::endl;
std::packaged_task<int(int,int)> task(f);
std::future<int> result = task.get_future();

std::thread task_td(std::move(task), 2, 10);
task_td.join();

std::cout << "task_thread:\t" << result.get() << '\n';
}

int main()
{
std::cout << "main thread id : "
<< std::this_thread::get_id() << std::endl;
task_lambda();
task_bind();
task_thread();

return 0;
}

运行结果是:

1
2
3
4
5
6
7
main thread id : 140384054449984
thread id : 139700016576320
task_lambda: 512
thread id : 139700016576320
task_bind: 2048
thread id : 139700016576320
task_thread: 1024

是的, 可以看到其实没有开辟新的线程, 仅仅是进行了调用, 要想在另外的线程执行, 还是要借助 thread:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>     // std::cout
#include <future> // std::packaged_task, std::future
#include <chrono> // std::chrono::seconds
#include <thread> // std::thread, std::this_thread::sleep_for

// count down taking a second for each value:
int countdown (int from, int to) {
for (int i=from; i!=to; --i) {
std::cout << i << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Finished!\n";
return from - to;
}

int main ()
{
std::packaged_task<int(int,int)> task(countdown); // 设置 packaged_task
std::future<int> ret = task.get_future(); // 获得与 packaged_task 共享状态相关联的 future 对象.

std::thread th(std::move(task), 10, 0); //创建一个新线程完成计数任务.

int value = ret.get(); // 等待任务完成并获取结果.

std::cout << "The countdown lasted for " << value << " seconds.\n";

th.join();
return 0;
}

用 packaged_task 直接拿到future, 而不是借助promise才能拿到;

(packaged_task 和 promise 同为 provide 类, 但是要设置值, 还是需要 promise 类对象)

实际上调用传参上面也有一点儿区别: 参考 cplusplus网站

  • If the stored task is a function pointer or a function object, it is called forwarding the arguments to the call.
  • If the stored task is a pointer to a non-static member function, it is called using the first argument as the object on which the member is called (this may either be an object, a reference, or a pointer to it), and the remaining arguments are forwarded as arguments for the member function.
  • If it is a pointer to a non-static data member, it should be called with a single argument, and the function stores in the shared state a reference to that member of its argument (the argument may either be an object, a reference, or a pointer to it).

翻译一下就是:

  • 如果被包装的任务是函数指针或者函数对象,调用 std::packaged_task::operator() 只是将参数传递给被包装的对象。
  • 如果被包装的任务是指向类的非静态成员函数的指针,那么 std::packaged_task::operator() 的第一个参数应该指定为成员函数被调用的那个对象,剩余的参数作为该成员函数的参数。
  • 如果被包装的任务是指向类的非静态成员变量,那么 std::packaged_task::operator() 只允许单个参数。

(其实和 std::function 类似)

关于 packaged_task 执行结果中, 还有两个重要的方法:

  • reset() : resets the state abandoning any stored results of previous executions
  • make_ready_at_thread_exit : executes the function ensuring that the result is ready only once the current thread exits

案例如下:
(看到所在线程退出后, future 才进入ready状态)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <future>
#include <iostream>
#include <chrono>
#include <thread>
#include <functional>
#include <utility>

void worker(std::future<void>& output)
{
std::packaged_task<void(bool&)> my_task{ [](bool& done) { done=true; } };

auto result = my_task.get_future();

bool done = false;

my_task.make_ready_at_thread_exit(done); // execute task right away

std::cout << "worker: done = " << std::boolalpha << done << std::endl;

auto status = result.wait_for(std::chrono::seconds(0));
if (status == std::future_status::timeout)
std::cout << "worker: result is not ready yet" << std::endl;

output = std::move(result);
}


int main()
{
std::future<void> result;

std::thread{worker, std::ref(result)}.join();
//检查 future的状态 : std::future_status 类型
auto status = result.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready)
std::cout << "main: result is ready" << std::endl;
}

而 reset 则是重置状态和存储的值, 实现方式其实是检查包装的任务:
*this = packaged_task(std::move(f)), 使用案例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <cmath>
#include <thread>
#include <future>

int main()
{
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = task.get_future();
task(2, 9);
std::cout << "2^9 = " << result.get() << '\n';

task.reset();
result = task.get_future();
std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "2^10 = " << result.get() << '\n';
}

其他需要注意的是, packaged_task 是禁止了拷贝语义, 只有移动语义, 所以简单多用 std::move 进行再次包装:

1
2
packaged_task& operator=( const packaged_task& ) = delete;
packaged_task& operator=( packaged_task&& rhs ) noexcept;

移动之后, 原来的 packaged_task 对象也讲失去 state和包装的方法, 此时需要用valid成员方法检测有效性:

1
bool valid() const noexcept;

成员方法 和 std::swap 作用一样, 都是 交换 staes 和包装的方法:

1
2
3
4
5
void swap( packaged_task& other ) noexcept;

template< class Function, class... Args >
void swap( packaged_task<Function(Args...)> &lhs,
packaged_task<Function(Args...)> &rhs ) noexcept;

最后 packaged_task 只是封装了任务, 并没有说是不是同一线程执行, 或者另外的线程单独执行; 如果要想另外的线程单独执行, 其他线程异步获取结果, 可以再用一个封装, std::async. (std:async 通常配合 std::launch 枚举一起使用)

btw: std::async 和 std::launch 都定义在 future 头文件中.

future

该部分主要涉及:

  • std::future
  • std::shared_future

future 对象可以异步返回(拿到)共享状态的值, 或者在必要的情况下阻塞调用者(即拿到并使用future的线程)并等待共享状态标志变为 ready, 然后才能获取共享状态的值.

标准库中提供了3种future:

  • 普通future
  • shared_future(用于复杂场合)
  • atomic_future (暂时不说)

其实普通 future 它已经完全够用了.

如果我们有一个future f,通过get()可以获得它的值:

1
2
// if necessary wait for the value to get computed
X v = f.get();

如果它的返回值还没有到达,调用线程会进行阻塞等待. 等待超时怎么办? get()会抛出异常的(从标准库或等待的线程那个线程中抛出).
如果我们不需要等待返回值(非阻塞方式),可以简单询问一下future,看返回值是否已经到达:

1
2
3
4
5
6
7
8
9
//换成while的话, 就是直接返回, 然后进行下一次询问
if (f.wait_for(0))
{
// there is a value to get()
// do something
} else
{
// do something else
}

一般从构造函数拿到的future都是无效的, 除非接收了有效future对象的所有权(即 move 拿到的别人的).

1
2
3
4
5
6
7
future() noexcept; //默认构造
future (future&& x) noexcept;
future (const future&) = delete;// 禁用拷贝语义

//实例:
std::future<int> fut; // 默认构造函数
fut = std::async(do_some_task); // move-赋值操作。

一般可以从三种途径获取:

  • provider
    • promise 的成员函数 get_future , 但是一般promise都是共享的(全局的), 两个线程都可见
    • packaged_task 的成员函数 get_future , 因为启动 task的线程(姑且称为父线程), 它是拥有task的实例的, 这个线程想拿到异步调用的结果, 就完全可以从task对象要future对象.
  • std::async 函数的返回值(这个就是对标准异步任务流程的封装, 可以简单理解成跑在别的线程里面的 packaged_task , 所以也是可以拿到future对象的)

如果不是默认构造&上面三种途径接收的到的(move), 那么在调用 get 或者 share 之前, 最好检查一下, 用valid 成员函数:

1
2
3
4
//如果这个future有保存shared status, 
//那么就返回true, 表示可以用(get(), share())
//否则返回false
bool valid() const noexcept;

还有一个重要原因要检查 valid, 可能是 share, 即 std::future::share() :
返回一个 std::shared_future 对象(本文后续内容将介绍 std::shared_future), 调用该函数之后, 该 std::future 对象本身已经不和任何共享状态相关联, 因此该 std::future 的状态不再是 valid 的了, valid() == false

那为什么需要 shared_future ? 因为原来的 std::future 当你get一次共享状态(status)值或者存储的异常之后, 它就失效了, 再解引用或者参考它的值的行为是未定义的, 通俗说, 就是你调用两次 std::future的get方法试试看, 行为未定义, 但是shared_future则不同, 看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>       // std::cout
#include <future> // std::async,
//std::future, std::shared_future

int do_get_value() { return 10; }

int main ()
{
std::future<int> fut = std::async(do_get_value);
std::shared_future<int> shared_fut = fut.share();

// 共享的 future 对象可以被多次访问.
std::cout << "value: " << shared_fut.get() << '\n'; //10
std::cout << "its double: " << shared_fut.get()*2 << '\n'; //10

return 0;
}

所以啊, 没事儿还是要在使用之前检查一下future对象, 看看究竟还能不能再调用get或者share方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>       // std::cout
#include <future> // std::async, std::future
#include <utility> // std::move

int do_get_value() { return 11; }

int main ()
{
// 由默认构造函数创建的 std::future 对象,
// 初始化时该 std::future 对象处于为 invalid 状态.
std::future<int> foo, bar;
foo = std::async(do_get_value); // move 赋值, foo 变为 valid.
bar = std::move(foo); // move 赋值, bar 变为 valid, 而 move 赋值以后 foo 变为 invalid.

if (foo.valid())
std::cout << "foo's value: " << foo.get() << '\n';
else
std::cout << "foo is not valid\n";

if (bar.valid())
std::cout << "bar's value: " << bar.get() << '\n';
else
std::cout << "bar is not valid\n";

return 0;
}

再次强调 : 在一个有效的 future 对象上调用 get 会阻塞当前的调用者(get函数内高效的封装了std::future::wait, 而wait是Blocks until the result becomes available.), 直到 Provider 设置了共享状态的值或异常(此时共享状态的标志变为 ready), std::future::get 将返回异步任务的值或异常(如果发生了异常).

不想一直阻塞等怎么办? 那就像上面一样, 用wait_for()异步询问(其实是, 我先等指定的时间, 时间到了, 我就询问, 不管有没有我都返回), 例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

int main()
{
std::future<int> future = std::async(std::launch::async, [](){
std::this_thread::sleep_for(std::chrono::seconds(3));
return 8;
});

std::cout << "waiting...\n";
std::future_status status;
do {
status = future.wait_for(std::chrono::seconds(1));
if (status == std::future_status::deferred) {
std::cout << "deferred\n";
} else if (status == std::future_status::timeout) {
std::cout << "timeout\n";
} else if (status == std::future_status::ready) {
std::cout << "ready!\n";
}
} while (status != std::future_status::ready);

std::cout << "result is " << future.get() << '\n';
}

运行结果如下:

1
2
3
4
5
waiting...
timeout
timeout
ready!
result is 8

而如果你用的是 wait , 那么就会阻塞等待结果(get函数封装了wait(), 当然如果get的时候调用已经完成, 可以拿到结果, 那么就不用阻塞了), 看下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <future>
#include <thread>

int fib(int n)
{
if (n < 3) return 1;
else return fib(n-1) + fib(n-2);
}

int main()
{
std::future<int> f1 = std::async(std::launch::async, [](){
return fib(20);
});
std::future<int> f2 = std::async(std::launch::async, [](){
return fib(25);
});

std::cout << "waiting...\n";
f1.wait();
f2.wait();

std::cout << "f1: " << f1.get() << '\n';
std::cout << "f2: " << f2.get() << '\n';
}

运行结果如下:

1
2
3
waiting...
f1: 6765
f2: 75025

还有一个 wait_until呢? 它也不像 wait 那样一直傻等, 它等具体制定的时间到了, 就停止等待. 如果具体时间到之前, 结果就拿到了那最好(提前就返回了), 如果没有, 那么你就只能去判断他的返回值了, 和 wait_for 一样, 是判断 std::future_status , 即:

1
2
3
4
5
6
7
if (status == std::future_status::deferred) {
std::cout << "deferred\n";
} else if (status == std::future_status::timeout) {
std::cout << "timeout\n";
} else if (status == std::future_status::ready) {
std::cout << "ready!\n";
}

具体含义是:

  • future_status::deferred The function to calculate the result has not been started yet
  • future_status::ready The result is ready
  • future_status::timeout The timeout has expired

下面有个例子: (仅等待2秒, 让执行1秒的拿到结果, 执行5秒的超时而返回)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

int main()
{
std::chrono::system_clock::time_point two_seconds_passed
= std::chrono::system_clock::now() + std::chrono::seconds(2);

// Make a future that that takes 1 second to completed
std::promise<int> p1;
std::future<int> f_completes = p1.get_future();
std::thread([](std::promise<int> p1)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
p1.set_value_at_thread_exit(9);
},
std::move(p1)
).detach();

// Make a future that that takes 5 seconds to completed
std::promise<int> p2;
std::future<int> f_times_out = p2.get_future();
std::thread([](std::promise<int> p2)
{
std::this_thread::sleep_for(std::chrono::seconds(5));
p2.set_value_at_thread_exit(8);
},
std::move(p2)
).detach();

std::cout << "Waiting for 2 seconds..." << std::endl;

if(std::future_status::ready == f_completes.wait_until(two_seconds_passed))
{ std::cout << "f_completes: " << f_completes.get() << "\n"; }
else
{ std::cout << "f_completes did not complete!\n"; }

if(std::future_status::ready == f_times_out.wait_until(two_seconds_passed))
{ std::cout << "f_times_out: " << f_times_out.get() << "\n"; }
else
{ std::cout << "f_times_out did not complete!\n"; }

std::cout << "Done!\n";
}

运行结果如下:

1
2
3
4
Waiting for 2 seconds...
f_completes: 9
f_times_out did not complete!
Done!

补上说一下 shared_future :
std::shared_future 与 std::future 类似,但是 std::shared_future 可以拷贝、多个 std::shared_future 可以共享某个共享状态的最终结果(即共享状态的某个值或者异常, 表现为重复调用 get方法). shared_future 可以通过某个 std::future 对象隐式转换(参见 std::shared_future 的构造函数 shared_future (future<T>&& x) noexcept;), 或者通过 std::future::share() 显示转换,无论哪种转换,被转换的那个 std::future 对象都会变为 not-valid.

它的构造函数如下:

  • shared_future() noexcept; //default
  • shared_future (const shared_future& x); //copy
  • shared_future (shared_future&& x) noexcept; //move
  • shared_future (future&& x) noexcept; //move from future – 相当于隐式转换

辅助工具

这部分有异常, 枚举, 还有辅助函数, 主要涉及

  • async 异步调用函数
  • launch 异步调用函数的启动策略
  • future_status 枚举
  • future_errors 异常枚举
    std::future_error 继承子 C++ 标准异常体系中的 logic_error

按顺序说说.

async 和 launch:
async的函数原型如下:

1
2
3
4
5
6
7
8
9
//没有指定policy (默认async策略)
template <class Fn, class... Args>
future<typename result_of<Fn(Args...)>::type>
async(Fn&& fn, Args&&... args);

//指定policy才运行
template <class Fn, class... Args>
future<typename result_of<Fn(Args...)>::type>
async(launch policy, Fn&& fn, Args&&... args);

std::async() 的 fn 和 args 参数用来指定异步任务及其参数, launch指定了 async的启动policy, 枚举定义如下:

1
2
3
4
5
enum class launch : /* unspecified */ {
async = /* unspecified */,
deferred = /* unspecified */,
/* implementation-defined */
};

policy 参数可以是launch::async,launch::deferred,以及两者的按位或( | ).

下面看一个案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <stdio.h>
#include <stdlib.h>

#include <cmath>
#include <chrono>
#include <future>
#include <iostream>

double ThreadTask(int n) {
std::cout << std::this_thread::get_id()
<< " start computing..." << std::endl;

double ret = 0;
for (int i = 0; i <= n; i++) {
ret += std::sin(i);
}

std::cout << std::this_thread::get_id()
<< " finished computing..." << std::endl;
return ret;
}

int main(void)
{
std::cout << " main thread id :"
<< std::this_thread::get_id()
<< std::endl;
std::future<double> f(std::async(std::launch::async, ThreadTask, 100000000));

#if 0
while(f.wait_until(std::chrono::system_clock::now() + std::chrono::seconds(1))
!= std::future_status::ready) {
std::cout << "task is running...\n";
}
#else
while(f.wait_for(std::chrono::seconds(1))
!= std::future_status::ready) {
std::cout << "task is running...\n";
}
#endif

std::cout << f.get() << std::endl;

return EXIT_SUCCESS;
}

运行结果如下:

1
2
3
4
5
6
7
8
 main thread id :140714198243136
140714176349952 start computing...
task is running...
task is running...
task is running...
task is running...
140714176349952 finished computing...
1.71365

但是上面那个案例, 如果你把策略指定城 std::launch::deferred 那就完蛋了, 因为 std::launch::deferred 策略会延迟到下面这个循环

1
2
3
4
while(f.wait_for(std::chrono::seconds(1))
!= std::future_status::ready) {
std::cout << "task is running...\n";
}

的后面的 f.get() 语句是才去执行异步线程内的任务, 也就是说 main 线程这个循环永远出不去, 运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
ing...
task is running...
task is running...
task is running...
task is running...
task is running...
task is running...
task is running...
task is running...
task is running...
//下面死循环了

使用 std::launch::deferred 策略, 就直接 get不要 wait_for 或者 wait_until 去包装函数是否执行完毕了.

下面说说, staus 封装函数(task)的返回状态.

上面已经说了 future_status 的含义了, 再重复一下:

1
2
3
4
5
6
7
if (status == std::future_status::deferred) {
std::cout << "deferred\n";
} else if (status == std::future_status::timeout) {
std::cout << "timeout\n";
} else if (status == std::future_status::ready) {
std::cout << "ready!\n";
}

具体含义是:

  • future_status::deferred The function to calculate the result has not been started yet ( 共享状态包含一个 deferred 函数, 即函数延迟执行)
  • future_status::ready The result is ready (共享状态的标志已经变为 ready,即 Provider 在共享状态上设置了值或者异常)
  • future_status::timeout The timeout has expired (超时,即在规定的时间内共享状态的标志没有变为 ready)

一般用future_status做逻辑判断, 但是错误处理就要依靠errc, error等类的枚举了.

下面还有三个和错误相关的内容:

future_errc (enum): identifies the future error codes

  • broken_promise 取值0 与该 std::future 共享状态相关联的 std::promise 对象在设置值或者异常之前一被销毁
  • future_already_retrieved 取值1 与该 std::future 对象相关联的共享状态的值已经被当前 Provider 获取了,即调用了 std::future::get 函数
  • promise_already_satisfied 取值2 std::promise 对象已经对共享状态设置了某一值或者异常
  • no_state 取值3 无共享状态

当操作promise或者future出错的时候, 会抛出异常, 用future_error类进行捕获.

future_error (class): reports an error related to futures or promises
这就是个异常类(其父类是logic_error), 可以通过其成员方法code 以及 what 获取异常详情.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <future>
#include <iostream>

int main()
{
std::future<int> empty;
try {
int n = empty.get(); // The behavior is undefined, but
// some implementations throw std::future_error
} catch (const std::future_error& e) {
std::cout << "Caught a future_error with code \"" << e.code()
<< "\"\nMessage: \"" << e.what() << "\"\n";
}
}


整理&上传 ING.

其他主题

很显然在c++11的时候, 并发库并没有完全支持boost线程库的内容, 而是有选择性的部分摘取, 下面提一下并发库中没有涉及到的内容:

  • shared_mutex
    读写锁(rwlock)
    pthreads库倒是为读写锁专门开辟了API, Boost也是(shared_mutex), 但是C++标准貌似到C++17才会引入, 请参考我的文章 boost
  • atomic
    原子操作, 也应该算作 无锁编程 领域, 因为我用的不是很多, 所以没有谈, 可以参考我的文章 boost
  • thread_group
    c++11的标准库直接省略了..
    1
    2
    3
    boost::thread_group threads;	
    threads.create_thread(boost::bind(Worker, boost::ref(counter)));
    threads.create_thread(boost::bind(Worker, boost::ref(counter)));

*

尾巴

花了很大的经历, C++11并发支持库也就说的差不多了. 但是用熟悉了这些API是远远不够的, 有时间还是需要看看真正的大师是怎么在理解和处理线程. 这方面的好书并不多, 但是单独花一本书的内容去讲解并发&线程的, 通常都还不错.

参考资料

  1. 《C++并发编程实战》
  2. 我的博文boost
  3. boost线程库
  4. cppreference
  5. cplusplus
文章目录
  1. 1. 引子
  2. 2. 正文
    1. 2.1. 简介
    2. 2.2. Threads库
      1. 2.2.1. thread类
      2. 2.2.2. this_thread
    3. 2.3. Mutex库
      1. 2.3.1. mutex
      2. 2.3.2. 全局加锁算法
      3. 2.3.3. lock类结构
      4. 2.3.4. lock包装类
      5. 2.3.5. lock_t
      6. 2.3.6. call_once
    4. 2.4. condition_variable库
    5. 2.5. Futures库
      1. 2.5.1. 概述
      2. 2.5.2. provider
      3. 2.5.3. future
      4. 2.5.4. 辅助工具
    6. 2.6. 其他主题
  3. 3. 尾巴
  4. 4. 参考资料
|