On Fri, Jan 16, 2009 at 12:11:16PM +0000, Daniel P. Berrange wrote: > > > @@ -114,6 +164,11 @@ struct private_data { > > > virDomainEventQueuePtr domainEvents; > > > /* Timer for flushing domainEvents queue */ > > > int eventFlushTimer; > > > + > > > + /* List of threads currently doing dispatch */ > > > + int wakeupSend; > > > + int wakeupRead; > > > > How about appending "FD" to indicate these are file descriptors. > > The names combined with the comment (which must apply to waitDispatch) > > made me wonder what they represented. Only when I saw them used > > in safewrite/saferead calls did I get it. > > Yes, good idea - and it's not really a list of threads either, > so the comment is a little misleading :-) > > > + /* Encode the length word. */ > > > + xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE); > > > + if (!xdr_int (&xdr, (int *)&rv->bufferLength)) { > > > + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, > > > + _("xdr_int (length word)")); > > > > I haven't done enough xdr* work to know, and man pages > > didn't provide an immediate answer: > > Is there no need to call xdr_destroy on this error path? > > I'd expect xdrmem_create to do any allocation, not xdr_int. > > There are many like this. > > Yes, the 'error:' codepath should be calling 'xdr_destroy(&xdr)' > to ensure freeing of memory. > > > > > > + goto error; > > > + } > > > + xdr_destroy (&xdr); > > > + > > > + return rv; > > > + > > > +error: > > > + VIR_FREE(ret); > > > + return NULL; > > > > The above should free rv, not ret: > > > > VIR_FREE(rv); Here is an update with those suggested renames & bug fixes in it. It also addresses the error reporting issue mentioned in http://www.redhat.com/archives/libvir-list/2009-January/msg00428.html That code should not have been using DEBUG() - it now correctly raises a real error including the error string, not just errno. There were two other bugs with missing error raising in the path for sasl_encode/decode.
Everything upto this patch is committed, so this is diffed against current CVS. libvirt_private.syms | 1 remote_internal.c | 1539 ++++++++++++++++++++++++++++++++------------------- util.c | 33 - util.h | 2 4 files changed, 1002 insertions(+), 573 deletions(-) Daniel diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -290,6 +290,7 @@ virEnumToString; virEventAddHandle; virEventRemoveHandle; virExec; +virSetNonBlock; virFormatMacAddr; virGetHostname; virParseMacAddr; diff --git a/src/remote_internal.c b/src/remote_internal.c --- a/src/remote_internal.c +++ b/src/remote_internal.c @@ -68,6 +68,8 @@ #include <netdb.h> +#include <poll.h> + /* AI_ADDRCONFIG is missing on some systems. */ #ifndef AI_ADDRCONFIG # define AI_ADDRCONFIG 0 @@ -86,8 +88,44 @@ #include "util.h" #include "event.h" +#ifdef WIN32 +#define pipe(fds) _pipe(fds,4096, _O_BINARY) +#endif + + static int inside_daemon = 0; +struct remote_thread_call; + + +enum { + REMOTE_MODE_WAIT_TX, + REMOTE_MODE_WAIT_RX, + REMOTE_MODE_COMPLETE, + REMOTE_MODE_ERROR, +}; + +struct remote_thread_call { + int mode; + + /* 4 byte length, followed by RPC message header+body */ + char buffer[4 + REMOTE_MESSAGE_MAX]; + unsigned int bufferLength; + unsigned int bufferOffset; + + unsigned int serial; + unsigned int proc_nr; + + virCond cond; + + xdrproc_t ret_filter; + char *ret; + + remote_error err; + + struct remote_thread_call *next; +}; + struct private_data { virMutex lock; @@ -101,12 +139,24 @@ struct private_data { int localUses; /* Ref count for private data */ char *hostname; /* Original hostname */ FILE *debugLog; /* Debug remote protocol */ + #if HAVE_SASL sasl_conn_t *saslconn; /* SASL context */ + const char *saslDecoded; unsigned int saslDecodedLength; unsigned int saslDecodedOffset; -#endif + + const char *saslEncoded; + unsigned int saslEncodedLength; + unsigned int saslEncodedOffset; +#endif + + /* 4 byte length, followed by RPC 
message header+body */ + char buffer[4 + REMOTE_MESSAGE_MAX]; + unsigned int bufferLength; + unsigned int bufferOffset; + /* The list of domain event callbacks */ virDomainEventCallbackListPtr callbackList; /* The queue of domain events generated @@ -114,6 +164,13 @@ struct private_data { virDomainEventQueuePtr domainEvents; /* Timer for flushing domainEvents queue */ int eventFlushTimer; + + /* Self-pipe to wakeup threads waiting in poll() */ + int wakeupSendFD; + int wakeupReadFD; + + /* List of threads currently waiting for dispatch */ + struct remote_thread_call *waitDispatch; }; enum { @@ -160,7 +217,6 @@ static void make_nonnull_network (remote static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src); static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src); void remoteDomainEventFired(int watch, int fd, int event, void *data); -static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr); static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr); void remoteDomainEventQueueFlush(int timer, void *opaque); /*----------------------------------------------------------------------*/ @@ -274,6 +330,7 @@ doRemoteOpen (virConnectPtr conn, virConnectAuthPtr auth ATTRIBUTE_UNUSED, int flags) { + int wakeupFD[2]; char *transport_str = NULL; if (conn->uri) { @@ -696,6 +753,21 @@ doRemoteOpen (virConnectPtr conn, } /* switch (transport) */ + if (virSetNonBlock(priv->sock) < 0) { + errorf (conn, VIR_ERR_SYSTEM_ERROR, + _("unable to make socket non-blocking %s"), + strerror(errno)); + goto failed; + } + + if (pipe(wakeupFD) < 0) { + errorf (conn, VIR_ERR_SYSTEM_ERROR, + _("unable to make pipe %s"), + strerror(errno)); + goto failed; + } + priv->wakeupReadFD = wakeupFD[0]; + priv->wakeupSendFD = wakeupFD[1]; /* Try and authenticate with server */ if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1) @@ -768,6 +840,7 @@ doRemoteOpen (virConnectPtr conn, 
DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. " "continuing without events."); virEventRemoveHandle(priv->watch); + priv->watch = -1; } } /* Successful. */ @@ -848,6 +921,7 @@ remoteOpen (virConnectPtr conn, } remoteDriverLock(priv); priv->localUses = 1; + priv->watch = -1; if (flags & VIR_CONNECT_RO) rflags |= VIR_DRV_OPEN_REMOTE_RO; @@ -1220,6 +1294,7 @@ doRemoteClose (virConnectPtr conn, struc virEventRemoveTimeout(priv->eventFlushTimer); /* Remove handle for remote events */ virEventRemoveHandle(priv->watch); + priv->watch = -1; } /* Close socket. */ @@ -5537,12 +5612,665 @@ done: /*----------------------------------------------------------------------*/ -static int really_write (virConnectPtr conn, struct private_data *priv, - int in_open, char *bytes, int len); -static int really_read (virConnectPtr conn, struct private_data *priv, - int in_open, char *bytes, int len); - -/* This function performs a remote procedure call to procedure PROC_NR. + +static struct remote_thread_call * +prepareCall(virConnectPtr conn, + struct private_data *priv, + int flags, + int proc_nr, + xdrproc_t args_filter, char *args, + xdrproc_t ret_filter, char *ret) +{ + XDR xdr; + struct remote_message_header hdr; + struct remote_thread_call *rv; + + if (VIR_ALLOC(rv) < 0) + return NULL; + + if (virCondInit(&rv->cond) < 0) { + VIR_FREE(rv); + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + VIR_ERR_INTERNAL_ERROR, + _("cannot initialize mutex")); + return NULL; + } + + /* Get a unique serial number for this message. */ + rv->serial = priv->counter++; + rv->proc_nr = proc_nr; + rv->ret_filter = ret_filter; + rv->ret = ret; + + hdr.prog = REMOTE_PROGRAM; + hdr.vers = REMOTE_PROTOCOL_VERSION; + hdr.proc = proc_nr; + hdr.direction = REMOTE_CALL; + hdr.serial = rv->serial; + hdr.status = REMOTE_OK; + + /* Serialise header followed by args. 
*/ + xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE); + if (!xdr_remote_message_header (&xdr, &hdr)) { + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + VIR_ERR_RPC, _("xdr_remote_message_header failed")); + goto error; + } + + if (!(*args_filter) (&xdr, args)) { + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, + _("marshalling args")); + goto error; + } + + /* Get the length stored in buffer. */ + rv->bufferLength = xdr_getpos (&xdr); + xdr_destroy (&xdr); + + /* Length must include the length word itself (always encoded in + * 4 bytes as per RFC 4506). + */ + rv->bufferLength += 4; + + /* Encode the length word. */ + xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE); + if (!xdr_int (&xdr, (int *)&rv->bufferLength)) { + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, + _("xdr_int (length word)")); + goto error; + } + xdr_destroy (&xdr); + + return rv; + +error: + xdr_destroy (&xdr); + VIR_FREE(rv); + return NULL; +} + + + +static int +processCallWrite(virConnectPtr conn, + struct private_data *priv, + int in_open /* if we are in virConnectOpen */, + const char *bytes, int len) +{ + int ret; + + if (priv->uses_tls) { + tls_resend: + ret = gnutls_record_send (priv->session, bytes, len); + if (ret < 0) { + if (ret == GNUTLS_E_INTERRUPTED) + goto tls_resend; + if (ret == GNUTLS_E_AGAIN) + return 0; + + error (in_open ? NULL : conn, + VIR_ERR_GNUTLS_ERROR, gnutls_strerror (ret)); + return -1; + } + } else { + resend: + ret = send (priv->sock, bytes, len, 0); + if (ret == -1) { + if (errno == EINTR) + goto resend; + if (errno == EWOULDBLOCK) + return 0; + + error (in_open ? 
NULL : conn, + VIR_ERR_SYSTEM_ERROR, strerror (errno)); + return -1; + + } + } + + return ret; +} + + +static int +processCallRead(virConnectPtr conn, + struct private_data *priv, + int in_open /* if we are in virConnectOpen */, + char *bytes, int len) +{ + int ret; + + if (priv->uses_tls) { + tls_resend: + ret = gnutls_record_recv (priv->session, bytes, len); + if (ret == GNUTLS_E_INTERRUPTED) + goto tls_resend; + if (ret == GNUTLS_E_AGAIN) + return 0; + + /* Treat 0 == EOF as an error */ + if (ret <= 0) { + if (ret < 0) + errorf (in_open ? NULL : conn, + VIR_ERR_GNUTLS_ERROR, + _("failed to read from TLS socket %s"), + gnutls_strerror (ret)); + else + errorf (in_open ? NULL : conn, + VIR_ERR_SYSTEM_ERROR, + "%s", _("server closed connection")); + return -1; + } + } else { + resend: + ret = recv (priv->sock, bytes, len, 0); + if (ret <= 0) { + if (ret == -1) { + if (errno == EINTR) + goto resend; + if (errno == EWOULDBLOCK) + return 0; + + errorf (in_open ? NULL : conn, + VIR_ERR_SYSTEM_ERROR, + _("failed to read from socket %s"), + strerror (errno)); + } else { + errorf (in_open ? NULL : conn, + VIR_ERR_SYSTEM_ERROR, + "%s", _("server closed connection")); + } + return -1; + } + } + + return ret; +} + + +static int +processCallSendOne(virConnectPtr conn, + struct private_data *priv, + int in_open, + struct remote_thread_call *thecall) +{ +#if HAVE_SASL + if (priv->saslconn) { + const char *output; + unsigned int outputlen; + int err, ret; + + if (!priv->saslEncoded) { + err = sasl_encode(priv->saslconn, + thecall->buffer + thecall->bufferOffset, + thecall->bufferLength - thecall->bufferOffset, + &output, &outputlen); + if (err != SASL_OK) { + errorf (in_open ? 
NULL : conn, VIR_ERR_INTERNAL_ERROR, + _("failed to encode SASL data: %s"), + sasl_errstring(err, NULL, NULL)); + return -1; + } + priv->saslEncoded = output; + priv->saslEncodedLength = outputlen; + priv->saslEncodedOffset = 0; + + thecall->bufferOffset = thecall->bufferLength; + } + + ret = processCallWrite(conn, priv, in_open, + priv->saslEncoded + priv->saslEncodedOffset, + priv->saslEncodedLength - priv->saslEncodedOffset); + if (ret < 0) + return ret; + priv->saslEncodedOffset += ret; + + if (priv->saslEncodedOffset == priv->saslEncodedLength) { + priv->saslEncoded = NULL; + priv->saslEncodedOffset = priv->saslEncodedLength = 0; + thecall->mode = REMOTE_MODE_WAIT_RX; + } + } else { +#endif + int ret; + ret = processCallWrite(conn, priv, in_open, + thecall->buffer + thecall->bufferOffset, + thecall->bufferLength - thecall->bufferOffset); + if (ret < 0) + return ret; + thecall->bufferOffset += ret; + + if (thecall->bufferOffset == thecall->bufferLength) { + thecall->bufferOffset = thecall->bufferLength = 0; + thecall->mode = REMOTE_MODE_WAIT_RX; + } +#if HAVE_SASL + } +#endif + return 0; +} + + +static int +processCallSend(virConnectPtr conn, struct private_data *priv, + int in_open) { + struct remote_thread_call *thecall = priv->waitDispatch; + + while (thecall && + thecall->mode != REMOTE_MODE_WAIT_TX) + thecall = thecall->next; + + if (!thecall) + return -1; /* Shouldn't happen, but you never know... 
*/ + + while (thecall) { + int ret = processCallSendOne(conn, priv, in_open, thecall); + if (ret < 0) + return ret; + + if (thecall->mode == REMOTE_MODE_WAIT_TX) + return 0; /* Blocking write, to back to event loop */ + + thecall = thecall->next; + } + + return 0; /* No more calls to send, all done */ +} + +static int +processCallRecvSome(virConnectPtr conn, struct private_data *priv, + int in_open) { + unsigned int wantData; + + /* Start by reading length word */ + if (priv->bufferLength == 0) + priv->bufferLength = 4; + + wantData = priv->bufferLength - priv->bufferOffset; + +#if HAVE_SASL + if (priv->saslconn) { + if (priv->saslDecoded == NULL) { + char encoded[8192]; + unsigned int encodedLen = sizeof(encoded); + int ret, err; + ret = processCallRead(conn, priv, in_open, + encoded, encodedLen); + if (ret < 0) + return -1; + if (ret == 0) + return 0; + + err = sasl_decode(priv->saslconn, encoded, ret, + &priv->saslDecoded, &priv->saslDecodedLength); + if (err != SASL_OK) { + errorf (in_open ? 
NULL : conn, VIR_ERR_INTERNAL_ERROR, + _("failed to decode SASL data: %s"), + sasl_errstring(err, NULL, NULL)); + return -1; + } + priv->saslDecodedOffset = 0; + } + + if ((priv->saslDecodedLength - priv->saslDecodedOffset) < wantData) + wantData = (priv->saslDecodedLength - priv->saslDecodedOffset); + + memcpy(priv->buffer + priv->bufferOffset, + priv->saslDecoded + priv->saslDecodedOffset, + wantData); + priv->saslDecodedOffset += wantData; + priv->bufferOffset += wantData; + if (priv->saslDecodedOffset == priv->saslDecodedLength) { + priv->saslDecodedLength = priv->saslDecodedLength = 0; + priv->saslDecoded = NULL; + } + + return wantData; + } else { +#endif + int ret; + + ret = processCallRead(conn, priv, in_open, + priv->buffer + priv->bufferOffset, + wantData); + if (ret < 0) + return -1; + if (ret == 0) + return 0; + + priv->bufferOffset += ret; + + return ret; +#if HAVE_SASL + } +#endif +} + + +static void +processCallAsyncEvent(virConnectPtr conn, struct private_data *priv, + int in_open, + remote_message_header *hdr, + XDR *xdr) { + /* An async message has come in while we were waiting for the + * response. Process it to pull it off the wire, and try again + */ + DEBUG0("Encountered an event while waiting for a response"); + + if (in_open) { + DEBUG("Ignoring bogus event %d received while in open", hdr->proc); + return; + } + + if (hdr->proc == REMOTE_PROC_DOMAIN_EVENT) { + remoteDomainQueueEvent(conn, xdr); + virEventUpdateTimeout(priv->eventFlushTimer, 0); + } else { + DEBUG("Unexpected event proc %d", hdr->proc); + } +} + +static int +processCallRecvLen(virConnectPtr conn, struct private_data *priv, + int in_open) { + XDR xdr; + int len; + + xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE); + if (!xdr_int (&xdr, &len)) { + error (in_open ? NULL : conn, + VIR_ERR_RPC, _("xdr_int (length word, reply)")); + return -1; + } + xdr_destroy (&xdr); + + /* Length includes length word - adjust to real length to read. 
*/ + len -= 4; + + if (len < 0 || len > REMOTE_MESSAGE_MAX) { + error (in_open ? NULL : conn, + VIR_ERR_RPC, _("packet received from server too large")); + return -1; + } + + /* Extend our declared buffer length and carry + on reading the header + payload */ + priv->bufferLength += len; + DEBUG("Got length, now need %d total (%d more)", priv->bufferLength, len); + return 0; +} + + +static int +processCallRecvMsg(virConnectPtr conn, struct private_data *priv, + int in_open) { + XDR xdr; + struct remote_message_header hdr; + int len = priv->bufferLength - 4; + struct remote_thread_call *thecall; + + /* Deserialise reply header. */ + xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE); + if (!xdr_remote_message_header (&xdr, &hdr)) { + error (in_open ? NULL : conn, + VIR_ERR_RPC, _("invalid header in reply")); + return -1; + } + + /* Check program, version, etc. are what we expect. */ + if (hdr.prog != REMOTE_PROGRAM) { + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown program (received %x, expected %x)"), + hdr.prog, REMOTE_PROGRAM); + return -1; + } + if (hdr.vers != REMOTE_PROTOCOL_VERSION) { + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown protocol version (received %x, expected %x)"), + hdr.vers, REMOTE_PROTOCOL_VERSION); + return -1; + } + + /* Async events from server need special handling */ + if (hdr.direction == REMOTE_MESSAGE) { + processCallAsyncEvent(conn, priv, in_open, + &hdr, &xdr); + xdr_destroy(&xdr); + return 0; + } + + if (hdr.direction != REMOTE_REPLY) { + virRaiseError (in_open ? 
NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("got unexpected RPC call %d from server"), + hdr.proc); + xdr_destroy(&xdr); + return -1; + } + + /* Ok, definitely got an RPC reply now find + out who's been waiting for it */ + + thecall = priv->waitDispatch; + while (thecall && + thecall->serial != hdr.serial) + thecall = thecall->next; + + if (!thecall) { + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("no call waiting for reply with serial %d"), + hdr.serial); + xdr_destroy(&xdr); + return -1; + } + + if (hdr.proc != thecall->proc_nr) { + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown procedure (received %x, expected %x)"), + hdr.proc, thecall->proc_nr); + xdr_destroy (&xdr); + return -1; + } + + /* Status is either REMOTE_OK (meaning that what follows is a ret + * structure), or REMOTE_ERROR (and what follows is a remote_error + * structure). + */ + switch (hdr.status) { + case REMOTE_OK: + if (!(*thecall->ret_filter) (&xdr, thecall->ret)) { + error (in_open ? NULL : conn, VIR_ERR_RPC, + _("unmarshalling ret")); + return -1; + } + thecall->mode = REMOTE_MODE_COMPLETE; + xdr_destroy (&xdr); + return 0; + + case REMOTE_ERROR: + memset (&thecall->err, 0, sizeof thecall->err); + if (!xdr_remote_error (&xdr, &thecall->err)) { + error (in_open ? NULL : conn, + VIR_ERR_RPC, _("unmarshalling remote_error")); + return -1; + } + xdr_destroy (&xdr); + thecall->mode = REMOTE_MODE_ERROR; + return 0; + + default: + virRaiseError (in_open ? 
NULL : conn, NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown status (received %x)"), + hdr.status); + xdr_destroy (&xdr); + return -1; + } +} + + +static int +processCallRecv(virConnectPtr conn, struct private_data *priv, + int in_open) { + int ret; + + /* Read as much data as is available, until we get + * EGAIN + */ + for (;;) { + ret = processCallRecvSome(conn, priv, in_open); + + if (ret < 0) + return -1; + if (ret == 0) + return 0; /* Blocking on read */ + + /* Check for completion of our goal */ + if (priv->bufferOffset == priv->bufferLength) { + if (priv->bufferOffset == 4) { + ret = processCallRecvLen(conn, priv, in_open); + } else { + ret = processCallRecvMsg(conn, priv, in_open); + priv->bufferOffset = priv->bufferLength = 0; + } + if (ret < 0) + return -1; + } + } +} + +/* + * Process all calls pending dispatch/receive until we + * get a reply to our own call. Then quit and pass the buck + * to someone else. + */ +static int +processCalls(virConnectPtr conn, + struct private_data *priv, + int in_open, + struct remote_thread_call *thiscall) +{ + struct pollfd fds[2]; + int ret; + + fds[0].fd = priv->sock; + fds[1].fd = priv->wakeupReadFD; + + for (;;) { + struct remote_thread_call *tmp = priv->waitDispatch; + struct remote_thread_call *prev; + char ignore; + + fds[0].events = fds[0].revents = 0; + fds[1].events = fds[1].revents = 0; + + fds[1].events = POLLIN; + while (tmp) { + if (tmp->mode == REMOTE_MODE_WAIT_RX) + fds[0].events |= POLLIN; + if (tmp->mode == REMOTE_MODE_WAIT_TX) + fds[0].events |= POLLOUT; + + tmp = tmp->next; + } + + /* Release lock while poll'ing so other threads + * can stuff themselves on the queue */ + remoteDriverUnlock(priv); + + repoll: + ret = poll(fds, ARRAY_CARDINALITY(fds), -1); + if (ret < 0 && errno == EINTR) + goto repoll; + remoteDriverLock(priv); + + if (fds[1].revents) { + DEBUG0("Woken up from poll by other thread"); + saferead(priv->wakeupReadFD, &ignore, 
sizeof(ignore)); + } + + if (ret < 0) { + if (errno == EWOULDBLOCK) + continue; + errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR, + _("poll on socket failed %s"), strerror(errno)); + return -1; + } + + if (fds[0].revents & POLLOUT) { + if (processCallSend(conn, priv, in_open) < 0) + return -1; + } + + if (fds[0].revents & POLLIN) { + if (processCallRecv(conn, priv, in_open) < 0) + return -1; + } + + /* Iterate through waiting threads and if + * any are complete then tell 'em to wakeup + */ + tmp = priv->waitDispatch; + prev = NULL; + while (tmp) { + if (tmp != thiscall && + (tmp->mode == REMOTE_MODE_COMPLETE || + tmp->mode == REMOTE_MODE_ERROR)) { + /* Take them out of the list */ + if (prev) + prev->next = tmp->next; + else + priv->waitDispatch = tmp->next; + + /* And wake them up.... + * ...they won't actually wakeup until + * we release our mutex a short while + * later... + */ + DEBUG("Waking up sleep %d %p %p", tmp->proc_nr, tmp, priv->waitDispatch); + virCondSignal(&tmp->cond); + } + prev = tmp; + tmp = tmp->next; + } + + /* Now see if *we* are done */ + if (thiscall->mode == REMOTE_MODE_COMPLETE || + thiscall->mode == REMOTE_MODE_ERROR) { + /* We're at head of the list already, so + * remove us + */ + priv->waitDispatch = thiscall->next; + DEBUG("Giving up the buck %d %p %p", thiscall->proc_nr, thiscall, priv->waitDispatch); + /* See if someone else is still waiting + * and if so, then pass the buck ! */ + if (priv->waitDispatch) { + DEBUG("Passing the buck to %d %p", priv->waitDispatch->proc_nr, priv->waitDispatch); + virCondSignal(&priv->waitDispatch->cond); + } + return 0; + } + + + if (fds[0].revents & (POLLHUP | POLLERR)) { + errorf(in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR, + "%s", _("received hangup / error event on socket")); + return -1; + } + } +} + +/* + * This function performs a remote procedure call to procedure PROC_NR. * * NB. 
This does not free the args structure (not desirable, since you * often want this allocated on the stack or else it contains strings @@ -5551,204 +6279,29 @@ static int really_read (virConnectPtr co * * NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling, * else Bad Things will happen in the XDR code. - */ -static int -doCall (virConnectPtr conn, struct private_data *priv, - int flags /* if we are in virConnectOpen */, - int proc_nr, - xdrproc_t args_filter, char *args, - xdrproc_t ret_filter, char *ret) -{ - char buffer[REMOTE_MESSAGE_MAX]; - char buffer2[4]; - struct remote_message_header hdr; - XDR xdr; - int len; - struct remote_error rerror; - - /* Get a unique serial number for this message. */ - int serial = priv->counter++; - - hdr.prog = REMOTE_PROGRAM; - hdr.vers = REMOTE_PROTOCOL_VERSION; - hdr.proc = proc_nr; - hdr.direction = REMOTE_CALL; - hdr.serial = serial; - hdr.status = REMOTE_OK; - - /* Serialise header followed by args. */ - xdrmem_create (&xdr, buffer, sizeof buffer, XDR_ENCODE); - if (!xdr_remote_message_header (&xdr, &hdr)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - VIR_ERR_RPC, _("xdr_remote_message_header failed")); - return -1; - } - - if (!(*args_filter) (&xdr, args)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, - _("marshalling args")); - return -1; - } - - /* Get the length stored in buffer. */ - len = xdr_getpos (&xdr); - xdr_destroy (&xdr); - - /* Length must include the length word itself (always encoded in - * 4 bytes as per RFC 4506). - */ - len += 4; - - /* Encode the length word. */ - xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_ENCODE); - if (!xdr_int (&xdr, &len)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, - _("xdr_int (length word)")); - return -1; - } - xdr_destroy (&xdr); - - /* Send length word followed by header+args. 
*/ - if (really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1 || - really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1) - return -1; - -retry_read: - /* Read and deserialise length word. */ - if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1) - return -1; - - xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE); - if (!xdr_int (&xdr, &len)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - VIR_ERR_RPC, _("xdr_int (length word, reply)")); - return -1; - } - xdr_destroy (&xdr); - - /* Length includes length word - adjust to real length to read. */ - len -= 4; - - if (len < 0 || len > REMOTE_MESSAGE_MAX) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - VIR_ERR_RPC, _("packet received from server too large")); - return -1; - } - - /* Read reply header and what follows (either a ret or an error). */ - if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len) == -1) - return -1; - - /* Deserialise reply header. */ - xdrmem_create (&xdr, buffer, len, XDR_DECODE); - if (!xdr_remote_message_header (&xdr, &hdr)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - VIR_ERR_RPC, _("invalid header in reply")); - return -1; - } - - /* Check program, version, etc. are what we expect. */ - if (hdr.prog != REMOTE_PROGRAM) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown program (received %x, expected %x)"), - hdr.prog, REMOTE_PROGRAM); - return -1; - } - if (hdr.vers != REMOTE_PROTOCOL_VERSION) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? 
NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown protocol version (received %x, expected %x)"), - hdr.vers, REMOTE_PROTOCOL_VERSION); - return -1; - } - - if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && - hdr.direction == REMOTE_MESSAGE) { - /* An async message has come in while we were waiting for the - * response. Process it to pull it off the wire, and try again - */ - DEBUG0("Encountered an event while waiting for a response"); - - remoteDomainQueueEvent(conn, &xdr); - virEventUpdateTimeout(priv->eventFlushTimer, 0); - - DEBUG0("Retrying read"); - xdr_destroy (&xdr); - goto retry_read; - } - if (hdr.proc != proc_nr) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown procedure (received %x, expected %x)"), - hdr.proc, proc_nr); - return -1; - } - if (hdr.direction != REMOTE_REPLY) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown direction (received %x, expected %x)"), - hdr.direction, REMOTE_REPLY); - return -1; - } - if (hdr.serial != serial) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown serial (received %x, expected %x)"), - hdr.serial, serial); - return -1; - } - - /* Status is either REMOTE_OK (meaning that what follows is a ret - * structure), or REMOTE_ERROR (and what follows is a remote_error - * structure). - */ - switch (hdr.status) { - case REMOTE_OK: - if (!(*ret_filter) (&xdr, ret)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, - _("unmarshalling ret")); - return -1; - } - xdr_destroy (&xdr); - return 0; - - case REMOTE_ERROR: - memset (&rerror, 0, sizeof rerror); - if (!xdr_remote_error (&xdr, &rerror)) { - error (flags & REMOTE_CALL_IN_OPEN ? 
NULL : conn, - VIR_ERR_RPC, _("unmarshalling remote_error")); - return -1; - } - xdr_destroy (&xdr); - /* See if caller asked us to keep quiet about missing RPCs - * eg for interop with older servers */ - if (flags & REMOTE_CALL_QUIET_MISSING_RPC && - rerror.domain == VIR_FROM_REMOTE && - rerror.code == VIR_ERR_RPC && - rerror.level == VIR_ERR_ERROR && - STRPREFIX(*rerror.message, "unknown procedure")) { - return -2; - } - server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, &rerror); - xdr_free ((xdrproc_t) xdr_remote_error, (char *) &rerror); - return -1; - - default: - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown status (received %x)"), - hdr.status); - xdr_destroy (&xdr); - return -1; - } -} - - + * + * NB(3) You must have the private_data lock before calling this + * + * NB(4) This is very complicated. Due to connection cloning, multiple + * threads can want to use the socket at once. Obviously only one of + * them can. So if someone's using the socket, other threads are put + * to sleep on condition variables. The existing thread may completely + * send & receive their RPC call/reply while they're asleep. Or it + * may only get around to dealing with sending the call. Or it may + * get around to neither. So upon waking up from slumber, the other + * thread may or may not have more work to do. + * + * We call this dance 'passing the buck' + * + * http://en.wikipedia.org/wiki/Passing_the_buck + * + * "Buck passing or passing the buck is the action of transferring + * responsibility or blame unto another person. It is also used as + * a strategy in power politics when the actions of one country/ + * nation are blamed on another, providing an opportunity for war." + * + * NB(5) Don't Panic!
+ */ static int call (virConnectPtr conn, struct private_data *priv, int flags /* if we are in virConnectOpen */, @@ -5757,6 +6310,87 @@ call (virConnectPtr conn, struct private xdrproc_t ret_filter, char *ret) { int rv; + struct remote_thread_call *thiscall; + + DEBUG("Doing call %d %p", proc_nr, priv->waitDispatch); + thiscall = prepareCall(conn, priv, flags, proc_nr, + args_filter, args, + ret_filter, ret); + + if (!thiscall) { + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + VIR_ERR_NO_MEMORY, NULL); + return -1; + } + + /* Check to see if another thread is dispatching */ + if (priv->waitDispatch) { + /* Stick ourselves on the end of the wait queue */ + struct remote_thread_call *tmp = priv->waitDispatch; + char ignore = 1; + while (tmp && tmp->next) + tmp = tmp->next; + if (tmp) + tmp->next = thiscall; + else + priv->waitDispatch = thiscall; + + /* Force other thread to wakup from poll */ + safewrite(priv->wakeupSendFD, &ignore, sizeof(ignore)); + + DEBUG("Going to sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall); + /* Go to sleep while other thread is working... */ + if (virCondWait(&thiscall->cond, &priv->lock) < 0) { + if (priv->waitDispatch == thiscall) { + priv->waitDispatch = thiscall->next; + } else { + tmp = priv->waitDispatch; + while (tmp && tmp->next && + tmp->next != thiscall) { + tmp = tmp->next; + } + if (tmp && tmp->next == thiscall) + tmp->next = thiscall->next; + } + errorf(flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + VIR_ERR_INTERNAL_ERROR, "%s", + _("failed to wait on condition")); + VIR_FREE(thiscall); + return -1; + } + + DEBUG("Wokeup from sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall); + /* Two reasons we can be woken up + * 1. Other thread has got our reply ready for us + * 2. 
Other thread is all done, and it is our turn to + * be the dispatcher to finish waiting for + * our reply + */ + if (thiscall->mode == REMOTE_MODE_COMPLETE || + thiscall->mode == REMOTE_MODE_ERROR) { + /* + * We avoided catching the buck and our reply is ready ! + * We've already had 'thiscall' removed from the list + * so just need to (maybe) handle errors & free it + */ + goto cleanup; + } + + /* Grr, someone passed the buck onto us ... */ + + } else { + /* We're first to catch the buck */ + priv->waitDispatch = thiscall; + } + + DEBUG("We have the buck %d %p %p", proc_nr, priv->waitDispatch, thiscall); + /* + * The buck stops here! + * + * At this point we're about to own the dispatch + * process... + */ + /* * Avoid needless wake-ups of the event loop in the * case where this call is being made from a different @@ -5767,207 +6401,146 @@ call (virConnectPtr conn, struct private if (priv->watch >= 0) virEventUpdateHandle(priv->watch, 0); - rv = doCall(conn, priv,flags, proc_nr, - args_filter, args, - ret_filter, ret); + rv = processCalls(conn, priv, + flags & REMOTE_CALL_IN_OPEN ? 1 : 0, + thiscall); if (priv->watch >= 0) virEventUpdateHandle(priv->watch, VIR_EVENT_HANDLE_READABLE); - return rv; -} - -static int -really_write_buf (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - const char *bytes, int len) -{ - const char *p; - int err; - - p = bytes; - if (priv->uses_tls) { - do { - err = gnutls_record_send (priv->session, p, len); - if (err < 0) { - if (err == GNUTLS_E_INTERRUPTED || err == GNUTLS_E_AGAIN) - continue; - error (in_open ? NULL : conn, - VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err)); - return -1; - } - len -= err; - p += err; - } - while (len > 0); - } else { - do { - err = send (priv->sock, p, len, 0); - if (err == -1) { - if (errno == EINTR || errno == EAGAIN) - continue; - error (in_open ? 
NULL : conn, - VIR_ERR_SYSTEM_ERROR, strerror (errno)); - return -1; - } - len -= err; - p += err; - } - while (len > 0); - } - - return 0; -} - -static int -really_write_plain (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - return really_write_buf(conn, priv, in_open, bytes, len); -} - -#if HAVE_SASL -static int -really_write_sasl (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - const char *output; - unsigned int outputlen; - int err; - - err = sasl_encode(priv->saslconn, bytes, len, &output, &outputlen); - if (err != SASL_OK) { - return -1; - } - - return really_write_buf(conn, priv, in_open, output, outputlen); -} -#endif - -static int -really_write (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ -#if HAVE_SASL - if (priv->saslconn) - return really_write_sasl(conn, priv, in_open, bytes, len); - else -#endif - return really_write_plain(conn, priv, in_open, bytes, len); -} - -static int -really_read_buf (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - int err; - - if (priv->uses_tls) { - tlsreread: - err = gnutls_record_recv (priv->session, bytes, len); - if (err < 0) { - if (err == GNUTLS_E_INTERRUPTED) - goto tlsreread; - error (in_open ? NULL : conn, - VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err)); - return -1; - } - if (err == 0) { - error (in_open ? NULL : conn, - VIR_ERR_RPC, _("socket closed unexpectedly")); - return -1; - } - } else { - reread: - err = recv (priv->sock, bytes, len, 0); - if (err == -1) { - if (errno == EINTR) - goto reread; - error (in_open ? NULL : conn, - VIR_ERR_SYSTEM_ERROR, strerror (errno)); - return -1; - } - if (err == 0) { - error (in_open ? 
NULL : conn, - VIR_ERR_RPC, _("socket closed unexpectedly")); - return -1; - } - } - - return err; -} - -static int -really_read_plain (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - do { - int ret = really_read_buf (conn, priv, in_open, bytes, len); - if (ret < 0) - return -1; - - len -= ret; - bytes += ret; - } while (len > 0); - - return 0; -} - -#if HAVE_SASL -static int -really_read_sasl (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - do { - int want, got; - if (priv->saslDecoded == NULL) { - char encoded[8192]; - int encodedLen = sizeof(encoded); - int err, ret; - ret = really_read_buf (conn, priv, in_open, encoded, encodedLen); - if (ret < 0) - return -1; - - err = sasl_decode(priv->saslconn, encoded, ret, - &priv->saslDecoded, &priv->saslDecodedLength); - } - - got = priv->saslDecodedLength - priv->saslDecodedOffset; - want = len; - if (want > got) - want = got; - - memcpy(bytes, priv->saslDecoded + priv->saslDecodedOffset, want); - priv->saslDecodedOffset += want; - if (priv->saslDecodedOffset == priv->saslDecodedLength) { - priv->saslDecoded = NULL; - priv->saslDecodedOffset = priv->saslDecodedLength = 0; - } - bytes += want; - len -= want; - } while (len > 0); - - return 0; -} -#endif - -static int -really_read (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ -#if HAVE_SASL - if (priv->saslconn) - return really_read_sasl (conn, priv, in_open, bytes, len); - else -#endif - return really_read_plain (conn, priv, in_open, bytes, len); -} + + if (rv < 0) { + VIR_FREE(thiscall); + return -1; + } + +cleanup: + DEBUG("All done with our call %d %p %p", proc_nr, priv->waitDispatch, thiscall); + if (thiscall->mode == REMOTE_MODE_ERROR) { + /* See if caller asked us to keep quiet about missing RPCs + * eg for interop with older servers */ 
+ if (flags & REMOTE_CALL_QUIET_MISSING_RPC && + thiscall->err.domain == VIR_FROM_REMOTE && + thiscall->err.code == VIR_ERR_RPC && + thiscall->err.level == VIR_ERR_ERROR && + STRPREFIX(*thiscall->err.message, "unknown procedure")) { + rv = -2; + } else { + server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + &thiscall->err); + rv = -1; + } + } else { + rv = 0; + } + VIR_FREE(thiscall); + return rv; +} + +/** + * remoteDomainReadEvent + * + * Read the event data off the wire + */ +static virDomainEventPtr +remoteDomainReadEvent(virConnectPtr conn, XDR *xdr) +{ + remote_domain_event_ret ret; + virDomainPtr dom; + virDomainEventPtr event = NULL; + memset (&ret, 0, sizeof ret); + + /* unmarshall parameters, and process it*/ + if (! xdr_remote_domain_event_ret(xdr, &ret) ) { + error (conn, VIR_ERR_RPC, + _("remoteDomainProcessEvent: unmarshalling ret")); + return NULL; + } + + dom = get_nonnull_domain(conn,ret.dom); + if (!dom) + return NULL; + + event = virDomainEventNewFromDom(dom, ret.event, ret.detail); + + virDomainFree(dom); + return event; +} + +static void +remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr) +{ + struct private_data *priv = conn->privateData; + virDomainEventPtr event; + + event = remoteDomainReadEvent(conn, xdr); + if (!event) + return; + + if (virDomainEventQueuePush(priv->domainEvents, + event) < 0) { + DEBUG0("Error adding event to queue"); + virDomainEventFree(event); + } +} + +/** remoteDomainEventFired: + * + * The callback for monitoring the remote socket + * for event data + */ +void +remoteDomainEventFired(int watch, + int fd, + int event, + void *opaque) +{ + virConnectPtr conn = opaque; + struct private_data *priv = conn->privateData; + + remoteDriverLock(priv); + + /* This should be impossible, but it doesn't hurt to check */ + if (priv->waitDispatch) + goto done; + + DEBUG("Event fired %d %d %d %X", watch, fd, event, event); + + if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) { + DEBUG("%s : 
VIR_EVENT_HANDLE_HANGUP or " + "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__); + virEventRemoveHandle(watch); + priv->watch = -1; + goto done; + } + + if (fd != priv->sock) { + virEventRemoveHandle(watch); + priv->watch = -1; + goto done; + } + + if (processCallRecv(conn, priv, 0) < 0) + DEBUG0("Something went wrong during async message processing"); + +done: + remoteDriverUnlock(priv); +} + +void +remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque) +{ + virConnectPtr conn = opaque; + struct private_data *priv = conn->privateData; + + remoteDriverLock(priv); + + virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList, + virDomainEventDispatchDefaultFunc, NULL); + virEventUpdateTimeout(priv->eventFlushTimer, -1); + + remoteDriverUnlock(priv); +} + /* For errors internal to this library. */ static void @@ -6267,161 +6840,3 @@ remoteRegister (void) return 0; } -/** - * remoteDomainReadEvent - * - * Read the event data off the wire - */ -static virDomainEventPtr -remoteDomainReadEvent(virConnectPtr conn, XDR *xdr) -{ - remote_domain_event_ret ret; - virDomainPtr dom; - virDomainEventPtr event = NULL; - memset (&ret, 0, sizeof ret); - - /* unmarshall parameters, and process it*/ - if (! 
xdr_remote_domain_event_ret(xdr, &ret) ) { - error (conn, VIR_ERR_RPC, - _("remoteDomainProcessEvent: unmarshalling ret")); - return NULL; - } - - dom = get_nonnull_domain(conn,ret.dom); - if (!dom) - return NULL; - - event = virDomainEventNewFromDom(dom, ret.event, ret.detail); - - virDomainFree(dom); - return event; -} - -static void -remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr) -{ - struct private_data *priv = conn->privateData; - virDomainEventPtr event; - - event = remoteDomainReadEvent(conn, xdr); - if (!event) - return; - - DEBUG0("Calling domain event callbacks (no queue)"); - virDomainEventDispatch(event, priv->callbackList, - virDomainEventDispatchDefaultFunc, NULL); - virDomainEventFree(event); -} - -static void -remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr) -{ - struct private_data *priv = conn->privateData; - virDomainEventPtr event; - - event = remoteDomainReadEvent(conn, xdr); - if (!event) - return; - - if (virDomainEventQueuePush(priv->domainEvents, - event) < 0) { - DEBUG0("Error adding event to queue"); - virDomainEventFree(event); - } -} - -/** remoteDomainEventFired: - * - * The callback for monitoring the remote socket - * for event data - */ -void -remoteDomainEventFired(int watch, - int fd, - int event, - void *opaque) -{ - char buffer[REMOTE_MESSAGE_MAX]; - char buffer2[4]; - struct remote_message_header hdr; - XDR xdr; - int len; - - virConnectPtr conn = opaque; - struct private_data *priv = conn->privateData; - - remoteDriverLock(priv); - - DEBUG("Event fired %d %d %d %X", watch, fd, event, event); - - if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) { - DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or " - "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__); - virEventRemoveHandle(watch); - goto done; - } - - if (fd != priv->sock) { - virEventRemoveHandle(watch); - goto done; - } - - /* Read and deserialise length word. 
*/ - if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1) - goto done; - - xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE); - if (!xdr_int (&xdr, &len)) { - error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)")); - goto done; - } - xdr_destroy (&xdr); - - /* Length includes length word - adjust to real length to read. */ - len -= 4; - - if (len < 0 || len > REMOTE_MESSAGE_MAX) { - error (conn, VIR_ERR_RPC, _("packet received from server too large")); - goto done; - } - - /* Read reply header and what follows (either a ret or an error). */ - if (really_read (conn, priv, 0, buffer, len) == -1) { - error (conn, VIR_ERR_RPC, _("error reading buffer from memory")); - goto done; - } - - /* Deserialise reply header. */ - xdrmem_create (&xdr, buffer, len, XDR_DECODE); - if (!xdr_remote_message_header (&xdr, &hdr)) { - error (conn, VIR_ERR_RPC, _("invalid header in event firing")); - goto done; - } - - if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && - hdr.direction == REMOTE_MESSAGE) { - DEBUG0("Encountered an async event"); - remoteDomainProcessEvent(conn, &xdr); - } else { - DEBUG0("invalid proc in event firing"); - error (conn, VIR_ERR_RPC, _("invalid proc in event firing")); - } - -done: - remoteDriverUnlock(priv); -} - -void -remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque) -{ - virConnectPtr conn = opaque; - struct private_data *priv = conn->privateData; - - remoteDriverLock(priv); - - virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList, - virDomainEventDispatchDefaultFunc, NULL); - virEventUpdateTimeout(priv->eventFlushTimer, -1); - - remoteDriverUnlock(priv); -} diff --git a/src/util.c b/src/util.c --- a/src/util.c +++ b/src/util.c @@ -34,6 +34,7 @@ #include <poll.h> #include <sys/types.h> #include <sys/stat.h> +#include <sys/ioctl.h> #if HAVE_SYS_WAIT_H #include <sys/wait.h> #endif @@ -155,8 +156,28 @@ virArgvToString(const char *const *argv) return ret; } +int virSetNonBlock(int fd) { +#ifndef WIN32 + int 
flags; + if ((flags = fcntl(fd, F_GETFL)) < 0) + return -1; + flags |= O_NONBLOCK; + if ((fcntl(fd, F_SETFL, flags)) < 0) + return -1; +#else + unsigned long flag = 1; -#ifndef __MINGW32__ + /* This is actually Gnulib's replacement rpl_ioctl function. + * We can't call ioctlsocket directly in any case. + */ + if (ioctl (fd, FIONBIO, (void *) &flag) == -1) + return -1; +#endif + return 0; +} + + +#ifndef WIN32 static int virSetCloseExec(int fd) { int flags; @@ -168,16 +189,6 @@ static int virSetCloseExec(int fd) { return 0; } -static int virSetNonBlock(int fd) { - int flags; - if ((flags = fcntl(fd, F_GETFL)) < 0) - return -1; - flags |= O_NONBLOCK; - if ((fcntl(fd, F_SETFL, flags)) < 0) - return -1; - return 0; -} - static int __virExec(virConnectPtr conn, const char *const*argv, diff --git a/src/util.h b/src/util.h --- a/src/util.h +++ b/src/util.h @@ -38,6 +38,8 @@ enum { VIR_EXEC_DAEMON = (1 << 1), }; +int virSetNonBlock(int fd); + int virExec(virConnectPtr conn, const char *const*argv, const char *const*envp, -- |: Red Hat, Engineering, London -o- http://people.redhat.com/berrange/ :| |: http://libvirt.org -o- http://virt-manager.org -o- http://ovirt.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :| -- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list