Core: CoWork refactored

git-svn-id: svn://ultimatepp.org/upp/trunk@10151 f0d560ea-af0d-0410-9eb7-867de7ffcac7
This commit is contained in:
cxl 2016-08-07 09:32:35 +00:00
parent 93af3f371b
commit 2547ae8569
7 changed files with 331 additions and 2 deletions

View file

@ -416,7 +416,9 @@ void AppInit__(int argc, const char **argv)
void AppExit__()
{
#ifdef _MULTITHREADED
#ifndef COWORK2
Thread::ShutdownThreads();
#endif
#endif
sMainRunning = false;
#ifdef PLATFORM_POSIX

View file

@ -2,6 +2,8 @@
NAMESPACE_UPP
#ifndef COWORK2
#ifdef _MULTITHREADED
#define LLOG(x) // DLOG(x)
@ -252,4 +254,6 @@ CoWork::~CoWork()
#endif
#endif
END_UPP_NAMESPACE

View file

@ -1,5 +1,83 @@
#ifdef _MULTITHREADED
#define COWORK2
#ifdef COWORK2
class CoWork : NoCopy {
struct MJob : Moveable<MJob>, Link<MJob, 2> {
Function<void ()> fn;
CoWork *work = NULL;
};
enum { SCHEDULED_MAX = 2048 };
public:
struct Pool {
MJob *free;
Link<MJob, 2> jobs;
MJob slot[SCHEDULED_MAX];
int waiting_threads;
Array<Thread> threads;
bool quit;
Mutex lock;
ConditionVariable waitforjob;
void Free(MJob& m);
void DoJob(MJob& m);
void PushJob(Function<void ()>&& fn, CoWork *work);
Pool();
~Pool();
static thread__ bool finlock;
bool DoJob();
static void ThreadRun(int tno);
};
friend struct Pool;
static Pool& GetPool();
static thread_local bool is_worker;
// byte magic[sizeof(ConditionVariable)];
ConditionVariable waitforfinish;
Link<MJob, 2> jobs;
int todo;
Mutex stepmutex;
Array<BiVector<Function<void ()>>> step;
Vector<bool> steprunning;
public:
static void Start(Function<void ()>&& fn);
void Do(Function<void ()>&& fn);
void Do(const Function<void ()>& fn) { Do(clone(fn)); }
CoWork& operator&(const Function<void ()>& fn) { Do(fn); return *this; }
CoWork& operator&(Function<void ()>&& fn) { Do(pick(fn)); return *this; }
void Pipe(int stepi, Function<void ()>&& lambda); // experimental
static void FinLock();
void Finish();
bool IsFinished();
static bool IsWorker() { return is_worker; }
// static void StartPool(int n);
// static void ShutdownPool();
CoWork();
~CoWork();
};
#else
class CoWork : NoCopy {
typedef StaticCriticalSection Lock;
@ -74,6 +152,8 @@ public:
~CoWork();
};
#endif
#else
class CoWork : NoCopy {

238
uppsrc/Core/CoWork2.cpp Normal file
View file

@ -0,0 +1,238 @@
#include "Core.h"
#ifdef COWORK2
#include "Core.h"
NAMESPACE_UPP
#ifdef _MULTITHREADED
#define LLOG(x) DLOG(x)
#define LDUMP(x) DDUMP(x)
#define LHITCOUNT(x) // RHITCOUNT(x)
thread_local bool CoWork::Pool::finlock;
thread_local bool CoWork::is_worker;
CoWork::Pool& CoWork::GetPool()
{
static CoWork::Pool pool;
return pool;
}
void CoWork::Pool::Free(MJob& job)
{
job.link_next[0] = free;
free = &job;
}
CoWork::Pool::Pool()
{
ASSERT(!is_worker);
int nthreads = CPU_Cores() + 2;
LLOG("CoWork INIT pool: " << nthreads);
for(int i = 0; i < nthreads; i++)
threads.Add().Run([=] { is_worker = true; ThreadRun(i); });
free = NULL;
for(int i = 0; i < SCHEDULED_MAX; i++)
Free(slot[i]);
quit = false;
}
CoWork::Pool::~Pool()
{
ASSERT(!IsWorker());
LLOG("Quit");
lock.Enter();
quit = true;
lock.Leave();
for(int i = 0; i < threads.GetCount(); i++) {
waitforjob.Broadcast();
threads[i].Wait();
}
for(int i = 0; i < SCHEDULED_MAX; i++)
slot[i].LinkSelf();
LLOG("Quit ended");
}
void CoWork::FinLock()
{
Pool::finlock = true;
GetPool().lock.Enter();
}
void CoWork::Pool::DoJob(MJob& job)
{
job.UnlinkAll();
LLOG("DoJob (CoWork " << FormatIntHex(job.work) << ")");
finlock = false;
Function<void ()> fn = pick(job.fn);
Free(job);
CoWork *work = job.work;
lock.Leave();
fn();
if(!finlock)
lock.Enter();
if(!work)
return;
if(--work->todo == 0) {
LLOG("Releasing waitforfinish of (CoWork " << FormatIntHex(work) << ")");
work->waitforfinish.Signal();
}
LLOG("DoJobA, todo: " << work->todo << " (CoWork " << FormatIntHex(work) << ")");
ASSERT(work->todo >= 0);
LLOG("Finished, remaining todo " << work->todo);
}
void CoWork::Pool::ThreadRun(int tno)
{
LLOG("CoWork thread #" << tno << " started");
Pool& p = GetPool();
p.lock.Enter();
for(;;) {
while(!p.jobs.InList()) {
LHITCOUNT("CoWork: Parking thread to Wait");
p.waiting_threads++;
LLOG("#" << tno << " Waiting for job");
p.waitforjob.Wait(p.lock);
LLOG("#" << tno << " Waiting ended");
p.waiting_threads--;
if(p.quit) {
p.lock.Leave();
return;
}
}
LLOG("#" << tno << " Job acquired");
LHITCOUNT("CoWork: Running new job");
p.DoJob(*p.jobs.GetNext());
LLOG("#" << tno << " Job finished");
}
p.lock.Leave();
LLOG("CoWork thread #" << tno << " finished");
}
void CoWork::Pool::PushJob(Function<void ()>&& fn, CoWork *work)
{
ASSERT(free);
MJob& job = *free;
free = job.link_next[0];
job.LinkAfter(&jobs);
if(work)
job.LinkAfter(&work->jobs, 1);
job.fn = pick(fn);
job.work = work;
LLOG("Adding job");
if(waiting_threads) {
LLOG("Releasing thread waiting for job, waiting threads: " << waiting_threads);
waitforjob.Signal();
}
}
void CoWork::Start(Function<void ()>&& fn)
{
Pool& p = GetPool();
Mutex::Lock __(p.lock);
while(!p.free) { // this is quite unlikely, so we can be ugly here
p.lock.Leave();
Sleep(0);
p.lock.Enter();
}
p.PushJob(pick(fn), NULL);
}
void CoWork::Do(Function<void ()>&& fn)
{
LHITCOUNT("CoWork: Sheduling callback");
Pool& p = GetPool();
p.lock.Enter();
if(!p.free) {
LLOG("Stack full: running in the originating thread");
LHITCOUNT("CoWork: Stack full: Running in originating thread");
p.lock.Leave();
fn();
if(Pool::finlock)
p.lock.Leave();
return;
}
p.PushJob(pick(fn), this);
todo++;
p.lock.Leave();
}
void CoWork::Finish() {
Pool& p = GetPool();
p.lock.Enter();
while(!jobs.IsEmpty(1)) {
LLOG("Finish: todo: " << todo << " (CoWork " << FormatIntHex(this) << ")");
p.DoJob(*jobs.GetNext(1));
}
while(todo) {
LLOG("WaitForFinish (CoWork " << FormatIntHex(this) << ")");
waitforfinish.Wait(p.lock);
}
p.lock.Leave();
LLOG("CoWork " << FormatIntHex(this) << " finished");
}
bool CoWork::IsFinished()
{
Pool& p = GetPool();
p.lock.Enter();
bool b = jobs.IsEmpty(1);
p.lock.Leave();
return b;
}
void CoWork::Pipe(int stepi, Function<void ()>&& fn)
{
Mutex::Lock __(stepmutex);
auto& q = step.At(stepi);
LLOG("Step " << stepi << ", count: " << q.GetCount() << ", running: " << steprunning.GetCount());
q.AddHead(pick(fn));
if(!steprunning.At(stepi, false)) {
steprunning.At(stepi) = true;
*this & [=]() {
LLOG("Starting step " << stepi << " processor");
stepmutex.Enter();
for(;;) {
Function<void ()> f;
auto& q = step[stepi];
LLOG("StepWork " << stepi << ", todo:" << q.GetCount());
if(q.GetCount() == 0)
break;
f = pick(q.Tail());
q.DropTail();
stepmutex.Leave();
f();
stepmutex.Enter();
}
steprunning.At(stepi) = false;
stepmutex.Leave();
LLOG("Exiting step " << stepi << " processor");
};
}
}
CoWork::CoWork()
{
LLOG("CoWork constructed " << FormatHex(this));
todo = 0;
}
CoWork::~CoWork()
{
Finish();
LLOG("~CoWork " << FormatIntHex(this));
}
#endif
END_UPP_NAMESPACE
#endif

View file

@ -161,6 +161,7 @@ file
Topic.cpp,
CoWork.h,
CoWork.cpp,
CoWork2.cpp,
Hash.h,
MD5.cpp,
SHA1.cpp,

View file

@ -88,7 +88,9 @@ sThreadRoutine(void *arg)
delete cb;
if(sExit)
(*sExit)();
#ifndef COWORK2
CoWork::ShutdownPool();
#endif
#ifdef UPP_HEAP
MemoryFreeThread();
#endif
@ -208,7 +210,9 @@ void Thread::EndShutdownThreads()
void Thread::ShutdownThreads()
{
#ifndef COWORK2
CoWork::ShutdownPool();
#endif
BeginShutdownThreads();
while(GetCount())
Sleep(100);

View file

@ -227,11 +227,11 @@ void Mitor<T>::Shrink()
//#
template <class T, int N = 1>
class Link {
protected:
struct Link {
T *link_prev[N];
T *link_next[N];
protected:
void LPN(int i) { link_prev[i]->link_next[i] = link_next[i]->link_prev[i] = (T *)this; }
public: