The current remote driver code for streams only supports blocking I/O mode. This is fine for the usage with migration but is a problem for more general use cases, in particular bi-directional streams. This adds support for the stream callbacks and non-blocking I/O, with the minor caveat that it doesn't actually do non-blocking I/O for sending stream data, only receiving it. A future patch will try to do non-blocking sends, but this is quite tricky to get right. * src/remote/remote_driver.c: Allow non-blocking I/O for streams and support callbacks --- src/remote/remote_driver.c | 188 ++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 172 insertions(+), 16 deletions(-) diff --git a/src/remote/remote_driver.c b/src/remote/remote_driver.c index 1c874b2..61da8ff 100644 --- a/src/remote/remote_driver.c +++ b/src/remote/remote_driver.c @@ -132,6 +132,13 @@ struct private_stream_data { unsigned int serial; unsigned int proc_nr; + virStreamEventCallback cb; + void *cbOpaque; + virFreeCallback cbFree; + int cbEvents; + int cbTimer; + int cbDispatch; + /* XXX this is potentially unbounded if the client * app has domain events registered, since packets * may be read off wire, while app isn't ready to @@ -200,9 +207,10 @@ struct private_data { }; enum { - REMOTE_CALL_IN_OPEN = (1 << 0), + REMOTE_CALL_IN_OPEN = (1 << 0), REMOTE_CALL_QUIET_MISSING_RPC = (1 << 1), - REMOTE_QEMU_CALL = (1 << 2), + REMOTE_CALL_QEMU = (1 << 2), + REMOTE_CALL_NONBLOCK = (1 << 3), }; @@ -8144,6 +8152,20 @@ remoteStreamOpen(virStreamPtr st, } +static void +remoteStreamEventTimerUpdate(struct private_stream_data *privst) +{ + if (!privst->cb) + return; + + if (!privst->cbEvents) + virEventUpdateTimeout(privst->cbTimer, -1); + else if (privst->incoming && + (privst->cbEvents & VIR_STREAM_EVENT_READABLE)) + virEventUpdateTimeout(privst->cbTimer, 0); +} + + static int remoteStreamPacket(virStreamPtr st, int status, @@ -8338,6 +8360,12 @@ remoteStreamRecv(virStreamPtr st, struct 
remote_thread_call *thiscall; int ret; + if (st->flags & VIR_STREAM_NONBLOCK) { + DEBUG0("Non-blocking mode and no data available"); + rv = -2; + goto cleanup; + } + if (VIR_ALLOC(thiscall) < 0) { virReportOOMError(); goto cleanup; @@ -8381,6 +8409,8 @@ remoteStreamRecv(virStreamPtr st, rv = 0; } + remoteStreamEventTimerUpdate(privst); + DEBUG("Done %d", rv); cleanup: @@ -8391,28 +8421,153 @@ cleanup: return rv; } + +static void +remoteStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) +{ + virStreamPtr st = opaque; + struct private_data *priv = st->conn->privateData; + struct private_stream_data *privst = st->privateData; + + remoteDriverLock(priv); + if (privst->cb && + (privst->cbEvents & VIR_STREAM_EVENT_READABLE) && + privst->incomingOffset) { + virStreamEventCallback cb = privst->cb; + void *cbOpaque = privst->cbOpaque; + virFreeCallback cbFree = privst->cbFree; + + privst->cbDispatch = 1; + remoteDriverUnlock(priv); + (cb)(st, VIR_STREAM_EVENT_READABLE, cbOpaque); + remoteDriverLock(priv); + privst->cbDispatch = 0; + + if (!privst->cb && cbFree) + (cbFree)(cbOpaque); + } + remoteDriverUnlock(priv); +} + + +static void +remoteStreamEventTimerFree(void *opaque) +{ + virStreamPtr st = opaque; + virUnrefStream(st); +} + + static int -remoteStreamEventAddCallback(virStreamPtr stream ATTRIBUTE_UNUSED, - int events ATTRIBUTE_UNUSED, - virStreamEventCallback cb ATTRIBUTE_UNUSED, - void *opaque ATTRIBUTE_UNUSED, - virFreeCallback ff ATTRIBUTE_UNUSED) +remoteStreamEventAddCallback(virStreamPtr st, + int events, + virStreamEventCallback cb, + void *opaque, + virFreeCallback ff) { - return -1; + struct private_data *priv = st->conn->privateData; + struct private_stream_data *privst = st->privateData; + int ret = -1; + + remoteDriverLock(priv); + + if (events & ~VIR_STREAM_EVENT_READABLE) { + remoteError(VIR_ERR_INTERNAL_ERROR, + _("unsupported stream events %d"), events); + goto cleanup; + } + + if (privst->cb) { + remoteError(VIR_ERR_INTERNAL_ERROR, + 
_("multiple stream callbacks not supported")); + goto cleanup; + } + + virStreamRef(st); + if ((privst->cbTimer = + virEventAddTimeout(-1, + remoteStreamEventTimer, + st, + remoteStreamEventTimerFree)) < 0) { + virUnrefStream(st); + goto cleanup; + } + + privst->cb = cb; + privst->cbOpaque = opaque; + privst->cbFree = ff; + privst->cbEvents = events; + + ret = 0; + +cleanup: + remoteDriverUnlock(priv); + return ret; } static int -remoteStreamEventUpdateCallback(virStreamPtr stream ATTRIBUTE_UNUSED, - int events ATTRIBUTE_UNUSED) +remoteStreamEventUpdateCallback(virStreamPtr st, + int events) { - return -1; + struct private_data *priv = st->conn->privateData; + struct private_stream_data *privst = st->privateData; + int ret = -1; + + remoteDriverLock(priv); + + if (events & ~VIR_STREAM_EVENT_READABLE) { + remoteError(VIR_ERR_INTERNAL_ERROR, + _("unsupported stream events %d"), events); + goto cleanup; + } + + if (!privst->cb) { + remoteError(VIR_ERR_INTERNAL_ERROR, + _("no stream callback registered")); + goto cleanup; + } + + privst->cbEvents = events; + + remoteStreamEventTimerUpdate(privst); + + ret = 0; + +cleanup: + remoteDriverUnlock(priv); + return ret; } static int -remoteStreamEventRemoveCallback(virStreamPtr stream ATTRIBUTE_UNUSED) +remoteStreamEventRemoveCallback(virStreamPtr st) { - return -1; + struct private_data *priv = st->conn->privateData; + struct private_stream_data *privst = st->privateData; + int ret = -1; + + remoteDriverLock(priv); + + if (!privst->cb) { + remoteError(VIR_ERR_INTERNAL_ERROR, + _("no stream callback registered")); + goto cleanup; + } + + if (!privst->cbDispatch && + privst->cbFree) + (privst->cbFree)(privst->cbOpaque); + privst->cb = NULL; + privst->cbOpaque = NULL; + privst->cbFree = NULL; + privst->cbEvents = 0; + virEventRemoveTimeout(privst->cbTimer); + + ret = 0; + +cleanup: + remoteDriverUnlock(priv); + return ret; } static int @@ -9065,7 +9220,7 @@ remoteQemuDomainMonitorCommand (virDomainPtr domain, const char *cmd, 
args.flags = flags; memset (&ret, 0, sizeof ret); - if (call (domain->conn, priv, REMOTE_QEMU_CALL, QEMU_PROC_MONITOR_COMMAND, + if (call (domain->conn, priv, REMOTE_CALL_QEMU, QEMU_PROC_MONITOR_COMMAND, (xdrproc_t) xdr_qemu_monitor_command_args, (char *) &args, (xdrproc_t) xdr_qemu_monitor_command_ret, (char *) &ret) == -1) goto done; @@ -9119,7 +9274,7 @@ prepareCall(struct private_data *priv, rv->ret = ret; rv->want_reply = 1; - if (flags & REMOTE_QEMU_CALL) { + if (flags & REMOTE_CALL_QEMU) { hdr.prog = QEMU_PROGRAM; hdr.vers = QEMU_PROTOCOL_VERSION; } @@ -9512,7 +9667,7 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv, expectedprog = REMOTE_PROGRAM; expectedvers = REMOTE_PROTOCOL_VERSION; - if (flags & REMOTE_QEMU_CALL) { + if (flags & REMOTE_CALL_QEMU) { expectedprog = QEMU_PROGRAM; expectedvers = QEMU_PROTOCOL_VERSION; } @@ -9738,6 +9893,7 @@ processCallDispatchStream(virConnectPtr conn ATTRIBUTE_UNUSED, thecall->mode = REMOTE_MODE_COMPLETE; } else { VIR_WARN("Got aysnc data packet offset=%d", privst->incomingOffset); + remoteStreamEventTimerUpdate(privst); } return 0; } -- 1.7.2.3 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list