The following changes since commit 856b09c838bfd45a9d6d87a1ab03458c9b058d56: Merge branch 'rand-map' (2012-11-29 21:56:06 +0100) are available in the git repository at: git://git.kernel.dk/fio.git master Jens Axboe (7): net: sent udp open messages verify: treat as failure if given verify type is different from media net: add basic ping/pong type workload support net: fix receiver start time Cache layout improvements Change preferred default clocksource to gettimeofday() Wire up SIGUSR2 to kill blocking threads backend.c | 2 +- engines/net.c | 165 +++++++++++++++++++++++++++++++++++++++++++++------------ fio.h | 13 ++++- init.c | 20 +++++++ io_u.c | 42 +++++++++----- ioengine.h | 22 ++++---- libfio.c | 8 ++-- os/os.h | 2 +- profile.c | 4 +- verify.c | 12 ++++- 10 files changed, 220 insertions(+), 70 deletions(-) --- Diff of recent changes: diff --git a/backend.c b/backend.c index f901503..faa861c 100644 --- a/backend.c +++ b/backend.c @@ -1385,7 +1385,7 @@ static void reap_threads(unsigned int *nr_running, unsigned int *t_rate, if (WIFSIGNALED(status)) { int sig = WTERMSIG(status); - if (sig != SIGTERM) + if (sig != SIGTERM && sig != SIGUSR2) log_err("fio: pid=%d, got signal=%d\n", (int) td->pid, sig); td->sig = sig; diff --git a/engines/net.c b/engines/net.c index 373821b..81e173c 100644 --- a/engines/net.c +++ b/engines/net.c @@ -33,6 +33,7 @@ struct netio_options { unsigned int port; unsigned int proto; unsigned int listen; + unsigned int pingpong; }; struct udp_close_msg { @@ -42,7 +43,8 @@ struct udp_close_msg { enum { FIO_LINK_CLOSE = 0x89, - FIO_LINK_CLOSE_MAGIC = 0x6c696e6b, + FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b, + FIO_LINK_OPEN = 0x98, FIO_TYPE_TCP = 1, FIO_TYPE_UDP = 2, @@ -94,6 +96,12 @@ static struct fio_option options[] = { .help = "Listen for incoming TCP connections", }, { + .name = "pingpong", + .type = FIO_OPT_STR_SET, + .off1 = offsetof(struct netio_options, pingpong), + .help = "Ping-pong IO requests", + }, + { .name = NULL, }, }; @@ -287,7 +295,7 @@ static int fio_netio_send(struct thread_data *td, struct io_u *io_u) { struct netio_data *nd = td->io_ops->data; struct netio_options *o = td->eo; - int ret, flags = OS_MSG_DONTWAIT; + int ret, flags = 0; do { if (o->proto == FIO_TYPE_UDP) { @@ -301,8 +309,8 @@ static int fio_netio_send(struct thread_data *td, struct io_u *io_u) * if we are going to write more, set MSG_MORE */ #ifdef MSG_MORE - if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < - td->o.size) + if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < + td->o.size) && !o->pingpong) flags |= MSG_MORE; #endif ret = send(io_u->file->fd, io_u->xfer_buf, @@ -314,8 +322,6 @@ static int fio_netio_send(struct thread_data *td, struct io_u *io_u) ret = poll_wait(td, io_u->file->fd, POLLOUT); if (ret <= 0) break; - - flags &= ~OS_MSG_DONTWAIT; } while (1); return ret; @@ -329,7 +335,7 @@ static int is_udp_close(struct io_u *io_u, int len) return 0; msg = io_u->xfer_buf; - if (ntohl(msg->magic) != FIO_LINK_CLOSE_MAGIC) + if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC) return 0; if (ntohl(msg->cmd) != FIO_LINK_CLOSE) return 0; @@ -341,7 +347,7 @@ static int fio_netio_recv(struct thread_data *td, struct io_u *io_u) { struct netio_data *nd = td->io_ops->data; struct netio_options *o = td->eo; - int ret, flags = OS_MSG_DONTWAIT; + int ret, flags = 0; do { if (o->proto == FIO_TYPE_UDP) { @@ -366,28 +372,26 @@ static int fio_netio_recv(struct thread_data *td, struct io_u *io_u) ret = poll_wait(td, io_u->file->fd, POLLIN); if (ret <= 0) break; - flags &= ~OS_MSG_DONTWAIT; flags |= MSG_WAITALL; } while (1); return ret; } -static int fio_netio_queue(struct thread_data *td, struct io_u *io_u) +static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u, + enum fio_ddir ddir) { struct netio_data *nd = td->io_ops->data; struct netio_options *o = td->eo; int ret; - fio_ro_check(td, io_u); - - if (io_u->ddir == DDIR_WRITE) { + if (ddir == DDIR_WRITE) { if (!nd->use_splice || o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_UNIX) ret = fio_netio_send(td, io_u); else ret = fio_netio_splice_out(td, io_u); - } else if (io_u->ddir == DDIR_READ) { + } else if (ddir == DDIR_READ) { if (!nd->use_splice || o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_UNIX) ret = fio_netio_recv(td, io_u); @@ -404,7 +408,7 @@ static int fio_netio_queue(struct thread_data *td, struct io_u *io_u) } else { int err = errno; - if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE) + if (ddir == DDIR_WRITE && err == EMSGSIZE) return FIO_Q_BUSY; io_u->error = err; @@ -417,6 +421,28 @@ static int fio_netio_queue(struct thread_data *td, struct io_u *io_u) return FIO_Q_COMPLETED; } +static int fio_netio_queue(struct thread_data *td, struct io_u *io_u) +{ + struct netio_options *o = td->eo; + int ret; + + fio_ro_check(td, io_u); + + ret = __fio_netio_queue(td, io_u, io_u->ddir); + if (!o->pingpong || ret != FIO_Q_COMPLETED) + return ret; + + /* + * For ping-pong mode, receive or send reply as needed + */ + if (td_read(td) && io_u->ddir == DDIR_READ) + ret = __fio_netio_queue(td, io_u, DDIR_WRITE); + else if (td_write(td) && io_u->ddir == DDIR_WRITE) + ret = __fio_netio_queue(td, io_u, DDIR_READ); + + return ret; +} + static int fio_netio_connect(struct thread_data *td, struct fio_file *f) { struct netio_data *nd = td->io_ops->data; @@ -496,6 +522,7 @@ static int fio_netio_accept(struct thread_data *td, struct fio_file *f) goto err; } + reset_all_stats(td); td_set_runstate(td, state); return 0; err: @@ -503,21 +530,6 @@ err: return 1; } -static int fio_netio_open_file(struct thread_data *td, struct fio_file *f) -{ - int ret; - struct netio_options *o = td->eo; - - if (o->listen) - ret = fio_netio_accept(td, f); - else - ret = fio_netio_connect(td, f); - - if (ret) - f->fd = -1; - return ret; -} - static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f) { struct netio_data *nd = td->io_ops->data; @@ -525,7 +537,7 @@ static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f) struct sockaddr *to = (struct sockaddr *) &nd->addr; int ret; - msg.magic = htonl(FIO_LINK_CLOSE_MAGIC); + msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC); msg.cmd = htonl(FIO_LINK_CLOSE); ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, @@ -548,6 +560,84 @@ static int fio_netio_close_file(struct thread_data *td, struct fio_file *f) return generic_close_file(td, f); } +static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f) +{ + struct netio_data *nd = td->io_ops->data; + struct udp_close_msg msg; + struct sockaddr *to = (struct sockaddr *) &nd->addr; + fio_socklen_t len = sizeof(nd->addr); + int ret; + + ret = recvfrom(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, &len); + if (ret < 0) { + td_verror(td, errno, "sendto udp link open"); + return ret; + } + + if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC || + ntohl(msg.cmd) != FIO_LINK_OPEN) { + log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic), + ntohl(msg.cmd)); + return -1; + } + + return 0; +} + +static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f) +{ + struct netio_data *nd = td->io_ops->data; + struct udp_close_msg msg; + struct sockaddr *to = (struct sockaddr *) &nd->addr; + int ret; + + msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC); + msg.cmd = htonl(FIO_LINK_OPEN); + + ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, + sizeof(nd->addr)); + if (ret < 0) { + td_verror(td, errno, "sendto udp link open"); + return ret; + } + + return 0; +} + +static int fio_netio_open_file(struct thread_data *td, struct fio_file *f) +{ + int ret; + struct netio_options *o = td->eo; + + if (o->listen) + ret = fio_netio_accept(td, f); + else + ret = fio_netio_connect(td, f); + + if (ret) { + f->fd = -1; + return ret; + } + + if (o->proto == FIO_TYPE_UDP) { + if (td_write(td)) + ret = fio_netio_udp_send_open(td, f); + else { + int state; + + state = td->runstate; + td_set_runstate(td, TD_SETTING_UP); + ret = fio_netio_udp_recv_open(td, f); + td_set_runstate(td, state); + } + } + + if (ret) + fio_netio_close_file(td, f); + + return ret; +} + static int fio_netio_setup_connect_inet(struct thread_data *td, const char *host, unsigned short port) { @@ -791,6 +881,11 @@ static int fio_netio_setup(struct thread_data *td) return 0; } +static void fio_netio_terminate(struct thread_data *td) +{ + kill(td->pid, SIGUSR2); +} + #ifdef FIO_HAVE_SPLICE static int fio_netio_setup_splice(struct thread_data *td) { @@ -819,11 +914,12 @@ static struct ioengine_ops ioengine_splice = { .init = fio_netio_init, .cleanup = fio_netio_cleanup, .open_file = fio_netio_open_file, - .close_file = generic_close_file, + .close_file = fio_netio_close_file, + .terminate = fio_netio_terminate, .options = options, .option_struct_size = sizeof(struct netio_options), .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | - FIO_SIGTERM | FIO_PIPEIO, + FIO_PIPEIO, }; #endif @@ -837,10 +933,11 @@ static struct ioengine_ops ioengine_rw = { .cleanup = fio_netio_cleanup, .open_file = fio_netio_open_file, .close_file = fio_netio_close_file, + .terminate = fio_netio_terminate, .options = options, .option_struct_size = sizeof(struct netio_options), .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | - FIO_SIGTERM | FIO_PIPEIO, + FIO_PIPEIO, }; static int str_hostname_cb(void *data, const char *input) diff --git a/fio.h b/fio.h index 0100e3d..58364aa 100644 --- a/fio.h +++ b/fio.h @@ -309,11 +309,22 @@ struct thread_options { unsigned int sync_file_range; }; +enum { + TD_F_VER_BACKLOG = 1, + TD_F_TRIM_BACKLOG = 2, + TD_F_READ_IOLOG = 4, + TD_F_REFILL_BUFFERS = 8, + TD_F_SCRAMBLE_BUFFERS = 16, + TD_F_VER_NONE = 32, + TD_F_PROFILE_OPS = 64, +}; + /* * This describes a single thread/process executing a fio job. */ struct thread_data { struct thread_options o; + unsigned long flags; void *eo; char verror[FIO_VERROR_SIZE]; pthread_t thread; @@ -683,8 +694,8 @@ enum { TD_NOT_CREATED = 0, TD_CREATED, TD_INITIALIZED, - TD_SETTING_UP, TD_RAMP, + TD_SETTING_UP, TD_RUNNING, TD_PRE_READING, TD_VERIFYING, diff --git a/init.c b/init.c index 29a50f2..563dcb7 100644 --- a/init.c +++ b/init.c @@ -767,6 +767,24 @@ int ioengine_load(struct thread_data *td) return 0; } +static void init_flags(struct thread_data *td) +{ + struct thread_options *o = &td->o; + + if (o->verify_backlog) + td->flags |= TD_F_VER_BACKLOG; + if (o->trim_backlog) + td->flags |= TD_F_TRIM_BACKLOG; + if (o->read_iolog_file) + td->flags |= TD_F_READ_IOLOG; + if (o->refill_buffers) + td->flags |= TD_F_REFILL_BUFFERS; + if (o->scramble_buffers) + td->flags |= TD_F_SCRAMBLE_BUFFERS; + if (o->verify != VERIFY_NONE) + td->flags |= TD_F_VER_NONE; +} + /* * Adds a job to the list of things todo. Sanitizes the various options * to make sure we don't have conflicts, and initializes various @@ -787,6 +805,8 @@ static int add_job(struct thread_data *td, const char *jobname, int job_add_num) if (td == &def_thread) return 0; + init_flags(td); + /* * if we are just dumping the output command line, don't add the job */ diff --git a/io_u.c b/io_u.c index 006f2c9..91d1290 100644 --- a/io_u.c +++ b/io_u.c @@ -290,10 +290,12 @@ static int __get_next_offset(struct thread_data *td, struct io_u *io_u) static int get_next_offset(struct thread_data *td, struct io_u *io_u) { - struct prof_io_ops *ops = &td->prof_io_ops; + if (td->flags & TD_F_PROFILE_OPS) { + struct prof_io_ops *ops = &td->prof_io_ops; - if (ops->fill_io_u_off) - return ops->fill_io_u_off(td, io_u); + if (ops->fill_io_u_off) + return ops->fill_io_u_off(td, io_u); + } return __get_next_offset(td, io_u); } @@ -368,10 +370,12 @@ static unsigned int __get_next_buflen(struct thread_data *td, struct io_u *io_u) static unsigned int get_next_buflen(struct thread_data *td, struct io_u *io_u) { - struct prof_io_ops *ops = &td->prof_io_ops; + if (td->flags & TD_F_PROFILE_OPS) { + struct prof_io_ops *ops = &td->prof_io_ops; - if (ops->fill_io_u_size) - return ops->fill_io_u_size(td, io_u); + if (ops->fill_io_u_size) + return ops->fill_io_u_size(td, io_u); + } return __get_next_buflen(td, io_u); } @@ -960,10 +964,12 @@ out: static struct fio_file *get_next_file(struct thread_data *td) { - struct prof_io_ops *ops = &td->prof_io_ops; + if (!(td->flags & TD_F_PROFILE_OPS)) { + struct prof_io_ops *ops = &td->prof_io_ops; - if (ops->get_next_file) - return ops->get_next_file(td); + if (ops->get_next_file) + return ops->get_next_file(td); + } return __get_next_file(td); } @@ -1040,7 +1046,10 @@ again: static int check_get_trim(struct thread_data *td, struct io_u *io_u) { - if (td->o.trim_backlog && td->trim_entries) { + if (!(td->flags & TD_F_TRIM_BACKLOG)) + return 0; + + if (td->trim_entries) { int get_trim = 0; if (td->trim_batch) { @@ -1063,7 +1072,10 @@ static int check_get_trim(struct thread_data *td, struct io_u *io_u) static int check_get_verify(struct thread_data *td, struct io_u *io_u) { - if (td->o.verify_backlog && td->io_hist_len) { + if (!(td->flags & TD_F_VER_BACKLOG)) + return 0; + + if (td->io_hist_len) { int get_verify = 0; if (td->verify_batch) @@ -1154,7 +1166,7 @@ struct io_u *get_io_u(struct thread_data *td) /* * If using an iolog, grab next piece if any available. */ - if (td->o.read_iolog_file) { + if (td->flags & TD_F_READ_IOLOG) { if (read_iolog_get(td, io_u)) goto err_put; } else if (set_io_u_file(td, io_u)) { @@ -1175,12 +1187,12 @@ struct io_u *get_io_u(struct thread_data *td) f->last_pos = io_u->offset + io_u->buflen; if (io_u->ddir == DDIR_WRITE) { - if (td->o.refill_buffers) { + if (td->flags & TD_F_REFILL_BUFFERS) { io_u_fill_buffer(td, io_u, io_u->xfer_buflen, io_u->xfer_buflen); - } else if (td->o.scramble_buffers) + } else if (td->flags & TD_F_SCRAMBLE_BUFFERS) do_scramble = 1; - if (td->o.verify != VERIFY_NONE) { + if (td->flags & TD_F_VER_NONE) { populate_verify_io_u(td, io_u); do_scramble = 0; } diff --git a/ioengine.h b/ioengine.h index 61cb396..997f90a 100644 --- a/ioengine.h +++ b/ioengine.h @@ -1,7 +1,7 @@ #ifndef FIO_IOENGINE_H #define FIO_IOENGINE_H -#define FIO_IOOPS_VERSION 13 +#define FIO_IOOPS_VERSION 14 enum { IO_U_F_FREE = 1 << 0, @@ -45,12 +45,16 @@ struct io_u { struct timeval start_time; struct timeval issue_time; + struct fio_file *file; + unsigned int flags; + enum fio_ddir ddir; + /* * Allocated/set buffer and length */ - void *buf; unsigned long buflen; unsigned long long offset; + void *buf; /* * Initial seed for generating the buffer contents @@ -73,8 +77,6 @@ struct io_u { unsigned int resid; unsigned int error; - enum fio_ddir ddir; - /* * io engine private data */ @@ -84,10 +86,6 @@ struct io_u { void *engine_data; }; - unsigned int flags; - - struct fio_file *file; - struct flist_head list; /* @@ -122,6 +120,7 @@ struct ioengine_ops { int (*open_file)(struct thread_data *, struct fio_file *); int (*close_file)(struct thread_data *, struct fio_file *); int (*get_file_size)(struct thread_data *, struct fio_file *); + void (*terminate)(struct thread_data *); int option_struct_size; struct fio_option *options; void *data; @@ -136,10 +135,9 @@ enum fio_ioengine_flags { FIO_NODISKUTIL = 1 << 4, /* diskutil can't handle filename */ FIO_UNIDIR = 1 << 5, /* engine is uni-directional */ FIO_NOIO = 1 << 6, /* thread does only pseudo IO */ - FIO_SIGTERM = 1 << 7, /* needs SIGTERM to exit */ - FIO_PIPEIO = 1 << 8, /* input/output no seekable */ - FIO_BARRIER = 1 << 9, /* engine supports barriers */ - FIO_MEMALIGN = 1 << 10, /* engine wants aligned memory */ + FIO_PIPEIO = 1 << 7, /* input/output no seekable */ + FIO_BARRIER = 1 << 8, /* engine supports barriers */ + FIO_MEMALIGN = 1 << 9, /* engine wants aligned memory */ }; /* diff --git a/libfio.c b/libfio.c index ee5a0ea..96ae814 100644 --- a/libfio.c +++ b/libfio.c @@ -177,15 +177,15 @@ void fio_terminate_threads(int group_id) /* * if the thread is running, just let it exit */ - if (!td->pid) + if (!td->pid || pid == td->pid) continue; else if (td->runstate < TD_RAMP) kill(td->pid, SIGTERM); - else if (pid != td->pid) { + else { struct ioengine_ops *ops = td->io_ops; - if (ops && (ops->flags & FIO_SIGTERM)) - kill(td->pid, SIGTERM); + if (ops && ops->terminate) + ops->terminate(td); } } } diff --git a/os/os.h b/os/os.h index 2e4764e..1d3a750 100644 --- a/os/os.h +++ b/os/os.h @@ -135,7 +135,7 @@ typedef unsigned long os_cpu_mask_t; #endif #ifndef FIO_PREFERRED_CLOCK_SOURCE -#define FIO_PREFERRED_CLOCK_SOURCE CS_CGETTIME +#define FIO_PREFERRED_CLOCK_SOURCE CS_GTOD #endif #ifndef FIO_MAX_JOBS diff --git a/profile.c b/profile.c index 855dde3..506462e 100644 --- a/profile.c +++ b/profile.c @@ -93,8 +93,10 @@ void profile_add_hooks(struct thread_data *td) if (!ops) return; - if (ops->io_ops) + if (ops->io_ops) { td->prof_io_ops = *ops->io_ops; + td->flags |= TD_F_PROFILE_OPS; + } } int profile_td_init(struct thread_data *td) diff --git a/verify.c b/verify.c index f246dc8..c0485d5 100644 --- a/verify.c +++ b/verify.c @@ -690,6 +690,7 @@ int verify_io_u(struct thread_data *td, struct io_u *io_u) .hdr_num = hdr_num, .td = td, }; + unsigned int verify_type; if (ret && td->o.verify_fatal) break; @@ -708,7 +709,12 @@ int verify_io_u(struct thread_data *td, struct io_u *io_u) return EILSEQ; } - switch (hdr->verify_type) { + if (td->o.verify != VERIFY_NONE) + verify_type = td->o.verify; + else + verify_type = hdr->verify_type; + + switch (verify_type) { case VERIFY_MD5: ret = verify_io_u_md5(hdr, &vc); break; @@ -747,6 +753,10 @@ int verify_io_u(struct thread_data *td, struct io_u *io_u) log_err("Bad verify type %u\n", hdr->verify_type); ret = EINVAL; } + + if (ret && verify_type != hdr->verify_type) + log_err("fio: verify type mismatch (%u media, %u given)\n", + hdr->verify_type, verify_type); } done: -- To unsubscribe from this list: send the line "unsubscribe fio" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html