Ensure that we wait on replies from any pending directory operations
involving children before we allow synchronous operations involving
that directory to proceed.

Signed-off-by: Jeff Layton <jlayton@xxxxxxxxxx>
---
 fs/ceph/dir.c   | 73 ++++++++++++++++++++++++++++++++++++++++++++++---
 fs/ceph/file.c  |  4 +++
 fs/ceph/super.h |  1 +
 3 files changed, 74 insertions(+), 4 deletions(-)

diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 386c9439a020..0b8cee46e07c 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -998,11 +998,16 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
 	struct ceph_mds_request *req;
 	int err;
 
+	dout("link in dir %p old_dentry %p dentry %p\n", dir,
+	     old_dentry, dentry);
+
 	if (ceph_snap(dir) != CEPH_NOSNAP)
 		return -EROFS;
 
-	dout("link in dir %p old_dentry %p dentry %p\n", dir,
-	     old_dentry, dentry);
+	err = ceph_async_dirop_request_wait(dir);
+	if (err)
+		return err;
+
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
 	if (IS_ERR(req)) {
 		d_drop(dentry);
@@ -1041,6 +1046,51 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
 	iput(req->r_old_inode);
 }
 
+/*
+ * Wait for a reply to the last request queued on a directory's
+ * i_unsafe_dirops list, if it has not yet received either an unsafe or
+ * a safe reply.  Only that last entry is examined.
+ *
+ * Returns 0 for non-directories and when there is nothing to wait for,
+ * otherwise the result of wait_for_completion_killable().
+ */
+int ceph_async_dirop_request_wait(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_mds_request *req = NULL;
+	int ret = 0;
+
+	/* Only applicable for directories */
+	if (!S_ISDIR(inode->i_mode))
+		return 0;
+
+	spin_lock(&ci->i_unsafe_lock);
+	if (!list_empty(&ci->i_unsafe_dirops)) {
+		struct ceph_mds_request *last;
+		last = list_last_entry(&ci->i_unsafe_dirops,
+				       struct ceph_mds_request,
+				       r_unsafe_dir_item);
+		/*
+		 * If last request hasn't gotten a reply, then wait
+		 * for it.
+		 */
+		if (!test_bit(CEPH_MDS_R_GOT_UNSAFE, &last->r_req_flags) &&
+		    !test_bit(CEPH_MDS_R_GOT_SAFE, &last->r_req_flags)) {
+			req = last;
+			ceph_mdsc_get_request(req);
+		}
+	}
+	spin_unlock(&ci->i_unsafe_lock);
+
+	if (req) {
+		dout("%s %p wait on tid %llu\n", __func__, inode,
+		     req->r_tid);
+		ret = wait_for_completion_killable(&req->r_completion);
+		ceph_mdsc_put_request(req);
+	}
+	return ret;
+}
+
 /*
  * rmdir and unlink are differ only by the metadata op code
  */
@@ -1064,6 +1114,12 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
 			CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
 	} else
 		goto out;
+
+	/* Wait for any requests involving children to get a reply */
+	err = ceph_async_dirop_request_wait(dir);
+	if (err)
+		goto out;
+
 	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
@@ -1115,6 +1171,9 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 	int op = CEPH_MDS_OP_RENAME;
 	int err;
 
+	dout("rename dir %p dentry %p to dir %p dentry %p\n",
+	     old_dir, old_dentry, new_dir, new_dentry);
+
 	if (flags)
 		return -EINVAL;
 
@@ -1131,8 +1190,14 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 	    (!ceph_quota_is_same_realm(old_dir, new_dir)))
 		return -EXDEV;
 
-	dout("rename dir %p dentry %p to dir %p dentry %p\n",
-	     old_dir, old_dentry, new_dir, new_dentry);
+	err = ceph_async_dirop_request_wait(old_dir);
+	if (err)
+		return err;
+
+	err = ceph_async_dirop_request_wait(new_dir);
+	if (err)
+		return err;
+
 	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index f24d18f46715..f7e49907514e 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -444,6 +444,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	     dir, dentry, dentry,
 	     d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);
 
+	err = ceph_async_dirop_request_wait(dir);
+	if (err)
+		return err;
+
 	if (dentry->d_name.len > NAME_MAX)
 		return -ENAMETOOLONG;
 
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 5c361dc1f47f..e97a6ce31a4e 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -1070,6 +1070,7 @@ extern int ceph_handle_snapdir(struct ceph_mds_request *req,
 			       struct dentry *dentry, int err);
 extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
 					 struct dentry *dentry, int err);
+extern int ceph_async_dirop_request_wait(struct inode *inode);
 extern void __ceph_dentry_lease_touch(struct ceph_dentry_info *di);
 extern void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di);
 
-- 
2.20.1