This makes it convenient to use the GIOStream APIs without having to care
about coroutine and Spice message details.
---
 gtk/Makefile.am |   2 +
 gtk/vmcstream.c | 532 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 gtk/vmcstream.h |  81 +++++++++
 3 files changed, 615 insertions(+)
 create mode 100644 gtk/vmcstream.c
 create mode 100644 gtk/vmcstream.h

diff --git a/gtk/Makefile.am b/gtk/Makefile.am
index 62afd36..7ceb22f 100644
--- a/gtk/Makefile.am
+++ b/gtk/Makefile.am
@@ -255,6 +255,8 @@ libspice_client_glib_2_0_la_SOURCES = \
 	usbutil.c \
 	usbutil.h \
 	$(USB_ACL_HELPER_SRCS) \
+	vmcstream.c \
+	vmcstream.h \
 	\
 	decode.h \
 	decode-glz.c \
diff --git a/gtk/vmcstream.c b/gtk/vmcstream.c
new file mode 100644
index 0000000..f92f8b7
--- /dev/null
+++ b/gtk/vmcstream.c
@@ -0,0 +1,532 @@
+/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+   Copyright (C) 2013 Red Hat, Inc.
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2.1 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/ +#include <string.h> + +#include "vmcstream.h" +#include "spice-channel-priv.h" +#include "gio-coroutine.h" + +struct _SpiceVmcInputStream +{ + GInputStream parent_instance; + GSimpleAsyncResult *result; + struct coroutine *coroutine; + + SpiceChannel *channel; + gboolean all; + guint8 *buffer; + gsize count; + gsize pos; + + GCancellable *cancellable; + gulong cancel_id; +}; + +struct _SpiceVmcInputStreamClass +{ + GInputStreamClass parent_class; +}; + +static gssize spice_vmc_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +static void spice_vmc_input_stream_read_async (GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +static gssize spice_vmc_input_stream_read_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +static gssize spice_vmc_input_stream_skip (GInputStream *stream, + gsize count, + GCancellable *cancellable, + GError **error); +static gboolean spice_vmc_input_stream_close (GInputStream *stream, + GCancellable *cancellable, + GError **error); + +G_DEFINE_TYPE(SpiceVmcInputStream, spice_vmc_input_stream, G_TYPE_INPUT_STREAM) + + +static void +spice_vmc_input_stream_class_init(SpiceVmcInputStreamClass *klass) +{ + GInputStreamClass *istream_class; + + istream_class = G_INPUT_STREAM_CLASS(klass); + istream_class->read_fn = spice_vmc_input_stream_read; + istream_class->read_async = spice_vmc_input_stream_read_async; + istream_class->read_finish = spice_vmc_input_stream_read_finish; + istream_class->skip = spice_vmc_input_stream_skip; + istream_class->close_fn = spice_vmc_input_stream_close; +} + +static void +spice_vmc_input_stream_init(SpiceVmcInputStream *self) +{ +} + +static SpiceVmcInputStream * +spice_vmc_input_stream_new(void) +{ + SpiceVmcInputStream *self; + + self = g_object_new(SPICE_TYPE_VMC_INPUT_STREAM, NULL); + + return self; +} + +/* 
coroutine */
+/**
+ * Feed a SpiceVmc stream with new data from a coroutine
+ *
+ * The other end will be waiting on read_async() until data is fed
+ * here.
+ *
+ * @self: the input stream to feed
+ * @d:    the data to hand over to the reader
+ * @size: number of bytes available at @d
+ *
+ * The function returns once all @size bytes have been copied into
+ * reader buffers.  While no read is pending, the calling coroutine
+ * yields; it is resumed by the next read_async()/read_all_async().
+ */
+G_GNUC_INTERNAL void
+spice_vmc_input_stream_co_data(SpiceVmcInputStream *self,
+                               const gpointer d, gsize size)
+{
+    guint8 *data = d;
+
+    g_return_if_fail(SPICE_IS_VMC_INPUT_STREAM(self));
+    g_return_if_fail(self->coroutine == NULL);
+
+    self->coroutine = coroutine_self();
+
+    while (size > 0) {
+        gsize min;
+
+        SPICE_DEBUG("spicevmc co_data %p", self->result);
+        if (!self->result)
+            coroutine_yield(NULL);
+
+        if (!self->result) {
+            /* Do not return with self->coroutine still set: a later
+             * read would then switch to a coroutine that is no longer
+             * waiting here. */
+            g_critical("%s: resumed without a pending read", G_STRFUNC);
+            break;
+        }
+
+        /* self->buffer has already been advanced by self->pos, so only
+         * count - pos bytes are left in the reader's buffer.  Using the
+         * full count would overflow it when a read-all operation is
+         * fed by more than one call. */
+        min = MIN(self->count - self->pos, size);
+        memcpy(self->buffer, data, min);
+
+        size -= min;
+        data += min;
+
+        SPICE_DEBUG("spicevmc co_data complete: %" G_GSIZE_FORMAT
+                    "/%" G_GSIZE_FORMAT, min, self->count);
+
+        self->pos += min;
+        self->buffer += min;
+
+        /* a read-all stays pending until the buffer is full */
+        if (self->all && min > 0 && self->pos != self->count)
+            continue;
+
+        g_simple_async_result_set_op_res_gssize(self->result, self->pos);
+
+        g_simple_async_result_complete_in_idle(self->result);
+        g_clear_object(&self->result);
+        if (self->cancellable) {
+            g_cancellable_disconnect(self->cancellable, self->cancel_id);
+            g_clear_object(&self->cancellable);
+        }
+    }
+
+    self->coroutine = NULL;
+}
+
+/*
+ * "cancelled" handler of the pending read: fail it with
+ * G_IO_ERROR_CANCELLED and forget about it.
+ *
+ * The cancellable is deliberately NOT disconnected here:
+ * g_cancellable_disconnect() waits for running handlers and would
+ * deadlock when called from the handler itself (#705395).  The
+ * disconnection is done by the _finish() functions instead.
+ */
+static void
+read_cancelled(GCancellable *cancellable,
+               gpointer user_data)
+{
+    SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(user_data);
+
+    SPICE_DEBUG("read cancelled, %p", self->result);
+    g_simple_async_result_set_error(self->result,
+                                    G_IO_ERROR, G_IO_ERROR_CANCELLED,
+                                    "read cancelled");
+    g_simple_async_result_complete_in_idle(self->result);
+
+    g_clear_object(&self->result);
+}
+
+/*
+ * Common implementation of read_async() and read_all_async(): record
+ * the pending read and wake up a coroutine waiting in co_data().
+ *
+ * @all selects whether the read completes as soon as some data was
+ * copied (FALSE) or only once @count bytes were copied (TRUE).
+ */
+static void
+spice_vmc_input_stream_start_read(SpiceVmcInputStream *self,
+                                  gboolean all,
+                                  void *buffer,
+                                  gsize count,
+                                  GCancellable *cancellable,
+                                  GAsyncReadyCallback callback,
+                                  gpointer user_data)
+{
+    /* no concurrent read permitted by ginputstream */
+    g_return_if_fail(self->result == NULL);
+    g_return_if_fail(self->cancellable == NULL);
+
+    self->all = all;
+    self->buffer = buffer;
+    self->count = count;
+    self->pos = 0;
+    /* both read flavours share the same source tag, checked in _finish() */
+    self->result = g_simple_async_result_new(G_OBJECT(self),
+                                             callback,
+                                             user_data,
+                                             spice_vmc_input_stream_read_async);
+    if (cancellable) {
+        /* g_object_ref() must not be called with NULL */
+        self->cancellable = g_object_ref(cancellable);
+        self->cancel_id =
+            g_cancellable_connect(cancellable, G_CALLBACK(read_cancelled), self, NULL);
+    }
+
+    /* If the cancellable was already cancelled, read_cancelled() ran
+     * synchronously and there is no read left to serve. */
+    if (self->coroutine && self->result)
+        coroutine_yieldto(self->coroutine, NULL);
+}
+
+/*
+ * Common implementation of read_finish() and read_all_finish().
+ *
+ * Returns the number of bytes read, or -1 with @error set.
+ */
+static gssize
+spice_vmc_input_stream_finish_read(SpiceVmcInputStream *self,
+                                   GAsyncResult *result,
+                                   GError **error)
+{
+    GSimpleAsyncResult *simple;
+
+    g_return_val_if_fail(g_simple_async_result_is_valid(result,
+                                                        G_OBJECT(self),
+                                                        spice_vmc_input_stream_read_async),
+                         -1);
+
+    simple = (GSimpleAsyncResult *)result;
+
+    /* FIXME: calling _finish() is required. Disconnecting in
+       read_cancelled() causes a deadlock. #705395 */
+    if (self->cancellable) {
+        g_cancellable_disconnect(self->cancellable, self->cancel_id);
+        g_clear_object(&self->cancellable);
+    }
+
+    if (g_simple_async_result_propagate_error(simple, error))
+        return -1;
+
+    return g_simple_async_result_get_op_res_gssize(simple);
+}
+
+/*
+ * Start an asynchronous read that completes only once @count bytes
+ * have been stored in @buffer, or the read has been cancelled.
+ * @io_priority is ignored.
+ */
+void
+spice_vmc_input_stream_read_all_async(GInputStream *stream,
+                                      void *buffer,
+                                      gsize count,
+                                      int io_priority,
+                                      GCancellable *cancellable,
+                                      GAsyncReadyCallback callback,
+                                      gpointer user_data)
+{
+    spice_vmc_input_stream_start_read(SPICE_VMC_INPUT_STREAM(stream), TRUE,
+                                      buffer, count,
+                                      cancellable, callback, user_data);
+}
+
+/*
+ * Finish a read started with spice_vmc_input_stream_read_all_async().
+ * Returns the number of bytes read, or -1 with @error set.
+ */
+gssize
+spice_vmc_input_stream_read_all_finish(GInputStream *stream,
+                                       GAsyncResult *result,
+                                       GError **error)
+{
+    return spice_vmc_input_stream_finish_read(SPICE_VMC_INPUT_STREAM(stream),
+                                              result, error);
+}
+
+/*
+ * GInputStream::read_async: the read completes as soon as some data
+ * has been copied into @buffer.  @io_priority is ignored.
+ */
+static void
+spice_vmc_input_stream_read_async(GInputStream *stream,
+                                  void *buffer,
+                                  gsize count,
+                                  int io_priority,
+                                  GCancellable *cancellable,
+                                  GAsyncReadyCallback callback,
+                                  gpointer user_data)
+{
+    spice_vmc_input_stream_start_read(SPICE_VMC_INPUT_STREAM(stream), FALSE,
+                                      buffer, count,
+                                      cancellable, callback, user_data);
+}
+
+/* GInputStream::read_finish */
+static gssize
+spice_vmc_input_stream_read_finish(GInputStream *stream,
+                                   GAsyncResult *result,
+                                   GError **error)
+{
+    return spice_vmc_input_stream_finish_read(SPICE_VMC_INPUT_STREAM(stream),
+                                              result, error);
+}
+
+/*
+ * GInputStream::read_fn: synchronous reads are not implemented.
+ * Always fails; @error is set so that callers relying on the
+ * "-1 means error is set" convention do not dereference NULL.
+ */
+static gssize
+spice_vmc_input_stream_read(GInputStream *stream,
+                            void *buffer,
+                            gsize count,
+                            GCancellable *cancellable,
+                            GError **error)
+{
+    g_set_error_literal(error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+                        "synchronous read is not supported");
+    g_return_val_if_reached(-1);
+}
+
+/* GInputStream::skip: not implemented, always fails with @error set. */
+static gssize
+spice_vmc_input_stream_skip(GInputStream *stream,
+                            gsize count,
+                            GCancellable *cancellable,
+                            GError **error)
+{
+    g_set_error_literal(error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+                        "skip is not supported");
+    g_return_val_if_reached(-1);
+}
+
+/* GInputStream::close_fn: nothing to release, always succeeds. */
+static gboolean
+spice_vmc_input_stream_close(GInputStream *stream,
+                             GCancellable *cancellable,
+                             GError **error)
+{
+    SPICE_DEBUG("fake close");
+    return TRUE;
+}
+
+/* OUTPUT */
+
+struct _SpiceVmcOutputStream
+{
+    GOutputStream parent_instance;
+
+    SpiceChannel *channel; /* weak */
+};
+
+struct _SpiceVmcOutputStreamClass
+{
+    GOutputStreamClass parent_class;
+};
+
+static gssize   spice_vmc_output_stream_write_fn     (GOutputStream *stream,
+                                                      const void *buffer,
+                                                      gsize count,
+                                                      GCancellable *cancellable,
+                                                      GError **error);
+static gssize   spice_vmc_output_stream_write_finish (GOutputStream *stream,
+                                                      GAsyncResult *result,
+                                                      GError **error);
+static void     spice_vmc_output_stream_write_async  (GOutputStream *stream,
+                                                      const void *buffer,
+                                                      gsize count,
+                                                      int io_priority,
+                                                      GCancellable *cancellable,
+                                                      GAsyncReadyCallback callback,
+                                                      gpointer user_data);
+
+G_DEFINE_TYPE(SpiceVmcOutputStream, spice_vmc_output_stream, G_TYPE_OUTPUT_STREAM)
+
+
+/* Install the GOutputStream virtual methods implemented by this class. */
+static void
+spice_vmc_output_stream_class_init(SpiceVmcOutputStreamClass *klass)
+{
+    GOutputStreamClass *ostream_class;
+
+    ostream_class = G_OUTPUT_STREAM_CLASS(klass);
+    ostream_class->write_fn = spice_vmc_output_stream_write_fn;
+    ostream_class->write_async = spice_vmc_output_stream_write_async;
+    ostream_class->write_finish = spice_vmc_output_stream_write_finish;
+}
+
+static void
+spice_vmc_output_stream_init(SpiceVmcOutputStream *self)
+{
+}
+
+/*
+ * Create an output stream writing to @channel.  No reference is taken
+ * on @channel.
+ */
+static SpiceVmcOutputStream *
+spice_vmc_output_stream_new(SpiceChannel *channel)
+{
+    SpiceVmcOutputStream *self;
+
+    self = g_object_new(SPICE_TYPE_VMC_OUTPUT_STREAM, NULL);
+    self->channel = channel;
+
+    return self;
+}
+
+/*
+ * GOutputStream::write_fn: wrap @buffer in a SPICE_MSGC_SPICEVMC_DATA
+ * message and hand it to the channel for sending.  Always reports
+ * @count bytes as written; @cancellable and @error are unused.
+ */
+static gssize
+spice_vmc_output_stream_write_fn(GOutputStream   *stream,
+                                 const void      *buffer,
+                                 gsize            count,
+                                 GCancellable    *cancellable,
+                                 GError         **error)
+{
+    SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream);
+    SpiceMsgOut *msg_out;
+
+    msg_out = spice_msg_out_new(SPICE_CHANNEL(self->channel),
+                                SPICE_MSGC_SPICEVMC_DATA);
+    spice_marshaller_add(msg_out->marshaller, buffer, count);
+    spice_msg_out_send(msg_out);
+
+    return count;
+}
+
+/*
+ * GOutputStream::write_finish: forward to spice_vmc_write_finish()
+ * using the channel-level result stored by write_cb().
+ */
+static gssize
+spice_vmc_output_stream_write_finish(GOutputStream *stream,
+                                     GAsyncResult *simple,
+                                     GError **error)
+{
+    SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream);
+    GSimpleAsyncResult *res =
+        g_simple_async_result_get_op_res_gpointer(G_SIMPLE_ASYNC_RESULT(simple));
+
+    SPICE_DEBUG("spicevmc write finish");
+    return spice_vmc_write_finish(self->channel, G_ASYNC_RESULT(res), error);
+}
+
+/*
+ * Completion callback of spice_vmc_write_async().
+ *
+ * @res is stored in the stream-level result without taking a
+ * reference: the result is completed synchronously right below, so
+ * the stream user's callback (and its write_finish() call) runs while
+ * @res is still alive.
+ */
+static void
+write_cb(GObject *source_object,
+         GAsyncResult *res,
+         gpointer user_data)
+{
+    GSimpleAsyncResult *simple = user_data;
+
+    g_simple_async_result_set_op_res_gpointer(simple, res, NULL);
+
+    g_simple_async_result_complete(simple);
+    /* drop the reference created in write_async() */
+    g_object_unref(simple);
+}
+
+/*
+ * GOutputStream::write_async: forward the write to
+ * spice_vmc_write_async().  @io_priority is ignored.
+ */
+static void
+spice_vmc_output_stream_write_async(GOutputStream *stream,
+                                    const void *buffer,
+                                    gsize count,
+                                    int io_priority,
+                                    GCancellable *cancellable,
+                                    GAsyncReadyCallback callback,
+                                    gpointer user_data)
+{
+    SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream);
+    GSimpleAsyncResult *simple;
+
+    SPICE_DEBUG("spicevmc write async");
+    /* an AsyncResult to forward async op to channel */
+    simple = g_simple_async_result_new(G_OBJECT(self), callback, user_data,
+                                       spice_vmc_output_stream_write_async);
+
+    spice_vmc_write_async(self->channel, buffer, count,
+                          cancellable, write_cb,
+                          simple);
+}
+
+/* STREAM */
+
+struct _SpiceVmcStream
+{
+    GIOStream parent_instance;
+
+    SpiceChannel *channel; /* weak */
+    SpiceVmcInputStream *in;    /* created on first get_input_stream() */
+    SpiceVmcOutputStream *out;  /* created on first get_output_stream() */
+};
+
+struct _SpiceVmcStreamClass
+{
+    GIOStreamClass parent_class;
+};
+
+static void            spice_vmc_stream_finalize          (GObject   *object);
+static GInputStream *  spice_vmc_stream_get_input_stream  (GIOStream *stream);
+static GOutputStream * spice_vmc_stream_get_output_stream (GIOStream *stream);
+
+G_DEFINE_TYPE(SpiceVmcStream, spice_vmc_stream, G_TYPE_IO_STREAM)
+
+/* Install the GObject and GIOStream virtual methods of this class. */
+static void
+spice_vmc_stream_class_init(SpiceVmcStreamClass *klass)
+{
+    GObjectClass *object_class;
+    GIOStreamClass *iostream_class;
+
+    object_class = G_OBJECT_CLASS(klass);
+    object_class->finalize = spice_vmc_stream_finalize;
+
+    iostream_class = G_IO_STREAM_CLASS(klass);
+    iostream_class->get_input_stream = spice_vmc_stream_get_input_stream;
+    iostream_class->get_output_stream = spice_vmc_stream_get_output_stream;
+}
+
+/* Release the lazily created sub-streams, then chain up. */
+static void
+spice_vmc_stream_finalize(GObject *object)
+{
+    SpiceVmcStream *self = SPICE_VMC_STREAM(object);
+
+    g_clear_object(&self->in);
+    g_clear_object(&self->out);
+
+    G_OBJECT_CLASS(spice_vmc_stream_parent_class)->finalize(object);
+}
+
+static void
+spice_vmc_stream_init(SpiceVmcStream *self)
+{
+}
+
+/*
+ * Create a GIOStream on top of @channel.  No reference is taken on
+ * @channel.  The caller owns the returned reference.
+ */
+SpiceVmcStream *
+spice_vmc_stream_new(SpiceChannel *channel)
+{
+    SpiceVmcStream *self;
+
+    self = g_object_new(SPICE_TYPE_VMC_STREAM, NULL);
+    self->channel = channel;
+
+    return self;
+}
+
+/* GIOStream::get_input_stream: create the input stream on first use. */
+static GInputStream *
+spice_vmc_stream_get_input_stream(GIOStream *stream)
+{
+    SpiceVmcStream *self = SPICE_VMC_STREAM(stream);
+
+    if (!self->in)
+        self->in = spice_vmc_input_stream_new();
+
+    return G_INPUT_STREAM(self->in);
+}
+
+/* GIOStream::get_output_stream: create the output stream on first use. */
+static GOutputStream *
+spice_vmc_stream_get_output_stream(GIOStream *stream)
+{
+    SpiceVmcStream *self = SPICE_VMC_STREAM(stream);
+
+    if (!self->out)
+        self->out = spice_vmc_output_stream_new(self->channel);
+
+    return G_OUTPUT_STREAM(self->out);
+}
diff --git a/gtk/vmcstream.h b/gtk/vmcstream.h new file mode 100644 index 0000000..1316b77 --- /dev/null +++ b/gtk/vmcstream.h @@ -0,0 +1,81 @@ +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + Copyright (C) 2013 Red Hat, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. +*/ +#ifndef __SPICE_VMC_STREAM_H__ +#define __SPICE_VMC_STREAM_H__ + +#include <gio/gio.h> + +#include "spice-types.h" + +G_BEGIN_DECLS + +#define SPICE_TYPE_VMC_INPUT_STREAM (spice_vmc_input_stream_get_type ()) +#define SPICE_VMC_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), SPICE_TYPE_VMC_INPUT_STREAM, SpiceVmcInputStream)) +#define SPICE_VMC_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), SPICE_TYPE_VMC_INPUT_STREAM, SpiceVmcInputStreamClass)) +#define SPICE_IS_VMC_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), SPICE_TYPE_VMC_INPUT_STREAM)) +#define SPICE_IS_VMC_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), SPICE_TYPE_VMC_INPUT_STREAM)) +#define SPICE_VMC_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SPICE_TYPE_VMC_INPUT_STREAM, SpiceVmcInputStreamClass)) + +typedef struct _SpiceVmcInputStreamClass SpiceVmcInputStreamClass; +typedef struct _SpiceVmcInputStream SpiceVmcInputStream; + +GType spice_vmc_input_stream_get_type (void) G_GNUC_CONST; +void spice_vmc_input_stream_co_data (SpiceVmcInputStream *input, + const gpointer data, + gsize size); 
+
+/*
+ * Asynchronous read that completes only once @count bytes have been
+ * stored in @buffer (or the read has been cancelled).  Must be paired
+ * with spice_vmc_input_stream_read_all_finish().
+ */
+void
+spice_vmc_input_stream_read_all_async(GInputStream *stream,
+                                      void *buffer,
+                                      gsize count,
+                                      int io_priority,
+                                      GCancellable *cancellable,
+                                      GAsyncReadyCallback callback,
+                                      gpointer user_data);
+
+/* Returns the number of bytes read, or -1 with @error set. */
+gssize
+spice_vmc_input_stream_read_all_finish(GInputStream *stream,
+                                       GAsyncResult *result,
+                                       GError **error);
+
+
+/* SpiceVmcOutputStream: GObject type, cast and type-check macros */
+#define SPICE_TYPE_VMC_OUTPUT_STREAM \
+    (spice_vmc_output_stream_get_type ())
+#define SPICE_VMC_OUTPUT_STREAM(o) \
+    (G_TYPE_CHECK_INSTANCE_CAST ((o), SPICE_TYPE_VMC_OUTPUT_STREAM, SpiceVmcOutputStream))
+#define SPICE_VMC_OUTPUT_STREAM_CLASS(k) \
+    (G_TYPE_CHECK_CLASS_CAST((k), SPICE_TYPE_VMC_OUTPUT_STREAM, SpiceVmcOutputStreamClass))
+#define SPICE_IS_VMC_OUTPUT_STREAM(o) \
+    (G_TYPE_CHECK_INSTANCE_TYPE ((o), SPICE_TYPE_VMC_OUTPUT_STREAM))
+#define SPICE_IS_VMC_OUTPUT_STREAM_CLASS(k) \
+    (G_TYPE_CHECK_CLASS_TYPE ((k), SPICE_TYPE_VMC_OUTPUT_STREAM))
+#define SPICE_VMC_OUTPUT_STREAM_GET_CLASS(o) \
+    (G_TYPE_INSTANCE_GET_CLASS ((o), SPICE_TYPE_VMC_OUTPUT_STREAM, SpiceVmcOutputStreamClass))
+
+typedef struct _SpiceVmcOutputStreamClass SpiceVmcOutputStreamClass;
+typedef struct _SpiceVmcOutputStream SpiceVmcOutputStream;
+
+GType spice_vmc_output_stream_get_type(void) G_GNUC_CONST;
+
+/* SpiceVmcStream: GObject type, cast and type-check macros */
+#define SPICE_TYPE_VMC_STREAM \
+    (spice_vmc_stream_get_type ())
+#define SPICE_VMC_STREAM(o) \
+    (G_TYPE_CHECK_INSTANCE_CAST ((o), SPICE_TYPE_VMC_STREAM, SpiceVmcStream))
+#define SPICE_VMC_STREAM_CLASS(k) \
+    (G_TYPE_CHECK_CLASS_CAST((k), SPICE_TYPE_VMC_STREAM, SpiceVmcStreamClass))
+#define SPICE_IS_VMC_STREAM(o) \
+    (G_TYPE_CHECK_INSTANCE_TYPE ((o), SPICE_TYPE_VMC_STREAM))
+#define SPICE_IS_VMC_STREAM_CLASS(k) \
+    (G_TYPE_CHECK_CLASS_TYPE ((k), SPICE_TYPE_VMC_STREAM))
+#define SPICE_VMC_STREAM_GET_CLASS(o) \
+    (G_TYPE_INSTANCE_GET_CLASS ((o), SPICE_TYPE_VMC_STREAM, SpiceVmcStreamClass))
+
+typedef struct _SpiceVmcStreamClass SpiceVmcStreamClass;
+typedef struct _SpiceVmcStream SpiceVmcStream;
+
+GType spice_vmc_stream_get_type(void) G_GNUC_CONST;
+
+/* Create a GIOStream on top of @channel; no reference is taken on @channel. */
+SpiceVmcStream *spice_vmc_stream_new(SpiceChannel *channel);
+
+G_END_DECLS + +#endif /* __SPICE_VMC_STREAM_H__ */ -- 1.8.4.2 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx http://lists.freedesktop.org/mailman/listinfo/spice-devel