Hi, On Tue, Aug 14, 2018 at 08:53:39PM +0200, Jakub Janků wrote: > Add a set of helper functions built around GIO that can be used to > easily write messages to and read from the given FD. > > Since VDAgentConnection uses GIO, > it integrates well with GMainLoop. > > Read messages must begin with a header of a fixed size. > Message body size can vary. > > User of VDAgentConnection is notified > through callbacks about the following events: > - message header read > - message body read > - I/O error > > A new VDAgentConnection can be constructed using > vdagent_connection_new() based on a GIOStream. > > A new GIOStream can be obtained using > vdagent_file_open() or vdagent_socket_connect(). > > vdagent_connection_destroy() destroyes the connection. > However, due to the asynchronous nature of used GIO functions, > this does NOT close the underlying FD immediately. Yep, I commented about it on 00/18, but I take it that making this a GObject might help. I'm not giving a full review here, just a small note after looking at the patch and the follow-up ones. > --- > Makefile.am | 2 + > src/vdagent-connection.c | 301 +++++++++++++++++++++++++++++++++++++++ > src/vdagent-connection.h | 103 ++++++++++++++ > 3 files changed, 406 insertions(+) > create mode 100644 src/vdagent-connection.c > create mode 100644 src/vdagent-connection.h > > diff --git a/Makefile.am b/Makefile.am > index fa54bbc..b291b19 100644 > --- a/Makefile.am > +++ b/Makefile.am > @@ -7,6 +7,8 @@ sbin_PROGRAMS = src/spice-vdagentd > common_sources = \ > src/udscs.c \ > src/udscs.h \ > + src/vdagent-connection.c \ > + src/vdagent-connection.h \ > src/vdagentd-proto-strings.h \ > src/vdagentd-proto.h \ > $(NULL) > diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c > new file mode 100644 > index 0000000..0eb2ec9 > --- /dev/null > +++ b/src/vdagent-connection.c > @@ -0,0 +1,301 @@ > +/* vdagent-connection.c > + > + Copyright 2018 Red Hat, Inc. 
> + > + This program is free software: you can redistribute it and/or modify > + it under the terms of the GNU General Public License as published by > + the Free Software Foundation, either version 3 of the License, or > + (at your option) any later version. > + > + This program is distributed in the hope that it will be useful, > + but WITHOUT ANY WARRANTY; without even the implied warranty of > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + GNU General Public License for more details. > + > + You should have received a copy of the GNU General Public License > + along with this program. If not, see <http://www.gnu.org/licenses/>. > +*/ > + > +#include <syslog.h> > +#include <fcntl.h> > +#include <glib/gstdio.h> > +#include <gio/gunixinputstream.h> > +#include <gio/gunixoutputstream.h> > +#include <gio/gunixsocketaddress.h> > + > +#include "vdagent-connection.h" > + > +struct VDAgentConnection { > + GIOStream *io_stream; > + gboolean opening; > + GCancellable *cancellable; > + > + GQueue *write_queue; > + GMainLoop *flush_loop; > + > + VDAgentConnReadCb read_cb; > + gpointer read_buff; > + gpointer header_buff; > + gsize header_size; > + VDAgentConnHeaderReadCb header_read_cb; > + > + VDAgentConnErrorCb error_cb; > + > + GCredentials *credentials; > + > + gpointer user_data; > +}; > + > +static void request_message_write(VDAgentConnection *conn); > +static void request_message_read(VDAgentConnection *conn); > + > +GIOStream *vdagent_file_open(const gchar *path) > +{ > + gint fd; > + > + fd = g_open(path, O_RDWR); > + if (fd == -1) { > + syslog(LOG_ERR, "%s: %m", __func__); > + return NULL; > + } > + > + return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE), > + g_unix_output_stream_new(fd, TRUE)); > +} > + > +GIOStream *vdagent_socket_connect(const gchar *address) > +{ > + GSocketConnection *socket_conn; > + GSocketClient *client; > + GSocketConnectable *connectable; > + GError *err = NULL; > + > + connectable = 
G_SOCKET_CONNECTABLE(g_unix_socket_address_new(address)); > + client = g_object_new(G_TYPE_SOCKET_CLIENT, > + "family", G_SOCKET_FAMILY_UNIX, > + "type", G_SOCKET_TYPE_STREAM, > + NULL); > + > + socket_conn = g_socket_client_connect(client, connectable, NULL, &err); > + g_object_unref(client); > + g_object_unref(connectable); > + if (err) { > + syslog(LOG_ERR, "%s: %s", __func__, err->message); > + g_error_free(err); > + } > + return G_IO_STREAM(socket_conn); > +} I'm not convinced that this API is really needed. The function can be kept, but only to be used internally by vdagent_connection_new() itself, I guess. I don't see other components using GIOStream except to pass it to vdagent_connection_new(). It might make sense to make the socket's address a property of VDAgentConnection, which would be set in g_object_new() too. Reviewed-by: Victor Toso <victortoso@xxxxxxxxxx> Victor > + > +VDAgentConnection *vdagent_connection_new( > + GIOStream *io_stream, > + gboolean wait_on_opening, > + gsize header_size, > + VDAgentConnHeaderReadCb header_read_cb, > + VDAgentConnReadCb read_cb, > + VDAgentConnErrorCb error_cb, > + gpointer user_data) > +{ > + VDAgentConnection *conn; > + > + conn = g_new(VDAgentConnection, 1); > + conn->io_stream = io_stream; > + conn->cancellable = g_cancellable_new(); > + conn->opening = wait_on_opening; > + conn->write_queue = g_queue_new(); > + conn->flush_loop = NULL; > + conn->read_cb = read_cb; > + conn->read_buff = NULL; > + conn->header_buff = g_malloc(header_size); > + conn->header_size = header_size; > + conn->header_read_cb = header_read_cb; > + conn->error_cb = error_cb; > + conn->credentials = NULL; > + conn->user_data = user_data; > + > + request_message_read(conn); > + > + return conn; > +} > + > +static gboolean connection_has_pending(VDAgentConnection *conn) > +{ > + GInputStream *in = g_io_stream_get_input_stream(conn->io_stream); > + GOutputStream *out = g_io_stream_get_output_stream(conn->io_stream); > + > + return 
g_input_stream_has_pending(in) || g_output_stream_has_pending(out); > +} > + > +/* Free up all resources used by VDAgentConnection > + * once all I/O operations have finished. */ > +static void connection_finalize(VDAgentConnection *conn) > +{ > + g_object_unref(conn->cancellable); > + g_queue_free_full(conn->write_queue, (GDestroyNotify)g_bytes_unref); > + g_clear_pointer(&conn->flush_loop, g_main_loop_quit); > + g_clear_object(&conn->credentials); > + g_free(conn->header_buff); > + g_free(conn->read_buff); > + g_object_unref(conn->io_stream); > + g_free(conn); > +} > + > +void vdagent_connection_destroy(VDAgentConnection *conn) > +{ > + /* If there's a pending I/O operation on either of the streams, cancel it, > + * connection_finalize() will be invoked in the next GMainLoop iteration(s). */ > + if (connection_has_pending(conn)) > + g_cancellable_cancel(conn->cancellable); > + else > + connection_finalize(conn); > +} > + > +static void handle_io_error(VDAgentConnection *conn, GError *err) > +{ > + if (g_cancellable_is_cancelled(conn->cancellable)) { > + if (!connection_has_pending(conn)) > + connection_finalize(conn); > + } else { > + syslog(LOG_ERR, "vdagent-connection: I/O error: %s", err->message); > + conn->error_cb(conn->user_data); > + } > + g_error_free(err); > +} > + > +GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn) > +{ > + GSocket *socket; > + GError *err = NULL; > + > + g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL); > + > + if (conn->credentials) > + return conn->credentials; > + > + socket = g_socket_connection_get_socket(G_SOCKET_CONNECTION(conn->io_stream)); > + conn->credentials = g_socket_get_credentials(socket, &err); > + if (err) { > + syslog(LOG_ERR, "%s: %s", __func__, err->message); > + g_error_free(err); > + } > + return conn->credentials; > +} > + > +static void message_write_cb(GObject *source_object, > + GAsyncResult *res, > + gpointer user_data) > +{ > + VDAgentConnection *conn = 
user_data; > + GOutputStream *out = G_OUTPUT_STREAM(source_object); > + GError *err = NULL; > + > + g_output_stream_write_all_finish(out, res, NULL, &err); > + g_bytes_unref(g_queue_pop_head(conn->write_queue)); > + > + if (err) > + return handle_io_error(conn, err); > + > + conn->opening = FALSE; > + > + if (g_queue_is_empty(conn->write_queue)) > + g_clear_pointer(&conn->flush_loop, g_main_loop_quit); > + else > + request_message_write(conn); > +} > + > +static void request_message_write(VDAgentConnection *conn) > +{ > + GBytes *msg; > + GOutputStream *out; > + > + msg = g_queue_peek_head(conn->write_queue); > + out = g_io_stream_get_output_stream(conn->io_stream); > + > + g_output_stream_write_all_async(out, > + g_bytes_get_data(msg, NULL), g_bytes_get_size(msg), > + G_PRIORITY_DEFAULT, conn->cancellable, message_write_cb, conn); > +} > + > +void vdagent_connection_write(VDAgentConnection *conn, > + gpointer data, > + gsize size) > +{ > + g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size)); > + > + if (g_queue_get_length(conn->write_queue) == 1) > + request_message_write(conn); > +} > + > +void vdagent_connection_flush(VDAgentConnection *conn) > +{ > + GMainLoop *loop; > + /* TODO: allow multiple flush calls at once? */ > + g_return_if_fail(conn->flush_loop == NULL); > + > + if (g_queue_is_empty(conn->write_queue)) > + return; > + > + loop = conn->flush_loop = g_main_loop_new(NULL, FALSE); > + /* When using GTK+, this should be wrapped with > + * gdk_threads_leave() and gdk_threads_enter(), > + * but since flush is used in virtio-port.c only > + * let's leave it as it is for now. 
*/ > + g_main_loop_run(loop); > + g_main_loop_unref(loop); > +} > + > +static void message_read_cb(GObject *source_object, > + GAsyncResult *res, > + gpointer user_data) > +{ > + VDAgentConnection *conn = user_data; > + GInputStream *in = G_INPUT_STREAM(source_object); > + GError *err = NULL; > + gsize bytes_read, data_size; > + > + g_input_stream_read_all_finish(in, res, &bytes_read, &err); > + if (err) > + return handle_io_error(conn, err); > + if (bytes_read == 0) { > + /* see virtio-port.c for the rationale behind this */ > + if (conn->opening) { > + g_usleep(10000); > + request_message_read(conn); > + } else { > + conn->error_cb(conn->user_data); > + } > + return; > + } > + conn->opening = FALSE; > + > + if (conn->read_buff == NULL) { > + /* we've read the message header, now let's read its body */ > + if (!conn->header_read_cb(conn->header_buff, &data_size, conn->user_data)) > + return; > + if (data_size > 0) { > + conn->read_buff = g_malloc(data_size); > + /* TODO: if allocation fails, we could try g_input_stream_skip() > + * and hope that the message wasn't crucial for proper functiong. > + * An example might be when a user tries to copy large clipboard. > + * Not sure whether it's worth implementing. > + * Other stuff might just as well fall apart > + * when the system is running out of memory? 
*/ > + g_input_stream_read_all_async(in, conn->read_buff, data_size, > + G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn); > + return; > + } > + } > + > + if (!conn->read_cb(conn->header_buff, conn->read_buff, conn->user_data)) > + return; > + g_clear_pointer(&conn->read_buff, g_free); > + request_message_read(conn); > +} > + > +static void request_message_read(VDAgentConnection *conn) > +{ > + GInputStream *in; > + in = g_io_stream_get_input_stream(conn->io_stream); > + > + g_input_stream_read_all_async(in, conn->header_buff, conn->header_size, > + G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn); > +} > diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h > new file mode 100644 > index 0000000..6fc0081 > --- /dev/null > +++ b/src/vdagent-connection.h > @@ -0,0 +1,103 @@ > +/* vdagent-connection.h > + > + Copyright 2018 Red Hat, Inc. > + > + This program is free software: you can redistribute it and/or modify > + it under the terms of the GNU General Public License as published by > + the Free Software Foundation, either version 3 of the License, or > + (at your option) any later version. > + > + This program is distributed in the hope that it will be useful, > + but WITHOUT ANY WARRANTY; without even the implied warranty of > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + GNU General Public License for more details. > + > + You should have received a copy of the GNU General Public License > + along with this program. If not, see <http://www.gnu.org/licenses/>. > +*/ > + > +#ifndef __VDAGENT_CONNECTION_H > +#define __VDAGENT_CONNECTION_H > + > +#include <glib.h> > +#include <gio/gio.h> > + > +typedef struct VDAgentConnection VDAgentConnection; > + > +/* Called when a message header has been read. > + * > + * If the handler wishes to continue reading, > + * it must set @body_size to the size of message's body and return TRUE. > + * Once @body_size bytes are read, VDAgentConnReadCb() is invoked. 
> + * > + * Otherwise the handler should return FALSE > + * and call vdagent_connection_destroy(). > + * > + * @header_buff is owned by VDAgentConnection and must not be freed. */ > +typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff, > + gsize *body_size, > + gpointer user_data); > + > +/* Called when a full message has been read. > + * > + * If the handler wished to continue reading, it must return TRUE, > + * otherwise FALSE and call vdagent_connection_destroy(). > + * > + * @header, @data are owned by VDAgentConnection and must not be freed. */ > +typedef gboolean (*VDAgentConnReadCb)(gpointer header, > + gpointer data, > + gpointer user_data); > + > +/* Called when an error occured during read or wirte. > + * The handler is expected to call vdagent_connection_destroy(). */ > +typedef void (*VDAgentConnErrorCb)(gpointer user_data); > + > +/* Open a file in @path for read and write. > + * Returns a GIOStream to the given file or NULL on error. */ > +GIOStream *vdagent_file_open(const gchar *path); > + > +/* Create a socket and initiate a new connection to the socket on @address. > + * Returns a GIOStream corresponding to the new connection or NULL on error. */ > +GIOStream *vdagent_socket_connect(const gchar *address); > + > +/* Create new VDAgentConnection and start reading incoming messages. > + * > + * If @wait_on_opening is set to TRUE, EOF won't be treated as an error > + * until the first message is successfully read or written to the @io_stream. > + * > + * @user_data will be passed to the supplied callbacks. */ > +VDAgentConnection *vdagent_connection_new( > + GIOStream *io_stream, > + gboolean wait_on_opening, > + gsize header_size, > + VDAgentConnHeaderReadCb header_read_cb, > + VDAgentConnReadCb read_cb, > + VDAgentConnErrorCb error_cb, > + gpointer user_data); > + > +/* Free up all resources associated with the VDAgentConnection. > + * > + * This operation can be asynchronous. 
*/ > +void vdagent_connection_destroy(VDAgentConnection *conn); > + > +/* Append a message to write queue. > + * > + * VDAgentConnection takes ownership of the @data > + * and frees it once the message is flushed. */ > +void vdagent_connection_write(VDAgentConnection *conn, > + gpointer data, > + gsize size); > + > +/* Waits until all queued messages get written to the output stream. > + * > + * Note: other GSources can be triggered during this call */ > +void vdagent_connection_flush(VDAgentConnection *conn); > + > +/* Returns the credentials of the foreign process connected to the socket. > + * > + * It is an error to call this function with a VDAgentConnection > + * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */ > +GCredentials *vdagent_connection_get_peer_credentials( > + VDAgentConnection *conn); > + > +#endif > -- > 2.17.1 > > _______________________________________________ > Spice-devel mailing list > Spice-devel@xxxxxxxxxxxxxxxxxxxxx > https://lists.freedesktop.org/mailman/listinfo/spice-devel
Attachment:
signature.asc
Description: PGP signature
_______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel