Introduced functions (private): * void spice_file_transfer_task_read_async() * gssize spice_file_transfer_task_read_finish() For a better abstraction of how to read from SpiceFileTransferTask and handle its data, following the design of other objects like GFile and GInputStream. Due to the logic changes involved, some functions were created or renamed to better address or match its place and purpose: * spice_file_transfer_task_read_stream_cb Callback for the actual read from GInpustStream; This is handling the SpiceFileTransferTask bits only; * file_xfer_read_cb -> file_xfer_read_async_cb Renamed to match _read_async() function; This is handling the data from reading the file by flushing it to the agent. As the _read_async() uses GTask, the error handling is done on the channel-main's callback, after _read_finish() is called. This change is related to split SpiceFileTransferTask from channel-main. Acked-by: Jonathon Jongsma <jjongsma@xxxxxxxxxx> --- src/channel-main.c | 165 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 122 insertions(+), 43 deletions(-) diff --git a/src/channel-main.c b/src/channel-main.c index d721852..244b19e 100644 --- a/src/channel-main.c +++ b/src/channel-main.c @@ -71,6 +71,13 @@ static void spice_file_transfer_task_init_task_async(SpiceFileTransferTask *self static GFileInfo *spice_file_transfer_task_init_task_finish(SpiceFileTransferTask *xfer_task, GAsyncResult *result, GError **error); +static void spice_file_transfer_task_read_async(SpiceFileTransferTask *self, + GAsyncReadyCallback callback, + gpointer userdata); +static gssize spice_file_transfer_task_read_finish(SpiceFileTransferTask *self, + GAsyncResult *result, + char **buffer, + GError **error); /** * SECTION:file-transfer-task @@ -246,9 +253,11 @@ static void migrate_channel_event_cb(SpiceChannel *channel, SpiceChannelEvent ev gpointer data); static gboolean main_migrate_handshake_done(gpointer data); static void spice_main_channel_send_migration_handshake(SpiceChannel *channel); -static void file_xfer_continue_read(SpiceFileTransferTask *task); static void spice_file_transfer_task_completed(SpiceFileTransferTask *self, GError *error); static void file_xfer_flushed(SpiceMainChannel *channel, gboolean success); +static void file_xfer_read_async_cb(GObject *source_object, + GAsyncResult *res, + gpointer user_data); static void spice_main_set_max_clipboard(SpiceMainChannel *self, gint max); static void set_agent_connected(SpiceMainChannel *channel, gboolean connected); @@ -1882,7 +1891,6 @@ static void file_xfer_data_flushed_cb(GObject *source_object, SpiceMainChannel *channel = (SpiceMainChannel *)source_object; GError *error = NULL; - self->pending = FALSE; file_xfer_flush_finish(channel, res, &error); if (error || self->error) { spice_file_transfer_task_completed(self, error); @@ -1923,7 +1931,7 @@ static void file_xfer_data_flushed_cb(GObject *source_object, } /* Read more data */ - file_xfer_continue_read(self); + spice_file_transfer_task_read_async(self, file_xfer_read_async_cb, NULL); } static void file_xfer_queue(SpiceFileTransferTask *self, int data_size) @@ -1943,54 +1951,35 @@ static void file_xfer_queue(SpiceFileTransferTask *self, int data_size) } /* main context */ -static void file_xfer_read_cb(GObject *source_object, - GAsyncResult *res, - gpointer user_data) +static void file_xfer_read_async_cb(GObject *source_object, + GAsyncResult *res, + gpointer user_data) { - SpiceFileTransferTask *self = user_data; - SpiceMainChannel *channel = self->channel; + SpiceFileTransferTask *xfer_task; + SpiceMainChannel *channel; gssize count; + char *buffer; + GCancellable *cancellable; GError *error = NULL; - self->pending = FALSE; - count = g_input_stream_read_finish(G_INPUT_STREAM(self->file_stream), - res, &error); - /* Check for pending earlier errors */ - if (self->error) { - spice_file_transfer_task_completed(self, error); + xfer_task = SPICE_FILE_TRANSFER_TASK(source_object); + + channel = spice_file_transfer_task_get_channel(xfer_task); + count = spice_file_transfer_task_read_finish(xfer_task, res, &buffer, &error); + if (count < 0) { + spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE); + spice_file_transfer_task_completed(xfer_task, error); return; } - if (count > 0 || self->file_size == 0) { - GCancellable *cancellable; - - self->read_bytes += count; - g_object_notify(G_OBJECT(self), "progress"); - file_xfer_queue(self, count); - if (count == 0) - return; - cancellable = spice_file_transfer_task_get_cancellable(self); - file_xfer_flush_async(channel, cancellable, - file_xfer_data_flushed_cb, self); - self->pending = TRUE; - } else if (error) { - spice_channel_wakeup(SPICE_CHANNEL(self->channel), FALSE); - spice_file_transfer_task_completed(self, error); + file_xfer_queue(xfer_task, count); + if (count == 0) { + /* on EOF just wait for VD_AGENT_FILE_XFER_STATUS from agent */ + return; } - /* else EOF, do nothing (wait for VD_AGENT_FILE_XFER_STATUS from agent) */ -} -/* coroutine context */ -static void file_xfer_continue_read(SpiceFileTransferTask *self) -{ - g_input_stream_read_async(G_INPUT_STREAM(self->file_stream), - self->buffer, - FILE_XFER_CHUNK_SIZE, - G_PRIORITY_DEFAULT, - self->cancellable, - file_xfer_read_cb, - self); - self->pending = TRUE; + cancellable = spice_file_transfer_task_get_cancellable(xfer_task); + file_xfer_flush_async(channel, cancellable, file_xfer_data_flushed_cb, xfer_task); } /* coroutine context */ @@ -2009,7 +1998,7 @@ static void spice_file_transfer_task_handle_status(SpiceFileTransferTask *task, "transfer received CAN_SEND_DATA in pending state"); break; } - file_xfer_continue_read(task); + spice_file_transfer_task_read_async(task, file_xfer_read_async_cb, NULL); return; case VD_AGENT_FILE_XFER_STATUS_CANCELLED: error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED, @@ -3370,6 +3359,96 @@ static GFileInfo *spice_file_transfer_task_init_task_finish(SpiceFileTransferTas return g_task_propagate_pointer(task, error); } +static void spice_file_transfer_task_read_stream_cb(GObject *source_object, + GAsyncResult *res, + gpointer userdata) +{ + SpiceFileTransferTask *self; + GTask *task; + gssize nbytes; + GError *error = NULL; + + task = G_TASK(userdata); + self = g_task_get_source_object(task); + + g_return_if_fail(self->pending == TRUE); + self->pending = FALSE; + + nbytes = g_input_stream_read_finish(G_INPUT_STREAM(self->file_stream), res, &error); + if (self->error) { + /* On any pending error on SpiceFileTransferTask */ + g_task_return_error(task, self->error); + return; + } else if (error) { + g_task_return_error(task, error); + return; + } + + self->read_bytes += nbytes; + + g_task_return_int(task, nbytes); +} + +/* Any context */ +static void spice_file_transfer_task_read_async(SpiceFileTransferTask *self, + GAsyncReadyCallback callback, + gpointer userdata) +{ + GTask *task; + + g_return_if_fail(self != NULL); + if (self->pending) { + g_task_report_new_error(self, callback, userdata, + spice_file_transfer_task_read_async, + SPICE_CLIENT_ERROR, + SPICE_CLIENT_ERROR_FAILED, + "Cannot read data in pending state"); + return; + } + + /* Notify the progress prior the read to make the info be related to the + * data that was already sent. To notify the 100% (completed), channel-main + * should call read-async when it expects EOF. */ + g_object_notify(G_OBJECT(self), "progress"); + + task = g_task_new(self, self->cancellable, callback, userdata); + + if (self->read_bytes == self->file_size) { + /* channel-main might can request data after reading the whole file as + * it expects EOF. Let's return immediately its request as we don't want + * to reach a state where agent says file-transfer SUCCEED but we are in + * a PENDING state in SpiceFileTransferTask due reading in idle */ + g_task_return_int(task, 0); + return; + } + + self->pending = TRUE; + g_input_stream_read_async(G_INPUT_STREAM(self->file_stream), + self->buffer, + FILE_XFER_CHUNK_SIZE, + G_PRIORITY_DEFAULT, + self->cancellable, + spice_file_transfer_task_read_stream_cb, + task); +} + +static gssize spice_file_transfer_task_read_finish(SpiceFileTransferTask *self, + GAsyncResult *result, + char **buffer, + GError **error) +{ + gssize nbytes; + GTask *task = G_TASK(result); + + g_return_val_if_fail(self != NULL, -1); + + nbytes = g_task_propagate_int(task, error); + if (nbytes >= 0 && buffer != NULL) + *buffer = self->buffer; + + return nbytes; +} + static void spice_file_transfer_task_get_property(GObject *object, guint property_id, -- 2.7.4 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel