573 lines
10 KiB
C++
573 lines
10 KiB
C++
#include "amxxmodule.h"
|
|
#include "sqlite_header.h"
|
|
#include "threading.h"
|
|
|
|
using namespace SourceMod;
|
|
using namespace SourceHook;
|
|
|
|
MainThreader g_Threader;
|
|
ThreadWorker *g_pWorker = NULL;
|
|
extern DLL_FUNCTIONS *g_pFunctionTable;
|
|
IMutex *g_QueueLock = NULL;
|
|
CStack<MysqlThread *> g_ThreadQueue;
|
|
CStack<MysqlThread *> g_FreeThreads;
|
|
float g_lasttime = 0.0f;
|
|
|
|
void ShutdownThreading()
|
|
{
|
|
if (g_pWorker)
|
|
{
|
|
g_pWorker->Stop(true);
|
|
delete g_pWorker;
|
|
g_pWorker = NULL;
|
|
}
|
|
|
|
g_QueueLock->Lock();
|
|
while (!g_ThreadQueue.empty())
|
|
{
|
|
delete g_ThreadQueue.front();
|
|
g_ThreadQueue.pop();
|
|
}
|
|
while (!g_FreeThreads.empty())
|
|
{
|
|
delete g_FreeThreads.front();
|
|
g_FreeThreads.pop();
|
|
}
|
|
g_QueueLock->Unlock();
|
|
g_QueueLock->DestroyThis();
|
|
|
|
FreeHandleTable();
|
|
}
|
|
|
|
//public QueryHandler(state, Handle:query, error[], errnum, data[], size)
|
|
//native SQL_ThreadQuery(Handle:cn_tuple, const handler[], const query[], const data[]="", dataSize=0);
|
|
static cell AMX_NATIVE_CALL SQL_ThreadQuery(AMX *amx, cell *params)
|
|
{
|
|
if (!g_pWorker)
|
|
{
|
|
MF_LogError(amx, AMX_ERR_NATIVE, "Thread worker was unable to start.");
|
|
return 0;
|
|
}
|
|
|
|
SQL_Connection *cn = (SQL_Connection *)GetHandle(params[1], Handle_Connection);
|
|
if (!cn)
|
|
{
|
|
MF_LogError(amx, AMX_ERR_NATIVE, "Invalid info tuple handle: %d", params[1]);
|
|
return 0;
|
|
}
|
|
|
|
int len;
|
|
const char *handler = MF_GetAmxString(amx, params[2], 0, &len);
|
|
int fwd = MF_RegisterSPForwardByName(amx, handler, FP_CELL, FP_CELL, FP_STRING, FP_CELL, FP_ARRAY, FP_CELL, FP_DONE);
|
|
if (fwd < 1)
|
|
{
|
|
MF_LogError(amx, AMX_ERR_NATIVE, "Function not found: %s", handler);
|
|
return 0;
|
|
}
|
|
|
|
MysqlThread *kmThread;
|
|
g_QueueLock->Lock();
|
|
if (g_FreeThreads.empty())
|
|
{
|
|
kmThread = new MysqlThread();
|
|
} else {
|
|
kmThread = g_FreeThreads.front();
|
|
g_FreeThreads.pop();
|
|
}
|
|
g_QueueLock->Unlock();
|
|
|
|
kmThread->SetInfo(cn->db);
|
|
kmThread->SetForward(fwd);
|
|
kmThread->SetQuery(MF_GetAmxString(amx, params[3], 1, &len));
|
|
kmThread->SetCellData(MF_GetAmxAddr(amx, params[4]), (ucell)params[5]);
|
|
|
|
g_pWorker->MakeThread(kmThread);
|
|
|
|
return 1;
|
|
}
|
|
|
|
/**
 * Creates an idle thread object: no forward, no data buffer.
 */
MysqlThread::MysqlThread()
{
	// No data buffer has been allocated yet.
	m_data = NULL;
	m_maxdatalen = 0;
	m_datalen = 0;

	// No forward is attached yet.
	m_fwd = 0;
}
|
|
|
|
/**
 * Unregisters the forward that is still attached (if any) and releases the
 * data buffer.
 */
MysqlThread::~MysqlThread()
{
	if (m_fwd != 0)
	{
		MF_UnregisterSPForward(m_fwd);
		m_fwd = 0;
	}

	// delete [] on NULL is a no-op, so no check is needed.
	delete [] m_data;
	m_data = NULL;
}
|
|
|
|
/**
 * Copies the caller's data array into the thread's own buffer.
 *
 * @param data  Source cells; treated as empty when NULL.
 * @param len   Number of cells to copy.
 *
 * The buffer only grows; it is kept between uses of a recycled thread
 * object. The stored length is always updated, including to zero, so a
 * recycled object never reports the data of a previous query.
 */
void MysqlThread::SetCellData(cell data[], ucell len)
{
	if (data == NULL)
	{
		len = 0;
	}

	if (len > m_maxdatalen)
	{
		delete [] m_data;
		// Keep the object consistent should the allocation below fail.
		m_data = NULL;
		m_maxdatalen = 0;
		m_data = new cell[len];
		m_maxdatalen = len;
	}

	m_datalen = len;
	if (len)
	{
		memcpy(m_data, data, len * sizeof(cell));
	}
}
|
|
|
|
void MysqlThread::SetForward(int forward)
|
|
{
|
|
m_fwd = forward;
|
|
}
|
|
|
|
void MysqlThread::SetInfo(const char *db)
|
|
{
|
|
m_db.assign(db);
|
|
m_qrInfo.queue_time = gpGlobals->time;
|
|
}
|
|
|
|
void MysqlThread::SetQuery(const char *query)
|
|
{
|
|
m_query.assign(query);
|
|
}
|
|
|
|
/**
 * Connects to the database, prepares and executes the stored query, and
 * records the outcome in m_qrInfo for Execute() to report later.
 *
 * On success with a result set, the rows are copied into m_atomicResult and
 * the result pointer is redirected to that copy. When no query object could
 * be created, a copy of the query text is stored in opt_ptr instead.
 *
 * @param pHandle  Thread handle supplied by the worker; not used here.
 */
void MysqlThread::RunThread(IThreadHandle *pHandle)
{
	DatabaseInfo info;

	info.database = m_db.c_str();
	info.pass = "";
	info.user = "";
	info.host = "";
	info.port = 0;

	// Reset the result info but keep the queue time set by SetInfo().
	float save_time = m_qrInfo.queue_time;
	memset(&m_qrInfo, 0, sizeof(m_qrInfo));
	m_qrInfo.queue_time = save_time;

	IDatabase *pDatabase = g_Sqlite.Connect(&info, &m_qrInfo.amxinfo.info.errorcode, m_qrInfo.amxinfo.error, 254);
	IQuery *pQuery = NULL;
	if (!pDatabase)
	{
		m_qrInfo.connect_success = false;
		m_qrInfo.query_success = false;
	} else {
		m_qrInfo.connect_success = true;
		pQuery = pDatabase->PrepareQuery(m_query.c_str());
		// A failed prepare is reported as a failed query instead of
		// dereferencing a NULL query object.
		if (!pQuery || !pQuery->Execute2(&m_qrInfo.amxinfo.info, m_qrInfo.amxinfo.error, 254))
		{
			m_qrInfo.query_success = false;
		} else {
			m_qrInfo.query_success = true;
		}
	}

	if (m_qrInfo.query_success && m_qrInfo.amxinfo.info.rs)
	{
		// Snapshot the rows and hand out the snapshot instead of the live set.
		m_atomicResult.CopyFrom(m_qrInfo.amxinfo.info.rs);
		m_qrInfo.amxinfo.info.rs = &m_atomicResult;
	}

	if (pQuery)
	{
		m_qrInfo.amxinfo.pQuery = pQuery;
	} else {
		// No query object: keep the query text; Execute() frees this copy.
		m_qrInfo.amxinfo.opt_ptr = new char[m_query.size() + 1];
		strcpy(m_qrInfo.amxinfo.opt_ptr, m_query.c_str());
	}

	if (pDatabase)
	{
		pDatabase->FreeHandle();
		pDatabase = NULL;
	}
}
|
|
|
|
void MysqlThread::Invalidate()
|
|
{
|
|
m_atomicResult.FreeHandle();
|
|
}
|
|
|
|
/**
 * Termination callback: files this object into the right stack.
 *
 * When cancel is true the result is invalidated and the object goes to the
 * free list; otherwise it goes to the queue that StartFrame() drains.
 *
 * @param pHandle  Thread handle; not used here.
 * @param cancel   True when the thread was cancelled.
 */
void MysqlThread::OnTerminate(IThreadHandle *pHandle, bool cancel)
{
	if (cancel)
	{
		Invalidate();
	}

	CStack<MysqlThread *> &destination = cancel ? g_FreeThreads : g_ThreadQueue;

	g_QueueLock->Lock();
	destination.push(this);
	g_QueueLock->Unlock();
}
|
|
|
|
/**
 * Free callback that intentionally does nothing; Execute() passes it to
 * MakeHandle() for the query handle.
 */
void NullFunc(void *ptr, unsigned int num)
{
	(void)ptr;
	(void)num;
}
|
|
|
|
//public QueryHandler(state, Handle:query, error[], errnum, data[], size)
|
|
void MysqlThread::Execute()
|
|
{
|
|
cell data_addr;
|
|
if (m_datalen)
|
|
{
|
|
data_addr = MF_PrepareCellArray(m_data, m_datalen);
|
|
} else {
|
|
static cell tmpdata[1] = {0};
|
|
data_addr = MF_PrepareCellArray(tmpdata, 1);
|
|
}
|
|
int state = 0;
|
|
if (!m_qrInfo.connect_success)
|
|
{
|
|
state = -2;
|
|
} else if (!m_qrInfo.query_success) {
|
|
state = -1;
|
|
}
|
|
float diff = gpGlobals->time - m_qrInfo.queue_time;
|
|
cell c_diff = amx_ftoc(diff);
|
|
unsigned int hndl = MakeHandle(&m_qrInfo.amxinfo, Handle_Query, NullFunc);
|
|
if (state != 0)
|
|
{
|
|
MF_ExecuteForward(m_fwd,
|
|
(cell)state,
|
|
(cell)hndl,
|
|
m_qrInfo.amxinfo.error,
|
|
m_qrInfo.amxinfo.info.errorcode,
|
|
data_addr,
|
|
m_datalen,
|
|
c_diff);
|
|
} else {
|
|
MF_ExecuteForward(m_fwd,
|
|
(cell)0,
|
|
(cell)hndl,
|
|
"",
|
|
(cell)0,
|
|
data_addr,
|
|
m_datalen,
|
|
c_diff);
|
|
}
|
|
FreeHandle(hndl);
|
|
if (m_qrInfo.amxinfo.pQuery)
|
|
{
|
|
m_qrInfo.amxinfo.pQuery->FreeHandle();
|
|
m_qrInfo.amxinfo.pQuery = NULL;
|
|
}
|
|
delete [] m_qrInfo.amxinfo.opt_ptr;
|
|
m_qrInfo.amxinfo.opt_ptr = NULL;
|
|
}
|
|
|
|
/*****************
|
|
* METAMOD STUFF *
|
|
*****************/
|
|
|
|
void OnPluginsLoaded()
|
|
{
|
|
if (g_pWorker)
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (!g_QueueLock)
|
|
{
|
|
g_QueueLock = g_Threader.MakeMutex();
|
|
}
|
|
|
|
g_pWorker = new ThreadWorker(&g_Threader, 250);
|
|
if (!g_pWorker->Start())
|
|
{
|
|
delete g_pWorker;
|
|
g_pWorker = NULL;
|
|
}
|
|
g_pFunctionTable->pfnSpawn = NULL;
|
|
|
|
g_lasttime = 0.0f;
|
|
|
|
return;
|
|
}
|
|
|
|
void StartFrame()
|
|
{
|
|
if (g_pWorker && (g_lasttime < gpGlobals->time))
|
|
{
|
|
g_lasttime = gpGlobals->time + 0.05f;
|
|
g_QueueLock->Lock();
|
|
size_t remaining = g_ThreadQueue.size();
|
|
if (remaining)
|
|
{
|
|
MysqlThread *kmThread;
|
|
do
|
|
{
|
|
kmThread = g_ThreadQueue.front();
|
|
g_ThreadQueue.pop();
|
|
g_QueueLock->Unlock();
|
|
kmThread->Execute();
|
|
kmThread->Invalidate();
|
|
g_FreeThreads.push(kmThread);
|
|
g_QueueLock->Lock();
|
|
} while (!g_ThreadQueue.empty());
|
|
}
|
|
|
|
g_QueueLock->Unlock();
|
|
}
|
|
|
|
RETURN_META(MRES_IGNORED);
|
|
}
|
|
|
|
void OnPluginsUnloading()
|
|
{
|
|
if (!g_pWorker)
|
|
{
|
|
return;
|
|
}
|
|
|
|
g_pWorker->Stop(false);
|
|
delete g_pWorker;
|
|
g_pWorker = NULL;
|
|
|
|
g_QueueLock->Lock();
|
|
size_t remaining = g_ThreadQueue.size();
|
|
if (remaining)
|
|
{
|
|
MysqlThread *kmThread;
|
|
do
|
|
{
|
|
kmThread = g_ThreadQueue.front();
|
|
g_ThreadQueue.pop();
|
|
g_QueueLock->Unlock();
|
|
kmThread->Execute();
|
|
kmThread->Invalidate();
|
|
g_FreeThreads.push(kmThread);
|
|
g_QueueLock->Lock();
|
|
} while (!g_ThreadQueue.empty());
|
|
}
|
|
|
|
g_QueueLock->Unlock();
|
|
}
|
|
|
|
/***********************
|
|
* ATOMIC RESULT STUFF *
|
|
***********************/
|
|
|
|
/**
 * Creates an empty, free result: no table, no rows, no fields.
 * The cursor starts at row 1 because row 0 of the table holds field names.
 */
AtomicResult::AtomicResult()
{
	m_IsFree = true;
	m_CurRow = 1;
	m_RowCount = 0;
	// Was left uninitialised; the accessors compare against it before any
	// CopyFrom() has run.
	m_FieldCount = 0;
	m_Table = NULL;
	m_AllocSize = 0;
}
|
|
|
|
/**
 * Releases every string in the table and the table itself.
 *
 * m_AllocSize is the number of slots in m_Table, so valid indices are
 * 0 .. m_AllocSize - 1. The table is NULL when CopyFrom() was never called.
 */
AtomicResult::~AtomicResult()
{
	if (!m_IsFree)
	{
		FreeHandle();
	}

	if (m_Table)
	{
		for (size_t i = 0; i < m_AllocSize; i++)
		{
			delete m_Table[i];
		}

		delete [] m_Table;
		m_Table = NULL;
	}

	m_AllocSize = 0;
	m_IsFree = true;
}
|
|
|
|
unsigned int AtomicResult::RowCount()
|
|
{
|
|
return m_RowCount;
|
|
}
|
|
|
|
bool AtomicResult::IsNull(unsigned int columnId)
|
|
{
|
|
return (GetString(columnId) == NULL);
|
|
}
|
|
|
|
unsigned int AtomicResult::FieldCount()
|
|
{
|
|
return m_FieldCount;
|
|
}
|
|
|
|
bool AtomicResult::FieldNameToNum(const char *name, unsigned int *columnId)
|
|
{
|
|
for (unsigned int i=0; i<m_FieldCount; i++)
|
|
{
|
|
assert(m_Table[i] != NULL);
|
|
if (strcmp(m_Table[i]->c_str(), name) == 0)
|
|
{
|
|
if (columnId)
|
|
{
|
|
*columnId = i;
|
|
}
|
|
return true;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
const char *AtomicResult::FieldNumToName(unsigned int num)
|
|
{
|
|
if (num >= m_FieldCount)
|
|
return NULL;
|
|
|
|
assert(m_Table[num] != NULL);
|
|
|
|
return m_Table[num]->c_str();
|
|
}
|
|
|
|
double AtomicResult::GetDouble(unsigned int columnId)
|
|
{
|
|
return atof(GetStringSafe(columnId));
|
|
}
|
|
|
|
float AtomicResult::GetFloat(unsigned int columnId)
|
|
{
|
|
return atof(GetStringSafe(columnId));
|
|
}
|
|
|
|
int AtomicResult::GetInt(unsigned int columnId)
|
|
{
|
|
return atoi(GetStringSafe(columnId));
|
|
}
|
|
|
|
/**
 * Raw column access is not supported: always returns an empty string and
 * reports a length of 0.
 *
 * @param columnId  Ignored.
 * @param length    Optional out: set to 0 when not NULL.
 */
const char *AtomicResult::GetRaw(unsigned int columnId, size_t *length)
{
	//we don't support this yet...
	if (length != NULL)
	{
		*length = 0;
	}
	return "";
}
|
|
|
|
const char *AtomicResult::GetStringSafe(unsigned int columnId)
|
|
{
|
|
const char *str = GetString(columnId);
|
|
|
|
return str ? str : "";
|
|
}
|
|
|
|
const char *AtomicResult::GetString(unsigned int columnId)
|
|
{
|
|
if (columnId >= m_FieldCount)
|
|
return NULL;
|
|
|
|
size_t idx = (m_CurRow * m_FieldCount) + columnId;
|
|
|
|
assert(m_Table[idx] != NULL);
|
|
|
|
return m_Table[idx]->c_str();
|
|
}
|
|
|
|
/**
 * The result set acts as its own row: returns this object as an IResultRow.
 */
IResultRow *AtomicResult::GetRow()
{
	IResultRow *self = static_cast<IResultRow *>(this);
	return self;
}
|
|
|
|
bool AtomicResult::IsDone()
|
|
{
|
|
if (m_CurRow > m_RowCount)
|
|
return true;
|
|
|
|
return false;
|
|
}
|
|
|
|
void AtomicResult::NextRow()
|
|
{
|
|
m_CurRow++;
|
|
}
|
|
|
|
void AtomicResult::_InternalClear()
|
|
{
|
|
if (m_IsFree)
|
|
return;
|
|
|
|
m_IsFree = true;
|
|
}
|
|
|
|
void AtomicResult::FreeHandle()
|
|
{
|
|
_InternalClear();
|
|
}
|
|
|
|
void AtomicResult::CopyFrom(IResultSet *rs)
|
|
{
|
|
if (!m_IsFree)
|
|
{
|
|
_InternalClear();
|
|
}
|
|
|
|
m_IsFree = false;
|
|
|
|
m_FieldCount = rs->FieldCount();
|
|
m_RowCount = rs->RowCount();
|
|
m_CurRow = 1;
|
|
|
|
size_t newTotal = (m_RowCount * m_FieldCount) + m_FieldCount;
|
|
if (newTotal > m_AllocSize)
|
|
{
|
|
SourceHook::String **table = new SourceHook::String *[newTotal];
|
|
memset(table, 0, newTotal * sizeof(SourceHook::String *));
|
|
if (m_Table)
|
|
{
|
|
memcpy(table, m_Table, m_AllocSize * sizeof(SourceHook::String *));
|
|
delete [] m_Table;
|
|
}
|
|
m_Table = table;
|
|
m_AllocSize = newTotal;
|
|
}
|
|
|
|
for (unsigned int i=0; i<m_FieldCount; i++)
|
|
{
|
|
if (m_Table[i])
|
|
{
|
|
m_Table[i]->assign(rs->FieldNumToName(i));
|
|
} else {
|
|
m_Table[i] = new SourceHook::String(rs->FieldNumToName(i));
|
|
}
|
|
}
|
|
|
|
IResultRow *row;
|
|
unsigned int idx = m_FieldCount;
|
|
while (!rs->IsDone())
|
|
{
|
|
row = rs->GetRow();
|
|
for (unsigned int i=0; i<m_FieldCount; i++,idx++)
|
|
{
|
|
if (m_Table[idx])
|
|
{
|
|
m_Table[idx]->assign(row->GetString(i));
|
|
} else {
|
|
m_Table[idx] = new SourceHook::String(row->GetString(i));
|
|
}
|
|
}
|
|
rs->NextRow();
|
|
}
|
|
}
|
|
|
|
void AtomicResult::Rewind()
|
|
{
|
|
m_CurRow = 1;
|
|
}
|
|
|
|
/**
 * There is never a further result set: always returns false.
 */
bool AtomicResult::NextResultSet()
{
	const bool hasMore = false;
	return hasMore;
}
|
|
|
|
// Native table exported by this file; the list ends with a NULL entry.
AMX_NATIVE_INFO g_ThreadSqlNatives[] =
{
	{ "SQL_ThreadQuery", SQL_ThreadQuery },

	{ NULL, NULL },
};
|