diff --git a/contrib/dolphin/expected/multi_select_in_proc.out b/contrib/dolphin/expected/multi_select_in_proc.out new file mode 100644 index 0000000000000000000000000000000000000000..60ff1d54dd0c14b3961acc5648aad665fa217b61 --- /dev/null +++ b/contrib/dolphin/expected/multi_select_in_proc.out @@ -0,0 +1,311 @@ +create schema multi_select_proc; +set current_schema to 'multi_select_proc'; +--open parameter sql_mode +set dolphin.sql_mode = 'block_return_multi_results'; +create table test_1(a int,b int); +create table t (a int); +insert into test_1 values(1,2),(3,4); +insert into t values(123),(789); +--one select +CREATE PROCEDURE proc_a_1 () as +begin + select * from t; +end; +/ +call proc_a_1(); + a +----- + 123 + 789 +(2 rows) + +-- two select +CREATE PROCEDURE proc_a_2 () as +begin + select * from t; + select * from test_1; +end; +/ +call proc_a_2(); + a +----- + 123 + 789 +(2 rows) + + a | b +---+--- + 1 | 2 + 3 | 4 +(2 rows) + +-- with input params +CREATE PROCEDURE proc_a_3 (aa int) as +begin + select aa from t; + select * from test_1; +end; +/ +call proc_a_3(1); + aa +---- + 1 + 1 +(2 rows) + + a | b +---+--- + 1 | 2 + 3 | 4 +(2 rows) + +-- with while and out param +CREATE PROCEDURE proc_b_1 (aa int, out re1 int,out re2 int) as +declare i int default 1; +begin + re1 = aa +100; + re2 = aa + 1000; + while i<=2 do + i := i+1; + select aa + 1,a from test_1; + end while; + select * from t; +end; +/ +--user var +call proc_b_1(1,@a,@b); +ERROR: out args must be uservar type. +set enable_set_variable_b_format = 1; +call proc_b_1(1,@a,@b); + ?column? | a +----------+--- + 2 | 1 + 2 | 3 +(2 rows) + + ?column? | a +----------+--- + 2 | 1 + 2 | 3 +(2 rows) + + a +----- + 123 + 789 +(2 rows) + +select @a; + @a +----- + 101 +(1 row) + +select @b; + @b +------ + 1001 +(1 row) + +CREATE PROCEDURE proc_b_2 (aa int, out re1 int,out re2 int) as +declare i int default 1; +begin + re1 = aa +100; + re2 = aa + 1000; + while i<=2 do + i := i+1; + insert into test_1 values(6,7); + end while; + select * from test_1; + +end; +/ +call proc_b_2 (102,@c,@d); + a | b +---+--- + 1 | 2 + 3 | 4 + 6 | 7 + 6 | 7 +(4 rows) + +--check params +select @c; + @c +----- + 202 +(1 row) + +select @d; + @d +------ + 1102 +(1 row) + +set enable_set_variable_b_format = 0; +--half wrong +create table tab_1145173(id int,pid int,a1 char(8)); +create table a_1145173(id int,a1 char(8)); +create table b_1145173(id int,a1 char(8)); +--insert; +insert into tab_1145173 values(1,2,'s'),(2,3,'b'),(3,4,'c'),(4,5,'d'); +insert into a_1145173 values(1,'s'),(2,'b'); +insert into b_1145173 values(2,'s'),(3,'b'); +create or replace procedure pro_1145173() +as +begin +select * from a_1145173 union select * from b_1145173 order by id; +select * from tab_1145173; +select tt_114; +end; +/ +call pro_1145173(); + id | a1 +----+---- + 1 | s + 2 | s + 2 | b + 3 | b +(4 rows) + + id | pid | a1 +----+-----+---- + 1 | 2 | s + 2 | 3 | b + 3 | 4 | c + 4 | 5 | d +(4 rows) + +ERROR: column "tt_114" does not exist +LINE 1: select tt_114 + ^ +QUERY: select tt_114 +CONTEXT: referenced column: tt_114 +PL/pgSQL function pro_1145173() line 4 at SQL statement +create table tab_1144052(id int,pid int,a1 char(8)); +--insert; +insert into tab_1144052 values(1,2,'s'),(2,3,'b'),(3,4,'c'),(4,5,'d'); +--proc; +create or replace procedure pro_1144052() +as +begin +with temp_1144052(a1,a2) as (select id,a1 from tab_1144052 where id > 1) select * from temp_1144052; +select * from tab_1144052 start with pid = 4 connect by prior id = pid order by a1; +select avg(id),a1 from tab_1144052 group by a1 having avg(id) > 1; +end; +/ +--func; +create or replace function fun_1144052()return int +as +b int; +begin +select count(*) into b from tab_1144052; +return b; +end; +/ +call pro_1144052(); + a1 | a2 +----+---- + 2 | b + 3 | c + 4 | d +(3 rows) + + id | pid | a1 +----+-----+---- + 2 | 3 | b + 3 | 4 | c + 1 | 2 | s +(3 rows) + + avg | a1 +--------------------+---- + 2.0000000000000000 | b + 3.0000000000000000 | c + 4.0000000000000000 | d +(3 rows) + +call fun_1144052(); +ERROR: Only support procedure in muiti result call statement +select fun_1144052(); + fun_1144052 +------------- + 4 +(1 row) + +select pro_1144052(); +ERROR: query has no destination for result data +HINT: If you want to discard the results of a SELECT, use PERFORM instead. +CONTEXT: PL/pgSQL function pro_1144052() line 2 at SQL statement +referenced column: pro_1144052 +-- mysql format +set dolphin.sql_mode = 'sql_mode_strict,sql_mode_full_group,pipes_as_concat,ansi_quotes,no_zero_date,pad_char_to_full_length,block_return_multi_results'; +delimiter // +CREATE PROCEDURE proc_a_m () +begin + select * from t; +end +// +delimiter ; +call proc_a_m(); + a +----- + 123 + 789 +(2 rows) + +create table testtyp (a int8, b varchar ,c date,d bytea); +insert into testtyp values(123,'abv','2020-01-01','a'); +insert into testtyp values(1123,'abcv','2022-01-01','c'); +insert into testtyp values(NULL,NULL,NULL,NULL); +delimiter // +CREATE PROCEDURE proc_a_m1 () +begin + select a,b,c,d from testtyp; + select a,c,d,b from testtyp; +end +// +delimiter ; +call proc_a_m1(); + a | b | c | d +------+------+------------+------ + 123 | abv | 01-01-2020 | \x61 + 1123 | abcv | 01-01-2022 | \x63 + | | | +(3 rows) + + a | c | d | b +------+------------+------+------ + 123 | 01-01-2020 | \x61 | abv + 1123 | 01-01-2022 | \x63 | abcv + | | | +(3 rows) + +create or replace procedure pro_11451713() +as +begin + +end; +/ +call pro_11451713(); +drop schema multi_select_proc cascade; +NOTICE: drop cascades to 18 other objects +DETAIL: drop cascades to table test_1 +drop cascades to table t +drop cascades to function proc_a_1() +drop cascades to function proc_a_2() +drop cascades to function proc_a_3(integer) +drop cascades to function proc_b_1(integer) +drop cascades to function proc_b_2(integer) +drop cascades to table tab_1145173 +drop cascades to table a_1145173 +drop cascades to table b_1145173 +drop cascades to function pro_1145173() +drop cascades to table tab_1144052 +drop cascades to function pro_1144052() +drop cascades to function fun_1144052() +drop cascades to function proc_a_m() +drop cascades to table testtyp +drop cascades to function proc_a_m1() +drop cascades to function pro_11451713() +reset current_schema; diff --git a/contrib/dolphin/include/plugin_commands/defrem.h b/contrib/dolphin/include/plugin_commands/defrem.h index d0c947d06701924b934232acbf8b0ac1321be30b..76658e59a0ac665bad6c921a16199e625f1d1e40 100644 --- a/contrib/dolphin/include/plugin_commands/defrem.h +++ b/contrib/dolphin/include/plugin_commands/defrem.h @@ -69,6 +69,9 @@ extern void DropCastById(Oid castOid); extern ObjectAddress AlterFunctionNamespace(List* name, List* argtypes, bool isagg, const char* newschema); extern Oid AlterFunctionNamespace_oid(Oid procOid, Oid nspOid); extern void ExecuteDoStmt(DoStmt* stmt, bool atomic); +#ifdef DOLPHIN +extern void ExecuteCallStmt(DolphinCallStmt *stmt, ParamListInfo params, bool atomic); +#endif extern Oid get_cast_oid(Oid sourcetypeid, Oid targettypeid, bool missing_ok); extern void IsThereFunctionInNamespace(const char *proname, int pronargs, oidvector *proargtypes, Oid nspOid); diff --git a/contrib/dolphin/include/plugin_commands/mysqlmode.h b/contrib/dolphin/include/plugin_commands/mysqlmode.h index 1a42de7cef469c5734105b68dff1bdbe978b5aae..54caa54907be552ad017e9adec21891d75800c4a 100644 --- a/contrib/dolphin/include/plugin_commands/mysqlmode.h +++ b/contrib/dolphin/include/plugin_commands/mysqlmode.h @@ -11,7 +11,8 @@ #define OPT_SQL_MODE_ANSI_QUOTES (1 << 4) #define OPT_SQL_MODE_NO_ZERO_DATE (1 << 5) #define OPT_SQL_MODE_PAD_CHAR_TO_FULL_LENGTH (1 << 6) -#define OPT_SQL_MODE_MAX 7 +#define OPT_SQL_MODE_BLOCK_RETURN_MULTI_RESULTS (1 << 7) +#define OPT_SQL_MODE_MAX 8 #define SQL_MODE_STRICT() (GetSessionContext()->sqlModeFlags & OPT_SQL_MODE_STRICT) #define SQL_MODE_FULL_GROUP() (GetSessionContext()->sqlModeFlags & OPT_SQL_MODE_FULL_GROUP) #define PG_RETURN_INT8(x) return Int8GetDatum(x) @@ -19,6 +20,8 @@ #define SQL_MODE_ANSI_QUOTES() (GetSessionContext()->sqlModeFlags & OPT_SQL_MODE_ANSI_QUOTES) #define SQL_MODE_NO_ZERO_DATE() (GetSessionContext()->sqlModeFlags & OPT_SQL_MODE_NO_ZERO_DATE) #define SQL_MODE_PAD_CHAR_TO_FULL_LENGTH() (GetSessionContext()->sqlModeFlags & OPT_SQL_MODE_PAD_CHAR_TO_FULL_LENGTH) +#define SQL_MODE_AllOW_PROCEDURE_WITH_SELECT() \ + (GetSessionContext()->sqlModeFlags & OPT_SQL_MODE_BLOCK_RETURN_MULTI_RESULTS) extern int32 PgAtoiInternal(char* s, int size, int c, bool sqlModeStrict, bool can_ignore, bool isUnsigned = false); extern int8 PgStrtoint8Internal(const char* s, bool sqlModeStrict, bool can_ignore); diff --git a/contrib/dolphin/include/plugin_nodes/parsenodes.h b/contrib/dolphin/include/plugin_nodes/parsenodes.h index b681de4882f2beb438b8cb442a1aec75b9c5bc06..a92745745ba08f9affdf6e4a8415e8dd294ad647 100755 --- a/contrib/dolphin/include/plugin_nodes/parsenodes.h +++ b/contrib/dolphin/include/plugin_nodes/parsenodes.h @@ -2375,5 +2375,17 @@ typedef struct GetDiagStmt { bool hasCondNum; List *condNum; } GetDiagStmt; + +/* ---------------------- + * DolphinCall Type Statement, call procedure + * ---------------------- + */ +typedef struct DolphinCallStmt { + NodeTag type; + FuncCall *funccall; /* procedure */ + FuncExpr *funcexpr; /* transformCallstmt deal, only input args */ + List *outargs; /* output args only be UserVar */ +} DolphinCallStmt; + #endif /* PARSENODES_H */ diff --git a/contrib/dolphin/include/plugin_postgres.h b/contrib/dolphin/include/plugin_postgres.h index 3020be02e97ab097a762e507fe2427e0ce6e5c61..171102a86c60efd525941406781d89f66f4c3824 100644 --- a/contrib/dolphin/include/plugin_postgres.h +++ b/contrib/dolphin/include/plugin_postgres.h @@ -162,6 +162,7 @@ typedef struct BSqlPluginContext { struct HTAB* b_sendBlobHash; char* useless_sql_mode; int useless_lower_case_table_names; + bool is_dolphin_call_stmt; #endif } bSqlPluginContext; diff --git a/contrib/dolphin/include/plugin_protocol/dqformat.h b/contrib/dolphin/include/plugin_protocol/dqformat.h index ebcf65c40685487b0d412a81638c0e70390028fd..7d60511ccfdf6b4db6aae82cd91b6f455b320087 100644 --- a/contrib/dolphin/include/plugin_protocol/dqformat.h +++ b/contrib/dolphin/include/plugin_protocol/dqformat.h @@ -140,6 +140,8 @@ void send_network_ok_packet(StringInfo buf, network_mysqld_ok_packet_t *ok_packe void send_general_ok_packet(); void send_network_eof_packet(StringInfo buf); +void send_network_fetch_packet(StringInfo buf); + void send_network_err_packet(StringInfo buf, network_mysqld_err_packet_t *err_packet); diff --git a/contrib/dolphin/include/plugin_protocol/printtup.h b/contrib/dolphin/include/plugin_protocol/printtup.h index d7f8c725509e32583936ad109cd74cbfe1269575..bbbd32e4924df9d74794a5d3aaea503d6cb7e8fa 100644 --- a/contrib/dolphin/include/plugin_protocol/printtup.h +++ b/contrib/dolphin/include/plugin_protocol/printtup.h @@ -24,8 +24,22 @@ #include "postgres.h" #include "tcop/dest.h" +#include "access/printtup.h" extern DestReceiver* dophin_printtup_create_DR(CommandDest dest); extern void dolphin_set_DR_params(DestReceiver *self, List *target_list); +extern void spi_sql_proc_dest_startup(DestReceiver* self, int operation, TupleDesc typeinfo); +extern void SetSqlProcSpiStmtParams(DestReceiver *self, SPIPlanPtr plan); +extern void spi_sql_proc_dest_printtup(TupleTableSlot *slot, DestReceiver *self); +extern DestReceiver* CreateSqlProcSpiDestReciver(CommandDest dest); +struct DR_Dolphin_proc_printtup { + DestReceiver dest; + StringInfoData buf; + bool sendDescrip; + Node* stmt; + TupleDesc attrinfo; /* The attr info we are set up for */ + int nattrs; + PrinttupAttrInfo* myinfo; /* Cached info about each attr */ +}; #endif /* printtup.h */ \ No newline at end of file diff --git a/contrib/dolphin/parallel_schedule_dolphin b/contrib/dolphin/parallel_schedule_dolphin index 4319242e504817de8a8eb49a80184bbafba62a67..c0885f69a3a2042f0e2e40402a5be307aa18ba3d 100644 --- a/contrib/dolphin/parallel_schedule_dolphin +++ b/contrib/dolphin/parallel_schedule_dolphin @@ -70,7 +70,7 @@ test: create_function_test/deterministic create_function_test/language_sql creat test: b_do_statment revoke option test_table_index test_float_double_real_double_precision_MD single_line_trigger prefixkey_index invisible_index -test: alter_function_test/alter_function alter_function_test/alter_procedure alter_function_test/language_sql replace_test/replace +test: alter_function_test/alter_function alter_function_test/alter_procedure alter_function_test/language_sql replace_test/replace multi_select_in_proc test: network2 use_dbname show_create view_definer_test insert_set show_create_database show_variables b_auto_increment diff --git a/contrib/dolphin/plugin_optimizer/commands/functioncmds.cpp b/contrib/dolphin/plugin_optimizer/commands/functioncmds.cpp index 4c82ab706f8f4c6df7f7c58585e87bdd40bcbc22..a5f2a89a4d2e388a5ee90d4da69e50c69b3b1a79 100755 --- a/contrib/dolphin/plugin_optimizer/commands/functioncmds.cpp +++ b/contrib/dolphin/plugin_optimizer/commands/functioncmds.cpp @@ -37,6 +37,8 @@ #ifdef DOLPHIN #include "plugin_nodes/parsenodes_common.h" #include "plugin_nodes/parsenodes.h" +#include "nodes/makefuncs.h" +#include "utils/typcache.h" #endif #include "access/genam.h" #include "access/heapam.h" @@ -92,6 +94,9 @@ #include "tcop/utility.h" #include "tsearch/ts_type.h" #include "commands/comment.h" +#ifdef DOLPHIN +#include "plugin_commands/mysqlmode.h" +#endif #ifdef ENABLE_MOT #include "storage/mot/jit_exec.h" @@ -3332,3 +3337,238 @@ IsThereFunctionInNamespace(const char *proname, int pronargs, NIL, proargtypes->values), get_namespace_name(nspOid)))); } + +#ifdef DOLPHIN +/* + transform out value into ConstType +*/ +Const* processOutResToConst(char* value, Oid atttypid) +{ + Const *con = NULL; + uint len = strlen(value); + char *str_value = (char *)palloc(len + 1); + errno_t rc = strncpy_s(str_value, len + 1, value, len + 1); + securec_check(rc, "\0", "\0"); + str_value[len] = '\0'; + Datum str_datum = CStringGetDatum(str_value); + + /* convert value to const expression. */ + if (atttypid == BOOLOID) { + if (strcmp(str_value, "t") == 0) { + con = makeConst(BOOLOID, -1, InvalidOid, sizeof(bool), BoolGetDatum(true), false, true); + } else { + con = makeConst(BOOLOID, -1, InvalidOid, sizeof(bool), BoolGetDatum(false), false, true); + } + } else { + con = makeConst(UNKNOWNOID, -1, InvalidOid, -2, str_datum, false, false); + } + return con; +} + +/* + * Execute call procedure statememt here + */ +void ExecuteCallStmt(DolphinCallStmt *stmt, ParamListInfo params, bool atomic) +{ + ListCell *lc; + FuncExpr *fexpr; + int nargs; + int i; + AclResult aclresult; + FmgrInfo flinfo; + CallContext *callcontext; + EState *estate; + ExprContext *econtext; + HeapTuple tp; + PgStat_FunctionCallUsage fcusage; + Datum retval; + FunctionCallInfoData fcinfo; + fexpr = stmt->funcexpr; + Assert(fexpr); + Assert(IsA(fexpr, FuncExpr)); + + Oid definer = GetUserId(); + Form_pg_proc procStruct; + bool topCall = false; + /* Get function's pg_proc entry */ + tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(fexpr->funcid)); + if (!HeapTupleIsValid(tp)) { + ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), + errmsg("cache lookup failed for function %u", fexpr->funcid))); + } + procStruct = (Form_pg_proc)GETSTRUCT(tp); + /* in b format database , we shoule check definer operation */ + if (procStruct->prosecdef) { + definer = procStruct->proowner; + } + + aclresult = pg_proc_aclcheck(fexpr->funcid, definer, ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(fexpr->funcid)); + + /* ensure it is a procedure and language is plogsql */ + char prokind = get_func_prokind(fexpr->funcid); + if (!PROC_IS_PRO(prokind)) { + ereport(ERROR, (errcode(ERRCODE_PLPGSQL_ERROR), + errmsg("Only support procedure in muiti result call statement"))); + } + Oid prolang = procStruct->prolang; + if (strcasecmp(get_language_name((Oid)prolang), "plpgsql") != 0) { + ereport(ERROR, (errcode(ERRCODE_PLPGSQL_ERROR), + errmsg("Only support procedure with language plpgsql in muiti result call statement"))); + } + + /* Prep the context object we'll pass to the procedure */ + callcontext = makeNode(CallContext); + callcontext->atomic = atomic; + + if (!heap_attisnull(tp, Anum_pg_proc_proconfig, NULL)) + callcontext->atomic = true; + + ReleaseSysCache(tp); + + /* safety check; see ExecInitFunc() */ + nargs = list_length(fexpr->args); + if (nargs > FUNC_MAX_ARGS) + ereport(ERROR, (errcode(ERRCODE_TOO_MANY_ARGUMENTS), + errmsg_plural("cannot pass more than %d argument to a procedure", + "cannot pass more than %d arguments to a procedure", + FUNC_MAX_ARGS, + FUNC_MAX_ARGS))); + + /* Initialize function call structure */ + fmgr_info(fexpr->funcid, &flinfo); + fmgr_info_set_expr((Node *) fexpr, &flinfo); + InitFunctionCallInfoData(fcinfo, &flinfo, nargs, fexpr->inputcollid, + (Node *) callcontext, NULL); + + /* + * Evaluate procedure arguments inside a suitable execution context. Note + * we can't free this context till the procedure returns. + */ + estate = CreateExecutorState(); + estate->es_param_list_info = params; + econtext = CreateExprContext(estate); + + /* + * If we're called in non-atomic context, we also have to ensure that the + * argument expressions run with an up-to-date snapshot. Our caller will + * have provided a current snapshot in atomic contexts, but not in + * non-atomic contexts, because the possibility of a COMMIT/ROLLBACK + * destroying the snapshot makes higher-level management too complicated. + */ + if (!atomic) + PushActiveSnapshot(GetTransactionSnapshot()); + + i = 0; + foreach(lc, fexpr->args) + { + ExprState *exprstate; + Datum val; + bool isnull; + exprstate = ExecPrepareExpr((Expr*)lfirst(lc), estate); + val = ExecEvalExprSwitchContext(exprstate, econtext, &isnull, NULL); + + fcinfo.arg[i] = val; + fcinfo.argnull[i] = isnull; + i++; + } + + /* Get rid of temporary snapshot for arguments, if we made one */ + if (!atomic) + PopActiveSnapshot(); + + /* Here we actually call the procedure */ + pgstat_init_function_usage(&fcinfo, &fcusage); + PG_TRY(); + { + if (!GetSessionContext()->is_dolphin_call_stmt) + topCall = true; + GetSessionContext()->is_dolphin_call_stmt = true; + retval = FunctionCallInvoke(&fcinfo); + } + PG_CATCH(); + { + if (topCall) + GetSessionContext()->is_dolphin_call_stmt = false; + PG_RE_THROW(); + } + PG_END_TRY(); + if (topCall) + GetSessionContext()->is_dolphin_call_stmt = false; + pgstat_end_function_usage(&fcusage, true); + + /* Handle the procedure's outputs */ + if (fexpr->funcresulttype == RECORDOID) { + /* send tuple to UserVar */ + HeapTupleHeader td; + Oid tupType; + int32 tupTypmod; + TupleDesc retdesc; + HeapTupleData rettupdata; + TupleTableSlot *slot; + + if (fcinfo.isnull) + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("procedure out parameters shoule be saved"))); + + /* make a tupletableslot and save uservar here */ + td = DatumGetHeapTupleHeader(retval); + tupType = HeapTupleHeaderGetTypeId(td); + tupTypmod = HeapTupleHeaderGetTypMod(td); + retdesc = lookup_rowtype_tupdesc(tupType, tupTypmod); + slot = MakeSingleTupleTableSlot(retdesc); + + rettupdata.t_len = HeapTupleHeaderGetDatumLength(td); + ItemPointerSetInvalid(&(rettupdata.t_self)); + rettupdata.t_tableOid = InvalidOid; + rettupdata.t_data = td; + + slot = ExecStoreTuple(&rettupdata, slot, InvalidBuffer, false); + tableam_tslot_getallattrs(slot); + uint i = 0; + ListCell* lc; + /* set uservar value */ + foreach (lc, stmt->outargs) { + Oid out_func_oid = InvalidOid; + bool isvarlena = false; + FmgrInfo* finfo = (FmgrInfo*)palloc(1 * sizeof(FmgrInfo)); + Oid atttypid = ((UserVar*)lfirst(lc))->value ? ((Const*)((UserVar*)lfirst(lc))->value)->consttype : + slot->tts_tupleDescriptor->attrs[i].atttypid; + getTypeOutputInfo(atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, finfo); + if (slot->tts_values[i] != 0) { + char* ovalue = OutputFunctionCall(finfo, slot->tts_values[i]); + Const *con = processOutResToConst(ovalue, atttypid); + Node* rnode = atttypid == BOOLOID ? (Node*)con : type_transfer((Node*)con, atttypid, true); + Expr *var_expr = (Expr *)const_expression_to_const(rnode); + check_variable_value_info(((UserVar *)lfirst(lc))->name, var_expr); + pfree(ovalue); + } + pfree(finfo); + ++i; + } + ExecDropSingleTupleTableSlot(slot); + ReleaseTupleDesc(retdesc); + } else if (fexpr->funcresulttype != VOIDOID) { + Assert(list_length(stmt->outargs) == 1); + Oid out_func_oid = InvalidOid; + bool isvarlena = false; + UserVar* var = (UserVar*)linitial(stmt->outargs); + Oid atttypid = var->value ? ((Const*)var->value)->consttype :fexpr->funcresulttype; + FmgrInfo* finfo = (FmgrInfo*)palloc(1 * sizeof(FmgrInfo)); + getTypeOutputInfo(atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, finfo); + if (PointerIsValid(retval)) { + char* ovalue = OutputFunctionCall(finfo, retval); + Const *con = processOutResToConst(ovalue, atttypid); + Node* rnode = atttypid == BOOLOID ? (Node*)con : type_transfer((Node*)con, atttypid, true); + Expr *var_expr = (Expr *)const_expression_to_const(rnode); + check_variable_value_info(((UserVar *)lfirst(lc))->name, var_expr); + pfree(ovalue); + } + pfree(finfo); + } + FreeExecutorState(estate); +} +#endif \ No newline at end of file diff --git a/contrib/dolphin/plugin_parser/analyze.cpp b/contrib/dolphin/plugin_parser/analyze.cpp index 8911bd51039216944b853a1d46bb57c711d46a45..ee4d66681846e6bc4df7c234a818319dbc9c93d0 100644 --- a/contrib/dolphin/plugin_parser/analyze.cpp +++ b/contrib/dolphin/plugin_parser/analyze.cpp @@ -109,6 +109,7 @@ #include "utils/jsonb.h" #include "utils/xml.h" #include "utils/typcache.h" +#include "parser/parse_func.h" /* Hook for plugins to get control at end of parse analysis */ THR_LOCAL post_parse_analyze_hook_type post_parse_analyze_hook = NULL; static const int MILLISECONDS_PER_SECONDS = 1000; @@ -168,6 +169,7 @@ static Node* makeConstByType(Form_pg_attribute att_tup); static Node* makeTimetypeConst(Oid targetType, int32 targetTypmod, Oid targetCollation, int16 targetLen, bool targetByval); static Node* makeNotTimetypeConst(Oid targetType, int32 targetTypmod, Oid targetCollation, int16 targetLen, bool targetByval); static List* makeValueLists(ParseState* pstate); +static Query* transformCallStmt(ParseState *pstate, DolphinCallStmt *stmt); #endif /* @@ -621,6 +623,10 @@ Query* transformStmt(ParseState* pstate, Node* parseTree, bool isFirstNode, bool result->commandType = CMD_UTILITY; result->utilityStmt = (Node*)parseTree; break; + + case T_DolphinCallStmt: + result = transformCallStmt(pstate, (DolphinCallStmt *) parseTree); + break; #endif default: @@ -5006,6 +5012,55 @@ static Query* transformCreateTableAsStmt(ParseState* pstate, CreateTableAsStmt* return result; } +#ifdef DOLPHIN +/* + * transformCallStmt - + * transform a call xxx() Statement in B format; + *. + */ + +static Query* transformCallStmt(ParseState *pstate, DolphinCallStmt *stmt) +{ + List *targs; + ListCell *lc; + Node *node; + FuncExpr *fexpr; + HeapTuple proctup; + Query *result; + + targs = NIL; + /* transform all args to expr */ + foreach(lc, stmt->funccall->args) + { + targs = lappend(targs, transformExprRecurse(pstate, (Node *) lfirst(lc))); + } + + /* parse the function/procedure use function name */ + node = ParseFuncOrColumn(pstate, stmt->funccall->funcname, targs, pstate->p_last_srf, + stmt->funccall, stmt->funccall->location, true); + + assign_expr_collations(pstate, node); + + fexpr = castNode(FuncExpr, node); + + /* find procedure object in systemtable */ + proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(fexpr->funcid)); + if (!HeapTupleIsValid(proctup)) + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("could not find procedure %u", fexpr->funcid))); + + stmt->funcexpr = fexpr; + + ReleaseSysCache(proctup); + + /* set as as a utility Query */ + result = makeNode(Query); + result->commandType = CMD_UTILITY; + result->utilityStmt = (Node *) stmt; + + return result; +} +#endif + #ifdef PGXC #ifdef ENABLE_DISTRIBUTE_TEST /* diff --git a/contrib/dolphin/plugin_parser/gram.y b/contrib/dolphin/plugin_parser/gram.y index b9e663ee9e92ac71c341ebf7d588c91d162ad6f6..902811fea9fcc35761559457139786417e929a4d 100644 --- a/contrib/dolphin/plugin_parser/gram.y +++ b/contrib/dolphin/plugin_parser/gram.y @@ -92,6 +92,7 @@ #include "utils/datetime.h" #ifdef DOLPHIN #include "plugin_utils/timestamp.h" +#include "plugin_commands/mysqlmode.h" #endif #include "utils/rel.h" #include "utils/numeric.h" @@ -38756,6 +38757,9 @@ makeCallFuncStmt(List* funcname,List* parameters, bool is_call) } if (IsA(arg, UserVar)) { userVarList = lappend(userVarList,arg); + } else if (SQL_MODE_AllOW_PROCEDURE_WITH_SELECT()) + { + ereport(errstate,(errcode(ERRCODE_UNDEFINED_FUNCTION),errmsg("out args must be uservar type." ))); } } } @@ -38771,6 +38775,14 @@ makeCallFuncStmt(List* funcname,List* parameters, bool is_call) } ReleaseSysCache(proctup); + if (SQL_MODE_AllOW_PROCEDURE_WITH_SELECT()) { + DolphinCallStmt *n = makeNode(DolphinCallStmt); + FuncCall* func= makeFuncCall(funcname, in_parameters,-1); + n->funccall = func; + n->outargs = userVarList; + + return (Node *)n; + } column = makeNode(ColumnRef); column->fields = list_make1(makeNode(A_Star)); column->location = -1; diff --git a/contrib/dolphin/plugin_pl/plpgsql/src/pl_handler.cpp b/contrib/dolphin/plugin_pl/plpgsql/src/pl_handler.cpp index 0025e3ae234bce12880555d8abf25af768ae362d..0531dbfdc2bfa94a57db6bc92f5e5961be805dc3 100644 --- a/contrib/dolphin/plugin_pl/plpgsql/src/pl_handler.cpp +++ b/contrib/dolphin/plugin_pl/plpgsql/src/pl_handler.cpp @@ -711,9 +711,18 @@ Datum b_plpgsql_call_handler(PG_FUNCTION_ARGS) * commit/rollback within stored procedure. * set the nonatomic and will be reused within function. */ +#ifdef DOLPHIN + if (fcinfo->context && IsA(fcinfo->context, FunctionScanState)) { + nonatomic = !castNode(FunctionScanState, fcinfo->context)->atomic; + } else if (fcinfo->context && IsA(fcinfo->context, CallContext)) { + nonatomic = !castNode(CallContext, fcinfo->context)->atomic; + } else { + nonatomic = false; + } +#else nonatomic = fcinfo->context && IsA(fcinfo->context, FunctionScanState) && !castNode(FunctionScanState, fcinfo->context)->atomic; - +#endif /* get cast owner and make sure current user is cast owner when execute cast-func */ GetUserIdAndSecContext(&old_user, &save_sec_context); cast_owner = u_sess->exec_cxt.cast_owner; diff --git a/contrib/dolphin/plugin_postgres.cpp b/contrib/dolphin/plugin_postgres.cpp index 7d94d1e0f4e4c1b19080998ff476a04f8e9fcd9d..ace9ecc2be047e4b88c6c4c10254aa78c5ea4202 100644 --- a/contrib/dolphin/plugin_postgres.cpp +++ b/contrib/dolphin/plugin_postgres.cpp @@ -74,6 +74,9 @@ #include "replication/archive_walreceiver.h" #include "plugin_commands/mysqlmode.h" #include "plugin_protocol/startup.h" +#include "libpq/libpq.h" +#include "plugin_protocol/printtup.h" +#include "plugin_protocol/dqformat.h" #ifdef DOLPHIN #include "plugin_utils/my_locale.h" #endif @@ -95,7 +98,8 @@ static const struct sql_mode_entry sql_mode_options[OPT_SQL_MODE_MAX] = { {"pipes_as_concat", OPT_SQL_MODE_PIPES_AS_CONCAT}, {"ansi_quotes", OPT_SQL_MODE_ANSI_QUOTES}, {"no_zero_date", OPT_SQL_MODE_NO_ZERO_DATE}, - {"pad_char_to_full_length", OPT_SQL_MODE_PAD_CHAR_TO_FULL_LENGTH} + {"pad_char_to_full_length", OPT_SQL_MODE_PAD_CHAR_TO_FULL_LENGTH}, + {"block_return_multi_results", OPT_SQL_MODE_BLOCK_RETURN_MULTI_RESULTS} }; #define DOLPHIN_TYPES_NUM 12 @@ -167,6 +171,9 @@ static bool check_query_cache_type(int* newval, void** extra, GucSource source); static bool check_system_time_zone(char** newval, void** extra, GucSource source); static bool check_time_zone(char** newval, void** extra, GucSource source); static bool check_wait_timeout(int* newval, void** extra, GucSource source); +static int SpiIsExecMultiSelect(PLpgSQL_execstate* estate, PLpgSQL_expr* expr, + PLpgSQL_stmt_execsql* pl_stmt, ParamListInfo paramLI, long tcount, bool* multi_res); +static void SpiMultiSelectException(); #endif static const int LOADER_COL_BUF_CNT = 5; static uint32 dolphin_index; @@ -291,6 +298,10 @@ void init_plugin_object() u_sess->hook_cxt.pluginCCHashEqFuncs = (void*)ccHashEqFuncs; u_sess->hook_cxt.plpgsqlParserSetHook = (void*)b_plpgsql_parser_setup; u_sess->hook_cxt.coreYYlexHook = (void*)core_yylex; + u_sess->hook_cxt.pluginProcDestReciverHook = (void*)CreateSqlProcSpiDestReciver; + u_sess->hook_cxt.pluginSpiReciverParamHook = (void*)SetSqlProcSpiStmtParams; + u_sess->hook_cxt.pluginSpiExecuteMultiResHook =(void*)SpiIsExecMultiSelect; + u_sess->hook_cxt.pluginMultiResExceptionHook =(void*)SpiMultiSelectException; set_default_guc(); if (g_instance.attr.attr_network.enable_dolphin_proto && u_sess->proc_cxt.MyProcPort && @@ -397,6 +408,76 @@ bool ccHashEqFuncs(Oid keytype, CCHashFN *hashfunc, RegProcedure *eqfunc, CCFast return false; } +#define IS_CLIENT_CONN_VALID_PROC_SPI(port) \ + (((port) == NULL) \ + ? false \ + : (((port)->is_logic_conn) ? ((port)->gs_sock.type != GSOCK_INVALID) : ((port)->sock != NO_SOCKET))) + +static int SpiIsExecMultiSelect(PLpgSQL_execstate* estate, PLpgSQL_expr* expr, PLpgSQL_stmt_execsql* pl_stmt, + ParamListInfo param_li, long tcount, bool* multi_res) +{ + bool outPutSelRes = false; + Port* MyProcPort = u_sess->proc_cxt.MyProcPort; + int tmpPos = t_thrd.libpq_cxt.PqSendPointer; + int rc; + if (SQL_MODE_AllOW_PROCEDURE_WITH_SELECT() && GetSessionContext()->is_dolphin_call_stmt) { + CachedPlan* cplan = SPI_plan_get_cached_plan(expr->plan); + List *stmt_list = NULL; + if (cplan) + stmt_list = cplan->stmt_list; + if (stmt_list) { + Node *stmt = (Node *)linitial(stmt_list); + if (IsA(stmt, PlannedStmt) && ((PlannedStmt *)stmt)->commandType == CMD_SELECT && + !pl_stmt->into) { + t_thrd.libpq_cxt.PqSendStart += t_thrd.libpq_cxt.PqSendPointer; + u_sess->SPI_cxt._current->dest = DestSqlProcSPI; + outPutSelRes = true; + } + } + if (cplan) + ReleaseCachedPlan(cplan, true); + } + rc = SPI_execute_plan_with_paramlist(expr->plan, param_li, estate->readonly_func, tcount); + if (outPutSelRes) { + if (MyProcPort && MyProcPort->protocol_config->fn_printtup_create_DR && + MyProcPort->protocol_config->fn_printtup_create_DR == dophin_printtup_create_DR) { + StringInfo buf = makeStringInfo(); + send_network_fetch_packet(buf); + DestroyStringInfo(buf); + } else { + const char* strs = "CALL"; + EndCommand(strs, DestRemote); + } + if (IS_CLIENT_CONN_VALID_PROC_SPI(MyProcPort) && (!t_thrd.int_cxt.ClientConnectionLost)) { + CHECK_FOR_INTERRUPTS(); + pq_flush(); + t_thrd.libpq_cxt.PqSendPointer = tmpPos; + } + } + *multi_res = outPutSelRes; + return rc; +} + + +static void SpiMultiSelectException() +{ + Port* MyProcPort = u_sess->proc_cxt.MyProcPort; + if (u_sess->SPI_cxt._current->dest == DestSqlProcSPI) { + if (MyProcPort && MyProcPort->protocol_config->fn_printtup_create_DR && + MyProcPort->protocol_config->fn_printtup_create_DR == dophin_printtup_create_DR) { + StringInfo buf = makeStringInfo(); + send_network_fetch_packet(buf); + DestroyStringInfo(buf); + } else { + const char* strs = "CALL"; + EndCommand(strs, DestRemote); + } + if (IS_CLIENT_CONN_VALID_PROC_SPI(MyProcPort) && (!t_thrd.int_cxt.ClientConnectionLost)) { + CHECK_FOR_INTERRUPTS(); + pq_flush(); + } + } +} /* * check_behavior_compat_options: GUC check_hook for behavior compat options */ @@ -848,6 +929,7 @@ void init_session_vars(void) cxt->is_schema_name = false; cxt->b_stmtInputTypeHash = NULL; cxt->b_sendBlobHash = NULL; + cxt->is_dolphin_call_stmt = false; DefineCustomBoolVariable("dolphin.b_compatibility_mode", "Enable mysql behavior override opengauss's when collision happens.", diff --git a/contrib/dolphin/plugin_protocol/dqformat.cpp b/contrib/dolphin/plugin_protocol/dqformat.cpp index 28bdd5dedb1392bb7448a4721e52ecd28b5af4fd..161cf29cdcb8cdf981c52aec5f7952859613eced 100644 --- a/contrib/dolphin/plugin_protocol/dqformat.cpp +++ b/contrib/dolphin/plugin_protocol/dqformat.cpp @@ -203,6 +203,16 @@ void send_network_eof_packet(StringInfo buf) dq_putmessage(buf->data, buf->len); } +void send_network_fetch_packet(StringInfo buf) +{ + resetStringInfo(buf); + + dq_append_int1(buf, 0xfe); + dq_append_int2(buf, 0x00); /* warning count */ + dq_append_int2(buf, 0x2a); /* status flags */ + + dq_putmessage(buf->data, buf->len); +} void send_network_err_packet(StringInfo buf, network_mysqld_err_packet_t *err_packet) { int errmsg_len; diff --git a/contrib/dolphin/plugin_protocol/printtup.cpp b/contrib/dolphin/plugin_protocol/printtup.cpp index 6063528195050653b77dd6f49d6b0169064e085f..912b06d2962efe09e642cd1ada003992a322d59d 100644 --- a/contrib/dolphin/plugin_protocol/printtup.cpp +++ b/contrib/dolphin/plugin_protocol/printtup.cpp @@ -32,7 +32,7 @@ #include "mb/pg_wchar.h" #include "miscadmin.h" #include "access/heapam.h" - +#include "catalog/pg_proc.h" #include "plugin_protocol/dqformat.h" #include "plugin_protocol/printtup.h" #include "plugin_protocol/proto_com.h" @@ -46,6 +46,60 @@ static void printtup_destroy(DestReceiver *self); static void SendRowDescriptionMessage(StringInfo buf, TupleDesc typeinfo, List* targetlist); static void printtup_prepare_info(DR_printtup *myState, TupleDesc typeinfo, int numAttrs); +static void spi_sql_proc_dest_destroy(DestReceiver *self); +static void spi_sql_proc_dest_shutdown(DestReceiver *self); + +static inline bool check_need_free_varchar_output(const char* str) +{ + return ((char*)str == u_sess->utils_cxt.varcharoutput_buffer); +} +static inline bool check_need_free_numeric_output(const char* str) +{ + return ((char*)str == u_sess->utils_cxt.numericoutput_buffer); +} +static inline bool check_need_free_date_output(const char* str) +{ + return ((char*)str == u_sess->utils_cxt.dateoutput_buffer); +} +DestReceiver* CreateSqlProcSpiDestReciver(CommandDest dest) +{ + DestReceiver* resdr = NULL; + Port* MyProcPort = u_sess->proc_cxt.MyProcPort; + if (MyProcPort && MyProcPort->protocol_config->fn_printtup_create_DR && + MyProcPort->protocol_config->fn_printtup_create_DR == dophin_printtup_create_DR) { + DR_printtup *dr = (DR_printtup *)palloc0(sizeof(DR_printtup)); + dr->pub.receiveSlot = printtup; /* might get changed later */ + dr->pub.rStartup = printtup_startup; + dr->pub.rShutdown = printtup_shutdown; + dr->pub.rDestroy = printtup_destroy; + dr->pub.mydest = DestRemote; + dr->pub.finalizeLocalStream = NULL; + dr->pub.tmpContext = NULL; + dr->sendDescrip = (dest == DestSqlProcSPI); + dr->attrinfo = NULL; + dr->nattrs = 0; + dr->myinfo = NULL; + dr->formats = NULL; + resdr =(DestReceiver*)dr; + } else { + DR_Dolphin_proc_printtup *dr = (DR_Dolphin_proc_printtup *)palloc0(sizeof(DR_Dolphin_proc_printtup)); + dr->dest.receiveSlot = spi_sql_proc_dest_printtup; /* might get changed later */ + dr->dest.rStartup = spi_sql_proc_dest_startup; + dr->dest.rShutdown = spi_sql_proc_dest_shutdown; + dr->dest.rDestroy = spi_sql_proc_dest_destroy; + dr->dest.mydest = dest; + dr->dest.finalizeLocalStream = NULL; + dr->dest.tmpContext = NULL; + dr->sendDescrip = (dest == DestSqlProcSPI); + dr->stmt = NULL; + dr->attrinfo = NULL; + dr->nattrs = 0; + dr->myinfo = NULL; + resdr =(DestReceiver*)dr; + } + return (DestReceiver*)resdr; +} + DestReceiver* dophin_printtup_create_DR(CommandDest dest) { DR_printtup *self = (DR_printtup *)palloc0(sizeof(DR_printtup)); @@ -289,3 +343,277 @@ static void printtup_destroy(DestReceiver *self) { pfree(self); } + +void SetSqlProcSpiStmtParams(DestReceiver *self, SPIPlanPtr plan) +{ + if (self->mydest == DestSqlProcSPI && plan) { + DR_Dolphin_proc_printtup *dr = (DR_Dolphin_proc_printtup *)self; + List* stmts = plan->stmt_list; + if (stmts) { + ListCell* lc = NULL; + foreach (lc, stmts) { + Node* stmt = (Node*)lfirst(lc); + if (IsA(stmt, PlannedStmt)) { + if (((PlannedStmt*)stmt)->canSetTag) { + dr->stmt = stmt; + break; + } + } else if (IsA(stmt, Query)) { + if (((Query*)stmt)->canSetTag) { + dr->stmt = stmt; + break; + } + } else if (list_length(stmts) == 1) { + /* Utility stmts are assumed canSetTag if they're the only stmt */ + dr->stmt = stmt; + break; + } + } + } else { + dr->stmt = NULL; + } + } +} + +void spi_sql_proc_dest_startup(DestReceiver* self, int operation, TupleDesc typeinfo) +{ + DR_Dolphin_proc_printtup *myState = (DR_Dolphin_proc_printtup *)self; + Node* stmt = myState->stmt; + + /* create buffer to be used for all messages */ + initStringInfo(&myState->buf); + + if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3) { + /* + * Send portal name to frontend (obsolete cruft, gone in proto 3.0) + * + * If portal name not specified, use "blank" portal. + */ + const char *portalName = "blank"; + + pq_puttextmessage('P', portalName); + } + + /* + * If we are supposed to emit row descriptions, then send the tuple + * descriptor of the tuples. + */ + if (myState->sendDescrip) + SendRowDescriptionMessage(&myState->buf, typeinfo, FetchStatementTargetList(stmt), NULL); +} + +/* ---------------- + * spi_sql_proc_dest_printtup --- print a tuple in protocol 3.0,when call procedure + * ---------------- + */ + +static void spi_proc_printtup_prepare_info(DR_Dolphin_proc_printtup *myState, TupleDesc typeinfo, int numAttrs) +{ + int16 *formats = NULL; + int i; + + /* get rid of any old data */ + if (myState->myinfo != NULL) { + pfree(myState->myinfo); + } + myState->myinfo = NULL; + + myState->attrinfo = typeinfo; + myState->nattrs = numAttrs; + if (numAttrs <= 0) { + return; + } + + myState->myinfo = (PrinttupAttrInfo *)palloc0(numAttrs * sizeof(PrinttupAttrInfo)); + + for (i = 0; i < numAttrs; i++) { + PrinttupAttrInfo *thisState = myState->myinfo + i; + int16 format = (formats ? formats[i] : 0); + + /* + * for analyze global stats, because DN will send sample rows to CN, + * if we encounter droped columns, we should send it to CN. but atttypid of dropped column + * is invalid in pg_attribute, it will generate error, so we should do special process for the reason. + */ + if (typeinfo->attrs[i].attisdropped) { + typeinfo->attrs[i].atttypid = UNKNOWNOID; + } + + thisState->format = format; + if (format == 0) { + getTypeOutputInfo(typeinfo->attrs[i].atttypid, &thisState->typoutput, &thisState->typisvarlena); + fmgr_info(thisState->typoutput, &thisState->finfo); + } else if (format == 1) { + getTypeBinaryOutputInfo(typeinfo->attrs[i].atttypid, &thisState->typsend, &thisState->typisvarlena); + fmgr_info(thisState->typsend, &thisState->finfo); + } else { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unsupported format code: %d", format))); + } + } +} + + +void spi_sql_proc_dest_printtup(TupleTableSlot *slot, DestReceiver *self) +{ + TupleDesc typeinfo = slot->tts_tupleDescriptor; + DR_Dolphin_proc_printtup *myState = (DR_Dolphin_proc_printtup *)self; + StringInfo buf = &myState->buf; + int natts = typeinfo->natts; + int i; + bool needFree = false; + bool binary = false; + /* just as we define in backend/commands/analyze.cpp */ +#define WIDTH_THRESHOLD 1024 + + /* Set or update my derived attribute info, if needed */ + if (myState->attrinfo != typeinfo || myState->nattrs != natts) + spi_proc_printtup_prepare_info(myState, typeinfo, natts); + +#ifdef PGXC + + /* + * The datanodes would have sent all attributes in TEXT form. But + * if the client has asked for any attribute to be sent in a binary format, + * then we must decode the datarow and send every attribute in the format + * that the client has asked for. Otherwise its ok to just forward the + * datarow as it is + */ + for (i = 0; i < natts; ++i) { + PrinttupAttrInfo *thisState = myState->myinfo + i; + if (thisState->format != 0) + binary = true; + } + + /* + * If we are having DataRow-based tuple we do not have to encode attribute + * values, just send over the DataRow message as we received it from the + * Datanode + */ + if (slot->tts_dataRow != NULL && (pg_get_client_encoding() == GetDatabaseEncoding()) && !binary) { + pq_beginmessage_reuse(buf, 'D'); + appendBinaryStringInfo(buf, slot->tts_dataRow, slot->tts_dataLen); + pq_endmessage_reuse(buf); + StreamTimeSerilizeEnd(t_thrd.pgxc_cxt.GlobalNetInstr); + return; + } +#endif + + /* Make sure the tuple is fully deconstructed */ + tableam_tslot_getallattrs(slot); + + MemoryContext old_context = changeToTmpContext(self); + /* + * Prepare a DataRow message + */ + pq_beginmessage_reuse(buf, 'D'); + + pq_sendint16(buf, natts); + + /* + * send the attributes of this tuple + */ + for (i = 0; i < natts; ++i) { + PrinttupAttrInfo *thisState = myState->myinfo + i; + Datum attr = slot->tts_values[i]; + + /* + * skip null value attribute, + * we need to skip the droped columns for analyze global stats. + */ + if (slot->tts_isnull[i] || typeinfo->attrs[i].attisdropped) { + pq_sendint32(buf, (uint32)-1); + continue; + } + + if (typeinfo->attrs[i].atttypid == ANYARRAYOID && slot->tts_dataRow != NULL) { + /* + * For ANYARRAY type, the not null DataRow-based tuple indicates the value in + * attr had been converted to CSTRING type previously by using anyarray_out. + * just send over the DataRow message as we received it. + */ + pq_sendcountedtext_printtup(buf, (char *)attr, strlen((char *)attr)); + } else { + if (thisState->format == 0) { + /* Text output */ + char *outputstr = NULL; +#ifndef ENABLE_MULTIPLE_NODES + t_thrd.xact_cxt.callPrint = true; +#endif + needFree = false; + switch (thisState->typoutput) { + case F_INT4OUT: + outputstr = output_int32_to_cstring(DatumGetInt32(attr)); + break; + case F_INT8OUT: + outputstr = output_int64_to_cstring(DatumGetInt64(attr)); + break; + case F_BPCHAROUT: + /* support dolphin customizing bpcharout */ + outputstr = OutputFunctionCall(&thisState->finfo, attr); + needFree = true; + break; + case F_VARCHAROUT: + outputstr = output_text_to_cstring((text*)DatumGetPointer(attr)); + needFree = !check_need_free_varchar_output(outputstr); + break; + case F_NUMERIC_OUT: + outputstr = output_numeric_out(DatumGetNumeric(attr)); + needFree = !check_need_free_numeric_output(outputstr); + break; + case F_DATE_OUT: + /* support dolphin customizing dateout */ + outputstr = OutputFunctionCall(&thisState->finfo, attr); + needFree = true; + break; + default: + outputstr = OutputFunctionCall(&thisState->finfo, attr); + needFree = true; + break; + } +#ifndef ENABLE_MULTIPLE_NODES + t_thrd.xact_cxt.callPrint = false; +#endif + pq_sendcountedtext_printtup(buf, outputstr, strlen(outputstr)); + if (needFree) { + pfree(outputstr); + } + } else { + /* Binary output */ + bytea *outputbytes = NULL; + + outputbytes = SendFunctionCall(&thisState->finfo, attr); + pq_sendint32(buf, VARSIZE(outputbytes) - VARHDRSZ); + pq_sendbytes(buf, VARDATA(outputbytes), VARSIZE(outputbytes) - VARHDRSZ); + pfree(outputbytes); + } + } + } + + (void)MemoryContextSwitchTo(old_context); + + pq_endmessage_reuse(buf); +} + +/* ---------------- + * spi_sql_proc_dest_destroy + * ---------------- + */ +static void spi_sql_proc_dest_shutdown(DestReceiver *self) +{ + DR_Dolphin_proc_printtup *myState = (DR_Dolphin_proc_printtup *)self; + + if (myState->myinfo != NULL) + pfree(myState->myinfo); + myState->myinfo = NULL; + + myState->attrinfo = NULL; +} + +/* ---------------- + * spi_sql_proc_dest_destroy + * ---------------- + */ +static void spi_sql_proc_dest_destroy(DestReceiver *self) +{ + pfree(self); +} diff --git a/contrib/dolphin/plugin_utility.cpp b/contrib/dolphin/plugin_utility.cpp index 7b8057a4ce3811a860c2f7d84622ad626edd7bfc..146774657ecade62545728e4bf2d93077fbe6912 100644 --- a/contrib/dolphin/plugin_utility.cpp +++ b/contrib/dolphin/plugin_utility.cpp @@ -5081,6 +5081,11 @@ void standard_ProcessUtility(processutility_context* processutility_cxt, GetDiagStmt *n = (GetDiagStmt *)parse_tree; getDiagnosticsInfo(n->condInfo, n->hasCondNum, n->condNum); } break; +#ifdef DOLPHIN + case T_DolphinCallStmt: { + ExecuteCallStmt(castNode(DolphinCallStmt, parse_tree), params, false); + } break; +#endif default: { ProcessUtilitySlow(parse_tree, query_string, params, dest, #ifdef PGXC @@ -9561,6 +9566,11 @@ const char* CreateCommandTag(Node* parse_tree) case T_GetDiagStmt: tag = "GET DIAGNOSTICS"; break; +#ifdef DOLPHIN + case T_DolphinCallStmt: + tag = "CALL"; + break; +#endif default: elog(WARNING, "unrecognized node type: %d", (int)nodeTag(parse_tree)); tag = "?\?\?"; diff --git a/contrib/dolphin/sql/multi_select_in_proc.sql b/contrib/dolphin/sql/multi_select_in_proc.sql new file mode 100644 index 0000000000000000000000000000000000000000..2acb4de22c04e9fa229917f7dfe650212ab8aa15 --- /dev/null +++ b/contrib/dolphin/sql/multi_select_in_proc.sql @@ -0,0 +1,187 @@ +create schema multi_select_proc; +set current_schema to 'multi_select_proc'; + +--open parameter sql_mode +set dolphin.sql_mode = 'block_return_multi_results'; + +create table test_1(a int,b int); +create table t (a int); +insert into test_1 values(1,2),(3,4); +insert into t values(123),(789); + +--one select +CREATE PROCEDURE proc_a_1 () as +begin + select * from t; +end; +/ + +call proc_a_1(); + + +-- two select +CREATE PROCEDURE proc_a_2 () as +begin + select * from t; + select * from test_1; +end; +/ + +call proc_a_2(); + +-- with input params +CREATE PROCEDURE proc_a_3 (aa int) as +begin + select aa from t; + select * from test_1; +end; +/ + +call proc_a_3(1); + +-- with while and out param +CREATE PROCEDURE proc_b_1 (aa int, out re1 int,out re2 int) as +declare i int default 1; +begin + re1 = aa +100; + re2 = aa + 1000; + while i<=2 do + i := i+1; + select aa + 1,a from test_1; + end while; + select * from t; +end; +/ + +--user var +call proc_b_1(1,@a,@b); + +set enable_set_variable_b_format = 1; + +call proc_b_1(1,@a,@b); + +select @a; + +select @b; + + +CREATE PROCEDURE proc_b_2 (aa int, out re1 int,out re2 int) as +declare i int default 1; +begin + re1 = aa +100; + re2 = aa + 1000; + while i<=2 do + i := i+1; + insert into test_1 values(6,7); + end while; + select * from test_1; + +end; +/ + +call proc_b_2 (102,@c,@d); + +--check params +select @c; + +select @d; + +set enable_set_variable_b_format = 0; + +--half wrong + +create table tab_1145173(id int,pid int,a1 char(8)); +create table a_1145173(id int,a1 char(8)); +create table b_1145173(id int,a1 char(8)); +--insert; +insert into tab_1145173 values(1,2,'s'),(2,3,'b'),(3,4,'c'),(4,5,'d'); +insert into a_1145173 values(1,'s'),(2,'b'); +insert into b_1145173 values(2,'s'),(3,'b'); + +create or replace procedure pro_1145173() +as +begin +select * from a_1145173 union select * from b_1145173 order by id; +select * from tab_1145173; +select tt_114; +end; +/ + +call pro_1145173(); + +create table tab_1144052(id int,pid int,a1 char(8)); +--insert; +insert into tab_1144052 values(1,2,'s'),(2,3,'b'),(3,4,'c'),(4,5,'d'); + +--proc; +create or replace procedure pro_1144052() +as +begin +with temp_1144052(a1,a2) as (select id,a1 from tab_1144052 where id > 1) select * from temp_1144052; +select * from tab_1144052 start with pid = 4 connect by prior id = pid order by a1; +select avg(id),a1 from tab_1144052 group by a1 having avg(id) > 1; +end; +/ +--func; +create or replace function fun_1144052()return int +as +b int; +begin +select count(*) into b from tab_1144052; +return b; +end; +/ + +call pro_1144052(); + +call fun_1144052(); + +select fun_1144052(); + +select pro_1144052(); + +-- mysql format +set dolphin.sql_mode = 'sql_mode_strict,sql_mode_full_group,pipes_as_concat,ansi_quotes,no_zero_date,pad_char_to_full_length,block_return_multi_results'; + +delimiter // + +CREATE PROCEDURE proc_a_m () +begin + select * from t; +end +// + +delimiter ; + +call proc_a_m(); + +create table testtyp (a int8, b varchar ,c date,d bytea); +insert into testtyp values(123,'abv','2020-01-01','a'); +insert into testtyp values(1123,'abcv','2022-01-01','c'); +insert into testtyp values(NULL,NULL,NULL,NULL); + +delimiter // + +CREATE PROCEDURE proc_a_m1 () +begin + select a,b,c,d from testtyp; + + select a,c,d,b from testtyp; +end +// + +delimiter ; + +call proc_a_m1(); + +create or replace procedure pro_11451713() +as +begin + +end; +/ + +call pro_11451713(); + +drop schema multi_select_proc cascade; +reset current_schema; \ No newline at end of file