1. 背景
c 11中提供了对线程与条件变量的更好支持,对于写多线程程序方便了很多。
再看c 并发编程,记一下学习笔记。
2. c 11 提供的相关api
3.1 wait
wait用于无条件等待,其中predicate表示校验条件,可以避免假唤醒。
unconditional (1)
void wait (unique_lock& lck);
predicate (2)
template
void wait (unique_lock& lck, predicate pred);
3.2 wait for
wait_for可以指定超时时间,其中predicate表示校验条件,可以避免假唤醒。
unconditional (1)
template
cv_status wait_for (unique_lock& lck,
const chrono::duration& rel_time);
predicate (2)
template
bool wait_for (unique_lock& lck,
const chrono::duration& rel_time, predicate pred);
3. 线程安全队列示例(生产者与消费者模型)
一个生产者向队列中添加数据;多个消费者从队列中读取任务。
#include
#include
#include
#include
#include
#include
template
class threadsafe_queue
{
private:
std::mutex mut;
std::queue data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue(){ }
void push(t new_value) {
std::lock_guard lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
//无限等待
int pop(t& value) {
std::unique_lock lk(mut);
// (1) 带判断条件的wait, 条件不满足则继续等待; 满足则继续后续代码
data_cond.wait(lk,[this]{return !data_queue.empty();});
// (2)wait唤醒后需要再次判断, 避免假唤醒
//while(true){
// data_cond.wait(lk);
// if (data_queue.empty())
// continue;
// else
// break;
//}
value=data_queue.front();
data_queue.pop();
return 0;
}
//有限等待
int pop_with_timeout(t& value, int timeout) {
if (timeout < 0){
return this->pop(value);
}
std::unique_lock lk(mut);
//带超时, 带判断条件的wait
data_cond.wait_for(lk, std::chrono::milliseconds(timeout), [this] {
std::cout << "thread id: " << std::this_thread::get_id() << " wait..." << std::endl;
return !data_queue.empty();}
);
if (!data_queue.empty()){
value=data_queue.front();
data_queue.pop();
return 0;
}
return -1;
}
bool is_empty(){
std::lock_guard lk(mut);
return data_queue.empty();
}
};
template
void consume(threadsafe_queue &queue, bool &stop){
while(true){
if (stop && queue.is_empty()){ //结束条件
break;
}
int job_id = 0;
if (0 == queue.pop_with_timeout(job_id, 3)){
std::cout << "thread id: " << std::this_thread::get_id() << ", job:" << job_id << std::endl;
}
std::this_thread::sleep_for (std::chrono::milliseconds(5));
}
}
template
void product(threadsafe_queue &queue, bool &stop){
for (auto i = 0; i < 100; i){
queue.push(i);
std::this_thread::sleep_for (std::chrono::milliseconds(1));
}
stop = true; //设置结束条件
}
int main(){
threadsafe_queue my_queue;
bool stop_flag = false;
//生产者
std::thread prod(product, std::ref(my_queue), std::ref(stop_flag));
//消费者
std::vector cons;
for(auto i = 0; i < 5; i){
std::thread tmp = std::thread(consume, std::ref(my_queue), std::ref(stop_flag));
cons.emplace_back(std::move(tmp));
}
prod.join();
for(auto &t : cons){
t.join();
}
return 0;
}