[PATCH 13/18] pnfs-submit: rewrite of layout state handling and cb_layoutrecall

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Remove NFS_LAYOUT_STATEID_SET in favor of just checking list_empty(lo->segs).

LAYOUTGETs with openstateid are serialized.  Waiting on the condition
(list_empty(lo->segs) && plh_outstanding>0) both drains outstanding RPCs once
the stateid is invalidated and allows only a single LAYOUTGET(openstateid)
through at a time.

Before sending a LAYOUTRETURN, plh_block_lgets is incremented.  It is
decremented in the rpc_release function.  While set, LAYOUTGETs are
paused in their rpc_prepare function, and any responses are
forgotten.

Callbacks are handled by blocking any matching LAYOUTGETS while processing and
initiating drain of IO.  A notification system is set up so that when
all relevant IO is finished, the state manger thread is invoked, which
synchronously sends the final matching LAYOUTRETURN before unblocking
LAYOUTGETS.

Signed-off-by: Fred Isaman <iisaman@xxxxxxxxxx>
---
 fs/nfs/callback.h         |    4 +-
 fs/nfs/callback_proc.c    |  471 +++++++++++++++++++++++----------------------
 fs/nfs/client.c           |    3 +
 fs/nfs/inode.c            |    3 +-
 fs/nfs/nfs4proc.c         |  105 +++++++---
 fs/nfs/nfs4state.c        |    4 +
 fs/nfs/nfs4xdr.c          |   16 ++-
 fs/nfs/pnfs.c             |  181 +++++++++++++----
 fs/nfs/pnfs.h             |   41 +++-
 include/linux/nfs_fs.h    |    1 +
 include/linux/nfs_fs_sb.h |    4 +
 11 files changed, 518 insertions(+), 315 deletions(-)

diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h
index 817b0f4..c1c7f3e 100644
--- a/fs/nfs/callback.h
+++ b/fs/nfs/callback.h
@@ -161,7 +161,8 @@ struct cb_layoutrecallargs {
 extern unsigned nfs4_callback_layoutrecall(
 	struct cb_layoutrecallargs *args,
 	void *dummy, struct cb_process_state *cps);
-
+extern bool matches_outstanding_recall(struct inode *ino,
+				       struct pnfs_layout_range *range);
 #endif /* CONFIG_NFS_V4_1 */
 
 extern __be32 nfs4_callback_getattr(struct cb_getattrargs *args,
@@ -171,6 +172,7 @@ extern __be32 nfs4_callback_recall(struct cb_recallargs *args, void *dummy,
 				   struct cb_process_state *cps);
 
 #ifdef CONFIG_NFS_V4
+extern void nfs_client_return_layouts(struct nfs_client *clp);
 extern int nfs_callback_up(u32 minorversion, struct rpc_xprt *xprt);
 extern void nfs_callback_down(int minorversion);
 extern int nfs4_validate_delegation_stateid(struct nfs_delegation *delegation,
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index 1509c34..583446b 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -132,270 +132,291 @@ int nfs4_validate_delegation_stateid(struct nfs_delegation *delegation, const nf
 #if defined(CONFIG_NFS_V4_1)
 
 static bool
-pnfs_is_next_layout_stateid(const struct pnfs_layout_hdr *lo,
-			    const nfs4_stateid stateid)
+_recall_matches_lget(struct pnfs_cb_lrecall_info *cb_info,
+		     struct inode *ino, struct pnfs_layout_range *range)
 {
-	bool res;
-	u32 oldseqid, newseqid;
-
-	spin_lock(&lo->inode->i_lock);
-	{
-		oldseqid = be32_to_cpu(lo->stateid.stateid.seqid);
-		newseqid = be32_to_cpu(stateid.stateid.seqid);
-		res = !memcmp(lo->stateid.stateid.other,
-			      stateid.stateid.other,
-			      NFS4_STATEID_OTHER_SIZE);
-		if (res) { /* comparing layout stateids */
-			if (oldseqid == ~0)
-				res = (newseqid == 1);
-			else
-				res = (newseqid == oldseqid + 1);
-		} else { /* open stateid */
-			res = !memcmp(lo->stateid.data,
-				      &zero_stateid,
-				      NFS4_STATEID_SIZE);
-			if (res)
-				res = (newseqid == 1);
-		}
-	}
-	spin_unlock(&lo->inode->i_lock);
+	struct cb_layoutrecallargs *cb_args = &cb_info->pcl_args;
 
-	return res;
+	switch (cb_args->cbl_recall_type) {
+	case RETURN_ALL:
+		return true;
+	case RETURN_FSID:
+		return !memcmp(&NFS_SERVER(ino)->fsid, &cb_args->cbl_fsid,
+			       sizeof(struct nfs_fsid));
+	case RETURN_FILE:
+		if (ino != cb_info->pcl_ino)
+			return false;
+		return should_free_lseg(range, &cb_args->cbl_range);
+	default:
+		BUG();
+	}
 }
 
-/*
- * Retrieve an inode based on layout recall parameters
- *
- * Note: caller must iput(inode) to dereference the inode.
- */
-static struct inode *
-nfs_layoutrecall_find_inode(struct nfs_client *clp,
-			    const struct cb_layoutrecallargs *args)
+bool
+matches_outstanding_recall(struct inode *ino, struct pnfs_layout_range *range)
 {
-	struct nfs_inode *nfsi;
-	struct pnfs_layout_hdr *lo;
-	struct nfs_server *server;
-	struct inode *ino = NULL;
-
-	dprintk("%s: Begin recall_type=%d clp %p\n",
-		__func__, args->cbl_recall_type, clp);
-
-	spin_lock(&clp->cl_lock);
-	list_for_each_entry(lo, &clp->cl_layouts, layouts) {
-		nfsi = NFS_I(lo->inode);
-		if (!nfsi)
-			continue;
-
-		dprintk("%s: Searching inode=%lu\n",
-			__func__, nfsi->vfs_inode.i_ino);
-
-		if (args->cbl_recall_type == RETURN_FILE) {
-		    if (nfs_compare_fh(&args->cbl_fh, &nfsi->fh))
-			continue;
-		} else if (args->cbl_recall_type == RETURN_FSID) {
-			server = NFS_SERVER(&nfsi->vfs_inode);
-			if (server->fsid.major != args->cbl_fsid.major ||
-			    server->fsid.minor != args->cbl_fsid.minor)
-				continue;
+	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
+	struct pnfs_cb_lrecall_info *cb_info;
+	bool rv = false;
+
+	assert_spin_locked(&clp->cl_lock);
+	list_for_each_entry(cb_info, &clp->cl_layoutrecalls, pcl_list) {
+		if (_recall_matches_lget(cb_info, ino, range)) {
+			rv = true;
+			break;
 		}
-
-		/* Make sure client didn't clean up layout without
-		 * telling the server */
-		if (!has_layout(nfsi))
-			continue;
-
-		ino = igrab(&nfsi->vfs_inode);
-		dprintk("%s: Found inode=%p\n", __func__, ino);
-		break;
 	}
-	spin_unlock(&clp->cl_lock);
-	return ino;
+	return rv;
 }
 
-struct recall_layout_threadargs {
-	struct inode *inode;
-	struct nfs_client *clp;
-	struct completion started;
-	struct cb_layoutrecallargs *rl;
-	int result;
-};
-
-static int pnfs_recall_layout(void *data)
+/* Send a synchronous LAYOUTRETURN.  By the time this is called, we know
+ * all IO has been drained, any matching lsegs deleted, and that no
+ * overlapping LAYOUTGETs will be sent or processed for the duration
+ * of this call.
+ * Note that it is possible that when this is called, the stateid has
+ * been invalidated.  But will not be cleared, so can still use.
+ */
+static int
+pnfs_send_layoutreturn(struct nfs_client *clp,
+		       struct pnfs_cb_lrecall_info *cb_info)
 {
-	struct inode *inode, *ino;
-	struct nfs_client *clp;
-	struct cb_layoutrecallargs rl;
+	struct cb_layoutrecallargs *args = &cb_info->pcl_args;
 	struct nfs4_layoutreturn *lrp;
-	struct recall_layout_threadargs *args =
-		(struct recall_layout_threadargs *)data;
-	int status = 0;
-
-	daemonize("nfsv4-layoutreturn");
-
-	dprintk("%s: recall_type=%d fsid 0x%llx-0x%llx start\n",
-		__func__, args->rl->cbl_recall_type,
-		args->rl->cbl_fsid.major, args->rl->cbl_fsid.minor);
-
-	clp = args->clp;
-	inode = args->inode;
-	rl = *args->rl;
-
-	/* support whole file layouts only */
-	rl.cbl_range.offset = 0;
-	rl.cbl_range.length = NFS4_MAX_UINT64;
-
-	if (rl.cbl_recall_type == RETURN_FILE) {
-		if (pnfs_is_next_layout_stateid(NFS_I(inode)->layout,
-						rl.cbl_stateid))
-			status = pnfs_return_layout(inode, &rl.cbl_range,
-						    &rl.cbl_stateid, RETURN_FILE,
-						    false);
-		else
-			status = cpu_to_be32(NFS4ERR_DELAY);
-		if (status)
-			dprintk("%s RETURN_FILE error: %d\n", __func__, status);
-		else
-			status =  cpu_to_be32(NFS4ERR_NOMATCHING_LAYOUT);
-		args->result = status;
-		complete(&args->started);
-		goto out;
-	}
-
-	status = cpu_to_be32(NFS4_OK);
-	args->result = status;
-	complete(&args->started);
-	args = NULL;
-
-	/* IMPROVEME: This loop is inefficient, running in O(|s_inodes|^2) */
-	while ((ino = nfs_layoutrecall_find_inode(clp, &rl)) != NULL) {
-		/* FIXME: need to check status on pnfs_return_layout */
-		pnfs_return_layout(ino, &rl.cbl_range, NULL, RETURN_FILE, false);
-		iput(ino);
-	}
 
 	lrp = kzalloc(sizeof(*lrp), GFP_KERNEL);
-	if (!lrp) {
-		dprintk("%s: allocation failed. Cannot send last LAYOUTRETURN\n",
-			__func__);
-		goto out;
-	}
-
-	/* send final layoutreturn */
+	if (!lrp)
+		return -ENOMEM;
 	lrp->args.reclaim = 0;
-	lrp->args.layout_type = rl.cbl_layout_type;
-	lrp->args.return_type = rl.cbl_recall_type;
+	lrp->args.layout_type = args->cbl_layout_type;
+	lrp->args.return_type = args->cbl_recall_type;
 	lrp->clp = clp;
-	lrp->args.range = rl.cbl_range;
-	lrp->args.inode = inode;
-	nfs4_proc_layoutreturn(lrp, true);
-
-out:
-	clear_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state);
-	nfs_put_client(clp);
-	module_put_and_exit(0);
-	dprintk("%s: exit status %d\n", __func__, 0);
-	return 0;
+	if (args->cbl_recall_type == RETURN_FILE) {
+		lrp->args.range = args->cbl_range;
+		lrp->args.inode = cb_info->pcl_ino;
+	} else {
+		lrp->args.range.iomode = IOMODE_ANY;
+		lrp->args.inode = NULL;
+	}
+	return nfs4_proc_layoutreturn(lrp, true);
 }
 
-/*
- * Asynchronous layout recall!
+/* Called by state manager to finish CB_LAYOUTRECALLS initiated by
+ * nfs4_callback_layoutrecall().
  */
-static int pnfs_async_return_layout(struct nfs_client *clp, struct inode *inode,
-				    struct cb_layoutrecallargs *rl)
+void nfs_client_return_layouts(struct nfs_client *clp)
 {
-	struct recall_layout_threadargs data = {
-		.clp = clp,
-		.inode = inode,
-		.rl = rl,
-	};
-	struct task_struct *t;
-	int status = -EAGAIN;
+	struct pnfs_cb_lrecall_info *cb_info;
 
-	dprintk("%s: -->\n", __func__);
+	spin_lock(&clp->cl_lock);
+	while (true) {
+		if (list_empty(&clp->cl_layoutrecalls)) {
+			spin_unlock(&clp->cl_lock);
+			break;
+		}
+		cb_info = list_first_entry(&clp->cl_layoutrecalls,
+					   struct pnfs_cb_lrecall_info,
+					   pcl_list);
+		spin_unlock(&clp->cl_lock);
+		if (atomic_read(&cb_info->pcl_count) != 0)
+			break;
+		/* What do on error return?  These layoutreturns are
+		 * required by the protocol.  So if do not get
+		 * successful reply, probably have to do something
+		 * more drastic.
+		 */
+		pnfs_send_layoutreturn(clp, cb_info);
+		spin_lock(&clp->cl_lock);
+		/* Removing from the list unblocks LAYOUTGETs */
+		list_del(&cb_info->pcl_list);
+		clp->cl_cb_lrecall_count--;
+		rpc_wake_up(&clp->cl_rpcwaitq_recall);
+		kfree(cb_info);
+	}
+}
 
-	/* FIXME: do not allow two concurrent layout recalls */
-	if (test_and_set_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state))
-		return status;
-
-	init_completion(&data.started);
-	__module_get(THIS_MODULE);
-	atomic_inc(&clp->cl_count);
-
-	t = kthread_run(pnfs_recall_layout, &data, "%s", "pnfs_recall_layout");
-	if (IS_ERR(t)) {
-		printk(KERN_INFO "NFS: Layout recall callback thread failed "
-			"for client (clientid %08x/%08x)\n",
-			(unsigned)(clp->cl_clientid >> 32),
-			(unsigned)(clp->cl_clientid));
-		status = PTR_ERR(t);
-		goto out_module_put;
+void notify_drained(struct pnfs_cb_lrecall_info *d)
+{
+	if (d && atomic_dec_and_test(&d->pcl_count)) {
+		set_bit(NFS4CLNT_LAYOUT_RECALL, &d->pcl_clp->cl_state);
+		nfs4_schedule_state_manager(d->pcl_clp);
 	}
-	wait_for_completion(&data.started);
-	return data.result;
-out_module_put:
-	nfs_put_client(clp);
-	clear_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state);
-	module_put(THIS_MODULE);
-	return status;
 }
 
-static int pnfs_recall_all_layouts(struct nfs_client *clp)
+static int initiate_layout_draining(struct pnfs_cb_lrecall_info *cb_info)
 {
-	struct cb_layoutrecallargs rl;
-	struct inode *inode;
-	int status = 0;
-
-	rl.cbl_recall_type = RETURN_ALL;
-	rl.cbl_range.iomode = IOMODE_ANY;
-	rl.cbl_range.offset = 0;
-	rl.cbl_range.length = NFS4_MAX_UINT64;
-
-	/* we need the inode to get the nfs_server struct */
-	inode = nfs_layoutrecall_find_inode(clp, &rl);
-	if (!inode)
-		return status;
-	status = pnfs_async_return_layout(clp, inode, &rl);
-	iput(inode);
+	struct nfs_client *clp = cb_info->pcl_clp;
+	struct pnfs_layout_hdr *lo;
+	int rv = NFS4ERR_NOMATCHING_LAYOUT;
+	struct cb_layoutrecallargs *args = &cb_info->pcl_args;
+
+	if (args->cbl_recall_type == RETURN_FILE) {
+		LIST_HEAD(free_me_list);
+
+		spin_lock(&clp->cl_lock);
+		list_for_each_entry(lo, &clp->cl_layouts, layouts) {
+			if (nfs_compare_fh(&args->cbl_fh,
+					   &NFS_I(lo->inode)->fh))
+				continue;
+			if (test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags))
+				rv = NFS4ERR_DELAY;
+			else {
+				/* FIXME I need to better understand igrab and
+				 * does having a layout ref keep ino around?
+				 *  It should.
+				 */
+				/* We need to hold the reference until any
+				 * potential LAYOUTRETURN is finished.
+				 */
+				get_layout_hdr(lo);
+				cb_info->pcl_ino = lo->inode;
+				rv = NFS4_OK;
+			}
+			break;
+		}
+		spin_unlock(&clp->cl_lock);
+
+		spin_lock(&lo->inode->i_lock);
+		if (rv == NFS4_OK) {
+			lo->plh_block_lgets++;
+			nfs4_asynch_forget_layouts(lo, &args->cbl_range,
+						   cb_info, &free_me_list);
+		}
+		pnfs_set_layout_stateid(lo, &args->cbl_stateid, true);
+		spin_unlock(&lo->inode->i_lock);
+		pnfs_free_lseg_list(&free_me_list);
+	} else {
+		struct pnfs_layout_hdr *tmp;
+		LIST_HEAD(recall_list);
+		LIST_HEAD(free_me_list);
+		struct pnfs_layout_range range = {
+			.iomode = IOMODE_ANY,
+			.offset = 0,
+			.length = NFS4_MAX_UINT64,
+		};
+
+		spin_lock(&clp->cl_lock);
+		/* Per RFC 5661, 12.5.5.2.1.5, bulk recall must be serialized */
+		if (!list_is_singular(&clp->cl_layoutrecalls)) {
+			spin_unlock(&clp->cl_lock);
+			return NFS4ERR_DELAY;
+		}
+		list_for_each_entry(lo, &clp->cl_layouts, layouts) {
+			if ((args->cbl_recall_type == RETURN_FSID) &&
+			    memcmp(&NFS_SERVER(lo->inode)->fsid,
+				   &args->cbl_fsid, sizeof(struct nfs_fsid)))
+				continue;
+			get_layout_hdr(lo);
+			/* We could list_del(&lo->layouts) here */
+			BUG_ON(!list_empty(&lo->plh_bulk_recall));
+			list_add(&lo->plh_bulk_recall, &recall_list);
+		}
+		spin_unlock(&clp->cl_lock);
+		list_for_each_entry_safe(lo, tmp,
+					 &recall_list, plh_bulk_recall) {
+			spin_lock(&lo->inode->i_lock);
+			set_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags);
+			nfs4_asynch_forget_layouts(lo, &range, cb_info,
+						   &free_me_list);
+			list_del_init(&lo->plh_bulk_recall);
+			spin_unlock(&lo->inode->i_lock);
+			put_layout_hdr(lo->inode);
+			rv = NFS4_OK;
+		}
+		pnfs_free_lseg_list(&free_me_list);
+	}
+	return rv;
+}
+
+static u32 do_callback_layoutrecall(struct nfs_client *clp,
+				    struct cb_layoutrecallargs *args)
+{
+	struct pnfs_cb_lrecall_info *new;
+	u32 res;
+
+	dprintk("%s enter, type=%i\n", __func__, args->cbl_recall_type);
+	new = kmalloc(sizeof(*new), GFP_KERNEL);
+	if (!new) {
+		res = NFS4ERR_RESOURCE;
+		goto out;
+	}
+	memcpy(&new->pcl_args, args, sizeof(*args));
+	atomic_set(&new->pcl_count, 1);
+	new->pcl_clp = clp;
+	new->pcl_ino = NULL;
+	spin_lock(&clp->cl_lock);
+	if (clp->cl_cb_lrecall_count >= PNFS_MAX_CB_LRECALLS) {
+		kfree(new);
+		res = NFS4ERR_DELAY;
+		spin_unlock(&clp->cl_lock);
+		goto out;
+	}
+	clp->cl_cb_lrecall_count++;
+	/* Adding to the list will block conflicting LGET activity */
+	list_add_tail(&new->pcl_list, &clp->cl_layoutrecalls);
+	spin_unlock(&clp->cl_lock);
+	res = initiate_layout_draining(new);
+	if (res || atomic_dec_and_test(&new->pcl_count)) {
+		spin_lock(&clp->cl_lock);
+		list_del(&new->pcl_list);
+		clp->cl_cb_lrecall_count--;
+		rpc_wake_up(&clp->cl_rpcwaitq_recall);
+		spin_unlock(&clp->cl_lock);
+		if (res == NFS4_OK) {
+			if (args->cbl_recall_type == RETURN_FILE) {
+				struct pnfs_layout_hdr *lo;
+
+				lo = NFS_I(new->pcl_ino)->layout;
+				spin_lock(&lo->inode->i_lock);
+				lo->plh_block_lgets--;
+				if (!pnfs_layoutgets_blocked(lo, NULL))
+					rpc_wake_up(&NFS_I(lo->inode)->lo_rpcwaitq_stateid);
+				spin_unlock(&lo->inode->i_lock);
+				put_layout_hdr(new->pcl_ino);
+			}
+			res = NFS4ERR_NOMATCHING_LAYOUT;
+		}
+		kfree(new);
+	}
+out:
+	dprintk("%s returning %i\n", __func__, res);
+	return res;
 
-	return status;
 }
 
 __be32 nfs4_callback_layoutrecall(struct cb_layoutrecallargs *args,
 				  void *dummy, struct cb_process_state *cps)
 {
 	struct nfs_client *clp;
-	struct inode *inode = NULL;
-	__be32 res;
-	int status;
+	u32 res;
 
 	dprintk("%s: -->\n", __func__);
 
-	res = cpu_to_be32(NFS4ERR_OP_NOT_IN_SESSION);
 	if (cps->session) /* set in cb_sequence */
 		clp = cps->session->clp;
-	else
+	else {
+		res = NFS4ERR_OP_NOT_IN_SESSION;
 		goto out;
-
+	}
 	/* the callback must come from the MDS personality */
-	res = cpu_to_be32(NFS4ERR_NOTSUPP);
-	if (!(clp->cl_exchange_flags & EXCHGID4_FLAG_USE_PNFS_MDS))
+	if (!(clp->cl_exchange_flags & EXCHGID4_FLAG_USE_PNFS_MDS)) {
+		res = NFS4ERR_INVAL;
 		goto out;
-
-	res = cpu_to_be32(NFS4ERR_NOMATCHING_LAYOUT);
-	/*
-	 * In the _ALL or _FSID case, we need the inode to get
-	 * the nfs_server struct.
-	 */
-	inode = nfs_layoutrecall_find_inode(clp, args);
-	if (!inode)
-		goto out;
-	status = pnfs_async_return_layout(clp, inode, args);
-	if (status)
-		res = cpu_to_be32(NFS4ERR_DELAY);
-	iput(inode);
+	}
+	res = do_callback_layoutrecall(clp, args);
 out:
-	dprintk("%s: exit with status = %d\n", __func__, ntohl(res));
-	return res;
+	dprintk("%s: exit with status = %d\n", __func__, res);
+	return cpu_to_be32(res);
+}
+
+static void pnfs_recall_all_layouts(struct nfs_client *clp)
+{
+	struct cb_layoutrecallargs args;
+
+	/* Pretend we got a CB_LAYOUTRECALL(ALL) */
+	memset(&args, 0, sizeof(args));
+	args.cbl_recall_type = RETURN_ALL;
+	/* FIXME we ignore errors, what should we do? */
+	do_callback_layoutrecall(clp, &args);
 }
 
 int nfs41_validate_delegation_stateid(struct nfs_delegation *delegation, const nfs4_stateid *stateid)
@@ -677,9 +698,7 @@ __be32 nfs4_callback_recallany(struct cb_recallanyargs *args, void *dummy,
 		flags |= FMODE_WRITE;
 	if (test_bit(RCA4_TYPE_MASK_FILE_LAYOUT, (const unsigned long *)
 		     &args->craa_type_mask))
-		if (pnfs_recall_all_layouts(clp) == -EAGAIN)
-			status = cpu_to_be32(NFS4ERR_DELAY);
-
+		pnfs_recall_all_layouts(clp);
 	if (flags)
 		nfs_expire_all_delegation_types(clp, flags);
 out:
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index 3c8c841..dbf43e7 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -158,6 +158,9 @@ static struct nfs_client *nfs_alloc_client(const struct nfs_client_initdata *cl_
 		clp->cl_machine_cred = cred;
 #if defined(CONFIG_NFS_V4_1)
 	INIT_LIST_HEAD(&clp->cl_layouts);
+	INIT_LIST_HEAD(&clp->cl_layoutrecalls);
+	rpc_init_wait_queue(&clp->cl_rpcwaitq_recall,
+			    "NFS client CB_LAYOUTRECALLS");
 #endif
 	nfs_fscache_get_client_cookie(clp);
 
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index 72f27cc..8727ade 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -1459,7 +1459,8 @@ static inline void nfs4_init_once(struct nfs_inode *nfsi)
 	nfsi->delegation = NULL;
 	nfsi->delegation_state = 0;
 	init_rwsem(&nfsi->rwsem);
-	rpc_init_wait_queue(&nfsi->lo_rpcwaitq, "pNFS Layout");
+	rpc_init_wait_queue(&nfsi->lo_rpcwaitq, "pNFS Layoutreturn");
+	rpc_init_wait_queue(&nfsi->lo_rpcwaitq_stateid, "pNFS Layoutstateid");
 	nfsi->layout = NULL;
 #endif
 }
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index be19e225..87b2b63 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -5346,42 +5346,58 @@ nfs4_layoutget_prepare(struct rpc_task *task, void *calldata)
 	struct inode *ino = lgp->args.inode;
 	struct nfs_inode *nfsi = NFS_I(ino);
 	struct nfs_server *server = NFS_SERVER(ino);
-	struct pnfs_layout_segment *lseg;
+	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
 
 	dprintk("--> %s\n", __func__);
+	spin_lock(&clp->cl_lock);
+	if (matches_outstanding_recall(ino, &lgp->args.range)) {
+		rpc_sleep_on(&clp->cl_rpcwaitq_recall, task, NULL);
+		spin_unlock(&clp->cl_lock);
+		return;
+	}
+	spin_unlock(&clp->cl_lock);
+	/* Note the is a race here, where a CB_LAYOUTRECALL can come in
+	 * right now covering the LAYOUTGET we are about to send.
+	 * However, that is not so catastrophic, and there seems
+	 * to be no way to prevent it completely.
+	 */
 	spin_lock(&ino->i_lock);
-	lseg = pnfs_has_layout(nfsi->layout, &lgp->args.range);
-	if (likely(!lseg)) {
+	if (pnfs_layoutgets_blocked(nfsi->layout, NULL)) {
+		rpc_sleep_on(&nfsi->lo_rpcwaitq_stateid, task, NULL);
 		spin_unlock(&ino->i_lock);
-		dprintk("%s: no lseg found, proceeding\n", __func__);
-		if (!nfs4_setup_sequence(server, NULL, &lgp->args.seq_args,
-					 &lgp->res.seq_res, 0, task))
-			rpc_call_start(task);
 		return;
 	}
-	if (!lseg->valid) {
+	/* This needs after but atomic with above check in order to properly
+	 * serialize openstateid LAYOUTGETs.
+	 */
+	nfsi->layout->plh_outstanding++;
+	spin_unlock(&ino->i_lock);
+
+	if (nfs4_setup_sequence(server, NULL, &lgp->args.seq_args,
+				&lgp->res.seq_res, 0, task)) {
+		spin_lock(&ino->i_lock);
+		nfsi->layout->plh_outstanding--;
 		spin_unlock(&ino->i_lock);
-		dprintk("%s: invalid lseg found, waiting\n", __func__);
-		rpc_sleep_on(&nfsi->lo_rpcwaitq, task, NULL);
 		return;
 	}
-	get_lseg(lseg);
-	*lgp->lsegpp = lseg;
-	spin_unlock(&ino->i_lock);
-	dprintk("%s: valid lseg found, no rpc required\n", __func__);
-	rpc_exit(task, NFS4_OK);
+	rpc_call_start(task);
 }
 
 static void nfs4_layoutget_done(struct rpc_task *task, void *calldata)
 {
 	struct nfs4_layoutget *lgp = calldata;
-	struct nfs_server *server = NFS_SERVER(lgp->args.inode);
+	struct inode *ino = lgp->args.inode;
 
 	dprintk("--> %s\n", __func__);
 
-	if (!nfs4_sequence_done(task, &lgp->res.seq_res))
+	if (!nfs4_sequence_done(task, &lgp->res.seq_res)) {
+		/* layout code relies on fact that in this case
+		 * code falls back to tk_action=call_start, but not
+		 * back to rpc_prepare_task, to keep plh_outstanding
+		 * correct.
+		 */
 		return;
-
+	}
 	switch (task->tk_status) {
 	case 0:
 		break;
@@ -5390,7 +5406,11 @@ static void nfs4_layoutget_done(struct rpc_task *task, void *calldata)
 		task->tk_status = -NFS4ERR_DELAY;
 		/* Fall through */
 	default:
-		if (nfs4_async_handle_error(task, server, NULL, NULL) == -EAGAIN) {
+		if (nfs4_async_handle_error(task, NFS_SERVER(ino),
+					    NULL, NULL) == -EAGAIN) {
+			spin_lock(&ino->i_lock);
+			NFS_I(ino)->layout->plh_outstanding--;
+			spin_unlock(&ino->i_lock);
 			rpc_restart_call_prepare(task);
 			return;
 		}
@@ -5448,13 +5468,20 @@ int nfs4_proc_layoutget(struct nfs4_layoutget *lgp)
 	if (IS_ERR(task))
 		return PTR_ERR(task);
 	status = nfs4_wait_for_completion_rpc_task(task);
-	if (status != 0)
-		goto out;
-	status = task->tk_status;
-	if (status != 0)
-		goto out;
-	status = pnfs_layout_process(lgp);
-out:
+	if (status == 0)
+		status = task->tk_status;
+	if (status == 0)
+		status = pnfs_layout_process(lgp);
+	else {
+		struct inode *ino = lgp->args.inode;
+		struct pnfs_layout_hdr *lo = NFS_I(ino)->layout;
+
+		spin_lock(&ino->i_lock);
+		lo->plh_outstanding--;
+		if (!pnfs_layoutgets_blocked(lo, NULL))
+			rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
+		spin_unlock(&ino->i_lock);
+	}
 	rpc_put_task(task);
 	dprintk("<-- %s status=%d\n", __func__, status);
 	return status;
@@ -5598,9 +5625,9 @@ static void nfs4_layoutreturn_done(struct rpc_task *task, void *calldata)
 
 		spin_lock(&lo->inode->i_lock);
 		if (lrp->res.lrs_present)
-			pnfs_set_layout_stateid(lo, &lrp->res.stateid);
+			pnfs_set_layout_stateid(lo, &lrp->res.stateid, true);
 		else
-			pnfs_invalidate_layout_stateid(lo);
+			BUG_ON(!list_empty(&lo->segs));
 		spin_unlock(&lo->inode->i_lock);
 	}
 	dprintk("<-- %s\n", __func__);
@@ -5611,8 +5638,18 @@ static void nfs4_layoutreturn_release(void *calldata)
 	struct nfs4_layoutreturn *lrp = calldata;
 
 	dprintk("--> %s return_type %d\n", __func__, lrp->args.return_type);
-	if (lrp->args.return_type == RETURN_FILE)
-		put_layout_hdr(lrp->args.inode);
+	if (lrp->args.return_type == RETURN_FILE) {
+		struct inode *ino = lrp->args.inode;
+		struct pnfs_layout_hdr *lo = NFS_I(ino)->layout;
+
+		spin_lock(&ino->i_lock);
+		lo->plh_block_lgets--;
+		lo->plh_outstanding--;
+		if (!pnfs_layoutgets_blocked(lo, NULL))
+			rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
+		spin_unlock(&ino->i_lock);
+		put_layout_hdr(ino);
+	}
 	kfree(calldata);
 	dprintk("<-- %s\n", __func__);
 }
@@ -5641,6 +5678,14 @@ int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp, bool issync)
 	int status = 0;
 
 	dprintk("--> %s\n", __func__);
+	if (lrp->args.return_type == RETURN_FILE) {
+		struct pnfs_layout_hdr *lo = NFS_I(lrp->args.inode)->layout;
+		/* FIXME we should test for BULK here */
+		spin_lock(&lo->inode->i_lock);
+		BUG_ON(lo->plh_block_lgets == 0);
+		lo->plh_outstanding++;
+		spin_unlock(&lo->inode->i_lock);
+	}
 	task = rpc_run_task(&task_setup_data);
 	if (IS_ERR(task))
 		return PTR_ERR(task);
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index 00632f6..ceb0d66 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -1560,6 +1560,10 @@ static void nfs4_state_manager(struct nfs_client *clp)
 			nfs_client_return_marked_delegations(clp);
 			continue;
 		}
+		if (test_and_clear_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state)) {
+			nfs_client_return_layouts(clp);
+			continue;
+		}
 		/* Recall session slots */
 		if (test_and_clear_bit(NFS4CLNT_RECALL_SLOT, &clp->cl_state)
 		   && nfs4_has_session(clp)) {
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 10a6f4a..5208ef7 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -1827,13 +1827,14 @@ encode_getdeviceinfo(struct xdr_stream *xdr,
 	hdr->replen += decode_getdeviceinfo_maxsz;
 }
 
-static void
+static int
 encode_layoutget(struct xdr_stream *xdr,
 		      const struct nfs4_layoutget_args *args,
 		      struct compound_hdr *hdr)
 {
 	nfs4_stateid stateid;
 	__be32 *p;
+	int status;
 
 	p = reserve_space(xdr, 44 + NFS4_STATEID_SIZE);
 	*p++ = cpu_to_be32(OP_LAYOUTGET);
@@ -1843,8 +1844,11 @@ encode_layoutget(struct xdr_stream *xdr,
 	p = xdr_encode_hyper(p, args->range.offset);
 	p = xdr_encode_hyper(p, args->range.length);
 	p = xdr_encode_hyper(p, args->minlength);
-	pnfs_get_layout_stateid(&stateid, NFS_I(args->inode)->layout,
-				args->ctx->state);
+	status = pnfs_choose_layoutget_stateid(&stateid,
+					       NFS_I(args->inode)->layout,
+					       args->ctx->state);
+	if (status)
+		return status;
 	p = xdr_encode_opaque_fixed(p, &stateid.data, NFS4_STATEID_SIZE);
 	*p = cpu_to_be32(args->maxcount);
 
@@ -1857,6 +1861,7 @@ encode_layoutget(struct xdr_stream *xdr,
 		args->maxcount);
 	hdr->nops++;
 	hdr->replen += decode_layoutget_maxsz;
+	return 0;
 }
 
 static int
@@ -2781,12 +2786,15 @@ static int nfs4_xdr_enc_layoutget(struct rpc_rqst *req, uint32_t *p,
 	struct compound_hdr hdr = {
 		.minorversion = nfs4_xdr_minorversion(&args->seq_args),
 	};
+	int status;
 
 	xdr_init_encode(&xdr, &req->rq_snd_buf, p);
 	encode_compound_hdr(&xdr, req, &hdr);
 	encode_sequence(&xdr, &args->seq_args, &hdr);
 	encode_putfh(&xdr, NFS_FH(args->inode), &hdr);
-	encode_layoutget(&xdr, args, &hdr);
+	status = encode_layoutget(&xdr, args, &hdr);
+	if (status)
+		return status;
 	encode_nops(&hdr);
 	return 0;
 }
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index ca8be8d..8d04cf2 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -233,7 +233,7 @@ EXPORT_SYMBOL_GPL(pnfs_unregister_layoutdriver);
  */
 
 /* Need to hold i_lock if caller does not already hold reference */
-static void
+void
 get_layout_hdr(struct pnfs_layout_hdr *lo)
 {
 	atomic_inc(&lo->plh_refcount);
@@ -278,24 +278,29 @@ init_lseg(struct pnfs_layout_hdr *lo, struct pnfs_layout_segment *lseg)
 	smp_mb();
 	lseg->valid = true;
 	lseg->layout = lo;
+	lseg->drain_notification = NULL;
 }
 
 static void
 _put_lseg_common(struct pnfs_layout_segment *lseg)
 {
+	struct inode *ino = lseg->layout->inode;
+
 	BUG_ON(lseg->valid == true);
 	list_del(&lseg->fi_list);
 	if (list_empty(&lseg->layout->segs)) {
 		struct nfs_client *clp;
 
-		clp = NFS_SERVER(lseg->layout->inode)->nfs_client;
+		clp = NFS_SERVER(ino)->nfs_client;
 		spin_lock(&clp->cl_lock);
 		/* List does not take a reference, so no need for put here */
 		list_del_init(&lseg->layout->layouts);
 		spin_unlock(&clp->cl_lock);
-		pnfs_invalidate_layout_stateid(lseg->layout);
+		clear_bit(NFS_LAYOUT_BULK_RECALL, &lseg->layout->plh_flags);
+		if (!pnfs_layoutgets_blocked(lseg->layout, NULL))
+			rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
 	}
-	rpc_wake_up(&NFS_I(lseg->layout->inode)->lo_rpcwaitq);
+	rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq);
 }
 
 /* The use of tmp_list is necessary because pnfs_curr_ld->free_lseg
@@ -325,9 +330,12 @@ put_lseg(struct pnfs_layout_segment *lseg)
 		atomic_read(&lseg->pls_refcount), lseg->valid);
 	ino = lseg->layout->inode;
 	if (atomic_dec_and_lock(&lseg->pls_refcount, &ino->i_lock)) {
+		struct pnfs_cb_lrecall_info *drain_info = lseg->drain_notification;
+
 		_put_lseg_common(lseg);
 		spin_unlock(&ino->i_lock);
 		NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
+		notify_drained(drain_info);
 		/* Matched by get_layout_hdr_locked in pnfs_insert_layout */
 		put_layout_hdr(ino);
 	}
@@ -345,7 +353,7 @@ EXPORT_SYMBOL_GPL(put_lseg);
  * READ		READ	true
  * READ		RW	false
  */
-static int
+bool
 should_free_lseg(struct pnfs_layout_range *lseg_range,
 		 struct pnfs_layout_range *recall_range)
 {
@@ -388,16 +396,19 @@ pnfs_clear_lseg_list(struct pnfs_layout_hdr *lo, struct list_head *tmp_list,
 	dprintk("%s:Return\n", __func__);
 }
 
-static void
+void
 pnfs_free_lseg_list(struct list_head *free_me)
 {
 	struct pnfs_layout_segment *lseg, *tmp;
 	struct inode *ino;
+	struct pnfs_cb_lrecall_info *drain_info;
 
 	list_for_each_entry_safe(lseg, tmp, free_me, fi_list) {
 		BUG_ON(atomic_read(&lseg->pls_refcount) != 0);
 		ino = lseg->layout->inode;
+		drain_info = lseg->drain_notification;
 		NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
+		notify_drained(drain_info);
 		/* Matched by get_layout_hdr_locked in pnfs_insert_layout */
 		put_layout_hdr(ino);
 	}
@@ -453,31 +464,32 @@ pnfs_destroy_all_layouts(struct nfs_client *clp)
 	}
 }
 
-/* update lo->stateid with new if is more recent
- *
- * lo->stateid could be the open stateid, in which case we just use what given.
- */
+/* update lo->stateid with new if is more recent */
 void
-pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
-			const nfs4_stateid *new)
+pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo, const nfs4_stateid *new,
+			bool update_barrier)
 {
-	nfs4_stateid *old = &lo->stateid;
-	bool overwrite = false;
+	u32 oldseq, newseq;
 
 	assert_spin_locked(&lo->inode->i_lock);
-	if (!test_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags) ||
-	    memcmp(old->stateid.other, new->stateid.other, sizeof(new->stateid.other)))
-		overwrite = true;
-	else {
-		u32 oldseq, newseq;
-
-		oldseq = be32_to_cpu(old->stateid.seqid);
-		newseq = be32_to_cpu(new->stateid.seqid);
-		if ((int)(newseq - oldseq) > 0)
-			overwrite = true;
+	oldseq = be32_to_cpu(lo->stateid.stateid.seqid);
+	newseq = be32_to_cpu(new->stateid.seqid);
+	if ((int)(newseq - oldseq) > 0) {
+		memcpy(&lo->stateid, &new->stateid, sizeof(new->stateid));
+		if (update_barrier)
+			lo->plh_barrier = be32_to_cpu(new->stateid.seqid);
+		else {
+			/* Because of wraparound, we want to keep the barrier
+			 * "close" to the current seqids.  It needs to be
+			 * within 2**31 to count as "behind", so if it
+			 * gets too near that limit, give us a litle leeway
+			 * and bring it to within 2**30.
+			 * NOTE - and yes, this is all unsigned arithmetic.
+			 */
+			if (unlikely((newseq - lo->plh_barrier) > (3 << 29)))
+				lo->plh_barrier = newseq - (1 << 30);
+		}
 	}
-	if (overwrite)
-		memcpy(&old->stateid, &new->stateid, sizeof(new->stateid));
 }
 
 /* Layoutreturn may use an invalid stateid, just copy what is there */
@@ -487,13 +499,21 @@ void pnfs_copy_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo)
 	memcpy(dst->data, lo->stateid.data, sizeof(lo->stateid.data));
 }
 
-void
-pnfs_get_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
-			struct nfs4_state *open_state)
+int
+pnfs_choose_layoutget_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
+			      struct nfs4_state *open_state)
 {
+	int status = 0;
+
 	dprintk("--> %s\n", __func__);
 	spin_lock(&lo->inode->i_lock);
-	if (!test_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags)) {
+	if (lo->plh_block_lgets ||
+	    test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags)) {
+		/* We avoid -EAGAIN, as that has special meaning to
+		 * some callers.
+		 */
+		status = -NFS4ERR_LAYOUTTRYLATER;
+	} else if (list_empty(&lo->segs)) {
 		int seq;
 
 		do {
@@ -501,12 +521,11 @@ pnfs_get_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
 			memcpy(dst->data, open_state->stateid.data,
 			       sizeof(open_state->stateid.data));
 		} while (read_seqretry(&open_state->seqlock, seq));
-		set_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags);
 	} else
-		memcpy(dst->data, lo->stateid.data,
-		       sizeof(lo->stateid.data));
+		memcpy(dst->data, lo->stateid.data, sizeof(lo->stateid.data));
 	spin_unlock(&lo->inode->i_lock);
 	dprintk("<-- %s\n", __func__);
+	return status;
 }
 
 /*
@@ -573,6 +592,28 @@ has_layout_to_return(struct pnfs_layout_hdr *lo,
 	return out;
 }
 
+void nfs4_asynch_forget_layouts(struct pnfs_layout_hdr *lo,
+				struct pnfs_layout_range *range,
+				struct pnfs_cb_lrecall_info *drain_info,
+				struct list_head *tmp_list)
+{
+	struct pnfs_layout_segment *lseg, *tmp;
+
+	assert_spin_locked(&lo->inode->i_lock);
+	list_for_each_entry_safe(lseg, tmp, &lo->segs, fi_list)
+		if (should_free_lseg(&lseg->range, range)) {
+			/* FIXME - need to change to something like a
+			 * notification bitmap to remove the restriction
+			 * of only being able to process a single
+			 * CB_LAYOUTRECALL at a time.
+			 */
+			BUG_ON(lseg->drain_notification);
+			lseg->drain_notification = drain_info;
+			atomic_inc(&drain_info->pcl_count);
+			mark_lseg_invalid(lseg, tmp_list);
+		}
+}
+
 /* Return true if there is layout based io in progress in the given range.
  * Assumes range has already been marked invalid, and layout marked to
  * prevent any new lseg from being inserted.
@@ -661,6 +702,7 @@ _pnfs_return_layout(struct inode *ino, struct pnfs_layout_range *range,
 			goto out;
 		}
 
+		lo->plh_block_lgets++;
 		list_for_each_entry_safe(lseg, tmp, &lo->segs, fi_list)
 			if (should_free_lseg(&lseg->range, &arg))
 				mark_lseg_invalid(lseg, &tmp_list);
@@ -717,14 +759,6 @@ pnfs_insert_layout(struct pnfs_layout_hdr *lo,
 	dprintk("%s:Begin\n", __func__);
 
 	assert_spin_locked(&lo->inode->i_lock);
-	if (list_empty(&lo->segs)) {
-		struct nfs_client *clp = NFS_SERVER(lo->inode)->nfs_client;
-
-		spin_lock(&clp->cl_lock);
-		BUG_ON(!list_empty(&lo->layouts));
-		list_add_tail(&lo->layouts, &clp->cl_layouts);
-		spin_unlock(&clp->cl_lock);
-	}
 	list_for_each_entry(lp, &lo->segs, fi_list) {
 		if (cmp_layout(&lp->range, &lseg->range) > 0)
 			continue;
@@ -741,6 +775,9 @@ pnfs_insert_layout(struct pnfs_layout_hdr *lo,
 	}
 	if (!found) {
 		list_add_tail(&lseg->fi_list, &lo->segs);
+		if (list_is_singular(&lo->segs) &&
+		    !pnfs_layoutgets_blocked(lo, NULL))
+			rpc_wake_up(&NFS_I(lo->inode)->lo_rpcwaitq_stateid);
 		dprintk("%s: inserted lseg %p "
 			"iomode %d offset %llu length %llu at tail\n",
 			__func__, lseg, lseg->range.iomode,
@@ -762,6 +799,7 @@ alloc_init_layout_hdr(struct inode *ino)
 	atomic_set(&lo->plh_refcount, 1);
 	INIT_LIST_HEAD(&lo->layouts);
 	INIT_LIST_HEAD(&lo->segs);
+	INIT_LIST_HEAD(&lo->plh_bulk_recall);
 	lo->inode = ino;
 	return lo;
 }
@@ -849,6 +887,7 @@ pnfs_update_layout(struct inode *ino,
 		.length = NFS4_MAX_UINT64,
 	};
 	struct nfs_inode *nfsi = NFS_I(ino);
+	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
 	struct pnfs_layout_hdr *lo;
 	struct pnfs_layout_segment *lseg = NULL;
 
@@ -884,9 +923,28 @@ pnfs_update_layout(struct inode *ino,
 		goto out_unlock;
 
 	get_layout_hdr(lo); /* Matched in pnfs_layoutget_release */
+	if (list_empty(&lo->segs)) {
+		/* The lo must be on the clp list if there is any
+		 * chance of a CB_LAYOUTRECALL(FILE) coming in.
+		 */
+		spin_lock(&clp->cl_lock);
+		BUG_ON(!list_empty(&lo->layouts));
+		list_add_tail(&lo->layouts, &clp->cl_layouts);
+		spin_unlock(&clp->cl_lock);
+	}
 	spin_unlock(&ino->i_lock);
 
 	lseg = send_layoutget(lo, ctx, &arg);
+	if (!lseg) {
+		spin_lock(&ino->i_lock);
+		if (list_empty(&lo->segs)) {
+			spin_lock(&clp->cl_lock);
+			list_del_init(&lo->layouts);
+			spin_unlock(&clp->cl_lock);
+			clear_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags);
+		}
+		spin_unlock(&ino->i_lock);
+	}
 out:
 	dprintk("%s end, state 0x%lx lseg %p\n", __func__,
 		nfsi->layout->plh_flags, lseg);
@@ -896,6 +954,18 @@ out_unlock:
 	goto out;
 }
 
+bool
+pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo, nfs4_stateid *stateid)
+{
+	assert_spin_locked(&lo->inode->i_lock);
+	if ((stateid) &&
+	    (int)(lo->plh_barrier - be32_to_cpu(stateid->stateid.seqid)) >= 0)
+		return true;
+	return lo->plh_block_lgets ||
+		test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags) ||
+		(list_empty(&lo->segs) && lo->plh_outstanding);
+}
+
 int
 pnfs_layout_process(struct nfs4_layoutget *lgp)
 {
@@ -903,6 +973,7 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
 	struct nfs4_layoutget_res *res = &lgp->res;
 	struct pnfs_layout_segment *lseg;
 	struct inode *ino = lo->inode;
+	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
 	int status = 0;
 
 	/* Inject layout blob into I/O device driver */
@@ -914,10 +985,25 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
 			status = PTR_ERR(lseg);
 		dprintk("%s: Could not allocate layout: error %d\n",
 		       __func__, status);
+		spin_lock(&ino->i_lock);
 		goto out;
 	}
 
 	spin_lock(&ino->i_lock);
+	/* decrement needs to be done before call to pnfs_layoutget_blocked */
+	lo->plh_outstanding--;
+	spin_lock(&clp->cl_lock);
+	if (matches_outstanding_recall(ino, &res->range)) {
+		spin_unlock(&clp->cl_lock);
+		dprintk("%s forget reply due to recall\n", __func__);
+		goto out_forget_reply;
+	}
+	spin_unlock(&clp->cl_lock);
+
+	if (pnfs_layoutgets_blocked(lo, &res->stateid)) {
+		dprintk("%s forget reply due to state\n", __func__);
+		goto out_forget_reply;
+	}
 	init_lseg(lo, lseg);
 	lseg->range = res->range;
 	get_lseg(lseg);
@@ -933,10 +1019,19 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
 	}
 
 	/* Done processing layoutget. Set the layout stateid */
-	pnfs_set_layout_stateid(lo, &res->stateid);
-	spin_unlock(&ino->i_lock);
+	pnfs_set_layout_stateid(lo, &res->stateid, false);
 out:
+	if (!pnfs_layoutgets_blocked(lo, NULL))
+		rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
+	spin_unlock(&ino->i_lock);
 	return status;
+
+out_forget_reply:
+	spin_unlock(&ino->i_lock);
+	lseg->layout = lo;
+	NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
+	spin_lock(&ino->i_lock);
+	goto out;
 }
 
 void
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index e631487..810714a 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -31,6 +31,7 @@
 #define FS_NFS_PNFS_H
 
 #include <linux/nfs_page.h>
+#include "callback.h" /* for cb_layoutrecallargs */
 
 struct pnfs_layout_segment {
 	struct list_head fi_list;
@@ -38,6 +39,7 @@ struct pnfs_layout_segment {
 	atomic_t pls_refcount;
 	bool valid;
 	struct pnfs_layout_hdr *layout;
+	struct pnfs_cb_lrecall_info *drain_notification;
 };
 
 enum pnfs_try_status {
@@ -52,7 +54,7 @@ enum pnfs_try_status {
 enum {
 	NFS_LAYOUT_RO_FAILED = 0,	/* get ro layout failed stop trying */
 	NFS_LAYOUT_RW_FAILED,		/* get rw layout failed stop trying */
-	NFS_LAYOUT_STATEID_SET,		/* have a valid layout stateid */
+	NFS_LAYOUT_BULK_RECALL,		/* bulk recall affecting layout */
 	NFS_LAYOUT_NEED_LCOMMIT,	/* LAYOUTCOMMIT needed */
 };
 
@@ -94,9 +96,13 @@ struct pnfs_layoutdriver_type {
 struct pnfs_layout_hdr {
 	atomic_t		plh_refcount;
 	struct list_head	layouts;   /* other client layouts */
+	struct list_head	plh_bulk_recall; /* clnt list of bulk recalls */
 	struct list_head	segs;      /* layout segments list */
 	int			roc_iomode;/* return on close iomode, 0=none */
 	nfs4_stateid		stateid;
+	unsigned long		plh_outstanding; /* number of RPCs out */
+	unsigned long		plh_block_lgets; /* block LAYOUTGET if >0 */
+	u32			plh_barrier; /* ignore lower seqids */
 	unsigned long		plh_flags;
 	struct rpc_cred		*cred;     /* layoutcommit credential */
 	/* DH: These vars keep track of the maximum write range
@@ -117,6 +123,14 @@ struct pnfs_device {
 	unsigned int  pglen;
 };
 
+struct pnfs_cb_lrecall_info {
+	struct list_head	pcl_list; /* hook into cl_layoutrecalls list */
+	atomic_t		pcl_count;
+	struct nfs_client	*pcl_clp;
+	struct inode		*pcl_ino;
+	struct cb_layoutrecallargs pcl_args;
+};
+
 /*
  * Device ID RCU cache. A device ID is unique per client ID and layout type.
  */
@@ -175,7 +189,10 @@ extern int nfs4_proc_layoutcommit(struct nfs4_layoutcommit_data *data,
 extern int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp, bool wait);
 
 /* pnfs.c */
+void get_layout_hdr(struct pnfs_layout_hdr *lo);
 void put_lseg(struct pnfs_layout_segment *lseg);
+bool should_free_lseg(struct pnfs_layout_range *lseg_range,
+		      struct pnfs_layout_range *recall_range);
 struct pnfs_layout_segment *
 pnfs_has_layout(struct pnfs_layout_hdr *lo, struct pnfs_layout_range *range);
 struct pnfs_layout_segment *
@@ -200,15 +217,25 @@ enum pnfs_try_status pnfs_try_to_commit(struct nfs_write_data *,
 void pnfs_pageio_init_read(struct nfs_pageio_descriptor *, struct inode *,
 			   struct nfs_open_context *, struct list_head *);
 void pnfs_pageio_init_write(struct nfs_pageio_descriptor *, struct inode *);
+bool pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo, nfs4_stateid *stateid);
 int pnfs_layout_process(struct nfs4_layoutget *lgp);
+void pnfs_free_lseg_list(struct list_head *tmp_list);
 void pnfs_destroy_layout(struct nfs_inode *);
 void pnfs_destroy_all_layouts(struct nfs_client *);
 void put_layout_hdr(struct inode *inode);
 void pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
-			     const nfs4_stateid *new);
+			     const nfs4_stateid *new,
+			     bool update_barrier);
 void pnfs_copy_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo);
-void pnfs_get_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
-			     struct nfs4_state *open_state);
+int pnfs_choose_layoutget_stateid(nfs4_stateid *dst,
+				  struct pnfs_layout_hdr *lo,
+				  struct nfs4_state *open_state);
+void nfs4_asynch_forget_layouts(struct pnfs_layout_hdr *lo,
+				struct pnfs_layout_range *range,
+				struct pnfs_cb_lrecall_info *drain_info,
+				struct list_head *tmp_list);
+/* FIXME - this should be in callback.h, but pnfs_cb_lrecall_info needs to be there too */
+extern void notify_drained(struct pnfs_cb_lrecall_info *d);
 
 static inline bool
 has_layout(struct nfs_inode *nfsi)
@@ -222,12 +249,6 @@ static inline int lo_fail_bit(u32 iomode)
 			 NFS_LAYOUT_RW_FAILED : NFS_LAYOUT_RO_FAILED;
 }
 
-static inline void pnfs_invalidate_layout_stateid(struct pnfs_layout_hdr *lo)
-{
-	assert_spin_locked(&lo->inode->i_lock);
-	clear_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags);
-}
-
 static inline void get_lseg(struct pnfs_layout_segment *lseg)
 {
 	atomic_inc(&lseg->pls_refcount);
diff --git a/include/linux/nfs_fs.h b/include/linux/nfs_fs.h
index d8bfa42..061d81a 100644
--- a/include/linux/nfs_fs.h
+++ b/include/linux/nfs_fs.h
@@ -191,6 +191,7 @@ struct nfs_inode {
 
 	/* pNFS layout information */
 	struct rpc_wait_queue lo_rpcwaitq;
+	struct rpc_wait_queue	lo_rpcwaitq_stateid;
 	struct pnfs_layout_hdr *layout;
 #endif /* CONFIG_NFS_V4*/
 #ifdef CONFIG_NFS_FSCACHE
diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h
index 3cae408..80dcc00 100644
--- a/include/linux/nfs_fs_sb.h
+++ b/include/linux/nfs_fs_sb.h
@@ -83,6 +83,10 @@ struct nfs_client {
 	u32			cl_exchange_flags;
 	struct nfs4_session	*cl_session; 	/* sharred session */
 	struct list_head	cl_layouts;
+	struct list_head	cl_layoutrecalls;
+	unsigned long		cl_cb_lrecall_count;
+#define PNFS_MAX_CB_LRECALLS (1)
+	struct rpc_wait_queue	cl_rpcwaitq_recall;
 	struct pnfs_deviceid_cache *cl_devid_cache; /* pNFS deviceid cache */
 #endif /* CONFIG_NFS_V4_1 */
 
-- 
1.7.2.1

--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux