"Daniel P. Berrange" <berrange@xxxxxxxxxx> wrote: > This patch re-writes the code for dispatching RPC calls in the > remote driver to allow use from multiple threads. Only one thread > is allowed to send/recv on the socket at a time though. If another > thread comes along it will put itself on a queue and go to sleep. > The first thread may actually get around to transmitting the 2nd > thread's request while it is waiting for its own reply. It may > even get the 2nd thread's reply, if its own RPC call is being really > slow. So when a thread wakes up from sleeping, it has to check > whether its own RPC call has already been processed. Likewise when > a thread owning the socket finishes with its own work, it may have > to pass the buck to another thread. The upshot of this is that > we have multiple RPC calls executing in parallel, and requests+reply > are no longer guaranteed to be FIFO on the wire if talking to a new > enough server. > > This refactoring required use of a self-pipe/poll trick for sync > between threads, but fortunately gnulib now provides this on Windows > too, so there's no compatibility problem there. Quick summary: dense ;-) though lots of moved code. I haven't finished, but did find at least one problem, below. > diff --git a/src/remote_internal.c b/src/remote_internal.c ... > @@ -114,6 +164,11 @@ struct private_data { > virDomainEventQueuePtr domainEvents; > /* Timer for flushing domainEvents queue */ > int eventFlushTimer; > + > + /* List of threads currently doing dispatch */ > + int wakeupSend; > + int wakeupRead; How about appending "FD" to indicate these are file descriptors? The names combined with the comment (which must apply to waitDispatch) made me wonder what they represented. Only when I saw them used in safewrite/saferead calls did I get it. 
> + struct remote_thread_call *waitDispatch; > }; > > enum { > @@ -160,7 +215,6 @@ static void make_nonnull_storage_pool (r > static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src); > static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src); > void remoteDomainEventFired(int watch, int fd, int event, void *data); > -static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr); > static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr); > void remoteDomainEventQueueFlush(int timer, void *opaque); > /*----------------------------------------------------------------------*/ > @@ -274,6 +328,7 @@ doRemoteOpen (virConnectPtr conn, > virConnectAuthPtr auth ATTRIBUTE_UNUSED, > int flags) > { > + int wakeup[2]; Add "fd" to this name, too? Not as big a deal, since this is local and the first use makes it obvious. > char *transport_str = NULL; > > if (conn->uri) { > @@ -696,6 +751,21 @@ doRemoteOpen (virConnectPtr conn, > > } /* switch (transport) */ > > + if (virSetNonBlock(priv->sock) < 0) { > + errorf (conn, VIR_ERR_SYSTEM_ERROR, > + _("unable to make socket non-blocking %s"), > + strerror(errno)); > + goto failed; > + } > + > + if (pipe(wakeup) < 0) { > + errorf (conn, VIR_ERR_SYSTEM_ERROR, > + _("unable to make pipe %s"), > + strerror(errno)); > + goto failed; > + } > + priv->wakeupRead = wakeup[0]; > + priv->wakeupSend = wakeup[1]; > > /* Try and authenticate with server */ > if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1) > @@ -768,6 +838,7 @@ doRemoteOpen (virConnectPtr conn, > DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. " > "continuing without events."); > virEventRemoveHandle(priv->watch); > + priv->watch = -1; > } > } > /* Successful. 
*/ > @@ -848,6 +919,7 @@ remoteOpen (virConnectPtr conn, > } > remoteDriverLock(priv); > priv->localUses = 1; > + priv->watch = -1; > > if (flags & VIR_CONNECT_RO) > rflags |= VIR_DRV_OPEN_REMOTE_RO; > @@ -1220,6 +1292,7 @@ doRemoteClose (virConnectPtr conn, struc > virEventRemoveTimeout(priv->eventFlushTimer); > /* Remove handle for remote events */ > virEventRemoveHandle(priv->watch); > + priv->watch = -1; > } > > /* Close socket. */ > @@ -5542,12 +5615,658 @@ done: > > /*----------------------------------------------------------------------*/ > > -static int really_write (virConnectPtr conn, struct private_data *priv, > - int in_open, char *bytes, int len); > -static int really_read (virConnectPtr conn, struct private_data *priv, > - int in_open, char *bytes, int len); > - > -/* This function performs a remote procedure call to procedure PROC_NR. > + > +static struct remote_thread_call * > +prepareCall(virConnectPtr conn, > + struct private_data *priv, > + int flags, > + int proc_nr, > + xdrproc_t args_filter, char *args, > + xdrproc_t ret_filter, char *ret) > +{ > + XDR xdr; > + struct remote_message_header hdr; > + struct remote_thread_call *rv; > + > + if (VIR_ALLOC(rv) < 0) > + return NULL; > + > + if (virCondInit(&rv->cond) < 0) { > + VIR_FREE(rv); > + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, > + VIR_ERR_INTERNAL_ERROR, > + _("cannot initialize mutex")); > + return NULL; > + } > + > + /* Get a unique serial number for this message. */ > + rv->serial = priv->counter++; > + rv->proc_nr = proc_nr; > + rv->ret_filter = ret_filter; > + rv->ret = ret; > + > + hdr.prog = REMOTE_PROGRAM; > + hdr.vers = REMOTE_PROTOCOL_VERSION; > + hdr.proc = proc_nr; > + hdr.direction = REMOTE_CALL; > + hdr.serial = rv->serial; > + hdr.status = REMOTE_OK; > + > + /* Serialise header followed by args. */ > + xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE); > + if (!xdr_remote_message_header (&xdr, &hdr)) { > + error (flags & REMOTE_CALL_IN_OPEN ? 
NULL : conn, > + VIR_ERR_RPC, _("xdr_remote_message_header failed")); > + goto error; > + } > + > + if (!(*args_filter) (&xdr, args)) { > + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, > + _("marshalling args")); > + goto error; > + } > + > + /* Get the length stored in buffer. */ > + rv->bufferLength = xdr_getpos (&xdr); > + xdr_destroy (&xdr); > + > + /* Length must include the length word itself (always encoded in > + * 4 bytes as per RFC 4506). > + */ > + rv->bufferLength += 4; > + > + /* Encode the length word. */ > + xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE); > + if (!xdr_int (&xdr, (int *)&rv->bufferLength)) { > + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, > + _("xdr_int (length word)")); I haven't done enough xdr* work to know, and man pages didn't provide an immediate answer: Is there no need to call xdr_destroy on this error path? I'd expect xdrmem_create to do any allocation, not xdr_int. There are many like this. > + goto error; > + } > + xdr_destroy (&xdr); > + > + return rv; > + > +error: > + VIR_FREE(ret); > + return NULL; The above should free rv, not ret: VIR_FREE(rv); -- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list