pgadmin3/db/pgQueryThread.cpp
levinsv 4af765213c support PG11
Поддержка PostgreSQL 11 только для Windows
2018-10-10 22:59:25 +05:00

901 lines
21 KiB
C++

//////////////////////////////////////////////////////////////////////////
//
// pgAdmin III - PostgreSQL Tools
//
// Copyright (C) 2002 - 2016, The pgAdmin Development Team
// This software is released under the PostgreSQL Licence
//
// pgQueryThread.cpp - PostgreSQL threaded query class
//
//////////////////////////////////////////////////////////////////////////
#include "pgAdmin3.h"
// wxWindows headers
#include <wx/wx.h>
// PostgreSQL headers
#include <libpq-fe.h>
// App headers
#include "db/pgSet.h"
#include "db/pgConn.h"
#include "db/pgQueryThread.h"
#include "db/pgQueryResultEvent.h"
#include "utils/pgDefs.h"
#include "utils/sysLogger.h"
const wxEventType PGQueryResultEvent = wxNewEventType();
// default notice processor for the pgQueryThread
// we do assume that the argument passed will be always the
// object of pgQueryThread
void pgNoticeProcessor(void *_arg, const char *_message)
{
wxString str(_message, wxConvUTF8);
wxLogNotice(wxT("%s"), str.Trim().c_str());
if (_arg)
{
((pgQueryThread *)_arg)->AppendMessage(str);
}
}
// support for multiple queries support
pgQueryThread::pgQueryThread(pgConn *_conn, wxEvtHandler *_caller,
PQnoticeProcessor _processor, void *_noticeHandler) :
wxThread(wxTHREAD_JOINABLE), m_currIndex(-1), m_conn(_conn),
m_cancelled(false), m_multiQueries(true), m_useCallable(false),
m_caller(_caller), m_processor(pgNoticeProcessor), m_noticeHandler(NULL),
m_eventOnCancellation(true)
{
// check if we can really use the enterprisedb callable statement and
// required
#ifdef __WXMSW__
if (PQiGetOutResult && PQiPrepareOut && PQiSendQueryPreparedOut)
{
// we do not need all of pqi stuff as90 onwards
if (m_conn && m_conn->conn && m_conn->GetIsEdb()
&& !m_conn->EdbMinimumVersion(9, 0))
{
m_useCallable = true;
}
}
#else // __WXMSW__
#ifdef EDB_LIBPQ
// use callable statement only with enterprisedb advanced servers
// we do not need all of pqi stuff as90 onwards
if (m_conn && m_conn->conn && m_conn->GetIsEdb()
&& !m_conn->EdbMinimumVersion(9, 0))
{
m_useCallable = true;
}
#endif // EDB_LIBPQ
#endif // __WXMSW__
if (m_conn && m_conn->conn)
{
PQsetnonblocking(m_conn->conn, 1);
}
if (_processor != NULL)
{
m_processor = _processor;
if (_noticeHandler)
m_noticeHandler = _noticeHandler;
else
m_noticeHandler = (void *)this;
}
else
{
m_noticeHandler = (void *)this;
}
}
pgQueryThread::pgQueryThread(pgConn *_conn, const wxString &_qry,
int _resultToRetrieve, wxWindow *_caller, long _eventId, void *_data)
: wxThread(wxTHREAD_JOINABLE), m_currIndex(-1), m_conn(_conn),
m_cancelled(false), m_multiQueries(false), m_useCallable(false),
m_caller(NULL), m_processor(pgNoticeProcessor), m_noticeHandler(NULL),
m_eventOnCancellation(true)
{
if (m_conn && m_conn->conn)
{
PQsetnonblocking(m_conn->conn, 1);
}
m_queries.Add(
new pgBatchQuery(_qry, (pgParamsArray *)NULL, _eventId, _data, false,
_resultToRetrieve));
wxLogInfo(wxT("queueing : %s"), _qry.c_str());
m_noticeHandler = (void *)this;
if (_caller)
{
m_caller = _caller->GetEventHandler();
}
}
void pgQueryThread::SetEventOnCancellation(bool eventOnCancelled)
{
m_eventOnCancellation = eventOnCancelled;
}
void pgQueryThread::AddQuery(const wxString &_qry, pgParamsArray *_params,
long _eventId, void *_data, bool _useCallable, int _resultToRetrieve)
{
m_queries.Add(
new pgBatchQuery(_qry, _params, _eventId, _data,
// use callable statement only if supported
m_useCallable && _useCallable, _resultToRetrieve));
wxLogInfo(wxT("queueing (%ld): %s"), GetId(), _qry.c_str());
}
pgQueryThread::~pgQueryThread()
{
m_conn->RegisterNoticeProcessor(0, 0);
WX_CLEAR_ARRAY(m_queries);
}
wxString pgQueryThread::GetMessagesAndClear(int idx)
{
wxString msg;
if (idx == -1)
idx = m_currIndex;
if (idx >= 0 && idx <= m_currIndex)
{
wxCriticalSectionLocker lock(m_criticalSection);
msg = m_queries[idx]->m_message;
m_queries[idx]->m_message = wxT("");
}
return msg;
}
void pgQueryThread::AppendMessage(const wxString &str)
{
if (m_queries[m_currIndex]->m_message.IsEmpty())
{
wxCriticalSectionLocker lock(m_criticalSection);
if (str != wxT("\n"))
m_queries[m_currIndex]->m_message.Append(str);
}
else
{
wxCriticalSectionLocker lock(m_criticalSection);
m_queries[m_currIndex]->m_message.Append(wxT("\n")).Append(str);
}
}
int pgQueryThread::Execute()
{
wxMutexLocker lock(m_queriesLock);
PGresult *result = NULL;
wxMBConv &conv = *(m_conn->conv);
wxString &query = m_queries[m_currIndex]->m_query;
int &resultToRetrieve = m_queries[m_currIndex]->m_resToRetrieve;
long &rowsInserted = m_queries[m_currIndex]->m_rowsInserted;
Oid &insertedOid = m_queries[m_currIndex]->m_insertedOid;
// using the alias for the pointer here, in order to save the result back
// in the pgBatchQuery object
pgSet *&dataSet = m_queries[m_currIndex]->m_resultSet;
int &rc = m_queries[m_currIndex]->m_returnCode;
pgParamsArray *params = m_queries[m_currIndex]->m_params;
bool useCallable = m_queries[m_currIndex]->m_useCallable;
pgError &err = m_queries[m_currIndex]->m_err;
wxCharBuffer queryBuf = query.mb_str(conv);
if (PQstatus(m_conn->conn) != CONNECTION_OK)
{
rc = pgQueryResultEvent::PGQ_CONN_LOST;
err.msg_primary = _("Connection to the database server lost");
return(RaiseEvent(rc));
}
if (!queryBuf && !query.IsEmpty())
{
rc = pgQueryResultEvent::PGQ_STRING_INVALID;
m_conn->SetLastResultError(NULL, _("the query could not be converted to the required encoding."));
err.msg_primary = _("Query string is empty");
return(RaiseEvent(rc));
}
// Honour the parameters (if any)
if (params && params->GetCount() > 0)
{
int pCount = params->GetCount();
int ret = 0,
idx = 0;
Oid *pOids = (Oid *)malloc(pCount * sizeof(Oid));
const char **pParams = (const char **)malloc(pCount * sizeof(const char *));
int *pLens = (int *)malloc(pCount * sizeof(int));
int *pFormats = (int *)malloc(pCount * sizeof(int));
// modes are used only by enterprisedb callable statement
#if defined (__WXMSW__) || (EDB_LIBPQ)
int *pModes = (int *)malloc(pCount * sizeof(int));
#endif
for (; idx < pCount; idx++)
{
pgParam *param = (*params)[idx];
pOids[idx] = param->m_type;
pParams[idx] = (const char *)param->m_val;
pLens[idx] = param->m_len;
pFormats[idx] = param->GetFormat();
#if defined (__WXMSW__) || (EDB_LIBPQ)
pModes[idx] = param->m_mode;
#endif
}
if (useCallable)
{
#if defined (__WXMSW__) || (EDB_LIBPQ)
wxLogInfo(wxString::Format(
_("using an enterprisedb callable statement (queryid:%ld, threadid:%ld)"),
(long)m_currIndex, (long)GetId()));
wxString stmt = wxString::Format(wxT("pgQueryThread-%ld-%ld"), this->GetId(), m_currIndex);
PGresult *res = PQiPrepareOut(m_conn->conn, stmt.mb_str(wxConvUTF8),
queryBuf, pCount, pOids, pModes);
if( PQresultStatus(res) != PGRES_COMMAND_OK)
{
rc = pgQueryResultEvent::PGQ_ERROR_PREPARE_CALLABLE;
err.SetError(res, &conv);
PQclear(res);
goto return_with_error;
}
ret = PQiSendQueryPreparedOut(m_conn->conn, stmt.mb_str(wxConvUTF8),
pCount, pParams, pLens, pFormats, 1);
if (ret != 1)
{
rc = pgQueryResultEvent::PGQ_ERROR_EXECUTE_CALLABLE;
m_conn->SetLastResultError(NULL, _("Failed to run PQsendQuery in pgQueryThread"));
err.msg_primary = wxString(PQerrorMessage(m_conn->conn), conv);
PQclear(res);
res = NULL;
goto return_with_error;
}
PQclear(res);
res = NULL;
#else
rc = -1;
wxASSERT_MSG(false,
_("the program execution flow must not reach to this point in pgQueryThread"));
goto return_with_error;
#endif
}
else
{
// assumptions: we will need the results in text format only
ret = PQsendQueryParams(m_conn->conn, queryBuf, pCount, pOids, pParams, pLens, pFormats, 0);
if (ret != 1)
{
rc = pgQueryResultEvent::PGQ_ERROR_SEND_QUERY;
m_conn->SetLastResultError(NULL,
_("Failed to run PQsendQueryParams in pgQueryThread"));
err.msg_primary = _("Failed to run PQsendQueryParams in pgQueryThread.\n") +
wxString(PQerrorMessage(m_conn->conn), conv);
goto return_with_error;
}
}
goto continue_without_error;
return_with_error:
{
free(pOids);
free(pParams);
free(pLens);
free(pFormats);
#if defined (__WXMSW__) || (EDB_LIBPQ)
free(pModes);
#endif
return (RaiseEvent(rc));
}
}
else
{
// use the PQsendQuery api in case, we don't have any parameters to
// pass to the server
if (!PQsendQuery(m_conn->conn, queryBuf))
{
rc = pgQueryResultEvent::PGQ_ERROR_SEND_QUERY;
err.msg_primary = _("Failed to run PQsendQueryParams in pgQueryThread.\n") +
wxString(PQerrorMessage(m_conn->conn), conv);
return(RaiseEvent(rc));
}
}
continue_without_error:
int resultsRetrieved = 0;
PGresult *lastResult = 0;
bool connExecutionCancelled = false;
while (true)
{
// This is a 'joinable' thread, it is not advisable to call 'delete'
// function on this.
// Hence - it does not make sense to use the function 'testdestroy' here.
// We introduced the 'CancelExecution' function for the same purpose.
//
// We will have to call the CancelExecution function of pgConn to
// inform the backend that the user has cancelled the execution.
//
// We will need to consume all the results before quiting from the thread.
// Otherwise - the connection object will become unusable, and throw
// error - because libpq connections expects application to consume all
// the result, before executing another query
//
if (m_cancelled && rc != pgQueryResultEvent::PGQ_EXECUTION_CANCELLED)
{
// We shouldn't be calling cancellation multiple time
if (!connExecutionCancelled)
{
m_conn->CancelExecution();
connExecutionCancelled = true;
}
rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
err.msg_primary = _("Execution Cancelled");
if (lastResult)
{
PQclear(lastResult);
lastResult = NULL;
}
}
if ((rc = PQconsumeInput(m_conn->conn)) != 1)
{
if (m_cancelled)
{
rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
err.msg_primary = _("Execution Cancelled");
if (lastResult)
{
PQclear(lastResult);
lastResult = NULL;
}
// There is nothing more to consume.
// We can quit the thread now.
//
// Raise the event - if the component asked for it on
// execution cancellation.
if (m_eventOnCancellation)
RaiseEvent(rc);
return rc;
}
if (rc == 0)
{
err.msg_primary = wxString(PQerrorMessage(m_conn->conn), conv);
}
if (PQstatus(m_conn->conn) == CONNECTION_BAD)
{
err.msg_primary = _("Connection to the database server lost");
rc = pgQueryResultEvent::PGQ_CONN_LOST;
}
else
{
rc = pgQueryResultEvent::PGQ_ERROR_CONSUME_INPUT;
}
return(RaiseEvent(rc));
}
if (PQisBusy(m_conn->conn))
{
Yield();
this->Sleep(10);
continue;
}
// if resultToRetrieve is given, the nth result will be returned,
// otherwise the last result set will be returned.
// all others are discarded
PGresult *res = PQgetResult(m_conn->conn);
if (!res)
break;
if((PQresultStatus(res) == PGRES_NONFATAL_ERROR) ||
(PQresultStatus(res) == PGRES_FATAL_ERROR) ||
(PQresultStatus(res) == PGRES_BAD_RESPONSE))
{
result = res;
err.SetError(res, &conv);
// Wait for the execution to be finished
// We need to fetch all the results, before sending the error
// message
do
{
if (PQconsumeInput(m_conn->conn) != 1)
{
if (m_cancelled)
{
rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
// Release the result as the query execution has been cancelled by the
// user
if (result)
PQclear(result);
result = NULL;
if (m_eventOnCancellation)
RaiseEvent(rc);
return rc;
}
goto out_of_consume_input_loop;
}
if ((res = PQgetResult(m_conn->conn)) == NULL)
{
goto out_of_consume_input_loop;
}
// Release the temporary results
PQclear(res);
res = NULL;
if (PQisBusy(m_conn->conn))
{
Yield();
this->Sleep(10);
}
}
while (true);
break;
}
#if defined (__WXMSW__) || (EDB_LIBPQ)
// there should be 2 results in the callable statement - the first is the
// dummy, the second contains our out params.
if (useCallable)
{
PQclear(res);
result = PQiGetOutResult(m_conn->conn);
}
#endif
if (PQresultStatus(res) == PGRES_COPY_IN)
{
rc = PGRES_COPY_IN;
PQputCopyEnd(m_conn->conn, "not supported by pgadmin");
}
if (PQresultStatus(res) == PGRES_COPY_OUT)
{
int copyRc;
char *buf;
int copyRows = 0;
int lastCopyRc = 0;
rc = PGRES_COPY_OUT;
AppendMessage(_("query returned copy data:\n"));
while((copyRc = PQgetCopyData(m_conn->conn, &buf, 1)) >= 0)
{
// Ignore processing the query result, when it has already been
// cancelled by the user
if (m_cancelled)
{
if (!connExecutionCancelled)
{
m_conn->CancelExecution();
connExecutionCancelled = true;
}
continue;
}
if (buf != NULL)
{
if (copyRows < 100)
{
wxString str(buf, conv);
wxCriticalSectionLocker cs(m_criticalSection);
m_queries[m_currIndex]->m_message.Append(str);
}
else if (copyRows == 100)
AppendMessage(_("Query returned more than 100 copy rows, discarding the rest...\n"));
PQfreemem(buf);
}
if (copyRc > 0)
copyRows++;
if (lastCopyRc == 0 && copyRc == 0)
{
Yield();
this->Sleep(10);
}
if (copyRc == 0)
{
if (!PQconsumeInput(m_conn->conn))
{
// It might be the case - it is a result of the
// execution cancellation.
if (m_cancelled)
{
rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
// Release the result as the query execution has been cancelled by the
// user
if (result)
PQclear(result);
result = NULL;
if (m_eventOnCancellation)
RaiseEvent(rc);
return rc;
}
if (PQstatus(m_conn->conn) == CONNECTION_BAD)
{
err.msg_primary = _("Connection to the database server lost");
rc = pgQueryResultEvent::PGQ_CONN_LOST;
}
else
{
rc = pgQueryResultEvent::PGQ_ERROR_CONSUME_INPUT;
err.msg_primary = wxString(PQerrorMessage(m_conn->conn), conv);
}
return(RaiseEvent(rc));
}
}
lastCopyRc = copyRc;
}
res = PQgetResult(m_conn->conn);
if (!res)
break;
}
resultsRetrieved++;
// Save the current result, as asked by the component
// But - only if the execution is not cancelled
if (!m_cancelled && resultsRetrieved == resultToRetrieve)
{
result = res;
insertedOid = PQoidValue(res);
if (insertedOid && insertedOid != (Oid) - 1)
AppendMessage(wxString::Format(_("query inserted one row with oid %d.\n"), insertedOid));
else
AppendMessage(wxString::Format(wxPLURAL("query result with %d row will be returned.\n", "query result with %d rows will be returned.\n",
PQntuples(result)), PQntuples(result)));
continue;
}
if (lastResult)
{
if (!m_cancelled && PQntuples(lastResult))
AppendMessage(wxString::Format(wxPLURAL("query result with %d row discarded.\n", "query result with %d rows discarded.\n",
PQntuples(lastResult)), PQntuples(lastResult)));
PQclear(lastResult);
}
lastResult = res;
}
out_of_consume_input_loop:
if (m_cancelled)
{
rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
// Release the result as the query execution has been cancelled by the
// user
if (result)
PQclear(result);
result = NULL;
if (m_eventOnCancellation)
RaiseEvent(rc);
return rc;
}
if (!result)
result = lastResult;
err.SetError(result, &conv);
AppendMessage(wxT("\n"));
rc = PQresultStatus(result);
if (rc == PGRES_TUPLES_OK)
{
dataSet = new pgSet(result, m_conn, conv, m_conn->needColQuoting);
dataSet->MoveFirst();
}
else if (rc == PGRES_COMMAND_OK)
{
char *s = PQcmdTuples(result);
if (*s)
rowsInserted = atol(s);
}
else if (rc == PGRES_FATAL_ERROR ||
rc == PGRES_NONFATAL_ERROR ||
rc == PGRES_BAD_RESPONSE)
{
if (result)
{
AppendMessage(wxString(PQresultErrorMessage(result), conv));
PQclear(result);
result = NULL;
}
else
{
AppendMessage(wxString(PQerrorMessage(m_conn->conn), conv));
}
return(RaiseEvent(rc));
}
insertedOid = PQoidValue(result);
if (insertedOid == (Oid) - 1)
insertedOid = 0;
return(RaiseEvent(1));
}
int pgQueryThread::RaiseEvent(int _retval)
{
#if !defined(PGSCLI)
if (m_caller)
{
pgQueryResultEvent resultEvent(GetId(), m_queries[m_currIndex], m_queries[m_currIndex]->m_eventID);
// client data
resultEvent.SetClientData(m_queries[m_currIndex]->m_data);
resultEvent.SetInt(_retval);
m_caller->AddPendingEvent(resultEvent);
}
#endif
return _retval;
}
void *pgQueryThread::Entry()
{
do
{
if (m_currIndex < (((int)m_queries.GetCount()) - 1))
{
// Create the PGcancel object to enable cancelling the running
// query
m_conn->SetConnCancel();
m_currIndex++;
m_queries[m_currIndex]->m_returnCode = -2;
m_queries[m_currIndex]->m_rowsInserted = -1l;
wxLogSql(wxT("Thread executing query (%d:%s:%d): %s"),
m_currIndex + 1, m_conn->GetHost().c_str(), m_conn->GetPort(),
m_queries[m_currIndex]->m_query.c_str());
// register the notice processor for the current query
m_conn->RegisterNoticeProcessor(m_processor, m_noticeHandler);
// execute the current query now
Execute();
// remove the notice processor now
m_conn->RegisterNoticeProcessor(0, 0);
// reset the PGcancel object
m_conn->ResetConnCancel();
}
if (!m_multiQueries || m_cancelled)
break;
wxThread::Sleep(10);
}
while (true);
return(NULL);
}
int pgQueryThread::DeleteReleasedQueries()
{
int res = 0,
idx = 0;
if (m_queriesLock.TryLock() == wxMUTEX_BUSY)
return res;
for (; idx <= m_currIndex; idx++)
{
if (m_queries[idx]->m_resultSet != NULL)
{
pgSet *set = m_queries[idx]->m_resultSet;
m_queries[idx]->m_resultSet = NULL;
delete set;
set = NULL;
res++;
}
}
m_queriesLock.Unlock();
return res;
}
pgError pgQueryThread::GetResultError(int idx)
{
wxMutexLocker lock(m_queriesLock);
if (idx == -1)
idx = m_currIndex;
return m_queries[idx]->m_err;
}
const wxString &pgBatchQuery::GetErrorMessage()
{
return m_err.msg_primary;
}
pgBatchQuery::~pgBatchQuery()
{
if (m_resultSet)
{
delete m_resultSet;
m_resultSet = NULL;
}
if (m_params)
{
WX_CLEAR_ARRAY((*m_params));
delete m_params;
m_params = NULL;
}
}
bool pgBatchQuery::Release()
{
bool res = false;
if (m_resultSet != NULL)
{
res = true;
pgSet *set = m_resultSet;
m_resultSet = NULL;
if (set)
delete set;
set = NULL;
}
if (m_params)
{
res = true;
WX_CLEAR_ARRAY((*m_params));
delete m_params;
m_params = NULL;
}
return res;
}
pgQueryResultEvent::pgQueryResultEvent(
unsigned long _thrdId, pgBatchQuery *_qry, int _id) :
wxCommandEvent(PGQueryResultEvent, _id), m_thrdId(_thrdId),
m_query(_qry) { }
pgQueryResultEvent::pgQueryResultEvent(const pgQueryResultEvent &_ev)
: wxCommandEvent(_ev), m_thrdId(_ev.m_thrdId), m_query(_ev.m_query) { }
pgParam::pgParam(Oid _type, void *_val, int _len, short _mode)
: m_type(_type), m_val(_val), m_len(_len), m_mode(_mode)
{
switch(m_type)
{
case PGOID_TYPE_CHAR:
case PGOID_TYPE_NAME:
case PGOID_TYPE_TEXT:
case PGOID_TYPE_VARCHAR:
case PGOID_TYPE_CSTRING:
m_format = 0;
default:
m_format = 1;
}
}
// wxString data
pgParam::pgParam(Oid _oid, wxString *_val, wxMBConv *_conv, short _mode)
: m_mode(_mode)
{
if (m_mode == PG_PARAM_OUT || !_val)
{
m_len = 0;
}
else
{
m_len = _val->Len();
}
if (_val)
{
char *str = (char *)malloc(m_len + 1);
if (!_val->IsEmpty() && _mode != PG_PARAM_OUT)
{
strncpy(str,
(const char *)_val->mb_str(
*(_conv != NULL ? _conv : &wxConvLocal)), m_len);
str[m_len] = '\0';
}
else
{
str[0] = '\0';
}
m_val = (void *)(str);
}
else
{
m_val = NULL;
}
m_type = _oid;
// text format
m_format = 0;
}
pgParam::~pgParam()
{
if (m_val)
free(m_val);
m_val = NULL;
}
int pgParam::GetFormat()
{
return m_format;
}