Hi, On Wed, Jul 06, 2016 at 09:34:15AM -0500, Jonathon Jongsma wrote: > On Tue, 2016-07-05 at 15:07 +0200, Victor Toso wrote: > > Introduced functions (private): > > * void spice_file_transfer_task_read_async() > > * gssize spice_file_transfer_task_read_finish() > > > > For a better abstraction of how to read from SpiceFileTransferTask and > > handle its data, following the design of other objects like GFile and > > GInputStream. > > > > Due to the logic changes involved, some functions were created or > > renamed to better address or match its place and purpose: > > > > * spice_file_transfer_task_read_stream_cb > > Callback for the actual read from GInpustStream; This is handling > > the SpiceFileTransferTask bits only; > > > > * file_xfer_read_cb -> file_xfer_read_async_cb > > Renamed to match _read_async() function; This is handling the data > > from reading the file by flushing it to the agent. > > > > As the _read_async() uses GTask, the error handling is done on the > > channel-main's callback, after _read_finish() is called. > > > > This change is related to split SpiceFileTransferTask from > > channel-main. 
> > > > Acked-by: Jonathon Jongsma <jjongsma@xxxxxxxxxx> > > --- > > src/channel-main.c | 165 +++++++++++++++++++++++++++++++++++++++------------- > > - > > 1 file changed, 122 insertions(+), 43 deletions(-) > > > > diff --git a/src/channel-main.c b/src/channel-main.c > > index d721852..244b19e 100644 > > --- a/src/channel-main.c > > +++ b/src/channel-main.c > > @@ -71,6 +71,13 @@ static void > > spice_file_transfer_task_init_task_async(SpiceFileTransferTask *self > > static GFileInfo > > *spice_file_transfer_task_init_task_finish(SpiceFileTransferTask *xfer_task, > > GAsyncResult > > *result, > > GError **error); > > +static void spice_file_transfer_task_read_async(SpiceFileTransferTask *self, > > + GAsyncReadyCallback callback, > > + gpointer userdata); > > +static gssize spice_file_transfer_task_read_finish(SpiceFileTransferTask > > *self, > > + GAsyncResult *result, > > + char **buffer, > > + GError **error); > > > > /** > > * SECTION:file-transfer-task > > @@ -246,9 +253,11 @@ static void migrate_channel_event_cb(SpiceChannel > > *channel, SpiceChannelEvent ev > > gpointer data); > > static gboolean main_migrate_handshake_done(gpointer data); > > static void spice_main_channel_send_migration_handshake(SpiceChannel > > *channel); > > -static void file_xfer_continue_read(SpiceFileTransferTask *task); > > static void spice_file_transfer_task_completed(SpiceFileTransferTask *self, > > GError *error); > > static void file_xfer_flushed(SpiceMainChannel *channel, gboolean success); > > +static void file_xfer_read_async_cb(GObject *source_object, > > + GAsyncResult *res, > > + gpointer user_data); > > static void spice_main_set_max_clipboard(SpiceMainChannel *self, gint max); > > static void set_agent_connected(SpiceMainChannel *channel, gboolean > > connected); > > > > @@ -1882,7 +1891,6 @@ static void file_xfer_data_flushed_cb(GObject > > *source_object, > > SpiceMainChannel *channel = (SpiceMainChannel *)source_object; > > GError *error = NULL; > > > > - 
self->pending = FALSE; > > file_xfer_flush_finish(channel, res, &error); > > if (error || self->error) { > > spice_file_transfer_task_completed(self, error); > > @@ -1923,7 +1931,7 @@ static void file_xfer_data_flushed_cb(GObject > > *source_object, > > } > > > > /* Read more data */ > > - file_xfer_continue_read(self); > > + spice_file_transfer_task_read_async(self, file_xfer_read_async_cb, NULL); > > } > > > > static void file_xfer_queue(SpiceFileTransferTask *self, int data_size) > > @@ -1943,54 +1951,35 @@ static void file_xfer_queue(SpiceFileTransferTask > > *self, int data_size) > > } > > > > /* main context */ > > -static void file_xfer_read_cb(GObject *source_object, > > - GAsyncResult *res, > > - gpointer user_data) > > +static void file_xfer_read_async_cb(GObject *source_object, > > + GAsyncResult *res, > > + gpointer user_data) > > { > > - SpiceFileTransferTask *self = user_data; > > - SpiceMainChannel *channel = self->channel; > > + SpiceFileTransferTask *xfer_task; > > + SpiceMainChannel *channel; > > gssize count; > > + char *buffer; > > + GCancellable *cancellable; > > GError *error = NULL; > > > > - self->pending = FALSE; > > - count = g_input_stream_read_finish(G_INPUT_STREAM(self->file_stream), > > - res, &error); > > - /* Check for pending earlier errors */ > > - if (self->error) { > > - spice_file_transfer_task_completed(self, error); > > + xfer_task = SPICE_FILE_TRANSFER_TASK(source_object); > > + > > + channel = spice_file_transfer_task_get_channel(xfer_task); > > + count = spice_file_transfer_task_read_finish(xfer_task, res, &buffer, > > &error); > > + if (count < 0) { > > + spice_channel_wakeup(SPICE_CHANNEL(channel), FALSE); > > + spice_file_transfer_task_completed(xfer_task, error); > > return; > > } > > > > - if (count > 0 || self->file_size == 0) { > > - GCancellable *cancellable; > > - > > - self->read_bytes += count; > > - g_object_notify(G_OBJECT(self), "progress"); > > - file_xfer_queue(self, count); > > - if (count == 0) > > - 
return; > > - cancellable = spice_file_transfer_task_get_cancellable(self); > > - file_xfer_flush_async(channel, cancellable, > > - file_xfer_data_flushed_cb, self); > > - self->pending = TRUE; > > - } else if (error) { > > - spice_channel_wakeup(SPICE_CHANNEL(self->channel), FALSE); > > - spice_file_transfer_task_completed(self, error); > > + file_xfer_queue(xfer_task, count); > > + if (count == 0) { > > + /* on EOF just wait for VD_AGENT_FILE_XFER_STATUS from agent */ > > + return; > > } > > - /* else EOF, do nothing (wait for VD_AGENT_FILE_XFER_STATUS from agent) > > */ > > -} > > > > -/* coroutine context */ > > -static void file_xfer_continue_read(SpiceFileTransferTask *self) > > -{ > > - g_input_stream_read_async(G_INPUT_STREAM(self->file_stream), > > - self->buffer, > > - FILE_XFER_CHUNK_SIZE, > > - G_PRIORITY_DEFAULT, > > - self->cancellable, > > - file_xfer_read_cb, > > - self); > > - self->pending = TRUE; > > + cancellable = spice_file_transfer_task_get_cancellable(xfer_task); > > + file_xfer_flush_async(channel, cancellable, file_xfer_data_flushed_cb, > > xfer_task); > > } > > > > /* coroutine context */ > > @@ -2009,7 +1998,7 @@ static void > > spice_file_transfer_task_handle_status(SpiceFileTransferTask *task, > > "transfer received CAN_SEND_DATA in pending > > state"); > > break; > > } > > - file_xfer_continue_read(task); > > + spice_file_transfer_task_read_async(task, file_xfer_read_async_cb, > > NULL); > > return; > > case VD_AGENT_FILE_XFER_STATUS_CANCELLED: > > error = g_error_new(SPICE_CLIENT_ERROR, SPICE_CLIENT_ERROR_FAILED, > > @@ -3370,6 +3359,96 @@ static GFileInfo > > *spice_file_transfer_task_init_task_finish(SpiceFileTransferTas > > return g_task_propagate_pointer(task, error); > > } > > > > +static void spice_file_transfer_task_read_stream_cb(GObject *source_object, > > + GAsyncResult *res, > > + gpointer userdata) > > +{ > > + SpiceFileTransferTask *self; > > + GTask *task; > > + gssize nbytes; > > + GError *error = NULL; > > + > > + 
task = G_TASK(userdata); > > + self = g_task_get_source_object(task); > > + > > + g_return_if_fail(self->pending == TRUE); > > + self->pending = FALSE; > > + > > + nbytes = g_input_stream_read_finish(G_INPUT_STREAM(self->file_stream), > > res, &error); > > + if (self->error) { > > + /* On any pending error on SpiceFileTransferTask */ > > + g_task_return_error(task, self->error); > > + return; > > + } else if (error) { > > + g_task_return_error(task, error); > > + return; > > + } > > + > > + self->read_bytes += nbytes; > > + > > + g_task_return_int(task, nbytes); > > +} > > + > > +/* Any context */ > > +static void spice_file_transfer_task_read_async(SpiceFileTransferTask *self, > > + GAsyncReadyCallback callback, > > + gpointer userdata) > > +{ > > + GTask *task; > > + > > + g_return_if_fail(self != NULL); > > + if (self->pending) { > > + g_task_report_new_error(self, callback, userdata, > > + spice_file_transfer_task_read_async, > > + SPICE_CLIENT_ERROR, > > + SPICE_CLIENT_ERROR_FAILED, > > + "Cannot read data in pending state"); > > + return; > > + } > > + > > + /* Notify the progress prior the read to make the info be related to the > > + * data that was already sent. To notify the 100% (completed), channel- > > main > > + * should call read-async when it expects EOF. */ > > + g_object_notify(G_OBJECT(self), "progress"); > > + > > + task = g_task_new(self, self->cancellable, callback, userdata); > > + > > + if (self->read_bytes == self->file_size) { > > + /* channel-main might can request data after reading the whole file > > as > > + * it expects EOF. Let's return immediately its request as we don't > > want > > + * to reach a state where agent says file-transfer SUCCEED but we are > > in > > + * a PENDING state in SpiceFileTransferTask due reading in idle */ > > + g_task_return_int(task, 0); > > + return; > > + } > > As an alternative, could we also simply emit a "progress" notification in > _task_completed() or something? 
The check for EOF is not related to the progress notification; it guards against a race that might happen, which is: 1-) the client reads all the data from the file with _task_read_async(); 2-) the client flushes the data to the agent and requests to read more; 3-) the stream_read_async() will return EOF, but asynchronously; 4-) the agent might send SUCCEED while stream_read_async() has not returned yet, meaning that the file-transfer is still pending, which triggers an error (can't succeed while pending). This one took me a while to understand while testing a file-transfer with 40+ files at the same time. > If we keep this approach, there's a little comment issue to fix: > "might can" -> "might" Thanks! I'll fix it :) > > > + > > + self->pending = TRUE; > > + g_input_stream_read_async(G_INPUT_STREAM(self->file_stream), > > + self->buffer, > > + FILE_XFER_CHUNK_SIZE, > > + G_PRIORITY_DEFAULT, > > + self->cancellable, > > + spice_file_transfer_task_read_stream_cb, > > + task); > > +} > > + > > +static gssize spice_file_transfer_task_read_finish(SpiceFileTransferTask > > *self, > > + GAsyncResult *result, > > + char **buffer, > > + GError **error) > > +{ > > + gssize nbytes; > > + GTask *task = G_TASK(result); > > + > > + g_return_val_if_fail(self != NULL, -1); > > + > > + nbytes = g_task_propagate_int(task, error); > > + if (nbytes >= 0 && buffer != NULL) > > + *buffer = self->buffer; > > + > > + return nbytes; > > +} > > + > > static void > > spice_file_transfer_task_get_property(GObject *object, > > guint property_id, > > > Acked-by: Jonathon Jongsma <jjongsma@xxxxxxxxxx> Cheers, toso > _______________________________________________ > Spice-devel mailing list > Spice-devel@xxxxxxxxxxxxxxxxxxxxx > https://lists.freedesktop.org/mailman/listinfo/spice-devel _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel