diff --git a/include/uv.h b/include/uv.h
index 0d39684b8e71dca3ea92a16321fa7e480a295c0a..23c6ece3fcb38960a517dbe68949c7c5a91c2fb3 100644
--- a/include/uv.h
+++ b/include/uv.h
@@ -1180,6 +1180,12 @@ UV_EXTERN int uv_queue_work(uv_loop_t* loop,
                             uv_work_cb work_cb,
                             uv_after_work_cb after_work_cb);
 
+UV_EXTERN int uv_queue_work_internal(uv_loop_t* loop,
+                                     uv_work_t* req,
+                                     uv_work_cb work_cb,
+                                     uv_after_work_cb after_work_cb,
+                                     const char* task_name);
+
 UV_EXTERN int uv_cancel(uv_req_t* req);
 
 typedef enum {
@@ -1196,6 +1202,13 @@ UV_EXTERN int uv_queue_work_with_qos(uv_loop_t* loop,
                                      uv_after_work_cb after_work_cb,
                                      uv_qos_t qos);
 
+UV_EXTERN int uv_queue_work_with_qos_internal(uv_loop_t* loop,
+                                              uv_work_t* req,
+                                              uv_work_cb work_cb,
+                                              uv_after_work_cb after_work_cb,
+                                              uv_qos_t qos,
+                                              const char* task_name);
+
 struct uv_cpu_times_s {
   uint64_t user; /* milliseconds */
   uint64_t nice; /* milliseconds */
@@ -1926,7 +1939,7 @@ union uv_any_req {
 #undef XX
 
 typedef void (*uv_io_cb)(void* work, int status);
-typedef void (*uv_post_task)(void* handler, uv_io_cb func, void* work, int status, int prio);
+typedef void (*uv_post_task)(const char* task_name, uv_io_cb func, void* work, int status, int prio);
 
 struct uv_loop_data {
   void* event_handler;
diff --git a/include/uv/threadpool.h b/include/uv/threadpool.h
index 190d20570daaa07b453de1006b2c9dc5bb3f9bb3..fe1328259bc424831d79cb7e804c7e5c5389542c 100644
--- a/include/uv/threadpool.h
+++ b/include/uv/threadpool.h
@@ -63,7 +63,11 @@ struct uv__statistic_work {
 #endif
 
 struct uv__work {
+#ifdef USE_FFRT
+  void (*work)(struct uv__work *w, int qos);
+#else
   void (*work)(struct uv__work *w);
+#endif
   void (*done)(struct uv__work *w, int status);
   struct uv_loop_s* loop;
   struct uv__queue wq;
diff --git a/src/random.c b/src/random.c
index a6917ca39b31e402390f45527923b15907f9f328..a5bd1f50b41ea6c60571caab2e33b410464156ae 100644
--- a/src/random.c
+++ b/src/random.c
@@ -70,11 +70,18 @@ static int uv__random(void* buf, size_t buflen) {
 }
 
 
+#ifdef USE_FFRT
+static void uv__random_work(struct uv__work* w, int qos) {
+#else
 static void uv__random_work(struct uv__work* w) {
+#endif
   uv_random_t* req;
 
   req = container_of(w, uv_random_t, work_req);
   req->status = uv__random(req->buf, req->buflen);
+#ifdef USE_FFRT
+  uv__work_submit_to_eventloop((uv_req_t*)req, w, qos);
+#endif
 }
 
 
diff --git a/src/threadpool.c b/src/threadpool.c
index ee86f7b1340f8bf93ff472f74b92dc445bab9f82..b45f145bedd268dbf736eaf9724a475b09befabe 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -329,7 +329,11 @@ void on_uv_loop_close(uv_loop_t* loop) {
 }
 
 
+#ifdef USE_FFRT
+static void uv__cancelled(struct uv__work* w, int qos) {
+#else
 static void uv__cancelled(struct uv__work* w) {
+#endif
   abort();
 }
 
@@ -603,6 +607,41 @@ static void uv__task_done_wrapper(void* work, int status) {
   uv__print_active_reqs(w->loop, "complete");
   w->done(w, status);
 }
+
+
+void uv__work_submit_to_eventloop(uv_req_t* req, struct uv__work* w, int qos) {
+  if (qos < 0) {
+    /* Synchronous call path (qos == -1): the caller completes the request
+     * in place, so there is nothing to post back to the event loop. */
+    return;
+  }
+
+  uv_loop_t* loop = w->loop;
+#ifdef UV_STATISTIC
+  uv__post_statistic_work(w, WORK_END);
+#endif
+  rdlock_closed_uv_loop_rwlock();
+  if (loop->magic != UV_LOOP_MAGIC) {
+    rdunlock_closed_uv_loop_rwlock();
+    UV_LOGE("uv_loop(%{public}zu:%{public}#x), task is invalid",
+            (size_t)loop, loop->magic);
+    return;
+  }
+
+  uv_mutex_lock(&loop->wq_mutex);
+  w->work = NULL;  /* Signal uv_cancel() that the work req is done executing. */
+
+  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) {
+    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
+    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
+        (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
+    if (req->type == UV_WORK) {
+      addr->post_task_func((char*)req->reserved[1], uv__task_done_wrapper, (void*)w, status, qos);
+    } else {
+      addr->post_task_func(NULL, uv__task_done_wrapper, (void*)w, status, qos);
+    }
+  } else {
+    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
+    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
+    uv_async_send(&loop->wq_async);
+  }
+  uv_mutex_unlock(&loop->wq_mutex);
+  rdunlock_closed_uv_loop_rwlock();
+}
 #endif
 
 
@@ -653,7 +692,11 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w)
     int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
     struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)w->loop->data -
         (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
-    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
+    if (req->type == UV_WORK) {
+      addr->post_task_func((char*)req->reserved[1], uv__task_done_wrapper, (void*)w, status, qos);
+    } else {
+      addr->post_task_func(NULL, uv__task_done_wrapper, (void*)w, status, qos);
+    }
   } else {
     uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
     uv_async_send(&loop->wq_async);
@@ -745,12 +788,31 @@ void uv__work_done(uv_async_t* handle) {
 }
 
 
+#ifdef USE_FFRT
+static void uv__queue_work(struct uv__work* w, int qos) {
+#else
 static void uv__queue_work(struct uv__work* w) {
+#endif
   uv_work_t* req = container_of(w, uv_work_t, work_req);
 
+  if (req->reserved[1] == NULL) {
+#ifdef ASYNC_STACKTRACE
+    LibuvSetStackId((uint64_t)req->reserved[3]);
+#endif
+    req->work_cb(req);
+#ifdef USE_FFRT
+    uv__work_submit_to_eventloop((uv_req_t*)req, w, qos);
+#endif
+    return;
+  }
+
+  char task_name[128];
+  snprintf(task_name, sizeof(task_name), "uv execute:%s", (char*)req->reserved[1]);
+  uv_start_trace(UV_TRACE_TAG, task_name);
 #ifdef ASYNC_STACKTRACE
   LibuvSetStackId((uint64_t)req->reserved[3]);
 #endif
   req->work_cb(req);
+#ifdef USE_FFRT
+  uv__work_submit_to_eventloop((uv_req_t*)req, w, qos);
+#endif
+  uv_end_trace(UV_TRACE_TAG);
 }
 
 
@@ -784,32 +846,6 @@ void uv__ffrt_work(ffrt_executor_task_t* data, ffrt_qos_t qos)
   uv__post_statistic_work(w, WORK_EXECUTING);
 #endif
-  w->work(w);
+  w->work(w, qos);
-#ifdef UV_STATISTIC
-  uv__post_statistic_work(w, WORK_END);
-#endif
-  rdlock_closed_uv_loop_rwlock();
-  if (loop->magic != UV_LOOP_MAGIC) {
-    rdunlock_closed_uv_loop_rwlock();
-    UV_LOGE("uv_loop(%{public}zu:%{public}#x), task is invalid",
-            (size_t)loop, loop->magic);
-    return;
-  }
-
-  uv_mutex_lock(&loop->wq_mutex);
-  w->work = NULL;  /* Signal uv_cancel() that the work req is done executing. */
-
-  if (uv_check_data_valid((struct uv_loop_data*)(loop->data)) == 0) {
-    int status = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
-    struct uv_loop_data* addr = (struct uv_loop_data*)((uint64_t)loop->data -
-        (UV_EVENT_MAGIC_OFFSET << UV_EVENT_MAGIC_OFFSETBITS));
-    addr->post_task_func(addr->event_handler, uv__task_done_wrapper, (void*)w, status, qos);
-  } else {
-    uv__loop_internal_fields_t* lfields = uv__get_internal_fields(loop);
-    uv__queue_insert_tail(&(lfields->wq_sub[qos]), &w->wq);
-    uv_async_send(&loop->wq_async);
-  }
-  uv_mutex_unlock(&loop->wq_mutex);
-  rdunlock_closed_uv_loop_rwlock();
 }
 
 static void init_once(void)
@@ -828,7 +864,7 @@ void uv__work_submit(uv_loop_t* loop,
                      uv_req_t* req,
                      struct uv__work* w,
                      enum uv__work_kind kind,
-                     void (*work)(struct uv__work *w),
+                     void (*work)(struct uv__work *w, int qos),
                      void (*done)(struct uv__work *w, int status)) {
   uv_once(&once, init_once);
   ffrt_task_attr_t attr;
@@ -866,7 +902,7 @@ void uv__work_submit_with_qos(uv_loop_t* loop,
                               uv_req_t* req,
                               struct uv__work* w,
                               ffrt_qos_t qos,
-                              void (*work)(struct uv__work *w),
+                              void (*work)(struct uv__work *w, int qos),
                               void (*done)(struct uv__work *w, int status)) {
   uv_once(&once, init_once);
   ffrt_task_attr_t attr;
@@ -893,6 +929,7 @@ int uv_queue_work(uv_loop_t* loop,
   uv__print_active_reqs(loop, "execute");
 
   uv__req_init(loop, req, UV_WORK);
+  req->reserved[1] = NULL;
   req->loop = loop;
   req->work_cb = work_cb;
   req->after_work_cb = after_work_cb;
@@ -927,11 +964,66 @@ int uv_queue_work(uv_loop_t* loop,
 }
 
 
+int uv_queue_work_internal(uv_loop_t* loop,
+                           uv_work_t* req,
+                           uv_work_cb work_cb,
+                           uv_after_work_cb after_work_cb,
+                           const char* task_name) {
+  if (work_cb == NULL)
+    return UV_EINVAL;
+
+  int ret = uv__copy_taskname((uv_req_t*)req, task_name);
+  if (ret != 0) {
+    return ret;
+  }
+
+  char trace_name[128];
+  snprintf(trace_name, sizeof(trace_name), "uv_submit:%s", (char*)req->reserved[1]);
+  uv__print_active_reqs(loop, "execute");
+
+  uv__req_init(loop, req, UV_WORK);
+  req->loop = loop;
+  req->work_cb = work_cb;
+  req->after_work_cb = after_work_cb;
+
+#ifdef UV_STATISTIC
+  struct uv_work_dump_info* info = (struct uv_work_dump_info*)malloc(sizeof(struct uv_work_dump_info));
+  if (info == NULL) {
+    abort();
+  }
+  uv_init_dump_info(info, &req->work_req);
+  info->builtin_return_address[0] = __builtin_return_address(0);
+  info->builtin_return_address[1] = __builtin_return_address(1);
+  info->builtin_return_address[2] = __builtin_return_address(2);
+  (req->work_req).info = info;
+#endif
+#ifdef ASYNC_STACKTRACE
+  req->reserved[3] = (void*)LibuvCollectAsyncStack();
+#endif
+  uv_start_trace(UV_TRACE_TAG, trace_name);
+  uv__work_submit(loop,
+#ifdef USE_FFRT
+                  (uv_req_t*)req,
+#endif
+                  &req->work_req,
+                  UV__WORK_CPU,
+                  uv__queue_work,
+                  uv__queue_done);
+  uv_end_trace(UV_TRACE_TAG);
+#ifdef UV_STATISTIC
+  uv_queue_statics(info);
+#endif
+  return 0;
+}
+
+
 int uv_queue_work_with_qos(uv_loop_t* loop,
-    uv_work_t* req,
-    uv_work_cb work_cb,
-    uv_after_work_cb after_work_cb,
-    uv_qos_t qos) {
+                           uv_work_t* req,
+                           uv_work_cb work_cb,
+                           uv_after_work_cb after_work_cb,
+                           uv_qos_t qos) {
 #ifdef USE_FFRT
   if (work_cb == NULL)
     return UV_EINVAL;
@@ -947,6 +1039,7 @@ int uv_queue_work_with_qos(uv_loop_t* loop,
   uv__print_active_reqs(loop, "execute");
 
   uv__req_init(loop, req, UV_WORK);
+  req->reserved[1] = NULL;
   req->loop = loop;
   req->work_cb = work_cb;
   req->after_work_cb = after_work_cb;
@@ -977,6 +1070,65 @@ int uv_queue_work_with_qos(uv_loop_t* loop,
 }
 
 
+int uv_queue_work_with_qos_internal(uv_loop_t* loop,
+                                    uv_work_t* req,
+                                    uv_work_cb work_cb,
+                                    uv_after_work_cb after_work_cb,
+                                    uv_qos_t qos,
+                                    const char* task_name) {
+#ifdef USE_FFRT
+  if (work_cb == NULL)
+    return UV_EINVAL;
+
+  STATIC_ASSERT(uv_qos_background == ffrt_qos_background);
+  STATIC_ASSERT(uv_qos_utility == ffrt_qos_utility);
+  STATIC_ASSERT(uv_qos_default == ffrt_qos_default);
+  STATIC_ASSERT(uv_qos_user_initiated == ffrt_qos_user_initiated);
+  STATIC_ASSERT(uv_qos_user_interactive == ffrt_qos_deadline_request);
+  if (qos < ffrt_qos_background || qos > ffrt_qos_deadline_request) {
+    return UV_EINVAL;
+  }
+
+  uv__print_active_reqs(loop, "execute");
+  uv__req_init(loop, req, UV_WORK);
+  req->loop = loop;
+  req->work_cb = work_cb;
+  req->after_work_cb = after_work_cb;
+  int ret = uv__copy_taskname((uv_req_t*)req, task_name);
+  if (ret != 0) {
+    return ret;
+  }
+
+  char trace_name[128];
+  snprintf(trace_name, sizeof(trace_name), "uv submit:%s", (char*)req->reserved[1]);
+  uv_start_trace(UV_TRACE_TAG, trace_name);
+#ifdef UV_STATISTIC
+  struct uv_work_dump_info* info = (struct uv_work_dump_info*)malloc(sizeof(struct uv_work_dump_info));
+  if (info == NULL) {
+    abort();
+  }
+  uv_init_dump_info(info, &req->work_req);
+  info->builtin_return_address[0] = __builtin_return_address(0);
+  info->builtin_return_address[1] = __builtin_return_address(1);
+  info->builtin_return_address[2] = __builtin_return_address(2);
+  (req->work_req).info = info;
+#endif
+  uv__work_submit_with_qos(loop,
+                           (uv_req_t*)req,
+                           &req->work_req,
+                           (ffrt_qos_t)qos,
+                           uv__queue_work,
+                           uv__queue_done);
+#ifdef UV_STATISTIC
+  uv_queue_statics(info);
+#endif
+  uv_end_trace(UV_TRACE_TAG);
+  return 0;
+#else
+  return uv_queue_work(loop, req, work_cb, after_work_cb);
+#endif
+}
+
+
 int uv_cancel(uv_req_t* req) {
   struct uv__work* wreq;
   uv_loop_t* loop;
diff --git a/src/unix/fs.c b/src/unix/fs.c
index 484e40b459022f1645091ff2549efe1d72db91e9..74e8b5a1c4b5d7030b5f9b086c36143de2225694 100644
--- a/src/unix/fs.c
+++ b/src/unix/fs.c
@@ -160,7 +160,7 @@ extern char *mkdtemp(char *template); /* See issue #740 on AIX < 7 */
       return 0;                                                               \
     }                                                                         \
     else {                                                                    \
-      uv__fs_work(&req->work_req);                                            \
+      uv__fs_work(&req->work_req, -1);                                        \
       return req->result;                                                     \
     }                                                                         \
   }                                                                           \
@@ -1564,7 +1564,11 @@ static ssize_t uv__fs_write_all(uv_fs_t* req) {
 }
 
 
+#ifdef USE_FFRT
+static void uv__fs_work(struct uv__work* w, int qos) {
+#else
 static void uv__fs_work(struct uv__work* w) {
+#endif
   int retry_on_eintr;
   uv_fs_t* req;
   ssize_t r;
@@ -1633,6 +1637,9 @@ static void uv__fs_work(struct uv__work* w) {
        req->fs_type == UV_FS_LSTAT)) {
     req->ptr = &req->statbuf;
   }
+#ifdef USE_FFRT
+  uv__work_submit_to_eventloop((uv_req_t*)req, w, qos);
+#endif
 }
diff --git a/src/unix/getaddrinfo.c b/src/unix/getaddrinfo.c
index 43984b17f2049478101766764453c3e694960ceb..dc4b2ebc3c9c7344ab97e4f3f2b5551ed7331aee 100644
--- a/src/unix/getaddrinfo.c
+++ b/src/unix/getaddrinfo.c
@@ -95,13 +95,20 @@ int uv__getaddrinfo_translate_error(int sys_err) {
 }
 
 
+#ifdef USE_FFRT
+static void uv__getaddrinfo_work(struct uv__work* w, int qos) {
+#else
 static void uv__getaddrinfo_work(struct uv__work* w) {
+#endif
   uv_getaddrinfo_t* req;
   int err;
 
   req = container_of(w, uv_getaddrinfo_t, work_req);
   err = getaddrinfo(req->hostname, req->service, req->hints, &req->addrinfo);
   req->retcode = uv__getaddrinfo_translate_error(err);
+#ifdef USE_FFRT
+  uv__work_submit_to_eventloop((uv_req_t*)req, w, qos);
+#endif
 }
 
 
@@ -213,7 +220,11 @@ int uv_getaddrinfo(uv_loop_t* loop,
                     uv__getaddrinfo_done);
     return 0;
   } else {
+#ifdef USE_FFRT
+    uv__getaddrinfo_work(&req->work_req, -1);
+#else
     uv__getaddrinfo_work(&req->work_req);
+#endif
     uv__getaddrinfo_done(&req->work_req, 0);
     return req->retcode;
   }
diff --git a/src/unix/getnameinfo.c b/src/unix/getnameinfo.c
index 43e5127d81799bb4cec221ad8e43c223577522da..34eb19b1d677534c91b0eb38885fabf17c733c40 100644
--- a/src/unix/getnameinfo.c
+++ b/src/unix/getnameinfo.c
@@ -27,8 +27,11 @@
 #include "uv.h"
 #include "internal.h"
 
-
+#ifdef USE_FFRT
+static void uv__getnameinfo_work(struct uv__work* w, int qos) {
+#else
 static void uv__getnameinfo_work(struct uv__work* w) {
+#endif
   uv_getnameinfo_t* req;
   int err;
   socklen_t salen;
@@ -50,6 +53,9 @@ static void uv__getnameinfo_work(struct uv__work* w) {
                     sizeof(req->service),
                     req->flags);
   req->retcode = uv__getaddrinfo_translate_error(err);
+#ifdef USE_FFRT
+  uv__work_submit_to_eventloop((uv_req_t*)req, w, qos);
+#endif
 }
 
 static void uv__getnameinfo_done(struct uv__work* w, int status) {
@@ -117,7 +123,11 @@ int uv_getnameinfo(uv_loop_t* loop,
                    uv__getnameinfo_done);
     return 0;
   } else {
+#ifdef USE_FFRT
+    uv__getnameinfo_work(&req->work_req, -1);
+#else
     uv__getnameinfo_work(&req->work_req);
+#endif
     uv__getnameinfo_done(&req->work_req, 0);
     return req->retcode;
   }
diff --git a/src/uv-common.c b/src/uv-common.c
index 13419f494630c3a337f4b8a72a99a50eefd1954d..475c0cf3a08d904666c40eb190eb1bfc4c7560e6 100644
--- a/src/uv-common.c
+++ b/src/uv-common.c
@@ -38,6 +38,12 @@
 # include <sys/un.h> /* AF_UNIX, sockaddr_un */
 #endif
 
+#ifdef USE_FFRT
+#include
+#ifndef TASK_NAME_LENGTH
+#define TASK_NAME_LENGTH 128
+#endif
+#endif
 
 typedef struct {
   uv_malloc_func local_malloc;
@@ -1053,4 +1059,20 @@ uint64_t uv__get_addr_tag(void* addr) {
   }
 #endif
   return tag;
+}
+
+
+int uv__copy_taskname(uv_req_t* req, const char* func_name) {
+#ifdef USE_FFRT
+  char* str = (char*)malloc(TASK_NAME_LENGTH);
+  if (str == NULL) {
+    UV_LOGE("malloc task name failed, func name:%{public}s", func_name);
+    return UV_ENOMEM;
+  }
+
+  snprintf(str, TASK_NAME_LENGTH, "func:%s", func_name);
+  req->reserved[1] = (void*)str;
+#else
+  (void)func_name;
+  req->reserved[1] = NULL;  /* uv__queue_work() treats NULL as "no task name". */
+#endif
+  return 0;
 }
\ No newline at end of file
diff --git a/src/uv-common.h b/src/uv-common.h
index 7ca246f5762a136852a7f38bba9385491bbea457..748f3fba559523eef0b88af3596bcc25a2caa39a 100644
--- a/src/uv-common.h
+++ b/src/uv-common.h
@@ -212,7 +212,11 @@ void uv__work_submit(uv_loop_t* loop,
 #endif
                      struct uv__work *w,
                      enum uv__work_kind kind,
+#ifdef USE_FFRT
+                     void (*work)(struct uv__work *w, int qos),
+#else
                      void (*work)(struct uv__work *w),
+#endif
                      void (*done)(struct uv__work *w, int status));
 
 void uv__work_done(uv_async_t* handle);
@@ -453,5 +457,6 @@ struct uv__loop_internal_fields_s {
 };
 
 uint64_t uv__get_addr_tag(void* addr);
-
+void uv__work_submit_to_eventloop(uv_req_t* req, struct uv__work* w, int qos);
+int uv__copy_taskname(uv_req_t* req, const char* func_name);
 #endif /* UV_COMMON_H_ */
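For reviewers, a minimal usage sketch of the new named-work entry point (not part of the patch itself). It assumes an application built against this fork's `uv.h`, where `uv_queue_work_internal()` is declared; the `crunch_*` callbacks and the payload struct are invented for illustration. The call is intended to behave like `uv_queue_work()`, except that the threadpool copies the name via `uv__copy_taskname()`, uses it for the `uv_submit:` / `uv execute:` trace labels, and passes it as the first argument of the embedder's `uv_post_task` handler (which previously received the event-handler pointer).

```c
#include <stdio.h>
#include <uv.h>

/* Hypothetical payload carried through uv_work_t::data. */
typedef struct {
  int input;
  int result;
} crunch_t;

static void crunch_work(uv_work_t* req) {
  /* Runs on a threadpool / FFRT worker thread. */
  crunch_t* c = (crunch_t*)req->data;
  c->result = c->input * 2;
}

static void crunch_done(uv_work_t* req, int status) {
  /* Runs back on the loop thread, or via the registered uv_post_task handler. */
  crunch_t* c = (crunch_t*)req->data;
  printf("status=%d result=%d\n", status, c->result);
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  crunch_t payload = { 21, 0 };
  uv_work_t req;
  req.data = &payload;

  /* Same contract as uv_queue_work(), plus a human-readable task name. */
  int r = uv_queue_work_internal(loop, &req, crunch_work, crunch_done,
                                 "crunch_numbers");
  if (r != 0) {
    fprintf(stderr, "uv_queue_work_internal: %s\n", uv_strerror(r));
    return 1;
  }
  return uv_run(loop, UV_RUN_DEFAULT);
}
```

`uv_queue_work_with_qos_internal()` takes the same name plus a `uv_qos_t`; requests queued through the existing unnamed APIs keep working because they leave `req->reserved[1]` set to NULL.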