From: Xiubo Li <xiubli@xxxxxxxxxx>
A message from ceph may be split across multiple socket packets,
and we just need to wait for all of the data to become available on
the socket.
This adds 'sr_total_resid' to record the total length across all
data items of a sparse-read message, and 'sr_resid_elen' to record
the total length of the current extent.
URL: https://tracker.ceph.com/issues/63586
Signed-off-by: Xiubo Li <xiubli@xxxxxxxxxx>
---
include/linux/ceph/messenger.h | 1 +
net/ceph/messenger_v1.c | 32 +++++++++++++++++++++-----------
2 files changed, 22 insertions(+), 11 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 2eaaabbe98cb..ca6f82abed62 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -231,6 +231,7 @@ struct ceph_msg_data {
struct ceph_msg_data_cursor {
size_t total_resid; /* across all data items */
+ size_t sr_total_resid; /* across all data items for sparse-read */
struct ceph_msg_data *data; /* current data item */
size_t resid; /* bytes not yet consumed */
diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c
index 4cb60bacf5f5..2733da891688 100644
--- a/net/ceph/messenger_v1.c
+++ b/net/ceph/messenger_v1.c
@@ -160,7 +160,9 @@ static size_t sizeof_footer(struct ceph_connection *con)
static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
{
/* Initialize data cursor if it's not a sparse read */
- if (!msg->sparse_read)
+ if (msg->sparse_read)
+ msg->cursor.sr_total_resid = data_len;
+ else
ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
}
@@ -1032,35 +1034,43 @@ static int read_partial_sparse_msg_data(struct ceph_connection *con)
bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
u32 crc = 0;
int ret = 1;
+ int len;
if (do_datacrc)
crc = con->in_data_crc;
- do {
- if (con->v1.in_sr_kvec.iov_base)
+ while (cursor->sr_total_resid) {
+ len = 0;
+ if (con->v1.in_sr_kvec.iov_base) {
+ len = con->v1.in_sr_kvec.iov_len;
ret = read_partial_message_chunk(con,
&con->v1.in_sr_kvec,
con->v1.in_sr_len,
&crc);
- else if (cursor->sr_resid > 0)
+ len = con->v1.in_sr_kvec.iov_len - len;
+ } else if (cursor->sr_resid > 0) {
+ len = cursor->sr_resid;
ret = read_partial_sparse_msg_extent(con, &crc);
-
- if (ret <= 0) {
- if (do_datacrc)
- con->in_data_crc = crc;
- return ret;
+ len -= cursor->sr_resid;
}
+ cursor->sr_total_resid -= len;
+ if (ret <= 0)
+ break;
memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec));
ret = con->ops->sparse_read(con, cursor,
(char **)&con->v1.in_sr_kvec.iov_base);
+ if (ret <= 0) {
+ ret = ret ? : 1; /* must return > 0 to indicate success */
+ break;
+ }
con->v1.in_sr_len = ret;
- } while (ret > 0);
+ }
if (do_datacrc)
con->in_data_crc = crc;
- return ret < 0 ? ret : 1; /* must return > 0 to indicate success */
+ return ret;
}
static int read_partial_msg_data(struct ceph_connection *con)