Ai
1 Star 0 Fork 0

wangcichen/urbackup_backend

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
Server.cpp 51.89 KB
一键复制 编辑 原始数据 按行查看 历史
Martin 提交于 2021-06-21 00:28 +08:00 . Revert "Fix init issue"
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301
/*************************************************************************
* UrBackup - Client/Server backup system
* Copyright (C) 2011-2016 Martin Raiber
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
**************************************************************************/
#include "vld.h"
#ifdef _WIN32
#define _CRT_RAND_S
#include <ws2tcpip.h>
#endif
#include <iostream>
#include <fstream>
#include <algorithm>
#include <memory.h>
#include <assert.h>
#ifndef _WIN32
#include <errno.h>
#endif
#include "libfastcgi/fastcgi.hpp"
#include "Interface/PluginMgr.h"
#include "Interface/Thread.h"
#include "Interface/DatabaseFactory.h"
#include "Interface/WebSocket.h"
#include "Server.h"
#include "Template.h"
#include "stringtools.h"
#include "defaults.h"
#include "SessionMgr.h"
#include "md5.h"
#include "ServiceAcceptor.h"
#include "FileSettingsReader.h"
#include "DBSettingsReader.h"
#include "StreamPipe.h"
#include "ThreadPool.h"
#include "file.h"
#include "file_memory.h"
#include "utf8/utf8.h"
#include "MemoryPipe.h"
#include "MemorySettingsReader.h"
#include "Database.h"
#include "SQLiteFactory.h"
#include "PipeThrottler.h"
#include "mt19937ar.h"
#include "Query.h"
#ifdef _WIN32
#include "SChannelPipe.h"
#endif
#ifdef WITH_OPENSSL
#include "OpenSSLPipe.h"
#endif
#ifdef _WIN32
#include <condition_variable>
#include "Mutex_std.h"
#include "Condition_std.h"
#include "SharedMutex_std.h"
#else
#include <pthread.h>
#include "Mutex_lin.h"
#include "Condition_lin.h"
#include "SharedMutex_lin.h"
#endif
#ifdef _WIN32
# include <windows.h>
# include "Helper_win32.h"
# include <time.h>
#else
# include <ctime>
# include <sys/time.h>
# include <unistd.h>
# include <sys/types.h>
# include <pwd.h>
# include "config.h"
#endif
#include "StaticPluginRegistration.h"
#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
#endif
#ifdef HAVE_SYS_RANDOM_H
#if defined(__FreeBSD__)
extern "C" {
#include <sys/random.h>
}
#else
#include <sys/random.h>
#endif //__FreeBSD__
#endif
const size_t SEND_BLOCKSIZE=8192;
const size_t MAX_THREAD_ID=std::string::npos;
namespace
{
//GetTickCount64 for Windows Server 2003
#ifdef _WIN32
typedef ULONGLONG(WINAPI GetTickCount64_t)(VOID);
GetTickCount64_t* GetTickCount64_fun = NULL;
void initialize_GetTickCount64()
{
HMODULE hKernel32 = GetModuleHandleW(L"Kernel32.dll");
if(hKernel32)
{
GetTickCount64_fun = reinterpret_cast<GetTickCount64_t*> (GetProcAddress(hKernel32, "GetTickCount64"));
}
}
#endif
#ifdef _WIN32
//from https://msdn.microsoft.com/en-us/library/xcb2z8hs.aspx
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#pragma pack(push,8)
typedef struct tagTHREADNAME_INFO
{
DWORD dwType; // Must be 0x1000.
LPCSTR szName; // Pointer to name (in user addr space).
DWORD dwThreadID; // Thread ID (-1=caller thread).
DWORD dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;
#pragma pack(pop)
void SetThreadName(DWORD dwThreadID, const char* threadName) {
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = threadName;
info.dwThreadID = dwThreadID;
info.dwFlags = 0;
#pragma warning(push)
#pragma warning(disable: 6320 6322)
__try {
RaiseException(MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
}
__except (EXCEPTION_EXECUTE_HANDLER) {
}
#pragma warning(pop)
}
#endif
}
extern bool run;
CServer::CServer()
{
curr_thread_id=0;
curr_pluginid=0;
curr_postfilekey=0;
loglevel=LL_INFO;
logfile_a=false;
circular_log_buffer_id=0;
circular_log_buffer_idx=0;
has_circular_log_buffer=false;
failbits=0;
startup_complete=false;
log_mutex=createMutex();
action_mutex=createMutex();
web_socket_mutex = createMutex();
requests_mutex=createMutex();
outputs_mutex=createMutex();
db_mutex=createMutex();
thread_mutex=createMutex();
plugin_mutex=createMutex();
rps_mutex=createMutex();
postfiles_mutex=createMutex();
param_mutex=createMutex();
startup_complete_mutex=createMutex();
startup_complete_cond=createCondition();
rnd_mutex=createMutex();
initRandom(static_cast<unsigned int>(time(0)));
initRandom(getSecureRandomNumber());
log_rotation_size = std::string::npos;
log_rotation_files = 10;
if(FileExists("large_log_rotation"))
{
log_rotation_files = 100;
}
log_console_time = true;
#ifdef _WIN32
initialize_GetTickCount64();
log_rotation_size = 20*1024*1024; //20MB
send_window_size = -1;
recv_window_size = -1;
#endif
}
void CServer::setup(void)
{
sessmgr=new CSessionMgr();
threadpool=new CThreadPool(std::string::npos, 2, "idle pool thread");
#ifndef NO_SQLITE
CDatabase::initMutex();
registerDatabaseFactory("sqlite", new SQLiteFactory );
#endif
CQuery::init_mutex();
#ifdef MODE_WIN
File::init_mutex();
#endif
#ifdef _WIN32
SChannelPipe::init();
#endif
#ifdef WITH_OPENSSL
OpenSSLPipe::init();
#endif
#ifdef ENABLE_C_ARES
LookupInit();
#endif
}
void CServer::destroyAllDatabases(void)
{
IScopedLock lock(db_mutex);
for(std::map<DATABASE_ID, SDatabase* >::iterator i=databases.begin();
i!=databases.end();++i)
{
for( std::map<THREAD_ID, IDatabaseInt*>::iterator j=i->second->tmap.begin();j!=i->second->tmap.end();++j)
{
delete j->second;
}
i->second->tmap.clear();
}
}
void CServer::destroyDatabases(THREAD_ID tid)
{
IScopedLock lock(db_mutex);
for(std::map<DATABASE_ID, SDatabase* >::iterator i=databases.begin();
i!=databases.end();++i)
{
std::map<THREAD_ID, IDatabaseInt*>::iterator iter=i->second->tmap.find(tid);
if(iter!=i->second->tmap.end())
{
delete iter->second;
i->second->tmap.erase(iter);
}
}
}
CServer::~CServer()
{
if(getServerParameter("leak_check")!="true") //minimal cleanup
{
return;
}
Log("deleting stream services...");
//Delete extra Services
for(size_t i=0;i<stream_services.size();++i)
{
delete stream_services[i];
}
Log("deleting plugins...");
//delete Plugins
for(std::map<PLUGIN_ID, std::pair<IPluginMgr*,str_map> >::iterator iter1=perthread_pluginparams.begin();
iter1!=perthread_pluginparams.end();++iter1)
{
std::map<PLUGIN_ID, std::map<THREAD_ID, IPlugin*> >::iterator iter2=perthread_plugins.find( iter1->first );
if( iter2!=perthread_plugins.end() )
{
std::map<THREAD_ID, IPlugin*> *pmap=&iter2->second;
for( std::map<THREAD_ID, IPlugin*>::iterator iter3=pmap->begin();iter3!=pmap->end();++iter3 )
{
iter1->second.first->destroyPluginInstance( iter3->second );
}
}
}
Log("Deleting threadsafe plugins...");
for(std::map<PLUGIN_ID, IPlugin*>::iterator iter1=threadsafe_plugins.begin();iter1!=threadsafe_plugins.end();++iter1)
{
iter1->second->Remove();
}
Log("Deleting pluginmanagers...");
for(std::map<std::string, IPluginMgr*>::iterator iter=perthread_pluginmgrs.begin();iter!=perthread_pluginmgrs.end();++iter)
{
iter->second->Remove();
}
for(std::map<std::string, IPluginMgr*>::iterator iter=threadsafe_pluginmgrs.begin();iter!=threadsafe_pluginmgrs.end();++iter)
{
iter->second->Remove();
}
Log("Deleting actions...");
for(std::map< std::string, std::map<std::string, IAction*> >::iterator iter1=actions.begin();iter1!=actions.end();++iter1)
{
for(std::map<std::string, IAction*>::iterator iter2=iter1->second.begin();iter2!=iter1->second.end();++iter2)
{
iter2->second->Remove();
}
}
actions.clear();
Log("Deleting sessmgr...");
delete sessmgr;
Log("Shutting down ThreadPool...");
threadpool->Shutdown();
delete threadpool;
Log("destroying databases...");
//Destroy Databases
destroyAllDatabases();
Log("deleting database factories...");
for(std::map<std::string, IDatabaseFactory*>::iterator it=database_factories.begin();it!=database_factories.end();++it)
{
it->second->Remove();
}
Log("unloading dlls...");
UnloadDLLs();
Log("Destroying mutexes");
destroy(log_mutex);
destroy(action_mutex);
destroy(web_socket_mutex);
destroy(requests_mutex);
destroy(outputs_mutex);
destroy(db_mutex);
destroy(thread_mutex);
destroy(plugin_mutex);
destroy(rps_mutex);
destroy(postfiles_mutex);
destroy(param_mutex);
destroy(startup_complete_mutex);
destroy(startup_complete_cond);
destroy(rnd_mutex);
#ifndef NO_SQLITE
CDatabase::destroyMutex();
#endif
#ifdef MODE_WIN
File::destroy_mutex();
#endif
std::cout << "Server cleanup done..." << std::endl;
}
void CServer::setServerParameters(const str_map &pServerParams)
{
IScopedLock lock(param_mutex);
server_params=pServerParams;
}
std::string CServer::getServerParameter(const std::string &key)
{
return getServerParameter(key, "");
}
std::string CServer::getServerParameter(const std::string &key, const std::string &def)
{
IScopedLock lock(param_mutex);
str_map::iterator iter=server_params.find(key);
if( iter!=server_params.end() )
{
return iter->second;
}
else
return def;
}
void CServer::setServerParameter(const std::string &key, const std::string &value)
{
IScopedLock lock(param_mutex);
server_params[key]=value;
}
void CServer::Log( const std::string &pStr, int LogLevel)
{
if( loglevel <=LogLevel )
{
IScopedLock lock(log_mutex);
time_t rawtime;
char buffer [100];
time ( &rawtime );
#ifdef _WIN32
struct tm timeinfo;
localtime_s(&timeinfo, &rawtime);
strftime (buffer,100,"%Y-%m-%d %X: ",&timeinfo);
#else
struct tm *timeinfo;
timeinfo = localtime ( &rawtime );
strftime (buffer,100,"%Y-%m-%d %X: ",timeinfo);
#endif
if(log_console_time)
{
std::cout << buffer;
}
if( LogLevel==LL_ERROR )
{
std::cout << "ERROR: " << pStr << std::endl;
if(logfile_a)
logfile << buffer << "ERROR: " << pStr << std::endl;
}
else if( LogLevel==LL_WARNING )
{
std::cout << "WARNING: " << pStr << std::endl;
if(logfile_a)
logfile<< buffer << "WARNING: " << pStr << std::endl;
}
else
{
std::cout << pStr << std::endl;
if(logfile_a)
logfile << buffer << pStr << std::endl;
}
if(logfile_a)
{
logfile.flush();
rotateLogfile();
}
if(has_circular_log_buffer)
{
logToCircularBuffer(pStr, LogLevel);
}
}
else if(has_circular_log_buffer)
{
IScopedLock lock(log_mutex);
logToCircularBuffer(pStr, LogLevel);
}
}
void CServer::setLogRotationFiles(size_t n)
{
log_rotation_files = n;
}
void CServer::rotateLogfile()
{
if(static_cast<size_t>(logfile.tellp())>log_rotation_size)
{
logfile.close();
logfile_a=false;
for(size_t i=log_rotation_files-1;i>0;--i)
{
rename((logfile_fn+"."+convert(i)).c_str(), (logfile_fn+"."+convert(i+1)).c_str());
}
deleteFile(logfile_fn+"."+convert(log_rotation_files));
rename(logfile_fn.c_str(), (logfile_fn+".1").c_str());
setLogFile(logfile_fn, logfile_chown_user);
}
}
void CServer::setLogFile(const std::string &plf, std::string chown_user)
{
IScopedLock lock(log_mutex);
if(logfile_a)
{
logfile.close();
logfile_a=false;
}
logfile_fn=plf;
logfile_chown_user=chown_user;
logfile.open( logfile_fn.c_str(), std::ios::app | std::ios::out | std::ios::binary );
if(logfile.is_open() )
{
#ifndef _WIN32
if(!chown_user.empty())
{
char buf[1000];
passwd pwbuf;
passwd *pw;
int rc=getpwnam_r(chown_user.c_str(), &pwbuf, buf, 1000, &pw);
if(pw!=NULL)
{
chown(plf.c_str(), pw->pw_uid, pw->pw_gid);
}
else
{
Server->Log("Unable to change logfile ownership", LL_ERROR);
}
}
#endif
logfile_a=true;
}
}
void CServer::setLogLevel(int LogLevel)
{
IScopedLock lock(log_mutex);
loglevel=LogLevel;
}
THREAD_ID CServer::Execute(const std::string &action, const std::string &context, str_map &GET, str_map &POST, str_map &PARAMS, IOutputStream *req)
{
IAction *action_ptr=NULL;
{
IScopedLock lock(action_mutex);
std::map<std::string, std::map<std::string, IAction*> >::iterator iter1=actions.find( context );
if( iter1!=actions.end() )
{
std::map<std::string, IAction*>::iterator iter2=iter1->second.find(action);
if( iter2!=iter1->second.end() )
action_ptr=iter2->second;
}
}
if( action_ptr!=NULL )
{
THREAD_ID tid=getThreadID();
IOutputStream *current_req=NULL;
{
IScopedLock lock(requests_mutex);
std::map<THREAD_ID, IOutputStream*>::iterator iter=current_requests.find(tid);
if( iter!=current_requests.end() )
{
current_req=iter->second;
iter->second=req;
}
else
{
current_requests.insert(std::pair<THREAD_ID, IOutputStream*>(tid, req) );
}
}
{
IScopedLock lock(outputs_mutex);
current_outputs[tid]=std::pair<bool, std::string>(false, "");
}
action_ptr->Execute( GET, POST, tid, PARAMS);
clearDatabases(tid);
bool b = WriteRaw(tid, NULL, 0, false);
if( current_req!=NULL )
{
IScopedLock lock(requests_mutex);
current_requests[tid]=current_req;
}
if(!b)
{
throw std::runtime_error("OutputStream error");
}
return tid;
}
return 0;
}
std::string CServer::Execute(const std::string &action, const std::string &context, str_map &GET, str_map &POST, str_map &PARAMS)
{
CStringOutputStream cos;
Execute(action, context, GET, POST, PARAMS, &cos);
return cos.getData();
}
void CServer::AddAction(IAction *action)
{
IScopedLock lock(action_mutex);
std::map<std::string, IAction*> *ptr=&actions[action_context];
ptr->insert( std::pair<std::string, IAction*>(action->getName(), action ) );
}
bool CServer::RemoveAction(IAction *action)
{
IScopedLock lock(action_mutex);
std::map<std::string, std::map<std::string, IAction*> >::iterator iter1=actions.find(action_context);
if( iter1!=actions.end() )
{
std::map<std::string, IAction*>::iterator iter2=iter1->second.find( action->getName() );
if( iter2!=iter1->second.end() )
{
iter1->second.erase( iter2 );
return true;
}
}
return false;
}
void CServer::setActionContext(std::string context)
{
action_context=context;
}
void CServer::resetActionContext(void)
{
action_context.clear();
}
int64 CServer::getTimeSeconds(void)
{
#ifdef _WIN32
SYSTEMTIME st;
GetSystemTime(&st);
return unix_timestamp(&st);
#else
timeval t;
gettimeofday(&t,NULL);
return t.tv_sec;
#endif
}
int64 CServer::getTimeMS(void)
{
#ifdef _WIN32
if(GetTickCount64_fun)
{
return GetTickCount64_fun();
}
else
{
return GetTickCount();
}
#else
//return (unsigned int)(((double)clock()/(double)CLOCKS_PER_SEC)*1000.0);
/*
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC);
static boost::int_fast64_t start_t=xt.sec;
xt.sec-=start_t;
unsigned int t=xt.sec*1000+(unsigned int)((double)xt.nsec/1000000.0);
return t;*/
/*timeval tp;
gettimeofday(&tp, NULL);
static long start_t=tp.tv_sec;
tp.tv_sec-=start_t;
return tp.tv_sec*1000+tp.tv_usec/1000;
*/
#ifdef __APPLE__
clock_serv_t cclock;
mach_timespec_t mts;
host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock);
clock_get_time(cclock, &mts);
mach_port_deallocate(mach_task_self(), cclock);
return static_cast<int64>(mts.tv_sec) * 1000 + mts.tv_nsec / 1000000;
#else
timespec tp;
if(clock_gettime(CLOCK_MONOTONIC, &tp)!=0)
{
timeval tv;
gettimeofday(&tv, NULL);
static long start_t=tv.tv_sec;
tv.tv_sec-=start_t;
return tv.tv_sec*1000+tv.tv_usec/1000;
}
return static_cast<int64>(tp.tv_sec)*1000+tp.tv_nsec/1000000;
#endif //__APPLE__
#endif
}
bool CServer::WriteRaw(THREAD_ID tid, const char *buf, size_t bsize, bool cached)
{
bool ret=true;
if( cached==false )
{
IOutputStream* req=NULL;
{
IScopedLock lock(requests_mutex);
std::map<THREAD_ID, IOutputStream*>::iterator iter=current_requests.find( tid );
if( iter!=current_requests.end() )
req=iter->second;
}
if( req!=NULL )
{
{
IScopedLock lock(outputs_mutex);
std::pair<bool, std::string> *co=&current_outputs[tid];
std::string *curr_output=&co->second;
bool sent=co->first;
if( sent==false && next(*curr_output,0,"Content-type: ")==false )
{
curr_output->insert(0, "Content-type: "+DEFAULT_CONTENTTYPE+"\r\n\r\n");
co->first=true;
}
try
{
if(curr_output->size()<SEND_BLOCKSIZE)
{
req->write(*curr_output);
}
else
{
for(size_t i=0,size=curr_output->size();i<size;i+=SEND_BLOCKSIZE)
{
req->write(&(*curr_output)[i], (std::min)(SEND_BLOCKSIZE, size-i) );
}
}
}
catch(std::exception&)
{
Server->Log("Sending data failed", LL_INFO);
ret=false;
}
curr_output->clear();
}
try
{
if( bsize>0 && bsize<SEND_BLOCKSIZE)
{
req->write(buf, bsize);
}
else if(bsize>0 )
{
for(size_t i=0,size=bsize;i<size;i+=SEND_BLOCKSIZE)
{
req->write(&buf[i], (std::min)(SEND_BLOCKSIZE, size-i) );
}
}
}
catch(std::exception&)
{
Server->Log("Sending data failed", LL_INFO);
ret=false;
}
}
else
Log("Couldn't find THREAD_ID - cached=true", LL_ERROR);
}
else
{
if( bsize>0 )
{
IScopedLock lock(outputs_mutex);
std::map<THREAD_ID, std::pair<bool, std::string> >::iterator iter=current_outputs.find( tid );
if( iter!=current_outputs.end() )
{
iter->second.second.append(buf, bsize);
}
else
{
Log("Couldn't find THREAD_ID - cached=false", LL_ERROR);
ret=false;
}
}
}
return ret;
}
bool CServer::Write(THREAD_ID tid, const std::string &str, bool cached)
{
return WriteRaw(tid, str.c_str(), str.size(), cached);
}
bool CServer::UnloadDLLs(void)
{
UnloadDLLs2();
return true;
}
void CServer::ShutdownPlugins(void)
{
for(std::map<std::string, UNLOADACTIONS>::iterator iter=unload_functs.begin();
iter!=unload_functs.end();++iter)
{
if(iter->second!=NULL)
iter->second();
}
unload_functs.clear();
}
bool CServer::UnloadDLL(const std::string &name)
{
std::map<std::string, UNLOADACTIONS>::iterator iter=unload_functs.find(name);
if(iter!=unload_functs.end() )
{
if( iter->second!=NULL )
{
iter->second();
unload_functs.erase(iter);
}
return true;
}
return false;
}
void CServer::destroy(IObject *obj)
{
if(obj!=NULL)
{
obj->Remove();
}
}
ITemplate* CServer::createTemplate(std::string pFile)
{
return new CTemplate(pFile);
}
THREAD_ID CServer::getThreadID(void)
{
#ifdef _WIN32
IScopedLock lock(thread_mutex);
std::thread::id ct = std::this_thread::get_id();
std::map<std::thread::id, THREAD_ID>::iterator iter=threads.find(ct);
if(iter!=threads.end() )
{
return iter->second;
}
++curr_thread_id;
if( curr_thread_id>=MAX_THREAD_ID )
curr_thread_id=0;
threads.insert( std::pair<std::thread::id, THREAD_ID>( ct, curr_thread_id) );
return curr_thread_id;
#else //_WIN32
IScopedLock lock(thread_mutex);
pthread_t ct=pthread_self();
std::map<pthread_t, THREAD_ID>::iterator iter=threads.find(ct);
if(iter!=threads.end() )
{
return iter->second;
}
++curr_thread_id;
if( curr_thread_id>=MAX_THREAD_ID )
curr_thread_id=0;
threads.insert( std::pair<pthread_t, THREAD_ID>( ct, curr_thread_id) );
return curr_thread_id;
#endif //_WIN32
}
bool CServer::openDatabase(std::string pFile, DATABASE_ID pIdentifier, const str_map& params, std::string pEngine)
{
IScopedLock lock(db_mutex);
std::map<DATABASE_ID, SDatabase* >::iterator iter=databases.find(pIdentifier);
if( iter!=databases.end() )
{
Log("Database already openend", LL_ERROR);
return false;
}
std::map<std::string, IDatabaseFactory*>::iterator iter2=database_factories.find(pEngine);
if(iter2==database_factories.end())
{
Log("Database engine not found", LL_ERROR);
return false;
}
SDatabase* ndb= new SDatabase(iter2->second, pFile);
ndb->single_user_mutex.reset(createSharedMutex());
ndb->lock_mutex.reset(createMutex());
ndb->lock_count.reset(new int);
*ndb->lock_count = 0;
ndb->unlock_cond.reset(createCondition());
ndb->params = params;
databases.insert( std::pair<DATABASE_ID, SDatabase* >(pIdentifier, ndb) );
return true;
}
IDatabase* CServer::getDatabase(THREAD_ID tid, DATABASE_ID pIdentifier)
{
IScopedLock lock(db_mutex);
std::map<DATABASE_ID, SDatabase* >::iterator database_iter=databases.find(pIdentifier);
if( database_iter==databases.end() )
{
Log("Database with identifier \""+convert((int)pIdentifier)+"\" couldn't be opened", LL_ERROR);
return NULL;
}
std::map<THREAD_ID, IDatabaseInt*>::iterator thread_iter=database_iter->second->tmap.find( tid );
if( thread_iter==database_iter->second->tmap.end() )
{
IDatabaseInt *db=database_iter->second->factory->createDatabase();
std::pair<std::map<THREAD_ID, IDatabaseInt*>::iterator, bool> ins =
database_iter->second->tmap.insert(std::pair< THREAD_ID, IDatabaseInt* >(tid, db));
SDatabase* params = database_iter->second;
lock.relock(NULL);
if(!db->Open(params->file, params->attach,
params->allocation_chunk_size, params->single_user_mutex.get(),
params->lock_mutex.get(), params->lock_count.get(), params->unlock_cond.get(),
params->params) )
{
destroy(db);
Log("Database \""+database_iter->second->file+"\" couldn't be opened", LL_ERROR);
lock.relock(db_mutex);
database_iter->second->tmap.erase(ins.first);
return NULL;
}
return db;
}
else
{
return thread_iter->second;
}
}
void CServer::clearDatabases(THREAD_ID tid)
{
IScopedLock lock(db_mutex);
for(std::map<DATABASE_ID, SDatabase* >::iterator i=databases.begin();
i!=databases.end();++i)
{
std::map<THREAD_ID, IDatabaseInt*>::iterator iter=i->second->tmap.find(tid);
if( iter!=i->second->tmap.end() )
{
iter->second->destroyAllQueries();
}
}
}
void CServer::setContentType(THREAD_ID tid, const std::string &str)
{
{
IScopedLock lock(outputs_mutex);
std::pair<bool, std::string> *co=&current_outputs[tid];
std::string *curr_output=&co->second;
if( curr_output->find("Content-type: ")==0 )
{
*curr_output=strdelete("Content-type: "+getbetween("Content-type: ","\r\n\r\n", *curr_output)+"\r\n\r\n", *curr_output);
}
if(curr_output->find("\r\n\r\n")!=std::string::npos )
{
curr_output->insert(0, "Content-type: "+str+"\r\n");
}
else
{
curr_output->insert(0, "Content-type: "+str+"\r\n\r\n");
}
co->first=true;
}
}
void CServer::addHeader(THREAD_ID tid, const std::string &str)
{
{
IScopedLock lock(outputs_mutex);
std::pair<bool, std::string> *co=&current_outputs[tid];
std::string *curr_output=&co->second;
std::string tadd=str;
if( curr_output->find("\r\n\r\n")!=std::string::npos )
{
tadd+="\r\n";
}
else
{
tadd+="\r\n\r\n";
}
curr_output->insert(0, tadd);
co->first=true;
}
}
ISessionMgr *CServer::getSessionMgr(void)
{
return sessmgr;
}
std::string CServer::GenerateHexMD5(const std::string &input)
{
MD5 md((unsigned char*)input.c_str() );
return md.hex_digest();
}
std::string CServer::GenerateBinaryMD5(const std::string &input)
{
MD5 md((unsigned char*)input.c_str(), static_cast<unsigned int>(input.size()));
unsigned char *p=md.raw_digest_int();
std::string ret(reinterpret_cast<char*>(p), reinterpret_cast<char*>(p)+16);
return ret;
}
void CServer::StartCustomStreamService(IService *pService, std::string pServiceName, unsigned short pPort, int pMaxClientsPerThread, IServer::BindTarget bindTarget)
{
CServiceAcceptor *acc=new CServiceAcceptor(pService, pServiceName, pPort, pMaxClientsPerThread, bindTarget);
Server->createThread(acc, pServiceName+": accept");
stream_services.push_back( acc );
}
IPipe* CServer::ConnectStream(std::string pServer, unsigned short pPort, unsigned int pTimeoutms)
{
std::vector<SLookupBlockingResult> lookup_result;
#ifdef ENABLE_C_ARES
lookup_result = LookupWithTimeout(pServer, pTimeoutms, pTimeoutms/2);
#else
lookup_result = LookupBlocking(pServer);
#endif
if (lookup_result.empty())
{
#ifdef _WIN32
Server->Log("No result when looking up hostname \"" + pServer + "\". Err: " + convert(WSAGetLastError()), LL_DEBUG);
#else
Server->Log("No result when looking up hostname \"" + pServer + "\". Err: " + convert(errno), LL_DEBUG);
#endif
return NULL;
}
for (size_t i = 0; i < lookup_result.size(); ++i)
{
IPipe* ret = ConnectStream(lookup_result[i], pPort, pTimeoutms);
if (ret != NULL)
return ret;
}
return NULL;
}
IPipe* CServer::ConnectStream(const SLookupBlockingResult& lookup_result, unsigned short pPort, unsigned int pTimeoutms)
{
union
{
sockaddr_in addr_v4;
sockaddr_in6 addr_v6;
} addr = {};
if (lookup_result.is_ipv6)
{
memcpy(&addr.addr_v6.sin6_addr, lookup_result.addr_v6, sizeof(addr.addr_v6.sin6_addr));
addr.addr_v6.sin6_port = htons(pPort);
addr.addr_v6.sin6_family = AF_INET6;
addr.addr_v6.sin6_scope_id = lookup_result.zone;
}
else
{
addr.addr_v4.sin_addr.s_addr = lookup_result.addr_v4;
addr.addr_v4.sin_port = htons(pPort);
addr.addr_v4.sin_family = AF_INET;
}
int type = SOCK_STREAM;
#if !defined(_WIN32) && defined(SOCK_CLOEXEC)
type |= SOCK_CLOEXEC;
#endif
SOCKET s = socket(lookup_result.is_ipv6 ? AF_INET6 : AF_INET, type, 0);
if (s == SOCKET_ERROR)
{
#if !defined(_WIN32) && defined(SOCK_CLOEXEC)
if (errno == EINVAL)
{
type &= ~SOCK_CLOEXEC;
s = socket(lookup_result.is_ipv6 ? AF_INET6 : AF_INET, type, 0);
}
#endif
if (s == SOCKET_ERROR)
{
return NULL;
}
}
#if !defined(_WIN32) && !defined(SOCK_CLOEXEC)
fcntl(s, F_SETFD, fcntl(s, F_GETFD, 0) | FD_CLOEXEC);
#endif
#ifdef _WIN32
u_long nonBlocking = 1;
if (ioctlsocket(s, FIONBIO, &nonBlocking) == SOCKET_ERROR)
{
Server->Log("Error setting socket to non-blocking. Err: " + convert(WSAGetLastError()), LL_ERROR);
closesocket(s);
return NULL;
}
#else
int flags = fcntl(s, F_GETFL, 0);
if (flags == -1)
{
Server->Log("Error getting socket flags. Errno: " + convert(errno), LL_ERROR);
closesocket(s);
return NULL;
}
if (fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1)
{
Server->Log("Error setting socket to non-blocking. Err: " + convert(errno), LL_ERROR);
closesocket(s);
return NULL;
}
#endif
#ifdef __APPLE__
int val = 1;
setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
#endif
sockaddr* addrptr = lookup_result.is_ipv6 ?
reinterpret_cast<sockaddr*>(&addr.addr_v6) : reinterpret_cast<sockaddr*>(&addr.addr_v4);
int rc=connect(s, addrptr, lookup_result.is_ipv6 ? sizeof(addr.addr_v6) : sizeof(addr.addr_v4) );
#ifndef _WIN32
if(rc==SOCKET_ERROR)
{
if(errno!=EINPROGRESS)
{
closesocket(s);
Server->Log("errno !=EINPROGRESS. Connect failed...", LL_DEBUG);
return NULL;
}
}
else
{
return new CStreamPipe(s);
}
#else
if(rc!=SOCKET_ERROR)
{
return new CStreamPipe(s);
}
#endif
#ifdef _WIN32
fd_set conn;
FD_ZERO(&conn);
FD_SET(s, &conn);
timeval lon;
lon.tv_sec=(long)(pTimeoutms/1000);
lon.tv_usec=(long)(pTimeoutms%1000)*1000;
rc=select((int)s+1,NULL,&conn,NULL,&lon);
if( rc>0 && FD_ISSET(s, &conn) )
{
#else
pollfd conn[1];
conn[0].fd=s;
conn[0].events=POLLOUT;
conn[0].revents=0;
rc = poll(conn, 1, pTimeoutms);
if( rc>0 )
{
#endif
int err;
socklen_t len=sizeof(int);
rc=getsockopt(s, SOL_SOCKET, SO_ERROR, (char*)&err, &len);
if(rc<0)
{
closesocket(s);
Server->Log("Error getting socket status.", LL_ERROR);
return NULL;
}
if(err)
{
closesocket(s);
Server->Log("Socket has error: "+convert(err), LL_INFO);
return NULL;
}
else
{
#ifdef _WIN32
if(send_window_size>0)
setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char *) &send_window_size, sizeof(send_window_size));
if(recv_window_size>0)
setsockopt(s, SOL_SOCKET, SO_RCVBUF, (char *) &recv_window_size, sizeof(recv_window_size));
#endif
return new CStreamPipe(s);
}
}
else
{
closesocket(s);
return NULL;
}
}
IPipe * CServer::ConnectSslStream(const std::string & pServer, unsigned short pPort, unsigned int pTimeoutms)
{
#if defined(_WIN32) || defined(WITH_OPENSSL)
int64 starttime = Server->getTimeMS();
CStreamPipe* bpipe = static_cast<CStreamPipe*>(ConnectStream(pServer, pPort, pTimeoutms));
if (bpipe == NULL)
return NULL;
int64 remaining_time = pTimeoutms - (Server->getTimeMS() - starttime);
if (remaining_time < 0) remaining_time = 0;
#ifdef WITH_OPENSSL
OpenSSLPipe* ssl_pipe = new OpenSSLPipe(bpipe);
#else
SChannelPipe* ssl_pipe = new SChannelPipe(bpipe);
#endif
if (!ssl_pipe->ssl_connect(pServer, static_cast<int>(remaining_time)))
{
delete ssl_pipe;
return NULL;
}
return ssl_pipe;
#else
return NULL;
#endif
}
IPipe *CServer::PipeFromSocket(SOCKET pSocket)
{
return new CStreamPipe(pSocket);
}
void CServer::DisconnectStream(IPipe *pipe)
{
CStreamPipe *sp=(CStreamPipe*)pipe;
SOCKET s=sp->getSocket();
closesocket(s);
}
std::string CServer::LookupHostname(const std::string & pIp)
{
std::string hostname;
if (!::LookupHostname(pIp, hostname))
{
return std::string();
}
return hostname;
}
bool CServer::RegisterPluginPerThreadModel(IPluginMgr *pPluginMgr, std::string pName)
{
IScopedLock lock(plugin_mutex);
std::map<std::string, IPluginMgr*>::iterator iter=perthread_pluginmgrs.find( pName );
if( iter!= perthread_pluginmgrs.end() )
return false;
perthread_pluginmgrs.insert( std::pair<std::string, IPluginMgr*>( pName, pPluginMgr ) );
return true;
}
bool CServer::RegisterPluginThreadsafeModel(IPluginMgr *pPluginMgr, std::string pName)
{
IScopedLock lock(plugin_mutex);
std::map<std::string, IPluginMgr*>::iterator iter=threadsafe_pluginmgrs.find( pName );
if( iter!= threadsafe_pluginmgrs.end() )
return false;
threadsafe_pluginmgrs.insert( std::pair<std::string, IPluginMgr*>( pName, pPluginMgr ) );
return true;
}
PLUGIN_ID CServer::StartPlugin(std::string pName, str_map &params)
{
IScopedLock lock(plugin_mutex);
{
std::map<std::string, IPluginMgr*>::iterator iter=perthread_pluginmgrs.find( pName );
if( iter!=perthread_pluginmgrs.end() )
{
++curr_pluginid;
std::pair<IPluginMgr*, str_map> tmp(iter->second, params);
perthread_pluginparams.insert( std::pair<PLUGIN_ID, std::pair<IPluginMgr*, str_map> >(curr_pluginid, tmp) );
return curr_pluginid;
}
}
{
std::map<std::string, IPluginMgr*>::iterator iter1=threadsafe_pluginmgrs.find( pName );
if( iter1!=threadsafe_pluginmgrs.end() )
{
++curr_pluginid;
IPlugin *plugin=iter1->second->createPluginInstance( params );
threadsafe_plugins.insert( std::pair<PLUGIN_ID, IPlugin*>( curr_pluginid, plugin) );
return curr_pluginid;
}
}
return ILLEGAL_PLUGIN_ID;
}
bool CServer::RestartPlugin(PLUGIN_ID pIdentifier)
{
IScopedLock lock(plugin_mutex);
{
std::map<PLUGIN_ID, std::map<THREAD_ID, IPlugin*> >::iterator iter1=perthread_plugins.find( pIdentifier );
if( iter1!=perthread_plugins.end() )
{
bool ret=true;
for( std::map<THREAD_ID, IPlugin*>::iterator iter2=iter1->second.begin();iter2!=iter1->second.end();++iter2)
{
bool b=iter2->second->Reload();
if( b==false )
ret=false;
}
return ret;
}
}
{
std::map<PLUGIN_ID, IPlugin*>::iterator iter1=threadsafe_plugins.find( pIdentifier );
if( iter1!=threadsafe_plugins.end() )
{
return iter1->second->Reload();
}
}
return false;
}
IPlugin* CServer::getPlugin(THREAD_ID tid, PLUGIN_ID pIdentifier)
{
IScopedLock lock(plugin_mutex);
{
std::map<PLUGIN_ID, IPlugin*>::iterator iter1=threadsafe_plugins.find( pIdentifier );
if( iter1!=threadsafe_plugins.end() )
{
return iter1->second;
}
}
{
std::map<PLUGIN_ID, std::pair<IPluginMgr*,str_map> >::iterator iter1=perthread_pluginparams.find( pIdentifier );
if( iter1!= perthread_pluginparams.end() )
{
std::map<THREAD_ID, IPlugin*> *pmap=&perthread_plugins[pIdentifier];
std::map<THREAD_ID, IPlugin*>::iterator iter2=pmap->find(tid);
if( iter2==pmap->end() )
{
IPlugin* newplugin=iter1->second.first->createPluginInstance( iter1->second.second);
pmap->insert( std::pair<THREAD_ID, IPlugin*>( tid, newplugin) );
newplugin->Reset();
return newplugin;
}
else
{
iter2->second->Reset();
return iter2->second;
}
}
}
return NULL;
}
IMutex* CServer::createMutex(void)
{
return new CMutex;
}
IPipe *CServer::createMemoryPipe(void)
{
return new CMemoryPipe;
}
#ifdef _WIN32
struct SThreadInfo
{
std::string name;
IThread* t;
};
void thread_helper_f2(IThread* t)
{
(*t)();
Server->destroyDatabases(Server->getThreadID());
}
DWORD WINAPI thread_helper_f(LPVOID param)
{
IThread* t;
#if defined(_WIN32) && defined(_DEBUG)
SThreadInfo* thread_info = reinterpret_cast<SThreadInfo*>(param);
SetThreadName(-1, thread_info->name.c_str());
t = thread_info->t;
delete thread_info;
#else
t = reinterpret_cast<IThread*>(param);
#endif
#ifndef _DEBUG
__try
{
#endif
thread_helper_f2(t);
#ifndef _DEBUG
}
__except(CServer::WriteDump(GetExceptionInformation()))
{
throw;
}
#endif
return 0;
}
#else //_WIN32
void os_reset_priority();
void* thread_helper_f(void * t)
{
#ifdef __linux__
os_reset_priority();
#endif
IThread *tmp=(IThread*)t;
#ifndef _DEBUG
try
{
#endif
(*tmp)();
#ifndef _DEBUG
}
catch (std::exception& e)
{
Server->Log(std::string("Thread exit with unhandled std::exception ") + e.what(), LL_ERROR);
throw;
}
catch (...)
{
Server->Log(std::string("Thread exit with unhandled C++ exception "), LL_ERROR);
throw;
}
#endif
Server->destroyDatabases(Server->getThreadID());
return NULL;
}
#endif //_WIN32
bool CServer::createThread(IThread *thread, const std::string& name, CreateThreadFlags flags)
{
const size_t thread_large_stack_size = 64 * 1024 * 1024;
#ifdef _WIN32
SIZE_T stack_size = 0;
if (flags & IServer::CreateThreadFlags_LargeStackSize)
{
stack_size = thread_large_stack_size;
}
void* param;
#if defined(_WIN32) && defined(_DEBUG)
SThreadInfo* thread_info = new SThreadInfo;
thread_info->name = name;
thread_info->t = thread;
param = thread_info;
#else
param = thread;
#endif
HANDLE hThread = CreateThread(NULL, stack_size, &thread_helper_f, param, 0, NULL);
if (hThread == NULL
&& flags & IServer::CreateThreadFlags_LargeStackSize)
{
Server->Log("Error creating thread with large stack size. Trying to create thread without LargeStackSize", LL_WARNING);
hThread = CreateThread(NULL, 0, &thread_helper_f, param, 0, NULL);
}
if (hThread == NULL)
{
Server->Log("Error creating thread. Errno: " + convert((int)GetLastError()), LL_ERROR);
return false;
}
else
{
CloseHandle(hThread);
return true;
}
#else
pthread_attr_t attr;
if (pthread_attr_init(&attr) != 0)
{
Server->Log("Error initializing pthread attrs", LL_ERROR);
return false;
}
#if !defined(URB_THREAD_STACKSIZE64) || !defined(URB_THREAD_STACKSIZE32)
#ifndef _LP64
//Only on 32bit architectures
pthread_attr_setstacksize(&attr, 1*1024*1024);
#endif
#else
#ifdef _LP64
pthread_attr_setstacksize(&attr, (URB_THREAD_STACKSIZE64));
#else
pthread_attr_setstacksize(&attr, (URB_THREAD_STACKSIZE32));
#endif
#endif
if (flags & IServer::CreateThreadFlags_LargeStackSize)
{
pthread_attr_setstacksize(&attr, thread_large_stack_size);
}
if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0)
{
Server->Log("Error setting thread to detached", LL_ERROR);
pthread_attr_destroy(&attr);
return false;
}
pthread_t t;
if (pthread_create(&t, &attr, &thread_helper_f, (void*)thread) != 0)
{
if (errno == ENOMEM
&& flags & IServer::CreateThreadFlags_LargeStackSize)
{
pthread_attr_destroy(&attr);
Server->Log("Error creating pthread (ENOMEM). Trying to create thread without LargeStackSize", LL_WARNING);
return createThread(thread, name,
static_cast<CreateThreadFlags>(flags & ~(IServer::CreateThreadFlags_LargeStackSize)));
}
Server->Log("Error creating pthread. Errno: " + convert(errno), LL_ERROR);
pthread_attr_destroy(&attr);
return false;
}
#ifdef HAVE_PTHREAD_SETNAME_NP
if (!name.empty())
{
std::string thread_name;
if (name.size() > 15)
{
thread_name = name.substr(0, 15);
}
else
{
thread_name = name;
}
pthread_setname_np(t, thread_name.c_str());
}
#endif //HAVE_PTHREAD_SETNAME_NP
pthread_attr_destroy(&attr);
return true;
#endif
}
void CServer::setCurrentThreadName(const std::string& name)
{
#if defined(_WIN32)
#if defined(_DEBUG)
SetThreadName(-1, name.c_str());
#endif
#elif defined(HAVE_PTHREAD_SETNAME_NP)
if (!name.empty())
{
pthread_t ct = pthread_self();
std::string thread_name;
if (name.size() > 15)
{
thread_name = name.substr(0, 15);
}
else
{
thread_name = name;
}
pthread_setname_np(ct, thread_name.c_str());
}
#endif
}
IThreadPool *CServer::getThreadPool(void)
{
return threadpool;
}
ISettingsReader* CServer::createFileSettingsReader(const std::string& pFile)
{
return new CFileSettingsReader(pFile);
}
ISettingsReader* CServer::createDBSettingsReader(THREAD_ID tid, DATABASE_ID pIdentifier, const std::string &pTable, const std::string &pSQL)
{
return new CDBSettingsReader(tid, pIdentifier, pTable, pSQL);
}
ISettingsReader* CServer::createDBSettingsReader(IDatabase *db, const std::string &pTable, const std::string &pSQL)
{
return new CDBSettingsReader(db, pTable, pSQL);
}
ISettingsReader * CServer::createDBMemSettingsReader(THREAD_ID tid, DATABASE_ID pIdentifier, const std::string & pTable, const std::string & pSQL)
{
return new CDBMemSettingsReader(tid, pIdentifier, pTable, pSQL);
}
ISettingsReader * CServer::createDBMemSettingsReader(IDatabase * db, const std::string & pTable, const std::string & pSQL)
{
return new CDBMemSettingsReader(db, pTable, pSQL);
}
ISettingsReader* CServer::createMemorySettingsReader(const std::string &pData)
{
return new CMemorySettingsReader(pData);
}
void CServer::wait(unsigned int ms)
{
#ifdef _WIN32
Sleep(ms);
#else
if (ms > 1000)
{
sleep(ms / 1000);
if (ms % 1000 != 0)
{
usleep((ms % 1000) * 1000);
}
}
else
{
usleep(ms * 1000);
}
#endif
}
unsigned int CServer::getNumRequests(void)
{
IScopedLock lock(rps_mutex);
unsigned int ret=num_requests;
num_requests=0;
return ret;
}
void CServer::addRequest(void)
{
IScopedLock lock(rps_mutex);
++num_requests;
}
IFsFile* CServer::openFile(std::string pFilename, int pMode)
{
File *file=new File;
if(!file->Open(pFilename, pMode) )
{
delete file;
return NULL;
}
return file;
}
IFsFile* CServer::openFileFromHandle(void *handle, const std::string& pFilename)
{
File *file=new File;
if(!file->Open(handle, pFilename) )
{
delete file;
return NULL;
}
return file;
}
IFsFile* CServer::openTemporaryFile(void)
{
File *file=new File;
if(!file->OpenTemporaryFile(tmpdir) )
{
Server->Log("Error creating temporary file at \""+file->getFilename()+"\"", LL_ERROR);
delete file;
return NULL;
}
return file;
}
IMemFile* CServer::openMemoryFile(const std::string& name, bool mlock_mem)
{
return new CMemoryFile(name, mlock_mem);
}
bool CServer::deleteFile(std::string pFilename)
{
return DeleteFileInt(pFilename);
}
bool CServer::fileExists(std::string pFilename)
{
#ifndef WIN32
return ::FileExists(pFilename);
#else
std::fstream in(ConvertToWchar(pFilename).c_str(), std::ios::in);
if( in.is_open()==false )
return false;
in.close();
return true;
#endif
}
std::string CServer::ConvertToUTF16(const std::string &input)
{
std::string ret;
try
{
std::vector<utf8::uint16_t> tmp;
utf8::utf8to16(input.begin(), input.end(), back_inserter(tmp) );
ret.resize(tmp.size()*2);
memcpy(&ret[0], &tmp[0], tmp.size()*2);
}
catch(...){}
return ret;
}
std::string CServer::ConvertToUTF32(const std::string &input)
{
std::string ret;
try
{
std::vector<utf8::uint32_t> tmp;
utf8::utf8to32(input.begin(), input.end(), back_inserter(tmp) );
ret.resize(tmp.size()*4);
memcpy(&ret[0], &tmp[0], tmp.size()*4);
}
catch(...){}
return ret;
}
std::string CServer::ConvertFromUTF16(const std::string &input)
{
if(input.empty())
{
return std::string();
}
std::string ret;
try
{
utf8::utf16to8((utf8::uint16_t*)&input[0], (utf8::uint16_t*)(&input[input.size()-1]+1), back_inserter(ret));
}
catch(...){}
return ret;
}
std::string CServer::ConvertFromUTF32(const std::string &input)
{
if(input.empty())
{
return std::string();
}
std::string ret;
try
{
utf8::utf32to8((utf8::uint32_t*)&input[0], (utf8::uint32_t*)(&input[input.size()-1]+1), back_inserter(ret));
}
catch(...){}
return ret;
}
std::wstring CServer::ConvertToWchar(const std::string &input)
{
if(input.empty())
{
return std::wstring();
}
std::wstring ret;
try
{
if(sizeof(wchar_t)==2)
{
utf8::utf8to16(&input[0], &input[input.size()-1]+1, back_inserter(ret));
}
else if(sizeof(wchar_t)==4)
{
utf8::utf8to32(&input[0], &input[input.size()-1]+1, back_inserter(ret));
}
}
catch(...){}
return ret;
}
std::string CServer::ConvertFromWchar(const std::wstring &input)
{
if(input.empty())
{
return std::string();
}
std::string ret;
try
{
if(sizeof(wchar_t)==2)
{
utf8::utf16to8(&input[0], &input[input.size()-1]+1, back_inserter(ret));
}
else if(sizeof(wchar_t)==4)
{
utf8::utf32to8(&input[0], &input[input.size()-1]+1, back_inserter(ret));
}
}
catch(...){}
return ret;
}
ICondition* CServer::createCondition(void)
{
return new CCondition();
}
void CServer::addPostFile(POSTFILE_KEY pfkey, const std::string &name, const SPostfile &pf)
{
IScopedLock lock(postfiles_mutex);
postfiles[pfkey][name]=pf;
}
SPostfile CServer::getPostFile(POSTFILE_KEY pfkey, const std::string &name)
{
IScopedLock lock(postfiles_mutex);
std::map<THREAD_ID, std::map<std::string, SPostfile > >::iterator iter1=postfiles.find(pfkey);
if(iter1!=postfiles.end())
{
std::map<std::string, SPostfile >::iterator iter2=iter1->second.find(name);
if(iter2!=iter1->second.end() )
{
return iter2->second;
}
}
return SPostfile();
}
void CServer::clearPostFiles(POSTFILE_KEY pfkey)
{
IScopedLock lock(postfiles_mutex);
std::map<THREAD_ID, std::map<std::string, SPostfile > >::iterator iter1=postfiles.find(pfkey);
if(iter1!=postfiles.end())
{
for(std::map<std::string, SPostfile >::iterator iter2=iter1->second.begin();iter2!=iter1->second.end();++iter2)
{
destroy(iter2->second.file);
}
postfiles.erase(iter1);
}
}
POSTFILE_KEY CServer::getPostFileKey()
{
IScopedLock lock(postfiles_mutex);
return curr_postfilekey++;
}
std::string CServer::getServerWorkingDir(void)
{
return workingdir;
}
void CServer::setServerWorkingDir(const std::string &wdir)
{
workingdir=wdir;
}
void CServer::setTemporaryDirectory(const std::string &dir)
{
tmpdir=dir;
}
void CServer::registerDatabaseFactory(const std::string &pEngineName, IDatabaseFactory *factory)
{
IScopedLock lock(db_mutex);
database_factories[pEngineName]=factory;
}
bool CServer::hasDatabaseFactory(const std::string &pEngineName)
{
IScopedLock lock(db_mutex);
std::map<std::string, IDatabaseFactory*>::iterator it=database_factories.find(pEngineName);
return it!=database_factories.end();
}
bool CServer::attachToDatabase(const std::string &pFile, const std::string &pName, DATABASE_ID pIdentifier)
{
IScopedLock lock(db_mutex);
std::map<DATABASE_ID, SDatabase* >::iterator iter=databases.find(pIdentifier);
if( iter==databases.end() )
{
return false;
}
if(std::find(iter->second->attach.begin(), iter->second->attach.end(), std::pair<std::string,std::string>(pFile, pName))==iter->second->attach.end())
{
iter->second->attach.push_back(std::pair<std::string,std::string>(pFile, pName));
}
return true;
}
bool CServer::setDatabaseAllocationChunkSize(DATABASE_ID pIdentifier, size_t allocation_chunk_size)
{
IScopedLock lock(db_mutex);
std::map<DATABASE_ID, SDatabase* >::iterator iter=databases.find(pIdentifier);
if( iter==databases.end() )
{
return false;
}
iter->second->allocation_chunk_size = allocation_chunk_size;
return true;
}
void CServer::waitForStartupComplete(void)
{
IScopedLock lock(startup_complete_mutex);
if(!startup_complete)
{
startup_complete_cond->wait(&lock);
}
}
void CServer::startupComplete(void)
{
IScopedLock lock(startup_complete_mutex);
startup_complete=true;
startup_complete_cond->notify_all();
}
IPipeThrottler* CServer::createPipeThrottler(size_t bps,
bool percent_max)
{
return new PipeThrottler(bps, percent_max, NULL);
}
IPipeThrottler* CServer::createPipeThrottler(
IPipeThrottlerUpdater* updater)
{
bool percent_max = false;
size_t bps = updater->getThrottleLimit(percent_max);
return new PipeThrottler(bps, percent_max, updater);
}
IThreadPool * CServer::createThreadPool(size_t max_threads, size_t max_waiting_threads, const std::string & idle_name)
{
return new CThreadPool(max_threads, max_waiting_threads, idle_name);
}
void CServer::shutdown(void)
{
run=false;
}
void CServer::initRandom(unsigned int seed)
{
init_genrand(seed);
}
unsigned int CServer::getRandomNumber(void)
{
IScopedLock lock(rnd_mutex);
return genrand_int32();
}
std::vector<unsigned int> CServer::getRandomNumbers(size_t n)
{
IScopedLock lock(rnd_mutex);
std::vector<unsigned int> ret;
ret.resize(n);
for(size_t i=0;i<n;++i)
{
ret[i]=genrand_int32();
}
return ret;
}
void CServer::randomFill(char *buf, size_t blen)
{
IScopedLock lock(rnd_mutex);
char *dptr=buf+blen;
while(buf<dptr)
{
if(dptr-buf>=sizeof(unsigned int))
{
*((unsigned int*)buf)=genrand_int32();
buf+=sizeof(unsigned int);
}
else
{
unsigned int rnd=genrand_int32();
memcpy(buf, &rnd, dptr-buf);
buf+=dptr-buf;
}
}
}
unsigned int CServer::getSecureRandomNumber(void)
{
#ifdef _WIN32
unsigned int rnd;
errno_t err = rand_s(&rnd);
if (err != 0)
{
Log("Error generating secure random number", LL_ERROR);
abort();
return getRandomNumber();
}
return rnd;
#else
unsigned int rnd;
secureRandomFill(reinterpret_cast<char*>(&rnd), sizeof(rnd));
return rnd;
#endif
}
std::vector<unsigned int> CServer::getSecureRandomNumbers(size_t n)
{
if (n == 0)
{
return std::vector<unsigned int>();
}
std::vector<unsigned int> ret;
ret.resize(n);
#ifdef _WIN32
for (size_t i = 0; i < n; ++i)
{
ret[i] = getSecureRandomNumber();
}
#else
char* buf = reinterpret_cast<char*>(&ret[0]);
size_t bsize = sizeof(unsigned int) * n;
secureRandomFill(buf, bsize);
#endif
return ret;
}
void CServer::secureRandomFill(char *buf, size_t blen)
{
#ifdef _WIN32
char *dptr=buf+blen;
while(buf<dptr)
{
const unsigned int rnd = getSecureRandomNumber();
const size_t to_write = (std::min)(sizeof(rnd), static_cast<size_t>(dptr - buf));
memcpy(buf, &rnd, to_write);
buf += to_write;
}
#elif defined(HAVE_GETRANDOM) && defined(HAVE_SYS_RANDOM_H)
while (blen > 0)
{
ssize_t rc = getrandom(buf, blen, 0);
if (rc == -1
&& errno == EINTR)
{
rc = 0;
}
else if (rc == -1)
{
Server->Log("Error filling secure random buffer. Errno " + convert((int64)errno), LL_ERROR);
abort();
}
blen -= rc;
buf += rc;
}
#else
std::fstream rnd_in("/dev/urandom", std::ios::in | std::ios::binary);
if (!rnd_in.is_open())
{
Log("Error opening /dev/urandom for secure random number fill. Errno " + convert((int64)errno), LL_ERROR);
randomFill(buf, blen);
return;
}
rnd_in.read(buf, blen);
assert(rnd_in.gcount() == blen);
if (rnd_in.gcount() != blen
|| rnd_in.fail() || rnd_in.eof())
{
Log("Error reading secure random numbers fill. Errno " + convert((int64)errno), LL_ERROR);
abort();
randomFill(buf, blen);
return;
}
#endif
}
std::string CServer::secureRandomString(size_t len)
{
std::string rchars="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
std::string key;
std::vector<unsigned int> rnd_n=Server->getSecureRandomNumbers(len);
for(size_t j=0;j<len;++j)
key+=rchars[rnd_n[j]%rchars.size()];
return key;
}
void CServer::setLogCircularBufferSize(size_t size)
{
IScopedLock lock(log_mutex);
circular_log_buffer.resize(size);
has_circular_log_buffer=size>0?true:false;
}
std::vector<SCircularLogEntry> CServer::getCicularLogBuffer( size_t minid )
{
IScopedLock lock(log_mutex);
if(minid==std::string::npos)
{
return circular_log_buffer;
}
for(size_t i=0;i<circular_log_buffer.size();++i)
{
if(circular_log_buffer[i].id>minid &&
circular_log_buffer[i].id!=std::string::npos)
{
return circular_log_buffer;
}
}
return std::vector<SCircularLogEntry>();
}
void CServer::logToCircularBuffer(const std::string& msg, int loglevel)
{
if(circular_log_buffer.empty())
return;
SCircularLogEntry &entry=circular_log_buffer[circular_log_buffer_idx];
entry.utf8_msg=msg;
entry.loglevel=loglevel;
entry.id=circular_log_buffer_id++;
entry.time=Server->getTimeSeconds();
circular_log_buffer_idx=(circular_log_buffer_idx+1)%circular_log_buffer.size();
}
void CServer::setFailBit(size_t failbit)
{
failbits=failbits|failbit;
}
void CServer::clearFailBit(size_t failbit)
{
failbits=failbit & (~failbit);
}
size_t CServer::getFailBits(void)
{
return failbits;
}
ISharedMutex* CServer::createSharedMutex()
{
return new SharedMutex;
}
void CServer::setLogRotationFilesize( size_t filesize )
{
log_rotation_size=filesize;
}
void CServer::setLogRotationNumFiles( size_t numfiles )
{
log_rotation_files=numfiles;
}
void CServer::LoadStaticPlugins()
{
std::vector<SStaticPlugin>& staticplugins = get_static_plugin_registrations();
std::sort(staticplugins.begin(), staticplugins.end());
for(size_t i=0;i<staticplugins.size();++i)
{
LOADACTIONS loadfunc = staticplugins[i].loadactions;
loadfunc(this);
}
}
void CServer::setLogConsoleTime(bool b)
{
log_console_time = b;
}
#ifdef _WIN32
void CServer::setSocketWindowSizes(int p_send_window_size, int p_recv_window_size)
{
send_window_size = p_send_window_size;
recv_window_size = p_recv_window_size;
}
int CServer::getSendWindowSize()
{
return send_window_size;
}
int CServer::getRecvWindowSize()
{
return recv_window_size;
}
#endif
void CServer::mallocFlushTcache()
{
}
void CServer::addWebSocket(IWebSocket* websocket)
{
IScopedLock lock(web_socket_mutex);
web_sockets[websocket->getName()] = websocket;
}
THREAD_ID CServer::ExecuteWebSocket(const std::string& name, str_map& GET, str_map& PARAMS, IPipe* pipe, const std::string& endpoint_name)
{
IWebSocket* ws;
{
IScopedLock lock(web_socket_mutex);
std::map<std::string, IWebSocket*>::iterator it = web_sockets.find(name);
if (it == web_sockets.end())
return ILLEGAL_THREAD_ID;
ws = it->second;
}
THREAD_ID tid = getThreadID();
ws->Execute(GET, tid, PARAMS, pipe, endpoint_name);
return tid;
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/wangcichen/urbackup_backend.git
git@gitee.com:wangcichen/urbackup_backend.git
wangcichen
urbackup_backend
urbackup_backend
dev

搜索帮助