From 89cdcc39cd37181deaca2b5c4336578dc045f463 Mon Sep 17 00:00:00 2001 From: cxl Date: Mon, 8 Aug 2016 07:31:41 +0000 Subject: [PATCH] Core: CoWork refactoring finished, RichText: Fixed issue with begin/infinite height text git-svn-id: svn://ultimatepp.org/upp/trunk@10153 f0d560ea-af0d-0410-9eb7-867de7ffcac7 --- uppsrc/Core/CoWork.cpp | 220 ++++++++++++------------- uppsrc/Core/CoWork.h | 92 +---------- uppsrc/Core/CoWork2.cpp | 238 --------------------------- uppsrc/Core/Core.upp | 1 - uppsrc/Core/Mt.cpp | 41 ++--- uppsrc/Core/Mt.h | 4 +- uppsrc/Core/src.tpp/CoWork$en-us.tpp | 60 ++++--- uppsrc/Core/src.tpp/Thread$en-us.tpp | 41 +++-- uppsrc/RichText/TxtPaint.cpp | 3 +- 9 files changed, 203 insertions(+), 497 deletions(-) delete mode 100644 uppsrc/Core/CoWork2.cpp diff --git a/uppsrc/Core/CoWork.cpp b/uppsrc/Core/CoWork.cpp index f0f75250d..304ea0bb8 100644 --- a/uppsrc/Core/CoWork.cpp +++ b/uppsrc/Core/CoWork.cpp @@ -2,8 +2,6 @@ NAMESPACE_UPP -#ifndef COWORK2 - #ifdef _MULTITHREADED #define LLOG(x) // DLOG(x) @@ -13,57 +11,61 @@ NAMESPACE_UPP thread_local bool CoWork::Pool::finlock; thread_local bool CoWork::is_worker; -thread_local CoWork::Pool *CoWork::pool; CoWork::Pool& CoWork::GetPool() { - return GetPool(CPU_Cores() + 2); + static CoWork::Pool pool; + return pool; } -CoWork::Pool& CoWork::GetPool(int nthreads) +void CoWork::Pool::Free(MJob& job) { - if(!pool) - pool = new Pool(nthreads); - return *pool; + job.link_next[0] = free; + free = &job; } -void CoWork::StartPool(int n) +void CoWork::Pool::InitThreads(int nthreads) { - if(!is_worker) { - ShutdownPool(); - GetPool(n); + LLOG("Pool::InitThreads: " << nthreads); + for(int i = 0; i < nthreads; i++) + threads.Add().Run([=] { is_worker = true; ThreadRun(i); }, true); +} + +void CoWork::Pool::ExitThreads() +{ + lock.Enter(); + quit = true; + lock.Leave(); + for(int i = 0; i < threads.GetCount(); i++) { + waitforjob.Broadcast(); + threads[i].Wait(); } + threads.Clear(); + lock.Enter(); + quit = false; + lock.Leave(); } -void CoWork::ShutdownPool() -{ - if(!is_worker && pool) { - delete pool; - pool = NULL; - } -} - -CoWork::Pool::Pool(int nthreads) +CoWork::Pool::Pool() { ASSERT(!is_worker); - LLOG("CoWork INIT pool: " << nthreads); - scheduled = 0; - for(int i = 0; i < nthreads; i++) - threads.Add().Run([=] { is_worker = true; pool = this; ThreadRun(i); }); + + InitThreads(CPU_Cores() + 2); + + free = NULL; + for(int i = 0; i < SCHEDULED_MAX; i++) + Free(slot[i]); + + quit = false; } CoWork::Pool::~Pool() { ASSERT(!IsWorker()); LLOG("Quit"); - lock.Enter(); - jobs[0].work = NULL; - jobs[0].started = NULL; - scheduled = 1; - lock.Leave(); - waitforjob.Broadcast(); - for(int i = 0; i < threads.GetCount(); i++) - threads[i].Wait(); + ExitThreads(); + for(int i = 0; i < SCHEDULED_MAX; i++) + slot[i].LinkSelf(); LLOG("Quit ended"); } @@ -73,37 +75,27 @@ void CoWork::FinLock() GetPool().lock.Enter(); } -bool CoWork::Pool::DoJob() +void CoWork::Pool::DoJob(MJob& job) { - Pool& p = GetPool(); - MJob& job = p.jobs[p.scheduled - 1]; - if(job.work == NULL && job.started == NULL) { - LLOG("Quit thread"); - return true; - } - LLOG("DoJob " << p.scheduled - 1 << " (CoWork " << FormatIntHex(job.work) << ")"); + job.UnlinkAll(); + LLOG("DoJob (CoWork " << FormatIntHex(job.work) << ")"); finlock = false; Function fn = pick(job.fn); + Free(job); CoWork *work = job.work; - p.scheduled--; - if(job.started) { - *job.started = true; - p.waitforstart.Broadcast(); - } - p.lock.Leave(); + lock.Leave(); fn(); if(!finlock) - p.lock.Enter(); + lock.Enter(); if(!work) - return false; + return; if(--work->todo == 0) { LLOG("Releasing waitforfinish of (CoWork " << FormatIntHex(work) << ")"); work->waitforfinish.Signal(); } - LLOG("DoJobA " << p.scheduled << ", todo: " << work->todo << " (CoWork " << FormatIntHex(work) << ")"); + LLOG("DoJobA, todo: " << work->todo << " (CoWork " << FormatIntHex(work) << ")"); ASSERT(work->todo >= 0); LLOG("Finished, remaining todo " << work->todo); - return false; } void CoWork::Pool::ThreadRun(int tno) @@ -112,52 +104,57 @@ void CoWork::Pool::ThreadRun(int tno) Pool& p = GetPool(); p.lock.Enter(); for(;;) { - while(p.scheduled == 0) { + while(!p.jobs.InList()) { LHITCOUNT("CoWork: Parking thread to Wait"); p.waiting_threads++; LLOG("#" << tno << " Waiting for job"); p.waitforjob.Wait(p.lock); LLOG("#" << tno << " Waiting ended"); + p.waiting_threads--; + if(p.quit) { + p.lock.Leave(); + return; + } } LLOG("#" << tno << " Job acquired"); LHITCOUNT("CoWork: Running new job"); - if(DoJob()) - break; + p.DoJob(*p.jobs.GetNext()); LLOG("#" << tno << " Job finished"); } p.lock.Leave(); LLOG("CoWork thread #" << tno << " finished"); } -void CoWork::Start(Function&& fn) +void CoWork::Pool::PushJob(Function&& fn, CoWork *work) { - bool started = false; - Pool& p = GetPool(); - Mutex::Lock __(p.lock); - while(p.scheduled >= SCHEDULED_MAX) { // this is quite unlikely, so we can be ugly here - p.lock.Leave(); - Sleep(0); - p.lock.Enter(); + ASSERT(free); + MJob& job = *free; + free = job.link_next[0]; + job.LinkAfter(&jobs); + if(work) + job.LinkAfter(&work->jobs, 1); + job.fn = pick(fn); + job.work = work; + LLOG("Adding job"); + if(waiting_threads) { + LLOG("Releasing thread waiting for job, waiting threads: " << waiting_threads); + waitforjob.Signal(); } - PushJob(pick(fn)).started = &started; - while(!started) - p.waitforstart.Wait(p.lock); } -CoWork::MJob& CoWork::PushJob(Function&& fn) +bool CoWork::TrySchedule(Function&& fn) { Pool& p = GetPool(); - MJob& job = p.jobs[p.scheduled++]; - job.fn = pick(fn); - job.work = NULL; - job.started = NULL; - LLOG("Adding job " << p.scheduled - 1); - if(p.waiting_threads) { - LLOG("Releasing thread waiting for job: " << p.waiting_threads); - p.waiting_threads--; - p.waitforjob.Signal(); - } - return job; + Mutex::Lock __(p.lock); + if(!p.free) + return false; + p.PushJob(pick(fn), NULL); + return true; +} + +void CoWork::Schedule(Function&& fn) +{ + while(!TrySchedule(pick(fn))) Sleep(0); } void CoWork::Do(Function&& fn) @@ -165,7 +162,7 @@ void CoWork::Do(Function&& fn) LHITCOUNT("CoWork: Sheduling callback"); Pool& p = GetPool(); p.lock.Enter(); - if(p.scheduled >= SCHEDULED_MAX) { + if(!p.free) { LLOG("Stack full: running in the originating thread"); LHITCOUNT("CoWork: Stack full: Running in originating thread"); p.lock.Leave(); @@ -174,11 +171,42 @@ void CoWork::Do(Function&& fn) p.lock.Leave(); return; } - PushJob(pick(fn)).work = this; + p.PushJob(pick(fn), this); todo++; p.lock.Leave(); } +void CoWork::Finish() { + Pool& p = GetPool(); + p.lock.Enter(); + while(!jobs.IsEmpty(1)) { + LLOG("Finish: todo: " << todo << " (CoWork " << FormatIntHex(this) << ")"); + p.DoJob(*jobs.GetNext(1)); + } + while(todo) { + LLOG("WaitForFinish (CoWork " << FormatIntHex(this) << ")"); + waitforfinish.Wait(p.lock); + } + p.lock.Leave(); + LLOG("CoWork " << FormatIntHex(this) << " finished"); +} + +bool CoWork::IsFinished() +{ + Pool& p = GetPool(); + p.lock.Enter(); + bool b = jobs.IsEmpty(1); + p.lock.Leave(); + return b; +} + +void CoWork::SetPoolSize(int n) +{ + Pool& p = GetPool(); + p.ExitThreads(); + p.InitThreads(n); +} + void CoWork::Pipe(int stepi, Function&& fn) { Mutex::Lock __(stepmutex); @@ -209,51 +237,19 @@ void CoWork::Pipe(int stepi, Function&& fn) } } -void CoWork::Finish() { - if(!pool) return; - Pool& p = *pool; - p.lock.Enter(); - while(todo) { - LLOG("Finish: todo: " << todo << " (CoWork " << FormatIntHex(this) << ")"); - if(todo == 0) - break; - if(p.scheduled) - Pool::DoJob(); - else { - LLOG("WaitForFinish (CoWork " << FormatIntHex(this) << ")"); - waitforfinish.Wait(p.lock); - } - } - p.lock.Leave(); - LLOG("CoWork " << FormatIntHex(this) << " finished"); -} - -bool CoWork::IsFinished() -{ - Pool& p = GetPool(); - p.lock.Enter(); - bool b = todo == 0; - p.lock.Leave(); - return b; -} - CoWork::CoWork() { LLOG("CoWork constructed " << FormatHex(this)); todo = 0; -// SetMagic(magic, sizeof(magic)); } CoWork::~CoWork() { -// CheckMagic(magic, sizeof(magic)); Finish(); -// CheckMagic(magic, sizeof(magic)); LLOG("~CoWork " << FormatIntHex(this)); } #endif -#endif - END_UPP_NAMESPACE + diff --git a/uppsrc/Core/CoWork.h b/uppsrc/Core/CoWork.h index 21c2f94a8..336a7e7e7 100644 --- a/uppsrc/Core/CoWork.h +++ b/uppsrc/Core/CoWork.h @@ -1,9 +1,5 @@ #ifdef _MULTITHREADED -#define COWORK2 - -#ifdef COWORK2 - class CoWork : NoCopy { struct MJob : Moveable, Link { Function fn; @@ -28,6 +24,9 @@ public: void DoJob(MJob& m); void PushJob(Function&& fn, CoWork *work); + void InitThreads(int nthreads); + void ExitThreads(); + Pool(); ~Pool(); @@ -53,7 +52,10 @@ public: Vector steprunning; public: - static void Start(Function&& fn); + static bool TrySchedule(Function&& fn); + static bool TrySchedule(const Function& fn) { return TrySchedule(clone(fn)); } + static void Schedule(Function&& fn); + static void Schedule(const Function& fn) { return Schedule(clone(fn)); } void Do(Function&& fn); void Do(const Function& fn) { Do(clone(fn)); } @@ -70,90 +72,12 @@ public: bool IsFinished(); static bool IsWorker() { return is_worker; } -// static void StartPool(int n); -// static void ShutdownPool(); + static void SetPoolSize(int n); CoWork(); ~CoWork(); }; -#else -class CoWork : NoCopy { - typedef StaticCriticalSection Lock; - - struct MJob : Moveable { - Function fn; - CoWork *work = NULL; - bool *started = NULL; - }; - - enum { SCHEDULED_MAX = 2048 }; - -public: - struct Pool { - int scheduled; - MJob jobs[SCHEDULED_MAX]; - int waiting_threads; - Array threads; - - Mutex lock; - ConditionVariable waitforjob; - ConditionVariable waitforstart; - - Pool(int nthreads); - ~Pool(); - - static thread__ bool finlock; - - static bool DoJob(); - static void ThreadRun(int tno); - }; - - friend struct Pool; - - static Pool& GetPool(int n); - static Pool& GetPool(); - - static thread_local bool is_worker; - static thread_local Pool *pool; - -// byte magic[sizeof(ConditionVariable)]; - ConditionVariable waitforfinish; - int todo; - - static MJob& PushJob(Function&& fn); - - Mutex stepmutex; - Array>> step; - Vector steprunning; - -public: - static void Start(Function&& fn); - - void Do(Function&& fn); - void Do(const Function& fn) { Do(clone(fn)); } - - CoWork& operator&(const Function& fn) { Do(fn); return *this; } - CoWork& operator&(Function&& fn) { Do(pick(fn)); return *this; } - - void Pipe(int stepi, Function&& lambda); // experimental - - static void FinLock(); - - void Finish(); - - bool IsFinished(); - - static bool IsWorker() { return is_worker; } - static void StartPool(int n); - static void ShutdownPool(); - - CoWork(); - ~CoWork(); -}; - -#endif - #else class CoWork : NoCopy { diff --git a/uppsrc/Core/CoWork2.cpp b/uppsrc/Core/CoWork2.cpp deleted file mode 100644 index 1252db5fc..000000000 --- a/uppsrc/Core/CoWork2.cpp +++ /dev/null @@ -1,238 +0,0 @@ -#include "Core.h" - -#ifdef COWORK2 - -#include "Core.h" - -NAMESPACE_UPP - -#ifdef _MULTITHREADED - -#define LLOG(x) DLOG(x) -#define LDUMP(x) DDUMP(x) - -#define LHITCOUNT(x) // RHITCOUNT(x) - -thread_local bool CoWork::Pool::finlock; -thread_local bool CoWork::is_worker; - -CoWork::Pool& CoWork::GetPool() -{ - static CoWork::Pool pool; - return pool; -} - -void CoWork::Pool::Free(MJob& job) -{ - job.link_next[0] = free; - free = &job; -} - -CoWork::Pool::Pool() -{ - ASSERT(!is_worker); - - int nthreads = CPU_Cores() + 2; - LLOG("CoWork INIT pool: " << nthreads); - for(int i = 0; i < nthreads; i++) - threads.Add().Run([=] { is_worker = true; ThreadRun(i); }); - - free = NULL; - for(int i = 0; i < SCHEDULED_MAX; i++) - Free(slot[i]); - - quit = false; -} - -CoWork::Pool::~Pool() -{ - ASSERT(!IsWorker()); - LLOG("Quit"); - lock.Enter(); - quit = true; - lock.Leave(); - for(int i = 0; i < threads.GetCount(); i++) { - waitforjob.Broadcast(); - threads[i].Wait(); - } - for(int i = 0; i < SCHEDULED_MAX; i++) - slot[i].LinkSelf(); - LLOG("Quit ended"); -} - -void CoWork::FinLock() -{ - Pool::finlock = true; - GetPool().lock.Enter(); -} - -void CoWork::Pool::DoJob(MJob& job) -{ - job.UnlinkAll(); - LLOG("DoJob (CoWork " << FormatIntHex(job.work) << ")"); - finlock = false; - Function fn = pick(job.fn); - Free(job); - CoWork *work = job.work; - lock.Leave(); - fn(); - if(!finlock) - lock.Enter(); - if(!work) - return; - if(--work->todo == 0) { - LLOG("Releasing waitforfinish of (CoWork " << FormatIntHex(work) << ")"); - work->waitforfinish.Signal(); - } - LLOG("DoJobA, todo: " << work->todo << " (CoWork " << FormatIntHex(work) << ")"); - ASSERT(work->todo >= 0); - LLOG("Finished, remaining todo " << work->todo); -} - -void CoWork::Pool::ThreadRun(int tno) -{ - LLOG("CoWork thread #" << tno << " started"); - Pool& p = GetPool(); - p.lock.Enter(); - for(;;) { - while(!p.jobs.InList()) { - LHITCOUNT("CoWork: Parking thread to Wait"); - p.waiting_threads++; - LLOG("#" << tno << " Waiting for job"); - p.waitforjob.Wait(p.lock); - LLOG("#" << tno << " Waiting ended"); - p.waiting_threads--; - if(p.quit) { - p.lock.Leave(); - return; - } - } - LLOG("#" << tno << " Job acquired"); - LHITCOUNT("CoWork: Running new job"); - p.DoJob(*p.jobs.GetNext()); - LLOG("#" << tno << " Job finished"); - } - p.lock.Leave(); - LLOG("CoWork thread #" << tno << " finished"); -} - -void CoWork::Pool::PushJob(Function&& fn, CoWork *work) -{ - ASSERT(free); - MJob& job = *free; - free = job.link_next[0]; - job.LinkAfter(&jobs); - if(work) - job.LinkAfter(&work->jobs, 1); - job.fn = pick(fn); - job.work = work; - LLOG("Adding job"); - if(waiting_threads) { - LLOG("Releasing thread waiting for job, waiting threads: " << waiting_threads); - waitforjob.Signal(); - } -} - -void CoWork::Start(Function&& fn) -{ - Pool& p = GetPool(); - Mutex::Lock __(p.lock); - while(!p.free) { // this is quite unlikely, so we can be ugly here - p.lock.Leave(); - Sleep(0); - p.lock.Enter(); - } - p.PushJob(pick(fn), NULL); -} - -void CoWork::Do(Function&& fn) -{ - LHITCOUNT("CoWork: Sheduling callback"); - Pool& p = GetPool(); - p.lock.Enter(); - if(!p.free) { - LLOG("Stack full: running in the originating thread"); - LHITCOUNT("CoWork: Stack full: Running in originating thread"); - p.lock.Leave(); - fn(); - if(Pool::finlock) - p.lock.Leave(); - return; - } - p.PushJob(pick(fn), this); - todo++; - p.lock.Leave(); -} - -void CoWork::Finish() { - Pool& p = GetPool(); - p.lock.Enter(); - while(!jobs.IsEmpty(1)) { - LLOG("Finish: todo: " << todo << " (CoWork " << FormatIntHex(this) << ")"); - p.DoJob(*jobs.GetNext(1)); - } - while(todo) { - LLOG("WaitForFinish (CoWork " << FormatIntHex(this) << ")"); - waitforfinish.Wait(p.lock); - } - p.lock.Leave(); - LLOG("CoWork " << FormatIntHex(this) << " finished"); -} - -bool CoWork::IsFinished() -{ - Pool& p = GetPool(); - p.lock.Enter(); - bool b = jobs.IsEmpty(1); - p.lock.Leave(); - return b; -} - -void CoWork::Pipe(int stepi, Function&& fn) -{ - Mutex::Lock __(stepmutex); - auto& q = step.At(stepi); - LLOG("Step " << stepi << ", count: " << q.GetCount() << ", running: " << steprunning.GetCount()); - q.AddHead(pick(fn)); - if(!steprunning.At(stepi, false)) { - steprunning.At(stepi) = true; - *this & [=]() { - LLOG("Starting step " << stepi << " processor"); - stepmutex.Enter(); - for(;;) { - Function f; - auto& q = step[stepi]; - LLOG("StepWork " << stepi << ", todo:" << q.GetCount()); - if(q.GetCount() == 0) - break; - f = pick(q.Tail()); - q.DropTail(); - stepmutex.Leave(); - f(); - stepmutex.Enter(); - } - steprunning.At(stepi) = false; - stepmutex.Leave(); - LLOG("Exiting step " << stepi << " processor"); - }; - } -} - -CoWork::CoWork() -{ - LLOG("CoWork constructed " << FormatHex(this)); - todo = 0; -} - -CoWork::~CoWork() -{ - Finish(); - LLOG("~CoWork " << FormatIntHex(this)); -} - -#endif - -END_UPP_NAMESPACE - -#endif - diff --git a/uppsrc/Core/Core.upp b/uppsrc/Core/Core.upp index 5db210d34..907b6c093 100644 --- a/uppsrc/Core/Core.upp +++ b/uppsrc/Core/Core.upp @@ -161,7 +161,6 @@ file Topic.cpp, CoWork.h, CoWork.cpp, - CoWork2.cpp, Hash.h, MD5.cpp, SHA1.cpp, diff --git a/uppsrc/Core/Mt.cpp b/uppsrc/Core/Mt.cpp index e3560e661..3d2a81eb0 100644 --- a/uppsrc/Core/Mt.cpp +++ b/uppsrc/Core/Mt.cpp @@ -62,6 +62,11 @@ void Thread::Exit() throw sThreadExitExc__(); } +struct sThreadParam { + Function cb; + bool noshutdown; +}; + static #ifdef PLATFORM_WIN32 #ifdef CPU_64 @@ -75,22 +80,20 @@ static sThreadRoutine(void *arg) { LLOG("sThreadRoutine"); - auto cb = (Function *)arg; + auto p = (sThreadParam *)arg; try { - (*cb)(); + p->cb(); } catch(Exc e) { Panic(e); } catch(sThreadExitExc__) {} catch(Upp::ExitExc) {} - AtomicDec(sThreadCount); - delete cb; + if(!p->noshutdown) + AtomicDec(sThreadCount); + delete p; if(sExit) (*sExit)(); -#ifndef COWORK2 - CoWork::ShutdownPool(); -#endif #ifdef UPP_HEAP MemoryFreeThread(); #endif @@ -112,10 +115,11 @@ Mutex vm; //a common access synchronizer //otherwise no child threads could run. they are created by main. //now each thread, having any Thread instance can start a first Run() -bool Thread::Run(Function _cb) +bool Thread::Run(Function _cb, bool noshutdown) { LLOG("Thread::Run"); - AtomicInc(sThreadCount); + if(!noshutdown) + AtomicInc(sThreadCount); if(!threadr) #ifndef CPU_BLACKFIN threadr = sMain = true; @@ -137,12 +141,14 @@ bool Thread::Run(Function _cb) } #endif Detach(); - auto cb = new Function(_cb); + auto p = new sThreadParam; + p->cb = _cb; + p->noshutdown = noshutdown; #ifdef PLATFORM_WIN32 - handle = (HANDLE)_beginthreadex(0, 0, sThreadRoutine, cb, 0, ((unsigned int *)(&thread_id))); + handle = (HANDLE)_beginthreadex(0, 0, sThreadRoutine, p, 0, ((unsigned int *)(&thread_id))); #endif #ifdef PLATFORM_POSIX - if(pthread_create(&handle, 0, sThreadRoutine, cb)) + if(pthread_create(&handle, 0, sThreadRoutine, p)) handle = 0; #endif return handle; @@ -196,23 +202,20 @@ int Thread::GetCount() return sThreadCount; } -static bool sShutdown; +static int sShutdown; void Thread::BeginShutdownThreads() { - sShutdown = true; + sShutdown++; } void Thread::EndShutdownThreads() { - sShutdown = false; + sShutdown--; } void Thread::ShutdownThreads() { -#ifndef COWORK2 - CoWork::ShutdownPool(); -#endif BeginShutdownThreads(); while(GetCount()) Sleep(100); @@ -349,7 +352,7 @@ bool Thread::Priority(int percent) #endif } -void Thread::Start(Function cb) +void Thread::Start(Function cb, bool noshutdown) { Thread t; t.Run(cb); diff --git a/uppsrc/Core/Mt.h b/uppsrc/Core/Mt.h index 9ac707d99..ebb4cbbac 100644 --- a/uppsrc/Core/Mt.h +++ b/uppsrc/Core/Mt.h @@ -49,7 +49,7 @@ class Thread : NoCopy { pthread_t handle; #endif public: - bool Run(Function cb); + bool Run(Function cb, bool noshutdown = false); void Detach(); int Wait(); @@ -73,7 +73,7 @@ public: bool Priority(int percent); // 0 = lowest, 100 = normal - static void Start(Function cb); + static void Start(Function cb, bool noshutdown = false); static void Sleep(int ms); diff --git a/uppsrc/Core/src.tpp/CoWork$en-us.tpp b/uppsrc/Core/src.tpp/CoWork$en-us.tpp index dfc595001..ac3eb458c 100644 --- a/uppsrc/Core/src.tpp/CoWork$en-us.tpp +++ b/uppsrc/Core/src.tpp/CoWork$en-us.tpp @@ -25,15 +25,38 @@ constructor called in master thread). No more thread are created or destroyed during normal work. Nesting of CoWork instances is also possible. Thread pool is normally terminated when master thread finishes.&] -[s3;%% &] +[s9;%% No synchronization is required to access CoWork instances +from various threads (CoWork is internally synchronized).&] +[s9;%% [*/ Implementation notes: ]Current implementation has single +global FIFO stack for 2048 scheduled jobs. When there is no slot +available when scheduling the job, it is performed immediately +by Do. Finish method has to wait until all jobs scheduled by +CoWork.instance are finished, while waiting it attempts to perform +scheduled jobs from the same instance. That way work always progresses +even if there is shortage of worker threads.&] [s0;%% &] [ {{10000F(128)G(128)@1 [s0;%% [* Public Method List]]}}&] [s3; &] -[s5;:Upp`:`:CoWork`:`:Start`(Upp`:`:Function``&`&`): [@(0.0.255) static] -[@(0.0.255) void]_[* Start]([_^Upp`:`:Function^ Function]<[@(0.0.255) void]_()>`&`&_[*@3 fn -])&] -[s2;%% This is a low`-level function that schedules [%-*@3 fn] to be -executed by worker thread.&] +[s5;:Upp`:`:CoWork`:`:TrySchedule`(Upp`:`:Function``&`&`): [@(0.0.255) stati +c bool]_[* TrySchedule]([_^Upp`:`:Function^ Function]<[@(0.0.255) void]_()>`&`&_[*@3 fn]) +&] +[s5;:Upp`:`:CoWork`:`:TrySchedule`(const Upp`:`:Function``&`): [@(0.0.255) s +tatic] [@(0.0.255) bool]_[* TrySchedule]([@(0.0.255) const]_[_^Upp`:`:Function^ Function]< +[@(0.0.255) void]_()>`&_[*@3 fn])&] +[s2;%% This is a low`-level function that attempts to schedule [%-*@3 fn] +to be executed by worker thread. Returns true if [%-*@3 fn] was +scheduled, false if not (in case there is no slot left in scheduling +stacks). Not that this function only schedules the function, +the exact time of execution is unknown.&] +[s3;%% &] +[s4; &] +[s5;:Upp`:`:CoWork`:`:Schedule`(Upp`:`:Function``&`&`): [@(0.0.255) static +void]_[* Schedule]([_^Upp`:`:Function^ Function]<[@(0.0.255) void]_()>`&`&_[*@3 fn])&] +[s5;:Upp`:`:CoWork`:`:Schedule`(const Upp`:`:Function``&`): [@(0.0.255) stat +ic] [@(0.0.255) void]_[* Schedule]([@(0.0.255) const]_[_^Upp`:`:Function^ Function]<[@(0.0.255) v +oid]_()>`&_[*@3 fn])&] +[s2;%% Similar to TrySchedule, but always schedules [%-*@3 fn] `- even +if it has to wait for slot to become available.&] [s3;%% &] [s4; &] [s5;:Upp`:`:CoWork`:`:Do`(Upp`:`:Function``&`&`): [@(0.0.255) void]_[* Do]([_^Upp`:`:Function^ F @@ -49,7 +72,8 @@ oWork][@(0.0.255) `&]_[* operator`&]([_^Upp`:`:Function^ Function]<[@(0.0.255) v [s2;%% Schedules [%-*@3 fn] to be executed. All changes to data done before Do are visible in the scheduled code. The order of execution or whether the code is execute in another or calling thread is -not specified.&] +not specified. In certain situations (no scheduling slot available), +Do can perform scheduled job immediately.&] [s3;%% &] [s4; &] [s5;:Upp`:`:CoWork`:`:FinLock`(`): [@(0.0.255) static] [@(0.0.255) void]_[* FinLock]()&] @@ -64,7 +88,8 @@ as with all locks, execution of locked code should be short.&] [s5;:CoWork`:`:Finish`(`): [@(0.0.255) void]_[* Finish]()&] [s2;%% Waits until all jobs scheduled using Do (or operator`&) are finished. All changes to data performed by scheduled threads -are visible after Finish.&] +are visible after Finish. While waiting, Finish can perform scheduled +jobs.&] [s3; &] [s4; &] [s5;:Upp`:`:CoWork`:`:IsFinished`(`): [@(0.0.255) bool]_[* IsFinished]()&] @@ -79,20 +104,11 @@ non`-blocking variant of Finish).&] [s3; &] [s4; &] [s5;:Upp`:`:CoWork`:`:IsWorker`(`): [@(0.0.255) static] [@(0.0.255) bool]_[* IsWorker]()&] -[s2;%% Returns true if current thread is worker thread.&] +[s2;%% Returns true if current thread is CoWork worker thread.&] [s3; &] [s4; &] -[s5;:Upp`:`:CoWork`:`:StartPool`(int`): [@(0.0.255) static] [@(0.0.255) void]_[* StartPool]( -[@(0.0.255) int]_[*@3 n])&] -[s2;%% Starts the thread pool for current master thread. Note that -this also happens automatically on first CoWork use, however -StartPool can be used to setup different number of worker thread -than default (which is CPU`_Cores() `+ 2).&] -[s3;%% &] -[s4; &] -[s5;:Upp`:`:CoWork`:`:ShutdownPool`(`): [@(0.0.255) static] [@(0.0.255) void]_[* ShutdownPo -ol]()&] -[s2;%% Waits for all jobs to finish and then releases the thread -pool.&] -[s3; &] +[s5;:Upp`:`:CoWork`:`:SetPoolSize`(int`): [@(0.0.255) static void]_[* SetPoolSize]([@(0.0.255) i +nt]_[*@3 n])&] +[s2;%% Adjusts the thread pool size (default pool size is CPU`_Cores() +`+ 2).&] [s0; ]] \ No newline at end of file diff --git a/uppsrc/Core/src.tpp/Thread$en-us.tpp b/uppsrc/Core/src.tpp/Thread$en-us.tpp index d6d0b830c..82f47dabd 100644 --- a/uppsrc/Core/src.tpp/Thread$en-us.tpp +++ b/uppsrc/Core/src.tpp/Thread$en-us.tpp @@ -20,7 +20,7 @@ topic "Thread"; [s3; &] [s5;:Thread`:`:Thread`(`): [* Thread]()&] [s2;%% Default constructor.&] -[s3; &] +[s3;%% &] [s4; &] [s5;:Thread`:`:`~Thread`(`): [@(0.0.255) `~][* Thread]()&] [s2;%% Destructor. Performs Detach `- thread continues running.&] @@ -28,9 +28,22 @@ topic "Thread"; [s0;%% &] [ {{10000F(128)G(128)@1 [s0;%% [* Public Method List]]}}&] [s3; &] -[s5;:Thread`:`:Run`(Callback`): [@(0.0.255) bool]_[* Run]([_^Callback^ Callback]_[*@3 cb])&] -[s2;%% Starts a new thread.&] -[s3; &] +[s5;:Upp`:`:Thread`:`:Run`(Upp`:`:Function``,bool`): [@(0.0.255) bool]_[* Run]( +[_^Upp`:`:Function^ Function]<[@(0.0.255) void]_()>_[*@3 cb], [@(0.0.255) bool]_[*@3 noshut +down]_`=_[@(0.0.255) false])&] +[s2;%% Starts a new thread. If [%-*@3 noshutdown] is true, started +thread is not meant to be aware of Shutdown system `- basically +it means that it does not affect thread counter.&] +[s3;%% &] +[s4; &] +[s5;:Upp`:`:Thread`:`:Start`(Upp`:`:Function``,bool`): [@(0.0.255) static] +[@(0.0.255) void]_[* Start]([_^Upp`:`:Function^ Function]<[@(0.0.255) void]_()>_[*@3 cb], +[@(0.0.255) bool]_[*@3 noshutdown]_`=_[@(0.0.255) false])&] +[s2;%% Starts a thread and returns immediately (you cannot Wait for +the thread to finish in this case). If [%-*@3 noshutdown] is true, +started thread is not meant to be aware of Shutdown system `- +basically it means that it does not affect thread counter.&] +[s3;%% &] [s4; &] [s5;:Thread`:`:Detach`(`): [@(0.0.255) void]_[* Detach]()&] [s2;%% Detaches running thread from the Thread object. It means that @@ -55,12 +68,6 @@ in that case returns immediately).&] [s2;%% Returns platform specific handle of thread.&] [s3; &] [s4; &] -[s5;:Thread`:`:Start`(Callback`): [@(0.0.255) static] [@(0.0.255) void]_[* Start]([_^Callback^ C -allback]_[*@3 cb])&] -[s2;%% Starts a thread and returns immediately (you cannot Wait for -the thread to finish in this case).&] -[s3; &] -[s4; &] [s5;:Thread`:`:Sleep`(int`): [@(0.0.255) static] [@(0.0.255) void]_[* Sleep]([@(0.0.255) int]_ [*@3 ms])&] [s2;%% Sleep for a given number of milliseconds.&] @@ -81,16 +88,17 @@ is running so far).&] [s4; &] [s5;:Thread`:`:ShutdownThreads`(`): [@(0.0.255) static] [@(0.0.255) void]_[* ShutdownThread s]()&] -[s2;%% Sets the `"Shutdown`" flag on, waits before all threads terminate, -then sets flag off again. It is meant to be used together with -IsShutdownThreads to terminate long running secondary service -threads. Main thread calls ShutdownThreads, secondary threads -test IsShutdownThreads and if true, exit.&] +[s2;%% Sets the `"Shutdown`" flag on, waits before all threads started +without noshutdown true terminate, then sets flag off again. +It is meant to be used together with IsShutdownThreads to terminate +long running secondary service threads. Main thread calls ShutdownThreads, +secondary threads test IsShutdownThreads and if true, exit.&] [s3; &] [s4; &] [s5;:Thread`:`:IsShutdownThreads`(`): [@(0.0.255) static] [@(0.0.255) bool]_[* IsShutdownTh reads]()&] -[s2;%% True if ShutdownThreads is active.&] +[s2;%% True if ShutdownThreads is active. This is supposed to be +tested by threads participating in shutdown system.&] [s3; &] [s4; &] [s5;:Thread`:`:AtExit: [@(0.0.255) static]_[@(0.0.255) void]_(`*[* AtExit]([@(0.0.255) void]_ @@ -107,5 +115,4 @@ value is not null)&] reality, current implementation supports only 5 levels, 25%, 75%, 125%, 175% and more than 175%; last two levels require root privileges. Returns true if setting the priority was successful.&] -[s3;%% &] [s0; ]] \ No newline at end of file diff --git a/uppsrc/RichText/TxtPaint.cpp b/uppsrc/RichText/TxtPaint.cpp index 3815c2438..ab6b2316f 100644 --- a/uppsrc/RichText/TxtPaint.cpp +++ b/uppsrc/RichText/TxtPaint.cpp @@ -120,9 +120,8 @@ void RichTxt::Advance(int parti, RichContext& rc, RichContext& begin) const rc.Page(); } else { - rc.py.y += pp.before; begin = rc; - rc.py.y += pp.cy + pp.after + pp.ruler; + rc.py.y += pp.before + pp.cy + pp.after + pp.ruler; } } }