io_uring: batch reap of dead file registrations
authorJens Axboe <axboe@kernel.dk>
Thu, 14 May 2020 23:21:15 +0000 (17:21 -0600)
committerJens Axboe <axboe@kernel.dk>
Fri, 15 May 2020 17:56:18 +0000 (11:56 -0600)
We currently embed and queue a work item per fixed_file_ref_node that
we update, but if the workload does a lot of these, then the associated
kworker-events overhead can become quite noticeable.

Since we rarely need to wait on these, batch them at 1 second intervals
instead. If we do need to wait for them, we just flush the pending
delayed work.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
fs/io_uring.c

index 414e940323d45d4defbb987a80c5d852cebde62a..99dbd43442f2aa88a4a96effa57296b2ee62d610 100644 (file)
@@ -191,7 +191,7 @@ struct fixed_file_ref_node {
        struct list_head                node;
        struct list_head                file_list;
        struct fixed_file_data          *file_data;
-       struct work_struct              work;
+       struct llist_node               llist;
 };
 
 struct fixed_file_data {
@@ -327,6 +327,9 @@ struct io_ring_ctx {
                struct list_head        inflight_list;
        } ____cacheline_aligned_in_smp;
 
+       struct delayed_work             file_put_work;
+       struct llist_head               file_put_llist;
+
        struct work_struct              exit_work;
 };
 
@@ -879,6 +882,8 @@ struct sock *io_uring_get_socket(struct file *file)
 }
 EXPORT_SYMBOL(io_uring_get_socket);
 
+static void io_file_put_work(struct work_struct *work);
+
 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
 {
        struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
@@ -934,6 +939,8 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        init_waitqueue_head(&ctx->inflight_wait);
        spin_lock_init(&ctx->inflight_lock);
        INIT_LIST_HEAD(&ctx->inflight_list);
+       INIT_DELAYED_WORK(&ctx->file_put_work, io_file_put_work);
+       init_llist_head(&ctx->file_put_llist);
        return ctx;
 err:
        if (ctx->fallback_req)
@@ -6190,6 +6197,7 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
        percpu_ref_kill(&data->refs);
 
        /* wait for all refs nodes to complete */
+       flush_delayed_work(&ctx->file_put_work);
        wait_for_completion(&data->done);
 
        __io_sqe_files_unregister(ctx);
@@ -6420,18 +6428,13 @@ struct io_file_put {
        struct file *file;
 };
 
-static void io_file_put_work(struct work_struct *work)
+static void __io_file_put_work(struct fixed_file_ref_node *ref_node)
 {
-       struct fixed_file_ref_node *ref_node;
-       struct fixed_file_data *file_data;
-       struct io_ring_ctx *ctx;
+       struct fixed_file_data *file_data = ref_node->file_data;
+       struct io_ring_ctx *ctx = file_data->ctx;
        struct io_file_put *pfile, *tmp;
        unsigned long flags;
 
-       ref_node = container_of(work, struct fixed_file_ref_node, work);
-       file_data = ref_node->file_data;
-       ctx = file_data->ctx;
-
        list_for_each_entry_safe(pfile, tmp, &ref_node->file_list, list) {
                list_del_init(&pfile->list);
                io_ring_file_put(ctx, pfile->file);
@@ -6447,13 +6450,42 @@ static void io_file_put_work(struct work_struct *work)
        percpu_ref_put(&file_data->refs);
 }
 
+static void io_file_put_work(struct work_struct *work)
+{
+       struct io_ring_ctx *ctx;
+       struct llist_node *node;
+
+       ctx = container_of(work, struct io_ring_ctx, file_put_work.work);
+       node = llist_del_all(&ctx->file_put_llist);
+
+       while (node) {
+               struct fixed_file_ref_node *ref_node;
+               struct llist_node *next = node->next;
+
+               ref_node = llist_entry(node, struct fixed_file_ref_node, llist);
+               __io_file_put_work(ref_node);
+               node = next;
+       }
+}
+
 static void io_file_data_ref_zero(struct percpu_ref *ref)
 {
        struct fixed_file_ref_node *ref_node;
+       struct io_ring_ctx *ctx;
+       bool first_add;
+       int delay = HZ;
 
        ref_node = container_of(ref, struct fixed_file_ref_node, refs);
+       ctx = ref_node->file_data->ctx;
 
-       queue_work(system_wq, &ref_node->work);
+       if (percpu_ref_is_dying(&ctx->file_data->refs))
+               delay = 0;
+
+       first_add = llist_add(&ref_node->llist, &ctx->file_put_llist);
+       if (!delay)
+               mod_delayed_work(system_wq, &ctx->file_put_work, 0);
+       else if (first_add)
+               queue_delayed_work(system_wq, &ctx->file_put_work, delay);
 }
 
 static struct fixed_file_ref_node *alloc_fixed_file_ref_node(
@@ -6472,10 +6504,8 @@ static struct fixed_file_ref_node *alloc_fixed_file_ref_node(
        }
        INIT_LIST_HEAD(&ref_node->node);
        INIT_LIST_HEAD(&ref_node->file_list);
-       INIT_WORK(&ref_node->work, io_file_put_work);
        ref_node->file_data = ctx->file_data;
        return ref_node;
-
 }
 
 static void destroy_fixed_file_ref_node(struct fixed_file_ref_node *ref_node)