Hey, On Tue, Aug 28, 2018 at 10:04 AM Victor Toso <victortoso@xxxxxxxxxx> wrote: > > Hi, > > On Tue, Aug 14, 2018 at 08:53:39PM +0200, Jakub Janků wrote: > > Add a set of helper functions built around GIO that can be used to > > easily write messages to and read from the given FD. > > > > Since VDAgentConnection uses GIO, > > it integrates well with GMainLoop. > > > > Read messages must begin with a header of a fixed size. > > Message body size can vary. > > > > User of VDAgentConnection is notified > > through callbacks about the following events: > > - message header read > > - message body read > > - I/O error > > > > A new VDAgentConnection can be constructed using > > vdagent_connection_new() based on a GIOStream. > > > > A new GIOStream can be obtained using > > vdagent_file_open() or vdagent_socket_connect(). > > > > vdagent_connection_destroy() destroyes the connection. > > However, due to the asynchronous nature of used GIO functions, > > this does NOT close the underlying FD immediately. > > Yep, commented about it on 00/18 but I take that making this a > GObject might help. Not giving a full review here, just small > note after looking at the patch and the follow up ones. > > > --- > > Makefile.am | 2 + > > src/vdagent-connection.c | 301 +++++++++++++++++++++++++++++++++++++++ > > src/vdagent-connection.h | 103 ++++++++++++++ > > 3 files changed, 406 insertions(+) > > create mode 100644 src/vdagent-connection.c > > create mode 100644 src/vdagent-connection.h > > > > diff --git a/Makefile.am b/Makefile.am > > index fa54bbc..b291b19 100644 > > --- a/Makefile.am > > +++ b/Makefile.am > > @@ -7,6 +7,8 @@ sbin_PROGRAMS = src/spice-vdagentd > > common_sources = \ > > src/udscs.c \ > > src/udscs.h \ > > + src/vdagent-connection.c \ > > + src/vdagent-connection.h \ > > src/vdagentd-proto-strings.h \ > > src/vdagentd-proto.h \ > > $(NULL) > > diff --git a/src/vdagent-connection.c b/src/vdagent-connection.c > > new file mode 100644 > > index 0000000..0eb2ec9 > > --- /dev/null > > +++ b/src/vdagent-connection.c > > @@ -0,0 +1,301 @@ > > +/* vdagent-connection.c > > + > > + Copyright 2018 Red Hat, Inc. > > + > > + This program is free software: you can redistribute it and/or modify > > + it under the terms of the GNU General Public License as published by > > + the Free Software Foundation, either version 3 of the License, or > > + (at your option) any later version. > > + > > + This program is distributed in the hope that it will be useful, > > + but WITHOUT ANY WARRANTY; without even the implied warranty of > > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > > + GNU General Public License for more details. > > + > > + You should have received a copy of the GNU General Public License > > + along with this program. If not, see <http://www.gnu.org/licenses/>. > > +*/ > > + > > +#include <syslog.h> > > +#include <fcntl.h> > > +#include <glib/gstdio.h> > > +#include <gio/gunixinputstream.h> > > +#include <gio/gunixoutputstream.h> > > +#include <gio/gunixsocketaddress.h> > > + > > +#include "vdagent-connection.h" > > + > > +struct VDAgentConnection { > > + GIOStream *io_stream; > > + gboolean opening; > > + GCancellable *cancellable; > > + > > + GQueue *write_queue; > > + GMainLoop *flush_loop; > > + > > + VDAgentConnReadCb read_cb; > > + gpointer read_buff; > > + gpointer header_buff; > > + gsize header_size; > > + VDAgentConnHeaderReadCb header_read_cb; > > + > > + VDAgentConnErrorCb error_cb; > > + > > + GCredentials *credentials; > > + > > + gpointer user_data; > > +}; > > + > > +static void request_message_write(VDAgentConnection *conn); > > +static void request_message_read(VDAgentConnection *conn); > > + > > +GIOStream *vdagent_file_open(const gchar *path) > > +{ > > + gint fd; > > + > > + fd = g_open(path, O_RDWR); > > + if (fd == -1) { > > + syslog(LOG_ERR, "%s: %m", __func__); > > + return NULL; > > + } > > + > > + return g_simple_io_stream_new(g_unix_input_stream_new(fd, TRUE), > > + g_unix_output_stream_new(fd, TRUE)); > > +} > > + > > +GIOStream *vdagent_socket_connect(const gchar *address) > > +{ > > + GSocketConnection *socket_conn; > > + GSocketClient *client; > > + GSocketConnectable *connectable; > > + GError *err = NULL; > > + > > + connectable = G_SOCKET_CONNECTABLE(g_unix_socket_address_new(address)); > > + client = g_object_new(G_TYPE_SOCKET_CLIENT, > > + "family", G_SOCKET_FAMILY_UNIX, > > + "type", G_SOCKET_TYPE_STREAM, > > + NULL); > > + > > + socket_conn = g_socket_client_connect(client, connectable, NULL, &err); > > + g_object_unref(client); > > + g_object_unref(connectable); > > + if (err) { > > + syslog(LOG_ERR, "%s: %s", __func__, err->message); > > + g_error_free(err); > > + } > > + return G_IO_STREAM(socket_conn); > > +} > > Not convinced that this API is really needed? The function can be > kept but to be used internally by vdagent_connection_new() > itself, I guess. I don't see other components using GIOStream > unless to pass it to vdagent_connection_new (). The rationale behind this was actually purely cosmetic. We could have something like vdagent_connection_new_file() and vdagent_connection_new_socket(), but both functions would need to have 7 arguments and we would need to initialize all the members of VDAgentConnection struct in both of the functions. So having just one common vdagent_connection_new() function seemed cleaner to me. > > Might make sense to make socket's address as VDAgentConnection's > property which should be set on g_object_new () too. > > Reviewed-by: Victor Toso <victortoso@xxxxxxxxxx> > > Victor > > > + > > +VDAgentConnection *vdagent_connection_new( > > + GIOStream *io_stream, > > + gboolean wait_on_opening, > > + gsize header_size, > > + VDAgentConnHeaderReadCb header_read_cb, > > + VDAgentConnReadCb read_cb, > > + VDAgentConnErrorCb error_cb, > > + gpointer user_data) > > +{ > > + VDAgentConnection *conn; > > + > > + conn = g_new(VDAgentConnection, 1); > > + conn->io_stream = io_stream; > > + conn->cancellable = g_cancellable_new(); > > + conn->opening = wait_on_opening; > > + conn->write_queue = g_queue_new(); > > + conn->flush_loop = NULL; > > + conn->read_cb = read_cb; > > + conn->read_buff = NULL; > > + conn->header_buff = g_malloc(header_size); > > + conn->header_size = header_size; > > + conn->header_read_cb = header_read_cb; > > + conn->error_cb = error_cb; > > + conn->credentials = NULL; > > + conn->user_data = user_data; > > + > > + request_message_read(conn); > > + > > + return conn; > > +} > > + > > +static gboolean connection_has_pending(VDAgentConnection *conn) > > +{ > > + GInputStream *in = g_io_stream_get_input_stream(conn->io_stream); > > + GOutputStream *out = g_io_stream_get_output_stream(conn->io_stream); > > + > > + return g_input_stream_has_pending(in) || g_output_stream_has_pending(out); > > +} > > + > > +/* Free up all resources used by VDAgentConnection > > + * once all I/O operations have finished. */ > > +static void connection_finalize(VDAgentConnection *conn) > > +{ > > + g_object_unref(conn->cancellable); > > + g_queue_free_full(conn->write_queue, (GDestroyNotify)g_bytes_unref); > > + g_clear_pointer(&conn->flush_loop, g_main_loop_quit); > > + g_clear_object(&conn->credentials); > > + g_free(conn->header_buff); > > + g_free(conn->read_buff); > > + g_object_unref(conn->io_stream); > > + g_free(conn); > > +} > > + > > +void vdagent_connection_destroy(VDAgentConnection *conn) > > +{ > > + /* If there's a pending I/O operation on either of the streams, cancel it, > > + * connection_finalize() will be invoked in the next GMainLoop iteration(s). */ > > + if (connection_has_pending(conn)) > > + g_cancellable_cancel(conn->cancellable); > > + else > > + connection_finalize(conn); > > +} > > + > > +static void handle_io_error(VDAgentConnection *conn, GError *err) > > +{ > > + if (g_cancellable_is_cancelled(conn->cancellable)) { > > + if (!connection_has_pending(conn)) > > + connection_finalize(conn); > > + } else { > > + syslog(LOG_ERR, "vdagent-connection: I/O error: %s", err->message); > > + conn->error_cb(conn->user_data); > > + } > > + g_error_free(err); > > +} > > + > > +GCredentials *vdagent_connection_get_peer_credentials(VDAgentConnection *conn) > > +{ > > + GSocket *socket; > > + GError *err = NULL; > > + > > + g_return_val_if_fail(G_IS_SOCKET_CONNECTION(conn->io_stream), NULL); > > + > > + if (conn->credentials) > > + return conn->credentials; > > + > > + socket = g_socket_connection_get_socket(G_SOCKET_CONNECTION(conn->io_stream)); > > + conn->credentials = g_socket_get_credentials(socket, &err); > > + if (err) { > > + syslog(LOG_ERR, "%s: %s", __func__, err->message); > > + g_error_free(err); > > + } > > + return conn->credentials; > > +} > > + > > +static void message_write_cb(GObject *source_object, > > + GAsyncResult *res, > > + gpointer user_data) > > +{ > > + VDAgentConnection *conn = user_data; > > + GOutputStream *out = G_OUTPUT_STREAM(source_object); > > + GError *err = NULL; > > + > > + g_output_stream_write_all_finish(out, res, NULL, &err); > > + g_bytes_unref(g_queue_pop_head(conn->write_queue)); > > + > > + if (err) > > + return handle_io_error(conn, err); > > + > > + conn->opening = FALSE; > > + > > + if (g_queue_is_empty(conn->write_queue)) > > + g_clear_pointer(&conn->flush_loop, g_main_loop_quit); > > + else > > + request_message_write(conn); > > +} > > + > > +static void request_message_write(VDAgentConnection *conn) > > +{ > > + GBytes *msg; > > + GOutputStream *out; > > + > > + msg = g_queue_peek_head(conn->write_queue); > > + out = g_io_stream_get_output_stream(conn->io_stream); > > + > > + g_output_stream_write_all_async(out, > > + g_bytes_get_data(msg, NULL), g_bytes_get_size(msg), > > + G_PRIORITY_DEFAULT, conn->cancellable, message_write_cb, conn); > > +} > > + > > +void vdagent_connection_write(VDAgentConnection *conn, > > + gpointer data, > > + gsize size) > > +{ > > + g_queue_push_tail(conn->write_queue, g_bytes_new_take(data, size)); > > + > > + if (g_queue_get_length(conn->write_queue) == 1) > > + request_message_write(conn); > > +} > > + > > +void vdagent_connection_flush(VDAgentConnection *conn) > > +{ > > + GMainLoop *loop; > > + /* TODO: allow multiple flush calls at once? */ > > + g_return_if_fail(conn->flush_loop == NULL); > > + > > + if (g_queue_is_empty(conn->write_queue)) > > + return; > > + > > + loop = conn->flush_loop = g_main_loop_new(NULL, FALSE); > > + /* When using GTK+, this should be wrapped with > > + * gdk_threads_leave() and gdk_threads_enter(), > > + * but since flush is used in virtio-port.c only > > + * let's leave it as it is for now. */ > > + g_main_loop_run(loop); > > + g_main_loop_unref(loop); > > +} > > + > > +static void message_read_cb(GObject *source_object, > > + GAsyncResult *res, > > + gpointer user_data) > > +{ > > + VDAgentConnection *conn = user_data; > > + GInputStream *in = G_INPUT_STREAM(source_object); > > + GError *err = NULL; > > + gsize bytes_read, data_size; > > + > > + g_input_stream_read_all_finish(in, res, &bytes_read, &err); > > + if (err) > > + return handle_io_error(conn, err); > > + if (bytes_read == 0) { > > + /* see virtio-port.c for the rationale behind this */ > > + if (conn->opening) { > > + g_usleep(10000); > > + request_message_read(conn); > > + } else { > > + conn->error_cb(conn->user_data); > > + } > > + return; > > + } > > + conn->opening = FALSE; > > + > > + if (conn->read_buff == NULL) { > > + /* we've read the message header, now let's read its body */ > > + if (!conn->header_read_cb(conn->header_buff, &data_size, conn->user_data)) > > + return; > > + if (data_size > 0) { > > + conn->read_buff = g_malloc(data_size); > > + /* TODO: if allocation fails, we could try g_input_stream_skip() > > + * and hope that the message wasn't crucial for proper functiong. > > + * An example might be when a user tries to copy large clipboard. > > + * Not sure whether it's worth implementing. > > + * Other stuff might just as well fall apart > > + * when the system is running out of memory? */ > > + g_input_stream_read_all_async(in, conn->read_buff, data_size, > > + G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn); > > + return; > > + } > > + } > > + > > + if (!conn->read_cb(conn->header_buff, conn->read_buff, conn->user_data)) > > + return; > > + g_clear_pointer(&conn->read_buff, g_free); > > + request_message_read(conn); > > +} > > + > > +static void request_message_read(VDAgentConnection *conn) > > +{ > > + GInputStream *in; > > + in = g_io_stream_get_input_stream(conn->io_stream); > > + > > + g_input_stream_read_all_async(in, conn->header_buff, conn->header_size, > > + G_PRIORITY_DEFAULT, conn->cancellable, message_read_cb, conn); > > +} > > diff --git a/src/vdagent-connection.h b/src/vdagent-connection.h > > new file mode 100644 > > index 0000000..6fc0081 > > --- /dev/null > > +++ b/src/vdagent-connection.h > > @@ -0,0 +1,103 @@ > > +/* vdagent-connection.h > > + > > + Copyright 2018 Red Hat, Inc. > > + > > + This program is free software: you can redistribute it and/or modify > > + it under the terms of the GNU General Public License as published by > > + the Free Software Foundation, either version 3 of the License, or > > + (at your option) any later version. > > + > > + This program is distributed in the hope that it will be useful, > > + but WITHOUT ANY WARRANTY; without even the implied warranty of > > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > > + GNU General Public License for more details. > > + > > + You should have received a copy of the GNU General Public License > > + along with this program. If not, see <http://www.gnu.org/licenses/>. > > +*/ > > + > > +#ifndef __VDAGENT_CONNECTION_H > > +#define __VDAGENT_CONNECTION_H > > + > > +#include <glib.h> > > +#include <gio/gio.h> > > + > > +typedef struct VDAgentConnection VDAgentConnection; > > + > > +/* Called when a message header has been read. > > + * > > + * If the handler wishes to continue reading, > > + * it must set @body_size to the size of message's body and return TRUE. > > + * Once @body_size bytes are read, VDAgentConnReadCb() is invoked. > > + * > > + * Otherwise the handler should return FALSE > > + * and call vdagent_connection_destroy(). > > + * > > + * @header_buff is owned by VDAgentConnection and must not be freed. */ > > +typedef gboolean (*VDAgentConnHeaderReadCb)(gpointer header_buff, > > + gsize *body_size, > > + gpointer user_data); > > + > > +/* Called when a full message has been read. > > + * > > + * If the handler wished to continue reading, it must return TRUE, > > + * otherwise FALSE and call vdagent_connection_destroy(). > > + * > > + * @header, @data are owned by VDAgentConnection and must not be freed. */ > > +typedef gboolean (*VDAgentConnReadCb)(gpointer header, > > + gpointer data, > > + gpointer user_data); > > + > > +/* Called when an error occured during read or wirte. > > + * The handler is expected to call vdagent_connection_destroy(). */ > > +typedef void (*VDAgentConnErrorCb)(gpointer user_data); > > + > > +/* Open a file in @path for read and write. > > + * Returns a GIOStream to the given file or NULL on error. */ > > +GIOStream *vdagent_file_open(const gchar *path); > > + > > +/* Create a socket and initiate a new connection to the socket on @address. > > + * Returns a GIOStream corresponding to the new connection or NULL on error. */ > > +GIOStream *vdagent_socket_connect(const gchar *address); > > + > > +/* Create new VDAgentConnection and start reading incoming messages. > > + * > > + * If @wait_on_opening is set to TRUE, EOF won't be treated as an error > > + * until the first message is successfully read or written to the @io_stream. > > + * > > + * @user_data will be passed to the supplied callbacks. */ > > +VDAgentConnection *vdagent_connection_new( > > + GIOStream *io_stream, > > + gboolean wait_on_opening, > > + gsize header_size, > > + VDAgentConnHeaderReadCb header_read_cb, > > + VDAgentConnReadCb read_cb, > > + VDAgentConnErrorCb error_cb, > > + gpointer user_data); > > + > > +/* Free up all resources associated with the VDAgentConnection. > > + * > > + * This operation can be asynchronous. */ > > +void vdagent_connection_destroy(VDAgentConnection *conn); > > + > > +/* Append a message to write queue. > > + * > > + * VDAgentConnection takes ownership of the @data > > + * and frees it once the message is flushed. */ > > +void vdagent_connection_write(VDAgentConnection *conn, > > + gpointer data, > > + gsize size); > > + > > +/* Waits until all queued messages get written to the output stream. > > + * > > + * Note: other GSources can be triggered during this call */ > > +void vdagent_connection_flush(VDAgentConnection *conn); > > + > > +/* Returns the credentials of the foreign process connected to the socket. > > + * > > + * It is an error to call this function with a VDAgentConnection > > + * that isn't based on a GIOStream of G_TYPE_SOCKET_CONNECTION. */ > > +GCredentials *vdagent_connection_get_peer_credentials( > > + VDAgentConnection *conn); > > + > > +#endif > > -- > > 2.17.1 > > > > _______________________________________________ > > Spice-devel mailing list > > Spice-devel@xxxxxxxxxxxxxxxxxxxxx > > https://lists.freedesktop.org/mailman/listinfo/spice-devel _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel