On Fri, Oct 17, 2008 at 12:02:13PM -0400, Ben Guthro wrote: > Deliver local callbacks in response to remote events > > remote_internal.c | 255 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- > 1 file changed, 248 insertions(+), 7 deletions(-) > @@ -680,6 +689,26 @@ doRemoteOpen (virConnectPtr conn, > (xdrproc_t) xdr_void, (char *) NULL) == -1) > goto failed; > > + if(VIR_ALLOC(priv->domainEvents)<0) { > + error(conn, VIR_ERR_INVALID_ARG, _("Error allocating domainEvents")); > + goto failed; > + } > + > + DEBUG0("Adding Handler for remote events"); > + /* Set up a callback to listen on the socket data */ > + if (virEventAddHandle(priv->sock, > + POLLIN | POLLERR | POLLHUP, > + remoteDomainEventFired, > + conn) < 0) { > + DEBUG0("virEventAddHandle failed: No addHandleImpl defined. continuing without events."); > + } > + > + DEBUG0("Adding Timeout for remote event queue flushing"); > + if ( (priv->eventFlushTimer = virEventAddTimeout(0, > + remoteDomainEventQueueFlush, > + conn)) < 0) { Small bug here - this creates an active timer event, which will fire immediately & forever. Simply change the '0' to a '-1' to register a timeout that's initially disabled, and then use virEventUpdateTimeout to toggle it on/off only when required. > + > +static int remoteDomainEventRegister (virConnectPtr conn, > + void *callback, > + void *opaque) > +{ > + struct private_data *priv = conn->privateData; > + > + /* dispatch an rpc - so the server sde can track > + how many callbacks are regstered */ > + remote_domain_events_register_args args; > + args.callback = (unsigned long)callback; > + args.user_data = (unsigned long)opaque; This relates back to my comment on the remote_protocol.x file - I feel we should probably maintain a generic token, rather than passing the actual pointers over the wire as ints. 
> /*----------------------------------------------------------------------*/ > > @@ -4367,6 +4444,7 @@ call (virConnectPtr conn, struct private_data *priv, > really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1) > return -1; > > +retry_read: > /* Read and deserialise length word. */ > if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1) > return -1; > @@ -4418,10 +4496,19 @@ call (virConnectPtr conn, struct private_data *priv, > return -1; > } > > - /* If we extend the server to actually send asynchronous messages, then > - * we'll need to change this so that it can recognise an asynch > - * message being received at this point. > - */ > + if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && > + hdr.direction == REMOTE_MESSAGE) { > + /* An async message has come in while we were waiting for the > + * response. Process it to pull it off the wire, and try again > + */ > + DEBUG0("Encountered an event while waiting for a response"); > + > + remoteDomainQueueEvent(conn, &xdr); Need to call virEventUpdateTimeout() to enable the timer here. > +/** > + * remoteDomainReadEvent > + * > + * Read the event data off the wire > + */ > +static int remoteDomainReadEvent(virConnectPtr conn, XDR *xdr, > + virDomainPtr *dom, int *event, > + virConnectDomainEventCallback *cb, > + void **opaque) > +{ > + remote_domain_event_ret ret; > + memset (&ret, 0, sizeof ret); > + > + /* unmarshall parameters, and process it*/ > + if (! 
xdr_remote_domain_event_ret(xdr, &ret) ) { > + error (conn, VIR_ERR_RPC, _("remoteDomainProcessEvent: unmarshalling ret")); > + return -1; > + } > + > + *dom = get_nonnull_domain(conn,ret.dom); > + *event = ret.event; > + *cb = (virConnectDomainEventCallback)ret.callback; > + *opaque = (void *)ret.user_data; > + > + return 0; > +} > + > +static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr) > +{ > + virDomainPtr dom; > + int event; > + virConnectDomainEventCallback cb; > + void *opaque; > + if(!remoteDomainReadEvent(conn, xdr, &dom, &event, &cb, &opaque)) { > + DEBUG0("Calling domain event callback (no queue)"); > + if(cb) > + cb(conn,dom,event,opaque); Needs to call virDomainFree(dom) to release the object. > + } > +} > + > +static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr) > +{ > + virDomainPtr dom; > + int event; > + virConnectDomainEventCallback cb; > + void *opaque; > + struct private_data *priv = conn->privateData; > + > + if(!remoteDomainReadEvent(conn, xdr, &dom, &event, &cb, &opaque)) > + { > + if( virDomainEventCallbackQueuePush(priv->domainEvents, dom, event, cb, opaque) < 0 ) { > + DEBUG("%s", "Error adding event to queue"); > + } > + } > +} > + > +/** remoteDomainEventFired: > + * > + * The callback for monitoring the remote socket > + * for event data > + */ > +void remoteDomainEventFired(int fd ATTRIBUTE_UNUSED, > + int event ATTRIBUTE_UNUSED, > + void *opaque) > +{ > + char buffer[REMOTE_MESSAGE_MAX]; > + char buffer2[4]; > + struct remote_message_header hdr; > + XDR xdr; > + int len; > + > + virConnectPtr conn = opaque; > + struct private_data *priv = conn->privateData; > + > + DEBUG("%s : Event fired", __FUNCTION__); > + > + /* Read and deserialise length word. 
*/ > + if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1) > + return; > + > + xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE); > + if (!xdr_int (&xdr, &len)) { > + error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)")); > + return; > + } > + xdr_destroy (&xdr); > + > + /* Length includes length word - adjust to real length to read. */ > + len -= 4; > + > + if (len < 0 || len > REMOTE_MESSAGE_MAX) { > + error (conn, VIR_ERR_RPC, _("packet received from server too large")); > + return; > + } > + > + /* Read reply header and what follows (either a ret or an error). */ > + if (really_read (conn, priv, 0, buffer, len) == -1) { > + error (conn, VIR_ERR_RPC, _("error reading buffer from memory")); > + return; > + } > + > + /* Deserialise reply header. */ > + xdrmem_create (&xdr, buffer, len, XDR_DECODE); > + if (!xdr_remote_message_header (&xdr, &hdr)) { > + error (conn, VIR_ERR_RPC, _("invalid header in event firing")); > + return; > + } > + > + if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && > + hdr.direction == REMOTE_MESSAGE) { > + DEBUG0("Encountered an async event"); > + remoteDomainProcessEvent(conn, &xdr); > + } else { > + DEBUG0("invalid proc in event firing"); > + error (conn, VIR_ERR_RPC, _("invalid proc in event firing")); > + } > +} > + > +void remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, > + void *opaque) > +{ > + virDomainEventPtr domEvent; > + virConnectPtr conn = opaque; > + struct private_data *priv = conn->privateData; > + > + DEBUG0("Flushing domain events"); > + while( (domEvent = virDomainEventCallbackQueuePop(priv->domainEvents)) ) { > + DEBUG(" Flushing %p", domEvent); > + if(domEvent->cb) > + domEvent->cb(domEvent->dom->conn, > + domEvent->dom, > + domEvent->event, > + domEvent->opaque); Needs to also call virDomainFree(domEvent->dom) to release the object. > + VIR_FREE(domEvent); > + } And virEventUpdateTimeout to disable the timer again. 
> +} Here's a small additive patch which takes care of the timer issue diff -r 99dad81d37dd src/remote_internal.c --- a/src/remote_internal.c Sun Oct 19 13:46:36 2008 -0400 +++ b/src/remote_internal.c Sun Oct 19 14:06:33 2008 -0400 @@ -704,7 +704,7 @@ } DEBUG0("Adding Timeout for remote event queue flushing"); - if ( (priv->eventFlushTimer = virEventAddTimeout(0, + if ( (priv->eventFlushTimer = virEventAddTimeout(-1, remoteDomainEventQueueFlush, conn)) < 0) { DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. continuing without events."); @@ -4504,6 +4504,7 @@ DEBUG0("Encountered an event while waiting for a response"); remoteDomainQueueEvent(conn, &xdr); + virEventUpdateTimeout(priv->eventFlushTimer, 0); DEBUG0("Retrying read"); xdr_destroy (&xdr); @@ -5182,4 +5183,6 @@ domEvent->opaque); VIR_FREE(domEvent); } -} + + virEventUpdateTimeout(priv->eventFlushTimer, -1); +} Regards, Daniel -- |: Red Hat, Engineering, London -o- http://people.redhat.com/berrange/ :| |: http://libvirt.org -o- http://virt-manager.org -o- http://ovirt.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: GnuPG: 7D3B9505 -o- F3C9 553F A1DA 4AC2 5648 23C1 B3DF F742 7D3B 9505 :| -- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list