1 Star 0 Fork 346

低调/swoole-src

forked from swoole/swoole-src 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
swoole_postgresql_coro.c 33.33 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058
/*
+----------------------------------------------------------------------+
| Swoole |
+----------------------------------------------------------------------+
| This source file is subject to version 2.0 of the Apache license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.apache.org/licenses/LICENSE-2.0.html |
| If you did not receive a copy of the Apache2.0 license and are unable|
| to obtain it through the world-wide-web, please send a note to |
| license@swoole.com so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: Zhenyu Wu <936321732@qq.com> |
+----------------------------------------------------------------------+
*/
#include "php_swoole.h"
#ifdef SW_COROUTINE
#include "swoole_coroutine.h"
#ifdef SW_USE_POSTGRESQL
#include "swoole_postgresql_coro.h"
#include "swoole_coroutine.h"
static PHP_METHOD(swoole_postgresql_coro, __construct);
static PHP_METHOD(swoole_postgresql_coro, __destruct);
static PHP_METHOD(swoole_postgresql_coro, connect);
static PHP_METHOD(swoole_postgresql_coro, query);
static PHP_METHOD(swoole_postgresql_coro, fetchAll);
static PHP_METHOD(swoole_postgresql_coro, affectedRows);
static PHP_METHOD(swoole_postgresql_coro, numRows);
static PHP_METHOD(swoole_postgresql_coro,metaData);
static PHP_METHOD(swoole_postgresql_coro,fetchObject);
static PHP_METHOD(swoole_postgresql_coro,fetchAssoc);
static PHP_METHOD(swoole_postgresql_coro,fetchArray);
static PHP_METHOD(swoole_postgresql_coro,fetchRow);
static void php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAMETERS, zend_long result_type, int into_object);
static void _destroy_pgsql_link(zend_resource *rsrc);
static void _free_result(zend_resource *rsrc);
static int swoole_pgsql_coro_onRead(swReactor *reactor, swEvent *event);
static int swoole_pgsql_coro_onWrite(swReactor *reactor, swEvent *event);
static int swoole_pgsql_coro_onError(swReactor *reactor, swEvent *event);
int php_pgsql_result2array(PGresult *pg_result, zval *ret_array, long result_type);
static int swoole_postgresql_coro_close(pg_object *pg_object);
static int query_result_parse(pg_object *pg_object);
static int meta_data_result_parse(pg_object *pg_object);
ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_void, 0, 0, 0)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_connect, 0, 0, -1)
ZEND_ARG_INFO(0, conninfo)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_query, 0, 0, 0)
ZEND_ARG_INFO(0, connection)
ZEND_ARG_INFO(0, query)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_fetch_all, 0, 0, 0)
ZEND_ARG_INFO(0, result)
ZEND_ARG_INFO(0, result_type)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_affected_rows, 0, 0, 0)
ZEND_ARG_INFO(0, result)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_num_rows, 0, 0, 0)
ZEND_ARG_INFO(0, result)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_meta_data, 0, 0, 2)
ZEND_ARG_INFO(0, connection)
ZEND_ARG_INFO(0, table_name)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_fetch_row, 0, 0, 1)
ZEND_ARG_INFO(0, result)
ZEND_ARG_INFO(0, row)
ZEND_ARG_INFO(0, result_type)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_fetch_assoc, 0, 0, 1)
ZEND_ARG_INFO(0, result)
ZEND_ARG_INFO(0, row)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_fetch_array, 0, 0, 1)
ZEND_ARG_INFO(0, result)
ZEND_ARG_INFO(0, row)
ZEND_ARG_INFO(0, result_type)
ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_INFO_EX(arginfo_pg_fetch_object, 0, 0, 1)
ZEND_ARG_INFO(0, result)
ZEND_ARG_INFO(0, row)
ZEND_ARG_INFO(0, class_name)
ZEND_ARG_INFO(0, l)
ZEND_ARG_INFO(0, ctor_params)
ZEND_END_ARG_INFO()
static const zend_function_entry swoole_postgresql_coro_methods[] =
{
PHP_ME(swoole_postgresql_coro, __construct, arginfo_swoole_void, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
PHP_ME(swoole_postgresql_coro, connect, arginfo_pg_connect, ZEND_ACC_PUBLIC)
PHP_ME(swoole_postgresql_coro, query, arginfo_pg_query, ZEND_ACC_PUBLIC )
PHP_ME(swoole_postgresql_coro, fetchAll, arginfo_pg_fetch_all, ZEND_ACC_PUBLIC)
PHP_ME(swoole_postgresql_coro, affectedRows, arginfo_pg_affected_rows, ZEND_ACC_PUBLIC)
PHP_ME(swoole_postgresql_coro, numRows, arginfo_pg_num_rows, ZEND_ACC_PUBLIC)
PHP_ME(swoole_postgresql_coro, metaData, arginfo_pg_meta_data, ZEND_ACC_PUBLIC)
PHP_ME(swoole_postgresql_coro, fetchObject, arginfo_pg_fetch_object, ZEND_ACC_PUBLIC)
PHP_ME(swoole_postgresql_coro, fetchAssoc, arginfo_pg_fetch_assoc, ZEND_ACC_PUBLIC)
PHP_ME(swoole_postgresql_coro, fetchArray, arginfo_pg_fetch_array, ZEND_ACC_PUBLIC)
PHP_ME(swoole_postgresql_coro, fetchRow, arginfo_pg_fetch_row, ZEND_ACC_PUBLIC)
PHP_ME(swoole_postgresql_coro, __destruct, arginfo_swoole_void, ZEND_ACC_PUBLIC | ZEND_ACC_DTOR)
PHP_FE_END
};
static zend_class_entry swoole_postgresql_coro_ce;
static zend_class_entry *swoole_postgresql_coro_class_entry_ptr;
static int le_link , le_result;
void swoole_postgresql_coro_init(int module_number TSRMLS_DC)
{
INIT_CLASS_ENTRY(swoole_postgresql_coro_ce, "Swoole\\Coroutine\\PostgreSQL", swoole_postgresql_coro_methods);
le_link = zend_register_list_destructors_ex(_destroy_pgsql_link, NULL, "pgsql link", module_number);
le_result = zend_register_list_destructors_ex(_free_result, NULL, "pgsql result", module_number);
swoole_postgresql_coro_class_entry_ptr = zend_register_internal_class(&swoole_postgresql_coro_ce TSRMLS_CC);
if (SWOOLE_G(use_shortname))
{
sw_zend_register_class_alias("Co\\PostgreSQL", swoole_postgresql_coro_class_entry_ptr);
}
REGISTER_LONG_CONSTANT("SW_PGSQL_ASSOC", PGSQL_ASSOC, CONST_CS | CONST_PERSISTENT);
REGISTER_LONG_CONSTANT("SW_PGSQL_NUM", PGSQL_NUM, CONST_CS | CONST_PERSISTENT);
REGISTER_LONG_CONSTANT("SW_PGSQL_BOTH", PGSQL_BOTH, CONST_CS | CONST_PERSISTENT);
}
static PHP_METHOD(swoole_postgresql_coro, __construct)
{
coro_check(TSRMLS_C);
pg_object *pg;
pg = emalloc(sizeof(pg_object));
bzero(pg,sizeof(pg_object));
pg -> object = getThis();
swoole_set_object(getThis(), pg);
}
static PHP_METHOD(swoole_postgresql_coro, connect)
{
zval *conninfo;
PGconn * pgsql;
ZEND_PARSE_PARAMETERS_START(1, 1)
Z_PARAM_ZVAL(conninfo)
ZEND_PARSE_PARAMETERS_END();
pgsql = PQconnectStart(Z_STRVAL_P(conninfo));
int fd = PQsocket(pgsql);
php_swoole_check_reactor();
if (!swReactor_handle_isset(SwooleG.main_reactor, PHP_SWOOLE_FD_POSTGRESQL))
{
SwooleG.main_reactor->setHandle(SwooleG.main_reactor, PHP_SWOOLE_FD_POSTGRESQL | SW_EVENT_READ, swoole_pgsql_coro_onRead);
SwooleG.main_reactor->setHandle(SwooleG.main_reactor, PHP_SWOOLE_FD_POSTGRESQL | SW_EVENT_WRITE, swoole_pgsql_coro_onWrite);
SwooleG.main_reactor->setHandle(SwooleG.main_reactor, PHP_SWOOLE_FD_POSTGRESQL | SW_EVENT_ERROR, swoole_pgsql_coro_onError);
}
if (SwooleG.main_reactor->add(SwooleG.main_reactor, fd, PHP_SWOOLE_FD_POSTGRESQL | SW_EVENT_WRITE) < 0)
{
swoole_php_fatal_error(E_WARNING, "swoole_event_add failed.");
RETURN_FALSE;
}
pg_object *pg_object = swoole_get_object(getThis());
pg_object->fd = fd;
pg_object->conn = pgsql;
pg_object->status = CONNECTION_STARTED;
PQsetnonblocking(pgsql , 1);
if (pgsql==NULL || PQstatus(pgsql)==CONNECTION_BAD)
{
swWarn("Unable to connect to PostgreSQL server: [%s]",pgsql);
if (pgsql)
{
PQfinish(pgsql);
}
RETURN_FALSE;
}
swConnection *_socket = swReactor_get(SwooleG.main_reactor, fd);
_socket->object = pg_object;
_socket->active = 0;
php_context *sw_current_context = swoole_get_property(getThis(), 0);
if (!sw_current_context)
{
sw_current_context = emalloc(sizeof(php_context));
swoole_set_property(getThis(), 0, sw_current_context);
}
sw_current_context->state = SW_CORO_CONTEXT_RUNNING;
sw_current_context->onTimeout = NULL;
#if PHP_MAJOR_VERSION < 7
sw_current_context->coro_params = getThis();
#else
sw_current_context->coro_params = *getThis();
#endif
//TODO: add the timeout
/*
if (pg_object->timeout > 0)
{
php_swoole_check_timer((int) (ph_object->timeout * 1000));
pg_object->timer = SwooleG.timer.add(&SwooleG.timer, (int) (redis->timeout * 1000), 0, sw_current_context, swoole_pgsql_coro_onTimeout);
}*/
coro_save(sw_current_context);
coro_yield();
}
static int swoole_pgsql_coro_onWrite(swReactor *reactor, swEvent *event)
{
char *errMsg;
#if PHP_MAJOR_VERSION < 7
TSRMLS_FETCH_FROM_CTX(sw_thread_ctx ? sw_thread_ctx : NULL);
#endif
if (event->socket->active)
{
return swReactor_onWrite(SwooleG.main_reactor, event);
}
socklen_t len = sizeof(SwooleG.error);
if (getsockopt(event->fd, SOL_SOCKET, SO_ERROR, &SwooleG.error, &len) < 0)
{
swWarn("getsockopt(%d) failed. Error: %s[%d]", event->fd, strerror(errno), errno);
return SW_ERR;
}
pg_object *pg_object = event->socket->object;
// wait the connection ok
ConnStatusType status = PQstatus(pg_object->conn);
if(status != CONNECTION_OK){
PostgresPollingStatusType flag = PGRES_POLLING_WRITING;
for (;;)
{
switch (flag)
{
case PGRES_POLLING_OK:
break;
case PGRES_POLLING_READING:
break;
case PGRES_POLLING_WRITING:
break;
case PGRES_POLLING_FAILED:
errMsg = PQerrorMessage(pg_object->conn);
php_printf("error:%s",errMsg);
break;
default:
break;
}
flag = PQconnectPoll(pg_object->conn);
if(flag == PGRES_POLLING_OK )
{
break;
}
if(flag == PGRES_POLLING_FAILED )
{
errMsg = PQerrorMessage(pg_object->conn);
swWarn("error:[%s] please cofirm that the connection configuration is correct \n",errMsg);
break;
}
}
}
//listen read event
SwooleG.main_reactor->set(SwooleG.main_reactor, event->fd, PHP_SWOOLE_FD_POSTGRESQL | SW_EVENT_READ);
//connected
event->socket->active = 1;
php_context *sw_current_context = swoole_get_property(pg_object->object, 0);
zval *retval = NULL;
zval return_value;
ZVAL_RES(&return_value, zend_register_resource(pg_object->conn, le_link));
int ret = coro_resume(sw_current_context, &return_value, &retval);
if (ret == CORO_END && retval)
{
sw_zval_ptr_dtor(&retval);
}
return SW_OK;
}
static int swoole_pgsql_coro_onRead(swReactor *reactor, swEvent *event)
{
pg_object *pg_object = (event->socket->object);
#if PHP_MAJOR_VERSION < 7
TSRMLS_FETCH_FROM_CTX(sw_thread_ctx ? sw_thread_ctx : NULL);
#endif
switch (pg_object->request_type)
{
case NORMAL_QUERY:
query_result_parse(pg_object);
break;
case META_DATA:
meta_data_result_parse(pg_object);
break;
}
return SW_OK;
}
static int meta_data_result_parse(pg_object *pg_object)
{
int i, num_rows;
zval elem;
PGresult *pg_result;
zend_bool extended=0;
pg_result =PQgetResult(pg_object->conn);
if (PQresultStatus(pg_result) != PGRES_TUPLES_OK || (num_rows = PQntuples(pg_result)) == 0)
{
php_error_docref(NULL, E_WARNING, "Table doesn't exists");
return 0;
}
zval return_value;
array_init(&return_value);
zval * retval = NULL;
array_init(&elem);
for (i = 0; i < num_rows; i++)
{
pg_object->result = pg_result;
char *name;
/* pg_attribute.attnum */
add_assoc_long_ex(&elem, "num", sizeof("num") - 1, atoi(PQgetvalue(pg_result, i, 1)));
/* pg_type.typname */
add_assoc_string_ex(&elem, "type", sizeof("type") - 1, PQgetvalue(pg_result, i, 2));
/* pg_attribute.attlen */
add_assoc_long_ex(&elem, "len", sizeof("len") - 1, atoi(PQgetvalue(pg_result, i, 3)));
/* pg_attribute.attnonull */
add_assoc_bool_ex(&elem, "not null", sizeof("not null") - 1, !strcmp(PQgetvalue(pg_result, i, 4), "t"));
/* pg_attribute.atthasdef */
add_assoc_bool_ex(&elem, "has default", sizeof("has default") - 1,
!strcmp(PQgetvalue(pg_result, i, 5), "t"));
/* pg_attribute.attndims */
add_assoc_long_ex(&elem, "array dims", sizeof("array dims") - 1, atoi(PQgetvalue(pg_result, i, 6)));
/* pg_type.typtype */
add_assoc_bool_ex(&elem, "is enum", sizeof("is enum") - 1, !strcmp(PQgetvalue(pg_result, i, 7), "e"));
if (extended) {
/* pg_type.typtype */
add_assoc_bool_ex(&elem, "is base", sizeof("is base") - 1, !strcmp(PQgetvalue(pg_result, i, 7), "b"));
add_assoc_bool_ex(&elem, "is composite", sizeof("is composite") - 1,
!strcmp(PQgetvalue(pg_result, i, 7), "c"));
add_assoc_bool_ex(&elem, "is pesudo", sizeof("is pesudo") - 1,
!strcmp(PQgetvalue(pg_result, i, 7), "p"));
/* pg_description.description */
add_assoc_string_ex(&elem, "description", sizeof("description") - 1, PQgetvalue(pg_result, i, 8));
}
/* pg_attribute.attname */
name = PQgetvalue(pg_result, i, 0);
add_assoc_zval(&return_value, name, &elem);
}
php_context *sw_current_context = swoole_get_property(pg_object->object, 0);
int ret = coro_resume(sw_current_context, &return_value, &retval);
if (ret == CORO_END && retval)
{
sw_zval_ptr_dtor(&retval);
}
zval_ptr_dtor(&return_value);
zval_ptr_dtor(&elem);
return SW_OK;
}
static int query_result_parse(pg_object *pg_object)
{
PGresult *pgsql_result;
ExecStatusType status;
int error = 0;
char *errMsg;
int ret, res;
zval *retval = NULL;
zval return_value;
php_context *sw_current_context = swoole_get_property(pg_object->object, 0);
pgsql_result =PQgetResult(pg_object->conn);
status = PQresultStatus(pgsql_result);
switch (status) {
case PGRES_EMPTY_QUERY:
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
errMsg = PQerrorMessage(pg_object->conn);
swWarn("Query failed: [%s]",errMsg);
PQclear(pgsql_result);
ZVAL_FALSE(&return_value);
ret = coro_resume(sw_current_context, &return_value, &retval);
if (ret == CORO_END && retval)
{
sw_zval_ptr_dtor(&retval);
}
swoole_postgresql_coro_close(pg_object);
break;
case PGRES_COMMAND_OK: /* successful command that did not return rows */
default:
pg_object->result = pgsql_result;
pg_object->row = 0;
/* Wait to finish sending buffer */
res = PQflush(pg_object->conn);
ZVAL_RES(&return_value, zend_register_resource(pg_object, le_result));
ret = coro_resume(sw_current_context, &return_value, &retval);
if (ret == CORO_END && retval)
{
sw_zval_ptr_dtor(&retval);
}
PQclear(pgsql_result);
if (error != 0)
{
swoole_php_fatal_error(E_WARNING, "swoole_event->onError[1]: socket error. Error: %s [%d]", strerror(error), error);
}
break;
}
return SW_OK;
}
static PHP_METHOD(swoole_postgresql_coro, query)
{
zval *pgsql_link = NULL;
zval *query;
PGconn *pgsql;
PGresult *pgsql_result;
ZEND_PARSE_PARAMETERS_START(2,2)
Z_PARAM_RESOURCE(pgsql_link)
Z_PARAM_ZVAL(query)
ZEND_PARSE_PARAMETERS_END();
pgsql = (PGconn *)zend_fetch_resource(Z_RES_P(pgsql_link), "postgresql connection", le_link);
pg_object *pg_object = swoole_get_object(getThis());
pg_object->request_type = NORMAL_QUERY;
while ((pgsql_result = PQgetResult(pgsql)))
{
PQclear(pgsql_result);
}
int ret = PQsendQuery(pgsql, Z_STRVAL_P(query));
if(ret == 0)
{
char * errMsg = PQerrorMessage(pgsql);
php_printf("error:%s",errMsg);
}
php_context *sw_current_context = swoole_get_property(getThis(), 0);
sw_current_context->state = SW_CORO_CONTEXT_RUNNING;
sw_current_context->onTimeout = NULL;
#if PHP_MAJOR_VERSION < 7
sw_current_context->coro_params = getThis();
#else
sw_current_context->coro_params = *getThis();
#endif
//TODO: add the timeout
/*
if (pg_object->timeout > 0)
{
php_swoole_check_timer((int) (ph_object->timeout * 1000));
pg_object->timer = SwooleG.timer.add(&SwooleG.timer, (int) (redis->timeout * 1000), 0, sw_current_context, swoole_pgsql_coro_onTimeout);
}*/
coro_save(sw_current_context);
coro_yield();
}
/* {{{ php_pgsql_result2array
*/
int swoole_pgsql_result2array(PGresult *pg_result, zval *ret_array, long result_type)
{
zval row;
char *field_name;
size_t num_fields;
int pg_numrows, pg_row;
uint32_t i;
assert(Z_TYPE_P(ret_array) == IS_ARRAY);
if ((pg_numrows = PQntuples(pg_result)) <= 0)
{
return FAILURE;
}
for (pg_row = 0; pg_row < pg_numrows; pg_row++)
{
array_init(&row);
for (i = 0, num_fields = PQnfields(pg_result); i < num_fields; i++)
{
field_name = PQfname(pg_result, i);
if (PQgetisnull(pg_result, pg_row, i))
{
if (result_type & PGSQL_ASSOC)
{
add_assoc_null(&row, field_name);
}
if (result_type & PGSQL_NUM)
{
add_next_index_null(&row);
}
}
else
{
char *element = PQgetvalue(pg_result, pg_row, i);
if (element)
{
const size_t element_len = strlen(element);
if (result_type & PGSQL_ASSOC)
{
add_assoc_stringl(&row, field_name, element, element_len);
}
if (result_type & PGSQL_NUM)
{
add_next_index_stringl(&row, element, element_len);
}
}
}
}
add_index_zval(ret_array, pg_row, &row);
}
return SUCCESS;
}
static PHP_METHOD(swoole_postgresql_coro, fetchAll)
{
zval *result;
PGresult *pgsql_result;
pg_object *object;
zend_long result_type = PGSQL_ASSOC;
ZEND_PARSE_PARAMETERS_START(1,2)
Z_PARAM_RESOURCE(result)
Z_PARAM_OPTIONAL
Z_PARAM_LONG(result_type)
ZEND_PARSE_PARAMETERS_END();
if ((object = (pg_object *)zend_fetch_resource(Z_RES_P(result), "PostgreSQL result", le_result)) == NULL)
{
RETURN_FALSE;
}
pgsql_result = object->result;
array_init(return_value);
if (swoole_pgsql_result2array(pgsql_result, return_value, result_type) == FAILURE)
{
zval_dtor(return_value);
RETURN_FALSE;
}
}
static PHP_METHOD(swoole_postgresql_coro,affectedRows)
{
zval *result;
PGresult *pgsql_result;
pg_object *object;
ZEND_PARSE_PARAMETERS_START(1,1)
Z_PARAM_RESOURCE(result)
ZEND_PARSE_PARAMETERS_END();
if ((object = (pg_object *)zend_fetch_resource(Z_RES_P(result), "PostgreSQL result", le_result)) == NULL)
{
RETURN_FALSE;
}
pgsql_result = object->result;
RETVAL_LONG(atoi(PQcmdTuples(pgsql_result)));
}
//query's num
static PHP_METHOD(swoole_postgresql_coro,numRows)
{
zval *result;
PGresult *pgsql_result;
pg_object *object;
ZEND_PARSE_PARAMETERS_START(1,1)
Z_PARAM_RESOURCE(result)
ZEND_PARSE_PARAMETERS_END();
if ((object = (pg_object *)zend_fetch_resource(Z_RES_P(result), "PostgreSQL result", le_result)) == NULL)
{
RETURN_FALSE;
}
pgsql_result = object->result;
RETVAL_LONG(PQntuples(pgsql_result));
}
static PHP_METHOD(swoole_postgresql_coro,metaData)
{
zval *pgsql_link;
char *table_name;
size_t table_name_len;
zend_bool extended=0;
PGconn *pgsql;
PGresult *pg_result;
char *src, *tmp_name, *tmp_name2 = NULL;
char *escaped;
smart_str querystr = {0};
size_t new_len;
ZEND_PARSE_PARAMETERS_START(2,2)
Z_PARAM_RESOURCE(pgsql_link)
Z_PARAM_STRING(table_name, table_name_len)
ZEND_PARSE_PARAMETERS_END();
pgsql = (PGconn *)zend_fetch_resource(Z_RES_P(pgsql_link), "postgresql connection", le_link);
while ((pg_result = PQgetResult(pgsql)))
{
PQclear(pg_result);
}
if (!*table_name)
{
php_error_docref(NULL, E_WARNING, "The table name must be specified");
RETURN_FALSE;
}
src = estrdup(table_name);
tmp_name = php_strtok_r(src, ".", &tmp_name2);
if (!tmp_name)
{
efree(src);
php_error_docref(NULL, E_WARNING, "The table name must be specified");
RETURN_FALSE;
}
if (!tmp_name2 || !*tmp_name2)
{
/* Default schema */
tmp_name2 = tmp_name;
tmp_name = "public";
}
if (extended)
{
smart_str_appends(&querystr,
"SELECT a.attname, a.attnum, t.typname, a.attlen, a.attnotNULL, a.atthasdef, a.attndims, t.typtype, "
"d.description "
"FROM pg_class as c "
" JOIN pg_attribute a ON (a.attrelid = c.oid) "
" JOIN pg_type t ON (a.atttypid = t.oid) "
" JOIN pg_namespace n ON (c.relnamespace = n.oid) "
" LEFT JOIN pg_description d ON (d.objoid=a.attrelid AND d.objsubid=a.attnum AND c.oid=d.objoid) "
"WHERE a.attnum > 0 AND c.relname = '");
}
else
{
smart_str_appends(&querystr,
"SELECT a.attname, a.attnum, t.typname, a.attlen, a.attnotnull, a.atthasdef, a.attndims, t.typtype "
"FROM pg_class as c "
" JOIN pg_attribute a ON (a.attrelid = c.oid) "
" JOIN pg_type t ON (a.atttypid = t.oid) "
" JOIN pg_namespace n ON (c.relnamespace = n.oid) "
"WHERE a.attnum > 0 AND c.relname = '");
}
escaped = (char *)safe_emalloc(strlen(tmp_name2), 2, 1);
new_len = PQescapeStringConn(pgsql, escaped, tmp_name2, strlen(tmp_name2), NULL);
if (new_len)
{
smart_str_appendl(&querystr, escaped, new_len);
}
efree(escaped);
smart_str_appends(&querystr, "' AND n.nspname = '");
escaped = (char *)safe_emalloc(strlen(tmp_name), 2, 1);
new_len = PQescapeStringConn(pgsql, escaped, tmp_name, strlen(tmp_name), NULL);
if (new_len)
{
smart_str_appendl(&querystr, escaped, new_len);
}
efree(escaped);
smart_str_appends(&querystr, "' ORDER BY a.attnum;");
smart_str_0(&querystr);
efree(src);
//pg_result = PQexec(pgsql, ZSTR_VAL(querystr.s));
pg_object *pg_object = swoole_get_object(getThis());
pg_object->request_type = META_DATA;
int ret = PQsendQuery(pgsql, ZSTR_VAL(querystr.s));
if(ret == 0)
{
char * errMsg = PQerrorMessage(pgsql);
php_printf("error:%s",errMsg);
}
smart_str_free(&querystr);
php_context *sw_current_context = swoole_get_property(getThis(), 0);
sw_current_context->state = SW_CORO_CONTEXT_RUNNING;
sw_current_context->onTimeout = NULL;
#if PHP_MAJOR_VERSION < 7
sw_current_context->coro_params = getThis();
#else
sw_current_context->coro_params = *getThis();
#endif
/*
if (redis->timeout > 0)
{
php_swoole_check_timer((int) (redis->timeout * 1000));
redis->timer = SwooleG.timer.add(&SwooleG.timer, (int) (redis->timeout * 1000), 0, sw_current_context, swoole_redis_coro_onTimeout);
}*/
zval_ptr_dtor(pgsql_link);
coro_save(sw_current_context);
coro_yield();
}
/* {{{ void php_pgsql_fetch_hash */
static void php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAMETERS, zend_long result_type, int into_object)
{
zval *result, *zrow = NULL;
PGresult *pgsql_result;
pg_object *pg_result;
int i, num_fields, pgsql_row, use_row;
zend_long row = -1;
char *field_name;
zval *ctor_params = NULL;
zend_class_entry *ce = NULL;
if (into_object)
{
zend_string *class_name = NULL;
if (zend_parse_parameters(ZEND_NUM_ARGS(), "r|z!Sz", &result, &zrow, &class_name, &ctor_params) == FAILURE)
{
return;
}
if (!class_name)
{
ce = zend_standard_class_def;
} else {
ce = zend_fetch_class(class_name, ZEND_FETCH_CLASS_AUTO);
}
if (!ce)
{
php_error_docref(NULL, E_WARNING, "Could not find class '%s'", ZSTR_VAL(class_name));
return;
}
result_type = PGSQL_ASSOC;
}
else
{
if (zend_parse_parameters(ZEND_NUM_ARGS(), "r|z!l", &result, &zrow, &result_type) == FAILURE)
{
return;
}
}
if (zrow == NULL)
{
row = -1;
} else {
convert_to_long(zrow);
row = Z_LVAL_P(zrow);
if (row < 0) {
php_error_docref(NULL, E_WARNING, "The row parameter must be greater or equal to zero");
RETURN_FALSE;
}
}
use_row = ZEND_NUM_ARGS() > 1 && row != -1;
if (!(result_type & PGSQL_BOTH))
{
php_error_docref(NULL, E_WARNING, "Invalid result type");
RETURN_FALSE;
}
if ((pg_result = (pg_object *)zend_fetch_resource(Z_RES_P(result), "PostgreSQL result", le_result)) == NULL)
{
RETURN_FALSE;
}
pgsql_result = pg_result->result;
if (use_row)
{
if (row < 0 || row >= PQntuples(pgsql_result))
{
php_error_docref(NULL, E_WARNING, "Unable to jump to row " ZEND_LONG_FMT " on PostgreSQL result index " ZEND_LONG_FMT,
row, Z_LVAL_P(result));
RETURN_FALSE;
}
pgsql_row = (int)row;
pg_result->row = pgsql_row;
}
else
{
/* If 2nd param is NULL, use internal row counter to access next row */
pgsql_row = pg_result->row;
if (pgsql_row < 0 || pgsql_row >= PQntuples(pgsql_result)) {
RETURN_FALSE;
}
pg_result->row++;
}
array_init(return_value);
for (i = 0, num_fields = PQnfields(pgsql_result); i < num_fields; i++)
{
if (PQgetisnull(pgsql_result, pgsql_row, i)) {
if (result_type & PGSQL_NUM)
{
add_index_null(return_value, i);
}
if (result_type & PGSQL_ASSOC)
{
field_name = PQfname(pgsql_result, i);
add_assoc_null(return_value, field_name);
}
}
else
{
char *element = PQgetvalue(pgsql_result, pgsql_row, i);
if (element)
{
const size_t element_len = strlen(element);
if (result_type & PGSQL_NUM)
{
add_index_stringl(return_value, i, element, element_len);
}
if (result_type & PGSQL_ASSOC)
{
field_name = PQfname(pgsql_result, i);
add_assoc_stringl(return_value, field_name, element, element_len);
}
}
}
}
if (into_object)
{
zval dataset;
zend_fcall_info fci;
zend_fcall_info_cache fcc;
zval retval;
ZVAL_COPY_VALUE(&dataset, return_value);
object_and_properties_init(return_value, ce, NULL);
if (!ce->default_properties_count && !ce->__set)
{
Z_OBJ_P(return_value)->properties = Z_ARR(dataset);
}
else
{
zend_merge_properties(return_value, Z_ARRVAL(dataset));
zval_ptr_dtor(&dataset);
}
if (ce->constructor)
{
fci.size = sizeof(fci);
ZVAL_UNDEF(&fci.function_name);
fci.object = Z_OBJ_P(return_value);
fci.retval = &retval;
fci.params = NULL;
fci.param_count = 0;
fci.no_separation = 1;
if (ctor_params && Z_TYPE_P(ctor_params) != IS_NULL)
{
if (zend_fcall_info_args(&fci, ctor_params) == FAILURE) {
/* Two problems why we throw exceptions here: PHP is typeless
* and hence passing one argument that's not an array could be
* by mistake and the other way round is possible, too. The
* single value is an array. Also we'd have to make that one
* argument passed by reference.
*/
zend_throw_exception(zend_ce_exception, "Parameter ctor_params must be an array", 0);
return;
}
}
fcc.initialized = 1;
fcc.function_handler = ce->constructor;
fcc.calling_scope = zend_get_executed_scope();
fcc.called_scope = Z_OBJCE_P(return_value);
fcc.object = Z_OBJ_P(return_value);
if (zend_call_function(&fci, &fcc) == FAILURE)
{
zend_throw_exception_ex(zend_ce_exception, 0, "Could not execute %s::%s()", ZSTR_VAL(ce->name), ZSTR_VAL(ce->constructor->common.function_name));
} else
{
zval_ptr_dtor(&retval);
}
if (fci.params)
{
efree(fci.params);
}
}
else if (ctor_params)
{
zend_throw_exception_ex(zend_ce_exception, 0, "Class %s does not have a constructor hence you cannot use ctor_params", ZSTR_VAL(ce->name));
}
}
}
/* }}} */
/* {{{ proto array fetchRow(resource result [, int row [, int result_type]])
Get a row as an enumerated array */
static PHP_METHOD(swoole_postgresql_coro,fetchRow)
{
php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAM_PASSTHRU, PGSQL_NUM, 0);
}
/* }}} */
/* {{{ proto array fetchAssoc(resource result [, int row])
Fetch a row as an assoc array */
static PHP_METHOD(swoole_postgresql_coro,fetchAssoc)
{
/* pg_fetch_assoc() is added from PHP 4.3.0. It should raise error, when
there is 3rd parameter */
if (ZEND_NUM_ARGS() > 2)
WRONG_PARAM_COUNT;
php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAM_PASSTHRU, PGSQL_ASSOC, 0);
}
/* }}} */
/* {{{ proto array fetchArray(resource result [, int row [, int result_type]])
Fetch a row as an array */
static PHP_METHOD(swoole_postgresql_coro,fetchArray)
{
php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAM_PASSTHRU, PGSQL_BOTH, 0);
}
/* }}} */
/* {{{ proto object fetchObject(resource result [, int row [, string class_name [, NULL|array ctor_params]]])
Fetch a row as an object */
static PHP_METHOD(swoole_postgresql_coro,fetchObject)
{
/* fetchObject() allowed result_type used to be. 3rd parameter
must be allowed for compatibility */
php_pgsql_fetch_hash(INTERNAL_FUNCTION_PARAM_PASSTHRU, PGSQL_ASSOC, 1);
}
/* {{{ _destroy_pgsql_link
*/
static void _destroy_pgsql_link(zend_resource *rsrc)
{
PGconn *link = (PGconn *)rsrc->ptr;
PGresult *res;
while ((res = PQgetResult(link)))
{
PQclear(res);
}
PQfinish(link);
}
static void _free_result(zend_resource *rsrc)
{
pg_object *pg_result = (pg_object *)rsrc->ptr;
efree(pg_result);
}
static int swoole_pgsql_coro_onError(swReactor *reactor, swEvent *event)
{
#if PHP_MAJOR_VERSION < 7
TSRMLS_FETCH_FROM_CTX(sw_thread_ctx ? sw_thread_ctx : NULL);
#endif
pg_object *pg_object = (event->socket->object);
zval *retval = NULL, *result;
zval *zobject = pg_object->object;
swoole_postgresql_coro_close(pg_object);
SW_ALLOC_INIT_ZVAL(result);
ZVAL_BOOL(result, 0);
php_context *sw_current_context = swoole_get_property(zobject, 0);
int ret = coro_resume(sw_current_context, result, &retval);
sw_zval_free(result);
if (ret == CORO_END && retval)
{
sw_zval_ptr_dtor(&retval);
}
return SW_OK;
}
static PHP_METHOD(swoole_postgresql_coro, __destruct)
{
pg_object *pg_object = swoole_get_object(getThis());
swoole_postgresql_coro_close(pg_object);
}
static int swoole_postgresql_coro_close(pg_object *pg_object)
{
if (!pg_object)
{
swoole_php_fatal_error(E_WARNING, "object is not instanceof swoole_postgresql_coro.");
return FAILURE;
}
SwooleG.main_reactor->del(SwooleG.main_reactor, pg_object->fd);
swConnection *_socket = swReactor_get(SwooleG.main_reactor, pg_object->fd);
_socket->object = NULL;
_socket->active = 0;
efree(pg_object);
if(pg_object->object)
{
php_context *sw_current_context = swoole_get_property(pg_object->object, 0);
efree(sw_current_context);
}
return SUCCESS;
}
#endif
#endif
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/cfyy/swoole.git
git@gitee.com:cfyy/swoole.git
cfyy
swoole
swoole-src
master

搜索帮助

23e8dbc6 1850385 7e0993f3 1850385