2 Star 1 Fork 0

Movead / walminer_for_opengauss

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
wal2sql_ddl.c 13.16 KB
一键复制 编辑 原始数据 按行查看 历史
movead 提交于 2021-01-11 16:19 . 初始代码
/*-------------------------------------------------------------------------
*
* IDENTIFICATION
* wal2sql_ddl.c
*
*-------------------------------------------------------------------------
*/
#include "wal2sql_ddl.h"
#include "catalog/pg_class.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_depend.h"
#include "wm_utils.h"
#include "wal2sql.h"
/* State of the DDL statement currently being reconstructed; zeroed by init_ddl_analyse(). */
static DDLData ddldata;
/* Catalog change currently being examined; assigned at the top of ddl_handle(). */
ReorderBufferChange *mchange = NULL;
/* Transaction entry that accompanies mchange; assigned at the top of ddl_handle(). */
TransactionEntry *mte = NULL;
/* Forward declarations of the file-local helpers defined further down. */
static void handle_catalog_for_ddl_trace(void);
static void handle_catalog_for_create_table(void);
static void handle_catalog_for_drop_table(void);
static void assemble_create_table(void);
static void assemble_drop_table(void);
static void assemble_truncate_table(void);
static void get_update_change_list(ReorderBufferChange *change, TransactionEntry *te,
int *ccollist, int *ccolnum);
/* Predicates on the action of the current change; they read the global mchange. */
#define IS_INSERT (REORDER_BUFFER_CHANGE_INSERT == mchange->action)
#define IS_UPDATE (REORDER_BUFFER_CHANGE_UPDATE == mchange->action)
#define IS_DELETE (REORDER_BUFFER_CHANGE_DELETE == mchange->action)
static void*
get_tuple_from_change(bool isnew)
{
void *result = NULL;
if(isnew)
{
if(mchange->data.tp.newtuple)
result = GETSTRUCT(&mchange->data.tp.newtuple->tuple);
}
else
{
if(mchange->data.tp.oldtuple)
result = GETSTRUCT(&mchange->data.tp.oldtuple->tuple);
}
Assert(result);
return result;
}
/*
 * Reset the DDL tracking state (ddldata) to all-zero bytes.
 */
void
init_ddl_analyse(void)
{
	memset(&ddldata, 0, sizeof(ddldata));
}
/*
 * Entry point for every system-catalog change seen during DDL analysis.
 *
 * When no DDL block is currently open, the change is handed to
 * handle_catalog_for_ddl_trace() to decide whether it is the signature of a
 * supported DDL statement. When a DDL block is already open, the change is
 * routed to the handler that matches the kind of DDL being tracked.
 *
 * Note: this is a deliberately simple strategy and is expected to change.
 */
void
ddl_handle(ReorderBufferChange *change, TransactionEntry *te)
{
	/* Publish the current change and its transaction entry to the helpers. */
	mchange = change;
	mte = te;

	if(!ddldata.inddl)
	{
		walminer_debug("[ddl_handle]not inddl");
		handle_catalog_for_ddl_trace();
		return;
	}

	walminer_debug("[ddl_handle]ddldata.ddlKind=%d", ddldata.ddlKind);
	if(DDLNO_TABLE_CREATE == ddldata.ddlKind)
		handle_catalog_for_create_table();
	else if(DDLNO_TABLE_DROP == ddldata.ddlKind)
		handle_catalog_for_drop_table();
	/* any other kind: nothing to do */
}
static bool
handle_insert_catalog(void)
{
bool traced = false;
if(RelationRelationId == mte->reloid)
{
Form_pg_class fpc = NULL;
fpc = (Form_pg_class)get_tuple_from_change(true);
walminer_debug("[handle_insert_catalog] insert fpc->relkind=%d,%d", fpc->relkind, RELKIND_RELATION);
/* 对pg_class插入了一个RELKIND_RELATION,则为create table */
if(RELKIND_RELATION == fpc->relkind)
{
ddldata.ddlKind = DDLNO_TABLE_CREATE;
ddldata.relpersistence = fpc->relpersistence;
ddldata.reloid = HeapTupleGetOid(&mchange->data.tp.newtuple->tuple);
memcpy(ddldata.relName.data, fpc->relname.data, sizeof(NameData));
if(!get_nsp_by_nspoid(fpc->relnamespace, &ddldata.nspName))
{
elog(ERROR, "Can not find nspoid %u in data dictionary", fpc->relnamespace);
}
walminer_debug("[handle_catalog_for_ddl_trace] ddldata.reloid=%u", ddldata.reloid);
traced = true;
}
}
return traced;
}
static bool
handle_delete_catalog(void)
{
bool traced = false;
if(RelationRelationId == mte->reloid)
{
Form_pg_class fpc = NULL;
fpc = (Form_pg_class)get_tuple_from_change(false);
walminer_debug("[handle_delete_catalog] delete fpc->relkind=%d,%d", fpc->relkind, RELKIND_RELATION);
/* 对pg_class删除了一个RELKIND_RELATION,则为drop table */
if(RELKIND_RELATION == fpc->relkind)
{
ddldata.ddlKind = DDLNO_TABLE_DROP;
ddldata.relpersistence = fpc->relpersistence;
ddldata.reloid = HeapTupleGetOid(&mchange->data.tp.oldtuple->tuple);
memcpy(ddldata.relName.data, fpc->relname.data, sizeof(NameData));
if(!get_nsp_by_nspoid(fpc->relnamespace, &ddldata.nspName))
{
elog(ERROR, "Can not find nspoid %u in data dictionary", fpc->relnamespace);
}
walminer_debug("[handle_catalog_for_ddl_trace] ddldata.reloid=%u", ddldata.reloid);
traced = true;
}
}
return traced;
}
/*
 * Collect the zero-based indexes of the columns whose value differs between
 * the old and the new tuple of an UPDATE change.
 *
 * change:   the UPDATE change; both its old and new tuple must be present.
 * te:       supplies the tuple descriptor and the toast list used when the
 *           column values are converted to text.
 * ccollist: output array of changed column indexes. The caller must have
 *           allocated enough room (one slot per attribute is sufficient).
 * ccolnum:  output, number of entries written to ccollist.
 *
 * Dropped columns are skipped. Two values are equal when both are NULL or
 * when their text representations compare equal. Raises ERROR when a
 * column's type cannot be found.
 */
static void
get_update_change_list(ReorderBufferChange *change, TransactionEntry *te,
					   int *ccollist, int *ccolnum)
{
	TupleDesc	desc = te->tupdesc;
	HeapTupleData *newtup = NULL;
	HeapTupleData *oldtup = NULL;
	int			nchanged = 0;
	int			idx = 0;

	Assert(change->data.tp.newtuple && change->data.tp.oldtuple);
	newtup = &change->data.tp.newtuple->tuple;
	oldtup = &change->data.tp.oldtuple->tuple;

	for (idx = 0; idx < desc->natts; idx++)
	{
		Form_pg_attribute att = TupleDescAttr(desc, idx);
		Oid			outfunc;		/* output function of the column type */
		bool		isvarlena;
		Datum		newval;			/* possibly toasted Datum */
		Datum		oldval;			/* possibly toasted Datum */
		bool		newnull;
		bool		oldnull;
		bool		changed;
		char	   *newtext = NULL;
		char	   *oldtext = NULL;

		if (att->attisdropped)
			continue;

		if(!get_typeoutput_fromdic(att->atttypid, &outfunc, &isvarlena))
			elog(ERROR, "Can not find datatype %u", att->atttypid);

		/* heap_getattr takes one-based attribute numbers. */
		newval = heap_getattr(newtup, idx + 1, desc, &newnull);
		oldval = heap_getattr(oldtup, idx + 1, desc, &oldnull);

		if(!newnull)
			newtext = convert_attr_to_str(att, outfunc, isvarlena, newval, te->toast_list);
		if(!oldnull)
			oldtext = convert_attr_to_str(att, outfunc, isvarlena, oldval, te->toast_list);

		if(newnull || oldnull)
			changed = (newnull != oldnull);		/* exactly one side is NULL */
		else
			changed = (0 != strcmp(newtext, oldtext));

		if(changed)
			ccollist[nchanged++] = idx;
	}

	*ccolnum = nchanged;
}
static bool
handle_update_catalog(void)
{
bool traced = false;
int *c_col_list = NULL;
int c_col_num = 0;
if(RelationRelationId == mte->reloid)
{
Form_pg_class fpc = NULL;
fpc = (Form_pg_class)get_tuple_from_change(false);
walminer_debug("[handle_update_catalog] update fpc->relkind=%d,%d", fpc->relkind, RELKIND_RELATION);
if(RELKIND_RELATION == fpc->relkind)
{
bool inlist = false;
c_col_list = (int*)walminer_malloc(sizeof(int) * mte->tupdesc->natts, 0);
get_update_change_list(mchange, mte, c_col_list, &c_col_num);
inlist = number_in_array(c_col_list, c_col_num, Anum_pg_class_relfilenode - 1);
if(inlist)
{
memcpy(ddldata.relName.data, fpc->relname.data, sizeof(NameData));
if(!get_nsp_by_nspoid(fpc->relnamespace, &ddldata.nspName))
{
elog(ERROR, "Can not find nspoid %u in data dictionary", fpc->relnamespace);
}
assemble_truncate_table();
}
}
}
return traced;
}
static void
handle_catalog_for_ddl_trace(void)
{
bool traced = false;
Assert(IS_INSERT || IS_UPDATE || IS_DELETE);
walminer_debug("[handle_catalog_for_ddl_trace] foot");
if(IS_INSERT)
{
traced = handle_insert_catalog();
}
else if(IS_DELETE)
{
traced = handle_delete_catalog();
}
else if(IS_UPDATE)
{
traced = handle_update_catalog();
}
walminer_debug("[handle_catalog_for_ddl_trace]traced=%d", traced);
ddldata.inddl = traced;
}
/*
* 建表语句,目前解析策略:
* 入口为向pg_class插入一个RELKIND_RELATION类型的tuple
* 需要记录向pg_attribute插入的各个列属性
* 出口为向pg_depend插入的元组(classid=pg_class,objid=curoid)
*/
static void
handle_catalog_for_create_table(void)
{
walminer_debug("[handle_catalog_for_create_table] foot");
if(IS_INSERT)
{
walminer_debug("[handle_catalog_for_create_table] mte->reloid=%d,%d,%d",
mte->reloid,AttributeRelationId, DependRelationId);
if(AttributeRelationId == mte->reloid)
{
Form_pg_attribute fpa = NULL;
fpa = (Form_pg_attribute)get_tuple_from_change(true);
if(0 < fpa->attnum)
{
AttItem *att = NULL;
char *typname = NULL;
att = (AttItem*)walminer_malloc(sizeof(AttItem), 0);
memcpy(att->attName.data, fpa->attname.data, sizeof(NameData));
att->attnum = fpa->attnum;
att->attKindOid = fpa->atttypid;
typname = get_typname_by_typoid(att->attKindOid);
if(!typname)
{
typname = "(UNSURE)";
}
memcpy(att->addKindName.data, typname, strlen(typname));
ddldata.attList = lappend(ddldata.attList, att);
}
}
else if(DependRelationId == mte->reloid)
{
Form_pg_depend fpd = NULL;
fpd = (Form_pg_depend)get_tuple_from_change(true);
walminer_debug("[handle_catalog_for_create_table] fpd->objid=%u,%u",fpd->objid, ddldata.reloid);
if(fpd->objid == ddldata.reloid && RelationRelationId == fpd->classid)
{
assemble_create_table();
}
}
}
}
/*
 * DROP TABLE, current parsing strategy:
 *   entry:  a RELKIND_RELATION tuple is deleted from pg_class
 *           (detected by handle_delete_catalog());
 *   exit:   a tuple is deleted from pg_depend with
 *           classid = pg_class and objid = oid of the table being dropped,
 *           at which point the statement is assembled.
 *
 * Every other change is ignored.
 */
static void
handle_catalog_for_drop_table(void)
{
	Form_pg_depend dep = NULL;

	walminer_debug("[handle_catalog_for_drop_table] foot");
	if(!IS_DELETE)
		return;

	walminer_debug("[handle_catalog_for_drop_table] mte->reloid=%d,%d,%d",
				   mte->reloid,AttributeRelationId, DependRelationId);
	if(DependRelationId != mte->reloid)
		return;

	dep = (Form_pg_depend)get_tuple_from_change(false);
	walminer_debug("[handle_catalog_for_drop_table] fpd->objid=%u,%u",dep->objid, ddldata.reloid);

	/* The dependency row of the tracked table marks the end of DROP TABLE. */
	if(dep->objid == ddldata.reloid && RelationRelationId == dep->classid)
		assemble_drop_table();
}
/* DDL assembly section */

/*
 * Build the "CREATE TABLE nsp.rel(col type,...)" text from ddldata into the
 * transaction entry's notsql buffer, mark the entry as an analysable DDL,
 * store it via insert_walcontents_tuple() and reset the DDL tracking state.
 *
 * The column list may be empty (a table created without columns leaves
 * ddldata.attList NULL), so the list is only walked with foreach and never
 * dereferenced directly.
 */
static void
assemble_create_table(void)
{
	StringInfo	ddl = NULL;
	ListCell   *cell = NULL;
	AttItem    *att = NULL;
	int			loop = 0;

	ddl = &mte->sql.notsql;
	appendStringInfo(ddl, "CREATE TABLE %s.%s(", ddldata.nspName.data, ddldata.relName.data);
	foreach(cell, ddldata.attList)
	{
		att = (AttItem*)lfirst(cell);
		/* Separate columns with a comma placed before every non-first one. */
		if(loop > 0)
			appendStringInfoString(ddl, ",");
		appendStringInfo(ddl, "%s %s", att->attName.data, att->addKindName.data);
		loop++;
	}
	appendStringInfoString(ddl, ")");

	if(ddldata.attList)
	{
		list_free_deep(ddldata.attList);
		/* Do not keep a pointer to the freed list around. */
		ddldata.attList = NULL;
	}

	mte->sqlkind = DECODE_SQL_KIND_DDL;
	mte->cananalyse = true;
	mte->sql.isddl = true;
	walminer_debug("[assemble_create_table]%s", ddl->data);
	insert_walcontents_tuple(mte);
	init_ddl_analyse();
}
/*
 * Build the "DROP TABLE nsp.rel" text from ddldata into the transaction
 * entry's notsql buffer, mark the entry as an analysable DDL, store it via
 * insert_walcontents_tuple() and reset the DDL tracking state.
 */
static void
assemble_drop_table(void)
{
	StringInfo	buf = &mte->sql.notsql;

	appendStringInfo(buf, "DROP TABLE %s.%s", ddldata.nspName.data, ddldata.relName.data);

	/* Flag the entry as a decodable DDL statement. */
	mte->sql.isddl = true;
	mte->cananalyse = true;
	mte->sqlkind = DECODE_SQL_KIND_DDL;

	walminer_debug("[assemble_drop_table]%s", buf->data);
	insert_walcontents_tuple(mte);
	init_ddl_analyse();
}
/*
 * Build the "TRUNCATE TABLE nsp.rel" text from ddldata into the transaction
 * entry's notsql buffer, mark the entry as an analysable DDL, store it via
 * insert_walcontents_tuple() and reset the DDL tracking state.
 */
static void
assemble_truncate_table(void)
{
	StringInfo	buf = &mte->sql.notsql;

	appendStringInfo(buf, "TRUNCATE TABLE %s.%s", ddldata.nspName.data, ddldata.relName.data);

	/* Flag the entry as a decodable DDL statement. */
	mte->sql.isddl = true;
	mte->cananalyse = true;
	mte->sqlkind = DECODE_SQL_KIND_DDL;

	walminer_debug("[assemble_truncate_table]%s", buf->data);
	insert_walcontents_tuple(mte);
	init_ddl_analyse();
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/movead/walminer_for_opengauss.git
git@gitee.com:movead/walminer_for_opengauss.git
movead
walminer_for_opengauss
walminer_for_opengauss
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891