diff --git a/src/backend/libpq/Makefile b/src/backend/libpq/Makefile index 09410c4bb19d3e36c79f22d5e836a2ecc32933e3..9dca5044fc023841eea3a023d1bf8fc65808ee3a 100644 --- a/src/backend/libpq/Makefile +++ b/src/backend/libpq/Makefile @@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global # be-fsstubs is here for historical reasons, probably belongs elsewhere OBJS = be-fsstubs.o be-secure.o auth.o crypt.o hba.o ip.o md5.o pqcomm.o \ - pqformat.o pqmq.o pqsignal.o + pqformat.o pqmq.o pqsignal.o be-secure-shm.o ifeq ($(with_openssl),yes) OBJS += be-secure-openssl.o diff --git a/src/backend/libpq/be-secure-shm.c b/src/backend/libpq/be-secure-shm.c new file mode 100644 index 0000000000000000000000000000000000000000..57782d5ee325c04a64e457e8037bc81779cc7450 --- /dev/null +++ b/src/backend/libpq/be-secure-shm.c @@ -0,0 +1,85 @@ +#include "postgres.h" +#include "libpq/pqcomm.h" +#include "libpq/libpq-be.h" + +#define SHMTRYLIMIT 10 + +static ssize_t shm_read(PQShmConnection* conn, char* ptr, size_t len) +{ + ssize_t n; + Assert(conn->status == PQ_SHM_STATUS_ESTABLISHED); + + // 申请锁 + if (-1 == sem_trywait(&conn->sem)) { + return -1; + } + + n = bb_getbyte(&conn->client_to_server, ptr, len); + conn->last_active_tm = pq_get_current_timestamp(); + sem_post(&conn->sem); // 释放锁 + + return n; +} + +static ssize_t shm_write(PQShmConnection* conn, char* ptr, size_t len) +{ + ssize_t n; + Assert(conn->status == PQ_SHM_STATUS_ESTABLISHED); + + // 申请锁 + if (-1 == sem_trywait(&conn->sem)) { + return -1; + } + + n = bb_putbyte(&conn->server_to_client, ptr, len); + conn->last_active_tm = pq_get_current_timestamp(); + sem_post(&conn->sem); // 释放锁 + + return n; +} + +ssize_t be_shm_read(Port *port, char* ptr, size_t len) +{ + ssize_t n; + int trytime; + PQShmConnection* shm_conn = port->shm_conn; + +rloop: + n = shm_read(shm_conn, ptr, len); + if (n == -1) + { + if (trytime++ >= SHMTRYLIMIT) { + /* 达到最大尝试次数 */ + shm_conn->status = PQ_SHM_STATUS_CLOSED; + errno = ECONNRESET; + } else { + /* 重新申请锁 */ + pg_usleep(1000000L); /* 1000 ms */ + goto rloop; + } + } + return n; +} + +ssize_t be_shm_write(Port *port, char* ptr, size_t len) +{ + ssize_t n; + int trytime; + PQShmConnection* shm_conn = port->shm_conn; + +rloop: + n = shm_write(shm_conn, ptr, len); + if (n == -1) + { + if (trytime++ >= SHMTRYLIMIT) { + /* 达到最大尝试次数 */ + shm_conn->status = PQ_SHM_STATUS_CLOSED; + errno = ECONNRESET; + } else { + /* 重新申请锁 */ + pg_usleep(1000000L); /* 1000 ms */ + goto rloop; + } + } + return n; +} \ No newline at end of file diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index cdd07d577b08e381d92225bbdfe9c033fb302b29..fc9378df4220759ff0079e2064f436201e308574 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -37,7 +37,9 @@ #include "utils/memutils.h" #include "storage/ipc.h" #include "storage/proc.h" - +#include "libpq/libpq-be.h" +#include +#include char *ssl_cert_file; char *ssl_key_file; @@ -124,8 +126,14 @@ secure_read(Port *port, void *ptr, size_t len) int waitfor; retry: -#ifdef USE_SSL waitfor = 0; + + if (port->shm_conn) + { + n = be_shm_read(port, ptr, len); + } + else +#ifdef USE_SSL if (port->ssl_in_use) { n = be_tls_read(port, ptr, len, &waitfor); @@ -227,6 +235,12 @@ secure_write(Port *port, void *ptr, size_t len) retry: waitfor = 0; + + if (port->shm_conn) + { + n = be_shm_write(port, ptr, len); + } + else #ifdef USE_SSL if (port->ssl_in_use) { @@ -295,3 +309,106 @@ secure_raw_write(Port *port, const void *ptr, size_t len) return n; } + +static PQShmArea* libpq_shm_area = NULL; + +static void must_create_shm_file() { + FILE* fp = fopen(PQShmPath, "w+"); /* The stream is positioned at the beginning of the file */ + if (!fp) { + fprintf(stderr, "cant open shm file %s\n", strerror(errno)); + exit(0); + } + + fseek(fp, PQShmAreaSize, SEEK_CUR); + fwrite("a", 1, 1, fp); + fclose(fp); +} + +void be_pq_init_shm_area(void) +{ + int fd; + int i; + + must_create_shm_file(); + + Assert(!libpq_shm_area); + + fd = open(PQShmPath, O_RDWR); + if (fd < 0) { + fprintf(stderr, "open shm file error %s\n", strerror(errno)); + exit(0); + } + libpq_shm_area = (PQShmArea*)mmap(NULL, PQShmAreaSize, PROT_WRITE | PROT_READ, MAP_SHARED, + fd, 0); + if (libpq_shm_area == MAP_FAILED) { + fprintf(stderr, "mmap %s\n", strerror(errno)); + exit(0); + } + + memset((void*)libpq_shm_area, 0, PQShmAreaSize); + + libpq_shm_area->manifest.shm_file_fd = fd; + libpq_shm_area->manifest.max_connections = PQ_SHM_MAX_CONNECTION; + + for (i = 0; i < PQ_SHM_MAX_CONNECTION; i++) { + PQShmConnection* conn = &libpq_shm_area->connections[i]; + + if (sem_init(&conn->sem, 1, 1) < 0) { + elog(FATAL, "sem_init error %s\n", strerror(errno)); + exit(0); + } + conn->status = PQ_SHM_STATUS_EMPTY; + } + fprintf(stderr, "init shm ok\n"); +} + +int be_accept_shm_connections(PQShmConnection** conn_array, int array_size) +{ + int i; + int empty_connections = 0; + int established_connections = 0; + int connnecting_connections = 0; + int current_timestamp = pq_get_current_timestamp(); + + Assert(libpq_shm_area); + + for (i = 0; i < libpq_shm_area->manifest.max_connections; i++) { + PQShmConnection* conn = &libpq_shm_area->connections[i]; + PQShmConnection_lock(conn); + switch (conn->status) + { + case PQ_SHM_STATUS_EMPTY: + empty_connections++; + break; + case PQ_SHM_STATUS_CONNECTING: + if (connnecting_connections < array_size) { + /*accpt 这个连接 */ + conn_array[connnecting_connections] = conn; + conn->status = PQ_SHM_STATUS_ESTABLISHED; + connnecting_connections++; + } + break; + case PQ_SHM_STATUS_ESTABLISHED: + /* 超时关闭 */ + if (conn->last_active_tm + SHM_ACTIVE_TIMEOUT_SEC < current_timestamp) { + conn->status = PQ_SHM_STATUS_CLOSED; + } + break; + case PQ_SHM_STATUS_CLOSED: + /* 回收重用 */ + if (conn->last_active_tm + SHM_CLOSE_TIMEOUT_SEC < current_timestamp) { + conn->status = PQ_SHM_STATUS_EMPTY; + } + break; + default: + Assert(false); + break; + } + PQShmConnection_unlock(conn); + } + + libpq_shm_area->manifest.empty_connections = empty_connections; + libpq_shm_area->manifest.established_connections = established_connections; + + return connnecting_connections; +} \ No newline at end of file diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index bbef1a119dacd69ad769e7645f15a56489ccc75b..4072327d1396b49c0b0b9662872b470425e7909d 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -1326,6 +1326,8 @@ PostmasterMain(int argc, char *argv[]) /* Some workers may be scheduled to start now */ maybe_start_bgworkers(); + be_pq_init_shm_area(); + status = ServerLoop(); /* @@ -1547,6 +1549,7 @@ DetermineSleepTime(struct timeval * timeout) if (Shutdown > NoShutdown || (!StartWorkerNeeded && !HaveCrashedWorker)) { + bool use_shm = true; /*TODO 通过guc控制是否开启内存内存*/ if (AbortStartTime != 0) { /* time left to abort; clamp to 0 in case it already expired */ @@ -1560,6 +1563,11 @@ DetermineSleepTime(struct timeval * timeout) timeout->tv_sec = 60; timeout->tv_usec = 0; } + + if (use_shm) { + timeout->tv_sec = 1; + timeout->tv_usec = 0; + } return; } @@ -1628,6 +1636,25 @@ DetermineSleepTime(struct timeval * timeout) } } +static void handle_shm_connections() { + PQShmConnection *pending_shm_connections[10]; + int number_conn; + int i; + + number_conn = be_accept_shm_connections(pending_shm_connections, 10); + for (i = 0; i < number_conn; i++) { + PQShmConnection* conn = pending_shm_connections[i]; + Port* port = malloc(sizeof(Port)); + Assert(port); + + memset(port, 0, sizeof(Port)); + port->shm_conn = conn; + + BackendStartup(port); + ConnFree(port); + } +} + /* * Main idle loop of postmaster * @@ -1687,6 +1714,8 @@ ServerLoop(void) PG_SETMASK(&BlockSig); } + handle_shm_connections(); + /* Now check the select() result */ if (selres < 0) { diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c index ea6cb840815d04ffc2233184f5f23bae05bf31d2..4b7c22442e98d260a50f4d1a6e90eb6b6b88f976 100644 --- a/src/bin/psql/help.c +++ b/src/bin/psql/help.c @@ -140,6 +140,7 @@ usage(unsigned short int pager) fprintf(output, _(" -U, --username=USERNAME database user name (default: \"%s\")\n"), env); fprintf(output, _(" -w, --no-password never prompt for password\n")); fprintf(output, _(" -W, --password force password prompt (should happen automatically)\n")); + fprintf(output, _(" -m, --shm use shard memory\n")); fprintf(output, _("\nFor more information, type \"\\?\" (for internal commands) or \"\\help\" (for SQL\n" "commands) from within psql, or consult the psql section in the PostgreSQL\n" diff --git a/src/bin/psql/startup.c b/src/bin/psql/startup.c index 111593cd9da433e16e8faf088b3de60dc5259c5c..cbd515902d7c85402e5199281701d65ad361b069 100644 --- a/src/bin/psql/startup.c +++ b/src/bin/psql/startup.c @@ -28,6 +28,7 @@ #include "mainloop.h" #include "fe_utils/print.h" #include "settings.h" +#include "libpq/pqcomm.h" @@ -75,6 +76,7 @@ struct adhoc_opts char *port; char *username; char *logfilename; + bool use_shm; bool no_readline; bool no_psqlrc; bool single_txn; @@ -215,7 +217,7 @@ main(int argc, char *argv[]) /* loop until we have a password if requested by backend */ do { -#define PARAMS_ARRAY_SIZE 8 +#define PARAMS_ARRAY_SIZE 9 const char **keywords = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*keywords)); const char **values = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*values)); @@ -234,8 +236,10 @@ main(int argc, char *argv[]) values[5] = pset.progname; keywords[6] = "client_encoding"; values[6] = (pset.notty || getenv("PGCLIENTENCODING")) ? NULL : "auto"; - keywords[7] = NULL; - values[7] = NULL; + keywords[7] = "shm_path"; + values[7] = options.use_shm ? PQShmPath : NULL; + keywords[8] = NULL; + values[8] = NULL; new_pass = false; pset.db = PQconnectdbParams(keywords, values, true); @@ -424,6 +428,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts * options) {"html", no_argument, NULL, 'H'}, {"list", no_argument, NULL, 'l'}, {"log-file", required_argument, NULL, 'L'}, + {"shm", no_argument, NULL, 'm'}, {"no-readline", no_argument, NULL, 'n'}, {"single-transaction", no_argument, NULL, '1'}, {"output", required_argument, NULL, 'o'}, @@ -453,7 +458,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts * options) memset(options, 0, sizeof *options); - while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:no:p:P:qR:sStT:U:v:VwWxXz?01", + while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:mno:p:P:qR:sStT:U:v:VwWxXz?01", long_options, &optindex)) != -1) { switch (c) @@ -507,6 +512,9 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts * options) case 'L': options->logfilename = pg_strdup(optarg); break; + case 'm': + options->use_shm = true; + break; case 'n': options->no_readline = true; break; diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 5d07b782237ffdbfa7e0127f2a16039d0dfbcffc..bdf883b72eeaa9959a5a97a33c223413ba7fdf2e 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -194,6 +194,8 @@ typedef struct Port X509 *peer; unsigned long count; #endif + + PQShmConnection* shm_conn; } Port; #ifdef USE_SSL @@ -226,4 +228,9 @@ extern int pq_setkeepalivesidle(int idle, Port *port); extern int pq_setkeepalivesinterval(int interval, Port *port); extern int pq_setkeepalivescount(int count, Port *port); +/* 共享内存 */ +extern void be_pq_init_shm_area(void); +extern int be_accept_shm_connections(PQShmConnection** conn_array, int array_size); +extern ssize_t be_shm_read(Port *port, char* ptr, size_t len); +extern ssize_t be_shm_write(Port *port, char* ptr, size_t len); #endif /* LIBPQ_BE_H */ diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index 1d063d12489b60e933825680e46299aeee733486..d22c3f8b3c8dcfcc05aecee19355589f6bb389b3 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -22,6 +22,8 @@ #include #endif #include +#include +#include #ifdef HAVE_STRUCT_SOCKADDR_STORAGE @@ -203,4 +205,121 @@ typedef struct CancelRequestPacket */ #define NEGOTIATE_SSL_CODE PG_PROTOCOL(1234,5679) +/* libpq 共享内存通讯 */ + + +/* 8K */ +#define PQ_SHM_BUFFER_SIZE (8 *1024) +#define PQ_SHM_MAX_CONNECTION (512) + +#define PQ_SHM_STATUS_EMPTY (0) +#define PQ_SHM_STATUS_CONNECTING (1) +#define PQ_SHM_STATUS_ESTABLISHED (2) +#define PQ_SHM_STATUS_CLOSED (3) + +typedef struct { + int shm_file_fd; + int max_connections; /* 最大连接数 */ + int empty_connections; /* 空闲连接数据 */ + int established_connections; /* 已经建立了多少连接 */ + int bytes_per_conn; /* 每个连接用了多少内存 */ +} PQShmManifest; /* 记录一些摘要信息,后续可以考虑通过函数打印出来 */ + +typedef struct { + uint32 read_index; + uint32 write_index; + char buffer[PQ_SHM_BUFFER_SIZE]; +} PQShmByteBuffer; + +typedef struct { + sem_t sem; /* lock for this connection */ + uint32 status; /*SEE PQ_SHM_STATUS_* */ + uint32 last_active_tm; /*last active timestamp*/ + PQShmByteBuffer client_to_server; + PQShmByteBuffer server_to_client; +} PQShmConnection; + + +/* 整块共享的内存区域 */ +typedef struct { + PQShmManifest manifest; + PQShmConnection connections[FLEXIBLE_ARRAY_MEMBER]; +} PQShmArea; + +#define PQShmAreaSize \ + (offsetof(PQShmArea, connections) + sizeof(PQShmConnection) * PQ_SHM_MAX_CONNECTION) + +#define PQShmPath "/tmp/pg_shm.bin" + + +static inline int PQShmConnection_lock(PQShmConnection* conn) { + int ret = sem_wait(&conn->sem); + Assert(ret == 0); + return ret; +} + +static inline int PQShmConnection_unlock(PQShmConnection* conn) { + int ret = sem_post(&conn->sem); + Assert(ret == 0); + return ret; +} + +static inline uint32 pq_get_current_timestamp() { + struct timeval tv; + gettimeofday(&tv, NULL); + return (uint32)tv.tv_sec; +} + +/* 发起连接后,服务端没有影响,超时 */ +#define SHM_CONNECT_TIMEOUT_SEC (60) +#define SHM_ACTIVE_TIMEOUT_SEC (60) /* 超过配置时间,把连接状态标记为closed */ +#define SHM_CLOSE_TIMEOUT_SEC (2 * SHM_ACTIVE_TIMEOUT_SEC) /*closed一段时间后,连接重新记录为empty*/ + +static inline ssize_t bb_putbyte(PQShmByteBuffer *bb, char* ptr, size_t len) { + size_t w_len; + + // 可写空间满了 + if (bb->write_index >= PQ_SHM_BUFFER_SIZE) { + return -1; + } + + // 计算写入字节数 + w_len = PQ_SHM_BUFFER_SIZE - bb->write_index; + if (w_len > len) { + w_len = len; + } + + // 写入 + memcpy(bb->buffer + bb->write_index, ptr, w_len); + bb->write_index += w_len; + + return w_len; +} + +// ptr allocated by caller +static inline ssize_t bb_getbyte(PQShmByteBuffer *bb, char* ptr, size_t len) +{ + size_t r_len; + + // 没有数据 + if (bb->write_index == 0) { + return 0; + } + + // 计算读取字节数 + r_len = bb->write_index - bb->read_index; + if (r_len > len) { + r_len = len; + } + + // 读取 + memcpy(ptr, bb->buffer, r_len); + bb->read_index += r_len; + if (bb->read_index == bb->write_index) { + bb->read_index = bb->write_index = 0; + } + + return r_len; +} + #endif /* PQCOMM_H */ diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile index 1b292d2cf22be1d30af4a83e9ad5cfe93eb50136..a4d17cb149969be8e8449e34ed6867f585189023 100644 --- a/src/interfaces/libpq/Makefile +++ b/src/interfaces/libpq/Makefile @@ -32,7 +32,7 @@ LIBS := $(LIBS:-lpgport=) # We can't use Makefile variables here because the MSVC build system scrapes # OBJS from this file. OBJS= fe-auth.o fe-connect.o fe-exec.o fe-misc.o fe-print.o fe-lobj.o \ - fe-protocol2.o fe-protocol3.o pqexpbuffer.o fe-secure.o \ + fe-protocol2.o fe-protocol3.o pqexpbuffer.o fe-secure.o fe-shm.o \ libpq-events.o # libpgport C files we always use OBJS += chklocale.o inet_net_ntop.o noblock.o pgstrcasecmp.o pqsignal.o \ diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 7d16c3b7243c823f41727f2bae68c32168797ed0..07f1c7e2dc086f3087853777fbc134e577cd577f 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -297,6 +297,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = { "Require-Peer", "", 10, offsetof(struct pg_conn, requirepeer)}, + {"shm_path", NULL, NULL, NULL, + "Shared-memory-Filepath", "", 64, + offsetof(struct pg_conn, shm_path)}, + #if defined(ENABLE_GSS) || defined(ENABLE_SSPI) /* Kerberos and GSSAPI authentication support specifying the service name */ {"krbsrvname", "PGKRBSRVNAME", PG_KRB_SRVNAM, NULL, @@ -1456,6 +1460,12 @@ connectDBStart(PGconn *conn) conn->inStart = conn->inCursor = conn->inEnd = 0; conn->outCount = 0; + if (conn->shm_path) { + conn->status = CONNECTION_SHM_START_CONNECT; + if (PQconnectPoll(conn) == PGRES_POLLING_WRITING) + return 1; + } + /* * Determine the parameters to pass to pg_getaddrinfo_all. */ @@ -1729,6 +1739,9 @@ PQconnectPoll(PGconn *conn) case CONNECTION_SSL_STARTUP: case CONNECTION_NEEDED: break; + + case CONNECTION_SHM_START_CONNECT: + break; default: appendPQExpBufferStr(&conn->errorMessage, @@ -2096,7 +2109,7 @@ keep_going: /* We will come back to here until there is /* Don't bother requesting SSL over a Unix socket */ conn->allow_ssl_try = false; } - if (conn->allow_ssl_try && !conn->wait_ssl_try && + if (!conn->shm_conn && conn->allow_ssl_try && !conn->wait_ssl_try && !conn->ssl_in_use) { ProtocolVersion pv; @@ -2698,6 +2711,56 @@ keep_going: /* We will come back to here until there is /* We are open for business! */ conn->status = CONNECTION_OK; return PGRES_POLLING_OK; + + case CONNECTION_SHM_START_CONNECT: + conn->shm_conn = fe_pq_connect_to_shm(conn->shm_path); + if (!conn->shm_conn) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot not connect to shared memory: %s"), conn->shm_path); + goto error_return; + } + conn->status = CONNECTION_SHM_WAITING_ESTABLISHED; + goto keep_going; + + case CONNECTION_SHM_WAITING_ESTABLISHED: + { + uint32 current_timestamp = pq_get_current_timestamp(); + bool connect_failed = false; + bool connect_timeout = false; + bool connect_success = false; + + Assert(conn->shm_conn); + + PQShmConnection_lock(conn->shm_conn); + if (conn->shm_conn->last_active_tm + SHM_CONNECT_TIMEOUT_SEC < current_timestamp) { + connect_timeout = true; + } + if (conn->shm_conn->status == PQ_SHM_STATUS_ESTABLISHED) { + connect_success = true; + } else if (conn->shm_conn->status == PQ_SHM_STATUS_CONNECTING) { + /*keep going*/ + } else { + connect_failed = false; + } + PQShmConnection_unlock(conn->shm_conn); + + if (connect_success) { + conn->status = CONNECTION_MADE; + return PGRES_POLLING_WRITING; /* connected to shm */ + } else if (connect_timeout) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("connect to shared memory timeout")); + goto error_return; + } else if (connect_failed) { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("connect to shared memory failed")); + goto error_return; + } else { + usleep(1000); /*0.1s*/ + goto keep_going; + } + break; + } default: appendPQExpBuffer(&conn->errorMessage, diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index 32da8ca4616b55dc7a8494b04a28afc8eddbeac7..6645d5e62089bf7daf7085a140d14dd9b3bcff67 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -636,7 +636,7 @@ pqReadData(PGconn *conn) int someread = 0; int nread; - if (conn->sock == PGINVALID_SOCKET) + if (!conn->shm_conn && conn->sock == PGINVALID_SOCKET) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("connection not open\n")); @@ -837,7 +837,7 @@ pqSendSome(PGconn *conn, int len) int remaining = conn->outCount; int result = 0; - if (conn->sock == PGINVALID_SOCKET) + if (!conn->shm_conn && conn->sock == PGINVALID_SOCKET) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("connection not open\n")); @@ -1055,6 +1055,9 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time) if (!conn) return -1; + if (conn->shm_conn) { + return 1; /* short-circuit the select */ + } if (conn->sock == PGINVALID_SOCKET) { printfPQExpBuffer(&conn->errorMessage, diff --git a/src/interfaces/libpq/fe-secure.c b/src/interfaces/libpq/fe-secure.c index 94e47a50ed2d8af53fc3643a82e81b0c5f58c423..4b8811ae6cca808ae006f80dda008879701a64d8 100644 --- a/src/interfaces/libpq/fe-secure.c +++ b/src/interfaces/libpq/fe-secure.c @@ -206,6 +206,10 @@ pqsecure_read(PGconn *conn, void *ptr, size_t len) { ssize_t n; + if (conn->shm_conn) + { + n = pgshm_read(conn, ptr, len); + } else #ifdef USE_SSL if (conn->ssl_in_use) { @@ -283,6 +287,10 @@ pqsecure_write(PGconn *conn, const void *ptr, size_t len) { ssize_t n; + if (conn->shm_conn) + { + n = pgshm_write(conn, ptr, len); + } else #ifdef USE_SSL if (conn->ssl_in_use) { diff --git a/src/interfaces/libpq/fe-shm.c b/src/interfaces/libpq/fe-shm.c new file mode 100644 index 0000000000000000000000000000000000000000..065514de85c036de4fdd407f767129179522c327 --- /dev/null +++ b/src/interfaces/libpq/fe-shm.c @@ -0,0 +1,145 @@ +#include "postgres_fe.h" + +#include "libpq-fe.h" +#include "fe-auth.h" +#include "libpq-int.h" +#include "libpq/pqcomm.h" +#include +#include +#include +#include +#include + +#define SHMTRYLIMIT 10 + +static PQShmArea* libpq_shm_area = NULL; + +static void must_connect_to_shm(const char* path) { + int fd; + + if (libpq_shm_area) { + return; + } + + fd = open(PQShmPath, O_RDWR); + if (fd < 0) { + fprintf(stderr, "cant not open shm file %s\n", strerror(errno)); + exit(0); + } + + libpq_shm_area = (PQShmArea*)mmap(NULL, PQShmAreaSize, PROT_WRITE | PROT_READ, MAP_SHARED, + fd, 0); + + if (libpq_shm_area == MAP_FAILED) { + fprintf(stderr, "cant not mmap shm file %s\n", strerror(errno)); + exit(0); + } +} + +PQShmConnection* fe_pq_connect_to_shm(const char* path) +{ + int i; + PQShmConnection* free_conn = NULL; + + must_connect_to_shm(path); + + for (i = 0; i < libpq_shm_area->manifest.max_connections; i++) { + PQShmConnection* conn = &libpq_shm_area->connections[i]; + PQShmConnection_lock(conn); + if (conn->status == PQ_SHM_STATUS_EMPTY) { /* 空闲连接 */ + free_conn = conn; + + conn->last_active_tm = pq_get_current_timestamp(); + conn->status = PQ_SHM_STATUS_CONNECTING; /*向server发起连接*/ + } + + PQShmConnection_unlock(conn); + + if (free_conn) { + break; + } + } + + return free_conn; +} + +static ssize_t shm_read(PQShmConnection* conn, char* ptr, size_t len) +{ + ssize_t n; + Assert(conn->status == PQ_SHM_STATUS_ESTABLISHED); + + // 申请锁 + if (-1 == sem_trywait(&conn->sem)) { + return -1; + } + + n = bb_getbyte(&conn->server_to_client, ptr, len); + conn->last_active_tm = pq_get_current_timestamp(); + sem_post(&conn->sem); // 释放锁 + + return n; +} + +static ssize_t shm_write(PQShmConnection* conn, char* ptr, size_t len) +{ + ssize_t n; + Assert(conn->status == PQ_SHM_STATUS_ESTABLISHED); + + // 申请锁 + if (-1 == sem_trywait(&conn->sem)) { + return -1; + } + + n = bb_putbyte(&conn->server_to_client, ptr, len); + conn->last_active_tm = pq_get_current_timestamp(); + sem_post(&conn->sem); // 释放锁 + + return n; +} + +ssize_t pgshm_read(PGconn* conn, void *ptr, size_t len) +{ + ssize_t n; + int trytime; + int result_errno = 0; + PQShmConnection *shm_conn = conn->shm_conn; + +rloop: + n = shm_read(shm_conn, ptr, len); + if (n < 0) + { + if (trytime++ >= SHMTRYLIMIT) { + /* 达到最大尝试次数 */ + shm_conn->status = PQ_SHM_STATUS_CLOSED; + errno = ECONNRESET; + } else { + /* 重新申请锁 */ + pg_usleep(1000000L); /* 1000 ms */ + goto rloop; + } + } + return n; +} + +ssize_t pgshm_write(PGconn* conn, void *ptr, size_t len) +{ + ssize_t n; + int trytime; + PQShmConnection *shm_conn = conn->shm_conn; + +rloop: + n = shm_write(shm_conn, ptr, len); + if (n == -1) + { + if (trytime++ >= SHMTRYLIMIT) { + /* 达到最大尝试次数 */ + shm_conn->status = PQ_SHM_STATUS_CLOSED; + errno = ECONNRESET; + } else { + /* 重新申请锁 */ + pg_usleep(1000000L); /* 1000 ms */ + goto rloop; + } + } + return n; +} \ No newline at end of file diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 9ca0756c4bfaa7e0ecf256656f5e6f0ee8214515..6f748a34d69c3a2bffe04080c66914fef6c64bb3 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -27,6 +27,7 @@ extern "C" * such as Oid. */ #include "postgres_ext.h" +#include "libpq/pqcomm.h" /* * Option flags for PQcopyResult @@ -62,7 +63,10 @@ typedef enum * backend startup. */ CONNECTION_SETENV, /* Negotiating environment. */ CONNECTION_SSL_STARTUP, /* Negotiating SSL. */ - CONNECTION_NEEDED /* Internal state: connect() needed */ + CONNECTION_NEEDED, /* Internal state: connect() needed */ + + CONNECTION_SHM_START_CONNECT, /* start connect to shm */ + CONNECTION_SHM_WAITING_ESTABLISHED, /* 等待服务端accept该连接 */ } ConnStatusType; typedef enum @@ -600,6 +604,10 @@ extern int pg_char_to_encoding(const char *name); extern const char *pg_encoding_to_char(int encoding); extern int pg_valid_server_encoding_id(int encoding); +extern PQShmConnection* fe_pq_connect_to_shm(const char* path); +extern ssize_t pgshm_read(PGconn* conn, void *ptr, size_t len); +extern ssize_t pgshm_write(PGconn* conn, void *ptr, size_t len); + #ifdef __cplusplus } #endif diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 9e4e62578f6aba750aba477afff5c3a69b9373ed..6edb395c1748c5682c3024897c3dd1a14064f972 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -330,11 +330,14 @@ struct pg_conn char *sslrootcert; /* root certificate filename */ char *sslcrl; /* certificate revocation list filename */ char *requirepeer; /* required peer credentials for local sockets */ + char *shm_path; /* shared memory file path */ #if defined(ENABLE_GSS) || defined(ENABLE_SSPI) char *krbsrvname; /* Kerberos service name */ #endif + PQShmConnection* shm_conn; /* connection of shared memory */ + /* Optional file to write trace info to */ FILE *Pfdebug;