Re: [PATCH spice-gtk V4 1/3] file-xfer: handling various transfer messages in main channel

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 




Hi!

On Fri, Dec 28, 2012 at 2:33 PM, Dunrong Huang <riegamaths@xxxxxxxxx> wrote:
Agent channel is a flow-control channel. That means before
we send agent data to server, we must obtain tokens distributed
from spice server, if we do not do that, spice server will get error,
or at least, the data will be discarded.

Other type of agent data will be cached to agent_msg_queue if there
are no more tokens. But for file-xfer data, if we cache too much of
those data, our memory will be exhausted pretty quickly if file is
too big.

We also should make other agent data(clipboard, mouse, ...) get
through when file-xfer data are sending.

So, for the reason of above, we can not fill file-xfer data to agent queue
too quickly, we must consider the tokens, and other messages.

Marc-André suggested me to call spice_channel_flush_async() and wait the
queued data to be sent, but the API does not consider the available
tokens, so I use a new algorithm/API(file_xfer_flush_async) based on
spice_channel_flush_async() to send file-xfer data.


Right

 gtk/channel-main.c      | 476 ++++++++++++++++++++++++++++++++++++++++++++++++
 gtk/channel-main.h      |   8 +
 gtk/map-file            |   1 +
 gtk/spice-glib-sym-file |   1 +
 4 files changed, 486 insertions(+)

diff --git a/gtk/channel-main.c b/gtk/channel-main.c
index 6b9ba8d..b1496bd 100644
--- a/gtk/channel-main.c
+++ b/gtk/channel-main.c
@@ -18,6 +18,7 @@
 #include <math.h>
 #include <spice/vd_agent.h>
 #include <common/rect.h>
+#include <glib/gstdio.h>

 #include "glib-compat.h"
 #include "spice-client.h"
@@ -51,6 +52,24 @@

 typedef struct spice_migrate spice_migrate;

+#define FILE_XFER_CHUNK_SIZE (VD_AGENT_MAX_DATA_SIZE * 32)
+typedef struct SpiceFileXferTask {
+    uint32_t                       id;
+    uint32_t                       group_id;
+    GFile                          *file;
+    SpiceMainChannel               *channel;
+    GFileInputStream               *file_stream;
+    GFileCopyFlags                 flags;
+    GCancellable                   *cancellable;
+    GFileProgressCallback          progress_callback;
+    gpointer                       progress_callback_data;
+    GAsyncReadyCallback            callback;
+    gpointer                       user_data;
+    char                           buffer[FILE_XFER_CHUNK_SIZE];
+    uint64_t                       read_bytes;
+    uint64_t                       file_size;
+} SpiceFileXferTask;
+
 struct _SpiceMainChannelPrivate  {
     enum SpiceMouseMode         mouse_mode;
     bool                        agent_connected;
@@ -79,6 +98,8 @@ struct _SpiceMainChannelPrivate  {
     } display[MAX_DISPLAY];
     gint                        timer_id;
     GQueue                      *agent_msg_queue;
+    GList                       *file_xfer_task_list;
+    GSList                      *flushing;

     guint                       switch_host_delayed_id;
     guint                       migrate_delayed_id;
@@ -802,6 +823,63 @@ static void agent_free_msg_queue(SpiceMainChannel *channel)
     c->agent_msg_queue = NULL;
 }

+/* Here, flushing algorithm is stolen from spice-channel.c */
+static void
+file_xfer_flushed(SpiceMainChannel *channel, gboolean success)
+{
+    SpiceMainChannelPrivate *c = channel->priv;
+    GSList *l;
+
+    for (l = c->flushing; l != NULL; l = l->next) {
+        GSimpleAsyncResult *result = G_SIMPLE_ASYNC_RESULT(l->data);
+        g_simple_async_result_set_op_res_gboolean(result, success);
+        g_simple_async_result_complete_in_idle(result);
+    }
+
+    g_slist_free_full(c->flushing, g_object_unref);
+    c->flushing = NULL;
+}
+
+static void
+file_xfer_flush_async(SpiceMainChannel *channel, GCancellable *cancellable,
+                      GAsyncReadyCallback callback, gpointer user_data)
+{
+    GSimpleAsyncResult *simple;
+    SpiceMainChannelPrivate *c = channel->priv;
+    gboolean was_empty;
+
+    simple = g_simple_async_result_new(G_OBJECT(channel), callback, user_data,
+                                       file_xfer_flush_async);
+
+    was_empty = g_queue_is_empty(c->agent_msg_queue);
+    if (was_empty) {
+        g_simple_async_result_set_op_res_gboolean(simple, TRUE);
+        g_simple_async_result_complete_in_idle(simple);
+        return;
+    }
+
+    c->flushing = g_slist_append(c->flushing, simple);
+}
+
+static gboolean
+file_xfer_flush_finish(SpiceMainChannel *channel, GAsyncResult *result,
+                       GError **error)
+{
+    GSimpleAsyncResult *simple;
+
+    simple = (GSimpleAsyncResult *)result;
+
+    if (g_simple_async_result_propagate_error(simple, error)) {
+        return -1;
+    }
+
+    g_return_val_if_fail(g_simple_async_result_is_valid(result,
+        G_OBJECT(channel), file_xfer_flush_async), FALSE);
+
+    CHANNEL_DEBUG(channel, "flushed finished!");
+    return g_simple_async_result_get_op_res_gboolean(simple);
+}
+
 /* coroutine context */
 static void agent_send_msg_queue(SpiceMainChannel *channel)
 {
@@ -814,6 +892,9 @@ static void agent_send_msg_queue(SpiceMainChannel *channel)
         out = g_queue_pop_head(c->agent_msg_queue);
         spice_msg_out_send_internal(out);
     }
+    if (g_queue_is_empty(c->agent_msg_queue) && c->flushing != NULL) {
+        file_xfer_flushed(channel, TRUE);
+    }
 }

 /* any context: the message is not flushed immediately,
@@ -1384,6 +1465,203 @@ static void main_handle_agent_disconnected(SpiceChannel *channel, SpiceMsgIn *in
     agent_stopped(SPICE_MAIN_CHANNEL(channel));
 }

+static gint file_xfer_task_find(gconstpointer a, gconstpointer b)
+{
+    SpiceFileXferTask *task = (SpiceFileXferTask *)a;
+    uint32_t id = *(uint32_t *)b;
+
+    if (task->id == id) {
+        return 0;
+    }
+
+    return 1;
+}
+
+static void file_read_cb(GObject *source_object,
+                         GAsyncResult *res,
+                         gpointer user_data);
+
+static gboolean report_progress(gpointer user_data)
+{
+    SpiceFileXferTask *task = user_data;
+    SpiceMainChannelPrivate *c = task->channel->priv;
+
+    if (task->progress_callback) {
+        uint64_t all_read_bytes = 0, all_bytes = 0;
+        GList *it;
+        for (it = g_list_first(c->file_xfer_task_list);
+             it != NULL; it = g_list_next(it)) {
+            SpiceFileXferTask *t;
+            t = it->data;
+            /* Calculate all remain bytes through group id, NB: we dont
+             * consider the task that has been finished */
+            if (t->group_id == task->id) {
+                all_read_bytes += t->read_bytes;
+                all_bytes += t->file_size;
+            }
+        }
+        task->progress_callback(all_read_bytes, all_bytes,
+                                task->progress_callback_data);
+    }
+
+    return FALSE;
+}
+
+static void data_flushed_cb(GObject *source_object,
+                            GAsyncResult *res,
+                            gpointer user_data)
+{
+    SpiceFileXferTask *task = user_data;
+    SpiceMainChannel *channel = (SpiceMainChannel *)source_object;
+    GError *error = NULL;
+
+    file_xfer_flush_finish(channel, res, &error);

Even if the function currently doesn't report error, you should treat error case at least with a g_warning().
 
+    /* Report progress */
+    report_progress(task);
+
+    /* Read more data */
+    g_input_stream_read_async(G_INPUT_STREAM(task->file_stream),
+                              task->buffer,
+                              FILE_XFER_CHUNK_SIZE,
+                              G_PRIORITY_DEFAULT,
+                              task->cancellable,
+                              file_read_cb,
+                              task);
+}
+
+static void
+file_xfer_queue(SpiceFileXferTask *task, int data_size)
+{
+    VDAgentFileXferDataMessage *msg;
+    SpiceMainChannel *channel = SPICE_MAIN_CHANNEL(task->channel);
+
+    msg = g_alloca(sizeof(VDAgentFileXferDataMessage));
+    msg->id = task->id;
+    msg->size = data_size;
+    agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_DATA, msg,
+                         sizeof(VDAgentFileXferDataMessage), task->buffer,
+                         data_size, NULL);
+    spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE);
+}
+
+static void report_finish(SpiceFileXferTask *task)
+{
+    if (task->callback) {
+        GSimpleAsyncResult *res;
+        res = g_simple_async_result_new(G_OBJECT(task->file), task->callback,
+                                        task->user_data, report_finish);
+        g_simple_async_result_set_op_res_gboolean(res, TRUE);
+        g_simple_async_result_complete_in_idle(res);
+        g_object_unref(res);
+    }
+}
+
+/* main context */
+static void
+file_close_cb(GObject      *object,
+              GAsyncResult *res,
+              gpointer      user_data)
+{
+    SpiceFileXferTask *task = user_data;
+    GInputStream *stream = G_INPUT_STREAM(object);
+    SpiceMainChannelPrivate *c = SPICE_MAIN_CHANNEL(task->channel)->priv;
+    GError *error = NULL;
+
+    g_input_stream_close_finish(stream, res, &error);
+    if (error) {
+        SPICE_DEBUG("close file error: %s", error->message);
+        g_clear_error(&error);
+    }
+
+    c->file_xfer_task_list = g_list_remove(c->file_xfer_task_list, task);
+
+    /* If all tasks have been finished, notify to user */
+    if (g_list_length(c->file_xfer_task_list) == 0) {
+        report_finish(task);
+    }
+    g_object_unref(task->file);
+    g_object_unref(task->file_stream);
+    g_free(task);
+}
+
+/* main context */
+static void file_read_cb(GObject *source_object,
+                         GAsyncResult *res,
+                         gpointer user_data)
+{
+    SpiceFileXferTask *task = user_data;
+    SpiceMainChannel *channel = task->channel;
+    gssize count;
+    GError *error = NULL;
+
+    count = g_input_stream_read_finish(G_INPUT_STREAM(task->file_stream),
+                                       res, &error);
+    if (count > 0) {
+        task->read_bytes += count;
+        file_xfer_queue(task, count);
+        file_xfer_flush_async(channel, task->cancellable,
+                              data_flushed_cb, task);
+    } else {
+        g_input_stream_close_async(G_INPUT_STREAM(task->file_stream),
+                                   G_PRIORITY_DEFAULT,
+                                   task->cancellable,
+                                   file_close_cb,
+                                   task);

You should report error to caller (probably with g_simple_async_report_gerror_in_idle)
 
There will be some issues with the fact that there are multiple outstanding async in the background. The common pattern is to _always_ complete one async() call  with one result (succesful or error). There shouldn't be "lost" async, or you may "block" some client execution path.

+    }
+}
+
+/* coroutine context */
+static void file_xfer_send_data_msg(SpiceMainChannel *channel, uint32_t id)
+{
+    SpiceMainChannelPrivate *c = channel->priv;
+    GList *l;
+    SpiceFileXferTask *task;
+
+    l = g_list_find_custom(c->file_xfer_task_list, &id,
+                           file_xfer_task_find);
+
+    g_return_if_fail(l != NULL);
+
+    task = l->data;
+    g_input_stream_read_async(G_INPUT_STREAM(task->file_stream),
+                              task->buffer,
+                              FILE_XFER_CHUNK_SIZE,
+                              G_PRIORITY_DEFAULT,
+                              task->cancellable,
+                              file_read_cb,
+                              task);
+}
+
+/* coroutine context */
+static void file_xfer_handle_status(SpiceMainChannel *channel,
+                                    VDAgentFileXferStatusMessage *msg)
+{
+    SPICE_DEBUG("task %d received response %d", msg->id, msg->result);

 You could lookup the task only once here.

+
+    if (msg->result == VD_AGENT_FILE_XFER_STATUS_CAN_SEND_DATA) {
+        file_xfer_send_data_msg(channel, msg->id);

and can call g_input_stream_read_async() directly here, or perhaps move the read_async() in a seperate function task_continue_read() which will be called also from data_flushed_cb()
 
+    } else {

Please check precisely the other msg->result values, and do a g_warn_if_reached() for unknown values.

+        /* Error, remove this task */
+        SpiceMainChannelPrivate *c = channel->priv;
+        GList *l;
+        SpiceFileXferTask *task;
+
+        l = g_list_find_custom(c->file_xfer_task_list, &msg->id,
+                               file_xfer_task_find);
+        g_return_if_fail(l != NULL);
+
+        task = l->data;
+        SPICE_DEBUG("user removed task %d, result: %d", msg->id,
+                    msg->result);
 
You should report error to caller (probably with g_simple_async_report_gerror_in_idle)


+        c->file_xfer_task_list = g_list_remove(c->file_xfer_task_list,
+                                               task);
+        g_object_unref(task->file);
+        g_object_unref(task->file_stream);
+        g_free(task); 

Those last 4 lines could probably be in a seperate function file_xfer_task_free ()

+    }
+}
+
 /* coroutine context */
 static void main_agent_handle_msg(SpiceChannel *channel,
                                   VDAgentMessage *msg, gpointer payload)
@@ -1487,6 +1765,9 @@ static void main_agent_handle_msg(SpiceChannel *channel,
                     reply->error == VD_AGENT_SUCCESS ? "success" : "error");
         break;
     }
+    case VD_AGENT_FILE_XFER_STATUS:
+        file_xfer_handle_status(SPICE_MAIN_CHANNEL(channel), payload);
+        break;
     default:
         g_warning("unhandled agent message type: %u (%s), size %u",
                   msg->type, NAME(agent_msg_types, msg->type), msg->size);
@@ -1563,6 +1844,7 @@ static void main_handle_agent_token(SpiceChannel *channel, SpiceMsgIn *in)
     SpiceMainChannelPrivate *c = SPICE_MAIN_CHANNEL(channel)->priv;

     c->agent_tokens += tokens->num_tokens;
+
     agent_send_msg_queue(SPICE_MAIN_CHANNEL(channel));
 }

@@ -2246,3 +2528,197 @@ void spice_main_set_display_enabled(SpiceMainChannel *channel, int id, gboolean
         c->display[id].enabled = enabled;
     }
 }
+
+static void
+file_info_async_cb(GObject *obj, GAsyncResult *res, gpointer data)
+{
+    GFileInfo *info;
+    GFile *file = G_FILE(obj);
+    GError *error = NULL;
+    GKeyFile *keyfile = NULL;
+    gchar *basename = NULL;
+    VDAgentFileXferStartMessage *msg;
+    gsize msg_size, data_len;
+    gchar *string;
+    SpiceFileXferTask *task = (SpiceFileXferTask *)data;
+    SpiceMainChannelPrivate *c = task->channel->priv;
+
+    info = g_file_query_info_finish(file, res, &error);
+    if (error) {
+        SPICE_DEBUG("couldn't get size of file %s: %s",
+                    g_file_get_path(file),
+                    error->message);
+        goto failed;
+    }
+    task->file_size = g_file_info_get_attribute_uint64(info,
+                                        G_FILE_ATTRIBUTE_STANDARD_SIZE);
+
+    keyfile = g_key_file_new();
+    if (keyfile == NULL) {
+        SPICE_DEBUG("failed to create key file: %s", error->message);
+        goto failed;
+    }
+
+    /* File name */
+    basename = g_file_get_basename(file);
+    if (basename == NULL) {
+        SPICE_DEBUG("failed to get file basename: %s", error->message);
+        goto failed;
+    }
+    g_key_file_set_string(keyfile, "vdagent-file-xfer", "name", basename);
+    g_free(basename);
+
+    /* File size */
+    g_key_file_set_uint64(keyfile, "vdagent-file-xfer", "size",
+                          task->file_size);
+
+    /* Save keyfile content to memory. TODO: more file attributions
+       need to be sent to guest */
+    string = g_key_file_to_data(keyfile, &data_len, &error);
+    g_key_file_free(keyfile);
+    if (error) {
+        goto failed;
+    }
+
+    /* Create file-xfer start message */
+    msg_size = sizeof(VDAgentFileXferStartMessage) + data_len + 1;
+    msg = g_malloc0(msg_size);

This allocation and copy seems unnecessary, can you use "string" directly?

+    msg->id = task->id;
+    memcpy(msg->data, string, data_len + 1);
+    g_free(string);
+
+    CHANNEL_DEBUG(task->channel, "Insert a xfer task:%d to task list",
+                  task->id);
+    c->file_xfer_task_list = g_list_append(c->file_xfer_task_list, task);
+
+    agent_msg_queue(task->channel, VD_AGENT_FILE_XFER_START, msg_size, msg);
+    g_free(msg);
+    spice_channel_wakeup(SPICE_CHANNEL(task->channel), FALSE);
+    return ;
+
+failed:
+    g_clear_error(&error);
+    g_object_unref(task->file);
+    g_object_unref(task->file_stream);
+    g_free(task);
+}
+
+static void
+read_async_cb(GObject *obj, GAsyncResult *res, gpointer data)
+{
+    GFile *file = G_FILE(obj);
+    SpiceFileXferTask *task = (SpiceFileXferTask *)data;
+    GError *error = NULL;
+
+    task->file_stream = g_file_read_finish(file, res, &error);
+
+    if (task->file_stream) {
+        g_file_query_info_async(task->file,
+                                G_FILE_ATTRIBUTE_STANDARD_SIZE,
+                                G_FILE_QUERY_INFO_NONE,
+                                G_PRIORITY_DEFAULT,
+                                task->cancellable,
+                                file_info_async_cb,
+                                task);
+    } else {
+        SPICE_DEBUG("create file stream for %s error: %s",
+                    g_file_get_path(file), error->message);
+        g_clear_error(&error);
+        g_object_unref(task->file);
+        g_free(task);

You should report error to caller (probably with g_simple_async_report_gerror_in_idle)

+    }
+}
+
+static void
+file_xfer_send_start_msg_async(SpiceMainChannel *channel,
+                               GFile *file,
+                               GFileCopyFlags flags,
+                               GCancellable *cancellable,
+                               GFileProgressCallback progress_callback,
+                               gpointer progress_callback_data,
+                               GAsyncReadyCallback callback,
+                               gpointer user_data,
+                               uint32_t group_id)
+{
+    SpiceFileXferTask *task;
+    static uint32_t xfer_id;    /* Used to identify task id */
+
+    xfer_id = (xfer_id > UINT32_MAX) ? 0 : xfer_id;
+
+    task = spice_malloc0(sizeof(SpiceFileXferTask));
+    task->id = ++xfer_id;
+    task->group_id = group_id;
+    task->channel = channel;
+    task->file = g_object_ref(file);
+    task->flags = flags;
+    task->cancellable = cancellable;
+    task->progress_callback = progress_callback;
+    task->progress_callback_data = progress_callback_data;
+    task->callback = callback;
+    task->user_data = user_data;
+
+    g_file_read_async(file,
+                      G_PRIORITY_DEFAULT,
+                      cancellable,
+                      read_async_cb,
+                      task);
+
+}
+
+/**
+ * spice_main_file_copy_async:
+ * @sources: #GFile to be transfer
+ * @flags: set of #GFileCopyFlags
+ * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
+ * @progress_callback: (allow-none) (scope call): function to callback with
+ *     progress information, or %NULL if progress information is not needed
+ * @progress_callback_data: (closure): user data to pass to @progress_callback
+ * @error: #GError to set on error, or %NULL
+ *
+ * Copies the file @sources to guest
+ *
+ * If @cancellable is not %NULL, then the operation can be cancelled by
+ * triggering the cancellable object from another thread. If the operation
+ * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
+ *
+ * If @progress_callback is not %NULL, then the operation can be monitored by
+ * setting this to a #GFileProgressCallback function. @progress_callback_data
+ * will be passed to this function. It is guaranteed that this callback will
+ * be called after all data has been transferred with the total number of bytes
+ * copied during the operation.
+ *
+ * When the operation is finished, callback will be called.
+ *
+ **/
+void spice_main_file_copy_async(SpiceMainChannel *channel,
+                                GFile **sources,
+                                GFileCopyFlags flags,
+                                GCancellable *cancellable,
+                                GFileProgressCallback progress_callback,
+                                gpointer progress_callback_data,
+                                GAsyncReadyCallback callback,
+                                gpointer user_data)
+{
+    int i = 0;
+    static uint32_t xfer_group_id;
+
+    g_return_if_fail(channel != NULL);
+    g_return_if_fail(SPICE_IS_MAIN_CHANNEL(channel));
+    g_return_if_fail(sources != NULL);
+
+    xfer_group_id++;
+    xfer_group_id = (xfer_group_id > UINT32_MAX) ? 0 : xfer_group_id;
+    while (sources[i]) {
+        /* All tasks created from below function have same group id */

I am worried by the server side handling of sharing the same group id for several requests. But I am okay with this communication pattern that can be later improved if needed.
 
+        file_xfer_send_start_msg_async(channel,
+                                       sources[i],
+                                       flags,
+                                       cancellable,
+                                       progress_callback,
+                                       progress_callback_data,
+                                       callback,
+                                       user_data,
+                                       xfer_group_id);
+        i++;
+    }
+}
diff --git a/gtk/channel-main.h b/gtk/channel-main.h
index 1a5ab54..d00490f 100644
--- a/gtk/channel-main.h
+++ b/gtk/channel-main.h
@@ -78,6 +78,14 @@ void spice_main_clipboard_selection_notify(SpiceMainChannel *channel, guint sele
 void spice_main_clipboard_selection_request(SpiceMainChannel *channel, guint selection, guint32 type);

 gboolean spice_main_agent_test_capability(SpiceMainChannel *channel, guint32 cap);
+void spice_main_file_copy_async(SpiceMainChannel *channel,
+                                GFile **sources,
+                                GFileCopyFlags flags,
+                                GCancellable *cancellable,
+                                GFileProgressCallback progress_callback,
+                                gpointer progress_callback_data,
+                                GAsyncReadyCallback callback,
+                                gpointer user_data);

 #ifndef SPICE_DISABLE_DEPRECATED
 SPICE_DEPRECATED_FOR(spice_main_clipboard_selection_grab)
diff --git a/gtk/map-file b/gtk/map-file
index 516764c..9988e7d 100644
--- a/gtk/map-file
+++ b/gtk/map-file
@@ -55,6 +55,7 @@ spice_inputs_motion;
 spice_inputs_position;
 spice_inputs_set_key_locks;
 spice_main_agent_test_capability;
+spice_main_file_copy_async;
 spice_main_channel_get_type;
 spice_main_clipboard_grab;
 spice_main_clipboard_notify;
diff --git a/gtk/spice-glib-sym-file b/gtk/spice-glib-sym-file
index 641ff4d..81fedd5 100644
--- a/gtk/spice-glib-sym-file
+++ b/gtk/spice-glib-sym-file
@@ -31,6 +31,7 @@ spice_inputs_motion
 spice_inputs_position
 spice_inputs_set_key_locks
 spice_main_agent_test_capability
+spice_main_file_copy_async
 spice_main_channel_get_type
 spice_main_clipboard_grab
 spice_main_clipboard_notify
--
1.8.0

_______________________________________________
Spice-devel mailing list
Spice-devel@xxxxxxxxxxxxxxxxxxxxx
http://lists.freedesktop.org/mailman/listinfo/spice-devel

 Thanks a lot for your work!

--
Marc-André Lureau
_______________________________________________
Spice-devel mailing list
Spice-devel@xxxxxxxxxxxxxxxxxxxxx
http://lists.freedesktop.org/mailman/listinfo/spice-devel

[Index of Archives]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Annouce]     [Security]     [Bugtraq]     [Linux]     [Linux OMAP]     [Linux MIPS]     [ECOS]     [Asterisk Internet PBX]     [Linux API]     [Monitors]