A hack to support zero copy readv, only supports glfs_preadv_async() now.

From: Bharata B Rao <bharata@xxxxxxxxxxxxxxxxxx>
---
 api/src/glfs-fops.c                           |   40 +++++++
 api/src/glfs.h                                |    2 +
 libglusterfs/src/call-stub.c                  |   71 +++++++++++++
 libglusterfs/src/call-stub.h                  |   18 +++
 libglusterfs/src/globals.c                    |    1 +
 libglusterfs/src/glusterfs.h                  |    1 +
 libglusterfs/src/syncop.c                     |   53 ++++++++++
 libglusterfs/src/syncop.h                     |    4 +
 libglusterfs/src/xlator.h                     |   20 ++++
 rpc/rpc-lib/src/protocol-common.h             |    1 +
 rpc/rpc-lib/src/rpc-clnt.c                    |   20 ++++
 rpc/rpc-lib/src/rpc-clnt.h                    |    2 +
 rpc/rpc-lib/src/rpc-transport.h               |    3 +
 rpc/rpc-transport/socket/src/socket.c         |  133 ++++++++++++++++++++++++-
 xlators/cluster/dht/src/dht-common.h          |    7 +
 xlators/cluster/dht/src/dht-inode-read.c      |   97 ++++++++++++++++++
 xlators/cluster/dht/src/dht.c                 |    1 +
 xlators/debug/error-gen/src/error-gen.c       |   49 +++++++++
 xlators/debug/io-stats/src/io-stats.c         |   50 +++++++++
 xlators/performance/md-cache/src/md-cache.c   |   46 +++++++++
 xlators/protocol/client/src/client-rpc-fops.c |  133 +++++++++++++++++++++++++
 xlators/protocol/client/src/client.c          |   41 ++++++++
 xlators/protocol/client/src/client.h          |    2 +
 xlators/protocol/server/src/server-rpc-fops.c |  133 +++++++++++++++++++++++++
 24 files changed, 922 insertions(+), 6 deletions(-)

diff --git a/api/src/glfs-fops.c b/api/src/glfs-fops.c
index be26dc1..b977a4b 100644
--- a/api/src/glfs-fops.c
+++ b/api/src/glfs-fops.c
@@ -344,6 +344,37 @@ glfs_preadv (struct glfs_fd *glfd, const struct iovec *iovec, int iovcnt,
 	return size;
 }
 
+/*
+ * Zero-copy variant of glfs_preadv(): the transport reads the reply
+ * payload straight into the caller-supplied @iovec, so unlike
+ * glfs_preadv() no iov_copy() is needed.
+ *
+ * Returns the number of bytes read, or -1 with errno set on failure.
+ */
+ssize_t
+glfs_preadv_zcopy (struct glfs_fd *glfd, const struct iovec *iovec, int iovcnt,
+		   off_t offset, int flags)
+{
+	xlator_t       *subvol = NULL;
+	int             ret = -1;
+	struct iobref  *iobref = NULL;
+
+	__glfs_entry_fd (glfd);
+
+	subvol = glfs_fd_subvol (glfd);
+
+	/* syncop_readv_zcopy() takes a non-const iovec */
+	ret = syncop_readv_zcopy (subvol, glfd->fd, offset, 0,
+				  (struct iovec *) iovec, iovcnt, &iobref);
+	if (ret > 0)
+		glfd->offset = (offset + ret);
+
+	if (iobref)
+		iobref_unref (iobref);
+
+	return ret;
+}
+
 
 ssize_t
 glfs_read (struct glfs_fd *glfd, void *buf, size_t count, int flags)
@@ -425,6 +456,10 @@ glfs_io_async_task (void *data)
 		ret = glfs_preadv (gio->glfd, gio->iov, gio->count,
 				   gio->offset, gio->flags);
 		break;
+	case GF_FOP_READ_ZCOPY:
+		ret = glfs_preadv_zcopy (gio->glfd, gio->iov, gio->count,
+					 gio->offset, gio->flags);
+		break;
 	case GF_FOP_WRITE:
 		ret = glfs_pwritev (gio->glfd, gio->iov, gio->count,
 				    gio->offset, gio->flags);
@@ -464,7 +499,10 @@ glfs_preadv_async (struct glfs_fd *glfd, const struct iovec *iovec, int count,
 		return -1;
 	}
 
-	gio->op     = GF_FOP_READ;
+	if (flags & GLUSTERFS_READV_ZCOPY)
+		gio->op = GF_FOP_READ_ZCOPY;
+	else
+		gio->op = GF_FOP_READ;
 	gio->glfd   = glfd;
 	gio->count  = count;
 	gio->offset = offset;
diff --git a/api/src/glfs.h b/api/src/glfs.h
index e19c1cd..435fe45 100644
--- a/api/src/glfs.h
+++ b/api/src/glfs.h
@@ -48,6 +48,8 @@ __BEGIN_DECLS
 struct glfs;
 typedef struct glfs glfs_t;
 
+/* flags for readv variants (currently honoured only by glfs_preadv_async()) */
+#define GLUSTERFS_READV_ZCOPY 0x1
 
 /*
   SYNOPSIS
diff --git a/libglusterfs/src/call-stub.c b/libglusterfs/src/call-stub.c
index 7bf8613..3f74eb6 100644
--- a/libglusterfs/src/call-stub.c
+++ b/libglusterfs/src/call-stub.c
@@ -918,6 +918,66 @@ out:
         return stub;
 }
 
+call_stub_t *
+fop_readv_zcopy_stub (call_frame_t *frame, fop_readv_zcopy_t fn,
+                      fd_t *fd, struct iovec *vector, int32_t count, off_t off,
+                      uint32_t flags, dict_t *xdata)
+{
+        call_stub_t *stub = NULL;
+
+        GF_VALIDATE_OR_GOTO ("call-stub", frame, out);
+
+        stub = stub_new (frame, 1, GF_FOP_READ_ZCOPY);
+        GF_VALIDATE_OR_GOTO ("call-stub", stub, out);
+
+        stub->fn.readv_zcopy = fn;
+        if (fd)
+                stub->args.fd = fd_ref (fd);
+        /* call_resume_wind() re-winds with args.vector/args.count; only
+           the iovec array is duplicated, the buffers stay the caller's */
+        stub->args.vector = iov_dup (vector, count);
+        stub->args.count = count;
+        stub->args.size = iov_length (vector, count);
+        stub->args.offset = off;
+        stub->args.flags = flags;
+
+        if (xdata)
+                stub->args.xdata = dict_ref (xdata);
+out:
+        return stub;
+}
+
+
+call_stub_t *
+fop_readv_zcopy_cbk_stub (call_frame_t *frame, fop_readv_zcopy_cbk_t fn,
+                          int32_t op_ret, int32_t op_errno,
+                          struct iovec *vector, int32_t count,
+                          struct iatt *stbuf,
+                          struct iobref *iobref, dict_t *xdata)
+{
+        call_stub_t *stub = NULL;
+
+        GF_VALIDATE_OR_GOTO ("call-stub", frame, out);
+
+        stub = stub_new (frame, 0, GF_FOP_READ_ZCOPY);
+        GF_VALIDATE_OR_GOTO ("call-stub", stub, out);
+
+        stub->fn_cbk.readv_zcopy = fn;
+        stub->args_cbk.op_ret = op_ret;
+        stub->args_cbk.op_errno = op_errno;
+        if (op_ret >= 0) {
+                stub->args_cbk.vector = iov_dup (vector, count);
+                stub->args_cbk.count = count;
+                stub->args_cbk.stat = *stbuf;
+                if (iobref)
+                        stub->args_cbk.iobref = iobref_ref (iobref);
+        }
+        if (xdata)
+                stub->args_cbk.xdata = dict_ref (xdata);
+out:
+        return stub;
+}
+
 
 call_stub_t *
 fop_writev_stub (call_frame_t *frame, fop_writev_t fn,
@@ -2204,6 +2264,12 @@ call_resume_wind (call_stub_t *stub)
                               stub->args.offset, stub->args.flags,
                               stub->args.xdata);
                 break;
+        case GF_FOP_READ_ZCOPY:
+                stub->fn.readv_zcopy (stub->frame, stub->frame->this,
+                                      stub->args.fd, stub->args.vector,
+                                      stub->args.count, stub->args.offset,
+                                      stub->args.flags, stub->args.xdata);
+                break;
         case GF_FOP_WRITE:
                 stub->fn.writev (stub->frame, stub->frame->this,
                                  stub->args.fd, stub->args.vector,
@@ -2439,6 +2505,11 @@ call_resume_unwind (call_stub_t *stub)
                              stub->args_cbk.count, &stub->args_cbk.stat,
                              stub->args_cbk.iobref, stub->args_cbk.xdata);
                 break;
+        case GF_FOP_READ_ZCOPY:
+                STUB_UNWIND (stub, readv_zcopy, stub->args_cbk.vector,
+                             stub->args_cbk.count, &stub->args_cbk.stat,
+                             stub->args_cbk.iobref, stub->args_cbk.xdata);
+                break;
         case GF_FOP_WRITE:
                 STUB_UNWIND (stub, writev, &stub->args_cbk.prestat,
                              &stub->args_cbk.poststat, stub->args_cbk.xdata);
diff --git a/libglusterfs/src/call-stub.h b/libglusterfs/src/call-stub.h
index 3351118..ad03f6b 100644
--- a/libglusterfs/src/call-stub.h
+++ b/libglusterfs/src/call-stub.h
@@ -69,6 +69,7 @@ typedef struct {
                 fop_fxattrop_t fxattrop;
                 fop_setattr_t setattr;
                 fop_fsetattr_t fsetattr;
+                fop_readv_zcopy_t readv_zcopy;
         } fn;
 
         union {
@@ -113,6 +114,7 @@ typedef struct {
                 fop_fxattrop_cbk_t fxattrop;
                 fop_setattr_cbk_t setattr;
                 fop_fsetattr_cbk_t fsetattr;
+                fop_readv_zcopy_cbk_t readv_zcopy;
         } fn_cbk;
 
         struct {
@@ -410,6 +412,22 @@ fop_readv_cbk_stub (call_frame_t *frame,
                     struct iobref *iobref, dict_t *xdata);
 
 call_stub_t *
+fop_readv_zcopy_stub (call_frame_t *frame,
+                      fop_readv_zcopy_t fn,
+                      fd_t *fd, struct iovec *vector,
+                      int32_t count,
+                      off_t off, uint32_t flags, dict_t *xdata);
+
+call_stub_t *
+fop_readv_zcopy_cbk_stub (call_frame_t *frame,
+                          fop_readv_zcopy_cbk_t fn,
+                          int32_t op_ret,
+                          int32_t op_errno,
+                          struct iovec *vector, int32_t count,
+                          struct iatt *stbuf,
+                          struct iobref *iobref, dict_t *xdata);
+
+call_stub_t *
 fop_writev_stub (call_frame_t *frame,
                  fop_writev_t fn,
                  fd_t *fd,
diff --git a/libglusterfs/src/globals.c b/libglusterfs/src/globals.c
index 05ff52c..8bb6458 100644
--- a/libglusterfs/src/globals.c
+++ b/libglusterfs/src/globals.c
@@ -67,6 +67,7 @@ const char *gf_fop_list[GF_FOP_MAXVALUE] = {
         [GF_FOP_RELEASE]     = "RELEASE",
         [GF_FOP_RELEASEDIR]  = "RELEASEDIR",
         [GF_FOP_FREMOVEXATTR]= "FREMOVEXATTR",
+        [GF_FOP_READ_ZCOPY]  = "READ_ZCOPY",
 };
 
 /* THIS */
diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h
index 74e6847..ef6067a 100644
--- a/libglusterfs/src/glusterfs.h
+++ b/libglusterfs/src/glusterfs.h
@@ -196,6 +196,7 @@ typedef enum {
         GF_FOP_RELEASEDIR,
         GF_FOP_GETSPEC,
         GF_FOP_FREMOVEXATTR,
+        GF_FOP_READ_ZCOPY,
         GF_FOP_MAXVALUE,
 } glusterfs_fop_t;
 
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index c996b8f..0596188 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -1095,6 +1095,59 @@ out:
 }
 
 
+int32_t
+syncop_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                        int32_t op_ret, int32_t op_errno,
+                        struct iovec *vector, int32_t count,
+                        struct iatt *stbuf, struct iobref *iobref,
+                        dict_t *xdata)
+{
+        struct syncargs *args = NULL;
+
+        args = cookie;
+
+        args->op_ret   = op_ret;
+        args->op_errno = op_errno;
+
+        /* The data already sits in the caller's iovec; only keep a ref on
+           the reply iobref (if any) so syncop_readv_zcopy() can hand it
+           back to its caller. */
+        if (op_ret >= 0 && iobref)
+                args->iobref = iobref_ref (iobref);
+
+        __wake (args);
+
+        return 0;
+}
+
+/*
+ * Synchronous READ_ZCOPY: the reply payload is placed in @vector/@count.
+ * On success, if @iobref is non-NULL, *@iobref receives the reply iobref
+ * (possibly NULL), which the caller must release with iobref_unref().
+ * Returns op_ret, with errno set from op_errno.
+ */
+int
+syncop_readv_zcopy (xlator_t *subvol, fd_t *fd, off_t off,
+                    uint32_t flags, struct iovec *vector, int count,
+                    struct iobref **iobref)
+{
+        struct syncargs args = {0, };
+
+        SYNCOP (subvol, (&args), syncop_readv_zcopy_cbk,
+                subvol->fops->readv_zcopy, fd, vector, count, off, flags, NULL);
+
+        if (args.op_ret < 0)
+                goto out;
+
+        if (iobref)
+                *iobref = args.iobref;
+        else if (args.iobref)
+                iobref_unref (args.iobref);
+out:
+        errno = args.op_errno;
+        return args.op_ret;
+}
+
 int
 syncop_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                    int op_ret, int op_errno, struct iatt *prebuf,
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
index 001c68f..61c4222 100644
--- a/libglusterfs/src/syncop.h
+++ b/libglusterfs/src/syncop.h
@@ -252,6 +252,10 @@ int syncop_readv (xlator_t *subvol, fd_t *fd, size_t size, off_t off,
                   uint32_t flags,
                   /* out */
                   struct iovec **vector, int *count, struct iobref **iobref);
+int syncop_readv_zcopy (xlator_t *subvol, fd_t *fd, off_t off,
+                        uint32_t flags,
+                        /* out */
+                        struct iovec *vector, int count, struct iobref **iobref);
 int syncop_ftruncate (xlator_t *subvol, fd_t *fd, off_t offset);
 int syncop_truncate (xlator_t *subvol, loc_t *loc, off_t offset);
 
diff --git a/libglusterfs/src/xlator.h b/libglusterfs/src/xlator.h
index 1e21b63..30884c3 100644
--- a/libglusterfs/src/xlator.h
+++ b/libglusterfs/src/xlator.h
@@ -261,6 +261,16 @@ typedef int32_t (*fop_readv_cbk_t) (call_frame_t *frame,
                                     struct iatt *stbuf,
                                     struct iobref *iobref, dict_t *xdata);
 
+typedef int32_t (*fop_readv_zcopy_cbk_t) (call_frame_t *frame,
+                                          void *cookie,
+                                          xlator_t *this,
+                                          int32_t op_ret,
+                                          int32_t op_errno,
+                                          struct iovec *vector,
+                                          int32_t count,
+                                          struct iatt *stbuf,
+                                          struct iobref *iobref, dict_t *xdata);
+
 typedef int32_t (*fop_writev_cbk_t) (call_frame_t *frame,
                                      void *cookie,
                                      xlator_t *this,
@@ -501,6 +511,14 @@ typedef int32_t (*fop_readv_t) (call_frame_t *frame,
                                 off_t offset,
                                 uint32_t flags, dict_t *xdata);
 
+typedef int32_t (*fop_readv_zcopy_t) (call_frame_t *frame,
+                                      xlator_t *this,
+                                      fd_t *fd,
+                                      struct iovec *vector,
+                                      int32_t count,
+                                      off_t offset,
+                                      uint32_t flags, dict_t *xdata);
+
 typedef int32_t (*fop_writev_t) (call_frame_t *frame,
                                  xlator_t *this,
                                  fd_t *fd,
@@ -678,6 +696,7 @@ struct xlator_fops {
         fop_setattr_t        setattr;
         fop_fsetattr_t       fsetattr;
         fop_getspec_t        getspec;
+        fop_readv_zcopy_t    readv_zcopy;
 
         /* these entries are used for a typechecking hack in STACK_WIND _only_ */
         fop_lookup_cbk_t        lookup_cbk;
@@ -722,6 +741,7 @@ struct xlator_fops {
         fop_setattr_cbk_t       setattr_cbk;
         fop_fsetattr_cbk_t      fsetattr_cbk;
         fop_getspec_cbk_t       getspec_cbk;
+        fop_readv_zcopy_cbk_t   readv_zcopy_cbk;
 };
 
 typedef int32_t (*cbk_forget_t) (xlator_t *this,
diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h
index 97017e5..e65d806 100644
--- a/rpc/rpc-lib/src/protocol-common.h
+++ b/rpc/rpc-lib/src/protocol-common.h
@@ -56,6 +56,7 @@ enum gf_fop_procnum {
         GFS3_OP_RELEASE,
         GFS3_OP_RELEASEDIR,
         GFS3_OP_FREMOVEXATTR,
+        GFS3_OP_READ_ZCOPY,
         GFS3_OP_MAXVALUE,
 } ;
 
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index e6c681d..212f4c7 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -466,6 +466,8 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
         info->progver = saved_frame.rpcreq->prog->progver;
         info->rpc_req = saved_frame.rpcreq;
         info->rsp     = saved_frame.rsp;
+        info->rsp_payload = saved_frame.rpcreq->rsp_payload;
+        info->rsp_payload_count = saved_frame.rpcreq->rsp_payload_count;
 
         ret = 0;
 out:
@@ -1437,6 +1439,24 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
         rpcreq->xid = callid;
         rpcreq->cbkfn = cbkfn;
 
+        /* Keep a private copy of the response iovec in the request: for
+         * READ_ZCOPY the transport reads the reply payload into these
+         * buffers via rpc_request_info.  rsp_payload[] is fixed-size, so
+         * fail requests that do not fit rather than overflow it.
+         */
+        if ((rsp_payload_count < 0) ||
+            (rsp_payload_count > (int) (sizeof (rpcreq->rsp_payload) /
+                                        sizeof (rpcreq->rsp_payload[0])))) {
+                gf_log ("rpc-clnt", GF_LOG_ERROR,
+                        "too many response vectors (%d)", rsp_payload_count);
+                ret = -1;
+                goto out;
+        }
+        if (rsp_payload && (rsp_payload_count > 0))
+                memcpy (rpcreq->rsp_payload, rsp_payload,
+                        rsp_payload_count * sizeof (struct iovec));
+        rpcreq->rsp_payload_count = rsp_payload ? rsp_payload_count : 0;
+
         ret = -1;
 
         if (proghdr) {
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index 0da1655..710951e 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -161,6 +161,8 @@ struct rpc_req {
         int                    procnum;
         fop_cbk_fn_t           cbkfn;
         void                  *conn_private;
+        struct iovec           rsp_payload[256]; /* TODO: Allocate this */
+        int32_t                rsp_payload_count;
 };
 
 typedef struct rpc_clnt {
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index 272de9d..8bc2d18 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -158,6 +158,9 @@ struct rpc_request_info {
         int                  procnum;
         void                *rpc_req; /* struct rpc_req */
         rpc_transport_rsp_t  rsp;
+        /* TODO: This should ideally reside in @rsp above */
+        struct iovec        *rsp_payload;
+        int32_t              rsp_payload_count;
 };
 typedef struct rpc_request_info rpc_request_info_t;
 
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index fffc137..1a7302e 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -378,6 +378,10 @@ __socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount
                 goto uncached;
         }
 
+        /* TODO: read-ahead caching is bypassed for every read, presumably
+         * so that READ_ZCOPY payloads are not staged in the ra buffer. */
+        goto uncached;
+
         if (!in->ra_max) {
                 /* first call after passing SP_STATE_READING_FRAGHDR */
                 in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX);
@@ -1165,6 +1169,101 @@ out:
         return ret;
 }
 
+/*
+ * Shorten the iovec @vector/@count in place so that it covers at most
+ * @limit bytes.
+ */
+static inline void
+__socket_clamp_vector (struct iovec *vector, int *count, size_t limit)
+{
+        int    i     = 0;
+        size_t total = 0;
+
+        for (i = 0; i < *count; i++) {
+                if (vector[i].iov_len >= limit - total) {
+                        vector[i].iov_len = limit - total;
+                        *count = i + 1;
+                        break;
+                }
+                total += vector[i].iov_len;
+        }
+}
+
+/*
+ * READ_ZCOPY counterpart of __socket_read_simple_msg(): read the rest of
+ * the current fragment directly into in->pending_vector, which
+ * __socket_read_accepted_successful_reply() points at the request's copy
+ * of the caller's iovec, instead of into an iobuf.
+ */
+static inline int
+__socket_read_simple_msg_zcopy (rpc_transport_t *this)
+{
+        int                           ret            = 0;
+        uint32_t                      remaining_size = 0;
+        size_t                        bytes_read     = 0;
+        socket_private_t             *priv           = NULL;
+        struct gf_sock_incoming      *in             = NULL;
+        struct gf_sock_incoming_frag *frag           = NULL;
+
+        GF_VALIDATE_OR_GOTO ("socket", this, out);
+        GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+        priv = this->private;
+
+        in = &priv->incoming;
+        frag = &in->frag;
+
+        switch (frag->simple_state) {
+
+        case SP_STATE_SIMPLE_MSG_INIT:
+                frag->simple_state = SP_STATE_READING_SIMPLE_MSG;
+
+                /* fall through */
+
+        case SP_STATE_READING_SIMPLE_MSG:
+                ret = 0;
+
+                remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+
+                if (remaining_size > 0) {
+                        /* A short read (e.g. at EOF) carries fewer bytes
+                         * than the caller's iovec holds; never let the
+                         * socket read spill into the next record. */
+                        __socket_clamp_vector (in->pending_vector,
+                                               &in->pending_count,
+                                               remaining_size);
+                        ret = __socket_readv (this,
+                                              in->pending_vector,
+                                              in->pending_count,
+                                              &in->pending_vector,
+                                              &in->pending_count,
+                                              &bytes_read);
+                }
+
+                if (ret == -1) {
+                        gf_log (this->name, GF_LOG_WARNING,
+                                "reading from socket failed. Error (%s), "
+                                "peer (%s)", strerror (errno),
+                                this->peerinfo.identifier);
+                        break;
+                }
+
+                frag->bytes_read += bytes_read;
+
+                if (ret > 0) {
+                        gf_log (this->name, GF_LOG_TRACE,
+                                "partial read on non-blocking socket.");
+                        break;
+                }
+
+                if (ret == 0) {
+                        frag->simple_state = SP_STATE_SIMPLE_MSG_INIT;
+                }
+        }
+
+out:
+        return ret;
+}
 
 static inline int
 __socket_read_simple_request (rpc_transport_t *this)
@@ -1510,6 +1609,17 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
 
         case SP_STATE_READ_PROC_OPAQUE:
         read_proc_opaque:
+                /* Initialize in->pending_vector with user supplied iovec */
+                if (in->request_info &&
+                    in->request_info->procnum == GFS3_OP_READ_ZCOPY) {
+                        in->pending_vector = in->request_info->rsp_payload;
+                        in->pending_count = in->request_info->rsp_payload_count;
+                        frag->call_body.reply.accepted_success_state
+                                = SP_STATE_READ_PROC_HEADER;
+
+                        /* fall through */
+
+                } else {
                 if (in->payload_vector.iov_base == NULL) {
 
                         size = (RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read);
@@ -1541,12 +1651,17 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)
 
                 frag->call_body.reply.accepted_success_state
                         = SP_STATE_READ_PROC_HEADER;
-
+                }
                 /* fall through */
 
         case SP_STATE_READ_PROC_HEADER:
                 /* now read the entire remaining msg into new iobuf */
-                ret = __socket_read_simple_msg (this);
+                if (in->request_info &&
+                    in->request_info->procnum == GFS3_OP_READ_ZCOPY) {
+                        ret = __socket_read_simple_msg_zcopy (this);
+                } else {
+                        ret = __socket_read_simple_msg (this);
+                }
                 if ((ret == -1) ||
                     ((ret == 0) && RPC_LASTFRAG (in->fraghdr))) {
                         frag->call_body.reply.accepted_success_state
@@ -1797,7 +1912,8 @@ __socket_read_reply (rpc_transport_t *this)
         }
 
         if ((request_info->prognum == GLUSTER_FOP_PROGRAM)
-            && (request_info->procnum == GF_FOP_READ)) {
+            && (request_info->procnum == GF_FOP_READ ||
+                request_info->procnum == GFS3_OP_READ_ZCOPY)) {
                 if (map_xid && request_info->rsp.rsp_payload_count != 0) {
                         in->iobref = iobref_ref (request_info->rsp.rsp_iobref);
                         in->payload_vector = *request_info->rsp.rsp_payload;
@@ -1976,7 +2092,14 @@ __socket_proto_state_machine (rpc_transport_t *this,
                         /* fall through */
 
                 case SP_STATE_READ_FRAGHDR:
-
+                        /*
+                         * TODO:
+                         * IIUC, Memory is allocated for entire payload here,
+                         * but in case of readv, payload memory is allocated
+                         * again in __socket_read_accepted_successful_reply().
+                         * The latter one is actually used to return data to
+                         * the caller.
+                         */
                         in->fraghdr = ntoh32 (in->fraghdr);
                         in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr);
                         iobuf = iobuf_get2 (this->ctx->iobuf_pool,
@@ -2059,6 +2182,8 @@ __socket_proto_state_machine (rpc_transport_t *this,
                                 in->request_info = NULL;
                         }
                         in->record_state = SP_STATE_COMPLETE;
+                        in->pending_vector = NULL;
+                        in->pending_count = 0;
                         break;
 
                 case SP_STATE_COMPLETE:
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index bd00089..e0271f4 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -560,6 +560,13 @@ int32_t dht_readv (call_frame_t *frame,
                    size_t size,
                    off_t offset, uint32_t flags, dict_t *xdata);
 
+int32_t dht_readv_zcopy (call_frame_t *frame,
+                         xlator_t *this,
+                         fd_t *fd,
+                         struct iovec *vector,
+                         int32_t count,
+                         off_t offset, uint32_t flags, dict_t *xdata);
+
 int32_t dht_writev (call_frame_t *frame,
                     xlator_t *this,
                     fd_t *fd,
diff --git a/xlators/cluster/dht/src/dht-inode-read.c b/xlators/cluster/dht/src/dht-inode-read.c
index f17cb73..9438c99 100644
--- a/xlators/cluster/dht/src/dht-inode-read.c
+++ b/xlators/cluster/dht/src/dht-inode-read.c
@@ -420,6 +420,56 @@ out:
 }
 
 int
+dht_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                     int op_ret, int op_errno,
+                     struct iovec *vector, int count,
+                     struct iatt *stbuf,
+                     struct iobref *iobref, dict_t *xdata)
+{
+        dht_local_t *local = NULL;
+        int          ret   = 0;
+
+        local = frame->local;
+        if (!local) {
+                op_ret = -1;
+                op_errno = EINVAL;
+                goto out;
+        }
+
+        /* This is already second try, no need for re-check */
+        if (local->call_cnt != 1)
+                goto out;
+
+        if ((op_ret == -1) && (op_errno != ENOENT))
+                goto out;
+
+        if ((op_ret == -1) || IS_DHT_MIGRATION_PHASE2 (stbuf)) {
+                /* File would be migrated to other node */
+                /* NOTE(review): dht_readv2 presumably re-winds a plain
+                   readv, so after a migration the data would not land in
+                   the caller's iovec; a zcopy-aware retry is still TODO. */
+                ret = fd_ctx_get (local->fd, this, NULL);
+                if (ret) {
+                        local->rebalance.target_op_fn = dht_readv2;
+                        ret = dht_rebalance_complete_check (this, frame);
+                } else {
+                        /* value is already set in fd_ctx, that means no need
+                           to check for whether its complete or not. */
+                        dht_readv2 (this, frame, 0);
+                }
+                if (!ret)
+                        return 0;
+        }
+
+out:
+        DHT_STRIP_PHASE1_FLAGS (stbuf);
+        DHT_STACK_UNWIND (readv_zcopy, frame, op_ret, op_errno, vector,
+                          count, stbuf, iobref, xdata);
+
+        return 0;
+}
+
+int
 dht_readv2 (xlator_t *this, call_frame_t *frame, int op_ret)
 {
         dht_local_t *local = NULL;
@@ -493,6 +543,53 @@ err:
 }
 
 int
+dht_readv_zcopy (call_frame_t *frame, xlator_t *this,
+                 fd_t *fd, struct iovec *vector, int32_t count, off_t off,
+                 uint32_t flags, dict_t *xdata)
+{
+        xlator_t     *subvol   = NULL;
+        int           op_errno = -1;
+        dht_local_t  *local    = NULL;
+
+        VALIDATE_OR_GOTO (frame, err);
+        VALIDATE_OR_GOTO (this, err);
+        VALIDATE_OR_GOTO (fd, err);
+
+        local = dht_local_init (frame, NULL, fd, GF_FOP_READ_ZCOPY);
+        if (!local) {
+                op_errno = ENOMEM;
+                goto err;
+        }
+
+        subvol = local->cached_subvol;
+        if (!subvol) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "no cached subvolume for fd=%p", fd);
+                op_errno = EINVAL;
+                goto err;
+        }
+
+        /* saved for the migration retry armed in dht_readv_zcopy_cbk() */
+        local->rebalance.offset = off;
+        local->rebalance.size   = iov_length (vector, count);
+        local->rebalance.flags  = flags;
+        local->call_cnt = 1;
+
+        STACK_WIND (frame, dht_readv_zcopy_cbk,
+                    subvol, subvol->fops->readv_zcopy,
+                    fd, vector, count, off, flags, xdata);
+
+        return 0;
+
+err:
+        op_errno = (op_errno == -1) ? errno : op_errno;
+        DHT_STACK_UNWIND (readv_zcopy, frame, -1, op_errno, NULL, 0,
+                          NULL, NULL, NULL);
+
+        return 0;
+}
+
+int
 dht_access_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                 int op_ret, int op_errno, dict_t *xdata)
 {
diff --git a/xlators/cluster/dht/src/dht.c b/xlators/cluster/dht/src/dht.c
index 784ed92..1289b0f 100644
--- a/xlators/cluster/dht/src/dht.c
+++ b/xlators/cluster/dht/src/dht.c
@@ -599,6 +599,7 @@ struct xlator_fops fops = {
         .fxattrop    = dht_fxattrop,
         .setattr     = dht_setattr,
         .fsetattr    = dht_fsetattr,
+        .readv_zcopy = dht_readv_zcopy,
 };
 
 struct xlator_dumpops dumpops = {
diff --git a/xlators/debug/error-gen/src/error-gen.c b/xlators/debug/error-gen/src/error-gen.c
index 6bdb041..f62784d 100644
--- a/xlators/debug/error-gen/src/error-gen.c
+++ b/xlators/debug/error-gen/src/error-gen.c
@@ -174,7 +174,10 @@ sys_error_t error_no_list[] = {
                                  EROFS,EBADF,EIO}},
         [GF_FOP_GETSPEC]     = { .error_no_count = 4,
                                  .error_no = {EACCES,EBADF,ENAMETOOLONG,
-                                              EINTR}}
+                                              EINTR}},
+        [GF_FOP_READ_ZCOPY]  = { .error_no_count = 5,
+                                 .error_no = {EINVAL,EBADF,EFAULT,EISDIR,
+                                              ENAMETOOLONG}}
 };
 
 int
@@ -275,6 +278,8 @@ get_fop_int (char **op_no_str)
                 return GF_FOP_OPEN;
         else if (!strcmp ((*op_no_str), "readv"))
                 return GF_FOP_READ;
+        else if (!strcmp ((*op_no_str), "readv_zcopy"))
+                return GF_FOP_READ_ZCOPY;
         else if (!strcmp ((*op_no_str), "writev"))
                 return GF_FOP_WRITE;
         else if (!strcmp ((*op_no_str), "statfs"))
@@ -1082,6 +1087,47 @@ error_gen_readv (call_frame_t *frame, xlator_t *this,
         return 0;
 }
 
+int
+error_gen_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                           int32_t op_ret, int32_t op_errno,
+                           struct iovec *vector, int32_t count,
+                           struct iatt *stbuf, struct iobref *iobref,
+                           dict_t *xdata)
+{
+        STACK_UNWIND_STRICT (readv_zcopy, frame, op_ret, op_errno,
+                             vector, count, stbuf, iobref, xdata);
+        return 0;
+}
+
+int
+error_gen_readv_zcopy (call_frame_t *frame, xlator_t *this,
+                       fd_t *fd, struct iovec *vector, int32_t count, off_t offset,
+                       uint32_t flags, dict_t *xdata)
+{
+        int              op_errno = 0;
+        eg_t            *egp = NULL;
+        int              enable = 1;
+
+        egp = this->private;
+        enable = egp->enable[GF_FOP_READ_ZCOPY];
+
+        if (enable)
+                op_errno = error_gen (this, GF_FOP_READ_ZCOPY);
+
+        if (op_errno) {
+                GF_ERROR(this, "unwind(-1, %s)", strerror (op_errno));
+                STACK_UNWIND_STRICT (readv_zcopy, frame, -1, op_errno, NULL, 0,
+                                     NULL, NULL, xdata);
+                return 0;
+        }
+
+        STACK_WIND (frame, error_gen_readv_zcopy_cbk,
+                    FIRST_CHILD(this),
+                    FIRST_CHILD(this)->fops->readv_zcopy,
+                    fd, vector, count, offset, flags, xdata);
+        return 0;
+}
+
 
 int
 error_gen_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -2148,6 +2194,7 @@ struct xlator_fops fops = {
         .create      = error_gen_create,
         .open        = error_gen_open,
         .readv       = error_gen_readv,
+        .readv_zcopy = error_gen_readv_zcopy,
         .writev      = error_gen_writev,
         .statfs      = error_gen_statfs,
         .flush       = error_gen_flush,
diff --git a/xlators/debug/io-stats/src/io-stats.c b/xlators/debug/io-stats/src/io-stats.c
index 63bb8fa..f3d6660 100644
--- a/xlators/debug/io-stats/src/io-stats.c
+++ b/xlators/debug/io-stats/src/io-stats.c
@@ -1346,6 +1346,38 @@ io_stats_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 
 }
 
+int
+io_stats_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                          int32_t op_ret, int32_t op_errno,
+                          struct iovec *vector, int32_t count,
+                          struct iatt *buf, struct iobref *iobref,
+                          dict_t *xdata)
+{
+        fd_t            *fd      = NULL;
+        struct ios_stat *iosstat = NULL;
+
+        fd = frame->local;
+        frame->local = NULL;
+
+        /* Account bytes from op_ret: for READ_ZCOPY the data lands in the
+           caller's own buffers and @vector need not describe it. */
+        if (op_ret > 0)
+                BUMP_READ (fd, op_ret);
+
+        UPDATE_PROFILE_STATS (frame, READ);
+        ios_inode_ctx_get (fd->inode, this, &iosstat);
+
+        if (iosstat) {
+                BUMP_STATS (iosstat, IOS_STATS_TYPE_READ);
+                BUMP_THROUGHPUT (iosstat, IOS_STATS_THRU_READ);
+                iosstat = NULL;
+        }
+
+        STACK_UNWIND_STRICT (readv_zcopy, frame, op_ret, op_errno,
+                             vector, count, buf, iobref, xdata);
+        return 0;
+}
+
 
 int
 io_stats_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -2074,6 +2106,23 @@ io_stats_readv (call_frame_t *frame, xlator_t *this,
 
 
 int
+io_stats_readv_zcopy (call_frame_t *frame, xlator_t *this,
+                      fd_t *fd, struct iovec *vector, int32_t count, off_t offset,
+                      uint32_t flags, dict_t *xdata)
+{
+        frame->local = fd;
+
+        START_FOP_LATENCY (frame);
+
+        STACK_WIND (frame, io_stats_readv_zcopy_cbk,
+                    FIRST_CHILD(this),
+                    FIRST_CHILD(this)->fops->readv_zcopy,
+                    fd, vector, count, offset, flags, xdata);
+        return 0;
+}
+
+
+int
 io_stats_writev (call_frame_t *frame, xlator_t *this,
                  fd_t *fd, struct iovec *vector,
                  int32_t count, off_t offset,
@@ -2790,6 +2839,7 @@ struct xlator_fops fops = {
         .truncate    = io_stats_truncate,
         .open        = io_stats_open,
         .readv       = io_stats_readv,
+        .readv_zcopy = io_stats_readv_zcopy,
         .writev      = io_stats_writev,
         .statfs      = io_stats_statfs,
         .flush       = io_stats_flush,
diff --git a/xlators/performance/md-cache/src/md-cache.c b/xlators/performance/md-cache/src/md-cache.c
index 0c5ca87..0c3d6b2 100644
--- a/xlators/performance/md-cache/src/md-cache.c
+++ b/xlators/performance/md-cache/src/md-cache.c
@@ -1385,6 +1385,51 @@ mdc_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
         return 0;
 }
 
+int
+mdc_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                     int32_t op_ret, int32_t op_errno,
+                     struct iovec *vector, int32_t count,
+                     struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)
+{
+        mdc_local_t  *local = NULL;
+
+        local = frame->local;
+
+        /* op_ret is a byte count for reads, so any non-negative value
+           carries a valid @stbuf that can refresh the cached attributes */
+        if (op_ret < 0)
+                goto out;
+
+        if (!local)
+                goto out;
+
+        mdc_inode_iatt_set (this, local->fd->inode, stbuf);
+
+out:
+        MDC_STACK_UNWIND (readv_zcopy, frame, op_ret, op_errno, vector, count,
+                          stbuf, iobref, xdata);
+
+        return 0;
+}
+
+
+int
+mdc_readv_zcopy (call_frame_t *frame, xlator_t *this, fd_t *fd,
+                 struct iovec *vector, int32_t count, off_t offset, uint32_t flags,
+                 dict_t *xdata)
+{
+        mdc_local_t  *local = NULL;
+
+        local = mdc_local_get (frame);
+
+        local->fd = fd_ref (fd);
+
+        STACK_WIND (frame, mdc_readv_zcopy_cbk,
+                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv_zcopy,
+                    fd, vector, count, offset, flags, xdata);
+        return 0;
+}
+
 
 int
 mdc_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -1947,6 +1992,7 @@ struct xlator_fops fops = {
         .link       = mdc_link,
         .create     = mdc_create,
         .readv      = mdc_readv,
+        .readv_zcopy = mdc_readv_zcopy,
         .writev     = mdc_writev,
         .setattr    = mdc_setattr,
         .fsetattr   = mdc_fsetattr,
diff --git a/xlators/protocol/client/src/client-rpc-fops.c b/xlators/protocol/client/src/client-rpc-fops.c
index f524c1a..b05c003 100644
--- a/xlators/protocol/client/src/client-rpc-fops.c
+++ b/xlators/protocol/client/src/client-rpc-fops.c
@@ -2712,6 +2712,78 @@ out:
 }
 
 int
+client3_3_readv_zcopy_cbk (struct rpc_req *req, struct iovec *iov, int count,
+                           void *myframe)
+{
+        call_frame_t   *frame  = NULL;
+        struct iobref  *iobref = NULL;
+        struct iovec    vector[MAX_IOVEC] = {{0}, };
+        struct iatt     stat   = {0,};
+        gfs3_read_rsp   rsp    = {0,};
+        int             ret    = 0, rspcount = 0;
+        clnt_local_t   *local  = NULL;
+        xlator_t       *this   = NULL;
+        dict_t         *xdata  = NULL;
+
+        this = THIS;
+
+        memset (vector, 0, sizeof (vector));
+
+        frame = myframe;
+        local = frame->local;
+
+        if (-1 == req->rpc_status) {
+                rsp.op_ret   = -1;
+                rsp.op_errno = ENOTCONN;
+                goto out;
+        }
+
+        ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gfs3_read_rsp);
+        if (ret < 0) {
+                gf_log (this->name, GF_LOG_ERROR, "XDR decoding failed");
+                rsp.op_ret   = -1;
+                rsp.op_errno = EINVAL;
+                goto out;
+        }
+
+        if (rsp.op_ret != -1) {
+                iobref = req->rsp_iobref;
+                gf_stat_to_iatt (&rsp.stat, &stat);
+                /* The payload was read straight into the caller's iovec.
+                 * req->rsp[1] appears to describe at most the first of
+                 * those entries, so no vector is reported (rspcount stays
+                 * 0); consumers take the byte count from op_ret. */
+        }
+        GF_PROTOCOL_DICT_UNSERIALIZE (this, xdata, (rsp.xdata.xdata_val),
+                                      (rsp.xdata.xdata_len), ret,
+                                      rsp.op_errno, out);
+
+#ifdef GF_TESTING_IO_XDATA
+        dict_dump (xdata);
+#endif
+
+out:
+        if (rsp.op_ret == -1) {
+                gf_log (this->name, GF_LOG_WARNING,
+                        "remote operation failed: %s",
+                        strerror (gf_error_to_errno (rsp.op_errno)));
+        } else if (rsp.op_ret >= 0) {
+                if (local->attempt_reopen)
+                        client_attempt_reopen (local->fd, this);
+        }
+        CLIENT_STACK_UNWIND (readv_zcopy, frame, rsp.op_ret,
+                             gf_error_to_errno (rsp.op_errno), vector, rspcount,
+                             &stat, iobref, xdata);
+
+        free (rsp.xdata.xdata_val);
+
+        if (xdata)
+                dict_unref (xdata);
+
+        return 0;
+}
+
+int
 client3_3_release_cbk (struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
 {
@@ -3986,6 +4058,65 @@ unwind:
         return 0;
 }
 
+int32_t
+client3_3_readv_zcopy (call_frame_t *frame, xlator_t *this,
+                       void *data)
+{
+        clnt_args_t    *args       = NULL;
+        int64_t         remote_fd  = -1;
+        clnt_conf_t    *conf       = NULL;
+        clnt_local_t   *local      = NULL;
+        int             op_errno   = ESTALE;
+        gfs3_read_req   req        = {{0,},};
+        int             ret        = 0;
+
+        if (!frame || !this || !data)
+                goto unwind;
+
+        args = data;
+        conf = this->private;
+
+        CLIENT_GET_REMOTE_FD (this, args->fd, FALLBACK_TO_ANON_FD,
+                              remote_fd, op_errno, unwind);
+        ret = client_fd_fop_prepare_local (frame, args->fd, remote_fd);
+        if (ret) {
+                op_errno = -ret;
+                goto unwind;
+        }
+        local = frame->local;
+
+        req.size   = args->size;
+        req.offset = args->offset;
+        req.fd     = remote_fd;
+        req.flag   = args->flags;
+
+        memcpy (req.gfid, args->fd->inode->gfid, 16);
+
+        GF_PROTOCOL_DICT_SERIALIZE (this, args->xdata, (&req.xdata.xdata_val),
+                                    req.xdata.xdata_len, op_errno, unwind);
+
+        ret = client_submit_request (this, &req, frame, conf->fops,
+                                     GFS3_OP_READ_ZCOPY, client3_3_readv_zcopy_cbk, NULL,
+                                     NULL, 0, args->payload_vector,
+                                     args->payload_count,
+                                     local->iobref,
+                                     (xdrproc_t)xdr_gfs3_read_req);
+        if (ret) {
+                //unwind is done in the cbk
+                gf_log (this->name, GF_LOG_WARNING, "failed to send the fop");
+        }
+
+        GF_FREE (req.xdata.xdata_val);
+
+        return 0;
+unwind:
+
+        CLIENT_STACK_UNWIND (readv_zcopy, frame, -1, op_errno, NULL, 0, NULL, NULL, NULL);
+        GF_FREE (req.xdata.xdata_val);
+
+        return 0;
+}
+
 int32_t
 client3_3_writev (call_frame_t *frame, xlator_t *this,
                   void *data)
@@ -5845,6 +5976,7 @@ rpc_clnt_procedure_t clnt3_3_fop_actors[GF_FOP_MAXVALUE] = {
         [GF_FOP_RELEASEDIR]   = { "RELEASEDIR",   client3_3_releasedir },
         [GF_FOP_GETSPEC]      = { "GETSPEC",   client3_getspec },
         [GF_FOP_FREMOVEXATTR] = { "FREMOVEXATTR", client3_3_fremovexattr },
+        [GF_FOP_READ_ZCOPY]   = { "READ_ZCOPY",   client3_3_readv_zcopy },
 };
 
 /* Used From RPC-CLNT library to log proper name of procedure based on number */
@@ -5893,6 +6025,7 @@ char *clnt3_3_fop_names[GFS3_OP_MAXVALUE] = {
         [GFS3_OP_RELEASE]     = "RELEASE",
         [GFS3_OP_RELEASEDIR]  = "RELEASEDIR",
         [GFS3_OP_FREMOVEXATTR] = "FREMOVEXATTR",
+        [GFS3_OP_READ_ZCOPY]  = "READ_ZCOPY",
 };
 
 rpc_clnt_prog_t clnt3_3_fop_prog = {
diff --git a/xlators/protocol/client/src/client.c b/xlators/protocol/client/src/client.c
index 931c671..5d4e9d1 100644
--- a/xlators/protocol/client/src/client.c
+++ b/xlators/protocol/client/src/client.c
@@ -923,6 +923,46 @@ out:
 }
 
 
+int32_t
+client_readv_zcopy (call_frame_t *frame, xlator_t *this, fd_t *fd,
+                    struct iovec *vector, int32_t count,
+                    off_t offset, uint32_t flags, dict_t *xdata)
+{
+        int          ret  = -1;
+        clnt_conf_t *conf = NULL;
+        rpc_clnt_procedure_t *proc = NULL;
+        clnt_args_t  args = {0,};
+
+        conf = this->private;
+        if (!conf || !conf->fops)
+                goto out;
+
+        args.fd     = fd;
+        args.size   = iov_length(vector, count);
+        args.offset = offset;
+        args.flags  = flags;
+        args.xdata  = xdata;
+        args.payload_vector = vector;
+        args.payload_count = count;
+
+        proc = &conf->fops->proctable[GF_FOP_READ_ZCOPY];
+        if (!proc) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "rpc procedure not found for %s",
+                        gf_fop_list[GF_FOP_READ_ZCOPY]);
+                goto out;
+        }
+        if (proc->fn)
+                ret = proc->fn (frame, this, &args);
+
+out:
+        if (ret)
+                STACK_UNWIND_STRICT (readv_zcopy, frame, -1, ENOTCONN,
+                                     NULL, 0, NULL, NULL, NULL);
+
+        return 0;
+}
+
 
 int32_t
 client_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
@@ -2636,6 +2676,7 @@ struct xlator_fops fops = {
         .truncate    = client_truncate,
         .open        = client_open,
         .readv       = client_readv,
+        .readv_zcopy = client_readv_zcopy,
         .writev      = client_writev,
         .statfs      = client_statfs,
         .flush       = client_flush,
diff --git a/xlators/protocol/client/src/client.h b/xlators/protocol/client/src/client.h
index 0a27c09..39c3bf8 100644
--- a/xlators/protocol/client/src/client.h
+++ b/xlators/protocol/client/src/client.h
@@ -195,6 +195,8 @@ typedef struct client_args {
         mode_t             umask;
         dict_t            *xdata;
 
+        struct iovec      *payload_vector;
+        int32_t            payload_count;
 } clnt_args_t;
 
 typedef ssize_t (*gfs_serialize_t) (struct iovec outmsg, void *args);
diff --git a/xlators/protocol/server/src/server-rpc-fops.c b/xlators/protocol/server/src/server-rpc-fops.c
index f44ced4..f16187d 100644
--- a/xlators/protocol/server/src/server-rpc-fops.c
+++ b/xlators/protocol/server/src/server-rpc-fops.c
@@ -1515,6 +1515,55 @@ out:
 }
 
 int
+server_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                        int32_t op_ret, int32_t op_errno,
+                        struct iovec *vector, int32_t count,
+                        struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)
+{
+        gfs3_read_rsp     rsp   = {0,};
+        server_state_t   *state = NULL;
+        rpcsvc_request_t *req   = NULL;
+
+        req = frame->local;
+        state = CALL_STATE(frame);
+
+#ifdef GF_TESTING_IO_XDATA
+        {
+                int ret = 0;
+                if (!xdata)
+                        xdata = dict_new ();
+
+                ret = dict_set_str (xdata, "testing-the-xdata-key",
+                                    "testing-xdata-value");
+        }
+#endif
+        GF_PROTOCOL_DICT_SERIALIZE (this, xdata, (&rsp.xdata.xdata_val),
+                                    rsp.xdata.xdata_len, op_errno, out);
+
+        if (op_ret < 0) {
+                gf_log (this->name, GF_LOG_INFO,
+                        "%"PRId64": READV_ZCOPY %"PRId64" (%s) ==> (%s)",
+                        frame->root->unique, state->resolve.fd_no,
+                        uuid_utoa (state->resolve.gfid), strerror (op_errno));
+                goto out;
+        }
+
+        gf_stat_from_iatt (&rsp.stat, stbuf);
+        rsp.size = op_ret;
+
+out:
+        rsp.op_ret    = op_ret;
+        rsp.op_errno  = gf_errno_to_error (op_errno);
+
+        server_submit_reply (frame, req, &rsp, vector, count, iobref,
+                             (xdrproc_t)xdr_gfs3_read_rsp);
+
+        GF_FREE (rsp.xdata.xdata_val);
+
+        return 0;
+}
+
+int
server_rchecksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, uint32_t weak_checksum, uint8_t *strong_checksum, @@ -2749,6 +2798,27 @@ err: return 0; } +int +server_readv_zcopy_resume (call_frame_t *frame, xlator_t *bound_xl) +{ + server_state_t *state = NULL; + + state = CALL_STATE (frame); + + if (state->resolve.op_ret != 0) + goto err; + + STACK_WIND (frame, server_readv_cbk, + bound_xl, bound_xl->fops->readv, + state->fd, state->size, state->offset, state->flags, state->xdata); + + return 0; +err: + server_readv_cbk (frame, NULL, frame->this, state->resolve.op_ret, + state->resolve.op_errno, NULL, 0, NULL, NULL, NULL); + return 0; +} + int server_create_resume (call_frame_t *frame, xlator_t *bound_xl) @@ -3347,6 +3417,68 @@ out: return ret; } +int +server3_3_readv_zcopy (rpcsvc_request_t *req) +{ + server_state_t *state = NULL; + call_frame_t *frame = NULL; + gfs3_read_req args = {{0,},}; + int ret = -1; + int op_errno = 0; + + if (!req) + goto out; + + ret = xdr_to_generic (req->msg[0], &args, (xdrproc_t)xdr_gfs3_read_req); + if (ret < 0) { + //failed to decode msg; + req->rpc_err = GARBAGE_ARGS; + goto out; + } + + frame = get_frame_from_request (req); + if (!frame) { + // something wrong, mostly insufficient memory + req->rpc_err = GARBAGE_ARGS; /* TODO */ + goto out; + } + /* + * TODO: ZCOPY client requests are treated as normal READ requests + * in server + */ + frame->root->op = GF_FOP_READ; + + state = CALL_STATE (frame); + if (!state->conn->bound_xl) { + /* auth failure, request on subvolume without setvolume */ + req->rpc_err = GARBAGE_ARGS; + goto out; + } + + state->resolve.type = RESOLVE_MUST; + state->resolve.fd_no = args.fd; + state->size = args.size; + state->offset = args.offset; + state->flags = args.flag; + + memcpy (state->resolve.gfid, args.gfid, 16); + + GF_PROTOCOL_DICT_UNSERIALIZE (state->conn->bound_xl, state->xdata, + (args.xdata.xdata_val), + (args.xdata.xdata_len), ret, + op_errno, out); + 
+ ret = 0; + resolve_and_resume (frame, server_readv_resume); +out: + /* memory allocated by libc, don't use GF_FREE */ + free (args.xdata.xdata_val); + + if (op_errno) + req->rpc_err = GARBAGE_ARGS; + + return ret; +} int server3_3_writev (rpcsvc_request_t *req) @@ -5760,6 +5892,7 @@ rpcsvc_actor_t glusterfs3_3_fop_actors[] = { [GFS3_OP_RELEASE] = { "RELEASE", GFS3_OP_RELEASE, server3_3_release, NULL, 0}, [GFS3_OP_RELEASEDIR] = { "RELEASEDIR", GFS3_OP_RELEASEDIR, server3_3_releasedir, NULL, 0}, [GFS3_OP_FREMOVEXATTR] = { "FREMOVEXATTR", GFS3_OP_FREMOVEXATTR, server3_3_fremovexattr, NULL, 0}, + [GFS3_OP_READ_ZCOPY] = { "READ_ZCOPY", GFS3_OP_READ_ZCOPY, server3_3_readv_zcopy, NULL, 0}, }; -- http://raobharata.wordpress.com/