On Mon, Feb 23, 2015 at 1:28 PM, Christophe Fergeau <cfergeau@xxxxxxxxxx> wrote: > On Sat, Feb 21, 2015 at 01:40:12AM +0100, Marc-André Lureau wrote: >> This allows to create a pipe between 2 GIOStream, the input side read >> from the peer output side, and vice-versa. >> >> In the following patches, this will avoid the socket communication >> to exchange with the embedded webdav server. >> --- >> configure.ac | 4 +- >> gtk/Makefile.am | 7 + >> gtk/giopipe.c | 472 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ >> gtk/giopipe.h | 29 ++++ >> tests/Makefile.am | 5 + >> tests/pipe.c | 313 ++++++++++++++++++++++++++++++++++++ >> 6 files changed, 829 insertions(+), 1 deletion(-) >> create mode 100644 gtk/giopipe.c >> create mode 100644 gtk/giopipe.h >> create mode 100644 tests/pipe.c >> >> diff --git a/configure.ac b/configure.ac >> index 4e88dec..d98e502 100644 >> --- a/configure.ac >> +++ b/configure.ac >> @@ -278,7 +278,7 @@ AC_ARG_ENABLE([webdav], >> if test "x$enable_webdav" = "xno"; then >> have_phodav="no" >> else >> - PKG_CHECK_MODULES(PHODAV, [libphodav-1.0], [have_phodav=yes], [have_phodav=no]) >> + PKG_CHECK_MODULES(PHODAV, [libphodav-1.0 glib-2.0 >= 2.43.90], [have_phodav=yes], [have_phodav=no]) > > This glib requirement comes from the use of GSimpleIOStream, could be > worth explicitly mentioning it in the commit log. I don't see much need for this detail, there might be other API from newer glib used, but ok. 
> >> AC_SUBST(PHODAV_CFLAGS) >> AC_SUBST(PHODAV_LIBS) >> >> @@ -289,6 +289,8 @@ fi >> AS_IF([test "x$have_phodav" = "xyes"], >> AC_DEFINE([USE_PHODAV], [1], [Define if supporting phodav])) >> >> +AM_CONDITIONAL([WITH_PHODAV], [test "x$have_phodav" = "xyes"]) >> + >> AC_ARG_WITH([audio], >> AS_HELP_STRING([--with-audio=@<:@gstreamer/pulse/auto/no@:>@], [Select audio backend @<:@default=auto@:>@]), >> [], >> diff --git a/gtk/Makefile.am b/gtk/Makefile.am >> index 7728fec..ab50c79 100644 >> --- a/gtk/Makefile.am >> +++ b/gtk/Makefile.am >> @@ -346,6 +346,13 @@ libspice_client_glib_2_0_la_SOURCES += \ >> $(NULL) >> endif >> >> +if WITH_PHODAV >> +libspice_client_glib_2_0_la_SOURCES += \ >> + giopipe.c \ >> + giopipe.h \ > > Has this GioPipe been proposed upstream? No, it's far from being acceptable to upstream since it only handles async operations. Furthermore, I think upstream is reluctant to add more GIO stream code for some reason (I think there is too much of a maintenance burden already). > >> + $(NULL) >> +endif >> + >> if WITH_UCONTEXT >> libspice_client_glib_2_0_la_SOURCES += continuation.h continuation.c coroutine_ucontext.c >> endif >> diff --git a/gtk/giopipe.c b/gtk/giopipe.c >> new file mode 100644 >> index 0000000..45007c4 >> --- /dev/null >> +++ b/gtk/giopipe.c >> @@ -0,0 +1,472 @@ >> +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ >> +/* >> + Copyright (C) 2015 Red Hat, Inc. >> + >> + This library is free software; you can redistribute it and/or >> + modify it under the terms of the GNU Lesser General Public >> + License as published by the Free Software Foundation; either >> + version 2.1 of the License, or (at your option) any later version. >> + >> + This library is distributed in the hope that it will be useful, >> + but WITHOUT ANY WARRANTY; without even the implied warranty of >> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU >> + Lesser General Public License for more details. 
>> + >> + You should have received a copy of the GNU Lesser General Public >> + License along with this library; if not, see <http://www.gnu.org/licenses/>. >> +*/ >> + >> +#include <string.h> >> +#include <errno.h> >> + >> +#include "giopipe.h" >> + >> +#define TYPE_PIPE_INPUT_STREAM (pipe_input_stream_get_type ()) >> +#define PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream)) >> +#define PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass)) >> +#define IS_PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM)) >> +#define IS_PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM)) >> +#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass)) >> + >> +typedef struct _PipeInputStreamClass PipeInputStreamClass; >> +typedef struct _PipeInputStream PipeInputStream; >> +typedef struct _PipeOutputStream PipeOutputStream; >> + >> +struct _PipeInputStream >> +{ >> + GInputStream parent_instance; >> + >> + PipeOutputStream *peer; >> + gssize read; >> + >> + /* GIOstream:closed is protected against pending operations, so we >> + * use an additional close flag to cancel those when the peer is >> + * closing. 
>> + */ >> + gboolean closed; >> + GSource *source; >> +}; >> + >> +struct _PipeInputStreamClass >> +{ >> + GInputStreamClass parent_class; >> +}; >> + >> +#define TYPE_PIPE_OUTPUT_STREAM (pipe_output_stream_get_type ()) >> +#define PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream)) >> +#define PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass)) >> +#define IS_PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM)) >> +#define IS_PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM)) >> +#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass)) >> + >> +typedef struct _PipeOutputStreamClass PipeOutputStreamClass; >> + >> +struct _PipeOutputStream >> +{ >> + GOutputStream parent_instance; >> + >> + PipeInputStream *peer; >> + const gchar *buffer; >> + gsize count; >> + gboolean closed; >> + GSource *source; >> +}; >> + >> +struct _PipeOutputStreamClass >> +{ >> + GOutputStreamClass parent_class; >> +}; >> + >> +struct _SpicePipeStreamPrivate { >> + PipeInputStream *input_stream; >> + PipeOutputStream *output_stream; >> +}; > > This struct does not seem to be used. 
Right, it's a leftover from a previous version. > >> + >> +static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface); >> +static void pipe_input_stream_check_source (PipeInputStream *self); >> +static void pipe_output_stream_check_source (PipeOutputStream *self); >> + >> +G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM, >> + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, >> + pipe_input_stream_pollable_iface_init)) >> + >> +static gssize >> +pipe_input_stream_read (GInputStream *stream, >> + void *buffer, >> + gsize count, >> + GCancellable *cancellable, >> + GError **error) >> +{ >> + PipeInputStream *self = PIPE_INPUT_STREAM (stream); >> + >> + g_return_val_if_fail(count > 0, -1); >> + >> + if (g_input_stream_is_closed (stream) || self->closed) { > > It's not clear why you need this 'closed' variable, it seems to always > be set together with calls to g_xxxx_stream_close(). Is that in case > some of the cleanup done in g_xxxx_stream_close() fails? It is needed because closing from the peer must immediately close the other side (they share memory), but the other side might have pending operations, so g_xxxx_stream_close() will fail. > >> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, >> + "Stream is already closed"); >> + return -1; >> + } >> + >> + if (!self->peer->buffer) { >> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, >> + g_strerror(EAGAIN)); >> + return -1; >> + } >> + >> + g_return_val_if_fail(self->peer->buffer, -1); > > You just checked that self->peer->buffer is not NULL and returned an > error if it is. 
right, removed > >> + >> + count = MIN(self->peer->count, count); >> + memcpy(buffer, self->peer->buffer, count); >> + self->read = count; >> + self->peer->buffer = NULL; >> + >> + //g_debug("read %p :%"G_GSIZE_FORMAT, self->peer, count); >> + pipe_output_stream_check_source(self->peer); >> + >> + return count; >> +} >> + >> +static void >> +pipe_input_stream_check_source (PipeInputStream *self) >> +{ >> + if (self->source && !g_source_is_destroyed(self->source) && >> + g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self))) >> + g_source_set_ready_time(self->source, 0); > > g_source_set_ready_time API doc says "This API is only intended to be > used by implementations of GSource. Do not call this API on a GSource > that you did not create." Well, I used a "GConditionSource" in previous series, similar to the condition source code in gio-coroutine.c. I proposed it upstream (because I needed private fields for GPollable to work!), but I was told to use this API instead by Ryan, who also wrote that documentation! I guess he should modify it. >> +} >> + >> +static gboolean >> +pipe_input_stream_close (GInputStream *stream, >> + GCancellable *cancellable, >> + GError **error) >> +{ >> + PipeInputStream *self; >> + >> + self = PIPE_INPUT_STREAM(stream); >> + >> + if (self->peer) { >> + /* ignore any pending errors */ >> + self->peer->closed = TRUE; >> + g_output_stream_close(G_OUTPUT_STREAM(self->peer), cancellable, NULL); >> + pipe_output_stream_check_source(self->peer); >> + } > > Why not set self->priv->closed as well? Because that field is for peer closing. I can rename it peer_closed perhaps? (peer->peer_closed...) 
> >> + >> + return TRUE; >> +} >> + >> +static void >> +pipe_input_stream_close_async (GInputStream *stream, >> + int io_priority, >> + GCancellable *cancellable, >> + GAsyncReadyCallback callback, >> + gpointer data) >> +{ >> + GTask *task; >> + >> + task = g_task_new (stream, cancellable, callback, data); >> + >> + /* will always return TRUE */ >> + pipe_input_stream_close (stream, cancellable, NULL); >> + >> + g_task_return_boolean (task, TRUE); >> + g_object_unref (task); >> +} > > g_input_stream_close_async() API doc says "The asyncronous methods have > a default fallback that uses threads to implement asynchronicity, so > they are optional for inheriting classes. However, if you override one > you must override all." I guess we should do without that one as this is > the only async operation you overrode. Yes, I am not sure what that means in practice. There are also default _async fallbacks for other methods, so this might be an outdated comment? > >> + >> +static gboolean >> +pipe_input_stream_close_finish (GInputStream *stream, >> + GAsyncResult *result, >> + GError **error) >> +{ >> + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); >> + >> + return g_task_propagate_boolean (G_TASK (result), error); >> +} >> + >> +static void >> +pipe_input_stream_init (PipeInputStream *self) >> +{ >> + self->read = -1; >> +} >> + >> +static void >> +pipe_input_stream_dispose(GObject *object) >> +{ >> + PipeInputStream *self; >> + >> + self = PIPE_INPUT_STREAM(object); >> + >> + if (self->peer) { >> + g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer); >> + self->peer = NULL; >> + } >> + > > No cleanup of the 'source' private member? 
Clean-up added > >> + G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object); >> +} >> + >> +static void >> +pipe_input_stream_class_init (PipeInputStreamClass *klass) >> +{ >> + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); >> + GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass); >> + >> + istream_class->read_fn = pipe_input_stream_read; >> + istream_class->close_fn = pipe_input_stream_close; >> + istream_class->close_async = pipe_input_stream_close_async; >> + istream_class->close_finish = pipe_input_stream_close_finish; >> + >> + gobject_class->dispose = pipe_input_stream_dispose; >> +} >> + >> +static gboolean >> +pipe_input_stream_is_readable (GPollableInputStream *stream) >> +{ >> + PipeInputStream *self = PIPE_INPUT_STREAM (stream); >> + gboolean readable; >> + >> + readable = (self->peer && self->peer->buffer && self->read == -1) || self->closed; >> + //g_debug("readable %p %d", self->peer, readable); >> + >> + return readable; >> +} >> + >> +static GSource * >> +pipe_input_stream_create_source (GPollableInputStream *stream, >> + GCancellable *cancellable) >> +{ >> + PipeInputStream *self = PIPE_INPUT_STREAM(stream); >> + GSource *pollable_source; >> + >> + g_return_val_if_fail (self->source == NULL || >> + g_source_is_destroyed (self->source), NULL); >> + >> + if (self->source && g_source_is_destroyed (self->source)) >> + g_source_unref (self->source); >> + >> + pollable_source = g_pollable_source_new_full (self, NULL, cancellable); >> + self->source = g_source_ref (pollable_source); >> + >> + return pollable_source; >> +} >> + >> +static void >> +pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface) >> +{ >> + iface->is_readable = pipe_input_stream_is_readable; >> + iface->create_source = pipe_input_stream_create_source; >> +} >> + >> +static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface); >> + >> +G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, 
G_TYPE_OUTPUT_STREAM, >> + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, >> + pipe_output_stream_pollable_iface_init)) >> + >> +static gssize >> +pipe_output_stream_write (GOutputStream *stream, >> + const void *buffer, >> + gsize count, >> + GCancellable *cancellable, >> + GError **error) >> +{ >> + PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream); >> + PipeInputStream *peer = self->peer; >> + >> + //g_debug("write %p :%"G_GSIZE_FORMAT, stream, count); >> + if (g_output_stream_is_closed (stream) || self->closed) { >> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, >> + "Stream is already closed"); >> + return -1; >> + } >> + >> + /* this abuses pollable stream, writing sync would likely lead to >> + crashes, since the buffer pointer would become invalid, a >> + generic solution would need a copy.. >> + */ >> + g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1); >> + self->buffer = buffer; >> + self->count = count; >> + >> + pipe_input_stream_check_source(self->peer); > > This call will trigger a sync call to pipe_input_stream_read() if I > followed everything properly? A comment mentioning that might make the > code easier to follow. It's not sync, it will "schedule peer source". As said in the comment, this abuses pollable stream, since it keeps a pointer to the buffer and assumes the function will be resumed with the same data (there are preconditions checks for that) > >> + >> + if (peer->read < 0) { >> + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, >> + g_strerror (EAGAIN)); >> + return -1; >> + } >> + >> + g_assert(peer->read <= self->count); >> + count = peer->read; > > > My understanding is that here you don't want to assume that self->count > == peer->read... >> + >> + self->buffer = NULL; >> + self->count = 0; > > ... 
but here you make the assumption that everything was consumed by the > input stream It's just clearing the internal state of the input data; the actual count of bytes written is returned to the writer. >> + peer->read = -1; >> + >> + return count; >> +} >> + >> +static void >> +pipe_output_stream_init (PipeOutputStream *stream) >> +{ >> +} >> + >> +static void >> +pipe_output_stream_dispose(GObject *object) >> +{ >> + PipeOutputStream *self; >> + >> + self = PIPE_OUTPUT_STREAM(object); >> + >> + if (self->peer) { >> + g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer); >> + self->peer = NULL; >> + } >> + > > Same question about the 'source' private member. > > Same comment about g_source_set_ready_time. > > Same comment as before about async methods > > Same answers. -- Marc-André Lureau _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx http://lists.freedesktop.org/mailman/listinfo/spice-devel