From: "Daniel P. Berrange" <berrange@xxxxxxxxxx> --- libvirt-gobject/Makefile.am | 8 +- libvirt-gobject/libvirt-gobject-output-stream.c | 240 +++++++++++++++++++++++ libvirt-gobject/libvirt-gobject-output-stream.h | 68 +++++++ libvirt-gobject/libvirt-gobject-stream.c | 115 +++++++++++- libvirt-gobject/libvirt-gobject-stream.h | 27 ++- 5 files changed, 447 insertions(+), 11 deletions(-) create mode 100644 libvirt-gobject/libvirt-gobject-output-stream.c create mode 100644 libvirt-gobject/libvirt-gobject-output-stream.h diff --git a/libvirt-gobject/Makefile.am b/libvirt-gobject/Makefile.am index 0eef9c8..ec7b454 100644 --- a/libvirt-gobject/Makefile.am +++ b/libvirt-gobject/Makefile.am @@ -45,8 +45,7 @@ GOBJECT_GENERATED_FILES = \ libvirt_gobject_1_0_ladir = $(includedir)/libvirt-gobject-1.0/libvirt-gobject libvirt_gobject_1_0_la_HEADERS = \ - $(GOBJECT_HEADER_FILES) \ - libvirt-gobject-input-stream.h + $(GOBJECT_HEADER_FILES) nodist_libvirt_gobject_1_0_la_HEADERS = \ libvirt-gobject-enums.h libvirt_gobject_1_0_la_SOURCES = \ @@ -54,7 +53,10 @@ libvirt_gobject_1_0_la_SOURCES = \ $(GOBJECT_SOURCE_FILES) \ libvirt-gobject-domain-device-private.h \ libvirt-gobject-compat.h \ - libvirt-gobject-input-stream.c + libvirt-gobject-input-stream.h \ + libvirt-gobject-input-stream.c \ + libvirt-gobject-output-stream.h \ + libvirt-gobject-output-stream.c nodist_libvirt_gobject_1_0_la_SOURCES = \ $(GOBJECT_GENERATED_FILES) libvirt_gobject_1_0_la_CFLAGS = \ diff --git a/libvirt-gobject/libvirt-gobject-output-stream.c b/libvirt-gobject/libvirt-gobject-output-stream.c new file mode 100644 index 0000000..30ee519 --- /dev/null +++ b/libvirt-gobject/libvirt-gobject-output-stream.c @@ -0,0 +1,240 @@ +/* + * libvirt-gobject-output-stream.c: libvirt gobject integration + * + * Copyright (C) 2011 Red Hat + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; 
either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Authors: Daniel P. Berrange <berrange@xxxxxxxxxx> + * Marc-André Lureau <marcandre.lureau@xxxxxxxxxx> + */ + +#include <config.h> + +#include <libvirt/virterror.h> +#include <string.h> + +#include "libvirt-glib/libvirt-glib.h" +#include "libvirt-gobject/libvirt-gobject.h" +#include "libvirt-gobject-output-stream.h" + +extern gboolean debugFlag; + +#define DEBUG(fmt, ...) do { if (G_UNLIKELY(debugFlag)) g_debug(fmt, ## __VA_ARGS__); } while (0) + +#define gvir_output_stream_get_type _gvir_output_stream_get_type +G_DEFINE_TYPE(GVirOutputStream, gvir_output_stream, G_TYPE_OUTPUT_STREAM); + +enum +{ + PROP_0, + PROP_STREAM +}; + +struct _GVirOutputStreamPrivate +{ + GVirStream *stream; + + /* pending operation metadata */ + GSimpleAsyncResult *result; + GCancellable *cancellable; + const void * buffer; + gsize count; +}; + +static void gvir_output_stream_get_property(GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + GVirOutputStream *stream = GVIR_OUTPUT_STREAM(object); + + switch (prop_id) { + case PROP_STREAM: + g_value_set_object(value, stream->priv->stream); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + } +} + +static void gvir_output_stream_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + GVirOutputStream *stream = GVIR_OUTPUT_STREAM(object); + + switch (prop_id) { + case PROP_STREAM: + 
stream->priv->stream = g_value_get_object(value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + } +} + +static void gvir_output_stream_finalize(GObject *object) +{ + GVirOutputStream *stream = GVIR_OUTPUT_STREAM(object); + + DEBUG("Finalize output stream GVirStream=%p", stream->priv->stream); + stream->priv->stream = NULL; // unowned + + if (G_OBJECT_CLASS(gvir_output_stream_parent_class)->finalize) + (*G_OBJECT_CLASS(gvir_output_stream_parent_class)->finalize)(object); +} + +static void +gvir_output_stream_write_ready(virStreamPtr st G_GNUC_UNUSED, + int events, + void *opaque) +{ + GVirOutputStream *stream = GVIR_OUTPUT_STREAM(opaque); + GVirOutputStreamPrivate *priv = stream->priv; + GSimpleAsyncResult *simple; + GError *error = NULL; + gssize result; + + g_return_if_fail(events & VIR_STREAM_EVENT_WRITABLE); + + result = gvir_stream_send(priv->stream, priv->buffer, priv->count, + priv->cancellable, &error); + + if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_warn_if_reached(); + return; + } + + simple = stream->priv->result; + stream->priv->result = NULL; + + if (result >= 0) + g_simple_async_result_set_op_res_gssize(simple, result); + + if (error) + g_simple_async_result_take_error(simple, error); + + if (priv->cancellable) { + g_object_unref(stream->priv->cancellable); + priv->cancellable = NULL; + } + + g_simple_async_result_complete(simple); + g_object_unref(simple); + + return; +} + +static void gvir_output_stream_write_async(GOutputStream *stream, + const void *buffer, + gsize count, + int io_priority G_GNUC_UNUSED, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GVirOutputStream *output_stream = GVIR_OUTPUT_STREAM(stream); + virStreamPtr handle; + + g_return_if_fail(GVIR_IS_OUTPUT_STREAM(stream)); + g_return_if_fail(output_stream->priv->result == NULL); + + g_object_get(output_stream->priv->stream, "handle", &handle, NULL); + + if 
(virStreamEventAddCallback(handle, VIR_STREAM_EVENT_WRITABLE, + gvir_output_stream_write_ready, stream, NULL) < 0) { + g_simple_async_report_error_in_idle(G_OBJECT(stream), + callback, + user_data, + G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, + "Couldn't add event callback %s", + G_STRFUNC); + goto end; + } + + output_stream->priv->result = + g_simple_async_result_new(G_OBJECT(stream), callback, user_data, + gvir_output_stream_write_async); + if (cancellable) + g_object_ref(cancellable); + output_stream->priv->cancellable = cancellable; + output_stream->priv->buffer = buffer; + output_stream->priv->count = count; + +end: + virStreamFree(handle); +} + + +static gssize gvir_output_stream_write_finish(GOutputStream *stream, + GAsyncResult *result, + GError **error G_GNUC_UNUSED) +{ + GVirOutputStream *output_stream = GVIR_OUTPUT_STREAM(stream); + GSimpleAsyncResult *simple; + virStreamPtr handle; + gssize count; + + g_return_val_if_fail(GVIR_IS_OUTPUT_STREAM(stream), -1); + g_object_get(output_stream->priv->stream, "handle", &handle, NULL); + + simple = G_SIMPLE_ASYNC_RESULT(result); + + g_warn_if_fail(g_simple_async_result_get_source_tag(simple) == gvir_output_stream_write_async); + + count = g_simple_async_result_get_op_res_gssize(simple); + + virStreamEventRemoveCallback(handle); + virStreamFree(handle); + + return count; +} + + +static void gvir_output_stream_class_init(GVirOutputStreamClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS(klass); + GOutputStreamClass *goutputstream_class = G_OUTPUT_STREAM_CLASS(klass); + + g_type_class_add_private(klass, sizeof(GVirOutputStreamPrivate)); + + gobject_class->finalize = gvir_output_stream_finalize; + gobject_class->get_property = gvir_output_stream_get_property; + gobject_class->set_property = gvir_output_stream_set_property; + + goutputstream_class->write_fn = NULL; + goutputstream_class->write_async = gvir_output_stream_write_async; + goutputstream_class->write_finish = gvir_output_stream_write_finish; + + 
g_object_class_install_property(gobject_class, PROP_STREAM, + g_param_spec_object("stream", + "stream", + "GVirStream", + GVIR_TYPE_STREAM, G_PARAM_CONSTRUCT_ONLY | + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +} + +static void gvir_output_stream_init(GVirOutputStream *stream) +{ + stream->priv = G_TYPE_INSTANCE_GET_PRIVATE(stream, GVIR_TYPE_OUTPUT_STREAM, GVirOutputStreamPrivate); +} + +GVirOutputStream* _gvir_output_stream_new(GVirStream *stream) +{ + return GVIR_OUTPUT_STREAM(g_object_new(GVIR_TYPE_OUTPUT_STREAM, "stream", stream, NULL)); +} diff --git a/libvirt-gobject/libvirt-gobject-output-stream.h b/libvirt-gobject/libvirt-gobject-output-stream.h new file mode 100644 index 0000000..0ca0053 --- /dev/null +++ b/libvirt-gobject/libvirt-gobject-output-stream.h @@ -0,0 +1,68 @@ +/* + * libvirt-gobject-output-stream.h: libvirt gobject integration + * + * Copyright (C) 2011 Red Hat + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Authors: Daniel P. Berrange <berrange@xxxxxxxxxx> + * Marc-André Lureau <marcandre.lureau@xxxxxxxxxx> + */ + +#if !defined(__LIBVIRT_GOBJECT_H__) && !defined(LIBVIRT_GOBJECT_BUILD) +#error "Only <libvirt-gobject/libvirt-gobject.h> can be included directly." 
+#endif + +#ifndef __LIBVIRT_GOBJECT_OUTPUT_STREAM_H__ +#define __LIBVIRT_GOBJECT_OUTPUT_STREAM_H__ + +#include <gio/gio.h> +#include "libvirt-gobject-stream.h" + +G_BEGIN_DECLS + +#define GVIR_TYPE_OUTPUT_STREAM (_gvir_output_stream_get_type ()) +#define GVIR_OUTPUT_STREAM(inst) (G_TYPE_CHECK_INSTANCE_CAST ((inst), \ + GVIR_TYPE_OUTPUT_STREAM, GVirOutputStream)) +#define GVIR_OUTPUT_STREAM_CLASS(class) (G_TYPE_CHECK_CLASS_CAST ((class), \ + GVIR_TYPE_OUTPUT_STREAM, GVirOutputStreamClass)) +#define GVIR_IS_OUTPUT_STREAM(inst) (G_TYPE_CHECK_INSTANCE_TYPE ((inst), \ + GVIR_TYPE_OUTPUT_STREAM)) +#define GVIR_IS_OUTPUT_STREAM_CLASS(class) (G_TYPE_CHECK_CLASS_TYPE ((class), \ + GVIR_TYPE_OUTPUT_STREAM)) +#define GVIR_OUTPUT_STREAM_GET_CLASS(inst) (G_TYPE_INSTANCE_GET_CLASS ((inst), \ + GVIR_TYPE_OUTPUT_STREAM, GVirOutputStreamClass)) + +typedef struct _GVirOutputStreamPrivate GVirOutputStreamPrivate; +typedef struct _GVirOutputStreamClass GVirOutputStreamClass; +typedef struct _GVirOutputStream GVirOutputStream; + +struct _GVirOutputStreamClass +{ + GOutputStreamClass parent_class; +}; + +struct _GVirOutputStream +{ + GOutputStream parent_instance; + GVirOutputStreamPrivate *priv; +}; + +GType _gvir_output_stream_get_type (void) G_GNUC_CONST; +GVirOutputStream * _gvir_output_stream_new (GVirStream *stream); + +G_END_DECLS + +#endif /* __LIBVIRT_GOBJECT_OUTPUT_STREAM_H__ */ diff --git a/libvirt-gobject/libvirt-gobject-stream.c b/libvirt-gobject/libvirt-gobject-stream.c index 30673aa..0d1c2d1 100644 --- a/libvirt-gobject/libvirt-gobject-stream.c +++ b/libvirt-gobject/libvirt-gobject-stream.c @@ -32,6 +32,7 @@ #include "libvirt-gobject-compat.h" #include "libvirt-gobject/libvirt-gobject-input-stream.h" +#include "libvirt-gobject/libvirt-gobject-output-stream.h" extern gboolean debugFlag; @@ -44,7 +45,7 @@ struct _GVirStreamPrivate { virStreamPtr handle; GInputStream *input_stream; - gboolean in_dispose; + GOutputStream *output_stream; }; G_DEFINE_TYPE(GVirStream, 
gvir_stream, G_TYPE_IO_STREAM); @@ -77,6 +78,17 @@ static GInputStream* gvir_stream_get_input_stream(GIOStream *io_stream) } +static GOutputStream* gvir_stream_get_output_stream(GIOStream *io_stream) +{ + GVirStream *self = GVIR_STREAM(io_stream); + + if (self->priv->output_stream == NULL) + self->priv->output_stream = (GOutputStream *)_gvir_output_stream_new(self); + + return self->priv->output_stream; +} + + static gboolean gvir_stream_close(GIOStream *io_stream, GCancellable *cancellable, G_GNUC_UNUSED GError **error) { @@ -85,8 +97,8 @@ static gboolean gvir_stream_close(GIOStream *io_stream, if (self->priv->input_stream) g_input_stream_close(self->priv->input_stream, cancellable, NULL); - if (self->priv->in_dispose) - return TRUE; + if (self->priv->output_stream) + g_output_stream_close(self->priv->output_stream, cancellable, NULL); return TRUE; /* FIXME: really close the stream? */ } @@ -201,6 +213,7 @@ static void gvir_stream_class_init(GVirStreamClass *klass) object_class->set_property = gvir_stream_set_property; stream_class->get_input_stream = gvir_stream_get_input_stream; + stream_class->get_output_stream = gvir_stream_get_output_stream; stream_class->close_fn = gvir_stream_close; stream_class->close_async = gvir_stream_close_async; stream_class->close_finish = gvir_stream_close_finish; @@ -339,3 +352,99 @@ gvir_stream_receive_all(GVirStream *self, GVirStreamSinkFunc func, gpointer user return r; } + + +/** + * gvir_stream_send: + * @stream: the stream + * @buffer: a buffer to write data from (which should be at least @size + * bytes long). + * @size: the number of bytes you want to write to the stream + * @cancellable: (allow-none): a %GCancellable or %NULL + * @error: #GError for error reporting, or %NULL to ignore. + * + * Send data (up to @size bytes) to a stream. + * On error a negative value is returned and @error is set accordingly. + * + * gvir_stream_send() can return any number of bytes, up to + * @size. 
If fewer than @size bytes have been sent, the remaining + * data must be passed in future calls to gvir_stream_send(). + * + * If sending would block, a %G_IO_ERROR_WOULD_BLOCK error will be + * returned. + * + * Returns: Number of bytes written, + * or a negative value on error. + */ +gssize gvir_stream_send(GVirStream *self, const gchar *buffer, gsize size, + GCancellable *cancellable, GError **error) +{ + int got; + + g_return_val_if_fail(GVIR_IS_STREAM(self), -1); + g_return_val_if_fail(buffer != NULL, -1); + + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return -1; + + got = virStreamSend(self->priv->handle, buffer, size); + + if (got == -2) { /* blocking */ + g_set_error(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, NULL); + } else if (got < 0) { + g_set_error(error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, + "Got virStreamRecv error in %s", G_STRFUNC); + } + + return got; +} + +struct stream_source_helper { + GVirStream *self; + GVirStreamSourceFunc func; + gpointer user_data; +}; + +static int +stream_source(virStreamPtr st G_GNUC_UNUSED, + char *bytes, size_t nbytes, void *opaque) +{ + struct stream_source_helper *helper = opaque; + + return helper->func(helper->self, bytes, nbytes, helper->user_data); +} + +/** + * gvir_stream_send_all: + * @stream: the stream + * @func: (scope notified): the callback for reading data from application + * @user_data: (closure): data to be passed to @func + * Returns: 0 if all the data was sent or -1 upon error + * + * Send the entire data stream, reading the data from the + * requested data source. This is simply a convenient alternative + * to virStreamSend, for apps that do blocking-I/O. 
+ */ +gssize +gvir_stream_send_all(GVirStream *self, GVirStreamSourceFunc func, gpointer user_data, GError **err) +{ + struct stream_source_helper helper = { + .self = self, + .func = func, + .user_data = user_data + }; + int r; + + g_return_val_if_fail(GVIR_IS_STREAM(self), -1); + g_return_val_if_fail(func != NULL, -1); + + r = virStreamSendAll(self->priv->handle, stream_source, &helper); + if (r < 0) { + if (err != NULL) + *err = gvir_error_new_literal(GVIR_STREAM_ERROR, + 0, + "Unable to perform SendAll"); + } + + return r; +} diff --git a/libvirt-gobject/libvirt-gobject-stream.h b/libvirt-gobject/libvirt-gobject-stream.h index 35526db..5a1ee68 100644 --- a/libvirt-gobject/libvirt-gobject-stream.h +++ b/libvirt-gobject/libvirt-gobject-stream.h @@ -71,17 +71,34 @@ struct _GVirStreamClass * Returns: the number of bytes filled, 0 upon end * of file, or -1 upon error */ -typedef gint (* GVirStreamSinkFunc) (GVirStream *stream, - const gchar *buf, - gsize nbytes, - gpointer user_data); +typedef gint (* GVirStreamSinkFunc)(GVirStream *stream, + const gchar *buf, + gsize nbytes, + gpointer user_data); + +/** + * GVirStreamSourceFunc: + * @stream: a #GVirStream + * @buf: (out) (array length=nbytes) (transfer none): data pointer + * @nbytes: data size + * @user_data: user data passed to the function + * Returns: the number of bytes filled, 0 upon end + * of file, or -1 upon error + */ +typedef gint (* GVirStreamSourceFunc)(GVirStream *stream, + gchar *buf, + gsize nbytes, + gpointer user_data); GType gvir_stream_get_type(void); GType gvir_stream_handle_get_type(void); -gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **err); +gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **error); gssize gvir_stream_receive(GVirStream *stream, gchar *buffer, gsize size, GCancellable *cancellable, GError **error); +gssize gvir_stream_send_all(GVirStream *stream, 
GVirStreamSourceFunc func, gpointer user_data, GError **error); +gssize gvir_stream_send(GVirStream *stream, const gchar *buffer, gsize size, GCancellable *cancellable, GError **error); + G_END_DECLS #endif /* __LIBVIRT_GOBJECT_STREAM_H__ */ -- 1.7.6.4 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list