From aed0c78e0e9ebf403ed5f1b75ab72dd3f478fd03 Mon Sep 17 00:00:00 2001 From: openGaussDev Date: Fri, 31 Mar 2023 16:36:42 +0800 Subject: [PATCH] fix logical decoding bugs --- src/common/backend/utils/cache/relcache.cpp | 12 +- .../process/threadpool/knl_instance.cpp | 1 - .../storage/replication/logical/decode.cpp | 68 --------- .../replication/logical/logical_parse.cpp | 37 +---- .../replication/logical/logicalfuncs.cpp | 9 +- .../replication/logical/parallel_decode.cpp | 16 +- .../logical/parallel_decode_worker.cpp | 138 +++++++++--------- .../logical/parallel_reorderbuffer.cpp | 2 - .../storage/replication/slotfuncs.cpp | 9 +- src/include/knl/knl_instance.h | 1 - .../replication/parallel_reorderbuffer.h | 2 + 11 files changed, 106 insertions(+), 189 deletions(-) diff --git a/src/common/backend/utils/cache/relcache.cpp b/src/common/backend/utils/cache/relcache.cpp index 64244ab3a2..f22fbfbeb2 100644 --- a/src/common/backend/utils/cache/relcache.cpp +++ b/src/common/backend/utils/cache/relcache.cpp @@ -8293,12 +8293,20 @@ char RelationGetRelReplident(Relation r) bool IsRelationReplidentKey(Relation r, int attno) { - if (RelationGetRelReplident(r) == REPLICA_IDENTITY_FULL) + /* system column is not replica identify key. */ + if (attno <= 0) { + return false; + } + + /* any user attribute is replica identity key for FULL */ + if (r->relreplident == REPLICA_IDENTITY_FULL) { return true; + } Oid replidindex = RelationGetReplicaIndex(r); - if (!OidIsValid(replidindex)) + if (!OidIsValid(replidindex)) { return true; + } Relation idx_rel = RelationIdGetRelation(replidindex); diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index 01dc6b8cb8..d09ecb8522 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -287,7 +287,6 @@ static void knl_g_parallel_decode_init(knl_g_parallel_decode_context* pdecode_cx pdecode_cxt->totalNum = 0; pdecode_cxt->edata = NULL; SpinLockInit(&(pdecode_cxt->rwlock)); - SpinLockInit(&(pdecode_cxt->destroy_lock)); } static void knl_g_cache_init(knl_g_cache_context* cache_cxt) diff --git a/src/gausskernel/storage/replication/logical/decode.cpp b/src/gausskernel/storage/replication/logical/decode.cpp index f76e78d09d..642c8232b7 100755 --- a/src/gausskernel/storage/replication/logical/decode.cpp +++ b/src/gausskernel/storage/replication/logical/decode.cpp @@ -1149,12 +1149,6 @@ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* only interested in our database */ Size tuplelen; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", tuplelen))); - return; - } - XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); if (target_node.dbNode != ctx->slot->data.database) return; @@ -1196,11 +1190,6 @@ static void AreaDecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } Size tuplelen; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", tuplelen))); - return; - } XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) { @@ -1241,11 +1230,6 @@ static void DecodeUInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } Size tuplelen = 0; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", tuplelen))); - return; - } XLogRecGetBlockTag(r, 0, &targetNode, NULL, NULL); if (targetNode.dbNode != ctx->slot->data.database) { return; @@ -1286,11 +1270,6 @@ static void AreaDecodeUInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf Size tuplelen = 0; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", tuplelen))); - return; - } XLogRecGetBlockTag(r, 0, &targetNode, NULL, NULL); /* output plugin doesn't look for this origin, no need to queue */ @@ -1344,11 +1323,6 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size datalen_new = 0; char *data_new = XLogRecGetBlockData(r, 0, &datalen_new); Size tuplelen_new = datalen_new - SizeOfHeapHeader; - if (tuplelen_new == 0 && !AllocSizeIsValid(tuplelen_new)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", tuplelen_new))); - return; - } Size datalen_old = 0; /* adapt 64 xid, if this tuple is the first tuple of a new page */ @@ -1362,11 +1336,6 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } datalen_old -= hasCSN ? sizeof(CommitSeqNo) : 0; Size tuplelen_old = datalen_old - SizeOfHeapHeader; - if (tuplelen_old == 0 && !AllocSizeIsValid(tuplelen_old)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", tuplelen_old))); - return; - } /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) { @@ -1418,11 +1387,6 @@ static void AreaDecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size datalen_new = 0; char *data_new = XLogRecGetBlockData(r, 0, &datalen_new); Size tuplelen_new = datalen_new - SizeOfHeapHeader; - if (tuplelen_new == 0 && !AllocSizeIsValid(tuplelen_new)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", tuplelen_new))); - return; - } Size datalen_old = 0; /* adapt 64 xid, if this tuple is the first tuple of a new page */ @@ -1436,11 +1400,6 @@ static void AreaDecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } datalen_old -= hasCSN ? sizeof(CommitSeqNo) : 0; Size tuplelen_old = datalen_old - SizeOfHeapHeader; - if (tuplelen_old == 0 && !AllocSizeIsValid(tuplelen_old)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", tuplelen_old))); - return; - } /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) { @@ -1566,12 +1525,6 @@ static void DecodeUUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size datalenNew = 0; char *dataNew = XLogRecGetBlockData(r, 0, &datalenNew); - if (datalenNew == 0 && !AllocSizeIsValid(datalenNew)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", datalenNew))); - return; - } - Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - (hasCSN ? sizeof(CommitSeqNo) : 0); char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0); uint32 toastLen = 0; @@ -1661,11 +1614,6 @@ static void AreaDecodeUUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf Size datalenNew = 0; char *dataNew = XLogRecGetBlockData(r, 0, &datalenNew); - if (datalenNew == 0 && !AllocSizeIsValid(datalenNew)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", datalenNew))); - return; - } ReorderBufferChange *change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UUPDATE; @@ -1737,11 +1685,6 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } Size datalen = XLogRecGetDataLen(r) - heapDeleteSize; - if (datalen == 0 && !AllocSizeIsValid(datalen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", datalen))); - return; - } ReorderBufferChange *change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_DELETE; change->origin_id = XLogRecGetOrigin(r); @@ -1790,11 +1733,6 @@ static void AreaDecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } Size datalen = XLogRecGetDataLen(r) - heapDeleteSize; - if (datalen == 0 && !AllocSizeIsValid(datalen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("datalen is invalid(%lu), tuplelen, don't decode it", datalen))); - return; - } ReorderBufferChange *change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_DELETE; change->origin_id = XLogRecGetOrigin(r); @@ -1926,12 +1864,6 @@ static void AreaDecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf SizeOfXLUndoHeader + addLen); addLen += metaLen; - if (datalen == 0 && !AllocSizeIsValid(datalen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", datalen))); - return; - } - ReorderBufferChange* change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UDELETE; change->origin_id = XLogRecGetOrigin(r); diff --git a/src/gausskernel/storage/replication/logical/logical_parse.cpp b/src/gausskernel/storage/replication/logical/logical_parse.cpp index 00c2b1894f..a057214502 100755 --- a/src/gausskernel/storage/replication/logical/logical_parse.cpp +++ b/src/gausskernel/storage/replication/logical/logical_parse.cpp @@ -544,11 +544,6 @@ void ParseInsertXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); int slotId = worker->slotId; - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), - errmsg("ParseInsertXlog tuplelen is invalid(%lu), don't decode it", tuplelen))); - return; - } XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); if (target_node.dbNode != ctx->slot->data.database) { return; @@ -612,11 +607,6 @@ void ParseUInsert(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); int slotId = worker->slotId; - if (tuplelen == 0 && !AllocSizeIsValid(tuplelen)) { - ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), - errmsg("ParseUinsert tuplelen is invalid(%lu), don't decode it", tuplelen))); - return; - } XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); if (target_node.dbNode != ctx->slot->data.database) { @@ -691,11 +681,6 @@ void ParseUpdateXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, data_new = XLogRecGetBlockData(r, 0, &datalen_new); tuplelen_new = datalen_new - SizeOfHeapHeader; - if (tuplelen_new == 0 && !AllocSizeIsValid(tuplelen_new)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", tuplelen_new))); - return; - } /* adapt 64 xid, if this tuple is the first tuple of a new page */ is_init = (XLogRecGetInfo(r) & XLOG_HEAP_INIT_PAGE) != 0; @@ -707,11 +692,6 @@ void ParseUpdateXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, datalen_old = XLogRecGetDataLen(r) - heapUpdateSize - sizeof(CommitSeqNo); } tuplelen_old = datalen_old - SizeOfHeapHeader; - if (tuplelen_old == 0 && !AllocSizeIsValid(tuplelen_old)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", tuplelen_old))); - return; - } /* output plugin doesn't look for this origin, no need to queue */ if (ParallelFilterByOrigin(ctx, XLogRecGetOrigin(r))) @@ -775,12 +755,6 @@ void ParseUUpdate(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa Size datalenNew = 0; char *dataNew = XLogRecGetBlockData(r, 0, &datalenNew); - if (datalenNew == 0 && !AllocSizeIsValid(datalenNew)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), don't decode it", datalenNew))); - return; - } - Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - sizeof(CommitSeqNo); char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + sizeof(CommitSeqNo); uint32 toastLen = 0; @@ -887,11 +861,6 @@ void ParseDeleteXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, } datalen = XLogRecGetDataLen(r) - heapDeleteSize; - if (datalen == 0 && !AllocSizeIsValid(datalen)) { - ereport(WARNING, (errmodule(MOD_LOGICAL_DECODE), - errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", datalen))); - return; - } change = ParallelReorderBufferGetChange(ctx->reorder, slotId); change->action = PARALLEL_REORDER_BUFFER_CHANGE_DELETE; @@ -1045,7 +1014,11 @@ void ParseMultiInsert(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf */ if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) { HeapTupleHeader header; - xlhdr = (xl_multi_insert_tuple *)SHORTALIGN(data); + if ((data - tupledata) % ALIGNOF_SHORT == 0) { + xlhdr = (xl_multi_insert_tuple *)data; + } else { + xlhdr = (xl_multi_insert_tuple *)(data + ALIGNOF_SHORT - (data - tupledata) % ALIGNOF_SHORT); + } data = ((char *)xlhdr) + SizeOfMultiInsertTuple; datalen = xlhdr->datalen; if (datalen != 0 && AllocSizeIsValid((uint)datalen)) { diff --git a/src/gausskernel/storage/replication/logical/logicalfuncs.cpp b/src/gausskernel/storage/replication/logical/logicalfuncs.cpp index a8cf889ae2..27d2b0c8eb 100755 --- a/src/gausskernel/storage/replication/logical/logicalfuncs.cpp +++ b/src/gausskernel/storage/replication/logical/logicalfuncs.cpp @@ -88,8 +88,8 @@ static void LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran str_lsn_temp = (char *)palloc0(str_lsn_len); rc = sprintf_s(str_lsn_temp, str_lsn_len, "%X/%X", uint32(lsn >> 32), uint32(lsn)); securec_check_ss(rc, "", ""); - values[0] = CStringGetTextDatum(str_lsn_temp); - values[1] = TransactionIdGetDatum(xid); + values[ARR_0] = CStringGetTextDatum(str_lsn_temp); + values[ARR_1] = TransactionIdGetDatum(xid); /* * Assert ctx->out is in database encoding when we're writing textual @@ -100,9 +100,12 @@ static void LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran } /* ick, but cstring_to_text_with_len works for bytea perfectly fine */ - values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, (size_t)(uint)(ctx->out->len))); + values[ARR_2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, (size_t)(uint)(ctx->out->len))); tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls); + pfree(DatumGetPointer(values[ARR_0])); + pfree(DatumGetPointer(values[ARR_2])); + pfree(str_lsn_temp); p->returned_rows++; } diff --git a/src/gausskernel/storage/replication/logical/parallel_decode.cpp b/src/gausskernel/storage/replication/logical/parallel_decode.cpp index d268ab97c0..0f84cf8afe 100755 --- a/src/gausskernel/storage/replication/logical/parallel_decode.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_decode.cpp @@ -971,41 +971,43 @@ ParallelStatusData *GetParallelDecodeStatus(uint32 *num) knl_g_parallel_decode_context *pDecodeCxt = &g_instance.comm_cxt.pdecode_cxt[i]; if (!g_Logicaldispatcher[i].active || g_Logicaldispatcher[i].abnormal) { + FreeStringInfo(&readQueueLen); + FreeStringInfo(&decodeQueueLen); continue; } for (int j = 0; j < result[id].parallelDecodeNum; j++) { - SpinLockAcquire(&pDecodeCxt->destroy_lock); + SpinLockAcquire(&pDecodeCxt->rwlock); ParallelDecodeWorker *worker = g_Logicaldispatcher[i].decodeWorkers[j]; if (!g_Logicaldispatcher[i].active || g_Logicaldispatcher[i].abnormal || worker == NULL) { escape = true; - SpinLockRelease(&pDecodeCxt->destroy_lock); + SpinLockRelease(&pDecodeCxt->rwlock); break; } LogicalQueue *readQueue = worker->changeQueue; if (!g_Logicaldispatcher[i].active || g_Logicaldispatcher[i].abnormal || readQueue == NULL) { escape = true; - SpinLockRelease(&pDecodeCxt->destroy_lock); + SpinLockRelease(&pDecodeCxt->rwlock); break; } uint32 rmask = readQueue->mask; uint32 readHead = pg_atomic_read_u32(&readQueue->writeHead); uint32 readTail = pg_atomic_read_u32(&readQueue->readTail); uint32 readCnt = COUNT(readHead, readTail, rmask); - SpinLockRelease(&pDecodeCxt->destroy_lock); + SpinLockRelease(&pDecodeCxt->rwlock); appendStringInfo(&readQueueLen, "queue%d: %u", j, readCnt); - SpinLockAcquire(&pDecodeCxt->destroy_lock); + SpinLockAcquire(&pDecodeCxt->rwlock); LogicalQueue *decodeQueue = worker->LogicalLogQueue; if (!g_Logicaldispatcher[i].active || g_Logicaldispatcher[i].abnormal || decodeQueue == NULL) { escape = true; - SpinLockRelease(&pDecodeCxt->destroy_lock); + SpinLockRelease(&pDecodeCxt->rwlock); break; } uint32 dmask = decodeQueue->mask; uint32 decodeHead = pg_atomic_read_u32(&decodeQueue->writeHead); uint32 decodeTail = pg_atomic_read_u32(&decodeQueue->readTail); uint32 decodeCnt = COUNT(decodeHead, decodeTail, dmask); - SpinLockRelease(&pDecodeCxt->destroy_lock); + SpinLockRelease(&pDecodeCxt->rwlock); appendStringInfo(&decodeQueueLen, "queue%d: %u", j, decodeCnt); if (j < result[id].parallelDecodeNum - 1) { diff --git a/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp b/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp index 899277df0d..37423f8085 100755 --- a/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp @@ -198,23 +198,23 @@ static void SetDecodeWorkerThreadState(int slotId, int workId, int state) void ReleaseParallelDecodeResource(int slotId) { knl_g_parallel_decode_context *pDecodeCxt = &g_instance.comm_cxt.pdecode_cxt[slotId]; - - SpinLockAcquire(&pDecodeCxt->destroy_lock); - if (pDecodeCxt->parallelDecodeCtx != NULL) { - MemoryContextDelete(pDecodeCxt->parallelDecodeCtx); - pDecodeCxt->parallelDecodeCtx = NULL; - } - if (pDecodeCxt->logicalLogCtx != NULL) { - MemoryContextDelete(pDecodeCxt->logicalLogCtx); - pDecodeCxt->logicalLogCtx = NULL; - } - SpinLockRelease(&pDecodeCxt->destroy_lock); + MemoryContext decode_cxt = pDecodeCxt->parallelDecodeCtx; + MemoryContext llog_cxt = pDecodeCxt->logicalLogCtx; SpinLockAcquire(&pDecodeCxt->rwlock); + pDecodeCxt->parallelDecodeCtx = NULL; + pDecodeCxt->logicalLogCtx = NULL; g_Logicaldispatcher[slotId].active = false; g_Logicaldispatcher[slotId].abnormal = false; SpinLockRelease(&pDecodeCxt->rwlock); + if (decode_cxt != NULL) { + MemoryContextDelete(decode_cxt); + } + if (llog_cxt != NULL) { + MemoryContextDelete(llog_cxt); + } + ereport(LOG, (errmsg("g_Logicaldispatcher[%d].active = false", slotId))); } @@ -639,25 +639,14 @@ int GetLogicalDispatcher() const int maxReaderNum = 20; int maxDispatcherNum = Min(g_instance.attr.attr_storage.max_replication_slots, maxReaderNum); LWLockAcquire(ParallelDecodeLock, LW_EXCLUSIVE); - MemoryContext ctx = AllocSetContextCreate(g_instance.instance_context, "ParallelDecodeDispatcher", - ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, SHARED_CONTEXT); - MemoryContext logctx = AllocSetContextCreate(g_instance.instance_context, "ParallelDecodeLog", - ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, SHARED_CONTEXT); - knl_g_parallel_decode_context *gDecodeCxt = g_instance.comm_cxt.pdecode_cxt; - for (int i = 0; i < maxDispatcherNum; i++) { if (g_Logicaldispatcher[i].active == false) { slotId = i; - errno_t rc = memset_s(&g_Logicaldispatcher[slotId], sizeof(LogicalDispatcher), 0, - sizeof(LogicalDispatcher)); + errno_t rc = + memset_s(&g_Logicaldispatcher[slotId], sizeof(LogicalDispatcher), 0, sizeof(LogicalDispatcher)); securec_check(rc, "", ""); InitLogicalDispatcher(&g_Logicaldispatcher[slotId]); g_Logicaldispatcher[i].active = true; - - ereport(LOG, (errmsg("g_Logicaldispatcher[%d].active = true", slotId))); - g_Logicaldispatcher[i].abnormal = false; - gDecodeCxt[i].parallelDecodeCtx = ctx; - gDecodeCxt[i].logicalLogCtx = logctx; break; } } @@ -665,6 +654,17 @@ int GetLogicalDispatcher() if(slotId == -1) { return slotId; } + ereport(LOG, (errmsg("g_Logicaldispatcher[%d].active = true", slotId))); + + MemoryContext ctx = AllocSetContextCreate(g_instance.instance_context, "ParallelDecodeDispatcher", + ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, SHARED_CONTEXT); + MemoryContext logctx = AllocSetContextCreate(g_instance.instance_context, "ParallelDecodeLog", + ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, SHARED_CONTEXT); + knl_g_parallel_decode_context *gDecodeCxt = g_instance.comm_cxt.pdecode_cxt; + + g_Logicaldispatcher[slotId].abnormal = false; + gDecodeCxt[slotId].parallelDecodeCtx = ctx; + gDecodeCxt[slotId].logicalLogCtx = logctx; SpinLockAcquire(&(gDecodeCxt[slotId].rwlock)); int state = gDecodeCxt[slotId].state; @@ -707,56 +707,51 @@ bool CheckWhiteList(const List *whiteList, const char *schema, const char *table */ static bool ParseSchemaAndTableName(List *tableList, List **tableWhiteList) { - ListCell *lc = NULL; - char *str = NULL; - char *startPos = NULL; + ListCell *table_cell = NULL; char *curPos = NULL; - size_t len = 0; chosenTable *cTable = NULL; - bool anySchema = false; - bool anyTable = false; - errno_t rc = 0; - foreach(lc, tableList) { - str = (char*)lfirst(lc); - cTable = (chosenTable *)palloc(sizeof(chosenTable)); + foreach(table_cell, tableList) { + bool anySchema = false; + bool anyTable = false; + char *head = (char*)lfirst(table_cell); + cTable = (chosenTable *)palloc0(sizeof(chosenTable)); - if (*str == '*' && *(str + 1) == '.') { + if (*head == '*' && *(head + 1) == '.') { cTable->schema = NULL; anySchema = true; } - startPos = str; - curPos = str; + curPos = head; while (*curPos != '\0' && *curPos != '.') { curPos++; } - len = (size_t)(curPos - startPos); + size_t schema_len = (size_t)(curPos - head); if (*curPos == '\0') { pfree(cTable); return false; } else { if (!anySchema) { - cTable->schema = (char *)palloc0((len + 1) * sizeof(char)); - errno_t rc = strncpy_s(cTable->schema, len + 1, startPos, len); + cTable->schema = (char *)palloc0((schema_len + 1) * sizeof(char)); + errno_t rc = strncpy_s(cTable->schema, schema_len + 1, head, schema_len); securec_check(rc, "", ""); } curPos++; - startPos = curPos; + head = curPos; - if (*startPos == '*' && *(startPos + 1) == '\0') { + if (*head == '*' && *(head + 1) == '\0') { cTable->table = NULL; anyTable = true; } while (*curPos != '\0') { curPos++; } - len = (size_t)(curPos - startPos); + size_t table_len = (size_t)(curPos - head); if (!anyTable) { - cTable->table = (char *)palloc((len + 1) * sizeof(char)); - rc = strncpy_s(cTable->table, len + 1, startPos, len); + cTable->table = (char *)palloc0((table_len + 1) * sizeof(char)); + errno_t rc = strncpy_s(cTable->table, table_len + 1, head, table_len); securec_check(rc, "", ""); } } @@ -766,40 +761,44 @@ static bool ParseSchemaAndTableName(List *tableList, List **tableWhiteList) } /* - * Parse a rawstring to a list of table names. + * Skip leading spaces. + */ +inline void SkipSpaceForString(char **str) +{ + while (isspace(**str)) { + (*str)++; + } +} + +/* + * Parse a raw string to a list of table names. */ bool ParseStringToWhiteList(char *tableString, List **tableWhiteList) { char *curPos = tableString; - bool finished = false; - List *tableList = NIL; - while (isspace(*curPos)) { - curPos++; - } + SkipSpaceForString(&curPos); if (*curPos == '\0') { return true; } + bool finished = false; + List *tableList = NIL; do { char* tmpName = curPos; while (*curPos != '\0' && *curPos != ',' && !isspace(*curPos)) { curPos++; } - char *tmpEnd = curPos; if (tmpName == curPos) { list_free_deep(tableList); return false; } - while (isspace(*curPos)) { - curPos++; - } + char *tmpEnd = curPos; + SkipSpaceForString(&curPos); if (*curPos == '\0') { finished = true; } else if (*curPos == ',') { curPos++; - while (isspace(*curPos)) { - curPos++; - } + SkipSpaceForString(&curPos); } else { list_free_deep(tableList); return false; @@ -809,13 +808,9 @@ bool ParseStringToWhiteList(char *tableString, List **tableWhiteList) tableList = lappend(tableList, tableName); } while (!finished); - if (!ParseSchemaAndTableName(tableList, tableWhiteList)) { - list_free_deep(tableList); - return false; - } - + bool parseSuccess = ParseSchemaAndTableName(tableList, tableWhiteList); list_free_deep(tableList); - return true; + return parseSuccess; } /* @@ -1235,6 +1230,7 @@ void LogicalReadRecordMain(ParallelDecodeReaderWorker *worker) PG_TRY(); { + int retries = 0; /* make sure that our requirements are still fulfilled */ CheckLogicalDecodingRequirements(u_sess->proc_cxt.MyDatabaseId); @@ -1288,15 +1284,21 @@ void LogicalReadRecordMain(ParallelDecodeReaderWorker *worker) ProcessConfigFile(PGC_SIGHUP); } char *errm = NULL; + const uint32 upperLen = 32; XLogRecord *record = XLogReadRecord(ctx->reader, startptr, &errm); if (errm != NULL) { - const uint32 upperLen = 32; - ereport(LOG, (errmsg("Stop parsing any XLog Record at %X/%X, sleep 1 second: %s.", - (uint32)(ctx->reader->EndRecPtr >> upperLen), (uint32)ctx->reader->EndRecPtr, errm))); - + retries++; + if (retries >= XLOG_STREAM_READREC_MAXTRY) { + ereport(ERROR, (errmsg("Stop parsing any XLog Record at %X/%X after %d attempts: %s.", + (uint32)(ctx->reader->EndRecPtr >> upperLen), (uint32)ctx->reader->EndRecPtr, retries, errm))); + } const long sleepTime = 1000000L; pg_usleep(sleepTime); continue; + } else if (retries != 0) { + ereport(LOG, (errmsg("Reread XLog Record after %d retries at %X/%X.", retries, + (uint32)(ctx->reader->EndRecPtr >> upperLen), (uint32)ctx->reader->EndRecPtr))); + retries = 0; } startptr = InvalidXLogRecPtr; diff --git a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp index 677a300771..42f5569089 100755 --- a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp @@ -75,8 +75,6 @@ static Size ParallelReorderBufferRestoreChanges(ParallelReorderBuffer *rb, Paral XLogSegNo *segno, int slotId); static void ParallelReorderBufferRestoreChange(ParallelReorderBuffer *rb, ParallelReorderBufferTXN *txn, char *data, int slotId); -void ParallelReorderBufferCleanupTXN(ParallelReorderBuffer *rb, ParallelReorderBufferTXN *txn, - XLogRecPtr lsn = InvalidXLogRecPtr); /* Parallel decoding batch sending unit length is set to 1MB. */ static const int g_batch_unit_length = 1 * 1024 * 1024; diff --git a/src/gausskernel/storage/replication/slotfuncs.cpp b/src/gausskernel/storage/replication/slotfuncs.cpp index 83b614daf3..237b8b246e 100755 --- a/src/gausskernel/storage/replication/slotfuncs.cpp +++ b/src/gausskernel/storage/replication/slotfuncs.cpp @@ -39,7 +39,6 @@ #define IS_STANDBY_CLUSTER_MODE (IS_SHARED_STORAGE_STANDBY_CLUSTER && \ static_cast(g_instance.attr.attr_common.stream_cluster_run_mode) == RUN_MODE_STANDBY) extern void *internal_load_library(const char *libname); -extern bool PMstateIsRun(void); static void redo_slot_create(const ReplicationSlotPersistentData *slotInfo, char* extra_content = NULL); #ifndef ENABLE_LITE_MODE static XLogRecPtr create_physical_replication_slot_for_backup(const char* slot_name, bool is_dummy, char* extra); @@ -51,7 +50,7 @@ static void slot_advance(const char* slotname, XLogRecPtr &moveto, NameData &dat void log_slot_create(const ReplicationSlotPersistentData *slotInfo, char* extra_content) { - if (!u_sess->attr.attr_sql.enable_slot_log || !PMstateIsRun()) { + if (!u_sess->attr.attr_sql.enable_slot_log || RecoveryInProgress()) { return; } @@ -85,7 +84,7 @@ void log_slot_create(const ReplicationSlotPersistentData *slotInfo, char* extra_ void log_slot_advance(const ReplicationSlotPersistentData *slotInfo, char* extra_content) { - if ((!u_sess->attr.attr_sql.enable_slot_log && t_thrd.role != ARCH) || !PMstateIsRun()) { + if ((!u_sess->attr.attr_sql.enable_slot_log && t_thrd.role != ARCH) || RecoveryInProgress()) { return; } @@ -121,7 +120,7 @@ void log_slot_advance(const ReplicationSlotPersistentData *slotInfo, char* extra void log_slot_drop(const char *name) { - if (!u_sess->attr.attr_sql.enable_slot_log || !PMstateIsRun()) + if (!u_sess->attr.attr_sql.enable_slot_log || RecoveryInProgress()) return; XLogRecPtr Ptr; ReplicationSlotPersistentData xlrec; @@ -155,7 +154,7 @@ void LogCheckSlot() LogicalPersistentData *LogicalSlot = NULL; size = GetAllLogicalSlot(LogicalSlot); - if (!u_sess->attr.attr_sql.enable_slot_log || !PMstateIsRun()) + if (!u_sess->attr.attr_sql.enable_slot_log || RecoveryInProgress()) return; START_CRIT_SECTION(); diff --git a/src/include/knl/knl_instance.h b/src/include/knl/knl_instance.h index c57515701a..1c9d9e6fdf 100755 --- a/src/include/knl/knl_instance.h +++ b/src/include/knl/knl_instance.h @@ -692,7 +692,6 @@ typedef struct knl_g_parallel_decode_context { MemoryContext logicalLogCtx; int state; slock_t rwlock; - slock_t destroy_lock; /* redo worker destroy lock */ char* dbUser; char* dbName; int totalNum; diff --git a/src/include/replication/parallel_reorderbuffer.h b/src/include/replication/parallel_reorderbuffer.h index 948ab8bdf0..7c7a1ccb9b 100755 --- a/src/include/replication/parallel_reorderbuffer.h +++ b/src/include/replication/parallel_reorderbuffer.h @@ -391,6 +391,8 @@ extern void WalSndPrepareWriteHelper(StringInfo out, XLogRecPtr lsn, Transaction extern void ParallelReorderBufferUpdateMemory(ParallelReorderBuffer *rb, logicalLog *change, int slotId, bool add); extern void CheckNewTupleMissingToastChunk(ParallelReorderBufferChange *change, bool isHeap); extern void ParallelReorderBufferChildAssignment(ParallelReorderBuffer *prb, logicalLog *logChange); +extern void ParallelReorderBufferCleanupTXN(ParallelReorderBuffer *rb, ParallelReorderBufferTXN *txn, + XLogRecPtr lsn = InvalidXLogRecPtr); const uint32 max_decode_cache_num = 100000; #endif -- Gitee