diff --git a/uppsrc/Core/CoWork.cpp b/uppsrc/Core/CoWork.cpp index c6f5bf321..4d80fb0ef 100644 --- a/uppsrc/Core/CoWork.cpp +++ b/uppsrc/Core/CoWork.cpp @@ -83,6 +83,10 @@ bool CoWork::Pool::DoJob() Function fn = pick(job.fn); CoWork *work = job.work; p.scheduled--; + if(job.started) { + *job.started = true; + p.waitforstart.Broadcast(); + } p.lock.Leave(); fn(); if(!finlock) @@ -120,6 +124,37 @@ void CoWork::Pool::ThreadRun(int tno) LLOG("CoWork thread #" << tno << " finished"); } +void CoWork::Start(Function&& fn) +{ + bool started = false; + Pool& p = GetPool(); + Mutex::Lock __(p.lock); + while(p.scheduled >= SCHEDULED_MAX) { // this is quite unlikely, so we can be ugly here + p.lock.Leave(); + Sleep(0); + p.lock.Enter(); + } + PushJob(pick(fn)).started = &started; + while(!started) + p.waitforstart.Wait(p.lock); +} + +CoWork::MJob& CoWork::PushJob(Function&& fn) +{ + Pool& p = GetPool(); + MJob& job = p.jobs[p.scheduled++]; + job.work = this; + job.fn = pick(fn); + todo++; + LLOG("Adding job " << p.scheduled - 1 << "; todo: " << todo << " (CoWork " << FormatIntHex(this) << ")"); + if(p.waiting_threads) { + LLOG("Releasing thread waiting for job: " << p.waiting_threads); + p.waiting_threads--; + p.waitforjob.Signal(); + } + return job; +} + void CoWork::Do(Function&& fn) { LHITCOUNT("CoWork: Sheduling callback"); @@ -134,16 +169,7 @@ void CoWork::Do(Function&& fn) p.lock.Leave(); return; } - MJob& job = p.jobs[p.scheduled++]; - job.work = this; - job.fn = pick(fn); - todo++; - LLOG("Adding job " << p.scheduled - 1 << "; todo: " << todo << " (CoWork " << FormatIntHex(this) << ")"); - if(p.waiting_threads) { - LLOG("Releasing thread waiting for job: " << p.waiting_threads); - p.waiting_threads--; - p.waitforjob.Signal(); - } + PushJob(pick(fn)); p.lock.Leave(); } diff --git a/uppsrc/Core/CoWork.h b/uppsrc/Core/CoWork.h index c199fdeeb..931128adf 100644 --- a/uppsrc/Core/CoWork.h +++ b/uppsrc/Core/CoWork.h @@ -6,6 +6,7 @@ class CoWork : NoCopy { struct MJob : Moveable { Function fn; CoWork *work; + bool *started = NULL; }; enum { SCHEDULED_MAX = 2048 }; @@ -19,6 +20,7 @@ public: Mutex lock; ConditionVariable waitforjob; + ConditionVariable waitforstart; Pool(int nthreads); ~Pool(); @@ -40,11 +42,15 @@ public: ConditionVariable waitforfinish; int todo; + MJob& PushJob(Function&& fn); + Mutex stepmutex; Array>> step; Vector steprunning; public: + void Start(Function&& fn); + void Do(Function&& fn); void Do(const Function& fn) { Do(clone(fn)); } diff --git a/uppsrc/Core/Core.upp b/uppsrc/Core/Core.upp index ed5288a06..907b6c093 100644 --- a/uppsrc/Core/Core.upp +++ b/uppsrc/Core/Core.upp @@ -34,7 +34,6 @@ link(SOLARIS) "-Wl,-R -Wl,/usr/local/lib"; link(GCC POSIX STACKTRACE) -rdynamic; file - todo.txt, Core.h options(BUILDER_OPTION) PCH, config.h, Defs.h, diff --git a/uppsrc/Core/todo.txt b/uppsrc/Core/todo.txt deleted file mode 100644 index 5794de2bf..000000000 --- a/uppsrc/Core/todo.txt +++ /dev/null @@ -1,114 +0,0 @@ -uppbox - -oportunity && - -RawValueRep - PICK DEEP no more necessary, RawDeepToValue? - -Fix Value::Void::Retain - -Index::FirstAdd - -kw.Get(r["keywordId"], String()) - -MoveableAndDeepCopyOption - with base should improve VectorMap sizeof - -Vector/Array/Index/Map :: RemoveIf - -Remove ArrayIndex - -FromSystemCharset - -Buffer(size_t size, const T& init) { ptr = new T[size]; Fill(ptr, ptr + size, init); } - -String MD5Digest(const void *data, int length); - -Remove ValueType uses - -InVector::InsertN possibly incorrect (can create small blocks) - -Stream& Stream::operator/(int& i) { dword w = 0; if(IsStoring()) w = i + 1; Pack(w); i = w - 1; return *this; } - -CoWork pipe max_queue - -CoWork remove Semaphore - ------------------------ -DONE - -IIterator typedef typename V::ValueType T; - -SerializeRaw - int64! - -StringBuffer 2GB limit - -Remove NanoStrings - -Fix StringStream 2GB limit, -Stream SizeLimit - -CoWork pipe -CoWork limits, logical thread not working - -rename Function (Functor, Event, EventGate) - -ExitThread (or Thread::Exit), Sleep(10) - -Generic Tuple - -String() << - -String && - -Remove IsPicked - -Remove AddPick (keep as synonym) - -void XmlNode::SetAttrsPick(VectorMap&& a) - -inline String& operator<<(String&& s, const T& x) - -Remove HashFn - -for(auto i: ~map) - -Remove precomputed hash - -Remove StringC - -Create like emplace everywhere - -Global.h ? - -nanoalloc - -ONCELOCK_PTR removed - -Atomic->std::atomic, Barriers removed - -Optimize Index::Hash - -ConditionVariable - -idmapBencmark ValueMap optimize - -BiVector/Array/BiArray Pop - -Range Insert/Append in containers - -Function/Callback &&, Swap - -Range - -Remove: -WithDeepCopy DeepClone(const T& src) -class WithPick : public T { -WithPick AsPick(T&& src) - - -WithDeepCopy should implement && - -Remove CPP_11 - -Remove commented out code - -AuxMutex, MT: Remove! diff --git a/uppsrc/plugin/lz4/lz4.h b/uppsrc/plugin/lz4/lz4.h index 190674583..16767673d 100644 --- a/uppsrc/plugin/lz4/lz4.h +++ b/uppsrc/plugin/lz4/lz4.h @@ -8,8 +8,7 @@ namespace Upp { class Lz4 { - Buffer buffer; - Buffer outbuf; + StringBuffer buffer; // to be able to pass as String int8 compress; bool error; @@ -24,14 +23,18 @@ class Lz4 { String header_data; String out; - - - bool co; + bool parallel; + CoWork co; + Mutex lock; + ConditionVariable cond; + int outblock; + int inblock; void TryHeader(); void Init(); + void FinishBlock(char *outbuf, int clen, const char *origdata, int origsize); void FlushOut(); void PutOut(const void *ptr, int size); @@ -52,7 +55,7 @@ public: void Compress(); void Decompress(); - void Co(bool b) { co = b; } + void Parallel(bool b = true) { parallel = b; } bool IsError() const { return error; } @@ -67,6 +70,13 @@ String LZ4Compress(const String& s, Gate2 progress = false); String LZ4Decompress(const void *data, int64 len, Gate2 progress = false); String LZ4Decompress(const String& s, Gate2 progress = false); +int64 CoLZ4Compress(Stream& out, Stream& in, Gate2 progress = false); +int64 CoLZ4Decompress(Stream& out, Stream& in, Gate2 progress = false); +String CoLZ4Compress(const void *data, int64 len, Gate2 progress = false); +String CoLZ4Compress(const String& s, Gate2 progress = false); +String CoLZ4Decompress(const void *data, int64 len, Gate2 progress = false); +String CoLZ4Decompress(const String& s, Gate2 progress = false); + bool IsLZ4(Stream& s); }; diff --git a/uppsrc/plugin/lz4/lz4upp.cpp b/uppsrc/plugin/lz4/lz4upp.cpp index f6eb2c12a..8fe8700fb 100644 --- a/uppsrc/plugin/lz4/lz4upp.cpp +++ b/uppsrc/plugin/lz4/lz4upp.cpp @@ -27,12 +27,13 @@ void Lz4::Init() pos = 0; header = false; xxh.Reset(); + outblock = inblock = 0; } void Lz4::Compress() { compress = 1; - buffer.Alloc(BLOCK_BYTES); + buffer.SetCount(BLOCK_BYTES); Init(); } @@ -49,6 +50,26 @@ void Lz4::PutOut(const void *ptr, int size) out.Cat((const char *)ptr, (int)size); } +void Lz4::FinishBlock(char *outbuf, int clen, const char *origdata, int origsize) +{ + RTIMING("FinishBlock"); + if(error) + return; + if(clen < 0) { + error = true; + return; + } + if(clen >= origsize) { + Poke32le(outbuf, 0x80000000 | origsize); + memcpy(outbuf + 4, origdata, origsize); + WhenOut(outbuf, origsize + 4); + } + else { + Poke32le(outbuf, clen); + WhenOut(outbuf, clen + 4); + } +} + void Lz4::FlushOut() { if(!header) { @@ -60,28 +81,50 @@ void Lz4::FlushOut() WhenOut(h, 7); header = true; maxblock = BLOCK_BYTES; - outbuf.Alloc(4 + LZ4_compressBound(maxblock)); } if(!pos) return; - int clen = LZ4_compress(buffer, ~outbuf + 4, pos); - if(clen < 0) { - error = true; - return; - } - xxh.Put(buffer, pos); - if(clen >= pos) { - Poke32le(~outbuf, 0x80000000 | pos); - memcpy(~outbuf + 4, buffer, pos); - WhenOut(~outbuf, pos + 4); + int origsize = pos; + + pos = 0; + + if(parallel) { + String bs = buffer; + int inblk = inblock++; +// DLOG("Scheduling " << inblk); + co.Start([=] { + Buffer outbuf(4 + LZ4_compressBound(maxblock)); + int clen = LZ4_compress(~bs, ~outbuf + 4, origsize); + Mutex::Lock __(lock); + { RTIMING("Waiting for order"); + while(outblock != inblk) + cond.Wait(lock); + } + FinishBlock(outbuf, clen, ~bs, origsize); + outblock++; + cond.Broadcast(); + }); + RTIMING("xxh"); + xxh.Put(~bs, origsize); + buffer.SetCount(BLOCK_BYTES); } else { - Poke32le(~outbuf, clen); - WhenOut(~outbuf, clen + 4); + Buffer outbuf(4 + LZ4_compressBound(maxblock)); + xxh.Put(~buffer, origsize); + int clen = LZ4_compress(~buffer, ~outbuf + 4, origsize); + FinishBlock(outbuf, clen, buffer, origsize); + } +} + +void Lz4::SyncInOut() +{ + if(parallel) { + Mutex::Lock __(lock); + while(outblock != inblock) + cond.Wait(lock); } - pos = 0; } void Lz4::End() @@ -89,6 +132,7 @@ void Lz4::End() ASSERT(compress >= 0); if(compress) { FlushOut(); + SyncInOut(); byte h[8]; Poke32le(h, 0); Poke32le(h + 4, xxh.Finish()); @@ -134,8 +178,7 @@ void Lz4::TryHeader() header = true; blockchksumsz = 4 * !!(lz4hdr & LZ4F_BLOCKCHECKSUM); - buffer.Alloc(maxblock + 4 + blockchksumsz); - outbuf.Alloc(maxblock); + buffer.SetCount(maxblock + 4 + blockchksumsz); } void Lz4::Put(const void *ptr_, int size) @@ -193,6 +236,7 @@ void Lz4::Put(const void *ptr_, int size) int need = 4 - pos + 4; if(size >= need) { // we have enough data for final checksum memcpy(~buffer + pos, ptr, need); + SyncInOut(); if(Peek32le(~buffer + 4) != xxh.Finish()) { error = true; return; @@ -214,10 +258,8 @@ void Lz4::Put(const void *ptr_, int size) } else { int sz; - { - RTIMING("Decompress"); - sz = LZ4_decompress_safe(src, outbuf, len, maxblock); - } + Buffer outbuf(maxblock); + sz = LZ4_decompress_safe(src, outbuf, len, maxblock); if(sz < 0) { error = true; return; @@ -242,6 +284,7 @@ void Lz4::Put(const void *ptr_, int size) Lz4::Lz4() { compress = -1; + parallel = false; WhenOut = callback(this, &Lz4::PutOut); } diff --git a/uppsrc/plugin/lz4/util.cpp b/uppsrc/plugin/lz4/util.cpp index 390a687f7..d8b581a1d 100644 --- a/uppsrc/plugin/lz4/util.cpp +++ b/uppsrc/plugin/lz4/util.cpp @@ -2,13 +2,15 @@ namespace Upp { -int64 lz4Press(Stream& out, Stream& in, int64 size, Gate2 progress, bool compress) +int64 lz4Press(Stream& out, Stream& in, int64 size, Gate2 progress, bool compress, bool co = false) { Lz4 lz4; int64 r = -1; { OutFilterStream outs(out, lz4); + if(co) + lz4.Parallel(); if(compress) lz4.Compress(); else @@ -56,4 +58,38 @@ String LZ4Decompress(const String& s, Gate2 progress) return LZ4Decompress(~s, s.GetLength(), progress); } +int64 CoLZ4Compress(Stream& out, Stream& in, Gate2 progress) +{ + return lz4Press(out, in, in.GetLeft(), progress, true, true); +} + +int64 CoLZ4Decompress(Stream& out, Stream& in, Gate2 progress) +{ + return lz4Press(out, in, in.GetLeft(), progress, false, true); +} + +String CoLZ4Compress(const void *data, int64 len, Gate2 progress) +{ + StringStream out; + MemReadStream in(data, len); + return CoLZ4Compress(out, in, progress) < 0 ? String::GetVoid() : out.GetResult(); +} + +String CoLZ4Compress(const String& s, Gate2 progress) +{ + return CoLZ4Compress(~s, s.GetLength(), progress); +} + +String CoLZ4Decompress(const void *data, int64 len, Gate2 progress) +{ + StringStream out; + MemReadStream in(data, len); + return CoLZ4Decompress(out, in, progress) < 0 ? String::GetVoid() : out.GetResult(); +} + +String CoLZ4Decompress(const String& s, Gate2 progress) +{ + return CoLZ4Decompress(~s, s.GetLength(), progress); +} + }; \ No newline at end of file