Re: [PATCH 3/3] ceph: include the initial ACL in create/mkdir/mknod MDS requests

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, 16 Sep 2014, Yan, Zheng wrote:
> Current code set new file/directory's initial ACL in a non-atomic
> manner.
> Client first sends request to MDS to create new file/directory, then set
> the initial ACL after the new file/directory is successfully created.
> 
> The fix is include the initial ACL in create/mkdir/mknod MDS requests.
> So MDS can handle creating file/directory and setting the initial ACL in
> one request.
> 
> Signed-off-by: Yan, Zheng <zyan@xxxxxxxxxx>

Reviewed-by: Sage Weil <sage@xxxxxxxxxx>

> ---
>  fs/ceph/acl.c   | 125 ++++++++++++++++++++++++++++++++++++++++++++------------
>  fs/ceph/dir.c   |  41 ++++++++++++++-----
>  fs/ceph/file.c  |  27 +++++++++---
>  fs/ceph/super.h |  24 ++++++++---
>  4 files changed, 170 insertions(+), 47 deletions(-)
> 
> diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
> index cebf2eb..5bd853b 100644
> --- a/fs/ceph/acl.c
> +++ b/fs/ceph/acl.c
> @@ -169,36 +169,109 @@ out:
>  	return ret;
>  }
>  
> -int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir)
> +int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
> +		       struct ceph_acls_info *info)
>  {
> -	struct posix_acl *default_acl, *acl;
> -	umode_t new_mode = inode->i_mode;
> -	int error;
> -
> -	error = posix_acl_create(dir, &new_mode, &default_acl, &acl);
> -	if (error)
> -		return error;
> -
> -	if (!default_acl && !acl) {
> -		cache_no_acl(inode);
> -		if (new_mode != inode->i_mode) {
> -			struct iattr newattrs = {
> -				.ia_mode = new_mode,
> -				.ia_valid = ATTR_MODE,
> -			};
> -			error = ceph_setattr(dentry, &newattrs);
> +	struct posix_acl *acl, *default_acl;
> +	size_t val_size1 = 0, val_size2 = 0;
> +	struct ceph_pagelist *pagelist = NULL;
> +	void *tmp_buf = NULL;
> +	int err;
> +
> +	err = posix_acl_create(dir, mode, &default_acl, &acl);
> +	if (err)
> +		return err;
> +
> +	if (acl) {
> +		int ret = posix_acl_equiv_mode(acl, mode);
> +		if (ret < 0)
> +			goto out_err;
> +		if (ret == 0) {
> +			posix_acl_release(acl);
> +			acl = NULL;
>  		}
> -		return error;
>  	}
>  
> -	if (default_acl) {
> -		error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT);
> -		posix_acl_release(default_acl);
> -	}
> +	if (!default_acl && !acl)
> +		return 0;
> +
> +	if (acl)
> +		val_size1 = posix_acl_xattr_size(acl->a_count);
> +	if (default_acl)
> +		val_size2 = posix_acl_xattr_size(default_acl->a_count);
> +
> +	err = -ENOMEM;
> +	tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS);
> +	if (!tmp_buf)
> +		goto out_err;
> +	pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS);
> +	if (!pagelist)
> +		goto out_err;
> +	ceph_pagelist_init(pagelist);
> +
> +	err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
> +	if (err)
> +		goto out_err;
> +
> +	ceph_pagelist_encode_32(pagelist, acl && default_acl ? 2 : 1);
> +
>  	if (acl) {
> -		if (!error)
> -			error = ceph_set_acl(inode, acl, ACL_TYPE_ACCESS);
> -		posix_acl_release(acl);
> +		size_t len = strlen(POSIX_ACL_XATTR_ACCESS);
> +		err = ceph_pagelist_reserve(pagelist, len + val_size1 + 8);
> +		if (err)
> +			goto out_err;
> +		ceph_pagelist_encode_string(pagelist, POSIX_ACL_XATTR_ACCESS,
> +					    len);
> +		err = posix_acl_to_xattr(&init_user_ns, acl,
> +					 tmp_buf, val_size1);
> +		if (err < 0)
> +			goto out_err;
> +		ceph_pagelist_encode_32(pagelist, val_size1);
> +		ceph_pagelist_append(pagelist, tmp_buf, val_size1);
>  	}
> -	return error;
> +	if (default_acl) {
> +		size_t len = strlen(POSIX_ACL_XATTR_DEFAULT);
> +		err = ceph_pagelist_reserve(pagelist, len + val_size2 + 8);
> +		if (err)
> +			goto out_err;
> +		err = ceph_pagelist_encode_string(pagelist,
> +						  POSIX_ACL_XATTR_DEFAULT, len);
> +		err = posix_acl_to_xattr(&init_user_ns, default_acl,
> +					 tmp_buf, val_size2);
> +		if (err < 0)
> +			goto out_err;
> +		ceph_pagelist_encode_32(pagelist, val_size2);
> +		ceph_pagelist_append(pagelist, tmp_buf, val_size2);
> +	}
> +
> +	kfree(tmp_buf);
> +
> +	info->acl = acl;
> +	info->default_acl = default_acl;
> +	info->pagelist = pagelist;
> +	return 0;
> +
> +out_err:
> +	posix_acl_release(acl);
> +	posix_acl_release(default_acl);
> +	kfree(tmp_buf);
> +	if (pagelist)
> +		ceph_pagelist_release(pagelist);
> +	return err;
> +}
> +
> +void ceph_init_inode_acls(struct inode* inode, struct ceph_acls_info *info)
> +{
> +	if (!inode)
> +		return;
> +	ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, info->acl);
> +	ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, info->default_acl);
> +}
> +
> +void ceph_release_acls_info(struct ceph_acls_info *info)
> +{
> +	posix_acl_release(info->acl);
> +	posix_acl_release(info->default_acl);
> +	if (info->pagelist)
> +		ceph_pagelist_release(info->pagelist);
>  }
> diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
> index c29d6ae..26be849 100644
> --- a/fs/ceph/dir.c
> +++ b/fs/ceph/dir.c
> @@ -682,17 +682,22 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
>  	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
>  	struct ceph_mds_client *mdsc = fsc->mdsc;
>  	struct ceph_mds_request *req;
> +	struct ceph_acls_info acls = {};
>  	int err;
>  
>  	if (ceph_snap(dir) != CEPH_NOSNAP)
>  		return -EROFS;
>  
> +	err = ceph_pre_init_acls(dir, &mode, &acls);
> +	if (err < 0)
> +		return err;
> +
>  	dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
>  	     dir, dentry, mode, rdev);
>  	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
>  	if (IS_ERR(req)) {
> -		d_drop(dentry);
> -		return PTR_ERR(req);
> +		err = PTR_ERR(req);
> +		goto out;
>  	}
>  	req->r_dentry = dget(dentry);
>  	req->r_num_caps = 2;
> @@ -701,15 +706,20 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
>  	req->r_args.mknod.rdev = cpu_to_le32(rdev);
>  	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
>  	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
> +	if (acls.pagelist) {
> +		req->r_pagelist = acls.pagelist;
> +		acls.pagelist = NULL;
> +	}
>  	err = ceph_mdsc_do_request(mdsc, dir, req);
>  	if (!err && !req->r_reply_info.head->is_dentry)
>  		err = ceph_handle_notrace_create(dir, dentry);
>  	ceph_mdsc_put_request(req);
> -
> +out:
>  	if (!err)
> -		ceph_init_acl(dentry, dentry->d_inode, dir);
> +		ceph_init_inode_acls(dentry->d_inode, &acls);
>  	else
>  		d_drop(dentry);
> +	ceph_release_acls_info(&acls);
>  	return err;
>  }
>  
> @@ -733,8 +743,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
>  	dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
>  	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
>  	if (IS_ERR(req)) {
> -		d_drop(dentry);
> -		return PTR_ERR(req);
> +		err = PTR_ERR(req);
> +		goto out;
>  	}
>  	req->r_dentry = dget(dentry);
>  	req->r_num_caps = 2;
> @@ -746,9 +756,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
>  	if (!err && !req->r_reply_info.head->is_dentry)
>  		err = ceph_handle_notrace_create(dir, dentry);
>  	ceph_mdsc_put_request(req);
> -	if (!err)
> -		ceph_init_acl(dentry, dentry->d_inode, dir);
> -	else
> +out:
> +	if (err)
>  		d_drop(dentry);
>  	return err;
>  }
> @@ -758,6 +767,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
>  	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
>  	struct ceph_mds_client *mdsc = fsc->mdsc;
>  	struct ceph_mds_request *req;
> +	struct ceph_acls_info acls = {};
>  	int err = -EROFS;
>  	int op;
>  
> @@ -772,6 +782,12 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
>  	} else {
>  		goto out;
>  	}
> +
> +	mode |= S_IFDIR;
> +	err = ceph_pre_init_acls(dir, &mode, &acls);
> +	if (err < 0)
> +		goto out;
> +
>  	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
>  	if (IS_ERR(req)) {
>  		err = PTR_ERR(req);
> @@ -784,15 +800,20 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
>  	req->r_args.mkdir.mode = cpu_to_le32(mode);
>  	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
>  	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
> +	if (acls.pagelist) {
> +		req->r_pagelist = acls.pagelist;
> +		acls.pagelist = NULL;
> +	}
>  	err = ceph_mdsc_do_request(mdsc, dir, req);
>  	if (!err && !req->r_reply_info.head->is_dentry)
>  		err = ceph_handle_notrace_create(dir, dentry);
>  	ceph_mdsc_put_request(req);
>  out:
>  	if (!err)
> -		ceph_init_acl(dentry, dentry->d_inode, dir);
> +		ceph_init_inode_acls(dentry->d_inode, &acls);
>  	else
>  		d_drop(dentry);
> +	ceph_release_acls_info(&acls);
>  	return err;
>  }
>  
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index e520317..1c1df08 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -235,6 +235,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
>  	struct ceph_mds_client *mdsc = fsc->mdsc;
>  	struct ceph_mds_request *req;
>  	struct dentry *dn;
> +	struct ceph_acls_info acls = {};
>  	int err;
>  
>  	dout("atomic_open %p dentry %p '%.*s' %s flags %d mode 0%o\n",
> @@ -248,22 +249,34 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
>  	if (err < 0)
>  		return err;
>  
> +	if (flags & O_CREAT) {
> +		err = ceph_pre_init_acls(dir, &mode, &acls);
> +		if (err < 0)
> +			return err;
> +	}
> +
>  	/* do the open */
>  	req = prepare_open_request(dir->i_sb, flags, mode);
> -	if (IS_ERR(req))
> -		return PTR_ERR(req);
> +	if (IS_ERR(req)) {
> +		err = PTR_ERR(req);
> +		goto out_acl;
> +	}
>  	req->r_dentry = dget(dentry);
>  	req->r_num_caps = 2;
>  	if (flags & O_CREAT) {
>  		req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
>  		req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
> +		if (acls.pagelist) {
> +			req->r_pagelist = acls.pagelist;
> +			acls.pagelist = NULL;
> +		}
>  	}
>  	req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
>  	err = ceph_mdsc_do_request(mdsc,
>  				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
>  				   req);
>  	if (err)
> -		goto out_err;
> +		goto out_req;
>  
>  	err = ceph_handle_snapdir(req, dentry, err);
>  	if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
> @@ -278,7 +291,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
>  		dn = NULL;
>  	}
>  	if (err)
> -		goto out_err;
> +		goto out_req;
>  	if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) {
>  		/* make vfs retry on splice, ENOENT, or symlink */
>  		dout("atomic_open finish_no_open on dn %p\n", dn);
> @@ -286,15 +299,17 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
>  	} else {
>  		dout("atomic_open finish_open on dn %p\n", dn);
>  		if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
> -			ceph_init_acl(dentry, dentry->d_inode, dir);
> +			ceph_init_inode_acls(dentry->d_inode, &acls);
>  			*opened |= FILE_CREATED;
>  		}
>  		err = finish_open(file, dentry, ceph_open, opened);
>  	}
> -out_err:
> +out_req:
>  	if (!req->r_err && req->r_target_inode)
>  		ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
>  	ceph_mdsc_put_request(req);
> +out_acl:
> +	ceph_release_acls_info(&acls);
>  	dout("atomic_open result=%d\n", err);
>  	return err;
>  }
> diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> index bbb44cd..f62a098 100644
> --- a/fs/ceph/super.h
> +++ b/fs/ceph/super.h
> @@ -733,15 +733,23 @@ extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
>  extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
>  extern void __init ceph_xattr_init(void);
>  extern void ceph_xattr_exit(void);
> +extern const struct xattr_handler *ceph_xattr_handlers[];
>  
>  /* acl.c */
> -extern const struct xattr_handler *ceph_xattr_handlers[];
> +struct ceph_acls_info {
> +	void *default_acl;
> +	void *acl;
> +	struct ceph_pagelist *pagelist;
> +};
>  
>  #ifdef CONFIG_CEPH_FS_POSIX_ACL
>  
>  struct posix_acl *ceph_get_acl(struct inode *, int);
>  int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type);
> -int ceph_init_acl(struct dentry *, struct inode *, struct inode *);
> +int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
> +		       struct ceph_acls_info *info);
> +void ceph_init_inode_acls(struct inode *inode, struct ceph_acls_info *info);
> +void ceph_release_acls_info(struct ceph_acls_info *info);
>  
>  static inline void ceph_forget_all_cached_acls(struct inode *inode)
>  {
> @@ -753,12 +761,18 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
>  #define ceph_get_acl NULL
>  #define ceph_set_acl NULL
>  
> -static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode,
> -				struct inode *dir)
> +static inline int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
> +				     struct ceph_acls_info *info)
>  {
>  	return 0;
>  }
> -
> +static inline void ceph_init_inode_acls(struct inode *inode,
> +					struct ceph_acls_info *info)
> +{
> +}
> +static inline void ceph_release_acls_info(struct ceph_acls_info *info)
> +{
> +}
>  static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
>  {
>  	return 0;
> -- 
> 1.9.3
> 
> --
> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
> the body of a message to majordomo@xxxxxxxxxxxxxxx
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
> 
> 
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux