Defines the extensions to the remote protocol for generic data streams. Adds a bunch of helper code to the libvirtd daemon for working with data streams. * qemud/Makefile.am: Add stream.c/stream.h to build * qemud/stream.c, qemud/stream.h: Generic helper functions for creating new streams, associating streams with clients, finding existing streams for a client and removing/deleting streams. * qemud/remote_protocol.x: Add a new 'REMOTE_STREAM' constant to 'enum remote_message_type' for encoding stream data in wire messages. Add a new 'REMOTE_CONTINUE' constant to 'enum remote_message_status' to indicate further data stream messages are expected to follow. Document how the remote_message_header is used to encode data streams * qemud/remote_protocol.h: Regenerate * qemud/dispatch.c: Remove assumption that an error message sent to the client is always type=REMOTE_REPLY. It may now also be type=REMOTE_STREAM. Add a convenience method for sending outgoing stream data packets. Log and ignore non-filtered incoming stream packets. Add a method for serializing a stream error message * qemud/dispatch.h: Add API for serializing stream errors and sending stream data packets * qemud/qemud.h: Add struct qemud_client_stream for tracking active data streams for clients. Tweak filter function operation so that it accepts a client object too. * qemud/qemud.c: Refactor code for free'ing message objects which have been fully transmitted into a separate method. Release all active streams when the client shuts down. 
Change filter function to be responsible for queueing the message --- qemud/Makefile.am | 1 + qemud/dispatch.c | 127 ++++++++++++++++++++++++++++- qemud/dispatch.h | 17 ++++ qemud/qemud.c | 54 ++++++++---- qemud/qemud.h | 32 ++++++- qemud/remote_protocol.h | 4 + qemud/remote_protocol.x | 44 +++++++++- qemud/stream.c | 210 +++++++++++++++++++++++++++++++++++++++++++++++ qemud/stream.h | 49 +++++++++++ 9 files changed, 509 insertions(+), 29 deletions(-) create mode 100644 qemud/stream.c create mode 100644 qemud/stream.h diff --git a/qemud/Makefile.am b/qemud/Makefile.am index 959ff88..5f1376e 100644 --- a/qemud/Makefile.am +++ b/qemud/Makefile.am @@ -5,6 +5,7 @@ DAEMON_SOURCES = \ qemud.c qemud.h \ remote.c remote.h \ dispatch.c dispatch.h \ + stream.c stream.h \ remote_dispatch_prototypes.h \ remote_dispatch_table.h \ remote_dispatch_args.h \ diff --git a/qemud/dispatch.c b/qemud/dispatch.c index a60f2f4..1934d24 100644 --- a/qemud/dispatch.c +++ b/qemud/dispatch.c @@ -104,7 +104,7 @@ void remoteDispatchOOMError (remote_error *rerr) { remoteDispatchStringError(rerr, VIR_ERR_NO_MEMORY, - NULL); + "out of memory"); } @@ -136,6 +136,10 @@ remoteSerializeError(struct qemud_client *client, unsigned int len; struct qemud_client_message *msg = NULL; + DEBUG("prog=%d ver=%d proc=%d type=%d serial=%d, msg=%s", + program, version, procedure, type, serial, + rerr->message ? *rerr->message : "(none)"); + if (VIR_ALLOC(msg) < 0) goto fatal_error; @@ -206,19 +210,38 @@ fatal_error: * * Returns 0 if the error was sent, -1 upon fatal error */ -static int +int remoteSerializeReplyError(struct qemud_client *client, remote_error *rerr, remote_message_header *req) { + /* + * For data streams, errors are sent back as data streams + * For method calls, errors are sent back as method replies + */ return remoteSerializeError(client, rerr, req->prog, req->vers, req->proc, - REMOTE_REPLY, + req->type == REMOTE_STREAM ? 
REMOTE_STREAM : REMOTE_REPLY, req->serial); } +int +remoteSerializeStreamError(struct qemud_client *client, + remote_error *rerr, + int proc, + int serial) +{ + return remoteSerializeError(client, + rerr, + REMOTE_PROGRAM, + REMOTE_PROTOCOL_VERSION, + proc, + REMOTE_STREAM, + serial); +} + /* * @msg: the complete incoming message, whose header to decode * @@ -338,6 +361,10 @@ remoteDispatchClientRequest (struct qemud_server *server, { remote_error rerr; + DEBUG("prog=%d ver=%d type=%d satus=%d serial=%d proc=%d", + msg->hdr.prog, msg->hdr.vers, msg->hdr.type, + msg->hdr.status, msg->hdr.serial, msg->hdr.proc); + memset(&rerr, 0, sizeof rerr); /* Check version, etc. */ @@ -358,11 +385,24 @@ remoteDispatchClientRequest (struct qemud_server *server, case REMOTE_CALL: return remoteDispatchClientCall(server, client, msg); + case REMOTE_STREAM: + /* Since stream data is non-acked, async, we may continue to received + * stream packets after we closed down a stream. Just drop & ignore + * these. + */ + VIR_INFO("Ignoring unexpected stream data serial=%d proc=%d status=%d", + msg->hdr.serial, msg->hdr.proc, msg->hdr.status); + qemudClientMessageRelease(client, msg); + break; + default: remoteDispatchFormatError (&rerr, _("type (%d) != REMOTE_CALL"), (int) msg->hdr.type); + goto error; } + return 0; + error: return remoteSerializeReplyError(client, &rerr, &msg->hdr); } @@ -532,3 +572,84 @@ xdr_error: fatal_error: return -1; } + + +int +remoteSendStreamData(struct qemud_client *client, + struct qemud_client_stream *stream, + const char *data, + size_t len) +{ + struct qemud_client_message *msg; + XDR xdr; + + DEBUG("client=%p stream=%p data=%p len=%d", client, stream, data, len); + + if (VIR_ALLOC(msg) < 0) { + return -1; + } + + /* Return header. 
We're re-using same message object, so + * only need to tweak type/status fields */ + msg->hdr.prog = REMOTE_PROGRAM; + msg->hdr.vers = REMOTE_PROTOCOL_VERSION; + msg->hdr.proc = stream->procedure; + msg->hdr.type = REMOTE_STREAM; + msg->hdr.serial = stream->serial; + /* + * NB + * data != NULL + len > 0 => REMOTE_CONTINUE (Sending back data) + * data != NULL + len == 0 => REMOTE_CONTINUE (Sending read EOF) + * data == NULL => REMOTE_OK (Sending finish handshake confirmation) + */ + msg->hdr.status = data ? REMOTE_CONTINUE : REMOTE_OK; + + if (remoteEncodeClientMessageHeader(msg) < 0) + goto fatal_error; + + if (data && len) { + if ((msg->bufferLength - msg->bufferOffset) < len) + goto fatal_error; + + /* Now for the payload */ + xdrmem_create (&xdr, + msg->buffer, + msg->bufferLength, + XDR_ENCODE); + + /* Skip over existing header already written */ + if (xdr_setpos(&xdr, msg->bufferOffset) == 0) + goto xdr_error; + + memcpy(msg->buffer + msg->bufferOffset, data, len); + msg->bufferOffset += len; + + /* Update the length word. 
*/ + len = msg->bufferOffset; + if (xdr_setpos (&xdr, 0) == 0) + goto xdr_error; + + if (!xdr_u_int (&xdr, &len)) + goto xdr_error; + + xdr_destroy (&xdr); + + DEBUG("Total %d", msg->bufferOffset); + } + + /* Reset ready for I/O */ + msg->bufferLength = msg->bufferOffset; + msg->bufferOffset = 0; + + /* Put reply on end of tx queue to send out */ + qemudClientMessageQueuePush(&client->tx, msg); + qemudUpdateClientEvent(client); + + return 0; + +xdr_error: + xdr_destroy (&xdr); +fatal_error: + VIR_FREE(msg); + return -1; +} diff --git a/qemud/dispatch.h b/qemud/dispatch.h index 1d85df9..03a699c 100644 --- a/qemud/dispatch.h +++ b/qemud/dispatch.h @@ -49,6 +49,17 @@ void remoteDispatchOOMError (remote_error *rerr); void remoteDispatchConnError (remote_error *rerr, virConnectPtr conn); + +int +remoteSerializeReplyError(struct qemud_client *client, + remote_error *rerr, + remote_message_header *req); +int +remoteSerializeStreamError(struct qemud_client *client, + remote_error *rerr, + int proc, + int serial); + /* Having this here is dubious. It should be in remote.h * but qemud.c shouldn't depend on that header directly. * Refactor this later to deal with this properly. 
@@ -60,4 +71,10 @@ int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED, void *opaque); +int +remoteSendStreamData(struct qemud_client *client, + struct qemud_client_stream *stream, + const char *data, + size_t len); + #endif /* __LIBVIRTD_DISPATCH_H__ */ diff --git a/qemud/qemud.c b/qemud/qemud.c index e657cf2..6c81dec 100644 --- a/qemud/qemud.c +++ b/qemud/qemud.c @@ -61,6 +61,7 @@ #include "conf.h" #include "event.h" #include "memory.h" +#include "stream.h" #ifdef HAVE_AVAHI #include "mdns.h" #endif @@ -1718,10 +1719,15 @@ readmore: /* Check if any filters match this message */ filter = client->filters; while (filter) { - if ((filter->query)(msg, filter->opaque)) { - qemudClientMessageQueuePush(&filter->dx, msg); + int ret; + ret = (filter->query)(client, msg, filter->opaque); + if (ret == 1) { msg = NULL; break; + } else if (ret == -1) { + VIR_FREE(msg); + qemudDispatchClientFailure(client); + return; } filter = filter->next; } @@ -1883,6 +1889,29 @@ static ssize_t qemudClientWrite(struct qemud_client *client) { } +void +qemudClientMessageRelease(struct qemud_client *client, + struct qemud_client_message *msg) +{ + if (!msg->async) + client->nrequests--; + + /* See if the recv queue is currently throttled */ + if (!client->rx && + client->nrequests < max_client_requests) { + /* Reset message record for next RX attempt */ + memset(msg, 0, sizeof(*msg)); + client->rx = msg; + /* Get ready to receive next message */ + client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; + } else { + VIR_FREE(msg); + } + + qemudUpdateClientEvent(client); +} + + /* * Process all queued client->tx messages until * we would block on I/O @@ -1906,26 +1935,10 @@ qemudDispatchClientWrite(struct qemud_client *client) { /* Get finished reply from head of tx queue */ reply = qemudClientMessageQueueServe(&client->tx); - /* If its not an async message, then we have - * just completed an RPC request */ - if (!reply->async) - client->nrequests--; - - /* Move record to end of 
'rx' ist */ - if (!client->rx && - client->nrequests < max_client_requests) { - /* Reset message record for next RX attempt */ - client->rx = reply; - client->rx->bufferOffset = 0; - client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; - } else { - VIR_FREE(reply); - } + qemudClientMessageRelease(client, reply); if (client->closing) qemudDispatchClientFailure(client); - else - qemudUpdateClientEvent(client); } } } @@ -2137,6 +2150,9 @@ static void qemudFreeClient(struct qemud_client *client) { VIR_FREE(msg); } + while (client->streams) + remoteRemoveClientStream(client, client->streams); + if (client->conn) virConnectClose(client->conn); virMutexDestroy(&client->lock); diff --git a/qemud/qemud.h b/qemud/qemud.h index 254db44..8ef5871 100644 --- a/qemud/qemud.h +++ b/qemud/qemud.h @@ -129,26 +129,43 @@ struct qemud_client_message { unsigned int bufferLength; unsigned int bufferOffset; - int async : 1; + unsigned int async : 1; remote_message_header hdr; struct qemud_client_message *next; }; +struct qemud_client; + /* Allow for filtering of incoming messages to a custom * dispatch processing queue, instead of client->dx. 
*/ -typedef int (*qemud_client_filter_func)(struct qemud_client_message *msg, void *opaque); +typedef int (*qemud_client_filter_func)(struct qemud_client *client, + struct qemud_client_message *msg, void *opaque); struct qemud_client_filter { qemud_client_filter_func query; void *opaque; - struct qemud_client_message *dx; - struct qemud_client_filter *next; }; +struct qemud_client_stream { + virStreamPtr st; + int procedure; + int serial; + + unsigned int recvEOF : 1; + unsigned int closed : 1; + + struct qemud_client_filter filter; + + struct qemud_client_message *rx; + int tx; + + struct qemud_client_stream *next; +}; + /* Stores the per-client connection state */ struct qemud_client { virMutex lock; @@ -197,6 +214,10 @@ struct qemud_client { * end up on the 'dx' queue */ struct qemud_client_filter *filters; + /* Data streams */ + struct qemud_client_stream *streams; + + /* This is only valid if a remote open call has been made on this * connection, otherwise it will be NULL. Also if remote close is * called, it will be set back to NULL if that succeeds. 
@@ -275,6 +296,9 @@ qemudClientMessageQueuePush(struct qemud_client_message **queue, struct qemud_client_message * qemudClientMessageQueueServe(struct qemud_client_message **queue); +void +qemudClientMessageRelease(struct qemud_client *client, + struct qemud_client_message *msg); #if HAVE_POLKIT diff --git a/qemud/remote_protocol.h b/qemud/remote_protocol.h index 2e5bc81..2ff9075 100644 --- a/qemud/remote_protocol.h +++ b/qemud/remote_protocol.h @@ -16,6 +16,8 @@ extern "C" { #include "internal.h" #include <arpa/inet.h> #define REMOTE_MESSAGE_MAX 262144 +#define REMOTE_MESSAGE_HEADER_MAX 24 +#define REMOTE_MESSAGE_PAYLOAD_MAX 262120 #define REMOTE_STRING_MAX 65536 typedef char *remote_nonnull_string; @@ -1576,12 +1578,14 @@ enum remote_message_type { REMOTE_CALL = 0, REMOTE_REPLY = 1, REMOTE_MESSAGE = 2, + REMOTE_STREAM = 3, }; typedef enum remote_message_type remote_message_type; enum remote_message_status { REMOTE_OK = 0, REMOTE_ERROR = 1, + REMOTE_CONTINUE = 2, }; typedef enum remote_message_status remote_message_status; #define REMOTE_MESSAGE_HEADER_XDR_LEN 4 diff --git a/qemud/remote_protocol.x b/qemud/remote_protocol.x index 8f9b6db..e24c428 100644 --- a/qemud/remote_protocol.x +++ b/qemud/remote_protocol.x @@ -44,6 +44,12 @@ /* Maximum total message size (serialised). */ const REMOTE_MESSAGE_MAX = 262144; +/* Size of struct remote_message_header (serialized)*/ +const REMOTE_MESSAGE_HEADER_MAX = 24; + +/* Size of message payload */ +const REMOTE_MESSAGE_PAYLOAD_MAX = 262120; + /* Length of long, but not unbounded, strings. * This is an arbitrary limit designed to stop the decoder from trying * to allocate unbounded amounts of memory when fed with a bad message. 
@@ -1448,8 +1454,27 @@ enum remote_procedure { * * serial matches that from the corresponding REMOTE_CALL * * - type == REMOTE_MESSAGE - * * serial matches that from the corresponding REMOTE_CALL, or zero + * * serial is always zero + * + * - type == REMOTE_STREAM + * * serial matches that from the corresponding REMOTE_CALL * + * and the 'status' field varies according to: + * + * - type == REMOTE_CALL + * * REMOTE_OK always + * + * - type == REMOTE_REPLY + * * REMOTE_OK if RPC finished successfully + * * REMOTE_ERROR if something failed + * + * - type == REMOTE_MESSAGE + * * REMOTE_OK always + * + * - type == REMOTE_STREAM + * * REMOTE_CONTINUE if more data is following + * * REMOTE_OK if stream is complete + * * REMOTE_ERROR if stream had an error * * Payload varies according to type and status: * @@ -1468,6 +1493,13 @@ enum remote_procedure { * * status == REMOTE_ERROR * remote_error Error information * + * - type == REMOTE_STREAM + * * status == REMOTE_CONTINUE + * byte[] raw stream data + * * status == REMOTE_ERROR + * remote_error error information + * * status == REMOTE_OK + * <empty> */ enum remote_message_type { /* client -> server. args from a method call */ @@ -1475,7 +1507,9 @@ enum remote_message_type { /* server -> client. reply/error from a method call */ REMOTE_REPLY = 1, /* either direction. async notification */ - REMOTE_MESSAGE = 2 + REMOTE_MESSAGE = 2, + /* either direction. stream data packet */ + REMOTE_STREAM = 3 }; enum remote_message_status { @@ -1487,7 +1521,11 @@ enum remote_message_status { /* For replies, indicates that an error happened, and a struct * remote_error follows. 
*/ - REMOTE_ERROR = 1 + REMOTE_ERROR = 1, + + /* For streams, indicates that more data is still expected + */ + REMOTE_CONTINUE = 2 }; /* 4 byte length word per header */ diff --git a/qemud/stream.c b/qemud/stream.c new file mode 100644 index 0000000..1644a1b --- /dev/null +++ b/qemud/stream.c @@ -0,0 +1,210 @@ +/* + * stream.c: APIs for managing client streams + * + * Copyright (C) 2009 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Daniel P. Berrange <berrange@xxxxxxxxxx> + */ + + +#include <config.h> + +#include "stream.h" +#include "memory.h" +#include "dispatch.h" +#include "logging.h" + + +/* + * @client: a locked client object + * + * Invoked by the main loop when filtering incoming messages. 
+ * + * Returns 1 if the message was processed, 0 if skipped, + * -1 on fatal client error + */ +static int +remoteStreamFilter(struct qemud_client *client ATTRIBUTE_UNUSED, + struct qemud_client_message *msg ATTRIBUTE_UNUSED, + void *opaque ATTRIBUTE_UNUSED) +{ + return 0; +} + + +/* + * @conn: a connection object to associate the stream with + * @hdr: the method call to associate with the stream + * + * Creates a new stream for this conn + * + * Returns a new stream object, or NULL upon OOM + */ +struct qemud_client_stream * +remoteCreateClientStream(virConnectPtr conn, + remote_message_header *hdr) +{ + struct qemud_client_stream *stream; + + DEBUG("proc=%d serial=%d", hdr->proc, hdr->serial); + + if (VIR_ALLOC(stream) < 0) + return NULL; + + stream->procedure = hdr->proc; + stream->serial = hdr->serial; + + stream->st = virStreamNew(conn, VIR_STREAM_NONBLOCK); + if (!stream->st) { + VIR_FREE(stream); + return NULL; + } + + stream->filter.query = remoteStreamFilter; + stream->filter.opaque = stream; + + return stream; +} + +/* + * @stream: an unused client stream + * + * Frees the memory associated with this inactive client + * stream + */ +void remoteFreeClientStream(struct qemud_client *client, + struct qemud_client_stream *stream) +{ + struct qemud_client_message *msg; + + if (!stream) + return; + + DEBUG("proc=%d serial=%d", stream->procedure, stream->serial); + + msg = stream->rx; + while (msg) { + struct qemud_client_message *tmp = msg->next; + qemudClientMessageRelease(client, msg); + msg = tmp; + } + + virStreamFree(stream->st); + VIR_FREE(stream); +} + + +/* + * @client: a locked client to add the stream to + * @stream: a stream to add + */ +int remoteAddClientStream(struct qemud_client *client, + struct qemud_client_stream *stream) +{ + struct qemud_client_stream *tmp = client->streams; + + DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial); + + if (tmp) { + while (tmp->next) + tmp = tmp->next; + tmp->next = stream; + } else 
{ + client->streams = stream; + } + + stream->filter.next = client->filters; + client->filters = &stream->filter; + + stream->tx = 1; + + return 0; +} + + +/* + * @client: a locked client object + * @st: the underlying stream object to look up, compared + * against the 'st' field of each of the client's streams + * + * Finds an existing active stream + * + * Returns the client stream object whose 'st' field equals @st, or NULL + */ +struct qemud_client_stream * +remoteFindClientStream(struct qemud_client *client, + virStreamPtr st) +{ + struct qemud_client_stream *stream = client->streams; + + while (stream) { + if (stream->st == st) + return stream; + stream = stream->next; + } + + return NULL; +} + + +/* + * @client: a locked client object + * @stream: an inactive, closed stream object + * + * Removes a stream from the list of active streams for the client + * + * Returns 0 if the stream was removed, -1 if it doesn't exist + */ +int +remoteRemoveClientStream(struct qemud_client *client, + struct qemud_client_stream *stream) +{ + DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial); + + struct qemud_client_stream *curr = client->streams; + struct qemud_client_stream *prev = NULL; + struct qemud_client_filter *filter = NULL; + + if (client->filters == &stream->filter) { + client->filters = client->filters->next; + } else { + filter = client->filters; + while (filter) { + if (filter->next == &stream->filter) { + filter->next = filter->next->next; + break; + } + } + } + + if (!stream->closed) + virStreamAbort(stream->st); + + while (curr) { + if (curr == stream) { + if (prev) + prev->next = curr->next; + else + client->streams = curr->next; + remoteFreeClientStream(client, stream); + return 0; + } + prev = curr; + curr = curr->next; + } + return -1; +} diff --git a/qemud/stream.h b/qemud/stream.h new file mode 100644 index 0000000..fe5ce6f --- /dev/null +++ b/qemud/stream.h @@ -0,0 +1,49 @@ +/* + * stream.h: APIs for managing client streams + * + * 
Copyright (C) 2009 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Daniel P. Berrange <berrange@xxxxxxxxxx> + */ + + +#ifndef __LIBVIRTD_STREAM_H__ +#define __LIBVIRTD_STREAM_H__ + +#include "qemud.h" + + + +struct qemud_client_stream * +remoteCreateClientStream(virConnectPtr conn, + remote_message_header *hdr); + +void remoteFreeClientStream(struct qemud_client *client, + struct qemud_client_stream *stream); + +int remoteAddClientStream(struct qemud_client *client, + struct qemud_client_stream *stream); + +struct qemud_client_stream * +remoteFindClientStream(struct qemud_client *client, + virStreamPtr stream); + +int +remoteRemoveClientStream(struct qemud_client *client, + struct qemud_client_stream *stream); + +#endif /* __LIBVIRTD_STREAM_H__ */ -- 1.6.2.5 -- Libvir-list mailing list Libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list