From: Dunrong Huang <riegamaths@xxxxxxxxx>

This patch is aimed to handle various file xfer messages.

How it works:
0) Our main channel introduces an API, spice_main_file_copy_async().

1) When the user drags a file and drops it on the spice client, the
   client catches the "drag-data-received" signal and should then call
   spice_main_file_copy_async() to transfer the file to the guest.

2) In the main channel: when spice_main_file_copy_async() gets called
   with a file list, the API sends a start message which includes the
   file name and other needed information for each file. It then creates
   a new xfer task for each file, inserts it into the task list, and
   returns to the caller.

3) According to the response message sent from the guest, our main
   channel decides whether to send more data or to cancel this xfer task.

4) When the file transfer has finished, the file xfer task is removed
   from the task list.

Signed-off-by: Dunrong Huang <riegamaths@xxxxxxxxx>
Signed-off-by: mathslinux <riegamaths@xxxxxxxxx>
---
V5 -> V6:
 * Fix spicy hangs after doing a file copy

Review fixes on top of V6:
 * read_async_cb(): do not close a NULL stream when the file cannot be
   opened; complete the task with the error instead
 * file_info_async_cb(): do not dereference a NULL GError, do not leak
   the GFileInfo/path/keyfile, and report failures to the user callback
 * file_xfer_flush_finish()/spice_main_file_copy_finish(): validate the
   async result before using it
 * hold a reference on the cancellable for the lifetime of the task
 * drop the always-false "xfer_id > UINT32_MAX" check

 gtk/channel-main.c      | 554 ++++++++++++++++++++++++++++++++++++++++++++++++
 gtk/channel-main.h      |  12 ++
 gtk/map-file            |   2 +
 gtk/spice-glib-sym-file |   2 +
 4 files changed, 570 insertions(+)

diff --git a/gtk/channel-main.c b/gtk/channel-main.c
index 6b9ba8d..a3211e1 100644
--- a/gtk/channel-main.c
+++ b/gtk/channel-main.c
@@ -18,6 +18,7 @@
 #include <math.h>
 #include <spice/vd_agent.h>
 #include <common/rect.h>
+#include <glib/gstdio.h>
 
 #include "glib-compat.h"
 #include "spice-client.h"
@@ -51,6 +52,27 @@
 
 typedef struct spice_migrate spice_migrate;
 
+/* Bytes read from the file and queued to the agent per read request */
+#define FILE_XFER_CHUNK_SIZE (VD_AGENT_MAX_DATA_SIZE * 32)
+
+/* State of a single file transfer towards the guest agent */
+typedef struct SpiceFileXferTask {
+    uint32_t id;                     /* id carried by the agent messages */
+    GFile *file;                     /* source file (owned reference) */
+    SpiceMainChannel *channel;       /* not referenced */
+    GFileInputStream *file_stream;   /* NULL until the file is opened */
+    GFileCopyFlags flags;
+    GCancellable *cancellable;       /* owned reference, may be NULL */
+    GFileProgressCallback progress_callback;
+    gpointer progress_callback_data;
+    GAsyncReadyCallback callback;    /* user completion callback */
+    gpointer user_data;
+    char buffer[FILE_XFER_CHUNK_SIZE];
+    uint64_t read_bytes;             /* bytes read from the file so far */
+    uint64_t file_size;
+    GError *error;                   /* first error hit, NULL on success */
+} SpiceFileXferTask;
+
 struct _SpiceMainChannelPrivate {
     enum SpiceMouseMode mouse_mode;
     bool agent_connected;
@@ -79,6 +101,8 @@ struct _SpiceMainChannelPrivate {
     } display[MAX_DISPLAY];
     gint timer_id;
     GQueue *agent_msg_queue;
+    GList *file_xfer_task_list;
+    GSList *flushing;
 
     guint switch_host_delayed_id;
     guint migrate_delayed_id;
@@ -802,6 +826,76 @@ static void agent_free_msg_queue(SpiceMainChannel *channel)
     c->agent_msg_queue = NULL;
 }
 
+/*
+ * Complete every pending flush request with @success.
+ * The flushing algorithm is borrowed from spice-channel.c.
+ */
+static void
+file_xfer_flushed(SpiceMainChannel *channel, gboolean success)
+{
+    SpiceMainChannelPrivate *c = channel->priv;
+    GSList *l;
+
+    for (l = c->flushing; l != NULL; l = l->next) {
+        GSimpleAsyncResult *result = G_SIMPLE_ASYNC_RESULT(l->data);
+        g_simple_async_result_set_op_res_gboolean(result, success);
+        g_simple_async_result_complete_in_idle(result);
+    }
+
+    g_slist_free_full(c->flushing, g_object_unref);
+    c->flushing = NULL;
+}
+
+/*
+ * Ask for @callback to be invoked once the agent message queue is empty.
+ * An already empty queue completes the request from an idle handler;
+ * otherwise it is parked in c->flushing until file_xfer_flushed() runs.
+ * @cancellable is currently unused.
+ */
+static void
+file_xfer_flush_async(SpiceMainChannel *channel, GCancellable *cancellable,
+                      GAsyncReadyCallback callback, gpointer user_data)
+{
+    GSimpleAsyncResult *simple;
+    SpiceMainChannelPrivate *c = channel->priv;
+
+    simple = g_simple_async_result_new(G_OBJECT(channel), callback, user_data,
+                                       file_xfer_flush_async);
+
+    if (g_queue_is_empty(c->agent_msg_queue)) {
+        g_simple_async_result_set_op_res_gboolean(simple, TRUE);
+        g_simple_async_result_complete_in_idle(simple);
+        g_object_unref(simple);
+        return;
+    }
+
+    /* c->flushing takes over the reference */
+    c->flushing = g_slist_append(c->flushing, simple);
+}
+
+/*
+ * Collect the outcome of file_xfer_flush_async(). Returns the boolean
+ * stored in @result, or FALSE with @error set if the result carries one.
+ */
+static gboolean
+file_xfer_flush_finish(SpiceMainChannel *channel, GAsyncResult *result,
+                       GError **error)
+{
+    GSimpleAsyncResult *simple;
+
+    /* Validate @result before it is used, not afterwards */
+    g_return_val_if_fail(g_simple_async_result_is_valid(result,
+        G_OBJECT(channel), file_xfer_flush_async), FALSE);
+
+    simple = (GSimpleAsyncResult *)result;
+    if (g_simple_async_result_propagate_error(simple, error)) {
+        return FALSE;
+    }
+
+    CHANNEL_DEBUG(channel, "flushed finished!");
+    return g_simple_async_result_get_op_res_gboolean(simple);
+}
+
 /* coroutine context */
 static void agent_send_msg_queue(SpiceMainChannel *channel)
 {
@@ -814,6 +908,9 @@ static void agent_send_msg_queue(SpiceMainChannel *channel)
         out = g_queue_pop_head(c->agent_msg_queue);
         spice_msg_out_send_internal(out);
     }
+    if (g_queue_is_empty(c->agent_msg_queue) && c->flushing != NULL) {
+        file_xfer_flushed(channel, TRUE);
+    }
 }
 
 /* any context: the message is not flushed immediately,
@@ -1384,6 +1481,223 @@ static void main_handle_agent_disconnected(SpiceChannel *channel, SpiceMsgIn *in
     agent_stopped(SPICE_MAIN_CHANNEL(channel));
 }
 
+/* GCompareFunc for g_list_find_custom(): 0 when the task id equals *b */
+static gint file_xfer_task_find(gconstpointer a, gconstpointer b)
+{
+    const SpiceFileXferTask *task = a;
+    uint32_t id = *(const uint32_t *)b;
+
+    return task->id == id ? 0 : 1;
+}
+
+static void file_read_cb(GObject *source_object,
+                         GAsyncResult *res,
+                         gpointer user_data);
+static void file_xfer_continue_read(SpiceFileXferTask *task);
+static void file_close_cb(GObject *object,
+                          GAsyncResult *close_res,
+                          gpointer user_data);
+
+/* Pass the current read position to the user's progress callback, if any */
+static void report_progress(SpiceFileXferTask *task)
+{
+    if (task->progress_callback) {
+        task->progress_callback(task->read_bytes, task->file_size,
+                                task->progress_callback_data);
+    }
+}
+
+/* Close the input stream; file_close_cb() then completes the task */
+static void file_xfer_close(SpiceFileXferTask *task)
+{
+    g_input_stream_close_async(G_INPUT_STREAM(task->file_stream),
+                               G_PRIORITY_DEFAULT,
+                               task->cancellable,
+                               file_close_cb,
+                               task);
+}
+
+/* The chunk queued by file_read_cb() has been flushed: read the next one */
+static void data_flushed_cb(GObject *source_object,
+                            GAsyncResult *res,
+                            gpointer user_data)
+{
+    SpiceFileXferTask *task = user_data;
+    SpiceMainChannel *channel = (SpiceMainChannel *)source_object;
+    GError *error = NULL;
+
+    file_xfer_flush_finish(channel, res, &error);
+    if (error != NULL) {
+        g_warning("failed to flush xfer queue: %s", error->message);
+        task->error = error;
+        file_xfer_close(task);
+        return;
+    }
+
+    report_progress(task);
+    file_xfer_continue_read(task);
+}
+
+/* Queue @data_size bytes of task->buffer as one file-xfer data message */
+static void
+file_xfer_queue(SpiceFileXferTask *task, int data_size)
+{
+    VDAgentFileXferDataMessage msg;
+    SpiceMainChannel *channel = SPICE_MAIN_CHANNEL(task->channel);
+
+    msg.id = task->id;
+    msg.size = data_size;
+    agent_msg_queue_many(channel, VD_AGENT_FILE_XFER_DATA, &msg,
+                         sizeof(VDAgentFileXferDataMessage), task->buffer,
+                         data_size, NULL);
+    spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE);
+}
+
+/* Remove @task from the channel's task list and release what it owns */
+static void file_xfer_task_free(SpiceFileXferTask *task)
+{
+    SpiceMainChannelPrivate *c;
+
+    g_return_if_fail(task != NULL);
+
+    c = task->channel->priv;
+    /* a no-op when the task was never inserted into the list */
+    c->file_xfer_task_list = g_list_remove(c->file_xfer_task_list,
+                                           task);
+
+    g_clear_error(&task->error);
+    g_clear_object(&task->cancellable);
+    g_clear_object(&task->file);
+    g_clear_object(&task->file_stream);
+    g_free(task);
+}
+
+/*
+ * Report the outcome of @task to the user's callback (from an idle
+ * handler) and free the task. Ownership of task->error moves to the
+ * async result.
+ */
+static void file_xfer_task_complete(SpiceFileXferTask *task)
+{
+    GSimpleAsyncResult *res;
+
+    res = g_simple_async_result_new(G_OBJECT(task->channel),
+                                    task->callback,
+                                    task->user_data,
+                                    spice_main_file_copy_async);
+    if (task->error) {
+        g_simple_async_result_take_error(res, task->error);
+        task->error = NULL;
+        g_simple_async_result_set_op_res_gboolean(res, FALSE);
+    } else {
+        g_simple_async_result_set_op_res_gboolean(res, TRUE);
+    }
+    g_simple_async_result_complete_in_idle(res);
+    g_object_unref(res);
+
+    file_xfer_task_free(task);
+}
+
+/* main context */
+static void
+file_close_cb(GObject *object,
+              GAsyncResult *close_res,
+              gpointer user_data)
+{
+    SpiceFileXferTask *task = user_data;
+    GError *error = NULL;
+
+    g_input_stream_close_finish(G_INPUT_STREAM(object), close_res, &error);
+    if (error) {
+        /* A close failure is only logged, not reported to the user */
+        SPICE_DEBUG("close file error: %s", error->message);
+        g_clear_error(&error);
+    }
+
+    /* Notify the user that the file has been transferred, or that
+       something went wrong */
+    file_xfer_task_complete(task);
+}
+
+/* main context */
+static void file_read_cb(GObject *source_object,
+                         GAsyncResult *res,
+                         gpointer user_data)
+{
+    SpiceFileXferTask *task = user_data;
+    SpiceMainChannel *channel = task->channel;
+    gssize count;
+    GError *error = NULL;
+
+    count = g_input_stream_read_finish(G_INPUT_STREAM(task->file_stream),
+                                       res, &error);
+    if (count > 0) {
+        task->read_bytes += count;
+        file_xfer_queue(task, count);
+        file_xfer_flush_async(channel, task->cancellable,
+                              data_flushed_cb, task);
+        return;
+    }
+
+    /* Error (-1) or EOF (0): close the file */
+    if (count == -1) {
+        task->error = error;
+    }
+    file_xfer_close(task);
+}
+
+/* Read the next chunk of the file; file_read_cb() handles the result */
+static void file_xfer_continue_read(SpiceFileXferTask *task)
+{
+    g_input_stream_read_async(G_INPUT_STREAM(task->file_stream),
+                              task->buffer,
+                              FILE_XFER_CHUNK_SIZE,
+                              G_PRIORITY_DEFAULT,
+                              task->cancellable,
+                              file_read_cb,
+                              task);
+}
+
+/* coroutine context */
+static void file_xfer_handle_status(SpiceMainChannel *channel,
+                                    VDAgentFileXferStatusMessage *msg)
+{
+    SpiceMainChannelPrivate *c = channel->priv;
+    GList *l;
+    SpiceFileXferTask *task;
+
+    l = g_list_find_custom(c->file_xfer_task_list, &msg->id,
+                           file_xfer_task_find);
+
+    g_return_if_fail(l != NULL);
+    task = l->data;
+
+    SPICE_DEBUG("task %d received response %d", msg->id, msg->result);
+
+    switch (msg->result) {
+    case VD_AGENT_FILE_XFER_STATUS_CAN_SEND_DATA:
+        file_xfer_continue_read(task);
+        return;
+    case VD_AGENT_FILE_XFER_STATUS_CANCELLED:
+        SPICE_DEBUG("user removed task %d, result: %d", msg->id,
+                    msg->result);
+        task->error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
+                                  "transfer is cancelled by spice agent");
+        break;
+    case VD_AGENT_FILE_XFER_STATUS_ERROR:
+        task->error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
+                                  "some errors occurred in the spice agent");
+        break;
+    default:
+        g_warn_if_reached();
+        task->error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
+                                  "unhandled status type: %u", msg->result);
+        break;
+    }
+
+    file_xfer_close(task);
+}
+
 /* coroutine context */
 static void main_agent_handle_msg(SpiceChannel *channel,
                                   VDAgentMessage *msg, gpointer payload)
@@ -1487,6 +1801,9 @@ static void main_agent_handle_msg(SpiceChannel *channel,
                     reply->error == VD_AGENT_SUCCESS ? "success" : "error");
         break;
     }
+    case VD_AGENT_FILE_XFER_STATUS:
+        file_xfer_handle_status(SPICE_MAIN_CHANNEL(channel), payload);
+        break;
     default:
         g_warning("unhandled agent message type: %u (%s), size %u",
                   msg->type, NAME(agent_msg_types, msg->type), msg->size);
@@ -1563,6 +1880,7 @@ static void main_handle_agent_token(SpiceChannel *channel, SpiceMsgIn *in)
     SpiceMainChannelPrivate *c = SPICE_MAIN_CHANNEL(channel)->priv;
 
     c->agent_tokens += tokens->num_tokens;
+
     agent_send_msg_queue(SPICE_MAIN_CHANNEL(channel));
 }
 
@@ -2246,3 +2564,239 @@ void spice_main_set_display_enabled(SpiceMainChannel *channel, int id, gboolean
         c->display[id].enabled = enabled;
     }
 }
+
+/*
+ * Completion of g_file_query_info_async(): record the file size, add the
+ * task to the channel's task list and queue the file-xfer start message.
+ * On failure the stream is closed and the error is reported to the user.
+ */
+static void
+file_info_async_cb(GObject *obj, GAsyncResult *res, gpointer data)
+{
+    GFileInfo *info;
+    GFile *file = G_FILE(obj);
+    GError *error = NULL;
+    GKeyFile *keyfile;
+    gchar *basename;
+    VDAgentFileXferStartMessage msg;
+    gsize data_len;
+    gchar *string;
+    SpiceFileXferTask *task = (SpiceFileXferTask *)data;
+    SpiceMainChannelPrivate *c = task->channel->priv;
+
+    info = g_file_query_info_finish(file, res, &error);
+    if (info == NULL) {
+        gchar *path = g_file_get_path(file);
+
+        SPICE_DEBUG("couldn't get size of file %s: %s", path, error->message);
+        g_free(path);
+        goto failed;
+    }
+    task->file_size = g_file_info_get_attribute_uint64(info,
+                          G_FILE_ATTRIBUTE_STANDARD_SIZE);
+    g_object_unref(info);
+
+    /* File name */
+    basename = g_file_get_basename(file);
+    if (basename == NULL) {
+        /* no GError is set on this path, so create one */
+        SPICE_DEBUG("failed to get file basename");
+        error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED,
+                            "failed to get file basename");
+        goto failed;
+    }
+    keyfile = g_key_file_new();
+    g_key_file_set_string(keyfile, "vdagent-file-xfer", "name", basename);
+    g_free(basename);
+
+    /* File size */
+    g_key_file_set_uint64(keyfile, "vdagent-file-xfer", "size",
+                          task->file_size);
+
+    /* Save keyfile content to memory. TODO: more file attributes
+       need to be sent to guest */
+    string = g_key_file_to_data(keyfile, &data_len, &error);
+    g_key_file_free(keyfile);
+    if (error) {
+        g_free(string);
+        goto failed;
+    }
+
+    /* Create file-xfer start message */
+    msg.id = task->id;
+
+    CHANNEL_DEBUG(task->channel, "Insert a xfer task:%d to task list",
+                  task->id);
+    c->file_xfer_task_list = g_list_append(c->file_xfer_task_list, task);
+
+    /* data_len + 1: the terminating NUL is sent as well */
+    agent_msg_queue_many(task->channel, VD_AGENT_FILE_XFER_START, &msg,
+                         sizeof(VDAgentFileXferStartMessage), string,
+                         data_len + 1, NULL);
+    g_free(string);
+    spice_channel_wakeup(SPICE_CHANNEL(task->channel), FALSE);
+    return;
+
+failed:
+    /* Close the stream; file_close_cb() reports the error to the user
+       instead of dropping the task silently */
+    task->error = error;
+    file_xfer_close(task);
+}
+
+/*
+ * Completion of g_file_read_async(): with the stream open, query the file
+ * size next. If the file could not be opened there is no stream to close,
+ * so the task is completed with the error right away.
+ */
+static void
+read_async_cb(GObject *obj, GAsyncResult *res, gpointer data)
+{
+    GFile *file = G_FILE(obj);
+    SpiceFileXferTask *task = (SpiceFileXferTask *)data;
+    GError *error = NULL;
+
+    task->file_stream = g_file_read_finish(file, res, &error);
+    if (task->file_stream == NULL) {
+        gchar *path = g_file_get_path(file);
+
+        SPICE_DEBUG("create file stream for %s error: %s",
+                    path, error->message);
+        g_free(path);
+        task->error = error;
+        file_xfer_task_complete(task);
+        return;
+    }
+
+    g_file_query_info_async(task->file,
+                            G_FILE_ATTRIBUTE_STANDARD_SIZE,
+                            G_FILE_QUERY_INFO_NONE,
+                            G_PRIORITY_DEFAULT,
+                            task->cancellable,
+                            file_info_async_cb,
+                            task);
+}
+
+/* Allocate a transfer task for @file and start opening the file */
+static void
+file_xfer_send_start_msg_async(SpiceMainChannel *channel,
+                               GFile *file,
+                               GFileCopyFlags flags,
+                               GCancellable *cancellable,
+                               GFileProgressCallback progress_callback,
+                               gpointer progress_callback_data,
+                               GAsyncReadyCallback callback,
+                               gpointer user_data)
+{
+    SpiceFileXferTask *task;
+    /* Last task id handed out. Unsigned arithmetic wraps around by
+       itself, so no explicit UINT32_MAX check is needed. */
+    static uint32_t xfer_id;
+
+    /* GLib allocation, to match the g_free() in file_xfer_task_free() */
+    task = g_new0(SpiceFileXferTask, 1);
+    task->id = ++xfer_id;
+    task->channel = channel;
+    task->file = g_object_ref(file);
+    task->flags = flags;
+    /* keep the cancellable alive for as long as the task uses it */
+    task->cancellable = cancellable ? g_object_ref(cancellable) : NULL;
+    task->progress_callback = progress_callback;
+    task->progress_callback_data = progress_callback_data;
+    task->callback = callback;
+    task->user_data = user_data;
+
+    g_file_read_async(file,
+                      G_PRIORITY_DEFAULT,
+                      cancellable,
+                      read_async_cb,
+                      task);
+}
+
+/**
+ * spice_main_file_copy_async:
+ * @channel: a #SpiceMainChannel
+ * @sources: a %NULL-terminated array of #GFile objects to be transferred
+ * @flags: set of #GFileCopyFlags
+ * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
+ * @progress_callback: (allow-none) (scope call): function to callback with
+ *     progress information, or %NULL if progress information is not needed
+ * @progress_callback_data: (closure): user data to pass to @progress_callback
+ * @callback: a #GAsyncReadyCallback to call when the request is satisfied
+ * @user_data: the data to pass to callback function
+ *
+ * Copies the file @sources to guest. At the moment exactly one file must
+ * be passed.
+ *
+ * If @cancellable is not %NULL, then the operation can be cancelled by
+ * triggering the cancellable object from another thread. If the operation
+ * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
+ *
+ * If @progress_callback is not %NULL, then the operation can be monitored by
+ * setting this to a #GFileProgressCallback function. @progress_callback_data
+ * will be passed to this function. It is guaranteed that this callback will
+ * be called after all data has been transferred with the total number of bytes
+ * copied during the operation.
+ *
+ * When the operation is finished, callback will be called. You can then call
+ * spice_main_file_copy_finish() to get the result of the operation.
+ *
+ **/
+void spice_main_file_copy_async(SpiceMainChannel *channel,
+                                GFile **sources,
+                                GFileCopyFlags flags,
+                                GCancellable *cancellable,
+                                GFileProgressCallback progress_callback,
+                                gpointer progress_callback_data,
+                                GAsyncReadyCallback callback,
+                                gpointer user_data)
+{
+    g_return_if_fail(channel != NULL);
+    g_return_if_fail(SPICE_IS_MAIN_CHANNEL(channel));
+    g_return_if_fail(sources != NULL && sources[0] != NULL);
+
+    /* At the moment, the copy() method is limited to a single file,
+       support for copying multi-files will be implemented later. */
+    g_return_if_fail(sources[1] == NULL);
+
+    file_xfer_send_start_msg_async(channel,
+                                   sources[0],
+                                   flags,
+                                   cancellable,
+                                   progress_callback,
+                                   progress_callback_data,
+                                   callback,
+                                   user_data);
+}
+
+/**
+ * spice_main_file_copy_finish:
+ * @channel: a #SpiceMainChannel
+ * @result: a #GAsyncResult.
+ * @error: a #GError, or %NULL
+ *
+ * Finishes copying the file started with
+ * spice_main_file_copy_async().
+ *
+ * Returns: a %TRUE on success, %FALSE on error.
+ **/
+gboolean spice_main_file_copy_finish(SpiceMainChannel *channel,
+                                     GAsyncResult *result,
+                                     GError **error)
+{
+    GSimpleAsyncResult *simple;
+
+    g_return_val_if_fail(SPICE_IS_MAIN_CHANNEL(channel), FALSE);
+    g_return_val_if_fail(result != NULL, FALSE);
+    /* reject results that do not come from spice_main_file_copy_async() */
+    g_return_val_if_fail(g_simple_async_result_is_valid(result,
+        G_OBJECT(channel), spice_main_file_copy_async), FALSE);
+
+    simple = (GSimpleAsyncResult *)result;
+
+    if (g_simple_async_result_propagate_error(simple, error)) {
+        return FALSE;
+    }
+
+    return g_simple_async_result_get_op_res_gboolean(simple);
+}
diff --git a/gtk/channel-main.h b/gtk/channel-main.h
index 1a5ab54..adba0a2 100644
--- a/gtk/channel-main.h
+++ b/gtk/channel-main.h
@@ -78,6 +78,18 @@ void spice_main_clipboard_selection_notify(SpiceMainChannel *channel, guint sele
 void spice_main_clipboard_selection_request(SpiceMainChannel *channel, guint selection, guint32 type);
 
 gboolean spice_main_agent_test_capability(SpiceMainChannel *channel, guint32 cap);
+void spice_main_file_copy_async(SpiceMainChannel *channel,
+                                GFile **sources,
+                                GFileCopyFlags flags,
+                                GCancellable *cancellable,
+                                GFileProgressCallback progress_callback,
+                                gpointer progress_callback_data,
+                                GAsyncReadyCallback callback,
+                                gpointer user_data);
+
+gboolean spice_main_file_copy_finish(SpiceMainChannel *channel,
+                                     GAsyncResult *result,
+                                     GError **error);
 
 #ifndef SPICE_DISABLE_DEPRECATED
 SPICE_DEPRECATED_FOR(spice_main_clipboard_selection_grab)
diff --git a/gtk/map-file b/gtk/map-file
index 516764c..4d05597 100644
--- a/gtk/map-file
+++ b/gtk/map-file
@@ -55,6 +55,8 @@ spice_inputs_motion;
 spice_inputs_position;
 spice_inputs_set_key_locks;
 spice_main_agent_test_capability;
+spice_main_file_copy_async;
+spice_main_file_copy_finish;
 spice_main_channel_get_type;
 spice_main_clipboard_grab;
 spice_main_clipboard_notify;
diff --git a/gtk/spice-glib-sym-file b/gtk/spice-glib-sym-file
index 641ff4d..28b54af 100644
--- a/gtk/spice-glib-sym-file
+++ b/gtk/spice-glib-sym-file
@@ -31,6 +31,8 @@ spice_inputs_motion
 spice_inputs_position
 spice_inputs_set_key_locks
 spice_main_agent_test_capability
+spice_main_file_copy_async
+spice_main_file_copy_finish
 spice_main_channel_get_type
 spice_main_clipboard_grab
 spice_main_clipboard_notify
-- 
1.7.12.4

_______________________________________________
Spice-devel mailing list
Spice-devel@xxxxxxxxxxxxxxxxxxxxx
http://lists.freedesktop.org/mailman/listinfo/spice-devel