[PATCH rdma-core] librdmacm/rsockets: Use service thread to accept connections

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Rsockets currently uses the application thread to drive the
progress state of new connections.  However, some applications
expect that new connections can be established without the
application taking specific actions.  For example, some
apps do this sequence:

s = socket();
c = socket();
fcntl(s, O_NONBLOCK);
listen(s);
connect(c);
a = accept(s);

In rsockets, this hangs at connect because nothing processes the
incoming connection request.  This problem was reported when
integrating rsockets in the Java Development Kit.

To better support socket semantics, move the processing of
connection requests to a service thread.  We set up a
socketpair on any listening socket.  After a connection has been
established, the listening socket is notified via the socketpair.
With this change, a single-threaded app can connect to itself and
transfer data.

Signed-off-by: Sean Hefty <sean.hefty@xxxxxxxxx>
---
 librdmacm/rsocket.c |  187 ++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 146 insertions(+), 41 deletions(-)

diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c
index 48f3071..2447089 100644
--- a/librdmacm/rsocket.c
+++ b/librdmacm/rsocket.c
@@ -77,7 +77,9 @@ enum {
 	RS_SVC_REM_DGRAM,
 	RS_SVC_ADD_KEEPALIVE,
 	RS_SVC_REM_KEEPALIVE,
-	RS_SVC_MOD_KEEPALIVE
+	RS_SVC_MOD_KEEPALIVE,
+	RS_SVC_ADD_LISTEN,
+	RS_SVC_REM_LISTEN,
 };
 
 struct rs_svc_msg {
@@ -109,6 +111,12 @@ static struct rs_svc tcp_svc = {
 	.context_size = sizeof(*tcp_svc_timeouts),
 	.run = tcp_svc_run
 };
+static struct pollfd *listen_svc_fds;	/* poll set; assigned from svc->contexts */
+static void *listen_svc_run(void *arg);	/* run hook installed in listen_svc below */
+static struct rs_svc listen_svc = {	/* service used for listening rsockets */
+	.context_size = sizeof(*listen_svc_fds),	/* size of one struct pollfd */
+	.run = listen_svc_run
+};
 
 static uint16_t def_iomap_size = 0;
 static uint16_t def_inline = 64;
@@ -310,6 +318,7 @@ struct rsocket {
 			struct rdma_cm_id *cm_id;
 			uint64_t	  tcp_opts;
 			unsigned int	  keepalive_time;
+			int		  accept_queue[2];
 
 			unsigned int	  ctrl_seqno;
 			unsigned int	  ctrl_max_seqno;
@@ -1026,6 +1035,11 @@ static void rs_free(struct rsocket *rs)
 		rdma_destroy_id(rs->cm_id);
 	}
 
+	if (rs->accept_queue[0] > 0 || rs->accept_queue[1] > 0) {
+		close(rs->accept_queue[0]);
+		close(rs->accept_queue[1]);
+	}
+
 	fastlock_destroy(&rs->map_lock);
 	fastlock_destroy(&rs->cq_wait_lock);
 	fastlock_destroy(&rs->cq_lock);
@@ -1203,45 +1217,52 @@ int rlisten(int socket, int backlog)
 	if (!rs)
 		return ERR(EBADF);
 
-	if (rs->state != rs_listening) {
-		ret = rdma_listen(rs->cm_id, backlog);
-		if (!ret)
-			rs->state = rs_listening;
-	} else {
-		ret = 0;
+	if (rs->state == rs_listening)
+		return 0;
+
+	ret = rdma_listen(rs->cm_id, backlog);
+	if (ret)
+		return ret;
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, rs->accept_queue);
+	if (ret)
+		return ret;
+
+	if (rs->fd_flags & O_NONBLOCK) {
+		ret = set_fd_nonblock(rs->accept_queue[0], true);
+		if (ret)
+			return ret;
 	}
-	return ret;
+
+	ret = set_fd_nonblock(rs->cm_id->channel->fd, true);
+	if (ret)
+		return ret;
+
+	ret = rs_notify_svc(&listen_svc, rs, RS_SVC_ADD_LISTEN);
+	if (ret)
+		return ret;
+
+	rs->state = rs_listening;
+	return 0;
 }
 
-/*
- * Nonblocking is usually not inherited between sockets, but we need to
- * inherit it here to establish the connection only.  This is needed to
- * prevent rdma_accept from blocking until the remote side finishes
- * establishing the connection.  If we were to allow rdma_accept to block,
- * then a single thread cannot establish a connection with itself, or
- * two threads which try to connect to each other can deadlock trying to
- * form a connection.
- *
- * Data transfers on the new socket remain blocking unless the user
- * specifies otherwise through rfcntl.
- */
-int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+/* Accepting new connection requests is currently a blocking operation */
+static void rs_accept(struct rsocket *rs)
 {
-	struct rsocket *rs, *new_rs;
+	struct rsocket *new_rs;
 	struct rdma_conn_param param;
 	struct rs_conn_data *creq, cresp;
+	struct rdma_cm_id *cm_id;
 	int ret;
 
-	rs = idm_lookup(&idm, socket);
-	if (!rs)
-		return ERR(EBADF);
+	ret = rdma_get_request(rs->cm_id, &cm_id);
+	if (ret)
+		return;
+
 	new_rs = rs_alloc(rs, rs->type);
 	if (!new_rs)
-		return ERR(ENOMEM);
-
-	ret = rdma_get_request(rs->cm_id, &new_rs->cm_id);
-	if (ret)
 		goto err;
+	new_rs->cm_id = cm_id;
 
 	ret = rs_insert(new_rs, new_rs->cm_id->channel->fd);
 	if (ret < 0)
@@ -1249,13 +1270,8 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
 
 	creq = (struct rs_conn_data *)
 	       (new_rs->cm_id->event->param.conn.private_data + rs_conn_data_offset(rs));
-	if (creq->version != 1) {
-		ret = ERR(ENOTSUP);
+	if (creq->version != 1)
 		goto err;
-	}
-
-	if (rs->fd_flags & O_NONBLOCK)
-		set_fd_nonblock(new_rs->cm_id->channel->fd, true);
 
 	ret = rs_create_ep(new_rs);
 	if (ret)
@@ -1274,13 +1290,34 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
 	else
 		goto err;
 
+	 write_all(rs->accept_queue[1], &new_rs, sizeof(new_rs));
+	return;
+
+err:
+	rdma_reject(cm_id, NULL, 0);
+	if (new_rs)
+		rs_free(new_rs);
+}
+
+/* Dequeue a connection that rs_accept() queued on the listening rsocket. */
+int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
+{
+	struct rsocket *rs, *new_rs;
+	ssize_t ret;
+
+	rs = idm_lookup(&idm, socket);
+	if (!rs || rs->state != rs_listening)
+		return ERR(EBADF);
+
+	ret = read(rs->accept_queue[0], &new_rs, sizeof(new_rs));
+	if (ret < 0)
+		return -1;	/* errno was set by read() */
+	if ((size_t) ret != sizeof(new_rs))
+		return ERR(ECONNABORTED);	/* EOF/short read is not an index */
+
 	if (addr && addrlen)
 		rgetpeername(new_rs->index, addr, addrlen);
 	return new_rs->index;
-
-err:
-	rs_free(new_rs);
-	return ret;
 }
 
 static int rs_do_connect(struct rsocket *rs)
@@ -3314,8 +3351,13 @@ int rclose(int socket)
 	if (rs->type == SOCK_STREAM) {
 		if (rs->state & rs_connected)
 			rshutdown(socket, SHUT_RDWR);
-		else if (rs->opts & RS_OPT_SVC_ACTIVE)
-			rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
+		
+		if (rs->opts & RS_OPT_SVC_ACTIVE) {
+			if (rs->state == rs_listening)
+				rs_notify_svc(&listen_svc, rs, RS_SVC_REM_LISTEN);
+			else
+				rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
+		}
 	} else {
 		ds_shutdown(rs);
 	}
@@ -4359,3 +4401,66 @@ static void *tcp_svc_run(void *arg)
 
 	return NULL;
 }
+
+static void listen_svc_process_sock(struct rs_svc *svc)
+{
+	struct rs_svc_msg msg;	/* request in; same struct is written back as reply */
+
+	read_all(svc->sock[1], &msg, sizeof msg);
+	switch (msg.cmd) {
+	case RS_SVC_ADD_LISTEN:
+		msg.status = rs_svc_add_rs(svc, msg.rs);
+		if (!msg.status) {
+			msg.rs->opts |= RS_OPT_SVC_ACTIVE;
+			listen_svc_fds = svc->contexts;	/* NOTE(review): presumably the add may move contexts */
+			listen_svc_fds[svc->cnt].fd = msg.rs->cm_id->channel->fd;	/* NOTE(review): assumes cnt now indexes the new entry */
+			listen_svc_fds[svc->cnt].events = POLLIN;
+			listen_svc_fds[svc->cnt].revents = 0;
+		}
+		break;
+	case RS_SVC_REM_LISTEN:
+		msg.status = rs_svc_rm_rs(svc, msg.rs);
+		if (!msg.status)
+			msg.rs->opts &= ~RS_OPT_SVC_ACTIVE;
+		break;
+	case RS_SVC_NOOP:
+		msg.status = 0;
+		break;
+	default:	/* unknown command: msg.status is left as received */
+		break;
+	}
+	write_all(svc->sock[1], &msg, sizeof msg);	/* reply on the same socket the request came from */
+}
+
+static void *listen_svc_run(void *arg)
+{
+	struct rs_svc *svc = arg;
+	struct rs_svc_msg msg;
+	int i, ret;
+
+	ret = rs_svc_grow_sets(svc, 4);
+	if (ret) {
+		msg.status = ret;	/* NOTE(review): only status is set before msg is written */
+		write_all(svc->sock[1], &msg, sizeof msg);
+		return (void *) (uintptr_t) ret;
+	}
+
+	listen_svc_fds = svc->contexts;
+	listen_svc_fds[0].fd = svc->sock[1];	/* slot 0 polls the service's own socket */
+	listen_svc_fds[0].events = POLLIN;
+	do {
+		for (i = 0; i <= svc->cnt; i++)
+			listen_svc_fds[i].revents = 0;
+
+		poll(listen_svc_fds, svc->cnt + 1, -1);	/* no timeout; return value is not checked */
+		if (listen_svc_fds[0].revents)
+			listen_svc_process_sock(svc);
+
+		for (i = 1; i <= svc->cnt; i++) {	/* slot i is paired with svc->rss[i] */
+			if (listen_svc_fds[i].revents)
+				rs_accept(svc->rss[i]);
+		}
+	} while (svc->cnt >= 1);	/* loop ends once svc->cnt drops below 1 */
+
+	return NULL;
+}






[Index of Archives]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Photo]     [Yosemite News]     [Yosemite Photos]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux