Since sparse read handling is so complex, add a new field for tracking how much of the data blob remains to be read, and decrement that whenever we marshal up a new iov_iter for a read off the socket. On a revoke, just ensure we skip past whatever remains in the iter, plus the remaining data_len and epilogue. Signed-off-by: Jeff Layton <jlayton@xxxxxxxxxx> --- include/linux/ceph/messenger.h | 1 + net/ceph/messenger_v2.c | 37 +++++++++++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 498a1b7bd3c1..206452d8a385 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -413,6 +413,7 @@ struct ceph_connection_v2_info { void *conn_bufs[16]; int conn_buf_cnt; + int data_len_remain; struct kvec in_sign_kvecs[8]; struct kvec out_sign_kvecs[8]; diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c index 16fcac363670..45ba59ce69e6 100644 --- a/net/ceph/messenger_v2.c +++ b/net/ceph/messenger_v2.c @@ -1866,6 +1866,7 @@ static int prepare_sparse_read_cont(struct ceph_connection *con) bv.bv_offset = 0; } set_in_bvec(con, &bv); + con->v2.data_len_remain -= bv.bv_len; WARN_ON(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_CONT); return 0; } @@ -1882,7 +1883,10 @@ static int prepare_sparse_read_cont(struct ceph_connection *con) return 0; } - return prepare_read_data_extent(con, off, len); + ret = prepare_read_data_extent(con, off, len); + if (ret == 0) + con->v2.data_len_remain -= len; + return ret; } static int prepare_sparse_read_header(struct ceph_connection *con) @@ -1918,19 +1922,24 @@ static int prepare_sparse_read_header(struct ceph_connection *con) if (!buf) { ret = prepare_read_data_extent(con, off, len); - if (!ret) + if (!ret) { + con->v2.data_len_remain -= len; con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_CONT; + } return ret; } WARN_ON_ONCE(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_HDR); reset_in_kvecs(con); add_in_kvec(con, 
buf, len); + con->v2.data_len_remain -= len; return 0; } static int prepare_sparse_read_data(struct ceph_connection *con) { + struct ceph_msg *msg = con->in_msg; + if (WARN_ON_ONCE(!con->ops->sparse_read)) return -EOPNOTSUPP; @@ -1939,6 +1948,7 @@ static int prepare_sparse_read_data(struct ceph_connection *con) reset_in_kvecs(con); con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_HDR; + con->v2.data_len_remain = data_len(msg); return prepare_sparse_read_header(con); } @@ -3620,6 +3630,23 @@ static void revoke_at_prepare_read_enc_page(struct ceph_connection *con) con->v2.in_state = IN_S_FINISH_SKIP; } +static void revoke_at_prepare_sparse_data(struct ceph_connection *con) +{ + int resid; /* current piece of data */ + int remaining; + + WARN_ON(con_secure(con)); + WARN_ON(!data_len(con->in_msg)); + WARN_ON(!iov_iter_is_bvec(&con->v2.in_iter)); + resid = iov_iter_count(&con->v2.in_iter); + dout("%s con %p resid %d\n", __func__, con, resid); + + remaining = CEPH_EPILOGUE_PLAIN_LEN + con->v2.data_len_remain; + con->v2.in_iter.count -= resid; + set_in_skip(con, resid + remaining); + con->v2.in_state = IN_S_FINISH_SKIP; +} + static void revoke_at_handle_epilogue(struct ceph_connection *con) { int resid; @@ -3636,7 +3663,7 @@ static void revoke_at_handle_epilogue(struct ceph_connection *con) void ceph_con_v2_revoke_incoming(struct ceph_connection *con) { switch (con->v2.in_state) { - case IN_S_PREPARE_SPARSE_DATA: // FIXME + case IN_S_PREPARE_SPARSE_DATA: case IN_S_PREPARE_READ_DATA: revoke_at_prepare_read_data(con); break; @@ -3646,6 +3673,10 @@ void ceph_con_v2_revoke_incoming(struct ceph_connection *con) case IN_S_PREPARE_READ_ENC_PAGE: revoke_at_prepare_read_enc_page(con); break; + case IN_S_PREPARE_SPARSE_DATA_HDR: + case IN_S_PREPARE_SPARSE_DATA_CONT: + revoke_at_prepare_sparse_data(con); + break; case IN_S_HANDLE_EPILOGUE: revoke_at_handle_epilogue(con); break; -- 2.34.1