Hi On Thu, May 23, 2019 at 10:37 AM Jakub Janků <jjanku@xxxxxxxxxx> wrote: > > OutputQueue is a self-contained unit and as such can be put in > a separate file to make the spice-webdavd.c less cluttered. > > Also, as the current implementation defines output_queue_{ref, unref}, > turn OutputQueue into a GObject which can handle these for us. > > Signed-off-by: Jakub Janků <jjanku@xxxxxxxxxx> ack in principle, minor coding style issues The phodav source code tries to follow glib code style. Can you indent accordingly? > --- > spice/meson.build | 8 ++- > spice/output-queue.c | 164 ++++++++++++++++++++++++++++++++++++++++++ > spice/output-queue.h | 38 ++++++++++ > spice/spice-webdavd.c | 162 ++--------------------------------------- > 4 files changed, 214 insertions(+), 158 deletions(-) > create mode 100644 spice/output-queue.c > create mode 100644 spice/output-queue.h > > diff --git a/spice/meson.build b/spice/meson.build > index 6db22cc..06d20e6 100644 > --- a/spice/meson.build > +++ b/spice/meson.build > @@ -4,9 +4,15 @@ if host_machine.system() == 'windows' > win32_deps += compiler.find_library('mpr') > endif > > +sources = [ > + 'spice-webdavd.c', > + 'output-queue.c', > + 'output-queue.h' > +] > + > executable( > 'spice-webdavd', > - [ 'spice-webdavd.c' ], > + sources, > install_dir : sbindir, > include_directories : incdir, > dependencies : win32_deps + avahi_deps + deps, > diff --git a/spice/output-queue.c b/spice/output-queue.c > new file mode 100644 > index 0000000..6991493 > --- /dev/null > +++ b/spice/output-queue.c > @@ -0,0 +1,164 @@ > +/* > + * Copyright (C) 2019 Red Hat, Inc. > + * > + * This library is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License as published by the Free Software Foundation; either > + * version 2.1 of the License, or (at your option) any later version. > + * > + * This library is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + * Lesser General Public License for more details. > + * > + * You should have received a copy of the GNU Lesser General Public > + * License along with this library; if not, see <http://www.gnu.org/licenses/>. > + */ > +#include <config.h> > + > +#include "output-queue.h" > + > +typedef struct _OutputQueueElem > +{ > + OutputQueue *queue; > + const guint8 *buf; > + gsize size; > + PushedCb cb; > + gpointer user_data; > +} OutputQueueElem; > + > +struct _OutputQueue > +{ > + GObject parent_instance; > + GOutputStream *output; > + gboolean flushing; > + guint idle_id; > + GQueue *queue; > + GCancellable *cancel; > +}; > + > +G_DEFINE_TYPE(OutputQueue, output_queue, G_TYPE_OBJECT); > + > +static void output_queue_init(OutputQueue *self) > +{ > + self->queue = g_queue_new (); > +} > + > +static void output_queue_finalize(GObject *obj) > +{ > + OutputQueue *self = OUTPUT_QUEUE(obj); > + > + g_warn_if_fail (g_queue_get_length (self->queue) == 0); > + g_warn_if_fail (!self->flushing); > + g_warn_if_fail (!self->idle_id); > + > + g_queue_free_full (self->queue, g_free); > + g_object_unref (self->output); > + g_object_unref (self->cancel); > + > + G_OBJECT_CLASS(output_queue_parent_class)->finalize(obj); > +} > + > +static void output_queue_class_init(OutputQueueClass *klass) > +{ > + GObjectClass *gobject_class = G_OBJECT_CLASS(klass); > + gobject_class->finalize = output_queue_finalize; > +} > + > +OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel) > +{ > + OutputQueue *self = g_object_new(OUTPUT_TYPE_QUEUE, NULL); > + self->output = g_object_ref (output); > + self->cancel = g_object_ref (cancel); > + return self; > +} > + > +static gboolean output_queue_idle (gpointer user_data); > + > +static void > +output_queue_flush_cb (GObject *source_object, > + GAsyncResult *res, > + gpointer user_data) > +{ > + GError *error = NULL; > + OutputQueueElem *e = user_data; > + OutputQueue *q = e->queue; > + > + g_debug ("flushed"); > + q->flushing = FALSE; > + g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object), > + res, &error); > + if (error) > + g_warning ("error: %s", error->message); > + > + g_clear_error (&error); > + > + if (!q->idle_id) > + q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q)); > + > + g_free (e); > + g_object_unref (q); > +} > + > +static gboolean > +output_queue_idle (gpointer user_data) > +{ > + OutputQueue *q = user_data; > + OutputQueueElem *e = NULL; > + GError *error = NULL; > + > + if (q->flushing) > + { > + g_debug ("already flushing"); > + goto end; > + } > + > + e = g_queue_pop_head (q->queue); > + if (!e) > + { > + g_debug ("No more data to flush"); > + goto end; > + } > + > + g_debug ("flushing %" G_GSIZE_FORMAT, e->size); > + g_output_stream_write_all (q->output, e->buf, e->size, NULL, q->cancel, &error); > + if (e->cb) > + e->cb (q, e->user_data, error); > + > + if (error) > + goto end; > + > + q->flushing = TRUE; > + g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, q->cancel, output_queue_flush_cb, e); > + > + q->idle_id = 0; > + return FALSE; > + > +end: > + g_clear_error (&error); > + q->idle_id = 0; > + g_free (e); > + g_object_unref (q); > + > + return FALSE; > +} > + > +void > +output_queue_push (OutputQueue *q, const guint8 *buf, gsize size, > + PushedCb pushed_cb, gpointer user_data) > +{ > + OutputQueueElem *e; > + > + g_return_if_fail (q != NULL); > + > + e = g_new (OutputQueueElem, 1); > + e->buf = buf; > + e->size = size; > + e->cb = pushed_cb; > + e->user_data = user_data; > + e->queue = q; > + g_queue_push_tail (q->queue, e); > + > + if (!q->idle_id && !q->flushing) > + q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q)); > +} > diff --git a/spice/output-queue.h b/spice/output-queue.h > new file mode 100644 > index 0000000..ab8f6eb > --- /dev/null > +++ b/spice/output-queue.h > @@ -0,0 +1,38 @@ > +/* > + * Copyright (C) 2019 Red Hat, Inc. > + * > + * This library is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License as published by the Free Software Foundation; either > + * version 2.1 of the License, or (at your option) any later version. > + * > + * This library is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + * Lesser General Public License for more details. > + * > + * You should have received a copy of the GNU Lesser General Public > + * License along with this library; if not, see <http://www.gnu.org/licenses/>. > + */ > + > +#ifndef __OUTPUT_QUEUE_H > +#define __OUTPUT_QUEUE_H > + > +#include <gio/gio.h> > +#include <glib-object.h> > + > +G_BEGIN_DECLS > + > +#define OUTPUT_TYPE_QUEUE output_queue_get_type() > +G_DECLARE_FINAL_TYPE(OutputQueue, output_queue, OUTPUT, QUEUE, GObject); > + > +OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel); > + > +typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error); > + > +void output_queue_push (OutputQueue *q, const guint8 *buf, gsize size, > + PushedCb pushed_cb, gpointer user_data); > + > +G_END_DECLS > + > +#endif > diff --git a/spice/spice-webdavd.c b/spice/spice-webdavd.c > index f2c7f07..84ab770 100644 > --- a/spice/spice-webdavd.c > +++ b/spice/spice-webdavd.c > @@ -39,25 +39,7 @@ > #include <avahi-gobject/ga-entry-group.h> > #endif > > -typedef struct _OutputQueue > -{ > - guint refs; > - GOutputStream *output; > - gboolean flushing; > - guint idle_id; > - GQueue *queue; > -} OutputQueue; > - > -typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error); > - > -typedef struct _OutputQueueElem > -{ > - OutputQueue *queue; > - const guint8 *buf; > - gsize size; > - PushedCb cb; > - gpointer user_data; > -} OutputQueueElem; > +#include "output-queue.h" > > typedef struct _ServiceData > { > @@ -69,139 +51,6 @@ typedef struct _ServiceData > > static GCancellable *cancel; > > -static OutputQueue* > -output_queue_new (GOutputStream *output) > -{ > - OutputQueue *queue = g_new0 (OutputQueue, 1); > - > - queue->output = g_object_ref (output); > - queue->queue = g_queue_new (); > - queue->refs = 1; > - > - return queue; > -} > - > -static > -void > -output_queue_free (OutputQueue *queue) > -{ > - g_warn_if_fail (g_queue_get_length (queue->queue) == 0); > - g_warn_if_fail (!queue->flushing); > - g_warn_if_fail (!queue->idle_id); > - > - g_queue_free_full (queue->queue, g_free); > - g_clear_object (&queue->output); > - g_free (queue); > -} > - > -static OutputQueue* > -output_queue_ref (OutputQueue *q) > -{ > - q->refs++; > - return q; > -} > - > -static void > -output_queue_unref (OutputQueue *q) > -{ > - g_return_if_fail (q != NULL); > - > - q->refs--; > - if (q->refs == 0) > - output_queue_free (q); > -} > - > -static gboolean output_queue_idle (gpointer user_data); > - > -static void > -output_queue_flush_cb (GObject *source_object, > - GAsyncResult *res, > - gpointer user_data) > -{ > - GError *error = NULL; > - OutputQueueElem *e = user_data; > - OutputQueue *q = e->queue; > - > - g_debug ("flushed"); > - q->flushing = FALSE; > - g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object), > - res, &error); > - if (error) > - g_warning ("error: %s", error->message); > - > - g_clear_error (&error); > - > - if (!q->idle_id) > - q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q)); > - > - g_free (e); > - output_queue_unref (q); > -} > - > -static gboolean > -output_queue_idle (gpointer user_data) > -{ > - OutputQueue *q = user_data; > - OutputQueueElem *e = NULL; > - GError *error = NULL; > - > - if (q->flushing) > - { > - g_debug ("already flushing"); > - goto end; > - } > - > - e = g_queue_pop_head (q->queue); > - if (!e) > - { > - g_debug ("No more data to flush"); > - goto end; > - } > - > - g_debug ("flushing %" G_GSIZE_FORMAT, e->size); > - g_output_stream_write_all (q->output, e->buf, e->size, NULL, cancel, &error); > - if (e->cb) > - e->cb (q, e->user_data, error); > - > - if (error) > - goto end; > - > - q->flushing = TRUE; > - g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, cancel, output_queue_flush_cb, e); > - > - q->idle_id = 0; > - return FALSE; > - > -end: > - g_clear_error (&error); > - q->idle_id = 0; > - g_free (e); > - output_queue_unref (q); > - > - return FALSE; > -} > - > -static void > -output_queue_push (OutputQueue *q, const guint8 *buf, gsize size, > - PushedCb pushed_cb, gpointer user_data) > -{ > - OutputQueueElem *e; > - > - g_return_if_fail (q != NULL); > - > - e = g_new (OutputQueueElem, 1); > - e->buf = buf; > - e->size = size; > - e->cb = pushed_cb; > - e->user_data = user_data; > - e->queue = q; > - g_queue_push_tail (q->queue, e); > - > - if (!q->idle_id && !q->flushing) > - q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q)); > -} > - > - > static struct _DemuxData > { > gint64 client; > @@ -264,7 +113,7 @@ add_client (GSocketConnection *client_connection) > client->client_connection = g_object_ref (client_connection); > // TODO: check if usage of this idiom is portable, or if we need to check collisions > client->id = GPOINTER_TO_INT (client_connection); > - client->queue = output_queue_new (bostream); > + client->queue = output_queue_new (bostream, cancel); > g_object_unref (bostream); > > g_hash_table_insert (clients, &client->id, client); > @@ -280,7 +129,7 @@ client_free (Client *c) > > g_io_stream_close (G_IO_STREAM (c->client_connection), NULL, NULL); > g_object_unref (c->client_connection); > - output_queue_unref (c->queue); > + g_object_unref (c->queue); > g_free (c); > } > > @@ -732,7 +581,7 @@ open_mux_path (const char *path) > mux_istream = G_INPUT_STREAM (g_win32_input_stream_new (port_handle, TRUE)); > #endif > > - mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream)); > + mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream), cancel); > } > > #ifdef G_OS_WIN32 > @@ -1002,12 +851,11 @@ run_service (ServiceData *service_data) > g_clear_object (&mux_istream); > g_clear_object (&mux_ostream); > > - output_queue_unref (mux_queue); > + g_clear_object (&mux_queue); > g_hash_table_unref (clients); > > g_socket_service_stop (socket_service); > > - mux_queue = NULL; > g_clear_object (&cancel); > > #ifdef G_OS_WIN32 > -- > 2.21.0 > > _______________________________________________ > Spice-devel mailing list > Spice-devel@xxxxxxxxxxxxxxxxxxxxx > https://lists.freedesktop.org/mailman/listinfo/spice-devel -- Marc-André Lureau _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel