From: Xiubo Li <xiubli@xxxxxxxxxx>
A sync read may be split into several osdc requests, and each request
may land in a different RADOS object. Record the object version
returned by each request so that the caller can retrieve them.
Signed-off-by: Xiubo Li <xiubli@xxxxxxxxxx>
---
fs/ceph/file.c | 44 ++++++++++++++++++++++++++++++++++++++++++--
fs/ceph/super.h | 18 +++++++++++++++++-
2 files changed, 59 insertions(+), 3 deletions(-)
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 129f6a642f8e..cedd86a6058d 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -871,7 +871,8 @@ enum {
* only return a short read to the caller if we hit EOF.
*/
ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,
- struct iov_iter *to, int *retry_op)
+ struct iov_iter *to, int *retry_op,
+ struct ceph_object_vers *objvers)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
@@ -880,6 +881,7 @@ ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,
u64 off = *ki_pos;
u64 len = iov_iter_count(to);
u64 i_size;
+ u32 object_count = 8;
dout("sync_read on inode %p %llu~%u\n", inode, *ki_pos, (unsigned)len);
@@ -896,6 +898,15 @@ ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,
if (ret < 0)
return ret;
+ if (objvers) {
+ objvers->count = 0;
+ objvers->objvers = kcalloc(object_count,
+ sizeof(struct ceph_object_ver),
+ GFP_KERNEL);
+ if (!objvers->objvers)
+ return -ENOMEM;
+ }
+
ret = 0;
while ((len = iov_iter_count(to)) > 0) {
struct ceph_osd_request *req;
@@ -938,6 +949,30 @@ ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,
req->r_end_latency,
len, ret);
+		if (objvers) {
+			struct ceph_object_ver *ov = objvers->objvers;
+
+			if (objvers->count >= object_count) {
+				/* Grow via a local copy: krealloc keeps the old array on failure. */
+				ov = krealloc_array(ov, object_count * 2, sizeof(*ov), GFP_KERNEL);
+				if (!ov) {
+					/* NOTE(review): break skips later cleanup; drop req here */
+					kfree(objvers->objvers);
+					objvers->objvers = NULL;
+					objvers->count = 0;
+					ceph_osdc_put_request(req);
+					ret = -ENOMEM;
+					break;
+				}
+				objvers->objvers = ov;
+				object_count *= 2;
+			}
+			/* Record the object version backing [off, off + len). */
+			ov += objvers->count++;
+			ov->offset = off;
+			ov->length = len;
+			ov->objver = req->r_version;
+		}
ceph_osdc_put_request(req);
i_size = i_size_read(inode);
@@ -995,6 +1030,11 @@ ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,
}
dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
+ if (ret < 0 && objvers) {
+ objvers->count = 0;
+ kfree(objvers->objvers);
+ objvers->objvers = NULL;
+ }
return ret;
}
@@ -1008,7 +1048,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
(unsigned)iov_iter_count(to),
(file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
- return __ceph_sync_read(inode, &iocb->ki_pos, to, retry_op);
+ return __ceph_sync_read(inode, &iocb->ki_pos, to, retry_op, NULL);
}
struct ceph_aio_request {
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 2362d758af97..b347b12e86a9 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -451,6 +451,21 @@ struct ceph_inode_info {
struct inode vfs_inode; /* at end */
};
+/*
+ * The version of the RADOS object that served a sync read of the
+ * file range [offset, offset + length); one entry per osdc request.
+ */
+struct ceph_object_ver {
+ u64 offset; /* file offset at which the request started */
+ u64 length; /* length of the file range covered by the request */
+ u64 objver; /* r_version copied from the completed request */
+};
+
+struct ceph_object_vers {
+ u32 count; /* number of entries filled in objvers */
+ struct ceph_object_ver *objvers; /* array allocated by __ceph_sync_read() */
+};
+
static inline struct ceph_inode_info *
ceph_inode(const struct inode *inode)
{
@@ -1254,7 +1269,8 @@ extern int ceph_open(struct inode *inode, struct file *file);
extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct file *file, unsigned flags, umode_t mode);
extern ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,
- struct iov_iter *to, int *retry_op);
+ struct iov_iter *to, int *retry_op,
+ struct ceph_object_vers *objvers);