[PATCH 07/12] Domain Events - remote driver Deliver local callbacks in response to remote events remote_internal.c | 276 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 271 insertions(+), 5 deletions(-)
diff --git a/src/remote_internal.c b/src/remote_internal.c index 35b7b4b..8875674 100644 --- a/src/remote_internal.c +++ b/src/remote_internal.c @@ -34,6 +34,7 @@ #include <signal.h> #include <sys/types.h> #include <sys/stat.h> +#include <sys/poll.h> #include <fcntl.h> #ifdef HAVE_SYS_WAIT_H @@ -73,6 +74,7 @@ #include "remote_protocol.h" #include "memory.h" #include "util.h" +#include "event.h" /* Per-connection private data. */ #define MAGIC 999 /* private_data->magic if OK */ @@ -97,6 +99,13 @@ struct private_data { unsigned int saslDecodedLength; unsigned int saslDecodedOffset; #endif + /* The list of domain event callbacks */ + virDomainEventCallbackListPtr callbackList; + /* The queue of domain events generated + during a call / response rpc */ + virDomainEventQueuePtr domainEvents; + /* Timer for flushing domainEvents queue */ + int eventFlushTimer; }; #define GET_PRIVATE(conn,retcode) \ @@ -156,7 +165,10 @@ static void make_nonnull_domain (remote_nonnull_domain *dom_dst, virDomainPtr do static void make_nonnull_network (remote_nonnull_network *net_dst, virNetworkPtr net_src); static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src); static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src); - +void remoteDomainEventFired(int fd, virEventHandleType event, void *data); +static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr); +static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr); +void remoteDomainEventQueueFlush(int timer, void *opaque); /*----------------------------------------------------------------------*/ /* Helper functions for remoteOpen. 
*/ @@ -680,6 +692,36 @@ doRemoteOpen (virConnectPtr conn, (xdrproc_t) xdr_void, (char *) NULL) == -1) goto failed; + if(VIR_ALLOC(priv->callbackList)<0) { + error(conn, VIR_ERR_INVALID_ARG, _("Error allocating callbacks list")); + goto failed; + } + + if(VIR_ALLOC(priv->domainEvents)<0) { + error(conn, VIR_ERR_INVALID_ARG, _("Error allocating domainEvents")); + goto failed; + } + + DEBUG0("Adding Handler for remote events"); + /* Set up a callback to listen on the socket data */ + if (virEventAddHandle(priv->sock, + VIR_EVENT_HANDLE_READABLE | + VIR_EVENT_HANDLE_ERROR | + VIR_EVENT_HANDLE_HANGUP, + remoteDomainEventFired, + conn) < 0) { + DEBUG0("virEventAddHandle failed: No addHandleImpl defined." + " continuing without events."); + } else { + + DEBUG0("Adding Timeout for remote event queue flushing"); + if ( (priv->eventFlushTimer = virEventAddTimeout(-1, + remoteDomainEventQueueFlush, + conn)) < 0) { + DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. " + "continuing without events."); + } + } /* Successful. */ retcode = VIR_DRV_OPEN_SUCCESS; @@ -1101,6 +1143,11 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv) (xdrproc_t) xdr_void, (char *) NULL) == -1) return -1; + /* Remove handle for remote events */ + virEventRemoveHandle(priv->sock); + /* Remove timeout */ + virEventRemoveTimeout(priv->eventFlushTimer); + /* Close socket. */ if (priv->uses_tls && priv->session) { gnutls_bye (priv->session, GNUTLS_SHUT_RDWR); @@ -1132,6 +1179,12 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv) /* Free private data.
*/ priv->magic = DEAD; + /* Free callback list */ + virDomainEventCallbackListFree(priv->callbackList); + + /* Free queued events */ + virDomainEventQueueFree(priv->domainEvents); + return 0; } @@ -4288,6 +4341,52 @@ remoteAuthPolkit (virConnectPtr conn, struct private_data *priv, int in_open, return 0; } #endif /* HAVE_POLKIT */ +/*----------------------------------------------------------------------*/ + +static int remoteDomainEventRegister (virConnectPtr conn, + void *callback ATTRIBUTE_UNUSED, + void *opaque ATTRIBUTE_UNUSED) +{ + struct private_data *priv = conn->privateData; + + if (virDomainEventCallbackListAdd(conn, priv->callbackList, + callback, opaque) < 0) { + error (conn, VIR_ERR_RPC, _("adding cb to list")); + return -1; + } + + if ( priv->callbackList->count == 1 ) { + /* Tell the server when we are the first callback registering */ + if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_REGISTER, + (xdrproc_t) xdr_void, (char *) NULL, + (xdrproc_t) xdr_void, (char *) NULL) == -1) + return -1; + } + + return 0; +} + +static int remoteDomainEventDeregister (virConnectPtr conn, + void *callback ATTRIBUTE_UNUSED) +{ + struct private_data *priv = conn->privateData; + + if (virDomainEventCallbackListRemove(conn, priv->callbackList, + callback) < 0) { + error (conn, VIR_ERR_RPC, _("removing cb fron list")); + return -1; + } + + if ( priv->callbackList->count == 0 ) { + /* Tell the server when we are the last callback deregistering */ + if (call (conn, priv, 0, REMOTE_PROC_DOMAIN_EVENTS_DEREGISTER, + (xdrproc_t) xdr_void, (char *) NULL, + (xdrproc_t) xdr_void, (char *) NULL) == -1) + return -1; + } + + return 0; +} /*----------------------------------------------------------------------*/ @@ -4367,6 +4466,7 @@ call (virConnectPtr conn, struct private_data *priv, really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1) return -1; +retry_read: /* Read and deserialise length word.
*/ if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1) return -1; @@ -4418,10 +4518,20 @@ call (virConnectPtr conn, struct private_data *priv, return -1; } - /* If we extend the server to actually send asynchronous messages, then - * we'll need to change this so that it can recognise an asynch - * message being received at this point. - */ + if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && + hdr.direction == REMOTE_MESSAGE) { + /* An async message has come in while we were waiting for the + * response. Process it to pull it off the wire, and try again + */ + DEBUG0("Encountered an event while waiting for a response"); + + remoteDomainQueueEvent(conn, &xdr); + virEventUpdateTimeout(priv->eventFlushTimer, 0); + + DEBUG0("Retrying read"); + xdr_destroy (&xdr); + goto retry_read; + } if (hdr.proc != proc_nr) { __virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE, @@ -4872,6 +4982,8 @@ static virDriver driver = { .domainMemoryPeek = remoteDomainMemoryPeek, .nodeGetCellsFreeMemory = remoteNodeGetCellsFreeMemory, .getFreeMemory = remoteNodeGetFreeMemory, + .domainEventRegister = remoteDomainEventRegister, + .domainEventDeregister = remoteDomainEventDeregister, }; static virNetworkDriver network_driver = { @@ -4957,3 +5069,157 @@ remoteRegister (void) return 0; } + +/** + * remoteDomainReadEvent + * + * Read the event data off the wire + */ +static int +remoteDomainReadEvent(virConnectPtr conn, XDR *xdr, + virDomainPtr *dom, int *event) +{ + remote_domain_event_ret ret; + memset (&ret, 0, sizeof ret); + + /* unmarshall parameters, and process it*/ + if (! 
xdr_remote_domain_event_ret(xdr, &ret) ) { + error (conn, VIR_ERR_RPC, + _("remoteDomainProcessEvent: unmarshalling ret")); + return -1; + } + + *dom = get_nonnull_domain(conn,ret.dom); + *event = ret.event; + + return 0; +} + +static void +remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr) +{ + virDomainPtr dom; + int event,i; + struct private_data *priv = conn->privateData; + + if(!remoteDomainReadEvent(conn, xdr, &dom, &event)) { + DEBUG0("Calling domain event callbacks (no queue)"); + for(i=0 ; i < priv->callbackList->count ; i++) { + if( priv->callbackList->callbacks[i] ) + priv->callbackList->callbacks[i]->cb(conn, dom, event, + priv->callbackList->callbacks[i]->opaque); + } + } +} + +static void +remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr) +{ + virDomainPtr dom; + int event; + struct private_data *priv = conn->privateData; + + if(!remoteDomainReadEvent(conn, xdr, &dom, &event)) + { + if( virDomainEventCallbackQueuePush(priv->domainEvents, + dom, event) < 0 ) { + DEBUG("%s", "Error adding event to queue"); + } + } +} + +/** remoteDomainEventFired: + * + * The callback for monitoring the remote socket + * for event data + */ +void +remoteDomainEventFired(int fd ATTRIBUTE_UNUSED, + virEventHandleType event, + void *opaque) +{ + char buffer[REMOTE_MESSAGE_MAX]; + char buffer2[4]; + struct remote_message_header hdr; + XDR xdr; + int len; + + virConnectPtr conn = opaque; + struct private_data *priv = conn->privateData; + + DEBUG("%s : Event fired %d %X", __FUNCTION__, event, event); + + if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) { + DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or " + "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__); + virEventRemoveHandle(fd); + return; + } + + /* Read and deserialise length word. 
*/ + if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1) + return; + + xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE); + if (!xdr_int (&xdr, &len)) { + error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)")); + return; + } + xdr_destroy (&xdr); + + /* Length includes length word - adjust to real length to read. */ + len -= 4; + + if (len < 0 || len > REMOTE_MESSAGE_MAX) { + error (conn, VIR_ERR_RPC, _("packet received from server too large")); + return; + } + + /* Read reply header and what follows (either a ret or an error). */ + if (really_read (conn, priv, 0, buffer, len) == -1) { + error (conn, VIR_ERR_RPC, _("error reading buffer from memory")); + return; + } + + /* Deserialise reply header. */ + xdrmem_create (&xdr, buffer, len, XDR_DECODE); + if (!xdr_remote_message_header (&xdr, &hdr)) { + error (conn, VIR_ERR_RPC, _("invalid header in event firing")); + return; + } + + if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && + hdr.direction == REMOTE_MESSAGE) { + DEBUG0("Encountered an async event"); + remoteDomainProcessEvent(conn, &xdr); + } else { + DEBUG0("invalid proc in event firing"); + error (conn, VIR_ERR_RPC, _("invalid proc in event firing")); + } +} + +void +remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque) +{ + int i; + virDomainEventPtr domEvent; + void *user_data = NULL; + virConnectPtr conn = opaque; + struct private_data *priv = conn->privateData; + + while( (domEvent = virDomainEventCallbackQueuePop(priv->domainEvents)) ) { + DEBUG(" Flushing %p", domEvent); + for (i=0 ; i < priv->callbackList->count ; i++) { + if( priv->callbackList->callbacks[i] ) { + user_data = priv->callbackList->callbacks[i]->opaque; + priv->callbackList->callbacks[i]->cb(domEvent->dom->conn, + domEvent->dom, + domEvent->event, + user_data); + } + } + VIR_FREE(domEvent); + } + + virEventUpdateTimeout(priv->eventFlushTimer, -1); +}
-- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list