io_uring: limit local tw done

Instead of eagerly running all available local tw, limit the amount of
local tw done to the max of IO_LOCAL_TW_DEFAULT_MAX (20) or wait_nr. The
value of 20 is chosen as a reasonable heuristic to allow enough work
batching but also keep latency down.

Add a retry_llist that maintains a list of local tw that couldn't be
done in time. No synchronisation is needed since it is only modified
within the task context.

Signed-off-by: David Wei <dw@davidwei.uk>
Link: https://lore.kernel.org/r/20241120221452.3762588-3-dw@davidwei.uk
Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
David Wei 2024-11-20 14:14:52 -08:00 committed by Jens Axboe
parent 40cfe55324
commit f46b9cdb22
3 changed files with 34 additions and 12 deletions

View file

@ -335,6 +335,7 @@ struct io_ring_ctx {
*/ */
struct { struct {
struct llist_head work_llist; struct llist_head work_llist;
struct llist_head retry_llist;
unsigned long check_cq; unsigned long check_cq;
atomic_t cq_wait_nr; atomic_t cq_wait_nr;
atomic_t cq_timeouts; atomic_t cq_timeouts;

View file

@ -122,6 +122,7 @@
#define IO_COMPL_BATCH 32 #define IO_COMPL_BATCH 32
#define IO_REQ_ALLOC_BATCH 8 #define IO_REQ_ALLOC_BATCH 8
#define IO_LOCAL_TW_DEFAULT_MAX 20
struct io_defer_entry { struct io_defer_entry {
struct list_head list; struct list_head list;
@ -1256,6 +1257,8 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
struct llist_node *node = llist_del_all(&ctx->work_llist); struct llist_node *node = llist_del_all(&ctx->work_llist);
__io_fallback_tw(node, false); __io_fallback_tw(node, false);
node = llist_del_all(&ctx->retry_llist);
__io_fallback_tw(node, false);
} }
static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@ -1270,37 +1273,55 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
return false; return false;
} }
static int __io_run_local_work_loop(struct llist_node **node,
struct io_tw_state *ts,
int events)
{
while (*node) {
struct llist_node *next = (*node)->next;
struct io_kiocb *req = container_of(*node, struct io_kiocb,
io_task_work.node);
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
req, ts);
*node = next;
if (--events <= 0)
break;
}
return events;
}
static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts, static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
int min_events) int min_events)
{ {
struct llist_node *node; struct llist_node *node;
unsigned int loops = 0; unsigned int loops = 0;
int ret = 0; int ret, limit;
if (WARN_ON_ONCE(ctx->submitter_task != current)) if (WARN_ON_ONCE(ctx->submitter_task != current))
return -EEXIST; return -EEXIST;
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
again: again:
ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit);
if (ctx->retry_llist.first)
goto retry_done;
/* /*
* llists are in reverse order, flip it back the right way before * llists are in reverse order, flip it back the right way before
* running the pending items. * running the pending items.
*/ */
node = llist_reverse_order(llist_del_all(&ctx->work_llist)); node = llist_reverse_order(llist_del_all(&ctx->work_llist));
while (node) { ret = __io_run_local_work_loop(&node, ts, ret);
struct llist_node *next = node->next; ctx->retry_llist.first = node;
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
req, ts);
ret++;
node = next;
}
loops++; loops++;
ret = limit - ret;
if (io_run_local_work_continue(ctx, ret, min_events)) if (io_run_local_work_continue(ctx, ret, min_events))
goto again; goto again;
retry_done:
io_submit_flush_completions(ctx); io_submit_flush_completions(ctx);
if (io_run_local_work_continue(ctx, ret, min_events)) if (io_run_local_work_continue(ctx, ret, min_events))
goto again; goto again;

View file

@ -349,7 +349,7 @@ static inline int io_run_task_work(void)
static inline bool io_local_work_pending(struct io_ring_ctx *ctx) static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
{ {
return !llist_empty(&ctx->work_llist); return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
} }
static inline bool io_task_work_pending(struct io_ring_ctx *ctx) static inline bool io_task_work_pending(struct io_ring_ctx *ctx)