由于多核的出现,使用多线程能够显著提高性能。
C++11之前,C++并没有对并发提供语言层面的支持,C++标准库也没有。C++11之后:
- 语言层面,定义一个内存模型,保证在两个不同线程中对两个对象的操作相互独立,增加了
thread_local
关键字。 - 标准库提供了对开启多线程、同步多线程的支持。
The High-Level Interface: async() and Futures
- async()
提供了接口让一个可调用的对象(如某个函数)在独立的线程中运行。 - future<> 类
允许等待某个线程完成,并访问其结果。
一个使用 async() 以及 Future 的例子
计算 func1() + func2()
:
如果是单线程,只能依次运行,并把结果相加。总时间是两者时间之和。
多核多线程情况下,如果两者独立,可以分别运行再相加。总时间是两者时间的最大值。
#include <future>
#include <iostream>
#include <random> // for default_random_engine, uniform_int_distribution
using namespace std;
int doSomething (char c) {
// random-number generator(use c as seed to get different sequences)
std::default_random_engine dre(c);
std::uniform_int_distribution<int> id(10,1000);
for (int i=0; i<10; ++i) {
this_thread::sleep_for(chrono::milliseconds(id(dre)));
cout.put(c).flush(); // output immediately
}
return c;
}
int func1() {
return doSomething('.');
}
int func2() {
return doSomething('+');
}
int main() {
cout << "start func1() in background, and func2() in foreground: " << endl;
future<int> result1(std::async(func1));
int result2 = func2();
int result = result1.get() + result2;
cout << "\nresult of func1()+func2(): " << result << endl;
}
注意到在主函数中,我们使用了如下步骤:
// instead of:
// int result = func1() + func2();
future<int> result1(std::async(func1));
int result2 = func2();
int result = result1.get() + result2;
我们使用 async() 使 func1() 在别的线程运行,并将结果赋值给 future。func2() 继续在主线程运行,最后综合结果。
future 对象的作用体现在:
- 允许访问 func1 产生的结果,可能是一个返回值也可能是一个异常。注意 future 是一个模板类,我们指定为 func1 的返回值类型 int。对于无返回值的类型我们可以声明一个
std::future<void>
。 - 保证 func1 的执行。async() 仅仅是尝试开始运行传入的 functionality,如果并没有运行,那在需要结果的时候,future 对象强制开始运行。
最后,我们需要使用到异步执行的函数的结果的时候,会使用 get()
。如:
int result = result1.get() + result2;
当我们使用 get()
的时候,可能发生以下三种情况:
- 如果 func1() 是使用 async() 在别的线程启动的,而且已经运行结束,可以立即得到结果。
- 如果 func1() 启动了但是还未结束,则 get() 会阻塞直到拿到结果。
- 如果 func1() 还未启动,则会强制启动,这时候就表现得像一个同步执行的程序。
可以看出,综合使用
std::future<int> result1(std::async(func1));
result1.get()
使得无论是否允许多线程,程序都能顺利完成。
为达到最佳使用效果,我们需要记住一条准则“call early and return late”,给予异步线程足够的执行时间。
async()
的参数可以是任何可调用对象:函数,成员函数,函数对象,或者 lambda。注意 lambda 后面不要习惯性的加上小括号。
std::async([]{ ... }) // try to perform ... asynchronously
Using Launch Policies
有时候我们希望子线程立刻开始,而不要等待调度。那么我们就需要使用 lauch policy 来显式指定,如果开始失败,会抛出系统错误异常。
// force func1() to start asynchronously now or throw std::system_error
std::future<long> result1 = std::async(std::launch::async, func1);
使用 std::launch::async 我们就不必再使用 get() 了,因为如果 result1 的生命周期结束了,程序会等待 func1 完成。因此,如果我没有调用 get(),在退出 result1 的作用域时一样会等待 func1 结束。然而,出于代码的可读性,推荐还是加上 get()。
同理,我们也可以指定子线程在 get() 时再运行。
auto f1 = std::async(std::launch::deferred, func1);
Waiting and Polling
future 的 get() 方法只能被调用一次,在使用 get() 之后,future 实例就变为 invalid 的了。而 future 也提供了 wait() 方法,可以被调用多次,并且可以加上时限。其调用形式为:
std::future<...> f(std::async(func));
f.wait_for(std::chrono::seconds(10)); // wait at most 10 seconds
std::future<...> f(std::async(func));
f.wait_until(std::system_lock::now() + std::chrono::minites(1)); // wait until a specific timepoint has reached
这两个函数的返回值一样,有三种:
- std::future_status::deferred
func 还没有开始执行。 - std::future_status::timeout
func 开始执行但是还没完成 - std::future_status::ready
func 执行完毕
综合使用 launch policy 和 wait 方法的例子如下,我们可以让两个线程分别进行计算,然后等待一定时间后输出结果。
#include <cstdio>
#include <future>
#include <iostream>
// defined here, lifetime of accurateComputation() may be longer than
// bestResultInTime()
std::future<double> f_slow;
double accurateComputation() {
std::cout << "Begin accurate computation..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "Yield accurate answer..." << std::endl;
return 3.1415;
}
double quickComputation() {
std::cout << "Begin quick computation..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Yield quick answer..." << std::endl;
return 3.14;
}
double bestResultInTime(int seconds) {
auto tp = std::chrono::system_clock::now() + std::chrono::seconds(seconds);
// 立即开始
f_slow = std::async(std::launch::async, accurateComputation);
// 这两句顺序不可交换
double quick_result = quickComputation();
std::future_status f_status = f_slow.wait_until(tp);
if (f_status == std::future_status::ready) {
return f_slow.get();
} else {
return quick_result;
}
}
int main() {
using namespace std::chrono;
int timeLimit;
printf("Input execute time (in seconds):\n");
std::cin >> timeLimit;
printf("Execute for %d seconds\n", timeLimit);
auto start = steady_clock::now();
std::cout << "Result: " << bestResultInTime(timeLimit) << std::endl;
std::cout
<< "time elapsed: "
<< duration_cast<duration<double>>(steady_clock::now() - start).count()
<< std::endl;
}
// g++ -o async2_test async2.cpp -std=c++0x -lpthread
在上面程序中尤为值得注意有两点。
我们没有把 future 放在 bestResultInTime() 中。这是因为如果 future 是局部变量,退出 bestResultInTime() 时,future 的析构函数会阻塞直到产生结果。
wait 方法会阻塞等待,也就是说如果顺序不对,就会变成顺序执行,而非并行。
// 这两句顺序不可交换
double quick_result = quickComputation();
std::future_status f_status = f_slow.wait_until(tp);
如果交换,则会先等待直到 timepoint,然后再执行 quickComputation(),变成串行程序。
以上程序输出结果为:
Input execute time (in seconds):
3
Execute for 3 seconds
Begin quick computation...
Begin accurate computation...
Yield quick answer...
Result: 3.14
time elapsed: 3.00111
Yield accurate answer...
非常值得注意的是 main 函数结束后并没有立刻退出,而是等待 func 执行完毕后 future 析构。
给 wait_for() 方法传入 0,相当于立即获取 future 的状态。可以利用这点来得知某个任务 现在 是否开始了,或者是否还在运行。
The Low-Level Interface: Threads and Promises
C++ 标准库也提供了底层接口来启动和处理线程,我们可以先定义一个线程对象,以一个可调用对象来初始化,然后等待或者detach。
void doSomething();
std::thread t(doSomething); // start doSomething in the background
t.join(); // wait for t to finish (block until doSomething() ends)
如同 async() 一样,我们可以用任何可调用对象来初始化。但是作为一个底层的接口,一些在 async() 中的特性是不能使用的。
- thread 没有 launch policy。它总是立即开启新线程运行 func。类似于使用了 std::launch::async。
- 没有用于处理结果的接口。我们能获得的只有 thread ID(利用
get_id()
方法)。 - 如果出现异常,程序会立即停止。
- 我们需要声明我们是需要等待 thread 运行结束(使用
join()
),或者让它自己在别的线程运行。(使用detach()
) - 如果 main() 函数结束后,线程还在运行,则所有线程都会强制地结束。(future 会等待结束再析构)
以下例子显示了 join 和 detach 的区别。
我们新建了一个线程输出 +
,另外5个线程 detach 输出字母。按任意键将输出 +
的线程 join。程序在等待 +
打印结束后,就会停止,不管 detach 线程是否完成了任务。
#include <exception>
#include <iostream>
#include <random>
#include <thread>
void doSomething(int num, char c) {
try {
std::default_random_engine dre(c);
std::uniform_int_distribution<int> distribution(10, 1000);
for (int i = 0; i < num; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(distribution(dre)));
std::cout.put(c).flush();
}
} catch (const std::exception &e) {
std::cerr << "THREAD-EXCEPTION (thread " << std::this_thread::get_id()
<< e.what() << std::endl;
} catch (...) {
std::cerr << "THREAD-EXCEPTION (thread " << std::this_thread::get_id()
<< ")" << std::endl;
}
}
int main() {
try {
std::thread t1(doSomething, 5, '+');
std::cout << "- started fg thread " << t1.get_id() << std::endl;
for (int i = 0; i < 5; i++) {
std::thread t(doSomething, 10, 'a' + i);
std::cout << "- detach started bg thread " << t.get_id() << std::endl;
t.detach();
}
std::cin.get();
std::cout << "- join fg thread " << t1.get_id() << std::endl;
t1.join();
} catch (const std::exception &e) {
std::cerr << "EXCEPTION: " << e.what() << std::endl;
}
}
使用 detached 线程的时候,一定要注意尽量避免使用非局部变量。也就是说,它所使用的变量都要和它的生命周期相同。因为我们 detach 之后就丢失了对它的控制权,不能保证它会对其他线程中的数据做什么更改。尽量使用传值,而不要传引用。
对于 static 和 global 变量,我们无法阻止 detached 线程使用他们。如果我们已经销毁了某个 static 变量和 global 变量,detached 线程仍然在访问,那就会出现 undefined behavior。
在我们上面的程序中,detached 线程访问了 std::cin
, std::cout
, std::cerr
这些全局的流,但是这些访问时安全的,因为这些流会持续直到程序结束。然而,其他的全局变量不一定能保证。
Promises
现在我们需要考虑一个问题:如何才能在线程之间传递参数,以及处理异常。这也是上层的接口,例如 async()
的实现需要考虑的。当然,我们可以简单地进行处理,需要参数则传入参数,需要返回值则传入一个引用。
但是,如果我们需要获取函数的返回值或者异常,那我们就需要用到 std::promise
了。它就是对应于 future
的底层实现。我们可以利用 set_value()
以及set_exception()
方法来设置 promise 的值。
#include <future>
#include <iostream>
#include <thread>
void doSomething(std::promise<std::string> &p) {
try {
std::cout << "read char ('x' for exception): ";
char c = std::cin.get();
if (c == 'x') {
throw std::runtime_error(std::string("char ") + c + " read");
}
std::string s = std::string("char ") + c + " processed";
p.set_value(std::move(s)); // use move to avoid copying
} catch (...) {
p.set_exception(std::current_exception());
}
}
int main() {
try {
std::promise<std::string> p;
std::thread t(doSomething, std::ref(p));
t.detach();
std::future<std::string> f(p.get_future());
std::cout << "result: " << f.get() << std::endl;
} catch (const std::exception &e) {
std::cerr << "EXCEPTION: " << e.what() << std::endl;
} catch (...) {
std::cerr << "EXCEPTION " << std::endl;
}
}
以上程序定义了一个 promise,用这个 promise 初始化了一个 future,并在一个 detached 的线程之中为其赋值(可能返回 string 也可能是个 exception)。赋值过后,future 的状态会变成 ready。然后调用 get() 获取。
Synchronizing Threads
使用多线程的时候,往往都伴随着并发的数据访问,很少有线程相互独立的情况。
The only safe way to concurrently access the same data by multiple threads without synchronization is when ALL threads only READ the data.
然而,当多个线程访问同一个变量,并且至少一个线程会对它作出更改时,就必须同步了。这就叫做 data race。定义为“不同线程中的两个冲突的动作,其中至少一个动作是非原子的,两个动作同时发生”。
编程语言,例如C++,抽象化了不同的硬件和平台。因此,会有一个标准来指定语句和操作的作用,而不是每个语句具体生成什么汇编指令。也就是说,这些标准指定了 what,而不是 how。
例如,函数参数的 evaluation 顺序就是未指明的。编译器可以按照任何顺序对操作数求值,也可以在多次求值同一个表达式时选择不同的顺序。
因此,编译器几乎是一个黑箱,我们得到的只是外表看起来一致的程序。编译器可能会展开循环,整理表达式,去掉 dead code 等它认为的“优化”。
C++ 为了给编译器和硬件预留了足够的优化空间,并没有给出一些你期望的保证。我们可能会遇到如下的问题:
-
Unsynchronized data access
如果两个线程并行读写同样的数据,并不保证哪条语句先执行。
例如,下面的程序在单线程中确保了使用 val 的绝对值:
if (val >= 0) {
f(val);
} else {
f(-val);
}
但是在多线程中,就不一定能正常工作,val 的值可能在判断后改变。
-
Half-written data
如果一个线程读取数据,另一个线程修改,读取的线程可能会在写入的同时读取,读到的既不是新数据,也不是老数据,而是不完整的修改中的数据。
例如,我们定义如下变量:
long long x = 0;
新开一个线程 t1 更改变量的值:
x = -1;
在其他线程 t2 读取:
std::cout << x;
那么我们可能得到:
- 0(旧值),如果 t1 还没有赋值
- -1(新值),如果 t1 完成了赋值
- 其他值,如果 t2 在 t1 赋值的时候读取
这里解释一下第三类情况,假设已在一个 32 位机器上,存储需要 2 个单位,假设第一个单位已经被更改,第二个单位还没有更改,然后 t2 开始读取,就会出现其他值。
这类情况不止发生在 long long
类型上,即使是基础类型,C++ 标准也没有保证读写是原子操作。
-
Reordered statements
表达式和操作有可能会被改变顺序,因此单线程运行可能没问题,但是多线程运行,就会出错。
假设我们需要在两个线程之中共享一个 long,使用一个 bool 标志数据是否准备好。
// 定义
long data;
bool readyFlag = false;
// 线程 A
data = 42; // 1
readyFlag = true; // 2
// 线程 B
while (!readyFlag) {
;
}
foo(data);
粗一看似乎没有什么问题,整个程序只有在线程 A 给 data 赋值后,readyFlag 才会变 true。
以上代码的问题在于,如果编译器改变了 1,2 的语句顺序(这是允许的,因为编译器只保证在 一个 线程中的执行是符合预期的),那么就会出现错误。
The Features to Solve the Problems
为了解决上述问题,我们需要下面的几个概念。
- **Atomicity: ** 不被中断地、独占地读写变量。其他进程无法读取到中间态。
- **Order: ** 确保某些语句的顺序不被改变。
C++ 标准库提供了不同的方案来处理。
可以使用
future
以及promise
来同时保证 atomicity 以及 order,它保证了先设置 outcome,再处理 outcome,表明了读写肯定是不同步的。使用
mutex
和lock
来处理临界区(critical section)。只有得到锁的线程才能执行代码。使用条件变量,使进程等待其他进程控制的断言变为 true。
Mutexes and Locks
互斥量是通过提供独占的访问来控制资源的并发访问的对象。为了实现独占访问,对应的进程 “锁住” 互斥量,阻止别的进程访问直到 “解锁”。
一般使用 mutex 的时候,可能会这么使用:
int val
std::mutex valMutex;
// Thread A
valMutex.lock();
if (val >= 0) {
f(val);
} else {
f(-val);
}
valMutex.unlock();
// Thread B
valMutex.lock();
++val;
valMutex.unlock();
看起来似乎能够正常运行,但是如果 f(val) 中出现 exception,那么 unlock() 就不会执行,资源会被一直锁住。
lock_guard
为解决这个问题,C++ 标准库提供了一个在析构时能够释放锁的类型:std::lock_guard
。实现了类似于 Golang 中的 defer mu.unlock()
的功能。上面的例子可以改进为:
// Thread A
...
{ //新的scope
std::lock_guard<std::mutex> lg(valMutex);
if (val >= 0) {
f(val);
} else {
f(-val);
}
}
// Thread B
{
std::lock_guard<std::mutex> lg(valMutex);
++val;
}
需要注意新开了一个作用域。确保 lg 能在合适的地方析构。
再看一个完整的例子。
#include <future>
#include <mutex>
#include <iostream>
#include <string>
std::mutex printMutex;
void print(const std::string& str) {
std::lock_guard<std::mutex> lg(printMutex); // lg 初始化时自动锁定
for (char c : str) {
std::cout.put(c);
}
std::cout << std::endl;
} // lg析构时自动解锁
int main() {
auto f1 = std::async(std::launch::async, print, "Hello from first thread");
auto f2 = std::async(std::launch::async, print, "Hello from second thread");
print("Hello from main thread");
}
如果不加锁会乱序打印。
unique_lock
有时候,我们并不希望在锁初始化的同时就上锁。C++ 还提供了 unique_lock<> 类,它与 lock_guard<> 接口一致,但是它允许程序显式地决定什么时候、怎样上锁和解锁。它还提供了 owns_lock()
方法来查询是否上锁。使用更佳灵活,最常用的场景就是配合条件变量使用。具体例子在后面介绍 condition_variable 的部分。
处理多个锁
这个的处理多个锁并不是说挨个上锁,而是假设一个线程执行同时需要用到多个资源,应该要么一起锁上,要么全都不锁。否则很容易出现死锁。
例如线程 A 和 B 都需要锁 m1 和 m2,而线程 A 获得了 m1,在请求 m2,而线程 B 获得了 m2,在请求 m1,这时候就会相互等待,发生死锁。
C++ 标准库提供了 lock()
函数以解决上述问题。它的功能简单来讲就是要么都锁,要么都不锁。
以下实现了一个简单的银行转帐的例程。
#include <mutex>
#include <thread>
#include <iostream>
struct Bank_account {
explicit Bank_account(int Balance) : balance(Balance) {}
int balance;
std::mutex mtx;
};
void transfer(Bank_account& from, Bank_account& to, int amount) {
/*
// std::adopt_lock 假设已经上过锁,在初始化时不会再加锁,但是保留了析构释放锁的功能
std::lock(from.mtx, to.mtx);
std::lock_guard<std::mutex> lg1(from.mtx, std::adopt_lock);
std::lock_guard<std::mutex> lg2(to.mtx, std::adopt_lock);
*/
// equivalent approach:
std::unique_lock<std::mutex> ulock1(from.mtx, std::defer_lock);
std::unique_lock<std::mutex> ulock2(to.mtx, std::defer_lock);
std::lock(ulock1, ulock2); // 这里锁的是封装过后的 mutex
from.balance -= amount;
to.balance += amount;
}
int main() {
Bank_account a(100);
Bank_account b(30);
// 注意使用 std::ref
std::thread t1(transfer, std::ref(a), std::ref(b), 20);
std::thread t2(transfer, std::ref(b), std::ref(a), 10);
t1.join();
t2.join();
std::cout << " a now has " << a.balance << ", b now has " << b.balance << std::endl;
}
以上例程有三个重点。已经在程序中中文标注。
重点讲一下 std::ref,这里如果不加 std::ref 会报错。说一下个人理解。
虽然这里已经在函数中说明了是引用:
void transfer(Bank_account& from, Bank_account& to, int amount);
但是我们实际是传参数给了 std::thread 类的构造函数。它是一个模板类,默认肯定是进行值传递的。因此我们有必要在这里声明是引用传递,这样可以将模板改为引用。例如
template <typename T>
void foo (T val);
如果我们使用
int x;
foo (std::ref(x));
则模板类自动以 int& 为参数类型。
具体可以参见 std::reference_wrapper<> 类。
Condition Variables
有时,不同线程运行的任务可能需要互相等待。因此,除了访问同一个数据,我们还有其他使用同步的场景,即逻辑上的依赖关系。
你可能会认为我们已经介绍过了这种机制:Futures 允许我们阻塞直到另一个线程执行结束。但是,Future 实际上是设计来处理返回值的,在这个场景下使用并不方便。
这里我们将介绍条件变量,它可以用于同步线程间的逻辑依赖。
在引入条件变量之前,为了实现这个功能,只能采用轮询的方法,设置一个时间间隔,不断去检查。例如:
bool readyFlag;
std::mutex readyFlagMutex; // wait until readyFlag is true:
{
std::unique_lock<std::mutex> ul(readyFlagMutex);
while (!readyFlag) {
ul.unlock();
std::this_thread::yield(); // hint to reschedule to the next thread
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ul.lock();
}
} // release lock
这显然不是一个好的方案,因为时间间隔设置太短和太长都不行。但是它表现了条件变量的基本思想。就是在未满足条件时,放弃对锁和 cpu 资源的占有,让别的线程运行,等待条件满足跳出 while。但是条件变量没有采取轮询而是采用一个信号通知。
一个典型应用是消费者-生产者模型。
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
#include <future> // for async
std::queue<int> q;
std::mutex mu;
std::condition_variable condVar;
void provider(int val) {
for (int i=0; i<6; i++) {
{
std::lock_guard<std::mutex> lg(mu);
q.push(val + i);
}
condVar.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(val));
}
}
void consumer(int id) {
while (true) {
int val;
{
std::unique_lock<std::mutex> ul(mu);
condVar.wait(ul, [](){return !q.empty();});
val = q.front();
q.pop();
}
std::cout << "consumer " << id << ": " << val << std::endl;
}
}
int main() {
// 3 个生产者
auto p1 = std::async(std::launch::async, provider, 100);
auto p2 = std::async(std::launch::async, provider, 300);
auto p3 = std::async(std::launch::async, provider, 500);
// 2 个消费者
auto c1 = std::async(std::launch::async, consumer, 1);
auto c2 = std::async(std::launch::async, consumer, 2);
}
Atomic
这是实现 lock-free 的重要类型。非常值得深入了解。
atomic 的效率比锁要快很多,在 linux 下大概快 6 倍。
具体应用可以看我的 github 项目:使用mmap实现文件极速无锁并行写入
书中有一处错误:
经过实验,实际上返回的并不是 new value 而是 previous value。具体可以参考cppreference。
而且还能用于实现 spinlock。
附录
补充:C++11 中的随机数生成方法
关于为什么要引入新的随机数生成方法,参考这里。
标准流程如下所示。
std::random_device rd; // 随机数种子 generator
std::default_random_engine e(rd()); // 原始随机数 generator
std::uniform_int_distribution<> u(5,20); // 在 [5,20] 上的均匀分布
for ( size_t i = 0 ; i < 10 ; i ++ ) {
cout << u ( e ) << endl ; // 迫使原始随机数服从规定的分布
}
一个简单的例子,观察其转化关系。
#include <iostream>
#include <random> // for default_random_engine, uniform_int_distribution
using namespace std;
void randomPrint () {
// random-number generator(use c as seed to get different sequences)
std::random_device rd;
std::default_random_engine dre(rd());
std::uniform_int_distribution<int> id(10,100);
for (int i=0; i<10; i++) {
// random test
cout << dre() << " => " <<id(dre) << endl;
}
}
int main() {
randomPrint();
}
输出为:
1337774351 => 49
354763686 => 93
1223972804 => 56
827960471 => 33
54361696 => 94
540202105 => 51
90156615 => 84
1553865488 => 64
780656749 => 21
134206787 => 74
更加详细的可参考这里。
有疑问加站长微信联系(非本文作者)