Core: CoWork now using ConditionVariable instead of Semaphore. Lz4 MT in development

git-svn-id: svn://ultimatepp.org/upp/trunk@10070 f0d560ea-af0d-0410-9eb7-867de7ffcac7
This commit is contained in:
cxl 2016-07-18 16:40:54 +00:00
parent 78d8b8f322
commit c0750bc340
7 changed files with 159 additions and 153 deletions

View file

@ -83,6 +83,10 @@ bool CoWork::Pool::DoJob()
Function<void ()> fn = pick(job.fn);
CoWork *work = job.work;
p.scheduled--;
if(job.started) {
*job.started = true;
p.waitforstart.Broadcast();
}
p.lock.Leave();
fn();
if(!finlock)
@ -120,6 +124,37 @@ void CoWork::Pool::ThreadRun(int tno)
LLOG("CoWork thread #" << tno << " finished");
}
void CoWork::Start(Function<void ()>&& fn)
{
bool started = false;
Pool& p = GetPool();
Mutex::Lock __(p.lock);
while(p.scheduled >= SCHEDULED_MAX) { // this is quite unlikely, so we can be ugly here
p.lock.Leave();
Sleep(0);
p.lock.Enter();
}
PushJob(pick(fn)).started = &started;
while(!started)
p.waitforstart.Wait(p.lock);
}
CoWork::MJob& CoWork::PushJob(Function<void ()>&& fn)
{
Pool& p = GetPool();
MJob& job = p.jobs[p.scheduled++];
job.work = this;
job.fn = pick(fn);
todo++;
LLOG("Adding job " << p.scheduled - 1 << "; todo: " << todo << " (CoWork " << FormatIntHex(this) << ")");
if(p.waiting_threads) {
LLOG("Releasing thread waiting for job: " << p.waiting_threads);
p.waiting_threads--;
p.waitforjob.Signal();
}
return job;
}
void CoWork::Do(Function<void ()>&& fn)
{
LHITCOUNT("CoWork: Sheduling callback");
@ -134,16 +169,7 @@ void CoWork::Do(Function<void ()>&& fn)
p.lock.Leave();
return;
}
MJob& job = p.jobs[p.scheduled++];
job.work = this;
job.fn = pick(fn);
todo++;
LLOG("Adding job " << p.scheduled - 1 << "; todo: " << todo << " (CoWork " << FormatIntHex(this) << ")");
if(p.waiting_threads) {
LLOG("Releasing thread waiting for job: " << p.waiting_threads);
p.waiting_threads--;
p.waitforjob.Signal();
}
PushJob(pick(fn));
p.lock.Leave();
}

View file

@ -6,6 +6,7 @@ class CoWork : NoCopy {
struct MJob : Moveable<MJob> {
Function<void ()> fn;
CoWork *work;
bool *started = NULL;
};
enum { SCHEDULED_MAX = 2048 };
@ -19,6 +20,7 @@ public:
Mutex lock;
ConditionVariable waitforjob;
ConditionVariable waitforstart;
Pool(int nthreads);
~Pool();
@ -40,11 +42,15 @@ public:
ConditionVariable waitforfinish;
int todo;
MJob& PushJob(Function<void ()>&& fn);
Mutex stepmutex;
Array<BiVector<Function<void ()>>> step;
Vector<bool> steprunning;
public:
void Start(Function<void ()>&& fn);
void Do(Function<void ()>&& fn);
void Do(const Function<void ()>& fn) { Do(clone(fn)); }

View file

@ -34,7 +34,6 @@ link(SOLARIS) "-Wl,-R -Wl,/usr/local/lib";
link(GCC POSIX STACKTRACE) -rdynamic;
file
todo.txt,
Core.h options(BUILDER_OPTION) PCH,
config.h,
Defs.h,

View file

@ -1,114 +0,0 @@
uppbox
oportunity &&
RawValueRep - PICK DEEP no more necessary, RawDeepToValue?
Fix Value::Void::Retain
Index::FirstAdd
kw.Get(r["keywordId"], String())
MoveableAndDeepCopyOption - with base should improve VectorMap sizeof
Vector/Array/Index/Map :: RemoveIf
Remove ArrayIndex
FromSystemCharset
Buffer(size_t size, const T& init) { ptr = new T[size]; Fill(ptr, ptr + size, init); }
String MD5Digest(const void *data, int length);
Remove ValueType uses
InVector::InsertN possibly incorrect (can create small blocks)
Stream& Stream::operator/(int& i) { dword w = 0; if(IsStoring()) w = i + 1; Pack(w); i = w - 1; return *this; }
CoWork pipe max_queue
CoWork remove Semaphore
-----------------------
DONE
IIterator typedef typename V::ValueType T;
SerializeRaw - int64!
StringBuffer 2GB limit
Remove NanoStrings
Fix StringStream 2GB limit,
Stream SizeLimit
CoWork pipe
CoWork limits, logical thread not working
rename Function (Functor, Event, EventGate)
ExitThread (or Thread::Exit), Sleep(10)
Generic Tuple
String() <<
String &&
Remove IsPicked
Remove AddPick (keep as synonym)
void XmlNode::SetAttrsPick(VectorMap<String, String>&& a)
inline String& operator<<(String&& s, const T& x)
Remove HashFn
for(auto i: ~map)
Remove precomputed hash
Remove StringC
Create like emplace everywhere
Global.h ?
nanoalloc
ONCELOCK_PTR removed
Atomic->std::atomic, Barriers removed
Optimize Index::Hash
ConditionVariable
idmapBencmark ValueMap optimize
BiVector/Array/BiArray Pop
Range Insert/Append in containers
Function/Callback &&, Swap
Range
Remove:
WithDeepCopy<T> DeepClone(const T& src)
class WithPick : public T {
WithPick<T> AsPick(T&& src)
WithDeepCopy should implement &&
Remove CPP_11
Remove commented out code
AuxMutex, MT: Remove!

View file

@ -8,8 +8,7 @@
namespace Upp {
class Lz4 {
Buffer<char> buffer;
Buffer<char> outbuf;
StringBuffer buffer; // to be able to pass as String
int8 compress;
bool error;
@ -24,14 +23,18 @@ class Lz4 {
String header_data;
String out;
bool co;
bool parallel;
CoWork co;
Mutex lock;
ConditionVariable cond;
int outblock;
int inblock;
void TryHeader();
void Init();
void FinishBlock(char *outbuf, int clen, const char *origdata, int origsize);
void FlushOut();
void PutOut(const void *ptr, int size);
@ -52,7 +55,7 @@ public:
void Compress();
void Decompress();
void Co(bool b) { co = b; }
void Parallel(bool b = true) { parallel = b; }
bool IsError() const { return error; }
@ -67,6 +70,13 @@ String LZ4Compress(const String& s, Gate2<int64, int64> progress = false);
String LZ4Decompress(const void *data, int64 len, Gate2<int64, int64> progress = false);
String LZ4Decompress(const String& s, Gate2<int64, int64> progress = false);
int64 CoLZ4Compress(Stream& out, Stream& in, Gate2<int64, int64> progress = false);
int64 CoLZ4Decompress(Stream& out, Stream& in, Gate2<int64, int64> progress = false);
String CoLZ4Compress(const void *data, int64 len, Gate2<int64, int64> progress = false);
String CoLZ4Compress(const String& s, Gate2<int64, int64> progress = false);
String CoLZ4Decompress(const void *data, int64 len, Gate2<int64, int64> progress = false);
String CoLZ4Decompress(const String& s, Gate2<int64, int64> progress = false);
bool IsLZ4(Stream& s);
};

View file

@ -27,12 +27,13 @@ void Lz4::Init()
pos = 0;
header = false;
xxh.Reset();
outblock = inblock = 0;
}
void Lz4::Compress()
{
compress = 1;
buffer.Alloc(BLOCK_BYTES);
buffer.SetCount(BLOCK_BYTES);
Init();
}
@ -49,6 +50,26 @@ void Lz4::PutOut(const void *ptr, int size)
out.Cat((const char *)ptr, (int)size);
}
void Lz4::FinishBlock(char *outbuf, int clen, const char *origdata, int origsize)
{
RTIMING("FinishBlock");
if(error)
return;
if(clen < 0) {
error = true;
return;
}
if(clen >= origsize) {
Poke32le(outbuf, 0x80000000 | origsize);
memcpy(outbuf + 4, origdata, origsize);
WhenOut(outbuf, origsize + 4);
}
else {
Poke32le(outbuf, clen);
WhenOut(outbuf, clen + 4);
}
}
void Lz4::FlushOut()
{
if(!header) {
@ -60,28 +81,50 @@ void Lz4::FlushOut()
WhenOut(h, 7);
header = true;
maxblock = BLOCK_BYTES;
outbuf.Alloc(4 + LZ4_compressBound(maxblock));
}
if(!pos)
return;
int clen = LZ4_compress(buffer, ~outbuf + 4, pos);
if(clen < 0) {
error = true;
return;
}
xxh.Put(buffer, pos);
if(clen >= pos) {
Poke32le(~outbuf, 0x80000000 | pos);
memcpy(~outbuf + 4, buffer, pos);
WhenOut(~outbuf, pos + 4);
int origsize = pos;
pos = 0;
if(parallel) {
String bs = buffer;
int inblk = inblock++;
// DLOG("Scheduling " << inblk);
co.Start([=] {
Buffer<char> outbuf(4 + LZ4_compressBound(maxblock));
int clen = LZ4_compress(~bs, ~outbuf + 4, origsize);
Mutex::Lock __(lock);
{ RTIMING("Waiting for order");
while(outblock != inblk)
cond.Wait(lock);
}
FinishBlock(outbuf, clen, ~bs, origsize);
outblock++;
cond.Broadcast();
});
RTIMING("xxh");
xxh.Put(~bs, origsize);
buffer.SetCount(BLOCK_BYTES);
}
else {
Poke32le(~outbuf, clen);
WhenOut(~outbuf, clen + 4);
Buffer<char> outbuf(4 + LZ4_compressBound(maxblock));
xxh.Put(~buffer, origsize);
int clen = LZ4_compress(~buffer, ~outbuf + 4, origsize);
FinishBlock(outbuf, clen, buffer, origsize);
}
}
void Lz4::SyncInOut()
{
if(parallel) {
Mutex::Lock __(lock);
while(outblock != inblock)
cond.Wait(lock);
}
pos = 0;
}
void Lz4::End()
@ -89,6 +132,7 @@ void Lz4::End()
ASSERT(compress >= 0);
if(compress) {
FlushOut();
SyncInOut();
byte h[8];
Poke32le(h, 0);
Poke32le(h + 4, xxh.Finish());
@ -134,8 +178,7 @@ void Lz4::TryHeader()
header = true;
blockchksumsz = 4 * !!(lz4hdr & LZ4F_BLOCKCHECKSUM);
buffer.Alloc(maxblock + 4 + blockchksumsz);
outbuf.Alloc(maxblock);
buffer.SetCount(maxblock + 4 + blockchksumsz);
}
void Lz4::Put(const void *ptr_, int size)
@ -193,6 +236,7 @@ void Lz4::Put(const void *ptr_, int size)
int need = 4 - pos + 4;
if(size >= need) { // we have enough data for final checksum
memcpy(~buffer + pos, ptr, need);
SyncInOut();
if(Peek32le(~buffer + 4) != xxh.Finish()) {
error = true;
return;
@ -214,10 +258,8 @@ void Lz4::Put(const void *ptr_, int size)
}
else {
int sz;
{
RTIMING("Decompress");
sz = LZ4_decompress_safe(src, outbuf, len, maxblock);
}
Buffer<char> outbuf(maxblock);
sz = LZ4_decompress_safe(src, outbuf, len, maxblock);
if(sz < 0) {
error = true;
return;
@ -242,6 +284,7 @@ void Lz4::Put(const void *ptr_, int size)
Lz4::Lz4()
{
compress = -1;
parallel = false;
WhenOut = callback(this, &Lz4::PutOut);
}

View file

@ -2,13 +2,15 @@
namespace Upp {
int64 lz4Press(Stream& out, Stream& in, int64 size, Gate2<int64, int64> progress, bool compress)
int64 lz4Press(Stream& out, Stream& in, int64 size, Gate2<int64, int64> progress, bool compress, bool co = false)
{
Lz4 lz4;
int64 r = -1;
{
OutFilterStream outs(out, lz4);
if(co)
lz4.Parallel();
if(compress)
lz4.Compress();
else
@ -56,4 +58,38 @@ String LZ4Decompress(const String& s, Gate2<int64, int64> progress)
return LZ4Decompress(~s, s.GetLength(), progress);
}
int64 CoLZ4Compress(Stream& out, Stream& in, Gate2<int64, int64> progress)
{
return lz4Press(out, in, in.GetLeft(), progress, true, true);
}
int64 CoLZ4Decompress(Stream& out, Stream& in, Gate2<int64, int64> progress)
{
return lz4Press(out, in, in.GetLeft(), progress, false, true);
}
String CoLZ4Compress(const void *data, int64 len, Gate2<int64, int64> progress)
{
StringStream out;
MemReadStream in(data, len);
return CoLZ4Compress(out, in, progress) < 0 ? String::GetVoid() : out.GetResult();
}
String CoLZ4Compress(const String& s, Gate2<int64, int64> progress)
{
return CoLZ4Compress(~s, s.GetLength(), progress);
}
String CoLZ4Decompress(const void *data, int64 len, Gate2<int64, int64> progress)
{
StringStream out;
MemReadStream in(data, len);
return CoLZ4Decompress(out, in, progress) < 0 ? String::GetVoid() : out.GetResult();
}
String CoLZ4Decompress(const String& s, Gate2<int64, int64> progress)
{
return CoLZ4Decompress(~s, s.GetLength(), progress);
}
};