diff --git a/doc/he3db/prom.md b/doc/he3db/prom.md index 76203d0d2506dd8f36d116922cb0f890269588cc..90295a99f7ab4d7d02b45b5bf0261181929d4f3f 100644 --- a/doc/he3db/prom.md +++ b/doc/he3db/prom.md @@ -5,8 +5,8 @@ 需要安装prom、promhttp两个库,其中promhttp依赖microhttd库; 安装包下载路径: ``` -链接:https://www.ecpan.cn/web/#/yunpanProxy?path=%2F%23%2Fdrive%2Foutside&data=101563ee2c886c73b05fe6708920cf51HSvCu&isShare=1 -提取码:EGC2S8 +链接:https://www.ecpan.cn/web/#/yunpanProxy?path=%2F%23%2Fdrive%2Foutside&data=fb07a83cb8d5fe8e7e4ce36730bb5488Wlq3edsza2&isShare=1 +提取码:BJZLBD ``` 安装命令: ``` diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 08665481940b965fa94a899ba133985a170900d7..b84997f6bb86ee17f5aa2d765c17018c1a5e1854 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1831,7 +1831,7 @@ ServerLoop(void) PageParallelPush = true; StartALLPageFlushWorker(); } - if (PrometheusClientPID == 0) + if (PrometheusClientPID == 0 && he3db_prom_enable) PrometheusClientPID = StartPrometheusClient(); } @@ -3126,7 +3126,7 @@ reaper(SIGNAL_ARGS) WalWriterPID = StartWalWriter(); if (SecondBufferPID == 0) SecondBufferPID = StartSecondBuffer(); //作用? - if (PrometheusClientPID == 0) + if (PrometheusClientPID == 0 && he3db_prom_enable) PrometheusClientPID = StartPrometheusClient(); if (CleanLogIndexPID == 0) CleanLogIndexPID = StartCleanLogIndex(); @@ -5456,6 +5456,12 @@ sigusr1_handler(SIGNAL_ARGS) /* The autovacuum launcher wants us to start a worker process. */ StartAutovacuumWorker(); } + + if (CheckPostmasterSignal(PMSIGNAL_PROMETHEUSCLIENT_WORKER)) { + if ( PrometheusClientPID == 0 && he3db_prom_enable) { + PrometheusClientPID = StartPrometheusClient(); + } + } /* start Flush Page */ if (!PageParallelPush && CheckPostmasterSignal(PMSIGNAL_PARALLEL_FLUSH_WORKER)) { @@ -5468,12 +5474,6 @@ sigusr1_handler(SIGNAL_ARGS) CleanLogIndexPID = StartCleanLogIndex(); } } - - if (CheckPostmasterSignal(PMSIGNAL_PROMETHEUSCLIENT_WORKER)) { - if ( PrometheusClientPID == 0) { - PrometheusClientPID = StartPrometheusClient(); - } - } // if (CheckPostmasterSignal(PMSIGNAL_SECONDBUFFER_WORKER)) { // if (SecondBufferPID == 0) { diff --git a/src/backend/postmaster/prometheusclient.c b/src/backend/postmaster/prometheusclient.c index 1dc560a401454eadce226b6ffbd61ef38c41a6b3..ef5c670e7be3d873b7e899edad150cc2449f729a 100644 --- a/src/backend/postmaster/prometheusclient.c +++ b/src/backend/postmaster/prometheusclient.c @@ -33,8 +33,14 @@ void initSharedQueue(SharedQueue *queue) { // in void EnqueueShared(SharedQueue *queue, double item, int type) { - while (queue->count == MAX_SIZE) { - pg_usleep(1000L); + // avoid block + int loop = 0; + while (queue->count == MAX_SIZE && loop <3 ) { + pg_usleep(500L); + loop++; + } + if (loop !=0) { + return; } SharedQueueData q={type, item}; SpinLockAcquire(&queue->s_lock); @@ -61,11 +67,7 @@ SharedQueueData dequeueShared(SharedQueue *queue) { } int GetSizeShared(SharedQueue *queue) { - SpinLockAcquire(&queue->s_lock); - int size = queue->count; - SpinLockRelease(&queue->s_lock); - - return size; + return queue->count; } /* @@ -177,6 +179,25 @@ void putValToFlushWalHistogram(double hist_value) { } } +void consumeQueueTOProm(){ + while(GetSizeShared(prom_double_queue)>0) + { + SharedQueueData val = dequeueShared(prom_double_queue); + switch (val.type) { + case DOUBLE_LOGINDEX_GET_BY_PAGE: + if (val.data >= 0) { + putValToLogIndexHistogram(val.data); + } + case DOUBLE_FLUSHWAL_COST: + if (val.data >= 0) { + putValToFlushWalHistogram(val.data); + } + } + } + // may add new dequeueShared here + +}; + void PrometheusClientMain(void) { init(); MyBackendType = B_PROMETHEUSCLIENT; @@ -220,26 +241,12 @@ void PrometheusClientMain(void) { // exit when postmaster stop ResetLatch(MyLatch); if (ShutdownRequestPending) { + consumeQueueTOProm(); prom_collector_registry_destroy(PROM_COLLECTOR_REGISTRY_DEFAULT); MHD_stop_daemon(daemon); proc_exit(0); } - while(GetSizeShared(prom_double_queue)>0) - { - SharedQueueData val = dequeueShared(prom_double_queue); - switch (val.type) { - case DOUBLE_LOGINDEX_GET_BY_PAGE: - if (val.data >= 0) { - putValToLogIndexHistogram(val.data); - } - case DOUBLE_FLUSHWAL_COST: - if (val.data >= 0) { - putValToFlushWalHistogram(val.data); - } - } - } - // may add new dequeueShared here - + consumeQueueTOProm(); pg_usleep(1000L); (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, diff --git a/src/backend/storage/lmgr/he3db_logindex.c b/src/backend/storage/lmgr/he3db_logindex.c index b12b36c8ddde9c0f1d0253350fc09a8b4a11a8db..652134566e81e420bf3b53fbe59cbdb5691e99ec 100644 --- a/src/backend/storage/lmgr/he3db_logindex.c +++ b/src/backend/storage/lmgr/he3db_logindex.c @@ -1,5 +1,6 @@ #include +#include #include "postgres.h" #include "storage/he3db_logindex.h" @@ -464,8 +465,10 @@ void InsertLogIndexByPage(const BufferTag *page, XLogRecPtr lsn) LsnNode *GetLogIndexByPage(const BufferTag *page, XLogRecPtr start_lsn, XLogRecPtr end_lsn) { - clock_t start, end; - start = clock();//start + struct timeval tv; + long start, end; + gettimeofday(&tv,NULL); + start =tv.tv_sec*1000000 + tv.tv_usec; LsnNode *head_node; LsnNode *tail; uint64 tbl_index; @@ -506,8 +509,9 @@ LsnNode *GetLogIndexByPage(const BufferTag *page, XLogRecPtr start_lsn, XLogRecP }else{ LWLockRelease(LogIndexMemListLock); if(he3db_prom_enable){ - end = clock();//end - double cost_time = (double)(end - start) / CLOCKS_PER_SEC * 1000000.0; + gettimeofday(&tv,NULL); + end = tv.tv_sec*1000000 + tv.tv_usec; + double cost_time = (double)(end - start); EnqueueShared(prom_double_queue, cost_time, DOUBLE_LOGINDEX_GET_BY_PAGE); } return head_node; @@ -541,8 +545,9 @@ LsnNode *GetLogIndexByPage(const BufferTag *page, XLogRecPtr start_lsn, XLogRecP }else{ LWLockRelease(LogIndexMemListLock); if(he3db_prom_enable){ - end = clock();//end - double cost_time = (double)(end - start) / CLOCKS_PER_SEC * 1000000.0; + gettimeofday(&tv,NULL); + end = tv.tv_sec*1000000 + tv.tv_usec; + double cost_time = (double)(end - start); EnqueueShared(prom_double_queue, cost_time, DOUBLE_LOGINDEX_GET_BY_PAGE); } return head_node; @@ -556,16 +561,18 @@ LsnNode *GetLogIndexByPage(const BufferTag *page, XLogRecPtr start_lsn, XLogRecP } LWLockRelease(LogIndexMemListLock); if(he3db_prom_enable){ - end = clock();//end - double cost_time = (double)(end - start) / CLOCKS_PER_SEC * 1000000.0; + gettimeofday(&tv,NULL); + end = tv.tv_sec*1000000 + tv.tv_usec; + double cost_time = (double)(end - start); EnqueueShared(prom_double_queue, cost_time, DOUBLE_LOGINDEX_GET_BY_PAGE); } return head_node; } LWLockRelease(LogIndexMemListLock); if(he3db_prom_enable){ - end = clock();//end - double cost_time = (double)(end - start) / CLOCKS_PER_SEC * 1000000.0; + gettimeofday(&tv,NULL); + end = tv.tv_sec*1000000 + tv.tv_usec; + double cost_time = (double)(end - start); EnqueueShared(prom_double_queue, cost_time, DOUBLE_LOGINDEX_GET_BY_PAGE); } return head_node;