The current code sets a new file/directory's initial ACL in a non-atomic manner. The client first sends a request to the MDS to create the new file/directory, then sets the initial ACL after the new file/directory has been successfully created. The fix is to include the initial ACL in the create/mkdir/mknod MDS requests, so that the MDS can create the file/directory and set the initial ACL in a single request. Signed-off-by: Yan, Zheng <zyan@xxxxxxxxxx> --- fs/ceph/acl.c | 124 +++++++++++++++++++++++++++++++++++++++++---------- fs/ceph/dir.c | 41 ++++++++++++----- fs/ceph/file.c | 27 ++++++++--- fs/ceph/mds_client.c | 1 - fs/ceph/super.h | 27 ++++++++--- 5 files changed, 174 insertions(+), 46 deletions(-) diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c index cebf2eb..2b7f497 100644 --- a/fs/ceph/acl.c +++ b/fs/ceph/acl.c @@ -169,36 +169,112 @@ out: return ret; } -int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir) +int ceph_pre_init_acls(struct inode *dir, umode_t *mode, + struct ceph_acls_info *info) { - struct posix_acl *default_acl, *acl; - umode_t new_mode = inode->i_mode; - int error; - - error = posix_acl_create(dir, &new_mode, &default_acl, &acl); - if (error) - return error; - - if (!default_acl && !acl) { - cache_no_acl(inode); - if (new_mode != inode->i_mode) { - struct iattr newattrs = { - .ia_mode = new_mode, - .ia_valid = ATTR_MODE, - }; - error = ceph_setattr(dentry, &newattrs); + struct posix_acl *acl, *default_acl; + size_t buf_size, name_len1, name_len2, val_size1, val_size2; + struct page **pages = NULL; + void *p, *xattr_buf = NULL; + int err, i, nr_pages; + + err = posix_acl_create(dir, mode, &default_acl, &acl); + if (err) + return err; + + if (acl) { + int ret = posix_acl_equiv_mode(acl, mode); + if (ret < 0) + goto out_err; + if (ret == 0) { + posix_acl_release(acl); + acl = NULL; } - return error; } + if (!default_acl && !acl) + return 0; + + buf_size = 4; + if (acl) { + name_len1 = strlen(POSIX_ACL_XATTR_ACCESS); + val_size1 = 
posix_acl_xattr_size(acl->a_count); + buf_size += 8 + name_len1 + val_size1; + } else { + name_len1 = val_size1 = 0; + } if (default_acl) { - error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT); - posix_acl_release(default_acl); + name_len2 = strlen(POSIX_ACL_XATTR_DEFAULT); + val_size2 = posix_acl_xattr_size(default_acl->a_count); + buf_size += 8 + name_len2 + val_size2; + } else { + name_len2 = val_size2 = 0; } + + err = -ENOMEM; + nr_pages = calc_pages_for(0, buf_size); + pages = kmalloc(sizeof(struct page*) * nr_pages, GFP_NOFS); + if (!pages) + goto out_err; + xattr_buf = kmalloc(PAGE_SIZE * nr_pages, GFP_NOFS); + if (!xattr_buf) + goto out_err; + + pages[0] = virt_to_page(xattr_buf); + for (i = 1; i < nr_pages; i++) + pages[i] = pages[0] + i; + + p = xattr_buf; + ceph_encode_32(&p, acl && default_acl ? 2 : 1); + if (acl) { - if (!error) - error = ceph_set_acl(inode, acl, ACL_TYPE_ACCESS); - posix_acl_release(acl); + ceph_encode_32(&p, name_len1); + memcpy(p, POSIX_ACL_XATTR_ACCESS, name_len1); + p += name_len1; + ceph_encode_32(&p, val_size1); + err = posix_acl_to_xattr(&init_user_ns, acl, p, val_size1); + if (err < 0) + goto out_err; + p += val_size1; } - return error; + if (default_acl) { + ceph_encode_32(&p, name_len2); + memcpy(p, POSIX_ACL_XATTR_DEFAULT, name_len2); + p += name_len2; + ceph_encode_32(&p, val_size2); + err = posix_acl_to_xattr(&init_user_ns, default_acl, p, val_size2); + if (err < 0) + goto out_err; + p += val_size2; + } + + info->acl = acl; + info->default_acl = default_acl; + info->xattr_buf = xattr_buf; + info->buf_pages = pages; + info->buf_size = buf_size; + return 0; + +out_err: + posix_acl_release(acl); + posix_acl_release(default_acl); + kfree(pages); + kfree(xattr_buf); + return err; +} + +void ceph_init_inode_acls(struct inode* inode, struct ceph_acls_info *info) +{ + if (!inode) + return; + ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, info->acl); + ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, info->default_acl); +} + 
+void ceph_release_acls_info(struct ceph_acls_info *info) +{ + posix_acl_release(info->acl); + posix_acl_release(info->default_acl); + kfree(info->buf_pages); + kfree(info->xattr_buf); } diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index c29d6ae..ea576b5 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -682,17 +682,22 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; + struct ceph_acls_info acls = {}; int err; if (ceph_snap(dir) != CEPH_NOSNAP) return -EROFS; + err = ceph_pre_init_acls(dir, &mode, &acls); + if (err < 0) + return err; + dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n", dir, dentry, mode, rdev); req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS); if (IS_ERR(req)) { - d_drop(dentry); - return PTR_ERR(req); + err = PTR_ERR(req); + goto out; } req->r_dentry = dget(dentry); req->r_num_caps = 2; @@ -701,15 +706,20 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, req->r_args.mknod.rdev = cpu_to_le32(rdev); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; + if (acls.buf_size) { + req->r_pages = acls.buf_pages; + req->r_data_len = acls.buf_size; + } err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); ceph_mdsc_put_request(req); - +out: if (!err) - ceph_init_acl(dentry, dentry->d_inode, dir); + ceph_init_inode_acls(dentry->d_inode, &acls); else d_drop(dentry); + ceph_release_acls_info(&acls); return err; } @@ -733,8 +743,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest); req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS); if (IS_ERR(req)) { - d_drop(dentry); - return PTR_ERR(req); + err = PTR_ERR(req); + goto out; } req->r_dentry = dget(dentry); 
req->r_num_caps = 2; @@ -746,9 +756,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); ceph_mdsc_put_request(req); - if (!err) - ceph_init_acl(dentry, dentry->d_inode, dir); - else +out: + if (err) d_drop(dentry); return err; } @@ -758,6 +767,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; + struct ceph_acls_info acls = {}; int err = -EROFS; int op; @@ -772,6 +782,12 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) } else { goto out; } + + mode |= S_IFDIR; + err = ceph_pre_init_acls(dir, &mode, &acls); + if (err < 0) + goto out; + req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); if (IS_ERR(req)) { err = PTR_ERR(req); @@ -784,15 +800,20 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) req->r_args.mkdir.mode = cpu_to_le32(mode); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; + if (acls.buf_size) { + req->r_pages = acls.buf_pages; + req->r_data_len = acls.buf_size; + } err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); ceph_mdsc_put_request(req); out: if (!err) - ceph_init_acl(dentry, dentry->d_inode, dir); + ceph_init_inode_acls(dentry->d_inode, &acls); else d_drop(dentry); + ceph_release_acls_info(&acls); return err; } diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 2eb02f8..46a0525f 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -235,6 +235,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; struct dentry *dn; + struct ceph_acls_info acls = {}; int err; dout("atomic_open %p dentry %p '%.*s' %s flags 
%d mode 0%o\n", @@ -248,22 +249,34 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, if (err < 0) return err; + if (flags & O_CREAT) { + err = ceph_pre_init_acls(dir, &mode, &acls); + if (err < 0) + return err; + } + /* do the open */ req = prepare_open_request(dir->i_sb, flags, mode); - if (IS_ERR(req)) - return PTR_ERR(req); + if (IS_ERR(req)) { + err = PTR_ERR(req); + goto out_acl; + } req->r_dentry = dget(dentry); req->r_num_caps = 2; if (flags & O_CREAT) { req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; + if (acls.buf_size) { + req->r_pages = acls.buf_pages; + req->r_data_len = acls.buf_size; + } } req->r_locked_dir = dir; /* caller holds dir->i_mutex */ err = ceph_mdsc_do_request(mdsc, (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, req); if (err) - goto out_err; + goto out_req; err = ceph_handle_snapdir(req, dentry, err); if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) @@ -278,7 +291,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, dn = NULL; } if (err) - goto out_err; + goto out_req; if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) { /* make vfs retry on splice, ENOENT, or symlink */ dout("atomic_open finish_no_open on dn %p\n", dn); @@ -286,15 +299,17 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, } else { dout("atomic_open finish_open on dn %p\n", dn); if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) { - ceph_init_acl(dentry, dentry->d_inode, dir); + ceph_init_inode_acls(dentry->d_inode, &acls); *opened |= FILE_CREATED; } err = finish_open(file, dentry, ceph_open, opened); } -out_err: +out_req: if (!req->r_err && req->r_target_inode) ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode); ceph_mdsc_put_request(req); +out_acl: + ceph_release_acls_info(&acls); dout("atomic_open result=%d\n", err); return err; } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index a17fc49..8a397f2 100644 
--- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1848,7 +1848,6 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); if (req->r_data_len) { - /* outbound data set only by ceph_sync_setxattr() */ BUG_ON(!req->r_pages); ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0); } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 12b2074..ad76163 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -733,15 +733,25 @@ extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci); extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci); extern void __init ceph_xattr_init(void); extern void ceph_xattr_exit(void); +extern const struct xattr_handler *ceph_xattr_handlers[]; /* acl.c */ -extern const struct xattr_handler *ceph_xattr_handlers[]; +struct ceph_acls_info { + void *default_acl; + void *acl; + void *xattr_buf; + struct page **buf_pages; + size_t buf_size; +}; #ifdef CONFIG_CEPH_FS_POSIX_ACL struct posix_acl *ceph_get_acl(struct inode *, int); int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type); -int ceph_init_acl(struct dentry *, struct inode *, struct inode *); +int ceph_pre_init_acls(struct inode *dir, umode_t *mode, + struct ceph_acls_info *info); +void ceph_init_inode_acls(struct inode *inode, struct ceph_acls_info *info); +void ceph_release_acls_info(struct ceph_acls_info *info); static inline void ceph_forget_all_cached_acls(struct inode *inode) { @@ -753,12 +763,19 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode) #define ceph_get_acl NULL #define ceph_set_acl NULL -static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode, - struct inode *dir) + +static inline int ceph_pre_init_acls(struct inode *dir, umode_t *mode, + struct ceph_acls_info *info) { return 0; } - +static inline void ceph_init_inode_acls(struct inode *inode, + struct ceph_acls_info *info) +{ +} +static inline void 
ceph_release_acls_info(struct ceph_acls_info *info) +{ +} static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode) { return 0; -- 1.9.3 -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html