在我们的工作中,我们经常需要异步执行一些任务,下面介绍的这个可伸缩多线程队列,可满足我们的需求。
主要有以下几个功能:
1、任务队列是多线程,许多任务可以异步进行,任务队列使用线程池来执行任务。
2、任务队列支持优先级,优先级高的任务优先执行(即使是后来添加的)
3、任务队列可以被暂停,但是用户还是可以添加任务,当任务队列被唤醒时,任务可以继续执行下去
4、在运行过程中,任务队列使用的线程池,用户可以自行增加和减少
大体框架主要由3个类构成
1、cjob,任务类,用户需要从该类派生来实现自身需要完成的任务
2、cjobexecuter,任务执行类,任务均由该类来调用执行,每一个类相当于对应一个线程
3、cmthreadedjobq,多线程任务队列,添加任务已经任务的分发均由该类完成,该类维护一个任务队列和一个完成队列的线程池。
类图如下:
该例子中,cjobexecuter和cmthreadjobq这两个类的调用关系是非常值得我们学习的,同时,cjob作为一个基类,子类派生可以实现不同的任务,可扩展性也不错。源代码解析如下:
job.h文件:
class cjob { public: cjob(); virtual ~cjob(); bool m_completed; //任务是否完成:true 完成,false 未完成 static long lastusedid; //最后的id //================================================================================================ //函数名: setpriority //函数描述: 设置任务优先级 //输入: [in] priority 优先级别 //输出: 无 //返回: 无 //================================================================================================ void setpriority(int priority); //================================================================================================ //函数名: getpriority //函数描述: 返回任务优先级 //输入: 无 //输出: 无 //返回: 任务优先级 //================================================================================================ int getpriority(); //================================================================================================ //函数名: getid //函数描述: 返回任务id //输入: 无 //输出: 无 //返回: 任务id //================================================================================================ long getid(); //================================================================================================ //函数名: setautodelete //函数描述: 设置完成任务后是否删除任务 //输入: [in] autodeleteflag //输出: 无 //返回: 无 //================================================================================================ void setautodelete(bool autodeleteflag = true); //================================================================================================ //函数名: autodelete //函数描述: 返回删除任务标记 //输入: 无 //输出: 无 //返回: 任务标记 //================================================================================================ bool autodelete(); //================================================================================================ //函数名: execute //函数描述: 任务真正工作的函数,纯虚函数,需要子类化实现 //输入: 无 //输出: 无 //返回: 任务id //================================================================================================ virtual void execute() = 0; private: long m_id; //任务id bool m_autodeleteflag; //是否自动删除任务标记,true 删除,false 不删除,默认为true int m_priority; //任务优先级,默认为5 };
job.cpp文件:
long cjob::lastusedid = 0; cjob::cjob() { this->m_id = interlockedincrement(&lastusedid); this->m_autodeleteflag = true; this->m_priority = 5; this->m_completed= false; } cjob::~cjob() { } bool cjob::autodelete() { return m_autodeleteflag; } void cjob::setautodelete(bool autodeleteflag) { m_autodeleteflag = autodeleteflag; } long cjob::getid() { return this->m_id; } int cjob::getpriority() { return this->m_priority; } void cjob::setpriority(int priority) { this->m_priority = priority; }
jobexecuter.h文件:
//一个对象对应一个线程,执行任务job class cjobexecuter { public: cjobexecuter(cmthreadedjobq *pjobq); virtual ~cjobexecuter(); //================================================================================================ //函数名: stop //函数描述: 停止执行任务 //输入: 无 //输出: 无 //返回: 无 //================================================================================================ void stop(); //================================================================================================ //函数名: execute //函数描述: 执行一个任务 //输入: [in] pjob 任务指针 //输出: 无 //返回: 无 //================================================================================================ void execute(cjob* pjob); static uint threadfunction(lpvoid pparam); //线程函数 cmthreadedjobq* m_pjobq; //指向线程任务队列指针 cjob* m_pjob2do; //指向正在执行任务的指针 int m_flag; //线程执行标记 cwinthread* m_pexecuterthread; //线程标识符 };
jobexecuter.cpp文件:
#define stop_working -1 #define keep_working 0 cjobexecuter::cjobexecuter(cmthreadedjobq *pjobq) { this->m_pjobq= pjobq; this->m_pexecuterthread= afxbeginthread(threadfunction,this); this->m_pjob2do = null; this->m_flag = keep_working; } cjobexecuter::~cjobexecuter() { if(this->m_pexecuterthread!= null ) { this->m_pexecuterthread->exitinstance(); delete m_pexecuterthread; } } uint cjobexecuter::threadfunction(lpvoid pparam) { cjobexecuter *pexecuter = (cjobexecuter *)pparam; pexecuter->m_flag = 1; ::sleep(1); csinglelock singlelock(&pexecuter->m_pjobq->m_cs); while(pexecuter->m_flag !=stop_working ) { if(pexecuter->m_pjob2do!= null) { pexecuter->m_pjob2do->execute(); pexecuter->m_pjob2do->m_completed = true; if(pexecuter->m_pjob2do->autodelete()) delete pexecuter->m_pjob2do; pexecuter->m_pjob2do = null; } if(pexecuter->m_pjobq == null) break; csinglelock singlelock(&pexecuter->m_pjobq->m_cs); singlelock.lock(); if(pexecuter->m_pjobq->getnoofexecuter() > pexecuter->m_pjobq->getmaxnoofexecuter()) //cjobexecuter个数大于最大值,自动销毁 { pexecuter->stop(); singlelock.unlock(); } else { pexecuter->m_pjobq->addfreejobexecuter(pexecuter); //完成任务后,添加到cmthreadedjobq的空闲队列中 singlelock.unlock(); pexecuter->m_pjobq->m_pobserverthread->resumethread(); pexecuter->m_pexecuterthread->suspendthread(); } } if(pexecuter->m_pjobq != null) { pexecuter->m_pjobq->deletejobexecuter(pexecuter); } else { delete pexecuter; } return 0; } void cjobexecuter::execute(cjob* pjob) { this->m_pjob2do = pjob; ::sleep(0); this->m_pexecuterthread->resumethread(); } void cjobexecuter::stop() { this->m_flag = stop_working; this->m_pexecuterthread->resumethread(); }
mthreadedjobq.h文件:
typedef ctypedptrlist< cptrlist ,cjob*>cjobqlist; //线程池任务队列 class cmthreadedjobq { public: typedef struct thnode { cjobexecuter* pexecuter; thnode * pnext ; } thnode; cmthreadedjobq(); virtual ~cmthreadedjobq(); //================================================================================================ //函数名: deletejobexecuter //函数描述: 删除一个jobexecuter对象 //输入: [in] pex //输出: 无 //返回: 无 //================================================================================================ void deletejobexecuter(cjobexecuter *pex); //================================================================================================ //函数名: setmaxnoofexecuter //函数描述: 设置cjobexecuter的个数 //输入: [in] value //输出: 无 //返回: 无 //================================================================================================ void setmaxnoofexecuter(int value); //================================================================================================ //函数名: addjobexecuter //函数描述: 添加一个cjobexecuter //输入: [in] pex //输出: 无 //返回: 无 //================================================================================================ void addjobexecuter(cjobexecuter *pex); //================================================================================================ //函数名: getjobexecuter //函数描述: 返回一个cjobexecuter //输入: 无 //输出: 无 //返回: 处理任务的指针 //================================================================================================ cjobexecuter* getjobexecuter(); //================================================================================================ //函数名: addfreejobexecuter //函数描述: 添加一个cjobexecuter //输入: [in] pex //输出: 无 //返回: 无 //================================================================================================ void addfreejobexecuter(cjobexecuter *pex); //================================================================================================ //函数名: addjob //函数描述: 添加一个任务 //输入: [in] pjob //输出: 无 //返回: 无 //================================================================================================ void addjob(cjob *pjob); //================================================================================================ //函数名: getmaxnoofexecuter //函数描述: 获取cjobexecuter个数的最大值 //输入: 无 //输出: 无 //返回: 无 //================================================================================================ int getmaxnoofexecuter(); //================================================================================================ //函数名: getnoofexecuter //函数描述: 获取当前cjobexecuter的个数 //输入: 无 //输出: 无 //返回: 无 //================================================================================================ int getnoofexecuter(); static uint jobobserverthreadfunction(lpvoid); //================================================================================================ //函数名: pause //函数描述: 挂起jobobserverthread线程 //输入: 无 //输出: 无 //返回: 无 //================================================================================================ void pause(); //================================================================================================ //函数名: resume //函数描述: 唤醒jobobserverthread线程 //输入: 无 //输出: 无 //返回: 无 //================================================================================================ void resume(); cwinthread* m_pobserverthread; //向空闲的executer线程添加任务的线程 ccriticalsection m_cs; //关键代码段,用于互斥 cjobqlist m_jobqlist; //任务队列 private : bool m_pause; //jobobserverthread线程运行标记 int m_maxnoofexecuter; //cjobexecuter最大个数 int m_noofexecuter; //当前cjobexecuter个数 thnode* m_pfreeelist; //维护空闲处理任务线程的队列 thnode* m_pallelist; //维护所有处理任务线程的队列 };
mthreadedjobq.cpp文件:
cmthreadedjobq::cmthreadedjobq() { m_maxnoofexecuter = 2; m_pause = false; m_pobserverthread = afxbeginthread(jobobserverthreadfunction,this); m_pfreeelist =null; m_noofexecuter =0; m_pallelist = null; } cmthreadedjobq::~cmthreadedjobq() { thnode* ptempnode; while (m_pallelist != null) { ptempnode = m_pallelist->pnext; delete m_pallelist->pexecuter; delete m_pallelist; m_pallelist = ptempnode; } while (m_pfreeelist != null) { ptempnode = m_pfreeelist->pnext; delete m_pfreeelist; m_pfreeelist = ptempnode; } m_pobserverthread->exitinstance(); delete m_pobserverthread; } void cmthreadedjobq::pause() { this->m_pause = true; } void cmthreadedjobq::resume() { this->m_pause = false; this->m_pobserverthread->resumethread(); } uint cmthreadedjobq::jobobserverthreadfunction(lpvoid pparam) { cmthreadedjobq *pmtjq = (cmthreadedjobq *)pparam; cjobexecuter *pjexecuter; while(true) { sleep(100); if(pmtjq->m_pause != true) { while(!pmtjq->m_jobqlist.isempty() ) { pjexecuter = pmtjq->getjobexecuter(); if( pjexecuter!=null) { pmtjq->m_cs.lock(); pjexecuter->execute(pmtjq->m_jobqlist.gethead()); pmtjq->m_jobqlist.removehead(); afxgetapp()->m_pmainwnd->postmessage(refresh_list); pmtjq->m_cs.unlock(); } else { break; } if(pmtjq->m_pause == true) break; } } pmtjq->m_pobserverthread->suspendthread(); } return 0; } int cmthreadedjobq::getnoofexecuter() { return this->m_noofexecuter; } int cmthreadedjobq::getmaxnoofexecuter() { return this->m_maxnoofexecuter; } void cmthreadedjobq::addjob(cjob *pjob) { cjob * ptempjob; csinglelock slock(&this->m_cs); slock.lock(); position pos,lastpos; pos = this->m_jobqlist.getheadposition(); lastpos = pos; if(pos != null) ptempjob =this->m_jobqlist.gethead(); while(pos != null ) { if( pjob->getpriority() > ptempjob->getpriority()) break; lastpos = pos; ptempjob = this->m_jobqlist.getnext(pos); } if(pos == null) this->m_jobqlist.addtail(pjob); else this->m_jobqlist.insertbefore(lastpos,pjob); this->m_pobserverthread->resumethread(); slock.unlock(); } void cmthreadedjobq::addfreejobexecuter(cjobexecuter *pex) { m_cs.lock(); thnode* node = new thnode; node->pexecuter = pex; node->pnext = this->m_pfreeelist; this->m_pfreeelist = node; m_cs.unlock(); } cjobexecuter* cmthreadedjobq::getjobexecuter() { thnode *ptemp; cjobexecuter *pex=null; m_cs.lock(); if(this->m_pfreeelist != null) //有空闲cjobexecuter,就返回 { ptemp = this->m_pfreeelist; this->m_pfreeelist = this->m_pfreeelist->pnext; pex = ptemp->pexecuter; delete ptemp ; m_cs.unlock(); return pex; } if(this->m_noofexecuter < this->m_maxnoofexecuter) //没有空闲cjobexecuter,并且当前cjobexecuter小于最大值,就生成一个新的cjobexecuter { pex = new cjobexecuter(this); this->addjobexecuter(pex); this->m_noofexecuter ; m_cs.unlock(); return pex; } m_cs.unlock(); return null; } void cmthreadedjobq::addjobexecuter(cjobexecuter *pex) { m_cs.lock(); thnode* node = new thnode; node->pexecuter= pex; node->pnext = this->m_pallelist; this->m_pallelist = node; m_cs.unlock(); } void cmthreadedjobq::setmaxnoofexecuter(int value) { this->m_cs.lock(); if(value >1 && value <11) this->m_maxnoofexecuter = value; m_pobserverthread->resumethread(); this->m_cs.unlock(); } void cmthreadedjobq::deletejobexecuter(cjobexecuter *pex) { thnode* pnode,*pnodep; csinglelock singlelock(&m_cs); singlelock.lock(); if(this->m_pallelist != null) { pnode = this->m_pallelist; if(pnode->pexecuter == pex ) { this->m_pallelist = pnode->pnext; delete pnode; } else { pnodep =pnode; pnode = pnode->pnext ; while(pnode != null ) { if(pnode->pexecuter== pex ) break; pnodep = pnode; pnode = pnode->pnext ; } if(pnode!= null) { pnodep->pnext = pnode->pnext; delete pnode; } } } this->m_noofexecuter--; singlelock.unlock(); pex->stop(); sleep(1); delete pex; }
以上,就是该可伸缩多线程任务的主体框架,当我们工作需要实现类似这样的需要:异步执行多个不同的任务时,这个例子就是一个很好的参考例子,我研究这些代码只是为了让我在遇到这种问题的时候,可以有一个思路去思考,而不至于无从下手,仅此而已。