Re: io_cancel return value change

On Fri, Aug 15, 2014 at 03:27:53PM -0400, Jeff Moyer wrote:
> Hi, Kent,
> 
> Sorry to bring this up long after your patches have gone in--I am
> remiss.  In the following commit, you changed what was returned by the
> io_cancel system call:
> 
> commit bec68faaf3ba74ed0dcd5dc3a881b30aec542973
> Author: Kent Overstreet <koverstreet@xxxxxxxxxx>
> Date:   Mon May 13 14:45:08 2013 -0700
> 
>     aio: io_cancel() no longer returns the io_event
>     
>     Originally, io_event() was documented to return the io_event if
>     cancellation succeeded - the io_event wouldn't be delivered via the ring
>     buffer like it normally would.
>     
>     But this isn't what the implementation was actually doing; the only
>     driver implementing cancellation, the usb gadget code, never returned an
>     io_event in its cancel function. And aio_complete() was recently changed
>     to no longer suppress event delivery if the kiocb had been cancelled.
>     
>     This gets rid of the unused io_event argument to kiocb_cancel() and
>     kiocb->ki_cancel(), and changes io_cancel() to return -EINPROGRESS if
>     kiocb->ki_cancel() returned success.
>     
>     Also tweak the refcounting in kiocb_cancel() to make more sense.
> 
> You made a passing reference to previous aio_complete changes as well
> that I can't easily track down (it wasn't mentioned in the commit log
> that I can tell, and it isn't obvious to me by looking through the patch
> set).
> 
> At the very least, you need to also include man page updates for this.
> Honestly, though, we should not have let this patch set go in as is.  If

Yes, the man page needs to be updated, but this change in the API is
backwards compatible.  The existing man page says that io_cancel returns 0
on success and fills in an io_event; any return other than 0 implies that
a completion event will be delivered later.  Cancellation has never
guaranteed that it will succeed without a completion event being
delivered.
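
To make that reading concrete, here is a minimal sketch of how a caller might act on the return value. It assumes the negative-errno convention used by the libaio wrappers; the enum and the function name are hypothetical and exist only for this illustration. The default branch for other errors (such as -EINVAL) is an assumption of the sketch, not something stated above.

```c
#include <errno.h>

/* Hypothetical names for this sketch; not part of libaio or the kernel ABI. */
enum cancel_action {
	CANCEL_EVENT_IN_RESULT,	/* 0: the io_event was handed back directly */
	CANCEL_WAIT_FOR_EVENT,	/* a completion event will still be delivered */
	CANCEL_BAD_CALL		/* assumption: the call itself was rejected */
};

/* Map an io_cancel() return value (negative errno assumed) to an action. */
static enum cancel_action classify_io_cancel(int ret)
{
	switch (ret) {
	case 0:
		/* Documented success: consume the returned io_event. */
		return CANCEL_EVENT_IN_RESULT;
	case -EINPROGRESS:	/* cancel started; event arrives via the ring */
	case -EAGAIN:		/* not cancelled; event arrives on completion */
		return CANCEL_WAIT_FOR_EVENT;
	default:
		return CANCEL_BAD_CALL;
	}
}
```

A caller written this way keeps reaping events with io_getevents() for every request whose cancel did not return 0, which behaves the same before and after the change.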

> any software actually wrote code to handle cancellation, this change
> would surely cause confusion.  That said, I think you got away with this
> for two reasons:
> 1) no drivers implements a cancellation routine (except the crazy usb
>    gadgetfs thing)
> 2) userspace code may not even call io_cancel (who knows?), and if they
>    do, well, see point 1.

I implemented some test code to verify that the new approach to cancel 
works.  I have a bunch of other changes against 3.4 that need to be ported 
to 3.17, but I just haven't had time to work on them.  Below are 3 of the 
patches implementing async pipe reads that need forward porting and should 
be relatively easy to merge.

> My preference would be for this behavior change to be reverted.  You
> could make a case for keeping the change and updating the
> documentation.  I'm not convinced 100% one way or the other.  So what do
> you (and others) think?

I'm not convinced it needs to be reverted.  Anything that's not a file or 
block device requires cancellation support, and the in-kernel api changes 
were meant to better handle various race conditions.

> Cheers,
> Jeff
> 
> p.s. I did do a quick google search, and the only caller of io_cancel I
> could find was a test harness.  But, that's certainly not proof that
> nobody does it!

You're ignoring the fact that cancellation is used internally to the kernel 
on process exit or io context destruction.  Cancel methods have to work, as 
otherwise a process will hang at exit.
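
As an illustration of what "have to work" means, here is a toy, single-threaded model of the ownership rule that pipe_read_cancel() in the patches below follows: whoever clears the parked request from the slot is the one that completes it, so exactly one event is produced per request. All toy_* names are hypothetical, and the locking the real code does under i_mutex is left out.

```c
#include <errno.h>
#include <stddef.h>

/* Toy stand-ins for struct kiocb and pipe_inode_info; hypothetical names. */
struct toy_req {
	long partial;	/* bytes already read before the request parked */
	long result;	/* value reported in the completion event */
	int events;	/* number of completion events delivered */
};

struct toy_pipe {
	struct toy_req *read_req;	/* single parked async reader, or NULL */
};

/* Cancel path: succeeds only if the request is still parked in the slot. */
static int toy_cancel(struct toy_pipe *p, struct toy_req *req)
{
	if (p->read_req != req)
		return -EAGAIN;		/* someone else already owns it */
	p->read_req = NULL;
	req->result = req->partial ? req->partial : -EINTR;
	req->events++;
	return -EINPROGRESS;		/* the event is delivered normally */
}

/* Writer path: complete the parked reader, if there is one. */
static int toy_complete(struct toy_pipe *p, long nbytes)
{
	struct toy_req *req = p->read_req;

	if (!req)
		return 0;
	p->read_req = NULL;
	req->result = req->partial + nbytes;
	req->events++;
	return 1;
}
```

In this model a request that loses the race to the writer reports -EAGAIN from cancel and still produces its single event, so teardown code that waits for one event per outstanding request terminates.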

		-ben
-- 
"Thought is the essence of where you are now."


commit fff36ea0b18906d0a2254b31b1c2ebf05453ac74
Author: Benjamin LaHaise <bcrl@xxxxxxxxx>
Date:   Tue Jul 23 13:05:44 2013 -0400

    aio/pipe: break out the core __pipe_read() from pipe_read() to prep for aio
    
    Move code around from pipe_read() to a helper function __pipe_read() in
    preparation for adding async read support to pipes.

diff --git a/fs/pipe.c b/fs/pipe.c
index 1667e6f..2b596ca 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -356,6 +356,103 @@ static const struct pipe_buf_operations packet_pipe_buf_ops = {
 	.get = generic_pipe_buf_get,
 };
 
+static int __pipe_read(struct pipe_inode_info *pipe, size_t *total_len,
+		       struct iovec *iov, ssize_t *ret, int *do_wakeup,
+		       struct file *filp)
+{
+	int bufs;
+try_again:
+	bufs = pipe->nrbufs;
+	if (bufs) {
+		int curbuf = pipe->curbuf;
+		struct pipe_buffer *buf = pipe->bufs + curbuf;
+		const struct pipe_buf_operations *ops = buf->ops;
+		void *addr;
+		size_t chars = buf->len;
+		int error, atomic;
+
+		if (chars > *total_len)
+			chars = *total_len;
+
+		error = ops->confirm(pipe, buf);
+		if (error) {
+			if (!*ret)
+				*ret = error;
+			goto out_break;
+		}
+
+		atomic = !iov_fault_in_pages_write(iov, chars);
+redo:
+		addr = ops->map(pipe, buf, atomic);
+		error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars, atomic);
+		ops->unmap(pipe, buf, addr);
+		if (unlikely(error)) {
+			/*
+			 * Just retry with the slow path if we failed.
+			 */
+			if (atomic) {
+				atomic = 0;
+				goto redo;
+			}
+			if (!*ret)
+				*ret = error;
+			goto out_break;
+		}
+		*ret += chars;
+		buf->offset += chars;
+		buf->len -= chars;
+
+		/* Was it a packet buffer? Clean up and exit */
+		if (buf->flags & PIPE_BUF_FLAG_PACKET) {
+			*total_len = chars;
+			buf->len = 0;
+		}
+
+		if (!buf->len) {
+			buf->ops = NULL;
+			ops->release(pipe, buf);
+			curbuf = (curbuf + 1) & (pipe->buffers - 1);
+			pipe->curbuf = curbuf;
+			pipe->nrbufs = --bufs;
+			*do_wakeup = 1;
+		}
+		*total_len -= chars;
+		if (!*total_len)
+			goto out_break;	/* common path: read succeeded */
+	}
+	if (bufs)	/* More to do? */
+		goto try_again;
+	if (!pipe->writers)
+		goto out_break;
+	if (!pipe->waiting_writers) {
+		/* syscall merging: Usually we must not sleep
+		 * if O_NONBLOCK is set, or if we got some data.
+		 * But if a writer sleeps in kernel space, then
+		 * we can wait for that data without violating POSIX.
+		 */
+		if (*ret)
+			goto out_break;
+		if (filp->f_flags & O_NONBLOCK) {
+			*ret = -EAGAIN;
+			goto out_break;
+		}
+	}
+	if (signal_pending(current)) {
+		if (!*ret)
+			*ret = -ERESTARTSYS;
+		goto out_break;
+	}
+	if (*do_wakeup) {
+		wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM);
+ 		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
+	}
+
+	return 0;
+
+out_break:
+	return 1;
+}
+
 static ssize_t
 pipe_read(struct kiocb *iocb, const struct iovec *_iov,
 	   unsigned long nr_segs, loff_t pos)
@@ -378,90 +475,8 @@ pipe_read(struct kiocb *iocb, const struct iovec *_iov,
 	mutex_lock(&inode->i_mutex);
 	pipe = inode->i_pipe;
 	for (;;) {
-		int bufs = pipe->nrbufs;
-		if (bufs) {
-			int curbuf = pipe->curbuf;
-			struct pipe_buffer *buf = pipe->bufs + curbuf;
-			const struct pipe_buf_operations *ops = buf->ops;
-			void *addr;
-			size_t chars = buf->len;
-			int error, atomic;
-
-			if (chars > total_len)
-				chars = total_len;
-
-			error = ops->confirm(pipe, buf);
-			if (error) {
-				if (!ret)
-					ret = error;
-				break;
-			}
-
-			atomic = !iov_fault_in_pages_write(iov, chars);
-redo:
-			addr = ops->map(pipe, buf, atomic);
-			error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars, atomic);
-			ops->unmap(pipe, buf, addr);
-			if (unlikely(error)) {
-				/*
-				 * Just retry with the slow path if we failed.
-				 */
-				if (atomic) {
-					atomic = 0;
-					goto redo;
-				}
-				if (!ret)
-					ret = error;
-				break;
-			}
-			ret += chars;
-			buf->offset += chars;
-			buf->len -= chars;
-
-			/* Was it a packet buffer? Clean up and exit */
-			if (buf->flags & PIPE_BUF_FLAG_PACKET) {
-				total_len = chars;
-				buf->len = 0;
-			}
-
-			if (!buf->len) {
-				buf->ops = NULL;
-				ops->release(pipe, buf);
-				curbuf = (curbuf + 1) & (pipe->buffers - 1);
-				pipe->curbuf = curbuf;
-				pipe->nrbufs = --bufs;
-				do_wakeup = 1;
-			}
-			total_len -= chars;
-			if (!total_len)
-				break;	/* common path: read succeeded */
-		}
-		if (bufs)	/* More to do? */
-			continue;
-		if (!pipe->writers)
+		if (__pipe_read(pipe, &total_len, iov, &ret, &do_wakeup, filp))
 			break;
-		if (!pipe->waiting_writers) {
-			/* syscall merging: Usually we must not sleep
-			 * if O_NONBLOCK is set, or if we got some data.
-			 * But if a writer sleeps in kernel space, then
-			 * we can wait for that data without violating POSIX.
-			 */
-			if (ret)
-				break;
-			if (filp->f_flags & O_NONBLOCK) {
-				ret = -EAGAIN;
-				break;
-			}
-		}
-		if (signal_pending(current)) {
-			if (!ret)
-				ret = -ERESTARTSYS;
-			break;
-		}
-		if (do_wakeup) {
-			wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM);
- 			kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
-		}
 		pipe_wait(pipe);
 	}
 	mutex_unlock(&inode->i_mutex);

commit 187de6fbe66fd70485d53bb45a7a034ad747a7cb
Author: Benjamin LaHaise <bcrl@xxxxxxxxx>
Date:   Tue Jul 23 14:57:45 2013 -0400

    aio, pipe: implement async pipe read
    
    Add code to pipe_read to implement async reads of pipes without using the
    threaded aio core.  This allows later optimizations that reduce the number
    of context switches.

diff --git a/fs/pipe.c b/fs/pipe.c
index 2b596ca..fa0969c 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -103,6 +103,15 @@ void pipe_wait(struct pipe_inode_info *pipe)
 	pipe_lock(pipe);
 }
 
+/* Drop the inode semaphore and wait for a pipe event, atomically */
+ssize_t async_pipe_wait(struct pipe_inode_info *pipe, struct kiocb *iocb)
+{
+	BUG_ON(pipe->read_iocb);
+	pipe->read_iocb = iocb;
+	pipe_unlock(pipe);
+	return -EIOCBRETRY;
+}
+
 static int
 pipe_iov_copy_from_user(void *to, struct iovec *iov, unsigned long len,
 			int atomic)
@@ -443,6 +452,10 @@ redo:
 		goto out_break;
 	}
 	if (*do_wakeup) {
+		struct kiocb *read_iocb = pipe->read_iocb;
+		pipe->read_iocb = NULL;
+		if (read_iocb)
+			kick_iocb(read_iocb);
 		wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM);
  		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 	}
@@ -453,6 +466,36 @@ out_break:
 	return 1;
 }
 
+int pipe_read_cancel(struct kiocb *iocb, struct io_event *event)
+{
+	struct file *filp = iocb->ki_filp;
+	struct inode *inode = filp->f_path.dentry->d_inode;
+	struct pipe_inode_info *pipe;
+	int did_cancel = 0;
+
+	mutex_lock(&inode->i_mutex);
+	pipe = inode->i_pipe;
+
+	if (pipe->read_iocb == iocb) {
+		pipe->read_iocb = NULL;
+		did_cancel = 1;
+	}
+
+	mutex_unlock(&inode->i_mutex);
+
+	if (did_cancel) {
+		ssize_t ret = (long)iocb->private2;
+		kiocbClearCancelled(iocb);
+		aio_put_req(iocb);
+		if (ret == 0)
+			ret = -EINTR;
+		aio_complete(iocb, ret, 0);
+		return -EINPROGRESS;
+	}
+
+	return -EAGAIN;
+}
+
 static ssize_t
 pipe_read(struct kiocb *iocb, const struct iovec *_iov,
 	   unsigned long nr_segs, loff_t pos)
@@ -463,6 +506,7 @@ pipe_read(struct kiocb *iocb, const struct iovec *_iov,
 	int do_wakeup;
 	ssize_t ret;
 	struct iovec *iov = (struct iovec *)_iov;
+	struct kiocb *read_iocb = NULL;
 	size_t total_len;
 
 	total_len = iov_length(iov, nr_segs);
@@ -472,17 +516,42 @@ pipe_read(struct kiocb *iocb, const struct iovec *_iov,
 
 	do_wakeup = 0;
 	ret = 0;
+
+	if (!is_sync_kiocb(iocb)) {
+		if (iocb->ki_cancel == NULL) {
+			kiocb_set_cancel_fn(iocb, pipe_read_cancel);
+			iocb->private = (void *)total_len;
+			iocb->private2 = (void *)0;
+			iocb->ki_pos = 0;
+		}
+		total_len = (long)iocb->private;
+		ret = (long)iocb->private2;
+		do_wakeup = iocb->ki_pos;
+	}
+
 	mutex_lock(&inode->i_mutex);
 	pipe = inode->i_pipe;
 	for (;;) {
 		if (__pipe_read(pipe, &total_len, iov, &ret, &do_wakeup, filp))
 			break;
+		if (!is_sync_kiocb(iocb)) {
+			iocb->private = (void *)total_len;
+			iocb->private2 = (void *)ret;
+			iocb->ki_pos = do_wakeup;
+			return async_pipe_wait(pipe, iocb);
+		}
 		pipe_wait(pipe);
 	}
+	if (do_wakeup) {
+		read_iocb = pipe->read_iocb;
+		pipe->read_iocb = NULL;
+	}
 	mutex_unlock(&inode->i_mutex);
 
 	/* Signal writers asynchronously that there is more room. */
 	if (do_wakeup) {
+		if (read_iocb)
+			kick_iocb(read_iocb);
 		wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM);
 		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 	}
@@ -506,6 +575,7 @@ pipe_write(struct kiocb *iocb, const struct iovec *_iov,
 	ssize_t ret;
 	int do_wakeup;
 	struct iovec *iov = (struct iovec *)_iov;
+	struct kiocb *read_iocb = NULL;
 	size_t total_len;
 	ssize_t chars;
 
@@ -655,6 +725,11 @@ redo2:
 			break;
 		}
 		if (do_wakeup) {
+			if (pipe->read_iocb) {
+				read_iocb = pipe->read_iocb;
+				pipe->read_iocb = NULL;
+				kick_iocb(read_iocb);
+			}
 			wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM);
 			kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
 			do_wakeup = 0;
@@ -664,8 +739,14 @@ redo2:
 		pipe->waiting_writers--;
 	}
 out:
+	if (do_wakeup) {
+		read_iocb = pipe->read_iocb;
+		pipe->read_iocb = NULL;
+	}
 	mutex_unlock(&inode->i_mutex);
 	if (do_wakeup) {
+		if (read_iocb)
+			kick_iocb(read_iocb);
 		wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM);
 		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
 	}
@@ -748,6 +829,7 @@ pipe_poll(struct file *filp, poll_table *wait)
 static int
 pipe_release(struct inode *inode, int decr, int decw)
 {
+	struct kiocb *read_iocb = NULL;
 	struct pipe_inode_info *pipe;
 
 	mutex_lock(&inode->i_mutex);
@@ -758,12 +840,17 @@ pipe_release(struct inode *inode, int decr, int decw)
 	if (!pipe->readers && !pipe->writers) {
 		free_pipe_info(inode);
 	} else {
+		read_iocb = pipe->read_iocb;
+		pipe->read_iocb = NULL;
 		wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLOUT | POLLRDNORM | POLLWRNORM | POLLERR | POLLHUP);
 		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
 		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 	}
 	mutex_unlock(&inode->i_mutex);
 
+	if (read_iocb)
+		kick_iocb(read_iocb);
+
 	return 0;
 }
 
@@ -909,6 +996,7 @@ const struct file_operations read_pipefifo_fops = {
 	.open		= pipe_read_open,
 	.release	= pipe_read_release,
 	.fasync		= pipe_read_fasync,
+	.use_aio_read	= true,
 };
 
 const struct file_operations write_pipefifo_fops = {
@@ -934,6 +1022,7 @@ const struct file_operations rdwr_pipefifo_fops = {
 	.open		= pipe_rdwr_open,
 	.release	= pipe_rdwr_release,
 	.fasync		= pipe_rdwr_fasync,
+	.use_aio_read	= true,
 };
 
 struct pipe_inode_info * alloc_pipe_info(struct inode *inode)
diff --git a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
index e1ac1ce..d404e796 100644
--- a/include/linux/pipe_fs_i.h
+++ b/include/linux/pipe_fs_i.h
@@ -55,6 +55,7 @@ struct pipe_inode_info {
 	struct fasync_struct *fasync_writers;
 	struct inode *inode;
 	struct pipe_buffer *bufs;
+	struct kiocb *read_iocb;
 };
 
 /*


commit 0ff94f4f7612e2161cacf872565df22758190918
Author: Benjamin LaHaise <bcrl@xxxxxxxxx>
Date:   Tue Jul 23 15:48:20 2013 -0400

    aio/pipe: reduce context switches when async pipe read is in same mm as write
    
    For applications that make use of a pipe internally for waking internal
    state machines, an async pipe read may occur in the same address space as
    the write to wake up that pipe.  In this case, 2 context switches can be
    eliminated by completing the async read directly in the pipe writer.  On
    one application making use of this facility, this results in the number of
    context switches per second dropping from approximately 35k to 22k.

diff --git a/fs/pipe.c b/fs/pipe.c
index fa0969c..42b1d3c 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -565,6 +565,43 @@ static inline int is_packetized(struct file *file)
 	return (file->f_flags & O_DIRECT) != 0;
 }
 
+static void pipe_maybe_complete_aio_read(struct pipe_inode_info *pipe)
+{
+	struct kiocb *read_iocb = pipe->read_iocb;
+	pipe->read_iocb = NULL;
+	if (!read_iocb)
+		return;
+	/* Is an awaiting read in the same address space as the current
+	 * process?  If so, complete the read to avoid the context switch
+	 * to the aio work queue.
+	 */
+	if (!is_sync_kiocb(read_iocb) &&
+            (read_iocb->ki_ctx->mm == current->mm)) {
+		int do_wakeup;
+		ssize_t ret;
+		size_t total_len;
+
+		total_len = (long)read_iocb->private;
+		ret = (long)read_iocb->private2;
+		do_wakeup = read_iocb->ki_pos;
+
+		for (;;) {
+			if (__pipe_read(pipe, &total_len, read_iocb->ki_iovec,
+					&ret, &do_wakeup, read_iocb->ki_filp))
+				break;
+			read_iocb->private = (void *)total_len;
+			read_iocb->private2 = (void *)ret;
+			read_iocb->ki_pos = do_wakeup;
+			pipe->read_iocb = read_iocb;
+			return;
+		}
+		aio_complete(read_iocb, ret, 0);
+		return;
+	}
+
+	kick_iocb(read_iocb);
+}
+
 static ssize_t
 pipe_write(struct kiocb *iocb, const struct iovec *_iov,
 	    unsigned long nr_segs, loff_t ppos)
@@ -725,11 +762,7 @@ redo2:
 			break;
 		}
 		if (do_wakeup) {
-			if (pipe->read_iocb) {
-				read_iocb = pipe->read_iocb;
-				pipe->read_iocb = NULL;
-				kick_iocb(read_iocb);
-			}
+			pipe_maybe_complete_aio_read(pipe);
 			wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM);
 			kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
 			do_wakeup = 0;
@@ -740,6 +773,7 @@ redo2:
 	}
 out:
 	if (do_wakeup) {
+		pipe_maybe_complete_aio_read(pipe);
 		read_iocb = pipe->read_iocb;
 		pipe->read_iocb = NULL;
 	}