* qemud/stream.c: Handle incoming stream data packets, queuing until stream becomes writable. Handle stream completion handshake * po/POTFILES.in: Add qemud/stream.c --- po/POTFILES.in | 1 + qemud/stream.c | 305 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 303 insertions(+), 3 deletions(-) diff --git a/po/POTFILES.in b/po/POTFILES.in index 66d3ebd..d144689 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -1,6 +1,7 @@ qemud/dispatch.c qemud/qemud.c qemud/remote.c +qemud/stream.c src/bridge.c src/conf.c src/console.c diff --git a/qemud/stream.c b/qemud/stream.c index 1644a1b..1fe0e58 100644 --- a/qemud/stream.c +++ b/qemud/stream.c @@ -28,6 +28,93 @@ #include "dispatch.h" #include "logging.h" +static int +remoteStreamHandleWrite(struct qemud_client *client, + struct qemud_client_stream *stream); +static int +remoteStreamHandleFinish(struct qemud_client *client, + struct qemud_client_stream *stream, + struct qemud_client_message *msg); +static int +remoteStreamHandleAbort(struct qemud_client *client, + struct qemud_client_stream *stream, + struct qemud_client_message *msg); + + + +static void +remoteStreamUpdateEvents(struct qemud_client_stream *stream) +{ + int newEvents = 0; + if (stream->rx) + newEvents |= VIR_STREAM_EVENT_WRITABLE; + + virStreamEventUpdateCallback(stream->st, newEvents); +} + + +/* + * Callback that gets invoked when a stream becomes writable/readable + */ +static void +remoteStreamEvent(virStreamPtr st, int events, void *opaque) +{ + struct qemud_client *client = opaque; + struct qemud_client_stream *stream; + + /* XXX sub-optimal - we really should be taking the server lock + * first, but we have no handle to the server object + * We're lucky to get away with it for now, due to this callback + * executing in the main thread, but this should really be fixed + */ + virMutexLock(&client->lock); + + stream = remoteFindClientStream(client, st); + + if (!stream) { + VIR_WARN("event for client=%p stream st=%p, but missing stream state", client, st); + virStreamEventRemoveCallback(st); + goto cleanup; + } + + DEBUG("st=%p events=%d", st, events); + + if (events & VIR_STREAM_EVENT_WRITABLE) { + if (remoteStreamHandleWrite(client, stream) < 0) { + remoteRemoveClientStream(client, stream); + qemudDispatchClientFailure(client); + goto cleanup; + } + } + + if (!stream->closed && + (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) { + int ret; + remote_error rerr; + memset(&rerr, 0, sizeof rerr); + stream->closed = 1; + virStreamAbort(stream->st); + if (events & VIR_STREAM_EVENT_HANGUP) + remoteDispatchFormatError(&rerr, "%s", _("stream had unexpected termination")); + else + remoteDispatchFormatError(&rerr, "%s", _("stream had I/O failure")); + ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial); + remoteRemoveClientStream(client, stream); + if (ret < 0) + qemudDispatchClientFailure(client); + goto cleanup; + } + + if (stream->closed) { + remoteRemoveClientStream(client, stream); + } else { + remoteStreamUpdateEvents(stream); + } + +cleanup: + virMutexUnlock(&client->lock); +} + /* * @client: a locked client object @@ -38,10 +125,54 @@ * -1 on fatal client error */ static int -remoteStreamFilter(struct qemud_client *client ATTRIBUTE_UNUSED, - struct qemud_client_message *msg ATTRIBUTE_UNUSED, - void *opaque ATTRIBUTE_UNUSED) +remoteStreamFilter(struct qemud_client *client, + struct qemud_client_message *msg, void *opaque) { + struct qemud_client_stream *stream = opaque; + + if (msg->hdr.serial == stream->serial && + msg->hdr.proc == stream->procedure && + msg->hdr.type == REMOTE_STREAM) { + DEBUG("Incoming rx=%p serial=%d proc=%d status=%d", + stream->rx, msg->hdr.proc, msg->hdr.serial, msg->hdr.status); + + /* If there are queued packets, we need to queue all further + * messages, since they must be processed strictly in order. + * If there are no queued packets, then OK/ERROR messages + * should be processed immediately. Data packets are still + * queued to only be processed when the stream is marked as + * writable. + */ + if (stream->rx) { + qemudClientMessageQueuePush(&stream->rx, msg); + remoteStreamUpdateEvents(stream); + } else { + int ret = 0; + switch (msg->hdr.status) { + case REMOTE_OK: + ret = remoteStreamHandleFinish(client, stream, msg); + if (ret == 0) + qemudClientMessageRelease(client, msg); + break; + + case REMOTE_CONTINUE: + qemudClientMessageQueuePush(&stream->rx, msg); + remoteStreamUpdateEvents(stream); + break; + + case REMOTE_ERROR: + default: + ret = remoteStreamHandleAbort(client, stream, msg); + if (ret == 0) + qemudClientMessageRelease(client, msg); + break; + } + + if (ret < 0) + return -1; + } + return 1; + } return 0; } @@ -119,6 +250,10 @@ int remoteAddClientStream(struct qemud_client *client, DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial); + if (virStreamEventAddCallback(stream->st, 0, + remoteStreamEvent, client, NULL) < 0) + return -1; + if (tmp) { while (tmp->next) tmp = tmp->next; @@ -132,6 +267,8 @@ int remoteAddClientStream(struct qemud_client *client, stream->tx = 1; + remoteStreamUpdateEvents(stream); + return 0; } @@ -208,3 +345,165 @@ remoteRemoveClientStream(struct qemud_client *client, } return -1; } + + +/* + * Returns: + * -1 if fatal error occurred + * 0 if message was fully processed + * 1 if message is still being processed + */ +static int +remoteStreamHandleWriteData(struct qemud_client *client, + struct qemud_client_stream *stream, + struct qemud_client_message *msg) +{ + remote_error rerr; + int ret; + + DEBUG("stream=%p proc=%d serial=%d len=%d offset=%d", + stream, msg->hdr.proc, msg->hdr.serial, msg->bufferLength, msg->bufferOffset); + + memset(&rerr, 0, sizeof rerr); + + ret = virStreamSend(stream->st, + msg->buffer + msg->bufferOffset, + msg->bufferLength - msg->bufferOffset); + + if (ret > 0) { + msg->bufferOffset += ret; + + /* Partial write, so indicate we have more todo later */ + if (msg->bufferOffset < msg->bufferLength) + return 1; + } else if (ret == -2) { + /* Blocking, so indicate we have more todo later */ + return 1; + } else { + VIR_INFO0("Stream send failed"); + stream->closed = 1; + remoteDispatchConnError(&rerr, client->conn); + return remoteSerializeReplyError(client, &rerr, &msg->hdr); + } + + return 0; +} + + +/* + * Process an finish handshake from the client. + * + * Returns a REMOTE_OK confirmation if successful, or a REMOTE_ERROR + * if there was a stream error + * + * Returns 0 if successfully sent RPC reply, -1 upon fatal error + */ +static int +remoteStreamHandleFinish(struct qemud_client *client, + struct qemud_client_stream *stream, + struct qemud_client_message *msg) +{ + remote_error rerr; + int ret; + + DEBUG("stream=%p proc=%d serial=%d", + stream, msg->hdr.proc, msg->hdr.serial); + + memset(&rerr, 0, sizeof rerr); + + stream->closed = 1; + ret = virStreamFinish(stream->st); + + if (ret < 0) { + remoteDispatchConnError(&rerr, client->conn); + return remoteSerializeReplyError(client, &rerr, &msg->hdr); + } else { + /* Send zero-length confirm */ + if (remoteSendStreamData(client, stream, NULL, 0) < 0) + return -1; + } + + return 0; +} + + +/* + * Process an abort request from the client. + * + * Returns 0 if successfully aborted, -1 upon error + */ +static int +remoteStreamHandleAbort(struct qemud_client *client, + struct qemud_client_stream *stream, + struct qemud_client_message *msg) +{ + remote_error rerr; + + DEBUG("stream=%p proc=%d serial=%d", + stream, msg->hdr.proc, msg->hdr.serial); + + memset(&rerr, 0, sizeof rerr); + + stream->closed = 1; + virStreamAbort(stream->st); + + if (msg->hdr.status == REMOTE_ERROR) + remoteDispatchFormatError(&rerr, "%s", _("stream aborted at client request")); + else { + VIR_WARN("unexpected stream status %d", msg->hdr.status); + remoteDispatchFormatError(&rerr, _("stream aborted with unexpected status %d"), + msg->hdr.status); + } + + return remoteSerializeReplyError(client, &rerr, &msg->hdr); +} + + + +/* + * Called when the stream is signalled has being able to accept + * data writes. Will process all pending incoming messages + * until they're all gone, or I/O blocks + * + * Returns 0 on success, or -1 upon fatal error + */ +static int +remoteStreamHandleWrite(struct qemud_client *client, + struct qemud_client_stream *stream) +{ + struct qemud_client_message *msg, *tmp; + + DEBUG("stream=%p", stream); + + msg = stream->rx; + while (msg && !stream->closed) { + int ret; + switch (msg->hdr.status) { + case REMOTE_OK: + ret = remoteStreamHandleFinish(client, stream, msg); + break; + + case REMOTE_CONTINUE: + ret = remoteStreamHandleWriteData(client, stream, msg); + break; + + case REMOTE_ERROR: + default: + ret = remoteStreamHandleAbort(client, stream, msg); + break; + } + + if (ret == 0) + qemudClientMessageQueueServe(&stream->rx); + else if (ret < 0) + return -1; + else + break; /* still processing data */ + + tmp = msg->next; + qemudClientMessageRelease(client, msg); + msg = tmp; + } + + return 0; +} -- 1.6.2.5 -- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list