ACK On Tue, Feb 11, 2014 at 11:26:27AM +0100, Marc-André Lureau wrote: > This allows to use conveniently GIOStream APIs without caring about > coroutine and Spice messages details. > --- > gtk/Makefile.am | 2 + > gtk/vmcstream.c | 515 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ > gtk/vmcstream.h | 81 +++++++++ > 3 files changed, 598 insertions(+) > create mode 100644 gtk/vmcstream.c > create mode 100644 gtk/vmcstream.h > > diff --git a/gtk/Makefile.am b/gtk/Makefile.am > index 62afd36..7ceb22f 100644 > --- a/gtk/Makefile.am > +++ b/gtk/Makefile.am > @@ -255,6 +255,8 @@ libspice_client_glib_2_0_la_SOURCES = \ > usbutil.c \ > usbutil.h \ > $(USB_ACL_HELPER_SRCS) \ > + vmcstream.c \ > + vmcstream.h \ > \ > decode.h \ > decode-glz.c \ > diff --git a/gtk/vmcstream.c b/gtk/vmcstream.c > new file mode 100644 > index 0000000..3d8e0e0 > --- /dev/null > +++ b/gtk/vmcstream.c > @@ -0,0 +1,515 @@ > +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ > +/* > + Copyright (C) 2013 Red Hat, Inc. > + > + This library is free software; you can redistribute it and/or > + modify it under the terms of the GNU Lesser General Public > + License as published by the Free Software Foundation; either > + version 2.1 of the License, or (at your option) any later version. > + > + This library is distributed in the hope that it will be useful, > + but WITHOUT ANY WARRANTY; without even the implied warranty of > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + Lesser General Public License for more details. > + > + You should have received a copy of the GNU Lesser General Public > + License along with this library; if not, see <http://www.gnu.org/licenses/>. > +*/ > +#include <string.h> > + > +#include "vmcstream.h" > +#include "spice-channel-priv.h" > +#include "gio-coroutine.h" > + > +struct _SpiceVmcInputStream > +{ > + GInputStream parent_instance; > + GSimpleAsyncResult *result; > + struct coroutine *coroutine; > + > + SpiceChannel *channel; > + gboolean all; > + guint8 *buffer; > + gsize count; > + gsize pos; > + > + GCancellable *cancellable; > + gulong cancel_id; > +}; > + > +struct _SpiceVmcInputStreamClass > +{ > + GInputStreamClass parent_class; > +}; > + > +static gssize spice_vmc_input_stream_read (GInputStream *stream, > + void *buffer, > + gsize count, > + GCancellable *cancellable, > + GError **error); > +static void spice_vmc_input_stream_read_async (GInputStream *stream, > + void *buffer, > + gsize count, > + int io_priority, > + GCancellable *cancellable, > + GAsyncReadyCallback callback, > + gpointer user_data); > +static gssize spice_vmc_input_stream_read_finish (GInputStream *stream, > + GAsyncResult *result, > + GError **error); > +static gssize spice_vmc_input_stream_skip (GInputStream *stream, > + gsize count, > + GCancellable *cancellable, > + GError **error); > +static gboolean spice_vmc_input_stream_close (GInputStream *stream, > + GCancellable *cancellable, > + GError **error); > + > +G_DEFINE_TYPE(SpiceVmcInputStream, spice_vmc_input_stream, G_TYPE_INPUT_STREAM) > + > + > +static void > +spice_vmc_input_stream_class_init(SpiceVmcInputStreamClass *klass) > +{ > + GInputStreamClass *istream_class; > + > + istream_class = G_INPUT_STREAM_CLASS(klass); > + istream_class->read_fn = spice_vmc_input_stream_read; > + istream_class->read_async = spice_vmc_input_stream_read_async; > + istream_class->read_finish = spice_vmc_input_stream_read_finish; > + istream_class->skip = spice_vmc_input_stream_skip; > + istream_class->close_fn = spice_vmc_input_stream_close; > +} > + > +static void > +spice_vmc_input_stream_init(SpiceVmcInputStream *self) > +{ > +} > + > +static SpiceVmcInputStream * > +spice_vmc_input_stream_new(void) > +{ > + SpiceVmcInputStream *self; > + > + self = g_object_new(SPICE_TYPE_VMC_INPUT_STREAM, NULL); > + > + return self; > +} > + > +/* coroutine */ > +/** > + * Feed a SpiceVmc stream with new data from a coroutine > + * > + * The other end will be waiting on read_async() until data is fed > + * here. > + */ > +G_GNUC_INTERNAL void > +spice_vmc_input_stream_co_data(SpiceVmcInputStream *self, > + const gpointer d, gsize size) > +{ > + guint8 *data = d; > + > + g_return_if_fail(SPICE_IS_VMC_INPUT_STREAM(self)); > + g_return_if_fail(self->coroutine == NULL); > + > + self->coroutine = coroutine_self(); > + > + while (size > 0) { > + SPICE_DEBUG("spicevmc co_data %p", self->result); > + if (!self->result) > + coroutine_yield(NULL); > + > + g_return_if_fail(self->result != NULL); > + > + gsize min = MIN(self->count, size); > + memcpy(self->buffer, data, min); > + > + size -= min; > + data += min; > + > + SPICE_DEBUG("spicevmc co_data complete: %" G_GSIZE_FORMAT > + "/%" G_GSIZE_FORMAT, min, self->count); > + > + self->pos += min; > + self->buffer += min; > + > + if (self->all && min > 0 && self->pos != self->count) > + continue; > + > + g_simple_async_result_set_op_res_gssize(self->result, self->pos); > + > + g_simple_async_result_complete_in_idle(self->result); > + g_clear_object(&self->result); > + if (self->cancellable) { > + g_cancellable_disconnect(self->cancellable, self->cancel_id); > + g_clear_object(&self->cancellable); > + } > + } > + > + self->coroutine = NULL; > +} > + > +static void > +read_cancelled(GCancellable *cancellable, > + gpointer user_data) > +{ > + SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(user_data); > + > + SPICE_DEBUG("read cancelled, %p", self->result); > + g_simple_async_result_set_error(self->result, > + G_IO_ERROR, G_IO_ERROR_CANCELLED, > + "read cancelled"); > + g_simple_async_result_complete_in_idle(self->result); > + g_clear_object(&self->result); > + if (self->cancellable) { > + g_cancellable_disconnect(self->cancellable, self->cancel_id); > + g_clear_object(&self->cancellable); > + } > +} > + > +void > +spice_vmc_input_stream_read_all_async(GInputStream *stream, > + void *buffer, > + gsize count, > + int io_priority, > + GCancellable *cancellable, > + GAsyncReadyCallback callback, > + gpointer user_data) > +{ > + SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream); > + GSimpleAsyncResult *result; > + > + /* no concurrent read permitted by ginputstream */ > + g_return_if_fail(self->result == NULL); > + g_return_if_fail(self->cancellable == NULL); > + self->all = TRUE; > + self->buffer = buffer; > + self->count = count; > + self->pos = 0; > + result = g_simple_async_result_new(G_OBJECT(self), > + callback, > + user_data, > + spice_vmc_input_stream_read_async); > + self->result = result; > + self->cancellable = g_object_ref(cancellable); > + if (cancellable) > + self->cancel_id = > + g_cancellable_connect(cancellable, G_CALLBACK(read_cancelled), self, NULL); > + > + if (self->coroutine) > + coroutine_yieldto(self->coroutine, NULL); > +} > + > +gssize > +spice_vmc_input_stream_read_all_finish(GInputStream *stream, > + GAsyncResult *result, > + GError **error) > +{ > + GSimpleAsyncResult *simple; > + SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream); > + > + g_return_val_if_fail(g_simple_async_result_is_valid(result, > + G_OBJECT(self), > + spice_vmc_input_stream_read_async), > + -1); > + > + simple = (GSimpleAsyncResult *)result; > + > + if (g_simple_async_result_propagate_error(simple, error)) > + return -1; > + > + return g_simple_async_result_get_op_res_gssize(simple); > +} > + > +static void > +spice_vmc_input_stream_read_async(GInputStream *stream, > + void *buffer, > + gsize count, > + int io_priority, > + GCancellable *cancellable, > + GAsyncReadyCallback callback, > + gpointer user_data) > +{ > + SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream); > + GSimpleAsyncResult *result; > + > + /* no concurrent read permitted by ginputstream */ > + g_return_if_fail(self->result == NULL); > + g_return_if_fail(self->cancellable == NULL); > + self->all = FALSE; > + self->buffer = buffer; > + self->count = count; > + self->pos = 0; > + result = g_simple_async_result_new(G_OBJECT(self), > + callback, > + user_data, > + spice_vmc_input_stream_read_async); > + self->result = result; > + self->cancellable = g_object_ref(cancellable); > + if (cancellable) > + self->cancel_id = > + g_cancellable_connect(cancellable, G_CALLBACK(read_cancelled), self, NULL); > + > + if (self->coroutine) > + coroutine_yieldto(self->coroutine, NULL); > +} > + > +static gssize > +spice_vmc_input_stream_read_finish(GInputStream *stream, > + GAsyncResult *result, > + GError **error) > +{ > + GSimpleAsyncResult *simple; > + SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream); > + > + g_return_val_if_fail(g_simple_async_result_is_valid(result, > + G_OBJECT(self), > + spice_vmc_input_stream_read_async), > + -1); > + > + simple = (GSimpleAsyncResult *)result; > + > + if (g_simple_async_result_propagate_error(simple, error)) > + return -1; > + > + return g_simple_async_result_get_op_res_gssize(simple); > +} > + > +static gssize > +spice_vmc_input_stream_read(GInputStream *stream, > + void *buffer, > + gsize count, > + GCancellable *cancellable, > + GError **error) > +{ > + g_return_val_if_reached(-1); > +} > + > +static gssize > +spice_vmc_input_stream_skip(GInputStream *stream, > + gsize count, > + GCancellable *cancellable, > + GError **error) > +{ > + g_return_val_if_reached(-1); > +} > + > +static gboolean > +spice_vmc_input_stream_close(GInputStream *stream, > + GCancellable *cancellable, > + GError **error) > +{ > + SPICE_DEBUG("fake close"); > + return TRUE; > +} > + > +/* OUTPUT */ > + > +struct _SpiceVmcOutputStream > +{ > + GOutputStream parent_instance; > + > + SpiceChannel *channel; /* weak */ > +}; > + > +struct _SpiceVmcOutputStreamClass > +{ > + GOutputStreamClass parent_class; > +}; > + > +static gssize spice_vmc_output_stream_write_fn (GOutputStream *stream, > + const void *buffer, > + gsize count, > + GCancellable *cancellable, > + GError **error); > +static gssize spice_vmc_output_stream_write_finish (GOutputStream *stream, > + GAsyncResult *result, > + GError **error); > +static void spice_vmc_output_stream_write_async (GOutputStream *stream, > + const void *buffer, > + gsize count, > + int io_priority, > + GCancellable *cancellable, > + GAsyncReadyCallback callback, > + gpointer user_data); > + > +G_DEFINE_TYPE(SpiceVmcOutputStream, spice_vmc_output_stream, G_TYPE_OUTPUT_STREAM) > + > + > +static void > +spice_vmc_output_stream_class_init(SpiceVmcOutputStreamClass *klass) > +{ > + GOutputStreamClass *ostream_class; > + > + ostream_class = G_OUTPUT_STREAM_CLASS(klass); > + ostream_class->write_fn = spice_vmc_output_stream_write_fn; > + ostream_class->write_async = spice_vmc_output_stream_write_async; > + ostream_class->write_finish = spice_vmc_output_stream_write_finish; > +} > + > +static void > +spice_vmc_output_stream_init(SpiceVmcOutputStream *self) > +{ > +} > + > +static SpiceVmcOutputStream * > +spice_vmc_output_stream_new(SpiceChannel *channel) > +{ > + SpiceVmcOutputStream *self; > + > + self = g_object_new(SPICE_TYPE_VMC_OUTPUT_STREAM, NULL); > + self->channel = channel; > + > + return self; > +} > + > +static gssize > +spice_vmc_output_stream_write_fn(GOutputStream *stream, > + const void *buffer, > + gsize count, > + GCancellable *cancellable, > + GError **error) > +{ > + SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream); > + SpiceMsgOut *msg_out; > + > + msg_out = spice_msg_out_new(SPICE_CHANNEL(self->channel), > + SPICE_MSGC_SPICEVMC_DATA); > + spice_marshaller_add(msg_out->marshaller, buffer, count); > + spice_msg_out_send(msg_out); > + > + return count; > +} > + > +static gssize > +spice_vmc_output_stream_write_finish(GOutputStream *stream, > + GAsyncResult *simple, > + GError **error) > +{ > + SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream); > + GSimpleAsyncResult *res = > + g_simple_async_result_get_op_res_gpointer(G_SIMPLE_ASYNC_RESULT(simple)); > + > + SPICE_DEBUG("spicevmc write finish"); > + return spice_vmc_write_finish(self->channel, G_ASYNC_RESULT(res), error); > +} > + > +static void > +write_cb(GObject *source_object, > + GAsyncResult *res, > + gpointer user_data) > +{ > + GSimpleAsyncResult *simple = user_data; > + > + g_simple_async_result_set_op_res_gpointer(simple, res, NULL); > + > + g_simple_async_result_complete(simple); > + g_object_unref(simple); > +} > + > +static void > +spice_vmc_output_stream_write_async(GOutputStream *stream, > + const void *buffer, > + gsize count, > + int io_priority, > + GCancellable *cancellable, > + GAsyncReadyCallback callback, > + gpointer user_data) > +{ > + SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream); > + GSimpleAsyncResult *simple; > + > + SPICE_DEBUG("spicevmc write async"); > + /* an AsyncResult to forward async op to channel */ > + simple = g_simple_async_result_new(G_OBJECT(self), callback, user_data, > + spice_vmc_output_stream_write_async); > + > + spice_vmc_write_async(self->channel, buffer, count, > + cancellable, write_cb, > + simple); > +} > + > +/* STREAM */ > + > +struct _SpiceVmcStream > +{ > + GIOStream parent_instance; > + > + SpiceChannel *channel; /* weak */ > + SpiceVmcInputStream *in; > + SpiceVmcOutputStream *out; > +}; > + > +struct _SpiceVmcStreamClass > +{ > + GIOStreamClass parent_class; > +}; > + > +static void spice_vmc_stream_finalize (GObject *object); > +static GInputStream * spice_vmc_stream_get_input_stream (GIOStream *stream); > +static GOutputStream * spice_vmc_stream_get_output_stream (GIOStream *stream); > + > +G_DEFINE_TYPE(SpiceVmcStream, spice_vmc_stream, G_TYPE_IO_STREAM) > + > +static void > +spice_vmc_stream_class_init(SpiceVmcStreamClass *klass) > +{ > + GObjectClass *object_class; > + GIOStreamClass *iostream_class; > + > + object_class = G_OBJECT_CLASS(klass); > + object_class->finalize = spice_vmc_stream_finalize; > + > + iostream_class = G_IO_STREAM_CLASS(klass); > + iostream_class->get_input_stream = spice_vmc_stream_get_input_stream; > + iostream_class->get_output_stream = spice_vmc_stream_get_output_stream; > +} > + > +static void > +spice_vmc_stream_finalize(GObject *object) > +{ > + SpiceVmcStream *self = SPICE_VMC_STREAM(object); > + > + g_clear_object(&self->in); > + g_clear_object(&self->out); > + > + G_OBJECT_CLASS(spice_vmc_stream_parent_class)->finalize(object); > +} > + > +static void > +spice_vmc_stream_init(SpiceVmcStream *self) > +{ > +} > + > +SpiceVmcStream * > +spice_vmc_stream_new(SpiceChannel *channel) > +{ > + SpiceVmcStream *self; > + > + self = g_object_new(SPICE_TYPE_VMC_STREAM, NULL); > + self->channel = channel; > + > + return self; > +} > + > +static GInputStream * > +spice_vmc_stream_get_input_stream(GIOStream *stream) > +{ > + SpiceVmcStream *self = SPICE_VMC_STREAM(stream); > + > + if (!self->in) > + self->in = spice_vmc_input_stream_new(); > + > + return G_INPUT_STREAM(self->in); > +} > + > +static GOutputStream * > +spice_vmc_stream_get_output_stream(GIOStream *stream) > +{ > + SpiceVmcStream *self = SPICE_VMC_STREAM(stream); > + > + if (!self->out) > + self->out = spice_vmc_output_stream_new(self->channel); > + > + return G_OUTPUT_STREAM(self->out); > +} > diff --git a/gtk/vmcstream.h b/gtk/vmcstream.h > new file mode 100644 > index 0000000..1316b77 > --- /dev/null > +++ b/gtk/vmcstream.h > @@ -0,0 +1,81 @@ > +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ > +/* > + Copyright (C) 2013 Red Hat, Inc. > + > + This library is free software; you can redistribute it and/or > + modify it under the terms of the GNU Lesser General Public > + License as published by the Free Software Foundation; either > + version 2.1 of the License, or (at your option) any later version. > + > + This library is distributed in the hope that it will be useful, > + but WITHOUT ANY WARRANTY; without even the implied warranty of > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + Lesser General Public License for more details. > + > + You should have received a copy of the GNU Lesser General Public > + License along with this library; if not, see <http://www.gnu.org/licenses/>. > +*/ > +#ifndef __SPICE_VMC_STREAM_H__ > +#define __SPICE_VMC_STREAM_H__ > + > +#include <gio/gio.h> > + > +#include "spice-types.h" > + > +G_BEGIN_DECLS > + > +#define SPICE_TYPE_VMC_INPUT_STREAM (spice_vmc_input_stream_get_type ()) > +#define SPICE_VMC_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), SPICE_TYPE_VMC_INPUT_STREAM, SpiceVmcInputStream)) > +#define SPICE_VMC_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), SPICE_TYPE_VMC_INPUT_STREAM, SpiceVmcInputStreamClass)) > +#define SPICE_IS_VMC_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), SPICE_TYPE_VMC_INPUT_STREAM)) > +#define SPICE_IS_VMC_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), SPICE_TYPE_VMC_INPUT_STREAM)) > +#define SPICE_VMC_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SPICE_TYPE_VMC_INPUT_STREAM, SpiceVmcInputStreamClass)) > + > +typedef struct _SpiceVmcInputStreamClass SpiceVmcInputStreamClass; > +typedef struct _SpiceVmcInputStream SpiceVmcInputStream; > + > +GType spice_vmc_input_stream_get_type (void) G_GNUC_CONST; > +void spice_vmc_input_stream_co_data (SpiceVmcInputStream *input, > + const gpointer data, > + gsize size); > + > +void spice_vmc_input_stream_read_all_async(GInputStream *stream, > + void *buffer, > + gsize count, > + int io_priority, > + GCancellable *cancellable, > + GAsyncReadyCallback callback, > + gpointer user_data); > +gssize spice_vmc_input_stream_read_all_finish(GInputStream *stream, > + GAsyncResult *result, > + GError **error); > + > + > +#define SPICE_TYPE_VMC_OUTPUT_STREAM (spice_vmc_output_stream_get_type ()) > +#define SPICE_VMC_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), SPICE_TYPE_VMC_OUTPUT_STREAM, SpiceVmcOutputStream)) > +#define SPICE_VMC_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), SPICE_TYPE_VMC_OUTPUT_STREAM, SpiceVmcOutputStreamClass)) > +#define SPICE_IS_VMC_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), SPICE_TYPE_VMC_OUTPUT_STREAM)) > +#define SPICE_IS_VMC_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), SPICE_TYPE_VMC_OUTPUT_STREAM)) > +#define SPICE_VMC_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SPICE_TYPE_VMC_OUTPUT_STREAM, SpiceVmcOutputStreamClass)) > + > +typedef struct _SpiceVmcOutputStreamClass SpiceVmcOutputStreamClass; > +typedef struct _SpiceVmcOutputStream SpiceVmcOutputStream; > + > +GType spice_vmc_output_stream_get_type (void) G_GNUC_CONST; > + > +#define SPICE_TYPE_VMC_STREAM (spice_vmc_stream_get_type ()) > +#define SPICE_VMC_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), SPICE_TYPE_VMC_STREAM, SpiceVmcStream)) > +#define SPICE_VMC_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), SPICE_TYPE_VMC_STREAM, SpiceVmcStreamClass)) > +#define SPICE_IS_VMC_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), SPICE_TYPE_VMC_STREAM)) > +#define SPICE_IS_VMC_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), SPICE_TYPE_VMC_STREAM)) > +#define SPICE_VMC_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SPICE_TYPE_VMC_STREAM, SpiceVmcStreamClass)) > + > +typedef struct _SpiceVmcStreamClass SpiceVmcStreamClass; > +typedef struct _SpiceVmcStream SpiceVmcStream; > + > +GType spice_vmc_stream_get_type (void) G_GNUC_CONST; > +SpiceVmcStream* spice_vmc_stream_new (SpiceChannel *channel); > + > +G_END_DECLS > + > +#endif /* __SPICE_VMC_STREAM_H__ */ > -- > 1.8.4.2 > > _______________________________________________ > Spice-devel mailing list > Spice-devel@xxxxxxxxxxxxxxxxxxxxx > http://lists.freedesktop.org/mailman/listinfo/spice-devel
Attachment:
pgptmWH5Fsz4i.pgp
Description: PGP signature
_______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx http://lists.freedesktop.org/mailman/listinfo/spice-devel