Re: [PATCH 16/22] pnfs-submit: rewrite of layout state handling and cb_layoutrecall

On Fri, 2010-11-12 at 03:48 -0500, Fred Isaman wrote:
> Remove NFS_LAYOUT_STATEID_SET in favor of just checking list_empty(lo->segs).
> 
> LAYOUTGETs with openstateid are serialized.  Waiting on the condition
> (list_empty(lo->segs) && plh_outstanding>0) both drains outstanding RPCs once
> the stateid is invalidated and allows only a single LAYOUTGET(openstateid)
> through at a time.
> 
> Before sending a LAYOUTRETURN, plh_block_lgets is incremented.  It is
> decremented in the rpc_release function.  While set, LAYOUTGETs are
> paused in their rpc_prepare function, and any responses are
> forgotten.
> 
> Callbacks are handled by blocking any matching LAYOUTGETs while
> processing and initiating a drain of IO.  A notification system is set
> up so that when all relevant IO is finished, the state manager thread
> is invoked, which synchronously sends the final matching LAYOUTRETURN
> before unblocking LAYOUTGETs.
> 
> Signed-off-by: Fred Isaman <iisaman@xxxxxxxxxx>
> ---
>  fs/nfs/callback.h         |    7 +
>  fs/nfs/callback_proc.c    |  466 +++++++++++++++++++++++----------------------
>  fs/nfs/client.c           |    3 +
>  fs/nfs/nfs4proc.c         |   81 ++++++--
>  fs/nfs/nfs4state.c        |    4 +
>  fs/nfs/nfs4xdr.c          |   16 ++-
>  fs/nfs/pnfs.c             |  177 +++++++++++++-----
>  fs/nfs/pnfs.h             |   41 +++-
>  include/linux/nfs_fs_sb.h |    4 +
>  9 files changed, 497 insertions(+), 302 deletions(-)
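
For anyone trying to keep track, the serialization rule described in the
changelog boils down to roughly this (a sketch reusing the patch's field
names, not code from the patch itself):

	static bool layoutget_must_wait(struct pnfs_layout_hdr *lo)
	{
		/* Wait while a LAYOUTRETURN is in flight, or while an
		 * initial LAYOUTGET(openstateid) is still outstanding
		 * and no segments have been installed yet.
		 */
		return lo->plh_block_lgets ||
			(list_empty(&lo->segs) && lo->plh_outstanding > 0);
	}

which is essentially what pnfs_layoutgets_blocked() checks below, plus
the bulk-recall bit and the seqid barrier.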
> 
> diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h
> index cea58cc..4a9905b 100644
> --- a/fs/nfs/callback.h
> +++ b/fs/nfs/callback.h
> @@ -163,6 +163,9 @@ struct cb_layoutrecallargs {
>  extern unsigned nfs4_callback_layoutrecall(
>  	struct cb_layoutrecallargs *args,
>  	void *dummy, struct cb_process_state *cps);
> +extern bool matches_outstanding_recall(struct inode *ino,
> +				       struct pnfs_layout_range *range);
> +extern void nfs_client_return_layouts(struct nfs_client *clp);
>  
>  static inline void put_session_client(struct nfs4_session *session)
>  {
> @@ -178,6 +181,10 @@ find_client_from_cps(struct cb_process_state *cps, struct sockaddr *addr)
>  
>  #else
>  
> +static inline void nfs_client_return_layouts(struct nfs_client *clp)
> +{
> +}
> +
>  static inline struct nfs_client *
>  find_client_from_cps(struct cb_process_state *cps, struct sockaddr *addr)
>  {
> diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
> index 6e0fc40..af405cf 100644
> --- a/fs/nfs/callback_proc.c
> +++ b/fs/nfs/callback_proc.c
> @@ -124,265 +124,283 @@ int nfs4_validate_delegation_stateid(struct nfs_delegation *delegation, const nf
>  #if defined(CONFIG_NFS_V4_1)
>  
>  static bool
> -pnfs_is_next_layout_stateid(const struct pnfs_layout_hdr *lo,
> -			    const nfs4_stateid stateid)
> +_recall_matches_lget(struct pnfs_cb_lrecall_info *cb_info,
> +		     struct inode *ino, struct pnfs_layout_range *range)
>  {
> -	bool res;
> -	u32 oldseqid, newseqid;
> -
> -	spin_lock(&lo->inode->i_lock);
> -	{
> -		oldseqid = be32_to_cpu(lo->stateid.stateid.seqid);
> -		newseqid = be32_to_cpu(stateid.stateid.seqid);
> -		res = !memcmp(lo->stateid.stateid.other,
> -			      stateid.stateid.other,
> -			      NFS4_STATEID_OTHER_SIZE);
> -		if (res) { /* comparing layout stateids */
> -			if (oldseqid == ~0)
> -				res = (newseqid == 1);
> -			else
> -				res = (newseqid == oldseqid + 1);
> -		} else { /* open stateid */
> -			res = !memcmp(lo->stateid.data,
> -				      &zero_stateid,
> -				      NFS4_STATEID_SIZE);
> -			if (res)
> -				res = (newseqid == 1);
> -		}
> -	}
> -	spin_unlock(&lo->inode->i_lock);
> +	struct cb_layoutrecallargs *cb_args = &cb_info->pcl_args;
>  
> -	return res;
> +	switch (cb_args->cbl_recall_type) {
> +	case RETURN_ALL:
> +		return true;
> +	case RETURN_FSID:
> +		return !memcmp(&NFS_SERVER(ino)->fsid, &cb_args->cbl_fsid,
> +			       sizeof(struct nfs_fsid));
> +	case RETURN_FILE:
> +		return (ino == cb_info->pcl_ino) &&
> +			should_free_lseg(range, &cb_args->cbl_range);
> +	default:
> +		BUG();

Why should we BUG() just because the server is screwed up? That's not a
client bug.
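
Something like the following would at least keep the client running
(just a sketch; it treats an unknown recall type as matching nothing):

	default:
		dprintk("%s: unknown recall type %d\n", __func__,
			cb_args->cbl_recall_type);
		return false;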

> +	}
>  }
>  
> -/*
> - * Retrieve an inode based on layout recall parameters
> - *
> - * Note: caller must iput(inode) to dereference the inode.
> - */
> -static struct inode *
> -nfs_layoutrecall_find_inode(struct nfs_client *clp,
> -			    const struct cb_layoutrecallargs *args)
> +bool
> +matches_outstanding_recall(struct inode *ino, struct pnfs_layout_range *range)
>  {
> -	struct nfs_inode *nfsi;
> -	struct pnfs_layout_hdr *lo;
> -	struct nfs_server *server;
> -	struct inode *ino = NULL;
> -
> -	dprintk("%s: Begin recall_type=%d clp %p\n",
> -		__func__, args->cbl_recall_type, clp);
> -
> -	spin_lock(&clp->cl_lock);
> -	list_for_each_entry(lo, &clp->cl_layouts, layouts) {
> -		nfsi = NFS_I(lo->inode);
> -		if (!nfsi)
> -			continue;
> -
> -		dprintk("%s: Searching inode=%lu\n",
> -			__func__, nfsi->vfs_inode.i_ino);
> -
> -		if (args->cbl_recall_type == RETURN_FILE) {
> -		    if (nfs_compare_fh(&args->cbl_fh, &nfsi->fh))
> -			continue;
> -		} else if (args->cbl_recall_type == RETURN_FSID) {
> -			server = NFS_SERVER(&nfsi->vfs_inode);
> -			if (server->fsid.major != args->cbl_fsid.major ||
> -			    server->fsid.minor != args->cbl_fsid.minor)
> -				continue;
> +	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
> +	struct pnfs_cb_lrecall_info *cb_info;
> +	bool rv = false;
> +
> +	assert_spin_locked(&clp->cl_lock);

Can we please go easy on the asserts? There is way too much asserting
going on in the NFSv4.1 code. This isn't a publicly visible interface,
so just get it right in the debugging process before the merge, and then
kill these asserts...
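
If some annotation is still wanted here, lockdep_assert_held() at least
compiles away on non-lockdep builds (a suggestion, not tested against
this tree):

	lockdep_assert_held(&clp->cl_lock);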

> +	list_for_each_entry(cb_info, &clp->cl_layoutrecalls, pcl_list) {
> +		if (_recall_matches_lget(cb_info, ino, range)) {
> +			rv = true;
> +			break;
>  		}
> -
> -		/* Make sure client didn't clean up layout without
> -		 * telling the server */
> -		if (!has_layout(nfsi))
> -			continue;
> -
> -		ino = igrab(&nfsi->vfs_inode);
> -		dprintk("%s: Found inode=%p\n", __func__, ino);
> -		break;
>  	}
> -	spin_unlock(&clp->cl_lock);
> -	return ino;
> +	return rv;
>  }
>  
> -struct recall_layout_threadargs {
> -	struct inode *inode;
> -	struct nfs_client *clp;
> -	struct completion started;
> -	struct cb_layoutrecallargs *rl;
> -	int result;
> -};
> -
> -static int pnfs_recall_layout(void *data)
> +/* Send a synchronous LAYOUTRETURN.  By the time this is called, we know
> + * all IO has been drained, any matching lsegs deleted, and that no
> + * overlapping LAYOUTGETs will be sent or processed for the duration
> + * of this call.
> + * Note that it is possible that when this is called, the stateid has
> + * been invalidated, but it will not be cleared, so it can still be used.
> + */
> +static int
> +pnfs_send_layoutreturn(struct nfs_client *clp,
> +		       struct pnfs_cb_lrecall_info *cb_info)
>  {
> -	struct inode *inode, *ino;
> -	struct nfs_client *clp;
> -	struct cb_layoutrecallargs rl;
> +	struct cb_layoutrecallargs *args = &cb_info->pcl_args;
>  	struct nfs4_layoutreturn *lrp;
> -	struct recall_layout_threadargs *args =
> -		(struct recall_layout_threadargs *)data;
> -	int status = 0;
> -
> -	daemonize("nfsv4-layoutreturn");
> -
> -	dprintk("%s: recall_type=%d fsid 0x%llx-0x%llx start\n",
> -		__func__, args->rl->cbl_recall_type,
> -		args->rl->cbl_fsid.major, args->rl->cbl_fsid.minor);
> -
> -	clp = args->clp;
> -	inode = args->inode;
> -	rl = *args->rl;
> -
> -	/* support whole file layouts only */
> -	rl.cbl_range.offset = 0;
> -	rl.cbl_range.length = NFS4_MAX_UINT64;
> -
> -	if (rl.cbl_recall_type == RETURN_FILE) {
> -		if (pnfs_is_next_layout_stateid(NFS_I(inode)->layout,
> -						rl.cbl_stateid))
> -			status = pnfs_return_layout(inode, &rl.cbl_range,
> -						    &rl.cbl_stateid, RETURN_FILE,
> -						    false);
> -		else
> -			status = cpu_to_be32(NFS4ERR_DELAY);
> -		if (status)
> -			dprintk("%s RETURN_FILE error: %d\n", __func__, status);
> -		else
> -			status =  cpu_to_be32(NFS4ERR_NOMATCHING_LAYOUT);
> -		args->result = status;
> -		complete(&args->started);
> -		goto out;
> -	}
> -
> -	status = cpu_to_be32(NFS4_OK);
> -	args->result = status;
> -	complete(&args->started);
> -	args = NULL;
> -
> -	/* IMPROVEME: This loop is inefficient, running in O(|s_inodes|^2) */
> -	while ((ino = nfs_layoutrecall_find_inode(clp, &rl)) != NULL) {
> -		/* FIXME: need to check status on pnfs_return_layout */
> -		pnfs_return_layout(ino, &rl.cbl_range, NULL, RETURN_FILE, false);
> -		iput(ino);
> -	}
>  
>  	lrp = kzalloc(sizeof(*lrp), GFP_KERNEL);
> -	if (!lrp) {
> -		dprintk("%s: allocation failed. Cannot send last LAYOUTRETURN\n",
> -			__func__);
> -		goto out;
> -	}
> -
> -	/* send final layoutreturn */
> +	if (!lrp)
> +		return -ENOMEM;
>  	lrp->args.reclaim = 0;
> -	lrp->args.layout_type = rl.cbl_layout_type;
> -	lrp->args.return_type = rl.cbl_recall_type;
> +	lrp->args.layout_type = args->cbl_layout_type;
> +	lrp->args.return_type = args->cbl_recall_type;
>  	lrp->clp = clp;
> -	lrp->args.range = rl.cbl_range;
> -	lrp->args.inode = inode;
> -	nfs4_proc_layoutreturn(lrp, true);
> -
> -out:
> -	clear_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state);
> -	nfs_put_client(clp);
> -	module_put_and_exit(0);
> -	dprintk("%s: exit status %d\n", __func__, 0);
> -	return 0;
> +	if (args->cbl_recall_type == RETURN_FILE) {
> +		lrp->args.range = args->cbl_range;
> +		lrp->args.inode = cb_info->pcl_ino;
> +	} else {
> +		lrp->args.range.iomode = IOMODE_ANY;
> +		lrp->args.inode = NULL;
> +	}
> +	return nfs4_proc_layoutreturn(lrp, true);
>  }
>  
> -/*
> - * Asynchronous layout recall!
> +/* Called by state manager to finish CB_LAYOUTRECALLS initiated by
> + * nfs4_callback_layoutrecall().
>   */
> -static int pnfs_async_return_layout(struct nfs_client *clp, struct inode *inode,
> -				    struct cb_layoutrecallargs *rl)
> +void nfs_client_return_layouts(struct nfs_client *clp)
>  {
> -	struct recall_layout_threadargs data = {
> -		.clp = clp,
> -		.inode = inode,
> -		.rl = rl,
> -	};
> -	struct task_struct *t;
> -	int status = -EAGAIN;
> +	struct pnfs_cb_lrecall_info *cb_info;
>  
> -	dprintk("%s: -->\n", __func__);
> +	spin_lock(&clp->cl_lock);
> +	while (true) {
> +		if (list_empty(&clp->cl_layoutrecalls)) {
> +			spin_unlock(&clp->cl_lock);
> +			break;
> +		}
> +		cb_info = list_first_entry(&clp->cl_layoutrecalls,
> +					   struct pnfs_cb_lrecall_info,
> +					   pcl_list);
> +		spin_unlock(&clp->cl_lock);
> +		if (atomic_read(&cb_info->pcl_count) != 0)
> +			break;
> +		/* What to do on an error return?  These layoutreturns are
> +		 * required by the protocol, so if we do not get a
> +		 * successful reply, we probably have to do something
> +		 * more drastic.
> +		 */
> +		pnfs_send_layoutreturn(clp, cb_info);
> +		spin_lock(&clp->cl_lock);
> +		/* Removing from the list unblocks LAYOUTGETs */
> +		list_del(&cb_info->pcl_list);
> +		clp->cl_cb_lrecall_count--;
> +		rpc_wake_up(&clp->cl_rpcwaitq_recall);
> +		kfree(cb_info);
> +	}
> +}
>  
> -	/* FIXME: do not allow two concurrent layout recalls */
> -	if (test_and_set_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state))
> -		return status;
> -
> -	init_completion(&data.started);
> -	__module_get(THIS_MODULE);
> -	atomic_inc(&clp->cl_count);
> -
> -	t = kthread_run(pnfs_recall_layout, &data, "%s", "pnfs_recall_layout");
> -	if (IS_ERR(t)) {
> -		printk(KERN_INFO "NFS: Layout recall callback thread failed "
> -			"for client (clientid %08x/%08x)\n",
> -			(unsigned)(clp->cl_clientid >> 32),
> -			(unsigned)(clp->cl_clientid));
> -		status = PTR_ERR(t);
> -		goto out_module_put;
> +void notify_drained(struct pnfs_cb_lrecall_info *d)
> +{
> +	if (d && atomic_dec_and_test(&d->pcl_count)) {
> +		set_bit(NFS4CLNT_LAYOUT_RECALL, &d->pcl_clp->cl_state);
> +		nfs4_schedule_state_manager(d->pcl_clp);
>  	}
> -	wait_for_completion(&data.started);
> -	return data.result;
> -out_module_put:
> -	nfs_put_client(clp);
> -	clear_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state);
> -	module_put(THIS_MODULE);
> -	return status;
>  }
>  
> -static int pnfs_recall_all_layouts(struct nfs_client *clp)
> +static int initiate_layout_draining(struct pnfs_cb_lrecall_info *cb_info)
>  {
> -	struct cb_layoutrecallargs rl;
> -	struct inode *inode;
> -	int status = 0;
> -
> -	rl.cbl_recall_type = RETURN_ALL;
> -	rl.cbl_range.iomode = IOMODE_ANY;
> -	rl.cbl_range.offset = 0;
> -	rl.cbl_range.length = NFS4_MAX_UINT64;
> -
> -	/* we need the inode to get the nfs_server struct */
> -	inode = nfs_layoutrecall_find_inode(clp, &rl);
> -	if (!inode)
> -		return status;
> -	status = pnfs_async_return_layout(clp, inode, &rl);
> -	iput(inode);
> +	struct nfs_client *clp = cb_info->pcl_clp;
> +	struct pnfs_layout_hdr *lo;
> +	int rv = NFS4ERR_NOMATCHING_LAYOUT;
> +	struct cb_layoutrecallargs *args = &cb_info->pcl_args;
> +
> +	if (args->cbl_recall_type == RETURN_FILE) {
> +		LIST_HEAD(free_me_list);
> +
> +		spin_lock(&clp->cl_lock);
> +		list_for_each_entry(lo, &clp->cl_layouts, layouts) {
> +			if (nfs_compare_fh(&args->cbl_fh,
> +					   &NFS_I(lo->inode)->fh))
> +				continue;
> +			if (test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags))
> +				rv = NFS4ERR_DELAY;
> +			else {
> +				/* FIXME I need to better understand igrab;
> +				 * does holding a layout ref keep the inode
> +				 * around?  It should.
> +				 */
> +				/* We need to hold the reference until any
> +				 * potential LAYOUTRETURN is finished.
> +				 */
> +				get_layout_hdr(lo);
> +				cb_info->pcl_ino = lo->inode;
> +				rv = NFS4_OK;
> +			}
> +			break;
> +		}
> +		spin_unlock(&clp->cl_lock);
> +
> +		spin_lock(&lo->inode->i_lock);
> +		if (rv == NFS4_OK) {
> +			lo->plh_block_lgets++;
> +			nfs4_asynch_forget_layouts(lo, &args->cbl_range,
> +						   cb_info, &free_me_list);
> +		}
> +		pnfs_set_layout_stateid(lo, &args->cbl_stateid, true);
> +		spin_unlock(&lo->inode->i_lock);
> +		pnfs_free_lseg_list(&free_me_list);
> +	} else {
> +		struct pnfs_layout_hdr *tmp;
> +		LIST_HEAD(recall_list);
> +		LIST_HEAD(free_me_list);
> +		struct pnfs_layout_range range = {
> +			.iomode = IOMODE_ANY,
> +			.offset = 0,
> +			.length = NFS4_MAX_UINT64,
> +		};
> +
> +		spin_lock(&clp->cl_lock);
> +		/* Per RFC 5661, 12.5.5.2.1.5, bulk recall must be serialized */
> +		if (!list_is_singular(&clp->cl_layoutrecalls)) {
> +			spin_unlock(&clp->cl_lock);
> +			return NFS4ERR_DELAY;
> +		}
> +		list_for_each_entry(lo, &clp->cl_layouts, layouts) {
> +			if ((args->cbl_recall_type == RETURN_FSID) &&
> +			    memcmp(&NFS_SERVER(lo->inode)->fsid,
> +				   &args->cbl_fsid, sizeof(struct nfs_fsid)))
> +				continue;
> +			get_layout_hdr(lo);
> +			/* We could list_del(&lo->layouts) here */
> +			BUG_ON(!list_empty(&lo->plh_bulk_recall));
> +			list_add(&lo->plh_bulk_recall, &recall_list);
> +		}
> +		spin_unlock(&clp->cl_lock);
> +		list_for_each_entry_safe(lo, tmp,
> +					 &recall_list, plh_bulk_recall) {
> +			spin_lock(&lo->inode->i_lock);
> +			set_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags);
> +			nfs4_asynch_forget_layouts(lo, &range, cb_info,
> +						   &free_me_list);
> +			list_del_init(&lo->plh_bulk_recall);
> +			spin_unlock(&lo->inode->i_lock);
> +			put_layout_hdr(lo->inode);
> +			rv = NFS4_OK;
> +		}
> +		pnfs_free_lseg_list(&free_me_list);
> +	}
> +	return rv;
> +}
> +
> +static u32 do_callback_layoutrecall(struct nfs_client *clp,
> +				    struct cb_layoutrecallargs *args)
> +{
> +	struct pnfs_cb_lrecall_info *new;
> +	u32 res;
> +
> +	dprintk("%s enter, type=%i\n", __func__, args->cbl_recall_type);
> +	new = kmalloc(sizeof(*new), GFP_KERNEL);
> +	if (!new) {
> +		res = NFS4ERR_RESOURCE;
> +		goto out;
> +	}
> +	memcpy(&new->pcl_args, args, sizeof(*args));
> +	atomic_set(&new->pcl_count, 1);
> +	new->pcl_clp = clp;
> +	new->pcl_ino = NULL;
> +	spin_lock(&clp->cl_lock);
> +	if (clp->cl_cb_lrecall_count >= PNFS_MAX_CB_LRECALLS) {
> +		kfree(new);
> +		res = NFS4ERR_DELAY;
> +		spin_unlock(&clp->cl_lock);
> +		goto out;
> +	}
> +	clp->cl_cb_lrecall_count++;
> +	/* Adding to the list will block conflicting LGET activity */
> +	list_add_tail(&new->pcl_list, &clp->cl_layoutrecalls);
> +	spin_unlock(&clp->cl_lock);
> +	res = initiate_layout_draining(new);
> +	if (res || atomic_dec_and_test(&new->pcl_count)) {
> +		spin_lock(&clp->cl_lock);
> +		list_del(&new->pcl_list);
> +		clp->cl_cb_lrecall_count--;
> +		rpc_wake_up(&clp->cl_rpcwaitq_recall);
> +		spin_unlock(&clp->cl_lock);
> +		if (res == NFS4_OK) {
> +			if (args->cbl_recall_type == RETURN_FILE) {
> +				struct pnfs_layout_hdr *lo;
> +
> +				lo = NFS_I(new->pcl_ino)->layout;
> +				spin_lock(&lo->inode->i_lock);
> +				lo->plh_block_lgets--;
> +				if (!pnfs_layoutgets_blocked(lo, NULL))
> +					rpc_wake_up(&NFS_I(lo->inode)->lo_rpcwaitq_stateid);
> +				spin_unlock(&lo->inode->i_lock);
> +				put_layout_hdr(new->pcl_ino);
> +			}
> +			res = NFS4ERR_NOMATCHING_LAYOUT;
> +		}
> +		kfree(new);
> +	}
> +out:
> +	dprintk("%s returning %i\n", __func__, res);
> +	return res;
>  
> -	return status;
>  }
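
For reference, my reading of the pcl_count lifecycle (a summary, not
patch text):

	/* do_callback_layoutrecall():   atomic_set(&pcl_count, 1)
	 * nfs4_asynch_forget_layouts(): atomic_inc() per lseg marked
	 * notify_drained():             atomic_dec_and_test() per lseg
	 *                               freed; on zero, kicks the state
	 *                               manager to send the final
	 *                               LAYOUTRETURN
	 * do_callback_layoutrecall():   atomic_dec_and_test() drops the
	 *                               initial reference; if that hits
	 *                               zero, the recall is answered
	 *                               inline with
	 *                               NFS4ERR_NOMATCHING_LAYOUT
	 */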
>  
>  __be32 nfs4_callback_layoutrecall(struct cb_layoutrecallargs *args,
>  				  void *dummy, struct cb_process_state *cps)
>  {
>  	struct nfs_client *clp;
> -	struct inode *inode = NULL;
> -	__be32 res;
> -	int status;
> +	u32 res;
>  
>  	dprintk("%s: -->\n", __func__);
>  
> -	res = cpu_to_be32(NFS4ERR_OP_NOT_IN_SESSION);
> -	if (cps->session) /* set in cb_sequence */
> +	if (cps->session) { /* set in cb_sequence */
>  		clp = cps->session->clp;
> -	else
> -		goto out;
> +		res = do_callback_layoutrecall(clp, args);
> +	} else
> +		res = NFS4ERR_OP_NOT_IN_SESSION;
>  
> -	res = cpu_to_be32(NFS4ERR_NOMATCHING_LAYOUT);
> -	/*
> -	 * In the _ALL or _FSID case, we need the inode to get
> -	 * the nfs_server struct.
> -	 */
> -	inode = nfs_layoutrecall_find_inode(clp, args);
> -	if (!inode)
> -		goto out;
> -	status = pnfs_async_return_layout(clp, inode, args);
> -	if (status)
> -		res = cpu_to_be32(NFS4ERR_DELAY);
> -	iput(inode);
> -out:
> -	dprintk("%s: exit with status = %d\n", __func__, ntohl(res));
> -	return res;
> +	dprintk("%s: exit with status = %d\n", __func__, res);
> +	return cpu_to_be32(res);
> +}
> +
> +static void pnfs_recall_all_layouts(struct nfs_client *clp)
> +{
> +	struct cb_layoutrecallargs args;
> +
> +	/* Pretend we got a CB_LAYOUTRECALL(ALL) */
> +	memset(&args, 0, sizeof(args));
> +	args.cbl_recall_type = RETURN_ALL;
> +	/* FIXME we ignore errors, what should we do? */

We're a forgetful client: we don't care...

> +	do_callback_layoutrecall(clp, &args);
>  }



>  
>  int nfs41_validate_delegation_stateid(struct nfs_delegation *delegation, const nfs4_stateid *stateid)
> @@ -665,9 +683,7 @@ __be32 nfs4_callback_recallany(struct cb_recallanyargs *args, void *dummy,
>  		flags |= FMODE_WRITE;
>  	if (test_bit(RCA4_TYPE_MASK_FILE_LAYOUT, (const unsigned long *)
>  		     &args->craa_type_mask))
> -		if (pnfs_recall_all_layouts(clp) == -EAGAIN)
> -			status = cpu_to_be32(NFS4ERR_DELAY);
> -
> +		pnfs_recall_all_layouts(clp);
>  	if (flags)
>  		nfs_expire_all_delegation_types(clp, flags);
>  out:
> diff --git a/fs/nfs/client.c b/fs/nfs/client.c
> index 3c8c841..dbf43e7 100644
> --- a/fs/nfs/client.c
> +++ b/fs/nfs/client.c
> @@ -158,6 +158,9 @@ static struct nfs_client *nfs_alloc_client(const struct nfs_client_initdata *cl_
>  		clp->cl_machine_cred = cred;
>  #if defined(CONFIG_NFS_V4_1)
>  	INIT_LIST_HEAD(&clp->cl_layouts);
> +	INIT_LIST_HEAD(&clp->cl_layoutrecalls);
> +	rpc_init_wait_queue(&clp->cl_rpcwaitq_recall,
> +			    "NFS client CB_LAYOUTRECALLS");
>  #endif
>  	nfs_fscache_get_client_cookie(clp);
>  
> diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
> index fe79872..6223c6a 100644
> --- a/fs/nfs/nfs4proc.c
> +++ b/fs/nfs/nfs4proc.c
> @@ -5346,31 +5346,58 @@ nfs4_layoutget_prepare(struct rpc_task *task, void *calldata)
>  	struct inode *ino = lgp->args.inode;
>  	struct nfs_inode *nfsi = NFS_I(ino);
>  	struct nfs_server *server = NFS_SERVER(ino);
> +	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
>  
>  	dprintk("--> %s\n", __func__);
> +	spin_lock(&clp->cl_lock);
> +	if (matches_outstanding_recall(ino, &lgp->args.range)) {
> +		rpc_sleep_on(&clp->cl_rpcwaitq_recall, task, NULL);
> +		spin_unlock(&clp->cl_lock);
> +		return;
> +	}
> +	spin_unlock(&clp->cl_lock);
> +	/* Note there is a race here, where a CB_LAYOUTRECALL can come in
> +	 * right now covering the LAYOUTGET we are about to send.
> +	 * However, that is not so catastrophic, and there seems
> +	 * to be no way to prevent it completely.
> +	 */
>  	spin_lock(&ino->i_lock);
> -	if (pnfs_layoutgets_blocked(nfsi->layout)) {
> +	if (pnfs_layoutgets_blocked(nfsi->layout, NULL)) {
>  		rpc_sleep_on(&nfsi->lo_rpcwaitq_stateid, task, NULL);
>  		spin_unlock(&ino->i_lock);
>  		return;
>  	}
> +	/* This needs to happen after the above check, but atomically with it,
> +	 * in order to properly serialize openstateid LAYOUTGETs.
> +	 */
> +	nfsi->layout->plh_outstanding++;
>  	spin_unlock(&ino->i_lock);
> +
>  	if (nfs4_setup_sequence(server, NULL, &lgp->args.seq_args,
> -				&lgp->res.seq_res, 0, task))
> +				&lgp->res.seq_res, 0, task)) {
> +		spin_lock(&ino->i_lock);
> +		nfsi->layout->plh_outstanding--;
> +		spin_unlock(&ino->i_lock);
>  		return;
> +	}
>  	rpc_call_start(task);
>  }
>  
>  static void nfs4_layoutget_done(struct rpc_task *task, void *calldata)
>  {
>  	struct nfs4_layoutget *lgp = calldata;
> -	struct nfs_server *server = NFS_SERVER(lgp->args.inode);
> +	struct inode *ino = lgp->args.inode;
>  
>  	dprintk("--> %s\n", __func__);
>  
> -	if (!nfs4_sequence_done(task, &lgp->res.seq_res))
> +	if (!nfs4_sequence_done(task, &lgp->res.seq_res)) {
> +		/* The layout code relies on the fact that in this case
> +		 * the RPC falls back to tk_action=call_start, but not
> +		 * back to rpc_prepare_task, which keeps plh_outstanding
> +		 * correct.
> +		 */
>  		return;
> -
> +	}
>  	switch (task->tk_status) {
>  	case 0:
>  		break;
> @@ -5379,7 +5406,11 @@ static void nfs4_layoutget_done(struct rpc_task *task, void *calldata)
>  		task->tk_status = -NFS4ERR_DELAY;
>  		/* Fall through */
>  	default:
> -		if (nfs4_async_handle_error(task, server, NULL, NULL) == -EAGAIN) {
> +		if (nfs4_async_handle_error(task, NFS_SERVER(ino),
> +					    NULL, NULL) == -EAGAIN) {
> +			spin_lock(&ino->i_lock);
> +			NFS_I(ino)->layout->plh_outstanding--;
> +			spin_unlock(&ino->i_lock);
>  			rpc_restart_call_prepare(task);
>  			return;
>  		}
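
Summarizing the plh_outstanding accounting across these hunks (my
reading, not patch text):

	/* nfs4_layoutget_prepare:  plh_outstanding++
	 * sequence setup failure:  plh_outstanding-- and the task exits
	 * !nfs4_sequence_done:     retried via call_start, no change
	 *                          (rpc_prepare is not re-run)
	 * -EAGAIN restart:         plh_outstanding--, after which
	 *                          rpc_restart_call_prepare() lets the
	 *                          prepare callback increment it again
	 * completion:              pnfs_layout_process() or the error
	 *                          path in nfs4_proc_layoutget() does
	 *                          the final decrement
	 */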
> @@ -5437,13 +5468,20 @@ int nfs4_proc_layoutget(struct nfs4_layoutget *lgp)
>  	if (IS_ERR(task))
>  		return PTR_ERR(task);
>  	status = nfs4_wait_for_completion_rpc_task(task);
> -	if (status != 0)
> -		goto out;
> -	status = task->tk_status;
> -	if (status != 0)
> -		goto out;
> -	status = pnfs_layout_process(lgp);
> -out:
> +	if (status == 0)
> +		status = task->tk_status;
> +	if (status == 0)
> +		status = pnfs_layout_process(lgp);
> +	else {
> +		struct inode *ino = lgp->args.inode;
> +		struct pnfs_layout_hdr *lo = NFS_I(ino)->layout;
> +
> +		spin_lock(&ino->i_lock);
> +		lo->plh_outstanding--;
> +		if (!pnfs_layoutgets_blocked(lo, NULL))
> +			rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
> +		spin_unlock(&ino->i_lock);
> +	}
>  	rpc_put_task(task);
>  	dprintk("<-- %s status=%d\n", __func__, status);
>  	return status;
> @@ -5587,9 +5625,9 @@ static void nfs4_layoutreturn_done(struct rpc_task *task, void *calldata)
>  
>  		spin_lock(&lo->inode->i_lock);
>  		if (lrp->res.lrs_present)
> -			pnfs_set_layout_stateid(lo, &lrp->res.stateid);
> +			pnfs_set_layout_stateid(lo, &lrp->res.stateid, true);
>  		else
> -			pnfs_invalidate_layout_stateid(lo);
> +			BUG_ON(!list_empty(&lo->segs));
>  		spin_unlock(&lo->inode->i_lock);
>  	}
>  	dprintk("<-- %s\n", __func__);
> @@ -5606,10 +5644,11 @@ static void nfs4_layoutreturn_release(void *calldata)
>  
>  		spin_lock(&ino->i_lock);
>  		lo->plh_block_lgets--;
> -		if (!pnfs_layoutgets_blocked(lo))
> +		lo->plh_outstanding--;
> +		if (!pnfs_layoutgets_blocked(lo, NULL))
>  			rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
>  		spin_unlock(&ino->i_lock);
> -		put_layout_hdr(lrp->args.inode);
> +		put_layout_hdr(ino);
>  	}
>  	kfree(calldata);
>  	dprintk("<-- %s\n", __func__);
> @@ -5639,6 +5678,14 @@ int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp, bool issync)
>  	int status = 0;
>  
>  	dprintk("--> %s\n", __func__);
> +	if (lrp->args.return_type == RETURN_FILE) {
> +		struct pnfs_layout_hdr *lo = NFS_I(lrp->args.inode)->layout;
> +		/* FIXME we should test for BULK here */
> +		spin_lock(&lo->inode->i_lock);
> +		BUG_ON(lo->plh_block_lgets == 0);
> +		lo->plh_outstanding++;
> +		spin_unlock(&lo->inode->i_lock);
> +	}
>  	task = rpc_run_task(&task_setup_data);
>  	if (IS_ERR(task))
>  		return PTR_ERR(task);
> diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
> index 00632f6..ceb0d66 100644
> --- a/fs/nfs/nfs4state.c
> +++ b/fs/nfs/nfs4state.c
> @@ -1560,6 +1560,10 @@ static void nfs4_state_manager(struct nfs_client *clp)
>  			nfs_client_return_marked_delegations(clp);
>  			continue;
>  		}
> +		if (test_and_clear_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state)) {
> +			nfs_client_return_layouts(clp);
> +			continue;
> +		}
>  		/* Recall session slots */
>  		if (test_and_clear_bit(NFS4CLNT_RECALL_SLOT, &clp->cl_state)
>  		   && nfs4_has_session(clp)) {
> diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
> index 328cca5..f530c7e 100644
> --- a/fs/nfs/nfs4xdr.c
> +++ b/fs/nfs/nfs4xdr.c
> @@ -1827,13 +1827,14 @@ encode_getdeviceinfo(struct xdr_stream *xdr,
>  	hdr->replen += decode_getdeviceinfo_maxsz;
>  }
>  
> -static void
> +static int
>  encode_layoutget(struct xdr_stream *xdr,
>  		      const struct nfs4_layoutget_args *args,
>  		      struct compound_hdr *hdr)
>  {
>  	nfs4_stateid stateid;
>  	__be32 *p;
> +	int status;
>  
>  	p = reserve_space(xdr, 44 + NFS4_STATEID_SIZE);
>  	*p++ = cpu_to_be32(OP_LAYOUTGET);
> @@ -1843,8 +1844,11 @@ encode_layoutget(struct xdr_stream *xdr,
>  	p = xdr_encode_hyper(p, args->range.offset);
>  	p = xdr_encode_hyper(p, args->range.length);
>  	p = xdr_encode_hyper(p, args->minlength);
> -	pnfs_get_layout_stateid(&stateid, NFS_I(args->inode)->layout,
> -				args->ctx->state);
> +	status = pnfs_choose_layoutget_stateid(&stateid,
> +					       NFS_I(args->inode)->layout,
> +					       args->ctx->state);
> +	if (status)
> +		return status;
>  	p = xdr_encode_opaque_fixed(p, &stateid.data, NFS4_STATEID_SIZE);
>  	*p = cpu_to_be32(args->maxcount);
>  
> @@ -1857,6 +1861,7 @@ encode_layoutget(struct xdr_stream *xdr,
>  		args->maxcount);
>  	hdr->nops++;
>  	hdr->replen += decode_layoutget_maxsz;
> +	return 0;
>  }
>  
>  static int
> @@ -2782,12 +2787,15 @@ static int nfs4_xdr_enc_layoutget(struct rpc_rqst *req, uint32_t *p,
>  	struct compound_hdr hdr = {
>  		.minorversion = nfs4_xdr_minorversion(&args->seq_args),
>  	};
> +	int status;
>  
>  	xdr_init_encode(&xdr, &req->rq_snd_buf, p);
>  	encode_compound_hdr(&xdr, req, &hdr);
>  	encode_sequence(&xdr, &args->seq_args, &hdr);
>  	encode_putfh(&xdr, NFS_FH(args->inode), &hdr);
> -	encode_layoutget(&xdr, args, &hdr);
> +	status = encode_layoutget(&xdr, args, &hdr);
> +	if (status)
> +		return status;
>  	encode_nops(&hdr);
>  	return 0;
>  }
> diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
> index 07b04e8..2d817be 100644
> --- a/fs/nfs/pnfs.c
> +++ b/fs/nfs/pnfs.c
> @@ -233,7 +233,7 @@ EXPORT_SYMBOL_GPL(pnfs_unregister_layoutdriver);
>   */
>  
>  /* Need to hold i_lock if caller does not already hold reference */
> -static void
> +void
>  get_layout_hdr(struct pnfs_layout_hdr *lo)
>  {
>  	atomic_inc(&lo->plh_refcount);
> @@ -278,24 +278,29 @@ init_lseg(struct pnfs_layout_hdr *lo, struct pnfs_layout_segment *lseg)
>  	smp_mb();
>  	lseg->valid = true;
>  	lseg->layout = lo;
> +	lseg->drain_notification = NULL;
>  }
>  
>  static void
>  _put_lseg_common(struct pnfs_layout_segment *lseg)
>  {
> +	struct inode *ino = lseg->layout->inode;
> +
>  	BUG_ON(lseg->valid == true);
>  	list_del(&lseg->fi_list);
>  	if (list_empty(&lseg->layout->segs)) {
>  		struct nfs_client *clp;
>  
> -		clp = NFS_SERVER(lseg->layout->inode)->nfs_client;
> +		clp = NFS_SERVER(ino)->nfs_client;
>  		spin_lock(&clp->cl_lock);
>  		/* List does not take a reference, so no need for put here */
>  		list_del_init(&lseg->layout->layouts);
>  		spin_unlock(&clp->cl_lock);
> -		pnfs_invalidate_layout_stateid(lseg->layout);
> +		clear_bit(NFS_LAYOUT_BULK_RECALL, &lseg->layout->plh_flags);
> +		if (!pnfs_layoutgets_blocked(lseg->layout, NULL))
> +			rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
>  	}
> -	rpc_wake_up(&NFS_I(lseg->layout->inode)->lo_rpcwaitq);
> +	rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq);
>  }
>  
>  /* The use of tmp_list is necessary because pnfs_curr_ld->free_lseg
> @@ -325,9 +330,12 @@ put_lseg(struct pnfs_layout_segment *lseg)
>  		atomic_read(&lseg->pls_refcount), lseg->valid);
>  	ino = lseg->layout->inode;
>  	if (atomic_dec_and_lock(&lseg->pls_refcount, &ino->i_lock)) {
> +		struct pnfs_cb_lrecall_info *drain_info = lseg->drain_notification;
> +
>  		_put_lseg_common(lseg);
>  		spin_unlock(&ino->i_lock);
>  		NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
> +		notify_drained(drain_info);
>  		/* Matched by get_layout_hdr_locked in pnfs_insert_layout */
>  		put_layout_hdr(ino);
>  	}
> @@ -345,7 +353,7 @@ EXPORT_SYMBOL_GPL(put_lseg);
>   * READ		READ	true
>   * READ		RW	false
>   */
> -static int
> +bool
>  should_free_lseg(struct pnfs_layout_range *lseg_range,
>  		 struct pnfs_layout_range *recall_range)
>  {
> @@ -388,16 +396,19 @@ pnfs_clear_lseg_list(struct pnfs_layout_hdr *lo, struct list_head *tmp_list,
>  	dprintk("%s:Return\n", __func__);
>  }
>  
> -static void
> +void
>  pnfs_free_lseg_list(struct list_head *free_me)
>  {
>  	struct pnfs_layout_segment *lseg, *tmp;
>  	struct inode *ino;
> +	struct pnfs_cb_lrecall_info *drain_info;
>  
>  	list_for_each_entry_safe(lseg, tmp, free_me, fi_list) {
>  		BUG_ON(atomic_read(&lseg->pls_refcount) != 0);
>  		ino = lseg->layout->inode;
> +		drain_info = lseg->drain_notification;
>  		NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
> +		notify_drained(drain_info);
>  		/* Matched by get_layout_hdr_locked in pnfs_insert_layout */
>  		put_layout_hdr(ino);
>  	}
> @@ -453,40 +464,49 @@ pnfs_destroy_all_layouts(struct nfs_client *clp)
>  	}
>  }
>  
> -/* update lo->stateid with new if is more recent
> - *
> - * lo->stateid could be the open stateid, in which case we just use what given.
> - */
> +/* update lo->stateid with new if it is more recent */
>  void
> -pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
> -			const nfs4_stateid *new)
> +pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo, const nfs4_stateid *new,
> +			bool update_barrier)
>  {
> -	nfs4_stateid *old = &lo->stateid;
> -	bool overwrite = false;
> +	u32 oldseq, newseq;
>  
>  	assert_spin_locked(&lo->inode->i_lock);
> -	if (!test_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags) ||
> -	    memcmp(old->stateid.other, new->stateid.other, sizeof(new->stateid.other)))
> -		overwrite = true;
> -	else {
> -		u32 oldseq, newseq;
> -
> -		oldseq = be32_to_cpu(old->stateid.seqid);
> -		newseq = be32_to_cpu(new->stateid.seqid);
> -		if ((int)(newseq - oldseq) > 0)
> -			overwrite = true;
> +	oldseq = be32_to_cpu(lo->stateid.stateid.seqid);
> +	newseq = be32_to_cpu(new->stateid.seqid);
> +	if ((int)(newseq - oldseq) > 0) {
> +		memcpy(&lo->stateid, &new->stateid, sizeof(new->stateid));
> +		if (update_barrier)
> +			lo->plh_barrier = be32_to_cpu(new->stateid.seqid);
> +		else {
> +			/* Because of wraparound, we want to keep the barrier
> +			 * "close" to the current seqids.  It needs to be
> +			 * within 2**31 to count as "behind", so if it
> +			 * gets too near that limit, give us a little leeway
> +			 * and bring it to within 2**30.
> +			 * NOTE - and yes, this is all unsigned arithmetic.
> +			 */
> +			if (unlikely((newseq - lo->plh_barrier) > (3 << 29)))
> +				lo->plh_barrier = newseq - (1 << 30);
> +		}
>  	}
> -	if (overwrite)
> -		memcpy(&old->stateid, &new->stateid, sizeof(new->stateid));
>  }
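
The seqid comparison and the barrier clamp above both rely on serial
number arithmetic; a worked example with made-up seqids:

	/* oldseq = 0xffffffff, newseq = 0x00000002 (after wraparound):
	 *   newseq - oldseq = 0x00000003 (unsigned)
	 *   (int)0x00000003 > 0, so newseq counts as "more recent".
	 * For the clamp: once newseq - plh_barrier exceeds 3 << 29, the
	 * barrier is pulled forward to newseq - (1 << 30), keeping it
	 * within signed-difference range of the current seqids.
	 */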
>  
> -void
> -pnfs_get_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
> -			struct nfs4_state *open_state)
> +int
> +pnfs_choose_layoutget_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
> +			      struct nfs4_state *open_state)
>  {
> +	int status = 0;
> +
>  	dprintk("--> %s\n", __func__);
>  	spin_lock(&lo->inode->i_lock);
> -	if (!test_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags)) {
> +	if (lo->plh_block_lgets ||
> +	    test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags)) {
> +		/* We avoid -EAGAIN, as that has special meaning to
> +		 * some callers.
> +		 */
> +		status = -NFS4ERR_LAYOUTTRYLATER;
> +	} else if (list_empty(&lo->segs)) {
>  		int seq;
>  
>  		do {
> @@ -494,12 +514,11 @@ pnfs_get_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
>  			memcpy(dst->data, open_state->stateid.data,
>  			       sizeof(open_state->stateid.data));
>  		} while (read_seqretry(&open_state->seqlock, seq));
> -		set_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags);
>  	} else
> -		memcpy(dst->data, lo->stateid.data,
> -		       sizeof(lo->stateid.data));
> +		memcpy(dst->data, lo->stateid.data, sizeof(lo->stateid.data));
>  	spin_unlock(&lo->inode->i_lock);
>  	dprintk("<-- %s\n", __func__);
> +	return status;
>  }
>  
>  /*
> @@ -566,6 +585,28 @@ has_layout_to_return(struct pnfs_layout_hdr *lo,
>  	return out;
>  }
>  
> +void nfs4_asynch_forget_layouts(struct pnfs_layout_hdr *lo,
> +				struct pnfs_layout_range *range,
> +				struct pnfs_cb_lrecall_info *drain_info,
> +				struct list_head *tmp_list)
> +{
> +	struct pnfs_layout_segment *lseg, *tmp;
> +
> +	assert_spin_locked(&lo->inode->i_lock);

Poor practice. If you want to ensure the caller holds the inode->i_lock,
then just call the function '*_locked'. That is a lot more helpful than
these damned asserts.
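
i.e. something like (rename only, no functional change):

	void nfs4_asynch_forget_layouts_locked(struct pnfs_layout_hdr *lo,
					       struct pnfs_layout_range *range,
					       struct pnfs_cb_lrecall_info *drain_info,
					       struct list_head *tmp_list);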

> +	list_for_each_entry_safe(lseg, tmp, &lo->segs, fi_list)
> +		if (should_free_lseg(&lseg->range, range)) {
> +			/* FIXME - need to change to something like a
> +			 * notification bitmap to remove the restriction
> +			 * of only being able to process a single
> +			 * CB_LAYOUTRECALL at a time.
> +			 */
> +			BUG_ON(lseg->drain_notification);
> +			lseg->drain_notification = drain_info;
> +			atomic_inc(&drain_info->pcl_count);
> +			mark_lseg_invalid(lseg, tmp_list);
> +		}
> +}
> +
>  /* Return true if there is layout based io in progress in the given range.
>   * Assumes range has already been marked invalid, and layout marked to
>   * prevent any new lseg from being inserted.
> @@ -711,14 +752,6 @@ pnfs_insert_layout(struct pnfs_layout_hdr *lo,
>  	dprintk("%s:Begin\n", __func__);
>  
>  	assert_spin_locked(&lo->inode->i_lock);
> -	if (list_empty(&lo->segs)) {
> -		struct nfs_client *clp = NFS_SERVER(lo->inode)->nfs_client;
> -
> -		spin_lock(&clp->cl_lock);
> -		BUG_ON(!list_empty(&lo->layouts));
> -		list_add_tail(&lo->layouts, &clp->cl_layouts);
> -		spin_unlock(&clp->cl_lock);
> -	}
>  	list_for_each_entry(lp, &lo->segs, fi_list) {
>  		if (cmp_layout(&lp->range, &lseg->range) > 0)
>  			continue;
> @@ -735,6 +768,9 @@ pnfs_insert_layout(struct pnfs_layout_hdr *lo,
>  	}
>  	if (!found) {
>  		list_add_tail(&lseg->fi_list, &lo->segs);
> +		if (list_is_singular(&lo->segs) &&
> +		    !pnfs_layoutgets_blocked(lo, NULL))
> +			rpc_wake_up(&NFS_I(lo->inode)->lo_rpcwaitq_stateid);
>  		dprintk("%s: inserted lseg %p "
>  			"iomode %d offset %llu length %llu at tail\n",
>  			__func__, lseg, lseg->range.iomode,
> @@ -756,6 +792,7 @@ alloc_init_layout_hdr(struct inode *ino)
>  	atomic_set(&lo->plh_refcount, 1);
>  	INIT_LIST_HEAD(&lo->layouts);
>  	INIT_LIST_HEAD(&lo->segs);
> +	INIT_LIST_HEAD(&lo->plh_bulk_recall);
>  	lo->inode = ino;
>  	return lo;
>  }
> @@ -843,6 +880,7 @@ pnfs_update_layout(struct inode *ino,
>  		.length = NFS4_MAX_UINT64,
>  	};
>  	struct nfs_inode *nfsi = NFS_I(ino);
> +	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
>  	struct pnfs_layout_hdr *lo;
>  	struct pnfs_layout_segment *lseg = NULL;
>  
> @@ -878,9 +916,28 @@ pnfs_update_layout(struct inode *ino,
>  		goto out_unlock;
>  
>  	get_layout_hdr(lo); /* Matched in pnfs_layoutget_release */
> +	if (list_empty(&lo->segs)) {
> +		/* The lo must be on the clp list if there is any
> +		 * chance of a CB_LAYOUTRECALL(FILE) coming in.
> +		 */
> +		spin_lock(&clp->cl_lock);
> +		BUG_ON(!list_empty(&lo->layouts));
> +		list_add_tail(&lo->layouts, &clp->cl_layouts);
> +		spin_unlock(&clp->cl_lock);
> +	}
>  	spin_unlock(&ino->i_lock);
>  
>  	lseg = send_layoutget(lo, ctx, &arg);
> +	if (!lseg) {
> +		spin_lock(&ino->i_lock);
> +		if (list_empty(&lo->segs)) {
> +			spin_lock(&clp->cl_lock);
> +			list_del_init(&lo->layouts);
> +			spin_unlock(&clp->cl_lock);
> +			clear_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags);
> +		}
> +		spin_unlock(&ino->i_lock);
> +	}
>  out:
>  	dprintk("%s end, state 0x%lx lseg %p\n", __func__,
>  		nfsi->layout->plh_flags, lseg);
> @@ -891,10 +948,15 @@ out_unlock:
>  }
>  
>  bool
> -pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo)
> +pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo, nfs4_stateid *stateid)
>  {
>  	assert_spin_locked(&lo->inode->i_lock);
> -	return lo->plh_block_lgets;
> +	if ((stateid) &&
> +	    (int)(lo->plh_barrier - be32_to_cpu(stateid->stateid.seqid)) >= 0)
> +		return true;
> +	return lo->plh_block_lgets ||
> +		test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags) ||
> +		(list_empty(&lo->segs) && lo->plh_outstanding);
>  }
>  
>  int
> @@ -904,6 +966,7 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
>  	struct nfs4_layoutget_res *res = &lgp->res;
>  	struct pnfs_layout_segment *lseg;
>  	struct inode *ino = lo->inode;
> +	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
>  	int status = 0;
>  
>  	/* Inject layout blob into I/O device driver */
> @@ -915,10 +978,25 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
>  			status = PTR_ERR(lseg);
>  		dprintk("%s: Could not allocate layout: error %d\n",
>  		       __func__, status);
> +		spin_lock(&ino->i_lock);
>  		goto out;
>  	}
>  
>  	spin_lock(&ino->i_lock);
> +	/* decrement needs to be done before the call to pnfs_layoutgets_blocked */
> +	lo->plh_outstanding--;
> +	spin_lock(&clp->cl_lock);
> +	if (matches_outstanding_recall(ino, &res->range)) {
> +		spin_unlock(&clp->cl_lock);
> +		dprintk("%s forget reply due to recall\n", __func__);
> +		goto out_forget_reply;
> +	}
> +	spin_unlock(&clp->cl_lock);
> +
> +	if (pnfs_layoutgets_blocked(lo, &res->stateid)) {
> +		dprintk("%s forget reply due to state\n", __func__);
> +		goto out_forget_reply;
> +	}
>  	init_lseg(lo, lseg);
>  	lseg->range = res->range;
>  	get_lseg(lseg);
> @@ -934,10 +1012,19 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
>  	}
>  
>  	/* Done processing layoutget. Set the layout stateid */
> -	pnfs_set_layout_stateid(lo, &res->stateid);
> -	spin_unlock(&ino->i_lock);
> +	pnfs_set_layout_stateid(lo, &res->stateid, false);
>  out:
> +	if (!pnfs_layoutgets_blocked(lo, NULL))
> +		rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
> +	spin_unlock(&ino->i_lock);
>  	return status;
> +
> +out_forget_reply:
> +	spin_unlock(&ino->i_lock);
> +	lseg->layout = lo;
> +	NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
> +	spin_lock(&ino->i_lock);
> +	goto out;
>  }
>  
>  void
> diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
> index 891aeab..7ea121f 100644
> --- a/fs/nfs/pnfs.h
> +++ b/fs/nfs/pnfs.h
> @@ -31,6 +31,7 @@
>  #define FS_NFS_PNFS_H
>  
>  #include <linux/nfs_page.h>
> +#include "callback.h" /* for cb_layoutrecallargs */
>  
>  struct pnfs_layout_segment {
>  	struct list_head fi_list;
> @@ -38,6 +39,7 @@ struct pnfs_layout_segment {
>  	atomic_t pls_refcount;
>  	bool valid;
>  	struct pnfs_layout_hdr *layout;
> +	struct pnfs_cb_lrecall_info *drain_notification;
>  };
>  
>  enum pnfs_try_status {
> @@ -52,7 +54,7 @@ enum pnfs_try_status {
>  enum {
>  	NFS_LAYOUT_RO_FAILED = 0,	/* get ro layout failed stop trying */
>  	NFS_LAYOUT_RW_FAILED,		/* get rw layout failed stop trying */
> -	NFS_LAYOUT_STATEID_SET,		/* have a valid layout stateid */
> +	NFS_LAYOUT_BULK_RECALL,		/* bulk recall affecting layout */
>  	NFS_LAYOUT_NEED_LCOMMIT,	/* LAYOUTCOMMIT needed */
>  };
>  
> @@ -94,10 +96,13 @@ struct pnfs_layoutdriver_type {
>  struct pnfs_layout_hdr {
>  	atomic_t		plh_refcount;
>  	struct list_head	layouts;   /* other client layouts */
> +	struct list_head	plh_bulk_recall; /* clnt list of bulk recalls */
>  	struct list_head	segs;      /* layout segments list */
>  	int			roc_iomode;/* return on close iomode, 0=none */
>  	nfs4_stateid		stateid;
> +	unsigned long		plh_outstanding; /* number of RPCs out */
>  	unsigned long		plh_block_lgets; /* block LAYOUTGET if >0 */
> +	u32			plh_barrier; /* ignore lower seqids */
>  	unsigned long		plh_flags;
>  	struct rpc_cred		*cred;     /* layoutcommit credential */
>  	/* DH: These vars keep track of the maximum write range
> @@ -118,6 +123,14 @@ struct pnfs_device {
>  	unsigned int  pglen;
>  };
>  
> +struct pnfs_cb_lrecall_info {
> +	struct list_head	pcl_list; /* hook into cl_layoutrecalls list */
> +	atomic_t		pcl_count;
> +	struct nfs_client	*pcl_clp;
> +	struct inode		*pcl_ino;
> +	struct cb_layoutrecallargs pcl_args;
> +};
> +
>  /*
>   * Device ID RCU cache. A device ID is unique per client ID and layout type.
>   */
> @@ -176,7 +189,10 @@ extern int nfs4_proc_layoutcommit(struct nfs4_layoutcommit_data *data,
>  extern int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp, bool wait);
>  
>  /* pnfs.c */
> +void get_layout_hdr(struct pnfs_layout_hdr *lo);
>  void put_lseg(struct pnfs_layout_segment *lseg);
> +bool should_free_lseg(struct pnfs_layout_range *lseg_range,
> +		      struct pnfs_layout_range *recall_range);
>  struct pnfs_layout_segment *
>  pnfs_has_layout(struct pnfs_layout_hdr *lo, struct pnfs_layout_range *range);
>  struct pnfs_layout_segment *
> @@ -201,15 +217,24 @@ enum pnfs_try_status pnfs_try_to_commit(struct nfs_write_data *,
>  void pnfs_pageio_init_read(struct nfs_pageio_descriptor *, struct inode *,
>  			   struct nfs_open_context *, struct list_head *);
>  void pnfs_pageio_init_write(struct nfs_pageio_descriptor *, struct inode *);
> -bool pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo);
> +bool pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo, nfs4_stateid *stateid);
>  int pnfs_layout_process(struct nfs4_layoutget *lgp);
> +void pnfs_free_lseg_list(struct list_head *tmp_list);
>  void pnfs_destroy_layout(struct nfs_inode *);
>  void pnfs_destroy_all_layouts(struct nfs_client *);
>  void put_layout_hdr(struct inode *inode);
>  void pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
> -			     const nfs4_stateid *new);
> -void pnfs_get_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
> -			     struct nfs4_state *open_state);
> +			     const nfs4_stateid *new,
> +			     bool update_barrier);
> +int pnfs_choose_layoutget_stateid(nfs4_stateid *dst,
> +				  struct pnfs_layout_hdr *lo,
> +				  struct nfs4_state *open_state);
> +void nfs4_asynch_forget_layouts(struct pnfs_layout_hdr *lo,
> +				struct pnfs_layout_range *range,
> +				struct pnfs_cb_lrecall_info *drain_info,
> +				struct list_head *tmp_list);
> +/* FIXME - this should be in callback.h, but pnfs_cb_lrecall_info needs to be there too */
> +extern void notify_drained(struct pnfs_cb_lrecall_info *d);
>  
>  static inline bool
>  has_layout(struct nfs_inode *nfsi)
> @@ -223,12 +248,6 @@ static inline int lo_fail_bit(u32 iomode)
>  			 NFS_LAYOUT_RW_FAILED : NFS_LAYOUT_RO_FAILED;
>  }
>  
> -static inline void pnfs_invalidate_layout_stateid(struct pnfs_layout_hdr *lo)
> -{
> -	assert_spin_locked(&lo->inode->i_lock);
> -	clear_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags);
> -}
> -
>  static inline void get_lseg(struct pnfs_layout_segment *lseg)
>  {
>  	atomic_inc(&lseg->pls_refcount);
> diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h
> index 3cae408..80dcc00 100644
> --- a/include/linux/nfs_fs_sb.h
> +++ b/include/linux/nfs_fs_sb.h
> @@ -83,6 +83,10 @@ struct nfs_client {
>  	u32			cl_exchange_flags;
>  	struct nfs4_session	*cl_session; 	/* sharred session */
>  	struct list_head	cl_layouts;
> +	struct list_head	cl_layoutrecalls;
> +	unsigned long		cl_cb_lrecall_count;
> +#define PNFS_MAX_CB_LRECALLS (1)
> +	struct rpc_wait_queue	cl_rpcwaitq_recall;
>  	struct pnfs_deviceid_cache *cl_devid_cache; /* pNFS deviceid cache */
>  #endif /* CONFIG_NFS_V4_1 */
>  


