diff --git a/Add-io_uring-support.patch b/Add-io_uring-support.patch new file mode 100644 index 0000000000000000000000000000000000000000..c74e5bdcdf9698b9ced25cb82e7d07383b305c07 --- /dev/null +++ b/Add-io_uring-support.patch @@ -0,0 +1,316 @@ +From 96277a114f726affcba75cdf0b39808296b25ff0 Mon Sep 17 00:00:00 2001 +From: Jiufei Xue +Date: Fri, 7 Aug 2020 16:59:20 +0800 +Subject: [PATCH 1/7] add io_uring support + +Use io_uring to do poll_add/poll_remove. Make sure you have liburing +installed and kernel supported. + +Signed-off-by: Betty +Signed-off-by: Jiufei Xue +--- + src/Makefile | 2 +- + src/ae.c | 14 ++-- + src/ae_iouring.c | 172 +++++++++++++++++++++++++++++++++++++++++++++++ + src/config.h | 5 ++ + src/networking.c | 28 ++++++++ + 5 files changed, 215 insertions(+), 6 deletions(-) + create mode 100644 src/ae_iouring.c + +diff --git a/src/Makefile b/src/Makefile +index 691d543..a1e6ef8 100644 +--- a/src/Makefile ++++ b/src/Makefile +@@ -154,7 +154,7 @@ ifeq ($(uname_S),Haiku) + else + # All the other OSes (notably Linux) + FINAL_LDFLAGS+= -rdynamic +- FINAL_LIBS+=-ldl -pthread -lrt ++ FINAL_LIBS+=-ldl -pthread -lrt -luring + endif + endif + endif +diff --git a/src/ae.c b/src/ae.c +index c516665..7c4569f 100644 +--- a/src/ae.c ++++ b/src/ae.c +@@ -49,13 +49,17 @@ + #ifdef HAVE_EVPORT + #include "ae_evport.c" + #else +- #ifdef HAVE_EPOLL +- #include "ae_epoll.c" ++ #ifdef HAVE_IO_URING ++ #include "ae_iouring.c" + #else +- #ifdef HAVE_KQUEUE +- #include "ae_kqueue.c" ++ #ifdef HAVE_EPOLL ++ #include "ae_epoll.c" + #else +- #include "ae_select.c" ++ #ifdef HAVE_KQUEUE ++ #include "ae_kqueue.c" ++ #else ++ #include "ae_select.c" ++ #endif + #endif + #endif + #endif +diff --git a/src/ae_iouring.c b/src/ae_iouring.c +new file mode 100644 +index 0000000..1475ff1 +--- /dev/null ++++ b/src/ae_iouring.c +@@ -0,0 +1,172 @@ ++/* Linux epoll(2) based ae.c module ++ * ++ * Copyright (c) 2009-2012, Salvatore Sanfilippo ++ * All rights reserved. 
++ * ++ * Redistribution and use in source and binary forms, with or without ++ * modification, are permitted provided that the following conditions are met: ++ * ++ * * Redistributions of source code must retain the above copyright notice, ++ * this list of conditions and the following disclaimer. ++ * * Redistributions in binary form must reproduce the above copyright ++ * notice, this list of conditions and the following disclaimer in the ++ * documentation and/or other materials provided with the distribution. ++ * * Neither the name of Redis nor the names of its contributors may be used ++ * to endorse or promote products derived from this software without ++ * specific prior written permission. ++ * ++ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" ++ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE ++ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ++ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE ++ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR ++ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF ++ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS ++ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN ++ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ++ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE ++ * POSSIBILITY OF SUCH DAMAGE. 
++ */
++
++
++#include <sys/epoll.h> //used for define EPOLLIN and EPOLLOUT
++#include "liburing.h"
++
++#define BACKLOG 4096
++#define MAX_ENTRIES 16384 /* entries should be configured by users */
++
++typedef struct uring_event {
++    int fd;
++    int type;
++} uring_event;
++
++typedef struct aeApiState {
++    int urfd;
++    struct io_uring *ring;
++    struct uring_event *events;
++} aeApiState;
++
++static int aeApiCreate(aeEventLoop *eventLoop) {
++    aeApiState *state = zmalloc(sizeof(aeApiState));
++    if (!state) return -1;
++
++    state->events = zmalloc(sizeof(struct uring_event)*eventLoop->setsize);
++    if (!state->events) {
++        zfree(state);
++        return -1;
++    }
++
++    state->ring = zmalloc(sizeof(struct io_uring));
++    if (!state->ring) {
++        zfree(state->events);
++        zfree(state);
++        return -1;
++    }
++
++    state->urfd = io_uring_queue_init(MAX_ENTRIES, state->ring, 0);
++    if (state->urfd == -1) {
++        zfree(state->ring);
++        zfree(state->events);
++        zfree(state);
++        return -1;
++    }
++    eventLoop->apidata = state;
++    return 0;
++}
++
++static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
++    (void)eventLoop;
++    if (setsize >= FD_SETSIZE) return -1;
++    return 0;
++}
++
++static void aeApiFree(aeEventLoop *eventLoop) {
++    aeApiState *state = eventLoop->apidata;
++
++    io_uring_queue_exit(state->ring); /* urfd is a status code, not an fd: tear the ring down properly */
++    zfree(state->events);
++    zfree(state->ring);
++    zfree(state);
++}
++
++static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
++    aeApiState *state = eventLoop->apidata;
++
++    struct io_uring_sqe *sqe = io_uring_get_sqe(state->ring);
++    if (!sqe) return -1;
++
++    int poll_mask = 0;
++    /* io_uring only support onshot epoll */
++    if (mask == AE_READABLE) poll_mask |= EPOLLIN;
++    if (mask == AE_WRITABLE) poll_mask |= EPOLLOUT;
++    io_uring_prep_poll_add(sqe, fd, poll_mask);
++
++    uring_event *ev = &state->events[fd];
++    ev->fd = fd;
++    ev->type = mask;
++    io_uring_sqe_set_data(sqe, (void *)ev);
++    io_uring_submit(state->ring);
++
++    return 0;
++}
++
++static void aeApiDelEvent(aeEventLoop 
*eventLoop, int fd, int delmask) { ++ (void)delmask; ++ aeApiState *state = eventLoop->apidata; ++ ++ struct io_uring_sqe *sqe = io_uring_get_sqe(state->ring); ++ if (!sqe) exit(1); ++ ++ uring_event *ev = &state->events[fd]; ++ io_uring_prep_poll_remove(sqe, (void *)ev); ++ io_uring_submit(state->ring); ++} ++ ++static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { ++ aeApiState *state = eventLoop->apidata; ++ int retval, numevents = 0; ++ ++ /* TODO: handle timeout */ ++ (void)tvp; ++ ++ struct io_uring_cqe *cqe; ++ retval = io_uring_wait_cqe(state->ring, &cqe); ++ if (retval < 0) { ++ return numevents; ++ } ++ ++ struct io_uring_cqe *cqes[BACKLOG]; ++ int cqe_count = io_uring_peek_batch_cqe(state->ring, cqes, sizeof(cqes) / sizeof(cqes[0])); ++ ++ /* go through all the cqe's */ ++ for (int i = 0; i < cqe_count; ++i) { ++ int mask = 0; ++ struct io_uring_cqe *cqe = cqes[i]; ++ uring_event *ev = io_uring_cqe_get_data(cqe); ++ if (!ev) { ++ io_uring_cqe_seen(state->ring, cqe); ++ continue; ++ } ++ ++ if (cqe->res < 0) { ++ io_uring_cqe_seen(state->ring, cqe); ++ continue; ++ } ++ ++ if (cqe->res & EPOLLIN) mask |= AE_READABLE; ++ if (cqe->res & EPOLLOUT) mask |= AE_WRITABLE; ++ if (cqe->res & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; ++ if (cqe->res & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; ++ eventLoop->fired[numevents].fd = ev->fd; ++ eventLoop->fired[numevents].mask = mask; ++ ++ io_uring_cqe_seen(state->ring, cqe); ++ numevents++; ++ } ++ ++ return numevents; ++} ++ ++static char *aeApiName(void) { ++ return "io_uring"; ++} +diff --git a/src/config.h b/src/config.h +index b9c68f3..66a9cce 100644 +--- a/src/config.h ++++ b/src/config.h +@@ -79,6 +79,11 @@ + #define HAVE_EPOLL 1 + #endif + ++/* Test for io uring API */ ++#ifdef __linux__ ++#define HAVE_IO_URING 1 ++#endif ++ + #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__) + #define HAVE_KQUEUE 1 + #endif +diff 
--git a/src/networking.c b/src/networking.c
+index 21fef52..b4cc62c 100644
+--- a/src/networking.c
++++ b/src/networking.c
+@@ -1048,6 +1048,13 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+             if (errno != EWOULDBLOCK)
+                 serverLog(LL_WARNING,
+                     "Accepting client connection: %s", server.neterr);
++
++#ifdef HAVE_IO_URING
++            if (aeCreateFileEvent(el, fd, AE_READABLE, acceptTcpHandler, NULL) == AE_ERR) {
++                serverPanic("Unrecoverable error creating server.ipfd file event.");
++            }
++#endif
++
+             return;
+         }
+         serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
+@@ -1068,6 +1075,12 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+             if (errno != EWOULDBLOCK)
+                 serverLog(LL_WARNING,
+                     "Accepting client connection: %s", server.neterr);
++#ifdef HAVE_IO_URING
++            if (aeCreateFileEvent(el, fd, AE_READABLE, acceptTLSHandler, NULL) == AE_ERR) { /* re-arm the TLS handler, not the TCP one */
++                serverPanic("Unrecoverable error creating server.tlsfd file event.");
++            }
++#endif
++
+             return;
+         }
+         serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
+@@ -1087,6 +1100,13 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+             if (errno != EWOULDBLOCK)
+                 serverLog(LL_WARNING,
+                     "Accepting client connection: %s", server.neterr);
++
++#ifdef HAVE_IO_URING
++            if (aeCreateFileEvent(el, fd, AE_READABLE, acceptUnixHandler, NULL) == AE_ERR) { /* re-arm the Unix-socket handler */
++                serverPanic("Unrecoverable error creating server.sofd file event.");
++            }
++#endif
++
+             return;
+         }
+         serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
+@@ -2077,6 +2097,14 @@ void readQueryFromClient(connection *conn) {
+     /* There is more data in the client input buffer, continue parsing it
+      * in case to check if there is a full command to execute.
*/ + processInputBuffer(c); ++ ++ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; ++ ++#ifdef HAVE_IO_URING ++ if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR) { ++ serverPanic("Unrecoverable error creating conn.fd file event."); ++ } ++#endif + } + + void getClientsMaxBuffers(unsigned long *longest_output_list, +-- +2.18.2 + diff --git a/Ae_iouring-add-register-files-support.patch b/Ae_iouring-add-register-files-support.patch new file mode 100644 index 0000000000000000000000000000000000000000..c3411bc1b8936b692d7f98a9712efeebf2d5e81f --- /dev/null +++ b/Ae_iouring-add-register-files-support.patch @@ -0,0 +1,164 @@ +From 438e59330ae09a1ed805debfd73bb554ad89d0b2 Mon Sep 17 00:00:00 2001 +From: Jiufei Xue +Date: Fri, 14 Aug 2020 15:34:31 +0800 +Subject: [PATCH 5/7] ae_iouring: add register files support + +Signed-off-by: Betty +Signed-off-by: Jiufei Xue +--- + src/ae.c | 7 +++++++ + src/ae.h | 1 + + src/ae_iouring.c | 32 +++++++++++++++++++++++++++++++- + src/networking.c | 3 +++ + src/server.c | 3 +++ + 5 files changed, 45 insertions(+), 1 deletion(-) + +diff --git a/src/ae.c b/src/ae.c +index 1d04a11..cbf8b83 100644 +--- a/src/ae.c ++++ b/src/ae.c +@@ -586,3 +586,10 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep + void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) { + eventLoop->aftersleep = aftersleep; + } ++ ++void aeRegisterFile(aeEventLoop *eventLoop, int fd) { ++#if defined(HAVE_IO_URING) ++ aeApiRegisterFile(eventLoop, fd); ++#endif ++} ++ +diff --git a/src/ae.h b/src/ae.h +index 08f5416..06806a2 100644 +--- a/src/ae.h ++++ b/src/ae.h +@@ -138,5 +138,6 @@ void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep); + int aeGetSetSize(aeEventLoop *eventLoop); + int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); + void aeSetDontWait(aeEventLoop *eventLoop, int noWait); ++void aeRegisterFile(aeEventLoop *eventLoop, int 
fd); + + #endif +diff --git a/src/ae_iouring.c b/src/ae_iouring.c +index 88ea3e1..b230f5e 100644 +--- a/src/ae_iouring.c ++++ b/src/ae_iouring.c +@@ -35,6 +35,8 @@ + #define BACKLOG 8192 + #define MAX_ENTRIES 16384 /* entries should be configured by users */ + ++static int register_files = 1; ++ + typedef struct uring_event { + int fd; + int type; +@@ -81,6 +83,22 @@ static int aeApiCreate(aeEventLoop *eventLoop) { + } + + eventLoop->apidata = state; ++ ++ if (register_files) { ++ int *files = malloc(MAX_ENTRIES * sizeof(int)); ++ if (!files) { ++ register_files = 0; ++ return 0; ++ } ++ for (int i = 0; i < MAX_ENTRIES; i++) ++ files[i] = -1; ++ ++ int ret = io_uring_register_files(state->ring, files, MAX_ENTRIES); ++ if (ret < 0) { ++ fprintf(stderr, "error register_files %d\n", ret); ++ register_files = 0; ++ } ++ } + return 0; + } + +@@ -124,7 +142,8 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask, + ev->type = mask; + if (!iovecs) + ev->type |= AE_POLLABLE; +- ++ if (register_files) ++ sqe->flags |= IOSQE_FIXED_FILE; + io_uring_sqe_set_data(sqe, (void *)ev); + + return 0; +@@ -191,6 +210,17 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { + return numevents; + } + ++void aeApiRegisterFile(aeEventLoop *eventLoop, int fd){ ++ aeApiState *state = eventLoop->apidata; ++ if (register_files) { ++ int ret = io_uring_register_files_update(state->ring, fd, &fd, 1); ++ if (ret < 0) { ++ fprintf(stderr, "lege io_uring_register_files_update failed: %d %d\n", fd, ret); ++ register_files = 0; ++ } ++ } ++} ++ + static char *aeApiName(void) { + return "io_uring"; + } +diff --git a/src/networking.c b/src/networking.c +index 506041c..3bfa8d8 100644 +--- a/src/networking.c ++++ b/src/networking.c +@@ -1074,6 +1074,7 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + return; + } + serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); ++ aeRegisterFile(el, cfd); + 
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); + } + } +@@ -1100,6 +1101,7 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + return; + } + serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); ++ aeRegisterFile(el, cfd); + acceptCommonHandler(connCreateAcceptedTLS(cfd, server.tls_auth_clients),0,cip); + } + } +@@ -1126,6 +1128,7 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + return; + } + serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket); ++ aeRegisterFile(el, cfd); + acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL); + } + } +diff --git a/src/server.c b/src/server.c +index c32eccc..7337cfd 100644 +--- a/src/server.c ++++ b/src/server.c +@@ -2983,6 +2983,7 @@ void initServer(void) { + /* Create an event handler for accepting new connections in TCP and Unix + * domain sockets. */ + for (j = 0; j < server.ipfd_count; j++) { ++ aeRegisterFile(server.el, server.ipfd[j]); + if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, + acceptTcpHandler,NULL) == AE_ERR) + { +@@ -2991,6 +2992,7 @@ void initServer(void) { + } + } + for (j = 0; j < server.tlsfd_count; j++) { ++ aeRegisterFile(server.el, server.tlsfd[j]); + if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE, + acceptTLSHandler,NULL) == AE_ERR) + { +@@ -3004,6 +3006,7 @@ void initServer(void) { + + /* Register a readable event for the pipe used to awake the event loop + * when a blocked client in a module needs attention. 
*/ ++ aeRegisterFile(server.el, server.module_blocked_pipe[0]); + if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE, + moduleBlockedClientPipeReadable,NULL) == AE_ERR) { + serverPanic( +-- +2.18.2 + diff --git a/Ae_iouring-enable-sqpoll.patch b/Ae_iouring-enable-sqpoll.patch new file mode 100644 index 0000000000000000000000000000000000000000..4ad0ad14c70b2af9bc33bc3a077808c2ad3423eb --- /dev/null +++ b/Ae_iouring-enable-sqpoll.patch @@ -0,0 +1,28 @@ +From fe51d7d0eda77456fe5b214d03705850163661aa Mon Sep 17 00:00:00 2001 +From: Zhihao Cheng +Date: Sat, 13 Mar 2021 15:34:21 +0800 +Subject: [PATCH 6/7] ae_iouring: enable sqpoll + +Need non-fixedfd. + +Signed-off-by: Zhihao Cheng +--- + src/ae_iouring.c | 2 ++ + 1 file changed, 2 insertions(+) + +diff --git a/src/ae_iouring.c b/src/ae_iouring.c +index b230f5e..6c53bca 100644 +--- a/src/ae_iouring.c ++++ b/src/ae_iouring.c +@@ -67,6 +67,8 @@ static int aeApiCreate(aeEventLoop *eventLoop) { + + struct io_uring_params params; + memset(¶ms, 0, sizeof(params)); ++ params.flags = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF; ++ params.sq_thread_cpu = 14; + state->urfd = io_uring_queue_init_params(MAX_ENTRIES, state->ring, ¶ms); + if (state->urfd == -1) { + zfree(state->ring); +-- +2.18.2 + diff --git a/Fix-compile-err-unknown-type-sds-in-connection.h.patch b/Fix-compile-err-unknown-type-sds-in-connection.h.patch new file mode 100644 index 0000000000000000000000000000000000000000..d4e20253402a541bf64dbd85804b587390ab16f9 --- /dev/null +++ b/Fix-compile-err-unknown-type-sds-in-connection.h.patch @@ -0,0 +1,25 @@ +From 405b06f06b92d5fa0b4862df13c0e447f16c527d Mon Sep 17 00:00:00 2001 +From: Zhihao Cheng +Date: Wed, 1 Sep 2021 02:51:38 +0800 +Subject: [PATCH 7/7] Fix compile err unknown type sds in connection.h + +--- + src/connection.h | 2 ++ + 1 file changed, 2 insertions(+) + +diff --git a/src/connection.h b/src/connection.h +index 01af4eb..0aaf474 100644 +--- a/src/connection.h ++++ b/src/connection.h 
+@@ -31,6 +31,8 @@ + #ifndef __REDIS_CONNECTION_H + #define __REDIS_CONNECTION_H + ++#include "sds.h" ++ + #define CONN_INFO_LEN 32 + + struct aeEventLoop; +-- +2.18.2 + diff --git a/Use-fast-poll-feature-for-read.patch b/Use-fast-poll-feature-for-read.patch new file mode 100644 index 0000000000000000000000000000000000000000..96ed7e5741ad9b314b4026e105b95d0e29d8f342 --- /dev/null +++ b/Use-fast-poll-feature-for-read.patch @@ -0,0 +1,416 @@ +From 653d39195c1c4ed82f3f3a3fdaed8776e4b635ac Mon Sep 17 00:00:00 2001 +From: Jiufei Xue +Date: Wed, 12 Aug 2020 15:46:04 +0800 +Subject: [PATCH 2/7] use fast poll feature for read + +Signed-off-by: Betty < +Signed-off-by: Jiufei Xue +--- + src/ae.c | 23 +++++++++++++ + src/ae.h | 5 +++ + src/ae_iouring.c | 56 +++++++++++++++++++++++--------- + src/connection.h | 1 + + src/networking.c | 84 +++++++++++++++++++++++++++++++++++++++++++----- + src/server.c | 2 +- + src/server.h | 6 ++++ + 7 files changed, 153 insertions(+), 24 deletions(-) + +diff --git a/src/ae.c b/src/ae.c +index 7c4569f..cca630c 100644 +--- a/src/ae.c ++++ b/src/ae.c +@@ -43,6 +43,7 @@ + #include "ae.h" + #include "zmalloc.h" + #include "config.h" ++#include "connection.h" + + /* Include the best multiplexing layer supported by this system. + * The following should be ordered by performances, descending. 
*/ +@@ -156,6 +157,12 @@ void aeStop(aeEventLoop *eventLoop) { + + int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, + aeFileProc *proc, void *clientData) ++{ ++ return aeCreateFileEventWithBuf(eventLoop, fd, mask, proc, clientData, NULL); ++} ++ ++int aeCreateFileEventWithBuf(aeEventLoop *eventLoop, int fd, int mask, ++ aeFileProc *proc, void *clientData, struct iovec *iovec) + { + if (fd >= eventLoop->setsize) { + errno = ERANGE; +@@ -163,7 +170,11 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, + } + aeFileEvent *fe = &eventLoop->events[fd]; + ++#if defined(HAVE_IO_URING) ++ if (aeApiAddEvent(eventLoop, fd, mask, iovec) == -1) ++#else + if (aeApiAddEvent(eventLoop, fd, mask) == -1) ++#endif + return AE_ERR; + fe->mask |= mask; + if (mask & AE_READABLE) fe->rfileProc = proc; +@@ -480,6 +491,12 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) + * Fire the readable event if the call sequence is not + * inverted. */ + if (!invert && fe->mask & mask & AE_READABLE) { ++#if defined(HAVE_IO_URING) ++ if (!(mask & AE_POLLABLE)) { ++ connection *conn = fe->clientData; ++ conn->cqe_res = eventLoop->fired[j].res; ++ } ++#endif + fe->rfileProc(eventLoop,fd,fe->clientData,mask); + fired++; + fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ +@@ -500,6 +517,12 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) + if ((fe->mask & mask & AE_READABLE) && + (!fired || fe->wfileProc != fe->rfileProc)) + { ++#if defined(HAVE_IO_URING) ++ if (!(mask & AE_POLLABLE)) { ++ connection *conn = fe->clientData; ++ conn->cqe_res = eventLoop->fired[j].res; ++ } ++#endif + fe->rfileProc(eventLoop,fd,fe->clientData,mask); + fired++; + } +diff --git a/src/ae.h b/src/ae.h +index d1b7f34..08f5416 100644 +--- a/src/ae.h ++++ b/src/ae.h +@@ -46,6 +46,7 @@ + loop iteration. Useful when you want to persist + things to disk before sending replies, and want + to do that in a group fashion. 
*/ ++#define AE_POLLABLE 8 /* Especially for io_uring */ + + #define AE_FILE_EVENTS (1<<0) + #define AE_TIME_EVENTS (1<<1) +@@ -61,6 +62,7 @@ + #define AE_NOTUSED(V) ((void) V) + + struct aeEventLoop; ++struct iovec; + + /* Types and data structures */ + typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); +@@ -94,6 +96,7 @@ typedef struct aeTimeEvent { + typedef struct aeFiredEvent { + int fd; + int mask; ++ int res; + } aeFiredEvent; + + /* State of an event based program */ +@@ -118,6 +121,8 @@ void aeDeleteEventLoop(aeEventLoop *eventLoop); + void aeStop(aeEventLoop *eventLoop); + int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, + aeFileProc *proc, void *clientData); ++int aeCreateFileEventWithBuf(aeEventLoop *eventLoop, int fd, int mask, ++ aeFileProc *proc, void *clientData, struct iovec *iovecs); + void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); + int aeGetFileEvents(aeEventLoop *eventLoop, int fd); + long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, +diff --git a/src/ae_iouring.c b/src/ae_iouring.c +index 1475ff1..b109370 100644 +--- a/src/ae_iouring.c ++++ b/src/ae_iouring.c +@@ -32,7 +32,7 @@ + #include //used for define EPOLLIN and EPOLLOUT + #include "liburing.h" + +-#define BACKLOG 4096 ++#define BACKLOG 8192 + #define MAX_ENTRIES 16384 /* entries should be configured by users */ + + typedef struct uring_event { +@@ -63,13 +63,23 @@ static int aeApiCreate(aeEventLoop *eventLoop) { + return -1; + } + +- state->urfd = io_uring_queue_init(MAX_ENTRIES, state->ring, 0); ++ struct io_uring_params params; ++ memset(¶ms, 0, sizeof(params)); ++ state->urfd = io_uring_queue_init_params(MAX_ENTRIES, state->ring, ¶ms); + if (state->urfd == -1) { + zfree(state->ring); + zfree(state->events); + zfree(state); + return -1; + } ++ if (!(params.features & IORING_FEAT_FAST_POLL)) { ++ io_uring_queue_exit(state->ring); ++ zfree(state->ring); ++ zfree(state->events); ++ 
zfree(state);
++        return -1;
++    }
++
+     eventLoop->apidata = state;
+     return 0;
+ }
+@@ -89,21 +99,31 @@ static void aeApiFree(aeEventLoop *eventLoop) {
+     zfree(state);
+ }
+ 
+-static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
++static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask,
++        struct iovec *iovecs) {
+     aeApiState *state = eventLoop->apidata;
+ 
+     struct io_uring_sqe *sqe = io_uring_get_sqe(state->ring);
+     if (!sqe) return -1;
+ 
+-    int poll_mask = 0;
+-    /* io_uring only support onshot epoll */
+-    if (mask == AE_READABLE) poll_mask |= EPOLLIN;
+-    if (mask == AE_WRITABLE) poll_mask |= EPOLLOUT;
+-    io_uring_prep_poll_add(sqe, fd, poll_mask);
++    /* NULL iovec means doing poll_add behavior */
++    if (!iovecs) {
++        unsigned int poll_mask = 0;
++        if (mask & AE_READABLE) poll_mask |= EPOLLIN;
++        if (mask & AE_WRITABLE) poll_mask |= EPOLLOUT;
++        io_uring_prep_poll_add(sqe, fd, poll_mask); /* honour the requested direction, not just EPOLLIN */
++    } else {
++        if (mask & AE_READABLE)
++            io_uring_prep_readv(sqe, fd, iovecs, 1, 0);
++        if (mask & AE_WRITABLE)
++            io_uring_prep_writev(sqe, fd, iovecs, 1, 0);
++    }
+ 
+     uring_event *ev = &state->events[fd];
+     ev->fd = fd;
+     ev->type = mask;
++    if (!iovecs)
++        ev->type |= AE_POLLABLE;
+     io_uring_sqe_set_data(sqe, (void *)ev);
+     io_uring_submit(state->ring);
+ 
+@@ -148,15 +168,21 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
+             continue;
+         }
+ 
+-        if (cqe->res < 0) {
+-            io_uring_cqe_seen(state->ring, cqe);
+-            continue;
++        if (ev->type & AE_POLLABLE) {
++            if (cqe->res < 0) {
++                io_uring_cqe_seen(state->ring, cqe);
++                continue;
++            }
++
++            if (cqe->res & EPOLLIN) mask |= AE_READABLE|AE_POLLABLE;
++            if (cqe->res & EPOLLOUT) mask |= AE_WRITABLE|AE_POLLABLE;
++            if (cqe->res & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE|AE_POLLABLE;
++            if (cqe->res & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE|AE_POLLABLE;
++        } else {
++            mask = ev->type;
++            eventLoop->fired[numevents].res = cqe->res;
+         }
+ 
+-        if (cqe->res & EPOLLIN) mask |= AE_READABLE;
+-        if 
(cqe->res & EPOLLOUT) mask |= AE_WRITABLE; +- if (cqe->res & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; +- if (cqe->res & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; + eventLoop->fired[numevents].fd = ev->fd; + eventLoop->fired[numevents].mask = mask; + +diff --git a/src/connection.h b/src/connection.h +index 03281a3..01af4eb 100644 +--- a/src/connection.h ++++ b/src/connection.h +@@ -80,6 +80,7 @@ struct connection { + ConnectionCallbackFunc conn_handler; + ConnectionCallbackFunc write_handler; + ConnectionCallbackFunc read_handler; ++ int cqe_res; + int fd; + }; + +diff --git a/src/networking.c b/src/networking.c +index b4cc62c..59201f8 100644 +--- a/src/networking.c ++++ b/src/networking.c +@@ -105,11 +105,19 @@ client *createClient(connection *conn) { + * in the context of a client. When commands are executed in other + * contexts (for instance a Lua script) we need a non connected client. */ + if (conn) { ++#if !defined(HAVE_IO_URING) + connNonBlock(conn); ++#endif + connEnableTcpNoDelay(conn); + if (server.tcpkeepalive) + connKeepAlive(conn,server.tcpkeepalive); ++ ++#if defined(HAVE_IO_URING) ++ /* Do not create poll event */ ++ conn->read_handler = readDoneFromClient; ++#else + connSetReadHandler(conn, readQueryFromClient); ++#endif + connSetPrivateData(conn, c); + } + +@@ -121,7 +129,9 @@ client *createClient(connection *conn) { + c->name = NULL; + c->bufpos = 0; + c->qb_pos = 0; ++ c->qblen = 0; + c->querybuf = sdsempty(); ++ c->submitted_query = 0; + c->pending_querybuf = sdsempty(); + c->querybuf_peak = 0; + c->reqtype = 0; +@@ -1033,6 +1043,10 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip) { + freeClient(connGetPrivateData(conn)); + return; + } ++ ++#ifdef HAVE_IO_URING ++ readQueryFromClient(conn); ++#endif + } + + void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { +@@ -2028,7 +2042,7 @@ void processInputBuffer(client *c) { + + void readQueryFromClient(connection *conn) { + client *c = 
connGetPrivateData(conn); +- int nread, readlen; ++ int readlen; + size_t qblen; + + /* Check if we want to read from the client later when exiting from +@@ -2055,10 +2069,12 @@ void readQueryFromClient(connection *conn) { + if (remaining > 0 && remaining < readlen) readlen = remaining; + } + +- qblen = sdslen(c->querybuf); ++ qblen = c->qblen = sdslen(c->querybuf); + if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; + c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); +- nread = connRead(c->conn, c->querybuf+qblen, readlen); ++ ++#if !defined(HAVE_IO_URING) ++ int nread = connRead(c->conn, c->querybuf+qblen, readlen); + if (nread == -1) { + if (connGetState(conn) == CONN_STATE_CONNECTED) { + return; +@@ -2098,15 +2114,67 @@ void readQueryFromClient(connection *conn) { + * in case to check if there is a full command to execute. */ + processInputBuffer(c); + +- if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; +- +-#ifdef HAVE_IO_URING +- if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR) { +- serverPanic("Unrecoverable error creating conn.fd file event."); ++#else ++ c->riov.iov_len = readlen; ++ c->riov.iov_base = c->querybuf+qblen; ++ c->submitted_query++; ++ if (aeCreateFileEventWithBuf(server.el, conn->fd, AE_READABLE, conn->type->ae_handler, conn, &c->riov) == AE_ERR) { ++ serverPanic("Unrecoverable error creating file event."); + } + #endif + } + ++#if defined(HAVE_IO_URING) ++void readDoneFromClient(connection *conn) { ++ client *c = connGetPrivateData(conn); ++ int nread = conn->cqe_res; ++ ++ if (nread < 0) { ++ if (connGetState(conn) == CONN_STATE_CONNECTED) { ++ return; ++ } else { ++ serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); ++ freeClientAsync(c); ++ return; ++ } ++ } else if (nread == 0) { ++ serverLog(LL_VERBOSE, "Client closed connection"); ++ freeClientAsync(c); ++ return; ++ } else if (c->flags & CLIENT_MASTER) { ++ /* Append the query buffer to the pending (not 
applied) buffer ++ * of the master. We'll use this buffer later in order to have a ++ * copy of the string applied by the last command executed. */ ++ c->pending_querybuf = sdscatlen(c->pending_querybuf, ++ c->querybuf+c->qblen,nread); ++ } ++ ++ sdsIncrLen(c->querybuf,nread); ++ c->lastinteraction = server.unixtime; ++ if (c->flags & CLIENT_MASTER) c->read_reploff += nread; ++ server.stat_net_input_bytes += nread; ++ if (sdslen(c->querybuf) > server.client_max_querybuf_len) { ++ sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); ++ ++ bytes = sdscatrepr(bytes,c->querybuf,64); ++ serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); ++ sdsfree(ci); ++ sdsfree(bytes); ++ ++ freeClientAsync(c); ++ return; ++ } ++ ++ /* There is more data in the client input buffer, continue parsing it ++ * in case to check if there is a full command to execute. */ ++ processInputBuffer(c); ++ c->submitted_query--; ++ ++ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; ++ readQueryFromClient(conn); ++} ++#endif ++ + void getClientsMaxBuffers(unsigned long *longest_output_list, + unsigned long *biggest_input_buffer) { + client *c; +diff --git a/src/server.c b/src/server.c +index 2cabb2e..c32eccc 100644 +--- a/src/server.c ++++ b/src/server.c +@@ -1534,7 +1534,7 @@ int clientsCronResizeQueryBuffer(client *c) { + { + /* Only resize the query buffer if it is actually wasting + * at least a few kbytes. */ +- if (sdsavail(c->querybuf) > 1024*4) { ++ if (sdsavail(c->querybuf) > 1024*4 && !c->submitted_query) { + c->querybuf = sdsRemoveFreeSpace(c->querybuf); + } + } +diff --git a/src/server.h b/src/server.h +index 3261935..1538935 100644 +--- a/src/server.h ++++ b/src/server.h +@@ -801,6 +801,9 @@ typedef struct client { + redisDb *db; /* Pointer to currently SELECTed DB. */ + robj *name; /* As set by CLIENT SETNAME. */ + sds querybuf; /* Buffer we use to accumulate client queries. 
*/ ++ struct iovec riov; ++ int submitted_query; ++ size_t qblen; + size_t qb_pos; /* The position we have read in querybuf. */ + sds pending_querybuf; /* If this client is flagged as master, this buffer + represents the yet not applied portion of the +@@ -1675,6 +1678,9 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); + void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask); + void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); + void readQueryFromClient(connection *conn); ++#if defined(HAVE_IO_URING) ++void readDoneFromClient(connection *conn); ++#endif + void addReplyNull(client *c); + void addReplyNullArray(client *c); + void addReplyBool(client *c, int b); +-- +2.18.2 + diff --git a/Use-fast-poll-feature-for-write.patch b/Use-fast-poll-feature-for-write.patch new file mode 100644 index 0000000000000000000000000000000000000000..476987d0f79d135807b6f83849b3ee62c58dc781 --- /dev/null +++ b/Use-fast-poll-feature-for-write.patch @@ -0,0 +1,297 @@ +From 9f170f4a5859b066afba47430e0c03a9f43089ec Mon Sep 17 00:00:00 2001 +From: Jiufei Xue +Date: Fri, 14 Aug 2020 14:48:49 +0800 +Subject: [PATCH 3/7] use fast poll feature for write + +Signed-off-by: Betty < +Signed-off-by: Jiufei Xue +--- + src/ae.c | 6 +++ + src/ae_iouring.c | 1 + + src/networking.c | 131 +++++++++++++++++++++++++++++++++++++++++++++-- + src/server.h | 3 ++ + 4 files changed, 136 insertions(+), 5 deletions(-) + +diff --git a/src/ae.c b/src/ae.c +index cca630c..1d04a11 100644 +--- a/src/ae.c ++++ b/src/ae.c +@@ -505,6 +505,12 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) + /* Fire the writable event. 
*/ + if (fe->mask & mask & AE_WRITABLE) { + if (!fired || fe->wfileProc != fe->rfileProc) { ++#if defined(HAVE_IO_URING) ++ if (!(mask & AE_POLLABLE)) { ++ connection *conn = fe->clientData; ++ conn->cqe_res = eventLoop->fired[j].res; ++ } ++#endif + fe->wfileProc(eventLoop,fd,fe->clientData,mask); + fired++; + } +diff --git a/src/ae_iouring.c b/src/ae_iouring.c +index b109370..48c7858 100644 +--- a/src/ae_iouring.c ++++ b/src/ae_iouring.c +@@ -124,6 +124,7 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask, + ev->type = mask; + if (!iovecs) + ev->type |= AE_POLLABLE; ++ + io_uring_sqe_set_data(sqe, (void *)ev); + io_uring_submit(state->ring); + +diff --git a/src/networking.c b/src/networking.c +index 59201f8..506041c 100644 +--- a/src/networking.c ++++ b/src/networking.c +@@ -115,6 +115,7 @@ client *createClient(connection *conn) { + #if defined(HAVE_IO_URING) + /* Do not create poll event */ + conn->read_handler = readDoneFromClient; ++ conn->write_handler = writeDoneToClient; + #else + connSetReadHandler(conn, readQueryFromClient); + #endif +@@ -143,6 +144,7 @@ client *createClient(connection *conn) { + c->multibulklen = 0; + c->bulklen = -1; + c->sentlen = 0; ++ c->totwritten = 0; + c->flags = 0; + c->ctime = c->lastinteraction = server.unixtime; + /* If the default user does not require authentication, the user is +@@ -1404,12 +1406,21 @@ int writeToClient(client *c, int handler_installed) { + /* Update total number of writes on server */ + server.stat_total_writes_processed++; + +- ssize_t nwritten = 0, totwritten = 0; + size_t objlen; + clientReplyBlock *o; + ++#if defined(HAVE_IO_URING) ++ (void)handler_installed; ++ if (clientHasPendingReplies(c)) { ++#else ++ ssize_t nwritten = 0, totwritten = 0; + while(clientHasPendingReplies(c)) { ++#endif + if (c->bufpos > 0) { ++#if defined(HAVE_IO_URING) ++ c->wiov.iov_len = c->bufpos-c->sentlen; ++ c->wiov.iov_base = c->buf+c->sentlen; ++#else + nwritten = 
connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); + if (nwritten <= 0) break; + c->sentlen += nwritten; +@@ -1421,6 +1432,7 @@ int writeToClient(client *c, int handler_installed) { + c->bufpos = 0; + c->sentlen = 0; + } ++#endif + } else { + o = listNodeValue(listFirst(c->reply)); + objlen = o->used; +@@ -1428,9 +1440,15 @@ int writeToClient(client *c, int handler_installed) { + if (objlen == 0) { + c->reply_bytes -= o->size; + listDelNode(c->reply,listFirst(c->reply)); +- continue; ++#if defined(HAVE_IO_URING) ++ return C_OK; + } + ++ c->wiov.iov_len = objlen - c->sentlen; ++ c->wiov.iov_base = o->buf+c->sentlen; ++#else ++ continue; ++ } + nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen); + if (nwritten <= 0) break; + c->sentlen += nwritten; +@@ -1446,7 +1464,18 @@ int writeToClient(client *c, int handler_installed) { + if (listLength(c->reply) == 0) + serverAssert(c->reply_bytes == 0); + } ++#endif ++ } ++ ++#if defined(HAVE_IO_URING) ++ if (aeCreateFileEventWithBuf(server.el, c->conn->fd, AE_WRITABLE, ++ c->conn->type->ae_handler, c->conn, &(c->wiov)) == AE_ERR) { ++ serverPanic("Unrecoverable error creating WRITABLE file event."); + } ++ } ++ ++#else ++ + /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT + * bytes, in a single threaded server it's a good idea to serve + * other clients as well, even if a very large request comes from +@@ -1464,6 +1493,7 @@ int writeToClient(client *c, int handler_installed) { + zmalloc_used_memory() < server.maxmemory) && + !(c->flags & CLIENT_SLAVE)) break; + } ++ + server.stat_net_output_bytes += totwritten; + if (nwritten == -1) { + if (connGetState(c->conn) == CONN_STATE_CONNECTED) { +@@ -1496,9 +1526,99 @@ int writeToClient(client *c, int handler_installed) { + return C_ERR; + } + } ++#endif + return C_OK; + } + ++#if defined(HAVE_IO_URING) ++void writeDoneToClient(connection *conn) { ++ client *c = connGetPrivateData(conn); ++ ssize_t nwritten = conn->cqe_res; ++ ++ if 
(nwritten < 0) { ++ if (connGetState(conn) == CONN_STATE_CONNECTED) { ++ nwritten = 0; ++ } else { ++ serverLog(LL_VERBOSE, ++ "Error writing to client: %s", connGetLastError(c->conn)); ++ freeClientAsync(c); ++ return; ++ } ++ } ++ ++ if (nwritten <= 0) return; ++ ++ c->sentlen += nwritten; ++ c->totwritten += nwritten; ++ ++ size_t objlen; ++ clientReplyBlock *o; ++ ++ if (c->bufpos > 0) { ++ /* If the buffer was sent, set bufpos to zero to continue with ++ * the remainder of the reply. */ ++ if ((int)c->sentlen == c->bufpos) { ++ c->bufpos = 0; ++ c->sentlen = 0; ++ } ++ } else { ++ o = listNodeValue(listFirst(c->reply)); ++ objlen = o->used; ++ ++ /* If we fully sent the object on head go to the next one */ ++ if (c->sentlen == objlen) { ++ c->reply_bytes -= o->size; ++ listDelNode(c->reply,listFirst(c->reply)); ++ c->sentlen = 0; ++ /* If there are no longer objects in the list, we expect ++ * the count of reply bytes to be exactly zero. */ ++ if (listLength(c->reply) == 0) ++ serverAssert(c->reply_bytes == 0); ++ } ++ } ++ ++ /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT ++ * bytes, in a single threaded server it's a good idea to serve ++ * other clients as well, even if a very large request comes from ++ * super fast link that is always able to accept data (in real world ++ * scenario think about 'KEYS *' against the loopback interface). ++ * ++ * However if we are over the maxmemory limit we ignore that and ++ * just deliver as much data as it is possible to deliver. 
++ * ++ * Moreover, we also send as much as possible if the client is ++ * a slave or a monitor (otherwise, on high-speed traffic, the ++ * replication/output buffer will grow indefinitely) */ ++ if (c->totwritten > NET_MAX_WRITES_PER_EVENT && (server.maxmemory == 0 ++ || zmalloc_used_memory() < server.maxmemory) && !(c->flags & CLIENT_SLAVE)) ++ return; ++ ++ if (clientHasPendingReplies(c)) { ++ writeToClient(c, 0); ++ } else { ++ server.stat_net_output_bytes += c->totwritten; ++ if (c->totwritten > 0) { ++ /* For clients representing masters we don't count sending data ++ * as an interaction, since we always send REPLCONF ACK commands ++ * that take some time to just fill the socket output buffer. ++ * We just rely on data / pings received for timeout detection. */ ++ if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime; ++ } ++ ++ c->sentlen = 0; ++ c->totwritten = 0; ++ ++ /* Close connection after entire reply has been sent. */ ++ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { ++ freeClientAsync(c); ++ } ++ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; ++ readQueryFromClient(conn); ++ ++ } ++} ++#endif ++ + /* Write event handler. Just send data to the client. */ + void sendReplyToClient(connection *conn) { + client *c = connGetPrivateData(conn); +@@ -1530,6 +1650,9 @@ int handleClientsWithPendingWrites(void) { + /* Try to write buffers to the client socket. */ + if (writeToClient(c,0) == C_ERR) continue; + ++ /* writeToClient always prepare a writev request, so clientHasPendingReplies ++ * is always true, need not add event */ ++#if !defined(HAVE_IO_URING) + /* If after the synchronous writes above we still have data to + * output to the client, we need to install the writable handler. 
*/ + if (clientHasPendingReplies(c)) { +@@ -1548,6 +1671,7 @@ int handleClientsWithPendingWrites(void) { + freeClientAsync(c); + } + } ++#endif + } + return processed; + } +@@ -2169,9 +2293,6 @@ void readDoneFromClient(connection *conn) { + * in case to check if there is a full command to execute. */ + processInputBuffer(c); + c->submitted_query--; +- +- if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; +- readQueryFromClient(conn); + } + #endif + +diff --git a/src/server.h b/src/server.h +index 1538935..1639ae4 100644 +--- a/src/server.h ++++ b/src/server.h +@@ -802,6 +802,7 @@ typedef struct client { + robj *name; /* As set by CLIENT SETNAME. */ + sds querybuf; /* Buffer we use to accumulate client queries. */ + struct iovec riov; ++ struct iovec wiov; + int submitted_query; + size_t qblen; + size_t qb_pos; /* The position we have read in querybuf. */ +@@ -824,6 +825,7 @@ typedef struct client { + unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */ + size_t sentlen; /* Amount of bytes already sent in the current + buffer or object being sent. */ ++ size_t totwritten; + time_t ctime; /* Client creation time. 
*/ + time_t lastinteraction; /* Time of the last interaction, used for timeout */ + time_t obuf_soft_limit_reached_time; +@@ -1680,6 +1682,7 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); + void readQueryFromClient(connection *conn); + #if defined(HAVE_IO_URING) + void readDoneFromClient(connection *conn); ++void writeDoneToClient(connection *conn); + #endif + void addReplyNull(client *c); + void addReplyNullArray(client *c); +-- +2.18.2 + diff --git a/Use-io_uring_submit_and_wait.patch b/Use-io_uring_submit_and_wait.patch new file mode 100644 index 0000000000000000000000000000000000000000..b69f9b91688588ee8861099a7dfceda204eb2f74 --- /dev/null +++ b/Use-io_uring_submit_and_wait.patch @@ -0,0 +1,44 @@ +From 16f9d0129431b335e87c229444996c7bca7d44b8 Mon Sep 17 00:00:00 2001 +From: Jiufei Xue +Date: Fri, 14 Aug 2020 14:53:13 +0800 +Subject: [PATCH 4/7] use io_uring_submit_and_wait + +Signed-off-by: Betty +Signed-off-by: Jiufei Xue +--- + src/ae_iouring.c | 5 +---- + 1 file changed, 1 insertion(+), 4 deletions(-) + +diff --git a/src/ae_iouring.c b/src/ae_iouring.c +index 48c7858..88ea3e1 100644 +--- a/src/ae_iouring.c ++++ b/src/ae_iouring.c +@@ -126,7 +126,6 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask, + ev->type |= AE_POLLABLE; + + io_uring_sqe_set_data(sqe, (void *)ev); +- io_uring_submit(state->ring); + + return 0; + } +@@ -140,7 +139,6 @@ static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) { + + uring_event *ev = &state->events[fd]; + io_uring_prep_poll_remove(sqe, (void *)ev); +- io_uring_submit(state->ring); + } + + static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { +@@ -150,8 +148,7 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { + /* TODO: handle timeout */ + (void)tvp; + +- struct io_uring_cqe *cqe; +- retval = io_uring_wait_cqe(state->ring, &cqe); ++ retval = io_uring_submit_and_wait(state->ring, 1); + if (retval < 0) { + return numevents; + } 
+-- +2.18.2 + diff --git a/redis6.spec b/redis6.spec index 1135107d020293278a70d2bfeeaba62e2f52ec19..3e5f07d54905dbe5d2d38a46a62d8a4532544cd9 100644 --- a/redis6.spec +++ b/redis6.spec @@ -20,6 +20,13 @@ Source9: macros.%{Pname} Source10: https://github.com/%{Pname}/%{Pname}-doc/archive/%{doc_commit}/%{Pname}-doc-%{short_doc_commit}.tar.gz Patch0001: Modify-aarch64-architecture-jemalloc-page-size-from-from-4k-to-64k.patch +Patch0002: Add-io_uring-support.patch +Patch0003: Use-fast-poll-feature-for-read.patch +Patch0004: Use-fast-poll-feature-for-write.patch +Patch0005: Use-io_uring_submit_and_wait.patch +Patch0006: Ae_iouring-add-register-files-support.patch +Patch0007: Ae_iouring-enable-sqpoll.patch +Patch0008: Fix-compile-err-unknown-type-sds-in-connection.h.patch BuildRequires: make gcc %if %{with tests} @@ -80,6 +87,13 @@ tar -xvf %{SOURCE10} %ifarch aarch64 %patch0001 -p1 %endif +%patch0002 -p1 +%patch0003 -p1 +%patch0004 -p1 +%patch0005 -p1 +%patch0006 -p1 +%patch0007 -p1 +%patch0008 -p1 mv ../%{Pname}-doc-%{doc_commit} doc mv deps/lua/COPYRIGHT COPYRIGHT-lua mv deps/jemalloc/COPYING COPYING-jemalloc