21 Star 42 Fork 57

openGauss/openGauss-connector-odbc

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
connection.c 115.03 KB
一键复制 编辑 原始数据 按行查看 历史
zhangtingtingting 提交于 2025-03-26 11:39 +08:00 . 完善配置参数
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764277427842794280428142824283428442854286428742884289429042914292429342944295429642974298429943004301430243034304430543064307430843094310431143124313431443154316431743184319432043214322432343244325432643274328432943304331433243334334433543364337433843394340434143424343434443454346434743484349435043514352435343544355435643574358435943604361436243634364436543664367436843694370437143724373437443754376437743784379438043814382438343844385438643874388438943904391439243934394439543964397439843994400440144024403440444054406440744084409441044114412441344144415441644174418441944204421442244234424442544264427442844294430443144324433443444354436443744384439444044414442444344444445444644474448444944504451445244534454445544564457445844594460446144624463446444654466446744684469447044714472447344744475447644774478447944804481448244834484448544864487448844894490449144924493449444954496449744984499450045014502450345044505450645074508450945104511451245134514451545164517451845194520452145224523452445254526452745284529453045314532
/*------
* Module: connection.c
*
* Description: This module contains routines related to
* connecting to and disconnecting from the Postgres DBMS.
*
* Classes: ConnectionClass (Functions prefix: "CC_")
*
* API functions: SQLAllocConnect, SQLConnect, SQLDisconnect, SQLFreeConnect,
* SQLBrowseConnect(NI)
*
* Comments: See "readme.txt" for copyright and license information.
*-------
*/
/* Multibyte support Eiji Tokuya 2001-03-15 */
/* TryEnterCritiaclSection needs the following #define */
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0400
#endif /* _WIN32_WINNT */
#include <ctype.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
/* for htonl */
#ifdef WIN32
#include <Winsock2.h>
#include <windows.h>
#else
#include <arpa/inet.h>
#endif
#ifndef WIN32
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#ifndef __USE_GNU
#define __USE_GNU
#endif
#include <dlfcn.h>
#include <pwd.h>
#endif
#include "connection.h"
#include "misc.h"
#include "environ.h"
#include "statement.h"
#include "qresult.h"
#include "lobj.h"
#include "dlg_specific.h"
#include "loadlib.h"
#include "multibyte.h"
#include "pgapifunc.h"
#define SAFE_STR(s) (NULL != (s) ? (s) : "(null)")
/* how many statement holders to allocate
* at a time
*/
#define STMT_INCREMENT 16
#define MAX_PARTS 16
#define EXTRA_ROOM 128 /* Extra room for string additional splicing */
#define MAX_CN 128 /* the maximum number of CN is 128 */
#define EMPTY 0
#define CORRECT 1
#define WRONG 2
#define lenNameType 63
BOOL conn_inited = FALSE;
BOOL conn_precheck = FALSE;
pthread_rwlock_t init_lock = PTHREAD_RWLOCK_INITIALIZER;
int refresh_flag = 0;
unsigned long int pgxc_node_thread_id;
typedef struct CnEntry {
char ip_list[MAX_CN][MEDIUM_REGISTRY_LEN]; /* a char array to store IPs */
int ip_status[MAX_CN]; /* an integer array that indicates the status of IPs */
char port_list[MAX_CN][SMALL_REGISTRY_LEN]; /* a char array that stores port */
int port_status[MAX_CN]; /* an integer array that indicates the status of IPs */
int ip_count; /* define integer to record the number of IPs stored in the IP_list */
int port_count; /*
* define integer to record the number of IPs stored in the IP_list
* this should be equal to IP_count
*/
int step[MAX_CN]; /* record the offset for roundRobin when autobalance is on */
BOOL is_usable;
pthread_rwlock_t ip_list_lock; /* define a lock to isolate read and write to ip_list */
pthread_rwlock_t step_lock[MAX_CN]; /* define a lock to isolate read and write to step */
} CnEntry;
typedef struct dsn_time {
char *DSN;
int timeinterval;
} dsn_time;
CnEntry orig_entry;
CnEntry pgxc_entry;
static int LIBPQ_connect(ConnectionClass *self);
int split_host_or_port_with_limit(const char *str, char *result_array[MAX_PARTS], int* total_length);
char* generate_conninfo_URL_by_ConnInfo(ConnInfo* ci, int* host_number, int* port_number);
#ifdef WIN32
DWORD WINAPI read_pgxc_node(LPVOID arg)
#else
static void *read_pgxc_node(void *arg)
#endif
{
dsn_time read_cn;
read_cn = *(dsn_time *) arg;
char *DSN = malloc(strlen(read_cn.DSN) + 1);
if (DSN == NULL) {
exit(1);
}
strncpy_null(DSN, read_cn.DSN, strlen(read_cn.DSN) + 1);
read_cn.DSN = DSN;
int time = read_cn.timeinterval;
#ifdef WIN32
pgxc_node_thread_id = GetCurrentThreadId();
#else
pgxc_node_thread_id = pthread_self();
#endif
int refresh_count = 0;
/* read CNs' IP from pgxc_node */
for (;;) {
MYLOG(0, "REFRESH starts\n");
SQLHENV hEnv = SQL_NULL_HENV;
SQLHDBC hDbc = SQL_NULL_HDBC;
SQLHSTMT hStmt = SQL_NULL_HSTMT;
SQLRETURN rc = SQL_SUCCESS;
SQLINTEGER RETCODE = 0;
char node_port[SMALL_REGISTRY_LEN];
char node_host[lenNameType];
SQLLEN lenPort=0, lenHost=0;
RETCODE = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hEnv);
if (RETCODE != SQL_SUCCESS) {
continue;
}
SQLSetEnvAttr(hEnv, SQL_ATTR_ODBC_VERSION, (void*) SQL_OV_ODBC3, 0);
RETCODE = SQLAllocHandle(SQL_HANDLE_DBC, hEnv, &hDbc);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLConnect(hDbc, // Connect handle
(SQLCHAR *)DSN, //DSN
SQL_NTS, // DSN is nul-terminated
NULL, // Null UID
0 ,
NULL, // Null Auth string
0);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLAllocHandle(SQL_HANDLE_STMT, hDbc, &hStmt);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLExecDirect(hStmt,"select node_port, node_host from pgxc_node where node_type = 'C' and nodeis_active order by node_name",SQL_NTS);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_STMT, hStmt);
SQLDisconnect(hDbc);
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLBindCol(hStmt, 1, SQL_C_CHAR, node_port, SMALL_REGISTRY_LEN, &lenPort);
RETCODE = SQLBindCol(hStmt, 2, SQL_C_CHAR, node_host, lenNameType + 1, &lenHost);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_STMT, hStmt);
SQLDisconnect(hDbc);
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
int count = 0;
char IP_list_temp[MAX_CN][MEDIUM_REGISTRY_LEN];
char port_list_temp[MAX_CN][SMALL_REGISTRY_LEN];
char port_temp[SMALL_REGISTRY_LEN];
while ((rc = SQLFetch(hStmt)) == SQL_SUCCESS) {
refresh_flag = 1;
STRCPY_FIXED(IP_list_temp[count], node_host);
STRCPY_FIXED(port_list_temp[count++], node_port);
pgxc_entry.ip_count = count;
}
refresh_count++;
if (refresh_count > 10 && rc == SQL_ERROR) {
refresh_flag = 1;
MYLOG(0, "Refresh failed for ten times, change signal and unlock other threads.\n");
}
int i;
if (count != 0 && pthread_rwlock_wrlock(&pgxc_entry.ip_list_lock) == 0) {
for (i = 0; i < pgxc_entry.ip_count; i++) {
STRCPY_FIXED(pgxc_entry.ip_list[i], IP_list_temp[i]);
STRCPY_FIXED(pgxc_entry.port_list[i], port_list_temp[i]);
MYLOG(0, "ip = %s, port = %s\n", pgxc_entry.ip_list[i], pgxc_entry.port_list[i]);
}
MYLOG(0, "CN list has been refreshed.\n");
if (pthread_rwlock_unlock(&pgxc_entry.ip_list_lock) != 0) {
SQLFreeHandle(SQL_HANDLE_STMT, hStmt);
SQLDisconnect(hDbc);
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
MYLOG(0, "Unlock failed. Exit process.\n");
exit(1);
}
}
SQLDisconnect(hDbc);
sleep(time);
}
}
static BOOL check_IP_connection(ConnectionClass *conn, CnEntry *entry)
{
int i;
int pqret;
BOOL ret = FALSE;
ConnInfo *ci = &conn->connInfo;
if (conn == NULL) {
return FALSE;
}
MYLOG(0, "Start checking the connection for each pair of IP and PORT.\n");
if (entry == &pgxc_entry) {
pthread_rwlock_rdlock(&entry->ip_list_lock);
}
for (i = 0; i < entry->ip_count; i++) {
STRCPY_FIXED(ci->server, entry->ip_list[i]);
STRCPY_FIXED(ci->port, entry->port_list[i]);
if ((pqret = LIBPQ_connect(conn)) <= 0) {
/* connection failed, kick out the wrong IP from IP_list and write the wrong IP into log */
MYLOG(0, "Cannot establish connection via IP: %s\n", entry->ip_list[i]);
entry->ip_status[i] = WRONG;
} else {
/* connection successful, current IP remains in IP_list but disconnect */
entry->ip_status[i] = CORRECT;
PQfinish(conn->pqconn);
ret = TRUE;
}
}
MYLOG(0, "Check finished.\n");
if (entry == &pgxc_entry) {
pthread_rwlock_unlock(&entry->ip_list_lock);
}
return ret;
}
static void start_new_thread(dsn_time *read_cn)
{
#ifdef WIN32
CreateThread(NULL, 0, read_pgxc_node, (LPVOID)(read_cn), 0, NULL);
#else
pthread_t ntid;
pthread_create(&ntid, NULL, read_pgxc_node, read_cn);
#endif
}
static RETCODE init_conn(ConnectionClass *conn)
{
RETCODE ret = SQL_SUCCESS;
ConnInfo *ci = &conn->connInfo;
if (conn == NULL) {
return SQL_ERROR;
}
/* initialize */
memset(&pgxc_entry, 0, sizeof(CnEntry));
memset(&orig_entry, 0, sizeof(CnEntry));
pgxc_entry.is_usable = TRUE;
orig_entry.is_usable = TRUE;
int i;
for (i = 0; i < MAX_CN; i++) {
pthread_rwlock_init(&pgxc_entry.step_lock[i], NULL);
pthread_rwlock_init(&orig_entry.step_lock[i], NULL);
}
pthread_rwlock_init(&pgxc_entry.ip_list_lock, NULL);
pthread_rwlock_init(&orig_entry.ip_list_lock, NULL);
/* make a copy of ci->server and ci->port to prevent changing the original conn when parsing it */
char server[LARGE_REGISTRY_LEN];
STRCPY_FIXED(server, ci->server);
char port[LARGE_REGISTRY_LEN];
STRCPY_FIXED(port, ci->port);
/* parsing ci->server to seperate IPs and store them into IP_list */
char *p = strtok(server, ",");
while (p != NULL) {
STRCPY_FIXED(orig_entry.ip_list[orig_entry.ip_count++], p);
STRCPY_FIXED(pgxc_entry.ip_list[pgxc_entry.ip_count++], p);
p = strtok(NULL, ",");
}
/* parsing ci->port to seperate PORTs and store them into port_list */
p = strtok(port, ",");
while (p != NULL) {
STRCPY_FIXED(orig_entry.port_list[orig_entry.port_count++], p);
STRCPY_FIXED(pgxc_entry.port_list[pgxc_entry.port_count++], p);
p = strtok(NULL, ",");
}
/* if only one port was configured, then each CN has the same port by default */
if (orig_entry.port_count == 1) {
for (i = 1; i < orig_entry.ip_count; i++) {
STRCPY_FIXED(orig_entry.port_list[orig_entry.port_count++], orig_entry.port_list[0]);
STRCPY_FIXED(pgxc_entry.port_list[pgxc_entry.port_count++], pgxc_entry.port_list[0]);
}
}
/* if serverl ports were configured, the number of ports has to be equal to the number of IPs */
if (orig_entry.ip_count != orig_entry.port_count) {
MYLOG(0, "The number of IP %d does not match the number of Port %d.\n", orig_entry.ip_count, orig_entry.port_count);
return SQL_ERROR;
}
/* check the connection of each pair of IP and port and update the list and status */
if (!check_IP_connection(conn, &orig_entry)) {
return SQL_ERROR;
}
memcpy(pgxc_entry.ip_status, orig_entry.ip_status, sizeof(orig_entry.ip_status));
/* start new thread to connect to datbase and select node_port from pgxc_node */
dsn_time read_cn;
read_cn.DSN = ci->dsn;
if (ci->refreshcnlisttime == 0) {
read_cn.timeinterval = 10;
} else {
read_cn.timeinterval = ci->refreshcnlisttime;
}
start_new_thread(&read_cn);
return ret;
}
int get_location(BOOL *visited, CnEntry *entry, int *visited_count)
{
if (visited == NULL || entry == NULL || visited_count == NULL) {
return -1;
}
/* select random IP from ip_list */
srand(pthread_self());
unsigned int ind = rand() % entry->ip_count;
/* record the offset of each IP for roundRobin */
int *offset = &entry->step[ind];
pthread_rwlock_t *offset_lock = &entry->step_lock[ind];
/*
* if the selected IP can be connected and has not been visited, connect to this IP
* else enter the while loop to choose another IP for connect
* use visited_count to record the number of IPs that have been visited
* visited_count equals to ip_count means reaching the limit
*/
while ((entry->ip_status[ind] == WRONG || visited[ind] == TRUE) && (*visited_count) != entry->ip_count) {
if (visited[ind] == FALSE) {
visited[ind] = TRUE;
(*visited_count)++;
}
pthread_rwlock_wrlock(offset_lock);
while (visited[ind] == TRUE && (*visited_count) != entry->ip_count) {
ind = ind + *offset;
(*offset)++;
ind = ind % entry->ip_count;
}
pthread_rwlock_unlock(offset_lock);
}
/*
* the selected IP after the while loop should not have been visited
* if it has been visited, return error
*/
if (visited[ind] == TRUE) {
return -1;
}
visited[ind] = TRUE;
(*visited_count)++;
return ind;
}
static RETCODE connect_random_IP(ConnectionClass *conn, CnEntry *entry)
{
RETCODE ret = SQL_ERROR;
ConnInfo *ci = &conn->connInfo;
CSTR func = "PGAPI_Connect";
char fchar;
BOOL visited[MAX_CN] = {FALSE};
int visited_count = 0;
BOOL check_ret = check_IP_connection(conn, entry);
/* only connection successful and all connection failed will break the while loop */
while (ret == SQL_ERROR) {
if (entry == &pgxc_entry && pthread_rwlock_rdlock(&entry->ip_list_lock) != 0) {
return SQL_ERROR;
}
int ind = get_location(visited, entry, &visited_count);
if (ind == -1) {
pthread_rwlock_unlock(&entry->ip_list_lock);
return SQL_ERROR;
}
STRCPY_FIXED(ci->server, entry->ip_list[ind]);
STRCPY_FIXED(ci->port, entry->port_list[ind]);
while (entry == &pgxc_entry && pthread_rwlock_unlock(&entry->ip_list_lock) != 0);
if ((fchar = CC_connect(conn, NULL)) <= 0) {
/* Error messages are filled in */
CC_log_error(func, "Error on CC_connect", conn);
ret = SQL_ERROR;
} else {
ret = SQL_SUCCESS;
}
}
if (ret == SQL_SUCCESS && fchar == 2) {
ret = SQL_SUCCESS_WITH_INFO;
}
MYLOG(0, "leaving..%d.\n", ret);
/* Empty the password stored in memory to avoid password leak */
if (NAME_IS_VALID(ci->password))
memset(ci->password.name, 0, strlen(ci->password.name));
return ret;
}
static RETCODE connect_IP(ConnectionClass *conn)
{
if (conn->connInfo.priority == 1 && orig_entry.is_usable && connect_random_IP(conn, &orig_entry) != SQL_ERROR) {
return SQL_SUCCESS;
}
return connect_random_IP(conn, &pgxc_entry);
}
static RETCODE check_and_init(ConnectionClass *conn)
{
if (conn_precheck) {
return SQL_SUCCESS;
}
if (pthread_rwlock_rdlock(&init_lock)) {
return SQL_ERROR;
}
if (conn_inited) {
pthread_rwlock_unlock(&init_lock);
return SQL_SUCCESS;
}
pthread_rwlock_unlock(&init_lock);
if (pthread_rwlock_wrlock(&init_lock)) {
return SQL_ERROR;
}
if (conn_inited) {
pthread_rwlock_unlock(&init_lock);
return SQL_SUCCESS;
}
if (init_conn(conn) != SQL_SUCCESS) {
pthread_rwlock_unlock(&init_lock);
return SQL_ERROR;
} else {
conn_inited = TRUE;
}
pthread_rwlock_unlock(&init_lock);
conn_precheck = TRUE;
return SQL_SUCCESS;
}
static SQLRETURN CC_lookup_lo(ConnectionClass *self);
static int CC_close_eof_cursors(ConnectionClass *self);
static void LIBPQ_update_transaction_status(ConnectionClass *self);
static void CC_set_error_if_not_set(ConnectionClass *self, int errornumber, const char *errormsg, const char *func)
{
int errornum = CC_get_errornumber(self);
const char *errmsg = CC_get_errormsg(self);
if (errornumber == 0)
return;
if (errornumber > 0)
{
if (errornum <= 0)
CC_set_error(self, errornumber, errormsg, func);
else if (!errmsg)
CC_set_errormsg(self, errormsg);
}
else if (errornum == 0)
CC_set_error(self, errornumber, errormsg, func);
else if (errornum < 0 && !errmsg)
CC_set_errormsg(self, errormsg);
}
RETCODE SQL_API
PGAPI_AllocConnect(HENV henv,
HDBC * phdbc)
{
EnvironmentClass *env = (EnvironmentClass *) henv;
ConnectionClass *conn;
CSTR func = "PGAPI_AllocConnect";
MYLOG(0, "entering...\n");
conn = CC_Constructor();
MYLOG(0, "**** henv = %p, conn = %p\n", henv, conn);
if (!conn)
{
env->errormsg = "Couldn't allocate memory for Connection object.";
env->errornumber = ENV_ALLOC_ERROR;
*phdbc = SQL_NULL_HDBC;
EN_log_error(func, "", env);
return SQL_ERROR;
}
if (!EN_add_connection(env, conn))
{
env->errormsg = "Maximum number of connections exceeded.";
env->errornumber = ENV_ALLOC_ERROR;
CC_Destructor(conn);
*phdbc = SQL_NULL_HDBC;
EN_log_error(func, "", env);
return SQL_ERROR;
}
if (phdbc)
*phdbc = (HDBC) conn;
return SQL_SUCCESS;
}
RETCODE SQL_API
PGAPI_Connect(HDBC hdbc,
const SQLCHAR * szDSN,
SQLSMALLINT cbDSN,
const SQLCHAR * szUID,
SQLSMALLINT cbUID,
const SQLCHAR * szAuthStr,
SQLSMALLINT cbAuthStr)
{
ConnectionClass *conn = (ConnectionClass *) hdbc;
ConnInfo *ci;
CSTR func = "PGAPI_Connect";
RETCODE ret = SQL_SUCCESS;
char fchar, *tmpstr;
if (!conn)
{
CC_log_error(func, "", NULL);
return SQL_INVALID_HANDLE;
}
ci = &conn->connInfo;
CC_conninfo_init(ci, INIT_GLOBALS);
make_string(szDSN, cbDSN, ci->dsn, sizeof(ci->dsn));
/* get the values for the DSN from the registry */
getDSNinfo(ci, NULL);
logs_on_off(1, ci->drivers.debug, ci->drivers.commlog);
/* initialize pg_version from connInfo.protocol */
CC_initialize_pg_version(conn);
MYLOG(0, "entering..cbDSN=%hi.\n", cbDSN);
/*
* override values from DSN info with UID and authStr(pwd) This only
* occurs if the values are actually there.
*/
fchar = ci->username[0]; /* save the first byte */
make_string(szUID, cbUID, ci->username, sizeof(ci->username));
if ('\0' == ci->username[0]) /* an empty string is specified */
ci->username[0] = fchar; /* restore the original username */
tmpstr = make_string(szAuthStr, cbAuthStr, NULL, 0);
if (tmpstr)
{
if (tmpstr[0]) /* non-empty string is specified */
STR_TO_NAME(ci->password, tmpstr);
free(tmpstr);
}
MYLOG(0, "conn = %p (DSN='%s', UID='%s', PWD='%s')\n", conn, ci->dsn, ci->username, NAME_IS_VALID(ci->password) ? "xxxxx" : "");
if (ci->autobalance == 1) {
if (check_and_init(conn) != SQL_SUCCESS) {
return SQL_ERROR;
}
#ifdef WIN32
if (GetCurrentThreadId() != pgxc_node_thread_id)
#else
if (pthread_self() != pgxc_node_thread_id)
#endif
{
while (refresh_flag != 1) {
#ifdef WIN32
sleep(10);
#else
usleep(10000);
#endif
}
}
ret = connect_IP(conn);
}
else {
if ((fchar = CC_connect(conn, NULL)) <= 0) {
CC_log_error(func, "Error on CC_connect", conn);
ret = SQL_ERROR;
}
if (SQL_SUCCESS == ret && 2 == fchar) {
ret = SQL_SUCCESS_WITH_INFO;
}
MYLOG(0, "leaving..%d.\n", ret);
if (NAME_IS_VALID(ci->password)) {
memset(ci->password.name, 0, strlen(ci->password.name));
}
}
return ret;
}
RETCODE SQL_API
PGAPI_BrowseConnect(HDBC hdbc,
const SQLCHAR * szConnStrIn,
SQLSMALLINT cbConnStrIn,
SQLCHAR * szConnStrOut,
SQLSMALLINT cbConnStrOutMax,
SQLSMALLINT * pcbConnStrOut)
{
CSTR func = "PGAPI_BrowseConnect";
ConnectionClass *conn = (ConnectionClass *) hdbc;
MYLOG(0, "entering...\n");
CC_set_error(conn, CONN_NOT_IMPLEMENTED_ERROR, "Function not implemented", func);
return SQL_ERROR;
}
/* Drop any hstmts open on hdbc and disconnect from database */
RETCODE SQL_API
PGAPI_Disconnect(HDBC hdbc)
{
ConnectionClass *conn = (ConnectionClass *) hdbc;
CSTR func = "PGAPI_Disconnect";
MYLOG(0, "entering...\n");
if (!conn)
{
CC_log_error(func, "", NULL);
return SQL_INVALID_HANDLE;
}
if (conn->status == CONN_EXECUTING)
{
CC_set_error(conn, CONN_IN_USE, "A transaction is currently being executed", func);
return SQL_ERROR;
}
logs_on_off(-1, conn->connInfo.drivers.debug, conn->connInfo.drivers.commlog);
MYLOG(0, "about to CC_cleanup\n");
/* Close the connection and free statements */
CC_cleanup(conn, FALSE);
MYLOG(0, "done CC_cleanup\n");
MYLOG(0, "leaving...\n");
return SQL_SUCCESS;
}
RETCODE SQL_API
PGAPI_FreeConnect(HDBC hdbc)
{
ConnectionClass *conn = (ConnectionClass *) hdbc;
CSTR func = "PGAPI_FreeConnect";
EnvironmentClass *env;
MYLOG(0, "entering...hdbc=%p\n", hdbc);
if (!conn)
{
CC_log_error(func, "", NULL);
return SQL_INVALID_HANDLE;
}
/* Remove the connection from the environment */
if (NULL != (env = CC_get_env(conn)) &&
!EN_remove_connection(env, conn))
{
CC_set_error(conn, CONN_IN_USE, "A transaction is currently being executed", func);
return SQL_ERROR;
}
CC_Destructor(conn);
MYLOG(0, "leaving...\n");
return SQL_SUCCESS;
}
/*
* IMPLEMENTATION CONNECTION CLASS
*/
static void
reset_current_schema(ConnectionClass *self)
{
if (self->current_schema)
{
free(self->current_schema);
self->current_schema = NULL;
}
self->current_schema_valid = FALSE;
}
static ConnectionClass *
CC_alloc(void)
{
return (ConnectionClass *) calloc(sizeof(ConnectionClass), 1);
}
static void
CC_lockinit(ConnectionClass *self)
{
INIT_CONNLOCK(self);
INIT_CONN_CS(self);
}
static ConnectionClass *
CC_initialize(ConnectionClass *rv, BOOL lockinit)
{
size_t clear_size;
#if defined(WIN_MULTITHREAD_SUPPORT) || defined(POSIX_THREADMUTEX_SUPPORT)
clear_size = (char *)&(rv->cs) - (char *)rv;
#else
clear_size = sizeof(ConnectionClass);
#endif /* WIN_MULTITHREAD_SUPPORT */
memset(rv, 0, clear_size);
rv->status = CONN_NOT_CONNECTED;
rv->transact_status = CONN_IN_AUTOCOMMIT; /* autocommit by default */
rv->unnamed_prepared_stmt = NULL;
rv->stmts = (StatementClass **) malloc(sizeof(StatementClass *) * STMT_INCREMENT);
if (!rv->stmts)
goto cleanup;
memset(rv->stmts, 0, sizeof(StatementClass *) * STMT_INCREMENT);
rv->num_stmts = STMT_INCREMENT;
rv->descs = (DescriptorClass **) malloc(sizeof(DescriptorClass *) * STMT_INCREMENT);
if (!rv->descs)
goto cleanup;
memset(rv->descs, 0, sizeof(DescriptorClass *) * STMT_INCREMENT);
rv->num_descs = STMT_INCREMENT;
rv->lobj_type = PG_TYPE_LO_UNDEFINED;
if (isMsAccess())
rv->ms_jet = 1;
rv->isolation = 0; // means initially unknown server's default isolation
rv->mb_maxbyte_per_char = 1;
rv->max_identifier_length = -1;
rv->autocommit_public = SQL_AUTOCOMMIT_ON;
/* Initialize statement options to defaults */
/* Statements under this conn will inherit these options */
InitializeStatementOptions(&rv->stmtOptions);
InitializeARDFields(&rv->ardOptions);
InitializeAPDFields(&rv->apdOptions);
#ifdef _HANDLE_ENLIST_IN_DTC_
rv->asdum = NULL;
rv->gTranInfo = 0;
#endif /* _HANDLE_ENLIST_IN_DTC_ */
if (lockinit)
CC_lockinit(rv);
return rv;
cleanup:
CC_Destructor(rv);
return NULL;
}
ConnectionClass *
CC_Constructor()
{
ConnectionClass *rv, *retrv = NULL;
if (rv = CC_alloc(), NULL != rv)
retrv = CC_initialize(rv, TRUE);
return retrv;
}
char
CC_Destructor(ConnectionClass *self)
{
MYLOG(0, "entering self=%p\n", self);
if (self->status == CONN_EXECUTING)
return 0;
CC_cleanup(self, FALSE); /* cleanup socket and statements */
MYLOG(0, "after CC_Cleanup\n");
/* Free up statement holders */
if (self->stmts)
{
free(self->stmts);
self->stmts = NULL;
}
if (self->descs)
{
free(self->descs);
self->descs = NULL;
}
MYLOG(0, "after free statement holders\n");
NULL_THE_NAME(self->schemaIns);
NULL_THE_NAME(self->tableIns);
CC_conninfo_release(&self->connInfo);
if (self->__error_message)
free(self->__error_message);
DELETE_CONN_CS(self);
DELETE_CONNLOCK(self);
free(self);
MYLOG(0, "leaving\n");
return 1;
}
/* Return how many cursors are opened on this connection */
int
CC_cursor_count(ConnectionClass *self)
{
StatementClass *stmt;
int i,
count = 0;
QResultClass *res;
MYLOG(0, "self=%p, num_stmts=%d\n", self, self->num_stmts);
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
stmt = self->stmts[i];
if (stmt && (res = SC_get_Result(stmt)) && QR_get_cursor(res))
count++;
}
CONNLOCK_RELEASE(self);
MYLOG(0, "leaving %d\n", count);
return count;
}
void
CC_clear_error(ConnectionClass *self)
{
if (!self) return;
CONNLOCK_ACQUIRE(self);
self->__error_number = 0;
if (self->__error_message)
{
free(self->__error_message);
self->__error_message = NULL;
}
self->sqlstate[0] = '\0';
CONNLOCK_RELEASE(self);
}
void
CC_examine_global_transaction(ConnectionClass *self)
{
if (!self) return;
#ifdef _HANDLE_ENLIST_IN_DTC_
if (CC_is_in_global_trans(self))
CALL_IsolateDtcConn(self, TRUE);
#endif /* _HANDLE_ENLIST_IN_DTC_ */
}
CSTR bgncmd = "START TRANSACTION";
CSTR cmtcmd = "COMMIT";
CSTR rbkcmd = "ROLLBACK";
CSTR svpcmd = "SAVEPOINT";
CSTR per_query_svp = "_per_query_svp_";
CSTR rlscmd = "RELEASE";
/*
* Used to begin a transaction.
*/
char
CC_begin(ConnectionClass *self)
{
char ret = TRUE;
if (!CC_is_in_trans(self))
{
QResultClass *res = CC_send_query(self, bgncmd, NULL, 0, NULL);
MYLOG(0, " sending BEGIN!\n");
ret = QR_command_maybe_successful(res);
QR_Destructor(res);
}
return ret;
}
/*
* Used to commit a transaction.
* We are almost always in the middle of a transaction.
*/
char
CC_commit(ConnectionClass *self)
{
char ret = TRUE;
if (CC_is_in_trans(self))
{
if (!CC_is_in_error_trans(self))
CC_close_eof_cursors(self);
if (CC_is_in_trans(self))
{
QResultClass *res = CC_send_query(self, cmtcmd, NULL, 0, NULL);
MYLOG(0, " sending COMMIT!\n");
ret = QR_command_maybe_successful(res);
QR_Destructor(res);
}
}
return ret;
}
/*
* Used to cancel a transaction.
* We are almost always in the middle of a transaction.
*/
char
CC_abort(ConnectionClass *self)
{
char ret = TRUE;
if (CC_is_in_trans(self))
{
QResultClass *res = CC_send_query(self, rbkcmd, NULL, 0, NULL);
MYLOG(0, " sending ABORT!\n");
ret = QR_command_maybe_successful(res);
QR_Destructor(res);
}
return ret;
}
/* This is called by SQLSetConnectOption etc also */
char
CC_set_autocommit(ConnectionClass *self, BOOL on)
{
BOOL currsts = CC_is_in_autocommit(self);
if ((on && currsts) ||
(!on && !currsts))
return on;
MYLOG(0, " %d->%d\n", currsts, on);
if (CC_is_in_trans(self))
CC_commit(self);
if (on)
self->transact_status |= CONN_IN_AUTOCOMMIT;
else
self->transact_status &= ~CONN_IN_AUTOCOMMIT;
return on;
}
/* Clear cached table info */
static void
CC_clear_col_info(ConnectionClass *self, BOOL destroy)
{
if (self->col_info)
{
int i;
COL_INFO *coli;
for (i = 0; i < self->ntables; i++)
{
if (coli = self->col_info[i], NULL != coli)
{
if (destroy || coli->refcnt == 0)
{
free_col_info_contents(coli);
free(coli);
self->col_info[i] = NULL;
}
else
coli->acc_time = 0;
}
}
self->ntables = 0;
if (destroy)
{
free(self->col_info);
self->col_info = NULL;
self->coli_allocated = 0;
}
}
}
static void
CC_set_locale_encoding(ConnectionClass *self, const char * encoding)
{
char *currenc = self->locale_encoding;
if (encoding)
self->locale_encoding = strdup(encoding);
else
self->locale_encoding = NULL;
if (currenc)
free(currenc);
}
static void
CC_determine_locale_encoding(ConnectionClass *self)
{
const char *dbencoding = PQparameterStatus(self->pqconn, "client_encoding");
const char *encoding;
QLOG(0, "PQparameterStatus(%p, \"client_encoding\")=%s\n", self->pqconn, SAFE_STR(dbencoding));
if (self->locale_encoding) /* already set */
return;
encoding = derive_locale_encoding(dbencoding);
if (!encoding)
encoding = "SQL_ASCII";
CC_set_locale_encoding(self, encoding);
}
static void
CC_set_client_encoding(ConnectionClass *self, const char * encoding)
{
char *currenc = self->original_client_encoding;
if (encoding)
{
self->original_client_encoding = strdup(encoding);
self->ccsc = pg_CS_code(encoding);
}
else
{
self->original_client_encoding = NULL;
self->ccsc = SQL_ASCII;
}
self->mb_maxbyte_per_char = pg_mb_maxlen(self->ccsc);
if (currenc)
free(currenc);
}
int
CC_send_client_encoding(ConnectionClass *self, const char * encoding)
{
const char *dbencoding = PQparameterStatus(self->pqconn, "client_encoding");
if (encoding && (!dbencoding || stricmp(encoding, dbencoding)))
{
char query[64];
QResultClass *res;
BOOL cmd_success;
SPRINTF_FIXED(query, "set client_encoding to '%s'", encoding);
res = CC_send_query(self, query, NULL, 0, NULL);
cmd_success = QR_command_maybe_successful(res);
QR_Destructor(res);
if (!cmd_success)
return SQL_ERROR;
}
CC_set_client_encoding(self, encoding);
return SQL_SUCCESS;
}
/* This is called by SQLDisconnect also */
char
CC_cleanup(ConnectionClass *self, BOOL keepCommunication)
{
int i;
StatementClass *stmt;
DescriptorClass *desc;
if (self->status == CONN_EXECUTING)
return FALSE;
MYLOG(0, "entering self=%p\n", self);
ENTER_CONN_CS(self);
/* Cancel an ongoing transaction */
/* We are always in the middle of a transaction, */
/* even if we are in auto commit. */
if (self->pqconn)
{
QLOG(0, "PQfinish: %p\n", self->pqconn);
PQfinish(self->pqconn);
self->pqconn = NULL;
}
MYLOG(0, "after PQfinish\n");
/* Free all the stmts on this connection */
for (i = 0; i < self->num_stmts; i++)
{
stmt = self->stmts[i];
if (stmt)
{
stmt->hdbc = NULL; /* prevent any more dbase interactions */
SC_Destructor(stmt);
self->stmts[i] = NULL;
}
}
/* Free all the descs on this connection */
for (i = 0; i < self->num_descs; i++)
{
desc = self->descs[i];
if (desc)
{
DC_get_conn(desc) = NULL; /* prevent any more dbase interactions */
DC_Destructor(desc);
free(desc);
self->descs[i] = NULL;
}
}
/* Check for translation dll */
#ifdef WIN32
if (!keepCommunication && self->translation_handle)
{
FreeLibrary(self->translation_handle);
self->translation_handle = NULL;
}
#endif
if (!keepCommunication)
{
self->status = CONN_NOT_CONNECTED;
self->transact_status = CONN_IN_AUTOCOMMIT;
self->unnamed_prepared_stmt = NULL;
}
if (!keepCommunication)
{
CC_conninfo_init(&(self->connInfo), CLEANUP_FOR_REUSE);
if (self->original_client_encoding)
{
free(self->original_client_encoding);
self->original_client_encoding = NULL;
}
if (self->locale_encoding)
{
free(self->locale_encoding);
self->locale_encoding = NULL;
}
if (self->server_encoding)
{
free(self->server_encoding);
self->server_encoding = NULL;
}
reset_current_schema(self);
}
/* Free cached table info */
CC_clear_col_info(self, TRUE);
if (self->num_discardp > 0 && self->discardp)
{
for (i = 0; i < self->num_discardp; i++)
free(self->discardp[i]);
self->num_discardp = 0;
}
if (self->discardp)
{
free(self->discardp);
self->discardp = NULL;
}
LEAVE_CONN_CS(self);
MYLOG(0, "leaving\n");
return TRUE;
}
int
CC_set_translation(ConnectionClass *self)
{
#ifdef WIN32
CSTR func = "CC_set_translation";
if (self->translation_handle != NULL)
{
FreeLibrary(self->translation_handle);
self->translation_handle = NULL;
}
if (self->connInfo.translation_dll[0] == 0)
return TRUE;
self->translation_option = atoi(self->connInfo.translation_option);
self->translation_handle = LoadLibrary(self->connInfo.translation_dll);
if (self->translation_handle == NULL)
{
CC_set_error(self, CONN_UNABLE_TO_LOAD_DLL, "Could not load the translation DLL.", func);
return FALSE;
}
self->DataSourceToDriver
= (DataSourceToDriverProc) GetProcAddress(self->translation_handle,
"SQLDataSourceToDriver");
self->DriverToDataSource
= (DriverToDataSourceProc) GetProcAddress(self->translation_handle,
"SQLDriverToDataSource");
if (self->DataSourceToDriver == NULL || self->DriverToDataSource == NULL)
{
CC_set_error(self, CONN_UNABLE_TO_LOAD_DLL, "Could not find translation DLL functions.", func);
return FALSE;
}
#endif
return TRUE;
}
#ifndef PG_DIAG_SEVERITY_NONLOCALIZED
#define PG_DIAG_SEVERITY_NONLOCALIZED 'V'
#endif
void
handle_pgres_error(ConnectionClass *self, const PGresult *pgres,
const char *comment,
QResultClass *res, BOOL error_not_a_notice)
{
char *errseverity;
char *errseverity_nonloc = NULL;
char *errprimary = NULL;
char *errmsg = NULL;
size_t errmsglen;
char *sqlstate = NULL;
int level = MIN_LOG_LEVEL;
MYLOG(DETAIL_LOG_LEVEL, "entering\n");
sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
if (res && pgres)
{
if (sqlstate)
STRCPY_FIXED(res->sqlstate, sqlstate);
}
if (NULL == pgres &&
NULL == self->pqconn)
{
const char *errmsg = "The connection has been lost";
MYLOG(0, "setting error message=%s\n", errmsg);
QLOG(0, "\t%ssetting error message=%s\n", __FUNCTION__, errmsg);
if (CC_get_errornumber(self) <= 0)
CC_set_error(self, CONNECTION_COMMUNICATION_ERROR, errmsg, comment);
if (res)
{
QR_set_rstatus(res, PORES_FATAL_ERROR);
QR_set_message(res, errmsg);
}
goto cleanup;
}
/*
* The full message with details and context and everything could
* be obtained with PQresultErrorMessage(). I think that would be
* more user-friendly, but for now, construct a message with
* severity and primary message, which is backwards compatible.
*/
errseverity = PQresultErrorField(pgres, PG_DIAG_SEVERITY);
if (PG_VERSION_GE(self, 9.6))
{
errseverity_nonloc = PQresultErrorField(pgres, PG_DIAG_SEVERITY_NONLOCALIZED);
MYLOG(0, "PG_DIAG_SEVERITY_NONLOCALIZED=%s\n", SAFE_STR(errseverity_nonloc));
}
if (!error_not_a_notice)
{
if (errseverity_nonloc)
{
if (stricmp(errseverity_nonloc, "NOTICE") != 0)
level = 1;
}
else if (errseverity)
{
if (stricmp(errseverity, "NOTICE") != 0)
level = 1;
}
}
errprimary = PQresultErrorField(pgres, PG_DIAG_MESSAGE_PRIMARY);
if (errseverity_nonloc)
QLOG(level, "\t%s(%s) %s '%s'\n", errseverity_nonloc, SAFE_STR(errseverity), SAFE_STR(sqlstate), SAFE_STR(errprimary));
else
QLOG(level, "\t(%s) %s '%s'\n", SAFE_STR(errseverity), SAFE_STR(sqlstate), SAFE_STR(errprimary));
if (errprimary == NULL)
{
/* Hmm. got no primary message. Check if there's a connection error */
if (self->pqconn)
errprimary = PQerrorMessage(self->pqconn);
if (errprimary == NULL)
errprimary = "no error information";
}
if (errseverity && errprimary)
{
errmsglen = strlen(errseverity) + 2 + strlen(errprimary) + 1;
errmsg = malloc(errmsglen);
if (errmsg)
snprintf(errmsg, errmsglen, "%s: %s", errseverity, errprimary);
}
if (errmsg == NULL)
errmsg = errprimary;
if (!error_not_a_notice) /* warning, notice, log etc */
{
MYLOG(0, "notice message %s\n", errmsg);
if (res)
{
if (QR_command_successful(res))
QR_set_rstatus(res, PORES_NONFATAL_ERROR); /* notice or warning */
QR_add_notice(res, errmsg); /* will dup this string */
}
goto cleanup;
}
MYLOG(0, "error message=%s(" FORMAT_SIZE_T ")\n", errmsg, strlen(errmsg));
if (res)
{
QR_set_rstatus(res, PORES_FATAL_ERROR); /* error or fatal */
if (errmsg[0])
QR_set_message(res, errmsg);
QR_set_aborted(res, TRUE);
}
/*
* If the error is continuable after rollback?
*/
if (PQstatus(self->pqconn) == CONNECTION_BAD)
{
CC_set_errornumber(self, CONNECTION_COMMUNICATION_ERROR);
CC_on_abort(self, CONN_DEAD); /* give up the connection */
}
else if ((errseverity_nonloc && strcmp(errseverity_nonloc, "FATAL") == 0) ||
(NULL == errseverity_nonloc && errseverity && strcmp(errseverity, "FATAL") == 0)) /* no */
{
CC_set_errornumber(self, CONNECTION_SERVER_REPORTED_SEVERITY_FATAL);
CC_on_abort(self, CONN_DEAD); /* give up the connection */
}
else /* yes */
{
CC_set_errornumber(self, CONNECTION_SERVER_REPORTED_SEVERITY_ERROR);
if (CC_is_in_trans(self))
CC_set_in_error_trans(self);
}
/* If any error/warning/notice happened, there should be a message in connection,
* espacially while a connection is being created.
* While a connection is being created, postgresql never returns any error. But in
* our cluster, there maybe a warning or a notice when GTM is in trouble.
*/
CC_set_errormsg(self, errmsg);
cleanup:
if (errmsg != errprimary)
free(errmsg);
LIBPQ_update_transaction_status(self);
}
/*
* This is a libpq notice receiver callback, for handling incoming NOTICE
* messages while processing a query.
*/
typedef struct
{
ConnectionClass *conn;
const char *comment;
QResultClass *res;
} notice_receiver_arg;
void
receive_libpq_notice(void *arg, const PGresult *pgres)
{
if (arg != NULL)
{
notice_receiver_arg *nrarg = (notice_receiver_arg *) arg;
handle_pgres_error(nrarg->conn, pgres, nrarg->comment, nrarg->res, FALSE);
}
}
static char CC_initial_log(ConnectionClass *self, const char *func)
{
const ConnInfo *ci = &self->connInfo;
char *encoding, vermsg[128];
snprintf(vermsg, sizeof(vermsg), "Driver Version='%s,%s'"
#ifdef WIN32
" linking %d"
#ifdef _MT
#ifdef _DLL
" dynamic"
#else
" static"
#endif /* _DLL */
" Multithread"
#else
" Singlethread"
#endif /* _MT */
#ifdef _DEBUG
" Debug"
#endif /* DEBUG */
" library"
#endif /* WIN32 */
"\n", POSTGRESDRIVERVERSION, __DATE__
#ifdef _MSC_VER
, _MSC_VER
#endif /* _MSC_VER */
);
QLOG(0, "%s", vermsg);
MYLOG(DETAIL_LOG_LEVEL, "Global Options: fetch=%d, unknown_sizes=%d, max_varchar_size=%d, max_longvarchar_size=%d\n",
ci->drivers.fetch_max,
ci->drivers.unknown_sizes,
ci->drivers.max_varchar_size,
ci->drivers.max_longvarchar_size);
MYLOG(DETAIL_LOG_LEVEL, " unique_index=%d, use_declarefetch=%d\n",
ci->drivers.unique_index,
ci->drivers.use_declarefetch);
MYLOG(DETAIL_LOG_LEVEL, " text_as_longvarchar=%d, unknowns_as_longvarchar=%d, bools_as_char=%d NAMEDATALEN=%d\n",
ci->drivers.text_as_longvarchar,
ci->drivers.unknowns_as_longvarchar,
ci->drivers.bools_as_char,
TABLE_NAME_STORAGE_LEN);
if (NULL == self->locale_encoding)
{
encoding = check_client_encoding(ci->conn_settings);
CC_set_locale_encoding(self, encoding);
MYLOG(DETAIL_LOG_LEVEL, " extra_systable_prefixes='%s', conn_settings='%s' conn_encoding='%s'\n",
ci->drivers.extra_systable_prefixes,
PRINT_NAME(ci->conn_settings),
encoding ? encoding : "");
if (NULL != encoding)
{
free(encoding);
}
}
if (self->status == CONN_DOWN)
{
CC_set_error_if_not_set(self, CONN_OPENDB_ERROR, "Connection broken.", func);
return 0;
}
else if (self->status != CONN_NOT_CONNECTED)
{
CC_set_error_if_not_set(self, CONN_OPENDB_ERROR, "Already connected.", func);
return 0;
}
MYLOG(0, "DSN = '%s', server = '%s', port = '%s', database = '%s'\n", ci->dsn, ci->server, ci->port, ci->database);
return 1;
}
static int handle_show_results(const QResultClass *res);
#define TRANSACTION_ISOLATION "transaction_isolation"
#define ISOLATION_SHOW_QUERY "show " TRANSACTION_ISOLATION
static char
LIBPQ_CC_connect(ConnectionClass *self, char *salt_para)
{
int ret;
CSTR func = "LIBPQ_CC_connect";
QResultClass *res;
MYLOG(0, "entering...\n");
if (0 == CC_initial_log(self, func))
return 0;
if (ret = LIBPQ_connect(self), ret <= 0)
return ret;
res = CC_send_query(self, "SET DateStyle = 'ISO';SET extra_float_digits = 2;" ISOLATION_SHOW_QUERY, NULL, READ_ONLY_QUERY, NULL);
if (QR_command_maybe_successful(res))
{
handle_show_results(res);
ret = 1;
}
else
ret = 0;
QR_Destructor(res);
return ret;
}
RETCODE
CC_detect_batch_proto(ConnectionClass *self)
{
const char *value = NULL;
const char *query = "select count(*) from pg_settings where name = 'support_batch_bind' and setting = 'on';";
PGresult *res = NULL;
ExecStatusType restype;
int cnt = 0;
RETCODE ret = SQL_SUCCESS;
CSTR func = "CC_detect_batch_proto";
mylog("%s: entering...\n", func);
qlog(" [%s : conn = %p, query = select count(*) from pg_settings where name = 'support_batch_bind' and setting = 'on']\n",
func, self);
res = PQexec(self->pqconn, query);
if (NULL == res)
{
self->connInfo.backend_support_batch_proto = 0;
mylog("%s: NULL result detected, backend_support_batch_proto set to 0", func);
qlog(" [%s : conn = %p, query result = NULL]\n",
func, self);
return SQL_ERROR;
}
restype = PQresultStatus(res);
if (restype != PGRES_TUPLES_OK &&
restype != PGRES_SINGLE_TUPLE)
{
self->connInfo.backend_support_batch_proto = 0;
PQclear(res);
mylog("%s: invalid result type %d detected, backend_support_batch_proto set to 0", func, restype);
qlog(" [%s : conn = %p, query result type = %d]\n",
func, self, restype);
return SQL_ERROR;
}
if (PQntuples(res) < 1 ||
PQnfields(res) < 1)
{
self->connInfo.backend_support_batch_proto = 0;
mylog("%s: invalid result rowsCount(%d) or colsCount(%d) detected, backend_support_batch_proto set to 0",
func, PQntuples(res), PQnfields(res));
qlog(" [%s : conn = %p, invalid query result : %d rows, %d columns]\n",
func, self, PQntuples(res), PQnfields(res));
PQclear(res);
return SQL_ERROR;
}
value = PQgetvalue(res, 0, 0);
cnt = (value ? atoi(value) : 0);
PQclear(res);
if (cnt < 1)
self->connInfo.backend_support_batch_proto = 0;
else
self->connInfo.backend_support_batch_proto = 1;
if (0 == self->connInfo.backend_support_batch_proto &&
self->connInfo.use_batch_protocol)
{
CC_set_error(self, CONN_UNSUPPORTED_OPTION,
"Backend does not support batch bind protocol, \"" INI_USEBATCHPROTOCOL "\" disabled.", func);
ret = SQL_SUCCESS_WITH_INFO;
}
mylog("%s: query result: %d, backend_support_batch_proto set to %d",
func, cnt, self->connInfo.backend_support_batch_proto);
qlog(" [%s : conn = %p, query result : %d ]\n",
func, self, cnt);
mylog("%s: exiting\n", func);
return ret;
}
char
CC_connect(ConnectionClass *self, char *salt_para)
{
ConnInfo *ci = &(self->connInfo);
CSTR func = "CC_connect";
char ret, *saverr = NULL, retsend;
const char *errmsg = NULL;
RETCODE batchDetectRet = 0;
MYLOG(0, "entering...sslmode=%s\n", self->connInfo.sslmode);
ret = LIBPQ_CC_connect(self, salt_para);
if (ret <= 0)
return ret;
CC_set_translation(self);
/*
* Send any initial settings
*/
/*
* Since these functions allocate statements, and since the connection
* is not established yet, it would violate odbc state transition
* rules. Therefore, these functions call the corresponding local
* function instead.
*/
/* Per Datasource settings */
retsend = CC_send_settings(self, GET_NAME(self->connInfo.conn_settings));
if (CONN_DOWN == self->status)
{
ret = 0;
goto cleanup;
}
if (CC_get_errornumber(self) > 0 &&
NULL != (errmsg = CC_get_errormsg(self)))
saverr = strdup(errmsg);
CC_clear_error(self); /* clear any error */
if (!SQL_SUCCEEDED(CC_lookup_lo(self))) /* a hack to get the oid of our large object oid type */
{
ret = 0;
goto cleanup;
}
/*
* Multibyte handling
*
* Send 'UTF8' when required Unicode behavior, otherwise send
* locale encodings.
*/
CC_clear_error(self);
CC_determine_locale_encoding(self); /* determine the locale_encoding */
#ifdef UNICODE_SUPPORT
if (CC_is_in_unicode_driver(self))
{
if (!SQL_SUCCEEDED(CC_send_client_encoding(self, "UTF8")))
{
ret = 0;
goto cleanup;
}
}
else /* for unicode drivers require ANSI behavior */
#endif /* UNICODE_SUPPORT */
{
if (!SQL_SUCCEEDED(CC_send_client_encoding(self, self->locale_encoding)))
{
ret = 0;
goto cleanup;
}
}
CC_clear_error(self);
if (self->server_isolation != self->isolation)
if (!CC_set_transact(self, self->isolation))
{
ret = 0;
goto cleanup;
}
ci_updatable_cursors_set(ci);
if (CC_get_errornumber(self) > 0)
CC_clear_error(self); /* clear any initial command errors */
self->status = CONN_CONNECTED;
if (CC_is_in_unicode_driver(self)
&& (CC_is_in_ansi_app(self) || 0 < ci->bde_environment))
self->unicode |= CONN_DISALLOW_WCHAR;
MYLOG(0, "conn->unicode=%d Client Encoding='%s' (Code %d)\n", self->unicode, self->original_client_encoding, self->ccsc);
batchDetectRet = CC_detect_batch_proto(self);
if (batchDetectRet == SQL_ERROR)
goto cleanup;
else if (batchDetectRet == SQL_SUCCESS_WITH_INFO)
{
/* Caller will recognize 2 as SQL_SUCCESS_WITH_INFO. */
ret = 2;
goto cleanup;
}
ret = 1;
cleanup:
MYLOG(0, "leaving...%d\n", ret);
if (NULL != saverr)
{
if (ret > 0 && CC_get_errornumber(self) <= 0)
CC_set_error(self, -1, saverr, func);
free(saverr);
}
if (1 == ret && FALSE == retsend)
ret = 2;
return ret;
}
char
CC_add_statement(ConnectionClass *self, StatementClass *stmt)
{
int i;
char ret = TRUE;
MYLOG(0, "self=%p, stmt=%p\n", self, stmt);
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
if (!self->stmts[i])
{
stmt->hdbc = self;
self->stmts[i] = stmt;
break;
}
}
if (i >= self->num_stmts) /* no more room -- allocate more memory */
{
StatementClass **newstmts;
Int2 new_num_stmts;
new_num_stmts = STMT_INCREMENT + self->num_stmts;
if (new_num_stmts > 0)
newstmts = (StatementClass **)
realloc(self->stmts, sizeof(StatementClass *) * new_num_stmts);
else
newstmts = NULL; /* num_stmts overflowed */
if (!newstmts)
ret = FALSE;
else
{
self->stmts = newstmts;
memset(&self->stmts[self->num_stmts], 0, sizeof(StatementClass *) * STMT_INCREMENT);
stmt->hdbc = self;
self->stmts[self->num_stmts] = stmt;
self->num_stmts = new_num_stmts;
}
}
CONNLOCK_RELEASE(self);
return ret;
}
static void
CC_set_error_statements(ConnectionClass *self)
{
int i;
MYLOG(0, "entering self=%p\n", self);
for (i = 0; i < self->num_stmts; i++)
{
if (NULL != self->stmts[i])
SC_ref_CC_error(self->stmts[i]);
}
}
char
CC_remove_statement(ConnectionClass *self, StatementClass *stmt)
{
int i;
char ret = FALSE;
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
if (self->stmts[i] == stmt && stmt->status != STMT_EXECUTING)
{
self->stmts[i] = NULL;
ret = TRUE;
break;
}
}
CONNLOCK_RELEASE(self);
return ret;
}
char CC_get_escape(const ConnectionClass *self)
{
const char *scf;
static const ConnectionClass *conn = NULL;
scf = PQparameterStatus(self->pqconn, "standard_conforming_strings");
if (self != conn)
{
QLOG(0, "PQparameterStatus(%p, \"standard_conforming_strings\")=%s\n", self->pqconn, SAFE_STR(scf));
conn = self;
}
if (scf == NULL)
{
/* we're connected to a pre-8.1 server, and E'' is not supported */
return '\0';
}
if (strcmp(scf, "on") != 0)
return ESCAPE_IN_LITERAL;
else
return '\0';
}
int CC_get_max_idlen(ConnectionClass *self)
{
int len = self->max_identifier_length;
if (len < 0)
{
QResultClass *res;
res = CC_send_query(self, "show max_identifier_length", NULL, READ_ONLY_QUERY, NULL);
if (QR_command_maybe_successful(res))
len = self->max_identifier_length = QR_get_value_backend_int(res, 0, 0, FALSE);
QR_Destructor(res);
}
MYLOG(0, "max_identifier_length=%d\n", len);
return len < 0 ? 0 : len;
}
static SQLINTEGER
isolation_str_to_enum(const char *str_isolation)
{
SQLINTEGER isolation = 0;
if (strnicmp(str_isolation, "seri", 4) == 0)
isolation = SQL_TXN_SERIALIZABLE;
else if (strnicmp(str_isolation, "repe", 4) == 0)
isolation = SQL_TXN_REPEATABLE_READ;
else if (strnicmp(str_isolation, "read com", 8) == 0)
isolation = SQL_TXN_READ_COMMITTED;
else if (strnicmp(str_isolation, "read unc", 8) == 0)
isolation = SQL_TXN_READ_UNCOMMITTED;
return isolation;
}
static int handle_show_results(const QResultClass *res)
{
int count = 0;
const QResultClass *qres;
ConnectionClass *conn = QR_get_conn(res);
for (qres = res; qres; qres = qres->next)
{
if (!qres->command ||
stricmp(qres->command, "SHOW") != 0)
continue;
if (strcmp(QR_get_fieldname(qres, 0), TRANSACTION_ISOLATION) == 0)
{
conn->server_isolation = isolation_str_to_enum(QR_get_value_backend_text(qres, 0, 0));
MYLOG(0, "isolation %d to be %d\n", conn->server_isolation, conn->isolation);
if (0 == conn->isolation)
conn->isolation = conn->server_isolation;
if (0 == conn->default_isolation)
conn->default_isolation = conn->server_isolation;
count++;
}
}
return count;
}
/*
* This function may not be called as long as ISOLATION_SHOW_QUERY is
* issued in LIBPQ_CC_connect.
*/
SQLUINTEGER CC_get_isolation(ConnectionClass *self)
{
SQLUINTEGER isolation = 0;
QResultClass *res;
res = CC_send_query(self, ISOLATION_SHOW_QUERY, NULL, READ_ONLY_QUERY, NULL);
if (QR_command_maybe_successful(res))
{
handle_show_results(res);
isolation = self->server_isolation;
}
QR_Destructor(res);
MYLOG(0, "isolation=%d\n", isolation);
return isolation;
}
void
CC_set_error(ConnectionClass *self, int number, const char *message, const char *func)
{
CONNLOCK_ACQUIRE(self);
if (self->__error_message)
free(self->__error_message);
self->__error_number = number;
self->__error_message = message ? strdup(message) : NULL;
if (0 != number)
CC_set_error_statements(self);
if (func && number != 0)
CC_log_error(func, "", self);
CONNLOCK_RELEASE(self);
}
void
CC_set_errormsg(ConnectionClass *self, const char *message)
{
CONNLOCK_ACQUIRE(self);
if (self->__error_message)
free(self->__error_message);
self->__error_message = message ? strdup(message) : NULL;
CONNLOCK_RELEASE(self);
}
char
CC_get_error(ConnectionClass *self, int *number, char **message)
{
int rv;
MYLOG(0, "entering\n");
CONNLOCK_ACQUIRE(self);
if (CC_get_errornumber(self))
{
*number = CC_get_errornumber(self);
*message = CC_get_errormsg(self);
}
rv = (CC_get_errornumber(self) != 0);
CONNLOCK_RELEASE(self);
MYLOG(0, "leaving\n");
return rv;
}
static int CC_close_eof_cursors(ConnectionClass *self)
{
int i, ccount = 0;
StatementClass *stmt;
QResultClass *res;
if (!self->ncursors)
return ccount;
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
if (stmt = self->stmts[i], NULL == stmt)
continue;
if (res = SC_get_Result(stmt), NULL == res)
continue;
if (NULL != QR_get_cursor(res) &&
QR_is_withhold(res) &&
QR_once_reached_eof(res))
{
if (QR_get_num_cached_tuples(res) >= QR_get_num_total_tuples(res) ||
SQL_CURSOR_FORWARD_ONLY == stmt->options.cursor_type)
{
QR_close(res);
ccount++;
}
}
}
CONNLOCK_RELEASE(self);
return ccount;
}
static void CC_clear_cursors(ConnectionClass *self, BOOL on_abort)
{
int i;
StatementClass *stmt;
QResultClass *res;
if (!self->ncursors)
return;
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
stmt = self->stmts[i];
if (stmt && (res = SC_get_Result(stmt)) &&
(NULL != QR_get_cursor(res)))
{
/*
* non-holdable cursors are automatically closed
* at commit time.
* all non-permanent cursors are automatically closed
* at rollback time.
*/
if ((on_abort && !QR_is_permanent(res)) ||
!QR_is_withhold(res))
{
QR_on_close_cursor(res);
}
else if (!QR_is_permanent(res))
{
QResultClass *wres;
char cmd[64];
if (QR_needs_survival_check(res))
{
SPRINTF_FIXED(cmd, "MOVE 0 in \"%s\"", QR_get_cursor(res));
CONNLOCK_RELEASE(self);
wres = CC_send_query(self, cmd, NULL, ROLLBACK_ON_ERROR | IGNORE_ABORT_ON_CONN | READ_ONLY_QUERY, NULL);
QR_set_no_survival_check(res);
if (QR_command_maybe_successful(wres) &&
CONN_ERROR_IGNORED != CC_get_errornumber(self))
QR_set_permanent(res);
else
QR_set_cursor(res, NULL);
QR_Destructor(wres);
CONNLOCK_ACQUIRE(self);
MYLOG(DETAIL_LOG_LEVEL, "%p->permanent -> %d %p\n", res, QR_is_permanent(res), QR_get_cursor(res));
}
else
QR_set_permanent(res);
}
}
}
CONNLOCK_RELEASE(self);
}
static void CC_mark_cursors_doubtful(ConnectionClass *self)
{
int i;
StatementClass *stmt;
QResultClass *res;
if (!self->ncursors)
return;
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
stmt = self->stmts[i];
if (NULL != stmt &&
NULL != (res = SC_get_Result(stmt)) &&
NULL != QR_get_cursor(res) &&
!QR_is_permanent(res))
QR_set_survival_check(res);
}
CONNLOCK_RELEASE(self);
}
void CC_on_commit(ConnectionClass *conn)
{
if (conn->on_commit_in_progress)
return;
conn->on_commit_in_progress = 1;
CONNLOCK_ACQUIRE(conn);
if (CC_is_in_trans(conn))
{
CC_set_no_trans(conn);
CC_set_no_manual_trans(conn);
}
CC_svp_init(conn);
CC_start_stmt(conn);
CC_clear_cursors(conn, FALSE);
CONNLOCK_RELEASE(conn);
CC_discard_marked_objects(conn);
CONNLOCK_ACQUIRE(conn);
if (conn->result_uncommitted)
{
CONNLOCK_RELEASE(conn);
ProcessRollback(conn, FALSE, FALSE);
CONNLOCK_ACQUIRE(conn);
conn->result_uncommitted = 0;
}
CONNLOCK_RELEASE(conn);
conn->on_commit_in_progress = 0;
}
void CC_on_abort(ConnectionClass *conn, unsigned int opt)
{
BOOL set_no_trans = FALSE;
MYLOG(0, "entering opt=%x\n", opt);
CONNLOCK_ACQUIRE(conn);
if (0 != (opt & CONN_DEAD)) /* CONN_DEAD implies NO_TRANS also */
opt |= NO_TRANS;
if (CC_is_in_trans(conn))
{
if (0 != (opt & NO_TRANS))
{
CC_set_no_trans(conn);
CC_set_no_manual_trans(conn);
set_no_trans = TRUE;
}
}
CC_svp_init(conn);
CC_start_stmt(conn);
CC_clear_cursors(conn, TRUE);
if (0 != (opt & CONN_DEAD))
{
conn->status = CONN_DOWN;
if (conn->pqconn)
{
CONNLOCK_RELEASE(conn);
QLOG(0, "PQfinish: %p\n", conn->pqconn);
PQfinish(conn->pqconn);
CONNLOCK_ACQUIRE(conn);
conn->pqconn = NULL;
}
}
else if (set_no_trans)
{
CONNLOCK_RELEASE(conn);
CC_discard_marked_objects(conn);
CONNLOCK_ACQUIRE(conn);
}
if (conn->result_uncommitted)
{
CONNLOCK_RELEASE(conn);
ProcessRollback(conn, TRUE, FALSE);
CONNLOCK_ACQUIRE(conn);
conn->result_uncommitted = 0;
}
CONNLOCK_RELEASE(conn);
}
void CC_on_abort_partial(ConnectionClass *conn)
{
MYLOG(0, "entering\n");
CONNLOCK_ACQUIRE(conn);
ProcessRollback(conn, TRUE, TRUE);
CC_discard_marked_objects(conn);
CONNLOCK_RELEASE(conn);
}
static BOOL
is_setting_search_path(const char *query)
{
const char *q = query;
if (strnicmp(q, "set", 3) != 0)
return FALSE;
q += 3;
while (isspace(*q)) q++;
for (; *q;)
{
if (IS_NOT_SPACE(*q))
{
if (strnicmp(q, "search_path", 11) == 0)
return TRUE;
q++;
while (IS_NOT_SPACE(*q))
q++;
}
else
q++;
}
return FALSE;
}
static BOOL
CC_from_PGresult(QResultClass *res, StatementClass *stmt,
ConnectionClass *conn, const char *cursor, PGresult **pgres)
{
BOOL success = TRUE;
if (!QR_from_PGresult(res, stmt, conn, cursor, pgres))
{
QLOG(0, "\tGetting result from PGresult failed\n");
success = FALSE;
if (0 >= CC_get_errornumber(conn))
{
switch (QR_get_rstatus(res))
{
case PORES_NO_MEMORY_ERROR:
CC_set_error(conn, CONN_NO_MEMORY_ERROR, NULL, __FUNCTION__);
break;
case PORES_BAD_RESPONSE:
CC_set_error(conn, CONNECTION_COMMUNICATION_ERROR, "communication error occured", __FUNCTION__);
break;
default:
CC_set_error(conn, CONN_EXEC_ERROR, QR_get_message(res), __FUNCTION__);
break;
}
}
}
return success;
}
int
CC_internal_rollback(ConnectionClass *self, int rollback_type, BOOL ignore_abort)
{
int ret = 0;
char cmd[128];
PGresult *pgres = NULL;
if (!CC_is_in_error_trans(self))
return 1;
switch (rollback_type)
{
case PER_STATEMENT_ROLLBACK:
GenerateSvpCommand(self, INTERNAL_ROLLBACK_OPERATION, cmd, sizeof(cmd));
QLOG(0, "PQexec: %p '%s'\n", self->pqconn, cmd);
pgres = PQexec(self->pqconn, cmd);
switch (PQresultStatus(pgres))
{
case PGRES_COMMAND_OK:
QLOG(0, "\tok: - 'C' - %s\n", PQcmdStatus(pgres));
case PGRES_NONFATAL_ERROR:
ret = 1;
if (ignore_abort)
CC_set_no_error_trans(self);
LIBPQ_update_transaction_status(self);
break;
default:
handle_pgres_error(self, pgres, __FUNCTION__, NULL, TRUE);
break;
}
break;
case PER_QUERY_ROLLBACK:
SPRINTF_FIXED(cmd, "%s TO %s;%s %s"
, rbkcmd, per_query_svp , rlscmd, per_query_svp);
QLOG(0, "PQsendQuery: %p '%s'\n", self->pqconn, cmd);
PQsendQuery(self->pqconn, cmd);
ret = 0;
while (self->pqconn && (pgres = PQgetResult(self->pqconn)) != NULL)
{
switch (PQresultStatus(pgres))
{
case PGRES_COMMAND_OK:
QLOG(0, "\tok: - 'C' - %s\n", PQcmdTuples(pgres));
ret = 1;
break;
case PGRES_NONFATAL_ERROR:
ret = 1;
default:
handle_pgres_error(self, pgres, __FUNCTION__, NULL, !ret);
}
}
if (!ret)
{
if (ignore_abort)
CC_set_no_error_trans(self);
else
MYLOG(0, " return error\n");
}
LIBPQ_update_transaction_status(self);
break;
}
if (pgres)
PQclear(pgres);
return ret;
}
/*
* The "result_in" is only used by QR_next_tuple() to fetch another group of rows into
* the same existing QResultClass (this occurs when the tuple cache is depleted and
* needs to be re-filled).
*
* The "cursor" is used by SQLExecute to associate a statement handle as the cursor name
* (i.e., C3326857) for SQL select statements. This cursor is then used in future
* 'declare cursor C3326857 for ...' and 'fetch 100 in C3326857' statements.
*
* * If issue_begin, send "BEGIN"
* * if needed, send "SAVEPOINT ..."
* * Send "query", read result
* * Send appendq, read result.
*
*/
QResultClass *
CC_send_query_append(ConnectionClass *self, const char *query, QueryInfo *qi, UDWORD flag, StatementClass *stmt, const char *appendq)
{
CSTR func = "CC_send_query";
QResultClass *cmdres = NULL,
*retres = NULL,
*res = NULL;
BOOL ignore_abort_on_conn = ((flag & IGNORE_ABORT_ON_CONN) != 0),
create_keyset = ((flag & CREATE_KEYSET) != 0),
issue_begin = ((flag & GO_INTO_TRANSACTION) != 0 && !CC_is_in_trans(self)),
rollback_on_error, query_rollback, end_with_commit,
read_only, prepend_savepoint = FALSE,
ignore_roundtrip_time = ((self->connInfo.extra_opts & BIT_IGNORE_ROUND_TRIP_TIME) != 0);
char *ptr;
BOOL ReadyToReturn = FALSE,
query_completed = FALSE,
aborted = FALSE,
used_passed_result_object = FALSE,
discard_next_begin = FALSE,
discard_next_savepoint = FALSE,
discard_next_release = FALSE,
consider_rollback;
BOOL discardTheRest = FALSE;
int func_cs_count = 0;
PQExpBufferData query_buf = {0};
size_t query_len;
/* QR_set_command() dups this string so doesn't need static */
char *cmdbuffer;
PGresult *pgres = NULL;
notice_receiver_arg nrarg;
if (appendq)
{
MYLOG(0, "conn=%p, query='%s'+'%s'\n", self, query, appendq);
}
else
{
MYLOG(0, "conn=%p, query='%s'\n", self, query);
}
if (!self->pqconn)
{
PQExpBufferData pbuf = {0};
initPQExpBuffer(&pbuf);
appendPQExpBuffer(&pbuf, "The connection is down\nFailed to send '%s'", query);
CC_set_error(self, CONNECTION_COULD_NOT_SEND, pbuf.data, func);
termPQExpBuffer(&pbuf);
return NULL;
}
ENTER_INNER_CONN_CS(self, func_cs_count);
/* Indicate that we are sending a query to the backend */
if ((NULL == query) || (query[0] == '\0'))
{
CLEANUP_FUNC_CONN_CS(func_cs_count, self);
return NULL;
}
/*
* In case the round trip time can be ignored, the query
* and the appeneded query would be issued separately.
* Otherwise a multiple command query would be issued.
*/
if (appendq && ignore_roundtrip_time)
{
res = CC_send_query_append(self, query, qi, flag, stmt, NULL);
if (QR_command_maybe_successful(res))
{
cmdres = CC_send_query_append(self, appendq, qi, flag & (~(GO_INTO_TRANSACTION)), stmt, NULL);
if (QR_command_maybe_successful(cmdres))
res->next = cmdres;
else
{
QR_Destructor(res);
res = cmdres;
}
}
CLEANUP_FUNC_CONN_CS(func_cs_count, self);
return res;
}
rollback_on_error = (flag & ROLLBACK_ON_ERROR) != 0;
end_with_commit = (flag & END_WITH_COMMIT) != 0;
read_only = (flag & READ_ONLY_QUERY) != 0;
#define return DONT_CALL_RETURN_FROM_HERE???
consider_rollback = (issue_begin || (CC_is_in_trans(self) && !CC_is_in_error_trans(self)) || strnicmp(query, "begin", 5) == 0);
if (rollback_on_error)
rollback_on_error = consider_rollback;
query_rollback = (rollback_on_error && !end_with_commit && PG_VERSION_GE(self, 8.0));
if (!query_rollback && consider_rollback && !end_with_commit)
{
if (stmt)
{
StatementClass *astmt = SC_get_ancestor(stmt);
unsigned int svpopt = 0;
if (read_only)
svpopt |= SVPOPT_RDONLY;
if (!ignore_roundtrip_time)
svpopt |= SVPOPT_REDUCE_ROUNDTRIP;
if (!CC_started_rbpoint(self))
{
if (SQL_ERROR == SetStatementSvp(astmt, svpopt))
{
SC_set_error(stmt, STMT_INTERNAL_ERROR, "internal savepoint error", func);
goto cleanup;
}
}
}
}
/* prepend internal savepoint command ? */
if (PREPEND_IN_PROGRESS == self->internal_op)
prepend_savepoint = TRUE;
/* append all these together, to avoid round-trips */
query_len = strlen(query);
MYLOG(0, "query_len=" FORMAT_SIZE_T "\n", query_len);
initPQExpBuffer(&query_buf);
/* issue_begin, query_rollback and prepend_savepoint are exclusive */
if (issue_begin)
{
appendPQExpBuffer(&query_buf, "%s;", bgncmd);
discard_next_begin = TRUE;
}
else if (query_rollback && !self->connInfo.drivers.for_extension_connector)
{
appendPQExpBuffer(&query_buf, "%s %s;", svpcmd, per_query_svp);
discard_next_savepoint = TRUE;
}
else if (prepend_savepoint)
{
char prepend_cmd[128];
GenerateSvpCommand(self, INTERNAL_SAVEPOINT_OPERATION, prepend_cmd, sizeof(prepend_cmd));
appendPQExpBuffer(&query_buf, "%s;", prepend_cmd);
self->internal_op = SAVEPOINT_IN_PROGRESS;
}
appendPQExpBufferStr(&query_buf, query);
if (appendq)
{
appendPQExpBuffer(&query_buf, ";%s", appendq);
}
if (query_rollback && !self->connInfo.drivers.for_extension_connector)
{
appendPQExpBuffer(&query_buf, ";%s %s", rlscmd, per_query_svp);
}
if (PQExpBufferDataBroken(query_buf))
{
CC_set_error(self, CONN_NO_MEMORY_ERROR, "Couldn't alloc buffer for query.", "");
goto cleanup;
}
/* Set up notice receiver */
nrarg.conn = self;
nrarg.comment = func;
nrarg.res = NULL;
PQsetNoticeReceiver(self->pqconn, receive_libpq_notice, &nrarg);
QLOG(0, "PQsendQuery: %p '%s'\n", self->pqconn, query_buf.data);
if (!PQsendQuery(self->pqconn, query_buf.data))
{
char *errmsg = PQerrorMessage(self->pqconn);
QLOG(0, "\nCommunication Error: %s\n", SAFE_STR(errmsg));
CC_set_error(self, CONNECTION_COMMUNICATION_ERROR, errmsg, func);
goto cleanup;
}
PQsetSingleRowMode(self->pqconn);
cmdres = qi ? qi->result_in : NULL;
if (cmdres)
used_passed_result_object = TRUE;
else
{
cmdres = QR_Constructor();
if (!cmdres)
{
CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func);
goto cleanup;
}
}
res = cmdres;
if (qi)
{
res->cmd_fetch_size = qi->fetch_size;
res->cache_size = qi->row_size;
}
nrarg.res = res;
while (self->pqconn && (pgres = PQgetResult(self->pqconn)) != NULL)
{
int status = PQresultStatus(pgres);
if (discardTheRest)
continue;
switch (status)
{
case PGRES_COMMAND_OK:
/* portal query command, no tuples returned */
/* read in the return message from the backend */
cmdbuffer = PQcmdStatus(pgres);
QLOG(0, "\tok: - 'C' - %s\n", cmdbuffer);
if (query_completed) /* allow for "show" style notices */
{
res->next = QR_Constructor();
if (!res->next)
{
CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func);
ReadyToReturn = TRUE;
retres = NULL;
break;
}
res = res->next;
nrarg.res = res;
}
MYLOG(0, " setting cmdbuffer = '%s'\n", cmdbuffer);
my_trim(cmdbuffer); /* get rid of trailing space */
if (strnicmp(cmdbuffer, bgncmd, strlen(bgncmd)) == 0)
{
CC_set_in_trans(self);
CC_set_in_manual_trans(self);
if (discard_next_begin) /* discard the automatically issued BEGIN */
{
discard_next_begin = FALSE;
break; /* discard the result */
}
}
/*
* There are 2 risks to RELEASE an internal savepoint.
* One is to RELEASE the savepoint invalitated
* due to manually issued ROLLBACK or RELEASE.
* Another is to invalitate manual SAVEPOINTs unexpectedly
* by RELEASing the internal savepoint.
*/
else if (strnicmp(cmdbuffer, svpcmd, strlen(svpcmd)) == 0)
{
if (discard_next_savepoint)
{
discard_next_savepoint = FALSE;
discard_next_release = TRUE;
MYLOG(DETAIL_LOG_LEVEL, "Discarded a SAVEPOINT result\n");
break; /* discard the result */
}
if (SAVEPOINT_IN_PROGRESS == self->internal_op)
{
CC_start_rbpoint(self);
self->internal_op = 0;
break; /* discard the result */
}
/* Don't release the internal savepoint */
self->internal_svp = 0;
}
else if (strnicmp(cmdbuffer, rbkcmd, strlen(rbkcmd)) == 0)
{
CC_mark_cursors_doubtful(self);
CC_set_in_error_trans(self); /* mark the transaction error in case of manual rollback */
self->internal_svp = 0; /* possibly an internal savepoint is invalid */
self->opt_previous = 0; /* unknown */
CC_init_opt_in_progress(self);
}
else if (strnicmp(cmdbuffer, rlscmd, strlen(rlscmd)) == 0)
{
if (discard_next_release)
{
MYLOG(DETAIL_LOG_LEVEL, "Discarded a RELEASE result\n");
discard_next_release = FALSE;
break; /* discard the result */
}
self->internal_svp = 0;
if (SAVEPOINT_IN_PROGRESS == self->internal_op)
break; /* discard the result */
}
/*
* DROP TABLE or ALTER TABLE may change
* the table definition. So clear the
* col_info cache though it may be too simple.
*/
else if (strnicmp(cmdbuffer, "DROP TABLE", 10) == 0 ||
strnicmp(cmdbuffer, "ALTER TABLE", 11) == 0)
CC_clear_col_info(self, FALSE);
else
{
ptr = strrchr(cmdbuffer, ' ');
if (ptr)
res->recent_processed_row_count = atoi(ptr + 1);
else
res->recent_processed_row_count = -1;
if (self->current_schema_valid &&
strnicmp(cmdbuffer, "SET", 3) == 0)
{
if (is_setting_search_path(query))
reset_current_schema(self);
}
}
if (QR_command_successful(res))
QR_set_rstatus(res, PORES_COMMAND_OK);
QR_set_command(res, cmdbuffer);
query_completed = TRUE;
MYLOG(0, " returning res = %p\n", res);
break;
case PGRES_EMPTY_QUERY:
/* We return the empty query */
QR_set_rstatus(res, PORES_EMPTY_QUERY);
break;
case PGRES_NONFATAL_ERROR:
handle_pgres_error(self, pgres, "send_query", res, FALSE);
break;
case PGRES_BAD_RESPONSE:
case PGRES_FATAL_ERROR:
handle_pgres_error(self, pgres, "send_query", res, TRUE);
/* We should report that an error occured. Zoltan */
aborted = TRUE;
query_completed = TRUE;
break;
case PGRES_TUPLES_OK:
QLOG(0, "\tok: - 'T' - %s\n", PQcmdStatus(pgres));
case PGRES_SINGLE_TUPLE:
if (query_completed)
{
res->next = QR_Constructor();
if (!res->next)
{
CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func);
ReadyToReturn = TRUE;
retres = NULL;
break;
}
if (create_keyset)
{
QR_set_haskeyset(res->next);
if (stmt)
res->next->num_key_fields = stmt->num_key_fields;
}
MYLOG(0, " 'T' no result_in: res = %p\n", res->next);
res = res->next;
nrarg.res = res;
if (qi)
{
QR_set_cache_size(res, qi->row_size);
res->cmd_fetch_size = qi->fetch_size;
}
}
if (!used_passed_result_object)
{
const char *cursor = qi ? qi->cursor : NULL;
if (create_keyset)
{
QR_set_haskeyset(res);
if (stmt)
res->num_key_fields = stmt->num_key_fields;
if (cursor && cursor[0])
QR_set_synchronize_keys(res);
}
if (CC_from_PGresult(res, stmt, self, cursor, &pgres))
query_completed = TRUE;
else
{
aborted = TRUE;
if (QR_command_maybe_successful(res))
retres = NULL;
else
retres = cmdres;
}
}
else
{ /* next fetch, so reuse an existing result */
const char *cursor = res->cursor_name;
/*
* called from QR_next_tuple and must return
* immediately.
*/
if (!CC_from_PGresult(res, stmt, NULL, cursor, &pgres))
{
retres = NULL;
break;
}
retres = cmdres;
}
if (res->rstatus == PORES_TUPLES_OK && res->notice)
{
QR_set_rstatus(res, PORES_NONFATAL_ERROR);
}
else if (PORES_NO_MEMORY_ERROR == QR_get_rstatus(res))
{
PGcancel *cancel = PQgetCancel(self->pqconn);
char dummy[8];
discardTheRest = TRUE;
if (cancel != NULL)
{
PQcancel(cancel, dummy, sizeof(dummy));
PQfreeCancel(cancel);
}
else
goto cleanup;
}
break;
case PGRES_COPY_OUT:
/* XXX: We used to read from stdin here. Does that make any sense? */
case PGRES_COPY_IN:
if (query_completed)
{
res->next = QR_Constructor();
if (!res->next)
{
CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func);
ReadyToReturn = TRUE;
retres = NULL;
break;
}
res = res->next;
nrarg.res = res;
}
QR_set_rstatus(res, PORES_COPY_IN);
ReadyToReturn = TRUE;
retres = cmdres;
break;
case PGRES_COPY_BOTH:
default:
/* skip the unexpected response if possible */
CC_set_error(self, CONNECTION_BACKEND_CRAZY, "Unexpected result status (send_query)", func);
handle_pgres_error(self, pgres, "send_query", res, TRUE);
CC_on_abort(self, CONN_DEAD);
MYLOG(0, " error - %s\n", CC_get_errormsg(self));
ReadyToReturn = TRUE;
retres = NULL;
break;
}
if (pgres)
{
PQclear(pgres);
pgres = NULL;
}
}
cleanup:
if (self->pqconn)
PQsetNoticeReceiver(self->pqconn, receive_libpq_notice, NULL);
if (pgres != NULL)
{
PQclear(pgres);
pgres = NULL;
}
MYLOG(DETAIL_LOG_LEVEL, " rollback_on_error=%d CC_is_in_trans=%d discard_next_savepoint=%d query_rollback=%d\n", rollback_on_error, CC_is_in_trans(self), discard_next_savepoint, query_rollback);
if (rollback_on_error && CC_is_in_trans(self) && !discard_next_savepoint)
{
if (query_rollback)
{
if (!CC_internal_rollback(self, PER_QUERY_ROLLBACK, ignore_abort_on_conn))
ignore_abort_on_conn = FALSE;
}
else if (CC_is_in_error_trans(self))
{
QLOG(0, "PQexec: %p '%s'\n", self->pqconn, rbkcmd);
pgres = PQexec(self->pqconn, rbkcmd);
}
/*
* XXX: we don't check the result here. Should we? We're rolling back,
* so it's not clear what else we can do on error. Giving an error
* message to the application would be nice though.
*/
if (pgres != NULL)
{
PQclear(pgres);
pgres = NULL;
}
}
CLEANUP_FUNC_CONN_CS(func_cs_count, self);
#undef return
/*
* Break before being ready to return.
*/
if (!ReadyToReturn)
retres = cmdres;
if (!PQExpBufferDataBroken(query_buf))
termPQExpBuffer(&query_buf);
/*
* Cleanup garbage results before returning.
*/
if (cmdres && retres != cmdres && !used_passed_result_object)
QR_Destructor(cmdres);
/*
* Cleanup the aborted result if specified
*/
if (retres)
{
if (aborted)
{
/** if (ignore_abort_on_conn)
{
if (!used_passed_result_object)
{
QR_Destructor(retres);
retres = NULL;
}
} **/
if (retres)
{
/*
* discard results other than errors.
*/
QResultClass *qres;
for (qres = retres; qres->next; qres = retres)
{
if (QR_get_aborted(qres))
break;
retres = qres->next;
qres->next = NULL;
QR_Destructor(qres);
}
/*
* If error message isn't set
*/
if (ignore_abort_on_conn)
{
CC_set_errornumber(self, CONN_ERROR_IGNORED);
if (retres)
QR_set_rstatus(retres, PORES_NONFATAL_ERROR);
MYLOG(DETAIL_LOG_LEVEL, " ignored abort_on_conn\n");
}
else if (retres)
{
if (NULL == CC_get_errormsg(self) ||
!CC_get_errormsg(self)[0])
CC_set_errormsg(self, QR_get_message(retres));
if (!self->sqlstate[0])
STRCPY_FIXED(self->sqlstate, retres->sqlstate);
}
}
}
}
/*
* Update our copy of the transaction status.
*
* XXX: Once we stop using the socket directly, and do everything with
* libpq, we can get rid of the transaction_status field altogether
* and always ask libpq for it.
*/
LIBPQ_update_transaction_status(self);
if (retres)
QR_set_conn(retres, self);
return retres;
}
#define MAX_SEND_FUNC_ARGS 3
static const char *func_param_str[MAX_SEND_FUNC_ARGS + 1] =
{
"()",
"($1)",
"($1, $2)",
"($1, $2, $3)"
};
static
Int8 odbc_hton64(Int8 h64)
{
union {
Int8 n64;
UInt4 i32[2];
} u;
u.i32[0] = htonl((UInt4) (h64 >> 32));
u.i32[1] = htonl((UInt4) h64);
return u.n64;
}
static
Int8 odbc_ntoh64(Int8 n64)
{
union {
Int8 h64;
UInt4 i32[2];
} u;
Int8 result;
u.h64 = n64;
result = ntohl(u.i32[0]);
result <<= 32;
result |= ntohl(u.i32[1]);
return result;
}
int
CC_send_function(ConnectionClass *self, const char *fn_name, void *result_buf, int *actual_result_len, int result_is_int, LO_ARG *args, int nargs)
{
int i;
int ret = FALSE;
int func_cs_count = 0;
char sqlbuffer[1000];
PGresult *pgres = NULL;
Oid paramTypes[MAX_SEND_FUNC_ARGS];
char *paramValues[MAX_SEND_FUNC_ARGS];
int paramLengths[MAX_SEND_FUNC_ARGS];
int paramFormats[MAX_SEND_FUNC_ARGS];
Int4 intParamBufs[MAX_SEND_FUNC_ARGS];
Int8 int8ParamBufs[MAX_SEND_FUNC_ARGS];
MYLOG(0, "conn=%p, fn_name=%s, result_is_int=%d, nargs=%d\n", self, fn_name, result_is_int, nargs);
/* Finish the pending extended query first */
#define return DONT_CALL_RETURN_FROM_HERE???
ENTER_INNER_CONN_CS(self, func_cs_count);
SPRINTF_FIXED(sqlbuffer, "SELECT pg_catalog.%s%s", fn_name,
func_param_str[nargs]);
for (i = 0; i < nargs; ++i)
{
MYLOG(0, " arg[%d]: len = %d, isint = %d, integer = " FORMATI64 ", ptr = %p\n", i, args[i].len, args[i].isint, args[i].isint == 2 ? args[i].u.integer64 : args[i].u.integer, args[i].u.ptr);
/* integers are sent as binary, others as text */
if (args[i].isint == 2)
{
paramTypes[i] = PG_TYPE_INT8;
int8ParamBufs[i] = odbc_hton64(args[i].u.integer64);
paramValues[i] = (char *) &int8ParamBufs[i];
paramLengths[i] = 8;
paramFormats[i] = 1;
}
else if (args[i].isint)
{
paramTypes[i] = PG_TYPE_INT4;
intParamBufs[i] = htonl(args[i].u.integer);
paramValues[i] = (char *) &intParamBufs[i];
paramLengths[i] = 4;
paramFormats[i] = 1;
}
else
{
paramTypes[i] = 0;
paramValues[i] = args[i].u.ptr;
paramLengths[i] = args[i].len;
paramFormats[i] = 1;
}
}
QLOG(0, "PQexecParams: %p '%s' nargs=%d\n", self->pqconn, sqlbuffer, nargs);
pgres = PQexecParams(self->pqconn, sqlbuffer, nargs,
paramTypes, (const char * const *) paramValues,
paramLengths, paramFormats, 1);
MYLOG(0, "done sending function\n");
if (PQresultStatus(pgres) == PGRES_TUPLES_OK)
QLOG(0, "\tok: - 'T' - %s\n", PQcmdStatus(pgres));
else
{
handle_pgres_error(self, pgres, "send_query", NULL, TRUE);
goto cleanup;
}
if (PQnfields(pgres) != 1 || PQntuples(pgres) != 1)
{
CC_set_errormsg(self, "unexpected result set from large_object function");
goto cleanup;
}
*actual_result_len = PQgetlength(pgres, 0, 0);
QLOG(0, "\tgot result with length: %d\n", *actual_result_len);
if (*actual_result_len > 0)
{
char *value = PQgetvalue(pgres, 0, 0);
if (result_is_int == 2)
{
Int8 int8val;
memcpy(&int8val, value, sizeof(Int8));
int8val = odbc_ntoh64(int8val);
memcpy(result_buf, &int8val, sizeof(Int8));
MYLOG(0, "int8 result=" FORMATI64 "\n", int8val);
}
else if (result_is_int)
{
Int4 int4val;
memcpy(&int4val, value, sizeof(Int4));
int4val = ntohl(int4val);
memcpy(result_buf, &int4val, sizeof(Int4));
}
else
memcpy(result_buf, value, *actual_result_len);
}
ret = TRUE;
cleanup:
#undef return
CLEANUP_FUNC_CONN_CS(func_cs_count, self);
if (pgres)
PQclear(pgres);
return ret;
}
char
CC_send_settings(ConnectionClass *self, const char *set_query)
{
HSTMT hstmt;
RETCODE result;
char status = TRUE;
char *cs,
*ptr;
#ifdef HAVE_STRTOK_R
char *last;
#endif /* HAVE_STRTOK_R */
CSTR func = "CC_send_settings";
MYLOG(0, "entering...\n");
if (set_query == NULL) return TRUE;
/*
* This function must use the local odbc API functions since the odbc state
* has not transitioned to "connected" yet.
*/
result = PGAPI_AllocStmt(self, &hstmt, 0);
if (!SQL_SUCCEEDED(result))
return FALSE;
/* non-external handle ensures no BEGIN/COMMIT/ABORT stuff */
cs = strdup(set_query);
if (cs == NULL)
{
CC_set_error(self, CONN_NO_MEMORY_ERROR, "Couldn't alloc buffer for query.", func);
return FALSE;
}
#ifdef HAVE_STRTOK_R
ptr = strtok_r(cs, ";", &last);
#else
ptr = strtok(cs, ";");
#endif /* HAVE_STRTOK_R */
while (ptr)
{
result = PGAPI_ExecDirect(hstmt, (SQLCHAR *) ptr, SQL_NTS, 0);
if (!SQL_SUCCEEDED(result))
status = FALSE;
MYLOG(0, "result %d, status %d from '%s'\n", result, status, ptr);
#ifdef HAVE_STRTOK_R
ptr = strtok_r(NULL, ";", &last);
#else
ptr = strtok(NULL, ";");
#endif /* HAVE_STRTOK_R */
}
free(cs);
PGAPI_FreeStmt(hstmt, SQL_DROP);
return status;
}
/*
* This function is just a hack to get the oid of our Large Object oid type.
* If a real Large Object oid type is made part of Postgres, this function
* will go away and the define 'PG_TYPE_LO' will be updated.
*/
static SQLRETURN
CC_lookup_lo(ConnectionClass *self)
{
SQLRETURN ret = SQL_SUCCESS;
QResultClass *res;
MYLOG(0, "entering...\n");
res = CC_send_query(self, "select oid, typbasetype from pg_type where typname = '" PG_TYPE_LO_NAME "'",
NULL, READ_ONLY_QUERY, NULL);
if (!QR_command_maybe_successful(res))
ret = SQL_ERROR;
else if (QR_command_maybe_successful(res) && QR_get_num_cached_tuples(res) > 0)
{
OID basetype;
self->lobj_type = QR_get_value_backend_int(res, 0, 0, NULL);
basetype = QR_get_value_backend_int(res, 0, 1, NULL);
if (PG_TYPE_OID == basetype)
self->lo_is_domain = 1;
else if (0 != basetype)
self->lobj_type = 0;
}
QR_Destructor(res);
MYLOG(0, "Got the large object oid: %d\n", self->lobj_type);
return ret;
}
/*
* This function initializes the version of PostgreSQL from
* connInfo.protocol that we're connected to.
* h-inoue 01-2-2001
*/
void
CC_initialize_pg_version(ConnectionClass *self)
{
STRCPY_FIXED(self->pg_version, "7.4");
self->pg_version_major = 7;
self->pg_version_minor = 4;
}
void
CC_log_error(const char *func, const char *desc, const ConnectionClass *self)
{
#define NULLCHECK(a) (a ? a : "(NULL)")
if (self)
{
MYLOG(0, "CONN ERROR: func=%s, desc='%s', errnum=%d, errmsg='%s'\n", func, desc, self->__error_number, NULLCHECK(self->__error_message));
MYLOG(DETAIL_LOG_LEVEL, " ------------------------------------------------------------\n");
MYLOG(DETAIL_LOG_LEVEL, " henv=%p, conn=%p, status=%u, num_stmts=%d\n", self->henv, self, self->status, self->num_stmts);
MYLOG(DETAIL_LOG_LEVEL, " pqconn=%p, stmts=%p, lobj_type=%d\n", self->pqconn, self->stmts, self->lobj_type);
}
else
{
MYLOG(0, "INVALID CONNECTION HANDLE ERROR: func=%s, desc='%s'\n", func, desc);
}
}
/*
* This doesn't really return the CURRENT SCHEMA
* but there's no alternative.
*/
const char *
CC_get_current_schema(ConnectionClass *conn)
{
if (!conn->current_schema_valid)
{
QResultClass *res;
if (res = CC_send_query(conn, "select current_schema()", NULL, READ_ONLY_QUERY, NULL), QR_command_maybe_successful(res))
{
if (QR_get_num_total_tuples(res) == 1)
{
char *curschema = QR_get_value_backend_text(res, 0, 0);
if (curschema)
conn->current_schema = strdup(curschema);
}
if (conn->current_schema)
conn->current_schema_valid = TRUE;
}
QR_Destructor(res);
}
return (const char *) conn->current_schema;
}
int CC_mark_a_object_to_discard(ConnectionClass *conn, int type, const char *plan)
{
int cnt = conn->num_discardp + 1, plansize;
char *pname;
CC_REALLOC_return_with_error(conn->discardp, char *,
(cnt * sizeof(char *)), conn, "Couldn't alloc discardp.", -1);
plansize = strlen(plan) + 2;
CC_MALLOC_return_with_error(pname, char, plansize,
conn, "Couldn't alloc discardp mem.", -1);
pname[0] = (char) type; /* 's':prepared statement 'p':cursor */
strncpy_null(pname + 1, plan, plansize - 1);
conn->discardp[conn->num_discardp++] = pname;
return 1;
}
int CC_discard_marked_objects(ConnectionClass *conn)
{
int i, cnt;
QResultClass *res;
char *pname, cmd[64];
if ((cnt = conn->num_discardp) <= 0)
return 0;
for (i = cnt - 1; i >= 0; i--)
{
pname = conn->discardp[i];
if ('s' == pname[0])
SPRINTF_FIXED(cmd, "DEALLOCATE \"%s\"", pname + 1);
else
SPRINTF_FIXED(cmd, "CLOSE \"%s\"", pname + 1);
res = CC_send_query(conn, cmd, NULL, ROLLBACK_ON_ERROR | IGNORE_ABORT_ON_CONN | READ_ONLY_QUERY, NULL);
QR_Destructor(res);
free(conn->discardp[i]);
/* CodeDEX with CID=12044 */
conn->discardp[i] = NULL;
conn->num_discardp--;
}
return 1;
}
static void
LIBPQ_update_transaction_status(ConnectionClass *self)
{
if (!self->pqconn)
return;
switch (PQtransactionStatus(self->pqconn))
{
case PQTRANS_IDLE:
if (CC_is_in_trans(self))
{
if (CC_is_in_error_trans(self))
CC_on_abort(self, NO_TRANS);
else
CC_on_commit(self);
}
break;
case PQTRANS_INTRANS:
CC_set_in_trans(self);
if (CC_is_in_error_trans(self))
{
CC_set_no_error_trans(self);
CC_on_abort_partial(self);
}
break;
case PQTRANS_INERROR:
CC_set_in_trans(self);
CC_set_in_error_trans(self);
break;
case PQTRANS_ACTIVE:
/*
* A query is still executing. It might have already aborted,
* but all we know for sure is that we're in a transaction.
*/
CC_set_in_trans(self);
break;
default: /* unknown status */
break;
}
}
static void CC_getLibpath(char *libpath, int libpathLen)
{
const char *fname = NULL;
int fnameIndex = 0;
int libpathIndex = 0;
#ifndef WIN32
Dl_info dl_info;
int ret = 0;
if (libpath == NULL || libpathLen <= 0)
{
return ;
}
ret = dladdr((void*)CC_getLibpath, &dl_info);
if (ret != 0)
{
fname = dl_info.dli_fname;
}
#else
MEMORY_BASIC_INFORMATION mbi;
char fpath[4096] = {'\0'};
if (libpath == NULL || libpathLen <= 0)
{
return ;
}
if ((VirtualQuery(CC_getLibpath, &mbi, sizeof(mbi)) != 0) &&
GetModuleFileName((HMODULE)mbi.AllocationBase, fpath, sizeof(fpath)))
{
fname = fpath;
}
#endif
while ((fname != NULL) && (fname[fnameIndex] != '\0') && (libpathIndex < libpathLen - 1))
{
if (fname[fnameIndex] == '\'')
{
libpath[libpathIndex++] = '\'';
}
else if ((fname[fnameIndex] == '"') || (fname[fnameIndex] == '\\'))
{
libpath[libpathIndex++] = '\\';
}
libpath[libpathIndex++] = fname[fnameIndex++];
}
libpath[libpathIndex] = '\0';
}
static void CC_getOSUser(char *username, int usernameLen)
{
#ifndef WIN32
struct passwd *pw = NULL;
if (username == NULL || usernameLen <= 0)
{
return ;
}
pw = getpwuid(geteuid());
if (pw == NULL)
{
username[0] = '\0';
}
else
{
strncpy(username, pw->pw_name, strlen(pw->pw_name));
}
#else
DWORD len = usernameLen;
if (username == NULL || usernameLen <= 0)
{
return ;
}
if(!GetUserName(username, &len))
{
username[0] = '\0';
return ;
}
#endif
}
#define PROTOCOL3_OPTS_MAX 30
int split_host_or_port_with_limit(const char *str, char *result_array[MAX_PARTS], int* total_length)
{
int count = 0;
char *lasts = NULL;
char *dup_str = strdup(str);
char *token = strtok_r(dup_str, ",", &lasts);
while (token && count < MAX_PARTS) {
result_array[count] = strdup(token);
if (!result_array[count]) {
for (int i = 0; i < count; i++) {
free(result_array[i]);
}
free(dup_str);
return -1;
}
*total_length += strlen(result_array[count]);
count++;
token = strtok_r(NULL, ",", &lasts);
}
free(dup_str);
for (int i = count;i < MAX_PARTS;i++) {
result_array[i] = NULL;
}
return count;
}
char* generate_conninfo_URL_by_ConnInfo(ConnInfo* ci, int* host_number, int* port_number)
{
char *host_array[MAX_PARTS] = {0};
char *port_array[MAX_PARTS] = {0};
int host_length = 0;
int port_length = 0;
if (!ci->server || !ci->port) {
return NULL;
}
*host_number = split_host_or_port_with_limit(ci->server, host_array, &host_length);
*port_number = split_host_or_port_with_limit(ci->port, port_array, &port_length);
if ((-1 == *host_number || -1 == *port_number) ||
((*host_number != *port_number) && *host_number != 1 && *port_number != 1)) {
for (int i = 0; i < MAX_PARTS; i++) {
if (host_array[i] != NULL) {
free(host_array[i]);
}
if (port_array[i] != NULL) {
free(port_array[i]);
}
}
return NULL;
}
size_t total_length = host_length + port_length + *host_number * 2 + *port_number * 2 +
(ci->username ? strlen(ci->username) : 0) +
(ci->password.name ? strlen(ci->password.name) : 0) +
(ci->database ? strlen(ci->database) : 0) + EXTRA_ROOM;
char* temp_URL = (char*)malloc(total_length);
if (!temp_URL) {
return NULL;
}
memset(temp_URL, 0, total_length);
int valid_count = *host_number > *port_number ? *host_number : *port_number;
(void)snprintf(temp_URL, -1, "postgres://%s@", ci->username);
for (int i = 0; i < valid_count; i++) {
strcat(temp_URL, (*host_number == 1) ? host_array[0] : host_array[i]);
strcat(temp_URL, ":");
strcat(temp_URL, (*port_number == 1) ? port_array[0] : port_array[i]);
if (i != valid_count - 1) {
strcat(temp_URL, ",");
}
}
strcat(temp_URL, "/");
strcat(temp_URL, ci->database);
char target_session_attrs[MEDIUM_REGISTRY_LEN] = "?target_session_attrs=";
if (*host_number == 1 && *port_number == 1) {
strcat(target_session_attrs, "any");
} else if ('\0' == ci->target_session_attrs[0]) {
strcat(target_session_attrs, "read-write");
} else {
strcat(target_session_attrs, ci->target_session_attrs);
}
strcat(temp_URL, target_session_attrs);
strcat(temp_URL, "&password=");
strcat(temp_URL, ci->password.name);
if ('\0' != ci->sslmode) {
strcat(temp_URL, "&sslmode=");
strcat(temp_URL, ci->sslmode);
}
if ('\0' != ci->connect_timeout[0]) {
char connect_timeout[MEDIUM_REGISTRY_LEN] = "&connect_timeout=";
strcat(connect_timeout, ci->connect_timeout);
strcat(temp_URL, connect_timeout);
}
if ('\0' != ci->rw_timeout[0]) {
char rw_timeout[MEDIUM_REGISTRY_LEN] = "&rw_timeout=";
strcat(rw_timeout, ci->rw_timeout);
strcat(temp_URL, rw_timeout);
}
if ('\0' != ci->client_encoding[0]) {
char client_encoding[MEDIUM_REGISTRY_LEN] = "&client_encoding=";
strcat(client_encoding, ci->client_encoding);
strcat(temp_URL, client_encoding);
}
if ('\0' != ci->application_name[0]) {
char application_name[MEDIUM_REGISTRY_LEN] = "&application_name=";
strcat(application_name, ci->application_name);
strcat(temp_URL, application_name);
}
if ('\0' != ci->keepalives_idle[0]) {
char keepalives_idle[MEDIUM_REGISTRY_LEN] = "&keepalives_idle=";
strcat(keepalives_idle, ci->keepalives_idle);
strcat(temp_URL, keepalives_idle);
}
if ('\0' != ci->keepalives_interval[0]) {
char keepalives_interval[MEDIUM_REGISTRY_LEN] = "&keepalives_interval=";
strcat(keepalives_interval, ci->keepalives_interval);
strcat(temp_URL, keepalives_interval);
}
if ('\0' != ci->keepalives_count[0]) {
char keepalives_count[MEDIUM_REGISTRY_LEN] = "&keepalives_count=";
strcat(keepalives_count, ci->keepalives_count);
strcat(temp_URL, keepalives_count);
}
if ('\0' != ci->tcp_user_timeout[0]) {
char tcp_user_timeout[MEDIUM_REGISTRY_LEN] = "&tcp_user_timeout=";
strcat(tcp_user_timeout, ci->tcp_user_timeout);
strcat(temp_URL, tcp_user_timeout);
}
if ('\0' != ci->sslcompression[0]) {
char sslcompression[MEDIUM_REGISTRY_LEN] = "&sslcompression=";
strcat(sslcompression, ci->sslcompression);
strcat(temp_URL, sslcompression);
}
if ('\0' != ci->sslcert[0]) {
char sslcert[MEDIUM_REGISTRY_LEN] = "&sslcert=";
strcat(sslcert, ci->sslcert);
strcat(temp_URL, sslcert);
}
if ('\0' != ci->sslkey[0]) {
char sslkey[MEDIUM_REGISTRY_LEN] = "&sslkey=";
strcat(sslkey, ci->sslkey);
strcat(temp_URL, sslkey);
}
if ('\0' != ci->sslrootcert[0]) {
char sslrootcert[MEDIUM_REGISTRY_LEN] = "&sslrootcert=";
strcat(sslrootcert, ci->sslrootcert);
strcat(temp_URL, sslrootcert);
}
if ('\0' != ci->sslcrl[0]) {
char sslcrl[MEDIUM_REGISTRY_LEN] = "&sslcrl=";
strcat(sslcrl, ci->sslcrl);
strcat(temp_URL, sslcrl);
}
if ('\0' != ci->requirepeer[0]) {
char requirepeer[MEDIUM_REGISTRY_LEN] = "&requirepeer=";
strcat(requirepeer, ci->requirepeer);
strcat(temp_URL, requirepeer);
}
if ('\0' != ci->krbsrvname[0]) {
char krbsrvname[MEDIUM_REGISTRY_LEN] = "&krbsrvname=";
strcat(krbsrvname, ci->krbsrvname);
strcat(temp_URL, krbsrvname);
}
if ('\0' != ci->gsslib[0]) {
char gsslib[MEDIUM_REGISTRY_LEN] = "&gsslib=";
strcat(gsslib, ci->gsslib);
strcat(temp_URL, gsslib);
}
if ('\0' != ci->service[0]) {
char service[MEDIUM_REGISTRY_LEN] = "&service=";
strcat(service, ci->service);
strcat(temp_URL, service);
}
if ('\0' != ci->remote_nodename[0]) {
char remote_nodename[MEDIUM_REGISTRY_LEN] = "&remote_nodename=";
strcat(remote_nodename, ci->remote_nodename);
strcat(temp_URL, remote_nodename);
}
if ('\0' != ci->localhost[0]) {
char localhost[MEDIUM_REGISTRY_LEN] = "&localhost=";
strcat(localhost, ci->localhost);
strcat(temp_URL, localhost);
}
if ('\0' != ci->localport[0]) {
char localport[MEDIUM_REGISTRY_LEN] = "&localport=";
strcat(localport, ci->localport);
strcat(temp_URL, localport);
}
if ('\0' != ci->replication[0]) {
char replication[MEDIUM_REGISTRY_LEN] = "&replication=";
strcat(replication, ci->replication);
strcat(temp_URL, replication);
}
if ('\0' != ci->backend_version[0]) {
char backend_version[MEDIUM_REGISTRY_LEN] = "&backend_version=";
strcat(backend_version, ci->backend_version);
strcat(temp_URL, backend_version);
}
if ('\0' != ci->prototype[0]) {
char prototype[MEDIUM_REGISTRY_LEN] = "&prototype=";
strcat(prototype, ci->prototype);
strcat(temp_URL, prototype);
}
if ('\0' != ci->enable_ce[0]) {
char enable_ce[MEDIUM_REGISTRY_LEN] = "&enable_ce=";
strcat(enable_ce, ci->enable_ce);
strcat(temp_URL, enable_ce);
}
return temp_URL;
}
static int
LIBPQ_connect(ConnectionClass *self)
{
CSTR func = "LIBPQ_connect";
ConnInfo *ci = &(self->connInfo);
char ret = 0;
void *pqconn = NULL;
int pqret;
int pversion;
const char *opts[PROTOCOL3_OPTS_MAX], *vals[PROTOCOL3_OPTS_MAX];
PQconninfoOption *conninfoOption = NULL, *pqopt;
int i, cnt;
char login_timeout_str[20];
char keepalive_idle_str[20];
char keepalive_interval_str[20];
char *errmsg = NULL;
char local_conninfo[8192];
int host_number = 1;
int port_number = 1;
char* URL = NULL;
MYLOG(0, "connecting to the database using %s as the server and pqopt={%s}\n", self->connInfo.server, SAFE_NAME(ci->pqopt));
if (NULL == (conninfoOption = PQconninfoParse(SAFE_NAME(ci->pqopt), &errmsg)))
{
char emsg[200];
if (errmsg != NULL)
SPRINTF_FIXED(emsg, "libpq connection parameter error:%s", errmsg);
else
STRCPY_FIXED(emsg, "memory error? in PQconninfoParse");
CC_set_error(self, CONN_OPENDB_ERROR, emsg, func);
goto cleanup;
}
/* multiple_hostip or multiple_port from DSN */
URL = generate_conninfo_URL_by_ConnInfo(ci, &host_number, &port_number);
if (!URL) {
if (!ci->server || !ci->port) {
CC_set_error(self, CONN_INVALID_ARGUMENT_NO, "The server or port should not be empty.", func);
} else if (-1 == host_number || -1 == port_number) {
CC_set_error(self, CONN_NO_MEMORY_ERROR, "Memory allocation failure when resolving address.", func);
} else if ((host_number != port_number) && host_number != 1 && port_number != 1) {
CC_set_error(self, CONN_VALUE_OUT_OF_RANGE,
"The number of hosts should be the same as the number of ports when both are multiple.", func);
} else {
CC_set_error(self, CONN_NO_MEMORY_ERROR,
"Memory allocation failure when Trying to splice strings.", func);
}
}
if (host_number > 1 || port_number > 1) {
MYLOG(0, "connecting to the database using URL: %s\n", URL);
pqconn = PQconnectdb(URL);
} else {
/* Build arrays of keywords & values, for PQconnectDBParams */
cnt = 0;
if (ci->server[0]) {
opts[cnt] = "host";
vals[cnt++] = ci->server;
}
if (ci->port[0]) {
opts[cnt] = "port";
vals[cnt++] = ci->port;
}
if (ci->database[0]) {
opts[cnt] = "dbname";
vals[cnt++] = ci->database;
}
if (ci->username[0]) {
opts[cnt] = "user";
vals[cnt++] = ci->username;
}
switch (ci->sslmode[0]) {
case '\0':
break;
case SSLLBYTE_VERIFY:
opts[cnt] = "sslmode";
switch (ci->sslmode[1]) {
case 'f':
vals[cnt++] = SSLMODE_VERIFY_FULL;
break;
case 'c':
vals[cnt++] = SSLMODE_VERIFY_CA;
break;
default:
vals[cnt++] = ci->sslmode;
}
break;
default:
opts[cnt] = "sslmode";
vals[cnt++] = ci->sslmode;
}
if (NAME_IS_VALID(ci->password)) {
opts[cnt] = "password";
vals[cnt++] = SAFE_NAME(ci->password);
}
if (ci->disable_keepalive) {
opts[cnt] = "keepalives";
vals[cnt++] = "0";
}
if (ci->connect_timeout[0]) {
opts[cnt] = "connect_timeout";
vals[cnt++] = ci->connect_timeout;
}
if (ci->rw_timeout[0]) {
opts[cnt] = "rw_timeout";
vals[cnt++] = ci->rw_timeout;
}
if (ci->keepalive_idle > 0) {
ITOA_FIXED(keepalive_idle_str, ci->keepalive_idle);
opts[cnt] = "keepalives_idle";
vals[cnt++] = keepalive_idle_str;
} else if ('\0' != ci->keepalives_idle[0]) {
opts[cnt] = "keepalives_idle";
vals[cnt++] = ci->keepalives_idle;
}
if (ci->keepalive_interval > 0) {
ITOA_FIXED(keepalive_interval_str, ci->keepalive_interval);
opts[cnt] = "keepalives_interval";
vals[cnt++] = keepalive_interval_str;
} else if ('\0' != ci->keepalives_interval[0]) {
opts[cnt] = "keepalives_interval";
vals[cnt++] = ci->keepalives_interval;
}
if ('\0' != ci->keepalives_count[0]) {
opts[cnt] = "keepalives_count";
vals[cnt++] = ci->keepalives_count;
}
if ('\0' != ci->client_encoding[0]) {
opts[cnt] = "client_encoding";
vals[cnt++] = ci->client_encoding;
}
if ('\0' != ci->application_name[0]) {
opts[cnt] = "application_name";
vals[cnt++] = ci->application_name;
}
if ('\0' != ci->tcp_user_timeout[0]) {
opts[cnt] = "tcp_user_timeout";
vals[cnt++] = ci->tcp_user_timeout;
}
if ('\0' != ci->sslcompression[0]) {
opts[cnt] = "sslcompression";
vals[cnt++] = ci->sslcompression;
}
if ('\0' != ci->sslcert[0]) {
opts[cnt] = "sslcert";
vals[cnt++] = ci->sslcert;
}
if ('\0' != ci->sslkey[0]) {
opts[cnt] = "sslkey";
vals[cnt++] = ci->sslkey;
}
if ('\0' != ci->sslrootcert[0]) {
opts[cnt] = "sslrootcert";
vals[cnt++] = ci->sslrootcert;
}
if ('\0' != ci->sslcrl[0]) {
opts[cnt] = "sslcrl";
vals[cnt++] = ci->sslcrl;
}
if ('\0' != ci->requirepeer[0]) {
opts[cnt] = "requirepeer";
vals[cnt++] = ci->requirepeer;
}
if ('\0' != ci->krbsrvname[0]) {
opts[cnt] = "krbsrvname";
vals[cnt++] = ci->krbsrvname;
}
if ('\0' != ci->gsslib[0]) {
opts[cnt] = "gsslib";
vals[cnt++] = ci->gsslib;
}
if ('\0' != ci->service[0]) {
opts[cnt] = "service";
vals[cnt++] = ci->service;
}
if ('\0' != ci->remote_nodename[0]) {
opts[cnt] = "remote_nodename";
vals[cnt++] = ci->remote_nodename;
}
if ('\0' != ci->localhost[0]) {
opts[cnt] = "localhost";
vals[cnt++] = ci->localhost;
}
if ('\0' != ci->localport[0]) {
opts[cnt] = "localport";
vals[cnt++] = ci->localport;
}
if ('\0' != ci->replication[0]) {
opts[cnt] = "replication";
vals[cnt++] = ci->replication;
}
if ('\0' != ci->backend_version[0]) {
opts[cnt] = "backend_version";
vals[cnt++] = ci->backend_version;
}
if ('\0' != ci->prototype[0]) {
opts[cnt] = "prototype";
vals[cnt++] = ci->prototype;
}
if ('\0' != ci->enable_ce[0]) {
opts[cnt] = "enable_ce";
vals[cnt++] = ci->enable_ce;
}
if ((odbcVersionString != NULL) && (odbcVersionString[0] != '\0')) {
if (self->connInfo.connection_extra_info > 0) {
char libpath[4096] = {'\0'};
char username[128] = {'\0'};
(void)CC_getLibpath(libpath, sizeof(libpath));
(void)CC_getOSUser(username, sizeof(username));
snprintf(local_conninfo, sizeof(local_conninfo),
"{\"driver_name\":\"ODBC\", \"driver_version\":\"%s\", \"driver_path\":\"%s\", \"os_user\":\"%s\"}",
odbcVersionString, libpath, username);
} else {
snprintf(local_conninfo, sizeof(local_conninfo),
"{\"driver_name\":\"ODBC\",\"driver_version\":\"%s\"}",
odbcVersionString);
}
opts[cnt] = "connection_info";
vals[cnt++] = local_conninfo;
}
if (conninfoOption != NULL) {
const char *keyword, *val;
int j;
for (i = 0, pqopt = conninfoOption; (keyword = pqopt->keyword) != NULL; i++, pqopt++) {
if ((val = pqopt->val) != NULL) {
for (j = 0; j < cnt; j++) {
if (stricmp(opts[j], keyword) == 0) {
char emsg[100];
if (vals[j] != NULL && strcmp(vals[j], val) == 0) {
continue;
}
SPRINTF_FIXED(emsg,
"%s parameter in pqopt option conflicts with other ordinary option", keyword);
CC_set_error(self, CONN_OPENDB_ERROR, emsg, func);
goto cleanup;
}
}
if (j >= cnt && cnt < PROTOCOL3_OPTS_MAX - 1) {
opts[cnt] = keyword;
vals[cnt++] = val;
}
}
}
}
opts[cnt] = vals[cnt] = NULL;
/* Ok, we're all set to connect */
if (get_qlog() > 0 || get_mylog() > 0) {
const char **popt, **pval;
QLOG(0, "PQconnectdbParams:");
for (popt = opts, pval = vals; *popt; popt++, pval++) {
QPRINTF(0, " %s='%s'", *popt, *pval);
}
QPRINTF(0, "\n");
}
pqconn = PQconnectdbParams(opts, vals, FALSE);
}
free(URL);
if (!pqconn)
{
CC_set_error(self, CONN_OPENDB_ERROR, "PQconnectdb error", func);
goto cleanup;
}
self->pqconn = pqconn;
pqret = PQstatus(pqconn);
if (pqret == CONNECTION_BAD && PQconnectionNeedsPassword(pqconn))
{
const char *errmsg;
MYLOG(0, "password retry\n");
errmsg = PQerrorMessage(pqconn);
CC_set_error(self, CONNECTION_SERVER_NOT_REACHED, errmsg, func);
QLOG(0, "PQfinish: %p\n", pqconn);
PQfinish(pqconn);
self->pqconn = NULL;
self->connInfo.password_required = TRUE;
ret = -1;
goto cleanup;
}
if (CONNECTION_OK != pqret)
{
const char *errmsg;
MYLOG(DETAIL_LOG_LEVEL, "status=%d\n", pqret);
errmsg = PQerrorMessage(pqconn);
CC_set_error(self, CONNECTION_SERVER_NOT_REACHED, errmsg, func);
MYLOG(0, "Could not establish connection to the database; LIBPQ returned -> %s\n", errmsg);
goto cleanup;
}
if (PQpass(pqconn) && strlen(PQpass(pqconn)))
{
char *pwd = PQpass(pqconn);
memset(pwd, 0, strlen(pwd));
}
MYLOG(0, "libpq connection to the database established.(IP: %s)\n", PQhost(pqconn));
pversion = PQprotocolVersion(pqconn);
if (pversion < 3)
{
MYLOG(0, "Protocol version %d is not supported\n", pversion);
goto cleanup;
}
MYLOG(0, "protocol=%d\n", pversion);
pversion = PQserverVersion(pqconn);
self->pg_version_major = pversion / 10000;
self->pg_version_minor = (pversion % 10000) / 100;
SPRINTF_FIXED(self->pg_version, "%d.%d.%d", self->pg_version_major, self->pg_version_minor, pversion % 100);
MYLOG(0, "Server version=%s\n", self->pg_version);
if (!CC_get_username(self)[0])
{
MYLOG(0, "PQuser=%s\n", PQuser(pqconn));
STRCPY_FIXED(self->connInfo.username, PQuser(pqconn));
}
ret = 1;
cleanup:
if (errmsg != NULL)
free(errmsg);
PQconninfoFree(conninfoOption);
if (ret != 1)
{
if (self->pqconn)
{
QLOG(0, "PQfinish: %p\n", self->pqconn);
PQfinish(self->pqconn);
}
self->pqconn = NULL;
}
MYLOG(0, "leaving %d\n", ret);
return ret;
}
int
CC_send_cancel_request(const ConnectionClass *conn)
{
int ret = 0;
char errbuf[256];
void *cancel;
/* Check we have an open connection */
if (!conn || !conn->pqconn)
return FALSE;
cancel = PQgetCancel(conn->pqconn);
if (!cancel)
return FALSE;
ret = PQcancel(cancel, errbuf, sizeof(errbuf));
PQfreeCancel(cancel);
if (1 == ret)
return TRUE;
else
return FALSE;
}
const char *CurrCat(const ConnectionClass *conn)
{
/*
* Returning the database name causes problems in MS Query. It
* generates query like: "SELECT DISTINCT a FROM byronnbad3
* bad3"
*/
if (isMsQuery()) /* MS Query */
return NULL;
return conn->connInfo.database;
}
const char *CurrCatString(const ConnectionClass *conn)
{
const char *cat = CurrCat(conn);
if (!cat)
cat = NULL_STRING;
return cat;
}
/*------
* Create a null terminated lower-case string if the
* original string contains upper-case characters.
* The SQL_NTS length is considered.
*------
*/
SQLCHAR *
make_lstring_ifneeded(ConnectionClass *conn, const SQLCHAR *s, ssize_t len, BOOL ifallupper)
{
ssize_t length = len;
char *str = NULL;
const char *ccs = (const char *) s;
if (s && (len > 0 || (len == SQL_NTS && (length = strlen(ccs)) > 0)))
{
int i;
UCHAR tchar;
encoded_str encstr;
make_encoded_str(&encstr, conn, ccs);
for (i = 0; i < length; i++)
{
tchar = encoded_nextchar(&encstr);
if (MBCS_NON_ASCII(encstr))
continue;
if (ifallupper && islower(tchar))
{
if (str)
{
free(str);
str = NULL;
}
break;
}
if (tolower(tchar) != tchar)
{
if (!str)
{
str = malloc(length + 1);
if (!str) return NULL;
memcpy(str, s, length);
str[length] = '\0';
}
str[i] = tolower(tchar);
}
}
}
return (SQLCHAR *) str;
}
/*
* Concatenate a single formatted argument to a given buffer handling the SQL_NTS thing.
* "fmt" must contain somewhere in it the single form '%.*s'.
* This is heavily used in creating queries for info routines (SQLTables, SQLColumns).
* This routine could be modified to use vsprintf() to handle multiple arguments.
*/
static int
my_str(char *buf, int buflen, const char *fmt, const char *s, ssize_t len)
{
if (s && (len > 0 || (len == SQL_NTS && *s != 0)))
{
size_t length = (len > 0) ? len : strlen(s);
return snprintf(buf, buflen, fmt, length, s);
}
buf[0] = '\0';
return 0;
}
int
schema_str(char *buf, int buflen, const SQLCHAR *s, SQLLEN len, BOOL table_is_valid, ConnectionClass *conn)
{
CSTR fmt = "%.*s";
buf[0] = '\0';
if (!s || 0 == len)
{
/*
* Note that this driver assumes the implicit schema is
* the CURRENT_SCHEMA() though it doesn't worth the
* naming.
*/
if (table_is_valid)
return my_str(buf, buflen, fmt, CC_get_current_schema(conn), SQL_NTS);
return 0;
}
return my_str(buf, buflen, fmt, (char *) s, len);
}
static void
my_appendPQExpBuffer(PQExpBufferData *buf, const char *fmt, const char *s, ssize_t len)
{
if (s && (len > 0 || (len == SQL_NTS && *s != 0)))
{
size_t length = (len > 0) ? len : strlen(s);
appendPQExpBuffer(buf, fmt, length, s);
}
}
/*
* my_appendPQExpBuffer1 is a extension of my_appendPQExpBuffer.
* It can have 1 more parameter than my_aapendPQExpBuffer.
*/
static void
my_appendPQExpBuffer1(PQExpBufferData *buf, const char *fmt, const char *s1, const char *s)
{
if (s && s[0] != '\0')
{
ssize_t length = strlen(s);
if (s1)
appendPQExpBuffer(buf, fmt, s1, length, s);
else
appendPQExpBuffer(buf, fmt, length, s);
}
}
void
schema_appendPQExpBuffer(PQExpBufferData *buf, const char *fmt, const SQLCHAR *s, SQLLEN len, BOOL table_is_valid, ConnectionClass *conn)
{
if (!s || 0 == len)
{
/*
* Note that this driver assumes the implicit schema is
* the CURRENT_SCHEMA() though it doesn't worth the
* naming.
*/
if (table_is_valid)
my_appendPQExpBuffer(buf, fmt, CC_get_current_schema(conn), SQL_NTS);
return;
}
my_appendPQExpBuffer(buf, fmt, (char *) s, len);
}
void
schema_appendPQExpBuffer1(PQExpBufferData *buf, const char *fmt, const char *s1, const char *s, BOOL table_is_valid, ConnectionClass *conn)
{
if (!s || s[0] == '\0')
{
if (table_is_valid)
my_appendPQExpBuffer1(buf, fmt, s1, CC_get_current_schema(conn));
return;
}
my_appendPQExpBuffer1(buf, fmt, s1, s);
}
#ifdef _HANDLE_ENLIST_IN_DTC_
/*
* Export the following functions so that the pgenlist dll
* can handle ConnectionClass objects as opaque ones.
*/
#define _PGDTC_FUNCS_IMPLEMENT_
#include "connexp.h"
#define SYNC_AUTOCOMMIT(conn) \
(SQL_AUTOCOMMIT_OFF != conn->autocommit_public ? \
(conn->transact_status |= CONN_IN_AUTOCOMMIT) : \
(conn->transact_status &= ~CONN_IN_AUTOCOMMIT))
DLL_DECLARE void PgDtc_create_connect_string(void *self, char *connstr, int strsize)
{
ConnectionClass *conn = (ConnectionClass *) self;
ConnInfo *ci = &(conn->connInfo);
const char *drivername = ci->drivername;
char xaOptStr[32];
#if defined(_WIN32) && !defined(_WIN64)
/*
* If this is an x86 driver running on an x64 host then the driver name
* passed to MSDTC must be the (x64) driver but the client app will be
* using the 32-bit driver name. So MSDTC.exe will fail to find the driver
* and we'll fail to recover XA transactions.
*
* IsWow64Process(...) would be the ideal function for this, but is only
* available on Windows 6+ (Vista/2k8). We'd use GetNativeSystemInfo, which
* is supported on XP and 2k3, instead, but that won't link with older
* SDKs.
*
* It's impler to just test the PROCESSOR_ARCHITEW6432 environment
* variable.
*
* See http://www.postgresql.org/message-id/53A45B59.70303@2ndquadrant.com
* for details on this issue.
*/
const char * const procenv = getenv("PROCESSOR_ARCHITEW6432");
if (procenv != NULL && strcmp(procenv, "AMD64") == 0)
{
/*
* We're a 32-bit binary running under SysWow64 on a 64-bit host and need
* to pass a different driver name.
*
* To avoid playing memory management games, just return a different
* string constant depending on the unicode-ness of the driver.
*
* It probably doesn't matter whether we use the Unicode or ANSI driver
* for the XA transaction manager, but pick the same as the client driver
* to keep things as similar as possible.
*/
if (strcmp(drivername, DBMS_NAME) == 0)
#ifdef UNICODE_SUPPORT
drivername = DBMS_NAME_UNICODE"(x64)";
#else
drivername = DBMS_NAME_ANSI"(x64)";
#endif
}
#endif // _WIN32 && !_WIN64
if (0 >= ci->xa_opt) return;
switch (ci->xa_opt)
{
case DTC_CHECK_LINK_ONLY:
case DTC_CHECK_BEFORE_LINK:
SPRINTF_FIXED(xaOptStr, KEYWORD_DTC_CHECK "=0;");
break;
case DTC_CHECK_RM_CONNECTION:
SPRINTF_FIXED(xaOptStr, KEYWORD_DTC_CHECK "=1;");
break;
default:
*xaOptStr = '\0';
break;
}
snprintf(connstr, strsize,
"DRIVER={%s};%s"
"SERVER=%s;PORT=%s;DATABASE=%s;"
"UID=%s;PWD=%s;" ABBR_SSLMODE "=%s",
drivername, xaOptStr,
ci->server, ci->port, ci->database, ci->username,
SAFE_NAME(ci->password), ci->sslmode
);
return;
}
#define SECURITY_WIN32
#include <security.h>
DLL_DECLARE int PgDtc_is_recovery_available(void *self, char *reason, int rsize)
{
ConnectionClass *conn = (ConnectionClass *) self;
ConnInfo *ci = &(conn->connInfo);
int ret = -1; // inknown
LONG nameSize;
char loginUser[256];
BOOL outReason = FALSE;
BOOL doubtRootCert = TRUE, doubtCert = TRUE;
const char *delim;
/*
* Root certificate is used?
*/
if (NULL != reason &&
rsize > 0)
outReason = TRUE;
/*
* Root certificate is used?
*/
doubtRootCert = FALSE;
if (0 == stricmp(ci->sslmode, SSLMODE_VERIFY_CA) ||
0 == stricmp(ci->sslmode, SSLMODE_VERIFY_FULL))
{
if (outReason)
strncpy_null(reason, "sslmode verify-[ca|full]", rsize);
return 0;
}
/*
* Did we use SSL client certificate, SSPI, Kerberos or similar
* authentication methods?
* There seems no way to check it directly.
*/
doubtCert = FALSE;
if (PQgetssl(conn->pqconn) != NULL)
doubtCert = TRUE;
nameSize = sizeof(loginUser);
if (GetUserNameEx(NameUserPrincipal, loginUser, &nameSize))
{
MYLOG(0, "loginUser=%s\n", loginUser);
}
else
{
int err = GetLastError();
switch (err)
{
case ERROR_NONE_MAPPED:
MYLOG(0, "The user name is unavailable in the specified format\n");
break;
case ERROR_NO_SUCH_DOMAIN:
MYLOG(0, "The domain controller is unavailable to perform the lookup\n");
break;
case ERROR_MORE_DATA:
MYLOG(0, "The buffer is too small\n");
break;
default:
MYLOG(0, "GetUserNameEx error=%d\n", err);
break;
}
}
ret = 1;
if (outReason)
*reason = '\0';
delim = "";
if (doubtRootCert)
{
if (outReason)
snprintf(reason, rsize, "%s%ssslmode verify-[ca|full]", reason, delim);
delim = ", ";
ret = -1;
}
if (doubtCert)
{
if (outReason)
snprintf(reason, rsize, "%s%scertificate", reason, delim);
delim = ", ";
ret = -1;
}
return ret;
}
DLL_DECLARE void PgDtc_set_async(void *self, void *async)
{
ConnectionClass *conn = (ConnectionClass *) self;
if (!conn) return;
CONNLOCK_ACQUIRE(conn);
if (NULL != async)
CC_set_autocommit(conn, FALSE);
else
SYNC_AUTOCOMMIT(conn);
conn->asdum = async;
CONNLOCK_RELEASE(conn);
}
DLL_DECLARE void *PgDtc_get_async(void *self)
{
ConnectionClass *conn = (ConnectionClass *) self;
return conn->asdum;
}
DLL_DECLARE void PgDtc_set_property(void *self, int property, void *value)
{
ConnectionClass *conn = (ConnectionClass *) self;
CONNLOCK_ACQUIRE(conn);
switch (property)
{
case inprogress:
if (NULL != value)
CC_set_dtc_executing(conn);
else
CC_no_dtc_executing(conn);
break;
case enlisted:
if (NULL != value)
CC_set_dtc_enlisted(conn);
else
CC_no_dtc_enlisted(conn);
break;
case prepareRequested:
if (NULL != value)
CC_set_dtc_prepareRequested(conn);
else
CC_no_dtc_prepareRequested(conn);
break;
}
CONNLOCK_RELEASE(conn);
}
DLL_DECLARE void PgDtc_set_error(void *self, const char *message, const char *func)
{
ConnectionClass *conn = (ConnectionClass *) self;
CC_set_error(conn, CONN_UNSUPPORTED_OPTION, message, func);
}
DLL_DECLARE int PgDtc_get_property(void *self, int property)
{
ConnectionClass *conn = (ConnectionClass *) self;
int ret;
CONNLOCK_ACQUIRE(conn);
switch (property)
{
case inprogress:
ret = CC_is_dtc_executing(conn);
break;
case enlisted:
ret = CC_is_dtc_enlisted(conn);
break;
case inTrans:
ret = CC_is_in_trans(conn);
break;
case errorNumber:
ret = CC_get_errornumber(conn);
break;
case idleInGlobalTransaction:
ret = CC_is_idle_in_global_transaction(conn);
break;
case connected:
ret = (CONN_CONNECTED == conn->status);
break;
case prepareRequested:
ret = CC_is_dtc_prepareRequested(conn);
break;
}
CONNLOCK_RELEASE(conn);
return ret;
}
DLL_DECLARE BOOL PgDtc_connect(void *self)
{
CSTR func = "PgDtc_connect";
ConnectionClass *conn = (ConnectionClass *) self;
if (CONN_CONNECTED == conn->status)
return TRUE;
if (CC_connect(conn, NULL) <= 0)
{
/* Error messages are filled in */
CC_log_error(func, "Error on CC_connect", conn);
return FALSE;
}
return TRUE;
}
DLL_DECLARE void PgDtc_free_connect(void *self)
{
ConnectionClass *conn = (ConnectionClass *) self;
PGAPI_FreeConnect(conn);
}
DLL_DECLARE BOOL PgDtc_one_phase_operation(void *self, int operation)
{
ConnectionClass *conn = (ConnectionClass *) self;
BOOL ret, is_in_progress = CC_is_dtc_executing(conn);
if (!is_in_progress)
CC_set_dtc_executing(conn);
switch (operation)
{
case ONE_PHASE_COMMIT:
ret = CC_commit(conn);
break;
default:
ret = CC_abort(conn);
break;
}
if (!is_in_progress)
CC_no_dtc_executing(conn);
return ret;
}
DLL_DECLARE BOOL
PgDtc_two_phase_operation(void *self, int operation, const char *gxid)
{
ConnectionClass *conn = (ConnectionClass *) self;
QResultClass *qres;
BOOL ret = TRUE;
char cmd[512];
switch (operation)
{
case PREPARE_TRANSACTION:
SPRINTF_FIXED(cmd, "PREPARE TRANSACTION '%s'", gxid);
break;
case COMMIT_PREPARED:
SPRINTF_FIXED(cmd, "COMMIT PREPARED '%s'", gxid);
break;
case ROLLBACK_PREPARED:
SPRINTF_FIXED(cmd, "ROLLBACK PREPARED '%s'", gxid);
break;
}
qres = CC_send_query(conn, cmd, NULL, 0, NULL);
if (!QR_command_maybe_successful(qres))
ret = FALSE;
QR_Destructor(qres);
return ret;
}
DLL_DECLARE BOOL
PgDtc_lock_cntrl(void *self, BOOL acquire, BOOL bTrial)
{
ConnectionClass *conn = (ConnectionClass *) self;
BOOL ret = TRUE;
if (acquire)
if (bTrial)
ret = TRY_ENTER_CONN_CS(conn);
else
ENTER_CONN_CS(conn);
else
LEAVE_CONN_CS(conn);
return ret;
}
static ConnectionClass *
CC_Copy(const ConnectionClass *conn)
{
ConnectionClass *newconn = CC_alloc();
if (newconn)
{
memcpy(newconn, conn, sizeof(ConnectionClass));
CC_lockinit(newconn);
}
return newconn;
}
DLL_DECLARE void *
PgDtc_isolate(void *self, DWORD option)
{
BOOL disposingConn = (0 != (disposingConnection & option));
ConnectionClass *sconn = (ConnectionClass *) self, *newconn = NULL;
if (0 == (useAnotherRoom & option))
{
HENV henv = sconn->henv;
CC_cleanup(sconn, TRUE);
if (newconn = CC_Copy(sconn), NULL == newconn)
return newconn;
MYLOG(0, "newconn=%p from %p\n", newconn, sconn);
CC_initialize(sconn, FALSE);
if (!disposingConn)
CC_copy_conninfo(&sconn->connInfo, &newconn->connInfo);
CC_initialize_pg_version(sconn);
sconn->henv = henv;
newconn->henv = NULL;
SYNC_AUTOCOMMIT(sconn);
return newconn;
}
newconn = CC_Constructor();
if (!newconn) return NULL;
CC_copy_conninfo(&newconn->connInfo, &sconn->connInfo);
CC_initialize_pg_version(newconn);
newconn->asdum = sconn->asdum;
newconn->gTranInfo = sconn->gTranInfo;
CC_set_dtc_isolated(newconn);
sconn->asdum = NULL;
SYNC_AUTOCOMMIT(sconn);
CC_set_dtc_clear(sconn);
MYLOG(0, "generated connection=%p with %p\n", newconn, newconn->asdum);
return newconn;
}
#endif /* _HANDLE_ENLIST_IN_DTC_ */
BOOL
CC_set_transact(ConnectionClass *self, UInt4 isolation)
{
char *query;
QResultClass *res;
BOOL bShow = FALSE;
if (PG_VERSION_LT(self, 8.0) &&
(isolation == SQL_TXN_READ_UNCOMMITTED ||
isolation == SQL_TXN_REPEATABLE_READ))
{
CC_set_error(self, CONN_NOT_IMPLEMENTED_ERROR, "READ_UNCOMMITTED or REPEATABLE_READ is not supported by the server", __FUNCTION__);
return FALSE;
}
switch (isolation)
{
case SQL_TXN_SERIALIZABLE:
query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE";
break;
case SQL_TXN_REPEATABLE_READ:
query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ";
break;
case SQL_TXN_READ_UNCOMMITTED:
query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ UNCOMMITTED";
break;
default:
query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED";
break;
}
if (self->default_isolation == 0)
bShow = TRUE;
if (bShow)
res = CC_send_query_append(self, ISOLATION_SHOW_QUERY, NULL, READ_ONLY_QUERY, NULL, query);
else
res = CC_send_query(self, query, NULL, READ_ONLY_QUERY, NULL);
if (!QR_command_maybe_successful(res))
{
CC_set_error(self, CONN_EXEC_ERROR, "ISOLATION change request to the server error", __FUNCTION__);
QR_Destructor(res);
return FALSE;
}
if (bShow)
handle_show_results(res);
QR_Destructor(res);
self->server_isolation = isolation;
return TRUE;
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/opengauss/openGauss-connector-odbc.git
git@gitee.com:opengauss/openGauss-connector-odbc.git
opengauss
openGauss-connector-odbc
openGauss-connector-odbc
master

搜索帮助