diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 066f9ab708c6efecf71d15771669c6daf109e8fc..b14af1b6f9dcee239a249ff07b8b48d4b67158eb 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -577,7 +577,9 @@ static void io_worker_handle_work(struct io_worker *worker)
 	/* handle a whole dependent link */
 	do {
 		struct io_wq_work *next_hashed, *linked;
-		unsigned int hash = io_get_work_hash(work);
+		unsigned int hash = io_wq_is_hashed(work)
+					? io_get_work_hash(work)
+					: -1U;
 
 		next_hashed = wq_next_work(work);
 
@@ -622,7 +624,7 @@ static int io_wqe_worker(void *data)
 	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
-	bool last_timeout = false;
+	bool exit_mask = false, last_timeout = false;
 	char buf[TASK_COMM_LEN];
 
 	set_mask_bits(&worker->flags, 0,
@@ -641,8 +643,11 @@ static int io_wqe_worker(void *data)
 			io_worker_handle_work(worker);
 			goto loop;
 		}
-		/* timed out, exit unless we're the last worker */
-		if (last_timeout && acct->nr_workers > 1) {
+		/*
+		 * Last sleep timed out. Exit if we're not the last worker,
+		 * or if someone modified our affinity.
+		 */
+		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
 			acct->nr_workers--;
 			raw_spin_unlock(&wqe->lock);
 			__set_current_state(TASK_RUNNING);
@@ -661,7 +666,11 @@ static int io_wqe_worker(void *data)
 				continue;
 			break;
 		}
-		last_timeout = !ret;
+		if (!ret) {
+			last_timeout = true;
+			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
+					wqe->cpu_mask);
+		}
 	}
 
 	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
@@ -718,7 +727,6 @@ static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
 	tsk->pf_io_worker = worker;
 	worker->task = tsk;
 	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
-	tsk->flags |= PF_NO_SETAFFINITY;
 
 	raw_spin_lock(&wqe->lock);
 	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
@@ -1144,9 +1152,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
 	if (!wq)
 		return ERR_PTR(-ENOMEM);
-	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
-	if (ret)
-		goto err_wq;
 
 	refcount_inc(&data->hash->refs);
 	wq->hash = data->hash;
@@ -1189,17 +1194,19 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	wq->task = get_task_struct(data->task);
 	atomic_set(&wq->worker_refs, 1);
 	init_completion(&wq->worker_done);
+	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
+	if (ret)
+		goto err;
+
 	return wq;
 err:
 	io_wq_put_hash(data->hash);
-	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 	for_each_node(node) {
 		if (!wq->wqes[node])
 			continue;
 		free_cpumask_var(wq->wqes[node]->cpu_mask);
 		kfree(wq->wqes[node]);
 	}
-err_wq:
 	kfree(wq);
 	return ERR_PTR(ret);
 }
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 883e2b74f82e59587ed54e0b51b3e49a154527a8..fee6459547d91d21760ee3cbb834f149b274456d 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -1109,6 +1109,7 @@
 static int io_close_fixed(struct io_kiocb *req, unsigned int issue_flags);
 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer);
 
 static struct kmem_cache *req_cachep;
+static struct workqueue_struct *iou_wq __ro_after_init;
 
 static const struct file_operations io_uring_fops;
@@ -1586,7 +1587,14 @@ static inline bool io_sqring_full(struct io_ring_ctx *ctx)
 {
 	struct io_rings *r = ctx->rings;
 
-	return READ_ONCE(r->sq.tail) - ctx->cached_sq_head == ctx->sq_entries;
+	/*
+	 * SQPOLL must use the actual sqring head, as using the cached_sq_head
+	 * is race prone if the SQPOLL thread has grabbed entries but not yet
+	 * committed them to the ring. For !SQPOLL, this doesn't matter, but
+	 * since this helper is just used for SQPOLL sqring waits (or POLLOUT),
+	 * just read the actual sqring head unconditionally.
+	 */
+	return READ_ONCE(r->sq.tail) - READ_ONCE(r->sq.head) == ctx->sq_entries;
 }
 
 static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
@@ -3073,7 +3081,7 @@ static void kiocb_done(struct kiocb *kiocb, ssize_t ret,
 {
 	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
 
-	if (req->flags & REQ_F_CUR_POS)
+	if (ret >= 0 && req->flags & REQ_F_CUR_POS)
 		req->file->f_pos = kiocb->ki_pos;
 	if (ret >= 0 && (kiocb->ki_complete == io_complete_rw)) {
 		if (!__io_complete_rw_common(req, ret)) {
@@ -7523,6 +7531,7 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
 			did_sig = get_signal(&ksig);
 		cond_resched();
 		mutex_lock(&sqd->lock);
+		sqd->sq_cpu = raw_smp_processor_id();
 	}
 	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
 }
@@ -7538,11 +7547,15 @@ static int io_sq_thread(void *data)
 	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
 	set_task_comm(current, buf);
 
-	if (sqd->sq_cpu != -1)
+	/* reset to our pid after we've set task_comm, for fdinfo */
+	sqd->task_pid = current->pid;
+
+	if (sqd->sq_cpu != -1) {
 		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
-	else
+	} else {
 		set_cpus_allowed_ptr(current, cpu_online_mask);
-	current->flags |= PF_NO_SETAFFINITY;
+		sqd->sq_cpu = raw_smp_processor_id();
+	}
 
 	mutex_lock(&sqd->lock);
 	while (1) {
@@ -7566,6 +7579,7 @@
 		if (sqt_spin || !time_after(jiffies, timeout)) {
 			cond_resched();
+			sqd->sq_cpu = raw_smp_processor_id();
 			if (sqt_spin)
 				timeout = jiffies + sqd->sq_thread_idle;
 			continue;
 		}
@@ -7593,6 +7607,7 @@
 			mutex_unlock(&sqd->lock);
 			schedule();
 			mutex_lock(&sqd->lock);
+			sqd->sq_cpu = raw_smp_processor_id();
 		}
 		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
 			io_ring_clear_wakeup_flag(ctx);
@@ -8568,6 +8583,7 @@ void __io_uring_free(struct task_struct *tsk)
 static int io_sq_offload_create(struct io_ring_ctx *ctx,
 				struct io_uring_params *p)
 {
+	struct task_struct *task_to_put = NULL;
 	int ret;
 
 	/* Retain compatibility with failing for an invalid attach attempt */
@@ -8633,6 +8649,7 @@
 		}
 		sqd->thread = tsk;
+		task_to_put = get_task_struct(tsk);
 		ret = io_uring_alloc_task_context(tsk, ctx);
 		wake_up_new_task(tsk);
 		if (ret)
 			goto err;
@@ -8643,11 +8660,15 @@
 		goto err;
 	}
 
+	if (task_to_put)
+		put_task_struct(task_to_put);
 	return 0;
 err_sqpoll:
 	complete(&ctx->sq_data->exited);
 err:
 	io_sq_thread_finish(ctx);
+	if (task_to_put)
+		put_task_struct(task_to_put);
 	return ret;
 }
 
@@ -9487,7 +9508,7 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 	 * noise and overhead, there's no discernable change in runtime
 	 * over using system_wq.
 	 */
-	queue_work(system_unbound_wq, &ctx->exit_work);
+	queue_work(iou_wq, &ctx->exit_work);
 }
 
 static int io_uring_release(struct inode *inode, struct file *file)
@@ -10055,13 +10076,8 @@ static void __io_uring_show_fdinfo(struct io_ring_ctx *ctx, struct seq_file *m)
 	if (has_lock && (ctx->flags & IORING_SETUP_SQPOLL)) {
 		struct io_sq_data *sq = ctx->sq_data;
 
-		if (mutex_trylock(&sq->lock)) {
-			if (sq->thread) {
-				sq_pid = task_pid_nr(sq->thread);
-				sq_cpu = task_cpu(sq->thread);
-			}
-			mutex_unlock(&sq->lock);
-		}
+		sq_pid = sq->task_pid;
+		sq_cpu = sq->sq_cpu;
 	}
 
 	seq_printf(m, "SqThread:\t%d\n", sq_pid);
@@ -10932,9 +10948,10 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
 
 	mutex_lock(&ctx->uring_lock);
 	ret = __io_uring_register(ctx, opcode, arg, nr_args);
-	mutex_unlock(&ctx->uring_lock);
+
 	trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
 					ctx->cq_ev_fd != NULL, ret);
+	mutex_unlock(&ctx->uring_lock);
 out_fput:
 	fdput(f);
 	return ret;
@@ -10997,6 +11014,9 @@ static int __init io_uring_init(void)
 
 	req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
 				SLAB_ACCOUNT);
+
+	iou_wq = alloc_workqueue("iou_exit", WQ_UNBOUND, 64);
+	BUG_ON(!iou_wq);
 	return 0;
 };
 __initcall(io_uring_init);
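Note (annotation, not part of the patch): the io_sqring_full() hunk above relies on the io_uring convention that sq.head and sq.tail are free-running 32-bit indices, so "tail - head" counts in-flight entries correctly even after the indices wrap, which is why comparing the live head against tail is sufficient. Below is a minimal standalone userspace sketch of that arithmetic; struct sq_ring and sq_ring_full() are hypothetical illustrations, not kernel or liburing API.

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for the shared SQ ring indices. */
struct sq_ring {
	uint32_t head;		/* consumer index, advanced as entries are taken */
	uint32_t tail;		/* producer index, advanced as entries are filled */
	uint32_t ring_entries;	/* ring size, a power of two */
};

/* Same shape as the fixed check: compare the live head against the tail. */
static int sq_ring_full(const struct sq_ring *r)
{
	/* unsigned subtraction stays correct across index wraparound */
	return r->tail - r->head == r->ring_entries;
}

int main(void)
{
	struct sq_ring r = { .head = UINT32_MAX - 3, .tail = UINT32_MAX - 3,
			     .ring_entries = 8 };
	int i;

	for (i = 0; i < 8; i++)		/* fill the ring; tail wraps past 0 */
		r.tail++;
	assert(sq_ring_full(&r));
	r.head++;			/* consume one entry */
	assert(!sq_ring_full(&r));
	printf("full-check holds across index wraparound\n");
	return 0;
}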