diff --git a/uppsrc/Core/App.cpp b/uppsrc/Core/App.cpp
index 297713d24..5ffbace2f 100644
--- a/uppsrc/Core/App.cpp
+++ b/uppsrc/Core/App.cpp
@@ -416,7 +416,9 @@ void AppInit__(int argc, const char **argv)
 void AppExit__()
 {
 #ifdef _MULTITHREADED
+#ifndef COWORK2
 	Thread::ShutdownThreads();
+#endif
 #endif
 	sMainRunning = false;
 #ifdef PLATFORM_POSIX
diff --git a/uppsrc/Core/CoWork.cpp b/uppsrc/Core/CoWork.cpp
index 302734301..f0f75250d 100644
--- a/uppsrc/Core/CoWork.cpp
+++ b/uppsrc/Core/CoWork.cpp
@@ -2,6 +2,8 @@
 
 NAMESPACE_UPP
 
+#ifndef COWORK2
+
 #ifdef _MULTITHREADED
 
 #define LLOG(x) // DLOG(x)
@@ -252,4 +254,6 @@ CoWork::~CoWork()
 
 #endif
 
+#endif
+
 END_UPP_NAMESPACE
diff --git a/uppsrc/Core/CoWork.h b/uppsrc/Core/CoWork.h
index 715c4c437..21c2f94a8 100644
--- a/uppsrc/Core/CoWork.h
+++ b/uppsrc/Core/CoWork.h
@@ -1,5 +1,86 @@
 #ifdef _MULTITHREADED
 
+#define COWORK2
+
+#ifdef COWORK2
+
+// Schedules jobs onto a shared pool of worker threads. Finish(), which the
+// destructor also calls, returns once every job scheduled through this
+// instance has completed.
+class CoWork : NoCopy {
+	struct MJob : Moveable<MJob>, Link<MJob, 2> { // link_next[0] doubles as the free-list chain; list 1 ties the job to its CoWork
+		Function<void ()> fn;
+		CoWork *work = NULL;
+	};
+
+	enum { SCHEDULED_MAX = 2048 };
+
+public:
+	struct Pool {
+		MJob *free; // head of the free-slot list
+		Link<MJob, 2> jobs; // queue of scheduled jobs
+		MJob slot[SCHEDULED_MAX];
+		int waiting_threads; // workers currently parked on waitforjob
+		Array<Thread> threads;
+		bool quit;
+
+		Mutex lock; // held while the pool or any CoWork's job bookkeeping is touched
+		ConditionVariable waitforjob;
+
+		void Free(MJob& m);
+		void DoJob(MJob& m);
+		void PushJob(Function<void ()>&& fn, CoWork *work);
+
+		Pool();
+		~Pool();
+
+		static thread_local bool finlock; // set by FinLock(): the job took 'lock' itself
+
+		bool DoJob();
+		static void ThreadRun(int tno);
+	};
+
+	friend struct Pool;
+
+	static Pool& GetPool();
+
+	static thread_local bool is_worker;
+
+//	byte magic[sizeof(ConditionVariable)];
+	ConditionVariable waitforfinish;
+	Link<MJob, 2> jobs; // this CoWork's queued jobs (list index 1)
+	int todo; // jobs scheduled by Do() that have not completed yet
+
+	Mutex stepmutex;
+	Array<BiVector<Function<void ()>>> step;
+	Vector<bool> steprunning;
+
+public:
+	static void Start(Function<void ()>&& fn);
+
+	void Do(Function<void ()>&& fn);
+	void Do(const Function<void ()>& fn) { Do(clone(fn)); }
+
+	CoWork& operator&(const Function<void ()>& fn) { Do(fn); return *this; }
+	CoWork& operator&(Function<void ()>&& fn) { Do(pick(fn)); return *this; }
+
+	void Pipe(int stepi, Function<void ()>&& lambda); // experimental
+
+	static void FinLock();
+
+	void Finish();
+
+	bool IsFinished();
+
+	static bool IsWorker() { return is_worker; }
+//	static void StartPool(int n);
+//	static void ShutdownPool();
+
+	CoWork();
+	~CoWork();
+};
+
+#else
 class CoWork : NoCopy {
 	typedef StaticCriticalSection Lock;
 
@@ -74,6 +155,8 @@ public:
 	~CoWork();
 };
 
+#endif
+
 #else
 
 class CoWork : NoCopy {
diff --git a/uppsrc/Core/CoWork2.cpp b/uppsrc/Core/CoWork2.cpp
new file mode 100644
index 000000000..1252db5fc
--- /dev/null
+++ b/uppsrc/Core/CoWork2.cpp
@@ -0,0 +1,265 @@
+#include "Core.h"
+
+#ifdef COWORK2
+
+NAMESPACE_UPP
+
+#ifdef _MULTITHREADED
+
+#define LLOG(x)      // DLOG(x)
+#define LDUMP(x)     // DDUMP(x)
+
+#define LHITCOUNT(x) // RHITCOUNT(x)
+
+thread_local bool CoWork::Pool::finlock;
+thread_local bool CoWork::is_worker;
+
+// Returns the pool shared by all CoWork instances; it is constructed on first use.
+CoWork::Pool& CoWork::GetPool()
+{
+	static CoWork::Pool pool;
+	return pool;
+}
+
+// Puts a job slot back on the free list, which is chained through link_next[0].
+void CoWork::Pool::Free(MJob& job)
+{
+	job.link_next[0] = free;
+	free = &job;
+}
+
+// Builds the free list of job slots and starts CPU_Cores() + 2 worker threads.
+CoWork::Pool::Pool()
+{
+	ASSERT(!is_worker);
+
+	// All shared state is set up before the first worker is launched.
+	quit = false;
+	waiting_threads = 0;
+	free = NULL;
+	for(int i = 0; i < SCHEDULED_MAX; i++)
+		Free(slot[i]);
+
+	int nthreads = CPU_Cores() + 2;
+	LLOG("CoWork INIT pool: " << nthreads);
+	for(int i = 0; i < nthreads; i++)
+		threads.Add().Run([=] { is_worker = true; ThreadRun(i); });
+}
+
+// Tells the workers to quit and waits for each of them to terminate.
+CoWork::Pool::~Pool()
+{
+	ASSERT(!IsWorker());
+	LLOG("Quit");
+	lock.Enter();
+	quit = true;
+	lock.Leave();
+	for(int i = 0; i < threads.GetCount(); i++) {
+		waitforjob.Broadcast();
+		threads[i].Wait();
+	}
+	// NOTE(review): presumably restores the links that Free() overwrote before the slots are destroyed
+	for(int i = 0; i < SCHEDULED_MAX; i++)
+		slot[i].LinkSelf();
+	LLOG("Quit ended");
+}
+
+// Called from inside a job: takes the pool lock and flags that it is held, so
+// DoJob() does not re-acquire it and Do() releases it when the job returns.
+void CoWork::FinLock()
+{
+	Pool::finlock = true;
+	GetPool().lock.Enter();
+}
+
+// Runs one job. Entered with the pool lock held; the lock is released around
+// the call of the job function and is held again when DoJob() returns.
+void CoWork::Pool::DoJob(MJob& job)
+{
+	job.UnlinkAll();
+	CoWork *work = job.work; // fetched before the slot is handed back to the free list
+	LLOG("DoJob (CoWork " << FormatIntHex(work) << ")");
+	finlock = false;
+	Function<void ()> fn = pick(job.fn);
+	Free(job);
+	lock.Leave();
+	fn();
+	if(!finlock)
+		lock.Enter();
+	if(!work)
+		return;
+	if(--work->todo == 0) {
+		LLOG("Releasing waitforfinish of (CoWork " << FormatIntHex(work) << ")");
+		work->waitforfinish.Signal();
+	}
+	LLOG("DoJobA, todo: " << work->todo << " (CoWork " << FormatIntHex(work) << ")");
+	ASSERT(work->todo >= 0);
+	LLOG("Finished, remaining todo " << work->todo);
+}
+
+// Worker thread body: runs queued jobs and parks on waitforjob while the queue
+// is empty; returns once the pool has set quit.
+void CoWork::Pool::ThreadRun(int tno)
+{
+	LLOG("CoWork thread #" << tno << " started");
+	Pool& p = GetPool();
+	p.lock.Enter();
+	for(;;) {
+		// quit is tested before parking too: a worker that was running a job while
+		// ~Pool() broadcast would otherwise park afterwards and never be woken
+		while(!p.quit && !p.jobs.InList()) {
+			LHITCOUNT("CoWork: Parking thread to Wait");
+			p.waiting_threads++;
+			LLOG("#" << tno << " Waiting for job");
+			p.waitforjob.Wait(p.lock);
+			LLOG("#" << tno << " Waiting ended");
+			p.waiting_threads--;
+		}
+		if(p.quit)
+			break;
+		LLOG("#" << tno << " Job acquired");
+		LHITCOUNT("CoWork: Running new job");
+		p.DoJob(*p.jobs.GetNext());
+		LLOG("#" << tno << " Job finished");
+	}
+	p.lock.Leave();
+	LLOG("CoWork thread #" << tno << " finished");
+}
+
+// Takes a slot off the free list and queues fn, linking it to 'work' as well
+// when given. Callers hold the pool lock and have checked that a slot is free.
+void CoWork::Pool::PushJob(Function<void ()>&& fn, CoWork *work)
+{
+	ASSERT(free);
+	MJob& job = *free;
+	free = job.link_next[0];
+	job.LinkAfter(&jobs);
+	if(work)
+		job.LinkAfter(&work->jobs, 1);
+	job.fn = pick(fn);
+	job.work = work;
+	LLOG("Adding job");
+	if(waiting_threads) {
+		LLOG("Releasing thread waiting for job, waiting threads: " << waiting_threads);
+		waitforjob.Signal();
+	}
+}
+
+// Queues fn as a job that belongs to no CoWork instance, waiting for a free slot if needed.
+void CoWork::Start(Function<void ()>&& fn)
+{
+	Pool& p = GetPool();
+	Mutex::Lock __(p.lock);
+	while(!p.free) { // this is quite unlikely, so we can be ugly here
+		p.lock.Leave();
+		Sleep(0);
+		p.lock.Enter();
+	}
+	p.PushJob(pick(fn), NULL);
+}
+
+// Schedules fn as a job of this CoWork; when no job slot is free, fn is run
+// right away in the calling thread instead.
+void CoWork::Do(Function<void ()>&& fn)
+{
+	LHITCOUNT("CoWork: Sheduling callback");
+	Pool& p = GetPool();
+	p.lock.Enter();
+	if(!p.free) {
+		LLOG("Stack full: running in the originating thread");
+		LHITCOUNT("CoWork: Stack full: Running in originating thread");
+		// finlock is per-thread and DoJob() leaves it set after a job that used
+		// FinLock(); clear it so that only a FinLock() made by fn is acted upon
+		bool outer = Pool::finlock;
+		Pool::finlock = false;
+		p.lock.Leave();
+		fn();
+		if(Pool::finlock)
+			p.lock.Leave();
+		Pool::finlock = outer; // keep the enclosing job's state intact
+		return;
+	}
+	p.PushJob(pick(fn), this);
+	todo++;
+	p.lock.Leave();
+}
+
+// Runs this CoWork's still-queued jobs in the calling thread, then waits until
+// jobs that other threads are running have completed (todo reaches zero).
+void CoWork::Finish() {
+	Pool& p = GetPool();
+	p.lock.Enter();
+	while(!jobs.IsEmpty(1)) {
+		LLOG("Finish: todo: " << todo << " (CoWork " << FormatIntHex(this) << ")");
+		p.DoJob(*jobs.GetNext(1));
+	}
+	while(todo) {
+		LLOG("WaitForFinish (CoWork " << FormatIntHex(this) << ")");
+		waitforfinish.Wait(p.lock);
+	}
+	p.lock.Leave();
+	LLOG("CoWork " << FormatIntHex(this) << " finished");
+}
+
+// True when no job of this CoWork is queued or still running.
+bool CoWork::IsFinished()
+{
+	Pool& p = GetPool();
+	p.lock.Enter();
+	// an empty job list is not enough: jobs already picked up may still be running
+	bool b = todo == 0;
+	p.lock.Leave();
+	return b;
+}
+
+// Experimental: queues fn for step 'stepi'; a single processor job per step
+// runs that step's functions one at a time (added at the head, taken from the tail).
+void CoWork::Pipe(int stepi, Function<void ()>&& fn)
+{
+	Mutex::Lock __(stepmutex);
+	auto& q = step.At(stepi);
+	LLOG("Step " << stepi << ", count: " << q.GetCount() << ", running: " << steprunning.GetCount());
+	q.AddHead(pick(fn));
+	if(!steprunning.At(stepi, false)) {
+		steprunning.At(stepi) = true;
+		*this & [=]() {
+			LLOG("Starting step " << stepi << " processor");
+			stepmutex.Enter();
+			for(;;) {
+				Function<void ()> f;
+				auto& q = step[stepi];
+				LLOG("StepWork " << stepi << ", todo:" << q.GetCount());
+				if(q.GetCount() == 0)
+					break;
+				f = pick(q.Tail());
+				q.DropTail();
+				stepmutex.Leave();
+				f();
+				stepmutex.Enter();
+			}
+			steprunning.At(stepi) = false;
+			stepmutex.Leave();
+			LLOG("Exiting step " << stepi << " processor");
+		};
+	}
+}
+
+CoWork::CoWork()
+{
+	LLOG("CoWork constructed " << FormatHex(this));
+	todo = 0;
+}
+
+// Waits for all jobs of this CoWork before the object goes away.
+CoWork::~CoWork()
+{
+	Finish();
+	LLOG("~CoWork " << FormatIntHex(this));
+}
+
+#endif
+
+END_UPP_NAMESPACE
+
+#endif
+
diff --git a/uppsrc/Core/Core.upp b/uppsrc/Core/Core.upp
index 907b6c093..5db210d34 100644
--- a/uppsrc/Core/Core.upp
+++ b/uppsrc/Core/Core.upp
@@ -161,6 +161,7 @@ file
 	Topic.cpp,
 	CoWork.h,
 	CoWork.cpp,
+	CoWork2.cpp,
 	Hash.h,
 	MD5.cpp,
 	SHA1.cpp,
diff --git a/uppsrc/Core/Mt.cpp b/uppsrc/Core/Mt.cpp
index 45dd50f7c..e3560e661 100644
--- a/uppsrc/Core/Mt.cpp
+++ b/uppsrc/Core/Mt.cpp
@@ -88,7 +88,9 @@ sThreadRoutine(void *arg)
 	delete cb;
 	if(sExit)
 		(*sExit)();
+#ifndef COWORK2
 	CoWork::ShutdownPool();
+#endif
 #ifdef UPP_HEAP
 	MemoryFreeThread();
 #endif
@@ -208,7 +210,9 @@ void Thread::EndShutdownThreads()
 
 void Thread::ShutdownThreads()
 {
+#ifndef COWORK2
 	CoWork::ShutdownPool();
+#endif
 	BeginShutdownThreads();
 	while(GetCount())
 		Sleep(100);
diff --git a/uppsrc/Core/Other.h b/uppsrc/Core/Other.h
index c25a39444..74b4dcb1d 100644
--- a/uppsrc/Core/Other.h
+++ b/uppsrc/Core/Other.h
@@ -227,11 +227,11 @@ void Mitor<T>::Shrink()
 
 //#
 template <class T, int N = 1>
-class Link {
-protected:
+struct Link {
 	T *link_prev[N];
 	T *link_next[N];
 
+protected:
 	void LPN(int i) { link_prev[i]->link_next[i] = link_next[i]->link_prev[i] = (T *)this; }
 
 public: