On Mon, Nov 28, 2011 at 01:13:45PM +0000, Daniel P. Berrange wrote: > From: "Daniel P. Berrange" <berrange@xxxxxxxxxx> > > The GIO GInputStream/GOutputStream async model for I/O does not > work for working with non-blocking bi-directional streams. To > allow that to be done more effectively, add an API to allow > main loop watches to be registered against streams. > > Since the libvirt level virStreamEventAddCallback API only allows > a single callback to be registered to a stream at any time, the > GVirStream object needs to be multiplexing of multiple watches into > a single libvirt level callback. > > Watches can be removed in the normal way with g_source_remove > > * libvirt-gobject/libvirt-gobject-stream.c, > libvirt-gobject/libvirt-gobject-stream.h, > libvirt-gobject/libvirt-gobject.sym: Add gvir_stream_add_watch > --- > libvirt-gobject/libvirt-gobject-stream.c | 180 ++++++++++++++++++++++++++++++ > libvirt-gobject/libvirt-gobject-stream.h | 17 +++ > libvirt-gobject/libvirt-gobject.sym | 1 + > 3 files changed, 198 insertions(+), 0 deletions(-) > > diff --git a/libvirt-gobject/libvirt-gobject-stream.c b/libvirt-gobject/libvirt-gobject-stream.c > index 0d1c2d1..03b2c84 100644 > --- a/libvirt-gobject/libvirt-gobject-stream.c > +++ b/libvirt-gobject/libvirt-gobject-stream.c > @@ -46,8 +46,20 @@ struct _GVirStreamPrivate > virStreamPtr handle; > GInputStream *input_stream; > GOutputStream *output_stream; > + > + gboolean eventRegistered; > + int eventLast; > + GList *sources; > }; > > +typedef struct { > + GSource source; > + GVirStreamIOCondition cond; > + GVirStreamIOCondition newCond; > + GVirStream *stream; > +} GVirStreamSource; > + > + > G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_IO_STREAM); > > > @@ -186,6 +198,7 @@ static void gvir_stream_finalize(GObject *object) > { > GVirStream *self = GVIR_STREAM(object); > GVirStreamPrivate *priv = self->priv; > + GList *tmp; > > DEBUG("Finalize GVirStream=%p", self); > > @@ -199,6 +212,14 @@ static void gvir_stream_finalize(GObject *object) > virStreamFree(priv->handle); > } > > + tmp = priv->sources; > + while (tmp) { > + GVirStreamSource *source = tmp->data; > + g_source_remove(g_source_get_id((GSource*)source)); I think g_source_destroy can be used here > + tmp = tmp->next; > + } > + g_list_free(priv->sources); > + > G_OBJECT_CLASS(gvir_stream_parent_class)->finalize(object); > } > > @@ -448,3 +469,162 @@ gvir_stream_send_all(GVirStream *self, GVirStreamSourceFunc func, gpointer user_ > > return r; > } > + > + > +static void gvir_stream_handle_events(virStreamPtr st G_GNUC_UNUSED, > + int events, > + void *opaque) > +{ > + GVirStream *stream = GVIR_STREAM(opaque); > + GVirStreamPrivate *priv = stream->priv; > + GList *tmp = priv->sources; > + > + while (tmp) { > + GVirStreamSource *source = tmp->data; > + source->newCond = 0; > + if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE) { > + if (events & VIR_STREAM_EVENT_READABLE) > + source->newCond |= GVIR_STREAM_IO_CONDITION_READABLE; > + if (events & VIR_STREAM_EVENT_HANGUP) > + source->newCond |= GVIR_STREAM_IO_CONDITION_HANGUP; > + if (events & VIR_STREAM_EVENT_ERROR) > + source->newCond |= GVIR_STREAM_IO_CONDITION_ERROR; > + } > + if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE) { > + if (events & VIR_STREAM_EVENT_WRITABLE) > + source->newCond |= GVIR_STREAM_IO_CONDITION_WRITABLE; > + if (events & VIR_STREAM_EVENT_HANGUP) > + source->newCond |= GVIR_STREAM_IO_CONDITION_HANGUP; > + if (events & VIR_STREAM_EVENT_ERROR) > + source->newCond |= GVIR_STREAM_IO_CONDITION_ERROR; > + } > + tmp = tmp->next; > + } > + > +} > + > + > +static void gvir_stream_update_events(GVirStream *stream) > +{ > + GVirStreamPrivate *priv = stream->priv; > + int mask = 0; > + GList *tmp = priv->sources; > + > + while (tmp) { > + GVirStreamSource *source = tmp->data; > + if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE) > + mask |= VIR_STREAM_EVENT_READABLE; > + if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE) > + mask |= VIR_STREAM_EVENT_WRITABLE; > + tmp = tmp->next; > + } > + > + if (mask) { > + if (priv->eventRegistered) { > + virStreamEventUpdateCallback(priv->handle, mask); > + } else { > + virStreamEventAddCallback(priv->handle, mask, > + gvir_stream_handle_events, > + g_object_ref(stream), > + g_object_unref); > + priv->eventRegistered = TRUE; > + } > + } else { > + if (priv->eventRegistered) { > + virStreamEventRemoveCallback(priv->handle); > + priv->eventRegistered = FALSE; > + } > + } > +} > + > +static gboolean gvir_stream_source_prepare(GSource *source, > + gint *timeout) > +{ > + GVirStreamSource *gsource = (GVirStreamSource*)source; > + if (gsource->newCond) { > + *timeout = 0; > + return TRUE; > + } > + *timeout = -1; > + return FALSE; > +} > + > +static gboolean gvir_stream_source_check(GSource *source) > +{ > + GVirStreamSource *gsource = (GVirStreamSource*)source; > + if (gsource->newCond) > + return TRUE; > + return FALSE; > +} > + > +static gboolean gvir_stream_source_dispatch(GSource *source, > + GSourceFunc callback, > + gpointer user_data) > +{ > + GVirStreamSource *gsource = (GVirStreamSource*)source; > + GVirStreamIOFunc func = (GVirStreamIOFunc)callback; > + gboolean ret; > + ret = func(gsource->stream, gsource->newCond, user_data); > + gsource->newCond = 0; > + return ret; > +} > + > +static void gvir_stream_source_finalize(GSource *source) > +{ > + GVirStreamSource *gsource = (GVirStreamSource*)source; > + GVirStreamPrivate *priv = gsource->stream->priv; > + GList *tmp, *prev = NULL; > + > + tmp = priv->sources; > + while (tmp) { > + if (tmp->data == source) { > + if (prev) { > + prev->next = tmp->next; > + } else { > + priv->sources = tmp->next; > + } > + tmp->next = NULL; > + g_list_free(tmp); > + break; > + } > + > + prev = tmp; > + tmp = tmp->next; > + } isn't it doing the same as g_list_remove? > + > + gvir_stream_update_events(gsource->stream); > +} > + > +GSourceFuncs gvir_stream_source_funcs = { > + .prepare = gvir_stream_source_prepare, > + .check = gvir_stream_source_check, > + .dispatch = gvir_stream_source_dispatch, > + .finalize = gvir_stream_source_finalize, > +}; > + > +gint gvir_stream_add_watch(GVirStream *stream, > + GVirStreamIOCondition cond, > + GVirStreamIOFunc func, > + gpointer opaque, > + GDestroyNotify notify) Dunno if it's worth having both gvir_stream_add_watch and gvir_stream_add_watch_full to be consistent with most glib source functions (g_timeout_add, g_idle_add, g_io_add_watch, ...). The notify argument would only be in the _full variant. > +{ > + GVirStreamPrivate *priv = stream->priv; > + gint id; > + GVirStreamSource *source = (GVirStreamSource*)g_source_new(&gvir_stream_source_funcs, > + sizeof(GVirStreamSource)); > + > + source->stream = stream; > + source->cond = cond; > + > + priv->sources = g_list_append(priv->sources, source); > + > + gvir_stream_update_events(source->stream); > + > + g_source_set_callback((GSource*)source, (GSourceFunc)func, opaque, notify); > + g_source_attach((GSource*)source, g_main_context_default()); > + > + id = g_source_get_id((GSource*)source); g_source_attach returns this id which is of type guint. > + g_source_unref((GSource*)source); > + > + return id; > +} > diff --git a/libvirt-gobject/libvirt-gobject-stream.h b/libvirt-gobject/libvirt-gobject-stream.h > index 5a1ee68..e0004b2 100644 > --- a/libvirt-gobject/libvirt-gobject-stream.h > +++ b/libvirt-gobject/libvirt-gobject-stream.h > @@ -93,6 +93,23 @@ typedef gint (* GVirStreamSourceFunc)(GVirStream *stream, > GType gvir_stream_get_type(void); > GType gvir_stream_handle_get_type(void); > > +typedef enum { > + GVIR_STREAM_IO_CONDITION_READABLE = (1 << 0), > + GVIR_STREAM_IO_CONDITION_WRITABLE = (1 << 1), > + GVIR_STREAM_IO_CONDITION_HANGUP = (1 << 2), > + GVIR_STREAM_IO_CONDITION_ERROR = (1 << 3), > +} GVirStreamIOCondition; > + > +typedef gboolean (*GVirStreamIOFunc)(GVirStream *stream, > + GVirStreamIOCondition cond, > + gpointer opaque); > + > +gint gvir_stream_add_watch(GVirStream *stream, > + GVirStreamIOCondition cond, > + GVirStreamIOFunc func, > + gpointer opaque, > + GDestroyNotify notify); > + > gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func, gpointer user_data, GError **error); > gssize gvir_stream_receive(GVirStream *stream, gchar *buffer, gsize size, GCancellable *cancellable, GError **error); > > diff --git a/libvirt-gobject/libvirt-gobject.sym b/libvirt-gobject/libvirt-gobject.sym > index 78b3935..6261865 100644 > --- a/libvirt-gobject/libvirt-gobject.sym > +++ b/libvirt-gobject/libvirt-gobject.sym > @@ -126,6 +126,7 @@ LIBVIRT_GOBJECT_0.0.1 { > gvir_stream_get_type; > gvir_stream_receive_all; > gvir_stream_handle_get_type; > + gvir_stream_add_watch; > > local: > *; > -- > 1.7.6.4 > > -- > libvir-list mailing list > libvir-list@xxxxxxxxxx > https://www.redhat.com/mailman/listinfo/libvir-list
Attachment:
pgpgg33NNnGKi.pgp
Description: PGP signature
-- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list