Recent changes (master)

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



The following changes since commit 8a16f59b66e5198050ec735998692ba9436a9884:

  blktrace: only probe and set depth if option isn't set (2015-04-22 19:54:54 -0600)

are available in the git repository at:

  git://git.kernel.dk/fio.git master

for you to fetch changes up to 36870c47801ca85d1a98c7ec47950b0aa360f865:

  Merge branch 'io-threads' (2015-04-23 11:13:27 -0600)

----------------------------------------------------------------
Jens Axboe (7):
      Add ->bytes_done[] to struct thread_data
      backend: split queue io_u event handling into helper
      First cut at supporting IO offload
      libaio: don't call io_destroy(), let exit_aio() take care of it
      Add man page and HOWTO for io_submit_mode option
      Add sample job file for fixed submission rate
      Merge branch 'io-threads'

 HOWTO                              |  12 +
 Makefile                           |   2 +-
 backend.c                          | 326 ++++++++++++++-------------
 cconv.c                            |   2 +
 engines/libaio.c                   |   9 +-
 examples/fixed-rate-submission.fio |  10 +
 fio.1                              |  11 +
 fio.h                              |  61 +++--
 init.c                             |   3 +
 io_u.c                             | 113 ++++++----
 ioengine.h                         |  15 +-
 ioengines.c                        |  16 +-
 libfio.c                           |   1 +
 options.c                          |  20 ++
 stat.c                             |   6 +
 thread_options.h                   |   3 +-
 verify.c                           |   8 +-
 workqueue.c                        | 444 +++++++++++++++++++++++++++++++++++++
 workqueue.h                        |  29 +++
 19 files changed, 853 insertions(+), 238 deletions(-)
 create mode 100644 examples/fixed-rate-submission.fio
 create mode 100644 workqueue.c
 create mode 100644 workqueue.h

---

Diff of recent changes:

diff --git a/HOWTO b/HOWTO
index 60eab24..bcc72b5 100644
--- a/HOWTO
+++ b/HOWTO
@@ -820,6 +820,18 @@ iodepth_low=int	The low water mark indicating when to start filling
 		after fio has filled the queue of 16 requests, it will let
 		the depth drain down to 4 before starting to fill it again.
 
+io_submit_mode=str	This option controls how fio submits the IO to
+		the IO engine. The default is 'inline', which means that the
+		fio job threads submit and reap IO directly. If set to
+		'offload', the job threads will offload IO submission to a
+		dedicated pool of IO threads. This requires some coordination
+		and thus has a bit of extra overhead, especially for lower
+		queue depth IO where it can increase latencies. The benefit
+		is that fio can manage submission rates independently of
+		the device completion rates. This avoids skewed latency
+		reporting if IO gets backed up on the device side (the
+		coordinated omission problem).
+
 direct=bool	If value is true, use non-buffered io. This is usually
 		O_DIRECT. Note that ZFS on Solaris doesn't support direct io.
 		On Windows the synchronous ioengines don't support direct io.
diff --git a/Makefile b/Makefile
index 4202ed8..9b7f27a 100644
--- a/Makefile
+++ b/Makefile
@@ -36,7 +36,7 @@ SOURCE := gettime.c ioengines.c init.c stat.c log.c time.c filesetup.c \
 		lib/lfsr.c gettime-thread.c helpers.c lib/flist_sort.c \
 		lib/hweight.c lib/getrusage.c idletime.c td_error.c \
 		profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c \
-		lib/tp.c lib/bloom.c lib/gauss.c
+		lib/tp.c lib/bloom.c lib/gauss.c workqueue.c
 
 ifdef CONFIG_LIBHDFS
   HDFSFLAGS= -I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/linux -I $(FIO_LIBHDFS_INCLUDE)
diff --git a/backend.c b/backend.c
index 25479b4..65a3e18 100644
--- a/backend.c
+++ b/backend.c
@@ -54,6 +54,7 @@
 #include "idletime.h"
 #include "err.h"
 #include "lib/tp.h"
+#include "workqueue.h"
 
 static pthread_t helper_thread;
 static pthread_mutex_t helper_lock;
@@ -229,16 +230,15 @@ static int __check_min_rate(struct thread_data *td, struct timeval *now,
 	return 0;
 }
 
-static int check_min_rate(struct thread_data *td, struct timeval *now,
-			  uint64_t *bytes_done)
+static int check_min_rate(struct thread_data *td, struct timeval *now)
 {
 	int ret = 0;
 
-	if (bytes_done[DDIR_READ])
+	if (td->bytes_done[DDIR_READ])
 		ret |= __check_min_rate(td, now, DDIR_READ);
-	if (bytes_done[DDIR_WRITE])
+	if (td->bytes_done[DDIR_WRITE])
 		ret |= __check_min_rate(td, now, DDIR_WRITE);
-	if (bytes_done[DDIR_TRIM])
+	if (td->bytes_done[DDIR_TRIM])
 		ret |= __check_min_rate(td, now, DDIR_TRIM);
 
 	return ret;
@@ -255,7 +255,7 @@ static void cleanup_pending_aio(struct thread_data *td)
 	/*
 	 * get immediately available events, if any
 	 */
-	r = io_u_queued_complete(td, 0, NULL);
+	r = io_u_queued_complete(td, 0);
 	if (r < 0)
 		return;
 
@@ -276,7 +276,7 @@ static void cleanup_pending_aio(struct thread_data *td)
 	}
 
 	if (td->cur_depth)
-		r = io_u_queued_complete(td, td->cur_depth, NULL);
+		r = io_u_queued_complete(td, td->cur_depth);
 }
 
 /*
@@ -306,7 +306,7 @@ requeue:
 		put_io_u(td, io_u);
 		return 1;
 	} else if (ret == FIO_Q_QUEUED) {
-		if (io_u_queued_complete(td, 1, NULL) < 0)
+		if (io_u_queued_complete(td, 1) < 0)
 			return 1;
 	} else if (ret == FIO_Q_COMPLETED) {
 		if (io_u->error) {
@@ -314,7 +314,7 @@ requeue:
 			return 1;
 		}
 
-		if (io_u_sync_complete(td, io_u, NULL) < 0)
+		if (io_u_sync_complete(td, io_u) < 0)
 			return 1;
 	} else if (ret == FIO_Q_BUSY) {
 		if (td_io_commit(td))
@@ -418,8 +418,7 @@ static void check_update_rusage(struct thread_data *td)
 	}
 }
 
-static int wait_for_completions(struct thread_data *td, struct timeval *time,
-				uint64_t *bytes_done)
+static int wait_for_completions(struct thread_data *td, struct timeval *time)
 {
 	const int full = queue_full(td);
 	int min_evts = 0;
@@ -438,7 +437,7 @@ static int wait_for_completions(struct thread_data *td, struct timeval *time,
 		fio_gettime(time, NULL);
 
 	do {
-		ret = io_u_queued_complete(td, min_evts, bytes_done);
+		ret = io_u_queued_complete(td, min_evts);
 		if (ret < 0)
 			break;
 	} while (full && (td->cur_depth > td->o.iodepth_low));
@@ -446,13 +445,99 @@ static int wait_for_completions(struct thread_data *td, struct timeval *time,
 	return ret;
 }
 
+int io_queue_event(struct thread_data *td, struct io_u *io_u, int *ret,
+		   enum fio_ddir ddir, uint64_t *bytes_issued, int from_verify,
+		   struct timeval *comp_time)
+{
+	int ret2;
+	/* Handle td_io_queue() status *ret; nonzero return: caller stops */
+	switch (*ret) {
+	case FIO_Q_COMPLETED:
+		if (io_u->error) {
+			*ret = -io_u->error;
+			clear_io_u(td, io_u);
+		} else if (io_u->resid) {
+			int bytes = io_u->xfer_buflen - io_u->resid;
+			struct fio_file *f = io_u->file;
+
+			if (bytes_issued)
+				*bytes_issued += bytes;
+			/* only non-verify callers trim/unlog the io piece */
+			if (!from_verify)
+				trim_io_piece(td, io_u);
+
+			/*
+			 * zero read, fail
+			 */
+			if (!bytes) {
+				if (!from_verify)
+					unlog_io_piece(td, io_u);
+				td_verror(td, EIO, "full resid");
+				put_io_u(td, io_u);
+				break;
+			}
+
+			io_u->xfer_buflen = io_u->resid;
+			io_u->xfer_buf += bytes;
+			io_u->offset += bytes;
+
+			if (ddir_rw(io_u->ddir))
+				td->ts.short_io_u[io_u->ddir]++;
+
+			f = io_u->file;
+			if (io_u->offset == f->real_file_size)
+				goto sync_done;
+
+			requeue_io_u(td, &io_u);
+		} else {
+sync_done:
+			if (comp_time && (__should_check_rate(td, DDIR_READ) ||
+			    __should_check_rate(td, DDIR_WRITE) ||
+			    __should_check_rate(td, DDIR_TRIM)))
+				fio_gettime(comp_time, NULL);
+
+			*ret = io_u_sync_complete(td, io_u);
+			if (*ret < 0)
+				break;
+		}
+		return 0;
+	case FIO_Q_QUEUED:
+		/*
+		 * if the engine doesn't have a commit hook,
+		 * the io_u is really queued. if it does have such
+		 * a hook, it has to call io_u_queued() itself.
+		 */
+		if (td->io_ops->commit == NULL)
+			io_u_queued(td, io_u);
+		if (bytes_issued)
+			*bytes_issued += io_u->xfer_buflen;
+		break;
+	case FIO_Q_BUSY:
+		if (!from_verify)
+			unlog_io_piece(td, io_u);
+		requeue_io_u(td, &io_u);
+		ret2 = td_io_commit(td);
+		if (ret2 < 0)
+			*ret = ret2;
+		break;
+	default:
+		assert(*ret < 0);	/* any other status is a negative error code */
+		td_verror(td, -(*ret), "td_io_queue");
+		break;
+	}
+
+	if (break_on_this_error(td, ddir, ret))
+		return 1;
+
+	return 0;
+}
+
 /*
  * The main verify engine. Runs over the writes we previously submitted,
  * reads the blocks back in, and checks the crc/md5 of the data.
  */
 static void do_verify(struct thread_data *td, uint64_t verify_bytes)
 {
-	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
 	struct fio_file *f;
 	struct io_u *io_u;
 	int ret, min_events;
@@ -483,7 +568,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes)
 	io_u = NULL;
 	while (!td->terminate) {
 		enum fio_ddir ddir;
-		int ret2, full;
+		int full;
 
 		update_tv_cache(td);
 		check_update_rusage(td);
@@ -514,7 +599,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes)
 				break;
 			}
 		} else {
-			if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes)
+			if (ddir_rw_sum(td->bytes_done) + td->o.rw_min_bs > verify_bytes)
 				break;
 
 			while ((io_u = get_io_u(td)) != NULL) {
@@ -539,7 +624,7 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes)
 					continue;
 				} else if (io_u->ddir == DDIR_TRIM) {
 					io_u->ddir = DDIR_READ;
-					io_u->flags |= IO_U_F_TRIMMED;
+					io_u_set(io_u, IO_U_F_TRIMMED);
 					break;
 				} else if (io_u->ddir == DDIR_WRITE) {
 					io_u->ddir = DDIR_READ;
@@ -569,57 +654,8 @@ static void do_verify(struct thread_data *td, uint64_t verify_bytes)
 			fio_gettime(&io_u->start_time, NULL);
 
 		ret = td_io_queue(td, io_u);
-		switch (ret) {
-		case FIO_Q_COMPLETED:
-			if (io_u->error) {
-				ret = -io_u->error;
-				clear_io_u(td, io_u);
-			} else if (io_u->resid) {
-				int bytes = io_u->xfer_buflen - io_u->resid;
-
-				/*
-				 * zero read, fail
-				 */
-				if (!bytes) {
-					td_verror(td, EIO, "full resid");
-					put_io_u(td, io_u);
-					break;
-				}
-
-				io_u->xfer_buflen = io_u->resid;
-				io_u->xfer_buf += bytes;
-				io_u->offset += bytes;
-
-				if (ddir_rw(io_u->ddir))
-					td->ts.short_io_u[io_u->ddir]++;
 
-				f = io_u->file;
-				if (io_u->offset == f->real_file_size)
-					goto sync_done;
-
-				requeue_io_u(td, &io_u);
-			} else {
-sync_done:
-				ret = io_u_sync_complete(td, io_u, bytes_done);
-				if (ret < 0)
-					break;
-			}
-			continue;
-		case FIO_Q_QUEUED:
-			break;
-		case FIO_Q_BUSY:
-			requeue_io_u(td, &io_u);
-			ret2 = td_io_commit(td);
-			if (ret2 < 0)
-				ret = ret2;
-			break;
-		default:
-			assert(ret < 0);
-			td_verror(td, -ret, "td_io_queue");
-			break;
-		}
-
-		if (break_on_this_error(td, ddir, &ret))
+		if (io_queue_event(td, io_u, &ret, ddir, NULL, 1, NULL))
 			break;
 
 		/*
@@ -630,7 +666,7 @@ sync_done:
 reap:
 		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
 		if (full || !td->o.iodepth_batch_complete)
-			ret = wait_for_completions(td, NULL, bytes_done);
+			ret = wait_for_completions(td, NULL);
 
 		if (ret < 0)
 			break;
@@ -642,7 +678,7 @@ reap:
 		min_events = td->cur_depth;
 
 		if (min_events)
-			ret = io_u_queued_complete(td, min_events, NULL);
+			ret = io_u_queued_complete(td, min_events);
 	} else
 		cleanup_pending_aio(td);
 
@@ -716,7 +752,6 @@ static int io_complete_bytes_exceeded(struct thread_data *td)
  */
 static uint64_t do_io(struct thread_data *td)
 {
-	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
 	unsigned int i;
 	int ret = 0;
 	uint64_t total_bytes, bytes_issued = 0;
@@ -754,7 +789,7 @@ static uint64_t do_io(struct thread_data *td)
 		td->o.time_based) {
 		struct timeval comp_time;
 		struct io_u *io_u;
-		int ret2, full;
+		int full;
 		enum fio_ddir ddir;
 
 		check_update_rusage(td);
@@ -834,97 +869,35 @@ static uint64_t do_io(struct thread_data *td)
 		    !td->o.experimental_verify)
 			log_io_piece(td, io_u);
 
-		ret = td_io_queue(td, io_u);
-		switch (ret) {
-		case FIO_Q_COMPLETED:
-			if (io_u->error) {
-				ret = -io_u->error;
-				unlog_io_piece(td, io_u);
-				clear_io_u(td, io_u);
-			} else if (io_u->resid) {
-				int bytes = io_u->xfer_buflen - io_u->resid;
-				struct fio_file *f = io_u->file;
-
-				bytes_issued += bytes;
-
-				trim_io_piece(td, io_u);
-
-				/*
-				 * zero read, fail
-				 */
-				if (!bytes) {
-					unlog_io_piece(td, io_u);
-					td_verror(td, EIO, "full resid");
-					put_io_u(td, io_u);
-					break;
-				}
-
-				io_u->xfer_buflen = io_u->resid;
-				io_u->xfer_buf += bytes;
-				io_u->offset += bytes;
-
-				if (ddir_rw(io_u->ddir))
-					td->ts.short_io_u[io_u->ddir]++;
-
-				if (io_u->offset == f->real_file_size)
-					goto sync_done;
+		if (td->o.io_submit_mode == IO_MODE_OFFLOAD) {
+			if (td->error)
+				break;
+			ret = workqueue_enqueue(&td->io_wq, io_u);
+		} else {
+			ret = td_io_queue(td, io_u);
 
-				requeue_io_u(td, &io_u);
-			} else {
-sync_done:
-				if (__should_check_rate(td, DDIR_READ) ||
-				    __should_check_rate(td, DDIR_WRITE) ||
-				    __should_check_rate(td, DDIR_TRIM))
-					fio_gettime(&comp_time, NULL);
+			if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 1, &comp_time))
+				break;
 
-				ret = io_u_sync_complete(td, io_u, bytes_done);
-				if (ret < 0)
-					break;
-				bytes_issued += io_u->xfer_buflen;
-			}
-			break;
-		case FIO_Q_QUEUED:
 			/*
-			 * if the engine doesn't have a commit hook,
-			 * the io_u is really queued. if it does have such
-			 * a hook, it has to call io_u_queued() itself.
+			 * See if we need to complete some commands. Note that
+			 * we can get BUSY even without IO queued, if the
+			 * system is resource starved.
 			 */
-			if (td->io_ops->commit == NULL)
-				io_u_queued(td, io_u);
-			bytes_issued += io_u->xfer_buflen;
-			break;
-		case FIO_Q_BUSY:
-			unlog_io_piece(td, io_u);
-			requeue_io_u(td, &io_u);
-			ret2 = td_io_commit(td);
-			if (ret2 < 0)
-				ret = ret2;
-			break;
-		default:
-			assert(ret < 0);
-			put_io_u(td, io_u);
-			break;
-		}
-
-		if (break_on_this_error(td, ddir, &ret))
-			break;
-
-		/*
-		 * See if we need to complete some commands. Note that we
-		 * can get BUSY even without IO queued, if the system is
-		 * resource starved.
-		 */
 reap:
-		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
-		if (full || !td->o.iodepth_batch_complete)
-			ret = wait_for_completions(td, &comp_time, bytes_done);
+			full = queue_full(td) ||
+				(ret == FIO_Q_BUSY && td->cur_depth);
+			if (full || !td->o.iodepth_batch_complete)
+				ret = wait_for_completions(td, &comp_time);
+		}
 		if (ret < 0)
 			break;
-		if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
+		if (!ddir_rw_sum(td->bytes_done) &&
+		    !(td->io_ops->flags & FIO_NOIO))
 			continue;
 
-		if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
-			if (check_min_rate(td, &comp_time, bytes_done)) {
+		if (!in_ramp_time(td) && should_check_rate(td)) {
+			if (check_min_rate(td, &comp_time)) {
 				if (exitall_on_terminate)
 					fio_terminate_threads(td->groupid);
 				td_verror(td, EIO, "check_min_rate");
@@ -965,9 +938,14 @@ reap:
 	if (!td->error) {
 		struct fio_file *f;
 
-		i = td->cur_depth;
+		if (td->o.io_submit_mode == IO_MODE_OFFLOAD) {
+			workqueue_flush(&td->io_wq);
+			i = 0;
+		} else
+			i = td->cur_depth;
+
 		if (i) {
-			ret = io_u_queued_complete(td, i, bytes_done);
+			ret = io_u_queued_complete(td, i);
 			if (td->o.fill_device && td->error == ENOSPC)
 				td->error = 0;
 		}
@@ -992,7 +970,7 @@ reap:
 	if (!ddir_rw_sum(td->this_io_bytes))
 		td->done = 1;
 
-	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
+	return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
 }
 
 static void cleanup_io_u(struct thread_data *td)
@@ -1262,8 +1240,6 @@ static int exec_string(struct thread_options *o, const char *string, const char
  */
 static uint64_t do_dry_run(struct thread_data *td)
 {
-	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
-
 	td_set_runstate(td, TD_RUNNING);
 
 	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
@@ -1278,7 +1254,7 @@ static uint64_t do_dry_run(struct thread_data *td)
 		if (!io_u)
 			break;
 
-		io_u->flags |= IO_U_F_FLIGHT;
+		io_u_set(io_u, IO_U_F_FLIGHT);
 		io_u->error = 0;
 		io_u->resid = 0;
 		if (ddir_rw(acct_ddir(io_u)))
@@ -1294,11 +1270,34 @@ static uint64_t do_dry_run(struct thread_data *td)
 		    !td->o.experimental_verify)
 			log_io_piece(td, io_u);
 
-		ret = io_u_sync_complete(td, io_u, bytes_done);
+		ret = io_u_sync_complete(td, io_u);
 		(void) ret;
 	}
 
-	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
+	return td->bytes_done[DDIR_WRITE] + td->bytes_done[DDIR_TRIM];
+}
+
+static void io_workqueue_fn(struct thread_data *td, struct io_u *io_u)
+{
+	const enum fio_ddir ddir = io_u->ddir;
+	int ret;
+	/* Work item handler given to workqueue_init(): submit one io_u on td */
+	dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid());
+	/* put_io_u() skips put_file_log() for io_us carrying this flag */
+	io_u_set(io_u, IO_U_F_NO_FILE_PUT);
+	/* count this io_u in td's depth for the duration of the call */
+	td->cur_depth++;
+
+	ret = td_io_queue(td, io_u);
+
+	dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid());
+	/* no bytes_issued/comp_time tracking here; from_verify is 0 */
+	io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL);
+	/* if merely queued, reap one completion before returning */
+	if (ret == FIO_Q_QUEUED)
+		ret = io_u_queued_complete(td, 1);
+
+	td->cur_depth--;
 }
 
 /*
@@ -1498,6 +1497,10 @@ static void *thread_main(void *data)
 
 	fio_verify_init(td);
 
+	if ((o->io_submit_mode == IO_MODE_OFFLOAD) &&
+	    workqueue_init(td, &td->io_wq, io_workqueue_fn, td->o.iodepth))
+		goto err;
+
 	fio_gettime(&td->epoch, NULL);
 	fio_getrusage(&td->ru_start);
 	clear_state = 0;
@@ -1606,6 +1609,9 @@ static void *thread_main(void *data)
 
 	fio_writeout_logs(td);
 
+	if (o->io_submit_mode == IO_MODE_OFFLOAD)
+		workqueue_exit(&td->io_wq);
+
 	if (td->flags & TD_F_COMPRESS_LOG)
 		tp_exit(&td->tp_data);
 
diff --git a/cconv.c b/cconv.c
index 68f119f..976059c 100644
--- a/cconv.c
+++ b/cconv.c
@@ -116,6 +116,7 @@ void convert_thread_options_to_cpu(struct thread_options *o,
 	}
 
 	o->ratecycle = le32_to_cpu(top->ratecycle);
+	o->io_submit_mode = le32_to_cpu(top->io_submit_mode);
 	o->nr_files = le32_to_cpu(top->nr_files);
 	o->open_files = le32_to_cpu(top->open_files);
 	o->file_lock_mode = le32_to_cpu(top->file_lock_mode);
@@ -295,6 +296,7 @@ void convert_thread_options_to_net(struct thread_options_pack *top,
 	top->fill_device = cpu_to_le32(o->fill_device);
 	top->file_append = cpu_to_le32(o->file_append);
 	top->ratecycle = cpu_to_le32(o->ratecycle);
+	top->io_submit_mode = cpu_to_le32(o->io_submit_mode);
 	top->nr_files = cpu_to_le32(o->nr_files);
 	top->open_files = cpu_to_le32(o->open_files);
 	top->file_lock_mode = cpu_to_le32(o->file_lock_mode);
diff --git a/engines/libaio.c b/engines/libaio.c
index d4f4830..8ba21f8 100644
--- a/engines/libaio.c
+++ b/engines/libaio.c
@@ -316,7 +316,14 @@ static void fio_libaio_cleanup(struct thread_data *td)
 	struct libaio_data *ld = td->io_ops->data;
 
 	if (ld) {
-		io_destroy(ld->aio_ctx);
+		/*
+		 * Work-around to avoid huge RCU stalls at exit time. If we
+		 * don't do this here, then it'll be torn down by exit_aio().
+		 * But for that case we can parallelize the freeing, thus
+		 * speeding it up a lot.
+		 */
+		if (!(td->flags & TD_F_CHILD))
+			io_destroy(ld->aio_ctx);
 		free(ld->aio_events);
 		free(ld->iocbs);
 		free(ld->io_us);
diff --git a/examples/fixed-rate-submission.fio b/examples/fixed-rate-submission.fio
new file mode 100644
index 0000000..076a868
--- /dev/null
+++ b/examples/fixed-rate-submission.fio
@@ -0,0 +1,10 @@
+[fixed-rate-submit]
+size=128m
+rw=read
+ioengine=libaio
+iodepth=32
+direct=1
+# by setting the submit mode to offload, we can guarantee a fixed rate of
+# submission regardless of what the device completion rate is.
+io_submit_mode=offload
+rate_iops=1000
diff --git a/fio.1 b/fio.1
index 81bcf06..a77c71c 100644
--- a/fio.1
+++ b/fio.1
@@ -694,6 +694,17 @@ cost of more retrieval system calls.
 Low watermark indicating when to start filling the queue again.  Default:
 \fBiodepth\fR. 
 .TP
+.BI io_submit_mode \fR=\fPstr
+This option controls how fio submits the IO to the IO engine. The default is
+\fBinline\fR, which means that the fio job threads submit and reap IO directly.
+If set to \fBoffload\fR, the job threads will offload IO submission to a
+dedicated pool of IO threads. This requires some coordination and thus has a
+bit of extra overhead, especially for lower queue depth IO where it can
+increase latencies. The benefit is that fio can manage submission rates
+independently of the device completion rates. This avoids skewed latency
+reporting if IO gets backed up on the device side (the coordinated omission
+problem).
+.TP
 .BI direct \fR=\fPbool
 If true, use non-buffered I/O (usually O_DIRECT).  Default: false.
 .TP
diff --git a/fio.h b/fio.h
index 0fb86ea..a4637bb 100644
--- a/fio.h
+++ b/fio.h
@@ -40,6 +40,7 @@
 #include "stat.h"
 #include "flow.h"
 #include "io_u_queue.h"
+#include "workqueue.h"
 
 #ifdef CONFIG_SOLARISAIO
 #include <sys/asynch.h>
@@ -75,6 +76,8 @@ enum {
 	TD_F_NOIO		= 256,
 	TD_F_COMPRESS_LOG	= 512,
 	TD_F_VSTATE_SAVED	= 1024,
+	TD_F_NEED_LOCK		= 2048,
+	TD_F_CHILD		= 4096,
 };
 
 enum {
@@ -94,6 +97,11 @@ enum {
 	FIO_RAND_NR_OFFS,
 };
 
+enum {	/* values stored in thread_options.io_submit_mode */
+	IO_MODE_INLINE = 0,	/* job thread submits and reaps IO itself */
+	IO_MODE_OFFLOAD,	/* IO is handed to the td->io_wq workqueue */
+};
+
 /*
  * This describes a single thread/process executing a fio job.
  */
@@ -118,6 +126,8 @@ struct thread_data {
 
 	struct tp_data *tp_data;
 
+	struct thread_data *parent;
+
 	uint64_t stat_io_bytes[DDIR_RWDIR_CNT];
 	struct timeval bw_sample_time;
 
@@ -232,6 +242,11 @@ struct thread_data {
 	unsigned long rate_blocks[DDIR_RWDIR_CNT];
 	struct timeval lastrate[DDIR_RWDIR_CNT];
 
+	/*
+	 * Enforced rate submission/completion workqueue
+	 */
+	struct workqueue io_wq;
+
 	uint64_t total_io_size;
 	uint64_t fill_device_size;
 
@@ -248,10 +263,11 @@ struct thread_data {
 	uint64_t io_blocks[DDIR_RWDIR_CNT];
 	uint64_t this_io_blocks[DDIR_RWDIR_CNT];
 	uint64_t io_bytes[DDIR_RWDIR_CNT];
-	uint64_t io_skip_bytes;
 	uint64_t this_io_bytes[DDIR_RWDIR_CNT];
+	uint64_t io_skip_bytes;
 	uint64_t zone_bytes;
 	struct fio_mutex *mutex;
+	uint64_t bytes_done[DDIR_RWDIR_CNT];
 
 	/*
 	 * State for random io, a bitmap of blocks done vs not done
@@ -364,12 +380,23 @@ enum {
 	} while (0)
 
 
-#define td_clear_error(td)		\
-	(td)->error = 0;
-#define td_verror(td, err, func)	\
-	__td_verror((td), (err), strerror((err)), (func))
-#define td_vmsg(td, err, msg, func)	\
-	__td_verror((td), (err), (msg), (func))
+#define td_clear_error(td)		do {		\
+	(td)->error = 0;				\
+	if ((td)->parent)				\
+		(td)->parent->error = 0;		\
+} while (0)
+/* td_verror/td_vmsg: report err on td and, if set, on td->parent too */
+#define td_verror(td, err, func)	do {			\
+	__td_verror((td), (err), strerror((err)), (func));	\
+	if ((td)->parent)					\
+		__td_verror((td)->parent, (err), strerror((err)), (func)); \
+} while (0)
+/* NOTE: these macros expand td/err more than once; avoid side effects */
+#define td_vmsg(td, err, msg, func)	do {			\
+	__td_verror((td), (err), (msg), (func));		\
+	if ((td)->parent)					\
+		__td_verror((td)->parent, (err), (msg), (func));	\
+} while (0)
 
 #define __fio_stringify_1(x)	#x
 #define __fio_stringify(x)	__fio_stringify_1(x)
@@ -574,16 +601,15 @@ static inline int __should_check_rate(struct thread_data *td,
 	return 0;
 }
 
-static inline int should_check_rate(struct thread_data *td,
-				    uint64_t *bytes_done)
+static inline int should_check_rate(struct thread_data *td)
 {
 	int ret = 0;
 
-	if (bytes_done[DDIR_READ])
+	if (td->bytes_done[DDIR_READ])
 		ret |= __should_check_rate(td, DDIR_READ);
-	if (bytes_done[DDIR_WRITE])
+	if (td->bytes_done[DDIR_WRITE])
 		ret |= __should_check_rate(td, DDIR_WRITE);
-	if (bytes_done[DDIR_TRIM])
+	if (td->bytes_done[DDIR_TRIM])
 		ret |= __should_check_rate(td, DDIR_TRIM);
 
 	return ret;
@@ -610,25 +636,30 @@ static inline int is_power_of_2(uint64_t val)
 	return (val != 0 && ((val & (val - 1)) == 0));
 }
 
+static inline int td_async_processing(struct thread_data *td)
+{
+	return !!(td->flags & TD_F_NEED_LOCK);	/* 1 when io_u locking is on */
+}
+
 /*
  * We currently only need to do locking if we have verifier threads
  * accessing our internal structures too
  */
 static inline void td_io_u_lock(struct thread_data *td)
 {
-	if (td->o.verify_async)
+	if (td_async_processing(td))
 		pthread_mutex_lock(&td->io_u_lock);
 }
 
 static inline void td_io_u_unlock(struct thread_data *td)
 {
-	if (td->o.verify_async)
+	if (td_async_processing(td))
 		pthread_mutex_unlock(&td->io_u_lock);
 }
 
 static inline void td_io_u_free_notify(struct thread_data *td)
 {
-	if (td->o.verify_async)
+	if (td_async_processing(td))
 		pthread_cond_signal(&td->free_cond);
 }
 
diff --git a/init.c b/init.c
index a126f79..1a5d4c9 100644
--- a/init.c
+++ b/init.c
@@ -956,6 +956,9 @@ static void init_flags(struct thread_data *td)
 		td->flags |= TD_F_SCRAMBLE_BUFFERS;
 	if (o->verify != VERIFY_NONE)
 		td->flags |= TD_F_VER_NONE;
+
+	if (o->verify_async || o->io_submit_mode == IO_MODE_OFFLOAD)
+		td->flags |= TD_F_NEED_LOCK;
 }
 
 static int setup_random_seeds(struct thread_data *td)
diff --git a/io_u.c b/io_u.c
index ebd75c1..ba3f7ca 100644
--- a/io_u.c
+++ b/io_u.c
@@ -326,7 +326,7 @@ static int get_next_block(struct thread_data *td, struct io_u *io_u,
 				*is_random = 1;
 			} else {
 				*is_random = 0;
-				io_u->flags |= IO_U_F_BUSY_OK;
+				io_u_set(io_u, IO_U_F_BUSY_OK);
 				ret = get_next_seq_offset(td, f, ddir, &offset);
 				if (ret)
 					ret = get_next_rand_block(td, f, ddir, &b);
@@ -336,7 +336,7 @@ static int get_next_block(struct thread_data *td, struct io_u *io_u,
 			ret = get_next_seq_offset(td, f, ddir, &offset);
 		}
 	} else {
-		io_u->flags |= IO_U_F_BUSY_OK;
+		io_u_set(io_u, IO_U_F_BUSY_OK);
 		*is_random = 0;
 
 		if (td->o.rw_seq == RW_SEQ_SEQ) {
@@ -552,7 +552,7 @@ void io_u_quiesce(struct thread_data *td)
 	while (td->io_u_in_flight) {
 		int fio_unused ret;
 
-		ret = io_u_queued_complete(td, 1, NULL);
+		ret = io_u_queued_complete(td, 1);
 	}
 }
 
@@ -591,7 +591,8 @@ static enum fio_ddir rate_ddir(struct thread_data *td, enum fio_ddir ddir)
 	} else
 		usec = td->rate_pending_usleep[ddir];
 
-	io_u_quiesce(td);
+	if (td->o.io_submit_mode == IO_MODE_INLINE)
+		io_u_quiesce(td);
 
 	usec = usec_sleep(td, usec);
 
@@ -684,7 +685,7 @@ static void set_rw_ddir(struct thread_data *td, struct io_u *io_u)
 	    td->o.barrier_blocks &&
 	   !(td->io_issues[DDIR_WRITE] % td->o.barrier_blocks) &&
 	     td->io_issues[DDIR_WRITE])
-		io_u->flags |= IO_U_F_BARRIER;
+		io_u_set(io_u, IO_U_F_BARRIER);
 }
 
 void put_file_log(struct thread_data *td, struct fio_file *f)
@@ -697,16 +698,21 @@ void put_file_log(struct thread_data *td, struct fio_file *f)
 
 void put_io_u(struct thread_data *td, struct io_u *io_u)
 {
+	if (td->parent)
+		td = td->parent;
+
 	td_io_u_lock(td);
 
 	if (io_u->file && !(io_u->flags & IO_U_F_NO_FILE_PUT))
 		put_file_log(td, io_u->file);
 
 	io_u->file = NULL;
-	io_u->flags |= IO_U_F_FREE;
+	io_u_set(io_u, IO_U_F_FREE);
 
-	if (io_u->flags & IO_U_F_IN_CUR_DEPTH)
+	if (io_u->flags & IO_U_F_IN_CUR_DEPTH) {
 		td->cur_depth--;
+		assert(!(td->flags & TD_F_CHILD));
+	}
 	io_u_qpush(&td->io_u_freelist, io_u);
 	td_io_u_unlock(td);
 	td_io_u_free_notify(td);
@@ -714,7 +720,7 @@ void put_io_u(struct thread_data *td, struct io_u *io_u)
 
 void clear_io_u(struct thread_data *td, struct io_u *io_u)
 {
-	io_u->flags &= ~IO_U_F_FLIGHT;
+	io_u_clear(io_u, IO_U_F_FLIGHT);
 	put_io_u(td, io_u);
 }
 
@@ -725,18 +731,24 @@ void requeue_io_u(struct thread_data *td, struct io_u **io_u)
 
 	dprint(FD_IO, "requeue %p\n", __io_u);
 
+	if (td->parent)
+		td = td->parent;
+
 	td_io_u_lock(td);
 
-	__io_u->flags |= IO_U_F_FREE;
+	io_u_set(__io_u, IO_U_F_FREE);
 	if ((__io_u->flags & IO_U_F_FLIGHT) && ddir_rw(ddir))
 		td->io_issues[ddir]--;
 
-	__io_u->flags &= ~IO_U_F_FLIGHT;
-	if (__io_u->flags & IO_U_F_IN_CUR_DEPTH)
+	io_u_clear(__io_u, IO_U_F_FLIGHT);
+	if (__io_u->flags & IO_U_F_IN_CUR_DEPTH) {
 		td->cur_depth--;
+		assert(!(td->flags & TD_F_CHILD));
+	}
 
 	io_u_rpush(&td->io_u_requeues, __io_u);
 	td_io_u_unlock(td);
+	td_io_u_free_notify(td);
 	*io_u = NULL;
 }
 
@@ -1329,21 +1341,23 @@ again:
 
 	if (io_u) {
 		assert(io_u->flags & IO_U_F_FREE);
-		io_u->flags &= ~(IO_U_F_FREE | IO_U_F_NO_FILE_PUT |
+		io_u_clear(io_u, IO_U_F_FREE | IO_U_F_NO_FILE_PUT |
 				 IO_U_F_TRIMMED | IO_U_F_BARRIER |
 				 IO_U_F_VER_LIST);
 
 		io_u->error = 0;
 		io_u->acct_ddir = -1;
 		td->cur_depth++;
-		io_u->flags |= IO_U_F_IN_CUR_DEPTH;
+		assert(!(td->flags & TD_F_CHILD));
+		io_u_set(io_u, IO_U_F_IN_CUR_DEPTH);
 		io_u->ipo = NULL;
-	} else if (td->o.verify_async) {
+	} else if (td_async_processing(td)) {
 		/*
 		 * We ran out, wait for async verify threads to finish and
 		 * return one
 		 */
-		pthread_cond_wait(&td->free_cond, &td->io_u_lock);
+		assert(!(td->flags & TD_F_CHILD));
+		assert(!pthread_cond_wait(&td->free_cond, &td->io_u_lock));
 		goto again;
 	}
 
@@ -1542,7 +1556,7 @@ err_put:
 	return ERR_PTR(ret);
 }
 
-void io_u_log_error(struct thread_data *td, struct io_u *io_u)
+static void __io_u_log_error(struct thread_data *td, struct io_u *io_u)
 {
 	enum error_type_bit eb = td_error_type(io_u->ddir, io_u->error);
 
@@ -1560,6 +1574,13 @@ void io_u_log_error(struct thread_data *td, struct io_u *io_u)
 		td_verror(td, io_u->error, "io_u error");
 }
 
+void io_u_log_error(struct thread_data *td, struct io_u *io_u)
+{
+	__io_u_log_error(td, io_u);
+	if (td->parent)		/* repeat the report against the parent td */
+		__io_u_log_error(td->parent, io_u);
+}
+
 static inline int gtod_reduce(struct thread_data *td)
 {
 	return td->o.disable_clat && td->o.disable_lat && td->o.disable_slat
@@ -1570,9 +1591,10 @@ static void account_io_completion(struct thread_data *td, struct io_u *io_u,
 				  struct io_completion_data *icd,
 				  const enum fio_ddir idx, unsigned int bytes)
 {
+	const int no_reduce = !gtod_reduce(td);
 	unsigned long lusec = 0;
 
-	if (!gtod_reduce(td))
+	if (no_reduce)
 		lusec = utime_since(&io_u->issue_time, &icd->time);
 
 	if (!td->o.disable_lat) {
@@ -1601,10 +1623,13 @@ static void account_io_completion(struct thread_data *td, struct io_u *io_u,
 		io_u_mark_latency(td, lusec);
 	}
 
+	if (td->parent)
+		td = td->parent;
+
 	if (!td->o.disable_bw)
 		add_bw_sample(td, idx, bytes, &icd->time);
 
-	if (!gtod_reduce(td))
+	if (no_reduce)
 		add_iops_sample(td, idx, bytes, &icd->time);
 
 	if (td->ts.nr_block_infos && io_u->ddir == DDIR_TRIM) {
@@ -1625,6 +1650,7 @@ static long long usec_for_io(struct thread_data *td, enum fio_ddir ddir)
 {
 	uint64_t secs, remainder, bps, bytes;
 
+	assert(!(td->flags & TD_F_CHILD));
 	bytes = td->this_io_bytes[ddir];
 	bps = td->rate_bps[ddir];
 	secs = bytes / bps;
@@ -1641,9 +1667,8 @@ static void io_completed(struct thread_data *td, struct io_u **io_u_ptr,
 
 	dprint_io_u(io_u, "io complete");
 
-	td_io_u_lock(td);
 	assert(io_u->flags & IO_U_F_FLIGHT);
-	io_u->flags &= ~(IO_U_F_FLIGHT | IO_U_F_BUSY_OK);
+	io_u_clear(io_u, IO_U_F_FLIGHT | IO_U_F_BUSY_OK);
 
 	/*
 	 * Mark IO ok to verify
@@ -1660,8 +1685,6 @@ static void io_completed(struct thread_data *td, struct io_u **io_u_ptr,
 		}
 	}
 
-	td_io_u_unlock(td);
-
 	if (ddir_sync(ddir)) {
 		td->last_was_sync = 1;
 		if (f) {
@@ -1706,18 +1729,23 @@ static void io_completed(struct thread_data *td, struct io_u **io_u_ptr,
 
 		if (ramp_time_over(td) && (td->runstate == TD_RUNNING ||
 					   td->runstate == TD_VERIFYING)) {
+			struct thread_data *__td = td;
+
 			account_io_completion(td, io_u, icd, ddir, bytes);
 
-			if (__should_check_rate(td, ddir)) {
-				td->rate_pending_usleep[ddir] =
-					(usec_for_io(td, ddir) -
-					 utime_since_now(&td->start));
+			if (td->parent)
+				__td = td->parent;
+
+			if (__should_check_rate(__td, ddir)) {
+				__td->rate_pending_usleep[ddir] =
+					(usec_for_io(__td, ddir) -
+					 utime_since_now(&__td->start));
 			}
 			if (ddir != DDIR_TRIM &&
-			    __should_check_rate(td, oddir)) {
-				td->rate_pending_usleep[oddir] =
-					(usec_for_io(td, oddir) -
-					 utime_since_now(&td->start));
+			    __should_check_rate(__td, oddir)) {
+				__td->rate_pending_usleep[oddir] =
+					(usec_for_io(__td, oddir) -
+					 utime_since_now(&__td->start));
 			}
 		}
 
@@ -1785,10 +1813,10 @@ static void ios_completed(struct thread_data *td,
 /*
  * Complete a single io_u for the sync engines.
  */
-int io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
-		       uint64_t *bytes)
+int io_u_sync_complete(struct thread_data *td, struct io_u *io_u)
 {
 	struct io_completion_data icd;
+	int ddir;
 
 	init_icd(td, &icd, 1);
 	io_completed(td, &io_u, &icd);
@@ -1801,12 +1829,8 @@ int io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
 		return -1;
 	}
 
-	if (bytes) {
-		int ddir;
-
-		for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++)
-			bytes[ddir] += icd.bytes_done[ddir];
-	}
+	for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++)
+		td->bytes_done[ddir] += icd.bytes_done[ddir];
 
 	return 0;
 }
@@ -1814,12 +1838,11 @@ int io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
 /*
  * Called to complete min_events number of io for the async engines.
  */
-int io_u_queued_complete(struct thread_data *td, int min_evts,
-			 uint64_t *bytes)
+int io_u_queued_complete(struct thread_data *td, int min_evts)
 {
 	struct io_completion_data icd;
 	struct timespec *tvp = NULL;
-	int ret;
+	int ret, ddir;
 	struct timespec ts = { .tv_sec = 0, .tv_nsec = 0, };
 
 	dprint(FD_IO, "io_u_queued_completed: min=%d\n", min_evts);
@@ -1843,12 +1866,8 @@ int io_u_queued_complete(struct thread_data *td, int min_evts,
 		return -1;
 	}
 
-	if (bytes) {
-		int ddir;
-
-		for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++)
-			bytes[ddir] += icd.bytes_done[ddir];
-	}
+	for (ddir = DDIR_READ; ddir < DDIR_RWDIR_CNT; ddir++)
+		td->bytes_done[ddir] += icd.bytes_done[ddir];
 
 	return 0;
 }
diff --git a/ioengine.h b/ioengine.h
index f9a0235..3d49993 100644
--- a/ioengine.h
+++ b/ioengine.h
@@ -119,6 +119,7 @@ struct io_u {
 		struct ibv_mr *mr;
 #endif
 		void *mmap_data;
+		uint64_t null;
 	};
 };
 
@@ -209,8 +210,8 @@ extern struct io_u *get_io_u(struct thread_data *);
 extern void put_io_u(struct thread_data *, struct io_u *);
 extern void clear_io_u(struct thread_data *, struct io_u *);
 extern void requeue_io_u(struct thread_data *, struct io_u **);
-extern int __must_check io_u_sync_complete(struct thread_data *, struct io_u *, uint64_t *);
-extern int __must_check io_u_queued_complete(struct thread_data *, int, uint64_t *);
+extern int __must_check io_u_sync_complete(struct thread_data *, struct io_u *);
+extern int __must_check io_u_queued_complete(struct thread_data *, int);
 extern void io_u_queued(struct thread_data *, struct io_u *);
 extern void io_u_quiesce(struct thread_data *);
 extern void io_u_log_error(struct thread_data *, struct io_u *);
@@ -251,4 +252,14 @@ static inline enum fio_ddir acct_ddir(struct io_u *io_u)
 	return io_u->ddir;
 }
 
+/*
+ * Atomically clear the given IO_U_F_* bits in io_u->flags.
+ */
+static inline void io_u_clear(struct io_u *io_u, unsigned int flags)
+{
+	const unsigned int keep_mask = ~flags;
+
+	(void) __sync_and_and_fetch(&io_u->flags, keep_mask);
+}
+
+/*
+ * Atomically set the given IO_U_F_* bits in io_u->flags.
+ */
+static inline void io_u_set(struct io_u *io_u, unsigned int flags)
+{
+	(void) __sync_or_and_fetch(&io_u->flags, flags);
+}
+
 #endif
diff --git a/ioengines.c b/ioengines.c
index b42e2c4..b724e0e 100644
--- a/ioengines.c
+++ b/ioengines.c
@@ -264,13 +264,15 @@ out:
 
 int td_io_queue(struct thread_data *td, struct io_u *io_u)
 {
+	const enum fio_ddir ddir = acct_ddir(io_u);
+	unsigned long buflen = io_u->xfer_buflen;
 	int ret;
 
 	dprint_io_u(io_u, "queue");
 	fio_ro_check(td, io_u);
 
 	assert((io_u->flags & IO_U_F_FLIGHT) == 0);
-	io_u->flags |= IO_U_F_FLIGHT;
+	io_u_set(io_u, IO_U_F_FLIGHT);
 
 	assert(fio_file_open(io_u->file));
 
@@ -294,18 +296,18 @@ int td_io_queue(struct thread_data *td, struct io_u *io_u)
 					sizeof(struct timeval));
 	}
 
-	if (ddir_rw(acct_ddir(io_u))) {
-		td->io_issues[acct_ddir(io_u)]++;
-		td->io_issue_bytes[acct_ddir(io_u)] += io_u->xfer_buflen;
+	if (ddir_rw(ddir)) {
+		td->io_issues[ddir]++;
+		td->io_issue_bytes[ddir] += buflen;
 	}
 
 	ret = td->io_ops->queue(td, io_u);
 
 	unlock_file(td, io_u->file);
 
-	if (ret == FIO_Q_BUSY && ddir_rw(acct_ddir(io_u))) {
-		td->io_issues[acct_ddir(io_u)]--;
-		td->io_issue_bytes[acct_ddir(io_u)] -= io_u->xfer_buflen;
+	if (ret == FIO_Q_BUSY && ddir_rw(ddir)) {
+		td->io_issues[ddir]--;
+		td->io_issue_bytes[ddir] -= buflen;
 	}
 
 	/*
diff --git a/libfio.c b/libfio.c
index 57ce725..ed26114 100644
--- a/libfio.c
+++ b/libfio.c
@@ -88,6 +88,7 @@ static void reset_io_counters(struct thread_data *td)
 		td->this_io_blocks[ddir] = 0;
 		td->rate_bytes[ddir] = 0;
 		td->rate_blocks[ddir] = 0;
+		td->bytes_done[ddir] = 0;
 	}
 	td->zone_bytes = 0;
 
diff --git a/options.c b/options.c
index 5fbad31..3de1248 100644
--- a/options.c
+++ b/options.c
@@ -1623,6 +1623,26 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
 		.group	= FIO_OPT_G_IO_BASIC,
 	},
 	{
+		.name	= "io_submit_mode",
+		.lname	= "IO submit mode",
+		.type	= FIO_OPT_STR,
+		.off1	= td_var_offset(io_submit_mode),
+		.help	= "How IO submissions and completions are done",
+		.def	= "inline",
+		.category = FIO_OPT_C_IO,
+		.group	= FIO_OPT_G_IO_BASIC,
+		.posval = {
+			  { .ival = "inline",
+			    .oval = IO_MODE_INLINE,
+			    .help = "Submit and complete IO inline",
+			  },
+			  { .ival = "offload",
+			    .oval = IO_MODE_OFFLOAD,
+			    .help = "Offload submit and complete to threads",
+			  },
+		},
+	},
+	{
 		.name	= "size",
 		.lname	= "Size",
 		.type	= FIO_OPT_STR_VAL,
diff --git a/stat.c b/stat.c
index e42edc9..d143d36 100644
--- a/stat.c
+++ b/stat.c
@@ -2009,6 +2009,8 @@ void add_bw_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs,
 	if (spent < td->o.bw_avg_time)
 		return;
 
+	td_io_u_lock(td);
+
 	/*
 	 * Compute both read and write rates for the interval.
 	 */
@@ -2033,6 +2035,7 @@ void add_bw_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs,
 	}
 
 	fio_gettime(&td->bw_sample_time, NULL);
+	td_io_u_unlock(td);
 }
 
 void add_iops_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs,
@@ -2048,6 +2051,8 @@ void add_iops_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs
 	if (spent < td->o.iops_avg_time)
 		return;
 
+	td_io_u_lock(td);
+
 	/*
 	 * Compute both read and write rates for the interval.
 	 */
@@ -2072,6 +2077,7 @@ void add_iops_sample(struct thread_data *td, enum fio_ddir ddir, unsigned int bs
 	}
 
 	fio_gettime(&td->iops_sample_time, NULL);
+	td_io_u_unlock(td);
 }
 
 void stat_init(void)
diff --git a/thread_options.h b/thread_options.h
index 026b85b..aa7f3f2 100644
--- a/thread_options.h
+++ b/thread_options.h
@@ -223,6 +223,7 @@ struct thread_options {
 	unsigned int rate[DDIR_RWDIR_CNT];
 	unsigned int ratemin[DDIR_RWDIR_CNT];
 	unsigned int ratecycle;
+	unsigned int io_submit_mode;
 	unsigned int rate_iops[DDIR_RWDIR_CNT];
 	unsigned int rate_iops_min[DDIR_RWDIR_CNT];
 
@@ -452,6 +453,7 @@ struct thread_options_pack {
 	uint32_t rate[DDIR_RWDIR_CNT];
 	uint32_t ratemin[DDIR_RWDIR_CNT];
 	uint32_t ratecycle;
+	uint32_t io_submit_mode;
 	uint32_t rate_iops[DDIR_RWDIR_CNT];
 	uint32_t rate_iops_min[DDIR_RWDIR_CNT];
 
@@ -489,7 +491,6 @@ struct thread_options_pack {
 
 	uint64_t latency_target;
 	uint64_t latency_window;
-	uint32_t pad3;
 	fio_fp64_t latency_percentile;
 
 	uint32_t block_error_hist;
diff --git a/verify.c b/verify.c
index b6793d7..aa178e9 100644
--- a/verify.c
+++ b/verify.c
@@ -656,7 +656,7 @@ int verify_io_u_async(struct thread_data *td, struct io_u **io_u_ptr)
 
 	if (io_u->flags & IO_U_F_IN_CUR_DEPTH) {
 		td->cur_depth--;
-		io_u->flags &= ~IO_U_F_IN_CUR_DEPTH;
+		io_u_clear(io_u, IO_U_F_IN_CUR_DEPTH);
 	}
 	flist_add_tail(&io_u->verify_list, &td->verify_list);
 	*io_u_ptr = NULL;
@@ -1105,10 +1105,10 @@ int get_next_verify(struct thread_data *td, struct io_u *io_u)
 		io_u->buflen = ipo->len;
 		io_u->numberio = ipo->numberio;
 		io_u->file = ipo->file;
-		io_u->flags |= IO_U_F_VER_LIST;
+		io_u_set(io_u, IO_U_F_VER_LIST);
 
 		if (ipo->flags & IP_F_TRIMMED)
-			io_u->flags |= IO_U_F_TRIMMED;
+			io_u_set(io_u, IO_U_F_TRIMMED);
 
 		if (!fio_file_open(io_u->file)) {
 			int r = td_io_open_file(td, io_u->file);
@@ -1192,7 +1192,7 @@ static void *verify_async_thread(void *data)
 			io_u = flist_first_entry(&list, struct io_u, verify_list);
 			flist_del_init(&io_u->verify_list);
 
-			io_u->flags |= IO_U_F_NO_FILE_PUT;
+			io_u_set(io_u, IO_U_F_NO_FILE_PUT);
 			ret = verify_io_u(td, &io_u);
 
 			put_io_u(td, io_u);
diff --git a/workqueue.c b/workqueue.c
new file mode 100644
index 0000000..92088ba
--- /dev/null
+++ b/workqueue.c
@@ -0,0 +1,444 @@
+/*
+ * Rated submission helpers
+ *
+ * Copyright (C) 2015 Jens Axboe <axboe@xxxxxxxxx>
+ *
+ */
+#include <unistd.h>
+
+#include "fio.h"
+#include "ioengine.h"
+#include "flist.h"
+#include "workqueue.h"
+#include "lib/getrusage.h"
+
+/*
+ * State of one worker thread. A workqueue owns an array of these in
+ * wq->workers[].
+ */
+struct submit_worker {
+	pthread_t thread;		/* created in start_worker(), joined in shutdown_worker() */
+	pthread_mutex_t lock;		/* taken by the enqueue path and the worker around work_list/flags changes */
+	pthread_cond_t cond;		/* worker sleeps on this; signalled on enqueue and on exit request */
+	struct flist_head work_list;	/* pending io_us, linked through io_u->verify_list */
+	unsigned int flags;		/* SW_F_* bits */
+	unsigned int index;		/* slot of this worker in wq->workers[] */
+	uint64_t seq;			/* wq->work_seq value at the most recent enqueue to this worker */
+	struct workqueue *wq;		/* workqueue this worker belongs to */
+	struct thread_data td;		/* worker-private thread_data, filled in by init_submit_worker() */
+};
+
+/*
+ * Bits stored in submit_worker->flags.
+ */
+enum {
+	SW_F_IDLE	= 1 << 0,	/* no work pending; cleared by workqueue_enqueue() */
+	SW_F_RUNNING	= 1 << 1,	/* worker thread has returned from init_submit_worker() */
+	SW_F_EXIT	= 1 << 2,	/* workqueue_exit() asked the worker to stop */
+	SW_F_EXITED	= 1 << 3,	/* set by the worker thread right before it returns */
+	SW_F_ACCOUNTED	= 1 << 4,	/* workqueue_exit() has handed this worker to shutdown_worker() */
+	SW_F_ERROR	= 1 << 5,	/* init_submit_worker() failed */
+};
+
+/*
+ * Scan workers[start..end] (both inclusive) for an idle worker.
+ *
+ * Returns the first worker with SW_F_IDLE set, or NULL if the range holds
+ * none. While scanning, *best is updated to the busy worker with the
+ * smallest seq seen so far, so the caller has a fallback.
+ */
+static struct submit_worker *__get_submit_worker(struct workqueue *wq,
+						 unsigned int start,
+						 unsigned int end,
+						 struct submit_worker **best)
+{
+	unsigned int idx;
+
+	for (idx = start; idx <= end; idx++) {
+		struct submit_worker *cand = &wq->workers[idx];
+
+		if (cand->flags & SW_F_IDLE)
+			return cand;
+
+		if (*best == NULL || cand->seq < (*best)->seq)
+			*best = cand;
+	}
+
+	return NULL;
+}
+
+/*
+ * Choose the worker that receives the next io_u.
+ *
+ * The search starts at wq->next_free_worker and wraps around once. An idle
+ * worker wins; if none is idle, the busy worker with the lowest seq is
+ * used instead.
+ */
+static struct submit_worker *get_submit_worker(struct workqueue *wq)
+{
+	const unsigned int first = wq->next_free_worker;
+	struct submit_worker *lowest_seq = NULL;
+	struct submit_worker *chosen;
+
+	assert(first < wq->max_workers);
+
+	/* [first, max_workers) first, then wrap to [0, first) */
+	chosen = __get_submit_worker(wq, first, wq->max_workers - 1,
+				     &lowest_seq);
+	if (chosen == NULL && first != 0)
+		chosen = __get_submit_worker(wq, 0, first - 1, &lowest_seq);
+
+	/*
+	 * No truly idle found, use best match
+	 */
+	if (chosen == NULL)
+		chosen = lowest_seq;
+
+	/* move the search hint past the worker we are handing out */
+	if (chosen->index == wq->next_free_worker) {
+		const unsigned int following = chosen->index + 1;
+
+		wq->next_free_worker =
+			following < wq->max_workers ? following : 0;
+	}
+
+	return chosen;
+}
+
+/*
+ * Returns 1 when every worker of wq has SW_F_IDLE set, 0 otherwise.
+ */
+static int all_sw_idle(struct workqueue *wq)
+{
+	unsigned int idx = 0;
+
+	while (idx < wq->max_workers) {
+		if ((wq->workers[idx].flags & SW_F_IDLE) == 0)
+			return 0;
+		idx++;
+	}
+
+	return 1;
+}
+
+/*
+ * Block until every worker has marked itself SW_F_IDLE.
+ *
+ * wake_idle is raised for the duration of the wait; worker_thread() checks
+ * it when going idle and signals flush_cond if it is set.
+ *
+ * Must be serialized wrt workqueue_enqueue() by caller
+ *
+ * NOTE(review): the idle check runs before flush_lock is taken, and the
+ * worker signals flush_cond without holding flush_lock, so a wakeup that
+ * lands between the check and pthread_cond_wait() looks like it could be
+ * missed - confirm.
+ */
+void workqueue_flush(struct workqueue *wq)
+{
+	wq->wake_idle = 1;
+
+	while (!all_sw_idle(wq)) {
+		pthread_mutex_lock(&wq->flush_lock);
+		pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
+		pthread_mutex_unlock(&wq->flush_lock);
+	}
+
+	wq->wake_idle = 0;
+}
+
+/*
+ * Queue io_u on a worker and wake that worker up.
+ *
+ * For read/write/trim style directions (ddir_rw()), the parent's issue
+ * counters are bumped here, before the io_u is handed over.
+ *
+ * Returns FIO_Q_QUEUED once the io_u is on a worker's list, FIO_Q_BUSY if
+ * no worker could be picked.
+ *
+ * Must be serialized by caller.
+ */
+int workqueue_enqueue(struct workqueue *wq, struct io_u *io_u)
+{
+	struct submit_worker *worker = get_submit_worker(wq);
+	enum fio_ddir ddir;
+
+	if (!worker)
+		return FIO_Q_BUSY;
+
+	ddir = acct_ddir(io_u);
+	if (ddir_rw(ddir)) {
+		struct thread_data *parent = wq->td;
+
+		parent->io_issues[ddir]++;
+		parent->io_issue_bytes[ddir] += io_u->xfer_buflen;
+	}
+
+	/* list, sequence stamp and idle bit change together under the lock */
+	pthread_mutex_lock(&worker->lock);
+	flist_add_tail(&io_u->verify_list, &worker->work_list);
+	worker->seq = ++wq->work_seq;
+	worker->flags &= ~SW_F_IDLE;
+	pthread_mutex_unlock(&worker->lock);
+
+	pthread_cond_signal(&worker->cond);
+	return FIO_Q_QUEUED;
+}
+
+/*
+ * Drain list: unlink each io_u (chained via verify_list) from the front
+ * and pass it to the workqueue callback with this worker's thread_data.
+ */
+static void handle_list(struct submit_worker *sw, struct flist_head *list)
+{
+	struct thread_data *worker_td = &sw->td;
+
+	while (!flist_empty(list)) {
+		struct io_u *head;
+
+		head = flist_first_entry(list, struct io_u, verify_list);
+		flist_del_init(&head->verify_list);
+		sw->wq->fn(worker_td, head);
+	}
+}
+
+/*
+ * Prepare the worker-private thread_data: copy options and stats from the
+ * parent, duplicate its files, load and initialize an ioengine, and link
+ * the result to the parent as a TD_F_CHILD.
+ *
+ * Returns 0 on success, 1 if loading or initializing the ioengine failed.
+ */
+static int init_submit_worker(struct submit_worker *sw)
+{
+	struct thread_data *parent = sw->wq->td;
+	struct thread_data *td = &sw->td;
+
+	memcpy(&td->o, &parent->o, sizeof(td->o));
+	memcpy(&td->ts, &parent->ts, sizeof(td->ts));
+	/* NOTE(review): -1U presumably means "no uid/gid switch" - confirm */
+	td->o.uid = td->o.gid = -1U;
+	dup_files(td, parent);
+	fio_options_mem_dupe(td);
+
+	if (ioengine_load(td))
+		goto err;
+
+	if (td->o.odirect)
+		td->io_ops->flags |= FIO_RAWIO;
+
+	td->pid = gettid();
+
+	INIT_FLIST_HEAD(&td->io_log_list);
+	INIT_FLIST_HEAD(&td->io_hist_list);
+	INIT_FLIST_HEAD(&td->verify_list);
+	INIT_FLIST_HEAD(&td->trim_list);
+	INIT_FLIST_HEAD(&td->next_rand_list);
+	td->io_hist_tree = RB_ROOT;
+
+	/* the worker's engine is always initialized with a depth of one */
+	td->o.iodepth = 1;
+	if (td_io_init(td))
+		goto err_io_init;
+
+	fio_gettime(&td->epoch, NULL);
+	fio_getrusage(&td->ru_start);
+	clear_io_state(td);
+
+	td_set_runstate(td, TD_RUNNING);
+	td->flags |= TD_F_CHILD;
+	td->parent = parent;
+	return 0;
+
+err_io_init:
+	close_ioengine(td);
+err:
+	return 1;
+}
+
+/*
+ * Atomically add *src to *dst, then reset *src to zero. Nothing is
+ * touched when *src is already zero.
+ */
+static void sum_val(uint64_t *dst, uint64_t *src)
+{
+	const uint64_t pending = *src;
+
+	if (!pending)
+		return;
+
+	__sync_fetch_and_add(dst, pending);
+	*src = 0;
+}
+
+/*
+ * Fold the per-direction byte and block counters of src into dst for the
+ * given data direction. Each source counter is zeroed by sum_val() once it
+ * has been added.
+ */
+static void sum_ddir(struct thread_data *dst, struct thread_data *src,
+		     enum fio_ddir ddir)
+{
+	sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]);
+	sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]);
+	sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]);
+	sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]);
+	sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]);
+}
+
+/*
+ * Move the worker's IO counters into the parent thread_data, one data
+ * direction at a time, for each direction the worker is doing
+ * (td_read / td_write / td_trim).
+ */
+static void update_accounting(struct submit_worker *sw)
+{
+	struct thread_data *worker_td = &sw->td;
+	struct thread_data *parent_td = sw->wq->td;
+
+	if (td_read(worker_td))
+		sum_ddir(parent_td, worker_td, DDIR_READ);
+
+	if (td_write(worker_td))
+		sum_ddir(parent_td, worker_td, DDIR_WRITE);
+
+	if (td_trim(worker_td))
+		sum_ddir(parent_td, worker_td, DDIR_TRIM);
+}
+
+/*
+ * Body of a worker thread; data is the worker's struct submit_worker.
+ *
+ * After initializing its private thread_data, the worker loops: it takes
+ * everything off work_list, runs the workqueue callback on each io_u, and
+ * sleeps on sw->cond when there is nothing to do. It leaves the loop when
+ * SW_F_EXIT is set and the work list is empty. Always returns NULL.
+ */
+static void *worker_thread(void *data)
+{
+	struct submit_worker *sw = data;
+	struct workqueue *wq = sw->wq;
+	struct thread_data *td = &sw->td;
+	unsigned int eflags = 0, ret;
+	FLIST_HEAD(local_list);
+
+	/* publish the init result: RUNNING always, ERROR on failure */
+	ret = init_submit_worker(sw);
+	pthread_mutex_lock(&sw->lock);
+	sw->flags |= SW_F_RUNNING;
+	if (ret)
+		sw->flags |= SW_F_ERROR;
+	pthread_mutex_unlock(&sw->lock);
+
+	/* wake workqueue_init(), which waits on flush_cond for startup */
+	pthread_mutex_lock(&wq->flush_lock);
+	pthread_cond_signal(&wq->flush_cond);
+	pthread_mutex_unlock(&wq->flush_lock);
+
+	if (sw->flags & SW_F_ERROR)
+		goto done;
+
+	while (1) {
+		pthread_mutex_lock(&sw->lock);
+
+		if (flist_empty(&sw->work_list)) {
+			if (sw->flags & SW_F_EXIT) {
+				pthread_mutex_unlock(&sw->lock);
+				break;
+			}
+
+			/* IO still outstanding: quiesce it with the lock dropped */
+			if (td->io_u_queued || td->cur_depth ||
+			    td->io_u_in_flight) {
+				pthread_mutex_unlock(&sw->lock);
+				io_u_quiesce(td);
+				pthread_mutex_lock(&sw->lock);
+			}
+
+			/*
+			 * We dropped and reacquired the lock, check
+			 * state again.
+			 */
+			if (!flist_empty(&sw->work_list))
+				goto handle_work;
+
+			if (sw->flags & SW_F_EXIT) {
+				pthread_mutex_unlock(&sw->lock);
+				break;
+			} else if (!(sw->flags & SW_F_IDLE)) {
+				/* going idle: advertise this slot, wake a flusher */
+				sw->flags |= SW_F_IDLE;
+				wq->next_free_worker = sw->index;
+				if (wq->wake_idle)
+					pthread_cond_signal(&wq->flush_cond);
+			}
+			update_accounting(sw);
+			pthread_cond_wait(&sw->cond, &sw->lock);
+		} else {
+handle_work:
+			/* take the whole pending list in one go */
+			flist_splice_init(&sw->work_list, &local_list);
+		}
+		pthread_mutex_unlock(&sw->lock);
+		/* run the callbacks without holding sw->lock */
+		handle_list(sw, &local_list);
+	}
+
+	update_accounting(sw);
+
+done:
+	/* eflags is never changed from 0 in this function */
+	pthread_mutex_lock(&sw->lock);
+	sw->flags |= (SW_F_EXITED | eflags);
+	pthread_mutex_unlock(&sw->lock);
+	return NULL;
+}
+
+/*
+ * Release what a worker holds: its duplicated options and files, its
+ * ioengine (if one is loaded), and its condition variable and mutex. The
+ * worker's thread_data is left in the TD_EXITED run state.
+ */
+static void free_worker(struct submit_worker *sw)
+{
+	struct thread_data *td = &sw->td;
+
+	fio_options_free(td);
+	close_and_free_files(td);
+	/* io_ops is only non-NULL if an engine is still attached */
+	if (td->io_ops)
+		close_ioengine(td);
+	td_set_runstate(td, TD_EXITED);
+
+	pthread_cond_destroy(&sw->cond);
+	pthread_mutex_destroy(&sw->lock);
+}
+
+/*
+ * Join the worker thread, add its thread stats to the parent's, and free
+ * it. *sum_cnt is incremented and the new value is passed on to
+ * sum_thread_stats().
+ */
+static void shutdown_worker(struct submit_worker *sw, unsigned int *sum_cnt)
+{
+	struct thread_data *parent = sw->wq->td;
+	unsigned int nr;
+
+	pthread_join(sw->thread, NULL);
+
+	nr = ++(*sum_cnt);
+	sum_thread_stats(&parent->ts, &sw->td.ts, nr);
+
+	free_worker(sw);
+}
+
+/*
+ * Stop and tear down all workers of wq, then release the worker array and
+ * the flush mutex/condition variable.
+ */
+void workqueue_exit(struct workqueue *wq)
+{
+	unsigned int shutdown, sum_cnt = 0;
+	struct submit_worker *sw;
+	int i;
+
+	/* first pass: ask every worker to exit and wake it up */
+	for (i = 0; i < wq->max_workers; i++) {
+		sw = &wq->workers[i];
+
+		pthread_mutex_lock(&sw->lock);
+		sw->flags |= SW_F_EXIT;
+		pthread_cond_signal(&sw->cond);
+		pthread_mutex_unlock(&sw->lock);
+	}
+
+	/*
+	 * Join and free each worker exactly once; SW_F_ACCOUNTED marks the
+	 * ones already handled. shutdown counts workers handled per pass.
+	 */
+	do {
+		shutdown = 0;
+		for (i = 0; i < wq->max_workers; i++) {
+			sw = &wq->workers[i];
+			if (sw->flags & SW_F_ACCOUNTED)
+				continue;
+			sw->flags |= SW_F_ACCOUNTED;
+			shutdown_worker(sw, &sum_cnt);
+			shutdown++;
+		}
+	} while (shutdown && shutdown != wq->max_workers);
+
+	free(wq->workers);
+	pthread_mutex_destroy(&wq->flush_lock);
+	pthread_cond_destroy(&wq->flush_cond);
+}
+
+/*
+ * Initialize worker slot 'index' of wq and start its thread.
+ *
+ * Returns 0 if the thread was created, 1 if pthread_create() failed (the
+ * slot is passed to free_worker() in that case).
+ */
+static int start_worker(struct workqueue *wq, unsigned int index)
+{
+	struct submit_worker *sw = &wq->workers[index];
+
+	INIT_FLIST_HEAD(&sw->work_list);
+	pthread_cond_init(&sw->cond, NULL);
+	pthread_mutex_init(&sw->lock, NULL);
+	sw->wq = wq;
+	sw->index = index;
+
+	/*
+	 * Mark the worker idle before its thread exists. Assigning flags
+	 * after pthread_create() can overwrite the SW_F_RUNNING/SW_F_ERROR
+	 * bits the new thread may already have set, which would leave
+	 * workqueue_init() waiting for a state it can never observe.
+	 */
+	sw->flags = SW_F_IDLE;
+
+	if (!pthread_create(&sw->thread, NULL, worker_thread, sw))
+		return 0;
+
+	free_worker(sw);
+	return 1;
+}
+
+/*
+ * Set up wq for parent thread td and start up to max_pending workers that
+ * run fn on each queued io_u.
+ *
+ * If only some workers can be started, the workqueue continues with that
+ * smaller number. The call waits until every started worker has finished
+ * its own initialization.
+ *
+ * Returns 0 on success. Returns 1, after tearing the workqueue down again
+ * with workqueue_exit(), if the worker array cannot be allocated, no
+ * worker can be started, or a worker fails to initialize.
+ */
+int workqueue_init(struct thread_data *td, struct workqueue *wq,
+		   workqueue_fn *fn, unsigned max_pending)
+{
+	unsigned int running;
+	int i, error;
+
+	wq->max_workers = max_pending;
+	wq->td = td;
+	wq->fn = fn;
+	wq->work_seq = 0;
+	wq->next_free_worker = 0;
+	/* workers read wake_idle when going idle; don't leave it unset */
+	wq->wake_idle = 0;
+	pthread_cond_init(&wq->flush_cond, NULL);
+	pthread_mutex_init(&wq->flush_lock, NULL);
+
+	wq->workers = calloc(wq->max_workers, sizeof(struct submit_worker));
+	if (!wq->workers) {
+		/*
+		 * No worker array: make sure workqueue_exit() has nothing
+		 * to walk, then share the common failure path.
+		 */
+		wq->max_workers = 0;
+		goto err;
+	}
+
+	for (i = 0; i < wq->max_workers; i++)
+		if (start_worker(wq, i))
+			break;
+
+	/* continue with however many workers actually started */
+	wq->max_workers = i;
+	if (!wq->max_workers) {
+err:
+		log_err("Can't create rate workqueue\n");
+		td_verror(td, ESRCH, "workqueue_init");
+		workqueue_exit(wq);
+		return 1;
+	}
+
+	/*
+	 * Wait for them all to be started and initialized
+	 */
+	error = 0;
+	do {
+		struct submit_worker *sw;
+
+		running = 0;
+		pthread_mutex_lock(&wq->flush_lock);
+		for (i = 0; i < wq->max_workers; i++) {
+			sw = &wq->workers[i];
+			pthread_mutex_lock(&sw->lock);
+			if (sw->flags & SW_F_RUNNING)
+				running++;
+			if (sw->flags & SW_F_ERROR)
+				error++;
+			pthread_mutex_unlock(&sw->lock);
+		}
+
+		if (error || running == wq->max_workers) {
+			pthread_mutex_unlock(&wq->flush_lock);
+			break;
+		}
+
+		/* each worker signals flush_cond once its init has finished */
+		pthread_cond_wait(&wq->flush_cond, &wq->flush_lock);
+		pthread_mutex_unlock(&wq->flush_lock);
+	} while (1);
+
+	if (error)
+		goto err;
+
+	return 0;
+}
diff --git a/workqueue.h b/workqueue.h
new file mode 100644
index 0000000..5d47a5e
--- /dev/null
+++ b/workqueue.h
@@ -0,0 +1,29 @@
+#ifndef FIO_RATE_H
+#define FIO_RATE_H
+
+#include <pthread.h>
+#include <stdint.h>
+
+#include "flist.h"
+
+/*
+ * Only pointers to these types are used in this header, so forward
+ * declarations keep it usable without pulling in their definitions.
+ */
+struct thread_data;
+struct io_u;
+struct submit_worker;
+
+/* Callback a worker thread runs for each io_u queued to it. */
+typedef void (workqueue_fn)(struct thread_data *, struct io_u *);
+
+struct workqueue {
+	/* number of workers in workers[] */
+	unsigned int max_workers;
+
+	/* parent thread_data that owns this workqueue */
+	struct thread_data *td;
+	workqueue_fn *fn;
+
+	/* incremented on every enqueue; stamps the chosen worker's seq */
+	uint64_t work_seq;
+	struct submit_worker *workers;
+	/* index at which the search for an idle worker starts */
+	unsigned int next_free_worker;
+
+	/* used to wait for worker startup and for workqueue_flush() */
+	pthread_cond_t flush_cond;
+	pthread_mutex_t flush_lock;
+	/* non-zero while workqueue_flush() is waiting for idle workers */
+	volatile int wake_idle;
+};
+
+/* Start the workers; returns 0 on success, 1 on failure. */
+int workqueue_init(struct thread_data *td, struct workqueue *wq, workqueue_fn *fn, unsigned int max_workers);
+/* Stop all workers and release the workqueue's resources. */
+void workqueue_exit(struct workqueue *wq);
+
+/* Queue io_u on a worker; returns FIO_Q_QUEUED or FIO_Q_BUSY. */
+int workqueue_enqueue(struct workqueue *wq, struct io_u *io_u);
+/* Wait until every worker is idle. */
+void workqueue_flush(struct workqueue *wq);
+
+#endif
--
To unsubscribe from this list: send the line "unsubscribe fio" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [Linux Kernel]     [Linux SCSI]     [Linux IDE]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux SCSI]

  Powered by Linux