Add control schedule run for pro_scheduler.

Добавлен контроль запуска задач по расписанию (из-за проблем в версии 2.7)
При найденных пропусках запуска, добавляется строчка "Previous sched time run skip"
в Свойства. Всего проверяется 100 последних запусков.
This commit is contained in:
lsv 2021-01-13 10:36:38 +05:00
parent 3c245c16a5
commit 7caaf132f3
3 changed files with 193 additions and 7 deletions

View file

@ -303,7 +303,7 @@ wxDateTime pgSet::GetDateTime(const int col) const
{
wxASSERT(col < nCols && col >= 0);
wxDateTime dt;
wxDateTime dt((time_t)-1);
wxString str = GetVal(col);
/* This hasn't just been used. ( Is not infinity ) */
if (!str.IsEmpty())

View file

@ -53,6 +53,145 @@ public:
{
return crontab;
}
void iSetSched(int cron, wxString &mi, wxString& h, wxString& d, wxString& wd, wxString& mon)
{
for (int i = 0; i < 60; i++) _mi[i] = false;
for (int i = 0; i < 24; i++) _h[i] = false;
for (int i = 0; i < 31; i++) _d[i] = false;
for (int i = 0; i < 7; i++) _wd[i] = false;
for (int i = 0; i < 12; i++) _mon[i] = false;
wxStringTokenizer confkey(mi.substr(1,mi.Len()-1),",");
while (confkey.HasMoreTokens())
{
wxString str = confkey.GetNextToken();
long j = atol(str.ToAscii());
_mi[j] = true;
}
confkey.SetString(h.substr(1, h.Len() - 1),",");
while (confkey.HasMoreTokens())
{
wxString str = confkey.GetNextToken();
long j = atol(str.ToAscii());
_h[j] = true;
}
confkey.SetString(wd.substr(1, wd.Len() - 1), ",");
while (confkey.HasMoreTokens())
{
wxString str = confkey.GetNextToken();
long j = atol(str.ToAscii());
_wd[j] = true;
}
confkey.SetString(d.substr(1, d.Len() - 1), ",");
while (confkey.HasMoreTokens())
{
wxString str = confkey.GetNextToken();
long j = atol(str.ToAscii())-1;
_d[j] = true;
}
confkey.SetString(mon.substr(1, mon.Len() - 1), ",");
while (confkey.HasMoreTokens())
{
wxString str = confkey.GetNextToken();
long j = atol(str.ToAscii())-1;
_mon[j] = true;
}
}
// ïîëó÷èòü èíäåêñ ñëåäóþùåãî ýëåìåíòà èëè -1 åñëè åãî óæå íåò.
int getnext(bool array[],int len,int pos, int direct) {
//int len = sizeof(array) / sizeof(array[0]);
int mi = pos;
do {
mi = mi + direct;
if (mi > -1 && mi < len) {
}
else {
if (direct == 1 && mi == len) mi = 0;
if (direct == -1 && mi < 0) mi = len - 1;
mi = -1;
break;
}
} while (!array[mi]);
// âîçâðàùàåò -1 åñëè íåò ñëóäóþùåãî ýëåìåíòà.
return mi;
}
// ïîëó÷èòü èíäåêñ ïåðâûé/ïîñëåäíèé ýëåìåíòà
int getfirst(bool array[], int len, int direct) {
int mi = -1;
if (direct == -1) mi = len ;
do {
mi = mi + direct;
if (mi > -1 && mi < len) {
}
else {
if (direct == 1 && mi == len) mi = 0;
if (direct == -1 && mi < 0) mi = len - 1;
mi = -1;
break;
}
} while (!array[mi]);
return mi;
}
wxDateTime GetNextSchedule_At(wxDateTime prev_at, int dr) {
int mi = prev_at.GetMinute();
int h = prev_at.GetHour();
int d = prev_at.GetDay()-1;
int wd=prev_at.GetWeekDay();
int mon = prev_at.GetMonth();
int y = prev_at.GetYear();
bool novalid = false;
int nextp = getnext(_mi, sizeof(_mi) / sizeof(_mi[0]),mi, dr);
if (nextp == -1) {
mi = getfirst(_mi, sizeof(_mi) / sizeof(_mi[0]), dr);
nextp = getnext(_h, sizeof(_h) / sizeof(_h[0]), h, dr);
if (nextp == -1) {
// hours
h = getfirst(_h, sizeof(_h) / sizeof(_h[0]), dr);
do {
nextp = getnext(_d, sizeof(_d) / sizeof(_d[0]), d, dr);
if (nextp == -1) {
// days
d = getfirst(_d, sizeof(_d) / sizeof(_d[0]), dr);
nextp = getnext(_mon, sizeof(_mon) / sizeof(_mon[0]), mon, dr);
if (nextp == -1) {
// mon
mon = getfirst(_mon, sizeof(_mon) / sizeof(_mon[0]), dr);
y = y + dr;
}
else mon = nextp;
}
else d = nextp;
// day next
// ïðîâåðèì ñîîòâåòñòâèå wdays
wxDateTime tmp((wxDateTime::wxDateTime_t) d + 1, (wxDateTime::Month) mon, y, (wxDateTime::wxDateTime_t)h, (wxDateTime::wxDateTime_t)mi);
novalid = !tmp.IsValid();
wd = tmp.GetWeekDay();
//if (nextp == -1) nextp=getfirst(_wd, dr);
} while (!_wd[wd] || novalid);
}
else h = nextp;
}
else mi = nextp;
//
wxDateTime tmp((wxDateTime::wxDateTime_t) d+1, (wxDateTime::Month) mon, y, (wxDateTime::wxDateTime_t)h, (wxDateTime::wxDateTime_t)mi);
return tmp;
}
wxDateTime GetSchedMin() const
{
return sched_min;
}
void iSetSchedMin(const wxDateTime& d)
{
sched_min = d;
}
void iSetCrontab(const wxString &s)
{
crontab = s;
@ -186,8 +325,9 @@ public:
private:
bool enabled;
wxDateTime finished, changed, nextrun, lastrun;
wxDateTime finished, changed, nextrun, lastrun,sched_min;
wxString message, crontab, runas, commands,status,rule,tryname;
bool _d[31], _h[24], _mi[60], _wd[7], _mon[12];
long recId;
};

View file

@ -145,6 +145,43 @@ void pgproJob::ShowTreeDetail(ctlTree *browser, frmMain *form, ctlListView *prop
properties->AppendItem(_("Runas"), GetRunAs());
properties->AppendItem(_("Message"), GetMessage());
properties->AppendItem(_("Comment"), firstLineOnly(GetComment()));
//wxDateTime dt= GetNextSchedule_At(GetStarted(),-1);
wxDateTime t = wxDateTime::Now();
wxDateTime dt = GetStarted();
if (!dt.IsValid()) dt = t;
wxDateTime prev;
while (dt<t) {
prev = dt;
dt=GetNextSchedule_At(dt, 1);
}
//dt = prev;
wxString str;
for (int i = 0; i < 100; i++) {
dt = GetNextSchedule_At(dt, -1);
if (dt < GetSchedMin()) break;
if (str.Len() > 0) str += "),(";
str += "'"+dt.FormatISODate() + wxT(" ") + dt.FormatISOTime()+ "'";
}
wxString sql = "with at(dt) as(select to_timestamp(t.dt,'YYY-MM-DD HH24:MI') dt from (values("+str+")) as t(dt))\n";
sql += "select dt,status from at left join schedule.get_log() b on b.scheduled_at=dt and b.cron="+ NumToStr(GetRecId())+" where b.scheduled_at is null;";
pgSet* jobs = GetConnection()->ExecuteSet(sql);
if (jobs)
{
wxString str;
while (!jobs->Eof())
{
str+=jobs->GetVal(wxT("dt"))+";";
jobs->MoveNext();
}
delete jobs;
if (str.Len()>0) properties->AppendItem(_("Previous sched time run skip"), str);
}
}
}
@ -167,15 +204,17 @@ pgObject *pgproJob::Refresh(ctlTree *browser, const wxTreeItemId item)
pgObject *pgproJobFactory::CreateObjects(pgCollection *collection, ctlTree *browser, const wxString &restriction)
{
pgproJob *job = 0;
wxString sql= wxT("with last_job as (select cron,max(started) started from schedule.get_log() b group by cron), a as (select cron,scheduled_at,started,finished,status,message from schedule.get_active_jobs()), lastj as (select b.cron,")
wxString sql= wxT("with last_job as (select cron,max(started) started,min(scheduled_at) scheduled_at_min from schedule.get_log() b group by cron)")
wxT(", a as (select cron,scheduled_at,started,finished,status,message from schedule.get_active_jobs()), lastj as (select b.cron,")
wxT("coalesce((select scheduled_at from a where a.cron=b.cron),b.scheduled_at) scheduled_at")
wxT(",coalesce((select started from a where a.cron=b.cron),b.started) started")
wxT(",case when (select status from a where a.cron=b.cron)='working' then null else b.finished end finished")
wxT(",coalesce((select status from a where a.cron=b.cron),b.status) status")
wxT(",coalesce((select message from a where a.cron=b.cron),b.message) message")
wxT(",scheduled_at_min")
wxT(" from schedule.get_log() b,last_job where b.cron=last_job.cron and b.started=last_job.started")
wxT(") select j.crontab cron,* from (select c.*,null stop,l.* from schedule.get_cron() c left join lastj l on c.id=l.cron) c")
wxT(" join lateral (select crontab from jsonb_to_record(c.rule) as (d int[],h int[],wd int[],m int[],crontab text,mi int[]) ) j on true")
wxT(") select j.crontab cron,j.days,j.hours,j.wdays,j.months,j.minutes,* from (select c.*,null stop,l.* from schedule.get_cron() c left join lastj l on c.id=l.cron) c")
wxT(" join lateral (select crontab,days,hours,wdays,months,minutes from jsonb_to_record(c.rule) as (days int[],hours int[],wdays int[],months int[],crontab text,minutes int[]) ) j on true")
wxT("")
wxT("")
wxT(" ") + restriction;
@ -212,8 +251,9 @@ pgObject *pgproJobFactory::CreateObjects(pgCollection *collection, ctlTree *brow
if (tmp.find('{',0)==0) { tmp=wxT("[")+tmp.Mid(1,tmp.Len()-2)+wxT("]"); }
//tmp.Replace(wxT("{"), wxT("["),false);
//tmp.Replace(wxT("}"), wxT("]"));
// days,hours,wdays,months,minutes
job->iSetSched(job->GetRecId(), jobs->GetVal("minutes"), jobs->GetVal("hours"), jobs->GetVal("days"), jobs->GetVal("wdays"), jobs->GetVal("months"));
job->iSetSchedMin(jobs->GetDateTime(wxT("scheduled_at_min")));
job->iSetCommands(tmp);
job->iSetRunAs(jobs->GetVal(wxT("run_as")));
job->iSetStarted(jobs->GetDateTime(wxT("started")));
@ -234,6 +274,12 @@ pgObject *pgproJobFactory::CreateObjects(pgCollection *collection, ctlTree *brow
if (colname==wxT("id")
||colname==wxT("node")
||colname==wxT("rule")
||colname == wxT("minutes")
||colname == wxT("days")
||colname == wxT("wdays")
||colname == wxT("hours")
||colname == wxT("months")
||colname == wxT("scheduled_at_min")
||colname==wxT("owner")) continue;
if (colname==wxT("broken")) break;