This patch allows the remote driver to work with an asynchronous EventImpl (it's the only one using an externally-supplied one), assuming libvirt is compiled with pthread support. (Without pthreads, this code is harmless in a single-threaded environment.) Basically it uses a mutex to protect reads from the RPC socket in such a way that message reads (in their entirety) are done atomically (otherwise the remoteDomainEventFired() can race the call() code that reads replies & events). In addition, I update the EventImpl handle to prevent remoteDomainEventFired() from being called everytime a reply is sent. (This helps us dispatch events in a timely manner, though it's not strictly necessary. Without it, any events coming in during a call() won't be dispatched until the call drops the socket lock (because remoteDomainEventFired() will be stuck awaiting the lock). Dave
diff --git a/src/remote_internal.c b/src/remote_internal.c index 2ca7930..59128f6 100644 --- a/src/remote_internal.c +++ b/src/remote_internal.c @@ -116,6 +116,7 @@ struct private_data { virDomainEventQueuePtr domainEvents; /* Timer for flushing domainEvents queue */ int eventFlushTimer; + PTHREAD_MUTEX_T(lock); /* Serializes socket reads w/async EventImpl */ }; #define GET_PRIVATE(conn,retcode) \ @@ -700,6 +701,9 @@ doRemoteOpen (virConnectPtr conn, } /* switch (transport) */ + /* This must precede the first call() */ + priv->eventFlushTimer = -1; + /* Try and authenticate with server */ if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1) goto failed; @@ -744,6 +748,8 @@ doRemoteOpen (virConnectPtr conn, } } + pthread_mutex_init(&priv->lock, NULL); + if(VIR_ALLOC(priv->callbackList)<0) { error(conn, VIR_ERR_INVALID_ARG, _("Error allocating callbacks list")); goto failed; @@ -1250,6 +1256,8 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv) /* Free queued events */ virDomainEventQueueFree(priv->domainEvents); + pthread_mutex_destroy(&priv->lock); + return 0; } @@ -4536,11 +4544,11 @@ static int really_read (virConnectPtr conn, struct private_data *priv, * else Bad Things will happen in the XDR code. */ static int -call (virConnectPtr conn, struct private_data *priv, - int flags /* if we are in virConnectOpen */, - int proc_nr, - xdrproc_t args_filter, char *args, - xdrproc_t ret_filter, char *ret) +really_call (virConnectPtr conn, struct private_data *priv, + int flags /* if we are in virConnectOpen */, + int proc_nr, + xdrproc_t args_filter, char *args, + xdrproc_t ret_filter, char *ret) { char buffer[REMOTE_MESSAGE_MAX]; char buffer2[4]; @@ -4596,16 +4604,18 @@ call (virConnectPtr conn, struct private_data *priv, really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1) return -1; + pthread_mutex_lock(&priv->lock); + retry_read: /* Read and deserialise length word. */ if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1) - return -1; + goto unlock_return_err; xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE); if (!xdr_int (&xdr, &len)) { error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, _("xdr_int (length word, reply)")); - return -1; + goto unlock_return_err; } xdr_destroy (&xdr); @@ -4615,12 +4625,14 @@ retry_read: if (len < 0 || len > REMOTE_MESSAGE_MAX) { error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, _("packet received from server too large")); - return -1; + goto unlock_return_err; } /* Read reply header and what follows (either a ret or an error). */ if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len) == -1) - return -1; + goto unlock_return_err; + + pthread_mutex_unlock(&priv->lock); /* Deserialise reply header. */ xdrmem_create (&xdr, buffer, len, XDR_DECODE); @@ -4729,8 +4741,33 @@ retry_read: xdr_destroy (&xdr); return -1; } + + unlock_return_err: + pthread_mutex_unlock(&priv->lock); + return -1; +} + +static int call (virConnectPtr conn, struct private_data *priv, + int flags /* if we are in virConnectOpen */, + int proc_nr, + xdrproc_t args_filter, char *args, + xdrproc_t ret_filter, char *ret) +{ + int rv; + if (priv->eventFlushTimer >= 0) + virEventUpdateHandle(priv->sock, 0); + rv = really_call(conn, priv, flags, proc_nr, + args_filter, args, + ret_filter, ret); + if (priv->eventFlushTimer >= 0) + virEventUpdateHandle(priv->sock, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_ERROR | + VIR_EVENT_HANDLE_HANGUP); + return rv; } + static int really_write_buf (virConnectPtr conn, struct private_data *priv, int in_open /* if we are in virConnectOpen */, @@ -5287,14 +5324,16 @@ remoteDomainEventFired(int fd ATTRIBUTE_UNUSED, return; } + pthread_mutex_lock(&priv->lock); + /* Read and deserialise length word. */ if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1) - return; + goto unlock_and_return; xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE); if (!xdr_int (&xdr, &len)) { error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)")); - return; + goto unlock_and_return; } xdr_destroy (&xdr); @@ -5303,15 +5342,17 @@ remoteDomainEventFired(int fd ATTRIBUTE_UNUSED, if (len < 0 || len > REMOTE_MESSAGE_MAX) { error (conn, VIR_ERR_RPC, _("packet received from server too large")); - return; + goto unlock_and_return; } /* Read reply header and what follows (either a ret or an error). */ if (really_read (conn, priv, 0, buffer, len) == -1) { error (conn, VIR_ERR_RPC, _("error reading buffer from memory")); - return; + goto unlock_and_return; } + pthread_mutex_unlock(&priv->lock); + /* Deserialise reply header. */ xdrmem_create (&xdr, buffer, len, XDR_DECODE); if (!xdr_remote_message_header (&xdr, &hdr)) { @@ -5327,6 +5368,11 @@ remoteDomainEventFired(int fd ATTRIBUTE_UNUSED, DEBUG0("invalid proc in event firing"); error (conn, VIR_ERR_RPC, _("invalid proc in event firing")); } + + return; + + unlock_and_return: + pthread_mutex_unlock(&priv->lock); } void
-- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list