Introduced functions (private): * void spice_file_transfer_task_read_async() * gssize spice_file_transfer_task_read_finish() For a better abstraction of how to read from SpiceFileTransferTask and handle with its data, following the design of others objects like GFile and GInputStream. Due to the logic changed involved, some functions were created or renamed to better address or match its place and purpose: * spice_file_transfer_task_read_stream_cb Callback for the actual read from GInpustStream; This is handling the SpiceFileTransferTask bits only; * file_xfer_read_cb -> file_xfer_read_async_cb Renamed to match _read_async() function; This is handling the data from reading the file by flushing it to the agent. As the _read_async() uses GTask, the error handling is done on the channel-main's callback, after _read_finish() is called. This change is related to split SpiceFileTransferTask from channel-main. --- src/channel-main.c | 163 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 119 insertions(+), 44 deletions(-) diff --git a/src/channel-main.c b/src/channel-main.c index ceca7e3..9787613 100644 --- a/src/channel-main.c +++ b/src/channel-main.c @@ -71,6 +71,13 @@ static void spice_file_transfer_task_init_task_async(SpiceFileTransferTask *self static GFileInfo *spice_file_transfer_task_init_task_finish(SpiceFileTransferTask *xfer_task, GAsyncResult *result, GError **error); +static void spice_file_transfer_task_read_async(SpiceFileTransferTask *self, + GAsyncReadyCallback callback, + gpointer userdata); +static gssize spice_file_transfer_task_read_finish(SpiceFileTransferTask *self, + GAsyncResult *result, + char **buffer, + GError **error); /** * SECTION:file-transfer-task @@ -247,9 +254,11 @@ static void migrate_channel_event_cb(SpiceChannel *channel, SpiceChannelEvent ev gpointer data); static gboolean main_migrate_handshake_done(gpointer data); static void spice_main_channel_send_migration_handshake(SpiceChannel *channel); -static void file_xfer_continue_read(SpiceFileTransferTask *task); static void spice_file_transfer_task_completed(SpiceFileTransferTask *self, GError *error); static void file_xfer_flushed(SpiceMainChannel *channel, gboolean success); +static void file_xfer_read_async_cb(GObject *source_object, + GAsyncResult *res, + gpointer user_data); static void spice_main_set_max_clipboard(SpiceMainChannel *self, gint max); static void set_agent_connected(SpiceMainChannel *channel, gboolean connected); @@ -1883,7 +1892,6 @@ static void file_xfer_data_flushed_cb(GObject *source_object, SpiceMainChannel *channel = (SpiceMainChannel *)source_object; GError *error = NULL; - self->pending = FALSE; file_xfer_flush_finish(channel, res, &error); if (error || self->error) { spice_file_transfer_task_completed(self, error); @@ -1924,7 +1932,7 @@ static void file_xfer_data_flushed_cb(GObject *source_object, } /* Read more data */ - file_xfer_continue_read(self); + spice_file_transfer_task_read_async(self, file_xfer_read_async_cb, NULL); } static void file_xfer_queue(SpiceFileTransferTask *self, int data_size) @@ -1944,54 +1952,34 @@ static void file_xfer_queue(SpiceFileTransferTask *self, int data_size) } /* main context */ -static void file_xfer_read_cb(GObject *source_object, - GAsyncResult *res, - gpointer user_data) +static void file_xfer_read_async_cb(GObject *source_object, + GAsyncResult *res, + gpointer user_data) { - SpiceFileTransferTask *self = user_data; - SpiceMainChannel *channel = self->channel; + SpiceFileTransferTask *xfer_task; + SpiceMainChannel *channel; gssize count; + char *buffer; + GCancellable *cancellable; GError *error = NULL; - self->pending = FALSE; - count = g_input_stream_read_finish(G_INPUT_STREAM(self->file_stream), - res, &error); - /* Check for pending earlier errors */ - if (self->error) { - spice_file_transfer_task_completed(self, error); + xfer_task = SPICE_FILE_TRANSFER_TASK(source_object); + + channel = spice_file_transfer_task_get_channel(xfer_task); + count = spice_file_transfer_task_read_finish(xfer_task, res, &buffer, &error); + if (count < 0) { + spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE); + spice_file_transfer_task_completed(xfer_task, error); return; } - if (count > 0 || self->file_size == 0) { - GCancellable *cancellable; - - self->read_bytes += count; - g_object_notify(G_OBJECT(self), "progress"); - file_xfer_queue(self, count); - if (count == 0) - return; - cancellable = spice_file_transfer_task_get_cancellable(self); - file_xfer_flush_async(channel, cancellable, - file_xfer_data_flushed_cb, self); - self->pending = TRUE; - } else if (error) { - spice_channel_wakeup(SPICE_CHANNEL(self->channel), FALSE); - spice_file_transfer_task_completed(self, error); - } - /* else EOF, do nothing (wait for VD_AGENT_FILE_XFER_STATUS from agent) */ -} + file_xfer_queue(xfer_task, count); + if (count == 0) + /* on EOF just wait for VD_AGENT_FILE_XFER_STATUS from agent */ + return; -/* coroutine context */ -static void file_xfer_continue_read(SpiceFileTransferTask *self) -{ - g_input_stream_read_async(G_INPUT_STREAM(self->file_stream), - self->buffer, - FILE_XFER_CHUNK_SIZE, - G_PRIORITY_DEFAULT, - self->cancellable, - file_xfer_read_cb, - self); - self->pending = TRUE; + cancellable = spice_file_transfer_task_get_cancellable(xfer_task); + file_xfer_flush_async(channel, cancellable, file_xfer_data_flushed_cb, xfer_task); } /* coroutine context */ @@ -2010,7 +1998,7 @@ static void spice_file_transfer_task_handle_status(SpiceFileTransferTask *task, "transfer received CAN_SEND_DATA in pending state"); break; } - file_xfer_continue_read(task); + spice_file_transfer_task_read_async(task, file_xfer_read_async_cb, NULL); return; case VD_AGENT_FILE_XFER_STATUS_CANCELLED: error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED, @@ -3367,6 +3355,93 @@ static GFileInfo *spice_file_transfer_task_init_task_finish(SpiceFileTransferTas return NULL; } +static void spice_file_transfer_task_read_stream_cb(GObject *source_object, + GAsyncResult *res, + gpointer userdata) +{ + SpiceFileTransferTask *self; + GTask *task; + gssize nbytes; + GError *error = NULL; + + task = G_TASK(userdata); + self = g_task_get_source_object(task); + + g_return_if_fail(self->pending == TRUE); + self->pending = FALSE; + + nbytes = g_input_stream_read_finish(G_INPUT_STREAM(self->file_stream), res, &error); + if (error || self->error) { + error = (error == NULL) ? self->error : error; + g_task_return_error(task, error); + return; + } + + /* The progress here means the amount of data we have _read_ and not what + * was actually sent to the agent. On the next "progress", the previous data + * read was sent. This means that when user see 100%, we are sending the + * last chunk to the guest */ + self->read_bytes += nbytes; + g_object_notify(G_OBJECT(self), "progress"); + + g_task_return_int(task, nbytes); +} + +/* Any context */ +static void spice_file_transfer_task_read_async(SpiceFileTransferTask *self, + GAsyncReadyCallback callback, + gpointer userdata) +{ + GTask *task; + + g_return_if_fail(self != NULL); + if (self->pending) { + g_task_report_new_error(self, callback, userdata, + spice_file_transfer_task_read_async, + SPICE_CLIENT_ERROR, + SPICE_CLIENT_ERROR_FAILED, + "Cannot read data in pending state"); + return; + } + + task = g_task_new(self, self->cancellable, callback, userdata); + + if (self->read_bytes == self->file_size) { + /* channel-main might can request data after reading the whole file as + * it expects EOF. Let's return immediately its request as we don't want + * to reach a state where agent says file-transfer SUCCEED but we are in + * a PENDING state in SpiceFileTransferTask due reading in idle */ + g_task_return_int(task, 0); + return; + } + + self->pending = TRUE; + g_input_stream_read_async(G_INPUT_STREAM(self->file_stream), + self->buffer, + FILE_XFER_CHUNK_SIZE, + G_PRIORITY_DEFAULT, + self->cancellable, + spice_file_transfer_task_read_stream_cb, + task); +} + +static gssize spice_file_transfer_task_read_finish(SpiceFileTransferTask *self, + GAsyncResult *result, + char **buffer, + GError **error) +{ + gssize nbytes; + GTask *task = G_TASK(result); + + g_return_val_if_fail(self != NULL, -1); + + nbytes = g_task_propagate_int(task, error); + if (nbytes >= 0 && buffer != NULL) + *buffer = self->buffer; + + return nbytes; +} + static void spice_file_transfer_task_get_property(GObject *object, guint property_id, -- 2.7.4 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel