12 Star 0 Fork 11

Kunpeng/boostkit-database
暂停

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
add_support_for_kbaio_to_redis6.0.20.patch 27.02 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759
diff --git a/src/Makefile b/src/Makefile
index 691d5434..224de813 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -154,7 +154,7 @@ ifeq ($(uname_S),Haiku)
else
# All the other OSes (notably Linux)
FINAL_LDFLAGS+= -rdynamic
- FINAL_LIBS+=-ldl -pthread -lrt
+ FINAL_LIBS+=-ldl -pthread -lrt -lkbaio
endif
endif
endif
@@ -166,7 +166,7 @@ endif
endif
endif
# Include paths to dependencies
-FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src
+FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I ./
# Determine systemd support and/or build preference (defaulting to auto-detection)
BUILD_WITH_SYSTEMD=no
@@ -253,7 +253,7 @@ REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o siph
REDIS_CHECK_RDB_NAME=redis-check-rdb$(PROG_SUFFIX)
REDIS_CHECK_AOF_NAME=redis-check-aof$(PROG_SUFFIX)
-all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME)
+all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) #$(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME)
@echo ""
@echo "Hint: It's a good idea to run 'make test' ;)"
@echo ""
@@ -334,7 +334,7 @@ DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ
$(REDIS_CC) -MMD -o $@ -c $<
clean:
- rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark
+ rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark
rm -f $(DEP)
.PHONY: clean
@@ -392,11 +392,11 @@ src/help.h:
install: all
@mkdir -p $(INSTALL_BIN)
$(REDIS_INSTALL) $(REDIS_SERVER_NAME) $(INSTALL_BIN)
- $(REDIS_INSTALL) $(REDIS_BENCHMARK_NAME) $(INSTALL_BIN)
- $(REDIS_INSTALL) $(REDIS_CLI_NAME) $(INSTALL_BIN)
+ #$(REDIS_INSTALL) $(REDIS_BENCHMARK_NAME) $(INSTALL_BIN)
+ #$(REDIS_INSTALL) $(REDIS_CLI_NAME) $(INSTALL_BIN)
$(REDIS_INSTALL) $(REDIS_CHECK_RDB_NAME) $(INSTALL_BIN)
$(REDIS_INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN)
@ln -sf $(REDIS_SERVER_NAME) $(INSTALL_BIN)/$(REDIS_SENTINEL_NAME)
uninstall:
- rm -f $(INSTALL_BIN)/{$(REDIS_SERVER_NAME),$(REDIS_BENCHMARK_NAME),$(REDIS_CLI_NAME),$(REDIS_CHECK_RDB_NAME),$(REDIS_CHECK_AOF_NAME),$(REDIS_SENTINEL_NAME)}
+ rm -f $(INSTALL_BIN)/{$(REDIS_SERVER_NAME),$(REDIS_CHECK_RDB_NAME),$(REDIS_CHECK_AOF_NAME),$(REDIS_SENTINEL_NAME)}
diff --git a/src/ae.c b/src/ae.c
index c5166656..b98f26d8 100644
--- a/src/ae.c
+++ b/src/ae.c
@@ -43,24 +43,31 @@
#include "ae.h"
#include "zmalloc.h"
#include "config.h"
+#include "connection.h"
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
- #ifdef HAVE_EPOLL
- #include "ae_epoll.c"
+ #ifdef HAVE_KBAIO
+ #include "ae_kbaio.h"
#else
- #ifdef HAVE_KQUEUE
- #include "ae_kqueue.c"
+ #ifdef HAVE_EPOLL
+ #include "ae_epoll.c"
#else
- #include "ae_select.c"
+ #ifdef HAVE_KQUEUE
+ #include "ae_kqueue.c"
+ #else
+ #include "ae_select.c"
+ #endif
#endif
#endif
#endif
-aeEventLoop *aeCreateEventLoop(int setsize) {
+int (*aeApiAddEventPtr)(aeEventLoop *eventloop, int fd, int mask, struct iovec *iovec);
+
+aeEventLoop *aeCreateEventLoop(int setsize, aeKbaioFlags *kbaio) {
aeEventLoop *eventLoop;
int i;
@@ -77,6 +84,9 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
+ eventLoop->kbaio = kbaio;
+ aeApiAddEventPtr = (int (*)(aeEventLoop*, int, int, struct iovec*))aeApiAddEvent;
+
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
@@ -88,6 +98,7 @@ err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
+ zfree(eventLoop->kbaio);
zfree(eventLoop);
}
return NULL;
@@ -135,6 +146,7 @@ void aeDeleteEventLoop(aeEventLoop *eventLoop) {
aeApiFree(eventLoop);
zfree(eventLoop->events);
zfree(eventLoop->fired);
+ zfree(eventLoop->kbaio);
/* Free the time events list. */
aeTimeEvent *next_te, *te = eventLoop->timeEventHead;
@@ -152,6 +164,12 @@ void aeStop(aeEventLoop *eventLoop) {
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
+{
+ return aeCreateFileEventWithBuf(eventLoop, fd, mask, proc, clientData, NULL);
+}
+
+int aeCreateFileEventWithBuf(aeEventLoop *eventLoop, int fd, int mask,
+ aeFileProc *proc, void *clientData, struct iovec *iovec)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
@@ -159,8 +177,9 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
}
aeFileEvent *fe = &eventLoop->events[fd];
- if (aeApiAddEvent(eventLoop, fd, mask) == -1)
+ if (aeApiAddEventPtr(eventLoop, fd, mask, iovec) == -1) {
return AE_ERR;
+ }
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
@@ -533,6 +552,10 @@ int aeWait(int fd, int mask, long long milliseconds) {
}
}
+void aeRegisterFile(aeEventLoop *eventLoop, int fd) {
+ aeApiRegisterFile(eventLoop, fd);
+}
+
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
diff --git a/src/ae.h b/src/ae.h
index d1b7f34b..3cbf73ba 100644
--- a/src/ae.h
+++ b/src/ae.h
@@ -34,18 +34,7 @@
#define __AE_H__
#include <time.h>
-
-#define AE_OK 0
-#define AE_ERR -1
-
-#define AE_NONE 0 /* No events registered. */
-#define AE_READABLE 1 /* Fire when descriptor is readable. */
-#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
-#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the
- READABLE event already fired in the same event
- loop iteration. Useful when you want to persist
- things to disk before sending replies, and want
- to do that in a group fashion. */
+#include "common.h"
#define AE_FILE_EVENTS (1<<0)
#define AE_TIME_EVENTS (1<<1)
@@ -60,64 +49,14 @@
/* Macros */
#define AE_NOTUSED(V) ((void) V)
-struct aeEventLoop;
-
-/* Types and data structures */
-typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
-typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
-typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
-typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
-
-/* File event structure */
-typedef struct aeFileEvent {
- int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
- aeFileProc *rfileProc;
- aeFileProc *wfileProc;
- void *clientData;
-} aeFileEvent;
-
-/* Time event structure */
-typedef struct aeTimeEvent {
- long long id; /* time event identifier. */
- long when_sec; /* seconds */
- long when_ms; /* milliseconds */
- aeTimeProc *timeProc;
- aeEventFinalizerProc *finalizerProc;
- void *clientData;
- struct aeTimeEvent *prev;
- struct aeTimeEvent *next;
- int refcount; /* refcount to prevent timer events from being
- * freed in recursive time event calls. */
-} aeTimeEvent;
-
-/* A fired event */
-typedef struct aeFiredEvent {
- int fd;
- int mask;
-} aeFiredEvent;
-
-/* State of an event based program */
-typedef struct aeEventLoop {
- int maxfd; /* highest file descriptor currently registered */
- int setsize; /* max number of file descriptors tracked */
- long long timeEventNextId;
- time_t lastTime; /* Used to detect system clock skew */
- aeFileEvent *events; /* Registered events */
- aeFiredEvent *fired; /* Fired events */
- aeTimeEvent *timeEventHead;
- int stop;
- void *apidata; /* This is used for polling API specific data */
- aeBeforeSleepProc *beforesleep;
- aeBeforeSleepProc *aftersleep;
- int flags;
-} aeEventLoop;
-
/* Prototypes */
-aeEventLoop *aeCreateEventLoop(int setsize);
+aeEventLoop *aeCreateEventLoop(int setsize, aeKbaioFlags *kbaio);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
+int aeCreateFileEventWithBuf(aeEventLoop *eventLoop, int fd, int mask,
+ aeFileProc *proc, void *clientData, struct iovec *iovecs);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
@@ -133,5 +72,6 @@ void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
+void aeRegisterFile(aeEventLoop *eventLoop, int fd);
#endif
diff --git a/src/common.h b/src/common.h
new file mode 100644
index 00000000..87086c6d
--- /dev/null
+++ b/src/common.h
@@ -0,0 +1,73 @@
+#define AE_OK 0
+#define AE_ERR -1
+
+#define AE_NONE 0 /* No events registered. */
+#define AE_READABLE 1 /* Fire when descriptor is readable. */
+#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
+#define AE_BARRIER 4 /* With WRITABLE, never fire the event if the\
+ READABLE event already fired in the same event\
+ loop iteration. Useful when you want to persist\
+ things to disk before sending replies, and want\
+ to do that in a group fashion. */
+
+struct aeEventLoop;
+struct iovec;
+
+/* Types and data structures */
+typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
+typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
+typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
+typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
+
+/* File event structure */
+typedef struct aeFileEvent {
+ int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
+ aeFileProc *rfileProc;
+ aeFileProc *wfileProc;
+ void *clientData;
+ int res;
+} aeFileEvent;
+
+/* Time event structure */
+typedef struct aeTimeEvent {
+ long long id; /* time event identifier. */
+ long when_sec; /* seconds */
+ long when_ms; /* milliseconds */
+ aeTimeProc *timeProc;
+ aeEventFinalizerProc *finalizerProc;
+ void *clientData;
+ struct aeTimeEvent *prev;
+ struct aeTimeEvent *next;
+ int refcount; /* refcount to prevent timer events from being
+ * freed in recursive time event calls. */
+} aeTimeEvent;
+
+/* A fired event */
+typedef struct aeFiredEvent {
+ int fd;
+ int mask;
+} aeFiredEvent;
+
+/* kbaio event structure */
+typedef struct aeKbaioFlags {
+ int sqpoll;
+ int sq_thread_cpu;
+ int specify_core;
+} aeKbaioFlags;
+
+/* State of an event based program */
+typedef struct aeEventLoop {
+ int maxfd; /* highest file descriptor currently registered */
+ int setsize; /* max number of file descriptors tracked */
+ long long timeEventNextId;
+ time_t lastTime; /* Used to detect system clock skew */
+ aeFileEvent *events; /* Registered events */
+ aeFiredEvent *fired; /* Fired events */
+ aeTimeEvent *timeEventHead;
+ int stop;
+ void *apidata; /* This is used for polling API specific data */
+ aeBeforeSleepProc *beforesleep;
+ aeBeforeSleepProc *aftersleep;
+ int flags;
+ aeKbaioFlags *kbaio;
+} aeEventLoop;
\ No newline at end of file
diff --git a/src/config.c b/src/config.c
index 03104f56..1ca6c2a5 100644
--- a/src/config.c
+++ b/src/config.c
@@ -284,6 +284,16 @@ void resetServerSaveParams(void) {
server.saveparamslen = 0;
}
+void setServerSqpoll() {
+ server.sqpoll = 1;
+ server.specify_core = 0;
+}
+
+void setServerSqpollThreadCpu(int index) {
+ server.specify_core = 1;
+ server.sq_thread_cpu = index;
+}
+
void queueLoadModule(sds path, sds *argv, int argc) {
int i;
struct moduleLoadQueueEntry *loadmod;
@@ -595,6 +605,16 @@ void loadServerConfigFromString(char *config) {
err = sentinelHandleConfiguration(argv+1,argc-1);
if (err) goto loaderr;
}
+ } else if (!strcasecmp(argv[0], "sqpoll")) {
+ setServerSqpoll();
+ if (argc == 2) {
+ int index = atoi(argv[1]);
+ if (index < 0) {
+ err = "Invalid kbaio sqpoll thread parameters specified";
+ goto loaderr;
+ }
+ setServerSqpollThreadCpu(index);
+ }
} else {
err = "Bad directive or wrong number of arguments"; goto loaderr;
}
diff --git a/src/config.h b/src/config.h
index bbda595a..7eba2157 100644
--- a/src/config.h
+++ b/src/config.h
@@ -79,6 +79,11 @@
#define HAVE_EPOLL 1
#endif
+/* Test for io kbaio API */
+#ifdef __linux__
+#define HAVE_KBAIO 1
+#endif
+
#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
#define HAVE_KQUEUE 1
#endif
diff --git a/src/connection.c b/src/connection.c
index 83fb84d6..c5965d5e 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -29,6 +29,7 @@
#include "server.h"
#include "connhelpers.h"
+#include "networking_kbaio.h"
/* The connections module provides a lean abstraction of network connections
* to avoid direct socket and async event management across the Redis code base.
@@ -50,6 +51,7 @@
*/
ConnectionType CT_Socket;
+ConnectionType CT_Kbaio;
/* When a connection is created we must know its type already, but the
* underlying socket may or may not exist:
@@ -76,7 +78,11 @@ ConnectionType CT_Socket;
connection *connCreateSocket() {
connection *conn = zcalloc(sizeof(connection));
- conn->type = &CT_Socket;
+ if (server.enable_kbaio) {
+ conn->type = &CT_Kbaio;
+ } else {
+ conn->type = &CT_Socket;
+ }
conn->fd = -1;
return conn;
@@ -147,8 +153,11 @@ void *connGetPrivateData(connection *conn) {
/* Close the connection and free resources. */
static void connSocketClose(connection *conn) {
if (conn->fd != -1) {
- aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
- aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
+ aeFileEvent *fe = &server.el->events[conn->fd];
+ if (!server.enable_kbaio || fe->rfileProc == acceptTcpHandler) {
+ aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
+ aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
+ }
close(conn->fd);
conn->fd = -1;
}
@@ -254,10 +263,8 @@ static const char *connSocketGetLastError(connection *conn) {
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
- UNUSED(el);
- UNUSED(fd);
connection *conn = clientData;
-
+ conn->cqe_res = el->events[fd].res;
if (conn->state == CONN_STATE_CONNECTING &&
(mask & AE_WRITABLE) && conn->conn_handler) {
@@ -364,6 +371,23 @@ ConnectionType CT_Socket = {
};
+ConnectionType CT_Kbaio = {
+ .ae_handler = connSocketEventHandler,
+ .close = connSocketClose,
+ .write = NULL,
+ .read = NULL,
+ .accept = connSocketAccept,
+ .connect = connSocketConnect,
+ .set_write_handler = connSetWriteHandlerKbaio,
+ .set_read_handler = connSetReadHandlerKbaio,
+ .get_last_error = connSocketGetLastError,
+ .blocking_connect = connSocketBlockingConnect,
+ .sync_write = connSocketSyncWrite,
+ .sync_read = connSocketSyncRead,
+ .sync_readline = connSocketSyncReadLine,
+ .get_type = connSocketGetType
+};
+
int connGetSocketError(connection *conn) {
int sockerr = 0;
socklen_t errlen = sizeof(sockerr);
diff --git a/src/connection.h b/src/connection.h
index 03281a3d..cad51161 100644
--- a/src/connection.h
+++ b/src/connection.h
@@ -28,6 +28,7 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
+#include "sds.h"
#ifndef __REDIS_CONNECTION_H
#define __REDIS_CONNECTION_H
@@ -80,6 +81,7 @@ struct connection {
ConnectionCallbackFunc conn_handler;
ConnectionCallbackFunc write_handler;
ConnectionCallbackFunc read_handler;
+ int cqe_res;
int fd;
};
diff --git a/src/networking.c b/src/networking.c
index af6e0a87..41bcac94 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -30,6 +30,7 @@
#include "server.h"
#include "atomicvar.h"
#include "cluster.h"
+#include "networking_kbaio.h"
#include <sys/socket.h>
#include <sys/uio.h>
#include <math.h>
@@ -118,7 +119,12 @@ client *createClient(connection *conn) {
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
- connSetReadHandler(conn, readQueryFromClient);
+ if (server.enable_kbaio) {
+ connSetReadHandler(conn, readDoneFromClientKbaio);
+ connSetWriteHandler(conn, writeDoneToClientKbaio);
+ } else {
+ connSetReadHandler(conn, readQueryFromClient);
+ }
connSetPrivateData(conn, c);
}
@@ -130,6 +136,9 @@ client *createClient(connection *conn) {
c->name = NULL;
c->bufpos = 0;
c->qb_pos = 0;
+ c->submitted_query = 0;
+ c->qblen = 0;
+ c->totwritten = 0;
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
c->querybuf_peak = 0;
@@ -1054,6 +1063,9 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip) {
freeClient(connGetPrivateData(conn));
return;
}
+ if (server.enable_kbaio) {
+ setCreateReadEventKbaio(conn);
+ }
}
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
@@ -1069,9 +1081,13 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
+ if (server.enable_kbaio && aeCreateFileEvent(el, fd, AE_READABLE, acceptTcpHandler, NULL) == AE_ERR) {
+ serverPanic("Unrecoverable error creating server.ipfd file event.");
+ }
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
+ aeRegisterFile(el, cfd);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
@@ -1089,9 +1105,13 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
+ if (server.enable_kbaio && aeCreateFileEvent(el, fd, AE_READABLE, acceptTLSHandler, NULL) == AE_ERR) {
+ serverPanic("Unrecoverable error creating server.ipfd file event.");
+ }
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
+ aeRegisterFile(el, cfd);
acceptCommonHandler(connCreateAcceptedTLS(cfd, server.tls_auth_clients),0,cip);
}
}
@@ -1108,9 +1128,13 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
+ if (server.enable_kbaio && aeCreateFileEvent(el, fd, AE_READABLE, acceptUnixHandler, NULL) == AE_ERR) {
+ serverPanic("Unrecoverable error creating server.ipfd file event.");
+ }
return;
}
serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
+ aeRegisterFile(el, cfd);
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL);
}
}
@@ -1391,10 +1415,16 @@ int writeToClient(client *c, int handler_installed) {
/* Update total number of writes on server */
server.stat_total_writes_processed++;
- ssize_t nwritten = 0, totwritten = 0;
+ if (server.enable_kbaio) {
+ (void)handler_installed;
+ writeToClientKbaio(c);
+ return C_OK;
+ }
+
size_t objlen;
clientReplyBlock *o;
+ ssize_t nwritten = 0, totwritten = 0;
while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) {
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
@@ -1515,8 +1545,10 @@ int handleClientsWithPendingWrites(void) {
if (c->flags & CLIENT_CLOSE_ASAP) continue;
/* Try to write buffers to the client socket. */
- if (writeToClient(c,0) == C_ERR) continue;
+ if (writeToClient(c,0) == C_ERR || server.enable_kbaio) continue;
+ /* writeToClient always prapare a writev request, so clientHasPendingReplies
+ * is always true, need noit add event */
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
@@ -2037,7 +2069,7 @@ void processInputBuffer(client *c) {
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
- int nread, readlen;
+ int readlen;
size_t qblen;
/* Check if we want to read from the client later when exiting from
@@ -2064,10 +2096,10 @@ void readQueryFromClient(connection *conn) {
if (remaining > 0 && remaining < readlen) readlen = remaining;
}
- qblen = sdslen(c->querybuf);
+ qblen = c->qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
- nread = connRead(c->conn, c->querybuf+qblen, readlen);
+ int nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c
index bec92fb3..e18deb7c 100644
--- a/src/redis-benchmark.c
+++ b/src/redis-benchmark.c
@@ -648,6 +648,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr);
exit(1);
}
+ aeRegisterFile(config.el, c->context->fd);
c->thread_id = thread_id;
/* Suppress hiredis cleanup of unused buffers for max speed. */
c->context->reader->maxbuf = 0;
@@ -930,7 +931,7 @@ static benchmarkThread *createBenchmarkThread(int index) {
benchmarkThread *thread = zmalloc(sizeof(*thread));
if (thread == NULL) return NULL;
thread->index = index;
- thread->el = aeCreateEventLoop(1024*10);
+ thread->el = aeCreateEventLoop(1024*10, NULL);
aeCreateTimeEvent(thread->el,1,showThroughput,NULL,NULL);
return thread;
}
@@ -1532,7 +1533,7 @@ int main(int argc, const char **argv) {
config.numclients = 50;
config.requests = 100000;
config.liveclients = 0;
- config.el = aeCreateEventLoop(1024*10);
+ config.el = aeCreateEventLoop(1024*10, NULL);
aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL);
config.keepalive = 1;
config.datasize = 3;
diff --git a/src/server.c b/src/server.c
index 5f335905..c906ec2e 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1536,7 +1536,7 @@ int clientsCronResizeQueryBuffer(client *c) {
{
/* Only resize the query buffer if it is actually wasting
* at least a few kbytes. */
- if (sdsavail(c->querybuf) > 1024*4) {
+ if (sdsavail(c->querybuf) > 1024*4 && !c->submitted_query) {
c->querybuf = sdsRemoveFreeSpace(c->querybuf);
}
}
@@ -2470,6 +2470,10 @@ void initServerConfig(void) {
* Redis 5. However it is possible to revert it via redis.conf. */
server.lua_always_replicate_commands = 1;
+ /* By default to using kbaio */
+ server.enable_kbaio = 1;
+ server.sqpoll = 0;
+
initConfigValues();
}
@@ -2873,7 +2877,17 @@ void initServer(void) {
createSharedObjects();
adjustOpenFilesLimit();
- server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
+
+ aeKbaioFlags *kbaio = NULL;
+ if (server.enable_kbaio) {
+ kbaio = zmalloc(sizeof(aeKbaioFlags));
+ kbaio->sqpoll = server.sqpoll;
+ kbaio->specify_core = server.specify_core;
+ if (kbaio->specify_core) {
+ kbaio->sq_thread_cpu = server.sq_thread_cpu;
+ }
+ }
+ server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR, kbaio);
if (server.el == NULL) {
serverLog(LL_WARNING,
"Failed creating the event loop. Error message: '%s'",
@@ -2978,6 +2992,7 @@ void initServer(void) {
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
+ aeRegisterFile(server.el, server.ipfd[j]);
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
@@ -2986,6 +3001,7 @@ void initServer(void) {
}
}
for (j = 0; j < server.tlsfd_count; j++) {
+ aeRegisterFile(server.el, server.tlsfd[j]);
if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE,
acceptTLSHandler,NULL) == AE_ERR)
{
@@ -2999,6 +3015,7 @@ void initServer(void) {
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
+ aeRegisterFile(server.el, server.module_blocked_pipe[0]);
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
diff --git a/src/server.h b/src/server.h
index 8bd00f80..e3956d30 100644
--- a/src/server.h
+++ b/src/server.h
@@ -808,6 +808,11 @@ typedef struct client {
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
+ struct iovec riov;
+ struct iovec wiov;
+ int submitted_query;
+ size_t qblen;
+ size_t totwritten;
size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the
@@ -1496,6 +1501,11 @@ struct redisServer {
char *bio_cpulist; /* cpu affinity list of bio thread. */
char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */
char *bgsave_cpulist; /* cpu affinity list of bgsave process. */
+ /* kbaio config */
+ int enable_kbaio;
+ int sqpoll;
+ int sq_thread_cpu;
+ int specify_core;
};
typedef struct pubsubPattern {
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/kunpeng_compute/boostkit-database.git
git@gitee.com:kunpeng_compute/boostkit-database.git
kunpeng_compute
boostkit-database
boostkit-database
redis-6.0.20-kbaio

搜索帮助