[PATCH 11/32] aio: Make aio_put_req() lockless

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Freeing a kiocb needed to touch the kioctx for three things:

 * Pulling it off the active_reqs list
 * Decrementing reqs_active
 * Issuing a wakeup, if the kioctx was in the process of being freed.

This patch moves these to aio_complete(), for a few reasons:

 * aio_complete() already has to issue the wakeup, so if we drop the
   kioctx refcount before aio_complete does its wakeup we don't have to
   do it twice.
 * aio_complete() currently has to take the kioctx lock, so it makes sense
   for it to pull the kiocb off the active_reqs list too.
 * A later patch is going to change reqs_active to include unreaped
   completions - this will mean allocating a kiocb doesn't have to look
   at the ringbuffer. So taking the decrement of reqs_active out of
   kiocb_free() is useful prep work for that patch.

This doesn't really affect cancellation, since existing (usb) code that
implements a cancel function still calls aio_complete() - we just have
to make sure that aio_complete does the necessary teardown for cancelled
kiocbs.

It does affect code paths where we free kiocbs that were never
submitted; they need to decrement the reqs_active count and pull the
kiocb off the active_reqs list. This occurs in two places:
kiocb_batch_free(), which is going away in a later patch, and the error
path in io_submit_one().

Signed-off-by: Kent Overstreet <koverstreet@xxxxxxxxxx>
---
 fs/aio.c            | 85 +++++++++++++++++++++--------------------------------
 include/linux/aio.h |  4 +--
 2 files changed, 35 insertions(+), 54 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index db6cb02..37eac67 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -89,7 +89,7 @@ struct kioctx {
 
 	spinlock_t		ctx_lock;
 
-	int			reqs_active;
+	atomic_t		reqs_active;
 	struct list_head	active_reqs;	/* used for cancellation */
 
 	/* sys_io_setup currently limits this to an unsigned int */
@@ -247,7 +247,7 @@ static void ctx_rcu_free(struct rcu_head *head)
 static void __put_ioctx(struct kioctx *ctx)
 {
 	unsigned nr_events = ctx->max_reqs;
-	BUG_ON(ctx->reqs_active);
+	BUG_ON(atomic_read(&ctx->reqs_active));
 
 	aio_free_ring(ctx);
 	if (nr_events) {
@@ -281,7 +281,7 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
 	cancel = kiocb->ki_cancel;
 	kiocbSetCancelled(kiocb);
 	if (cancel) {
-		kiocb->ki_users++;
+		atomic_inc(&kiocb->ki_users);
 		spin_unlock_irq(&ctx->ctx_lock);
 
 		memset(res, 0, sizeof(*res));
@@ -380,12 +380,12 @@ static void kill_ctx(struct kioctx *ctx)
 		kiocb_cancel(ctx, req, &res);
 	}
 
-	if (!ctx->reqs_active)
+	if (!atomic_read(&ctx->reqs_active))
 		goto out;
 
 	add_wait_queue(&ctx->wait, &wait);
 	set_task_state(tsk, TASK_UNINTERRUPTIBLE);
-	while (ctx->reqs_active) {
+	while (atomic_read(&ctx->reqs_active)) {
 		spin_unlock_irq(&ctx->ctx_lock);
 		io_schedule();
 		set_task_state(tsk, TASK_UNINTERRUPTIBLE);
@@ -403,9 +403,9 @@ out:
  */
 ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
 {
-	while (iocb->ki_users) {
+	while (atomic_read(&iocb->ki_users)) {
 		set_current_state(TASK_UNINTERRUPTIBLE);
-		if (!iocb->ki_users)
+		if (!atomic_read(&iocb->ki_users))
 			break;
 		io_schedule();
 	}
@@ -435,7 +435,7 @@ void exit_aio(struct mm_struct *mm)
 			printk(KERN_DEBUG
 				"exit_aio:ioctx still alive: %d %d %d\n",
 				atomic_read(&ctx->users), ctx->dead,
-				ctx->reqs_active);
+				atomic_read(&ctx->reqs_active));
 		/*
 		 * We don't need to bother with munmap() here -
 		 * exit_mmap(mm) is coming and it'll unmap everything.
@@ -450,11 +450,11 @@ void exit_aio(struct mm_struct *mm)
 }
 
 /* aio_get_req
- *	Allocate a slot for an aio request.  Increments the users count
+ *	Allocate a slot for an aio request.  Increments the ki_users count
  * of the kioctx so that the kioctx stays around until all requests are
  * complete.  Returns NULL if no requests are free.
  *
- * Returns with kiocb->users set to 2.  The io submit code path holds
+ * Returns with kiocb->ki_users set to 2.  The io submit code path holds
  * an extra reference while submitting the i/o.
  * This prevents races between the aio code path referencing the
  * req (after submitting it) and aio_complete() freeing the req.
@@ -468,7 +468,7 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
 		return NULL;
 
 	req->ki_flags = 0;
-	req->ki_users = 2;
+	atomic_set(&req->ki_users, 2);
 	req->ki_key = 0;
 	req->ki_ctx = ctx;
 	req->ki_cancel = NULL;
@@ -509,9 +509,9 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
 		list_del(&req->ki_batch);
 		list_del(&req->ki_list);
 		kmem_cache_free(kiocb_cachep, req);
-		ctx->reqs_active--;
+		atomic_dec(&ctx->reqs_active);
 	}
-	if (unlikely(!ctx->reqs_active && ctx->dead))
+	if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
 		wake_up_all(&ctx->wait);
 	spin_unlock_irq(&ctx->ctx_lock);
 }
@@ -542,7 +542,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
 	spin_lock_irq(&ctx->ctx_lock);
 	ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
 
-	avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active;
+	avail = aio_ring_avail(&ctx->ring_info, ring) - atomic_read(&ctx->reqs_active);
 	BUG_ON(avail < 0);
 	if (avail < allocated) {
 		/* Trim back the number of requests. */
@@ -557,7 +557,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
 	batch->count -= allocated;
 	list_for_each_entry(req, &batch->head, ki_batch) {
 		list_add(&req->ki_list, &ctx->active_reqs);
-		ctx->reqs_active++;
+		atomic_inc(&ctx->reqs_active);
 	}
 
 	kunmap_atomic(ring);
@@ -580,10 +580,8 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx,
 	return req;
 }
 
-static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
+static void kiocb_free(struct kiocb *req)
 {
-	assert_spin_locked(&ctx->ctx_lock);
-
 	if (req->ki_filp)
 		fput(req->ki_filp);
 	if (req->ki_eventfd != NULL)
@@ -593,40 +591,12 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
 	if (req->ki_iovec != &req->ki_inline_vec)
 		kfree(req->ki_iovec);
 	kmem_cache_free(kiocb_cachep, req);
-	ctx->reqs_active--;
-
-	if (unlikely(!ctx->reqs_active && ctx->dead))
-		wake_up_all(&ctx->wait);
 }
 
-/* __aio_put_req
- *	Returns true if this put was the last user of the request.
- */
-static void __aio_put_req(struct kioctx *ctx, struct kiocb *req)
-{
-	assert_spin_locked(&ctx->ctx_lock);
-
-	req->ki_users--;
-	BUG_ON(req->ki_users < 0);
-	if (likely(req->ki_users))
-		return;
-	list_del(&req->ki_list);		/* remove from active_reqs */
-	req->ki_cancel = NULL;
-	req->ki_retry = NULL;
-
-	really_put_req(ctx, req);
-}
-
-/* aio_put_req
- *	Returns true if this put was the last user of the kiocb,
- *	false if the request is still in use.
- */
 void aio_put_req(struct kiocb *req)
 {
-	struct kioctx *ctx = req->ki_ctx;
-	spin_lock_irq(&ctx->ctx_lock);
-	__aio_put_req(ctx, req);
-	spin_unlock_irq(&ctx->ctx_lock);
+	if (atomic_dec_and_test(&req->ki_users))
+		kiocb_free(req);
 }
 EXPORT_SYMBOL(aio_put_req);
 
@@ -675,9 +645,9 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	 *  - the sync task helpfully left a reference to itself in the iocb
 	 */
 	if (is_sync_kiocb(iocb)) {
-		BUG_ON(iocb->ki_users != 1);
+		BUG_ON(atomic_read(&iocb->ki_users) != 1);
 		iocb->ki_user_data = res;
-		iocb->ki_users = 0;
+		atomic_set(&iocb->ki_users, 0);
 		wake_up_process(iocb->ki_obj.tsk);
 		return;
 	}
@@ -692,6 +662,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	 */
 	spin_lock_irqsave(&ctx->ctx_lock, flags);
 
+	list_del(&iocb->ki_list); /* remove from active_reqs */
+
 	/*
 	 * cancelled requests don't get events, userland was given one
 	 * when the event got cancelled.
@@ -738,7 +710,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 
 put_rq:
 	/* everything turned out well, dispose of the aiocb. */
-	__aio_put_req(ctx, iocb);
+	aio_put_req(iocb);
+	atomic_dec(&ctx->reqs_active);
 
 	/*
 	 * We have to order our ring_info tail store above and test
@@ -903,7 +876,7 @@ static int read_events(struct kioctx *ctx,
 				break;
 			/* Try to only show up in io wait if there are ops
 			 *  in flight */
-			if (ctx->reqs_active)
+			if (atomic_read(&ctx->reqs_active))
 				io_schedule();
 			else
 				schedule();
@@ -1362,6 +1335,14 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	return 0;
 
 out_put_req:
+	spin_lock_irq(&ctx->ctx_lock);
+	list_del(&req->ki_list);
+	spin_unlock_irq(&ctx->ctx_lock);
+
+	atomic_dec(&ctx->reqs_active);
+	if (unlikely(!atomic_read(&ctx->reqs_active) && ctx->dead))
+		wake_up_all(&ctx->wait);
+
 	aio_put_req(req);	/* drop extra ref to req */
 	aio_put_req(req);	/* drop i/o ref to req */
 	return ret;
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 7b1eb23..1e728f0 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -49,7 +49,7 @@ struct kioctx;
  */
 struct kiocb {
 	unsigned long		ki_flags;
-	int			ki_users;
+	atomic_t		ki_users;
 	unsigned		ki_key;		/* id of this request */
 
 	struct file		*ki_filp;
@@ -96,7 +96,7 @@ static inline bool is_sync_kiocb(struct kiocb *kiocb)
 static inline void init_sync_kiocb(struct kiocb *kiocb, struct file *filp)
 {
 	*kiocb = (struct kiocb) {
-			.ki_users = 1,
+			.ki_users = ATOMIC_INIT(1),
 			.ki_key = KIOCB_SYNC_KEY,
 			.ki_filp = filp,
 			.ki_obj.tsk = current,
-- 
1.7.12

--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]
  Powered by Linux