diff --git a/uppsrc/Core/CoWork.cpp b/uppsrc/Core/CoWork.cpp index 4d80fb0ef..1958e8c10 100644 --- a/uppsrc/Core/CoWork.cpp +++ b/uppsrc/Core/CoWork.cpp @@ -56,6 +56,7 @@ CoWork::Pool::~Pool() LLOG("Quit"); lock.Enter(); jobs[0].work = NULL; + jobs[0].started = NULL; scheduled = 1; lock.Leave(); waitforjob.Broadcast(); @@ -74,7 +75,7 @@ bool CoWork::Pool::DoJob() { Pool& p = GetPool(); MJob& job = p.jobs[p.scheduled - 1]; - if(job.work == NULL) { + if(job.work == NULL && job.started == NULL) { LLOG("Quit thread"); return true; } @@ -91,6 +92,8 @@ bool CoWork::Pool::DoJob() fn(); if(!finlock) p.lock.Enter(); + if(!work) + return false; if(--work->todo == 0) { LLOG("Releasing waitforfinish of (CoWork " << FormatIntHex(work) << ")"); work->waitforfinish.Signal(); @@ -143,9 +146,7 @@ CoWork::MJob& CoWork::PushJob(Function&& fn) { Pool& p = GetPool(); MJob& job = p.jobs[p.scheduled++]; - job.work = this; job.fn = pick(fn); - todo++; LLOG("Adding job " << p.scheduled - 1 << "; todo: " << todo << " (CoWork " << FormatIntHex(this) << ")"); if(p.waiting_threads) { LLOG("Releasing thread waiting for job: " << p.waiting_threads); @@ -169,7 +170,8 @@ void CoWork::Do(Function&& fn) p.lock.Leave(); return; } - PushJob(pick(fn)); + PushJob(pick(fn)).work = this; + todo++; p.lock.Leave(); } diff --git a/uppsrc/Core/CoWork.h b/uppsrc/Core/CoWork.h index 931128adf..26b2e2835 100644 --- a/uppsrc/Core/CoWork.h +++ b/uppsrc/Core/CoWork.h @@ -5,7 +5,7 @@ class CoWork : NoCopy { struct MJob : Moveable { Function fn; - CoWork *work; + CoWork *work = NULL; bool *started = NULL; }; @@ -42,14 +42,14 @@ public: ConditionVariable waitforfinish; int todo; - MJob& PushJob(Function&& fn); + static MJob& PushJob(Function&& fn); Mutex stepmutex; Array>> step; Vector steprunning; public: - void Start(Function&& fn); + static void Start(Function&& fn); void Do(Function&& fn); void Do(const Function& fn) { Do(clone(fn)); } diff --git a/uppsrc/plugin/lz4/Compress.cpp b/uppsrc/plugin/lz4/Compress.cpp index 8c6de82d4..5f2e15a75 100644 --- a/uppsrc/plugin/lz4/Compress.cpp +++ b/uppsrc/plugin/lz4/Compress.cpp @@ -14,20 +14,6 @@ void LZ4CompressStream::Init() SetupBuffer(); } -void LZ4CompressStream::Clear() -{ - Init(); -} - -void LZ4CompressStream::PutOut(const void *ptr, int size) -{ - LLOG("LZ4 PutOut " << out.GetCount()); - if(out) - out->Put(ptr, size); - else - sout.Cat((const char *)ptr, (int)size); -} - void LZ4CompressStream::FinishBlock(char *outbuf, int clen, const char *origdata, int origsize) { RTIMING("FinishBlock"); @@ -40,11 +26,11 @@ void LZ4CompressStream::FinishBlock(char *outbuf, int clen, const char *origdata if(clen >= origsize) { Poke32le(outbuf, 0x80000000 | origsize); memcpy(outbuf + 4, origdata, origsize); - PutOut(outbuf, origsize + 4); + out->Put(outbuf, origsize + 4); } else { Poke32le(outbuf, clen); - PutOut(outbuf, clen + 4); + out->Put(outbuf, clen + 4); } } @@ -63,9 +49,8 @@ void LZ4CompressStream::FlushOut() h[4] = LZ4F_VERSION | LZ4F_BLOCKINDEPENDENCE | LZ4F_CONTENTCHECKSUM; h[5] = LZ4F_MAXSIZE_1024KB; h[6] = xxHash(h + 4, 2) >> 8; - PutOut(h, 7); + out->Put(h, 7); header = true; - maxblock = BLOCK_BYTES; } if(ptr == (byte *)~buffer) @@ -77,11 +62,11 @@ void LZ4CompressStream::FlushOut() WhenPos(pos); - if(parallel) { + if(co) { String bs = buffer; int inblk = inblock++; - co.Start([=] { - Buffer outbuf(4 + LZ4_compressBound(maxblock)); + CoWork::Start([=] { + Buffer outbuf(4 + LZ4_compressBound(BLOCK_BYTES)); int clen = LZ4_compress(~bs, ~outbuf + 4, origsize); Mutex::Lock __(lock); while(outblock != inblk) @@ -94,7 +79,7 @@ void LZ4CompressStream::FlushOut() SetupBuffer(); } else { - Buffer outbuf(4 + LZ4_compressBound(maxblock)); + Buffer outbuf(4 + LZ4_compressBound(BLOCK_BYTES)); xxh.Put(~buffer, origsize); int clen = LZ4_compress(~buffer, ~outbuf + 4, origsize); FinishBlock(outbuf, clen, buffer, origsize); @@ -106,7 +91,7 @@ void LZ4CompressStream::Close() { ASSERT(compress >= 0); FlushOut(); - if(parallel) { + if(co) { Mutex::Lock __(lock); while(outblock != inblock) cond.Wait(lock); @@ -114,8 +99,7 @@ void LZ4CompressStream::Close() byte h[8]; Poke32le(h, 0); Poke32le(h + 4, xxh.Finish()); - PutOut(h, 8); - Clear(); + out->Put(h, 8); buffer.Clear(); } @@ -155,13 +139,11 @@ void LZ4CompressStream::_Put(const void *data, dword size) break; } } - - DDUMP(pos); } LZ4CompressStream::LZ4CompressStream() { - parallel = false; + co = false; out = NULL; Init(); } diff --git a/uppsrc/plugin/lz4/Decompress.cpp b/uppsrc/plugin/lz4/Decompress.cpp new file mode 100644 index 000000000..0eb90e738 --- /dev/null +++ b/uppsrc/plugin/lz4/Decompress.cpp @@ -0,0 +1,190 @@ +#include "lz4.h" + +#define LLOG(x) // LOG(x) + +namespace Upp { + +void LZ4DecompressStream::Init() +{ + pos = 0; + eof = false; + buffer.Clear(); + xxh.Reset(); + outblock = inblock = 0; + ClearError(); +} + +bool LZ4DecompressStream::Open(Stream& in_) +{ + in = &in_; + String header_data = in->Get(7); + if(header_data.GetCount() < 7) { + SetError(); + return false; + } + + if(Peek32le(~header_data) != LZ4F_MAGIC) { + SetError(); + return false; + } + lz4hdr = header_data[4]; + if((lz4hdr & LZ4F_VERSIONMASK) != LZ4F_VERSION) { + SetError(); + return false; + } + if(!(lz4hdr & LZ4F_BLOCKINDEPENDENCE)) { // dependent blocks not supported + SetError(); + return false; + } + maxblock = header_data[5]; + maxblock = decode(maxblock & LZ4F_MAXSIZEMASK, + LZ4F_MAXSIZE_64KB, 1024 * 64, + LZ4F_MAXSIZE_256KB, 1024 * 256, + LZ4F_MAXSIZE_1024KB, 1024 * 1024, + LZ4F_MAXSIZE_4096KB, 1024 * 4096, + -1); + if(maxblock < 0) { + SetError(); + return false; + } + + if((lz4hdr & LZ4F_CONTENTSIZE) && in->Get(8).GetCount() != 8) { + SetError(); + return false; + } + + return true; +} + +int LZ4DecompressStream::Fetch(char *t, int size) +{ + if(IsError() || in->IsError()) + return 0; + int blksz = in->Get32le(); + if(blksz == 0) // This is EOF + return 0; + int len = blksz & 0x7fffffff; + if(len > maxblock || len > size) { + SetError(); + return 0; + } + if(blksz & 0x80000000) { + if(in->Get(t, len) != len) { + SetError(); + return 0; + } + return len; + } + String data = in->Get(len); + if(data.GetCount() != len) { + SetError(); + return 0; + } + int sz = LZ4_decompress_safe(~data, t, len, maxblock); + if(sz < 0) { + SetError(); + return 0; + } + if(lz4hdr & LZ4F_BLOCKCHECKSUM) + in->Get32le(); + return sz; +} + +String LZ4DecompressStream::Fetch() +{ + StringBuffer b(maxblock); + int sz = Fetch(~b, maxblock); + if(sz <= 0) + return Null; + b.SetLength(sz); + return b; +} + +bool LZ4DecompressStream::IsOpen() const +{ + return in->IsOpen() && !IsError(); +} + +void LZ4DecompressStream::CheckEof() +{ + if(!eof) { + if(in->Get32le() != xxh.Finish()) + SetError(); + eof = true; + } +} + +void LZ4DecompressStream::NewBuffer(const String& s) +{ + pos += buffer.GetCount(); + buffer = s; + ptr = (byte *)buffer.begin(); + rdlim = (byte *)buffer.end(); + xxh.Put(s, s.GetCount()); + if(ptr == rdlim) + CheckEof(); +} + +int LZ4DecompressStream::_Term() +{ + if(eof) + return -1; + NewBuffer(Fetch()); + return ptr == rdlim ? -1 : *ptr; +} + +int LZ4DecompressStream::_Get() +{ + if(eof) + return -1; + NewBuffer(Fetch()); + return ptr == rdlim ? -1 : *ptr++; +} + +dword LZ4DecompressStream::_Get(void *data, dword size) +{ + byte *t = (byte *)data; + while(size) { + if(IsError() || in->IsError() || IsEof()) + break; + dword n = dword(rdlim - ptr); + if(size < n) { + memcpy(t, ptr, size); + t += size; + ptr += size; + break; + } + else { + memcpy(t, ptr, n); + t += n; + size -= n; + if(size > (dword)maxblock) { + pos += buffer.GetCount(); + buffer.Clear(); + int sz = Fetch((char *)t, size); + xxh.Put(t, sz); + if(sz == 0) + CheckEof(); + size -= sz; + pos += sz; + t += sz; + } + else + NewBuffer(Fetch()); + } + } + + return t - (byte *)data; +} + +LZ4DecompressStream::LZ4DecompressStream() +{ + parallel = false; + in = NULL; +} + +LZ4DecompressStream::~LZ4DecompressStream() +{ +} + +}; diff --git a/uppsrc/plugin/lz4/lz4.h b/uppsrc/plugin/lz4/lz4.h index bff8b403d..c790c321d 100644 --- a/uppsrc/plugin/lz4/lz4.h +++ b/uppsrc/plugin/lz4/lz4.h @@ -96,16 +96,12 @@ protected: enum { BLOCK_BYTES = 1024 * 1024 }; xxHashStream xxh; - int maxblock; int blockchksumsz; bool header; byte lz4hdr; String header_data; - String sout; - - bool parallel; - CoWork co; + bool co; Mutex lock; ConditionVariable cond; int outblock; @@ -116,25 +112,65 @@ protected: void FinishBlock(char *outbuf, int clen, const char *origdata, int origsize); void FlushOut(); - void PutOut(const void *ptr, int size); - public: Event WhenPos; - void Clear(); - - const String& Get() const { return sout; } - operator const String&() const { return sout; } - const String& operator~() const { return sout; } - void ClearOut() { sout.Clear(); } - - void Parallel(bool b = true) { parallel = b; } - void Out(Stream& o) { out = &o; } + void Concurrent(bool b = true) { co = b; } + void Open(Stream& out_) { Init(); out = &out_; } LZ4CompressStream(); + LZ4CompressStream(Stream& out) : LZ4CompressStream() { Open(out); } ~LZ4CompressStream(); }; +class LZ4DecompressStream : public Stream { +public: + virtual bool IsOpen() const; + +protected: + virtual int _Term(); + virtual int _Get(); + virtual dword _Get(void *data, dword size); + +private: + Stream *in; + String buffer; + + enum { BLOCK_BYTES = 1024 * 1024 }; + + xxHashStream xxh; + int maxblock; + int blockchksumsz; + byte lz4hdr; + bool eof; + + bool parallel; + CoWork co; + Mutex lock; + ConditionVariable cond; + int outblock; + int inblock; + + void TryHeader(); + + void Init(); + int Fetch(char *t, int size); + String Fetch(); + void CheckEof(); + void NewBuffer(const String& s); + +public: + Callback2 WhenOut; + + bool Open(Stream& in); + + void Parallel(bool b = true) { parallel = b; } + + LZ4DecompressStream(); + ~LZ4DecompressStream(); +}; + + int64 LZ4Compress(Stream& out, Stream& in, Gate2 progress = false); int64 LZ4Decompress(Stream& out, Stream& in, Gate2 progress = false); String LZ4Compress(const void *data, int64 len, Gate2 progress = false); diff --git a/uppsrc/plugin/lz4/lz4.upp b/uppsrc/plugin/lz4/lz4.upp index cf8499f6e..4dbcfad90 100644 --- a/uppsrc/plugin/lz4/lz4.upp +++ b/uppsrc/plugin/lz4/lz4.upp @@ -7,6 +7,7 @@ file lz4.h, lz4upp.cpp, Compress.cpp, + Decompress.cpp, util.cpp, lib\LICENSE, Copying; diff --git a/uppsrc/plugin/lz4/lz4upp.cpp b/uppsrc/plugin/lz4/lz4upp.cpp index bf48466d6..6504889f9 100644 --- a/uppsrc/plugin/lz4/lz4upp.cpp +++ b/uppsrc/plugin/lz4/lz4upp.cpp @@ -4,23 +4,6 @@ namespace Upp { -enum { - LZ4F_MAGIC = 0x184D2204, - - LZ4F_VERSIONMASK = 0b11000000, - LZ4F_VERSION = 0b01000000, - LZ4F_BLOCKINDEPENDENCE = (1 << 5), - LZ4F_BLOCKCHECKSUM = (1 << 4), - LZ4F_CONTENTSIZE = (1 << 3), - LZ4F_CONTENTCHECKSUM = (1 << 2), - - LZ4F_MAXSIZEMASK = 0x70, - LZ4F_MAXSIZE_64KB = 0x40, - LZ4F_MAXSIZE_256KB = 0x50, - LZ4F_MAXSIZE_1024KB = 0x60, - LZ4F_MAXSIZE_4096KB = 0x70, -}; - void Lz4::Init() { error = false;