From de986e6cb5b1219cac11762a033669d71616e3cb Mon Sep 17 00:00:00 2001 From: suixinpr Date: Mon, 24 Oct 2022 18:56:55 +0800 Subject: [PATCH 01/12] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E7=BB=93=E6=9E=84?= =?UTF-8?q?=E4=BD=93byte=5Fbuffer=EF=BC=8C=E5=AE=9A=E4=B9=89SHM=E5=92=8C?= =?UTF-8?q?=E7=9B=B8=E5=BA=94=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/libpq/be-secure-shm.c | 80 +++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 src/backend/libpq/be-secure-shm.c diff --git a/src/backend/libpq/be-secure-shm.c b/src/backend/libpq/be-secure-shm.c new file mode 100644 index 00000000000..c1503f93fa8 --- /dev/null +++ b/src/backend/libpq/be-secure-shm.c @@ -0,0 +1,80 @@ +#ifdef USE_SHM + +#include "postgres.h" +#include "libpq/libpq.h" + +ssize_t bb_putbyte(byte_buffer *bb, char* ptr, size_t len) +{ + size_t w_len; + + // 可写空间满了 + if (bb->write_idx >= bb->capacity) { + return -1; + } + + // 计算写入字节数 + w_len = bb->capacity - bb->write_idx; + if (w_len > len) { + w_len = len; + } + + // 写入 + memcpy(bb->data + bb->write_idx, ptr, w_len); + bb->write_idx += w_len; + + return w_len; +} + +// ptr allocated by caller +ssize_t bb_getbyte(byte_buffer *bb, char* ptr, size_t len) +{ + size_t r_len; + + // 没有数据 + if (bb->write_idx == 0) { + return 0; + } + + // 计算读取字节数 + r_len = bb->write_idx - bb->read_idx; + if (r_len > len) { + r_len = len; + } + + // 读取 + memcpy(ptr, bb->data, r_len); + bb->read_idx += w_len; + if (bb->read_idx == bb->write_idx) { + bb_clear(); + } + + return r_len; +} + +void bb_clear() { + bb->read_idx = 0; + bb->write_idx = 0; +} + + +void shm_open_client() +{ + return +} + +void shm_connect() +{ + return +} + +ssize_t shm_read(SHM* shm, char* ptr, size_t len) +{ + return 0 +} + +ssize_t shm_write(SHM* shm, char* ptr, size_t len) +{ + return 0 +} + +#endif \ No newline at end of file -- Gitee From 2f88737bae9c6dfdf1f2b64cd514e1f9c618c839 Mon Sep 17 00:00:00 2001 From: suixinpr Date: Mon, 24 Oct 2022 18:58:59 +0800 Subject: [PATCH 02/12] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E7=BB=93=E6=9E=84?= =?UTF-8?q?=E4=BD=93byte=5Fbuffer=EF=BC=8C=E5=AE=9A=E4=B9=89SHM=E5=92=8C?= =?UTF-8?q?=E7=9B=B8=E5=BA=94=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/libpq/be-secure-shm.c | 8 ++++---- src/include/libpq/libpq-be.h | 32 +++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/backend/libpq/be-secure-shm.c b/src/backend/libpq/be-secure-shm.c index c1503f93fa8..f46ed6d7d44 100644 --- a/src/backend/libpq/be-secure-shm.c +++ b/src/backend/libpq/be-secure-shm.c @@ -59,22 +59,22 @@ void bb_clear() { void shm_open_client() { - return + return; } void shm_connect() { - return + return; } ssize_t shm_read(SHM* shm, char* ptr, size_t len) { - return 0 + return 0; } ssize_t shm_write(SHM* shm, char* ptr, size_t len) { - return 0 + return 0; } #endif \ No newline at end of file diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 5d07b782237..023ff5d0de4 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -194,6 +194,10 @@ typedef struct Port X509 *peer; unsigned long count; #endif + +#ifdef USE_SHM + SHM *shm; +#endif } Port; #ifdef USE_SSL @@ -227,3 +231,31 @@ extern int pq_setkeepalivesinterval(int interval, Port *port); extern int pq_setkeepalivescount(int count, Port *port); #endif /* LIBPQ_BE_H */ + +#ifdef USE_SHM + +typedef struct byte_buffer +{ + char *data; /* 存储实际数据 */ + size_t read_idx; + size_t write_idx; + size_t capacity; /* 容量 */ +} byte_buffer; + +extern ssize_t bb_putbyte(byte_buffer bbuff,char* ptr, size_t len); +extern ssize_t bb_getbyte(char* ptr, size_t len); +extern void bb_clear(); + +typedef struct SHM +{ + int state; + char *send_buf; + char *recv_buf; +} SHM; + +extern void shm_open_client(); +extern void shm_connect(); +extern ssize_t shm_read(SHM* shm, char* ptr, size_t len); +extern ssize_t shm_write(SHM* shm, char* ptr, size_t len); + +#endif \ No newline at end of file -- Gitee From a8d259ed5556d131b4e7fab4cc7a8e281fee4488 Mon Sep 17 00:00:00 2001 From: ljy Date: Mon, 24 Oct 2022 07:04:56 -0400 Subject: [PATCH 03/12] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=85=B1=E4=BA=AB?= =?UTF-8?q?=E5=86=85=E5=AD=98=E7=9B=B8=E5=85=B3=E5=AE=9A=E4=B9=89=EF=BC=8C?= =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96=E5=85=B1=E4=BA=AB=E5=86=85=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/libpq/be-secure.c | 48 ++++++++++++++++++++++++++++- src/backend/postmaster/postmaster.c | 2 ++ src/include/libpq/libpq-be.h | 1 + src/include/libpq/pqcomm.h | 47 ++++++++++++++++++++++++++++ 4 files changed, 97 insertions(+), 1 deletion(-) diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index cdd07d577b0..d243444dcb0 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -37,7 +37,8 @@ #include "utils/memutils.h" #include "storage/ipc.h" #include "storage/proc.h" - +#include "libpq/libpq-be.h" +#include char *ssl_cert_file; char *ssl_key_file; @@ -295,3 +296,48 @@ secure_raw_write(Port *port, const void *ptr, size_t len) return n; } + +static PQShmArea* libpq_shm_area = NULL; + +void be_init_pq_shm_area() +{ + int fd; + int i; + + Assert(!libpq_shm_area); + remove(PQShmPath); + + fd = open(PQShmPath, O_CREAT | O_RDWR); + if (fd < 0) { + fprintf(stderr, "creat error %s\n", strerror(errno)); + exit(0); + } + + if(lseek(fd, PQShmAreaSize, SEEK_CUR) == -1) + { + fprintf(stderr, "lseek error %s\n", strerror(errno)); + exit(0); + } + + write(fd, "a", 1); + + libpq_shm_area = (PQShmArea*)mmap(NULL, PQShmAreaSize, PROT_WRITE, MAP_SHARED, + fd, 0); + if (libpq_shm_area == MAP_FAILED) { + fprintf(stderr, "mmap %s\n", strerror(errno)); + exit(0); + } + + memset((void*)libpq_shm_area, 0, PQShmAreaSize); + libpq_shm_area->manifest.shm_file_fd = fd; + for (i = 0; i < PQ_SHM_MAX_CONNECTION; i++) { + PQShmConnection* conn = &libpq_shm_area->connections[i]; + + if (sem_init(&conn->sem, 1, 0) < 0) { + elog(FATAL, "sem_init error %s\n", strerror(errno)); + exit(0); + } + conn->status = PQ_SHM_STATUS_EMPTY; + } + fprintf(stderr, "init shm ok\n"); +} \ No newline at end of file diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index bbef1a119da..bf2650a4d9a 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1326,6 +1326,8 @@ PostmasterMain(int argc, char *argv[]) /* Some workers may be scheduled to start now */ maybe_start_bgworkers(); + be_init_pq_shm_area(); + status = ServerLoop(); /* diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 5d07b782237..ce618a3396b 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -226,4 +226,5 @@ extern int pq_setkeepalivesidle(int idle, Port *port); extern int pq_setkeepalivesinterval(int interval, Port *port); extern int pq_setkeepalivescount(int count, Port *port); +extern void be_init_pq_shm_area(); #endif /* LIBPQ_BE_H */ diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index 1d063d12489..b6347932663 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -22,6 +22,7 @@ #include #endif #include +#include #ifdef HAVE_STRUCT_SOCKADDR_STORAGE @@ -203,4 +204,50 @@ typedef struct CancelRequestPacket */ #define NEGOTIATE_SSL_CODE PG_PROTOCOL(1234,5679) +/* libpq 共享内存通讯 */ + + +/* 8K */ +#define PQ_SHM_BUFFER_SIZE (8 *1024) +#define PQ_SHM_MAX_CONNECTION (512) + +#define PQ_SHM_STATUS_EMPTY (0); +#define PQ_SHM_STATUS_CONNECTING (1); +#define PQ_SHM_STATUS_ESTABLISHED (2); +#define PQ_SHM_STATUS_CLOSED (3); + +typedef struct { + int shm_file_fd; + int max_connections; /* 最大连接数 */ + int empty_connections; /* 空闲连接数据 */ + int established_connections; /* 已经建立了多少连接 */ + int bytes_per_conn; /* 每个连接用了多少内存 */ +} PQShmManifest; /* 记录一些摘要信息,后续可以考虑通过函数打印出来 */ + +typedef struct { + uint32 read_index; + uint32 write_index; + char buffer[PQ_SHM_BUFFER_SIZE]; +} PQShmByteBuffer; + +typedef struct { + sem_t sem; /* lock for this connection */ + uint32 status; /*SEE PQ_SHM_STATUS_* */ + uint32 last_active_tm; /*last active timestamp*/ + PQShmByteBuffer client_to_server; + PQShmByteBuffer server_to_client; +} PQShmConnection; + + +/* 整块共享的内存区域 */ +typedef struct { + PQShmManifest manifest; + PQShmConnection connections[FLEXIBLE_ARRAY_MEMBER]; +} PQShmArea; + +#define PQShmAreaSize \ + (offsetof(PQShmArea, connections) + sizeof(PQShmConnection) * PQ_SHM_MAX_CONNECTION) + +#define PQShmPath "/tmp/.s.PGSQL.shm" + #endif /* PQCOMM_H */ -- Gitee From f2316e2f6d1f13af8eeed3bcec455ceccb05fccb Mon Sep 17 00:00:00 2001 From: suixinpr Date: Tue, 25 Oct 2022 12:27:07 +0800 Subject: [PATCH 04/12] =?UTF-8?q?=E5=8A=A0=E5=85=A5shm=5Fread=E3=80=81shm?= =?UTF-8?q?=5Fwrite=E5=8F=8A=E7=9B=B8=E5=BA=94=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/libpq/be-secure-shm.c | 47 +++++++++++++++++++++-------- src/backend/libpq/be-secure.c | 50 ++++++++++++++++++++++++++++++- src/include/libpq/libpq-be.h | 31 ++----------------- 3 files changed, 86 insertions(+), 42 deletions(-) diff --git a/src/backend/libpq/be-secure-shm.c b/src/backend/libpq/be-secure-shm.c index f46ed6d7d44..c79cca7e51f 100644 --- a/src/backend/libpq/be-secure-shm.c +++ b/src/backend/libpq/be-secure-shm.c @@ -1,25 +1,24 @@ -#ifdef USE_SHM - #include "postgres.h" #include "libpq/libpq.h" +#include "libpg/pgcomm.h" ssize_t bb_putbyte(byte_buffer *bb, char* ptr, size_t len) { size_t w_len; // 可写空间满了 - if (bb->write_idx >= bb->capacity) { + if (bb->write_idx >= PQ_SHM_BUFFER_SIZE) { return -1; } // 计算写入字节数 - w_len = bb->capacity - bb->write_idx; + w_len = PQ_SHM_BUFFER_SIZE - bb->write_idx; if (w_len > len) { w_len = len; } // 写入 - memcpy(bb->data + bb->write_idx, ptr, w_len); + memcpy(bb->buffer + bb->write_idx, ptr, w_len); bb->write_idx += w_len; return w_len; @@ -42,7 +41,7 @@ ssize_t bb_getbyte(byte_buffer *bb, char* ptr, size_t len) } // 读取 - memcpy(ptr, bb->data, r_len); + memcpy(ptr, bb->buffer, r_len); bb->read_idx += w_len; if (bb->read_idx == bb->write_idx) { bb_clear(); @@ -56,7 +55,6 @@ void bb_clear() { bb->write_idx = 0; } - void shm_open_client() { return; @@ -67,14 +65,37 @@ void shm_connect() return; } -ssize_t shm_read(SHM* shm, char* ptr, size_t len) +ssize_t shm_read(PQShmConnection* conn, char* ptr, size_t len) { - return 0; + ssize_t n; + Assert(conn->status == PQ_SHM_STATUS_ESTABLISHED); + + // 申请锁 + if (-1 == sem_trywait(&conn->sem)) { + return -1; + } + + n = bb_getbyte(conn->client_to_server, ptr, len); + // conn->last_active_tm = + sem_post(conn->sem); // 释放锁 + + return n; } -ssize_t shm_write(SHM* shm, char* ptr, size_t len) +ssize_t shm_write(PQShmConnection* conn, char* ptr, size_t len) { - return 0; -} + ssize_t n; + Assert(conn->status == PQ_SHM_STATUS_ESTABLISHED); + +rloop: + // 申请锁 + if (-1 == sem_trywait(&conn->sem)) { + return -1; + } + + n = bb_putbyte(conn->server_to_client, ptr, len); + // conn->last_active_tm = + sem_post(conn->sem); // 释放锁 -#endif \ No newline at end of file + return n; +} \ No newline at end of file diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index d243444dcb0..e1c778d7948 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -125,8 +125,32 @@ secure_read(Port *port, void *ptr, size_t len) int waitfor; retry: -#ifdef USE_SSL waitfor = 0; + + if (false) + { +#define TRYLIMIT 10 + int trytime; + PQShmConnection* conn; + +rloop: + conn = port->shm_conn; + n = shm_read(conn, ptr, len); + if (n == -1) + { + if (trytime++ >= TRYLIMIT) { + /* 达到最大尝试次数 */ + conn->status = PQ_SHM_STATUS_CLOSED; + errno = ECONNRESET; + } else { + /* 重新申请锁 */ + pg_usleep(1000000L); /* 1000 ms */ + goto rloop; + } + } + } + else +#ifdef USE_SSL if (port->ssl_in_use) { n = be_tls_read(port, ptr, len, &waitfor); @@ -228,6 +252,30 @@ secure_write(Port *port, void *ptr, size_t len) retry: waitfor = 0; + + if (false) + { +#define TRYLIMIT 10 + int trytime; + PQShmConnection* conn; + +rloop: + conn = port->shm_conn; + n = shm_write(conn, ptr, len); + if (n == -1) + { + if (trytime++ >= TRYLIMIT) { + /* 达到最大尝试次数 */ + conn->status = PQ_SHM_STATUS_CLOSED; + errno = ECONNRESET; + } else { + /* 重新申请锁 */ + pg_usleep(1000000L); /* 1000 ms */ + goto rloop; + } + } + } + else #ifdef USE_SSL if (port->ssl_in_use) { diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 67297a0c919..857d0d821fa 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -195,9 +195,7 @@ typedef struct Port unsigned long count; #endif -#ifdef USE_SHM - SHM *shm; -#endif + PQShmConnection* shm_conn; } Port; #ifdef USE_SSL @@ -233,30 +231,7 @@ extern int pq_setkeepalivescount(int count, Port *port); extern void be_init_pq_shm_area(); #endif /* LIBPQ_BE_H */ -#ifdef USE_SHM - -typedef struct byte_buffer -{ - char *data; /* 存储实际数据 */ - size_t read_idx; - size_t write_idx; - size_t capacity; /* 容量 */ -} byte_buffer; - -extern ssize_t bb_putbyte(byte_buffer bbuff,char* ptr, size_t len); -extern ssize_t bb_getbyte(char* ptr, size_t len); -extern void bb_clear(); - -typedef struct SHM -{ - int state; - char *send_buf; - char *recv_buf; -} SHM; - extern void shm_open_client(); extern void shm_connect(); -extern ssize_t shm_read(SHM* shm, char* ptr, size_t len); -extern ssize_t shm_write(SHM* shm, char* ptr, size_t len); - -#endif +extern ssize_t shm_read(PQShmConnection* conn, char* ptr, size_t len); +extern ssize_t shm_write(PQShmConnection* conn, char* ptr, size_t len); -- Gitee From 5dd65b0cfff71a8edeaa579ac185c7e3cb5284c2 Mon Sep 17 00:00:00 2001 From: ljy Date: Tue, 25 Oct 2022 02:02:41 -0400 Subject: [PATCH 05/12] psql connect success --- src/backend/libpq/be-secure.c | 40 ++++++++++-------- src/backend/postmaster/postmaster.c | 2 +- src/bin/psql/help.c | 1 + src/bin/psql/startup.c | 16 ++++++-- src/include/libpq/libpq-be.h | 2 +- src/include/libpq/pqcomm.h | 33 ++++++++++++--- src/interfaces/libpq/Makefile | 2 +- src/interfaces/libpq/fe-connect.c | 63 +++++++++++++++++++++++++++++ src/interfaces/libpq/fe-shm.c | 63 +++++++++++++++++++++++++++++ src/interfaces/libpq/libpq-fe.h | 8 +++- src/interfaces/libpq/libpq-int.h | 3 ++ 11 files changed, 204 insertions(+), 29 deletions(-) create mode 100644 src/interfaces/libpq/fe-shm.c diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index d243444dcb0..653b9277dd4 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -39,6 +39,7 @@ #include "storage/proc.h" #include "libpq/libpq-be.h" #include +#include char *ssl_cert_file; char *ssl_key_file; @@ -299,29 +300,33 @@ secure_raw_write(Port *port, const void *ptr, size_t len) static PQShmArea* libpq_shm_area = NULL; -void be_init_pq_shm_area() +static void must_create_shm_file() { + FILE* fp = fopen(PQShmPath, "w+"); /* The stream is positioned at the beginning of the file */ + if (!fp) { + fprintf(stderr, "cant open shm file %s\n", strerror(errno)); + exit(0); + } + + fseek(fp, PQShmAreaSize, SEEK_CUR); + fwrite("a", 1, 1, fp); + fclose(fp); +} + +void be_pq_init_shm_area(void) { int fd; int i; - Assert(!libpq_shm_area); - remove(PQShmPath); + must_create_shm_file(); - fd = open(PQShmPath, O_CREAT | O_RDWR); + Assert(!libpq_shm_area); + + fd = open(PQShmPath, O_RDWR); if (fd < 0) { - fprintf(stderr, "creat error %s\n", strerror(errno)); + fprintf(stderr, "open shm file error %s\n", strerror(errno)); exit(0); } - - if(lseek(fd, PQShmAreaSize, SEEK_CUR) == -1) - { - fprintf(stderr, "lseek error %s\n", strerror(errno)); - exit(0); - } - - write(fd, "a", 1); - - libpq_shm_area = (PQShmArea*)mmap(NULL, PQShmAreaSize, PROT_WRITE, MAP_SHARED, + libpq_shm_area = (PQShmArea*)mmap(NULL, PQShmAreaSize, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0); if (libpq_shm_area == MAP_FAILED) { fprintf(stderr, "mmap %s\n", strerror(errno)); @@ -329,11 +334,14 @@ void be_init_pq_shm_area() } memset((void*)libpq_shm_area, 0, PQShmAreaSize); + libpq_shm_area->manifest.shm_file_fd = fd; + libpq_shm_area->manifest.max_connections = PQ_SHM_MAX_CONNECTION; + for (i = 0; i < PQ_SHM_MAX_CONNECTION; i++) { PQShmConnection* conn = &libpq_shm_area->connections[i]; - if (sem_init(&conn->sem, 1, 0) < 0) { + if (sem_init(&conn->sem, 1, 1) < 0) { elog(FATAL, "sem_init error %s\n", strerror(errno)); exit(0); } diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index bf2650a4d9a..02ffde8cd74 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1326,7 +1326,7 @@ PostmasterMain(int argc, char *argv[]) /* Some workers may be scheduled to start now */ maybe_start_bgworkers(); - be_init_pq_shm_area(); + be_pq_init_shm_area(); status = ServerLoop(); diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c index ea6cb840815..4b7c22442e9 100644 --- a/src/bin/psql/help.c +++ b/src/bin/psql/help.c @@ -140,6 +140,7 @@ usage(unsigned short int pager) fprintf(output, _(" -U, --username=USERNAME database user name (default: \"%s\")\n"), env); fprintf(output, _(" -w, --no-password never prompt for password\n")); fprintf(output, _(" -W, --password force password prompt (should happen automatically)\n")); + fprintf(output, _(" -m, --shm use shard memory\n")); fprintf(output, _("\nFor more information, type \"\\?\" (for internal commands) or \"\\help\" (for SQL\n" "commands) from within psql, or consult the psql section in the PostgreSQL\n" diff --git a/src/bin/psql/startup.c b/src/bin/psql/startup.c index 111593cd9da..cbd515902d7 100644 --- a/src/bin/psql/startup.c +++ b/src/bin/psql/startup.c @@ -28,6 +28,7 @@ #include "mainloop.h" #include "fe_utils/print.h" #include "settings.h" +#include "libpq/pqcomm.h" @@ -75,6 +76,7 @@ struct adhoc_opts char *port; char *username; char *logfilename; + bool use_shm; bool no_readline; bool no_psqlrc; bool single_txn; @@ -215,7 +217,7 @@ main(int argc, char *argv[]) /* loop until we have a password if requested by backend */ do { -#define PARAMS_ARRAY_SIZE 8 +#define PARAMS_ARRAY_SIZE 9 const char **keywords = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*keywords)); const char **values = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*values)); @@ -234,8 +236,10 @@ main(int argc, char *argv[]) values[5] = pset.progname; keywords[6] = "client_encoding"; values[6] = (pset.notty || getenv("PGCLIENTENCODING")) ? NULL : "auto"; - keywords[7] = NULL; - values[7] = NULL; + keywords[7] = "shm_path"; + values[7] = options.use_shm ? PQShmPath : NULL; + keywords[8] = NULL; + values[8] = NULL; new_pass = false; pset.db = PQconnectdbParams(keywords, values, true); @@ -424,6 +428,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts * options) {"html", no_argument, NULL, 'H'}, {"list", no_argument, NULL, 'l'}, {"log-file", required_argument, NULL, 'L'}, + {"shm", no_argument, NULL, 'm'}, {"no-readline", no_argument, NULL, 'n'}, {"single-transaction", no_argument, NULL, '1'}, {"output", required_argument, NULL, 'o'}, @@ -453,7 +458,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts * options) memset(options, 0, sizeof *options); - while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:no:p:P:qR:sStT:U:v:VwWxXz?01", + while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:mno:p:P:qR:sStT:U:v:VwWxXz?01", long_options, &optindex)) != -1) { switch (c) @@ -507,6 +512,9 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts * options) case 'L': options->logfilename = pg_strdup(optarg); break; + case 'm': + options->use_shm = true; + break; case 'n': options->no_readline = true; break; diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 67297a0c919..d8e92e2e96c 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -230,7 +230,7 @@ extern int pq_setkeepalivesidle(int idle, Port *port); extern int pq_setkeepalivesinterval(int interval, Port *port); extern int pq_setkeepalivescount(int count, Port *port); -extern void be_init_pq_shm_area(); +extern void be_pq_init_shm_area(void); #endif /* LIBPQ_BE_H */ #ifdef USE_SHM diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index b6347932663..87f4dad3dff 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -23,6 +23,7 @@ #endif #include #include +#include #ifdef HAVE_STRUCT_SOCKADDR_STORAGE @@ -211,10 +212,10 @@ typedef struct CancelRequestPacket #define PQ_SHM_BUFFER_SIZE (8 *1024) #define PQ_SHM_MAX_CONNECTION (512) -#define PQ_SHM_STATUS_EMPTY (0); -#define PQ_SHM_STATUS_CONNECTING (1); -#define PQ_SHM_STATUS_ESTABLISHED (2); -#define PQ_SHM_STATUS_CLOSED (3); +#define PQ_SHM_STATUS_EMPTY (0) +#define PQ_SHM_STATUS_CONNECTING (1) +#define PQ_SHM_STATUS_ESTABLISHED (2) +#define PQ_SHM_STATUS_CLOSED (3) typedef struct { int shm_file_fd; @@ -248,6 +249,28 @@ typedef struct { #define PQShmAreaSize \ (offsetof(PQShmArea, connections) + sizeof(PQShmConnection) * PQ_SHM_MAX_CONNECTION) -#define PQShmPath "/tmp/.s.PGSQL.shm" +#define PQShmPath "/tmp/pg_shm.bin" + + +static inline int PQShmConnection_lock(PQShmConnection* conn) { + int ret = sem_wait(&conn->sem); + Assert(ret == 0); + return ret; +} + +static inline int PQShmConnection_unlock(PQShmConnection* conn) { + int ret = sem_post(&conn->sem); + Assert(ret == 0); + return ret; +} + +static inline uint32 pq_get_current_timestamp() { + struct timeval tv; + gettimeofday(&tv, NULL); + return (uint32)tv.tv_sec; +} + +/* 发起连接后,服务端没有影响,超时 */ +#define SHM_CONNECT_TIMEOUT_SEC (5) #endif /* PQCOMM_H */ diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile index 1b292d2cf22..a4d17cb1499 100644 --- a/src/interfaces/libpq/Makefile +++ b/src/interfaces/libpq/Makefile @@ -32,7 +32,7 @@ LIBS := $(LIBS:-lpgport=) # We can't use Makefile variables here because the MSVC build system scrapes # OBJS from this file. OBJS= fe-auth.o fe-connect.o fe-exec.o fe-misc.o fe-print.o fe-lobj.o \ - fe-protocol2.o fe-protocol3.o pqexpbuffer.o fe-secure.o \ + fe-protocol2.o fe-protocol3.o pqexpbuffer.o fe-secure.o fe-shm.o \ libpq-events.o # libpgport C files we always use OBJS += chklocale.o inet_net_ntop.o noblock.o pgstrcasecmp.o pqsignal.o \ diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 7d16c3b7243..caad4c2314e 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -297,6 +297,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = { "Require-Peer", "", 10, offsetof(struct pg_conn, requirepeer)}, + {"shm_path", NULL, NULL, NULL, + "Shared-memory-Filepath", "", 64, + offsetof(struct pg_conn, shm_path)}, + #if defined(ENABLE_GSS) || defined(ENABLE_SSPI) /* Kerberos and GSSAPI authentication support specifying the service name */ {"krbsrvname", "PGKRBSRVNAME", PG_KRB_SRVNAM, NULL, @@ -1456,6 +1460,12 @@ connectDBStart(PGconn *conn) conn->inStart = conn->inCursor = conn->inEnd = 0; conn->outCount = 0; + if (conn->shm_path) { + conn->status = CONNECTION_SHM_START_CONNECT; + if (PQconnectPoll(conn) == PGRES_POLLING_WRITING) + return 1; + } + /* * Determine the parameters to pass to pg_getaddrinfo_all. */ @@ -1729,6 +1739,9 @@ PQconnectPoll(PGconn *conn) case CONNECTION_SSL_STARTUP: case CONNECTION_NEEDED: break; + + case CONNECTION_SHM_START_CONNECT: + break; default: appendPQExpBufferStr(&conn->errorMessage, @@ -2698,6 +2711,56 @@ keep_going: /* We will come back to here until there is /* We are open for business! */ conn->status = CONNECTION_OK; return PGRES_POLLING_OK; + + case CONNECTION_SHM_START_CONNECT: + conn->shm_conn = fe_pq_connect_to_shm(conn->shm_path); + if (!conn->shm_conn) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot not connect to shared memory: %s"), conn->shm_path); + goto error_return; + } + conn->status = CONNECTION_SHM_WAITING_ESTABLISHED; + goto keep_going; + + case CONNECTION_SHM_WAITING_ESTABLISHED: + { + uint32 current_timestamp = pq_get_current_timestamp(); + bool connect_failed = false; + bool connect_timeout = false; + bool connect_success = false; + + Assert(conn->shm_conn); + + PQShmConnection_lock(conn->shm_conn); + if (conn->shm_conn->last_active_tm + SHM_CONNECT_TIMEOUT_SEC < current_timestamp) { + connect_timeout = true; + } + if (conn->shm_conn->status == PQ_SHM_STATUS_ESTABLISHED) { + connect_success = true; + } else if (conn->shm_conn->status == PQ_SHM_STATUS_CONNECTING) { + /*keep going*/ + } else { + connect_failed = false; + } + PQShmConnection_unlock(conn->shm_conn); + + if (connect_success) { + conn->status = CONNECTION_SHM_WAITING_ESTABLISHED; + goto keep_going; + } else if (connect_timeout) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("connect to shared memory timeout")); + goto error_return; + } else if (connect_failed) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("connect to shared memory failed")); + goto error_return; + } else { + usleep(1000); /*0.1s*/ + goto keep_going; + } + break; + } default: appendPQExpBuffer(&conn->errorMessage, diff --git a/src/interfaces/libpq/fe-shm.c b/src/interfaces/libpq/fe-shm.c new file mode 100644 index 00000000000..4b6b3abac04 --- /dev/null +++ b/src/interfaces/libpq/fe-shm.c @@ -0,0 +1,63 @@ +#include "postgres_fe.h" + +#include "libpq-fe.h" +#include "fe-auth.h" +#include "libpq-int.h" +#include "libpq/pqcomm.h" +#include +#include +#include +#include +#include + + +static PQShmArea* libpq_shm_area = NULL; + +static void must_connect_to_shm(const char* path) { + int fd; + + if (libpq_shm_area) { + return; + } + + fd = open(PQShmPath, O_RDWR); + if (fd < 0) { + fprintf(stderr, "cant not open shm file %s\n", strerror(errno)); + exit(0); + } + + libpq_shm_area = (PQShmArea*)mmap(NULL, PQShmAreaSize, PROT_WRITE | PROT_READ, MAP_SHARED, + fd, 0); + + if (libpq_shm_area == MAP_FAILED) { + fprintf(stderr, "cant not mmap shm file %s\n", strerror(errno)); + exit(0); + } +} + +PQShmConnection* fe_pq_connect_to_shm(const char* path) +{ + int i; + PQShmConnection* free_conn = NULL; + + must_connect_to_shm(path); + + for (i = 0; i < libpq_shm_area->manifest.max_connections; i++) { + PQShmConnection* conn = &libpq_shm_area->connections[i]; + PQShmConnection_lock(conn); + if (conn->status == PQ_SHM_STATUS_EMPTY) { /* 空闲连接 */ + free_conn = conn; + + conn->last_active_tm = pq_get_current_timestamp(); + conn->status = PQ_SHM_STATUS_CONNECTING; /*向server发起连接*/ + } + + PQShmConnection_unlock(conn); + + if (free_conn) { + break; + } + } + + return free_conn; +} \ No newline at end of file diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 9ca0756c4bf..1e92e113abb 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -27,6 +27,7 @@ extern "C" * such as Oid. */ #include "postgres_ext.h" +#include "libpq/pqcomm.h" /* * Option flags for PQcopyResult @@ -62,7 +63,10 @@ typedef enum * backend startup. */ CONNECTION_SETENV, /* Negotiating environment. */ CONNECTION_SSL_STARTUP, /* Negotiating SSL. */ - CONNECTION_NEEDED /* Internal state: connect() needed */ + CONNECTION_NEEDED, /* Internal state: connect() needed */ + + CONNECTION_SHM_START_CONNECT, /* start connect to shm */ + CONNECTION_SHM_WAITING_ESTABLISHED, /* 等待服务端accept该连接 */ } ConnStatusType; typedef enum @@ -600,6 +604,8 @@ extern int pg_char_to_encoding(const char *name); extern const char *pg_encoding_to_char(int encoding); extern int pg_valid_server_encoding_id(int encoding); +extern PQShmConnection* fe_pq_connect_to_shm(const char* path); + #ifdef __cplusplus } #endif diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 9e4e62578f6..6edb395c174 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -330,11 +330,14 @@ struct pg_conn char *sslrootcert; /* root certificate filename */ char *sslcrl; /* certificate revocation list filename */ char *requirepeer; /* required peer credentials for local sockets */ + char *shm_path; /* shared memory file path */ #if defined(ENABLE_GSS) || defined(ENABLE_SSPI) char *krbsrvname; /* Kerberos service name */ #endif + PQShmConnection* shm_conn; /* connection of shared memory */ + /* Optional file to write trace info to */ FILE *Pfdebug; -- Gitee From 7268139e72040210bc298bc1a537dc17ea3f6668 Mon Sep 17 00:00:00 2001 From: suixinpr Date: Tue, 25 Oct 2022 14:20:20 +0800 Subject: [PATCH 06/12] =?UTF-8?q?=E7=BC=96=E8=AF=91=E5=8A=A0=E5=85=A5be-se?= =?UTF-8?q?cure-shm.c=EF=BC=8C=E4=BF=AE=E5=A4=8D=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/libpq/Makefile | 2 +- src/backend/libpq/be-secure-shm.c | 43 +++++++++++++++++-------------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/backend/libpq/Makefile b/src/backend/libpq/Makefile index 09410c4bb19..9dca5044fc0 100644 --- a/src/backend/libpq/Makefile +++ b/src/backend/libpq/Makefile @@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global # be-fsstubs is here for historical reasons, probably belongs elsewhere OBJS = be-fsstubs.o be-secure.o auth.o crypt.o hba.o ip.o md5.o pqcomm.o \ - pqformat.o pqmq.o pqsignal.o + pqformat.o pqmq.o pqsignal.o be-secure-shm.o ifeq ($(with_openssl),yes) OBJS += be-secure-openssl.o diff --git a/src/backend/libpq/be-secure-shm.c b/src/backend/libpq/be-secure-shm.c index c79cca7e51f..c9219358196 100644 --- a/src/backend/libpq/be-secure-shm.c +++ b/src/backend/libpq/be-secure-shm.c @@ -1,58 +1,62 @@ #include "postgres.h" #include "libpq/libpq.h" -#include "libpg/pgcomm.h" +#include "libpq/pqcomm.h" -ssize_t bb_putbyte(byte_buffer *bb, char* ptr, size_t len) +static ssize_t bb_putbyte(PQShmByteBuffer *bb, char* ptr, size_t len); +static ssize_t bb_getbyte(PQShmByteBuffer *bb, char* ptr, size_t len); +static void bb_clear(PQShmByteBuffer *bb); + +static ssize_t bb_putbyte(PQShmByteBuffer *bb, char* ptr, size_t len) { size_t w_len; // 可写空间满了 - if (bb->write_idx >= PQ_SHM_BUFFER_SIZE) { + if (bb->write_index >= PQ_SHM_BUFFER_SIZE) { return -1; } // 计算写入字节数 - w_len = PQ_SHM_BUFFER_SIZE - bb->write_idx; + w_len = PQ_SHM_BUFFER_SIZE - bb->write_index; if (w_len > len) { w_len = len; } // 写入 - memcpy(bb->buffer + bb->write_idx, ptr, w_len); - bb->write_idx += w_len; + memcpy(bb->buffer + bb->write_index, ptr, w_len); + bb->write_index += w_len; return w_len; } // ptr allocated by caller -ssize_t bb_getbyte(byte_buffer *bb, char* ptr, size_t len) +static ssize_t bb_getbyte(PQShmByteBuffer *bb, char* ptr, size_t len) { size_t r_len; // 没有数据 - if (bb->write_idx == 0) { + if (bb->write_index == 0) { return 0; } // 计算读取字节数 - r_len = bb->write_idx - bb->read_idx; + r_len = bb->write_index - bb->read_index; if (r_len > len) { r_len = len; } // 读取 memcpy(ptr, bb->buffer, r_len); - bb->read_idx += w_len; - if (bb->read_idx == bb->write_idx) { - bb_clear(); + bb->read_index += r_len; + if (bb->read_index == bb->write_index) { + bb_clear(bb); } return r_len; } -void bb_clear() { - bb->read_idx = 0; - bb->write_idx = 0; +static void bb_clear(PQShmByteBuffer *bb) { + bb->read_index = 0; + bb->write_index = 0; } void shm_open_client() @@ -75,9 +79,9 @@ ssize_t shm_read(PQShmConnection* conn, char* ptr, size_t len) return -1; } - n = bb_getbyte(conn->client_to_server, ptr, len); + n = bb_getbyte(&conn->client_to_server, ptr, len); // conn->last_active_tm = - sem_post(conn->sem); // 释放锁 + sem_post(&conn->sem); // 释放锁 return n; } @@ -87,15 +91,14 @@ ssize_t shm_write(PQShmConnection* conn, char* ptr, size_t len) ssize_t n; Assert(conn->status == PQ_SHM_STATUS_ESTABLISHED); -rloop: // 申请锁 if (-1 == sem_trywait(&conn->sem)) { return -1; } - n = bb_putbyte(conn->server_to_client, ptr, len); + n = bb_putbyte(&conn->server_to_client, ptr, len); // conn->last_active_tm = - sem_post(conn->sem); // 释放锁 + sem_post(&conn->sem); // 释放锁 return n; } \ No newline at end of file -- Gitee From 0436a604232d6664772ec7626df0360a1f3572e7 Mon Sep 17 00:00:00 2001 From: ljy Date: Tue, 25 Oct 2022 02:59:40 -0400 Subject: [PATCH 07/12] connect/accept ok --- src/backend/libpq/be-secure.c | 51 +++++++++++++++++++++++++++++ src/backend/postmaster/postmaster.c | 14 ++++++++ src/include/libpq/libpq-be.h | 1 + src/include/libpq/pqcomm.h | 2 ++ src/interfaces/libpq/fe-connect.c | 2 ++ 5 files changed, 70 insertions(+) diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index 41f85bce26b..36e3179566a 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -396,4 +396,55 @@ void be_pq_init_shm_area(void) conn->status = PQ_SHM_STATUS_EMPTY; } fprintf(stderr, "init shm ok\n"); +} + +int be_accept_shm_connections(PQShmConnection** conn_array, int array_size) +{ + int i; + int empty_connections = 0; + int established_connections = 0; + int connnecting_connections = 0; + int current_timestamp = pq_get_current_timestamp(); + + Assert(libpq_shm_area); + + for (i = 0; i < libpq_shm_area->manifest.max_connections; i++) { + PQShmConnection* conn = &libpq_shm_area->connections[i]; + PQShmConnection_lock(conn); + switch (conn->status) + { + case PQ_SHM_STATUS_EMPTY: + empty_connections++; + break; + case PQ_SHM_STATUS_CONNECTING: + if (connnecting_connections < array_size) { + /*accpt 这个连接 */ + conn_array[connnecting_connections] = conn; + conn->status = PQ_SHM_STATUS_ESTABLISHED; + connnecting_connections++; + } + break; + case PQ_SHM_STATUS_ESTABLISHED: + /* 超时关闭 */ + if (conn->last_active_tm + SHM_ACTIVE_TIMEOUT_SEC < current_timestamp) { + conn->status = PQ_SHM_STATUS_CLOSED; + } + break; + case PQ_SHM_STATUS_CLOSED: + /* 回收重用 */ + if (conn->last_active_tm + SHM_CLOSE_TIMEOUT_SEC < current_timestamp) { + conn->status = PQ_SHM_STATUS_EMPTY; + } + break; + default: + Assert(false); + break; + } + PQShmConnection_unlock(conn); + } + + libpq_shm_area->manifest.empty_connections = empty_connections; + libpq_shm_area->manifest.established_connections = established_connections; + + return connnecting_connections; } \ No newline at end of file diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 02ffde8cd74..cc4ffdc8f8d 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1549,6 +1549,7 @@ DetermineSleepTime(struct timeval * timeout) if (Shutdown > NoShutdown || (!StartWorkerNeeded && !HaveCrashedWorker)) { + bool use_shm = true; /*TODO 通过guc控制是否开启内存内存*/ if (AbortStartTime != 0) { /* time left to abort; clamp to 0 in case it already expired */ @@ -1562,6 +1563,11 @@ DetermineSleepTime(struct timeval * timeout) timeout->tv_sec = 60; timeout->tv_usec = 0; } + + if (use_shm) { + timeout->tv_sec = 1; + timeout->tv_usec = 0; + } return; } @@ -1652,6 +1658,9 @@ ServerLoop(void) fd_set rmask; int selres; time_t now; + PQShmConnection *pending_shm_connections[10]; + int number_pending_shm_connections; + int index; /* * Wait for a connection request to arrive. @@ -1689,6 +1698,11 @@ ServerLoop(void) PG_SETMASK(&BlockSig); } + number_pending_shm_connections = be_accept_shm_connections(pending_shm_connections, 10); + for (index = 0; index < number_pending_shm_connections; index++) { + elog(LOG, "todo: new shm connections"); + } + /* Now check the select() result */ if (selres < 0) { diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index f8384d90739..48e8e0e222a 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -229,6 +229,7 @@ extern int pq_setkeepalivesinterval(int interval, Port *port); extern int pq_setkeepalivescount(int count, Port *port); extern void be_pq_init_shm_area(void); +extern int be_accept_shm_connections(PQShmConnection** conn_array, int array_size); #endif /* LIBPQ_BE_H */ extern void shm_open_client(); diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index 87f4dad3dff..d4ef501ecbc 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -272,5 +272,7 @@ static inline uint32 pq_get_current_timestamp() { /* 发起连接后,服务端没有影响,超时 */ #define SHM_CONNECT_TIMEOUT_SEC (5) +#define SHM_ACTIVE_TIMEOUT_SEC (60) /* 超过配置时间,把连接状态标记为closed */ +#define SHM_CLOSE_TIMEOUT_SEC (2 * SHM_ACTIVE_TIMEOUT_SEC) /*closed一段时间后,连接重新记录为empty*/ #endif /* PQCOMM_H */ diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index caad4c2314e..b73773a25b9 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -2746,6 +2746,8 @@ keep_going: /* We will come back to here until there is if (connect_success) { conn->status = CONNECTION_SHM_WAITING_ESTABLISHED; + fprintf(stderr, "todo: connected to server success\n"); + exit(0); goto keep_going; } else if (connect_timeout) { appendPQExpBuffer(&conn->errorMessage, -- Gitee From 728248f275521a559685dbb65d4ff95942e090b6 Mon Sep 17 00:00:00 2001 From: ljy Date: Tue, 25 Oct 2022 03:17:45 -0400 Subject: [PATCH 08/12] =?UTF-8?q?=E7=BB=86=E8=8A=82=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/postmaster/postmaster.c | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index cc4ffdc8f8d..38f81e3ce54 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1636,6 +1636,18 @@ DetermineSleepTime(struct timeval * timeout) } } +static void handle_shm_connections() { + PQShmConnection *pending_shm_connections[10]; + int number_conn; + int i; + + number_conn = be_accept_shm_connections(pending_shm_connections, 10); + for (i = 0; i < number_conn; i++) { + PQShmConnection* conn = pending_shm_connections[i]; + elog(LOG, "todo: new shm connections"); + } +} + /* * Main idle loop of postmaster * @@ -1658,9 +1670,6 @@ ServerLoop(void) fd_set rmask; int selres; time_t now; - PQShmConnection *pending_shm_connections[10]; - int number_pending_shm_connections; - int index; /* * Wait for a connection request to arrive. @@ -1698,10 +1707,7 @@ ServerLoop(void) PG_SETMASK(&BlockSig); } - number_pending_shm_connections = be_accept_shm_connections(pending_shm_connections, 10); - for (index = 0; index < number_pending_shm_connections; index++) { - elog(LOG, "todo: new shm connections"); - } + handle_shm_connections(); /* Now check the select() result */ if (selres < 0) -- Gitee From ce34adff6dabc846675390b687734ca541004fc6 Mon Sep 17 00:00:00 2001 From: ljy Date: Tue, 25 Oct 2022 03:57:37 -0400 Subject: [PATCH 09/12] fork process ok --- src/backend/postmaster/postmaster.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 38f81e3ce54..4072327d139 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1644,7 +1644,14 @@ static void handle_shm_connections() { number_conn = be_accept_shm_connections(pending_shm_connections, 10); for (i = 0; i < number_conn; i++) { PQShmConnection* conn = pending_shm_connections[i]; - elog(LOG, "todo: new shm connections"); + Port* port = malloc(sizeof(Port)); + Assert(port); + + memset(port, 0, sizeof(Port)); + port->shm_conn = conn; + + BackendStartup(port); + ConnFree(port); } } -- Gitee From c66f45059fdfb00fc5022d6285e12ccabb9de952 Mon Sep 17 00:00:00 2001 From: suixinpr Date: Tue, 25 Oct 2022 17:01:32 +0800 Subject: [PATCH 10/12] =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=BB=93=E6=9E=84?= =?UTF-8?q?=EF=BC=8C=E5=AE=A2=E6=88=B7=E7=AB=AF=E5=8A=A0=E5=85=A5=E5=85=B1?= =?UTF-8?q?=E4=BA=AB=E5=86=85=E5=AD=98write=E5=92=8Cread=E7=9A=84=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/libpq/be-secure-shm.c | 123 +++++++++++++----------------- src/backend/libpq/be-secure.c | 40 +--------- src/include/libpq/libpq-be.h | 8 +- src/include/libpq/pqcomm.h | 47 ++++++++++++ src/interfaces/libpq/fe-secure.c | 8 ++ src/interfaces/libpq/fe-shm.c | 82 ++++++++++++++++++++ src/interfaces/libpq/libpq-fe.h | 2 + 7 files changed, 196 insertions(+), 114 deletions(-) diff --git a/src/backend/libpq/be-secure-shm.c b/src/backend/libpq/be-secure-shm.c index c9219358196..57782d5ee32 100644 --- a/src/backend/libpq/be-secure-shm.c +++ b/src/backend/libpq/be-secure-shm.c @@ -1,75 +1,10 @@ #include "postgres.h" -#include "libpq/libpq.h" #include "libpq/pqcomm.h" +#include "libpq/libpq-be.h" -static ssize_t bb_putbyte(PQShmByteBuffer *bb, char* ptr, size_t len); -static ssize_t bb_getbyte(PQShmByteBuffer *bb, char* ptr, size_t len); -static void bb_clear(PQShmByteBuffer *bb); +#define SHMTRYLIMIT 10 -static ssize_t bb_putbyte(PQShmByteBuffer *bb, char* ptr, size_t len) -{ - size_t w_len; - - // 可写空间满了 - if (bb->write_index >= PQ_SHM_BUFFER_SIZE) { - return -1; - } - - // 计算写入字节数 - w_len = PQ_SHM_BUFFER_SIZE - bb->write_index; - if (w_len > len) { - w_len = len; - } - - // 写入 - memcpy(bb->buffer + bb->write_index, ptr, w_len); - bb->write_index += w_len; - - return w_len; -} - -// ptr allocated by caller -static ssize_t bb_getbyte(PQShmByteBuffer *bb, char* ptr, size_t len) -{ - size_t r_len; - - // 没有数据 - if (bb->write_index == 0) { - return 0; - } - - // 计算读取字节数 - r_len = bb->write_index - bb->read_index; - if (r_len > len) { - r_len = len; - } - - // 读取 - memcpy(ptr, bb->buffer, r_len); - bb->read_index += r_len; - if (bb->read_index == bb->write_index) { - bb_clear(bb); - } - - return r_len; -} - -static void bb_clear(PQShmByteBuffer *bb) { - bb->read_index = 0; - bb->write_index = 0; -} - -void shm_open_client() -{ - return; -} - -void shm_connect() -{ - return; -} - -ssize_t shm_read(PQShmConnection* conn, char* ptr, size_t len) +static ssize_t shm_read(PQShmConnection* conn, char* ptr, size_t len) { ssize_t n; Assert(conn->status == PQ_SHM_STATUS_ESTABLISHED); @@ -80,13 +15,13 @@ ssize_t shm_read(PQShmConnection* conn, char* ptr, size_t len) } n = bb_getbyte(&conn->client_to_server, ptr, len); - // conn->last_active_tm = + conn->last_active_tm = pq_get_current_timestamp(); sem_post(&conn->sem); // 释放锁 return n; } -ssize_t shm_write(PQShmConnection* conn, char* ptr, size_t len) +static ssize_t shm_write(PQShmConnection* conn, char* ptr, size_t len) { ssize_t n; Assert(conn->status == PQ_SHM_STATUS_ESTABLISHED); @@ -97,8 +32,54 @@ ssize_t shm_write(PQShmConnection* conn, char* ptr, size_t len) } n = bb_putbyte(&conn->server_to_client, ptr, len); - // conn->last_active_tm = + conn->last_active_tm = pq_get_current_timestamp(); sem_post(&conn->sem); // 释放锁 + return n; +} + +ssize_t be_shm_read(Port *port, char* ptr, size_t len) +{ + ssize_t n; + int trytime; + PQShmConnection* shm_conn = port->shm_conn; + +rloop: + n = shm_read(shm_conn, ptr, len); + if (n == -1) + { + if (trytime++ >= SHMTRYLIMIT) { + /* 达到最大尝试次数 */ + shm_conn->status = PQ_SHM_STATUS_CLOSED; + errno = ECONNRESET; + } else { + /* 重新申请锁 */ + pg_usleep(1000000L); /* 1000 ms */ + goto rloop; + } + } + return n; +} + +ssize_t be_shm_write(Port *port, char* ptr, size_t len) +{ + ssize_t n; + int trytime; + PQShmConnection* shm_conn = port->shm_conn; + +rloop: + n = shm_write(shm_conn, ptr, len); + if (n == -1) + { + if (trytime++ >= SHMTRYLIMIT) { + /* 达到最大尝试次数 */ + shm_conn->status = PQ_SHM_STATUS_CLOSED; + errno = ECONNRESET; + } else { + /* 重新申请锁 */ + pg_usleep(1000000L); /* 1000 ms */ + goto rloop; + } + } return n; } \ No newline at end of file diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index 36e3179566a..e65e63f1679 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -130,25 +130,7 @@ retry: if (false) { -#define TRYLIMIT 10 - int trytime; - PQShmConnection* conn; - -rloop: - conn = port->shm_conn; - n = shm_read(conn, ptr, len); - if (n == -1) - { - if (trytime++ >= TRYLIMIT) { - /* 达到最大尝试次数 */ - conn->status = PQ_SHM_STATUS_CLOSED; - errno = ECONNRESET; - } else { - /* 重新申请锁 */ - pg_usleep(1000000L); /* 1000 ms */ - goto rloop; - } - } + n = be_shm_read(port, ptr, len); } else #ifdef USE_SSL @@ -256,25 +238,7 @@ retry: if (false) { -#define TRYLIMIT 10 - int trytime; - PQShmConnection* conn; - -rloop: - conn = port->shm_conn; - n = shm_write(conn, ptr, len); - if (n == -1) - { - if (trytime++ >= TRYLIMIT) { - /* 达到最大尝试次数 */ - conn->status = PQ_SHM_STATUS_CLOSED; - errno = ECONNRESET; - } else { - /* 重新申请锁 */ - pg_usleep(1000000L); /* 1000 ms */ - goto rloop; - } - } + n = be_shm_write(port, ptr, len); } else #ifdef USE_SSL diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 48e8e0e222a..bdf883b72ee 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -228,11 +228,9 @@ extern int pq_setkeepalivesidle(int idle, Port *port); extern int pq_setkeepalivesinterval(int interval, Port *port); extern int pq_setkeepalivescount(int count, Port *port); +/* 共享内存 */ extern void be_pq_init_shm_area(void); extern int be_accept_shm_connections(PQShmConnection** conn_array, int array_size); +extern ssize_t be_shm_read(Port *port, char* ptr, size_t len); +extern ssize_t be_shm_write(Port *port, char* ptr, size_t len); #endif /* LIBPQ_BE_H */ - -extern void shm_open_client(); -extern void shm_connect(); -extern ssize_t shm_read(PQShmConnection* conn, char* ptr, size_t len); -extern ssize_t shm_write(PQShmConnection* conn, char* ptr, size_t len); diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index d4ef501ecbc..a8d43230f56 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -275,4 +275,51 @@ static inline uint32 pq_get_current_timestamp() { #define SHM_ACTIVE_TIMEOUT_SEC (60) /* 超过配置时间,把连接状态标记为closed */ #define SHM_CLOSE_TIMEOUT_SEC (2 * SHM_ACTIVE_TIMEOUT_SEC) /*closed一段时间后,连接重新记录为empty*/ +static inline ssize_t bb_putbyte(PQShmByteBuffer *bb, char* ptr, size_t len) { + size_t w_len; + + // 可写空间满了 + if (bb->write_index >= PQ_SHM_BUFFER_SIZE) { + return -1; + } + + // 计算写入字节数 + w_len = PQ_SHM_BUFFER_SIZE - bb->write_index; + if (w_len > len) { + w_len = len; + } + + // 写入 + memcpy(bb->buffer + bb->write_index, ptr, w_len); + bb->write_index += w_len; + + return w_len; +} + +// ptr allocated by caller +static inline ssize_t bb_getbyte(PQShmByteBuffer *bb, char* ptr, size_t len) +{ + size_t r_len; + + // 没有数据 + if (bb->write_index == 0) { + return 0; + } + + // 计算读取字节数 + r_len = bb->write_index - bb->read_index; + if (r_len > len) { + r_len = len; + } + + // 读取 + memcpy(ptr, bb->buffer, r_len); + bb->read_index += r_len; + if (bb->read_index == bb->write_index) { + bb->read_index = bb->write_index = 0; + } + + return r_len; +} + #endif /* PQCOMM_H */ diff --git a/src/interfaces/libpq/fe-secure.c b/src/interfaces/libpq/fe-secure.c index 94e47a50ed2..43ae237c219 100644 --- a/src/interfaces/libpq/fe-secure.c +++ b/src/interfaces/libpq/fe-secure.c @@ -206,6 +206,10 @@ pqsecure_read(PGconn *conn, void *ptr, size_t len) { ssize_t n; + if (false) + { + n = pgshm_read(conn, ptr, len); + } else #ifdef USE_SSL if (conn->ssl_in_use) { @@ -283,6 +287,10 @@ pqsecure_write(PGconn *conn, const void *ptr, size_t len) { ssize_t n; + if (false) + { + n = pgshm_write(conn, ptr, len); + } else #ifdef USE_SSL if (conn->ssl_in_use) { diff --git a/src/interfaces/libpq/fe-shm.c b/src/interfaces/libpq/fe-shm.c index 4b6b3abac04..065514de85c 100644 --- a/src/interfaces/libpq/fe-shm.c +++ b/src/interfaces/libpq/fe-shm.c @@ -10,6 +10,7 @@ #include #include +#define SHMTRYLIMIT 10 static PQShmArea* libpq_shm_area = NULL; @@ -60,4 +61,85 @@ PQShmConnection* fe_pq_connect_to_shm(const char* path) } return free_conn; +} + +static ssize_t shm_read(PQShmConnection* conn, char* ptr, size_t len) +{ + ssize_t n; + Assert(conn->status == PQ_SHM_STATUS_ESTABLISHED); + + // 申请锁 + if (-1 == sem_trywait(&conn->sem)) { + return -1; + } + + n = bb_getbyte(&conn->server_to_client, ptr, len); + conn->last_active_tm = pq_get_current_timestamp(); + sem_post(&conn->sem); // 释放锁 + + return n; +} + +static ssize_t shm_write(PQShmConnection* conn, char* ptr, size_t len) +{ + ssize_t n; + Assert(conn->status == PQ_SHM_STATUS_ESTABLISHED); + + // 申请锁 + if (-1 == sem_trywait(&conn->sem)) { + return -1; + } + + n = bb_putbyte(&conn->server_to_client, ptr, len); + conn->last_active_tm = pq_get_current_timestamp(); + sem_post(&conn->sem); // 释放锁 + + return n; +} + +ssize_t pgshm_read(PGconn* conn, void *ptr, size_t len) +{ + ssize_t n; + int trytime; + int result_errno = 0; + PQShmConnection *shm_conn = conn->shm_conn; + +rloop: + n = shm_read(shm_conn, ptr, len); + if (n < 0) + { + if (trytime++ >= SHMTRYLIMIT) { + /* 达到最大尝试次数 */ + shm_conn->status = PQ_SHM_STATUS_CLOSED; + errno = ECONNRESET; + } else { + /* 重新申请锁 */ + pg_usleep(1000000L); /* 1000 ms */ + goto rloop; + } + } + return n; +} + +ssize_t pgshm_write(PGconn* conn, void *ptr, size_t len) +{ + ssize_t n; + int trytime; + PQShmConnection *shm_conn = conn->shm_conn; + +rloop: + n = shm_write(shm_conn, ptr, len); + if (n == -1) + { + if (trytime++ >= SHMTRYLIMIT) { + /* 达到最大尝试次数 */ + shm_conn->status = PQ_SHM_STATUS_CLOSED; + errno = ECONNRESET; + } else { + /* 重新申请锁 */ + pg_usleep(1000000L); /* 1000 ms */ + goto rloop; + } + } + return n; } \ No newline at end of file diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 1e92e113abb..6f748a34d69 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -605,6 +605,8 @@ extern const char *pg_encoding_to_char(int encoding); extern int pg_valid_server_encoding_id(int encoding); extern PQShmConnection* fe_pq_connect_to_shm(const char* path); +extern ssize_t pgshm_read(PGconn* conn, void *ptr, size_t len); +extern ssize_t pgshm_write(PGconn* conn, void *ptr, size_t len); #ifdef __cplusplus } -- Gitee From 3a97a554ff377ee6586b5e563b65831e45bb5049 Mon Sep 17 00:00:00 2001 From: ljy Date: Tue, 25 Oct 2022 06:39:24 -0400 Subject: [PATCH 11/12] =?UTF-8?q?psql=E5=92=8Cpostgres=E5=9D=87=E8=BF=9B?= =?UTF-8?q?=E8=A1=8C=E8=AF=BB=E5=86=99=E5=85=B1=E4=BA=AB=E5=86=85=E5=AD=98?= =?UTF-8?q?=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/libpq/be-secure.c | 4 ++-- src/include/libpq/pqcomm.h | 2 +- src/interfaces/libpq/fe-connect.c | 8 +++----- src/interfaces/libpq/fe-misc.c | 5 ++++- src/interfaces/libpq/fe-secure.c | 4 ++-- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index e65e63f1679..fc9378df422 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -128,7 +128,7 @@ secure_read(Port *port, void *ptr, size_t len) retry: waitfor = 0; - if (false) + if (port->shm_conn) { n = be_shm_read(port, ptr, len); } @@ -236,7 +236,7 @@ secure_write(Port *port, void *ptr, size_t len) retry: waitfor = 0; - if (false) + if (port->shm_conn) { n = be_shm_write(port, ptr, len); } diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index a8d43230f56..d22c3f8b3c8 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -271,7 +271,7 @@ static inline uint32 pq_get_current_timestamp() { } /* 发起连接后,服务端没有影响,超时 */ -#define SHM_CONNECT_TIMEOUT_SEC (5) +#define SHM_CONNECT_TIMEOUT_SEC (60) #define SHM_ACTIVE_TIMEOUT_SEC (60) /* 超过配置时间,把连接状态标记为closed */ #define SHM_CLOSE_TIMEOUT_SEC (2 * SHM_ACTIVE_TIMEOUT_SEC) /*closed一段时间后,连接重新记录为empty*/ diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index b73773a25b9..07f1c7e2dc0 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -2109,7 +2109,7 @@ keep_going: /* We will come back to here until there is /* Don't bother requesting SSL over a Unix socket */ conn->allow_ssl_try = false; } - if (conn->allow_ssl_try && !conn->wait_ssl_try && + if (!conn->shm_conn && conn->allow_ssl_try && !conn->wait_ssl_try && !conn->ssl_in_use) { ProtocolVersion pv; @@ -2745,10 +2745,8 @@ keep_going: /* We will come back to here until there is PQShmConnection_unlock(conn->shm_conn); if (connect_success) { - conn->status = CONNECTION_SHM_WAITING_ESTABLISHED; - fprintf(stderr, "todo: connected to server success\n"); - exit(0); - goto keep_going; + conn->status = CONNECTION_MADE; + return PGRES_POLLING_WRITING; /* connected to shm */ } else if (connect_timeout) { appendPQExpBuffer(&conn->errorMessage, libpq_gettext("connect to shared memory timeout")); diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index 32da8ca4616..c9ae55ad2ca 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -837,7 +837,7 @@ pqSendSome(PGconn *conn, int len) int remaining = conn->outCount; int result = 0; - if (conn->sock == PGINVALID_SOCKET) + if (!conn->shm_conn && conn->sock == PGINVALID_SOCKET) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("connection not open\n")); @@ -1055,6 +1055,9 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) if (!conn) return -1; + if (conn->shm_conn) { + return 1; /* short-circuit the select */ + } if (conn->sock == PGINVALID_SOCKET) { printfPQExpBuffer(&conn->errorMessage, diff --git a/src/interfaces/libpq/fe-secure.c b/src/interfaces/libpq/fe-secure.c index 43ae237c219..4b8811ae6cc 100644 --- a/src/interfaces/libpq/fe-secure.c +++ b/src/interfaces/libpq/fe-secure.c @@ -206,7 +206,7 @@ pqsecure_read(PGconn *conn, void *ptr, size_t len) { ssize_t n; - if (false) + if (conn->shm_conn) { n = pgshm_read(conn, ptr, len); } else @@ -287,7 +287,7 @@ pqsecure_write(PGconn *conn, const void *ptr, size_t len) { ssize_t n; - if (false) + if (conn->shm_conn) { n = pgshm_write(conn, ptr, len); } else -- Gitee From d0038ebb77f64e998f7bd092a8567e21c49b9e14 Mon Sep 17 00:00:00 2001 From: ljy Date: Tue, 25 Oct 2022 06:43:18 -0400 Subject: [PATCH 12/12] fix pqReadData --- src/interfaces/libpq/fe-misc.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index c9ae55ad2ca..6645d5e6208 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -636,7 +636,7 @@ pqReadData(PGconn *conn) int someread = 0; int nread; - if (conn->sock == PGINVALID_SOCKET) + if (!conn->shm_conn && conn->sock == PGINVALID_SOCKET) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("connection not open\n")); -- Gitee