类的功能
-
task (任务基类)
该类主要实现一个任务类
virtual int dowork() = 0; -
taskqueue (任务队列)
该类主要针对任务的存储、删除、撤回等状态做管理 -
threadpool (线程池)
整个线程池的核心业务处理类
代码
- task.h
//任务的基类
#pragma once
#include
#include
//任务的基类
class task
{
public:
//构造、析构函数
task():_id(_nrequestid ),_iscancelrequired(false),_createtime(clock()){}
~task(){};
// 任务类虚接口,继承这个类的必须要实现这个接口
virtual int dowork(void) = 0;
// 任务已取消回调
virtual int oncanceled(void)
{
return 1;
}
// 任务已完成
virtual int oncompleted(int)
{
return 1;
}
// 任务是否超时
virtual bool istimeout(const clock_t& now)
{
return ((now - _createtime) > 5000);
}
// 获取任务id
size_t getid(void)
{
return _id;
}
//获取任务取消状态
bool iscancelrequired(void)
{
return _iscancelrequired;
}
//设置任务取消状态
void setcancelrequired(void)
{
_iscancelrequired = true;
}
protected:
size_t _id; //任务的唯一标识
clock_t _createtime; //任务创建时间,非unix时间戳
private:
static std::atomic _nrequestid;
std::atomic _iscancelrequired; //任务取消状态
};
//selectany可以让我们在.h文件中初始化一个全局变量而不是只能放在.cpp中。
//这样的代码来初始化这个全局变量。既是该.h被多次include,链接器也会为我们剔除多重定义的错误。
__declspec(selectany) std::atomic task::_nrequestid = 100000;
- taskqueue.h
#pragma once
#include
#include
#include
#include <unordered_map>
#include
#include
//任务队列
template
class taskqueue
{
public:
//向队列的末尾插入任务,task是任务类
void put_back(std::shared_ptr& task)
{
std::unique_lock lock(_mutexqueue);
_queue.push_back(task);
_conditput.notify_one();
}
//向队列的头部插入任务
void put_front(std::shared_ptr& task)
{
std::unique_lock lock(_mutexqueue);
_queue.push_front(task);
_conditput.notify_one();
}
//获取队首(并将任务加到运行任务列表中),返回tase是任务类
std::shared_ptr get(void) {
std::unique_lock lock(_mutexqueue);
if (_queue.empty())
return nullptr;
//lock_guard取代了mutex的lock()和unlock();
std::lock_guard lock_doing_task(_mutexdoingtask);
std::shared_ptr& task = _queue.front();
_mapdoingtask.insert(std::make_pair(task->getid(), task));
_queue.pop_front();
return task;
}
//获取双向链表queue的大小
size_t size(void)
{
std::unique_lock lock(_mutexqueue);
return _queue.size();
}
//释放队列
void release(void)
{
deletealltasks();
_conditput.notify_all();
}
//删除任务(从就绪队列删除,如果就绪队列没有,则看执行队列有没有,有的话置下取消状态位)
int deletetask(size_t nid)
{
std::unique_lock lock(_mutexqueue, std::defer_lock);
lock.lock();
auto it = _queue.begin();
for (; it != _queue.end(); it)
{
if ((*it)->getid() == nid)
{
_queue.erase(it);
lock.unlock();
return 0;
}
}
//下面的逻辑可能会造成死锁,这里要及时释放
lock.unlock();
// 试图取消正在执行的任务
{
std::lock_guard lock_doing_task(_mutexdoingtask);
auto it_map = _mapdoingtask.find(nid);
if (it_map != _mapdoingtask.end())
it_map->second->setcancelrequired();
}
//任务执行完后再返回
while (_mapdoingtask.count(nid))
std::this_thread::sleep_for(std::chrono::milliseconds(20));
return 0;
}
//删除所有任务
int deletealltasks(void)
{
std::unique_lock lock(_mutexqueue, std::defer_lock);
lock.lock();
if (!_queue.empty())
_queue.clear();//清空
{
std::lock_guard lock_doing_task(_mutexdoingtask);
if (!_mapdoingtask.empty())
{
auto it_map = _mapdoingtask.begin();
for (; it_map != _mapdoingtask.end(); it_map)
it_map->second->setcancelrequired();
}
}
lock.unlock();
//任务执行完后再返回
while (!_mapdoingtask.empty())
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return 0;
}
//任务完成回调(从运行列表中删除指定任务)
int ontaskfinished(size_t nid)
{
std::lock_guard lock_doing_task(_mutexdoingtask);
auto it_map = _mapdoingtask.find(nid);
if (it_map != _mapdoingtask.end())
_mapdoingtask.erase(it_map);
return 0;
}
//判断任务是否执行完毕
std::shared_ptr istaskprocessed(size_t nid)
{
std::lock_guard lock_queue(_mutexqueue);
auto it = _queue.begin();
for (; it != _queue.end(); it) {
if ((*it)->getid() == nid)
return *it;
}
std::lock_guard lock_doing_task(_mutexdoingtask);
auto it_map = _mapdoingtask.find(nid);
if (it_map != _mapdoingtask.end())
return it_map->second;
return nullptr;
}
//等待有任务到达(带超时:超时自动唤醒)
bool wait(std::chrono::milliseconds millsec)
{
std::unique_lock lock(_mutexconditput);
_conditput.wait_for(lock, millsec);
return true;
}
private:
//就绪的任务
std::mutex _mutexqueue;
std::deque> _queue;
//条件变量
std::mutex _mutexconditput;
std::condition_variable _conditput;
//运行的任务
std::mutex _mutexdoingtask;
std::unordered_map > _mapdoingtask;
};
- threadpool.h
#pragma once
#include
#include
#include
#include
#include
#include "task.h"
#include "taskqueue.h"
class threadpool
{
public:
// 线程池配置参数
typedef struct tagthreadpoolconfig {
int nmaxthreadsnum; // 最大线程数量
int nminthreadsnum; // 最小线程数量
double dbtaskaddthreadrate; // 增 最大线程任务比 (任务数量与线程数量,什么比例的时候才加)
double dbtasksubthreadrate; // 减 最小线程任务比 (任务数量与线程数量,什么比例的时候才减)
} threadpoolconfig;
public:
//构造函数
threadpool(void):_taskqueue(new taskqueue()), _atccurtotalthrnum(0), _atcworking(true){}
//析构函数
~threadpool(void)
{
release();
}
//初始化资源
int init(const threadpoolconfig& threadpoolconfig) {
// 错误的设置
if (threadpoolconfig.dbtaskaddthreadrate < threadpoolconfig.dbtasksubthreadrate)
return 87;
_threadpoolconfig.nmaxthreadsnum = threadpoolconfig.nmaxthreadsnum;
_threadpoolconfig.nminthreadsnum = threadpoolconfig.nminthreadsnum;
_threadpoolconfig.dbtaskaddthreadrate = threadpoolconfig.dbtaskaddthreadrate;
_threadpoolconfig.dbtasksubthreadrate = threadpoolconfig.dbtasksubthreadrate;
int ret = 0;
// 创建线程池
if (_threadpoolconfig.nminthreadsnum > 0)
ret = addprothreads(_threadpoolconfig.nminthreadsnum);
return ret;
}
// 添加任务
int addtask(std::shared_ptr taskptr, bool priority=false)
{
const double& rate = getthreadtaskrate();
int ret = 0;
if (priority)
{
if (rate > 1000)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
_taskqueue->put_front(taskptr);
}
else
{
// 检测任务数量
if (rate > 100) {
taskptr->oncanceled();
return 298;
}
// 将任务推入队列
_taskqueue->put_back(taskptr);
}
// 检查是否要扩展线程
if (_atccurtotalthrnum < _threadpoolconfig.nmaxthreadsnum
&& rate > _threadpoolconfig.dbtaskaddthreadrate)
ret = addprothreads(1);
return ret;
}
// 删除任务(从就绪队列删除,如果就绪队列没有,则看执行队列有没有,有的话置下取消状态位)
int deletetask(size_t nid)
{
return _taskqueue->deletetask(nid);
}
// 删除所有任务
int deletealltasks(void)
{
return _taskqueue->deletealltasks();
}
std::shared_ptr istaskprocessed(size_t nid)
{
return _taskqueue->istaskprocessed(nid);
}
// 释放资源(释放线程池、释放任务队列)
bool release(void)
{
// 1、停止线程池。
// 2、清楚就绪队列。
// 3、等待执行队列为0
releasethreadpool();
_taskqueue->release();
int i = 0;
while (_atccurtotalthrnum != 0)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
// 异常等待
if (i == 10)
exit(23);
}
_atccurtotalthrnum = 0;
return true;
}
// 获取当前线程任务比
double getthreadtaskrate(void)
{
if (_atccurtotalthrnum != 0)
return _taskqueue->size() * 1.0 / _atccurtotalthrnum;
return 0;
}
// 当前线程是否需要结束
bool shouldend(void)
{
bool bflag = false;
double dbthreadtaskrate = getthreadtaskrate();
// 检查线程与任务比率
if (!_atcworking || _atccurtotalthrnum > _threadpoolconfig.nminthreadsnum
&& dbthreadtaskrate < _threadpoolconfig.dbtasksubthreadrate)
bflag = true;
return bflag;
}
// 释放线程池
bool releasethreadpool(void)
{
_threadpoolconfig.nminthreadsnum = 0;
_threadpoolconfig.dbtasksubthreadrate = 0;
_atcworking = false;
return true;
}
// 添加指定数量的处理线程
int addprothreads(int nthreadsnum)
{
try {
for (; nthreadsnum > 0; --nthreadsnum)
std::thread(&threadpool::taskprocessthread, this).detach();
}
catch (...){
return 155;
}
return 0;
}
// 任务处理线程函数
void taskprocessthread(void)
{
int ntaskprocret = 0;
// 线程增加
_atccurtotalthrnum.fetch_add(1);
std::chrono::milliseconds mills_sleep(500);
std::shared_ptr ptask;
while (_atcworking)
{
// 从任务队列中获取任务
ptask = _taskqueue->get(); //get会将任务添加到运行任务的map中去
if (ptask == nullptr)
{
if (shouldend())
break;
// 进入睡眠池
_taskqueue->wait(mills_sleep);
continue;
}
// 检测任务取消状态
if (ptask->iscancelrequired())
ptask->oncanceled();
else
// 处理任务
ptask->oncompleted(ptask->dowork());
// 从运行任务队列中移除任务
_taskqueue->ontaskfinished(ptask->getid());
// 判断线程是否需要结束
if (shouldend())
break;
}
// 线程个数减一
_atccurtotalthrnum.fetch_sub(1);
}
private:
std::shared_ptr > _taskqueue; //任务队列
threadpoolconfig _threadpoolconfig; //线程池配置
std::atomic _atcworking; //线程池是否被要求结束
std::atomic _atccurtotalthrnum; //当前线程个数
};
- funtask.h
#pragma once
#include
#include "task.h"
class functask:public task
{
public:
functask(std::function f) : _pf(f) {}
functask(void) : _pf(nullptr){}
virtual ~functask(){}
template
void asynbind(f(*f)(args...), args... args)
{
_pf = std::bind(f, args...);
}
virtual int dowork()
{
if (_pf == nullptr)
return 86;
return _pf();
}
private:
typedef std::function pvfunc;
pvfunc _pf;
};
- main.cpp
#pragma once
#include
#include
#include
#include
#include "threadpool.h"
#include "functask.h"
using namespace std;
int vfunction(void)
{
std::cout << __function__ << std::endl;
return 0;
}
int counter(int a,int b)
{
std::cout << a << ":" << b << std::endl;
return 0;
}
int main()
{
threadpool::threadpoolconfig threadpoolconfig;
threadpoolconfig.nmaxthreadsnum = 100;
threadpoolconfig.nminthreadsnum = 5;
threadpoolconfig.dbtaskaddthreadrate = 3;
threadpoolconfig.dbtasksubthreadrate = 0.5;
clock_t start = clock();
{
std::shared_ptr threadpool(new threadpool);
threadpool->init(threadpoolconfig);
int i = 1;
while (true)
{
/*std::shared_ptr request(new functask(vfunction));
threadpool->addtask(request);*/
std::shared_ptr request(new functask);
request->asynbind(counter, i , 1);
threadpool->addtask(request);
if (request->getid() == 110000) {
break;
}
}
threadpool->release();
}
clock_t finish = clock();
std::cout << "duration:" << finish - start << "ms" << std::endl;
cout << "main:thread" << endl;
return 0;
}