On Sat, Feb 21, 2015 at 01:40:12AM +0100, Marc-André Lureau wrote: > This allows to create a pipe between 2 GIOStream, the input side read > from the peer output side, and vice-versa. > > In the following patches, this will avoid the socket communication > to exchange with the embedded webdav server. > --- > configure.ac | 4 +- > gtk/Makefile.am | 7 + > gtk/giopipe.c | 472 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ > gtk/giopipe.h | 29 ++++ > tests/Makefile.am | 5 + > tests/pipe.c | 313 ++++++++++++++++++++++++++++++++++++ > 6 files changed, 829 insertions(+), 1 deletion(-) > create mode 100644 gtk/giopipe.c > create mode 100644 gtk/giopipe.h > create mode 100644 tests/pipe.c > > diff --git a/configure.ac b/configure.ac > index 4e88dec..d98e502 100644 > --- a/configure.ac > +++ b/configure.ac > @@ -278,7 +278,7 @@ AC_ARG_ENABLE([webdav], > if test "x$enable_webdav" = "xno"; then > have_phodav="no" > else > - PKG_CHECK_MODULES(PHODAV, [libphodav-1.0], [have_phodav=yes], [have_phodav=no]) > + PKG_CHECK_MODULES(PHODAV, [libphodav-1.0 glib-2.0 >= 2.43.90], [have_phodav=yes], [have_phodav=no]) This glib requirement comes from the use of GSimpleIOStream, could be worth explicitly mentioning it in the commit log. > AC_SUBST(PHODAV_CFLAGS) > AC_SUBST(PHODAV_LIBS) > > @@ -289,6 +289,8 @@ fi > AS_IF([test "x$have_phodav" = "xyes"], > AC_DEFINE([USE_PHODAV], [1], [Define if supporting phodav])) > > +AM_CONDITIONAL([WITH_PHODAV], [test "x$have_phodav" = "xyes"]) > + > AC_ARG_WITH([audio], > AS_HELP_STRING([--with-audio=@<:@gstreamer/pulse/auto/no@:>@], [Select audio backend @<:@default=auto@:>@]), > [], > diff --git a/gtk/Makefile.am b/gtk/Makefile.am > index 7728fec..ab50c79 100644 > --- a/gtk/Makefile.am > +++ b/gtk/Makefile.am > @@ -346,6 +346,13 @@ libspice_client_glib_2_0_la_SOURCES += \ > $(NULL) > endif > > +if WITH_PHODAV > +libspice_client_glib_2_0_la_SOURCES += \ > + giopipe.c \ > + giopipe.h \ Has this GioPipe been proposed upstream? > + $(NULL) > +endif > + > if WITH_UCONTEXT > libspice_client_glib_2_0_la_SOURCES += continuation.h continuation.c coroutine_ucontext.c > endif > diff --git a/gtk/giopipe.c b/gtk/giopipe.c > new file mode 100644 > index 0000000..45007c4 > --- /dev/null > +++ b/gtk/giopipe.c > @@ -0,0 +1,472 @@ > +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ > +/* > + Copyright (C) 2015 Red Hat, Inc. > + > + This library is free software; you can redistribute it and/or > + modify it under the terms of the GNU Lesser General Public > + License as published by the Free Software Foundation; either > + version 2.1 of the License, or (at your option) any later version. > + > + This library is distributed in the hope that it will be useful, > + but WITHOUT ANY WARRANTY; without even the implied warranty of > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + Lesser General Public License for more details. > + > + You should have received a copy of the GNU Lesser General Public > + License along with this library; if not, see <http://www.gnu.org/licenses/>. > +*/ > + > +#include <string.h> > +#include <errno.h> > + > +#include "giopipe.h" > + > +#define TYPE_PIPE_INPUT_STREAM (pipe_input_stream_get_type ()) > +#define PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream)) > +#define PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass)) > +#define IS_PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM)) > +#define IS_PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM)) > +#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass)) > + > +typedef struct _PipeInputStreamClass PipeInputStreamClass; > +typedef struct _PipeInputStream PipeInputStream; > +typedef struct _PipeOutputStream PipeOutputStream; > + > +struct _PipeInputStream > +{ > + GInputStream parent_instance; > + > + PipeOutputStream *peer; > + gssize read; > + > + /* GIOstream:closed is protected against pending operations, so we > + * use an additional close flag to cancel those when the peer is > + * closing. > + */ > + gboolean closed; > + GSource *source; > +}; > + > +struct _PipeInputStreamClass > +{ > + GInputStreamClass parent_class; > +}; > + > +#define TYPE_PIPE_OUTPUT_STREAM (pipe_output_stream_get_type ()) > +#define PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream)) > +#define PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass)) > +#define IS_PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM)) > +#define IS_PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM)) > +#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass)) > + > +typedef struct _PipeOutputStreamClass PipeOutputStreamClass; > + > +struct _PipeOutputStream > +{ > + GOutputStream parent_instance; > + > + PipeInputStream *peer; > + const gchar *buffer; > + gsize count; > + gboolean closed; > + GSource *source; > +}; > + > +struct _PipeOutputStreamClass > +{ > + GOutputStreamClass parent_class; > +}; > + > +struct _SpicePipeStreamPrivate { > + PipeInputStream *input_stream; > + PipeOutputStream *output_stream; > +}; This struct does not seem to be used. > + > +static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface); > +static void pipe_input_stream_check_source (PipeInputStream *self); > +static void pipe_output_stream_check_source (PipeOutputStream *self); > + > +G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM, > + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, > + pipe_input_stream_pollable_iface_init)) > + > +static gssize > +pipe_input_stream_read (GInputStream *stream, > + void *buffer, > + gsize count, > + GCancellable *cancellable, > + GError **error) > +{ > + PipeInputStream *self = PIPE_INPUT_STREAM (stream); > + > + g_return_val_if_fail(count > 0, -1); > + > + if (g_input_stream_is_closed (stream) || self->closed) { It's not clear why you need this 'closed' variable, it seems to always be set together with calls to g_xxxx_stream_close(). Is that in case some of the cleanup done in g_xxxx_stream_close() fails? > + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, > + "Stream is already closed"); > + return -1; > + } > + > + if (!self->peer->buffer) { > + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, > + g_strerror(EAGAIN)); > + return -1; > + } > + > + g_return_val_if_fail(self->peer->buffer, -1); You just checked that self->peer->buffer is not NULL and returned an error if it is. > + > + count = MIN(self->peer->count, count); > + memcpy(buffer, self->peer->buffer, count); > + self->read = count; > + self->peer->buffer = NULL; > + > + //g_debug("read %p :%"G_GSIZE_FORMAT, self->peer, count); > + pipe_output_stream_check_source(self->peer); > + > + return count; > +} > + > +static void > +pipe_input_stream_check_source (PipeInputStream *self) > +{ > + if (self->source && !g_source_is_destroyed(self->source) && > + g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self))) > + g_source_set_ready_time(self->source, 0); g_source_set_ready_time API doc says "This API is only intended to be used by implementations of GSource. Do not call this API on a GSource that you did not create." > +} > + > +static gboolean > +pipe_input_stream_close (GInputStream *stream, > + GCancellable *cancellable, > + GError **error) > +{ > + PipeInputStream *self; > + > + self = PIPE_INPUT_STREAM(stream); > + > + if (self->peer) { > + /* ignore any pending errors */ > + self->peer->closed = TRUE; > + g_output_stream_close(G_OUTPUT_STREAM(self->peer), cancellable, NULL); > + pipe_output_stream_check_source(self->peer); > + } Why not set self->priv->closed as well? > + > + return TRUE; > +} > + > +static void > +pipe_input_stream_close_async (GInputStream *stream, > + int io_priority, > + GCancellable *cancellable, > + GAsyncReadyCallback callback, > + gpointer data) > +{ > + GTask *task; > + > + task = g_task_new (stream, cancellable, callback, data); > + > + /* will always return TRUE */ > + pipe_input_stream_close (stream, cancellable, NULL); > + > + g_task_return_boolean (task, TRUE); > + g_object_unref (task); > +} g_input_stream_close_async() API doc says "The asyncronous methods have a default fallback that uses threads to implement asynchronicity, so they are optional for inheriting classes. However, if you override one you must override all." I guess we should do without that one as this is the only async operation you overrode. > + > +static gboolean > +pipe_input_stream_close_finish (GInputStream *stream, > + GAsyncResult *result, > + GError **error) > +{ > + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); > + > + return g_task_propagate_boolean (G_TASK (result), error); > +} > + > +static void > +pipe_input_stream_init (PipeInputStream *self) > +{ > + self->read = -1; > +} > + > +static void > +pipe_input_stream_dispose(GObject *object) > +{ > + PipeInputStream *self; > + > + self = PIPE_INPUT_STREAM(object); > + > + if (self->peer) { > + g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer); > + self->peer = NULL; > + } > + No cleanup of the 'source' private member? > + G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object); > +} > + > +static void > +pipe_input_stream_class_init (PipeInputStreamClass *klass) > +{ > + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); > + GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass); > + > + istream_class->read_fn = pipe_input_stream_read; > + istream_class->close_fn = pipe_input_stream_close; > + istream_class->close_async = pipe_input_stream_close_async; > + istream_class->close_finish = pipe_input_stream_close_finish; > + > + gobject_class->dispose = pipe_input_stream_dispose; > +} > + > +static gboolean > +pipe_input_stream_is_readable (GPollableInputStream *stream) > +{ > + PipeInputStream *self = PIPE_INPUT_STREAM (stream); > + gboolean readable; > + > + readable = (self->peer && self->peer->buffer && self->read == -1) || self->closed; > + //g_debug("readable %p %d", self->peer, readable); > + > + return readable; > +} > + > +static GSource * > +pipe_input_stream_create_source (GPollableInputStream *stream, > + GCancellable *cancellable) > +{ > + PipeInputStream *self = PIPE_INPUT_STREAM(stream); > + GSource *pollable_source; > + > + g_return_val_if_fail (self->source == NULL || > + g_source_is_destroyed (self->source), NULL); > + > + if (self->source && g_source_is_destroyed (self->source)) > + g_source_unref (self->source); > + > + pollable_source = g_pollable_source_new_full (self, NULL, cancellable); > + self->source = g_source_ref (pollable_source); > + > + return pollable_source; > +} > + > +static void > +pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface) > +{ > + iface->is_readable = pipe_input_stream_is_readable; > + iface->create_source = pipe_input_stream_create_source; > +} > + > +static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface); > + > +G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, G_TYPE_OUTPUT_STREAM, > + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, > + pipe_output_stream_pollable_iface_init)) > + > +static gssize > +pipe_output_stream_write (GOutputStream *stream, > + const void *buffer, > + gsize count, > + GCancellable *cancellable, > + GError **error) > +{ > + PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream); > + PipeInputStream *peer = self->peer; > + > + //g_debug("write %p :%"G_GSIZE_FORMAT, stream, count); > + if (g_output_stream_is_closed (stream) || self->closed) { > + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, > + "Stream is already closed"); > + return -1; > + } > + > + /* this abuses pollable stream, writing sync would likely lead to > + crashes, since the buffer pointer would become invalid, a > + generic solution would need a copy.. > + */ > + g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1); > + self->buffer = buffer; > + self->count = count; > + > + pipe_input_stream_check_source(self->peer); This call will trigger a sync call to pipe_input_stream_read() if I followed everything properly? A comment mentioning that might make the code easier to follow. > + > + if (peer->read < 0) { > + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, > + g_strerror (EAGAIN)); > + return -1; > + } > + > + g_assert(peer->read <= self->count); > + count = peer->read; My understanding is that here you don't want to assume that self->count == peer->read... > + > + self->buffer = NULL; > + self->count = 0; ... but here you make the assumption that everything was consumed by the input stream > + peer->read = -1; > + > + return count; > +} > + > +static void > +pipe_output_stream_init (PipeOutputStream *stream) > +{ > +} > + > +static void > +pipe_output_stream_dispose(GObject *object) > +{ > + PipeOutputStream *self; > + > + self = PIPE_OUTPUT_STREAM(object); > + > + if (self->peer) { > + g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer); > + self->peer = NULL; > + } > + Same question about the 'source' private member. > + G_OBJECT_CLASS(pipe_output_stream_parent_class)->dispose (object); > +} > + > +static void > +pipe_output_stream_check_source (PipeOutputStream *self) > +{ > + if (self->source && !g_source_is_destroyed(self->source) && > + g_pollable_output_stream_is_writable(G_POLLABLE_OUTPUT_STREAM(self))) > + g_source_set_ready_time(self->source, 0); Same comment about g_source_set_ready_time. > +} > + > +static gboolean > +pipe_output_stream_close (GOutputStream *stream, > + GCancellable *cancellable, > + GError **error) > +{ > + PipeOutputStream *self; > + > + self = PIPE_OUTPUT_STREAM(stream); > + > + if (self->peer) { > + /* ignore any pending errors */ > + self->peer->closed = TRUE; > + g_input_stream_close(G_INPUT_STREAM(self->peer), cancellable, NULL); > + pipe_input_stream_check_source(self->peer); > + } > + > + return TRUE; > +} > + > +static void > +pipe_output_stream_close_async (GOutputStream *stream, > + int io_priority, > + GCancellable *cancellable, > + GAsyncReadyCallback callback, > + gpointer data) > +{ > + GTask *task; > + > + task = g_task_new (stream, cancellable, callback, data); > + > + /* will always return TRUE */ > + pipe_output_stream_close (stream, cancellable, NULL); > + > + g_task_return_boolean (task, TRUE); > + g_object_unref (task); > +} Same comment as before about async methods Christophe
Attachment:
pgpG9lANs89LD.pgp
Description: PGP signature
_______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx http://lists.freedesktop.org/mailman/listinfo/spice-devel