* qemud/dispatch.c: Set streamTX flag on outgoing data packets * qemud/qemud.h: Add streamTX flag to track outgoing data * qemud/qemud.c: Re-enable further TX when outgoing data packet has been fully sent. * qemud/stream.h, qemud/strea.c: Add method for enabling TX. Support reading from streams and transmitting data out to client --- qemud/dispatch.c | 2 + qemud/qemud.c | 4 ++- qemud/qemud.h | 1 + qemud/stream.c | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ qemud/stream.h | 4 ++ 5 files changed, 106 insertions(+), 1 deletions(-) diff --git a/qemud/dispatch.c b/qemud/dispatch.c index 1934d24..7417001 100644 --- a/qemud/dispatch.c +++ b/qemud/dispatch.c @@ -636,6 +636,8 @@ remoteSendStreamData(struct qemud_client *client, DEBUG("Total %d", msg->bufferOffset); } + if (data) + msg->streamTX = 1; /* Reset ready for I/O */ msg->bufferLength = msg->bufferOffset; diff --git a/qemud/qemud.c b/qemud/qemud.c index 6c81dec..af71495 100644 --- a/qemud/qemud.c +++ b/qemud/qemud.c @@ -1893,7 +1893,9 @@ void qemudClientMessageRelease(struct qemud_client *client, struct qemud_client_message *msg) { - if (!msg->async) + if (msg->streamTX) { + remoteStreamMessageFinished(client, msg); + } else if (!msg->async) client->nrequests--; /* See if the recv queue is currently throttled */ diff --git a/qemud/qemud.h b/qemud/qemud.h index 8ef5871..911cdc3 100644 --- a/qemud/qemud.h +++ b/qemud/qemud.h @@ -130,6 +130,7 @@ struct qemud_client_message { unsigned int bufferOffset; unsigned int async : 1; + unsigned int streamTX : 1; remote_message_header hdr; diff --git a/qemud/stream.c b/qemud/stream.c index 1fe0e58..584268d 100644 --- a/qemud/stream.c +++ b/qemud/stream.c @@ -32,6 +32,9 @@ static int remoteStreamHandleWrite(struct qemud_client *client, struct qemud_client_stream *stream); static int +remoteStreamHandleRead(struct qemud_client *client, + struct qemud_client_stream *stream); +static int remoteStreamHandleFinish(struct qemud_client *client, struct qemud_client_stream *stream, struct qemud_client_message *msg); @@ -48,6 +51,8 @@ remoteStreamUpdateEvents(struct qemud_client_stream *stream) int newEvents = 0; if (stream->rx) newEvents |= VIR_STREAM_EVENT_WRITABLE; + if (stream->tx && !stream->recvEOF) + newEvents |= VIR_STREAM_EVENT_READABLE; virStreamEventUpdateCallback(stream->st, newEvents); } @@ -87,6 +92,16 @@ remoteStreamEvent(virStreamPtr st, int events, void *opaque) } } + if (!stream->recvEOF && + (events & (VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP))) { + events = events & ~(VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP); + if (remoteStreamHandleRead(client, stream) < 0) { + remoteRemoveClientStream(client, stream); + qemudDispatchClientFailure(client); + goto cleanup; + } + } + if (!stream->closed && (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) { int ret; @@ -507,3 +522,84 @@ remoteStreamHandleWrite(struct qemud_client *client, return 0; } + + + +/* + * Invoked when a stream is signalled as having data + * available to read. This reads upto one message + * worth of data, and then queues that for transmission + * to the client. + * + * Returns 0 if data was queued for TX, or a error RPC + * was sent, or -1 on fatal error, indicating client should + * be killed + */ +static int +remoteStreamHandleRead(struct qemud_client *client, + struct qemud_client_stream *stream) +{ + char *buffer; + size_t bufferLen = REMOTE_MESSAGE_PAYLOAD_MAX; + int ret; + + DEBUG("stream=%p", stream); + + /* Shouldn't ever be called unless we're marked able to + * transmit, but doesn't hurt to check */ + if (!stream->tx) + return 0; + + if (VIR_ALLOC_N(buffer, bufferLen) < 0) + return -1; + + ret = virStreamRecv(stream->st, buffer, bufferLen); + if (ret == -2) { + /* Should never get this, since we're only called when we know + * we're readable, but hey things change... */ + ret = 0; + } else if (ret < 0) { + remote_error rerr; + memset(&rerr, 0, sizeof rerr); + remoteDispatchConnError(&rerr, NULL); + + ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial); + } else { + stream->tx = 0; + if (ret == 0) + stream->recvEOF = 1; + ret = remoteSendStreamData(client, stream, buffer, ret); + } + + VIR_FREE(buffer); + return ret; +} + + +/* + * Invoked when an outgoing data packet message has been fully sent. + * This simply re-enables TX of further data. + * + * The idea is to stop the daemon growing without bound due to + * fast stream, but slow client + */ +void +remoteStreamMessageFinished(struct qemud_client *client, + struct qemud_client_message *msg) +{ + struct qemud_client_stream *stream = client->streams; + + while (stream) { + if (msg->hdr.proc == stream->procedure && + msg->hdr.serial == stream->serial) + break; + stream = stream->next; + } + + DEBUG("Message client=%p stream=%p proc=%d serial=%d", client, stream, msg->hdr.proc, msg->hdr.serial); + + if (stream) { + stream->tx = 1; + remoteStreamUpdateEvents(stream); + } +} diff --git a/qemud/stream.h b/qemud/stream.h index fe5ce6f..de738ba 100644 --- a/qemud/stream.h +++ b/qemud/stream.h @@ -46,4 +46,8 @@ int remoteRemoveClientStream(struct qemud_client *client, struct qemud_client_stream *stream); +void +remoteStreamMessageFinished(struct qemud_client *client, + struct qemud_client_message *msg); + #endif /* __LIBVIRTD_STREAM_H__ */ -- 1.6.2.5 -- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list