class CoWork : NoCopy { struct MJob : Moveable, Link<2> { Function fn; CoWork *work = NULL; bool looper = false; }; enum { SCHEDULED_MAX = 2048 }; public: struct Pool { Link<2> *free; Link<2> jobs; MJob slot[SCHEDULED_MAX]; int waiting_threads; Array threads; bool quit; Mutex lock; ConditionVariable waitforjob; void Free(MJob& m); void DoJob(MJob& m); void PushJob(Function&& fn, CoWork *work, bool looper = false); void InitThreads(int nthreads); void ExitThreads(); Pool(); ~Pool(); static thread_local bool finlock; bool DoJob(); static void ThreadRun(int tno); }; friend struct Pool; static Pool& GetPool(); static thread_local int worker_index; static thread_local CoWork *current; ConditionVariable waitforfinish; Link<2> jobs; // global stack and CoWork stack as double-linked lists int todo; bool canceled; std::exception_ptr exc = nullptr; // workaround for sanitizer bug(?) Function looper_fn; int looper_count; void Do0(Function&& fn, bool looper); void Cancel0(); void Finish0(); Atomic index; public: static bool TrySchedule(Function&& fn); static bool TrySchedule(const Function& fn) { return TrySchedule(clone(fn)); } static void Schedule(Function&& fn); static void Schedule(const Function& fn) { return Schedule(clone(fn)); } void Do(Function&& fn) { Do0(pick(fn), false); } void Do(const Function& fn) { Do(clone(fn)); } CoWork& operator&(const Function& fn) { Do(fn); return *this; } CoWork& operator&(Function&& fn) { Do(pick(fn)); return *this; } int GetScheduledCount() const; static void FinLock(); void Cancel(); static bool IsCanceled(); void Finish(); bool IsFinished(); void Reset(); static bool IsWorker() { return GetWorkerIndex() >= 0; } static int GetWorkerIndex(); static int GetPoolSize(); static void SetPoolSize(int n); CoWork(); ~CoWork() noexcept(false); // deprecated: void Loop(Function&& fn); void Loop(const Function& fn) { Loop(clone(fn)); } CoWork& operator*(const Function& fn) { Loop(fn); return *this; } CoWork& operator*(Function&& fn) { Loop(pick(fn)); return *this; } int Next() { return ++index - 1; } }; struct CoWorkNX : CoWork { ~CoWorkNX() noexcept(true) {} }; inline void CoDo(Function&& fn) { CoWork co; co * fn; } inline void CoDo_ST(Function&& fn) { fn(); } inline void CoDo(bool co, Function&& fn) { if(co) CoDo(pick(fn)); else CoDo_ST(pick(fn)); } template void CoFor(int n, Fn iterator) { std::atomic ii(0); CoDo([&] { for(int i = ii++; i < n; i = ii++) iterator(i); }); } template void CoFor_ST(int n, Fn iterator) { for(int i = 0; i < n; i++) iterator(i); } template void CoFor(bool co, int n, Fn iterator) { if(co) CoFor(n, iterator); else CoFor_ST(n, iterator); } template class CoWorkerResources { int workercount; Buffer res; public: int GetCount() const { return workercount + 1; } T& operator[](int i) { return res[i]; } T& Get() { int i = CoWork::GetWorkerIndex(); return res[i < 0 ? workercount : i]; } T& operator~() { return Get(); } T *begin() { return ~res; } T *end() { return ~res + GetCount(); } CoWorkerResources() { workercount = CoWork::GetPoolSize(); res.Alloc(GetCount()); } CoWorkerResources(Event initializer) : CoWorkerResources() { for(int i = 0; i < GetCount(); i++) initializer(res[i]); } }; template class AsyncWork { template struct Imp { CoWork co; Ret2 ret; template void Do(Function&& f, Args&&... args) { co.Do([=]() { ret = f(args...); }); } const Ret2& Get() { return ret; } Ret2 Pick() { return pick(ret); } }; struct ImpVoid { CoWork co; template void Do(Function&& f, Args&&... args) { co.Do([=]() { f(args...); }); } void Get() {} void Pick() {} }; using ImpType = typename std::conditional::value, ImpVoid, Imp>::type; One imp; public: template< class Function, class... Args> void Do(Function&& f, Args&&... args) { imp.Create().Do(f, args...); } void Cancel() { if(imp) imp->co.Cancel(); } static bool IsCanceled() { return CoWork::IsCanceled(); } bool IsFinished() { return imp && imp->co.IsFinished(); } Ret Get() { ASSERT(imp); imp->co.Finish(); return imp->Get(); } Ret operator~() { return Get(); } Ret Pick() { ASSERT(imp); imp->co.Finish(); return imp->Pick(); } AsyncWork& operator=(AsyncWork&&) = default; AsyncWork(AsyncWork&&) = default; AsyncWork() {} ~AsyncWork() { if(imp) imp->co.Cancel(); } }; template< class Function, class... Args> AsyncWork< #ifdef CPP_17 std::invoke_result_t #else typename std::result_of< typename std::decay::type (typename std::decay::type...) >::type #endif > Async(Function&& f, Args&&... args) { AsyncWork< #ifdef CPP_17 std::invoke_result_t #else typename std::result_of< typename std::decay::type (typename std::decay::type...) >::type #endif > h; h.Do(f, args...); return pick(h); }