* include/libvirt/libvirt.h, include/libvirt/libvirt.h.in: Public API contract for virStreamPtr object * src/libvirt_public.syms: Export data stream APIs * src/libvirt_private.syms: Export internal helper APIs * src/libvirt.c: Data stream API driver dispatch * src/datatypes.h, src/datatypes.c: Internal helpers for virStreamPtr object * src/driver.h: Define internal driver API for streams * .x-sc_avoid_write: Ignore src/libvirt.c because it trips up on comments including write() --- .x-sc_avoid_write | 1 + include/libvirt/libvirt.h | 93 ++++++ include/libvirt/libvirt.h.in | 93 ++++++ src/datatypes.c | 59 ++++ src/datatypes.h | 33 ++ src/driver.h | 34 ++ src/libvirt.c | 683 ++++++++++++++++++++++++++++++++++++++++++ src/libvirt_private.syms | 2 + src/libvirt_public.syms | 15 + 9 files changed, 1013 insertions(+), 0 deletions(-) diff --git a/.x-sc_avoid_write b/.x-sc_avoid_write index 8ed87c5..c5a7535 100644 --- a/.x-sc_avoid_write +++ b/.x-sc_avoid_write @@ -1,5 +1,6 @@ ^src/util\.c$ ^src/xend_internal\.c$ ^src/util-lib\.c$ +^src/libvirt\.c$ ^qemud/qemud.c$ ^gnulib/ diff --git a/include/libvirt/libvirt.h b/include/libvirt/libvirt.h index 855f755..5dcecfd 100644 --- a/include/libvirt/libvirt.h +++ b/include/libvirt/libvirt.h @@ -110,6 +110,24 @@ typedef enum { VIR_DOMAIN_NONE = 0 } virDomainCreateFlags; + + +/** + * virStream: + * + * a virStream is a private structure representing a data stream. + */ +typedef struct _virStream virStream; + +/** + * virStreamPtr: + * + * a virStreamPtr is pointer to a virStream private structure, this is the + * type used to reference a data stream in the API. 
+ */ +typedef virStream *virStreamPtr; + + /** * VIR_SECURITY_LABEL_BUFLEN: * @@ -1448,6 +1466,81 @@ void virEventRegisterImpl(virEventAddHandleFunc addHandle, virEventAddTimeoutFunc addTimeout, virEventUpdateTimeoutFunc updateTimeout, virEventRemoveTimeoutFunc removeTimeout); + +enum { + VIR_STREAM_NONBLOCK = (1 << 0), +}; + +virStreamPtr virStreamNew(virConnectPtr conn, + unsigned int flags); +int virStreamRef(virStreamPtr st); + +int virStreamSend(virStreamPtr st, + const char *data, + size_t nbytes); + +int virStreamRecv(virStreamPtr st, + char *data, + size_t nbytes); + + +typedef int (*virStreamSourceFunc)(virStreamPtr st, + char *data, + size_t nbytes, + void *opaque); + +int virStreamSendAll(virStreamPtr st, + virStreamSourceFunc handler, + void *opaque); + +typedef int (*virStreamSinkFunc)(virStreamPtr st, + const char *data, + size_t nbytes, + void *opaque); + +int virStreamRecvAll(virStreamPtr st, + virStreamSinkFunc handler, + void *opaque); + +typedef enum { + VIR_STREAM_EVENT_READABLE = (1 << 0), + VIR_STREAM_EVENT_WRITABLE = (1 << 1), + VIR_STREAM_EVENT_ERROR = (1 << 2), + VIR_STREAM_EVENT_HANGUP = (1 << 3), +} virStreamEventType; + + +/** + * virStreamEventCallback: + * + * @stream: stream on which the event occurred + * @events: bitset of events from virStreamEventType constants + * @opaque: user data registered with handle + * + * Callback for receiving stream events. The callback will + * be invoked once for each event which is pending. 
+ */ +typedef void (*virStreamEventCallback)(virStreamPtr stream, int events, void *opaque); + +int virStreamEventAddCallback(virStreamPtr stream, + int events, + virStreamEventCallback cb, + void *opaque, + virFreeCallback ff); + +int virStreamEventUpdateCallback(virStreamPtr stream, + int events); + +int virStreamEventRemoveCallback(virStreamPtr stream); + + +int virStreamFinish(virStreamPtr st); +int virStreamAbort(virStreamPtr st); + +int virStreamFree(virStreamPtr st); + + + #ifdef __cplusplus } #endif diff --git a/include/libvirt/libvirt.h.in b/include/libvirt/libvirt.h.in index e6536c7..db091dc 100644 --- a/include/libvirt/libvirt.h.in +++ b/include/libvirt/libvirt.h.in @@ -110,6 +110,24 @@ typedef enum { VIR_DOMAIN_NONE = 0 } virDomainCreateFlags; + + +/** + * virStream: + * + * a virStream is a private structure representing a data stream. + */ +typedef struct _virStream virStream; + +/** + * virStreamPtr: + * + * a virStreamPtr is pointer to a virStream private structure, this is the + * type used to reference a data stream in the API. 
+ */ +typedef virStream *virStreamPtr; + + /** * VIR_SECURITY_LABEL_BUFLEN: * @@ -1448,6 +1466,81 @@ void virEventRegisterImpl(virEventAddHandleFunc addHandle, virEventAddTimeoutFunc addTimeout, virEventUpdateTimeoutFunc updateTimeout, virEventRemoveTimeoutFunc removeTimeout); + +enum { + VIR_STREAM_NONBLOCK = (1 << 0), +}; + +virStreamPtr virStreamNew(virConnectPtr conn, + unsigned int flags); +int virStreamRef(virStreamPtr st); + +int virStreamSend(virStreamPtr st, + const char *data, + size_t nbytes); + +int virStreamRecv(virStreamPtr st, + char *data, + size_t nbytes); + + +typedef int (*virStreamSourceFunc)(virStreamPtr st, + char *data, + size_t nbytes, + void *opaque); + +int virStreamSendAll(virStreamPtr st, + virStreamSourceFunc handler, + void *opaque); + +typedef int (*virStreamSinkFunc)(virStreamPtr st, + const char *data, + size_t nbytes, + void *opaque); + +int virStreamRecvAll(virStreamPtr st, + virStreamSinkFunc handler, + void *opaque); + +typedef enum { + VIR_STREAM_EVENT_READABLE = (1 << 0), + VIR_STREAM_EVENT_WRITABLE = (1 << 1), + VIR_STREAM_EVENT_ERROR = (1 << 2), + VIR_STREAM_EVENT_HANGUP = (1 << 3), +} virStreamEventType; + + +/** + * virStreamEventCallback: + * + * @stream: stream on which the event occurred + * @events: bitset of events from virStreamEventType constants + * @opaque: user data registered with handle + * + * Callback for receiving stream events. The callback will + * be invoked once for each event which is pending. 
+ */ +typedef void (*virStreamEventCallback)(virStreamPtr stream, int events, void *opaque); + +int virStreamEventAddCallback(virStreamPtr stream, + int events, + virStreamEventCallback cb, + void *opaque, + virFreeCallback ff); + +int virStreamEventUpdateCallback(virStreamPtr stream, + int events); + +int virStreamEventRemoveCallback(virStreamPtr stream); + + +int virStreamFinish(virStreamPtr st); +int virStreamAbort(virStreamPtr st); + +int virStreamFree(virStreamPtr st); + + + #ifdef __cplusplus } #endif diff --git a/src/datatypes.c b/src/datatypes.c index d03a679..3611b62 100644 --- a/src/datatypes.c +++ b/src/datatypes.c @@ -1129,3 +1129,62 @@ virUnrefNodeDevice(virNodeDevicePtr dev) { virMutexUnlock(&dev->conn->lock); return (refs); } + + +virStreamPtr virGetStream(virConnectPtr conn) { + virStreamPtr ret = NULL; + + virMutexLock(&conn->lock); + + if (VIR_ALLOC(ret) < 0) { + virReportOOMError(conn); + goto error; + } + ret->magic = VIR_STREAM_MAGIC; + ret->conn = conn; + conn->refs++; + ret->refs++; + virMutexUnlock(&conn->lock); + return(ret); + +error: + virMutexUnlock(&conn->lock); + VIR_FREE(ret); + return(NULL); +} + +static void +virReleaseStream(virStreamPtr st) { + virConnectPtr conn = st->conn; + DEBUG("release dev %p", st); + + st->magic = -1; + VIR_FREE(st); + + DEBUG("unref connection %p %d", conn, conn->refs); + conn->refs--; + if (conn->refs == 0) { + virReleaseConnect(conn); + /* Already unlocked mutex */ + return; + } + + virMutexUnlock(&conn->lock); +} + +int virUnrefStream(virStreamPtr st) { + int refs; + + virMutexLock(&st->conn->lock); + DEBUG("unref stream %p %d", st, st->refs); + st->refs--; + refs = st->refs; + if (refs == 0) { + virReleaseStream(st); + /* Already unlocked mutex */ + return (0); + } + + virMutexUnlock(&st->conn->lock); + return (refs); +} diff --git a/src/datatypes.h b/src/datatypes.h index da83e02..0fed07f 100644 --- a/src/datatypes.h +++ b/src/datatypes.h @@ -100,6 +100,17 @@ /** + * VIR_STREAM_MAGIC: + * + * magic 
value used to protect the API when pointers to stream structures + * are passed down by the users. + */ +#define VIR_STREAM_MAGIC 0x1DEAD666 +#define VIR_IS_STREAM(obj) ((obj) && (obj)->magic==VIR_STREAM_MAGIC) +#define VIR_IS_CONNECTED_STREAM(obj) (VIR_IS_STREAM(obj) && VIR_IS_CONNECT((obj)->conn)) + + +/** * _virConnect: * * Internal structure associated to a connection @@ -234,6 +245,25 @@ struct _virNodeDevice { }; +typedef int (*virStreamAbortFunc)(virStreamPtr, void *opaque); +typedef int (*virStreamFinishFunc)(virStreamPtr, void *opaque); + +/** + * _virStream: + * + * Internal structure associated with an input stream + */ +struct _virStream { + unsigned int magic; + virConnectPtr conn; + int refs; + int flags; + + virStreamDriverPtr driver; + void *privateData; +}; + + /************************************************************************ * * * API for domain/connections (de)allocations and lookups * @@ -270,4 +300,7 @@ virNodeDevicePtr virGetNodeDevice(virConnectPtr conn, const char *name); int virUnrefNodeDevice(virNodeDevicePtr dev); +virStreamPtr virGetStream(virConnectPtr conn); +int virUnrefStream(virStreamPtr st); + #endif diff --git a/src/driver.h b/src/driver.h index 79d46ff..25d34b6 100644 --- a/src/driver.h +++ b/src/driver.h @@ -799,6 +799,40 @@ struct _virDeviceMonitor { virDrvNodeDeviceDestroy deviceDestroy; }; +typedef struct _virStreamDriver virStreamDriver; +typedef virStreamDriver *virStreamDriverPtr; + +typedef int (*virDrvStreamSend)(virStreamPtr st, + const char *data, + size_t nbytes); +typedef int (*virDrvStreamRecv)(virStreamPtr st, + char *data, + size_t nbytes); + +typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream, + int events, + virStreamEventCallback cb, + void *opaque, + virFreeCallback ff); + +typedef int (*virDrvStreamEventUpdateCallback)(virStreamPtr stream, + int events); +typedef int (*virDrvStreamEventRemoveCallback)(virStreamPtr stream); +typedef int (*virDrvStreamFinish)(virStreamPtr st); +typedef int 
(*virDrvStreamAbort)(virStreamPtr st); + + +struct _virStreamDriver { + virDrvStreamSend streamSend; + virDrvStreamRecv streamRecv; + virDrvStreamEventAddCallback streamAddCallback; + virDrvStreamEventUpdateCallback streamUpdateCallback; + virDrvStreamEventRemoveCallback streamRemoveCallback; + virDrvStreamFinish streamFinish; + virDrvStreamAbort streamAbort; +}; + + /* * Registration * TODO: also need ways to (des)activate a given driver diff --git a/src/libvirt.c b/src/libvirt.c index ca8e003..d6536f4 100644 --- a/src/libvirt.c +++ b/src/libvirt.c @@ -559,6 +559,10 @@ virLibNodeDeviceError(virNodeDevicePtr dev, virErrorNumber error, errmsg, info, NULL, 0, 0, errmsg, info); } +#define virLibStreamError(conn, code, fmt...) \ + virReportErrorHelper(conn, VIR_FROM_NONE, code, __FILE__, \ + __FUNCTION__, __LINE__, fmt) + /** * virRegisterNetworkDriver: * @driver: pointer to a network driver block @@ -8626,3 +8630,682 @@ error: virSetConnError(conn); return -1; } + + +/** + * virStreamNew: + * @conn: pointer to the connection + * @flags: control features of the stream + * + * Creates a new stream object which can be used to perform + * streamed I/O with other public API functions. + * + * When no longer needed, a stream object must be released + * with virStreamFree. If a data stream has been used, + * then the application must call virStreamFinish or + * virStreamAbort before freeing it, in order to notify + * the driver of termination. + * + * If a non-blocking data stream is required, pass + * VIR_STREAM_NONBLOCK for flags, otherwise pass 0. 
+ * + * Returns the new stream, or NULL upon error + */ +virStreamPtr +virStreamNew(virConnectPtr conn, + unsigned int flags) +{ + virStreamPtr st; + + DEBUG("conn=%p, flags=%u", conn, flags); + + virResetLastError(); + + if (!VIR_IS_CONNECT(conn)) { + virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__); + return (NULL); + } + + st = virGetStream(conn); + if (st) + st->flags = flags; + + return st; +} + + +/** + * virStreamRef: + * @stream: pointer to the stream + * + * Increment the reference count on the stream. For each + * additional call to this method, there shall be a corresponding + * call to virStreamFree to release the reference count, once + * the caller no longer needs the reference to this object. + * + * Returns 0 in case of success, -1 in case of failure + */ +int +virStreamRef(virStreamPtr stream) +{ + if ((!VIR_IS_CONNECTED_STREAM(stream))) { + virLibConnError(NULL, VIR_ERR_INVALID_ARG, __FUNCTION__); + return(-1); + } + virMutexLock(&stream->conn->lock); + DEBUG("stream=%p refs=%d", stream, stream->refs); + stream->refs++; + virMutexUnlock(&stream->conn->lock); + return 0; +} + + +/** + * virStreamSend: + * @stream: pointer to the stream object + * @data: buffer to write to stream + * @nbytes: size of @data buffer + * + * Write a series of bytes to the stream. This method may + * block the calling application for an arbitrary amount + * of time. Once an application has finished sending data + * it should call virStreamFinish to wait for succesful + * confirmation from the driver, or detect any error + * + * This method may not be used if a stream source has been + * registered + * + * Errors are not guaranteed to be reported synchronously + * with the call, but may instead be delayed until a + * subsequent call. 
+ * + * An example using this with a hypothetical file upload + * API looks like + * + * virStreamPtr st = virStreamNew(conn, 0); + * int fd = open("demo.iso", O_RDONLY); + * + * virConnectUploadFile(conn, "demo.iso", st); + * + * while (1) { + * char buf[1024]; + * int got = read(fd, buf, 1024); + * if (got < 0) { + * virStreamAbort(st); + * break; + * } + * if (got == 0) { + * virStreamFinish(st); + * break; + * } + * int offset = 0; + * while (offset < got) { + * int sent = virStreamSend(st, buf+offset, got-offset); + * if (sent < 0) { + * virStreamAbort(st); + * goto done; + * } + * offset += sent; + * } + * } + * done: + * virStreamFree(st); + * close(fd); + * + * Returns the number of bytes written, which may be less + * than requested. + * + * Returns -1 upon error, at which time the stream will + * be marked as aborted, and the caller should now release + * the stream with virStreamFree. + * + * Returns -2 if the outgoing transmit buffers are full & + * the stream is marked as non-blocking. + */ +int virStreamSend(virStreamPtr stream, + const char *data, + size_t nbytes) +{ + DEBUG("stream=%p, data=%p, nbytes=%zi", stream, data, nbytes); + + virResetLastError(); + + if (!VIR_IS_CONNECTED_STREAM(stream)) { + virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__); + return (-1); + } + + if (stream->driver && + stream->driver->streamSend) { + int ret; + ret = (stream->driver->streamSend)(stream, data, nbytes); + if (ret == -2) + return -2; + + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__); +error: + /* Copy to connection error object for back compatibility */ + virSetConnError(stream->conn); + return -1; +} + +/** + * virStreamRecv: + * @stream: pointer to the stream object + * @data: buffer to read into from stream + * @nbytes: size of @data buffer + * + * Read a series of bytes from the stream. This method may + * block the calling application for an arbitrary amount + * of time. 
+ * + * Errors are not guaranteed to be reported synchronously + * with the call, but may instead be delayed until a + * subsequent call. + * + * A example using this with a hypothetical file download + * API looks like + * + * virStreamPtr st = virStreamNew(conn, 0); + * int fd = open("demo.iso", O_WRONLY, 0600) + * + * virConnectDownloadFile(conn, "demo.iso", st); + * + * while (1) { + * char buf[1024]; + * int got = virStreamRecv(st, buf, 1024); + * if (got < 0) + * break; + * if (got == 0) { + * virStreamFinish(st); + * break; + * } + * int offset = 0; + * while (offset < got) { + * int sent = write(fd, buf+offset, got-offset) + * if (sent < 0) { + * virStreamAbort(st); + * goto done; + * } + * offset += sent; + * } + * } + * done: + * virStreamFree(st); + * close(fd); + * + * + * Returns the number of bytes read, which may be less + * than requested. + * + * Returns 0 when the end of the stream is reached, at + * which time the caller should invoke virStreamFinish() + * to get confirmation of stream completion. + * + * Returns -1 upon error, at which time the stream will + * be marked as aborted, and the caller should now release + * the stream with virStreamFree. + * + * Returns -2 if there is no data pending to be read & the + * stream is marked as non-blocking. 
+ */ +int virStreamRecv(virStreamPtr stream, + char *data, + size_t nbytes) +{ + DEBUG("stream=%p, data=%p, nbytes=%zi", stream, data, nbytes); + + virResetLastError(); + + if (!VIR_IS_CONNECTED_STREAM(stream)) { + virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__); + return (-1); + } + + if (stream->driver && + stream->driver->streamRecv) { + int ret; + ret = (stream->driver->streamRecv)(stream, data, nbytes); + if (ret == -2) + return -2; + + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__); +error: + /* Copy to connection error object for back compatability */ + virSetConnError(stream->conn); + return -1; +} + + +/** + * virStreamSendAll: + * @stream: pointer to the stream object + * @handler: source callback for reading data from application + * @opaque: application defined data + * + * Send the entire data stream, reading the data from the + * requested data source. This is simply a convenient alternative + * to virStreamSend, for apps that do blocking-I/o. + * + * A example using this with a hypothetical file upload + * API looks like + * + * int mysource(virStreamPtr st, char *buf, int nbytes, void *opaque) { + * int *fd = opaque; + * + * return read(*fd, buf, nbytes); + * } + * + * virStreamPtr st = virStreamNew(conn, 0); + * int fd = open("demo.iso", O_RDONLY) + * + * virConnectUploadFile(conn, st); + * virStreamSendAll(st, mysource, &fd); + * virStreamFree(st); + * close(fd); + * + * Returns 0 if all the data was succesfully sent. The stream + * will be marked as finished on success, so the caller need + * only call virStreamFree(). 
+ * + * Returns -1 upon any error, with the stream being marked as + * aborted, so the caller need only call virStreamFree() + */ +int virStreamSendAll(virStreamPtr stream, + virStreamSourceFunc handler, + void *opaque) +{ + char *bytes = NULL; + int want = 1024*64; + int ret = -1; + DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque); + + virResetLastError(); + + if (!VIR_IS_CONNECTED_STREAM(stream)) { + virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__); + return (-1); + } + + if (stream->flags & VIR_STREAM_NONBLOCK) { + virLibConnError(NULL, VIR_ERR_OPERATION_INVALID, + _("data sources cannot be used for non-blocking streams")); + goto cleanup; + } + + if (VIR_ALLOC_N(bytes, want) < 0) { + virReportOOMError(stream->conn); + goto cleanup; + } + + for (;;) { + int got, offset = 0; + got = (handler)(stream, bytes, want, opaque); + if (got < 0) { + virStreamAbort(stream); + goto cleanup; + } + if (got == 0) + break; + while (offset < got) { + int done; + done = virStreamSend(stream, bytes + offset, got - offset); + if (done < 0) + goto cleanup; + offset += done; + } + } + ret = 0; + +cleanup: + VIR_FREE(bytes); + + /* Copy to connection error object for back compatability */ + if (ret != 0) + virSetConnError(stream->conn); + + return ret; +} + + +/** + * virStreamRecvAll: + * @stream: pointer to the stream object + * @handler: sink callback for writing data to application + * @opaque: application defined data + * + * Receive the entire data stream, sending the data to the + * requested data sink. This is simply a convenient alternative + * to virStreamRecv, for apps that do blocking-I/o. 
+ * + * A example using this with a hypothetical file download + * API looks like + * + * int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) { + * int *fd = opaque; + * + * return write(*fd, buf, nbytes); + * } + * + * virStreamPtr st = virStreamNew(conn, 0); + * int fd = open("demo.iso", O_WRONLY) + * + * virConnectUploadFile(conn, st); + * virStreamRecvAll(st, mysink, &fd); + * virStreamFree(st); + * close(fd); + * + * Returns 0 if all the data was succesfully received. The stream + * will be marked as finished on success, so the caller need + * only call virStreamFree(). + * + * Returns -1 upon any error, with the stream being marked as + * aborted, so the caller need only call virStreamFree() + */ +int virStreamRecvAll(virStreamPtr stream, + virStreamSinkFunc handler, + void *opaque) +{ + char *bytes = NULL; + int want = 1024*64; + int ret = -1; + DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque); + + virResetLastError(); + + if (!VIR_IS_CONNECTED_STREAM(stream)) { + virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__); + return (-1); + } + + if (stream->flags & VIR_STREAM_NONBLOCK) { + virLibConnError(NULL, VIR_ERR_OPERATION_INVALID, + _("data sinks cannot be used for non-blocking streams")); + goto cleanup; + } + + + if (VIR_ALLOC_N(bytes, want) < 0) { + virReportOOMError(stream->conn); + goto cleanup; + } + + for (;;) { + int got, offset = 0; + got = virStreamRecv(stream, bytes, want); + if (got < 0) + goto cleanup; + if (got == 0) + break; + while (offset < got) { + int done; + done = (handler)(stream, bytes + offset, got - offset, opaque); + if (done < 0) { + virStreamAbort(stream); + goto cleanup; + } + offset += done; + } + } + ret = 0; + +cleanup: + VIR_FREE(bytes); + + /* Copy to connection error object for back compatability */ + if (ret != 0) + virSetConnError(stream->conn); + + return ret; +} + + +/** + * virStreamEventAddCallback + * @stream: pointer to the stream object + * @events: set of events to 
monitor + * @cb: callback to invoke when an event occurs + * @opaque: application defined data + * @ff: callback to free @opaque data + * + * Register a callback to be notified when a stream + * becomes writable, or readable. This is most commonly + * used in conjunction with non-blocking data streams + * to integrate into an event loop + * + * Return 0 on success, -1 upon error + */ +int virStreamEventAddCallback(virStreamPtr stream, + int events, + virStreamEventCallback cb, + void *opaque, + virFreeCallback ff) +{ + DEBUG("stream=%p, events=%d, cb=%p, opaque=%p, ff=%p", stream, events, cb, opaque, ff); + + virResetLastError(); + + if (!VIR_IS_CONNECTED_STREAM(stream)) { + virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__); + return (-1); + } + + if (stream->driver && + stream->driver->streamAddCallback) { + int ret; + ret = (stream->driver->streamAddCallback)(stream, events, cb, opaque, ff); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__); +error: + /* Copy to connection error object for back compatability */ + virSetConnError(stream->conn); + return -1; +} + + +/** + * virStreamEventUpdateCallback + * @stream: pointer to the stream object + * @events: set of events to monitor + * + * Changes the set of events to monitor for a stream. This allows + * for event notification to be changed without having to + * unregister & register the callback completely. 
This method + * is guaranteed to succeed if a callback is already registered + * + * Returns 0 on success, -1 if no callback is registered + */ +int virStreamEventUpdateCallback(virStreamPtr stream, + int events) +{ + DEBUG("stream=%p, events=%d", stream, events); + + virResetLastError(); + + if (!VIR_IS_CONNECTED_STREAM(stream)) { + virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__); + return (-1); + } + + if (stream->driver && + stream->driver->streamUpdateCallback) { + int ret; + ret = (stream->driver->streamUpdateCallback)(stream, events); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__); +error: + /* Copy to connection error object for back compatibility */ + virSetConnError(stream->conn); + return -1; +} + + +/** + * virStreamEventRemoveCallback: + * @stream: pointer to the stream object + * + * Remove an event callback from the stream + * + * Returns 0 on success, -1 on error + */ +int virStreamEventRemoveCallback(virStreamPtr stream) +{ + DEBUG("stream=%p", stream); + + virResetLastError(); + + if (!VIR_IS_CONNECTED_STREAM(stream)) { + virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__); + return (-1); + } + + if (stream->driver && + stream->driver->streamRemoveCallback) { + int ret; + ret = (stream->driver->streamRemoveCallback)(stream); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__); +error: + /* Copy to connection error object for back compatibility */ + virSetConnError(stream->conn); + return -1; +} + + +/** + * virStreamFinish: + * @stream: pointer to the stream object + * + * Indicate that there is no further data to be transmitted + * on the stream. For output streams this should be called once + * all data has been written. For input streams this should be + * called once virStreamRecv returns end-of-file. 
+ * + * This method is a synchronization point for all asynchronous + * errors, so if this returns a success code the application can + * be sure that all data has been successfully processed. + * + * Returns 0 on success, -1 upon error + */ +int virStreamFinish(virStreamPtr stream) +{ + DEBUG("stream=%p", stream); + + virResetLastError(); + + if (!VIR_IS_CONNECTED_STREAM(stream)) { + virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__); + return (-1); + } + + if (stream->driver && + stream->driver->streamFinish) { + int ret; + ret = (stream->driver->streamFinish)(stream); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__); +error: + /* Copy to connection error object for back compatability */ + virSetConnError(stream->conn); + return -1; +} + + +/** + * virStreamAbort: + * @stream: pointer to the stream object + * + * Request that the in progress data transfer be cancelled + * abnormally before the end of the stream has been reached. + * For output streams this can be used to inform the driver + * that the stream is being terminated early. For input + * streams this can be used to inform the driver that it + * should stop sending data. 
+ * + * Returns 0 on success, -1 upon error + */ +int virStreamAbort(virStreamPtr stream) +{ + DEBUG("stream=%p", stream); + + virResetLastError(); + + if (!VIR_IS_CONNECTED_STREAM(stream)) { + virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__); + return (-1); + } + + if (stream->driver && + stream->driver->streamAbort) { + int ret; + ret = (stream->driver->streamAbort)(stream); + if (ret < 0) + goto error; + return ret; + } + + virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__); +error: + /* Copy to connection error object for back compatability */ + virSetConnError(stream->conn); + return -1; +} + + +/** + * virStreamFree: + * @stream: pointer to the stream object + * + * Decrement the reference count on a stream, releasing + * the stream object if the reference count has hit zero. + * + * There must not be a active data transfer in progress + * when releasing the stream. If a stream needs to be + * disposed of prior to end of stream being reached, then + * the virStreamAbort function should be called first. + * + * Returns 0 upon success, or -1 on error + */ +int virStreamFree(virStreamPtr stream) +{ + DEBUG("stream=%p", stream); + + virResetLastError(); + + if (!VIR_IS_CONNECTED_STREAM(stream)) { + virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__); + return (-1); + } + + /* XXX Enforce shutdown before free'ing resources ? 
*/ + + if (virUnrefStream(stream) < 0) + return (-1); + return (0); +} diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index 2bf4e15..12d552e 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -56,6 +56,8 @@ virUnrefStorageVol; virGetNodeDevice; virUnrefDomain; virUnrefConnect; +virGetStream; +virUnrefStream; # domain_conf.h diff --git a/src/libvirt_public.syms b/src/libvirt_public.syms index c06f51e..f48b8c5 100644 --- a/src/libvirt_public.syms +++ b/src/libvirt_public.syms @@ -291,4 +291,19 @@ LIBVIRT_0.7.0 { virConnectListDefinedInterfaces; } LIBVIRT_0.6.4; +LIBVIRT_0.7.1 { + virStreamNew; + virStreamRef; + virStreamSend; + virStreamRecv; + virStreamSendAll; + virStreamRecvAll; + virStreamEventAddCallback; + virStreamEventUpdateCallback; + virStreamEventRemoveCallback; + virStreamFinish; + virStreamAbort; + virStreamFree; +} LIBVIRT_0.7.0; + # .... define new API here using predicted next version number .... -- 1.6.2.5 -- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list