Add support for a new sparse_read ceph_connection operation. The idea is that the client code can define this operation and use it to do special handling for incoming reads. The alloc_msg routine can look at the request and determine whether the reply is expected to be sparse. If it is, then we'll dispatch to a different set of state machine states that will repeatedly call sparse_read to get length and placement info for reading the extent map, and the extents themselves. TODO: support for revoke during a sparse read Signed-off-by: Jeff Layton <jlayton@xxxxxxxxxx> --- include/linux/ceph/messenger.h | 19 ++++ net/ceph/messenger_v2.c | 164 ++++++++++++++++++++++++++++++--- 2 files changed, 169 insertions(+), 14 deletions(-) diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index e7f2fb2fc207..498a1b7bd3c1 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -25,6 +25,22 @@ struct ceph_connection_operations { struct ceph_connection *(*get)(struct ceph_connection *); void (*put)(struct ceph_connection *); + /** + * sparse_read: read sparse data + * @con: connection we're reading from + * @off: offset into msgr data caller should read into + * @len: len of the data that msgr should read + * @buf: optional buffer to read into + * + * This should be called more than once, each time setting up to + * receive an extent into the correct portion of the buffer (and + * zeroing the holes between them). + * + * Returns 1 if there is more data to be read, 0 if reading is + * complete, or -errno if there was an error. + */ + int (*sparse_read)(struct ceph_connection *con, u64 *off, u64 *len, char **buf); + /* handle an incoming message. 
*/ void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m); @@ -252,6 +268,7 @@ struct ceph_msg { struct kref kref; bool more_to_follow; bool needs_out_seq; + bool sparse_read; int front_alloc_len; struct ceph_msgpool *pool; @@ -464,6 +481,8 @@ struct ceph_connection { struct page *bounce_page; u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */ + int sparse_resid; + struct timespec64 last_keepalive_ack; /* keepalive2 ack stamp */ struct delayed_work work; /* send|recv work */ diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c index c6e5bfc717d5..16fcac363670 100644 --- a/net/ceph/messenger_v2.c +++ b/net/ceph/messenger_v2.c @@ -52,14 +52,17 @@ #define FRAME_LATE_STATUS_COMPLETE 0xe #define FRAME_LATE_STATUS_ABORTED_MASK 0xf -#define IN_S_HANDLE_PREAMBLE 1 -#define IN_S_HANDLE_CONTROL 2 -#define IN_S_HANDLE_CONTROL_REMAINDER 3 -#define IN_S_PREPARE_READ_DATA 4 -#define IN_S_PREPARE_READ_DATA_CONT 5 -#define IN_S_PREPARE_READ_ENC_PAGE 6 -#define IN_S_HANDLE_EPILOGUE 7 -#define IN_S_FINISH_SKIP 8 +#define IN_S_HANDLE_PREAMBLE 1 +#define IN_S_HANDLE_CONTROL 2 +#define IN_S_HANDLE_CONTROL_REMAINDER 3 +#define IN_S_PREPARE_READ_DATA 4 +#define IN_S_PREPARE_READ_DATA_CONT 5 +#define IN_S_PREPARE_READ_ENC_PAGE 6 +#define IN_S_PREPARE_SPARSE_DATA 7 +#define IN_S_PREPARE_SPARSE_DATA_HDR 8 +#define IN_S_PREPARE_SPARSE_DATA_CONT 9 +#define IN_S_HANDLE_EPILOGUE 10 +#define IN_S_FINISH_SKIP 11 #define OUT_S_QUEUE_DATA 1 #define OUT_S_QUEUE_DATA_CONT 2 @@ -1753,13 +1756,13 @@ static int prepare_read_control_remainder(struct ceph_connection *con) return 0; } -static int prepare_read_data(struct ceph_connection *con) +static int prepare_read_data_extent(struct ceph_connection *con, int off, int len) { struct bio_vec bv; - con->in_data_crc = -1; - ceph_msg_data_cursor_init(&con->v2.in_cursor, con->in_msg, - data_len(con->in_msg)); + ceph_msg_data_cursor_init(&con->v2.in_cursor, con->in_msg, off+len); + if (off) + 
ceph_msg_data_advance(&con->v2.in_cursor, off); get_bvec_at(&con->v2.in_cursor, &bv); if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { @@ -1775,10 +1778,20 @@ static int prepare_read_data(struct ceph_connection *con) bv.bv_offset = 0; } set_in_bvec(con, &bv); - con->v2.in_state = IN_S_PREPARE_READ_DATA_CONT; return 0; } +static int prepare_read_data(struct ceph_connection *con) +{ + int ret; + + con->in_data_crc = -1; + ret = prepare_read_data_extent(con, 0, data_len(con->in_msg)); + if (ret == 0) + con->v2.in_state = IN_S_PREPARE_READ_DATA_CONT; + return ret; +} + static void prepare_read_data_cont(struct ceph_connection *con) { struct bio_vec bv; @@ -1819,6 +1832,116 @@ static void prepare_read_data_cont(struct ceph_connection *con) con->v2.in_state = IN_S_HANDLE_EPILOGUE; } +static int prepare_sparse_read_cont(struct ceph_connection *con) +{ + int ret; + struct bio_vec bv; + char *buf = NULL; + u64 off = 0, len = 0; + + if (!iov_iter_is_bvec(&con->v2.in_iter)) + return -EIO; + + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + con->in_data_crc = crc32c(con->in_data_crc, + page_address(con->bounce_page), + con->v2.in_bvec.bv_len); + + get_bvec_at(&con->v2.in_cursor, &bv); + memcpy_to_page(bv.bv_page, bv.bv_offset, + page_address(con->bounce_page), + con->v2.in_bvec.bv_len); + } else { + con->in_data_crc = ceph_crc32c_page(con->in_data_crc, + con->v2.in_bvec.bv_page, + con->v2.in_bvec.bv_offset, + con->v2.in_bvec.bv_len); + } + + ceph_msg_data_advance(&con->v2.in_cursor, con->v2.in_bvec.bv_len); + if (con->v2.in_cursor.total_resid) { + get_bvec_at(&con->v2.in_cursor, &bv); + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + bv.bv_page = con->bounce_page; + bv.bv_offset = 0; + } + set_in_bvec(con, &bv); + WARN_ON(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_CONT); + return 0; + } + + /* get next extent */ + ret = con->ops->sparse_read(con, &off, &len, &buf); + if (ret <= 0) { + if (ret < 0) + return ret; + + reset_in_kvecs(con); + add_in_kvec(con, 
con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN); + con->v2.in_state = IN_S_HANDLE_EPILOGUE; + return 0; + } + + return prepare_read_data_extent(con, off, len); +} + +static int prepare_sparse_read_header(struct ceph_connection *con) +{ + int ret; + char *buf = NULL; + u64 off = 0, len = 0; + + if (!iov_iter_is_kvec(&con->v2.in_iter)) + return -EIO; + + /* On first call, we have no kvec so don't compute crc */ + if (con->v2.in_kvec_cnt) { + WARN_ON_ONCE(con->v2.in_kvec_cnt > 1); + con->in_data_crc = crc32c(con->in_data_crc, + con->v2.in_kvecs[0].iov_base, + con->v2.in_kvecs[0].iov_len); + } + + ret = con->ops->sparse_read(con, &off, &len, &buf); + if (ret < 0) + return ret; + if (ret == 0) { + reset_in_kvecs(con); + add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN); + con->v2.in_state = IN_S_HANDLE_EPILOGUE; + return 0; + } + + /* No actual data? */ + if (WARN_ON_ONCE(!ret)) + return -EIO; + + if (!buf) { + ret = prepare_read_data_extent(con, off, len); + if (!ret) + con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_CONT; + return ret; + } + + WARN_ON_ONCE(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_HDR); + reset_in_kvecs(con); + add_in_kvec(con, buf, len); + return 0; +} + +static int prepare_sparse_read_data(struct ceph_connection *con) +{ + if (WARN_ON_ONCE(!con->ops->sparse_read)) + return -EOPNOTSUPP; + + if (!con_secure(con)) + con->in_data_crc = -1; + + reset_in_kvecs(con); + con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_HDR; + return prepare_sparse_read_header(con); +} + static int prepare_read_tail_plain(struct ceph_connection *con) { struct ceph_msg *msg = con->in_msg; @@ -1839,7 +1962,10 @@ static int prepare_read_tail_plain(struct ceph_connection *con) } if (data_len(msg)) { - con->v2.in_state = IN_S_PREPARE_READ_DATA; + if (msg->sparse_read) + con->v2.in_state = IN_S_PREPARE_SPARSE_DATA; + else + con->v2.in_state = IN_S_PREPARE_READ_DATA; } else { add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN); con->v2.in_state = IN_S_HANDLE_EPILOGUE; @@ -2893,6 
+3019,15 @@ static int populate_in_iter(struct ceph_connection *con) prepare_read_enc_page(con); ret = 0; break; + case IN_S_PREPARE_SPARSE_DATA: + ret = prepare_sparse_read_data(con); + break; + case IN_S_PREPARE_SPARSE_DATA_HDR: + ret = prepare_sparse_read_header(con); + break; + case IN_S_PREPARE_SPARSE_DATA_CONT: + ret = prepare_sparse_read_cont(con); + break; case IN_S_HANDLE_EPILOGUE: ret = handle_epilogue(con); break; @@ -3501,6 +3636,7 @@ static void revoke_at_handle_epilogue(struct ceph_connection *con) void ceph_con_v2_revoke_incoming(struct ceph_connection *con) { switch (con->v2.in_state) { + case IN_S_PREPARE_SPARSE_DATA: // FIXME case IN_S_PREPARE_READ_DATA: revoke_at_prepare_read_data(con); break; -- 2.34.1