Re: [RFC spice-vdagent 05/18] add VDAgentConnection

Hi,

On Tue, Aug 14, 2018 at 08:53:39PM +0200, Jakub Janků wrote:
> Add a set of helper functions built around GIO that can be used to
> easily write messages to and read from the given FD.
> 
> Since VDAgentConnection uses GIO,
> it integrates well with GMainLoop.
> 
> Read messages must begin with a header of a fixed size.
> Message body size can vary.
> 
> User of VDAgentConnection is notified
> through callbacks about the following events:
> - message header read
> - message body read
> - I/O error
> 
> A new VDAgentConnection can be constructed using
> vdagent_connection_new() based on a GIOStream.
> 
> A new GIOStream can be obtained using
> vdagent_file_open() or vdagent_socket_connect().
> 
> vdagent_connection_destroy() destroys the connection.
> However, due to the asynchronous nature of the GIO functions used,
> this does NOT close the underlying FD immediately.

Yep, I commented on this in 00/18, but I take it that making
this a GObject might help. This isn't a full review, just a small
note after looking at this patch and the follow-up ones.
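
For anyone skimming the thread, the framing described in the commit
message (a fixed-size header, then a body whose size the header
announces) can be sketched without GIO roughly as follows. This is
only an illustration in plain POSIX C: struct demo_header and the
demo_* functions are hypothetical names, not part of the patch, and
the real code does the same thing asynchronously through callbacks.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical fixed-size header; in the patch the header layout
 * belongs to the caller, which reports the body size via a callback. */
struct demo_header {
    uint32_t type;
    uint32_t size;   /* body length in bytes, may be 0 */
};

/* Read exactly len bytes; returns 0 on success, -1 on EOF or error. */
static int read_all(int fd, void *buf, size_t len)
{
    char *p = buf;

    while (len > 0) {
        ssize_t n = read(fd, p, len);
        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0)
            return -1;
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Write exactly len bytes; returns 0 on success, -1 on error. */
static int write_all(int fd, const void *buf, size_t len)
{
    const char *p = buf;

    while (len > 0) {
        ssize_t n = write(fd, p, len);
        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0)
            return -1;
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Send one message: the header first, then the body (if any). */
int demo_write_message(int fd, uint32_t type, const void *body, uint32_t size)
{
    struct demo_header hdr = { type, size };

    if (write_all(fd, &hdr, sizeof(hdr)) != 0)
        return -1;
    return size > 0 ? write_all(fd, body, size) : 0;
}

/* Receive one message. On success *body is malloc()ed by this function
 * (or NULL for an empty body) and must be freed by the caller. */
int demo_read_message(int fd, struct demo_header *hdr, void **body)
{
    assert(hdr != NULL && body != NULL);

    *body = NULL;
    if (read_all(fd, hdr, sizeof(*hdr)) != 0)
        return -1;
    if (hdr->size == 0)
        return 0;

    *body = malloc(hdr->size);
    if (*body == NULL)
        return -1;
    if (read_all(fd, *body, hdr->size) != 0) {
        free(*body);
        *body = NULL;
        return -1;
    }
    return 0;
}
```

The blocking loops above are a simplification; the point is only the
two-step read, which is what the header and body callbacks model.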

> ---
>  Makefile.am              |   2 +
>  src/vdagent-connection.c | 301 +++++++++++++++++++++++++++++++++++++++
>  src/vdagent-connection.h | 103 ++++++++++++++
>  3 files changed, 406 insertions(+)
>  create mode 100644 src/vdagent-connection.c
>  create mode 100644 src/vdagent-connection.h
> 
> diff --git a/Makefile.am b/Makefile.am
> index fa54bbc..b291b19 100644
> --- a/Makefile.am
> +++ b/Makefile.am
> @@ -7,6 +7,8 @@ sbin_PROGRAMS = src/spice-vdagentd
>  common_sources =				\
>  	src/udscs.c				\
>  	src/udscs.h				\
> +	src/vdagent-connection.c		\
> +	src/vdagent-connection.h		\
>  	src/vdagentd-proto-strings.h		\
>  	src/vdagentd-proto.h			\
>  	$(NULL)
> diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c
> new file mode 100644
> index 0000000..0eb2ec9
> --- /dev/null
> +++ b/src/vdagent-connection.c
> @@ -0,0 +1,301 @@
> +/*  vdagent-connection.c
> +
> +    Copyright 2018 Red Hat, Inc.
> +
> +    This program is free software: you can redistribute it and/or modify
> +    it under the terms of the GNU General Public License as published by
> +    the Free Software Foundation, either version 3 of the License, or
> +    (at your option) any later version.
> +
> +    This program is distributed in the hope that it will be useful,
> +    but WITHOUT ANY WARRANTY; without even the implied warranty of
> +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +    GNU General Public License for more details.
> +
> +    You should have received a copy of the GNU General Public License
> +    along with this program.  If not, see <http://www.gnu.org/licenses/>.
> +*/
> +
> +#include <syslog.h>
> +#include <fcntl.h>
> +#include <glib/gstdio.h>
> +#include <gio/gunixinputstream.h>
> +#include <gio/gunixoutputstream.h>
> +#include <gio/gunixsocketaddress.h>
> +
> +#include "vdagent-connection.h"
> +
> +struct VDAgentConnection {
> +    GIOStream              *io_stream;
> +    gboolean                opening;
> +    GCancellable           *cancellable;
> +
> +    GQueue                 *write_queue;
> +    GMainLoop              *flush_loop;
> +
> +    VDAgentConnReadCb       read_cb;
> +    gpointer                read_buff;
> +    gpointer                header_buff;
> +    gsize                   header_size;
> +    VDAgentConnHeaderReadCb header_read_cb;
> +
> +    VDAgentConnErrorCb      error_cb;
> +
> +    GCredentials           *credentials;
> +
> +    gpointer                user_data;
> +};
> +
> +static void request_message_write(VDAgentConnection *conn);
> +static void request_message_read(VDAgentConnection *conn);
> +
> +GIOStream *vdagent_file_open(const gchar *path)
> +{
> +    gint fd;
> +
> +    fd = g_open(path, O_RDWR);
> +    if (fd == -1) {
> +        syslog(LOG_ERR, "%s: %m", __func__);
> +        return NULL;
> +    }
> +
> +    return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE),
> +                                  g_unix_output_stream_new(fd, TRUE));
> +}
> +
> +GIOStream *vdagent_socket_connect(const gchar *address)
> +{
> +    GSocketConnection *socket_conn;
> +    GSocketClient *client;
> +    GSocketConnectable *connectable;
> +    GError *err = NULL;
> +
> +    connectable = G_SOCKET_CONNECTABLE(g_unix_socket_address_new(address));
> +    client = g_object_new(G_TYPE_SOCKET_CLIENT,
> +                          "family", G_SOCKET_FAMILY_UNIX,
> +                          "type", G_SOCKET_TYPE_STREAM,
> +                          NULL);
> +
> +    socket_conn = g_socket_client_connect(client, connectable, NULL, &err);
> +    g_object_unref(client);
> +    g_object_unref(connectable);
> +    if (err) {
> +        syslog(LOG_ERR, "%s: %s", __func__, err->message);
> +        g_error_free(err);
> +    }
> +    return G_IO_STREAM(socket_conn);
> +}

I'm not convinced that this API is really needed. The function
can be kept, but used internally by vdagent_connection_new()
itself, I guess. I don't see other components using the GIOStream
for anything other than passing it to vdagent_connection_new().

It might make sense to make the socket's address a property of
VDAgentConnection, which would be set on g_object_new() too.
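
To illustrate the shape I have in mind, here is a rough plain-C
stand-in, not GObject code and not taken from the patch: the connect
helper becomes static, and the constructor takes the address and
remembers it. All demo_* names are hypothetical. In an actual GObject
version the address would presumably be a construct-time property
rather than a constructor argument.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Hypothetical stand-in for VDAgentConnection. */
struct demo_conn {
    int   fd;
    char *address;   /* plays the role of the "address" property */
};

/* Internal helper: no longer part of the public API. */
static int demo_socket_connect(const char *address)
{
    struct sockaddr_un addr;
    size_t len = strlen(address);
    int fd;

    if (len >= sizeof(addr.sun_path))
        return -1;

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd == -1)
        return -1;

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    memcpy(addr.sun_path, address, len + 1);

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        close(fd);
        return -1;
    }
    return fd;
}

void demo_conn_free(struct demo_conn *conn)
{
    if (conn == NULL)
        return;
    if (conn->fd != -1)
        close(conn->fd);
    free(conn->address);
    free(conn);
}

/* The only public entry point: callers pass the address,
 * they never see the underlying stream or file descriptor. */
struct demo_conn *demo_conn_new(const char *address)
{
    struct demo_conn *conn;
    size_t len;

    assert(address != NULL);

    conn = calloc(1, sizeof(*conn));
    if (conn == NULL)
        return NULL;
    conn->fd = -1;

    len = strlen(address);
    conn->address = malloc(len + 1);
    if (conn->address == NULL) {
        demo_conn_free(conn);
        return NULL;
    }
    memcpy(conn->address, address, len + 1);

    conn->fd = demo_socket_connect(address);
    if (conn->fd == -1) {
        demo_conn_free(conn);
        return NULL;
    }
    return conn;
}
```

With this layout a failed connect surfaces as a NULL connection, so
callers have a single place to check for errors.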

Reviewed-by: Victor Toso <victortoso@xxxxxxxxxx>

Victor

> +
> +VDAgentConnection *vdagent_connection_new(
> +    GIOStream              *io_stream,
> +    gboolean                wait_on_opening,
> +    gsize                   header_size,
> +    VDAgentConnHeaderReadCb header_read_cb,
> +    VDAgentConnReadCb       read_cb,
> +    VDAgentConnErrorCb      error_cb,
> +    gpointer                user_data)
> +{
> +    VDAgentConnection *conn;
> +
> +    conn = g_new(VDAgentConnection, 1);
> +    conn->io_stream = io_stream;
> +    conn->cancellable = g_cancellable_new();
> +    conn->opening = wait_on_opening;
> +    conn->write_queue = g_queue_new();
> +    conn->flush_loop = NULL;
> +    conn->read_cb = read_cb;
> +    conn->read_buff = NULL;
> +    conn->header_buff = g_malloc(header_size);
> +    conn->header_size = header_size;
> +    conn->header_read_cb = header_read_cb;
> +    conn->error_cb = error_cb;
> +    conn->credentials = NULL;
> +    conn->user_data = user_data;
> +
> +    request_message_read(conn);
> +
> +    return conn;
> +}
> +
> +static gboolean connection_has_pending(VDAgentConnection *conn)
> +{
> +    GInputStream *in = g_io_stream_get_input_stream(conn->io_stream);
> +    GOutputStream *out = g_io_stream_get_output_stream(conn->io_stream);
> +
> +    return g_input_stream_has_pending(in) || g_output_stream_has_pending(out);
> +}
> +
> +/* Free up all resources used by VDAgentConnection
> + * once all I/O operations have finished. */
> +static void connection_finalize(VDAgentConnection *conn)
> +{
> +    g_object_unref(conn->cancellable);
> +    g_queue_free_full(conn->write_queue, (GDestroyNotify)g_bytes_unref);
> +    g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
> +    g_clear_object(&conn->credentials);
> +    g_free(conn->header_buff);
> +    g_free(conn->read_buff);
> +    g_object_unref(conn->io_stream);
> +    g_free(conn);
> +}
> +
> +void vdagent_connection_destroy(VDAgentConnection *conn)
> +{
> +    /* If there's a pending I/O operation on either of the streams, cancel it,
> +     * connection_finalize() will be invoked in the next GMainLoop iteration(s). */
> +    if (connection_has_pending(conn))
> +        g_cancellable_cancel(conn->cancellable);
> +    else
> +        connection_finalize(conn);
> +}
> +
> +static void handle_io_error(VDAgentConnection *conn, GError *err)
> +{
> +    if (g_cancellable_is_cancelled(conn->cancellable)) {
> +        if (!connection_has_pending(conn))
> +            connection_finalize(conn);
> +    } else {
> +        syslog(LOG_ERR, "vdagent-connection: I/O error: %s", err->message);
> +        conn->error_cb(conn->user_data);
> +    }
> +    g_error_free(err);
> +}
> +
> +GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn)
> +{
> +    GSocket *socket;
> +    GError *err = NULL;
> +
> +    g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL);
> +
> +    if (conn->credentials)
> +        return conn->credentials;
> +
> +    socket = g_socket_connection_get_socket(G_SOCKET_CONNECTION(conn->io_stream));
> +    conn->credentials = g_socket_get_credentials(socket, &err);
> +    if (err) {
> +        syslog(LOG_ERR, "%s: %s", __func__, err->message);
> +        g_error_free(err);
> +    }
> +    return conn->credentials;
> +}
> +
> +static void message_write_cb(GObject      *source_object,
> +                             GAsyncResult *res,
> +                             gpointer      user_data)
> +{
> +    VDAgentConnection *conn = user_data;
> +    GOutputStream *out = G_OUTPUT_STREAM(source_object);
> +    GError *err = NULL;
> +
> +    g_output_stream_write_all_finish(out, res, NULL, &err);
> +    g_bytes_unref(g_queue_pop_head(conn->write_queue));
> +
> +    if (err)
> +        return handle_io_error(conn, err);
> +
> +    conn->opening = FALSE;
> +
> +    if (g_queue_is_empty(conn->write_queue))
> +        g_clear_pointer(&conn->flush_loop, g_main_loop_quit);
> +    else
> +        request_message_write(conn);
> +}
> +
> +static void request_message_write(VDAgentConnection *conn)
> +{
> +    GBytes *msg;
> +    GOutputStream *out;
> +
> +    msg = g_queue_peek_head(conn->write_queue);
> +    out = g_io_stream_get_output_stream(conn->io_stream);
> +
> +    g_output_stream_write_all_async(out,
> +        g_bytes_get_data(msg, NULL), g_bytes_get_size(msg),
> +        G_PRIORITY_DEFAULT, conn->cancellable, message_write_cb, conn);
> +}
> +
> +void vdagent_connection_write(VDAgentConnection *conn,
> +                              gpointer           data,
> +                              gsize              size)
> +{
> +    g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size));
> +
> +    if (g_queue_get_length(conn->write_queue) == 1)
> +        request_message_write(conn);
> +}
> +
> +void vdagent_connection_flush(VDAgentConnection *conn)
> +{
> +    GMainLoop *loop;
> +    /* TODO: allow multiple flush calls at once? */
> +    g_return_if_fail(conn->flush_loop == NULL);
> +
> +    if (g_queue_is_empty(conn->write_queue))
> +        return;
> +
> +    loop = conn->flush_loop = g_main_loop_new(NULL, FALSE);
> +    /* When using GTK+, this should be wrapped with
> +     * gdk_threads_leave() and gdk_threads_enter(),
> +     * but since flush is used in virtio-port.c only
> +     * let's leave it as it is for now. */
> +    g_main_loop_run(loop);
> +    g_main_loop_unref(loop);
> +}
> +
> +static void message_read_cb(GObject      *source_object,
> +                            GAsyncResult *res,
> +                            gpointer      user_data)
> +{
> +    VDAgentConnection *conn = user_data;
> +    GInputStream *in = G_INPUT_STREAM(source_object);
> +    GError *err = NULL;
> +    gsize bytes_read, data_size;
> +
> +    g_input_stream_read_all_finish(in, res, &bytes_read, &err);
> +    if (err)
> +        return handle_io_error(conn, err);
> +    if (bytes_read == 0) {
> +        /* see virtio-port.c for the rationale behind this */
> +        if (conn->opening) {
> +            g_usleep(10000);
> +            request_message_read(conn);
> +        } else {
> +            conn->error_cb(conn->user_data);
> +        }
> +        return;
> +    }
> +    conn->opening = FALSE;
> +
> +    if (conn->read_buff == NULL) {
> +        /* we've read the message header, now let's read its body */
> +        if (!conn->header_read_cb(conn->header_buff, &data_size, conn->user_data))
> +            return;
> +        if (data_size > 0) {
> +            conn->read_buff = g_malloc(data_size);
> +            /* TODO: if allocation fails, we could try g_input_stream_skip()
> +             * and hope that the message wasn't crucial for proper functioning.
> +             * An example might be when a user tries to copy large clipboard.
> +             * Not sure whether it's worth implementing.
> +             * Other stuff might just as well fall apart
> +             * when the system is running out of memory? */
> +            g_input_stream_read_all_async(in, conn->read_buff, data_size,
> +                G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
> +            return;
> +        }
> +    }
> +
> +    if (!conn->read_cb(conn->header_buff, conn->read_buff, conn->user_data))
> +        return;
> +    g_clear_pointer(&conn->read_buff, g_free);
> +    request_message_read(conn);
> +}
> +
> +static void request_message_read(VDAgentConnection *conn)
> +{
> +    GInputStream *in;
> +    in = g_io_stream_get_input_stream(conn->io_stream);
> +
> +    g_input_stream_read_all_async(in, conn->header_buff, conn->header_size,
> +        G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn);
> +}
> diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h
> new file mode 100644
> index 0000000..6fc0081
> --- /dev/null
> +++ b/src/vdagent-connection.h
> @@ -0,0 +1,103 @@
> +/*  vdagent-connection.h
> +
> +    Copyright 2018 Red Hat, Inc.
> +
> +    This program is free software: you can redistribute it and/or modify
> +    it under the terms of the GNU General Public License as published by
> +    the Free Software Foundation, either version 3 of the License, or
> +    (at your option) any later version.
> +
> +    This program is distributed in the hope that it will be useful,
> +    but WITHOUT ANY WARRANTY; without even the implied warranty of
> +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +    GNU General Public License for more details.
> +
> +    You should have received a copy of the GNU General Public License
> +    along with this program.  If not, see <http://www.gnu.org/licenses/>.
> +*/
> +
> +#ifndef __VDAGENT_CONNECTION_H
> +#define __VDAGENT_CONNECTION_H
> +
> +#include <glib.h>
> +#include <gio/gio.h>
> +
> +typedef struct VDAgentConnection VDAgentConnection;
> +
> +/* Called when a message header has been read.
> + *
> + * If the handler wishes to continue reading,
> + * it must set @body_size to the size of message's body and return TRUE.
> + * Once @body_size bytes are read, VDAgentConnReadCb() is invoked.
> + *
> + * Otherwise the handler should return FALSE
> + * and call vdagent_connection_destroy().
> + *
> + * @header_buff is owned by VDAgentConnection and must not be freed. */
> +typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff,
> +                                            gsize   *body_size,
> +                                            gpointer user_data);
> +
> +/* Called when a full message has been read.
> + *
> + * If the handler wishes to continue reading, it must return TRUE,
> + * otherwise FALSE and call vdagent_connection_destroy().
> + *
> + * @header, @data are owned by VDAgentConnection and must not be freed. */
> +typedef gboolean (*VDAgentConnReadCb)(gpointer header,
> +                                      gpointer data,
> +                                      gpointer user_data);
> +
> +/* Called when an error occurred during read or write.
> + * The handler is expected to call vdagent_connection_destroy(). */
> +typedef void (*VDAgentConnErrorCb)(gpointer user_data);
> +
> +/* Open a file in @path for read and write.
> + * Returns a GIOStream to the given file or NULL on error. */
> +GIOStream *vdagent_file_open(const gchar *path);
> +
> +/* Create a socket and initiate a new connection to the socket on @address.
> + * Returns a GIOStream corresponding to the new connection or NULL on error. */
> +GIOStream *vdagent_socket_connect(const gchar *address);
> +
> +/* Create new VDAgentConnection and start reading incoming messages.
> + *
> + * If @wait_on_opening is set to TRUE, EOF won't be treated as an error
> + * until the first message is successfully read or written to the @io_stream.
> + *
> + * @user_data will be passed to the supplied callbacks. */
> +VDAgentConnection *vdagent_connection_new(
> +    GIOStream              *io_stream,
> +    gboolean                wait_on_opening,
> +    gsize                   header_size,
> +    VDAgentConnHeaderReadCb header_read_cb,
> +    VDAgentConnReadCb       read_cb,
> +    VDAgentConnErrorCb      error_cb,
> +    gpointer                user_data);
> +
> +/* Free up all resources associated with the VDAgentConnection.
> + *
> + * This operation can be asynchronous. */
> +void vdagent_connection_destroy(VDAgentConnection *conn);
> +
> +/* Append a message to write queue.
> + *
> + * VDAgentConnection takes ownership of the @data
> + * and frees it once the message is flushed. */
> +void vdagent_connection_write(VDAgentConnection *conn,
> +                              gpointer           data,
> +                              gsize              size);
> +
> +/* Waits until all queued messages get written to the output stream.
> + *
> + * Note: other GSources can be triggered during this call */
> +void vdagent_connection_flush(VDAgentConnection *conn);
> +
> +/* Returns the credentials of the foreign process connected to the socket.
> + *
> + * It is an error to call this function with a VDAgentConnection
> + * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */
> +GCredentials *vdagent_connection_get_peer_credentials(
> +    VDAgentConnection *conn);
> +
> +#endif
> -- 
> 2.17.1
> 
> _______________________________________________
> Spice-devel mailing list
> Spice-devel@xxxxxxxxxxxxxxxxxxxxx
> https://lists.freedesktop.org/mailman/listinfo/spice-devel
