This is the infamous "gatling gun" code to allow greater parallelism within the socket transport. It's based on my SSL-transport patch (https://github.com/gluster/glusterfs/pull/2). The two are related because a lot of work occurs within SSL_read/SSL_write, and the inherent single-threading of the socket code via a single polling thread severely limits performance in that case. In any case, this is purely a proof of concept and request for comments at this point, due to several deficiencies that I'll get to in a moment. To expand a bit on the commit comment...

Good: this yields a better-than-2x performance improvement in my SSL tests. On the 24-core/48GB/10GbE machines in the lab, "iozone -r 1m -i 0 -l 24" improves from 185MB/s to over 400MB/s between a single client and a single server using SSL (850MB/s without SSL is typical), and parallel-thread results go from ~2.5 to ~7.5 (even with io-threads in both cases). There might even be some performance benefit in non-SSL cases, e.g. a single client connecting to many servers, but that's just icing on the cake.

Bad: the code doesn't clean up properly on disconnect, doesn't work well with non-blocking I/O (which is rather pointless with this code anyway), and there seems to be some bad interaction with the glusterd port mapper. Since CloudFS doesn't use that port mapper for other reasons, it's not affected and I'm tempted not to care, but I suppose I should debug that some day.

Ugly: the management code is very racy, and those races are tickled by the new threading milieu that socket_poller introduces. The patch already fixes one pre-existing race, in which glusterfs_mgmt_init set up a callback before setting up pointers needed by code within that callback, but there's at least one other serious problem I'm aware of. Some of the management code (e.g. sending the commit reply for a "volume create" request) calls socket_submit_reply and then immediately frees some of the data being sent, so if the message isn't sent synchronously the other side gets an invalid message type. Sending synchronously is the normal case, and it's unlikely that the socket buffers will ever fill up on this low-traffic path so that a deferred send becomes necessary, but it has always been possible, and code that frees data before it's sure to have been sent has always been erroneous. I haven't gone through the inordinately convoluted code that assembles these messages to figure out exactly where the error lies, and frankly I'm not wild about debugging that to deal with a problem that pre-dates my changes.
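
To make that submit-then-free hazard concrete, here's a stand-alone sketch - not GlusterFS code; fake_transport, fake_submit_reply and fake_flush are made-up stand-ins for the real RPC paths - showing why freeing a buffer right after a submit call is only safe if the send really was synchronous:

/* gcc -o race_sketch race_sketch.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a transport that may defer a send. */
struct fake_transport {
        int   would_block;      /* pretend the socket buffer is full */
        char *deferred;         /* message queued for a later POLLOUT */
};

/* Returns 0 if sent synchronously, >0 if the message had to be queued. */
static int fake_submit_reply(struct fake_transport *t, char *msg)
{
        if (!t->would_block) {
                printf("sent synchronously: %s\n", msg);
                return 0;
        }
        t->deferred = msg;      /* caller's memory is still referenced here */
        return 1;
}

static void fake_flush(struct fake_transport *t)
{
        if (t->deferred) {
                /* If the caller already freed msg, this reads freed memory. */
                printf("sent later: %s\n", t->deferred);
                t->deferred = NULL;
        }
}

int main(void)
{
        struct fake_transport t = { .would_block = 1, .deferred = NULL };
        char *msg = strdup("commit reply");

        /*
         * Unsafe pattern (what the mgmt code effectively does today):
         *     fake_submit_reply(&t, msg);
         *     free(msg);           <-- msg may still be queued
         *     fake_flush(&t);      <-- use-after-free on the deferred path
         *
         * Safe ordering: only free once the transport is done with the
         * buffer, e.g. from a "message sent" notification or after the
         * deferred queue has drained, as below.
         */
        fake_submit_reply(&t, msg);
        fake_flush(&t);
        free(msg);
        return 0;
}

The fix is the same in the toy as in the real code: either copy the data into the queued entry, or defer the free until the transport signals that the message has actually been written.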
>From 2cf2bea12c8639355bf3561dc772cad773746026 Mon Sep 17 00:00:00 2001
From: Jeff Darcy <jdarcy@xxxxxxxxxx>
Date: Tue, 31 May 2011 12:13:30 -0400
Subject: [PATCH] Use separate polling thread for each connection.

Good: 2x performance with SSL transport
Bad: doesn't work with portmapper, doesn't clean up properly on disconnect
Ugly: there's still a race with mgmt code calling submit_reply and then
      freeing our data out from under us if the message isn't sent
      synchronously
---
 glusterfsd/src/glusterfsd-mgmt.c      |    3 +-
 rpc/rpc-transport/socket/src/socket.c |  257 +++++++++++++++++++++------------
 rpc/rpc-transport/socket/src/socket.h |    2 +
 3 files changed, 167 insertions(+), 95 deletions(-)

diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c
index 1f5f648..413790b 100644
--- a/glusterfsd/src/glusterfsd-mgmt.c
+++ b/glusterfsd/src/glusterfsd-mgmt.c
@@ -877,6 +877,8 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx)
                 gf_log ("", GF_LOG_WARNING, "failed to create rpc clnt");
                 goto out;
         }
+        /* This is used from within mgmt_rpc_notify, so LET'S SET IT FIRST! */
+        ctx->mgmt = rpc;
 
         ret = rpc_clnt_register_notify (rpc, mgmt_rpc_notify, THIS);
         if (ret) {
@@ -894,7 +896,6 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx)
         if (ret)
                 goto out;
 
-        ctx->mgmt = rpc;
 out:
         return ret;
 }
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index dc84da7..31e8eac 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -1,4 +1,5 @@
 /*
+ * #endif
   Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
   This file is part of GlusterFS.
 
@@ -51,6 +52,10 @@
 #define SSL_PRIVATE_KEY_OPT "transport.socket.ssl-private-key"
 #define SSL_CA_LIST_OPT     "transport.socket.ssl-ca-list"
 
+#define POLL_MASK_INPUT  (POLLIN | POLLPRI)
+#define POLL_MASK_OUTPUT (POLLOUT)
+#define POLL_MASK_ERROR  (POLLERR | POLLHUP | POLLNVAL)
+
 #define __socket_proto_reset_pending(priv) do { \
         memset (&priv->incoming.frag.vector, 0, \
                 sizeof (priv->incoming.frag.vector)); \
@@ -136,6 +141,17 @@
                 __socket_proto_update_priv_after_read (priv, ret, bytes_read); \
         }
 
+/*
+ * The RPC notification callbacks have traditionally only been called from the
+ * single polling thread, and might fail in hard-to-debug ways if called more
+ * than once concurrently.  Worse, that might appear to work most of the time
+ * and then fail only occasionally.  To guard against that, we use this lock
+ * to ensure that we still have only one concurrent up-call, even though it's
+ * coming from one of our own polling threads instead of the global one.  See
+ * the top of socket_poller for one interaction where this turned out to be
+ * necessary - and yes, it was hard to debug.
+ */
+pthread_mutex_t socket_global_lock = PTHREAD_MUTEX_INITIALIZER;
 
 int
 socket_init (rpc_transport_t *this);
@@ -755,9 +771,11 @@ out:
 
 
 int
-__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
+__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry, int direct)
 {
-        int ret = -1;
+        int               ret = -1;
+        socket_private_t *priv = NULL;
+        char              a_byte = 0;
 
         ret = __socket_writev (this, entry->pending_vector,
                                entry->pending_count,
@@ -768,6 +786,16 @@ __socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry)
                 /* current entry was completely written */
                 GF_ASSERT (entry->pending_count == 0);
                 __socket_ioq_entry_free (entry);
+                priv = this->private;
+                /*
+                 * The pipe should only remain readable if there are more
+                 * entries after this, so drain the byte representing this
+                 * entry.
+                 */
+                if (!direct && read(priv->pipe[0],&a_byte,1) < 1) {
+                        gf_log(this->name,GF_LOG_WARNING,
+                               "read error on pipe");
+                }
         }
 
         return ret;
@@ -790,18 +818,11 @@ __socket_ioq_churn (rpc_transport_t *this)
                         /* pick next entry */
                         entry = priv->ioq_next;
 
-                        ret = __socket_ioq_churn_entry (this, entry);
+                        ret = __socket_ioq_churn_entry (this, entry, 0);
 
                         if (ret != 0)
                                 break;
                 }
-
-                if (list_empty (&priv->ioq)) {
-                        /* all pending writes done, not interested in POLLOUT */
-                        priv->idx = event_select_on (this->ctx->event_pool,
-                                                     priv->sock, priv->idx, -1, 0);
-                }
-
 out:
         return ret;
 }
@@ -855,7 +876,9 @@ socket_event_poll_out (rpc_transport_t *this)
         }
         pthread_mutex_unlock (&priv->lock);
 
+        pthread_mutex_lock(&socket_global_lock);
         ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL);
+        pthread_mutex_unlock(&socket_global_lock);
 
 out:
         return ret;
@@ -1778,10 +1801,12 @@ socket_event_poll_in (rpc_transport_t *this)
         ret = socket_proto_state_machine (this, &pollin);
 
         if (pollin != NULL) {
+                pthread_mutex_lock(&socket_global_lock);
                 ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
                                             pollin);
 
                 rpc_transport_pollin_destroy (pollin);
+                pthread_mutex_unlock(&socket_global_lock);
         }
 
         return ret;
@@ -1860,55 +1885,90 @@ out:
 }
 
 
-/* reads rpc_requests during pollin */
-int
-socket_event_handler (int fd, int idx, void *data,
-                      int poll_in, int poll_out, int poll_err)
+void *
+socket_poller (void *ctx)
 {
-        rpc_transport_t      *this = NULL;
-        socket_private_t     *priv = NULL;
-        int                   ret = 0;
-
-        this = data;
-        GF_VALIDATE_OR_GOTO ("socket", this, out);
-        GF_VALIDATE_OR_GOTO ("socket", this->private, out);
-        GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
-
-        THIS = this->xl;
-        priv = this->private;
-
-
-        pthread_mutex_lock (&priv->lock);
-        {
-                priv->idx = idx;
-        }
-        pthread_mutex_unlock (&priv->lock);
-
+        rpc_transport_t  *this = ctx;
+        socket_private_t *priv = this->private;
+        struct pollfd     pfd[2] = {{0,},};
+        gf_boolean_t      to_write = _gf_false;
+        int               ret = 0;
+
+        /*
+         * We can't actually start doing anything that might generate upcalls
+         * until the thread that created us releases the lock that they held
+         * while doing so.  Unfortunately, there's no reasonable way for us
+         * down here in socket-land to associate the creator with the created
+         * when the interactions are occurring through a glusterd callback, so
+         * instead of using the more obvious pthread_cond_t approach we just
+         * (ab)use the global lock as a kind of gate.  Once we have the lock
+         * we've passed the gate and don't need to do anything more, so we
+         * just release it right away.
+         */
+        pthread_mutex_lock(&socket_global_lock);
         if (!priv->connected) {
+                THIS = this->xl;
                 ret = socket_connect_finish (this);
         }
+        pthread_mutex_unlock(&socket_global_lock);
 
-        if (!ret && poll_out) {
-                ret = socket_event_poll_out (this);
-        }
-
-        if (!ret && poll_in) {
-                ret = socket_event_poll_in (this);
-        }
-
-        if ((ret < 0) || poll_err) {
-                /* Logging has happened already in earlier cases */
-                gf_log ("transport", ((ret >= 0) ?
-                                      GF_LOG_INFO : GF_LOG_DEBUG),
-                        "disconnecting now");
-                socket_event_poll_err (this);
-                rpc_transport_unref (this);
-        }
-
-out:
-        return 0;
+        for (;;) {
+                pthread_mutex_lock(&priv->lock);
+                to_write = !list_empty(&priv->ioq);
+                pthread_mutex_unlock(&priv->lock);
+                pfd[0].fd = priv->pipe[0];
+                pfd[0].events = POLL_MASK_ERROR;
+                pfd[0].revents = 0;
+                pfd[1].fd = priv->sock;
+                pfd[1].events = POLL_MASK_INPUT | POLL_MASK_ERROR;
+                pfd[1].revents = 0;
+                if (to_write) {
+                        pfd[1].events |= POLL_MASK_OUTPUT;
+                }
+                else {
+                        pfd[0].events |= POLL_MASK_INPUT;
+                }
+                if (poll(pfd,2,-1) < 0) {
+                        gf_log(this->name,GF_LOG_ERROR,"poll failed");
+                        return NULL;
+                }
+                if (pfd[0].revents & POLL_MASK_ERROR) {
+                        gf_log(this->name,GF_LOG_ERROR,
+                               "poll error on pipe");
+                        return NULL;
+                }
+                if (pfd[1].revents & POLL_MASK_ERROR) {
+                        gf_log(this->name,GF_LOG_ERROR,
+                               "poll error on socket");
+                        return NULL;
+                }
+                /* Only glusterd actually seems to need this. */
+                THIS = this->xl;
+                if (pfd[1].revents & POLL_MASK_INPUT) {
+                        ret = socket_event_poll_in(this);
+                }
+                else if (pfd[1].revents & POLL_MASK_OUTPUT) {
+                        ret = socket_event_poll_out(this);
+                }
+                else {
+                        /*
+                         * This usually means that we left poll() because
+                         * somebody pushed a byte onto our pipe.  That wakeup
+                         * is why the pipe is there, but once awake we can do
+                         * all the checking we need on the next iteration.
+                         */
+                        ret = 0;
+                }
+                if (ret < 0) {
+                        gf_log(this->name,GF_LOG_ERROR,
+                               "unknown error in polling loop");
+                        return NULL;
+                }
+        }
 }
 
+
 int
 socket_server_event_handler (int fd, int idx, void *data,
                              int poll_in, int poll_out, int poll_err)
@@ -2035,14 +2095,16 @@ socket_server_event_handler (int fd, int idx, void *data,
                         new_priv->connected = 1;
                         rpc_transport_ref (new_trans);
 
-                        new_priv->idx =
-                                event_register (ctx->event_pool,
-                                                new_sock,
-                                                socket_event_handler,
-                                                new_trans, 1, 0);
+                        if (pipe(new_priv->pipe) < 0) {
+                                gf_log(this->name,GF_LOG_ERROR,
+                                       "could not create pipe");
+                        }
 
-                        if (new_priv->idx == -1)
-                                ret = -1;
+                        if (pthread_create(&new_priv->thread,NULL,
+                                           socket_poller,new_trans) != 0) {
+                                gf_log(this->name,GF_LOG_ERROR,
+                                       "could not create poll thread");
+                        }
                 }
                 pthread_mutex_unlock (&new_priv->lock);
 
                 if (ret == -1) {
@@ -2242,16 +2304,18 @@ socket_connect (rpc_transport_t *this, int port)
                 }
 
                 priv->connected = 0;
-
                 rpc_transport_ref (this);
 
-                priv->idx = event_register (ctx->event_pool, priv->sock,
-                                            socket_event_handler, this, 1, 1);
-                if (priv->idx == -1) {
-                        gf_log ("", GF_LOG_WARNING,
-                                "failed to register the event");
-                        ret = -1;
-                }
+                if (pipe(priv->pipe) < 0) {
+                        gf_log(this->name,GF_LOG_ERROR,
+                               "could not create pipe");
+                }
+
+                if (pthread_create(&priv->thread,NULL,
+                                   socket_poller, this) != 0) {
+                        gf_log(this->name,GF_LOG_ERROR,
+                               "could not create poll thread");
+                }
         }
 unlock:
         pthread_mutex_unlock (&priv->lock);
@@ -2413,10 +2477,10 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
 {
         socket_private_t *priv = NULL;
         int               ret = -1;
-        char              need_poll_out = 0;
         char              need_append = 1;
         struct ioq       *entry = NULL;
         glusterfs_ctx_t  *ctx = NULL;
+        char              a_byte = 'j';
 
         GF_VALIDATE_OR_GOTO ("socket", this, out);
         GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2442,26 +2506,28 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
                         goto unlock;
 
                 if (list_empty (&priv->ioq)) {
-                        ret = __socket_ioq_churn_entry (this, entry);
+                        ret = __socket_ioq_churn_entry (this, entry, 1);
 
-                        if (ret == 0)
+                        if (ret == 0) {
+                                gf_log(this->name,GF_LOG_DEBUG,
+                                       "request sent in-line");
                                 need_append = 0;
-
-                        if (ret > 0)
-                                need_poll_out = 1;
+                        }
                 }
 
                 if (need_append) {
+                        gf_log(this->name,GF_LOG_DEBUG,"deferring request");
                         list_add_tail (&entry->list, &priv->ioq);
+                        /*
+                         * Make sure the polling thread wakes up, by writing a
+                         * byte to represent this entry.
+                         */
+                        if (write(priv->pipe[1],&a_byte,1) < 1) {
+                                gf_log(this->name,GF_LOG_WARNING,
+                                       "write error on pipe");
+                        }
                         ret = 0;
                 }
-
-                if (need_poll_out) {
-                        /* first entry to wait. continue writing on POLLOUT */
-                        priv->idx = event_select_on (ctx->event_pool,
-                                                     priv->sock,
-                                                     priv->idx, -1, 1);
-                }
         }
 unlock:
         pthread_mutex_unlock (&priv->lock);
@@ -2476,10 +2542,10 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
 {
         socket_private_t *priv = NULL;
         int               ret = -1;
-        char              need_poll_out = 0;
         char              need_append = 1;
         struct ioq       *entry = NULL;
         glusterfs_ctx_t  *ctx = NULL;
+        char              a_byte = 'd';
 
         GF_VALIDATE_OR_GOTO ("socket", this, out);
         GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -2498,33 +2564,36 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
                 }
                 goto unlock;
         }
+
         priv->submit_log = 0;
         entry = __socket_ioq_new (this, &reply->msg);
         if (!entry)
                 goto unlock;
+
         if (list_empty (&priv->ioq)) {
-                ret = __socket_ioq_churn_entry (this, entry);
+                ret = __socket_ioq_churn_entry (this, entry, 1);
 
-                if (ret == 0)
+                if (ret == 0) {
+                        gf_log(this->name,GF_LOG_DEBUG,
+                               "reply sent in-line");
                         need_append = 0;
-
-                if (ret > 0)
-                        need_poll_out = 1;
+                }
         }
 
         if (need_append) {
+                gf_log(this->name,GF_LOG_DEBUG,"deferring reply");
                 list_add_tail (&entry->list, &priv->ioq);
+                /*
+                 * Make sure the polling thread wakes up, by writing a
+                 * byte to represent this entry.
+                 */
+                if (write(priv->pipe[1],&a_byte,1) < 1) {
+                        gf_log(this->name,GF_LOG_WARNING,
+                               "write error on pipe");
+                }
                 ret = 0;
         }
-
-        if (need_poll_out) {
-                /* first entry to wait. continue writing on POLLOUT */
-                priv->idx = event_select_on (ctx->event_pool,
-                                             priv->sock,
-                                             priv->idx, -1, 1);
-        }
         }
-
 unlock:
         pthread_mutex_unlock (&priv->lock);
@@ -2692,7 +2761,7 @@ socket_init (rpc_transport_t *this)
         priv->idx = -1;
         priv->connected = -1;
         priv->nodelay = 1;
-        priv->bio = 0;
+        priv->bio = 1;
         priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
         INIT_LIST_HEAD (&priv->ioq);
 
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index eaa38a9..625306c 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -204,6 +204,8 @@ typedef struct {
         char                  *ssl_own_cert;
         char                  *ssl_private_key;
         char                  *ssl_ca_list;
+        pthread_t              thread;
+        int                    pipe[2];
 } socket_private_t;
 
-- 
1.7.3.4
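
P.S. for reviewers unfamiliar with the self-pipe trick that socket_poller and the submit paths rely on: each connection gets a pipe, the per-connection thread blocks in poll() on both the socket and the pipe's read end, and anything that queues an ioq entry writes one byte to the write end to wake it. Here's a minimal stand-alone illustration of just that mechanism (toy code, not taken from the patch; the names are made up):

/* gcc -pthread -o pipe_wakeup pipe_wakeup.c */
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int wake_pipe[2];        /* [0] = read end polled by the worker,
                                   [1] = write end used by submitters */

static void *poller(void *arg)
{
        struct pollfd pfd = { .fd = wake_pipe[0], .events = POLLIN };
        char byte;

        (void)arg;
        for (;;) {
                if (poll(&pfd, 1, -1) < 0) {
                        perror("poll");
                        return NULL;
                }
                if (pfd.revents & POLLIN) {
                        /* One byte per queued entry; drain exactly one, then
                           (in the real code) service the queue.  'q' is our
                           toy shutdown signal. */
                        if (read(wake_pipe[0], &byte, 1) == 1 && byte == 'q')
                                return NULL;
                        printf("poller: woken up, would flush ioq here\n");
                }
        }
}

int main(void)
{
        pthread_t tid;

        if (pipe(wake_pipe) < 0) {
                perror("pipe");
                return 1;
        }
        if (pthread_create(&tid, NULL, poller, NULL) != 0) {
                perror("pthread_create");
                return 1;
        }

        /* Simulate two deferred submissions, then shutdown; one byte per
           event, just like the patch's 'j' and 'd' bytes. */
        if (write(wake_pipe[1], "j", 1) != 1 ||
            write(wake_pipe[1], "d", 1) != 1 ||
            write(wake_pipe[1], "q", 1) != 1)
                perror("write");

        pthread_join(tid, NULL);
        close(wake_pipe[0]);
        close(wake_pipe[1]);
        return 0;
}

The byte-per-entry convention is also why __socket_ioq_churn_entry drains one byte from the pipe for every entry it completes on the deferred path: the pipe stays readable only while the queue is non-empty.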