Multi-threaded socket code

This is the infamous "gatling gun" code to allow greater parallelism
within the socket transport. It's based on my SSL-transport patch
(https://github.com/gluster/glusterfs/pull/2). The two are related
because a lot of work occurs within SSL_read/SSL_write, and the inherent
single-threading of the socket code via a single polling thread severely
impacts performance in that case. For now this is purely a proof of
concept or request for comments, due to several deficiencies that I'll
get to in a moment. To expand a bit on the commit comment...
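
For anyone skimming rather than reading the whole patch, here is a
heavily condensed sketch of the idea: each connection gets its own
polling thread that watches the socket plus a wakeup pipe, rather than
everything funneling through the one global polling thread. The struct
and field names here are invented for illustration; the real code is
socket_poller further down, which also handles connect completion, SSL,
error masks, and locking.

#include <poll.h>
#include <pthread.h>
#include <unistd.h>

/* Invented per-connection state, standing in for socket_private_t. */
struct conn {
        int sock;         /* the connected socket */
        int pipe[2];      /* wakeup pipe: one byte written per queued message */
        int have_output;  /* nonzero if the output queue is non-empty */
};

static void *
conn_poller (void *arg)
{
        struct conn   *c = arg;
        struct pollfd  pfd[2];
        char           byte;

        for (;;) {
                pfd[0].fd      = c->pipe[0];
                pfd[0].events  = POLLIN;
                pfd[0].revents = 0;
                pfd[1].fd      = c->sock;
                pfd[1].events  = POLLIN | (c->have_output ? POLLOUT : 0);
                pfd[1].revents = 0;

                if (poll (pfd, 2, -1) < 0)
                        break;
                if (pfd[0].revents & POLLIN)
                        (void) read (c->pipe[0], &byte, 1);  /* just a wakeup */
                /* the real loop calls socket_event_poll_in/_out here */
        }
        return NULL;
}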


Good: this yields a >2x performance improvement in my tests using SSL.
On the 24-core/48GB/10GbE machines in the lab, "iozone -r 1m -i 0 -l 24"
improves from 185MB/s to over 400MB/s between a single client and a
single server using SSL (850MB/s without SSL is typical), and the
parallel-thread numbers go from ~2.5 to ~7.5 (even with io-threads in
both cases). There might even be some performance benefit in non-SSL
cases, e.g. a single client connecting to many servers, but that's just
icing on the cake.

Bad: the code doesn't clean up properly on disconnect, doesn't work well
with non-blocking I/O (which is rather pointless with this code anyway),
and there seems to be some bad interaction with the glusterd port
mapper. Since CloudFS doesn't use that port mapper (for other reasons),
it's not affected and I'm tempted not to care, but I guess I should
debug that some day.

Ugly: the management code is very racy, and those races are tickled by
the new threading milieu that socket_poller introduces. The patch
already fixes one pre-existing race in which glusterfs_mgmt_init sets up
a callback before setting up pointers needed by code within that
callback, but there's at least one other serious problem I'm aware of.
Some of the management code (e.g. sending the commit reply for a "volume
create" request) calls socket_submit_reply and then immediately frees
some of the data being sent, so if the message isn't sent synchronously
the other side gets an invalid message type. Sending synchronously is
the normal case, and it's unlikely that the socket buffers will ever
fill up enough on this low-traffic path for a deferred send to be
necessary, but it has always been possible, and code that frees data
before it's sure to have been sent has always been erroneous. I haven't
gone through the inordinately convoluted code that assembles these
messages to figure out exactly where the error lies, and frankly I'm not
wild about debugging it to deal with a problem that pre-dates my
changes.
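
To make that last problem concrete, here's a hypothetical caller showing
the pattern; build_reply_payload and fill_reply are invented names, but
the shape matches what the management code does.

/* Illustration only -- the helper names are made up. */
static int
send_commit_reply (rpc_transport_t *trans)
{
        rpc_transport_reply_t  reply;
        char                  *payload = build_reply_payload ();

        fill_reply (&reply, payload);
        (void) socket_submit_reply (trans, &reply);

        /*
         * BUG: if the reply could not be written synchronously and was
         * queued on the ioq instead, the polling thread will later send
         * from memory that has already been freed here.
         */
        GF_FREE (payload);
        return 0;
}
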
From 2cf2bea12c8639355bf3561dc772cad773746026 Mon Sep 17 00:00:00 2001
From: Jeff Darcy <jdarcy@xxxxxxxxxx>
Date: Tue, 31 May 2011 12:13:30 -0400
Subject: [PATCH] Use separate polling thread for each connection.

Good: 2x performance with SSL transport
Bad: doesn't work with portmapper, doesn't clean up properly on disconnect
Ugly: there's still a race with mgmt code calling submit_reply and then
      freeing our data out from under us if the message isn't sent
      synchronously
---
 glusterfsd/src/glusterfsd-mgmt.c      |    3 +-
 rpc/rpc-transport/socket/src/socket.c |  257 +++++++++++++++++++++------------
 rpc/rpc-transport/socket/src/socket.h |    2 +
 3 files changed, 167 insertions(+), 95 deletions(-)

diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c
index 1f5f648..413790b 100644
--- a/glusterfsd/src/glusterfsd-mgmt.c
+++ b/glusterfsd/src/glusterfsd-mgmt.c
@@ -877,6 +877,8 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx)
                 gf_log ("", GF_LOG_WARNING, "failed to create rpc clnt");
                 goto out;
         }
+	/* This is used from within mgmt_rpc_notify, so LET'S SET IT FIRST! */
+        ctx->mgmt = rpc;
 
         ret = rpc_clnt_register_notify (rpc, mgmt_rpc_notify, THIS);
         if (ret) {
@@ -894,7 +896,6 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx)
         if (ret)
                 goto out;
 
-        ctx->mgmt = rpc;
 out:
         return ret;
 }
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index dc84da7..31e8eac 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -1,4 +1,5 @@
 /*
+ * #endif
   Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
   This file is part of GlusterFS.
 
@@ -51,6 +52,10 @@
 #define SSL_PRIVATE_KEY_OPT "transport.socket.ssl-private-key"
 #define SSL_CA_LIST_OPT     "transport.socket.ssl-ca-list"
 
+#define POLL_MASK_INPUT  (POLLIN | POLLPRI)
+#define POLL_MASK_OUTPUT (POLLOUT)
+#define POLL_MASK_ERROR  (POLLERR | POLLHUP | POLLNVAL)
+
 #define __socket_proto_reset_pending(priv) do {                 \
                 memset (&priv->incoming.frag.vector, 0,         \
                         sizeof (priv->incoming.frag.vector));   \
@@ -136,6 +141,17 @@
                 __socket_proto_update_priv_after_read (priv, ret, bytes_read); \
         }
 
+/*
+ * The RPC notification callbacks have traditionally only been called from the
+ * single polling thread, and might fail in hard-to-debug ways if called more
+ * than once concurrently.  Worse, that might appear to work most of the time
+ * and then fail only occasionally.  To guard against that, we use this lock
+ * to ensure that we still have only one concurrent up-call, even though it's
+ * coming from one of our own polling threads instead of the global one.  See
+ * the top of socket_poller for one interaction where this turned out to be
+ * necessary - and yes, it was hard to debug.
+ */
+pthread_mutex_t socket_global_lock = PTHREAD_MUTEX_INITIALIZER;
 
 int socket_init (rpc_transport_t *this);
 
@@ -755,9 +771,11 @@ out:
 
 
 int
-__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
+__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
 {
-        int ret = -1;
+        int               ret = -1;
+	socket_private_t *priv = NULL;
+	char              a_byte = 0;
 
         ret = __socket_writev (this, entry->pending_vector,
                                entry->pending_count,
@@ -768,6 +786,16 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
                 /* current entry was completely written */
                 GF_ASSERT (entry->pending_count == 0);
                 __socket_ioq_entry_free (entry);
+		priv = this->private;
+		/*
+		 * The pipe should only remain readable if there are more
+		 * entries after this, so drain the byte representing this
+		 * entry.
+		 */
+		if (!direct && read(priv->pipe[0],&a_byte,1) < 1) {
+			gf_log(this->name,GF_LOG_WARNING,
+			       "read error on pipe");
+		}
         }
 
         return ret;
@@ -790,18 +818,11 @@ __socket_ioq_churn (rpc_transport_t *this)
                 /* pick next entry */
                 entry = priv->ioq_next;
 
-                ret = __socket_ioq_churn_entry (this, entry);
+                ret = __socket_ioq_churn_entry (this, entry, 0);
 
                 if (ret != 0)
                         break;
         }
-
-        if (list_empty (&priv->ioq)) {
-                /* all pending writes done, not interested in POLLOUT */
-                priv->idx = event_select_on (this->ctx->event_pool,
-                                             priv->sock, priv->idx, -1, 0);
-        }
-
 out:
         return ret;
 }
@@ -855,7 +876,9 @@ socket_event_poll_out (rpc_transport_t *this)
         }
         pthread_mutex_unlock (&priv->lock);
 
+	pthread_mutex_lock(&socket_global_lock);
         ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL);
+	pthread_mutex_unlock(&socket_global_lock);
 
 out:
         return ret;
@@ -1778,10 +1801,12 @@ socket_event_poll_in (rpc_transport_t *this)
         ret = socket_proto_state_machine (this, &pollin);
 
         if (pollin != NULL) {
+		pthread_mutex_lock(&socket_global_lock);
                 ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
                                             pollin);
 
                 rpc_transport_pollin_destroy (pollin);
+		pthread_mutex_unlock(&socket_global_lock);
         }
 
         return ret;
@@ -1860,55 +1885,90 @@ out:
 }
 
 
-/* reads rpc_requests during pollin */
-int
-socket_event_handler (int fd, int idx, void *data,
-                      int poll_in, int poll_out, int poll_err)
+void *
+socket_poller (void *ctx)
 {
-        rpc_transport_t      *this = NULL;
-        socket_private_t *priv = NULL;
-        int               ret = 0;
-
-        this = data;
-        GF_VALIDATE_OR_GOTO ("socket", this, out);
-        GF_VALIDATE_OR_GOTO ("socket", this->private, out);
-        GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
-
-        THIS = this->xl;
-        priv = this->private;
-
-
-        pthread_mutex_lock (&priv->lock);
-        {
-                priv->idx = idx;
-        }
-        pthread_mutex_unlock (&priv->lock);
-
+        rpc_transport_t  *this = ctx;
+        socket_private_t *priv = this->private;
+	struct pollfd     pfd[2] = {{0,},};
+	gf_boolean_t      to_write = _gf_false;
+	int               ret = 0;
+
+	/*
+	 * We can't actually start doing anything that might generate upcalls
+	 * until the thread that created us releases the lock that they held
+	 * while doing so.  Unfortunately, there's no reasonable way for us
+	 * down here in socket-land to associate the creator with the created
+	 * when the interactions are occurring through a glusterd callback, so
+	 * instead of using the more obvious pthread_cond_t approach we just
+	 * (ab)use the global lock as a kind of gate.  Once we have the lock
+	 * we've passed the gate and don't need to do anything more, so we
+	 * just release it right away.
+	 */
+	pthread_mutex_lock(&socket_global_lock);
         if (!priv->connected) {
+		THIS = this->xl;
                 ret = socket_connect_finish (this);
         }
+	pthread_mutex_unlock(&socket_global_lock);
 
-        if (!ret && poll_out) {
-                ret = socket_event_poll_out (this);
-        }
-
-        if (!ret && poll_in) {
-                ret = socket_event_poll_in (this);
-        }
-
-        if ((ret < 0) || poll_err) {
-                /* Logging has happened already in earlier cases */
-                gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
-                        "disconnecting now");
-                socket_event_poll_err (this);
-                rpc_transport_unref (this);
-        }
-
-out:
-        return 0;
+	for (;;) {
+		pthread_mutex_lock(&priv->lock);
+		to_write = !list_empty(&priv->ioq);
+		pthread_mutex_unlock(&priv->lock);
+		pfd[0].fd = priv->pipe[0];
+		pfd[0].events = POLL_MASK_ERROR;
+		pfd[0].revents = 0;
+		pfd[1].fd = priv->sock;
+		pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR;
+		pfd[1].revents = 0;
+		if (to_write) {
+			pfd[1].events |= POLL_MASK_OUTPUT;
+		}
+		else {
+			pfd[0].events |= POLL_MASK_INPUT;
+		}
+		if (poll(pfd,2,-1) < 0) {
+			gf_log(this->name,GF_LOG_ERROR,"poll failed");
+			return NULL;
+		}
+		if (pfd[0].revents & POLL_MASK_ERROR) {
+			gf_log(this->name,GF_LOG_ERROR,
+			       "poll error on pipe");
+			return NULL;
+		}
+		if (pfd[1].revents & POLL_MASK_ERROR) {
+			gf_log(this->name,GF_LOG_ERROR,
+			       "poll error on socket");
+			return NULL;
+		}
+		/* Only glusterd actually seems to need this. */
+		THIS = this->xl;
+		if (pfd[1].revents & POLL_MASK_INPUT) {
+			ret = socket_event_poll_in(this);
+		}
+		else if (pfd[1].revents & POLL_MASK_OUTPUT) {
+			ret = socket_event_poll_out(this);
+		}
+		else {
+			/*
+			 * This usually means that we left poll() because
+			 * somebody pushed a byte onto our pipe.  That wakeup
+			 * is why the pipe is there, but once awake we can do
+			 * all the checking we need on the next iteration.
+			 */
+			ret = 0;
+		}
+		if (ret < 0) {
+			gf_log(this->name,GF_LOG_ERROR,
+			       "unknown error in polling loop");
+			return NULL;
+		}
+	}
 }
 
 
+
 int
 socket_server_event_handler (int fd, int idx, void *data,
                              int poll_in, int poll_out, int poll_err)
@@ -2035,14 +2095,16 @@ socket_server_event_handler (int fd, int idx, void *data,
                                 new_priv->connected = 1;
                                 rpc_transport_ref (new_trans);
 
-                                new_priv->idx =
-                                        event_register (ctx->event_pool,
-                                                        new_sock,
-                                                        socket_event_handler,
-                                                        new_trans, 1, 0);
+				if (pipe(new_priv->pipe) < 0) {
+					gf_log(this->name,GF_LOG_ERROR,
+					       "could not create pipe");
+				}
 
-                                if (new_priv->idx == -1)
-                                        ret = -1;
+				if (pthread_create(&new_priv->thread,NULL,
+						socket_poller,new_trans) != 0) {
+					gf_log(this->name,GF_LOG_ERROR,
+					       "could not create poll thread");
+				}
                         }
                         pthread_mutex_unlock (&new_priv->lock);
                         if (ret == -1) {
@@ -2242,16 +2304,18 @@ socket_connect (rpc_transport_t *this, int port)
                 }
 
                 priv->connected = 0;
-
                 rpc_transport_ref (this);
 
-                priv->idx = event_register (ctx->event_pool, priv->sock,
-                                            socket_event_handler, this, 1, 1);
-                if (priv->idx == -1) {
-                        gf_log ("", GF_LOG_WARNING,
-                                "failed to register the event");
-                        ret = -1;
-                }
+		if (pipe(priv->pipe) < 0) {
+			gf_log(this->name,GF_LOG_ERROR,
+			       "could not create pipe");
+		}
+
+		if (pthread_create(&priv->thread,NULL,
+				socket_poller, this) != 0) {
+			gf_log(this->name,GF_LOG_ERROR,
+			       "could not create poll thread");
+		}
         }
 unlock:
         pthread_mutex_unlock (&priv->lock);
@@ -2413,10 +2477,10 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
 {
         socket_private_t *priv = NULL;
         int               ret = -1;
-        char              need_poll_out = 0;
         char              need_append = 1;
         struct ioq       *entry = NULL;
         glusterfs_ctx_t  *ctx = NULL;
+	char              a_byte = 'j';
 
         GF_VALIDATE_OR_GOTO ("socket", this, out);
         GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2442,26 +2506,28 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
                         goto unlock;
 
                 if (list_empty (&priv->ioq)) {
-                        ret = __socket_ioq_churn_entry (this, entry);
+                        ret = __socket_ioq_churn_entry (this, entry, 1);
 
-                        if (ret == 0)
+                        if (ret == 0) {
+				gf_log(this->name,GF_LOG_DEBUG,
+				       "request sent in-line");
                                 need_append = 0;
-
-                        if (ret > 0)
-                                need_poll_out = 1;
+			}
                 }
 
                 if (need_append) {
+			gf_log(this->name,GF_LOG_DEBUG,"deferring request");
                         list_add_tail (&entry->list, &priv->ioq);
+			/*
+			 * Make sure the polling thread wakes up, by writing a
+			 * byte to represent this entry.
+			 */
+			if (write(priv->pipe[1],&a_byte,1) < 1) {
+				gf_log(this->name,GF_LOG_WARNING,
+				       "write error on pipe");
+			}
                         ret = 0;
                 }
-
-                if (need_poll_out) {
-                        /* first entry to wait. continue writing on POLLOUT */
-                        priv->idx = event_select_on (ctx->event_pool,
-                                                     priv->sock,
-                                                     priv->idx, -1, 1);
-                }
         }
 unlock:
         pthread_mutex_unlock (&priv->lock);
@@ -2476,10 +2542,10 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
 {
         socket_private_t *priv = NULL;
         int               ret = -1;
-        char              need_poll_out = 0;
         char              need_append = 1;
         struct ioq       *entry = NULL;
         glusterfs_ctx_t  *ctx = NULL;
+	char              a_byte = 'd';
 
         GF_VALIDATE_OR_GOTO ("socket", this, out);
         GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2498,33 +2564,36 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
                         }
                         goto unlock;
                 }
+
                 priv->submit_log = 0;
                 entry = __socket_ioq_new (this, &reply->msg);
                 if (!entry)
                         goto unlock;
+
                 if (list_empty (&priv->ioq)) {
-                        ret = __socket_ioq_churn_entry (this, entry);
+                        ret = __socket_ioq_churn_entry (this, entry, 1);
 
-                        if (ret == 0)
+                        if (ret == 0) {
+				gf_log(this->name,GF_LOG_DEBUG,
+				       "reply sent in-line");
                                 need_append = 0;
-
-                        if (ret > 0)
-                                need_poll_out = 1;
+			}
                 }
 
                 if (need_append) {
+			gf_log(this->name,GF_LOG_DEBUG,"deferring reply");
                         list_add_tail (&entry->list, &priv->ioq);
+			/*
+			 * Make sure the polling thread wakes up, by writing a
+			 * byte to represent this entry.
+			 */
+			if (write(priv->pipe[1],&a_byte,1) < 1) {
+				gf_log(this->name,GF_LOG_WARNING,
+				       "write error on pipe");
+			}
                         ret = 0;
                 }
-
-                if (need_poll_out) {
-                        /* first entry to wait. continue writing on POLLOUT */
-                        priv->idx = event_select_on (ctx->event_pool,
-                                                     priv->sock,
-                                                     priv->idx, -1, 1);
-                }
         }
-
 unlock:
         pthread_mutex_unlock (&priv->lock);
 
@@ -2692,7 +2761,7 @@ socket_init (rpc_transport_t *this)
         priv->idx = -1;
         priv->connected = -1;
         priv->nodelay = 1;
-        priv->bio = 0;
+        priv->bio = 1;
         priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
 
         INIT_LIST_HEAD (&priv->ioq);
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index eaa38a9..625306c 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -204,6 +204,8 @@ typedef struct {
 	char                  *ssl_own_cert;
 	char                  *ssl_private_key;
 	char                  *ssl_ca_list;
+	pthread_t              thread;
+	int                    pipe[2];
 } socket_private_t;
 
 
-- 
1.7.3.4

