以下内容为本人的学习笔记,如需要转载,请将本段内容(无删改)张贴于文章顶部:微信公众号「ENG八戒」mp.weixin.qq.com/s/vCKVYZYdq…,更多无限制精彩内容欢迎查阅我的个人博客站点 ENG八戒
这是系列文章的上文,之所以分成两篇是是因为篇幅过长了,正所谓好东西要慢慢消化!
一提到异步,大家可能想到的是多线程/进程编程,甚至分布式编程,这符合大方向,没错。不过,笔者在这篇文章里,打算聊的是从古早的写法到现代版语言标准下,异步计算在多线程的演进。
举个题目为例,假设有个比较费时的计算任务,需要放在后台线程(子线程)中执行,然后前台线程(当前线程)适时去读取计算结果,那么正在阅读本文的你会如何实现?
经典的味道
按照以前的思路(也就是古早写法),这需要首先创建新的线程,将任务函数和函数参数传入,还需要共享数据缓冲用于传递计算结果,再在跨线程中运用互斥量、条件变量、原子量等实现数据同步。
经典 C++ 时期,创建线程必须要依赖系统提供的线程库,比如 POSIX 的 pthread()。这种 API 都比较老旧,而且使用不方便,分分钟让你怀疑人生。
那么我们就来看看传统的写法:
c 代码解读复制代码#include
#include
#include
struct data_t {
int a;
int b;
};
void *compute(void* arg) {
data_t *data = static_cast<data_t *>(arg);
int *sum = new int;
if (nullptr == sum) {
std::cerr << "create buff for task fail" << std::endl;
return nullptr;
}
*sum = data->a + data->b;
sleep(2); // 模拟耗时
return sum;
}
int main() {
pthread_t thread;
data_t data = {3, 4};
void *ret_val = nullptr;
if (pthread_create(&thread, nullptr, compute, &data) != 0) {
std::cerr << "create thread err!" << std::endl;
return -1;
}
if (pthread_join(thread, &ret_val) != 0) {
std::cerr << "get compute result err!" << std::endl;
return -2;
} else {
if (nullptr == ret_val) {
std::cerr << "get compute result fail!"
<< std::endl;
return -3;
} else {
std::cout << "get compute result:"
<< *static_cast<int*>(ret_val)
<< std::endl;
}
}
return 0;
}
compute 是任务函数,计算结果通过函数返回。为了模拟耗时动作,直接调用 sleep()。
POSIX 库提供的 pthread_create 用于创建子线程,传入第一个参数用于返回线程句柄,第二个参数是线程属性(空就是默认),第三个参数是固定类型为 void*(void*)
的线程函数(我们这里传入 compute),最后一个参数就是传递给线程函数的参数。
你看,线程函数都是固定格式的,传入的参数也被限制为空类型的指针,比较死板,所以任务函数的参数必须先封装在结构体内再以指针的形式传递。
pthread_join 用于等待线程函数返回并获取返回值。
从代码篇幅来看,有些啰里啰唆的。
运行输出:
sql 代码解读复制代码get compute result:7
示例代码目标是对两个数(3 和 4)取和,运行结果 OK。
为什么 c++ 语言层面没有提供线程相关的集成类或者函数?
std::thread
进入 C++ 11 后,标准库提供了非常方便的类 std::thread 用来创建线程对象。
std::thread 又是如何使用?把上面的例子重写一遍:
c 代码解读复制代码#include
#include
#include
void compute(int a, int b, int **res) {
int *sum = new int;
*sum = a + b;
// 模拟耗时
std::this_thread::sleep_for(std::chrono::seconds(1));
if (nullptr == res) {
std::cerr << "can not transfer result" << std::endl;
} else {
*res = sum;
}
}
int main() {
int *ret_val = nullptr;
std::thread t(compute, 3, 4, &ret_val);
t.join();
if (ret_val != nullptr) {
std::cout << "get compute result:"
<< *ret_val
<< std::endl;
delete ret_val;
ret_val = nullptr;
}
return 0;
}
相比上一个使用系统 API 创建异步任务的做法,使用 std::thread 类要稍微简单一些,而且更灵活了,比如线程函数的参数变成了可变长参数。
但是,std::thread 提供的 join() 方法不能获取线程执行函数的返回值,所以需要利用其他方式获取异步计算结果。
为了获取异步计算的结果值,需要特意往子线程的执行函数传递缓存地址的指针 res,线程执行函数内会分配用于保存计算结果的堆缓存,堆缓存地址再通过 res 返回。
这样传递指针,虽然可以正常执行,但是会引入跨线程的数据共享问题,进而需要添加更多的同步措施。
当前的例子过于简单,没有过多的数据交互,仅需要在异步线程结束之后才读取缓存结果即可。如果面对多个线程竞争数据读写,那么编写的过程又会变得麻烦。
有没有更方便的特性可供使用?
std::promise 和 std::future
为了方便异步编程获取计算结果,C++ 11 同样提供了 std::promise 和 std::future 配合使用。
std::promise 是一个模板类,顾名思义就是承诺,用途是在异步任务中一旦计算完成,利用它履行赋值的承诺,通过 std::promise 对象的 set_value() 实现。
std::future 也是一个模板类,对象存储的是在将来会被赋值的结果。std::future 也提供了方法用于获取结果,比如 get(),在结果就绪之前,该方法会提供阻塞机制。结果是由 std::promise 对象提供,所以它俩是关联的。我们无需自己创建 std::future 对象,可通过 std::promise 对象的 get_future() 获取。
看到这里,你可能会瞬间感叹:标准库还有这等利器!
利器用上,赶紧把上一个例子重新实现一下,看效果:
c 代码解读复制代码#include
#include
#include
#include
#include
void compute(int a, int b, std::promise<int> promise_) {
int sum = a + b;
// 模拟耗时
std::this_thread::sleep_for(std::chrono::seconds(1));
promise_.set_value(sum);
}
int main() {
std::promise<int> promise_;
std::future<int> future_ = promise_.get_future();
std::thread t(compute, 3, 4, std::move(promise_));
std::cout << "get compute result:"
<< future_.get()
<< std::endl;
t.join();
return 0;
}
在创建子线程之前,实例化 std::promise 的对象 promise_,利用 std::promise 对象的 get_future() 方法获取 std::promise 对象协助创建的 std::future 对象 future_。
创建 std::thread 对象时,在传入线程函数时,附带传入的函数参数中需要包括 promise_ 对象。我这里使用 std::move 将主线程中的 promise_ 对象转化为可移动的值,这样主线程中 main 函数的 promise_ 对象资源和状态都会被转移到线程函数中的新 promise_ 对象,包括和 future_ 对象的联系。
在线程函数中,一旦计算完成,只需要调用 promise_ 对象的 set_value() 方法,就可将结果赋值给 future_ 对象。
在主线程中,future_ 对象调用 get() 可返回被赋值的计算结果,如果计算结果未就绪,get() 会等待并阻塞当前线程。
std::promise::set_value 和 std::future::get() 仅能被调用一次,否则会抛出异常 std::future_error。如果你需要 get() 多次,可以改为使用 std::shared_future。
从上面的代码来看,std::promise 和 std::future 的使用减少了对线程间的数据同步问题的关心,大大简化互动过程。
但是,如果你细心看上面的示例代码,有个问题:为什么 std::thread 的 join() 必须在 future_.get() 之后才执行?如果我把它们的调用顺序反过来,等待任务线程结束并返回后,主线程才去读取 future_ 的值呢?
c 代码解读复制代码int main() {
std::promise<int> promise_;
std::future<int> future_ = promise_.get_future();
std::thread t(compute, 3, 4, std::move(promise_));
t.join();
std::cout << "get compute result:"
<< future_.get()
<< std::endl;
// t.join();
return 0;
}
编译输出:
sql 代码解读复制代码get compute result:7
编译过程没有报错,计算结果也正常。
任务线程结束并返回后,和 future_ 对象关联的 promise_ 对象已经被释放和失效,可是这时 future_ 对象还能正常返回原来 promise_ 对象赋值的结果,说明 future_ 对象虽然由 promise_ 对象创建,但是内部资源不受 promise_ 对象同步释放。
上文到此为止,欢迎关注我!
评论记录:
回复评论: