[PATCH rdma-core 4/5] rsockets: Wake-up all waiting threads on poll events

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



In order to have rpoll() block the calling thread until the desired
event occurs, we map rpoll() to poll() on the CQ fd (or the rdma cm
fd if the rsocket is not yet connected).  However, poll() is waiting
on reading an event from the CQ, and not directly on the state of the
rsocket.  This can result in rpoll() behaving differently than poll()
in multi-threaded situations.

For example, have two threads call rpoll(), one waiting for POLLIN, the
other POLLOUT.  When a new completion is written to the CQ, an event
will be written to the fd.  It's possible for one thread (say the
POLLOUT one) to wake up, read the CQ event, and process the completion.
The completion could report that data is now available to be read from
the rsocket, and that POLLIN should be signaled.  When the POLLIN
thread wakes up, it finds the CQ fd empty, so continues blocking on
poll in the kernel.  In this case, the thread never exits the kernel.

In the above situation, the application will hang.

Threads blocking in rpoll() should return based on changes to the state
of the rsocket(s) that they are monitoring.  And the state of the
rsocket may be modified by another thread.  To handle this situation,
when the state of an rsocket _may_ have changed, we wake up all threads
blocked in poll(), so they can re-check the state of their rsocket(s).

Note that it's easier to have all threads re-check the rsocket states
than perform more complex state tracking because of the difficulty in
trying to track which rsockets have been passed into multiple rpoll()
calls.

rpoll() is modified as follows.  When a thread has processed any event,
it halts future calls into poll().  It then writes to a socketpair to
signal all threads blocked in poll() to wake up.  Only after all threads
return from the kernel are threads allowed to resume calling poll().

Signed-off-by: Sean Hefty <sean.hefty@xxxxxxxxx>
---
 librdmacm/rsocket.c |  147 +++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 130 insertions(+), 17 deletions(-)

diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c
index 9ac881bc..ed5e0fe3 100644
--- a/librdmacm/rsocket.c
+++ b/librdmacm/rsocket.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008-2014 Intel Corporation.  All rights reserved.
+ * Copyright (c) 2008-2019 Intel Corporation.  All rights reserved.
  *
  * This software is available to you under a choice of one of two
  * licenses.  You may choose to be licensed under the terms of the GNU
@@ -118,6 +118,10 @@ static struct rs_svc listen_svc = {
 	.run = listen_svc_run
 };
 
+static uint32_t pollcnt;
+static bool suspendpoll;
+static int pollsignal[2];
+
 static uint16_t def_iomap_size = 0;
 static uint16_t def_inline = 64;
 static uint16_t def_sqsize = 384;
@@ -469,8 +473,8 @@ static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd)
 	msg.cmd = cmd;
 	msg.status = EINVAL;
 	msg.rs = rs;
-	write_all(svc->sock[0], &msg, sizeof msg);
-	read_all(svc->sock[0], &msg, sizeof msg);
+	write_all(svc->sock[0], &msg, sizeof(msg));
+	read_all(svc->sock[0], &msg, sizeof(msg));
 	ret = rdma_seterrno(msg.status);
 	if (svc->cnt)
 		goto unlock;
@@ -2983,19 +2987,123 @@ static uint64_t rs_time_us(void)
 	return now.tv_sec * 1000000 + now.tv_usec;
 }
 
+/* When mapping rpoll to poll, events reported on the RDMA fd are
+ * independent of the events rpoll is looking for.  To avoid threads
+ * hanging in poll, any event must wake up all threads in poll so they
+ * can re-check the rsockets they monitor; we 'gate' threads entering
+ * and leaving rpoll to support this.  This creates the pollsignal
+ * socketpair on first use (under mut); returns 0 on success.
+ */
+static int rs_pollinit(void)
+{
+	int ret = 0;
+
+	pthread_mutex_lock(&mut);
+	if (pollsignal[0] || pollsignal[1])
+		goto unlock;
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, pollsignal);
+	if (ret)
+		goto unlock;
+
+	/* Non-blocking read end: clearing the signal must never hang */
+	ret = set_fd_nonblock(pollsignal[0], true);
+	if (ret) {
+		close(pollsignal[0]);
+		close(pollsignal[1]);
+		pollsignal[0] = 0;
+		pollsignal[1] = 0;
+	}
+unlock:
+	pthread_mutex_unlock(&mut);
+	return ret;
+}
+
+/* Gate into poll().  While a wake-up is in progress (suspendpoll) the
+ * caller yields and gets -EBUSY so it re-checks rsocket state before
+ * blocking; otherwise it is counted in pollcnt and 0 is returned.
+ */
+static int rs_poll_enter(void)
+{
+	pthread_mutex_lock(&mut);
+	if (suspendpoll) {
+		pthread_mutex_unlock(&mut);
+		pthread_yield();
+		return -EBUSY;
+	}
+
+	pollcnt++;
+	pthread_mutex_unlock(&mut);
+	return 0;
+}
+
+static void rs_poll_exit(void)
+{
+	ssize_t __attribute__((unused)) rc;
+	char c;
+	/* Last thread out drains any pending wake-up byte, lifts suspendpoll */
+	pthread_mutex_lock(&mut);
+	if (!--pollcnt) {
+		rc = read(pollsignal[0], &c, sizeof(c));
+		suspendpoll = 0;
+	}
+	pthread_mutex_unlock(&mut);
+}
+
+/* When an event occurs, it's possible for a single thread blocked in
+ * poll to return from the kernel, read the event, and update the state
+ * of an rsocket.  However, that can leave threads blocked in the kernel
+ * on poll (trying to read the CQ fd), which have had their rsocket
+ * state set.  To avoid those threads remaining blocked in the kernel,
+ * we must wake them up and ensure that they all return to user space,
+ * in order to re-check the state of their rsockets.
+ *
+ * Because poll is racy wrt updating the rsocket states, we need to
+ * signal state checks whenever a thread updates the state of a
+ * monitored rsocket, independent of whether that thread actually
+ * reads an event from an fd.  In other words, we must wake up all
+ * polling threads whenever poll() indicates that there is a new
+ * completion to process, and when rpoll() will return a successful
+ * value after having blocked.
+ */
+static void rs_poll_stop(void)
+{
+	ssize_t __attribute__((unused)) rc;
+	char c = 5;
+	/* Last poller out clears the signal; else the first one here raises it */
+	pthread_mutex_lock(&mut);
+	if (!--pollcnt) {
+		rc = read(pollsignal[0], &c, sizeof(c));
+		suspendpoll = 0;
+	} else if (!suspendpoll) {
+		suspendpoll = 1;
+		write_all(pollsignal[1], &c, sizeof(c));
+	}
+	pthread_mutex_unlock(&mut);
+}
+
+/* Returns a per-thread array of nfds + 1 pollfds (NULL on failure); the
+ * extra last slot polls the pollsignal read fd so we can wake threads.
+ */
 static struct pollfd *rs_fds_alloc(nfds_t nfds)
 {
 	static __thread struct pollfd *rfds;
 	static __thread nfds_t rnfds;
 
-	if (nfds > rnfds) {
+	if (nfds + 1 > rnfds) {
 		if (rfds)
 			free(rfds);
+		else if (rs_pollinit())
+			return NULL;
 
-		rfds = malloc(sizeof(*rfds) * nfds);
-		rnfds = rfds ? nfds : 0;
+		rfds = malloc(sizeof(*rfds) * (nfds + 1));
+		rnfds = rfds ? nfds + 1 : 0;
 	}
 
+	if (rfds) {
+		rfds[nfds].fd = pollsignal[0];
+		rfds[nfds].events = POLLIN;
+	}
 	return rfds;
 }
 
@@ -3120,17 +3228,16 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
 	int i, cnt = 0;
 
 	for (i = 0; i < nfds; i++) {
-		if (!rfds[i].revents)
-			continue;
-
 		rs = idm_lookup(&idm, fds[i].fd);
 		if (rs) {
-			fastlock_acquire(&rs->cq_wait_lock);
-			if (rs->type == SOCK_STREAM)
-				rs_get_cq_event(rs);
-			else
-				ds_get_cq_event(rs);
-			fastlock_release(&rs->cq_wait_lock);
+			if (rfds[i].revents) {
+				fastlock_acquire(&rs->cq_wait_lock);
+				if (rs->type == SOCK_STREAM)
+					rs_get_cq_event(rs);
+				else
+					ds_get_cq_event(rs);
+				fastlock_release(&rs->cq_wait_lock);
+			}
 			fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
 		} else {
 			fds[i].revents = rfds[i].revents;
@@ -3174,17 +3281,23 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
 		if (ret)
 			break;
 
+		if (rs_poll_enter())
+			continue;
+
 		if (timeout >= 0) {
 			timeout -= (int) ((rs_time_us() - start_time) / 1000);
 			if (timeout <= 0)
 				return 0;
 		}
 
-		ret = poll(rfds, nfds, timeout);
-		if (ret <= 0)
+		ret = poll(rfds, nfds + 1, timeout);
+		if (ret <= 0) {
+			rs_poll_exit();
 			break;
+		}
 
 		ret = rs_poll_events(rfds, fds, nfds);
+		rs_poll_stop();
 	} while (!ret);
 
 	return ret;





[Index of Archives]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Photo]     [Yosemite News]     [Yosemite Photos]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux