ultimatepp/bazaar/WorkQueue/WorkQueue.cpp

178 lines
No EOL
3.8 KiB
C++

#include "WorkQueue.h"
namespace MyWQ
{
#ifdef _MULTITHREADED
#define LLOG(x) // LOG(x)
#define LDUMP(x) // DUMP(x)
WorkQueue::Pool& WorkQueue::pool()
{
// static Pool pool;
return _pool;
}
WorkQueue::Pool::Pool(int threadnr)
: jobcount(0), lastjob(_jobs.GetPtr())
{
LLOG("WorkQueue INIT with threadnr " << threadnr);
if(threadnr < 0)
threadnr = CPU_Cores() + 2;
for(int i = 0; i < threadnr; i++)
threads.Add().Run(THISBACK1(ThreadRun, i));
}
WorkQueue::Pool::~Pool()
{
LLOG("Quit");
//MJob job;
//job.work = NULL;
lock.Enter();
//jobs.Clear();
_jobs.DeleteList();
jobcount = 0;
//jobs.Add(job);
MJob & job = NextFree();
lock.Leave();
for(int i = 0; i < threads.GetCount(); i++)
waitforjob.Release();
for(int i = 0; i < threads.GetCount(); i++)
threads[i].Wait();
LLOG("Quit ended");
}
bool WorkQueue::Pool::DoJob()
{
MJob * list = _jobs.GetPtr();
MJob * e = list->GetNext();
ASSERT(e != list);
//if(jobs.Top().work == NULL) {
if(e->work == NULL) {
LLOG("Quit thread");
return true;
}
//MJob job = jobs.Pop(); //already done above
e->Unlink();
--jobcount;
if(e == lastjob)
lastjob = list; //so we knoe where to insert next
MJob & job = *e;
lock.Leave();
job.cb();
lock.Enter();
e->LinkBefore(list); //part of pop
if(--job.work->todo <= 0) {
LLOG("Releasing waitforfinish of (WorkQueue " << FormatIntHex(job.work) << ")");
job.work->waitforfinish.Release(); //multiple call, but semaphore cant become negative anyway
}
LLOG("Finished, remaining todo " << job.work->todo << " (WorkQueue " << FormatIntHex(job.work) << ")");
return false;
}
void WorkQueue::Pool::ThreadRun(int tno)
{
LLOG("WorkQueue thread #" << tno << " started");
lock.Enter();
for(;;) {
// while(jobs.GetCount() <= 0) {
while(jobcount <= 0) {
waiting_threads++;
lock.Leave();
LLOG("#" << tno << " Waiting for job");
waitforjob.Wait();
LLOG("#" << tno << " Waiting ended");
lock.Enter();
}
LLOG("#" << tno << " Job acquired");
if(DoJob())
break;
LLOG("#" << tno << " Job finished");
}
lock.Leave();
LLOG("WorkQueue thread #" << tno << " finished");
}
WorkQueue::MJob & WorkQueue::Pool::NextFree()
{
MJob *list = _jobs.GetPtr();
MJob *e = list->GetNext();
if(e != list && e->work == NULL) //as long as not end
{
//found a free one
e->Unlink();
e->LinkAfter(lastjob); //list
}
else
{
//need one more, either end or used
//e = list->InsertNext();
e = lastjob->InsertNext();
lastjob = e;
}
++jobcount;
return *e;
}
void WorkQueue::Do(Callback cb) {
Pool& p = pool();
p.lock.Enter();
// if(p.jobs.GetCount() > 128) {
if(p.jobcount > 128) {
LLOG("Stack full: running in the main thread");
p.lock.Leave();
cb();
return;
}
//MJob job;
MJob & job = p.NextFree();
job.cb = cb;
job.work = this;
//p.jobs.Add(job); //already done
todo++;
LLOG("Adding job; todo: " << todo << " (WorkQueue " << FormatIntHex(this) << ")");
if(p.waiting_threads>0) {
LLOG("Releasing thread waiting for job: " << p.waiting_threads);
p.waiting_threads--;
p.waitforjob.Release();
}
p.lock.Leave();
}
void WorkQueue::Finish() {
#ifdef _MULTITHREADED
Pool &p = pool();
p.lock.Enter();
while(todo>0) {
LLOG("Finish: todo: " << todo << " (WorkQueue " << FormatIntHex(this) << ")");
if(p.jobcount>0)
p.DoJob();
else {
p.lock.Leave();
LLOG("WaitForFinish (WorkQueue " << FormatIntHex(this) << ")");
waitforfinish.Wait();
p.lock.Enter();
}
}
p.lock.Leave();
LLOG("WorkQueue finished");
#endif
}
WorkQueue::WorkQueue()
: _pool(1)
{
LLOG("*** WorkQueue constructed " << FormatHex(this));
todo = 0;
}
WorkQueue::~WorkQueue()
{
Finish();
LLOG("~~~ WorkQueue destructed");
}
#endif
} //namespace