This patch is the final one for multi-threading in the remote driver. With the first 5 patches applied the remote driver is thread safe, but everything is still serialized, since the call() method which does the network I/O blocks until the reply is fully received, and holds the mutex while blocking. This is sub-optimal :-)

We fundamentally only have a single network socket open, so we have to make sure only one thread is ever doing network I/O in each direction. The GNUTLS and SASL encryption state is also only safe to use from a single thread. This means we can't simply let the call() methods run in parallel.

While one thread is waiting to receive its RPC reply message, there's no reason why the RPC requests from other threads can't be sent out on the wire: so-called pipelining of requests. In fact, since we have a unique serial number for every request/reply, there's no reason why replies need to come back in the same order as the requests were sent. This provides optimal concurrency.

All the complexity starts off in the call() method. From the caller's point of view, this method takes an RPC request, sends it, waits for the reply message, and returns the data, i.e. it is blocking. The caller of call() is required to be holding the driver mutex.

Upon entering call() there are two possible scenarios

 - No thread is currently doing network I/O.
 - Another thread is doing network I/O

In the first scenario, this thread takes immediate responsibility for doing all network I/O until such time as it has finished its own RPC call. This thread is said to be "holding the buck".

In the second scenario, this thread delegates responsibility for doing I/O to the existing thread, and puts itself to sleep. Someone else is "holding the buck".

While the thread holding the buck is waiting for its reply to arrive, it releases the driver mutex. This allows other threads to enter the call() method and queue up their requests.
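As a rough illustration of the two entry scenarios just described, here is a cut-down sketch. It is not the code from this patch: the sketch_* names are made up for the example, and it only models the queueing decision, with no locking or I/O. The caller is assumed to already hold the driver mutex.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, cut-down stand-in for the per-call state. */
struct sketch_call {
    unsigned int serial;
    struct sketch_call *next;
};

/* Hypothetical stand-in for the driver's private data. */
struct sketch_conn {
    struct sketch_call *waitDispatch;   /* FIFO queue; the head holds the buck */
};

/*
 * Queue 'call' behind any calls already waiting. The caller is assumed
 * to hold the driver mutex.
 *
 * Returns 1 if the queue was empty, so this thread holds the buck and
 * must do the network I/O itself. Returns 0 if another thread is
 * already doing I/O, so this thread should sleep on its condition
 * variable.
 */
static int
sketch_enter_call(struct sketch_conn *conn, struct sketch_call *call)
{
    struct sketch_call *tmp;

    assert(conn != NULL && call != NULL);

    call->next = NULL;
    if (conn->waitDispatch == NULL) {
        conn->waitDispatch = call;
        return 1;
    }

    /* Walk to the tail so requests stay in arrival order */
    for (tmp = conn->waitDispatch; tmp->next != NULL; tmp = tmp->next)
        ;
    tmp->next = call;
    return 0;
}
```

The first caller on an empty queue gets 1 back and becomes the dispatcher; every later caller gets 0 and is appended in arrival order.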
Once the first thread has finished sending its initial request data, it will start to send requests from other queued threads. It may even start to process other threads' replies, and/or asynchronous events.

Eventually its own reply will arrive. At this point the first thread is all done. There are again two possible scenarios for it to consider:

 - No other threads were sleeping waiting for it to finish
 - One or more threads were sleeping waiting for it to finish

If no threads were waiting, it just returns, giving up the buck. If one or more threads were sleeping, then it picks the first sleeping thread and wakes it up. This is 'passing the buck'.

From the point of view of a sleeping thread, there are also two possible scenarios when it gets woken up

 - It is being woken because the other thread has got its reply ready.
 - It is being woken because the other thread is passing it the buck

In the first case, the thread being woken is done with its call and can just return from call() to its caller. In the second case, the thread being woken up is now "holding the buck" and has to take over responsibility for all network I/O, until its desired reply arrives.

To deal with all this 'buck passing', we need to keep various bits of state around about each thread's RPC call. This is done in the struct remote_thread_call.

    struct remote_thread_call {
        /* This indicates whether this call is being sent, being
           received, completed successfully, or failed with error */
        int mode;

        /* The data being sent on the wire, the length, and progress */
        /* 4 byte length, followed by RPC message header+body */
        char buffer[4 + REMOTE_MESSAGE_MAX];
        unsigned int bufferLength;
        unsigned int bufferOffset;

        /* The serial number & procedure number, so we can match
           up its reply when it arrives */
        unsigned int serial;
        unsigned int proc_nr;

        /* The condition the thread sleeps on while another
           thread is holding the buck */
        pthread_cond_t cond;

        /* Used to de-serialize the XDR object into the return values */
        xdrproc_t ret_filter;
        /* Struct to hold the return values. Yes, evil XDR casts */
        char *ret;

        /* If the 'mode' indicates an error, this stores it */
        remote_error err;

        /* The next thread waiting to dispatch, if any */
        struct remote_thread_call *next;
    };

As more and more threads arrive in the call() method, they're all queued up to have their requests processed in FIFO order for the sake of fairness. Replies are processed in whatever order they arrive, though the libvirtd daemon currently does strict FIFO ordering for replies.

Since we need to release the mutex while waiting for I/O, we can no longer use blocking I/O on the socket. So it's now put into non-blocking mode and then we use poll() to watch for readability / writability. We need the remote driver to work on Windows, but I'm hoping GNULIB's poll() function will work well enough for this not to be a problem.

Since this is a work in progress patch, there's quite a bit of debugging in it. I've also done a stupid modification to the 'virNodeGetInfo' impl for the QEMU driver to sleep for 5 seconds to demonstrate overlapping operations, and another stupid modification to virsh to spawn a thread that calls virNodeGetInfo in a loop. This just demonstrates I've got the hand-off between threads working in the RPC call() method. Obviously not to be committed.
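
To make the poll() usage and the out-of-order reply matching described above a bit more concrete, here is a small sketch. It is only an illustration under simplifying assumptions, not the patch code: the sketch_* names and the SKETCH_* mode values are hypothetical, and only the standard POLLIN / POLLOUT flags from <poll.h> are real.

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>

/* Mirrors the four states described for the 'mode' field. */
enum sketch_mode {
    SKETCH_WAIT_TX,   /* request not fully sent yet */
    SKETCH_WAIT_RX,   /* request sent, waiting for the reply */
    SKETCH_COMPLETE,  /* reply received */
    SKETCH_ERROR      /* call failed */
};

/* Hypothetical, cut-down per-call state. */
struct sketch_call {
    enum sketch_mode mode;
    unsigned int serial;
    struct sketch_call *next;
};

/* Work out which poll() events the queue as a whole is interested in. */
static short
sketch_poll_events(const struct sketch_call *head)
{
    short events = 0;

    for (; head != NULL; head = head->next) {
        if (head->mode == SKETCH_WAIT_RX)
            events |= POLLIN;
        if (head->mode == SKETCH_WAIT_TX)
            events |= POLLOUT;
    }
    return events;
}

/* Find the queued call a reply belongs to; NULL if nobody is waiting. */
static struct sketch_call *
sketch_find_call(struct sketch_call *head, unsigned int serial)
{
    while (head != NULL && head->serial != serial)
        head = head->next;
    return head;
}

/* Record that the reply for 'call' has been decoded. */
static void
sketch_mark_complete(struct sketch_call *call)
{
    /* A reply can only match a request that was fully sent. */
    assert(call != NULL && call->mode == SKETCH_WAIT_RX);
    call->mode = SKETCH_COMPLETE;
}
```

Because the lookup is by serial number rather than queue position, a reply for the second queued call can be completed while the first is still waiting, which is what lets replies arrive in any order.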

Running virsh -c qemu:///system shows the two threads working in lock-step. In this case the background thread has started its virNodeGetInfo call and then I typed 'list' in the foreground thread

DEBUG: libvirt.c: virConnectClone (conn=0x72e110)
DEBUG: libvirt.c: virNodeGetInfo (conn=0x72fb40, info=0x7f5ee6025030)
DEBUG: remote_internal.c: call (Doing call 6 (nil))
Welcome to lt-virsh, the virtualization interactive terminal.

Type:  'help' for help with commands
       'quit' to quit

DEBUG: remote_internal.c: call (We have the buck 6 0x7380f0 0x7380f0)

  ^ at this point the BG thread is waiting for its reply...

virsh # list
DEBUG: libvirt.c: virConnectNumOfDomains (conn=0x72e110)
DEBUG: remote_internal.c: call (Doing call 51 0x7380f0)
DEBUG: remote_internal.c: call (Going to sleep 51 0x7380f0 0x7910b0)

  ^ And thanks to us typing 'list', a new call is triggered. We can see it
    putting itself to sleep since the bg thread has the buck

DEBUG: remote_internal.c: processCallRecv (Do 4 0)
DEBUG: remote_internal.c: processCallRecvLen (Got length, now need 188 total (184 more))
DEBUG: remote_internal.c: processCallRecv (Do 188 4)
DEBUG: remote_internal.c: processCallRecv (Do 0 0)

  ^ Here we see the first thread has got the reply it wanted

DEBUG: remote_internal.c: processCalls (Giving up the buck 6 0x7380f0 0x7910b0)

  ^ So it's decided to give up the buck

DEBUG: remote_internal.c: processCalls (Passing the buck to 51 0x7910b0)

  ^ And noticed another thread was waiting, so passed it the buck

DEBUG: remote_internal.c: call (All done with our call 6 0x7910b0 0x7380f0)
2 1 1 8119572
DEBUG: remote_internal.c: call (Wokeup from sleep 51 0x7910b0 0x7910b0)
DEBUG: remote_internal.c: call (We have the buck 51 0x7910b0 0x7910b0)

  ^ The second thread has now woken up and got the buck.
    Its request has already been sent onto the wire by the first thread,
    so now it merely waits for its reply

DEBUG: remote_internal.c: processCallRecv (Do 4 0)
DEBUG: remote_internal.c: processCallRecvLen (Got length, now need 32 total (28 more))
DEBUG: remote_internal.c: processCallRecv (Do 32 4)
DEBUG: remote_internal.c: processCallRecv (Do 0 0)

  ^ Which it has now got.

DEBUG: remote_internal.c: processCalls (Giving up the buck 51 0x7910b0 (nil))
DEBUG: remote_internal.c: call (All done with our call 51 (nil) 0x7910b0)

  ^ The second thread now gives up the buck, and there's no one waiting
    who wants it

 Id Name                 State
----------------------------------

virsh #

Daniel

diff --git a/src/libvirt_sym.version.in b/src/libvirt_sym.version.in
--- a/src/libvirt_sym.version.in
+++ b/src/libvirt_sym.version.in
@@ -586,6 +586,7 @@ LIBVIRT_PRIVATE_@VERSION@ {
     virEventAddHandle;
     virEventRemoveHandle;
     virExec;
+    virSetNonBlock;
     virFormatMacAddr;
     virParseMacAddr;
     virFileDeletePid;
diff --git a/src/qemu_driver.c b/src/qemu_driver.c
--- a/src/qemu_driver.c
+++ b/src/qemu_driver.c
@@ -1338,6 +1338,7 @@ static int qemudGetMaxVCPUs(virConnectPt
 
 static int qemudGetNodeInfo(virConnectPtr conn,
                             virNodeInfoPtr nodeinfo) {
+    sleep(5);
     return virNodeInfoPopulate(conn, nodeinfo);
 }
 
diff --git a/src/remote_internal.c b/src/remote_internal.c
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -67,6 +67,8 @@
 #include <libxml/uri.h>
 
 #include <netdb.h>
+
+#include <poll.h>
 
 /* AI_ADDRCONFIG is missing on some systems.
*/ #ifndef AI_ADDRCONFIG @@ -88,6 +90,37 @@ static int inside_daemon = 0; +struct remote_thread_call; + + +enum { + REMOTE_MODE_WAIT_TX, + REMOTE_MODE_WAIT_RX, + REMOTE_MODE_COMPLETE, + REMOTE_MODE_ERROR, +}; + +struct remote_thread_call { + int mode; + + /* 4 byte length, followed by RPC message header+body */ + char buffer[4 + REMOTE_MESSAGE_MAX]; + unsigned int bufferLength; + unsigned int bufferOffset; + + unsigned int serial; + unsigned int proc_nr; + + pthread_cond_t cond; + + xdrproc_t ret_filter; + char *ret; + + remote_error err; + + struct remote_thread_call *next; +}; + struct private_data { PTHREAD_MUTEX_T(lock); @@ -101,12 +134,24 @@ struct private_data { int localUses; /* Ref count for private data */ char *hostname; /* Original hostname */ FILE *debugLog; /* Debug remote protocol */ + #if HAVE_SASL sasl_conn_t *saslconn; /* SASL context */ + const char *saslDecoded; unsigned int saslDecodedLength; unsigned int saslDecodedOffset; -#endif + + const char *saslEncoded; + unsigned int saslEncodedLength; + unsigned int saslEncodedOffset; +#endif + + /* 4 byte length, followed by RPC message header+body */ + char buffer[4 + REMOTE_MESSAGE_MAX]; + unsigned int bufferLength; + unsigned int bufferOffset; + /* The list of domain event callbacks */ virDomainEventCallbackListPtr callbackList; /* The queue of domain events generated @@ -114,6 +159,9 @@ struct private_data { virDomainEventQueuePtr domainEvents; /* Timer for flushing domainEvents queue */ int eventFlushTimer; + + /* List of threads currently doing dispatch */ + struct remote_thread_call *waitDispatch; }; enum { @@ -160,7 +208,6 @@ static void make_nonnull_storage_pool (r static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src); static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src); void remoteDomainEventFired(int watch, int fd, int event, void *data); -static void remoteDomainProcessEvent(virConnectPtr 
conn, XDR *xdr); static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr); void remoteDomainEventQueueFlush(int timer, void *opaque); /*----------------------------------------------------------------------*/ @@ -695,6 +742,13 @@ doRemoteOpen (virConnectPtr conn, } /* switch (transport) */ + if (virSetNonBlock(priv->sock) < 0) { + errorf (conn, VIR_ERR_SYSTEM_ERROR, + _("unable to make socket non-blocking %s"), + strerror(errno)); + goto failed; + } + /* Try and authenticate with server */ if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1) @@ -767,6 +821,7 @@ doRemoteOpen (virConnectPtr conn, DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. " "continuing without events."); virEventRemoveHandle(priv->watch); + priv->watch = -1; } } /* Successful. */ @@ -842,6 +897,7 @@ remoteOpen (virConnectPtr conn, pthread_mutex_init(&priv->lock, NULL); remoteDriverLock(priv); priv->localUses = 1; + priv->watch = -1; if (flags & VIR_CONNECT_RO) rflags |= VIR_DRV_OPEN_REMOTE_RO; @@ -1226,6 +1282,7 @@ doRemoteClose (virConnectPtr conn, struc virEventRemoveTimeout(priv->eventFlushTimer); /* Remove handle for remote events */ virEventRemoveHandle(priv->watch); + priv->watch = -1; } /* Close socket. */ @@ -5570,12 +5627,616 @@ done: /*----------------------------------------------------------------------*/ -static int really_write (virConnectPtr conn, struct private_data *priv, - int in_open, char *bytes, int len); -static int really_read (virConnectPtr conn, struct private_data *priv, - int in_open, char *bytes, int len); - -/* This function performs a remote procedure call to procedure PROC_NR. 
+ +static struct remote_thread_call * +prepareCall(virConnectPtr conn, + struct private_data *priv, + int flags, + int proc_nr, + xdrproc_t args_filter, char *args, + xdrproc_t ret_filter, char *ret) +{ + XDR xdr; + struct remote_message_header hdr; + struct remote_thread_call *rv; + + if (VIR_ALLOC(rv) < 0) + return NULL; + + /* Get a unique serial number for this message. */ + rv->serial = priv->counter++; + rv->proc_nr = proc_nr; + rv->ret_filter = ret_filter; + rv->ret = ret; + + pthread_cond_init(&rv->cond, NULL); + + hdr.prog = REMOTE_PROGRAM; + hdr.vers = REMOTE_PROTOCOL_VERSION; + hdr.proc = proc_nr; + hdr.direction = REMOTE_CALL; + hdr.serial = rv->serial; + hdr.status = REMOTE_OK; + + /* Serialise header followed by args. */ + xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE); + if (!xdr_remote_message_header (&xdr, &hdr)) { + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + VIR_ERR_RPC, _("xdr_remote_message_header failed")); + goto error; + } + + if (!(*args_filter) (&xdr, args)) { + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, + _("marshalling args")); + goto error; + } + + /* Get the length stored in buffer. */ + rv->bufferLength = xdr_getpos (&xdr); + xdr_destroy (&xdr); + + /* Length must include the length word itself (always encoded in + * 4 bytes as per RFC 4506). + */ + rv->bufferLength += 4; + + /* Encode the length word. */ + xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE); + if (!xdr_int (&xdr, (int *)&rv->bufferLength)) { + error (flags & REMOTE_CALL_IN_OPEN ? 
NULL : conn, VIR_ERR_RPC,
+               _("xdr_int (length word)"));
+        goto error;
+    }
+    xdr_destroy (&xdr);
+
+    return rv;
+
+error:
+    VIR_FREE(rv);
+    return NULL;
+}
+
+
+
+static int
+processCallWrite(virConnectPtr conn,
+                 struct private_data *priv,
+                 int in_open /* if we are in virConnectOpen */,
+                 const char *bytes, int len)
+{
+    int ret;
+
+    if (priv->uses_tls) {
+    tls_resend:
+        ret = gnutls_record_send (priv->session, bytes, len);
+        if (ret < 0) {
+            if (ret == GNUTLS_E_INTERRUPTED)
+                goto tls_resend;
+            if (ret == GNUTLS_E_AGAIN)
+                return 0;
+
+            error (in_open ? NULL : conn,
+                   VIR_ERR_GNUTLS_ERROR, gnutls_strerror (ret));
+            return -1;
+        }
+    } else {
+    resend:
+        ret = send (priv->sock, bytes, len, 0);
+        if (ret == -1) {
+            if (errno == EINTR)
+                goto resend;
+            if (errno == EAGAIN)
+                return 0;
+
+            error (in_open ? NULL : conn,
+                   VIR_ERR_SYSTEM_ERROR, strerror (errno));
+            return -1;
+
+        }
+    }
+
+    return ret;
+}
+
+
+static int
+processCallRead(virConnectPtr conn,
+                struct private_data *priv,
+                int in_open /* if we are in virConnectOpen */,
+                char *bytes, int len)
+{
+    int ret;
+
+    if (priv->uses_tls) {
+    tls_resend:
+        ret = gnutls_record_recv (priv->session, bytes, len);
+        if (ret < 0) {
+            if (ret == GNUTLS_E_INTERRUPTED)
+                goto tls_resend;
+            if (ret == GNUTLS_E_AGAIN)
+                return 0;
+
+            error (in_open ? NULL : conn,
+                   VIR_ERR_GNUTLS_ERROR, gnutls_strerror (ret));
+            return -1;
+        }
+    } else {
+    resend:
+        ret = recv (priv->sock, bytes, len, 0);
+        if (ret == -1) {
+            if (errno == EINTR)
+                goto resend;
+            if (errno == EAGAIN)
+                return 0;
+
+            error (in_open ?
NULL : conn, + VIR_ERR_SYSTEM_ERROR, strerror (errno)); + return -1; + + } + } + + return ret; +} + + +static int +processCallSendOne(virConnectPtr conn, + struct private_data *priv, + int in_open, + struct remote_thread_call *thecall) +{ +#if HAVE_SASL + if (priv->saslconn) { + const char *output; + unsigned int outputlen; + int err, ret; + + if (!priv->saslEncoded) { + err = sasl_encode(priv->saslconn, + thecall->buffer + thecall->bufferOffset, + thecall->bufferLength - thecall->bufferOffset, + &output, &outputlen); + if (err != SASL_OK) { + return -1; + } + priv->saslEncoded = output; + priv->saslEncodedLength = outputlen; + priv->saslEncodedOffset = 0; + + thecall->bufferOffset = thecall->bufferLength; + } + + ret = processCallWrite(conn, priv, in_open, + priv->saslEncoded + priv->saslEncodedOffset, + priv->saslEncodedLength - priv->saslEncodedOffset); + if (ret < 0) + return ret; + priv->saslEncodedOffset += ret; + + if (priv->saslEncodedOffset == priv->saslEncodedLength) { + priv->saslEncoded = NULL; + priv->saslEncodedOffset = priv->saslEncodedLength = 0; + thecall->mode = REMOTE_MODE_WAIT_RX; + } + } else { +#endif + int ret; + ret = processCallWrite(conn, priv, in_open, + thecall->buffer + thecall->bufferOffset, + thecall->bufferLength - thecall->bufferOffset); + if (ret < 0) + return ret; + thecall->bufferOffset += ret; + + if (thecall->bufferOffset == thecall->bufferLength) { + thecall->bufferOffset = thecall->bufferLength = 0; + thecall->mode = REMOTE_MODE_WAIT_RX; + } +#if HAVE_SASL + } +#endif + return 0; +} + + +static int +processCallSend(virConnectPtr conn, struct private_data *priv, + int in_open) { + struct remote_thread_call *thecall = priv->waitDispatch; + + while (thecall && + thecall->mode != REMOTE_MODE_WAIT_TX) + thecall = thecall->next; + + if (!thecall) + return -1; /* Shouldn't happen, but you never know... 
*/
+
+    while (thecall) {
+        int ret = processCallSendOne(conn, priv, in_open, thecall);
+        if (ret < 0)
+            return ret;
+
+        if (thecall->mode == REMOTE_MODE_WAIT_TX)
+            return 0; /* Blocking write, go back to event loop */
+
+        thecall = thecall->next;
+    }
+
+    return 0; /* No more calls to send, all done */
+}
+
+static int
+processCallRecvSome(virConnectPtr conn, struct private_data *priv,
+                    int in_open) {
+    unsigned int wantData;
+
+    /* Start by reading length word */
+    if (priv->bufferLength == 0)
+        priv->bufferLength = 4;
+
+    wantData = priv->bufferLength - priv->bufferOffset;
+
+#if HAVE_SASL
+    if (priv->saslconn) {
+        if (priv->saslDecoded == NULL) {
+            char encoded[8192];
+            unsigned int encodedLen = sizeof(encoded);
+            int ret, err;
+            ret = processCallRead(conn, priv, in_open,
+                                  encoded, encodedLen);
+            if (ret < 0)
+                return -1;
+            if (ret == 0)
+                return 0;
+
+            err = sasl_decode(priv->saslconn, encoded, ret,
+                              &priv->saslDecoded, &priv->saslDecodedLength);
+            if (err != SASL_OK)
+                return -1;
+            priv->saslDecodedOffset = 0;
+        }
+
+        if ((priv->saslDecodedLength - priv->saslDecodedOffset) < wantData)
+            wantData = (priv->saslDecodedLength - priv->saslDecodedOffset);
+
+        memcpy(priv->buffer + priv->bufferOffset,
+               priv->saslDecoded + priv->saslDecodedOffset,
+               wantData);
+        priv->saslDecodedOffset += wantData;
+        priv->bufferOffset += wantData;
+        if (priv->saslDecodedOffset == priv->saslDecodedLength) {
+            priv->saslDecodedOffset = priv->saslDecodedLength = 0;
+            priv->saslDecoded = NULL;
+        }
+
+        return wantData;
+    } else {
+#endif
+        int ret;
+
+        ret = processCallRead(conn, priv, in_open,
+                              priv->buffer + priv->bufferOffset,
+                              wantData);
+        if (ret < 0)
+            return -1;
+        if (ret == 0)
+            return 0;
+
+        priv->bufferOffset += ret;
+
+        return ret;
+#if HAVE_SASL
+    }
+#endif
+}
+
+
+static void
+processCallAsyncEvent(virConnectPtr conn, struct private_data *priv,
+                      int in_open,
+                      remote_message_header *hdr,
+                      XDR *xdr) {
+    /* An async message has come in while we were waiting for the
+     *
response. Process it to pull it off the wire, and try again + */ + DEBUG0("Encountered an event while waiting for a response"); + + if (in_open) { + DEBUG("Ignoring bogus event %d received while in open", hdr->proc); + return; + } + + if (hdr->proc == REMOTE_PROC_DOMAIN_EVENT) { + remoteDomainQueueEvent(conn, xdr); + virEventUpdateTimeout(priv->eventFlushTimer, 0); + } else { + DEBUG("Unexpected event proc %d", hdr->proc); + } +} + +static int +processCallRecvLen(virConnectPtr conn, struct private_data *priv, + int in_open) { + XDR xdr; + int len; + + xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE); + if (!xdr_int (&xdr, &len)) { + error (in_open ? NULL : conn, + VIR_ERR_RPC, _("xdr_int (length word, reply)")); + return -1; + } + xdr_destroy (&xdr); + + /* Length includes length word - adjust to real length to read. */ + len -= 4; + + if (len < 0 || len > REMOTE_MESSAGE_MAX) { + error (in_open ? NULL : conn, + VIR_ERR_RPC, _("packet received from server too large")); + return -1; + } + + /* Extend our declared buffer length and carry + on reading the header + payload */ + priv->bufferLength += len; + DEBUG("Got length, now need %d total (%d more)", priv->bufferLength, len); + return 0; +} + + +static int +processCallRecvMsg(virConnectPtr conn, struct private_data *priv, + int in_open) { + XDR xdr; + struct remote_message_header hdr; + int len = priv->bufferLength - 4; + struct remote_thread_call *thecall; + + /* Deserialise reply header. */ + xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE); + if (!xdr_remote_message_header (&xdr, &hdr)) { + error (in_open ? NULL : conn, + VIR_ERR_RPC, _("invalid header in reply")); + return -1; + } + + /* Check program, version, etc. are what we expect. */ + if (hdr.prog != REMOTE_PROGRAM) { + virRaiseError (in_open ? 
NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown program (received %x, expected %x)"), + hdr.prog, REMOTE_PROGRAM); + return -1; + } + if (hdr.vers != REMOTE_PROTOCOL_VERSION) { + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown protocol version (received %x, expected %x)"), + hdr.vers, REMOTE_PROTOCOL_VERSION); + return -1; + } + + /* Async events from server need special handling */ + if (hdr.direction == REMOTE_MESSAGE) { + processCallAsyncEvent(conn, priv, in_open, + &hdr, &xdr); + xdr_destroy(&xdr); + return 0; + } + + if (hdr.direction != REMOTE_REPLY) { + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("got unexpected RPC call %d from server"), + hdr.proc); + xdr_destroy(&xdr); + return -1; + } + + /* Ok, definitely got an RPC reply now find + out who's been waiting for it */ + + thecall = priv->waitDispatch; + while (thecall && + thecall->serial != hdr.serial) + thecall = thecall->next; + + if (!thecall) { + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("no call waiting for reply with serial %d"), + hdr.serial); + xdr_destroy(&xdr); + return -1; + } + + if (hdr.proc != thecall->proc_nr) { + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown procedure (received %x, expected %x)"), + hdr.proc, thecall->proc_nr); + xdr_destroy (&xdr); + return -1; + } + + /* Status is either REMOTE_OK (meaning that what follows is a ret + * structure), or REMOTE_ERROR (and what follows is a remote_error + * structure). + */ + switch (hdr.status) { + case REMOTE_OK: + if (!(*thecall->ret_filter) (&xdr, thecall->ret)) { + error (in_open ? 
NULL : conn, VIR_ERR_RPC, + _("unmarshalling ret")); + return -1; + } + thecall->mode = REMOTE_MODE_COMPLETE; + xdr_destroy (&xdr); + return 0; + + case REMOTE_ERROR: + memset (&thecall->err, 0, sizeof thecall->err); + if (!xdr_remote_error (&xdr, &thecall->err)) { + error (in_open ? NULL : conn, + VIR_ERR_RPC, _("unmarshalling remote_error")); + return -1; + } + xdr_destroy (&xdr); + thecall->mode = REMOTE_MODE_ERROR; + return 0; + + default: + virRaiseError (in_open ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown status (received %x)"), + hdr.status); + xdr_destroy (&xdr); + return -1; + } +} + + +static int +processCallRecv(virConnectPtr conn, struct private_data *priv, + int in_open) { + int ret; + + /* Read as much data as is available, until we get + * EGAIN + */ + for (;;) { + DEBUG("Do %d %d", priv->bufferLength, priv->bufferOffset); + ret = processCallRecvSome(conn, priv, in_open); + + if (ret < 0) + return -1; + if (ret == 0) + return 0; /* Blocking on read */ + + /* Check for completion of our goal */ + if (priv->bufferOffset == priv->bufferLength) { + if (priv->bufferOffset == 4) { + ret = processCallRecvLen(conn, priv, in_open); + } else { + ret = processCallRecvMsg(conn, priv, in_open); + priv->bufferOffset = priv->bufferLength = 0; + } + if (ret < 0) + return -1; + } + } +} + +/* + * Process all calls pending dispatch/receive until we + * get a reply to our own call. Then quit and pass the buck + * to someone else. + */ +static int +processCalls(virConnectPtr conn, + struct private_data *priv, + int in_open, + struct remote_thread_call *thiscall) +{ + struct pollfd fds[1]; + int ret; + + /* XXX add wakeup pipe */ + + /* XXX weeeeeeeendows hate, perhaps gnulib poll() will work ? 
*/ + + fds[0].fd = priv->sock; + + for (;;) { + struct remote_thread_call *tmp = priv->waitDispatch; + struct remote_thread_call *prev; + + fds[0].events = fds[0].revents = 0; + while (tmp) { + if (tmp->mode == REMOTE_MODE_WAIT_RX) + fds[0].events |= POLLIN; + if (tmp->mode == REMOTE_MODE_WAIT_TX) + fds[0].events |= POLLOUT; + + tmp = tmp->next; + } + + /* Release lock while poll'ing so other threads + * can stuff themselves on the queue */ + remoteDriverUnlock(priv); + ret = poll(fds, ARRAY_CARDINALITY(fds), -1); + remoteDriverLock(priv); + + if (ret < 0) { + if (errno == EAGAIN) + continue; + ;/* XXX damn */ + } + + if (fds[0].revents & POLLOUT) + processCallSend(conn, priv, in_open); + + if (fds[0].revents & POLLIN) + processCallRecv(conn, priv, in_open); + + /* XXX poll hup/err */ + + /* Iterate through waiting threads and if + * any are complete then tell 'em to wakeup + */ + tmp = priv->waitDispatch; + prev = NULL; + while (tmp) { + if (tmp != thiscall && + (tmp->mode == REMOTE_MODE_COMPLETE || + tmp->mode == REMOTE_MODE_ERROR)) { + /* Take them out of the list */ + if (prev) + prev->next = tmp->next; + else + priv->waitDispatch = tmp->next; + + /* And wake them up.... + * ...they won't actually wakeup until + * we release our mutex a short while + * later... + */ + DEBUG("Waking up sleep %d %p %p", tmp->proc_nr, tmp, priv->waitDispatch); + pthread_cond_signal(&tmp->cond); + } + prev = tmp; + tmp = tmp->next; + } + + /* Now see if *we* are done */ + if (thiscall->mode == REMOTE_MODE_COMPLETE || + thiscall->mode == REMOTE_MODE_ERROR) { + /* We're at head of the list already, so + * remove us + */ + priv->waitDispatch = thiscall->next; + DEBUG("Giving up the buck %d %p %p", thiscall->proc_nr, thiscall, priv->waitDispatch); + /* See if someone else is still waiting + * and if so, then pass the buck ! 
*/ + if (priv->waitDispatch) { + DEBUG("Passing the buck to %d %p", priv->waitDispatch->proc_nr, priv->waitDispatch); + pthread_cond_signal(&priv->waitDispatch->cond); + } + return 0; + } + } +} + +/* + * This function performs a remote procedure call to procedure PROC_NR. * * NB. This does not free the args structure (not desirable, since you * often want this allocated on the stack or else it contains strings @@ -5584,204 +6245,29 @@ static int really_read (virConnectPtr co * * NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling, * else Bad Things will happen in the XDR code. - */ -static int -doCall (virConnectPtr conn, struct private_data *priv, - int flags /* if we are in virConnectOpen */, - int proc_nr, - xdrproc_t args_filter, char *args, - xdrproc_t ret_filter, char *ret) -{ - char buffer[REMOTE_MESSAGE_MAX]; - char buffer2[4]; - struct remote_message_header hdr; - XDR xdr; - int len; - struct remote_error rerror; - - /* Get a unique serial number for this message. */ - int serial = priv->counter++; - - hdr.prog = REMOTE_PROGRAM; - hdr.vers = REMOTE_PROTOCOL_VERSION; - hdr.proc = proc_nr; - hdr.direction = REMOTE_CALL; - hdr.serial = serial; - hdr.status = REMOTE_OK; - - /* Serialise header followed by args. */ - xdrmem_create (&xdr, buffer, sizeof buffer, XDR_ENCODE); - if (!xdr_remote_message_header (&xdr, &hdr)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - VIR_ERR_RPC, _("xdr_remote_message_header failed")); - return -1; - } - - if (!(*args_filter) (&xdr, args)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, - _("marshalling args")); - return -1; - } - - /* Get the length stored in buffer. */ - len = xdr_getpos (&xdr); - xdr_destroy (&xdr); - - /* Length must include the length word itself (always encoded in - * 4 bytes as per RFC 4506). - */ - len += 4; - - /* Encode the length word. 
*/ - xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_ENCODE); - if (!xdr_int (&xdr, &len)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, - _("xdr_int (length word)")); - return -1; - } - xdr_destroy (&xdr); - - /* Send length word followed by header+args. */ - if (really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1 || - really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1) - return -1; - -retry_read: - /* Read and deserialise length word. */ - if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1) - return -1; - - xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE); - if (!xdr_int (&xdr, &len)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - VIR_ERR_RPC, _("xdr_int (length word, reply)")); - return -1; - } - xdr_destroy (&xdr); - - /* Length includes length word - adjust to real length to read. */ - len -= 4; - - if (len < 0 || len > REMOTE_MESSAGE_MAX) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - VIR_ERR_RPC, _("packet received from server too large")); - return -1; - } - - /* Read reply header and what follows (either a ret or an error). */ - if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len) == -1) - return -1; - - /* Deserialise reply header. */ - xdrmem_create (&xdr, buffer, len, XDR_DECODE); - if (!xdr_remote_message_header (&xdr, &hdr)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - VIR_ERR_RPC, _("invalid header in reply")); - return -1; - } - - /* Check program, version, etc. are what we expect. */ - if (hdr.prog != REMOTE_PROGRAM) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown program (received %x, expected %x)"), - hdr.prog, REMOTE_PROGRAM); - return -1; - } - if (hdr.vers != REMOTE_PROTOCOL_VERSION) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? 
NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown protocol version (received %x, expected %x)"), - hdr.vers, REMOTE_PROTOCOL_VERSION); - return -1; - } - - if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && - hdr.direction == REMOTE_MESSAGE) { - /* An async message has come in while we were waiting for the - * response. Process it to pull it off the wire, and try again - */ - DEBUG0("Encountered an event while waiting for a response"); - - remoteDomainQueueEvent(conn, &xdr); - virEventUpdateTimeout(priv->eventFlushTimer, 0); - - DEBUG0("Retrying read"); - xdr_destroy (&xdr); - goto retry_read; - } - if (hdr.proc != proc_nr) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown procedure (received %x, expected %x)"), - hdr.proc, proc_nr); - return -1; - } - if (hdr.direction != REMOTE_REPLY) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown direction (received %x, expected %x)"), - hdr.direction, REMOTE_REPLY); - return -1; - } - if (hdr.serial != serial) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown serial (received %x, expected %x)"), - hdr.serial, serial); - return -1; - } - - /* Status is either REMOTE_OK (meaning that what follows is a ret - * structure), or REMOTE_ERROR (and what follows is a remote_error - * structure). - */ - switch (hdr.status) { - case REMOTE_OK: - if (!(*ret_filter) (&xdr, ret)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, - _("unmarshalling ret")); - return -1; - } - xdr_destroy (&xdr); - return 0; - - case REMOTE_ERROR: - memset (&rerror, 0, sizeof rerror); - if (!xdr_remote_error (&xdr, &rerror)) { - error (flags & REMOTE_CALL_IN_OPEN ? 
NULL : conn, - VIR_ERR_RPC, _("unmarshalling remote_error")); - return -1; - } - xdr_destroy (&xdr); - /* See if caller asked us to keep quiet about missing RPCs - * eg for interop with older servers */ - if (flags & REMOTE_CALL_QUIET_MISSING_RPC && - rerror.domain == VIR_FROM_REMOTE && - rerror.code == VIR_ERR_RPC && - rerror.level == VIR_ERR_ERROR && - STRPREFIX(*rerror.message, "unknown procedure")) { - return -2; - } - server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, &rerror); - xdr_free ((xdrproc_t) xdr_remote_error, (char *) &rerror); - return -1; - - default: - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown status (received %x)"), - hdr.status); - xdr_destroy (&xdr); - return -1; - } -} - - + * + * NB(3) You must have the private_data lock before calling this + * + * NB(4) This is very complicated. Due to connection cloning, multiple + * threads can want to use the socket at once. Obviously only one of + * them can. So if someone's using the socket, other threads are put + * to sleep on condition variables. The existing thread may completely + * send & receive their RPC call/reply while they're asleep. Or it + * may only get around to dealing with sending the call. Or it may + * get around to neither. So upon waking up from slumber, the other + * thread may or may not have more work to do. + * + * We call this dance 'passing the buck' + * + * http://en.wikipedia.org/wiki/Passing_the_buck + * + * "Buck passing or passing the buck is the action of transferring + * responsibility or blame unto another person. It is also used as + * a strategy in power politics when the actions of one country/ + * nation are blamed on another, providing an opportunity for war." + * + * NB(5) Don't Panic!
+ */ static int call (virConnectPtr conn, struct private_data *priv, int flags /* if we are in virConnectOpen */, @@ -5790,6 +6276,66 @@ call (virConnectPtr conn, struct private xdrproc_t ret_filter, char *ret) { int rv; + struct remote_thread_call *thiscall; + + DEBUG("Doing call %d %p", proc_nr, priv->waitDispatch); + thiscall = prepareCall(conn, priv, flags, proc_nr, + args_filter, args, + ret_filter, ret); + + if (!thiscall) { + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + VIR_ERR_NO_MEMORY, NULL); + return -1; + } + + /* Check to see if another thread is dispatching */ + if (priv->waitDispatch) { + /* Stick ourselves on the end of the wait queue */ + struct remote_thread_call *tmp = priv->waitDispatch; + while (tmp && tmp->next) + tmp = tmp->next; + if (tmp) + tmp->next = thiscall; + else + priv->waitDispatch = thiscall; + + DEBUG("Going to sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall); + /* Go to sleep while other thread is working... */ + pthread_cond_wait(&thiscall->cond, &priv->lock); + + DEBUG("Wokeup from sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall); + /* Two reasons we can be woken up + * 1. Other thread has got our reply ready for us + * 2. Other thread is all done, and it's our turn to + * be the dispatcher to finish waiting for + * our reply + */ + if (thiscall->mode == REMOTE_MODE_COMPLETE || + thiscall->mode == REMOTE_MODE_ERROR) { + /* + * We avoided catching the buck and our reply is ready ! + * We've already had 'thiscall' removed from the list + * so just need to (maybe) handle errors & free it + */ + goto cleanup; + } + + /* Grr, someone passed the buck onto us ... */ + + } else { + /* We're first to catch the buck */ + priv->waitDispatch = thiscall; + } + + DEBUG("We have the buck %d %p %p", proc_nr, priv->waitDispatch, thiscall); + /* + * The buck stops here! + * + * At this point we're about to own the dispatch + * process...
+ */ + /* * Avoid needless wake-ups of the event loop in the * case where this call is being made from a different @@ -5800,209 +6346,147 @@ call (virConnectPtr conn, struct private if (priv->watch >= 0) virEventUpdateHandle(priv->watch, 0); - rv = doCall(conn, priv,flags, proc_nr, - args_filter, args, - ret_filter, ret); + rv = processCalls(conn, priv, + flags & REMOTE_CALL_IN_OPEN ? 1 : 0, + thiscall); if (priv->watch >= 0) virEventUpdateHandle(priv->watch, VIR_EVENT_HANDLE_READABLE); - return rv; -} - -static int -really_write_buf (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - const char *bytes, int len) -{ - const char *p; - int err; - - p = bytes; - if (priv->uses_tls) { - do { - err = gnutls_record_send (priv->session, p, len); - if (err < 0) { - if (err == GNUTLS_E_INTERRUPTED || err == GNUTLS_E_AGAIN) - continue; - error (in_open ? NULL : conn, - VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err)); - return -1; - } - len -= err; - p += err; - } - while (len > 0); - } else { - do { - err = send (priv->sock, p, len, 0); - if (err == -1) { - if (errno == EINTR || errno == EAGAIN) - continue; - error (in_open ? 
NULL : conn, - VIR_ERR_SYSTEM_ERROR, strerror (errno)); - return -1; - } - len -= err; - p += err; - } - while (len > 0); - } - - return 0; -} - -static int -really_write_plain (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - return really_write_buf(conn, priv, in_open, bytes, len); -} - -#if HAVE_SASL -static int -really_write_sasl (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - const char *output; - unsigned int outputlen; - int err; - - err = sasl_encode(priv->saslconn, bytes, len, &output, &outputlen); - if (err != SASL_OK) { - return -1; - } - - return really_write_buf(conn, priv, in_open, output, outputlen); -} -#endif - -static int -really_write (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ -#if HAVE_SASL - if (priv->saslconn) - return really_write_sasl(conn, priv, in_open, bytes, len); - else -#endif - return really_write_plain(conn, priv, in_open, bytes, len); -} - -static int -really_read_buf (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - int err; - - if (priv->uses_tls) { - tlsreread: - err = gnutls_record_recv (priv->session, bytes, len); - if (err < 0) { - if (err == GNUTLS_E_INTERRUPTED) - goto tlsreread; - error (in_open ? NULL : conn, - VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err)); - return -1; - } - if (err == 0) { - error (in_open ? NULL : conn, - VIR_ERR_RPC, _("socket closed unexpectedly")); - return -1; - } - return err; - } else { - reread: - err = recv (priv->sock, bytes, len, 0); - if (err == -1) { - if (errno == EINTR) - goto reread; - error (in_open ? NULL : conn, - VIR_ERR_SYSTEM_ERROR, strerror (errno)); - return -1; - } - if (err == 0) { - error (in_open ? 
NULL : conn, - VIR_ERR_RPC, _("socket closed unexpectedly")); - return -1; - } - return err; - } - - return 0; -} - -static int -really_read_plain (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - do { - int ret = really_read_buf (conn, priv, in_open, bytes, len); - if (ret < 0) - return -1; - - len -= ret; - bytes += ret; - } while (len > 0); - - return 0; -} - -#if HAVE_SASL -static int -really_read_sasl (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - do { - int want, got; - if (priv->saslDecoded == NULL) { - char encoded[8192]; - int encodedLen = sizeof(encoded); - int err, ret; - ret = really_read_buf (conn, priv, in_open, encoded, encodedLen); - if (ret < 0) - return -1; - - err = sasl_decode(priv->saslconn, encoded, ret, - &priv->saslDecoded, &priv->saslDecodedLength); - } - - got = priv->saslDecodedLength - priv->saslDecodedOffset; - want = len; - if (want > got) - want = got; - - memcpy(bytes, priv->saslDecoded + priv->saslDecodedOffset, want); - priv->saslDecodedOffset += want; - if (priv->saslDecodedOffset == priv->saslDecodedLength) { - priv->saslDecoded = NULL; - priv->saslDecodedOffset = priv->saslDecodedLength = 0; - } - bytes += want; - len -= want; - } while (len > 0); - - return 0; -} -#endif - -static int -really_read (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ -#if HAVE_SASL - if (priv->saslconn) - return really_read_sasl (conn, priv, in_open, bytes, len); - else -#endif - return really_read_plain (conn, priv, in_open, bytes, len); -} + + if (rv < 0) { + VIR_FREE(thiscall); + return -1; + } + +cleanup: + DEBUG("All done with our call %d %p %p", proc_nr, priv->waitDispatch, thiscall); + if (thiscall->mode == REMOTE_MODE_ERROR) { + /* See if caller asked us to keep quiet about missing RPCs + * eg for interop with older 
servers */ + if (flags & REMOTE_CALL_QUIET_MISSING_RPC && + thiscall->err.domain == VIR_FROM_REMOTE && + thiscall->err.code == VIR_ERR_RPC && + thiscall->err.level == VIR_ERR_ERROR && + STRPREFIX(*thiscall->err.message, "unknown procedure")) { + rv = -2; + } else { + server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + &thiscall->err); + rv = -1; + } + } else { + rv = 0; + } + VIR_FREE(thiscall); + return rv; +} + + +/** + * remoteDomainReadEvent + * + * Read the event data off the wire + */ +static virDomainEventPtr +remoteDomainReadEvent(virConnectPtr conn, XDR *xdr) +{ + remote_domain_event_ret ret; + virDomainPtr dom; + virDomainEventPtr event = NULL; + memset (&ret, 0, sizeof ret); + + /* unmarshall parameters, and process it*/ + if (! xdr_remote_domain_event_ret(xdr, &ret) ) { + error (conn, VIR_ERR_RPC, + _("remoteDomainProcessEvent: unmarshalling ret")); + return NULL; + } + + dom = get_nonnull_domain(conn,ret.dom); + if (!dom) + return NULL; + + event = virDomainEventNewFromDom(dom, ret.event, ret.detail); + + virDomainFree(dom); + return event; +} + +static void +remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr) +{ + struct private_data *priv = conn->privateData; + virDomainEventPtr event; + + event = remoteDomainReadEvent(conn, xdr); + if (!event) + return; + + if (virDomainEventQueuePush(priv->domainEvents, + event) < 0) + DEBUG0("Error adding event to queue"); + + virDomainEventFree(event); +} + +/** remoteDomainEventFired: + * + * The callback for monitoring the remote socket + * for event data + */ +void +remoteDomainEventFired(int watch, + int fd, + int event, + void *opaque) +{ + virConnectPtr conn = opaque; + struct private_data *priv = conn->privateData; + + remoteDriverLock(priv); + + /* This should be impossible, but it doesn't hurt to check */ + if (priv->waitDispatch) + goto done; + + DEBUG("Event fired %d %d %d %X", watch, fd, event, event); + + if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) { + DEBUG("%s : 
VIR_EVENT_HANDLE_HANGUP or " + "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__); + virEventRemoveHandle(watch); + priv->watch = -1; + goto done; + } + + if (fd != priv->sock) { + virEventRemoveHandle(watch); + priv->watch = -1; + goto done; + } + + if (processCallRecv(conn, priv, 0) < 0) + DEBUG0("Something went wrong during async message processing"); + +done: + remoteDriverUnlock(priv); +} + +void +remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque) +{ + virConnectPtr conn = opaque; + struct private_data *priv = conn->privateData; + + remoteDriverLock(priv); + + virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList, + virDomainEventDispatchDefaultFunc, NULL); + virEventUpdateTimeout(priv->eventFlushTimer, -1); + + remoteDriverUnlock(priv); +} + /* For errors internal to this library. */ static void @@ -6306,161 +6790,3 @@ remoteRegister (void) return 0; } -/** - * remoteDomainReadEvent - * - * Read the event data off the wire - */ -static virDomainEventPtr -remoteDomainReadEvent(virConnectPtr conn, XDR *xdr) -{ - remote_domain_event_ret ret; - virDomainPtr dom; - virDomainEventPtr event = NULL; - memset (&ret, 0, sizeof ret); - - /* unmarshall parameters, and process it*/ - if (! 
xdr_remote_domain_event_ret(xdr, &ret) ) { - error (conn, VIR_ERR_RPC, - _("remoteDomainProcessEvent: unmarshalling ret")); - return NULL; - } - - dom = get_nonnull_domain(conn,ret.dom); - if (!dom) - return NULL; - - event = virDomainEventNewFromDom(dom, ret.event, ret.detail); - - virDomainFree(dom); - return event; -} - -static void -remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr) -{ - struct private_data *priv = conn->privateData; - virDomainEventPtr event; - - event = remoteDomainReadEvent(conn, xdr); - if (!event) - return; - - DEBUG0("Calling domain event callbacks (no queue)"); - virDomainEventDispatch(event, priv->callbackList, - virDomainEventDispatchDefaultFunc, NULL); - virDomainEventFree(event); -} - -static void -remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr) -{ - struct private_data *priv = conn->privateData; - virDomainEventPtr event; - - event = remoteDomainReadEvent(conn, xdr); - if (!event) - return; - - if (virDomainEventQueuePush(priv->domainEvents, - event) < 0) - DEBUG0("Error adding event to queue"); - - virDomainEventFree(event); -} - -/** remoteDomainEventFired: - * - * The callback for monitoring the remote socket - * for event data - */ -void -remoteDomainEventFired(int watch, - int fd, - int event, - void *opaque) -{ - char buffer[REMOTE_MESSAGE_MAX]; - char buffer2[4]; - struct remote_message_header hdr; - XDR xdr; - int len; - - virConnectPtr conn = opaque; - struct private_data *priv = conn->privateData; - - remoteDriverLock(priv); - - DEBUG("Event fired %d %d %d %X", watch, fd, event, event); - - if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) { - DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or " - "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__); - virEventRemoveHandle(watch); - goto done; - } - - if (fd != priv->sock) { - virEventRemoveHandle(watch); - goto done; - } - - /* Read and deserialise length word. 
*/ - if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1) - goto done; - - xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE); - if (!xdr_int (&xdr, &len)) { - error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)")); - goto done; - } - xdr_destroy (&xdr); - - /* Length includes length word - adjust to real length to read. */ - len -= 4; - - if (len < 0 || len > REMOTE_MESSAGE_MAX) { - error (conn, VIR_ERR_RPC, _("packet received from server too large")); - goto done; - } - - /* Read reply header and what follows (either a ret or an error). */ - if (really_read (conn, priv, 0, buffer, len) == -1) { - error (conn, VIR_ERR_RPC, _("error reading buffer from memory")); - goto done; - } - - /* Deserialise reply header. */ - xdrmem_create (&xdr, buffer, len, XDR_DECODE); - if (!xdr_remote_message_header (&xdr, &hdr)) { - error (conn, VIR_ERR_RPC, _("invalid header in event firing")); - goto done; - } - - if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && - hdr.direction == REMOTE_MESSAGE) { - DEBUG0("Encountered an async event"); - remoteDomainProcessEvent(conn, &xdr); - } else { - DEBUG0("invalid proc in event firing"); - error (conn, VIR_ERR_RPC, _("invalid proc in event firing")); - } - -done: - remoteDriverUnlock(priv); -} - -void -remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque) -{ - virConnectPtr conn = opaque; - struct private_data *priv = conn->privateData; - - remoteDriverLock(priv); - - virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList, - virDomainEventDispatchDefaultFunc, NULL); - virEventUpdateTimeout(priv->eventFlushTimer, -1); - - remoteDriverUnlock(priv); -} diff --git a/src/util.c b/src/util.c --- a/src/util.c +++ b/src/util.c @@ -167,7 +167,7 @@ static int virSetCloseExec(int fd) { return 0; } -static int virSetNonBlock(int fd) { +int virSetNonBlock(int fd) { int flags; if ((flags = fcntl(fd, F_GETFL)) < 0) return -1; diff --git a/src/util.h b/src/util.h --- a/src/util.h +++ b/src/util.h @@ -36,6 +36,8 
@@ enum { VIR_EXEC_NONBLOCK = (1 << 0), VIR_EXEC_DAEMON = (1 << 1), }; + +int virSetNonBlock(int fd); int virExec(virConnectPtr conn, const char *const*argv, diff --git a/src/virsh.c b/src/virsh.c --- a/src/virsh.c +++ b/src/virsh.c @@ -6572,12 +6572,27 @@ _vshStrdup(vshControl *ctl, const char * return NULL; } +static +void *evil(void *data) +{ + virConnectPtr conn = data; + + while (1) { + virNodeInfo ni; + virNodeGetInfo(conn, &ni); + fprintf(stderr, "BG %d %d %d %lu\n", ni.cores, ni.threads, ni.nodes, ni.memory); + sleep(2); + } +} + /* * Initialize connection. */ static int vshInit(vshControl *ctl) { + pthread_t t; + virConnectPtr cloned; if (ctl->conn) return FALSE; @@ -6590,6 +6605,9 @@ vshInit(vshControl *ctl) virConnectAuthPtrDefault, ctl->readonly ? VIR_CONNECT_RO : 0); + + cloned = virConnectClone(ctl->conn); + pthread_create(&t, NULL, evil, cloned); /* This is not necessarily fatal. All the individual commands check * vshConnectionUsability, except ones which don't need a connection -- |: Red Hat, Engineering, London -o- http://people.redhat.com/berrange/ :| |: http://libvirt.org -o- http://virt-manager.org -o- http://ovirt.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :|
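For anyone who wants to see the buck-passing dance in isolation, here is a minimal sketch in plain pthreads. It is not the code from this patch, only an illustration under several assumptions: the network is replaced by a fake server that doubles the request, each dispatcher delivers at most one other thread's reply before its own, and call() takes the lock itself instead of requiring the caller to hold it. The names thread_call, conn, have_buck, complete(), demo_conn and run_demo() are all made up for the example. Unlike the patch, the sleeper re-checks its state in a loop, since pthread_cond_wait is allowed to wake up spuriously.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stddef.h>

enum call_mode { MODE_WAIT, MODE_COMPLETE };

/* Per-call state, loosely modelled on struct remote_thread_call. */
struct thread_call {
    enum call_mode mode;
    int have_buck;   /* hypothetical: set when the buck is passed to us */
    int request, reply;
    pthread_cond_t cond;
    struct thread_call *next;
};

struct conn {
    pthread_mutex_t lock;
    struct thread_call *wait_dispatch;  /* head of queue is the dispatcher */
};

/* Pretend the reply for 'c' arrived: the fake server doubles the request. */
static void complete(struct thread_call *c)
{
    c->reply = c->request * 2;
    c->mode = MODE_COMPLETE;
}

/* Blocking call. Unlike the patch, this sketch takes the lock itself. */
static int call(struct conn *conn, int request)
{
    struct thread_call me = { .mode = MODE_WAIT, .request = request };
    pthread_cond_init(&me.cond, NULL);
    pthread_mutex_lock(&conn->lock);
    if (conn->wait_dispatch) {
        /* Someone else holds the buck: join the queue and sleep. */
        struct thread_call *tail = conn->wait_dispatch;
        while (tail->next)
            tail = tail->next;
        tail->next = &me;
        while (me.mode != MODE_COMPLETE && !me.have_buck)
            pthread_cond_wait(&me.cond, &conn->lock);
        if (me.mode == MODE_COMPLETE)
            goto done;          /* reply ready, already unlinked */
    } else {
        conn->wait_dispatch = &me;  /* first to catch the buck */
    }
    assert(conn->wait_dispatch == &me);

    /* Simulated blocking I/O: drop the lock so other threads can queue. */
    pthread_mutex_unlock(&conn->lock);
    sched_yield();
    pthread_mutex_lock(&conn->lock);
    /* Deliver at most one queued neighbour's reply, then our own. */
    if (me.next) {
        struct thread_call *other = me.next;
        me.next = other->next;
        complete(other);
        pthread_cond_signal(&other->cond);
    }
    complete(&me);
    /* Give up the buck, passing it to the first sleeper if there is one. */
    conn->wait_dispatch = me.next;
    if (me.next) {
        me.next->have_buck = 1;
        pthread_cond_signal(&me.next->cond);
    }
done:
    pthread_mutex_unlock(&conn->lock);
    pthread_cond_destroy(&me.cond);
    return me.reply;
}

static struct conn demo_conn = { PTHREAD_MUTEX_INITIALIZER, NULL };

/* Each worker finds its request in *opaque and overwrites it with the reply. */
static void *worker(void *opaque)
{
    int *slot = opaque;
    *slot = call(&demo_conn, *slot);
    return NULL;
}

/* Run 8 concurrent calls; return how many got the right reply. */
static int run_demo(void)
{
    int slots[8], ok = 0;
    pthread_t tids[8];
    for (int i = 0; i < 8; i++) {
        slots[i] = i;
        pthread_create(&tids[i], NULL, worker, &slots[i]);
    }
    for (int i = 0; i < 8; i++) {
        pthread_join(tids[i], NULL);
        ok += (slots[i] == 2 * i);
    }
    return ok;
}
```

Built with -pthread and called from a main(), run_demo() should report that all eight calls got their own reply back, whichever thread happened to be holding the buck when each one completed. A sleeper leaves call() either because its reply was delivered for it or because it inherited the dispatcher role, which are the two wake-up reasons described in the patch.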