Allows reading asynchronously from a stream with GVirInputStream. This is modelled after GSocket. --- libvirt-gobject/Makefile.am | 2 + libvirt-gobject/libvirt-gobject-connection.c | 2 +- libvirt-gobject/libvirt-gobject-input-stream.c | 239 ++++++++++++++++++++++++ libvirt-gobject/libvirt-gobject-input-stream.h | 68 +++++++ libvirt-gobject/libvirt-gobject-stream.c | 130 +++++++++++++- libvirt-gobject/libvirt-gobject-stream.h | 10 +- 6 files changed, 443 insertions(+), 8 deletions(-) create mode 100644 libvirt-gobject/libvirt-gobject-input-stream.c create mode 100644 libvirt-gobject/libvirt-gobject-input-stream.h diff --git a/libvirt-gobject/Makefile.am b/libvirt-gobject/Makefile.am index 8147db2..7013675 100644 --- a/libvirt-gobject/Makefile.am +++ b/libvirt-gobject/Makefile.am @@ -40,6 +40,8 @@ libvirt_gobject_1_0_la_HEADERS = \ libvirt_gobject_1_0_la_SOURCES = \ $(libvirt_gobject_1_0_la_HEADERS) \ libvirt-gobject-enums.c \ + libvirt-gobject-input-stream.c \ + libvirt-gobject-input-stream.h \ $(GOBJECT_SOURCE_FILES) libvirt_gobject_1_0_la_CFLAGS = \ -DDATADIR="\"$(datadir)\"" \ diff --git a/libvirt-gobject/libvirt-gobject-connection.c b/libvirt-gobject/libvirt-gobject-connection.c index 5fc0a9e..95cd878 100644 --- a/libvirt-gobject/libvirt-gobject-connection.c +++ b/libvirt-gobject/libvirt-gobject-connection.c @@ -1151,7 +1151,7 @@ GVirStream *gvir_connection_get_stream(GVirConnection *self, klass = GVIR_CONNECTION_GET_CLASS(self); g_return_val_if_fail(klass->stream_new, NULL); - virStreamPtr st = virStreamNew(self->priv->conn, flags); + virStreamPtr st = virStreamNew(self->priv->conn, flags | VIR_STREAM_NONBLOCK); return klass->stream_new(self, st); } diff --git a/libvirt-gobject/libvirt-gobject-input-stream.c b/libvirt-gobject/libvirt-gobject-input-stream.c new file mode 100644 index 0000000..a76d670 --- /dev/null +++ b/libvirt-gobject/libvirt-gobject-input-stream.c @@ -0,0 +1,239 @@ +/* + * libvirt-gobject-input-stream.c: libvirt gobject integration + * + * Copyright 
(C) 2011 Red Hat + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Authors: Daniel P. Berrange <berrange@xxxxxxxxxx> + * Marc-André Lureau <marcandre.lureau@xxxxxxxxxx> + */ + +#include <config.h> + +#include <libvirt/virterror.h> +#include <string.h> + +#include "libvirt-glib/libvirt-glib.h" +#include "libvirt-gobject/libvirt-gobject.h" +#include "libvirt-gobject-input-stream.h" + +extern gboolean debugFlag; + +#define DEBUG(fmt, ...) 
do { if (G_UNLIKELY(debugFlag)) g_debug(fmt, ## __VA_ARGS__); } while (0) + +#define gvir_input_stream_get_type _gvir_input_stream_get_type +G_DEFINE_TYPE(GVirInputStream, gvir_input_stream, G_TYPE_INPUT_STREAM); + +enum +{ + PROP_0, + PROP_STREAM +}; + +struct _GVirInputStreamPrivate +{ + GVirStream *stream; + + /* pending operation metadata */ + GSimpleAsyncResult *result; + GCancellable *cancellable; + gpointer buffer; + gsize count; +}; + +static void gvir_input_stream_get_property(GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + GVirInputStream *stream = GVIR_INPUT_STREAM(object); + + switch (prop_id) { + case PROP_STREAM: + g_value_set_object(value, stream->priv->stream); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + } +} + +static void gvir_input_stream_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + GVirInputStream *stream = GVIR_INPUT_STREAM(object); + + switch (prop_id) { + case PROP_STREAM: + stream->priv->stream = g_value_dup_object(value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + } +} + +static void gvir_input_stream_finalize(GObject *object) +{ + GVirInputStream *stream = GVIR_INPUT_STREAM(object); + + if (stream->priv->stream) + g_object_unref(stream->priv->stream); + + if (G_OBJECT_CLASS(gvir_input_stream_parent_class)->finalize) + (*G_OBJECT_CLASS(gvir_input_stream_parent_class)->finalize)(object); +} + +static void +gvir_input_stream_read_ready (G_GNUC_UNUSED virStreamPtr st, + int events, void *opaque) +{ + GVirInputStream *stream = GVIR_INPUT_STREAM(opaque); + GVirInputStreamPrivate *priv = stream->priv; + GSimpleAsyncResult *simple; + GError *error = NULL; + gssize result; + + g_return_if_fail(events & VIR_STREAM_EVENT_READABLE); + + result = gvir_stream_receive(priv->stream, priv->buffer, priv->count, + priv->cancellable, &error); + + if (g_error_matches(error, G_IO_ERROR, 
G_IO_ERROR_WOULD_BLOCK)) { + g_warn_if_reached(); + g_error_free(error); /* nobody else takes ownership of the would-block error */ + return; + } + + simple = stream->priv->result; + stream->priv->result = NULL; + + if (result >= 0) + g_simple_async_result_set_op_res_gssize(simple, result); + + if (error) + g_simple_async_result_take_error(simple, error); + + if (priv->cancellable) { + g_object_unref(stream->priv->cancellable); + priv->cancellable = NULL; + } + + g_simple_async_result_complete(simple); + g_object_unref(simple); + return; +} + +static void gvir_input_stream_read_async(GInputStream *stream, + void *buffer, + gsize count, + G_GNUC_UNUSED int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GVirInputStream *input_stream = GVIR_INPUT_STREAM(stream); + virStreamPtr handle; + + g_return_if_fail(GVIR_IS_INPUT_STREAM(stream)); + g_return_if_fail(input_stream->priv->result == NULL); + + g_object_get(input_stream->priv->stream, "handle", &handle, NULL); + + if (virStreamEventAddCallback(handle, VIR_STREAM_EVENT_READABLE, + gvir_input_stream_read_ready, stream, NULL) < 0) { + g_simple_async_report_error_in_idle(G_OBJECT(stream), + callback, + user_data, + G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, + "Couldn't add event callback %s", + G_STRFUNC); + goto end; + } + + input_stream->priv->result = + g_simple_async_result_new(G_OBJECT(stream), callback, user_data, + gvir_input_stream_read_async); + if (cancellable) + g_object_ref(cancellable); + input_stream->priv->cancellable = cancellable; + input_stream->priv->buffer = buffer; + input_stream->priv->count = count; + +end: + virStreamFree(handle); +} + + +static gssize gvir_input_stream_read_finish(GInputStream *stream, + GAsyncResult *result, + G_GNUC_UNUSED GError **error) +{ + GVirInputStream *input_stream = GVIR_INPUT_STREAM(stream); + GSimpleAsyncResult *simple; + virStreamPtr handle; + gssize count; + + g_return_val_if_fail(GVIR_IS_INPUT_STREAM(stream), -1); + g_object_get(input_stream->priv->stream, "handle", &handle, NULL); + + simple 
= G_SIMPLE_ASYNC_RESULT(result); + g_warn_if_fail(g_simple_async_result_get_source_tag(simple) == gvir_input_stream_read_async); + + count = g_simple_async_result_get_op_res_gssize(simple); + if (g_simple_async_result_propagate_error(simple, error)) /* a failed read stored a GError in the result: hand it to the caller */ + count = -1; + + virStreamEventRemoveCallback(handle); + virStreamFree(handle); + return count; +} + + +static void gvir_input_stream_class_init(GVirInputStreamClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS(klass); + GInputStreamClass *ginputstream_class = G_INPUT_STREAM_CLASS(klass); + + g_type_class_add_private(klass, sizeof(GVirInputStreamPrivate)); + + gobject_class->finalize = gvir_input_stream_finalize; + gobject_class->get_property = gvir_input_stream_get_property; + gobject_class->set_property = gvir_input_stream_set_property; + + ginputstream_class->read_fn = NULL; + ginputstream_class->read_async = gvir_input_stream_read_async; + ginputstream_class->read_finish = gvir_input_stream_read_finish; + + g_object_class_install_property(gobject_class, PROP_STREAM, + g_param_spec_object("stream", + "stream", + "GVirStream", + GVIR_TYPE_STREAM, G_PARAM_CONSTRUCT_ONLY | + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +} + +static void gvir_input_stream_init(GVirInputStream *stream) +{ + stream->priv = G_TYPE_INSTANCE_GET_PRIVATE(stream, GVIR_TYPE_INPUT_STREAM, GVirInputStreamPrivate); +} + +GVirInputStream* _gvir_input_stream_new(GVirStream *stream) +{ + return GVIR_INPUT_STREAM(g_object_new(GVIR_TYPE_INPUT_STREAM, "stream", stream, NULL)); +} diff --git a/libvirt-gobject/libvirt-gobject-input-stream.h b/libvirt-gobject/libvirt-gobject-input-stream.h new file mode 100644 index 0000000..e8002b9 --- /dev/null +++ b/libvirt-gobject/libvirt-gobject-input-stream.h @@ -0,0 +1,68 @@ +/* + * libvirt-gobject-input-stream.h: libvirt gobject integration + * + * Copyright (C) 2011 Red Hat + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either 
+ * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Authors: Daniel P. Berrange <berrange@xxxxxxxxxx> + * Marc-André Lureau <marcandre.lureau@xxxxxxxxxx> + */ + +#if !defined(__LIBVIRT_GOBJECT_H__) && !defined(LIBVIRT_GOBJECT_BUILD) +#error "Only <libvirt-gobject/libvirt-gobject.h> can be included directly." +#endif + +#ifndef __LIBVIRT_GOBJECT_INPUT_STREAM_H__ +#define __LIBVIRT_GOBJECT_INPUT_STREAM_H__ + +#include <gio/gio.h> +#include "libvirt-gobject-stream.h" + +G_BEGIN_DECLS + +#define GVIR_TYPE_INPUT_STREAM (_gvir_input_stream_get_type ()) +#define GVIR_INPUT_STREAM(inst) (G_TYPE_CHECK_INSTANCE_CAST ((inst), \ + GVIR_TYPE_INPUT_STREAM, GVirInputStream)) +#define GVIR_INPUT_STREAM_CLASS(class) (G_TYPE_CHECK_CLASS_CAST ((class), \ + GVIR_TYPE_INPUT_STREAM, GVirInputStreamClass)) +#define GVIR_IS_INPUT_STREAM(inst) (G_TYPE_CHECK_INSTANCE_TYPE ((inst), \ + GVIR_TYPE_INPUT_STREAM)) +#define GVIR_IS_INPUT_STREAM_CLASS(class) (G_TYPE_CHECK_CLASS_TYPE ((class), \ + GVIR_TYPE_INPUT_STREAM)) +#define GVIR_INPUT_STREAM_GET_CLASS(inst) (G_TYPE_INSTANCE_GET_CLASS ((inst), \ + GVIR_TYPE_INPUT_STREAM, GVirInputStreamClass)) + +typedef struct _GVirInputStreamPrivate GVirInputStreamPrivate; +typedef struct _GVirInputStreamClass GVirInputStreamClass; +typedef struct _GVirInputStream GVirInputStream; + +struct _GVirInputStreamClass +{ + GInputStreamClass parent_class; +}; + +struct _GVirInputStream +{ + GInputStream parent_instance; + GVirInputStreamPrivate *priv; +}; + +GType 
_gvir_input_stream_get_type (void) G_GNUC_CONST; +GVirInputStream * _gvir_input_stream_new (GVirStream *stream); + +G_END_DECLS + +#endif /* __LIBVIRT_GOBJECT_INPUT_STREAM_H__ */ diff --git a/libvirt-gobject/libvirt-gobject-stream.c b/libvirt-gobject/libvirt-gobject-stream.c index 519d733..88e3a40 100644 --- a/libvirt-gobject/libvirt-gobject-stream.c +++ b/libvirt-gobject/libvirt-gobject-stream.c @@ -30,6 +30,8 @@ #include "libvirt-glib/libvirt-glib.h" #include "libvirt-gobject/libvirt-gobject.h" +#include "libvirt-gobject/libvirt-gobject-input-stream.h" + extern gboolean debugFlag; #define DEBUG(fmt, ...) do { if (G_UNLIKELY(debugFlag)) g_debug(fmt, ## __VA_ARGS__); } while (0) @@ -39,10 +41,12 @@ extern gboolean debugFlag; struct _GVirStreamPrivate { - virStreamPtr handle; + virStreamPtr handle; + GInputStream *input_stream; + gboolean in_dispose; }; -G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_OBJECT); +G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_IO_STREAM); enum { @@ -60,6 +64,71 @@ gvir_stream_error_quark(void) return g_quark_from_static_string("vir-g-stream"); } + +static GInputStream* gvir_stream_get_input_stream(GIOStream *io_stream) +{ + GVirStream *self = GVIR_STREAM(io_stream); + + if (self->priv->input_stream == NULL) + self->priv->input_stream = (GInputStream *)_gvir_input_stream_new(self); + + return self->priv->input_stream; +} + + +static gboolean gvir_stream_close(GIOStream *io_stream, + GCancellable *cancellable, G_GNUC_UNUSED GError **error) +{ + GVirStream *self = GVIR_STREAM(io_stream); + + if (self->priv->input_stream) + g_input_stream_close(self->priv->input_stream, cancellable, NULL); + + if (self->priv->in_dispose) + return TRUE; + + return TRUE; /* FIXME: really close the stream? 
*/ +} + + +static void gvir_stream_close_async(GIOStream *stream, G_GNUC_UNUSED int io_priority, + GCancellable *cancellable, GAsyncReadyCallback callback, + gpointer user_data) +{ + GSimpleAsyncResult *res; + GIOStreamClass *class; + GError *error; + + class = G_IO_STREAM_GET_CLASS(stream); + + /* close is not blocked, just do it! */ + error = NULL; + if (class->close_fn && + !class->close_fn(stream, cancellable, &error)) { + g_simple_async_report_take_gerror_in_idle(G_OBJECT (stream), + callback, user_data, + error); + return; + } + + res = g_simple_async_result_new(G_OBJECT (stream), + callback, + user_data, + gvir_stream_close_async); + g_simple_async_result_complete_in_idle(res); + g_object_unref (res); +} + + +static gboolean +gvir_stream_close_finish(G_GNUC_UNUSED GIOStream *stream, + G_GNUC_UNUSED GAsyncResult *result, + G_GNUC_UNUSED GError **error) +{ + return TRUE; +} + + static void gvir_stream_get_property(GObject *object, guint prop_id, GValue *value, @@ -107,6 +176,9 @@ static void gvir_stream_finalize(GObject *object) DEBUG("Finalize GVirStream=%p", self); + if (self->priv->input_stream) + g_object_unref(self->priv->input_stream); + if (priv->handle) { if (virStreamFinish(priv->handle) < 0) g_critical("cannot finish stream"); @@ -120,12 +192,18 @@ static void gvir_stream_finalize(GObject *object) static void gvir_stream_class_init(GVirStreamClass *klass) { - GObjectClass *object_class = G_OBJECT_CLASS (klass); + GObjectClass *object_class = G_OBJECT_CLASS(klass); + GIOStreamClass *stream_class = G_IO_STREAM_CLASS(klass); object_class->finalize = gvir_stream_finalize; object_class->get_property = gvir_stream_get_property; object_class->set_property = gvir_stream_set_property; + stream_class->get_input_stream = gvir_stream_get_input_stream; + stream_class->close_fn = gvir_stream_close; + stream_class->close_async = gvir_stream_close_async; + stream_class->close_finish = gvir_stream_close_finish; + g_object_class_install_property(object_class, 
PROP_HANDLE, g_param_spec_boxed("handle", @@ -170,6 +248,50 @@ GType gvir_stream_handle_get_type(void) return handle_type; } +/** + * gvir_stream_receive: + * @stream: the stream + * @buffer: a buffer to read data into (which should be at least @size + * bytes long). + * @size: the number of bytes you want to read from the stream + * @cancellable: (allow-none): a %GCancellable or %NULL + * @error: #GError for error reporting, or %NULL to ignore. + * + * Receive data (up to @size bytes) from a stream. + * On error -1 is returned and @error is set accordingly. + * + * gvir_stream_receive() can return any number of bytes, up to + * @size. If more than @size bytes have been received, the additional + * data will be returned in future calls to gvir_stream_receive(). + * + * If there is no data available, a %G_IO_ERROR_WOULD_BLOCK error will be + * returned. + * + * Returns: Number of bytes read, or 0 if the end of stream reached, + * or -1 on error. + */ +gssize gvir_stream_receive(GVirStream *self, gchar *buffer, gsize size, + GCancellable *cancellable, GError **error) +{ + int got; + + g_return_val_if_fail(GVIR_IS_STREAM(self), -1); + g_return_val_if_fail(buffer != NULL, -1); + + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return -1; + + got = virStreamRecv(self->priv->handle, buffer, size); + + if (got == -2) { /* blocking */ + g_set_error_literal(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, "No data available on the stream yet"); + } else if (got < 0) { + g_set_error(error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, + "Got virStreamRecv error in %s", G_STRFUNC); + } + + return got < 0 ? -1 : got; /* would-block arrives as -2 from virStreamRecv; the documented error value is -1 */ +} struct stream_sink_helper { GVirStream *self; @@ -197,7 +319,7 @@ stream_sink(virStreamPtr st G_GNUC_UNUSED, * requested data sink. This is simply a convenient alternative * to virStreamRecv, for apps that do blocking-I/o. 
*/ -gint +gssize gvir_stream_receive_all(GVirStream *self, GVirStreamSinkFunc func, gpointer user_data, GError **err) { struct stream_sink_helper helper = { diff --git a/libvirt-gobject/libvirt-gobject-stream.h b/libvirt-gobject/libvirt-gobject-stream.h index 5181e24..35526db 100644 --- a/libvirt-gobject/libvirt-gobject-stream.h +++ b/libvirt-gobject/libvirt-gobject-stream.h @@ -28,6 +28,9 @@ #ifndef __LIBVIRT_GOBJECT_STREAM_H__ #define __LIBVIRT_GOBJECT_STREAM_H__ +#include <glib-object.h> +#include <gio/gio.h> + G_BEGIN_DECLS #define GVIR_TYPE_STREAM (gvir_stream_get_type ()) @@ -45,7 +48,7 @@ typedef struct _GVirStreamClass GVirStreamClass; struct _GVirStream { - GObject parent; + GIOStream parent_instance; GVirStreamPrivate *priv; @@ -54,7 +57,7 @@ struct _GVirStream struct _GVirStreamClass { - GObjectClass parent_class; + GIOStreamClass parent_class; gpointer padding[20]; }; @@ -76,7 +79,8 @@ typedef gint (* GVirStreamSinkFunc) (GVirStream *stream, GType gvir_stream_get_type(void); GType gvir_stream_handle_get_type(void); -gint gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **err); +gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **err); +gssize gvir_stream_receive(GVirStream *stream, gchar *buffer, gsize size, GCancellable *cancellable, GError **error); G_END_DECLS -- 1.7.6.2 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list