This allows creating a pipe between two GIOStreams: the input side of each stream reads from the peer's output side, and vice versa. In the following patches, this will avoid going through a socket to communicate with the embedded webdav server. --- gtk/Makefile.am | 2 + gtk/giopipe.c | 569 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ gtk/giopipe.h | 53 +++++ tests/Makefile.am | 2 + tests/pipe.c | 313 ++++++++++++++++++++++++++++++ 5 files changed, 939 insertions(+) create mode 100644 gtk/giopipe.c create mode 100644 gtk/giopipe.h create mode 100644 tests/pipe.c diff --git a/gtk/Makefile.am b/gtk/Makefile.am index 7728fec..678dd26 100644 --- a/gtk/Makefile.am +++ b/gtk/Makefile.am @@ -280,6 +280,8 @@ libspice_client_glib_2_0_la_SOURCES = \ $(USB_ACL_HELPER_SRCS) \ vmcstream.c \ vmcstream.h \ + giopipe.c \ + giopipe.h \ wocky-http-proxy.c \ wocky-http-proxy.h \ \ diff --git a/gtk/giopipe.c b/gtk/giopipe.c new file mode 100644 index 0000000..c62ba9f --- /dev/null +++ b/gtk/giopipe.c @@ -0,0 +1,569 @@ +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + Copyright (C) 2015 Red Hat, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include <string.h> +#include <errno.h> + +#include "giopipe.h" + +#define TYPE_PIPE_INPUT_STREAM (pipe_input_stream_get_type ()) +#define PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream)) +#define PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass)) +#define IS_PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM)) +#define IS_PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM)) +#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass)) + +typedef struct _PipeInputStreamClass PipeInputStreamClass; +typedef struct _PipeInputStream PipeInputStream; +typedef struct _PipeOutputStream PipeOutputStream; + +struct _PipeInputStream +{ + GInputStream parent_instance; + + PipeOutputStream *peer; + gssize read; + + /* GIOstream:closed is protected against pending operations, so we + * use an additional close flag to cancel those when the peer is + * closing. 
+ */ + gboolean closed; +}; + +struct _PipeInputStreamClass +{ + GInputStreamClass parent_class; +}; + +#define TYPE_PIPE_OUTPUT_STREAM (pipe_output_stream_get_type ()) +#define PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream)) +#define PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass)) +#define IS_PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM)) +#define IS_PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM)) +#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass)) + +typedef struct _PipeOutputStreamClass PipeOutputStreamClass; + +struct _PipeOutputStream +{ + GOutputStream parent_instance; + + PipeInputStream *peer; + const gchar *buffer; + gsize count; + gboolean closed; +}; + +struct _PipeOutputStreamClass +{ + GOutputStreamClass parent_class; +}; + +struct _SpicePipeStreamPrivate { + PipeInputStream *input_stream; + PipeOutputStream *output_stream; +}; + +typedef struct _ConditionSource +{ + GSource src; + + GSourceFunc condition; + gpointer data; +} ConditionSource; + +/* + * Call immediately before the main loop does an iteration. Returns + * true if the condition we're checking is ready for dispatch + */ +static gboolean +condition_prepare(GSource *source, int *timeout) +{ + ConditionSource *src = (ConditionSource *)source; + + *timeout = -1; + + return src->condition(src->data); +} + +/* + * Call immediately after the main loop does an iteration. 
Returns + * true if the condition we're checking is ready for dispatch + */ +static gboolean +condition_check(GSource *source) +{ + ConditionSource *src = (ConditionSource *)source; + + return src->condition(src->data); +} + +static gboolean +condition_dispatch(GSource *source G_GNUC_UNUSED, + GSourceFunc callback, + gpointer user_data) +{ + if (!callback) { + g_warning ("Condition source dispatched without callback\n" + "You must call g_source_set_callback()."); + return FALSE; + } + + return callback(user_data); +} + +static gboolean +condition_closure_callback (gpointer data) +{ + GClosure *closure = data; + GValue result_value = G_VALUE_INIT; + gboolean result; + + g_value_init (&result_value, G_TYPE_BOOLEAN); + + g_closure_invoke (closure, &result_value, 0, NULL, NULL); + + result = g_value_get_boolean (&result_value); + g_value_unset (&result_value); + + return result; +} + +/* + * Callback table for ConditionSource. It is referenced only by + * condition_source_new() in this file, so keep it out of the + * library's global symbol namespace. + */ +static GSourceFuncs conditionFuncs = { + .prepare = condition_prepare, + .check = condition_check, + .dispatch = condition_dispatch, + .closure_callback = condition_closure_callback +}; + +static GSource * +condition_source_new (GSourceFunc condition, gpointer data) +{ + GSource *source; + ConditionSource *src; + + source = g_source_new (&conditionFuncs, sizeof (ConditionSource)); + g_source_set_priority (source, G_PRIORITY_DEFAULT_IDLE); + + src = (ConditionSource *)source; + src->condition = condition; + src->data = data; + + return source; +} + +static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface); + +G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, + pipe_input_stream_pollable_iface_init)) + +static gssize +pipe_input_stream_read (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + PipeInputStream *self = PIPE_INPUT_STREAM (stream); + + g_return_val_if_fail(count > 0, -1); + + if (g_input_stream_is_closed (stream) || 
self->closed || self->peer == NULL) { + /* peer is a weak pointer: NULL means the other end is gone, + * so no data can ever arrive -- report it like a close. */ + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, + "Stream is already closed"); + return -1; + } + + /* no write is pending on the peer: nothing to hand over yet */ + if (!self->peer->buffer) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, + g_strerror(EAGAIN)); + return -1; + } + + /* copy from the writer's pending buffer and record the amount in + * self->read, which pipe_output_stream_write() reports back */ + count = MIN(self->peer->count, count); + memcpy(buffer, self->peer->buffer, count); + self->read = count; + self->peer->buffer = NULL; + + g_debug("read %p :%"G_GSIZE_FORMAT, self->peer, count); + + return count; +} + +static gboolean +pipe_input_stream_close (GInputStream *stream, + GCancellable *cancellable, + GError **error) +{ + PipeInputStream *self; + + self = PIPE_INPUT_STREAM(stream); + + if (self->peer) { + self->peer->closed = TRUE; + /* ignore any pending errors */ + g_output_stream_close(G_OUTPUT_STREAM(self->peer), cancellable, NULL); + } + + return TRUE; +} + +static void +pipe_input_stream_close_async (GInputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data) +{ + GTask *task; + + task = g_task_new (stream, cancellable, callback, data); + + /* will always return TRUE */ + pipe_input_stream_close (stream, cancellable, NULL); + + g_task_return_boolean (task, TRUE); + g_object_unref (task); +} + +static gboolean +pipe_input_stream_close_finish (GInputStream *stream, + GAsyncResult *result, + GError **error) +{ + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + + return g_task_propagate_boolean (G_TASK (result), error); +} + +static void +pipe_input_stream_init (PipeInputStream *self) +{ + self->read = -1; +} + +static void +pipe_input_stream_dispose(GObject *object) +{ + PipeInputStream *self; + + self = PIPE_INPUT_STREAM(object); + + if (self->peer) { + g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer); + self->peer = NULL; + } + + G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object); +} + +static void 
+pipe_input_stream_class_init (PipeInputStreamClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass); + + istream_class->read_fn = pipe_input_stream_read; + istream_class->close_fn = pipe_input_stream_close; + istream_class->close_async = pipe_input_stream_close_async; + istream_class->close_finish = pipe_input_stream_close_finish; + + gobject_class->dispose = pipe_input_stream_dispose; +} + +/* + * Poll condition for the read side: ready when the peer has a pending + * buffer that has not been consumed yet, or when a read can only fail + * (stream flagged closed, or peer gone) so that a pending read is + * woken up and completes with an error instead of stalling. + */ +static gboolean +pipe_input_stream_is_readable (GPollableInputStream *stream) +{ + PipeInputStream *self = PIPE_INPUT_STREAM (stream); + gboolean readable; + + readable = (self->peer && self->peer->buffer && self->read == -1) || self->closed || self->peer == NULL; + g_debug("readable %p %d", self->peer, readable); + + return readable; +} + +static GSource * +pipe_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable) +{ + GSource *base_source, *pollable_source; + + base_source = condition_source_new ((GSourceFunc)pipe_input_stream_is_readable, stream); + g_source_set_name(base_source, "pipe read condition"); + pollable_source = g_pollable_source_new_full (stream, base_source, + cancellable); + g_source_unref (base_source); + + return pollable_source; +} + +static void +pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface) +{ + iface->is_readable = pipe_input_stream_is_readable; + iface->create_source = pipe_input_stream_create_source; +} + +static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface); + +G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, G_TYPE_OUTPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, + pipe_output_stream_pollable_iface_init)) + +static gssize +pipe_output_stream_write (GOutputStream *stream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream); + PipeInputStream *peer = self->peer; + + 
g_debug("write %p :%"G_GSIZE_FORMAT, stream, count); + if (g_output_stream_is_closed (stream) || self->closed || peer == NULL) { + /* peer is a weak pointer: NULL means the reading end is gone, + * so nothing will ever consume the data -- report it like a close. */ + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, + "Stream is already closed"); + return -1; + } + + /* this abuses pollable stream, writing sync would likely lead to + crashes, since the buffer pointer would become invalid, a + generic solution would need a copy.. + */ + g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1); + self->buffer = buffer; + self->count = count; + + /* the reader has not consumed the pending buffer yet */ + if (peer->read < 0) { + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, + g_strerror (EAGAIN)); + return -1; + } + + /* report what the reader took and reset both sides for the next write */ + g_assert(peer->read <= self->count); + count = peer->read; + + self->buffer = NULL; + self->count = 0; + peer->read = -1; + + return count; +} + +static void +pipe_output_stream_init (PipeOutputStream *stream) +{ +} + +static void +pipe_output_stream_dispose(GObject *object) +{ + PipeOutputStream *self; + + self = PIPE_OUTPUT_STREAM(object); + + if (self->peer) { + g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer); + self->peer = NULL; + } + + G_OBJECT_CLASS(pipe_output_stream_parent_class)->dispose (object); +} + +static gboolean +pipe_output_stream_close (GOutputStream *stream, + GCancellable *cancellable, + GError **error) +{ + PipeOutputStream *self; + + self = PIPE_OUTPUT_STREAM(stream); + + if (self->peer) { + self->peer->closed = TRUE; + /* ignore any pending errors */ + g_input_stream_close(G_INPUT_STREAM(self->peer), cancellable, NULL); + } + + return TRUE; +} + +static void +pipe_output_stream_close_async (GOutputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data) +{ + GTask *task; + + task = g_task_new (stream, cancellable, callback, data); + + /* will always return TRUE */ + pipe_output_stream_close (stream, cancellable, NULL); + + g_task_return_boolean (task, TRUE); + g_object_unref (task); +} + +static gboolean 
+pipe_output_stream_close_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + + return g_task_propagate_boolean (G_TASK (result), error); +} + + +static void +pipe_output_stream_class_init (PipeOutputStreamClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GOutputStreamClass *ostream_class = G_OUTPUT_STREAM_CLASS (klass); + + ostream_class->write_fn = pipe_output_stream_write; + ostream_class->close_fn = pipe_output_stream_close; + ostream_class->close_async = pipe_output_stream_close_async; + ostream_class->close_finish = pipe_output_stream_close_finish; + + gobject_class->dispose = pipe_output_stream_dispose; +} + +/* + * Poll condition for the write side: ready when no buffer is pending, + * when the reader has consumed the pending buffer (peer->read >= 0), + * or when a write can only fail (stream flagged closed, or peer gone) + * so that a pending write is woken up instead of stalling. The peer + * is a weak pointer and is checked before being dereferenced. + */ +static gboolean +pipe_output_stream_is_writable (GPollableOutputStream *stream) +{ + PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream); + gboolean writable; + + writable = self->buffer == NULL || self->closed || self->peer == NULL || self->peer->read >= 0; + g_debug("writable %p %d", self, writable); + + return writable; +} + +static GSource * +pipe_output_stream_create_source (GPollableOutputStream *self, + GCancellable *cancellable) +{ + GSource *base_source, *pollable_source; + + base_source = condition_source_new ((GSourceFunc)pipe_output_stream_is_writable, self); + g_source_set_name(base_source, "pipe write condition"); + pollable_source = g_pollable_source_new_full (self, base_source, cancellable); + g_source_unref (base_source); + + return pollable_source; +} + +static void +pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface) +{ + iface->is_writable = pipe_output_stream_is_writable; + iface->create_source = pipe_output_stream_create_source; +} + +G_DEFINE_TYPE_WITH_PRIVATE(SpicePipeStream, spice_pipe_stream, G_TYPE_IO_STREAM) + +static GInputStream * +get_input_stream (GIOStream *io_stream) +{ + SpicePipeStream *self = (SpicePipeStream *) io_stream; + + return (GInputStream *)self->priv->input_stream; +} + +static GOutputStream * 
+get_output_stream (GIOStream *io_stream) +{ + SpicePipeStream *self = (SpicePipeStream *) io_stream; + + return (GOutputStream *)self->priv->output_stream; +} + +static void +spice_pipe_stream_init(SpicePipeStream *pipe) +{ + pipe->priv = spice_pipe_stream_get_instance_private (pipe); + + pipe->priv->input_stream = g_object_new(TYPE_PIPE_INPUT_STREAM, NULL); + pipe->priv->output_stream = g_object_new(TYPE_PIPE_OUTPUT_STREAM, NULL); +} + +static void +spice_pipe_set_peer(SpicePipeStream *pipe, SpicePipeStream *peer) +{ + pipe->priv->output_stream->peer = peer->priv->input_stream; + g_object_add_weak_pointer(G_OBJECT(pipe->priv->output_stream->peer), + (gpointer*)&pipe->priv->output_stream->peer); + pipe->priv->input_stream->peer = peer->priv->output_stream; + g_object_add_weak_pointer(G_OBJECT(pipe->priv->input_stream->peer), + (gpointer*)&pipe->priv->input_stream->peer); +} + +static void +spice_pipe_stream_finalize(GObject *object) +{ + SpicePipeStream *pipe; + + pipe = SPICE_PIPE_STREAM(object); + + g_clear_object(&pipe->priv->input_stream); + g_clear_object(&pipe->priv->output_stream); + + G_OBJECT_CLASS(spice_pipe_stream_parent_class)->finalize (object); +} + +static void +spice_pipe_stream_class_init(SpicePipeStreamClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GIOStreamClass *io_class = G_IO_STREAM_CLASS (klass); + + io_class->get_input_stream = get_input_stream; + io_class->get_output_stream = get_output_stream; + + gobject_class->finalize = spice_pipe_stream_finalize; +} + +G_GNUC_INTERNAL void +spice_make_pipe(GIOStream **p1, GIOStream **p2) +{ + SpicePipeStream *a, *b; + + g_return_if_fail(p1 != NULL); + g_return_if_fail(p2 != NULL); + g_return_if_fail(*p1 == NULL); + g_return_if_fail(*p2 == NULL); + + a = g_object_new(SPICE_TYPE_PIPE_STREAM, NULL); + b = g_object_new(SPICE_TYPE_PIPE_STREAM, NULL); + + spice_pipe_set_peer(a, b); + spice_pipe_set_peer(b, a); + + *p1 = G_IO_STREAM(a); + *p2 = G_IO_STREAM(b); +} diff --git 
a/gtk/giopipe.h b/gtk/giopipe.h new file mode 100644 index 0000000..e8ce296 --- /dev/null +++ b/gtk/giopipe.h @@ -0,0 +1,53 @@ +/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + Copyright (C) 2015 Red Hat, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. +*/ +#ifndef __SPICE_GIO_PIPE_H__ +#define __SPICE_GIO_PIPE_H__ + +#include <gio/gio.h> + +G_BEGIN_DECLS + +#define SPICE_TYPE_PIPE_STREAM (spice_pipe_stream_get_type ()) +#define SPICE_PIPE_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), SPICE_TYPE_PIPE_STREAM, SpicePipeStream)) +#define SPICE_PIPE_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), SPICE_TYPE_PIPE_STREAM, SpicePipeStreamClass)) +#define SPICE_IS_PIPE_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), SPICE_TYPE_PIPE_STREAM)) +#define SPICE_IS_PIPE_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), SPICE_TYPE_PIPE_STREAM)) +#define SPICE_PIPE_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SPICE_TYPE_PIPE_STREAM, SpicePipeStreamClass)) + +typedef struct _SpicePipeStreamPrivate SpicePipeStreamPrivate; +typedef struct _SpicePipeStreamClass SpicePipeStreamClass; +typedef struct _SpicePipeStream SpicePipeStream; + +struct _SpicePipeStream +{ + GIOStream parent_instance; + + /*< private >*/ + SpicePipeStreamPrivate *priv; +}; + +struct _SpicePipeStreamClass +{ + GIOStreamClass parent_class; +}; + +void spice_make_pipe(GIOStream **p1, GIOStream 
**p2); + +G_END_DECLS + +#endif /* __SPICE_GIO_PIPE_H__ */ diff --git a/tests/Makefile.am b/tests/Makefile.am index b236b12..5e57aa6 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -3,6 +3,7 @@ NULL = noinst_PROGRAMS = \ coroutine \ util \ + pipe \ $(NULL) TESTS = $(noinst_PROGRAMS) @@ -20,5 +21,6 @@ LDADD = \ util_SOURCES = util.c coroutine_SOURCES = coroutine.c +pipe_SOURCES = pipe.c -include $(top_srcdir)/git.mk diff --git a/tests/pipe.c b/tests/pipe.c new file mode 100644 index 0000000..841cb77 --- /dev/null +++ b/tests/pipe.c @@ -0,0 +1,313 @@ +#include <glib.h> +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <locale.h> + +#include "giopipe.h" + +typedef struct _Fixture { + GIOStream *p1; + GIOStream *p2; + + GInputStream *ip1; + GOutputStream *op1; + GInputStream *ip2; + GOutputStream *op2; + + gchar buf[16]; + + GMainLoop *loop; + GCancellable *cancellable; + guint timeout; +} Fixture; + +static gboolean +stop_loop (gpointer data) +{ + GMainLoop *loop = data; + + g_main_loop_quit (loop); + g_assert_not_reached(); + + return G_SOURCE_REMOVE; +} + +static void +fixture_set_up(Fixture *fixture, + gconstpointer user_data) +{ + int i; + + spice_make_pipe(&fixture->p1, &fixture->p2); + g_assert_true(G_IS_IO_STREAM(fixture->p1)); + g_assert_true(G_IS_IO_STREAM(fixture->p2)); + + fixture->op1 = g_io_stream_get_output_stream(fixture->p1); + g_assert_true(G_IS_OUTPUT_STREAM(fixture->op1)); + fixture->ip1 = g_io_stream_get_input_stream(fixture->p1); + g_assert_true(G_IS_INPUT_STREAM(fixture->ip1)); + fixture->op2 = g_io_stream_get_output_stream(fixture->p2); + g_assert_true(G_IS_OUTPUT_STREAM(fixture->op2)); + fixture->ip2 = g_io_stream_get_input_stream(fixture->p2); + g_assert_true(G_IS_INPUT_STREAM(fixture->ip2)); + + for (i = 0; i < sizeof(fixture->buf); i++) { + fixture->buf[i] = 0x42 + i; + } + + fixture->cancellable = g_cancellable_new(); + fixture->loop = g_main_loop_new (NULL, FALSE); + fixture->timeout = g_timeout_add (1000, 
stop_loop, fixture->loop); +} + +static void +fixture_tear_down(Fixture *fixture, + gconstpointer user_data) +{ + g_clear_object(&fixture->p1); + g_clear_object(&fixture->p2); + + g_clear_object(&fixture->cancellable); + g_source_remove(fixture->timeout); + g_main_loop_unref(fixture->loop); +} + +static void +test_pipe_readblock(Fixture *f, gconstpointer user_data) +{ + GError *error = NULL; + gssize size; + + size = g_input_stream_read(f->ip2, f->buf, 1, + f->cancellable, &error); + + g_assert_error(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK); + + g_clear_error(&error); +} + +static void +test_pipe_writeblock(Fixture *f, gconstpointer user_data) +{ + GError *error = NULL; + gssize size; + + size = g_output_stream_write(f->op1, "", 1, + f->cancellable, &error); + + g_assert_error(error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK); + + g_clear_error(&error); +} + +static void +write_cb(GObject *source, GAsyncResult *result, gpointer user_data) +{ + GError *error = NULL; + GMainLoop *loop = user_data; + gssize nbytes; + + nbytes = g_output_stream_write_finish(G_OUTPUT_STREAM(source), result, &error); + + g_assert_no_error(error); + g_assert_cmpint(nbytes, >, 0); + g_clear_error(&error); + + g_main_loop_quit (loop); +} + +static void +read_cb(GObject *source, GAsyncResult *result, gpointer user_data) +{ + GError *error = NULL; + gssize nbytes, expected = GPOINTER_TO_INT(user_data); + + nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error); + + g_assert_cmpint(nbytes, ==, expected); + g_assert_no_error(error); + g_clear_error(&error); +} + +static void +test_pipe_writeread(Fixture *f, gconstpointer user_data) +{ + g_output_stream_write_async(f->op1, "", 1, G_PRIORITY_DEFAULT, + f->cancellable, write_cb, f->loop); + g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT, + f->cancellable, read_cb, GINT_TO_POINTER(1)); + + g_main_loop_run (f->loop); + + g_output_stream_write_async(f->op1, "", 1, G_PRIORITY_DEFAULT, + f->cancellable, write_cb, 
f->loop); + g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT, + f->cancellable, read_cb, GINT_TO_POINTER(1)); + + g_main_loop_run (f->loop); +} + +static void +test_pipe_readwrite(Fixture *f, gconstpointer user_data) +{ + g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT, + f->cancellable, read_cb, GINT_TO_POINTER(1)); + g_output_stream_write_async(f->op1, "", 1, G_PRIORITY_DEFAULT, + f->cancellable, write_cb, f->loop); + + g_main_loop_run (f->loop); +} + +/* Read completion for the 8-byte transfers; user_data carries an integer + * (GINT_TO_POINTER(8)), not a main loop, and the loop is quit by write_cb. */ +static void +read8_cb(GObject *source, GAsyncResult *result, gpointer user_data) +{ + GError *error = NULL; + gssize nbytes; + + nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error); + + g_assert_cmpint(nbytes, ==, 8); + g_assert_no_error(error); + g_clear_error(&error); +} + +static void +test_pipe_write16read8(Fixture *f, gconstpointer user_data) +{ + g_output_stream_write_async(f->op1, "0123456789abcdef", 16, G_PRIORITY_DEFAULT, + f->cancellable, write_cb, f->loop); + g_input_stream_read_async(f->ip2, f->buf, 8, G_PRIORITY_DEFAULT, + f->cancellable, read8_cb, GINT_TO_POINTER(8)); + + g_main_loop_run (f->loop); + + /* check next read would block */ + test_pipe_readblock(f, user_data); +} + +static void +test_pipe_write8read16(Fixture *f, gconstpointer user_data) +{ + g_output_stream_write_async(f->op1, "01234567", 8, G_PRIORITY_DEFAULT, + f->cancellable, write_cb, f->loop); + g_input_stream_read_async(f->ip2, f->buf, 16, G_PRIORITY_DEFAULT, + f->cancellable, read8_cb, GINT_TO_POINTER(8)); + + g_main_loop_run (f->loop); + + /* check next write would block */ + test_pipe_writeblock(f, user_data); +} + +static void +readclose_cb(GObject *source, GAsyncResult *result, gpointer user_data) +{ + GError *error = NULL; + gssize nbytes; + GMainLoop *loop = user_data; + + nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error); + + g_assert_error(error, G_IO_ERROR, G_IO_ERROR_CLOSED); + g_clear_error(&error); + + 
g_main_loop_quit (loop); +} + +static void +test_pipe_readclosestream(Fixture *f, gconstpointer user_data) +{ + GError *error = NULL; + + g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT, + f->cancellable, readclose_cb, f->loop); + g_io_stream_close(f->p1, f->cancellable, &error); + + g_main_loop_run (f->loop); +} + +static void +test_pipe_readclose(Fixture *f, gconstpointer user_data) +{ + GError *error = NULL; + + g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT, + f->cancellable, readclose_cb, f->loop); + g_output_stream_close(f->op1, f->cancellable, &error); + + g_main_loop_run (f->loop); +} + +/* Completion of a read that was cancelled while pending: it must fail + * with G_IO_ERROR_CANCELLED, then the test's main loop is stopped. */ +static void +readcancel_cb(GObject *source, GAsyncResult *result, gpointer user_data) +{ + GError *error = NULL; + gssize nbytes; + GMainLoop *loop = user_data; + + nbytes = g_input_stream_read_finish(G_INPUT_STREAM(source), result, &error); + + g_assert_error(error, G_IO_ERROR, G_IO_ERROR_CANCELLED); + g_clear_error(&error); + + g_main_loop_quit (loop); +} + +/* Start a read that cannot complete (nothing is written), then cancel it + * through the fixture's cancellable rather than closing the peer, which + * is already covered by test_pipe_readclose(). */ +static void +test_pipe_readcancel(Fixture *f, gconstpointer user_data) +{ + g_input_stream_read_async(f->ip2, f->buf, 1, G_PRIORITY_DEFAULT, + f->cancellable, readcancel_cb, f->loop); + g_cancellable_cancel(f->cancellable); + + g_main_loop_run (f->loop); +} + +int main(int argc, char* argv[]) +{ + setlocale(LC_ALL, ""); + + g_test_init(&argc, &argv, NULL); + + g_test_add("/pipe/readblock", Fixture, NULL, + fixture_set_up, test_pipe_readblock, + fixture_tear_down); + + g_test_add("/pipe/writeblock", Fixture, NULL, + fixture_set_up, test_pipe_writeblock, + fixture_tear_down); + + g_test_add("/pipe/writeread", Fixture, NULL, + fixture_set_up, test_pipe_writeread, + fixture_tear_down); + + g_test_add("/pipe/readwrite", Fixture, NULL, + fixture_set_up, test_pipe_readwrite, + fixture_tear_down); + + g_test_add("/pipe/write16read8", Fixture, NULL, + fixture_set_up, test_pipe_write16read8, + fixture_tear_down); + + g_test_add("/pipe/write8read16", 
Fixture, NULL, + fixture_set_up, test_pipe_write8read16, + fixture_tear_down); + + g_test_add("/pipe/readclosestream", Fixture, NULL, + fixture_set_up, test_pipe_readclosestream, + fixture_tear_down); + + g_test_add("/pipe/readclose", Fixture, NULL, + fixture_set_up, test_pipe_readclose, + fixture_tear_down); + + g_test_add("/pipe/readcancel", Fixture, NULL, + fixture_set_up, test_pipe_readcancel, + fixture_tear_down); + + return g_test_run(); +} -- 2.1.0 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx http://lists.freedesktop.org/mailman/listinfo/spice-devel