37 Star 141 Fork 26

郭新华/php-cp

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
connect_pool.c 30.99 KB
一键复制 编辑 原始数据 按行查看 历史
goodlike1 提交于 2017-01-17 18:34 . php7 memleak
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991
/*
+----------------------------------------------------------------------+
| common con pool |
+----------------------------------------------------------------------+
| This source file is subject to version 2.0 of the Apache license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.apache.org/licenses/LICENSE-2.0.html |
| If you did not receive a copy of the Apache2.0 license and are unable|
| to obtain it through the world-wide-web, please send a note to |
| license@php.net so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: Xinhua Guo <woshiguo35@sina.com> |
+----------------------------------------------------------------------+
*/
#include "php_connect_pool.h"
#include <ext/standard/info.h>
#include "zend_interfaces.h"
#include "zend_exceptions.h"
#include <sys/time.h>
#include <Zend/zend_types.h>
#include <Zend/zend.h>
#include <ext/spl/spl_iterators.h>
static zval* pdo_stmt = NULL;
extern zval* pdo_object;
extern zval* redis_object;
cpServerG ConProxyG;
cpServerGS *ConProxyGS = NULL;
cpWorkerG ConProxyWG;
extern sapi_module_struct sapi_module;
static void cp_destory_client(zend_resource *rsrc TSRMLS_DC);
static void pdo_dispatch(zval *args);
static void pdo_proxy_pdo(zval *args);
static void pdo_proxy_stmt(zval *args);
static void cp_add_fail_into_mem(zval *conf, zval *data_source);
#define CP_VERSION "1.5.0"
#define CP_INTERNAL_ERROR_SEND(send_data)\
({ \
CP_INTERNAL_SEND_RAW(send_data,CP_SIGEVENT_EXCEPTION)\
})
#define CP_INTERNAL_NORMAL_SEND(send_data)\
({ \
CP_INTERNAL_SEND_RAW(send_data,CP_SIGEVENT_TURE)\
})
#define CP_SEND_EXCEPTION do{zval *str;CP_SEND_EXCEPTION_ARGS(&str);cp_zval_ptr_dtor(&str);}while(0);
#define CP_INTERNAL_NORMAL_SEND_RETURN(send_data)({CP_INTERNAL_NORMAL_SEND(send_data);return CP_TRUE;})
#define CP_INTERNAL_ERROR_SEND_RETURN(send_data) ({ CP_INTERNAL_ERROR_SEND(send_data);return CP_FALSE;})
#define CP_SEND_EXCEPTION_RETURN do{CP_SEND_EXCEPTION;return CP_FALSE;}while(0);
#define CP_TEST_RETURN_TRUE(flag) ({if(flag==CP_CONNECT_PING)return CP_TRUE;})
const zend_function_entry cp_functions[] = {
PHP_FE(pool_server_create, NULL)
PHP_FE(pool_server_status, NULL)
PHP_FE(pool_server_shutdown, NULL)
PHP_FE(pool_server_reload, NULL)
PHP_FE(pool_server_version, NULL)
PHP_FE_END /* Must be the last line in cp_functions[] */
};
ZEND_BEGIN_ARG_INFO_EX(__call_args, 0, 0, 2)
ZEND_ARG_INFO(0, function_name)
ZEND_ARG_INFO(0, arguments)
ZEND_END_ARG_INFO()
const zend_function_entry pdo_connect_pool_methods[] = {
PHP_ME(pdo_connect_pool, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
PHP_ME(pdo_connect_pool, __destruct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_DTOR)
PHP_ME(pdo_connect_pool, __call, __call_args, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool, release, NULL, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool, close, NULL, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool, setAsync, NULL, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool, done, NULL, ZEND_ACC_PUBLIC)
PHP_FE_END
};
ZEND_BEGIN_ARG_INFO_EX(arginfo_statement_void, 0, 0, 0)
ZEND_END_ARG_INFO()
const zend_function_entry pdo_connect_pool_PDOStatement_methods[] = {
PHP_ME(pdo_connect_pool_PDOStatement, __call, __call_args, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool_PDOStatement, setAsync, NULL, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool_PDOStatement, release, NULL, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool_PDOStatement, done, NULL, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool_PDOStatement, rewind, arginfo_statement_void, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool_PDOStatement, next, arginfo_statement_void, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool_PDOStatement, current, arginfo_statement_void, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool_PDOStatement, key, arginfo_statement_void, ZEND_ACC_PUBLIC)
PHP_ME(pdo_connect_pool_PDOStatement, valid, arginfo_statement_void, ZEND_ACC_PUBLIC)
PHP_FE_END
};
PHP_METHOD(pdo_connect_pool_PDOStatement, rewind)
{
zval *object = getThis();
zval *method_ptr, method, *ret_value = NULL;
method_ptr = &method;
zend_class_entry *ce;
ce = Z_OBJCE_P(object);
CP_ZVAL_STRING(method_ptr, "fetchAll", 0);
if (cp_call_user_function_ex(EG(function_table), &object, method_ptr, &ret_value, 0, NULL, 0, NULL TSRMLS_CC) == FAILURE)
{
return;
}
zend_update_property_long(ce, object, "pos", sizeof ("pos") - 1, 0 TSRMLS_CC);
zend_update_property(ce, object, "rs", sizeof ("rs") - 1, ret_value TSRMLS_CC);
cp_zval_ptr_dtor(&ret_value);
}
PHP_METHOD(pdo_connect_pool_PDOStatement, current)
{
zval *pos, *rs, *row = NULL;
zend_class_entry *ce;
ce = Z_OBJCE_P(getThis());
pos = cp_zend_read_property(ce, getThis(), "pos", sizeof ("pos") - 1, 0 TSRMLS_DC);
rs = cp_zend_read_property(ce, getThis(), "rs", sizeof ("rs") - 1, 0 TSRMLS_DC);
cp_zend_hash_index_find(Z_ARRVAL_P(rs), Z_LVAL_P(pos), (void**) &row);
RETVAL_ZVAL(row, 1, 1);
}
PHP_METHOD(pdo_connect_pool_PDOStatement, key)
{
zval *pos;
zend_class_entry *ce;
ce = Z_OBJCE_P(getThis());
pos = cp_zend_read_property(ce, getThis(), "pos", sizeof ("pos") - 1, 0 TSRMLS_DC);
ZVAL_LONG(return_value, Z_LVAL_P(pos));
}
PHP_METHOD(pdo_connect_pool_PDOStatement, next)
{
zval *pos;
zend_class_entry *ce;
ce = Z_OBJCE_P(getThis());
pos = cp_zend_read_property(ce, getThis(), "pos", sizeof ("pos") - 1, 0 TSRMLS_DC);
zend_update_property_long(ce, getThis(), "pos", sizeof ("pos") - 1, ++Z_LVAL_P(pos) TSRMLS_CC);
}
PHP_METHOD(pdo_connect_pool_PDOStatement, valid)
{
zval *pos, *rs, *row = NULL;
zend_class_entry *ce;
ce = Z_OBJCE_P(getThis());
pos = cp_zend_read_property(ce, getThis(), "pos", sizeof ("pos") - 1, 0 TSRMLS_DC);
rs = cp_zend_read_property(ce, getThis(), "rs", sizeof ("rs") - 1, 0 TSRMLS_DC);
if (cp_zend_hash_index_find(Z_ARRVAL_P(rs), Z_LVAL_P(pos), (void**) &row) == SUCCESS)
{
RETURN_BOOL(1);
}
else
{
RETURN_BOOL(0);
}
}
const zend_function_entry redis_connect_pool_methods[] = {
PHP_ME(redis_connect_pool, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
PHP_ME(redis_connect_pool, __destruct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_DTOR)
PHP_ME(redis_connect_pool, __call, __call_args, ZEND_ACC_PUBLIC)
PHP_ME(redis_connect_pool, release, NULL, ZEND_ACC_PUBLIC)
PHP_ME(redis_connect_pool, select, NULL, ZEND_ACC_PUBLIC)
PHP_ME(redis_connect_pool, connect, NULL, ZEND_ACC_PUBLIC)
PHP_ME(redis_connect_pool, auth, NULL, ZEND_ACC_PUBLIC)
PHP_ME(redis_connect_pool, done, NULL, ZEND_ACC_PUBLIC)
PHP_ME(redis_connect_pool, setAsync, NULL, ZEND_ACC_PUBLIC)
PHP_ME(redis_connect_pool, close, NULL, ZEND_ACC_PUBLIC)
PHP_MALIAS(redis_connect_pool, pconnect, connect, NULL, ZEND_ACC_PUBLIC) /* pconnect 别名指向connect */
PHP_FE_END
};
int le_cp_server;
int le_cli_connect_pool;
zend_class_entry pdo_connect_pool_ce;
zend_class_entry *pdo_connect_pool_class_entry_ptr;
zend_class_entry redis_connect_pool_ce;
zend_class_entry *redis_connect_pool_class_entry_ptr;
zend_class_entry pdo_connect_pool_PDOStatement_ce;
zend_class_entry *pdo_connect_pool_PDOStatement_class_entry_ptr;
zend_module_entry connect_pool_module_entry = {
#if ZEND_MODULE_API_NO >= 20050922
STANDARD_MODULE_HEADER_EX,
NULL,
NULL,
#else
STANDARD_MODULE_HEADER,
#endif
"connect_pool",
cp_functions,
PHP_MINIT(connect_pool),
PHP_MSHUTDOWN(connect_pool),
PHP_RINIT(connect_pool), //RINIT
PHP_RSHUTDOWN(connect_pool), //RSHUTDOWN
PHP_MINFO(connect_pool),
CP_VERSION,
STANDARD_MODULE_PROPERTIES
};
#ifdef COMPILE_DL_CONNECT_POOL
ZEND_GET_MODULE(connect_pool)
#endif
PHP_MINIT_FUNCTION(connect_pool)
{
le_cli_connect_pool = zend_register_list_destructors_ex(send_oob2proxy, cp_destory_client, CP_RES_CLIENT_NAME, module_number); //持久
INIT_CLASS_ENTRY(pdo_connect_pool_ce, "pdoProxy", pdo_connect_pool_methods);
pdo_connect_pool_class_entry_ptr = zend_register_internal_class(&pdo_connect_pool_ce TSRMLS_CC);
zend_register_class_alias("pdo_connect_pool", pdo_connect_pool_class_entry_ptr);
INIT_CLASS_ENTRY(redis_connect_pool_ce, "redisProxy", redis_connect_pool_methods);
redis_connect_pool_class_entry_ptr = zend_register_internal_class(&redis_connect_pool_ce TSRMLS_CC);
zend_register_class_alias("redis_connect_pool", pdo_connect_pool_class_entry_ptr);
INIT_CLASS_ENTRY(pdo_connect_pool_PDOStatement_ce, "pdo_connect_pool_PDOStatement", pdo_connect_pool_PDOStatement_methods);
//zend_class_entry *pdo_dbstmt_ce = cp_zend_fetch_class("PDOStatement", ZEND_FETCH_CLASS_AUTO);
pdo_connect_pool_PDOStatement_class_entry_ptr = zend_register_internal_class(&pdo_connect_pool_PDOStatement_ce TSRMLS_CC);
zend_class_implements(pdo_connect_pool_PDOStatement_class_entry_ptr TSRMLS_CC, 1, spl_ce_Iterator, spl_ce_Countable);
REGISTER_LONG_CONSTANT("CP_DEFAULT_PDO_PORT", CP_PORT_PDO, CONST_CS | CONST_PERSISTENT);
REGISTER_LONG_CONSTANT("CP_DEFAULT_REDIS_PORT", CP_PORT_REDIS, CONST_CS | CONST_PERSISTENT);
bzero(&ConProxyG, sizeof (ConProxyG));
bzero(&ConProxyWG, sizeof (ConProxyWG));
return SUCCESS;
}
/* }}} */
/* {{{ PHP_MSHUTDOWN_FUNCTION
*/
PHP_MSHUTDOWN_FUNCTION(connect_pool)
{
if (pdo_stmt)
{
cp_zval_ptr_dtor(&pdo_stmt);
}
return SUCCESS;
}
/* }}} */
/* {{{ PHP_MINFO_FUNCTION
*/
PHP_MINFO_FUNCTION(connect_pool)
{
php_info_print_table_start();
php_info_print_table_header(2, "connect_pool support", "enabled");
php_info_print_table_row(2, "Version", CP_VERSION);
php_info_print_table_row(2, "Author", "郭新华,张磊");
php_info_print_table_row(2, "email", "woshiguo35@sina.com");
php_info_print_table_end();
}
/* }}} */
PHP_RINIT_FUNCTION(connect_pool)
{
return SUCCESS;
}
PHP_RSHUTDOWN_FUNCTION(connect_pool)
{
return SUCCESS;
}
static void cp_destory_client(zend_resource *rsrc TSRMLS_DC)
{
cpClient *cli = (cpClient *) rsrc->ptr;
if (cli->sock > 0)
{
cpClient_close(cli);
}
}
void send_oob2proxy(zend_resource *rsrc TSRMLS_DC)
{
cpClient *cli = (cpClient *) rsrc->ptr;
if (cli->sock == 0)
{
pefree(cli, 1);
}
else if (cli->server_fd != 0)
{//防止release后rshutdown的重复释放
cpConnection *conn = CONN(cli);
if (conn->release == CP_FD_NRELEASED)
{
cpGroup* G = &CPGS->G[conn->group_id];
if (cli->lock(G) == 0)
{
conn->release = CP_FD_RELEASED;
if (G->first_wait_id && conn->worker_index <= G->worker_max)
{//wait is not null&&use queue&&use reload to reduce max maybe trigger this
int wait_pid = cpPopWaitQueue(G, conn);
cli->unLock(G);
if (kill(wait_pid, SIGRTMIN) < 0)
{
php_printf("send sig 2 %d error. Error: %s [%d]", wait_pid, strerror(errno), errno);
// return send_oob2proxy(rsrc);
}
}
else
{
CPGS->G[conn->group_id].workers_status[conn->worker_index] = CP_WORKER_IDLE;
cli->unLock(G);
}
}
log_end(cli);
}
}
}
PHP_FUNCTION(pool_server_create)
{
zval *conf = NULL;
char *config_file = NULL;
//zend_string *config_file = NULL;
zend_size_t file_len = 0;
if (strcasecmp("cli", sapi_module.name) != 0)
{
php_error_docref(NULL TSRMLS_CC, E_ERROR, "pool_server must run at php_cli environment.");
RETURN_FALSE;
}
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &config_file, &file_len) == FAILURE)
{
php_error_docref(NULL TSRMLS_CC, E_ERROR, "config file error");
RETURN_FALSE;
}
conf = cpGetConfig(config_file);
int sock = cpServer_init(conf, config_file);
if (sock <= 0)
{
php_error_docref(NULL TSRMLS_CC, E_ERROR, "pool_server: start server fail. Error: %s [%d]", strerror(errno), errno);
}
int ret = cpServer_create();
if (ret < 0)
{
php_error_docref(NULL TSRMLS_CC, E_ERROR, "pool_server: create server fail. Error: %s [%d]", strerror(errno), errno);
}
ret = cpServer_start(sock);
if (ret < 0)
{
php_error_docref(NULL TSRMLS_CC, E_ERROR, "pool_server: start server fail. Error: %s [%d]", strerror(errno), errno);
}
cp_zval_ptr_dtor(&conf);
}
PHP_FUNCTION(pool_server_reload)
{
long pid;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &pid) == FAILURE)
{
return;
}
if (kill(pid, SIGUSR1) < 0)
{
php_printf("reload fail. kill -SIGUSR1 master_pid[%d] fail. Error: %s[%d]\n", (int) pid, strerror(errno), errno);
RETURN_FALSE;
}
else
{
RETURN_TRUE;
}
}
PHP_FUNCTION(pool_server_version)
{
CP_RETURN_STRING(CP_VERSION, 1);
}
PHP_FUNCTION(pool_server_shutdown)
{
long pid;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &pid) == FAILURE)
{
return;
}
if (kill(pid, SIGTERM) < 0)
{
php_printf("shutdown fail. kill -SIGTERM master_pid[%d] fail. Error: %s[%d]\n", (int) pid, strerror(errno), errno);
RETURN_FALSE;
}
else
{
RETURN_TRUE;
}
}
int CP_INTERNAL_SERIALIZE_SEND_MEM(zval *send_data, uint8_t __type)
{
cpShareMemory *sm_obj = &(CPGS->G[CPWG.gid].workers[CPWG.id].sm_obj);
int real_len = 0;
#if PHP_MAJOR_VERSION < 7
instead_smart dest;
dest.len = 0;
dest.addr = sm_obj->mem;
dest.max = CPGC.max_read_len;
dest.exceed = 0;
php_msgpack_serialize(&dest, send_data);
real_len = dest.len;
if (dest.exceed == 1)
{
CP_INTERNAL_ERROR_SEND("data is exceed,increase max_read_len");
return SUCCESS;
}
#else
zend_string * zstr = php_swoole_serialize(send_data);
if (zstr->len >= CPGS->max_buffer_len)
{
CP_INTERNAL_ERROR_SEND("data is exceed,increase max_read_len");
return SUCCESS;
}
real_len = zstr->len;
memcpy(sm_obj->mem, zstr->val, zstr->len);
zend_string_release(zstr);
#endif
cpWorkerInfo worker_event;
worker_event.len = real_len;
worker_event.type = __type;
worker_event.pid = CPWG.event.pid;
int ret = write(CPWG.pipe_fd_write, &worker_event, sizeof (worker_event));
if (ret == -1)
{
php_error_docref(NULL TSRMLS_CC, E_ERROR, "write error Error: %s [%d]", strerror(errno), errno);
}
return SUCCESS;
}
int pdo_proxy_connect(zval *args, int flag)
{
zval *data_source, **tmp_pass[4], *new_obj, *username, *password, *options, * ret_pdo_obj, *null_arr = NULL;
zval pdo_name, con_fun_name;
zend_class_entry *pdo_ce;
if (pdo_object)
{
pdo_proxy_pdo(args);
return 1;
}
#if PHP_MAJOR_VERSION < 7
CP_MAKE_STD_ZVAL(new_obj);
#else
new_obj = ecalloc(sizeof (zval), 1);
#endif
CP_ZVAL_STRING(&pdo_name, "pdo", 0);
if (cp_zend_hash_find_ptr(EG(class_table), &pdo_name, (void **) &pdo_ce) == FAILURE)
{
CP_INTERNAL_ERROR_SEND_RETURN("pdo extension is not install");
}
object_init_ex(new_obj, pdo_ce);
cp_zend_hash_find(Z_ARRVAL_P(args), ZEND_STRS("data_source"), (void **) &data_source);
tmp_pass[0] = &data_source;
if (cp_zend_hash_find(Z_ARRVAL_P(args), ZEND_STRS("username"), (void **) &username) == SUCCESS)
{
tmp_pass[1] = &username;
}
if (cp_zend_hash_find(Z_ARRVAL_P(args), ZEND_STRS("password"), (void **) &password) == SUCCESS)
{
tmp_pass[2] = &password;
}
if (cp_zend_hash_find(Z_ARRVAL_P(args), ZEND_STRS("options"), (void **) &options) == SUCCESS)
{
tmp_pass[3] = &options;
}
else
{
CP_MAKE_STD_ZVAL(null_arr);
array_init(null_arr);
tmp_pass[3] = &null_arr;
}
CP_ZVAL_STRING(&con_fun_name, "__construct", 0);
cp_call_user_function_ex(NULL, &new_obj, &con_fun_name, &ret_pdo_obj, 4, tmp_pass, 0, NULL TSRMLS_CC);
#if PHP_MAJOR_VERSION ==7
zval_ptr_dtor(&con_fun_name);
#endif
if (null_arr)
cp_zval_ptr_dtor(&null_arr);
if (ret_pdo_obj)
cp_zval_ptr_dtor(&ret_pdo_obj);
if (EG(exception))
{
CP_DEL_OBJ(new_obj);
CP_SEND_EXCEPTION_RETURN;
}
else
{
pdo_object = new_obj;
if (flag == CP_CONNECT_NORMAL)
{
pdo_proxy_pdo(args);
return 1;
}
}
return CP_FALSE;
}
static void pdo_proxy_pdo(zval * args)
{
zval *str = NULL, *method = NULL, * ret_value = NULL;
if (pdo_object)
{
if (cp_zend_hash_find(Z_ARRVAL_P(args), ZEND_STRS("method"), (void **) &method) == FAILURE)
{
CP_INTERNAL_ERROR_SEND("PDO no method!");
}
#if PHP_MAJOR_VERSION ==7
ret_value = (zval *) emalloc(sizeof (zval));
#endif
if (cp_internal_call_user_function(pdo_object, method, &ret_value, args) == FAILURE)
{
CP_DEL_OBJ(pdo_object);
char cp_error_str[FAILUREOR_MSG_SIZE] = {0};
snprintf(cp_error_str, FAILUREOR_MSG_SIZE, "call pdo method( %s ) error!", Z_STRVAL_P(method));
CP_INTERNAL_ERROR_SEND(cp_error_str);
}
else
{
if (EG(exception))
{
CP_EXCEPTION_ARGS(&str);
char *p = strcasestr(Z_STRVAL_P(str), "server has gone away");
char *p2 = strcasestr(Z_STRVAL_P(str), "There is already an active transaction");
if (p || p2)
{//del reconnect and retry
cpLog("del and retry %s,%s", p, p2);
CP_DEL_OBJ(pdo_object);
pdo_proxy_connect(args, CP_CONNECT_NORMAL);
}
else
{
CP_INTERNAL_SERIALIZE_SEND_MEM(str, CP_SIGEVENT_EXCEPTION);
}
cp_zval_ptr_dtor(&str);
if (ret_value)
{
cp_zval_ptr_dtor(&ret_value);
#if PHP_MAJOR_VERSION == 7
efree(ret_value);
#endif
}
}
else
{
if (Z_TYPE_P(ret_value) == IS_OBJECT)
{
#if PHP_MAJOR_VERSION < 7
char *name;
zend_uint name_len;
zend_get_object_classname(ret_value, (const char **) &name, &name_len TSRMLS_CC);
if (strcmp(name, "PDOStatement") == 0)
{
if (pdo_stmt)
{
zval_ptr_dtor(&pdo_stmt);
pdo_stmt = NULL;
}
pdo_stmt = ret_value;
zval send_zval = {0};
ZVAL_STRING(&send_zval, "PDOStatement!", 0);
CP_INTERNAL_SERIALIZE_SEND_MEM(&send_zval, CP_SIGEVENT_PDO);
}
efree(name);
#else
zend_string *name = Z_OBJ_HANDLER_P(ret_value, get_class_name)(Z_OBJ_P(ret_value));
if (strcmp(name->val, "PDOStatement") == 0)
{
if (pdo_stmt)
{
zval_dtor(pdo_stmt);
efree(pdo_stmt);
pdo_stmt = NULL;
}
pdo_stmt = ret_value;
zval send_zval = {0};
CP_ZVAL_STRING(&send_zval, "PDOStatement!", 0);
CP_INTERNAL_SERIALIZE_SEND_MEM(&send_zval, CP_SIGEVENT_PDO);
zval_ptr_dtor(&send_zval);
}
zend_string_release(name);
#endif
}
else
{//pdo
CP_INTERNAL_SERIALIZE_SEND_MEM(ret_value, CP_SIGEVENT_TURE);
if (ret_value)
{
cp_zval_ptr_dtor(&ret_value);
#if PHP_MAJOR_VERSION == 7
efree(ret_value);
#endif
}
}
}
}
}
else
{
CP_INTERNAL_ERROR_SEND("no connect to mysql");
}
}
static void pdo_proxy_stmt(zval * args)
{
zval *method = NULL, * ret_value = NULL;
if (cp_zend_hash_find(Z_ARRVAL_P(args), ZEND_STRS("method"), (void **) &method) == FAILURE)
{
CP_INTERNAL_ERROR_SEND("PDO no method!");
}
if (cp_internal_call_user_function(pdo_stmt, method, &ret_value, args) == FAILURE)
{
char cp_error_str[FAILUREOR_MSG_SIZE] = {0};
snprintf(cp_error_str, FAILUREOR_MSG_SIZE, "call pdo stmt method (%s) error!", Z_STRVAL_P(method));
CP_INTERNAL_ERROR_SEND(cp_error_str);
}
else
{
if (EG(exception))
{
zval *str;
CP_SEND_EXCEPTION_ARGS(&str);
char *p = strcasestr(Z_STRVAL_P(str), "server has gone away");
char *p2 = strcasestr(Z_STRVAL_P(str), "There is already an active transaction");
if (p || p2)
{
CP_DEL_OBJ(pdo_object);
}
cp_zval_ptr_dtor(&str);
cp_zval_ptr_dtor(&pdo_stmt);
pdo_stmt = NULL;
return; //when the exception,the ret_value dont need dtor
}
if (!ret_value)
{
CP_INTERNAL_ERROR_SEND("call pdo stmt method error ret_value is null!");
return;
}
if (Z_TYPE_P(ret_value) == IS_OBJECT)
{
CP_INTERNAL_SERIALIZE_SEND_MEM(ret_value, CP_SIGEVENT_STMT_OBJ);
}
else
{
CP_INTERNAL_SERIALIZE_SEND_MEM(ret_value, CP_SIGEVENT_TURE);
}
}
if (ret_value)
{
cp_zval_ptr_dtor(&ret_value);
}
}
static void pdo_dispatch(zval * args)
{
zval *m_type;
if (cp_zend_hash_find(Z_ARRVAL_P(args), ZEND_STRS("method_type"), (void **) &m_type) == SUCCESS)
{
if (strcmp(Z_STRVAL_P(m_type), "connect") == 0)
{
pdo_proxy_connect(args, CP_CONNECT_NORMAL);
}
else if (strcmp(Z_STRVAL_P(m_type), "PDOStatement") == 0)
{
pdo_proxy_stmt(args);
}
// else
// {//not use now
// pdo_proxy_pdo(args);
// }
}
else
{//操作pdo
CP_INTERNAL_ERROR_SEND("PDO method_type is none!");
}
}
static int cp_redis_select(zval *new_obj, zval **db)
{
//有db并且不0那么就select
if (strcmp("0", Z_STRVAL_PP(db)) != 0)
{
zval **tmp_pass[1];
tmp_pass[0] = db;
zval * ret_redis_select = NULL;
zval select_fun_name;
CP_ZVAL_STRING(&select_fun_name, "select", 0);
cp_call_user_function_ex(NULL, &new_obj, &select_fun_name, &ret_redis_select, 1, tmp_pass, 0, NULL TSRMLS_CC);
if (ret_redis_select)
cp_zval_ptr_dtor(&ret_redis_select);
if (EG(exception))
{
cp_zval_ptr_dtor(&new_obj);
CP_SEND_EXCEPTION_RETURN;
}
}
return CP_TRUE;
}
static int cp_redis_auth(zval *new_obj, zval *args)
{
zval *auth, *ret_redis_auth = NULL;
if (cp_zend_hash_find(Z_ARRVAL_P(args), ZEND_STRS("auth"), (void **) &auth) == SUCCESS)
{
zval auth_fun_name;
CP_ZVAL_STRING(&auth_fun_name, "auth", 0);
zval **tmp_pass[1];
tmp_pass[0] = &auth;
cp_call_user_function_ex(NULL, &new_obj, &auth_fun_name, &ret_redis_auth, 1, tmp_pass, 0, NULL TSRMLS_CC);
if (ret_redis_auth)
{
cp_zval_ptr_dtor(&ret_redis_auth);
}
if (EG(exception))
{
cp_zval_ptr_dtor(&new_obj);
CP_SEND_EXCEPTION_RETURN;
}
}
return CP_TRUE;
}
int redis_proxy_connect(zval *args, int flag)
{
zval *ex_arr, *data_source, zdelim, *ip, *port, *db, *timeout, * ret_redis_obj = NULL, *new_obj, **tmp_pass[3];
cp_zend_hash_find(Z_ARRVAL_P(args), ZEND_STRS("data_source"), (void **) &data_source);
CP_MAKE_STD_ZVAL(ex_arr);
array_init(ex_arr);
CP_ZVAL_STRINGL(&zdelim, ":", 1, 0);
cp_explode(&zdelim, data_source, ex_arr, LONG_MAX);
#if PHP_MAJOR_VERSION < 7
CP_MAKE_STD_ZVAL(new_obj);
#else
new_obj = ecalloc(sizeof (zval), 1);
#endif
zend_class_entry *redis_ce = NULL;
zval redis_name;
CP_ZVAL_STRING(&redis_name, "redis", 0);
if (cp_zend_hash_find_ptr(EG(class_table), &redis_name, (void **) &redis_ce) == FAILURE)
{
CP_INTERNAL_ERROR_SEND_RETURN("redis extension is not install");
}
object_init_ex(new_obj, redis_ce);
if (cp_zend_hash_index_find(Z_ARRVAL_P(ex_arr), 0, (void**) &ip) == SUCCESS)
{
tmp_pass[0] = &ip;
}
if (cp_zend_hash_index_find(Z_ARRVAL_P(ex_arr), 1, (void**) &port) == SUCCESS)
{
tmp_pass[1] = &port;
}
CP_MAKE_STD_ZVAL(timeout);
CP_ZVAL_STRING(timeout, "10", 0);
tmp_pass[2] = &timeout;
zval pcon_fun_name;
CP_ZVAL_STRING(&pcon_fun_name, "connect", 0);
cp_call_user_function_ex(NULL, &new_obj, &pcon_fun_name, &ret_redis_obj, 3, tmp_pass, 0, NULL TSRMLS_CC);
if (ret_redis_obj)
{
if (Z_BVAL_P(ret_redis_obj) == FALSE)
{
cp_zval_ptr_dtor(&ex_arr);
cp_zval_ptr_dtor(&ret_redis_obj);
CP_INTERNAL_ERROR_SEND_RETURN("connect redis error!");
}
else
{
cp_zval_ptr_dtor(&ret_redis_obj);
}
}
if (EG(exception))
{
CP_DEL_OBJ(new_obj);
cp_zval_ptr_dtor(&ex_arr);
CP_SEND_EXCEPTION_RETURN;
}
if (!cp_redis_auth(new_obj, args))
{
return CP_FALSE;
}
if (cp_zend_hash_index_find(Z_ARRVAL_P(ex_arr), 2, (void**) &db) == SUCCESS)
{
if (!cp_redis_select(new_obj, &db))
{
cp_zval_ptr_dtor(&ex_arr);
return CP_FALSE;
}
}
cp_zval_ptr_dtor(&ex_arr);
//存起來
redis_object = new_obj;
zval *method;
if (cp_zend_hash_find(Z_ARRVAL_P(args), ZEND_STRS("method"), (void **) &method) == FAILURE)
{
CP_INTERNAL_ERROR_SEND_RETURN("redis no method!");
}
if (strcmp(Z_STRVAL_P(method), "select") == 0)
{
CP_INTERNAL_NORMAL_SEND_RETURN("CON_SUCCESS!");
}
else
{
zval * ret_value = NULL;
if (cp_internal_call_user_function(new_obj, method, &ret_value, args) == SUCCESS)
{
if (!EG(exception))
{
CP_INTERNAL_SERIALIZE_SEND_MEM(ret_value, CP_SIGEVENT_TURE);
}
else
{
CP_DEL_OBJ(redis_object);
CP_SEND_EXCEPTION;
}
}
else
{
CP_INTERNAL_ERROR_SEND("call redis method error!");
}
if (ret_value)
cp_zval_ptr_dtor(&ret_value);
return CP_TRUE; //no use
}
}
static void redis_dispatch(zval * args)
{
zval *data_source;
if (redis_object)
{
zval *method;
if (cp_zend_hash_find(Z_ARRVAL_P(args), ZEND_STRS("method"), (void **) &method) == FAILURE)
{
CP_INTERNAL_ERROR_SEND("redis no method error!");
return;
}
else if (strcmp(Z_STRVAL_P(method), "select") == 0)
{
zval select_return = {0};
ZVAL_BOOL(&select_return, 1);
CP_INTERNAL_SERIALIZE_SEND_MEM(&select_return, CP_SIGEVENT_TURE);
return;
}
zval * ret_value = NULL;
if (cp_internal_call_user_function(redis_object, method, &ret_value, args) == FAILURE)
{
CP_INTERNAL_ERROR_SEND("call redis method error!");
}
else
{
if (EG(exception))
{
zval *str;
CP_SEND_EXCEPTION_ARGS(&str);
// char *p = strstr(Z_STRVAL_P(str), "server went away");
// char *p2 = strstr(Z_STRVAL_P(str), "Connection lost");
// char *p3 = strstr(Z_STRVAL_P(str), "read error on connection");
// char *p4 = strstr(Z_STRVAL_P(str), "Connection closed");
// if (p || p2 || p3 || p4)
// {
CP_DEL_OBJ(redis_object);
// }
cp_zval_ptr_dtor(&str);
}
else
{
CP_INTERNAL_SERIALIZE_SEND_MEM(ret_value, CP_SIGEVENT_TURE);
}
}
if (ret_value)
cp_zval_ptr_dtor(&ret_value);
}
else
{
redis_proxy_connect(args, CP_CONNECT_NORMAL);
}
}
int worker_onReceive(zval * user_value)
{
zval *type;
if (cp_zend_hash_find(Z_ARRVAL_P(user_value), ZEND_STRS("type"), (void **) &type) == SUCCESS)
{
if (strcmp(Z_STRVAL_P(type), "pdo") == 0)
{
pdo_dispatch(user_value);
}
else if (strcmp(Z_STRVAL_P(type), "redis") == 0)
{
redis_dispatch(user_value);
}
else
{
cpLog("wrong type");
}
}
else
{
cpLog("args error no type!");
}
cp_zval_ptr_dtor(&user_value);
return CP_TRUE;
}
static void cp_add_fail_into_mem(zval *o_arg, zval * data_source)
{
zval *args;
CP_MAKE_STD_ZVAL(args);
*args = *o_arg;
zval_copy_ctor(args);
if (!CPGL.ping_mem_addr)
{
CPGL.ping_mem_addr = CPGS->ping_workers->sm_obj.mem;
}
zval *arr = CP_PING_GET_PRO(CPGL.ping_mem_addr);
if (Z_TYPE_P(arr) == IS_NULL)
{
zval first_arr;
array_init(&first_arr);
add_assoc_long(args, "count", 1);
add_assoc_zval(&first_arr, Z_STRVAL_P(data_source), args);
cp_ser_and_setpro(&first_arr);
zval_dtor(&first_arr);
}
else if (Z_TYPE_P(arr) != IS_TRUE)
{
zval **zval_source;
if (cp_zend_hash_find(Z_ARRVAL_P(arr), Z_STRVAL_P(data_source), Z_STRLEN_P(data_source) + 1, (void **) &zval_source) == SUCCESS)
{//++
zval **zval_probably_count;
if (cp_zend_hash_find(Z_ARRVAL_PP(zval_source), ZEND_STRS("count"), (void **) &zval_probably_count) == SUCCESS)
{
int num = (int) Z_LVAL_PP(zval_probably_count);
add_assoc_long(args, "count", ++num);
cp_zend_hash_del(Z_ARRVAL_P(arr), Z_STRVAL_P(data_source), Z_STRLEN_P(data_source));
add_assoc_zval(arr, Z_STRVAL_P(data_source), args);
cp_ser_and_setpro(arr);
}
}
else
{//add
add_assoc_long(args, "count", 1);
add_assoc_zval(arr, Z_STRVAL_P(data_source), args);
cp_ser_and_setpro(arr);
}
}
cp_zval_ptr_dtor(&arr);
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/Gxhua/php-cp.git
git@gitee.com:Gxhua/php-cp.git
Gxhua
php-cp
php-cp
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891