1 Star 0 Fork 53

huangjiajun/openGauss-connector-odbc

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
connection.c 101.93 KB
一键复制 编辑 原始数据 按行查看 历史
zhangxubo 提交于 2022-03-11 17:10 . add 3.0.0 patch file
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763777377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876387738783879388038813882388338843885388638873888388938903891389238933894389538963897389838993900390139023903390439053906390739083909391039113912391339143915391639173918391939203921392239233924392539263927392839293930393139323933393439353936393739383939394039413942394339443945394639473948394939503951395239533954395539563957395839593960396139623963396439653966396739683969397039713972397339743975397639773978397939803981398239833984398539863987398839893990399139923993399439953996399739983999400040014002400340044005400640074008400940104011401240134014401540164017401840194020402140224023402440254026402740284029403040314032403340344035403640374038403940404041404240434044404540464047404840494050405140524053405440554056405740584059406040614062406340644065406640674068406940704071407240734074407540764077407840794080408140824083408440854086408740884089409040914092409340944095409640974098409941004101410241034104410541064107410841094110411141124113411441154116411741184119412041214122412341244125412641274128412941304131413241334134413541364137413841394140414141424143414441454146414741484149415041514152415341544155415641574158415941604161416241634164416541664167416841694170417141724173417441754176417741784179418041814182418341844185418641874188418941904191419241934194419541964197419841994200420142024203420442054206420742084209421042114212421342144215421642174218421942204221422242234224
/*------
* Module: connection.c
*
* Description: This module contains routines related to
* connecting to and disconnecting from the Postgres DBMS.
*
* Classes: ConnectionClass (Functions prefix: "CC_")
*
* API functions: SQLAllocConnect, SQLConnect, SQLDisconnect, SQLFreeConnect,
* SQLBrowseConnect(NI)
*
* Comments: See "readme.txt" for copyright and license information.
*-------
*/
/* Multibyte support Eiji Tokuya 2001-03-15 */
/* TryEnterCritiaclSection needs the following #define */
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0400
#endif /* _WIN32_WINNT */
#include <ctype.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
/* for htonl */
#ifdef WIN32
#include <Winsock2.h>
#include <windows.h>
#else
#include <arpa/inet.h>
#endif
#ifndef WIN32
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#ifndef __USE_GNU
#define __USE_GNU
#endif
#include <dlfcn.h>
#include <pwd.h>
#endif
#include "connection.h"
#include "misc.h"
#include "environ.h"
#include "statement.h"
#include "qresult.h"
#include "lobj.h"
#include "dlg_specific.h"
#include "loadlib.h"
#include "multibyte.h"
#include "pgapifunc.h"
#define SAFE_STR(s) (NULL != (s) ? (s) : "(null)")
/* how many statement holders to allocate
* at a time
*/
#define STMT_INCREMENT 16
#define MAX_CN 128 /* the maximum number of CN is 128 */
#define EMPTY 0
#define CORRECT 1
#define WRONG 2
#define lenNameType 63
BOOL conn_inited = FALSE;
BOOL conn_precheck = FALSE;
pthread_rwlock_t init_lock = PTHREAD_RWLOCK_INITIALIZER;
int refresh_flag = 0;
unsigned long int pgxc_node_thread_id;
typedef struct CnEntry {
char ip_list[MAX_CN][MEDIUM_REGISTRY_LEN]; /* a char array to store IPs */
int ip_status[MAX_CN]; /* an integer array that indicates the status of IPs */
char port_list[MAX_CN][SMALL_REGISTRY_LEN]; /* a char array that stores port */
int port_status[MAX_CN]; /* an integer array that indicates the status of IPs */
int ip_count; /* define integer to record the number of IPs stored in the IP_list */
int port_count; /*
* define integer to record the number of IPs stored in the IP_list
* this should be equal to IP_count
*/
int step[MAX_CN]; /* record the offset for roundRobin when autobalance is on */
BOOL is_usable;
pthread_rwlock_t ip_list_lock; /* define a lock to isolate read and write to ip_list */
pthread_rwlock_t step_lock[MAX_CN]; /* define a lock to isolate read and write to step */
} CnEntry;
typedef struct dsn_time {
char *DSN;
int timeinterval;
} dsn_time;
CnEntry orig_entry;
CnEntry pgxc_entry;
static int LIBPQ_connect(ConnectionClass *self);
#ifdef WIN32
DWORD WINAPI read_pgxc_node(LPVOID arg)
#else
static void *read_pgxc_node(void *arg)
#endif
{
dsn_time read_cn;
read_cn = *(dsn_time *) arg;
char *DSN = malloc(strlen(read_cn.DSN) + 1);
if (DSN == NULL) {
exit(1);
}
strncpy_null(DSN, read_cn.DSN, strlen(read_cn.DSN) + 1);
read_cn.DSN = DSN;
int time = read_cn.timeinterval;
#ifdef WIN32
pgxc_node_thread_id = GetCurrentThreadId();
#else
pgxc_node_thread_id = pthread_self();
#endif
int refresh_count = 0;
/* read CNs' IP from pgxc_node */
for (;;) {
MYLOG(0, "REFRESH starts\n");
SQLHENV hEnv = SQL_NULL_HENV;
SQLHDBC hDbc = SQL_NULL_HDBC;
SQLHSTMT hStmt = SQL_NULL_HSTMT;
SQLRETURN rc = SQL_SUCCESS;
SQLINTEGER RETCODE = 0;
char node_port[SMALL_REGISTRY_LEN];
char node_host[lenNameType];
SQLLEN lenPort=0, lenHost=0;
RETCODE = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hEnv);
if (RETCODE != SQL_SUCCESS) {
continue;
}
SQLSetEnvAttr(hEnv, SQL_ATTR_ODBC_VERSION, (void*) SQL_OV_ODBC3, 0);
RETCODE = SQLAllocHandle(SQL_HANDLE_DBC, hEnv, &hDbc);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLConnect(hDbc, // Connect handle
(SQLCHAR *)DSN, //DSN
SQL_NTS, // DSN is nul-terminated
NULL, // Null UID
0 ,
NULL, // Null Auth string
0);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLAllocHandle(SQL_HANDLE_STMT, hDbc, &hStmt);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLExecDirect(hStmt,"select node_port, node_host from pgxc_node where node_type = 'C' and nodeis_active order by node_name",SQL_NTS);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_STMT, hStmt);
SQLDisconnect(hDbc);
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
RETCODE = SQLBindCol(hStmt, 1, SQL_C_CHAR, node_port, SMALL_REGISTRY_LEN, &lenPort);
RETCODE = SQLBindCol(hStmt, 2, SQL_C_CHAR, node_host, lenNameType + 1, &lenHost);
if (RETCODE != SQL_SUCCESS) {
SQLFreeHandle(SQL_HANDLE_STMT, hStmt);
SQLDisconnect(hDbc);
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
continue;
}
int count = 0;
char IP_list_temp[MAX_CN][MEDIUM_REGISTRY_LEN];
char port_list_temp[MAX_CN][SMALL_REGISTRY_LEN];
char port_temp[SMALL_REGISTRY_LEN];
while ((rc = SQLFetch(hStmt)) == SQL_SUCCESS) {
refresh_flag = 1;
STRCPY_FIXED(IP_list_temp[count], node_host);
STRCPY_FIXED(port_list_temp[count++], node_port);
pgxc_entry.ip_count = count;
}
refresh_count++;
if (refresh_count > 10 && rc == SQL_ERROR) {
refresh_flag = 1;
MYLOG(0, "Refresh failed for ten times, change signal and unlock other threads.\n");
}
int i;
if (count != 0 && pthread_rwlock_wrlock(&pgxc_entry.ip_list_lock) == 0) {
for (i = 0; i < pgxc_entry.ip_count; i++) {
STRCPY_FIXED(pgxc_entry.ip_list[i], IP_list_temp[i]);
STRCPY_FIXED(pgxc_entry.port_list[i], port_list_temp[i]);
MYLOG(0, "ip = %s, port = %s\n", pgxc_entry.ip_list[i], pgxc_entry.port_list[i]);
}
MYLOG(0, "CN list has been refreshed.\n");
if (pthread_rwlock_unlock(&pgxc_entry.ip_list_lock) != 0) {
SQLFreeHandle(SQL_HANDLE_STMT, hStmt);
SQLDisconnect(hDbc);
SQLFreeHandle(SQL_HANDLE_DBC, hDbc);
SQLFreeHandle(SQL_HANDLE_ENV, hEnv);
MYLOG(0, "Unlock failed. Exit process.\n");
exit(1);
}
}
SQLDisconnect(hDbc);
sleep(time);
}
}
static BOOL check_IP_connection(ConnectionClass *conn, CnEntry *entry)
{
int i;
int pqret;
BOOL ret = FALSE;
ConnInfo *ci = &conn->connInfo;
if (conn == NULL) {
return FALSE;
}
MYLOG(0, "Start checking the connection for each pair of IP and PORT.\n");
if (entry == &pgxc_entry) {
pthread_rwlock_rdlock(&entry->ip_list_lock);
}
for (i = 0; i < entry->ip_count; i++) {
STRCPY_FIXED(ci->server, entry->ip_list[i]);
STRCPY_FIXED(ci->port, entry->port_list[i]);
if ((pqret = LIBPQ_connect(conn)) <= 0) {
/* connection failed, kick out the wrong IP from IP_list and write the wrong IP into log */
MYLOG(0, "Cannot establish connection via IP: %s\n", entry->ip_list[i]);
entry->ip_status[i] = WRONG;
} else {
/* connection successful, current IP remains in IP_list but disconnect */
entry->ip_status[i] = CORRECT;
PQfinish(conn->pqconn);
ret = TRUE;
}
}
MYLOG(0, "Check finished.\n");
if (entry == &pgxc_entry) {
pthread_rwlock_unlock(&entry->ip_list_lock);
}
return ret;
}
static void start_new_thread(dsn_time *read_cn)
{
#ifdef WIN32
CreateThread(NULL, 0, read_pgxc_node, (LPVOID)(read_cn), 0, NULL);
#else
pthread_t ntid;
pthread_create(&ntid, NULL, read_pgxc_node, read_cn);
#endif
}
static RETCODE init_conn(ConnectionClass *conn)
{
RETCODE ret = SQL_SUCCESS;
ConnInfo *ci = &conn->connInfo;
if (conn == NULL) {
return SQL_ERROR;
}
/* initialize */
memset(&pgxc_entry, 0, sizeof(CnEntry));
memset(&orig_entry, 0, sizeof(CnEntry));
pgxc_entry.is_usable = TRUE;
orig_entry.is_usable = TRUE;
int i;
for (i = 0; i < MAX_CN; i++) {
pthread_rwlock_init(&pgxc_entry.step_lock[i], NULL);
pthread_rwlock_init(&orig_entry.step_lock[i], NULL);
}
pthread_rwlock_init(&pgxc_entry.ip_list_lock, NULL);
pthread_rwlock_init(&orig_entry.ip_list_lock, NULL);
/* make a copy of ci->server and ci->port to prevent changing the original conn when parsing it */
char server[LARGE_REGISTRY_LEN];
STRCPY_FIXED(server, ci->server);
char port[LARGE_REGISTRY_LEN];
STRCPY_FIXED(port, ci->port);
/* parsing ci->server to seperate IPs and store them into IP_list */
char *p = strtok(server, ",");
while (p != NULL) {
STRCPY_FIXED(orig_entry.ip_list[orig_entry.ip_count++], p);
STRCPY_FIXED(pgxc_entry.ip_list[pgxc_entry.ip_count++], p);
p = strtok(NULL, ",");
}
/* parsing ci->port to seperate PORTs and store them into port_list */
p = strtok(port, ",");
while (p != NULL) {
STRCPY_FIXED(orig_entry.port_list[orig_entry.port_count++], p);
STRCPY_FIXED(pgxc_entry.port_list[pgxc_entry.port_count++], p);
p = strtok(NULL, ",");
}
/* if only one port was configured, then each CN has the same port by default */
if (orig_entry.port_count == 1) {
for (i = 1; i < orig_entry.ip_count; i++) {
STRCPY_FIXED(orig_entry.port_list[orig_entry.port_count++], orig_entry.port_list[0]);
STRCPY_FIXED(pgxc_entry.port_list[pgxc_entry.port_count++], pgxc_entry.port_list[0]);
}
}
/* if serverl ports were configured, the number of ports has to be equal to the number of IPs */
if (orig_entry.ip_count != orig_entry.port_count) {
MYLOG(0, "The number of IP %d does not match the number of Port %d.\n", orig_entry.ip_count, orig_entry.port_count);
return SQL_ERROR;
}
/* check the connection of each pair of IP and port and update the list and status */
if (!check_IP_connection(conn, &orig_entry)) {
return SQL_ERROR;
}
memcpy(pgxc_entry.ip_status, orig_entry.ip_status, sizeof(orig_entry.ip_status));
/* start new thread to connect to datbase and select node_port from pgxc_node */
dsn_time read_cn;
read_cn.DSN = ci->dsn;
if (ci->refreshcnlisttime == 0) {
read_cn.timeinterval = 10;
} else {
read_cn.timeinterval = ci->refreshcnlisttime;
}
start_new_thread(&read_cn);
return ret;
}
int get_location(BOOL *visited, CnEntry *entry, int *visited_count)
{
if (visited == NULL || entry == NULL || visited_count == NULL) {
return -1;
}
/* select random IP from ip_list */
srand(pthread_self());
unsigned int ind = rand() % entry->ip_count;
/* record the offset of each IP for roundRobin */
int *offset = &entry->step[ind];
pthread_rwlock_t *offset_lock = &entry->step_lock[ind];
/*
* if the selected IP can be connected and has not been visited, connect to this IP
* else enter the while loop to choose another IP for connect
* use visited_count to record the number of IPs that have been visited
* visited_count equals to ip_count means reaching the limit
*/
while ((entry->ip_status[ind] == WRONG || visited[ind] == TRUE) && (*visited_count) != entry->ip_count) {
if (visited[ind] == FALSE) {
visited[ind] = TRUE;
(*visited_count)++;
}
pthread_rwlock_wrlock(offset_lock);
while (visited[ind] == TRUE && (*visited_count) != entry->ip_count) {
ind = ind + *offset;
(*offset)++;
ind = ind % entry->ip_count;
}
pthread_rwlock_unlock(offset_lock);
}
/*
* the selected IP after the while loop should not have been visited
* if it has been visited, return error
*/
if (visited[ind] == TRUE) {
return -1;
}
visited[ind] = TRUE;
(*visited_count)++;
return ind;
}
static RETCODE connect_random_IP(ConnectionClass *conn, CnEntry *entry)
{
RETCODE ret = SQL_ERROR;
ConnInfo *ci = &conn->connInfo;
CSTR func = "PGAPI_Connect";
char fchar;
BOOL visited[MAX_CN] = {FALSE};
int visited_count = 0;
BOOL check_ret = check_IP_connection(conn, entry);
/* only connection successful and all connection failed will break the while loop */
while (ret == SQL_ERROR) {
if (entry == &pgxc_entry && pthread_rwlock_rdlock(&entry->ip_list_lock) != 0) {
return SQL_ERROR;
}
int ind = get_location(visited, entry, &visited_count);
if (ind == -1) {
pthread_rwlock_unlock(&entry->ip_list_lock);
return SQL_ERROR;
}
STRCPY_FIXED(ci->server, entry->ip_list[ind]);
STRCPY_FIXED(ci->port, entry->port_list[ind]);
while (entry == &pgxc_entry && pthread_rwlock_unlock(&entry->ip_list_lock) != 0);
if ((fchar = CC_connect(conn, NULL)) <= 0) {
/* Error messages are filled in */
CC_log_error(func, "Error on CC_connect", conn);
ret = SQL_ERROR;
} else {
ret = SQL_SUCCESS;
}
}
if (ret == SQL_SUCCESS && fchar == 2) {
ret = SQL_SUCCESS_WITH_INFO;
}
MYLOG(0, "leaving..%d.\n", ret);
/* Empty the password stored in memory to avoid password leak */
if (NAME_IS_VALID(ci->password))
memset(ci->password.name, 0, strlen(ci->password.name));
return ret;
}
static RETCODE connect_IP(ConnectionClass *conn)
{
if (conn->connInfo.priority == 1 && orig_entry.is_usable && connect_random_IP(conn, &orig_entry) != SQL_ERROR) {
return SQL_SUCCESS;
}
return connect_random_IP(conn, &pgxc_entry);
}
static RETCODE check_and_init(ConnectionClass *conn)
{
if (conn_precheck) {
return SQL_SUCCESS;
}
if (pthread_rwlock_rdlock(&init_lock)) {
return SQL_ERROR;
}
if (conn_inited) {
pthread_rwlock_unlock(&init_lock);
return SQL_SUCCESS;
}
pthread_rwlock_unlock(&init_lock);
if (pthread_rwlock_wrlock(&init_lock)) {
return SQL_ERROR;
}
if (conn_inited) {
pthread_rwlock_unlock(&init_lock);
return SQL_SUCCESS;
}
if (init_conn(conn) != SQL_SUCCESS) {
pthread_rwlock_unlock(&init_lock);
return SQL_ERROR;
} else {
conn_inited = TRUE;
}
pthread_rwlock_unlock(&init_lock);
conn_precheck = TRUE;
return SQL_SUCCESS;
}
static SQLRETURN CC_lookup_lo(ConnectionClass *self);
static int CC_close_eof_cursors(ConnectionClass *self);
static void LIBPQ_update_transaction_status(ConnectionClass *self);
static void CC_set_error_if_not_set(ConnectionClass *self, int errornumber, const char *errormsg, const char *func)
{
int errornum = CC_get_errornumber(self);
const char *errmsg = CC_get_errormsg(self);
if (errornumber == 0)
return;
if (errornumber > 0)
{
if (errornum <= 0)
CC_set_error(self, errornumber, errormsg, func);
else if (!errmsg)
CC_set_errormsg(self, errormsg);
}
else if (errornum == 0)
CC_set_error(self, errornumber, errormsg, func);
else if (errornum < 0 && !errmsg)
CC_set_errormsg(self, errormsg);
}
RETCODE SQL_API
PGAPI_AllocConnect(HENV henv,
HDBC * phdbc)
{
EnvironmentClass *env = (EnvironmentClass *) henv;
ConnectionClass *conn;
CSTR func = "PGAPI_AllocConnect";
MYLOG(0, "entering...\n");
conn = CC_Constructor();
MYLOG(0, "**** henv = %p, conn = %p\n", henv, conn);
if (!conn)
{
env->errormsg = "Couldn't allocate memory for Connection object.";
env->errornumber = ENV_ALLOC_ERROR;
*phdbc = SQL_NULL_HDBC;
EN_log_error(func, "", env);
return SQL_ERROR;
}
if (!EN_add_connection(env, conn))
{
env->errormsg = "Maximum number of connections exceeded.";
env->errornumber = ENV_ALLOC_ERROR;
CC_Destructor(conn);
*phdbc = SQL_NULL_HDBC;
EN_log_error(func, "", env);
return SQL_ERROR;
}
if (phdbc)
*phdbc = (HDBC) conn;
return SQL_SUCCESS;
}
RETCODE SQL_API
PGAPI_Connect(HDBC hdbc,
const SQLCHAR * szDSN,
SQLSMALLINT cbDSN,
const SQLCHAR * szUID,
SQLSMALLINT cbUID,
const SQLCHAR * szAuthStr,
SQLSMALLINT cbAuthStr)
{
ConnectionClass *conn = (ConnectionClass *) hdbc;
ConnInfo *ci;
CSTR func = "PGAPI_Connect";
RETCODE ret = SQL_SUCCESS;
char fchar, *tmpstr;
if (!conn)
{
CC_log_error(func, "", NULL);
return SQL_INVALID_HANDLE;
}
ci = &conn->connInfo;
CC_conninfo_init(ci, INIT_GLOBALS);
make_string(szDSN, cbDSN, ci->dsn, sizeof(ci->dsn));
/* get the values for the DSN from the registry */
getDSNinfo(ci, NULL);
logs_on_off(1, ci->drivers.debug, ci->drivers.commlog);
/* initialize pg_version from connInfo.protocol */
CC_initialize_pg_version(conn);
MYLOG(0, "entering..cbDSN=%hi.\n", cbDSN);
/*
* override values from DSN info with UID and authStr(pwd) This only
* occurs if the values are actually there.
*/
fchar = ci->username[0]; /* save the first byte */
make_string(szUID, cbUID, ci->username, sizeof(ci->username));
if ('\0' == ci->username[0]) /* an empty string is specified */
ci->username[0] = fchar; /* restore the original username */
tmpstr = make_string(szAuthStr, cbAuthStr, NULL, 0);
if (tmpstr)
{
if (tmpstr[0]) /* non-empty string is specified */
STR_TO_NAME(ci->password, tmpstr);
free(tmpstr);
}
MYLOG(0, "conn = %p (DSN='%s', UID='%s', PWD='%s')\n", conn, ci->dsn, ci->username, NAME_IS_VALID(ci->password) ? "xxxxx" : "");
if (ci->autobalance == 1) {
if (check_and_init(conn) != SQL_SUCCESS) {
return SQL_ERROR;
}
#ifdef WIN32
if (GetCurrentThreadId() != pgxc_node_thread_id)
#else
if (pthread_self() != pgxc_node_thread_id)
#endif
{
while (refresh_flag != 1) {
#ifdef WIN32
sleep(10);
#else
usleep(10000);
#endif
}
}
ret = connect_IP(conn);
}
else {
if ((fchar = CC_connect(conn, NULL)) <= 0) {
CC_log_error(func, "Error on CC_connect", conn);
ret = SQL_ERROR;
}
if (SQL_SUCCESS == ret && 2 == fchar) {
ret = SQL_SUCCESS_WITH_INFO;
}
MYLOG(0, "leaving..%d.\n", ret);
if (NAME_IS_VALID(ci->password)) {
memset(ci->password.name, 0, strlen(ci->password.name));
}
}
return ret;
}
RETCODE SQL_API
PGAPI_BrowseConnect(HDBC hdbc,
const SQLCHAR * szConnStrIn,
SQLSMALLINT cbConnStrIn,
SQLCHAR * szConnStrOut,
SQLSMALLINT cbConnStrOutMax,
SQLSMALLINT * pcbConnStrOut)
{
CSTR func = "PGAPI_BrowseConnect";
ConnectionClass *conn = (ConnectionClass *) hdbc;
MYLOG(0, "entering...\n");
CC_set_error(conn, CONN_NOT_IMPLEMENTED_ERROR, "Function not implemented", func);
return SQL_ERROR;
}
/* Drop any hstmts open on hdbc and disconnect from database */
RETCODE SQL_API
PGAPI_Disconnect(HDBC hdbc)
{
ConnectionClass *conn = (ConnectionClass *) hdbc;
CSTR func = "PGAPI_Disconnect";
MYLOG(0, "entering...\n");
if (!conn)
{
CC_log_error(func, "", NULL);
return SQL_INVALID_HANDLE;
}
if (conn->status == CONN_EXECUTING)
{
CC_set_error(conn, CONN_IN_USE, "A transaction is currently being executed", func);
return SQL_ERROR;
}
logs_on_off(-1, conn->connInfo.drivers.debug, conn->connInfo.drivers.commlog);
MYLOG(0, "about to CC_cleanup\n");
/* Close the connection and free statements */
CC_cleanup(conn, FALSE);
MYLOG(0, "done CC_cleanup\n");
MYLOG(0, "leaving...\n");
return SQL_SUCCESS;
}
RETCODE SQL_API
PGAPI_FreeConnect(HDBC hdbc)
{
ConnectionClass *conn = (ConnectionClass *) hdbc;
CSTR func = "PGAPI_FreeConnect";
EnvironmentClass *env;
MYLOG(0, "entering...hdbc=%p\n", hdbc);
if (!conn)
{
CC_log_error(func, "", NULL);
return SQL_INVALID_HANDLE;
}
/* Remove the connection from the environment */
if (NULL != (env = CC_get_env(conn)) &&
!EN_remove_connection(env, conn))
{
CC_set_error(conn, CONN_IN_USE, "A transaction is currently being executed", func);
return SQL_ERROR;
}
CC_Destructor(conn);
MYLOG(0, "leaving...\n");
return SQL_SUCCESS;
}
/*
* IMPLEMENTATION CONNECTION CLASS
*/
static void
reset_current_schema(ConnectionClass *self)
{
if (self->current_schema)
{
free(self->current_schema);
self->current_schema = NULL;
}
self->current_schema_valid = FALSE;
}
static ConnectionClass *
CC_alloc(void)
{
return (ConnectionClass *) calloc(sizeof(ConnectionClass), 1);
}
static void
CC_lockinit(ConnectionClass *self)
{
INIT_CONNLOCK(self);
INIT_CONN_CS(self);
}
static ConnectionClass *
CC_initialize(ConnectionClass *rv, BOOL lockinit)
{
size_t clear_size;
#if defined(WIN_MULTITHREAD_SUPPORT) || defined(POSIX_THREADMUTEX_SUPPORT)
clear_size = (char *)&(rv->cs) - (char *)rv;
#else
clear_size = sizeof(ConnectionClass);
#endif /* WIN_MULTITHREAD_SUPPORT */
memset(rv, 0, clear_size);
rv->status = CONN_NOT_CONNECTED;
rv->transact_status = CONN_IN_AUTOCOMMIT; /* autocommit by default */
rv->unnamed_prepared_stmt = NULL;
rv->stmts = (StatementClass **) malloc(sizeof(StatementClass *) * STMT_INCREMENT);
if (!rv->stmts)
goto cleanup;
memset(rv->stmts, 0, sizeof(StatementClass *) * STMT_INCREMENT);
rv->num_stmts = STMT_INCREMENT;
rv->descs = (DescriptorClass **) malloc(sizeof(DescriptorClass *) * STMT_INCREMENT);
if (!rv->descs)
goto cleanup;
memset(rv->descs, 0, sizeof(DescriptorClass *) * STMT_INCREMENT);
rv->num_descs = STMT_INCREMENT;
rv->lobj_type = PG_TYPE_LO_UNDEFINED;
if (isMsAccess())
rv->ms_jet = 1;
rv->isolation = 0; // means initially unknown server's default isolation
rv->mb_maxbyte_per_char = 1;
rv->max_identifier_length = -1;
rv->autocommit_public = SQL_AUTOCOMMIT_ON;
/* Initialize statement options to defaults */
/* Statements under this conn will inherit these options */
InitializeStatementOptions(&rv->stmtOptions);
InitializeARDFields(&rv->ardOptions);
InitializeAPDFields(&rv->apdOptions);
#ifdef _HANDLE_ENLIST_IN_DTC_
rv->asdum = NULL;
rv->gTranInfo = 0;
#endif /* _HANDLE_ENLIST_IN_DTC_ */
if (lockinit)
CC_lockinit(rv);
return rv;
cleanup:
CC_Destructor(rv);
return NULL;
}
ConnectionClass *
CC_Constructor()
{
ConnectionClass *rv, *retrv = NULL;
if (rv = CC_alloc(), NULL != rv)
retrv = CC_initialize(rv, TRUE);
return retrv;
}
char
CC_Destructor(ConnectionClass *self)
{
MYLOG(0, "entering self=%p\n", self);
if (self->status == CONN_EXECUTING)
return 0;
CC_cleanup(self, FALSE); /* cleanup socket and statements */
MYLOG(0, "after CC_Cleanup\n");
/* Free up statement holders */
if (self->stmts)
{
free(self->stmts);
self->stmts = NULL;
}
if (self->descs)
{
free(self->descs);
self->descs = NULL;
}
MYLOG(0, "after free statement holders\n");
NULL_THE_NAME(self->schemaIns);
NULL_THE_NAME(self->tableIns);
CC_conninfo_release(&self->connInfo);
if (self->__error_message)
free(self->__error_message);
DELETE_CONN_CS(self);
DELETE_CONNLOCK(self);
free(self);
MYLOG(0, "leaving\n");
return 1;
}
/* Return how many cursors are opened on this connection */
int
CC_cursor_count(ConnectionClass *self)
{
StatementClass *stmt;
int i,
count = 0;
QResultClass *res;
MYLOG(0, "self=%p, num_stmts=%d\n", self, self->num_stmts);
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
stmt = self->stmts[i];
if (stmt && (res = SC_get_Result(stmt)) && QR_get_cursor(res))
count++;
}
CONNLOCK_RELEASE(self);
MYLOG(0, "leaving %d\n", count);
return count;
}
void
CC_clear_error(ConnectionClass *self)
{
if (!self) return;
CONNLOCK_ACQUIRE(self);
self->__error_number = 0;
if (self->__error_message)
{
free(self->__error_message);
self->__error_message = NULL;
}
self->sqlstate[0] = '\0';
CONNLOCK_RELEASE(self);
}
void
CC_examine_global_transaction(ConnectionClass *self)
{
if (!self) return;
#ifdef _HANDLE_ENLIST_IN_DTC_
if (CC_is_in_global_trans(self))
CALL_IsolateDtcConn(self, TRUE);
#endif /* _HANDLE_ENLIST_IN_DTC_ */
}
CSTR bgncmd = "START TRANSACTION";
CSTR cmtcmd = "COMMIT";
CSTR rbkcmd = "ROLLBACK";
CSTR svpcmd = "SAVEPOINT";
CSTR per_query_svp = "_per_query_svp_";
CSTR rlscmd = "RELEASE";
/*
* Used to begin a transaction.
*/
char
CC_begin(ConnectionClass *self)
{
char ret = TRUE;
if (!CC_is_in_trans(self))
{
QResultClass *res = CC_send_query(self, bgncmd, NULL, 0, NULL);
MYLOG(0, " sending BEGIN!\n");
ret = QR_command_maybe_successful(res);
QR_Destructor(res);
}
return ret;
}
/*
* Used to commit a transaction.
* We are almost always in the middle of a transaction.
*/
char
CC_commit(ConnectionClass *self)
{
char ret = TRUE;
if (CC_is_in_trans(self))
{
if (!CC_is_in_error_trans(self))
CC_close_eof_cursors(self);
if (CC_is_in_trans(self))
{
QResultClass *res = CC_send_query(self, cmtcmd, NULL, 0, NULL);
MYLOG(0, " sending COMMIT!\n");
ret = QR_command_maybe_successful(res);
QR_Destructor(res);
}
}
return ret;
}
/*
* Used to cancel a transaction.
* We are almost always in the middle of a transaction.
*/
char
CC_abort(ConnectionClass *self)
{
char ret = TRUE;
if (CC_is_in_trans(self))
{
QResultClass *res = CC_send_query(self, rbkcmd, NULL, 0, NULL);
MYLOG(0, " sending ABORT!\n");
ret = QR_command_maybe_successful(res);
QR_Destructor(res);
}
return ret;
}
/* This is called by SQLSetConnectOption etc also */
char
CC_set_autocommit(ConnectionClass *self, BOOL on)
{
BOOL currsts = CC_is_in_autocommit(self);
if ((on && currsts) ||
(!on && !currsts))
return on;
MYLOG(0, " %d->%d\n", currsts, on);
if (CC_is_in_trans(self))
CC_commit(self);
if (on)
self->transact_status |= CONN_IN_AUTOCOMMIT;
else
self->transact_status &= ~CONN_IN_AUTOCOMMIT;
return on;
}
/* Clear cached table info */
static void
CC_clear_col_info(ConnectionClass *self, BOOL destroy)
{
if (self->col_info)
{
int i;
COL_INFO *coli;
for (i = 0; i < self->ntables; i++)
{
if (coli = self->col_info[i], NULL != coli)
{
if (destroy || coli->refcnt == 0)
{
free_col_info_contents(coli);
free(coli);
self->col_info[i] = NULL;
}
else
coli->acc_time = 0;
}
}
self->ntables = 0;
if (destroy)
{
free(self->col_info);
self->col_info = NULL;
self->coli_allocated = 0;
}
}
}
static void
CC_set_locale_encoding(ConnectionClass *self, const char * encoding)
{
char *currenc = self->locale_encoding;
if (encoding)
self->locale_encoding = strdup(encoding);
else
self->locale_encoding = NULL;
if (currenc)
free(currenc);
}
static void
CC_determine_locale_encoding(ConnectionClass *self)
{
const char *dbencoding = PQparameterStatus(self->pqconn, "client_encoding");
const char *encoding;
QLOG(0, "PQparameterStatus(%p, \"client_encoding\")=%s\n", self->pqconn, SAFE_STR(dbencoding));
if (self->locale_encoding) /* already set */
return;
encoding = derive_locale_encoding(dbencoding);
if (!encoding)
encoding = "SQL_ASCII";
CC_set_locale_encoding(self, encoding);
}
static void
CC_set_client_encoding(ConnectionClass *self, const char * encoding)
{
char *currenc = self->original_client_encoding;
if (encoding)
{
self->original_client_encoding = strdup(encoding);
self->ccsc = pg_CS_code(encoding);
}
else
{
self->original_client_encoding = NULL;
self->ccsc = SQL_ASCII;
}
self->mb_maxbyte_per_char = pg_mb_maxlen(self->ccsc);
if (currenc)
free(currenc);
}
int
CC_send_client_encoding(ConnectionClass *self, const char * encoding)
{
const char *dbencoding = PQparameterStatus(self->pqconn, "client_encoding");
if (encoding && (!dbencoding || stricmp(encoding, dbencoding)))
{
char query[64];
QResultClass *res;
BOOL cmd_success;
SPRINTF_FIXED(query, "set client_encoding to '%s'", encoding);
res = CC_send_query(self, query, NULL, 0, NULL);
cmd_success = QR_command_maybe_successful(res);
QR_Destructor(res);
if (!cmd_success)
return SQL_ERROR;
}
CC_set_client_encoding(self, encoding);
return SQL_SUCCESS;
}
/* This is called by SQLDisconnect also */
char
CC_cleanup(ConnectionClass *self, BOOL keepCommunication)
{
int i;
StatementClass *stmt;
DescriptorClass *desc;
if (self->status == CONN_EXECUTING)
return FALSE;
MYLOG(0, "entering self=%p\n", self);
ENTER_CONN_CS(self);
/* Cancel an ongoing transaction */
/* We are always in the middle of a transaction, */
/* even if we are in auto commit. */
if (self->pqconn)
{
QLOG(0, "PQfinish: %p\n", self->pqconn);
PQfinish(self->pqconn);
self->pqconn = NULL;
}
MYLOG(0, "after PQfinish\n");
/* Free all the stmts on this connection */
for (i = 0; i < self->num_stmts; i++)
{
stmt = self->stmts[i];
if (stmt)
{
stmt->hdbc = NULL; /* prevent any more dbase interactions */
SC_Destructor(stmt);
self->stmts[i] = NULL;
}
}
/* Free all the descs on this connection */
for (i = 0; i < self->num_descs; i++)
{
desc = self->descs[i];
if (desc)
{
DC_get_conn(desc) = NULL; /* prevent any more dbase interactions */
DC_Destructor(desc);
free(desc);
self->descs[i] = NULL;
}
}
/* Check for translation dll */
#ifdef WIN32
if (!keepCommunication && self->translation_handle)
{
FreeLibrary(self->translation_handle);
self->translation_handle = NULL;
}
#endif
if (!keepCommunication)
{
self->status = CONN_NOT_CONNECTED;
self->transact_status = CONN_IN_AUTOCOMMIT;
self->unnamed_prepared_stmt = NULL;
}
if (!keepCommunication)
{
CC_conninfo_init(&(self->connInfo), CLEANUP_FOR_REUSE);
if (self->original_client_encoding)
{
free(self->original_client_encoding);
self->original_client_encoding = NULL;
}
if (self->locale_encoding)
{
free(self->locale_encoding);
self->locale_encoding = NULL;
}
if (self->server_encoding)
{
free(self->server_encoding);
self->server_encoding = NULL;
}
reset_current_schema(self);
}
/* Free cached table info */
CC_clear_col_info(self, TRUE);
if (self->num_discardp > 0 && self->discardp)
{
for (i = 0; i < self->num_discardp; i++)
free(self->discardp[i]);
self->num_discardp = 0;
}
if (self->discardp)
{
free(self->discardp);
self->discardp = NULL;
}
LEAVE_CONN_CS(self);
MYLOG(0, "leaving\n");
return TRUE;
}
int
CC_set_translation(ConnectionClass *self)
{
#ifdef WIN32
CSTR func = "CC_set_translation";
if (self->translation_handle != NULL)
{
FreeLibrary(self->translation_handle);
self->translation_handle = NULL;
}
if (self->connInfo.translation_dll[0] == 0)
return TRUE;
self->translation_option = atoi(self->connInfo.translation_option);
self->translation_handle = LoadLibrary(self->connInfo.translation_dll);
if (self->translation_handle == NULL)
{
CC_set_error(self, CONN_UNABLE_TO_LOAD_DLL, "Could not load the translation DLL.", func);
return FALSE;
}
self->DataSourceToDriver
= (DataSourceToDriverProc) GetProcAddress(self->translation_handle,
"SQLDataSourceToDriver");
self->DriverToDataSource
= (DriverToDataSourceProc) GetProcAddress(self->translation_handle,
"SQLDriverToDataSource");
if (self->DataSourceToDriver == NULL || self->DriverToDataSource == NULL)
{
CC_set_error(self, CONN_UNABLE_TO_LOAD_DLL, "Could not find translation DLL functions.", func);
return FALSE;
}
#endif
return TRUE;
}
#ifndef PG_DIAG_SEVERITY_NONLOCALIZED
#define PG_DIAG_SEVERITY_NONLOCALIZED 'V'
#endif
void
handle_pgres_error(ConnectionClass *self, const PGresult *pgres,
const char *comment,
QResultClass *res, BOOL error_not_a_notice)
{
char *errseverity;
char *errseverity_nonloc = NULL;
char *errprimary = NULL;
char *errmsg = NULL;
size_t errmsglen;
char *sqlstate = NULL;
int level = MIN_LOG_LEVEL;
MYLOG(DETAIL_LOG_LEVEL, "entering\n");
sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
if (res && pgres)
{
if (sqlstate)
STRCPY_FIXED(res->sqlstate, sqlstate);
}
if (NULL == pgres &&
NULL == self->pqconn)
{
const char *errmsg = "The connection has been lost";
MYLOG(0, "setting error message=%s\n", errmsg);
QLOG(0, "\t%ssetting error message=%s\n", __FUNCTION__, errmsg);
if (CC_get_errornumber(self) <= 0)
CC_set_error(self, CONNECTION_COMMUNICATION_ERROR, errmsg, comment);
if (res)
{
QR_set_rstatus(res, PORES_FATAL_ERROR);
QR_set_message(res, errmsg);
}
goto cleanup;
}
/*
* The full message with details and context and everything could
* be obtained with PQresultErrorMessage(). I think that would be
* more user-friendly, but for now, construct a message with
* severity and primary message, which is backwards compatible.
*/
errseverity = PQresultErrorField(pgres, PG_DIAG_SEVERITY);
if (PG_VERSION_GE(self, 9.6))
{
errseverity_nonloc = PQresultErrorField(pgres, PG_DIAG_SEVERITY_NONLOCALIZED);
MYLOG(0, "PG_DIAG_SEVERITY_NONLOCALIZED=%s\n", SAFE_STR(errseverity_nonloc));
}
if (!error_not_a_notice)
{
if (errseverity_nonloc)
{
if (stricmp(errseverity_nonloc, "NOTICE") != 0)
level = 1;
}
else if (errseverity)
{
if (stricmp(errseverity, "NOTICE") != 0)
level = 1;
}
}
errprimary = PQresultErrorField(pgres, PG_DIAG_MESSAGE_PRIMARY);
if (errseverity_nonloc)
QLOG(level, "\t%s(%s) %s '%s'\n", errseverity_nonloc, SAFE_STR(errseverity), SAFE_STR(sqlstate), SAFE_STR(errprimary));
else
QLOG(level, "\t(%s) %s '%s'\n", SAFE_STR(errseverity), SAFE_STR(sqlstate), SAFE_STR(errprimary));
if (errprimary == NULL)
{
/* Hmm. got no primary message. Check if there's a connection error */
if (self->pqconn)
errprimary = PQerrorMessage(self->pqconn);
if (errprimary == NULL)
errprimary = "no error information";
}
if (errseverity && errprimary)
{
errmsglen = strlen(errseverity) + 2 + strlen(errprimary) + 1;
errmsg = malloc(errmsglen);
if (errmsg)
snprintf(errmsg, errmsglen, "%s: %s", errseverity, errprimary);
}
if (errmsg == NULL)
errmsg = errprimary;
if (!error_not_a_notice) /* warning, notice, log etc */
{
MYLOG(0, "notice message %s\n", errmsg);
if (res)
{
if (QR_command_successful(res))
QR_set_rstatus(res, PORES_NONFATAL_ERROR); /* notice or warning */
QR_add_notice(res, errmsg); /* will dup this string */
}
goto cleanup;
}
MYLOG(0, "error message=%s(" FORMAT_SIZE_T ")\n", errmsg, strlen(errmsg));
if (res)
{
QR_set_rstatus(res, PORES_FATAL_ERROR); /* error or fatal */
if (errmsg[0])
QR_set_message(res, errmsg);
QR_set_aborted(res, TRUE);
}
/*
* If the error is continuable after rollback?
*/
if (PQstatus(self->pqconn) == CONNECTION_BAD)
{
CC_set_errornumber(self, CONNECTION_COMMUNICATION_ERROR);
CC_on_abort(self, CONN_DEAD); /* give up the connection */
}
else if ((errseverity_nonloc && strcmp(errseverity_nonloc, "FATAL") == 0) ||
(NULL == errseverity_nonloc && errseverity && strcmp(errseverity, "FATAL") == 0)) /* no */
{
CC_set_errornumber(self, CONNECTION_SERVER_REPORTED_SEVERITY_FATAL);
CC_on_abort(self, CONN_DEAD); /* give up the connection */
}
else /* yes */
{
CC_set_errornumber(self, CONNECTION_SERVER_REPORTED_SEVERITY_ERROR);
if (CC_is_in_trans(self))
CC_set_in_error_trans(self);
}
/* If any error/warning/notice happened, there should be a message in connection,
* espacially while a connection is being created.
* While a connection is being created, postgresql never returns any error. But in
* our cluster, there maybe a warning or a notice when GTM is in trouble.
*/
CC_set_errormsg(self, errmsg);
cleanup:
if (errmsg != errprimary)
free(errmsg);
LIBPQ_update_transaction_status(self);
}
/*
* This is a libpq notice receiver callback, for handling incoming NOTICE
* messages while processing a query.
*/
typedef struct
{
ConnectionClass *conn;
const char *comment;
QResultClass *res;
} notice_receiver_arg;
void
receive_libpq_notice(void *arg, const PGresult *pgres)
{
if (arg != NULL)
{
notice_receiver_arg *nrarg = (notice_receiver_arg *) arg;
handle_pgres_error(nrarg->conn, pgres, nrarg->comment, nrarg->res, FALSE);
}
}
static char CC_initial_log(ConnectionClass *self, const char *func)
{
const ConnInfo *ci = &self->connInfo;
char *encoding, vermsg[128];
snprintf(vermsg, sizeof(vermsg), "Driver Version='%s,%s'"
#ifdef WIN32
" linking %d"
#ifdef _MT
#ifdef _DLL
" dynamic"
#else
" static"
#endif /* _DLL */
" Multithread"
#else
" Singlethread"
#endif /* _MT */
#ifdef _DEBUG
" Debug"
#endif /* DEBUG */
" library"
#endif /* WIN32 */
"\n", POSTGRESDRIVERVERSION, __DATE__
#ifdef _MSC_VER
, _MSC_VER
#endif /* _MSC_VER */
);
QLOG(0, "%s", vermsg);
MYLOG(DETAIL_LOG_LEVEL, "Global Options: fetch=%d, unknown_sizes=%d, max_varchar_size=%d, max_longvarchar_size=%d\n",
ci->drivers.fetch_max,
ci->drivers.unknown_sizes,
ci->drivers.max_varchar_size,
ci->drivers.max_longvarchar_size);
MYLOG(DETAIL_LOG_LEVEL, " unique_index=%d, use_declarefetch=%d\n",
ci->drivers.unique_index,
ci->drivers.use_declarefetch);
MYLOG(DETAIL_LOG_LEVEL, " text_as_longvarchar=%d, unknowns_as_longvarchar=%d, bools_as_char=%d NAMEDATALEN=%d\n",
ci->drivers.text_as_longvarchar,
ci->drivers.unknowns_as_longvarchar,
ci->drivers.bools_as_char,
TABLE_NAME_STORAGE_LEN);
if (NULL == self->locale_encoding)
{
encoding = check_client_encoding(ci->conn_settings);
CC_set_locale_encoding(self, encoding);
MYLOG(DETAIL_LOG_LEVEL, " extra_systable_prefixes='%s', conn_settings='%s' conn_encoding='%s'\n",
ci->drivers.extra_systable_prefixes,
PRINT_NAME(ci->conn_settings),
encoding ? encoding : "");
if (NULL != encoding)
{
free(encoding);
}
}
if (self->status == CONN_DOWN)
{
CC_set_error_if_not_set(self, CONN_OPENDB_ERROR, "Connection broken.", func);
return 0;
}
else if (self->status != CONN_NOT_CONNECTED)
{
CC_set_error_if_not_set(self, CONN_OPENDB_ERROR, "Already connected.", func);
return 0;
}
MYLOG(0, "DSN = '%s', server = '%s', port = '%s', database = '%s'\n", ci->dsn, ci->server, ci->port, ci->database);
return 1;
}
static int handle_show_results(const QResultClass *res);
#define TRANSACTION_ISOLATION "transaction_isolation"
#define ISOLATION_SHOW_QUERY "show " TRANSACTION_ISOLATION
static char
LIBPQ_CC_connect(ConnectionClass *self, char *salt_para)
{
int ret;
CSTR func = "LIBPQ_CC_connect";
QResultClass *res;
MYLOG(0, "entering...\n");
if (0 == CC_initial_log(self, func))
return 0;
if (ret = LIBPQ_connect(self), ret <= 0)
return ret;
res = CC_send_query(self, "SET DateStyle = 'ISO';SET extra_float_digits = 2;" ISOLATION_SHOW_QUERY, NULL, READ_ONLY_QUERY, NULL);
if (QR_command_maybe_successful(res))
{
handle_show_results(res);
ret = 1;
}
else
ret = 0;
QR_Destructor(res);
return ret;
}
RETCODE
CC_detect_batch_proto(ConnectionClass *self)
{
const char *value = NULL;
const char *query = "select count(*) from pg_settings where name = 'support_batch_bind' and setting = 'on';";
PGresult *res = NULL;
ExecStatusType restype;
int cnt = 0;
RETCODE ret = SQL_SUCCESS;
CSTR func = "CC_detect_batch_proto";
mylog("%s: entering...\n", func);
qlog(" [%s : conn = %p, query = select count(*) from pg_settings where name = 'support_batch_bind' and setting = 'on']\n",
func, self);
res = PQexec(self->pqconn, query);
if (NULL == res)
{
self->connInfo.backend_support_batch_proto = 0;
mylog("%s: NULL result detected, backend_support_batch_proto set to 0", func);
qlog(" [%s : conn = %p, query result = NULL]\n",
func, self);
return SQL_ERROR;
}
restype = PQresultStatus(res);
if (restype != PGRES_TUPLES_OK &&
restype != PGRES_SINGLE_TUPLE)
{
self->connInfo.backend_support_batch_proto = 0;
PQclear(res);
mylog("%s: invalid result type %d detected, backend_support_batch_proto set to 0", func, restype);
qlog(" [%s : conn = %p, query result type = %d]\n",
func, self, restype);
return SQL_ERROR;
}
if (PQntuples(res) < 1 ||
PQnfields(res) < 1)
{
self->connInfo.backend_support_batch_proto = 0;
mylog("%s: invalid result rowsCount(%d) or colsCount(%d) detected, backend_support_batch_proto set to 0",
func, PQntuples(res), PQnfields(res));
qlog(" [%s : conn = %p, invalid query result : %d rows, %d columns]\n",
func, self, PQntuples(res), PQnfields(res));
PQclear(res);
return SQL_ERROR;
}
value = PQgetvalue(res, 0, 0);
cnt = (value ? atoi(value) : 0);
PQclear(res);
if (cnt < 1)
self->connInfo.backend_support_batch_proto = 0;
else
self->connInfo.backend_support_batch_proto = 1;
if (0 == self->connInfo.backend_support_batch_proto &&
self->connInfo.use_batch_protocol)
{
CC_set_error(self, CONN_UNSUPPORTED_OPTION,
"Backend does not support batch bind protocol, \"" INI_USEBATCHPROTOCOL "\" disabled.", func);
ret = SQL_SUCCESS_WITH_INFO;
}
mylog("%s: query result: %d, backend_support_batch_proto set to %d",
func, cnt, self->connInfo.backend_support_batch_proto);
qlog(" [%s : conn = %p, query result : %d ]\n",
func, self, cnt);
mylog("%s: exiting\n", func);
return ret;
}
char
CC_connect(ConnectionClass *self, char *salt_para)
{
ConnInfo *ci = &(self->connInfo);
CSTR func = "CC_connect";
char ret, *saverr = NULL, retsend;
const char *errmsg = NULL;
RETCODE batchDetectRet = 0;
MYLOG(0, "entering...sslmode=%s\n", self->connInfo.sslmode);
ret = LIBPQ_CC_connect(self, salt_para);
if (ret <= 0)
return ret;
CC_set_translation(self);
/*
* Send any initial settings
*/
/*
* Since these functions allocate statements, and since the connection
* is not established yet, it would violate odbc state transition
* rules. Therefore, these functions call the corresponding local
* function instead.
*/
/* Per Datasource settings */
retsend = CC_send_settings(self, GET_NAME(self->connInfo.conn_settings));
if (CONN_DOWN == self->status)
{
ret = 0;
goto cleanup;
}
if (CC_get_errornumber(self) > 0 &&
NULL != (errmsg = CC_get_errormsg(self)))
saverr = strdup(errmsg);
CC_clear_error(self); /* clear any error */
if (!SQL_SUCCEEDED(CC_lookup_lo(self))) /* a hack to get the oid of our large object oid type */
{
ret = 0;
goto cleanup;
}
/*
* Multibyte handling
*
* Send 'UTF8' when required Unicode behavior, otherwise send
* locale encodings.
*/
CC_clear_error(self);
CC_determine_locale_encoding(self); /* determine the locale_encoding */
#ifdef UNICODE_SUPPORT
if (CC_is_in_unicode_driver(self))
{
if (!SQL_SUCCEEDED(CC_send_client_encoding(self, "UTF8")))
{
ret = 0;
goto cleanup;
}
}
else /* for unicode drivers require ANSI behavior */
#endif /* UNICODE_SUPPORT */
{
if (!SQL_SUCCEEDED(CC_send_client_encoding(self, self->locale_encoding)))
{
ret = 0;
goto cleanup;
}
}
CC_clear_error(self);
if (self->server_isolation != self->isolation)
if (!CC_set_transact(self, self->isolation))
{
ret = 0;
goto cleanup;
}
ci_updatable_cursors_set(ci);
if (CC_get_errornumber(self) > 0)
CC_clear_error(self); /* clear any initial command errors */
self->status = CONN_CONNECTED;
if (CC_is_in_unicode_driver(self)
&& (CC_is_in_ansi_app(self) || 0 < ci->bde_environment))
self->unicode |= CONN_DISALLOW_WCHAR;
MYLOG(0, "conn->unicode=%d Client Encoding='%s' (Code %d)\n", self->unicode, self->original_client_encoding, self->ccsc);
batchDetectRet = CC_detect_batch_proto(self);
if (batchDetectRet == SQL_ERROR)
goto cleanup;
else if (batchDetectRet == SQL_SUCCESS_WITH_INFO)
{
/* Caller will recognize 2 as SQL_SUCCESS_WITH_INFO. */
ret = 2;
goto cleanup;
}
ret = 1;
cleanup:
MYLOG(0, "leaving...%d\n", ret);
if (NULL != saverr)
{
if (ret > 0 && CC_get_errornumber(self) <= 0)
CC_set_error(self, -1, saverr, func);
free(saverr);
}
if (1 == ret && FALSE == retsend)
ret = 2;
return ret;
}
char
CC_add_statement(ConnectionClass *self, StatementClass *stmt)
{
int i;
char ret = TRUE;
MYLOG(0, "self=%p, stmt=%p\n", self, stmt);
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
if (!self->stmts[i])
{
stmt->hdbc = self;
self->stmts[i] = stmt;
break;
}
}
if (i >= self->num_stmts) /* no more room -- allocate more memory */
{
StatementClass **newstmts;
Int2 new_num_stmts;
new_num_stmts = STMT_INCREMENT + self->num_stmts;
if (new_num_stmts > 0)
newstmts = (StatementClass **)
realloc(self->stmts, sizeof(StatementClass *) * new_num_stmts);
else
newstmts = NULL; /* num_stmts overflowed */
if (!newstmts)
ret = FALSE;
else
{
self->stmts = newstmts;
memset(&self->stmts[self->num_stmts], 0, sizeof(StatementClass *) * STMT_INCREMENT);
stmt->hdbc = self;
self->stmts[self->num_stmts] = stmt;
self->num_stmts = new_num_stmts;
}
}
CONNLOCK_RELEASE(self);
return ret;
}
static void
CC_set_error_statements(ConnectionClass *self)
{
int i;
MYLOG(0, "entering self=%p\n", self);
for (i = 0; i < self->num_stmts; i++)
{
if (NULL != self->stmts[i])
SC_ref_CC_error(self->stmts[i]);
}
}
char
CC_remove_statement(ConnectionClass *self, StatementClass *stmt)
{
int i;
char ret = FALSE;
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
if (self->stmts[i] == stmt && stmt->status != STMT_EXECUTING)
{
self->stmts[i] = NULL;
ret = TRUE;
break;
}
}
CONNLOCK_RELEASE(self);
return ret;
}
char CC_get_escape(const ConnectionClass *self)
{
const char *scf;
static const ConnectionClass *conn = NULL;
scf = PQparameterStatus(self->pqconn, "standard_conforming_strings");
if (self != conn)
{
QLOG(0, "PQparameterStatus(%p, \"standard_conforming_strings\")=%s\n", self->pqconn, SAFE_STR(scf));
conn = self;
}
if (scf == NULL)
{
/* we're connected to a pre-8.1 server, and E'' is not supported */
return '\0';
}
if (strcmp(scf, "on") != 0)
return ESCAPE_IN_LITERAL;
else
return '\0';
}
int CC_get_max_idlen(ConnectionClass *self)
{
int len = self->max_identifier_length;
if (len < 0)
{
QResultClass *res;
res = CC_send_query(self, "show max_identifier_length", NULL, READ_ONLY_QUERY, NULL);
if (QR_command_maybe_successful(res))
len = self->max_identifier_length = QR_get_value_backend_int(res, 0, 0, FALSE);
QR_Destructor(res);
}
MYLOG(0, "max_identifier_length=%d\n", len);
return len < 0 ? 0 : len;
}
static SQLINTEGER
isolation_str_to_enum(const char *str_isolation)
{
SQLINTEGER isolation = 0;
if (strnicmp(str_isolation, "seri", 4) == 0)
isolation = SQL_TXN_SERIALIZABLE;
else if (strnicmp(str_isolation, "repe", 4) == 0)
isolation = SQL_TXN_REPEATABLE_READ;
else if (strnicmp(str_isolation, "read com", 8) == 0)
isolation = SQL_TXN_READ_COMMITTED;
else if (strnicmp(str_isolation, "read unc", 8) == 0)
isolation = SQL_TXN_READ_UNCOMMITTED;
return isolation;
}
static int handle_show_results(const QResultClass *res)
{
int count = 0;
const QResultClass *qres;
ConnectionClass *conn = QR_get_conn(res);
for (qres = res; qres; qres = qres->next)
{
if (!qres->command ||
stricmp(qres->command, "SHOW") != 0)
continue;
if (strcmp(QR_get_fieldname(qres, 0), TRANSACTION_ISOLATION) == 0)
{
conn->server_isolation = isolation_str_to_enum(QR_get_value_backend_text(qres, 0, 0));
MYLOG(0, "isolation %d to be %d\n", conn->server_isolation, conn->isolation);
if (0 == conn->isolation)
conn->isolation = conn->server_isolation;
if (0 == conn->default_isolation)
conn->default_isolation = conn->server_isolation;
count++;
}
}
return count;
}
/*
* This function may not be called as long as ISOLATION_SHOW_QUERY is
* issued in LIBPQ_CC_connect.
*/
SQLUINTEGER CC_get_isolation(ConnectionClass *self)
{
SQLUINTEGER isolation = 0;
QResultClass *res;
res = CC_send_query(self, ISOLATION_SHOW_QUERY, NULL, READ_ONLY_QUERY, NULL);
if (QR_command_maybe_successful(res))
{
handle_show_results(res);
isolation = self->server_isolation;
}
QR_Destructor(res);
MYLOG(0, "isolation=%d\n", isolation);
return isolation;
}
void
CC_set_error(ConnectionClass *self, int number, const char *message, const char *func)
{
CONNLOCK_ACQUIRE(self);
if (self->__error_message)
free(self->__error_message);
self->__error_number = number;
self->__error_message = message ? strdup(message) : NULL;
if (0 != number)
CC_set_error_statements(self);
if (func && number != 0)
CC_log_error(func, "", self);
CONNLOCK_RELEASE(self);
}
void
CC_set_errormsg(ConnectionClass *self, const char *message)
{
CONNLOCK_ACQUIRE(self);
if (self->__error_message)
free(self->__error_message);
self->__error_message = message ? strdup(message) : NULL;
CONNLOCK_RELEASE(self);
}
char
CC_get_error(ConnectionClass *self, int *number, char **message)
{
int rv;
MYLOG(0, "entering\n");
CONNLOCK_ACQUIRE(self);
if (CC_get_errornumber(self))
{
*number = CC_get_errornumber(self);
*message = CC_get_errormsg(self);
}
rv = (CC_get_errornumber(self) != 0);
CONNLOCK_RELEASE(self);
MYLOG(0, "leaving\n");
return rv;
}
static int CC_close_eof_cursors(ConnectionClass *self)
{
int i, ccount = 0;
StatementClass *stmt;
QResultClass *res;
if (!self->ncursors)
return ccount;
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
if (stmt = self->stmts[i], NULL == stmt)
continue;
if (res = SC_get_Result(stmt), NULL == res)
continue;
if (NULL != QR_get_cursor(res) &&
QR_is_withhold(res) &&
QR_once_reached_eof(res))
{
if (QR_get_num_cached_tuples(res) >= QR_get_num_total_tuples(res) ||
SQL_CURSOR_FORWARD_ONLY == stmt->options.cursor_type)
{
QR_close(res);
ccount++;
}
}
}
CONNLOCK_RELEASE(self);
return ccount;
}
static void CC_clear_cursors(ConnectionClass *self, BOOL on_abort)
{
int i;
StatementClass *stmt;
QResultClass *res;
if (!self->ncursors)
return;
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
stmt = self->stmts[i];
if (stmt && (res = SC_get_Result(stmt)) &&
(NULL != QR_get_cursor(res)))
{
/*
* non-holdable cursors are automatically closed
* at commit time.
* all non-permanent cursors are automatically closed
* at rollback time.
*/
if ((on_abort && !QR_is_permanent(res)) ||
!QR_is_withhold(res))
{
QR_on_close_cursor(res);
}
else if (!QR_is_permanent(res))
{
QResultClass *wres;
char cmd[64];
if (QR_needs_survival_check(res))
{
SPRINTF_FIXED(cmd, "MOVE 0 in \"%s\"", QR_get_cursor(res));
CONNLOCK_RELEASE(self);
wres = CC_send_query(self, cmd, NULL, ROLLBACK_ON_ERROR | IGNORE_ABORT_ON_CONN | READ_ONLY_QUERY, NULL);
QR_set_no_survival_check(res);
if (QR_command_maybe_successful(wres) &&
CONN_ERROR_IGNORED != CC_get_errornumber(self))
QR_set_permanent(res);
else
QR_set_cursor(res, NULL);
QR_Destructor(wres);
CONNLOCK_ACQUIRE(self);
MYLOG(DETAIL_LOG_LEVEL, "%p->permanent -> %d %p\n", res, QR_is_permanent(res), QR_get_cursor(res));
}
else
QR_set_permanent(res);
}
}
}
CONNLOCK_RELEASE(self);
}
static void CC_mark_cursors_doubtful(ConnectionClass *self)
{
int i;
StatementClass *stmt;
QResultClass *res;
if (!self->ncursors)
return;
CONNLOCK_ACQUIRE(self);
for (i = 0; i < self->num_stmts; i++)
{
stmt = self->stmts[i];
if (NULL != stmt &&
NULL != (res = SC_get_Result(stmt)) &&
NULL != QR_get_cursor(res) &&
!QR_is_permanent(res))
QR_set_survival_check(res);
}
CONNLOCK_RELEASE(self);
}
void CC_on_commit(ConnectionClass *conn)
{
if (conn->on_commit_in_progress)
return;
conn->on_commit_in_progress = 1;
CONNLOCK_ACQUIRE(conn);
if (CC_is_in_trans(conn))
{
CC_set_no_trans(conn);
CC_set_no_manual_trans(conn);
}
CC_svp_init(conn);
CC_start_stmt(conn);
CC_clear_cursors(conn, FALSE);
CONNLOCK_RELEASE(conn);
CC_discard_marked_objects(conn);
CONNLOCK_ACQUIRE(conn);
if (conn->result_uncommitted)
{
CONNLOCK_RELEASE(conn);
ProcessRollback(conn, FALSE, FALSE);
CONNLOCK_ACQUIRE(conn);
conn->result_uncommitted = 0;
}
CONNLOCK_RELEASE(conn);
conn->on_commit_in_progress = 0;
}
void CC_on_abort(ConnectionClass *conn, unsigned int opt)
{
BOOL set_no_trans = FALSE;
MYLOG(0, "entering opt=%x\n", opt);
CONNLOCK_ACQUIRE(conn);
if (0 != (opt & CONN_DEAD)) /* CONN_DEAD implies NO_TRANS also */
opt |= NO_TRANS;
if (CC_is_in_trans(conn))
{
if (0 != (opt & NO_TRANS))
{
CC_set_no_trans(conn);
CC_set_no_manual_trans(conn);
set_no_trans = TRUE;
}
}
CC_svp_init(conn);
CC_start_stmt(conn);
CC_clear_cursors(conn, TRUE);
if (0 != (opt & CONN_DEAD))
{
conn->status = CONN_DOWN;
if (conn->pqconn)
{
CONNLOCK_RELEASE(conn);
QLOG(0, "PQfinish: %p\n", conn->pqconn);
PQfinish(conn->pqconn);
CONNLOCK_ACQUIRE(conn);
conn->pqconn = NULL;
}
}
else if (set_no_trans)
{
CONNLOCK_RELEASE(conn);
CC_discard_marked_objects(conn);
CONNLOCK_ACQUIRE(conn);
}
if (conn->result_uncommitted)
{
CONNLOCK_RELEASE(conn);
ProcessRollback(conn, TRUE, FALSE);
CONNLOCK_ACQUIRE(conn);
conn->result_uncommitted = 0;
}
CONNLOCK_RELEASE(conn);
}
void CC_on_abort_partial(ConnectionClass *conn)
{
MYLOG(0, "entering\n");
CONNLOCK_ACQUIRE(conn);
ProcessRollback(conn, TRUE, TRUE);
CC_discard_marked_objects(conn);
CONNLOCK_RELEASE(conn);
}
static BOOL
is_setting_search_path(const char *query)
{
const char *q = query;
if (strnicmp(q, "set", 3) != 0)
return FALSE;
q += 3;
while (isspace(*q)) q++;
for (; *q;)
{
if (IS_NOT_SPACE(*q))
{
if (strnicmp(q, "search_path", 11) == 0)
return TRUE;
q++;
while (IS_NOT_SPACE(*q))
q++;
}
else
q++;
}
return FALSE;
}
static BOOL
CC_from_PGresult(QResultClass *res, StatementClass *stmt,
ConnectionClass *conn, const char *cursor, PGresult **pgres)
{
BOOL success = TRUE;
if (!QR_from_PGresult(res, stmt, conn, cursor, pgres))
{
QLOG(0, "\tGetting result from PGresult failed\n");
success = FALSE;
if (0 >= CC_get_errornumber(conn))
{
switch (QR_get_rstatus(res))
{
case PORES_NO_MEMORY_ERROR:
CC_set_error(conn, CONN_NO_MEMORY_ERROR, NULL, __FUNCTION__);
break;
case PORES_BAD_RESPONSE:
CC_set_error(conn, CONNECTION_COMMUNICATION_ERROR, "communication error occured", __FUNCTION__);
break;
default:
CC_set_error(conn, CONN_EXEC_ERROR, QR_get_message(res), __FUNCTION__);
break;
}
}
}
return success;
}
int
CC_internal_rollback(ConnectionClass *self, int rollback_type, BOOL ignore_abort)
{
int ret = 0;
char cmd[128];
PGresult *pgres = NULL;
if (!CC_is_in_error_trans(self))
return 1;
switch (rollback_type)
{
case PER_STATEMENT_ROLLBACK:
GenerateSvpCommand(self, INTERNAL_ROLLBACK_OPERATION, cmd, sizeof(cmd));
QLOG(0, "PQexec: %p '%s'\n", self->pqconn, cmd);
pgres = PQexec(self->pqconn, cmd);
switch (PQresultStatus(pgres))
{
case PGRES_COMMAND_OK:
QLOG(0, "\tok: - 'C' - %s\n", PQcmdStatus(pgres));
case PGRES_NONFATAL_ERROR:
ret = 1;
if (ignore_abort)
CC_set_no_error_trans(self);
LIBPQ_update_transaction_status(self);
break;
default:
handle_pgres_error(self, pgres, __FUNCTION__, NULL, TRUE);
break;
}
break;
case PER_QUERY_ROLLBACK:
SPRINTF_FIXED(cmd, "%s TO %s;%s %s"
, rbkcmd, per_query_svp , rlscmd, per_query_svp);
QLOG(0, "PQsendQuery: %p '%s'\n", self->pqconn, cmd);
PQsendQuery(self->pqconn, cmd);
ret = 0;
while (self->pqconn && (pgres = PQgetResult(self->pqconn)) != NULL)
{
switch (PQresultStatus(pgres))
{
case PGRES_COMMAND_OK:
QLOG(0, "\tok: - 'C' - %s\n", PQcmdTuples(pgres));
ret = 1;
break;
case PGRES_NONFATAL_ERROR:
ret = 1;
default:
handle_pgres_error(self, pgres, __FUNCTION__, NULL, !ret);
}
}
if (!ret)
{
if (ignore_abort)
CC_set_no_error_trans(self);
else
MYLOG(0, " return error\n");
}
LIBPQ_update_transaction_status(self);
break;
}
if (pgres)
PQclear(pgres);
return ret;
}
/*
* The "result_in" is only used by QR_next_tuple() to fetch another group of rows into
* the same existing QResultClass (this occurs when the tuple cache is depleted and
* needs to be re-filled).
*
* The "cursor" is used by SQLExecute to associate a statement handle as the cursor name
* (i.e., C3326857) for SQL select statements. This cursor is then used in future
* 'declare cursor C3326857 for ...' and 'fetch 100 in C3326857' statements.
*
* * If issue_begin, send "BEGIN"
* * if needed, send "SAVEPOINT ..."
* * Send "query", read result
* * Send appendq, read result.
*
*/
QResultClass *
CC_send_query_append(ConnectionClass *self, const char *query, QueryInfo *qi, UDWORD flag, StatementClass *stmt, const char *appendq)
{
CSTR func = "CC_send_query";
QResultClass *cmdres = NULL,
*retres = NULL,
*res = NULL;
BOOL ignore_abort_on_conn = ((flag & IGNORE_ABORT_ON_CONN) != 0),
create_keyset = ((flag & CREATE_KEYSET) != 0),
issue_begin = ((flag & GO_INTO_TRANSACTION) != 0 && !CC_is_in_trans(self)),
rollback_on_error, query_rollback, end_with_commit,
read_only, prepend_savepoint = FALSE,
ignore_roundtrip_time = ((self->connInfo.extra_opts & BIT_IGNORE_ROUND_TRIP_TIME) != 0);
char *ptr;
BOOL ReadyToReturn = FALSE,
query_completed = FALSE,
aborted = FALSE,
used_passed_result_object = FALSE,
discard_next_begin = FALSE,
discard_next_savepoint = FALSE,
discard_next_release = FALSE,
consider_rollback;
BOOL discardTheRest = FALSE;
int func_cs_count = 0;
PQExpBufferData query_buf = {0};
size_t query_len;
/* QR_set_command() dups this string so doesn't need static */
char *cmdbuffer;
PGresult *pgres = NULL;
notice_receiver_arg nrarg;
if (appendq)
{
MYLOG(0, "conn=%p, query='%s'+'%s'\n", self, query, appendq);
}
else
{
MYLOG(0, "conn=%p, query='%s'\n", self, query);
}
if (!self->pqconn)
{
PQExpBufferData pbuf = {0};
initPQExpBuffer(&pbuf);
appendPQExpBuffer(&pbuf, "The connection is down\nFailed to send '%s'", query);
CC_set_error(self, CONNECTION_COULD_NOT_SEND, pbuf.data, func);
termPQExpBuffer(&pbuf);
return NULL;
}
ENTER_INNER_CONN_CS(self, func_cs_count);
/* Indicate that we are sending a query to the backend */
if ((NULL == query) || (query[0] == '\0'))
{
CLEANUP_FUNC_CONN_CS(func_cs_count, self);
return NULL;
}
/*
* In case the round trip time can be ignored, the query
* and the appeneded query would be issued separately.
* Otherwise a multiple command query would be issued.
*/
if (appendq && ignore_roundtrip_time)
{
res = CC_send_query_append(self, query, qi, flag, stmt, NULL);
if (QR_command_maybe_successful(res))
{
cmdres = CC_send_query_append(self, appendq, qi, flag & (~(GO_INTO_TRANSACTION)), stmt, NULL);
if (QR_command_maybe_successful(cmdres))
res->next = cmdres;
else
{
QR_Destructor(res);
res = cmdres;
}
}
CLEANUP_FUNC_CONN_CS(func_cs_count, self);
return res;
}
rollback_on_error = (flag & ROLLBACK_ON_ERROR) != 0;
end_with_commit = (flag & END_WITH_COMMIT) != 0;
read_only = (flag & READ_ONLY_QUERY) != 0;
#define return DONT_CALL_RETURN_FROM_HERE???
consider_rollback = (issue_begin || (CC_is_in_trans(self) && !CC_is_in_error_trans(self)) || strnicmp(query, "begin", 5) == 0);
if (rollback_on_error)
rollback_on_error = consider_rollback;
query_rollback = (rollback_on_error && !end_with_commit && PG_VERSION_GE(self, 8.0));
if (!query_rollback && consider_rollback && !end_with_commit)
{
if (stmt)
{
StatementClass *astmt = SC_get_ancestor(stmt);
unsigned int svpopt = 0;
if (read_only)
svpopt |= SVPOPT_RDONLY;
if (!ignore_roundtrip_time)
svpopt |= SVPOPT_REDUCE_ROUNDTRIP;
if (!CC_started_rbpoint(self))
{
if (SQL_ERROR == SetStatementSvp(astmt, svpopt))
{
SC_set_error(stmt, STMT_INTERNAL_ERROR, "internal savepoint error", func);
goto cleanup;
}
}
}
}
/* prepend internal savepoint command ? */
if (PREPEND_IN_PROGRESS == self->internal_op)
prepend_savepoint = TRUE;
/* append all these together, to avoid round-trips */
query_len = strlen(query);
MYLOG(0, "query_len=" FORMAT_SIZE_T "\n", query_len);
initPQExpBuffer(&query_buf);
/* issue_begin, query_rollback and prepend_savepoint are exclusive */
if (issue_begin)
{
appendPQExpBuffer(&query_buf, "%s;", bgncmd);
discard_next_begin = TRUE;
}
else if (query_rollback && !self->connInfo.drivers.for_extension_connector)
{
appendPQExpBuffer(&query_buf, "%s %s;", svpcmd, per_query_svp);
discard_next_savepoint = TRUE;
}
else if (prepend_savepoint)
{
char prepend_cmd[128];
GenerateSvpCommand(self, INTERNAL_SAVEPOINT_OPERATION, prepend_cmd, sizeof(prepend_cmd));
appendPQExpBuffer(&query_buf, "%s;", prepend_cmd);
self->internal_op = SAVEPOINT_IN_PROGRESS;
}
appendPQExpBufferStr(&query_buf, query);
if (appendq)
{
appendPQExpBuffer(&query_buf, ";%s", appendq);
}
if (query_rollback && !self->connInfo.drivers.for_extension_connector)
{
appendPQExpBuffer(&query_buf, ";%s %s", rlscmd, per_query_svp);
}
if (PQExpBufferDataBroken(query_buf))
{
CC_set_error(self, CONN_NO_MEMORY_ERROR, "Couldn't alloc buffer for query.", "");
goto cleanup;
}
/* Set up notice receiver */
nrarg.conn = self;
nrarg.comment = func;
nrarg.res = NULL;
PQsetNoticeReceiver(self->pqconn, receive_libpq_notice, &nrarg);
QLOG(0, "PQsendQuery: %p '%s'\n", self->pqconn, query_buf.data);
if (!PQsendQuery(self->pqconn, query_buf.data))
{
char *errmsg = PQerrorMessage(self->pqconn);
QLOG(0, "\nCommunication Error: %s\n", SAFE_STR(errmsg));
CC_set_error(self, CONNECTION_COMMUNICATION_ERROR, errmsg, func);
goto cleanup;
}
PQsetSingleRowMode(self->pqconn);
cmdres = qi ? qi->result_in : NULL;
if (cmdres)
used_passed_result_object = TRUE;
else
{
cmdres = QR_Constructor();
if (!cmdres)
{
CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func);
goto cleanup;
}
}
res = cmdres;
if (qi)
{
res->cmd_fetch_size = qi->fetch_size;
res->cache_size = qi->row_size;
}
nrarg.res = res;
while (self->pqconn && (pgres = PQgetResult(self->pqconn)) != NULL)
{
int status = PQresultStatus(pgres);
if (discardTheRest)
continue;
switch (status)
{
case PGRES_COMMAND_OK:
/* portal query command, no tuples returned */
/* read in the return message from the backend */
cmdbuffer = PQcmdStatus(pgres);
QLOG(0, "\tok: - 'C' - %s\n", cmdbuffer);
if (query_completed) /* allow for "show" style notices */
{
res->next = QR_Constructor();
if (!res->next)
{
CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func);
ReadyToReturn = TRUE;
retres = NULL;
break;
}
res = res->next;
nrarg.res = res;
}
MYLOG(0, " setting cmdbuffer = '%s'\n", cmdbuffer);
my_trim(cmdbuffer); /* get rid of trailing space */
if (strnicmp(cmdbuffer, bgncmd, strlen(bgncmd)) == 0)
{
CC_set_in_trans(self);
CC_set_in_manual_trans(self);
if (discard_next_begin) /* discard the automatically issued BEGIN */
{
discard_next_begin = FALSE;
break; /* discard the result */
}
}
/*
* There are 2 risks to RELEASE an internal savepoint.
* One is to RELEASE the savepoint invalitated
* due to manually issued ROLLBACK or RELEASE.
* Another is to invalitate manual SAVEPOINTs unexpectedly
* by RELEASing the internal savepoint.
*/
else if (strnicmp(cmdbuffer, svpcmd, strlen(svpcmd)) == 0)
{
if (discard_next_savepoint)
{
discard_next_savepoint = FALSE;
discard_next_release = TRUE;
MYLOG(DETAIL_LOG_LEVEL, "Discarded a SAVEPOINT result\n");
break; /* discard the result */
}
if (SAVEPOINT_IN_PROGRESS == self->internal_op)
{
CC_start_rbpoint(self);
self->internal_op = 0;
break; /* discard the result */
}
/* Don't release the internal savepoint */
self->internal_svp = 0;
}
else if (strnicmp(cmdbuffer, rbkcmd, strlen(rbkcmd)) == 0)
{
CC_mark_cursors_doubtful(self);
CC_set_in_error_trans(self); /* mark the transaction error in case of manual rollback */
self->internal_svp = 0; /* possibly an internal savepoint is invalid */
self->opt_previous = 0; /* unknown */
CC_init_opt_in_progress(self);
}
else if (strnicmp(cmdbuffer, rlscmd, strlen(rlscmd)) == 0)
{
if (discard_next_release)
{
MYLOG(DETAIL_LOG_LEVEL, "Discarded a RELEASE result\n");
discard_next_release = FALSE;
break; /* discard the result */
}
self->internal_svp = 0;
if (SAVEPOINT_IN_PROGRESS == self->internal_op)
break; /* discard the result */
}
/*
* DROP TABLE or ALTER TABLE may change
* the table definition. So clear the
* col_info cache though it may be too simple.
*/
else if (strnicmp(cmdbuffer, "DROP TABLE", 10) == 0 ||
strnicmp(cmdbuffer, "ALTER TABLE", 11) == 0)
CC_clear_col_info(self, FALSE);
else
{
ptr = strrchr(cmdbuffer, ' ');
if (ptr)
res->recent_processed_row_count = atoi(ptr + 1);
else
res->recent_processed_row_count = -1;
if (self->current_schema_valid &&
strnicmp(cmdbuffer, "SET", 3) == 0)
{
if (is_setting_search_path(query))
reset_current_schema(self);
}
}
if (QR_command_successful(res))
QR_set_rstatus(res, PORES_COMMAND_OK);
QR_set_command(res, cmdbuffer);
query_completed = TRUE;
MYLOG(0, " returning res = %p\n", res);
break;
case PGRES_EMPTY_QUERY:
/* We return the empty query */
QR_set_rstatus(res, PORES_EMPTY_QUERY);
break;
case PGRES_NONFATAL_ERROR:
handle_pgres_error(self, pgres, "send_query", res, FALSE);
break;
case PGRES_BAD_RESPONSE:
case PGRES_FATAL_ERROR:
handle_pgres_error(self, pgres, "send_query", res, TRUE);
/* We should report that an error occured. Zoltan */
aborted = TRUE;
query_completed = TRUE;
break;
case PGRES_TUPLES_OK:
QLOG(0, "\tok: - 'T' - %s\n", PQcmdStatus(pgres));
case PGRES_SINGLE_TUPLE:
if (query_completed)
{
res->next = QR_Constructor();
if (!res->next)
{
CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func);
ReadyToReturn = TRUE;
retres = NULL;
break;
}
if (create_keyset)
{
QR_set_haskeyset(res->next);
if (stmt)
res->next->num_key_fields = stmt->num_key_fields;
}
MYLOG(0, " 'T' no result_in: res = %p\n", res->next);
res = res->next;
nrarg.res = res;
if (qi)
{
QR_set_cache_size(res, qi->row_size);
res->cmd_fetch_size = qi->fetch_size;
}
}
if (!used_passed_result_object)
{
const char *cursor = qi ? qi->cursor : NULL;
if (create_keyset)
{
QR_set_haskeyset(res);
if (stmt)
res->num_key_fields = stmt->num_key_fields;
if (cursor && cursor[0])
QR_set_synchronize_keys(res);
}
if (CC_from_PGresult(res, stmt, self, cursor, &pgres))
query_completed = TRUE;
else
{
aborted = TRUE;
if (QR_command_maybe_successful(res))
retres = NULL;
else
retres = cmdres;
}
}
else
{ /* next fetch, so reuse an existing result */
const char *cursor = res->cursor_name;
/*
* called from QR_next_tuple and must return
* immediately.
*/
if (!CC_from_PGresult(res, stmt, NULL, cursor, &pgres))
{
retres = NULL;
break;
}
retres = cmdres;
}
if (res->rstatus == PORES_TUPLES_OK && res->notice)
{
QR_set_rstatus(res, PORES_NONFATAL_ERROR);
}
else if (PORES_NO_MEMORY_ERROR == QR_get_rstatus(res))
{
PGcancel *cancel = PQgetCancel(self->pqconn);
char dummy[8];
discardTheRest = TRUE;
if (cancel != NULL)
{
PQcancel(cancel, dummy, sizeof(dummy));
PQfreeCancel(cancel);
}
else
goto cleanup;
}
break;
case PGRES_COPY_OUT:
/* XXX: We used to read from stdin here. Does that make any sense? */
case PGRES_COPY_IN:
if (query_completed)
{
res->next = QR_Constructor();
if (!res->next)
{
CC_set_error(self, CONNECTION_COULD_NOT_RECEIVE, "Could not create result info in send_query.", func);
ReadyToReturn = TRUE;
retres = NULL;
break;
}
res = res->next;
nrarg.res = res;
}
QR_set_rstatus(res, PORES_COPY_IN);
ReadyToReturn = TRUE;
retres = cmdres;
break;
case PGRES_COPY_BOTH:
default:
/* skip the unexpected response if possible */
CC_set_error(self, CONNECTION_BACKEND_CRAZY, "Unexpected result status (send_query)", func);
handle_pgres_error(self, pgres, "send_query", res, TRUE);
CC_on_abort(self, CONN_DEAD);
MYLOG(0, " error - %s\n", CC_get_errormsg(self));
ReadyToReturn = TRUE;
retres = NULL;
break;
}
if (pgres)
{
PQclear(pgres);
pgres = NULL;
}
}
cleanup:
if (self->pqconn)
PQsetNoticeReceiver(self->pqconn, receive_libpq_notice, NULL);
if (pgres != NULL)
{
PQclear(pgres);
pgres = NULL;
}
MYLOG(DETAIL_LOG_LEVEL, " rollback_on_error=%d CC_is_in_trans=%d discard_next_savepoint=%d query_rollback=%d\n", rollback_on_error, CC_is_in_trans(self), discard_next_savepoint, query_rollback);
if (rollback_on_error && CC_is_in_trans(self) && !discard_next_savepoint)
{
if (query_rollback)
{
if (!CC_internal_rollback(self, PER_QUERY_ROLLBACK, ignore_abort_on_conn))
ignore_abort_on_conn = FALSE;
}
else if (CC_is_in_error_trans(self))
{
QLOG(0, "PQexec: %p '%s'\n", self->pqconn, rbkcmd);
pgres = PQexec(self->pqconn, rbkcmd);
}
/*
* XXX: we don't check the result here. Should we? We're rolling back,
* so it's not clear what else we can do on error. Giving an error
* message to the application would be nice though.
*/
if (pgres != NULL)
{
PQclear(pgres);
pgres = NULL;
}
}
CLEANUP_FUNC_CONN_CS(func_cs_count, self);
#undef return
/*
* Break before being ready to return.
*/
if (!ReadyToReturn)
retres = cmdres;
if (!PQExpBufferDataBroken(query_buf))
termPQExpBuffer(&query_buf);
/*
* Cleanup garbage results before returning.
*/
if (cmdres && retres != cmdres && !used_passed_result_object)
QR_Destructor(cmdres);
/*
* Cleanup the aborted result if specified
*/
if (retres)
{
if (aborted)
{
/** if (ignore_abort_on_conn)
{
if (!used_passed_result_object)
{
QR_Destructor(retres);
retres = NULL;
}
} **/
if (retres)
{
/*
* discard results other than errors.
*/
QResultClass *qres;
for (qres = retres; qres->next; qres = retres)
{
if (QR_get_aborted(qres))
break;
retres = qres->next;
qres->next = NULL;
QR_Destructor(qres);
}
/*
* If error message isn't set
*/
if (ignore_abort_on_conn)
{
CC_set_errornumber(self, CONN_ERROR_IGNORED);
if (retres)
QR_set_rstatus(retres, PORES_NONFATAL_ERROR);
MYLOG(DETAIL_LOG_LEVEL, " ignored abort_on_conn\n");
}
else if (retres)
{
if (NULL == CC_get_errormsg(self) ||
!CC_get_errormsg(self)[0])
CC_set_errormsg(self, QR_get_message(retres));
if (!self->sqlstate[0])
STRCPY_FIXED(self->sqlstate, retres->sqlstate);
}
}
}
}
/*
* Update our copy of the transaction status.
*
* XXX: Once we stop using the socket directly, and do everything with
* libpq, we can get rid of the transaction_status field altogether
* and always ask libpq for it.
*/
LIBPQ_update_transaction_status(self);
if (retres)
QR_set_conn(retres, self);
return retres;
}
#define MAX_SEND_FUNC_ARGS 3
static const char *func_param_str[MAX_SEND_FUNC_ARGS + 1] =
{
"()",
"($1)",
"($1, $2)",
"($1, $2, $3)"
};
static
Int8 odbc_hton64(Int8 h64)
{
union {
Int8 n64;
UInt4 i32[2];
} u;
u.i32[0] = htonl((UInt4) (h64 >> 32));
u.i32[1] = htonl((UInt4) h64);
return u.n64;
}
static
Int8 odbc_ntoh64(Int8 n64)
{
union {
Int8 h64;
UInt4 i32[2];
} u;
Int8 result;
u.h64 = n64;
result = ntohl(u.i32[0]);
result <<= 32;
result |= ntohl(u.i32[1]);
return result;
}
int
CC_send_function(ConnectionClass *self, const char *fn_name, void *result_buf, int *actual_result_len, int result_is_int, LO_ARG *args, int nargs)
{
int i;
int ret = FALSE;
int func_cs_count = 0;
char sqlbuffer[1000];
PGresult *pgres = NULL;
Oid paramTypes[MAX_SEND_FUNC_ARGS];
char *paramValues[MAX_SEND_FUNC_ARGS];
int paramLengths[MAX_SEND_FUNC_ARGS];
int paramFormats[MAX_SEND_FUNC_ARGS];
Int4 intParamBufs[MAX_SEND_FUNC_ARGS];
Int8 int8ParamBufs[MAX_SEND_FUNC_ARGS];
MYLOG(0, "conn=%p, fn_name=%s, result_is_int=%d, nargs=%d\n", self, fn_name, result_is_int, nargs);
/* Finish the pending extended query first */
#define return DONT_CALL_RETURN_FROM_HERE???
ENTER_INNER_CONN_CS(self, func_cs_count);
SPRINTF_FIXED(sqlbuffer, "SELECT pg_catalog.%s%s", fn_name,
func_param_str[nargs]);
for (i = 0; i < nargs; ++i)
{
MYLOG(0, " arg[%d]: len = %d, isint = %d, integer = " FORMATI64 ", ptr = %p\n", i, args[i].len, args[i].isint, args[i].isint == 2 ? args[i].u.integer64 : args[i].u.integer, args[i].u.ptr);
/* integers are sent as binary, others as text */
if (args[i].isint == 2)
{
paramTypes[i] = PG_TYPE_INT8;
int8ParamBufs[i] = odbc_hton64(args[i].u.integer64);
paramValues[i] = (char *) &int8ParamBufs[i];
paramLengths[i] = 8;
paramFormats[i] = 1;
}
else if (args[i].isint)
{
paramTypes[i] = PG_TYPE_INT4;
intParamBufs[i] = htonl(args[i].u.integer);
paramValues[i] = (char *) &intParamBufs[i];
paramLengths[i] = 4;
paramFormats[i] = 1;
}
else
{
paramTypes[i] = 0;
paramValues[i] = args[i].u.ptr;
paramLengths[i] = args[i].len;
paramFormats[i] = 1;
}
}
QLOG(0, "PQexecParams: %p '%s' nargs=%d\n", self->pqconn, sqlbuffer, nargs);
pgres = PQexecParams(self->pqconn, sqlbuffer, nargs,
paramTypes, (const char * const *) paramValues,
paramLengths, paramFormats, 1);
MYLOG(0, "done sending function\n");
if (PQresultStatus(pgres) == PGRES_TUPLES_OK)
QLOG(0, "\tok: - 'T' - %s\n", PQcmdStatus(pgres));
else
{
handle_pgres_error(self, pgres, "send_query", NULL, TRUE);
goto cleanup;
}
if (PQnfields(pgres) != 1 || PQntuples(pgres) != 1)
{
CC_set_errormsg(self, "unexpected result set from large_object function");
goto cleanup;
}
*actual_result_len = PQgetlength(pgres, 0, 0);
QLOG(0, "\tgot result with length: %d\n", *actual_result_len);
if (*actual_result_len > 0)
{
char *value = PQgetvalue(pgres, 0, 0);
if (result_is_int == 2)
{
Int8 int8val;
memcpy(&int8val, value, sizeof(Int8));
int8val = odbc_ntoh64(int8val);
memcpy(result_buf, &int8val, sizeof(Int8));
MYLOG(0, "int8 result=" FORMATI64 "\n", int8val);
}
else if (result_is_int)
{
Int4 int4val;
memcpy(&int4val, value, sizeof(Int4));
int4val = ntohl(int4val);
memcpy(result_buf, &int4val, sizeof(Int4));
}
else
memcpy(result_buf, value, *actual_result_len);
}
ret = TRUE;
cleanup:
#undef return
CLEANUP_FUNC_CONN_CS(func_cs_count, self);
if (pgres)
PQclear(pgres);
return ret;
}
char
CC_send_settings(ConnectionClass *self, const char *set_query)
{
HSTMT hstmt;
RETCODE result;
char status = TRUE;
char *cs,
*ptr;
#ifdef HAVE_STRTOK_R
char *last;
#endif /* HAVE_STRTOK_R */
CSTR func = "CC_send_settings";
MYLOG(0, "entering...\n");
if (set_query == NULL) return TRUE;
/*
* This function must use the local odbc API functions since the odbc state
* has not transitioned to "connected" yet.
*/
result = PGAPI_AllocStmt(self, &hstmt, 0);
if (!SQL_SUCCEEDED(result))
return FALSE;
/* non-external handle ensures no BEGIN/COMMIT/ABORT stuff */
cs = strdup(set_query);
if (cs == NULL)
{
CC_set_error(self, CONN_NO_MEMORY_ERROR, "Couldn't alloc buffer for query.", func);
return FALSE;
}
#ifdef HAVE_STRTOK_R
ptr = strtok_r(cs, ";", &last);
#else
ptr = strtok(cs, ";");
#endif /* HAVE_STRTOK_R */
while (ptr)
{
result = PGAPI_ExecDirect(hstmt, (SQLCHAR *) ptr, SQL_NTS, 0);
if (!SQL_SUCCEEDED(result))
status = FALSE;
MYLOG(0, "result %d, status %d from '%s'\n", result, status, ptr);
#ifdef HAVE_STRTOK_R
ptr = strtok_r(NULL, ";", &last);
#else
ptr = strtok(NULL, ";");
#endif /* HAVE_STRTOK_R */
}
free(cs);
PGAPI_FreeStmt(hstmt, SQL_DROP);
return status;
}
/*
* This function is just a hack to get the oid of our Large Object oid type.
* If a real Large Object oid type is made part of Postgres, this function
* will go away and the define 'PG_TYPE_LO' will be updated.
*/
static SQLRETURN
CC_lookup_lo(ConnectionClass *self)
{
SQLRETURN ret = SQL_SUCCESS;
QResultClass *res;
MYLOG(0, "entering...\n");
res = CC_send_query(self, "select oid, typbasetype from pg_type where typname = '" PG_TYPE_LO_NAME "'",
NULL, READ_ONLY_QUERY, NULL);
if (!QR_command_maybe_successful(res))
ret = SQL_ERROR;
else if (QR_command_maybe_successful(res) && QR_get_num_cached_tuples(res) > 0)
{
OID basetype;
self->lobj_type = QR_get_value_backend_int(res, 0, 0, NULL);
basetype = QR_get_value_backend_int(res, 0, 1, NULL);
if (PG_TYPE_OID == basetype)
self->lo_is_domain = 1;
else if (0 != basetype)
self->lobj_type = 0;
}
QR_Destructor(res);
MYLOG(0, "Got the large object oid: %d\n", self->lobj_type);
return ret;
}
/*
* This function initializes the version of PostgreSQL from
* connInfo.protocol that we're connected to.
* h-inoue 01-2-2001
*/
void
CC_initialize_pg_version(ConnectionClass *self)
{
STRCPY_FIXED(self->pg_version, "7.4");
self->pg_version_major = 7;
self->pg_version_minor = 4;
}
void
CC_log_error(const char *func, const char *desc, const ConnectionClass *self)
{
#define NULLCHECK(a) (a ? a : "(NULL)")
if (self)
{
MYLOG(0, "CONN ERROR: func=%s, desc='%s', errnum=%d, errmsg='%s'\n", func, desc, self->__error_number, NULLCHECK(self->__error_message));
MYLOG(DETAIL_LOG_LEVEL, " ------------------------------------------------------------\n");
MYLOG(DETAIL_LOG_LEVEL, " henv=%p, conn=%p, status=%u, num_stmts=%d\n", self->henv, self, self->status, self->num_stmts);
MYLOG(DETAIL_LOG_LEVEL, " pqconn=%p, stmts=%p, lobj_type=%d\n", self->pqconn, self->stmts, self->lobj_type);
}
else
{
MYLOG(0, "INVALID CONNECTION HANDLE ERROR: func=%s, desc='%s'\n", func, desc);
}
}
/*
* This doesn't really return the CURRENT SCHEMA
* but there's no alternative.
*/
const char *
CC_get_current_schema(ConnectionClass *conn)
{
if (!conn->current_schema_valid)
{
QResultClass *res;
if (res = CC_send_query(conn, "select current_schema()", NULL, READ_ONLY_QUERY, NULL), QR_command_maybe_successful(res))
{
if (QR_get_num_total_tuples(res) == 1)
{
char *curschema = QR_get_value_backend_text(res, 0, 0);
if (curschema)
conn->current_schema = strdup(curschema);
}
if (conn->current_schema)
conn->current_schema_valid = TRUE;
}
QR_Destructor(res);
}
return (const char *) conn->current_schema;
}
int CC_mark_a_object_to_discard(ConnectionClass *conn, int type, const char *plan)
{
int cnt = conn->num_discardp + 1, plansize;
char *pname;
CC_REALLOC_return_with_error(conn->discardp, char *,
(cnt * sizeof(char *)), conn, "Couldn't alloc discardp.", -1);
plansize = strlen(plan) + 2;
CC_MALLOC_return_with_error(pname, char, plansize,
conn, "Couldn't alloc discardp mem.", -1);
pname[0] = (char) type; /* 's':prepared statement 'p':cursor */
strncpy_null(pname + 1, plan, plansize - 1);
conn->discardp[conn->num_discardp++] = pname;
return 1;
}
int CC_discard_marked_objects(ConnectionClass *conn)
{
int i, cnt;
QResultClass *res;
char *pname, cmd[64];
if ((cnt = conn->num_discardp) <= 0)
return 0;
for (i = cnt - 1; i >= 0; i--)
{
pname = conn->discardp[i];
if ('s' == pname[0])
SPRINTF_FIXED(cmd, "DEALLOCATE \"%s\"", pname + 1);
else
SPRINTF_FIXED(cmd, "CLOSE \"%s\"", pname + 1);
res = CC_send_query(conn, cmd, NULL, ROLLBACK_ON_ERROR | IGNORE_ABORT_ON_CONN | READ_ONLY_QUERY, NULL);
QR_Destructor(res);
free(conn->discardp[i]);
/* CodeDEX with CID=12044 */
conn->discardp[i] = NULL;
conn->num_discardp--;
}
return 1;
}
static void
LIBPQ_update_transaction_status(ConnectionClass *self)
{
if (!self->pqconn)
return;
switch (PQtransactionStatus(self->pqconn))
{
case PQTRANS_IDLE:
if (CC_is_in_trans(self))
{
if (CC_is_in_error_trans(self))
CC_on_abort(self, NO_TRANS);
else
CC_on_commit(self);
}
break;
case PQTRANS_INTRANS:
CC_set_in_trans(self);
if (CC_is_in_error_trans(self))
{
CC_set_no_error_trans(self);
CC_on_abort_partial(self);
}
break;
case PQTRANS_INERROR:
CC_set_in_trans(self);
CC_set_in_error_trans(self);
break;
case PQTRANS_ACTIVE:
/*
* A query is still executing. It might have already aborted,
* but all we know for sure is that we're in a transaction.
*/
CC_set_in_trans(self);
break;
default: /* unknown status */
break;
}
}
static void CC_getLibpath(char *libpath, int libpathLen)
{
const char *fname = NULL;
int fnameIndex = 0;
int libpathIndex = 0;
#ifndef WIN32
Dl_info dl_info;
int ret = 0;
if (libpath == NULL || libpathLen <= 0)
{
return ;
}
ret = dladdr((void*)CC_getLibpath, &dl_info);
if (ret != 0)
{
fname = dl_info.dli_fname;
}
#else
MEMORY_BASIC_INFORMATION mbi;
char fpath[4096] = {'\0'};
if (libpath == NULL || libpathLen <= 0)
{
return ;
}
if ((VirtualQuery(CC_getLibpath, &mbi, sizeof(mbi)) != 0) &&
GetModuleFileName((HMODULE)mbi.AllocationBase, fpath, sizeof(fpath)))
{
fname = fpath;
}
#endif
while ((fname != NULL) && (fname[fnameIndex] != '\0') && (libpathIndex < libpathLen - 1))
{
if (fname[fnameIndex] == '\'')
{
libpath[libpathIndex++] = '\'';
}
else if ((fname[fnameIndex] == '"') || (fname[fnameIndex] == '\\'))
{
libpath[libpathIndex++] = '\\';
}
libpath[libpathIndex++] = fname[fnameIndex++];
}
libpath[libpathIndex] = '\0';
}
static void CC_getOSUser(char *username, int usernameLen)
{
#ifndef WIN32
struct passwd *pw = NULL;
if (username == NULL || usernameLen <= 0)
{
return ;
}
pw = getpwuid(geteuid());
if (pw == NULL)
{
username[0] = '\0';
}
else
{
strncpy(username, pw->pw_name, strlen(pw->pw_name));
}
#else
DWORD len = usernameLen;
if (username == NULL || usernameLen <= 0)
{
return ;
}
if(!GetUserName(username, &len))
{
username[0] = '\0';
return ;
}
#endif
}
#define PROTOCOL3_OPTS_MAX 30
static int
LIBPQ_connect(ConnectionClass *self)
{
CSTR func = "LIBPQ_connect";
ConnInfo *ci = &(self->connInfo);
char ret = 0;
void *pqconn = NULL;
int pqret;
int pversion;
const char *opts[PROTOCOL3_OPTS_MAX], *vals[PROTOCOL3_OPTS_MAX];
PQconninfoOption *conninfoOption = NULL, *pqopt;
int i, cnt;
char login_timeout_str[20];
char keepalive_idle_str[20];
char keepalive_interval_str[20];
char *errmsg = NULL;
char local_conninfo[8192];
MYLOG(0, "connecting to the database using %s as the server and pqopt={%s}\n", self->connInfo.server, SAFE_NAME(ci->pqopt));
if (NULL == (conninfoOption = PQconninfoParse(SAFE_NAME(ci->pqopt), &errmsg)))
{
char emsg[200];
if (errmsg != NULL)
SPRINTF_FIXED(emsg, "libpq connection parameter error:%s", errmsg);
else
STRCPY_FIXED(emsg, "memory error? in PQconninfoParse");
CC_set_error(self, CONN_OPENDB_ERROR, emsg, func);
goto cleanup;
}
/* Build arrays of keywords & values, for PQconnectDBParams */
cnt = 0;
if (ci->server[0])
{
opts[cnt] = "host"; vals[cnt++] = ci->server;
}
if (ci->port[0])
{
opts[cnt] = "port"; vals[cnt++] = ci->port;
}
if (ci->database[0])
{
opts[cnt] = "dbname"; vals[cnt++] = ci->database;
}
if (ci->username[0])
{
opts[cnt] = "user"; vals[cnt++] = ci->username;
}
switch (ci->sslmode[0])
{
case '\0':
break;
case SSLLBYTE_VERIFY:
opts[cnt] = "sslmode";
switch (ci->sslmode[1])
{
case 'f':
vals[cnt++] = SSLMODE_VERIFY_FULL;
break;
case 'c':
vals[cnt++] = SSLMODE_VERIFY_CA;
break;
default:
vals[cnt++] = ci->sslmode;
}
break;
default:
opts[cnt] = "sslmode";
vals[cnt++] = ci->sslmode;
}
if (NAME_IS_VALID(ci->password))
{
opts[cnt] = "password"; vals[cnt++] = SAFE_NAME(ci->password);
}
if (ci->disable_keepalive)
{
opts[cnt] = "keepalives"; vals[cnt++] = "0";
}
if (self->login_timeout > 0)
{
SPRINTF_FIXED(login_timeout_str, "%u", (unsigned int) self->login_timeout);
opts[cnt] = "connect_timeout"; vals[cnt++] = login_timeout_str;
}
if (self->connInfo.keepalive_idle > 0)
{
ITOA_FIXED(keepalive_idle_str, self->connInfo.keepalive_idle);
opts[cnt] = "keepalives_idle"; vals[cnt++] = keepalive_idle_str;
}
if (self->connInfo.keepalive_interval > 0)
{
ITOA_FIXED(keepalive_interval_str, self->connInfo.keepalive_interval);
opts[cnt] = "keepalives_interval"; vals[cnt++] = keepalive_interval_str;
}
if ((odbcVersionString != NULL) && (odbcVersionString[0] != '\0'))
{
if (self->connInfo.connection_extra_info > 0)
{
char libpath[4096] = {'\0'};
char username[128] = {'\0'};
(void)CC_getLibpath(libpath, sizeof(libpath));
(void)CC_getOSUser(username, sizeof(username));
snprintf(local_conninfo, sizeof(local_conninfo),
"{\"driver_name\":\"ODBC\",\"driver_version\":\"%s\",\"driver_path\":\"%s\",\"os_user\":\"%s\"}",
odbcVersionString, libpath, username);
}
else
{
snprintf(local_conninfo, sizeof(local_conninfo),
"{\"driver_name\":\"ODBC\",\"driver_version\":\"%s\"}",
odbcVersionString);
}
opts[cnt] = "connection_info"; vals[cnt++] = local_conninfo;
}
opts[cnt] = "target_session_attrs";
vals[cnt++] = "primary";
if (conninfoOption != NULL)
{
const char *keyword, *val;
int j;
for (i = 0, pqopt = conninfoOption; (keyword = pqopt->keyword) != NULL; i++, pqopt++)
{
if ((val = pqopt->val) != NULL)
{
for (j = 0; j < cnt; j++)
{
if (stricmp(opts[j], keyword) == 0)
{
char emsg[100];
if (vals[j] != NULL && strcmp(vals[j], val) == 0)
continue;
SPRINTF_FIXED(emsg, "%s parameter in pqopt option conflicts with other ordinary option", keyword);
CC_set_error(self, CONN_OPENDB_ERROR, emsg, func);
goto cleanup;
}
}
if (j >= cnt && cnt < PROTOCOL3_OPTS_MAX - 1)
{
opts[cnt] = keyword; vals[cnt++] = val;
}
}
}
}
opts[cnt] = vals[cnt] = NULL;
/* Ok, we're all set to connect */
if (get_qlog() > 0 || get_mylog() > 0)
{
const char **popt, **pval;
const char* pwdKey = "password";
QLOG(0, "PQconnectdbParams:");
for (popt = opts, pval = vals; *popt; popt++, pval++) {
if (strcmp(pwdKey, *popt) == 0) {
QPRINTF(0, " %s='xxxxx'", *popt);
} else {
QPRINTF(0, " %s='%s'", *popt, *pval);
}
}
QPRINTF(0, "\n");
}
pqconn = PQconnectdbParams(opts, vals, FALSE);
if (!pqconn)
{
CC_set_error(self, CONN_OPENDB_ERROR, "PQconnectdb error", func);
goto cleanup;
}
self->pqconn = pqconn;
pqret = PQstatus(pqconn);
if (pqret == CONNECTION_BAD && PQconnectionNeedsPassword(pqconn))
{
const char *errmsg;
MYLOG(0, "password retry\n");
errmsg = PQerrorMessage(pqconn);
CC_set_error(self, CONNECTION_SERVER_NOT_REACHED, errmsg, func);
QLOG(0, "PQfinish: %p\n", pqconn);
PQfinish(pqconn);
self->pqconn = NULL;
self->connInfo.password_required = TRUE;
ret = -1;
goto cleanup;
}
if (CONNECTION_OK != pqret)
{
const char *errmsg;
MYLOG(DETAIL_LOG_LEVEL, "status=%d\n", pqret);
errmsg = PQerrorMessage(pqconn);
CC_set_error(self, CONNECTION_SERVER_NOT_REACHED, errmsg, func);
MYLOG(0, "Could not establish connection to the database; LIBPQ returned -> %s\n", errmsg);
goto cleanup;
}
if (PQpass(pqconn) && strlen(PQpass(pqconn)))
{
char *pwd = PQpass(pqconn);
memset(pwd, 0, strlen(pwd));
}
MYLOG(0, "libpq connection to the database established.(IP: %s)\n", PQhost(pqconn));
pversion = PQprotocolVersion(pqconn);
if (pversion < 3)
{
MYLOG(0, "Protocol version %d is not supported\n", pversion);
goto cleanup;
}
MYLOG(0, "protocol=%d\n", pversion);
pversion = PQserverVersion(pqconn);
self->pg_version_major = pversion / 10000;
self->pg_version_minor = (pversion % 10000) / 100;
SPRINTF_FIXED(self->pg_version, "%d.%d.%d", self->pg_version_major, self->pg_version_minor, pversion % 100);
MYLOG(0, "Server version=%s\n", self->pg_version);
if (!CC_get_username(self)[0])
{
MYLOG(0, "PQuser=%s\n", PQuser(pqconn));
STRCPY_FIXED(self->connInfo.username, PQuser(pqconn));
}
ret = 1;
cleanup:
if (errmsg != NULL)
free(errmsg);
PQconninfoFree(conninfoOption);
if (ret != 1)
{
if (self->pqconn)
{
QLOG(0, "PQfinish: %p\n", self->pqconn);
PQfinish(self->pqconn);
}
self->pqconn = NULL;
}
MYLOG(0, "leaving %d\n", ret);
return ret;
}
int
CC_send_cancel_request(const ConnectionClass *conn)
{
int ret = 0;
char errbuf[256];
void *cancel;
/* Check we have an open connection */
if (!conn || !conn->pqconn)
return FALSE;
cancel = PQgetCancel(conn->pqconn);
if (!cancel)
return FALSE;
ret = PQcancel(cancel, errbuf, sizeof(errbuf));
PQfreeCancel(cancel);
if (1 == ret)
return TRUE;
else
return FALSE;
}
const char *CurrCat(const ConnectionClass *conn)
{
/*
* Returning the database name causes problems in MS Query. It
* generates query like: "SELECT DISTINCT a FROM byronnbad3
* bad3"
*/
if (isMsQuery()) /* MS Query */
return NULL;
return conn->connInfo.database;
}
const char *CurrCatString(const ConnectionClass *conn)
{
const char *cat = CurrCat(conn);
if (!cat)
cat = NULL_STRING;
return cat;
}
/*------
* Create a null terminated lower-case string if the
* original string contains upper-case characters.
* The SQL_NTS length is considered.
*------
*/
SQLCHAR *
make_lstring_ifneeded(ConnectionClass *conn, const SQLCHAR *s, ssize_t len, BOOL ifallupper)
{
ssize_t length = len;
char *str = NULL;
const char *ccs = (const char *) s;
if (s && (len > 0 || (len == SQL_NTS && (length = strlen(ccs)) > 0)))
{
int i;
UCHAR tchar;
encoded_str encstr;
make_encoded_str(&encstr, conn, ccs);
for (i = 0; i < length; i++)
{
tchar = encoded_nextchar(&encstr);
if (MBCS_NON_ASCII(encstr))
continue;
if (ifallupper && islower(tchar))
{
if (str)
{
free(str);
str = NULL;
}
break;
}
if (tolower(tchar) != tchar)
{
if (!str)
{
str = malloc(length + 1);
if (!str) return NULL;
memcpy(str, s, length);
str[length] = '\0';
}
str[i] = tolower(tchar);
}
}
}
return (SQLCHAR *) str;
}
/*
* Concatenate a single formatted argument to a given buffer handling the SQL_NTS thing.
* "fmt" must contain somewhere in it the single form '%.*s'.
* This is heavily used in creating queries for info routines (SQLTables, SQLColumns).
* This routine could be modified to use vsprintf() to handle multiple arguments.
*/
static int
my_str(char *buf, int buflen, const char *fmt, const char *s, ssize_t len)
{
if (s && (len > 0 || (len == SQL_NTS && *s != 0)))
{
size_t length = (len > 0) ? len : strlen(s);
return snprintf(buf, buflen, fmt, length, s);
}
buf[0] = '\0';
return 0;
}
int
schema_str(char *buf, int buflen, const SQLCHAR *s, SQLLEN len, BOOL table_is_valid, ConnectionClass *conn)
{
CSTR fmt = "%.*s";
buf[0] = '\0';
if (!s || 0 == len)
{
/*
* Note that this driver assumes the implicit schema is
* the CURRENT_SCHEMA() though it doesn't worth the
* naming.
*/
if (table_is_valid)
return my_str(buf, buflen, fmt, CC_get_current_schema(conn), SQL_NTS);
return 0;
}
return my_str(buf, buflen, fmt, (char *) s, len);
}
static void
my_appendPQExpBuffer(PQExpBufferData *buf, const char *fmt, const char *s, ssize_t len)
{
if (s && (len > 0 || (len == SQL_NTS && *s != 0)))
{
size_t length = (len > 0) ? len : strlen(s);
appendPQExpBuffer(buf, fmt, length, s);
}
}
/*
* my_appendPQExpBuffer1 is a extension of my_appendPQExpBuffer.
* It can have 1 more parameter than my_aapendPQExpBuffer.
*/
static void
my_appendPQExpBuffer1(PQExpBufferData *buf, const char *fmt, const char *s1, const char *s)
{
if (s && s[0] != '\0')
{
ssize_t length = strlen(s);
if (s1)
appendPQExpBuffer(buf, fmt, s1, length, s);
else
appendPQExpBuffer(buf, fmt, length, s);
}
}
void
schema_appendPQExpBuffer(PQExpBufferData *buf, const char *fmt, const SQLCHAR *s, SQLLEN len, BOOL table_is_valid, ConnectionClass *conn)
{
if (!s || 0 == len)
{
/*
* Note that this driver assumes the implicit schema is
* the CURRENT_SCHEMA() though it doesn't worth the
* naming.
*/
if (table_is_valid)
my_appendPQExpBuffer(buf, fmt, CC_get_current_schema(conn), SQL_NTS);
return;
}
my_appendPQExpBuffer(buf, fmt, (char *) s, len);
}
void
schema_appendPQExpBuffer1(PQExpBufferData *buf, const char *fmt, const char *s1, const char *s, BOOL table_is_valid, ConnectionClass *conn)
{
if (!s || s[0] == '\0')
{
if (table_is_valid)
my_appendPQExpBuffer1(buf, fmt, s1, CC_get_current_schema(conn));
return;
}
my_appendPQExpBuffer1(buf, fmt, s1, s);
}
#ifdef _HANDLE_ENLIST_IN_DTC_
/*
* Export the following functions so that the pgenlist dll
* can handle ConnectionClass objects as opaque ones.
*/
#define _PGDTC_FUNCS_IMPLEMENT_
#include "connexp.h"
#define SYNC_AUTOCOMMIT(conn) \
(SQL_AUTOCOMMIT_OFF != conn->autocommit_public ? \
(conn->transact_status |= CONN_IN_AUTOCOMMIT) : \
(conn->transact_status &= ~CONN_IN_AUTOCOMMIT))
DLL_DECLARE void PgDtc_create_connect_string(void *self, char *connstr, int strsize)
{
ConnectionClass *conn = (ConnectionClass *) self;
ConnInfo *ci = &(conn->connInfo);
const char *drivername = ci->drivername;
char xaOptStr[32];
#if defined(_WIN32) && !defined(_WIN64)
/*
* If this is an x86 driver running on an x64 host then the driver name
* passed to MSDTC must be the (x64) driver but the client app will be
* using the 32-bit driver name. So MSDTC.exe will fail to find the driver
* and we'll fail to recover XA transactions.
*
* IsWow64Process(...) would be the ideal function for this, but is only
* available on Windows 6+ (Vista/2k8). We'd use GetNativeSystemInfo, which
* is supported on XP and 2k3, instead, but that won't link with older
* SDKs.
*
* It's impler to just test the PROCESSOR_ARCHITEW6432 environment
* variable.
*
* See http://www.postgresql.org/message-id/53A45B59.70303@2ndquadrant.com
* for details on this issue.
*/
const char * const procenv = getenv("PROCESSOR_ARCHITEW6432");
if (procenv != NULL && strcmp(procenv, "AMD64") == 0)
{
/*
* We're a 32-bit binary running under SysWow64 on a 64-bit host and need
* to pass a different driver name.
*
* To avoid playing memory management games, just return a different
* string constant depending on the unicode-ness of the driver.
*
* It probably doesn't matter whether we use the Unicode or ANSI driver
* for the XA transaction manager, but pick the same as the client driver
* to keep things as similar as possible.
*/
if (strcmp(drivername, DBMS_NAME) == 0)
#ifdef UNICODE_SUPPORT
drivername = DBMS_NAME_UNICODE"(x64)";
#else
drivername = DBMS_NAME_ANSI"(x64)";
#endif
}
#endif // _WIN32 && !_WIN64
if (0 >= ci->xa_opt) return;
switch (ci->xa_opt)
{
case DTC_CHECK_LINK_ONLY:
case DTC_CHECK_BEFORE_LINK:
SPRINTF_FIXED(xaOptStr, KEYWORD_DTC_CHECK "=0;");
break;
case DTC_CHECK_RM_CONNECTION:
SPRINTF_FIXED(xaOptStr, KEYWORD_DTC_CHECK "=1;");
break;
default:
*xaOptStr = '\0';
break;
}
snprintf(connstr, strsize,
"DRIVER={%s};%s"
"SERVER=%s;PORT=%s;DATABASE=%s;"
"UID=%s;PWD=%s;" ABBR_SSLMODE "=%s",
drivername, xaOptStr,
ci->server, ci->port, ci->database, ci->username,
SAFE_NAME(ci->password), ci->sslmode
);
return;
}
#define SECURITY_WIN32
#include <security.h>
DLL_DECLARE int PgDtc_is_recovery_available(void *self, char *reason, int rsize)
{
ConnectionClass *conn = (ConnectionClass *) self;
ConnInfo *ci = &(conn->connInfo);
int ret = -1; // inknown
LONG nameSize;
char loginUser[256];
BOOL outReason = FALSE;
BOOL doubtRootCert = TRUE, doubtCert = TRUE;
const char *delim;
/*
* Root certificate is used?
*/
if (NULL != reason &&
rsize > 0)
outReason = TRUE;
/*
* Root certificate is used?
*/
doubtRootCert = FALSE;
if (0 == stricmp(ci->sslmode, SSLMODE_VERIFY_CA) ||
0 == stricmp(ci->sslmode, SSLMODE_VERIFY_FULL))
{
if (outReason)
strncpy_null(reason, "sslmode verify-[ca|full]", rsize);
return 0;
}
/*
* Did we use SSL client certificate, SSPI, Kerberos or similar
* authentication methods?
* There seems no way to check it directly.
*/
doubtCert = FALSE;
if (PQgetssl(conn->pqconn) != NULL)
doubtCert = TRUE;
nameSize = sizeof(loginUser);
if (GetUserNameEx(NameUserPrincipal, loginUser, &nameSize))
{
MYLOG(0, "loginUser=%s\n", loginUser);
}
else
{
int err = GetLastError();
switch (err)
{
case ERROR_NONE_MAPPED:
MYLOG(0, "The user name is unavailable in the specified format\n");
break;
case ERROR_NO_SUCH_DOMAIN:
MYLOG(0, "The domain controller is unavailable to perform the lookup\n");
break;
case ERROR_MORE_DATA:
MYLOG(0, "The buffer is too small\n");
break;
default:
MYLOG(0, "GetUserNameEx error=%d\n", err);
break;
}
}
ret = 1;
if (outReason)
*reason = '\0';
delim = "";
if (doubtRootCert)
{
if (outReason)
snprintf(reason, rsize, "%s%ssslmode verify-[ca|full]", reason, delim);
delim = ", ";
ret = -1;
}
if (doubtCert)
{
if (outReason)
snprintf(reason, rsize, "%s%scertificate", reason, delim);
delim = ", ";
ret = -1;
}
return ret;
}
DLL_DECLARE void PgDtc_set_async(void *self, void *async)
{
ConnectionClass *conn = (ConnectionClass *) self;
if (!conn) return;
CONNLOCK_ACQUIRE(conn);
if (NULL != async)
CC_set_autocommit(conn, FALSE);
else
SYNC_AUTOCOMMIT(conn);
conn->asdum = async;
CONNLOCK_RELEASE(conn);
}
DLL_DECLARE void *PgDtc_get_async(void *self)
{
ConnectionClass *conn = (ConnectionClass *) self;
return conn->asdum;
}
DLL_DECLARE void PgDtc_set_property(void *self, int property, void *value)
{
ConnectionClass *conn = (ConnectionClass *) self;
CONNLOCK_ACQUIRE(conn);
switch (property)
{
case inprogress:
if (NULL != value)
CC_set_dtc_executing(conn);
else
CC_no_dtc_executing(conn);
break;
case enlisted:
if (NULL != value)
CC_set_dtc_enlisted(conn);
else
CC_no_dtc_enlisted(conn);
break;
case prepareRequested:
if (NULL != value)
CC_set_dtc_prepareRequested(conn);
else
CC_no_dtc_prepareRequested(conn);
break;
}
CONNLOCK_RELEASE(conn);
}
DLL_DECLARE void PgDtc_set_error(void *self, const char *message, const char *func)
{
ConnectionClass *conn = (ConnectionClass *) self;
CC_set_error(conn, CONN_UNSUPPORTED_OPTION, message, func);
}
DLL_DECLARE int PgDtc_get_property(void *self, int property)
{
ConnectionClass *conn = (ConnectionClass *) self;
int ret;
CONNLOCK_ACQUIRE(conn);
switch (property)
{
case inprogress:
ret = CC_is_dtc_executing(conn);
break;
case enlisted:
ret = CC_is_dtc_enlisted(conn);
break;
case inTrans:
ret = CC_is_in_trans(conn);
break;
case errorNumber:
ret = CC_get_errornumber(conn);
break;
case idleInGlobalTransaction:
ret = CC_is_idle_in_global_transaction(conn);
break;
case connected:
ret = (CONN_CONNECTED == conn->status);
break;
case prepareRequested:
ret = CC_is_dtc_prepareRequested(conn);
break;
}
CONNLOCK_RELEASE(conn);
return ret;
}
DLL_DECLARE BOOL PgDtc_connect(void *self)
{
CSTR func = "PgDtc_connect";
ConnectionClass *conn = (ConnectionClass *) self;
if (CONN_CONNECTED == conn->status)
return TRUE;
if (CC_connect(conn, NULL) <= 0)
{
/* Error messages are filled in */
CC_log_error(func, "Error on CC_connect", conn);
return FALSE;
}
return TRUE;
}
DLL_DECLARE void PgDtc_free_connect(void *self)
{
ConnectionClass *conn = (ConnectionClass *) self;
PGAPI_FreeConnect(conn);
}
DLL_DECLARE BOOL PgDtc_one_phase_operation(void *self, int operation)
{
ConnectionClass *conn = (ConnectionClass *) self;
BOOL ret, is_in_progress = CC_is_dtc_executing(conn);
if (!is_in_progress)
CC_set_dtc_executing(conn);
switch (operation)
{
case ONE_PHASE_COMMIT:
ret = CC_commit(conn);
break;
default:
ret = CC_abort(conn);
break;
}
if (!is_in_progress)
CC_no_dtc_executing(conn);
return ret;
}
DLL_DECLARE BOOL
PgDtc_two_phase_operation(void *self, int operation, const char *gxid)
{
ConnectionClass *conn = (ConnectionClass *) self;
QResultClass *qres;
BOOL ret = TRUE;
char cmd[512];
switch (operation)
{
case PREPARE_TRANSACTION:
SPRINTF_FIXED(cmd, "PREPARE TRANSACTION '%s'", gxid);
break;
case COMMIT_PREPARED:
SPRINTF_FIXED(cmd, "COMMIT PREPARED '%s'", gxid);
break;
case ROLLBACK_PREPARED:
SPRINTF_FIXED(cmd, "ROLLBACK PREPARED '%s'", gxid);
break;
}
qres = CC_send_query(conn, cmd, NULL, 0, NULL);
if (!QR_command_maybe_successful(qres))
ret = FALSE;
QR_Destructor(qres);
return ret;
}
DLL_DECLARE BOOL
PgDtc_lock_cntrl(void *self, BOOL acquire, BOOL bTrial)
{
ConnectionClass *conn = (ConnectionClass *) self;
BOOL ret = TRUE;
if (acquire)
if (bTrial)
ret = TRY_ENTER_CONN_CS(conn);
else
ENTER_CONN_CS(conn);
else
LEAVE_CONN_CS(conn);
return ret;
}
static ConnectionClass *
CC_Copy(const ConnectionClass *conn)
{
ConnectionClass *newconn = CC_alloc();
if (newconn)
{
memcpy(newconn, conn, sizeof(ConnectionClass));
CC_lockinit(newconn);
}
return newconn;
}
DLL_DECLARE void *
PgDtc_isolate(void *self, DWORD option)
{
BOOL disposingConn = (0 != (disposingConnection & option));
ConnectionClass *sconn = (ConnectionClass *) self, *newconn = NULL;
if (0 == (useAnotherRoom & option))
{
HENV henv = sconn->henv;
CC_cleanup(sconn, TRUE);
if (newconn = CC_Copy(sconn), NULL == newconn)
return newconn;
MYLOG(0, "newconn=%p from %p\n", newconn, sconn);
CC_initialize(sconn, FALSE);
if (!disposingConn)
CC_copy_conninfo(&sconn->connInfo, &newconn->connInfo);
CC_initialize_pg_version(sconn);
sconn->henv = henv;
newconn->henv = NULL;
SYNC_AUTOCOMMIT(sconn);
return newconn;
}
newconn = CC_Constructor();
if (!newconn) return NULL;
CC_copy_conninfo(&newconn->connInfo, &sconn->connInfo);
CC_initialize_pg_version(newconn);
newconn->asdum = sconn->asdum;
newconn->gTranInfo = sconn->gTranInfo;
CC_set_dtc_isolated(newconn);
sconn->asdum = NULL;
SYNC_AUTOCOMMIT(sconn);
CC_set_dtc_clear(sconn);
MYLOG(0, "generated connection=%p with %p\n", newconn, newconn->asdum);
return newconn;
}
#endif /* _HANDLE_ENLIST_IN_DTC_ */
BOOL
CC_set_transact(ConnectionClass *self, UInt4 isolation)
{
char *query;
QResultClass *res;
BOOL bShow = FALSE;
if (PG_VERSION_LT(self, 8.0) &&
(isolation == SQL_TXN_READ_UNCOMMITTED ||
isolation == SQL_TXN_REPEATABLE_READ))
{
CC_set_error(self, CONN_NOT_IMPLEMENTED_ERROR, "READ_UNCOMMITTED or REPEATABLE_READ is not supported by the server", __FUNCTION__);
return FALSE;
}
switch (isolation)
{
case SQL_TXN_SERIALIZABLE:
query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE";
break;
case SQL_TXN_REPEATABLE_READ:
query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ";
break;
case SQL_TXN_READ_UNCOMMITTED:
query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ UNCOMMITTED";
break;
default:
query = "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED";
break;
}
if (self->default_isolation == 0)
bShow = TRUE;
if (bShow)
res = CC_send_query_append(self, ISOLATION_SHOW_QUERY, NULL, READ_ONLY_QUERY, NULL, query);
else
res = CC_send_query(self, query, NULL, READ_ONLY_QUERY, NULL);
if (!QR_command_maybe_successful(res))
{
CC_set_error(self, CONN_EXEC_ERROR, "ISOLATION change request to the server error", __FUNCTION__);
QR_Destructor(res);
return FALSE;
}
if (bShow)
handle_show_results(res);
QR_Destructor(res);
self->server_isolation = isolation;
return TRUE;
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/b_127/openGauss-connector-odbc.git
git@gitee.com:b_127/openGauss-connector-odbc.git
b_127
openGauss-connector-odbc
openGauss-connector-odbc
master

搜索帮助