[PATCH 06/16] libceph: switch data cursor from page to iov_iter for messenger

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



The first problem is performance.  Why not pass the whole iov_iter
to the socket read/write functions and let the socket API handle
everything at once, instead of doing I/O page by page?  So it is
better to make the data cursor an iov_iter, which is generic for
many API calls.

The second reason is future support for kvec, i.e. the case where
we do not have a page in hand, only a buffer.

So this patch is a preparation, the first iteration: users of the
data cursor no longer see pages, but use cursor->iter instead.
Internally the cursor still uses a page.  Later patches remove that.

We are still able to use sendpage() for 0-copy and get a performance
benefit from multi-page bvecs, i.e. if the bvec in the iter is
multi-page, we pass the whole multi-page bvec to sendpage(), not only 4k.

It is important to mention that for sendpage() MSG_SENDPAGE_NOTLAST
is always set if the @more flag is true.  We know that the footer of
the message will follow the data; at that point @more will be false
and all data will be pushed out of the socket.

Signed-off-by: Roman Penyaev <rpenyaev@xxxxxxx>
---
 include/linux/ceph/messenger.h |   3 +
 net/ceph/messenger.c           | 141 ++++++++++++++++++++++-----------
 2 files changed, 97 insertions(+), 47 deletions(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 424f9f1989b7..044c74333c27 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -192,6 +192,9 @@ struct ceph_msg_data_cursor {
 	size_t			total_resid;	/* across all data items */
 
 	struct ceph_msg_data	*data;		/* current data item */
+	struct iov_iter         iter;           /* iterator for current data */
+	struct bio_vec          it_bvec;        /* single bvec backing iter */
+	unsigned int            direction;      /* data direction */
 	size_t			resid;		/* bytes not yet consumed */
 	bool			last_piece;	/* current is last piece */
 	bool			need_crc;	/* crc update needed */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 08786d75b990..709d9f26f755 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -523,6 +523,22 @@ static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
 	return r;
 }
 
+static int ceph_tcp_recviov(struct socket *sock, struct iov_iter *iter)
+{
+	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL,
+			      .msg_iter = *iter };
+	int r;
+
+	if (!iter->count)
+		msg.msg_flags |= MSG_TRUNC;
+
+	r = sock_recvmsg(sock, &msg, msg.msg_flags);
+	if (r == -EAGAIN)
+		r = 0;
+	return r;
+}
+
+__attribute__((unused))
 static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
 		     int page_offset, size_t length)
 {
@@ -594,6 +610,42 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
 	return ret;
 }
 
+/**
+ * ceph_tcp_sendiov() - either does sendmsg() or 0-copy sendpage()
+ *
+ * @more is true if caller will be sending more data shortly.
+ */
+static int ceph_tcp_sendiov(struct socket *sock, struct iov_iter *iter,
+			    bool more)
+{
+	if (iov_iter_is_bvec(iter)) {
+		const struct bio_vec *bvec = &iter->bvec[0];
+		int flags = more ? MSG_MORE | MSG_SENDPAGE_NOTLAST : 0;
+
+		/* Do 0-copy instead of sendmsg */
+
+		return ceph_tcp_sendpage(sock, bvec->bv_page,
+					 iter->iov_offset + bvec->bv_offset,
+					 bvec->bv_len - iter->iov_offset,
+					 flags);
+	} else {
+		struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL,
+				      .msg_iter = *iter };
+		int r;
+
+		if (more)
+			msg.msg_flags |= MSG_MORE;
+		else
+			/* superfluous, but what the hell */
+			msg.msg_flags |= MSG_EOR;
+
+		r = sock_sendmsg(sock, &msg);
+		if (r == -EAGAIN)
+			r = 0;
+		return r;
+	}
+}
+
 /*
  * Shutdown/close the socket for the given connection.
  */
@@ -1086,12 +1138,7 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
 }
 
 /*
- * Message data is handled (sent or received) in pieces, where each
- * piece resides on a single page.  The network layer might not
- * consume an entire piece at once.  A data item's cursor keeps
- * track of which piece is next to process and how much remains to
- * be processed in that piece.  It also tracks whether the current
- * piece is the last one in the data item.
+ * Message data is iterated (sent or received) by internal iov_iter.
  */
 static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
 {
@@ -1120,7 +1167,8 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
 	cursor->need_crc = true;
 }
 
-static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+static void ceph_msg_data_cursor_init(unsigned int dir, struct ceph_msg *msg,
+				      size_t length)
 {
 	struct ceph_msg_data_cursor *cursor = &msg->cursor;
 
@@ -1130,33 +1178,33 @@ static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
 
 	cursor->total_resid = length;
 	cursor->data = msg->data;
+	cursor->direction = dir;
 
 	__ceph_msg_data_cursor_init(cursor);
 }
 
 /*
- * Return the page containing the next piece to process for a given
- * data item, and supply the page offset and length of that piece.
+ * Set up cursor->iter for the next piece to process.
  */
-static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
-					size_t *page_offset, size_t *length)
+static void ceph_msg_data_next(struct ceph_msg_data_cursor *cursor)
 {
 	struct page *page;
+	size_t off, len;
 
 	switch (cursor->data->type) {
 	case CEPH_MSG_DATA_PAGELIST:
-		page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
+		page = ceph_msg_data_pagelist_next(cursor, &off, &len);
 		break;
 	case CEPH_MSG_DATA_PAGES:
-		page = ceph_msg_data_pages_next(cursor, page_offset, length);
+		page = ceph_msg_data_pages_next(cursor, &off, &len);
 		break;
 #ifdef CONFIG_BLOCK
 	case CEPH_MSG_DATA_BIO:
-		page = ceph_msg_data_bio_next(cursor, page_offset, length);
+		page = ceph_msg_data_bio_next(cursor, &off, &len);
 		break;
 #endif /* CONFIG_BLOCK */
 	case CEPH_MSG_DATA_BVECS:
-		page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
+		page = ceph_msg_data_bvecs_next(cursor, &off, &len);
 		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
@@ -1165,11 +1213,16 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
 	}
 
 	BUG_ON(!page);
-	BUG_ON(*page_offset + *length > PAGE_SIZE);
-	BUG_ON(!*length);
-	BUG_ON(*length > cursor->resid);
+	BUG_ON(off + len > PAGE_SIZE);
+	BUG_ON(!len);
+	BUG_ON(len > cursor->resid);
+
+	cursor->it_bvec.bv_page = page;
+	cursor->it_bvec.bv_len = len;
+	cursor->it_bvec.bv_offset = off;
 
-	return page;
+	iov_iter_bvec(&cursor->iter, cursor->direction,
+		      &cursor->it_bvec, 1, len);
 }
 
 /*
@@ -1220,11 +1273,12 @@ static size_t sizeof_footer(struct ceph_connection *con)
 	    sizeof(struct ceph_msg_footer_old);
 }
 
-static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
+static void prepare_message_data(unsigned int dir, struct ceph_msg *msg,
+				 u32 data_len)
 {
 	/* Initialize data cursor */
 
-	ceph_msg_data_cursor_init(msg, (size_t)data_len);
+	ceph_msg_data_cursor_init(dir, msg, (size_t)data_len);
 }
 
 /*
@@ -1331,7 +1385,7 @@ static void prepare_write_message(struct ceph_connection *con)
 	/* is there a data payload? */
 	con->out_msg->footer.data_crc = 0;
 	if (m->data_length) {
-		prepare_message_data(con->out_msg, m->data_length);
+		prepare_message_data(WRITE, con->out_msg, m->data_length);
 		con->out_more = 1;  /* data + footer will follow */
 	} else {
 		/* no, queue up footer too and be done */
@@ -1532,16 +1586,19 @@ static int write_partial_kvec(struct ceph_connection *con)
 	return ret;  /* done! */
 }
 
-static u32 ceph_crc32c_page(u32 crc, struct page *page,
-				unsigned int page_offset,
-				unsigned int length)
+static int crc32c_kvec(struct kvec *vec, void *p)
 {
-	char *kaddr;
+	u32 *crc = p;
 
-	kaddr = kmap(page);
-	BUG_ON(kaddr == NULL);
-	crc = crc32c(crc, kaddr + page_offset, length);
-	kunmap(page);
+	*crc = crc32c(*crc, vec->iov_base, vec->iov_len);
+
+	return 0;
+}
+
+static u32 ceph_crc32c_iov(u32 crc, struct iov_iter *iter,
+			   unsigned int length)
+{
+	iov_iter_for_each_range(iter, length, crc32c_kvec, &crc);
 
 	return crc;
 }
@@ -1557,7 +1614,6 @@ static int write_partial_message_data(struct ceph_connection *con)
 	struct ceph_msg *msg = con->out_msg;
 	struct ceph_msg_data_cursor *cursor = &msg->cursor;
 	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
-	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
 	u32 crc;
 
 	dout("%s %p msg %p\n", __func__, con, msg);
@@ -1575,9 +1631,6 @@ static int write_partial_message_data(struct ceph_connection *con)
 	 */
 	crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
 	while (cursor->total_resid) {
-		struct page *page;
-		size_t page_offset;
-		size_t length;
 		int ret;
 
 		if (!cursor->resid) {
@@ -1585,11 +1638,8 @@ static int write_partial_message_data(struct ceph_connection *con)
 			continue;
 		}
 
-		page = ceph_msg_data_next(cursor, &page_offset, &length);
-		if (length == cursor->total_resid)
-			more = MSG_MORE;
-		ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
-					more);
+		ceph_msg_data_next(cursor);
+		ret = ceph_tcp_sendiov(con->sock, &cursor->iter, true);
 		if (ret <= 0) {
 			if (do_datacrc)
 				msg->footer.data_crc = cpu_to_le32(crc);
@@ -1597,7 +1647,7 @@ static int write_partial_message_data(struct ceph_connection *con)
 			return ret;
 		}
 		if (do_datacrc && cursor->need_crc)
-			crc = ceph_crc32c_page(crc, page, page_offset, length);
+			crc = ceph_crc32c_iov(crc, &cursor->iter, ret);
 		ceph_msg_data_advance(cursor, (size_t)ret);
 	}
 
@@ -2315,9 +2365,6 @@ static int read_partial_msg_data(struct ceph_connection *con)
 	struct ceph_msg *msg = con->in_msg;
 	struct ceph_msg_data_cursor *cursor = &msg->cursor;
 	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
-	struct page *page;
-	size_t page_offset;
-	size_t length;
 	u32 crc = 0;
 	int ret;
 
@@ -2332,8 +2379,8 @@ static int read_partial_msg_data(struct ceph_connection *con)
 			continue;
 		}
 
-		page = ceph_msg_data_next(cursor, &page_offset, &length);
-		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
+		ceph_msg_data_next(cursor);
+		ret = ceph_tcp_recviov(con->sock, &cursor->iter);
 		if (ret <= 0) {
 			if (do_datacrc)
 				con->in_data_crc = crc;
@@ -2342,7 +2389,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
 		}
 
 		if (do_datacrc)
-			crc = ceph_crc32c_page(crc, page, page_offset, ret);
+			crc = ceph_crc32c_iov(crc, &cursor->iter, ret);
 		ceph_msg_data_advance(cursor, (size_t)ret);
 	}
 	if (do_datacrc)
@@ -2443,7 +2490,7 @@ static int read_partial_message(struct ceph_connection *con)
 		/* prepare for data payload, if any */
 
 		if (data_len)
-			prepare_message_data(con->in_msg, data_len);
+			prepare_message_data(READ, con->in_msg, data_len);
 	}
 
 	/* front */
-- 
2.24.1




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Ceph Dev]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux