OutputQueue is a self-contained unit and as such can be put in a separate file to make spice-webdavd.c less cluttered. Also, as the current implementation defines output_queue_{ref, unref}, turn OutputQueue into a GObject which can handle these for us. Signed-off-by: Jakub Janků <jjanku@xxxxxxxxxx> --- spice/meson.build | 8 ++- spice/output-queue.c | 164 ++++++++++++++++++++++++++++++++++++++++++ spice/output-queue.h | 38 ++++++++++ spice/spice-webdavd.c | 162 ++--------------------------------------- 4 files changed, 214 insertions(+), 158 deletions(-) create mode 100644 spice/output-queue.c create mode 100644 spice/output-queue.h diff --git a/spice/meson.build b/spice/meson.build index 6db22cc..06d20e6 100644 --- a/spice/meson.build +++ b/spice/meson.build @@ -4,9 +4,15 @@ if host_machine.system() == 'windows' win32_deps += compiler.find_library('mpr') endif +sources = [ + 'spice-webdavd.c', + 'output-queue.c', + 'output-queue.h' +] + executable( 'spice-webdavd', - [ 'spice-webdavd.c' ], + sources, install_dir : sbindir, include_directories : incdir, dependencies : win32_deps + avahi_deps + deps, diff --git a/spice/output-queue.c b/spice/output-queue.c new file mode 100644 index 0000000..6991493 --- /dev/null +++ b/spice/output-queue.c @@ -0,0 +1,164 @@ +/* + * Copyright (C) 2019 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see <http://www.gnu.org/licenses/>. + */ +#include <config.h> + +#include "output-queue.h" + +typedef struct _OutputQueueElem +{ + OutputQueue *queue; + const guint8 *buf; + gsize size; + PushedCb cb; + gpointer user_data; +} OutputQueueElem; + +struct _OutputQueue +{ + GObject parent_instance; + GOutputStream *output; + gboolean flushing; + guint idle_id; + GQueue *queue; + GCancellable *cancel; +}; + +G_DEFINE_TYPE(OutputQueue, output_queue, G_TYPE_OBJECT); + +static void output_queue_init(OutputQueue *self) +{ + self->queue = g_queue_new (); +} + +static void output_queue_finalize(GObject *obj) +{ + OutputQueue *self = OUTPUT_QUEUE(obj); + + g_warn_if_fail (g_queue_get_length (self->queue) == 0); + g_warn_if_fail (!self->flushing); + g_warn_if_fail (!self->idle_id); + + g_queue_free_full (self->queue, g_free); + g_object_unref (self->output); + g_object_unref (self->cancel); + + G_OBJECT_CLASS(output_queue_parent_class)->finalize(obj); +} + +static void output_queue_class_init(OutputQueueClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS(klass); + gobject_class->finalize = output_queue_finalize; +} + +OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel) +{ + OutputQueue *self = g_object_new(OUTPUT_TYPE_QUEUE, NULL); + self->output = g_object_ref (output); + self->cancel = g_object_ref (cancel); + return self; +} + +static gboolean output_queue_idle (gpointer user_data); + +static void +output_queue_flush_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GError *error = NULL; + OutputQueueElem *e = user_data; + OutputQueue *q = e->queue; + + g_debug ("flushed"); + q->flushing = FALSE; + g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object), + res, &error); + if (error) + g_warning ("error: %s", error->message); + + g_clear_error (&error); + + if (!q->idle_id) + q->idle_id = 
g_idle_add (output_queue_idle, g_object_ref (q)); + + g_free (e); + g_object_unref (q); +} + +static gboolean +output_queue_idle (gpointer user_data) +{ + OutputQueue *q = user_data; + OutputQueueElem *e = NULL; + GError *error = NULL; + + if (q->flushing) + { + g_debug ("already flushing"); + goto end; + } + + e = g_queue_pop_head (q->queue); + if (!e) + { + g_debug ("No more data to flush"); + goto end; + } + + g_debug ("flushing %" G_GSIZE_FORMAT, e->size); + g_output_stream_write_all (q->output, e->buf, e->size, NULL, q->cancel, &error); + if (e->cb) + e->cb (q, e->user_data, error); + + if (error) + goto end; + + q->flushing = TRUE; + g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, q->cancel, output_queue_flush_cb, e); + + q->idle_id = 0; + return FALSE; + +end: + g_clear_error (&error); + q->idle_id = 0; + g_free (e); + g_object_unref (q); + + return FALSE; +} + +void +output_queue_push (OutputQueue *q, const guint8 *buf, gsize size, + PushedCb pushed_cb, gpointer user_data) +{ + OutputQueueElem *e; + + g_return_if_fail (q != NULL); + + e = g_new (OutputQueueElem, 1); + e->buf = buf; + e->size = size; + e->cb = pushed_cb; + e->user_data = user_data; + e->queue = q; + g_queue_push_tail (q->queue, e); + + if (!q->idle_id && !q->flushing) + q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q)); +} diff --git a/spice/output-queue.h b/spice/output-queue.h new file mode 100644 index 0000000..ab8f6eb --- /dev/null +++ b/spice/output-queue.h @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2019 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. 
+ * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __OUTPUT_QUEUE_H +#define __OUTPUT_QUEUE_H + +#include <gio/gio.h> +#include <glib-object.h> + +G_BEGIN_DECLS + +#define OUTPUT_TYPE_QUEUE output_queue_get_type() +G_DECLARE_FINAL_TYPE(OutputQueue, output_queue, OUTPUT, QUEUE, GObject); + +OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel); + +typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error); + +void output_queue_push (OutputQueue *q, const guint8 *buf, gsize size, + PushedCb pushed_cb, gpointer user_data); + +G_END_DECLS + +#endif diff --git a/spice/spice-webdavd.c b/spice/spice-webdavd.c index f2c7f07..84ab770 100644 --- a/spice/spice-webdavd.c +++ b/spice/spice-webdavd.c @@ -39,25 +39,7 @@ #include <avahi-gobject/ga-entry-group.h> #endif -typedef struct _OutputQueue -{ - guint refs; - GOutputStream *output; - gboolean flushing; - guint idle_id; - GQueue *queue; -} OutputQueue; - -typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error); - -typedef struct _OutputQueueElem -{ - OutputQueue *queue; - const guint8 *buf; - gsize size; - PushedCb cb; - gpointer user_data; -} OutputQueueElem; +#include "output-queue.h" typedef struct _ServiceData { @@ -69,139 +51,6 @@ typedef struct _ServiceData static GCancellable *cancel; -static OutputQueue* -output_queue_new (GOutputStream *output) -{ - OutputQueue *queue = g_new0 (OutputQueue, 1); - - queue->output = g_object_ref (output); - queue->queue = g_queue_new (); - queue->refs = 1; - - return queue; -} - -static -void -output_queue_free (OutputQueue *queue) -{ - g_warn_if_fail 
(g_queue_get_length (queue->queue) == 0); - g_warn_if_fail (!queue->flushing); - g_warn_if_fail (!queue->idle_id); - - g_queue_free_full (queue->queue, g_free); - g_clear_object (&queue->output); - g_free (queue); -} - -static OutputQueue* -output_queue_ref (OutputQueue *q) -{ - q->refs++; - return q; -} - -static void -output_queue_unref (OutputQueue *q) -{ - g_return_if_fail (q != NULL); - - q->refs--; - if (q->refs == 0) - output_queue_free (q); -} - -static gboolean output_queue_idle (gpointer user_data); - -static void -output_queue_flush_cb (GObject *source_object, - GAsyncResult *res, - gpointer user_data) -{ - GError *error = NULL; - OutputQueueElem *e = user_data; - OutputQueue *q = e->queue; - - g_debug ("flushed"); - q->flushing = FALSE; - g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object), - res, &error); - if (error) - g_warning ("error: %s", error->message); - - g_clear_error (&error); - - if (!q->idle_id) - q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q)); - - g_free (e); - output_queue_unref (q); -} - -static gboolean -output_queue_idle (gpointer user_data) -{ - OutputQueue *q = user_data; - OutputQueueElem *e = NULL; - GError *error = NULL; - - if (q->flushing) - { - g_debug ("already flushing"); - goto end; - } - - e = g_queue_pop_head (q->queue); - if (!e) - { - g_debug ("No more data to flush"); - goto end; - } - - g_debug ("flushing %" G_GSIZE_FORMAT, e->size); - g_output_stream_write_all (q->output, e->buf, e->size, NULL, cancel, &error); - if (e->cb) - e->cb (q, e->user_data, error); - - if (error) - goto end; - - q->flushing = TRUE; - g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, cancel, output_queue_flush_cb, e); - - q->idle_id = 0; - return FALSE; - -end: - g_clear_error (&error); - q->idle_id = 0; - g_free (e); - output_queue_unref (q); - - return FALSE; -} - -static void -output_queue_push (OutputQueue *q, const guint8 *buf, gsize size, - PushedCb pushed_cb, gpointer user_data) -{ - 
OutputQueueElem *e; - - g_return_if_fail (q != NULL); - - e = g_new (OutputQueueElem, 1); - e->buf = buf; - e->size = size; - e->cb = pushed_cb; - e->user_data = user_data; - e->queue = q; - g_queue_push_tail (q->queue, e); - - if (!q->idle_id && !q->flushing) - q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q)); -} - - static struct _DemuxData { gint64 client; @@ -264,7 +113,7 @@ add_client (GSocketConnection *client_connection) client->client_connection = g_object_ref (client_connection); // TODO: check if usage of this idiom is portable, or if we need to check collisions client->id = GPOINTER_TO_INT (client_connection); - client->queue = output_queue_new (bostream); + client->queue = output_queue_new (bostream, cancel); g_object_unref (bostream); g_hash_table_insert (clients, &client->id, client); @@ -280,7 +129,7 @@ client_free (Client *c) g_io_stream_close (G_IO_STREAM (c->client_connection), NULL, NULL); g_object_unref (c->client_connection); - output_queue_unref (c->queue); + g_object_unref (c->queue); g_free (c); } @@ -732,7 +581,7 @@ open_mux_path (const char *path) mux_istream = G_INPUT_STREAM (g_win32_input_stream_new (port_handle, TRUE)); #endif - mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream)); + mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream), cancel); } #ifdef G_OS_WIN32 @@ -1002,12 +851,11 @@ run_service (ServiceData *service_data) g_clear_object (&mux_istream); g_clear_object (&mux_ostream); - output_queue_unref (mux_queue); + g_clear_object (&mux_queue); g_hash_table_unref (clients); g_socket_service_stop (socket_service); - mux_queue = NULL; g_clear_object (&cancel); #ifdef G_OS_WIN32 -- 2.21.0 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel