[PATCH][WIP v1] aio: experimental use of threads, demonstration of cancel method

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This patch is purely for experimentation purposes, and is by no means
complete or cleaned up for submission yet.  It is, however, useful for
demonstrating the cancellation of a kiocb while the kiocb is being
processed by a kernel thread.

There are a number of things about this patch that are completely broken:
it uses a very simple thread pool, it does not yet implement vector ops,
it overrides aio operations for read/write/fsync/fdsync, and it has in no
way had any performance tuning done on it.  A subsequent demonstration
patch will be written to make use of queue_work() for the purpose of
examining and comparing the overhead of various APIs.

As for cancellation, the thread based cancellation implemented in this
patch is expected to function correctly.  A test program is available at
http://www.kvack.org/~bcrl/aio_tests/read-pipe-cancel.c and makes use of
io_cancel() on a read from a pipe file descriptor.  Hopefully the
simplicity of the cancel function is useful for providing a starting point
for further discussion of kiocb cancellation.

This change applies on top of the 3 previous aio patches posted earlier
today.  A git repository with the changes is available at
git://git.kvack.org/~bcrl/linux-next-20130213.git and is based off of
today's linux-next tree.  Please note that this is a throw-away repository
that will be rebased.

Not-signed-off-by: Benjamin LaHaise <bcrl@xxxxxxxxx>
---
 fs/aio.c              |  240 +++++++++++++++++++++++++++++++++++++++++++++----
 fs/exec.c             |    6 ++
 include/linux/aio.h   |    1 +
 include/linux/sched.h |    3 +
 kernel/exit.c         |    6 ++
 kernel/fork.c         |    5 +
 kernel/sched/core.c   |    2 +
 7 files changed, 244 insertions(+), 19 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 1bcb818..a95d9c5 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -38,6 +38,7 @@
 #include <linux/blkdev.h>
 #include <linux/compat.h>
 #include <linux/percpu-refcount.h>
+#include <linux/kthread.h>
 
 #include <asm/kmap_types.h>
 #include <asm/uaccess.h>
@@ -73,6 +74,8 @@ struct kioctx {
 	unsigned long		user_id;
 	struct hlist_node	list;
 
+	struct mm_struct	*mm;
+
 	struct __percpu kioctx_cpu *cpu;
 
 	unsigned		req_batch;
@@ -102,6 +105,11 @@ struct kioctx {
 	} ____cacheline_aligned_in_smp;
 
 	struct {
+		spinlock_t		worker_lock;
+		struct list_head	worker_list;
+	} ____cacheline_aligned_in_smp;
+
+	struct {
 		struct mutex	ring_lock;
 		wait_queue_head_t wait;
 
@@ -136,6 +144,8 @@ unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio request
 static struct kmem_cache	*kiocb_cachep;
 static struct kmem_cache	*kioctx_cachep;
 
+static int make_helper_thread(struct kioctx *ctx);
+
 /* aio_setup
  *	Creates the slab caches used by the aio routines, panic on
  *	failure as this is done early during the boot sequence.
@@ -295,9 +305,25 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb,
 static void free_ioctx_rcu(struct rcu_head *head)
 {
 	struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+	struct task_struct *task;
+	int nr = 0;	/* number of parked workers released below */
 
 	free_percpu(ctx->cpu);
+	do {	/* pop one parked worker per pass; wake it outside worker_lock */
+		spin_lock(&ctx->worker_lock);
+		if (!list_empty(&ctx->worker_list)) {
+			task = list_entry(ctx->worker_list.next,
+					  struct task_struct, aio_list);
+			list_del(&task->aio_list);
+			nr++;
+		} else
+			task = NULL;	/* list drained: terminates the loop */
+		spin_unlock(&ctx->worker_lock);
+		if (task)
+			wake_up_process(task);
+	} while (task);
 	kmem_cache_free(kioctx_cachep, ctx);
+	printk(KERN_DEBUG "free_ioctx_rcu: nr of worker threads: %d\n", nr);
 }
 
 /*
@@ -339,7 +365,7 @@ static void free_ioctx(struct kioctx *ctx)
 	while (atomic_read(&ctx->reqs_available) < ctx->nr) {
 		wait_event(ctx->wait,
 			   (head != ctx->shadow_tail) ||
-			   (atomic_read(&ctx->reqs_available) != ctx->nr));
+			   (atomic_read(&ctx->reqs_available) == ctx->nr));
 
 		avail = (head <= ctx->shadow_tail ?
 			 ctx->shadow_tail : ctx->nr) - head;
@@ -360,6 +386,10 @@ static void free_ioctx(struct kioctx *ctx)
 
 	pr_debug("freeing %p\n", ctx);
 
+	if (ctx->mm)
+		mmdrop(ctx->mm);
+	ctx->mm = NULL;
+
 	/*
 	 * Here the call_rcu() is between the wait_event() for reqs_active to
 	 * hit 0, and freeing the ioctx.
@@ -407,6 +437,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	rcu_read_unlock();
 
 	spin_lock_init(&ctx->ctx_lock);
+	spin_lock_init(&ctx->worker_lock);
+	INIT_LIST_HEAD(&ctx->worker_list);
 	mutex_init(&ctx->ring_lock);
 	init_waitqueue_head(&ctx->wait);
 
@@ -433,6 +465,9 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	aio_nr += ctx->max_reqs;
 	spin_unlock(&aio_nr_lock);
 
+	ctx->mm = current->mm;
+	atomic_inc(&current->mm->mm_count);
+
 	/* now link into global list. */
 	spin_lock(&mm->ioctx_lock);
 	hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
@@ -629,6 +664,7 @@ static void kiocb_free(struct kiocb *req)
 
 void aio_put_req(struct kiocb *req)
 {
+	BUG_ON(atomic_read(&req->ki_users) <= 0);	/* trap a put when the user count is already zero or negative */
 	if (atomic_dec_and_test(&req->ki_users))
 		kiocb_free(req);
 }
@@ -681,6 +717,7 @@ static inline unsigned kioctx_ring_put(struct kioctx *ctx, struct kiocb *req,
 
 static inline unsigned kioctx_ring_lock(struct kioctx *ctx)
 {
+	struct aio_ring *ring;
 	unsigned tail;
 
 	/*
@@ -690,6 +727,15 @@ static inline unsigned kioctx_ring_lock(struct kioctx *ctx)
 	while ((tail = xchg(&ctx->tail, UINT_MAX)) == UINT_MAX)
 		cpu_relax();
 
+	ring = kmap_atomic(ctx->ring_pages[0]);
+#if 0
+	if (ring->head == ring->tail) {
+		ring->head = ring->tail = 0;
+		tail = 0;
+	}
+#endif
+	kunmap_atomic(ring);
+
 	return tail;
 }
 
@@ -892,7 +938,7 @@ static long aio_read_events_ring(struct kioctx *ctx,
 		goto out;
 
 	while (ret < nr) {
-		long avail = (head <= ctx->shadow_tail
+		long avail = (head < ctx->shadow_tail
 			      ? ctx->shadow_tail : ctx->nr) - head;
 		struct io_event *ev;
 		struct page *page;
@@ -1031,6 +1077,9 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
 		put_ioctx(ioctx);
 	}
 
+	if (!ret)
+		make_helper_thread(ioctx);
+
 out:
 	return ret;
 }
@@ -1156,12 +1205,24 @@ static ssize_t aio_setup_single_vector(int rw, struct kiocb *kiocb)
 	return 0;
 }
 
+static int aio_thread_cancel_fn(struct kiocb *iocb, struct io_event *event)
+{
+	struct task_struct *task = iocb->private;	/* helper thread recorded for this iocb */
+	/* Cancel hook for helper-run iocbs: put the iocb, then signal the helper. */
+	barrier();	/* compiler barrier; presumably pins the ->private read before the put */
+	aio_put_req(iocb);
+	if (task == NULL)
+		return -EAGAIN;	/* no helper recorded, so nothing to signal */
+	force_sig(SIGSEGV, task);	/* aio_thread_fn waits for, then flushes, this signal */
+	return -EINPROGRESS;	/* the cancelled iocb will complete */
+}
+
 /*
  * aio_setup_iocb:
  *	Performs the initial checks and aio retry method
  *	setup for the kiocb at the time of io submission.
  */
-static ssize_t aio_run_iocb(struct kiocb *req, bool compat)
+static ssize_t aio_run_iocb(struct kiocb *req)
 {
 	struct file *file = req->ki_filp;
 	ssize_t ret;
@@ -1187,12 +1248,9 @@ rw_common:
 		if (unlikely(!(file->f_mode & mode)))
 			return -EBADF;
 
-		if (!rw_op)
-			return -EINVAL;
-
 		ret = (req->ki_opcode == IOCB_CMD_PREADV ||
 		       req->ki_opcode == IOCB_CMD_PWRITEV)
-			? aio_setup_vectored_rw(rw, req, compat)
+			? aio_setup_vectored_rw(rw, req, req->ki_compat)
 			: aio_setup_single_vector(rw, req);
 		if (ret)
 			return ret;
@@ -1204,23 +1262,36 @@ rw_common:
 		req->ki_nbytes = ret;
 		req->ki_left = ret;
 
+		if (current->aio_data)
+			goto aio_submit_task;
+		if (!rw_op)
+			return -EINVAL;
 		ret = aio_rw_vect_retry(req, rw, rw_op);
 		break;
 
 	case IOCB_CMD_FDSYNC:
-		if (!file->f_op->aio_fsync)
-			return -EINVAL;
-
-		ret = file->f_op->aio_fsync(req, 1);
-		break;
-
 	case IOCB_CMD_FSYNC:
-		if (!file->f_op->aio_fsync)
-			return -EINVAL;
-
-		ret = file->f_op->aio_fsync(req, 0);
+	{
+		struct task_struct *task;
+
+aio_submit_task:
+		task = current->aio_data;	/* helper reserved for this submitter, if any */
+		if (task) {
+			BUG_ON(task->aio_data != NULL);	/* an idle helper must not hold an iocb */
+			current->aio_data = NULL;	/* the helper is consumed by this request */
+			req->private = task;	/* lets aio_thread_cancel_fn find the helper */
+			task->aio_data = req;
+			kiocb_set_cancel_fn(req, aio_thread_cancel_fn);
+			wake_up_process(task);
+			ret = -EIOCBQUEUED;
+		} else {
+			if (!file->f_op->aio_fsync)	/* no helper: fall back to the file's aio_fsync */
+				return -EINVAL;
+			ret = file->f_op->aio_fsync(req, req->ki_opcode ==
+							 IOCB_CMD_FDSYNC);
+		}
 		break;
-
+	}
 	default:
 		pr_debug("EINVAL: no operation provided\n");
 		return -EINVAL;
@@ -1240,6 +1311,128 @@ rw_common:
 	return 0;
 }
 
+/* aio_thread_fn: helper kthread loop; runs one queued iocb per wakeup, then parks. */
+static int aio_thread_fn(void *data)
+{
+	kiocb_cancel_fn *cancel;
+	struct kiocb *iocb;
+	struct kioctx *ctx;
+	ssize_t ret;
+
+again:
+	iocb = current->aio_data;
+	current->aio_data = NULL;
+	/* Woken without a request (e.g. the reserving task exited): exit. */
+	if (!iocb)
+		return 0;
+	/* Borrow the ctx's mm so the iocb's user buffer can be accessed. */
+	ctx = iocb->ki_ctx;
+	use_mm(ctx->mm);
+	set_fs(USER_DS);
+
+	iocb->private = current;
+	ret = -EINVAL;		/* result when the file lacks the needed op */
+
+	switch (iocb->ki_opcode) {
+	case IOCB_CMD_PREAD:
+		if (!iocb->ki_filp->f_op->read)
+			break;
+		ret = iocb->ki_filp->f_op->read(iocb->ki_filp, iocb->ki_buf,
+						iocb->ki_nbytes, &iocb->ki_pos);
+		break;
+	case IOCB_CMD_PWRITE:
+		if (!iocb->ki_filp->f_op->write)
+			break;
+		ret = iocb->ki_filp->f_op->write(iocb->ki_filp, iocb->ki_buf,
+						 iocb->ki_nbytes, &iocb->ki_pos);
+		break;
+	case IOCB_CMD_FSYNC:
+	case IOCB_CMD_FDSYNC:
+		if (!iocb->ki_filp->f_op->fsync)	/* same guard as read/write */
+			break;
+		ret = iocb->ki_filp->f_op->fsync(iocb->ki_filp, 0, LLONG_MAX,
+						 iocb->ki_opcode == IOCB_CMD_FDSYNC);
+		break;
+	default:
+		break;
+	}
+	/* Retire the cancel hook; if a canceller claimed it, await its signal. */
+	cancel = cmpxchg(&iocb->ki_cancel, aio_thread_cancel_fn, NULL);
+	if (cancel == KIOCB_CANCELLED) {
+		set_current_state(TASK_INTERRUPTIBLE);
+		while (!signal_pending(current)) {
+			schedule();
+			if (signal_pending(current))
+				break;
+			set_current_state(TASK_INTERRUPTIBLE);
+		}
+	} else
+		BUG_ON(cancel != aio_thread_cancel_fn);
+	/* Discard any pending signal so it cannot hit the next request. */
+	if (signal_pending(current))
+		flush_signals(current);
+
+	set_current_state(TASK_INTERRUPTIBLE);
+	/* Park on ctx->worker_list, where make_helper_thread looks for reuse. */
+	spin_lock(&ctx->worker_lock);
+	list_add(&current->aio_list, &ctx->worker_list);
+	spin_unlock(&ctx->worker_lock);
+
+	if (ret != -EIOCBQUEUED) {
+		/*
+		 * There's no easy way to restart the syscall since other AIO's
+		 * may be already running. Just fail this IO with EINTR.
+		 */
+		if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
+			     ret == -ERESTARTNOHAND ||
+			     ret == -ERESTART_RESTARTBLOCK))
+			ret = -EINTR;
+		aio_complete(iocb, ret, 0);
+	}
+
+	set_fs(KERNEL_DS);
+	unuse_mm(current->mm);
+
+	if (current->aio_data) {
+		set_current_state(TASK_RUNNING);
+		goto again;
+	}
+
+	schedule();
+	if (current->aio_data)
+		goto again;
+	return 0;
+}
+
+/* make_helper_thread: reserve a helper for current in ->aio_data; 0 or -errno. */
+static int make_helper_thread(struct kioctx *ctx)
+{
+	struct task_struct *task;
+
+	if (current->aio_data)
+		return 0;
+
+	spin_lock(&ctx->worker_lock);
+	if (!list_empty(&ctx->worker_list)) {	/* reuse a parked idle worker */
+		task = list_entry(ctx->worker_list.next, struct task_struct,
+				  aio_list);
+		list_del(&task->aio_list);
+		spin_unlock(&ctx->worker_lock);
+		current->aio_data = task;
+		return 0;
+	}
+	spin_unlock(&ctx->worker_lock);
+
+	/* None parked: create one, naming it via kthread_create's own format. */
+	task = kthread_create(aio_thread_fn, NULL, "aio-helper-%d",
+			      current->pid);
+	if (IS_ERR(task))
+		return PTR_ERR(task);
+
+	current->aio_data = task;
+	return 0;
+}
+
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 			 struct iocb *iocb, bool compat)
 {
@@ -1293,6 +1486,10 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		goto out_put_req;
 	}
 
+	ret = -ENOMEM;
+	if (make_helper_thread(ctx))
+		goto out_put_req;
+
 	req->ki_obj.user = user_iocb;
 	req->ki_user_data = iocb->aio_data;
 	req->ki_pos = iocb->aio_offset;
@@ -1300,8 +1497,11 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf;
 	req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
 	req->ki_opcode = iocb->aio_lio_opcode;
+	req->ki_compat = compat;
 
-	ret = aio_run_iocb(req, compat);
+	current->in_aio_submit = 1;
+	ret = aio_run_iocb(req);
+	current->in_aio_submit = 0;
 	if (ret)
 		goto out_put_req;
 
@@ -1488,3 +1688,5 @@ SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
 	asmlinkage_protect(5, ret, ctx_id, min_nr, nr, events, timeout);
 	return ret;
 }
+
+/* foo */
diff --git a/fs/exec.c b/fs/exec.c
index dc38755..be39eff 100644
--- a/fs/exec.c
+++ b/fs/exec.c
@@ -826,6 +826,12 @@ static int exec_mmap(struct mm_struct *mm)
 			return -EINTR;
 		}
 	}
+	if (tsk->aio_data) {
+		struct task_struct *p = tsk->aio_data;
+		tsk->aio_data = NULL;
+		wake_up_process(p);
+	}
+
 	task_lock(tsk);
 	active_mm = tsk->active_mm;
 	tsk->mm = mm;
diff --git a/include/linux/aio.h b/include/linux/aio.h
index a7e4c59..c2ac93f 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -54,6 +54,7 @@ struct kiocb {
 	void			*private;
 	/* State that we remember to be able to restart/retry  */
 	unsigned short		ki_opcode;
+	unsigned short		ki_compat;
 	size_t			ki_nbytes; 	/* copy of iocb->aio_nbytes */
 	char 			__user *ki_buf;	/* remaining iocb->aio_buf */
 	size_t			ki_left; 	/* remaining bytes */
diff --git a/include/linux/sched.h b/include/linux/sched.h
index f0e3a11..34011b3 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -1318,6 +1318,7 @@ struct task_struct {
 	/* Revert to default priority/policy when forking */
 	unsigned sched_reset_on_fork:1;
 	unsigned sched_contributes_to_load:1;
+	unsigned in_aio_submit:1;
 
 	pid_t pid;
 	pid_t tgid;
@@ -1607,6 +1608,8 @@ struct task_struct {
 #ifdef CONFIG_UPROBES
 	struct uprobe_task *utask;
 #endif
+	void *aio_data;
+	struct list_head aio_list;
 };
 
 /* Future-safe accessor for struct task_struct's cpus_allowed. */
diff --git a/kernel/exit.c b/kernel/exit.c
index 7dd2040..5202018 100644
--- a/kernel/exit.c
+++ b/kernel/exit.c
@@ -785,6 +785,12 @@ void do_exit(long code)
 	tsk->exit_code = code;
 	taskstats_exit(tsk, group_dead);
 
+	if (tsk->aio_data) {
+		wake_up_process(tsk->aio_data);
+		tsk->aio_data = NULL;
+	}
+
+
 	exit_mm(tsk);
 
 	if (group_dead)
diff --git a/kernel/fork.c b/kernel/fork.c
index e6d16bb..83c532d 100644
--- a/kernel/fork.c
+++ b/kernel/fork.c
@@ -207,6 +207,10 @@ static void account_kernel_stack(struct thread_info *ti, int account)
 
 void free_task(struct task_struct *tsk)
 {
+	if (tsk->aio_data) {	/* release the helper of the task being freed, not of the caller */
+		wake_up_process(tsk->aio_data);
+		tsk->aio_data = NULL;
+	}
 	account_kernel_stack(tsk->stack, -1);
 	arch_release_thread_info(tsk->stack);
 	free_thread_info(tsk->stack);
@@ -332,6 +336,7 @@ static struct task_struct *dup_task_struct(struct task_struct *orig)
 #endif
 	tsk->splice_pipe = NULL;
 	tsk->task_frag.page = NULL;
+	tsk->aio_data = NULL;
 
 	account_kernel_stack(ti, 1);
 
diff --git a/kernel/sched/core.c b/kernel/sched/core.c
index 55a5ae3..626d6c0 100644
--- a/kernel/sched/core.c
+++ b/kernel/sched/core.c
@@ -2895,6 +2895,8 @@ static void __sched __schedule(void)
 	struct rq *rq;
 	int cpu;
 
+	WARN_ON(current->in_aio_submit);
+
 need_resched:
 	preempt_disable();
 	cpu = smp_processor_id();
-- 
1.7.4.1


-- 
"Thought is the essence of where you are now."
--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]
  Powered by Linux