This makes it convenient to use the GIOStream APIs without caring about coroutine and Spice message details. --- gtk/Makefile.am | 2 + gtk/channel-base.c | 48 ++++++ gtk/channel-port.c | 33 +--- gtk/spice-channel-priv.h | 8 + gtk/vmcstream.c | 436 +++++++++++++++++++++++++++++++++++++++++++++++ gtk/vmcstream.h | 69 ++++++++ 6 files changed, 566 insertions(+), 30 deletions(-) create mode 100644 gtk/vmcstream.c create mode 100644 gtk/vmcstream.h diff --git a/gtk/Makefile.am b/gtk/Makefile.am index 5d29018..1932b46 100644 --- a/gtk/Makefile.am +++ b/gtk/Makefile.am @@ -251,6 +251,8 @@ libspice_client_glib_2_0_la_SOURCES = \ usbutil.c \ usbutil.h \ $(USB_ACL_HELPER_SRCS) \ + vmcstream.c \ + vmcstream.h \ \ decode.h \ decode-glz.c \ diff --git a/gtk/channel-base.c b/gtk/channel-base.c index dff3024..3956a96 100644 --- a/gtk/channel-base.c +++ b/gtk/channel-base.c @@ -189,3 +189,51 @@ void spice_channel_handle_migrate(SpiceChannel *channel, SpiceMsgIn *in) spice_msg_out_send_internal(out); } } + +static void +vmc_write_free_cb(uint8_t *data, void *user_data) +{ + GSimpleAsyncResult *result = user_data; + + g_simple_async_result_complete_in_idle(result); + g_object_unref(result); +} + +G_GNUC_INTERNAL +void spice_vmc_write_async(SpiceChannel *self, + const void *buffer, gsize count, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + SpiceMsgOut *msg; + GSimpleAsyncResult *simple; + + simple = g_simple_async_result_new(G_OBJECT(self), callback, user_data, + spice_port_write_async); + g_simple_async_result_set_op_res_gssize(simple, count); + + msg = spice_msg_out_new(SPICE_CHANNEL(self), SPICE_MSGC_SPICEVMC_DATA); + spice_marshaller_add_ref_full(msg->marshaller, (uint8_t*)buffer, count, + vmc_write_free_cb, simple); + spice_msg_out_send(msg); +} + +G_GNUC_INTERNAL +gssize spice_vmc_write_finish(SpiceChannel *self, + GAsyncResult *result, GError **error) +{ + GSimpleAsyncResult *simple; + + g_return_val_if_fail(result != NULL, -1); + + 
simple = (GSimpleAsyncResult *)result; + + if (g_simple_async_result_propagate_error(simple, error)) + return -1; + + g_return_val_if_fail(g_simple_async_result_is_valid(result, G_OBJECT(self), + spice_port_write_async), -1); + + return g_simple_async_result_get_op_res_gssize(simple); +} diff --git a/gtk/channel-port.c b/gtk/channel-port.c index 1d6eef2..e1b2da5 100644 --- a/gtk/channel-port.c +++ b/gtk/channel-port.c @@ -287,14 +287,6 @@ static void port_handle_msg(SpiceChannel *channel, SpiceMsgIn *in) emit_main_context(channel, SPICE_PORT_DATA, buf, size); } -static void port_write_free_cb(uint8_t *data, void *user_data) -{ - GSimpleAsyncResult *result = user_data; - - g_simple_async_result_complete(result); - g_object_unref(result); -} - /** * spice_port_write_async: * @port: A #SpicePortChannel @@ -318,9 +310,7 @@ void spice_port_write_async(SpicePortChannel *self, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *simple; SpicePortChannelPrivate *c; - SpiceMsgOut *msg; g_return_if_fail(SPICE_IS_PORT_CHANNEL(self)); g_return_if_fail(buffer != NULL); @@ -333,14 +323,8 @@ void spice_port_write_async(SpicePortChannel *self, return; } - simple = g_simple_async_result_new(G_OBJECT(self), callback, user_data, - spice_port_write_async); - g_simple_async_result_set_op_res_gssize(simple, count); - - msg = spice_msg_out_new(SPICE_CHANNEL(self), SPICE_MSGC_SPICEVMC_DATA); - spice_marshaller_add_ref_full(msg->marshaller, (uint8_t*)buffer, count, - port_write_free_cb, simple); - spice_msg_out_send(msg); + spice_vmc_write_async(SPICE_CHANNEL(self), buffer, count, + cancellable, callback, user_data); } /** @@ -358,20 +342,9 @@ void spice_port_write_async(SpicePortChannel *self, gssize spice_port_write_finish(SpicePortChannel *self, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - g_return_val_if_fail(SPICE_IS_PORT_CHANNEL(self), -1); - g_return_val_if_fail(result != NULL, -1); - - simple = (GSimpleAsyncResult *)result; - - if 
(g_simple_async_result_propagate_error(simple, error)) - return -1; - - g_return_val_if_fail(g_simple_async_result_is_valid(result, G_OBJECT(self), - spice_port_write_async), -1); - return g_simple_async_result_get_op_res_gssize(simple); + return spice_vmc_write_finish(SPICE_CHANNEL(self), result, error); } /** diff --git a/gtk/spice-channel-priv.h b/gtk/spice-channel-priv.h index be061c5..d58b6f1 100644 --- a/gtk/spice-channel-priv.h +++ b/gtk/spice-channel-priv.h @@ -200,6 +200,14 @@ void spice_caps_set(GArray *caps, guint32 cap, const gchar *desc); gchar *spice_channel_supported_string(void); +void spice_vmc_write_async(SpiceChannel *self, + const void *buffer, gsize count, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +gssize spice_vmc_write_finish(SpiceChannel *self, + GAsyncResult *result, GError **error); + G_END_DECLS #endif /* __SPICE_CLIENT_CHANNEL_PRIV_H__ */ diff --git a/gtk/vmcstream.c b/gtk/vmcstream.c new file mode 100644 index 0000000..02af959 --- /dev/null +++ b/gtk/vmcstream.c @@ -0,0 +1,436 @@ +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + Copyright (C) 2013 Red Hat, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. 
+*/ +#include <string.h> + +#include "vmcstream.h" +#include "spice-channel-priv.h" +#include "gio-coroutine.h" + +struct _SpiceVmcInputStream +{ + GInputStream parent_instance; + GSimpleAsyncResult *result; + struct coroutine *coroutine; + + SpiceChannel *channel; + guint8 *buffer; + gsize count; + + GCancellable *cancellable; + gulong cancel_id; +}; + +struct _SpiceVmcInputStreamClass +{ + GInputStreamClass parent_class; +}; + +static gssize spice_vmc_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +static void spice_vmc_input_stream_read_async (GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +static gssize spice_vmc_input_stream_read_finish (GInputStream *stream, + GAsyncResult *result, + GError **error); +static gssize spice_vmc_input_stream_skip (GInputStream *stream, + gsize count, + GCancellable *cancellable, + GError **error); +static gboolean spice_vmc_input_stream_close (GInputStream *stream, + GCancellable *cancellable, + GError **error); +static void spice_vmc_input_stream_finalize (GObject *object); + +G_DEFINE_TYPE_WITH_CODE(SpiceVmcInputStream, spice_vmc_input_stream, G_TYPE_INPUT_STREAM,) + + +static void +spice_vmc_input_stream_class_init(SpiceVmcInputStreamClass *klass) +{ + GObjectClass *object_class; + GInputStreamClass *istream_class; + + object_class = G_OBJECT_CLASS(klass); + object_class->finalize = spice_vmc_input_stream_finalize; + + istream_class = G_INPUT_STREAM_CLASS(klass); + istream_class->read_fn = spice_vmc_input_stream_read; + istream_class->read_async = spice_vmc_input_stream_read_async; + istream_class->read_finish = spice_vmc_input_stream_read_finish; + istream_class->skip = spice_vmc_input_stream_skip; + istream_class->close_fn = spice_vmc_input_stream_close; +} + +static void +spice_vmc_input_stream_finalize(GObject *object) +{ + G_OBJECT_CLASS 
(spice_vmc_input_stream_parent_class)->finalize(object); +} + +static void +spice_vmc_input_stream_init(SpiceVmcInputStream *self) +{ +} + +static SpiceVmcInputStream * +spice_vmc_input_stream_new(void) +{ + SpiceVmcInputStream *self; + + self = g_object_new(SPICE_TYPE_VMC_INPUT_STREAM, NULL); + + return self; +} + +/* coroutine */ +G_GNUC_INTERNAL void +spice_vmc_input_stream_co_data(SpiceVmcInputStream *self, + const gpointer d, gsize size) +{ + guint8 *data = d; + + g_return_if_fail(SPICE_IS_VMC_INPUT_STREAM(self)); + g_return_if_fail(self->coroutine == NULL); + + self->coroutine = coroutine_self(); + + while (size > 0) { + if (!self->result) + coroutine_yield(NULL); + + g_return_if_fail(self->result != NULL); + + gsize min = MIN(self->count, size); + memcpy(self->buffer, data, min); + + size -= min; + data += min; + + self->count -= min; + self->buffer += min; + + if (self->count == 0) { + g_simple_async_result_complete_in_idle(self->result); + g_clear_object(&self->result); + } + } + + self->coroutine = NULL; +} + +static void +read_cancelled(GCancellable *cancellable, + gpointer user_data) +{ + SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(user_data); + + g_simple_async_result_complete_in_idle(self->result); + g_clear_object(&self->result); +} + +static void +spice_vmc_input_stream_read_async(GInputStream *stream, + void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream); + GSimpleAsyncResult *result; + + /* no concurrent read permitted by ginputstream */ + g_return_if_fail(self->result == NULL); + self->buffer = buffer; + self->count = count; + result = g_simple_async_result_new(G_OBJECT(self), + callback, + user_data, + spice_vmc_input_stream_read_async); + g_simple_async_result_set_op_res_gssize(result, count); + self->result = result; + self->cancellable = g_object_ref(cancellable); + if (cancellable) + 
self->cancel_id = + g_cancellable_connect(cancellable, G_CALLBACK(read_cancelled), self, NULL); + + if (self->coroutine) + coroutine_yieldto(self->coroutine, NULL); +} + +static gssize +spice_vmc_input_stream_read_finish(GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple; + SpiceVmcInputStream *self = SPICE_VMC_INPUT_STREAM(stream); + + g_return_val_if_fail(g_simple_async_result_is_valid(result, + G_OBJECT(self), + spice_vmc_input_stream_read_async), + -1); + + if (self->cancellable) { + g_cancellable_disconnect(self->cancellable, self->cancel_id); + g_clear_object(&self->cancellable); + } + + simple = (GSimpleAsyncResult *)result; + + if (g_simple_async_result_propagate_error(simple, error)) + return -1; + + return g_simple_async_result_get_op_res_gssize(simple); +} + +static gssize +spice_vmc_input_stream_read(GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + g_return_val_if_reached(-1); +} + +static gssize +spice_vmc_input_stream_skip(GInputStream *stream, + gsize count, + GCancellable *cancellable, + GError **error) +{ + g_return_val_if_reached(-1); +} + +static gboolean +spice_vmc_input_stream_close(GInputStream *stream, + GCancellable *cancellable, + GError **error) +{ + g_return_val_if_reached(TRUE); +} + +/* OUTPUT */ + +struct _SpiceVmcOutputStream +{ + GOutputStream parent_instance; + + SpiceChannel *channel; /* weak */ +}; + +struct _SpiceVmcOutputStreamClass +{ + GOutputStreamClass parent_class; +}; + +static void spice_vmc_output_stream_finalize (GObject *object); +static gssize spice_vmc_output_stream_write_fn (GOutputStream *stream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error); +static gssize spice_vmc_output_stream_write_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error); +static void spice_vmc_output_stream_write_async (GOutputStream *stream, + const void *buffer, + gsize count, + int 
io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + +G_DEFINE_TYPE_WITH_CODE(SpiceVmcOutputStream, spice_vmc_output_stream, G_TYPE_OUTPUT_STREAM,) + + +static void +spice_vmc_output_stream_class_init(SpiceVmcOutputStreamClass *klass) +{ + GObjectClass *object_class; + GOutputStreamClass *ostream_class; + + object_class = G_OBJECT_CLASS(klass); + object_class->finalize = spice_vmc_output_stream_finalize; + + ostream_class = G_OUTPUT_STREAM_CLASS(klass); + ostream_class->write_fn = spice_vmc_output_stream_write_fn; + ostream_class->write_async = spice_vmc_output_stream_write_async; + ostream_class->write_finish = spice_vmc_output_stream_write_finish; +} + +static void +spice_vmc_output_stream_finalize(GObject *object) +{ + G_OBJECT_CLASS(spice_vmc_output_stream_parent_class)->finalize(object); +} + +static void +spice_vmc_output_stream_init(SpiceVmcOutputStream *self) +{ +} + +static SpiceVmcOutputStream * +spice_vmc_output_stream_new(SpiceChannel *channel) +{ + SpiceVmcOutputStream *self; + + self = g_object_new(SPICE_TYPE_VMC_OUTPUT_STREAM, NULL); + self->channel = channel; + + return self; +} + +static gssize +spice_vmc_output_stream_write_fn(GOutputStream *stream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream); + SpiceMsgOut *msg_out; + + msg_out = spice_msg_out_new(SPICE_CHANNEL(self->channel), + SPICE_MSGC_SPICEVMC_DATA); + spice_marshaller_add(msg_out->marshaller, buffer, count); + spice_msg_out_send(msg_out); + + return count; +} + +static gssize +spice_vmc_output_stream_write_finish(GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream); + + return spice_vmc_write_finish(self->channel, result, error); +} + +static void +spice_vmc_output_stream_write_async(GOutputStream *stream, + const void *buffer, + gsize count, + int io_priority, + 
GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + SpiceVmcOutputStream *self = SPICE_VMC_OUTPUT_STREAM(stream); + + spice_vmc_write_async(self->channel, buffer, count, + cancellable, callback, + self); +} + +/* STREAM */ + +struct _SpiceVmcStream +{ + GIOStream parent_instance; + + SpiceChannel *channel; /* weak */ + SpiceVmcInputStream *in; + SpiceVmcOutputStream *out; +}; + +struct _SpiceVmcStreamClass +{ + GIOStreamClass parent_class; +}; + +static void spice_vmc_stream_finalize (GObject *object); +static GInputStream * spice_vmc_stream_get_input_stream (GIOStream *stream); +static GOutputStream * spice_vmc_stream_get_output_stream (GIOStream *stream); + +G_DEFINE_TYPE_WITH_CODE(SpiceVmcStream, spice_vmc_stream, G_TYPE_IO_STREAM,) + +static void +spice_vmc_stream_class_init(SpiceVmcStreamClass *klass) +{ + GObjectClass *object_class; + GIOStreamClass *iostream_class; + + object_class = G_OBJECT_CLASS(klass); + object_class->finalize = spice_vmc_stream_finalize; + + iostream_class = G_IO_STREAM_CLASS(klass); + iostream_class->get_input_stream = spice_vmc_stream_get_input_stream; + iostream_class->get_output_stream = spice_vmc_stream_get_output_stream; +} + +static void +spice_vmc_stream_finalize(GObject *object) +{ + SpiceVmcStream *self = SPICE_VMC_STREAM(object); + + g_clear_object(&self->in); + g_clear_object(&self->out); + + G_OBJECT_CLASS(spice_vmc_stream_parent_class)->finalize(object); +} + +static void +spice_vmc_stream_init(SpiceVmcStream *self) +{ +} + +SpiceVmcStream * +spice_vmc_stream_new(SpiceChannel *channel) +{ + SpiceVmcStream *self; + + self = g_object_new(SPICE_TYPE_VMC_STREAM, NULL); + self->channel = channel; + + return self; +} + +static GInputStream * +spice_vmc_stream_get_input_stream(GIOStream *stream) +{ + SpiceVmcStream *self = SPICE_VMC_STREAM(stream); + + if (!self->in) + self->in = spice_vmc_input_stream_new(); + + return G_INPUT_STREAM(self->in); +} + +static GOutputStream * 
+spice_vmc_stream_get_output_stream(GIOStream *stream) +{ + SpiceVmcStream *self = SPICE_VMC_STREAM(stream); + + if (!self->out) + self->out = spice_vmc_output_stream_new(self->channel); + + return G_OUTPUT_STREAM(self->out); +} diff --git a/gtk/vmcstream.h b/gtk/vmcstream.h new file mode 100644 index 0000000..f6a1f4a --- /dev/null +++ b/gtk/vmcstream.h @@ -0,0 +1,69 @@ +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + Copyright (C) 2013 Red Hat, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. 
+*/ +#ifndef __SPICE_VMC_STREAM_H__ +#define __SPICE_VMC_STREAM_H__ + +#include <gio/gio.h> + +#include "spice-types.h" + +G_BEGIN_DECLS + +#define SPICE_TYPE_VMC_INPUT_STREAM (spice_vmc_input_stream_get_type ()) +#define SPICE_VMC_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), SPICE_TYPE_VMC_INPUT_STREAM, SpiceVmcInputStream)) +#define SPICE_VMC_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), SPICE_TYPE_VMC_INPUT_STREAM, SpiceVmcInputStreamClass)) +#define SPICE_IS_VMC_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), SPICE_TYPE_VMC_INPUT_STREAM)) +#define SPICE_IS_VMC_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), SPICE_TYPE_VMC_INPUT_STREAM)) +#define SPICE_VMC_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SPICE_TYPE_VMC_INPUT_STREAM, SpiceVmcInputStreamClass)) + +typedef struct _SpiceVmcInputStreamClass SpiceVmcInputStreamClass; +typedef struct _SpiceVmcInputStream SpiceVmcInputStream; + +GType spice_vmc_input_stream_get_type (void) G_GNUC_CONST; +void spice_vmc_input_stream_co_data (SpiceVmcInputStream *input, + const gpointer data, + gsize size); + +#define SPICE_TYPE_VMC_OUTPUT_STREAM (spice_vmc_output_stream_get_type ()) +#define SPICE_VMC_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), SPICE_TYPE_VMC_OUTPUT_STREAM, SpiceVmcOutputStream)) +#define SPICE_VMC_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), SPICE_TYPE_VMC_OUTPUT_STREAM, SpiceVmcOutputStreamClass)) +#define SPICE_IS_VMC_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), SPICE_TYPE_VMC_OUTPUT_STREAM)) +#define SPICE_IS_VMC_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), SPICE_TYPE_VMC_OUTPUT_STREAM)) +#define SPICE_VMC_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SPICE_TYPE_VMC_OUTPUT_STREAM, SpiceVmcOutputStreamClass)) + +typedef struct _SpiceVmcOutputStreamClass SpiceVmcOutputStreamClass; +typedef struct _SpiceVmcOutputStream SpiceVmcOutputStream; + +GType spice_vmc_output_stream_get_type (void) G_GNUC_CONST; + +#define 
SPICE_TYPE_VMC_STREAM (spice_vmc_stream_get_type ()) +#define SPICE_VMC_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), SPICE_TYPE_VMC_STREAM, SpiceVmcStream)) +#define SPICE_VMC_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), SPICE_TYPE_VMC_STREAM, SpiceVmcStreamClass)) +#define SPICE_IS_VMC_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), SPICE_TYPE_VMC_STREAM)) +#define SPICE_IS_VMC_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), SPICE_TYPE_VMC_STREAM)) +#define SPICE_VMC_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SPICE_TYPE_VMC_STREAM, SpiceVmcStreamClass)) + +typedef struct _SpiceVmcStreamClass SpiceVmcStreamClass; +typedef struct _SpiceVmcStream SpiceVmcStream; + +GType spice_vmc_stream_get_type (void) G_GNUC_CONST; +SpiceVmcStream* spice_vmc_stream_new (SpiceChannel *channel); + +G_END_DECLS + +#endif /* __SPICE_VMC_STREAM_H__ */ -- 1.8.3.rc1.49.g8d97506 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx http://lists.freedesktop.org/mailman/listinfo/spice-devel