mirror of
https://github.com/ultimatepp/ultimatepp.git
synced 2026-05-16 06:05:58 -06:00
178 lines
No EOL
3.8 KiB
C++
178 lines
No EOL
3.8 KiB
C++
#include "WorkQueue.h"
|
|
|
|
namespace MyWQ
|
|
{
|
|
|
|
#ifdef _MULTITHREADED
|
|
|
|
#define LLOG(x) // LOG(x)
|
|
#define LDUMP(x) // DUMP(x)
|
|
|
|
// Returns the thread pool that serves this WorkQueue.
// The pool is the queue's own _pool member (constructed in WorkQueue's
// initializer list), not a process-wide static instance.
WorkQueue::Pool& WorkQueue::pool()
{
	Pool& own = _pool;
	return own;
}
|
|
|
|
// Creates the pool and starts its worker threads.
//
// threadnr: number of worker threads to start. A negative value selects
//           CPU_Cores() + 2. Zero starts no workers at all.
//
// The job list starts out empty, so lastjob (the insertion point used by
// NextFree) initially refers to the list head itself.
WorkQueue::Pool::Pool(int threadnr)
	: jobcount(0), lastjob(_jobs.GetPtr())
{
	LLOG("WorkQueue INIT with threadnr " << threadnr);
	// The workers started below increment waiting_threads as soon as they find
	// no job, and WorkQueue::Do() reads it. Nothing else in this file gives it
	// a starting value, so set it before the first thread can run.
	waiting_threads = 0;
	if(threadnr < 0)
		threadnr = CPU_Cores() + 2;
	for(int i = 0; i < threadnr; i++)
		threads.Add().Run(THISBACK1(ThreadRun, i));
}
|
|
|
|
// Shuts the pool down: discards every queued node, posts a single "quit"
// sentinel (a job whose work pointer is NULL), wakes the workers and waits
// for all of them to terminate.
//
// DoJob() returns true without unlinking the sentinel, so the one sentinel
// node is seen by every worker thread.
WorkQueue::Pool::~Pool()
{
	LLOG("Quit");
	lock.Enter();
	_jobs.DeleteList();
	jobcount = 0;
	// DeleteList() removed the nodes of the list, so lastjob may refer to a node
	// that no longer exists. Point it back at the list head before NextFree()
	// dereferences it.
	lastjob = _jobs.GetPtr();
	MJob& quit = NextFree();
	// Mark the sentinel explicitly instead of relying on how MJob is constructed.
	quit.work = NULL;
	lock.Leave();
	// One release per worker so that each of them leaves waitforjob.Wait().
	for(int i = 0; i < threads.GetCount(); i++)
		waitforjob.Release();
	for(int i = 0; i < threads.GetCount(); i++)
		threads[i].Wait();
	LLOG("Quit ended");
}
|
|
|
|
bool WorkQueue::Pool::DoJob()
|
|
{
|
|
MJob * list = _jobs.GetPtr();
|
|
MJob * e = list->GetNext();
|
|
ASSERT(e != list);
|
|
//if(jobs.Top().work == NULL) {
|
|
if(e->work == NULL) {
|
|
LLOG("Quit thread");
|
|
return true;
|
|
}
|
|
//MJob job = jobs.Pop(); //already done above
|
|
e->Unlink();
|
|
--jobcount;
|
|
if(e == lastjob)
|
|
lastjob = list; //so we knoe where to insert next
|
|
MJob & job = *e;
|
|
lock.Leave();
|
|
job.cb();
|
|
lock.Enter();
|
|
e->LinkBefore(list); //part of pop
|
|
if(--job.work->todo <= 0) {
|
|
LLOG("Releasing waitforfinish of (WorkQueue " << FormatIntHex(job.work) << ")");
|
|
job.work->waitforfinish.Release(); //multiple call, but semaphore cant become negative anyway
|
|
}
|
|
LLOG("Finished, remaining todo " << job.work->todo << " (WorkQueue " << FormatIntHex(job.work) << ")");
|
|
return false;
|
|
}
|
|
|
|
void WorkQueue::Pool::ThreadRun(int tno)
|
|
{
|
|
LLOG("WorkQueue thread #" << tno << " started");
|
|
lock.Enter();
|
|
for(;;) {
|
|
// while(jobs.GetCount() <= 0) {
|
|
while(jobcount <= 0) {
|
|
waiting_threads++;
|
|
lock.Leave();
|
|
LLOG("#" << tno << " Waiting for job");
|
|
waitforjob.Wait();
|
|
LLOG("#" << tno << " Waiting ended");
|
|
lock.Enter();
|
|
}
|
|
LLOG("#" << tno << " Job acquired");
|
|
if(DoJob())
|
|
break;
|
|
LLOG("#" << tno << " Job finished");
|
|
}
|
|
lock.Leave();
|
|
LLOG("WorkQueue thread #" << tno << " finished");
|
|
}
|
|
|
|
// Provides a job node placed right behind the most recently queued job and
// counts it as pending. The caller fills in cb and work afterwards.
//
// If the first node of the list is unused (work == NULL) it is recycled,
// otherwise a new node is inserted after lastjob. Both callers in this file
// invoke this with lock held.
//
// Returns a reference to the node; lastjob refers to it on return.
WorkQueue::MJob & WorkQueue::Pool::NextFree()
{
	MJob *list = _jobs.GetPtr();
	MJob *e = list->GetNext();
	if(e != list && e->work == NULL) {
		// Found a free node at the front: move it behind the last queued job.
		e->Unlink();
		e->LinkAfter(lastjob);
	}
	else {
		// List is empty or its first node is in use: need one more node.
		e = lastjob->InsertNext();
	}
	// Advance the insertion point in both cases. Previously only the allocation
	// branch did so, which let later jobs be inserted ahead of a recycled node.
	lastjob = e;
	++jobcount;
	return *e;
}
|
|
|
|
// Schedules cb for execution by the pool.
//
// When more than 128 jobs are already pending, cb is executed synchronously
// in the calling thread instead of being queued. Otherwise a job node is
// filled in, todo is incremented and, if a worker is recorded as waiting,
// one worker is woken through waitforjob.
void WorkQueue::Do(Callback cb) {
	Pool& workers = pool();
	workers.lock.Enter();
	const bool saturated = workers.jobcount > 128;
	if(saturated) {
		LLOG("Stack full: running in the main thread");
		workers.lock.Leave();
		cb();
		return;
	}
	// NextFree() has already linked the node into the list; only fill it in.
	MJob& slot = workers.NextFree();
	slot.cb = cb;
	slot.work = this;
	++todo;
	LLOG("Adding job; todo: " << todo << " (WorkQueue " << FormatIntHex(this) << ")");
	if(workers.waiting_threads > 0) {
		LLOG("Releasing thread waiting for job: " << workers.waiting_threads);
		--workers.waiting_threads;
		workers.waitforjob.Release();
	}
	workers.lock.Leave();
}
|
|
|
|
// Blocks until every job scheduled through this WorkQueue has completed
// (todo has dropped to zero).
//
// While jobs are still pending in the pool the calling thread executes them
// itself through DoJob(); once nothing is pending it waits on waitforfinish
// for the workers to complete the jobs they are running.
void WorkQueue::Finish() {
#ifdef _MULTITHREADED
	Pool& workers = pool();
	workers.lock.Enter();
	while(todo > 0) {
		LLOG("Finish: todo: " << todo << " (WorkQueue " << FormatIntHex(this) << ")");
		if(workers.jobcount <= 0) {
			// Nothing left to pick up; the remaining jobs are in progress elsewhere.
			workers.lock.Leave();
			LLOG("WaitForFinish (WorkQueue " << FormatIntHex(this) << ")");
			waitforfinish.Wait();
			workers.lock.Enter();
			continue;
		}
		workers.DoJob();
	}
	workers.lock.Leave();
	LLOG("WorkQueue finished");
#endif
}
|
|
|
|
// Constructs a work queue with no outstanding jobs.
// Its pool is constructed with a thread count of 1.
WorkQueue::WorkQueue()
	: _pool(1)
{
	todo = 0;
	LLOG("*** WorkQueue constructed " << FormatHex(this));
}
|
|
|
|
// Waits for all jobs scheduled through this queue to complete before the
// object goes away.
WorkQueue::~WorkQueue()
{
	this->Finish();
	LLOG("~~~ WorkQueue destructed");
}
|
|
|
|
#endif
|
|
|
|
} //namespace
|