Re: [PATCH spice-gtk] controller: async flush read/write

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, Jul 03, 2012 at 03:41:21AM +0200, Marc-André Lureau wrote:
> Windows named pipes behave a bit differently from Unix sockets, and may
> return incomplete reads/writes. By using 2 read/write() helpers, try to
> complete the operation before returning. Since the IO operation may be
> split over several calls, we make sure the buffer pointer is on the
> heap. We use an exception for the EOF or BROKEN_PIPE condition, which also
> simplifies the code.

Looks good to me.

I can't see who handles the thrown CLOSED exception, but I guess it's
handled somewhere.

> 
> To really work with named pipes, the giowin32streams need to be fixed as
> well to handle concurrent read & write properly; for details, see:
> https://bugzilla.gnome.org/show_bug.cgi?id=679288
> ---
>  gtk/controller/Makefile.am       |    1 +
>  gtk/controller/controller.vala   |   37 +++++++----------------------
>  gtk/controller/foreign-menu.vala |   48 ++++++++++++++------------------------
>  gtk/controller/util.vala         |   42 +++++++++++++++++++++++++++++++++
>  4 files changed, 69 insertions(+), 59 deletions(-)
>  create mode 100644 gtk/controller/util.vala
> 
> diff --git a/gtk/controller/Makefile.am b/gtk/controller/Makefile.am
> index 90fce40..7bfa51b 100644
> --- a/gtk/controller/Makefile.am
> +++ b/gtk/controller/Makefile.am
> @@ -26,6 +26,7 @@ libspice_controller_la_VALASOURCES =		\
>  	menu.vala				\
>  	controller.vala				\
>  	foreign-menu.vala			\
> +	util.vala				\
>  	$(NULL)
>  
>  libspice_controller_la_BUILT_SOURCES =			\
> diff --git a/gtk/controller/controller.vala b/gtk/controller/controller.vala
> index 185f5e0..24433ec 100644
> --- a/gtk/controller/controller.vala
> +++ b/gtk/controller/controller.vala
> @@ -69,10 +69,10 @@ public class Controller: Object {
>  		// message.
>  		try {
>  			if (excl_connection != null) {
> -				yield excl_connection.output_stream.write_async (p);
> +				yield output_stream_write (excl_connection.output_stream, p);
>  			} else {
>  				foreach (var c in clients)
> -					yield c.output_stream.write_async (p);
> +					yield output_stream_write (c.output_stream, p);
>  			}
>  		} catch (GLib.Error e) {
>  			warning (e.message);
> @@ -180,27 +180,19 @@ public class Controller: Object {
>  	}
>  
>  	private async void handle_client (IOStream c) throws GLib.Error {
> -		var init = SpiceProtocol.Controller.Init ();
>  		var excl = false;
> -		unowned uint8[] p = null;
>  
>  		debug ("new socket client, reading init header");
>  
> -		p = ((uint8[])(&init))[0:sizeof(SpiceProtocol.Controller.InitHeader)]; // FIXME vala
> -		var read = yield c.input_stream.read_async (p);
> -		if (warn_if (read != sizeof (SpiceProtocol.Controller.InitHeader)))
> -			return;
> +		var p = new uint8[sizeof(SpiceProtocol.Controller.Init)];
> +		var init = (SpiceProtocol.Controller.Init*)p;
> +		yield input_stream_read (c.input_stream, p);
>  		if (warn_if (init.base.magic != SpiceProtocol.Controller.MAGIC))
>  			return;
>  		if (warn_if (init.base.version != SpiceProtocol.Controller.VERSION))
>  			return;
>  		if (warn_if (init.base.size < sizeof (SpiceProtocol.Controller.Init)))
>  			return;
> -
> -		p = ((uint8[])(&init.credentials))[0:init.base.size - sizeof(SpiceProtocol.Controller.InitHeader)];
> -		read = yield c.input_stream.read_async (p);
> -		if (warn_if (read != (init.base.size - sizeof (SpiceProtocol.Controller.InitHeader))))
> -			return;
>  		if (warn_if (init.credentials != 0))
>  			return;
>  		if (warn_if (excl_connection != null))
> @@ -217,29 +209,18 @@ public class Controller: Object {
>  
>  		client_connected ();
>  
> -		var t = new uint8[sizeof(SpiceProtocol.Controller.Msg)];
>  		for (;;) {
> -			read = yield c.input_stream.read_async (t[0:sizeof(SpiceProtocol.Controller.Msg)]);
> -			if (read == 0)
> -				break;
> -
> -			if (warn_if (read != sizeof (SpiceProtocol.Controller.Msg))) {
> -				warning ("read only: " + read.to_string ());
> -				break;
> -			}
> -
> +			var t = new uint8[sizeof(SpiceProtocol.Controller.Msg)];
> +			yield input_stream_read (c.input_stream, t);
>  			var msg = (SpiceProtocol.Controller.Msg*)t;
> +			debug ("new message " + msg.id.to_string () + "size " + msg.size.to_string ());
>  			if (warn_if (msg.size < sizeof (SpiceProtocol.Controller.Msg)))
>  				break;
>  
>  			if (msg.size > sizeof (SpiceProtocol.Controller.Msg)) {
>  				t.resize ((int)msg.size);
>  				msg = (SpiceProtocol.Controller.Msg*)t;
> -				read = yield c.input_stream.read_async (t[sizeof(SpiceProtocol.Controller.Msg):msg.size]);
> -				if (read == 0)
> -					break;
> -				if (warn_if (read != msg.size - sizeof(SpiceProtocol.Controller.Msg)))
> -					break;
> +				yield input_stream_read (c.input_stream, t[sizeof(SpiceProtocol.Controller.Msg):msg.size]);
>  			}
>  
>  			handle_message (msg);
> diff --git a/gtk/controller/foreign-menu.vala b/gtk/controller/foreign-menu.vala
> index f2406bd..db2f353 100644
> --- a/gtk/controller/foreign-menu.vala
> +++ b/gtk/controller/foreign-menu.vala
> @@ -70,14 +70,15 @@ public class ForeignMenu: Object {
>  		send_msg (p);
>  	}
>  
> -	public async bool send_msg (uint8[] p) throws GLib.Error {
> +	public async bool send_msg (owned uint8[] p) throws GLib.Error {
>  		// vala FIXME: pass Controller.Msg instead
>  		// vala doesn't keep reference on the struct in async methods
>  		// it copies only base, which is not enough to transmit the whole
>  		// message.
>  		try {
> -			foreach (var c in clients)
> -				yield c.output_stream.write_async (p);
> +			foreach (var c in clients) {
> +				yield output_stream_write (c.output_stream, p);
> +			}
>  		} catch (GLib.Error e) {
>  			warning (e.message);
>  		}
> @@ -126,15 +127,11 @@ public class ForeignMenu: Object {
>  	}
>  
>  	private async void handle_client (IOStream c) throws GLib.Error {
> -		var header = SpiceProtocol.ForeignMenu.InitHeader ();
> -		unowned uint8[] p = null;
> -
>  		debug ("new socket client, reading init header");
>  
> -		p = ((uint8[])(&header))[0:sizeof(SpiceProtocol.ForeignMenu.InitHeader)]; // FIXME vala
> -		var read = yield c.input_stream.read_async (p);
> -		if (warn_if (read != sizeof (SpiceProtocol.ForeignMenu.InitHeader)))
> -			return;
> +		var p = new uint8[sizeof(SpiceProtocol.ForeignMenu.InitHeader)];
> +		var header = (SpiceProtocol.ForeignMenu.InitHeader*)p;
> +		yield input_stream_read (c.input_stream, p);
>  		if (warn_if (header.magic != SpiceProtocol.ForeignMenu.MAGIC))
>  			return;
>  		if (warn_if (header.version != SpiceProtocol.ForeignMenu.VERSION))
> @@ -142,44 +139,33 @@ public class ForeignMenu: Object {
>  		if (warn_if (header.size < sizeof (SpiceProtocol.ForeignMenu.Init)))
>  			return;
>  
> -		uint64 credentials = 0;
> -		p = ((uint8[])(&credentials))[0:sizeof(uint64)];
> -		read = yield c.input_stream.read_async (p);
> -		if (warn_if (read != sizeof(uint64)))
> -			return;
> +		var cp = new uint8[sizeof(uint64)];
> +		yield input_stream_read (c.input_stream, cp);
> +		uint64 credentials = *(uint64*)cp;
>  		if (warn_if (credentials != 0))
>  			return;
>  
>  		var title_size = header.size - sizeof(SpiceProtocol.ForeignMenu.Init);
>  		var title = new uint8[title_size + 1];
> -		read = yield c.input_stream.read_async (title[0:title_size]);
> +		yield c.input_stream.read_async (title[0:title_size]);
>  		this.title = (string)title;
>  
>  		client_connected ();
>  
> -		var t = new uint8[sizeof(SpiceProtocol.ForeignMenu.Msg)];
>  		for (;;) {
> -			read = yield c.input_stream.read_async (t[0:sizeof(SpiceProtocol.ForeignMenu.Msg)]);
> -			if (read == 0)
> -				break;
> -
> -			if (warn_if (read != sizeof (SpiceProtocol.ForeignMenu.Msg))) {
> -				warning ("read only: " + read.to_string ());
> -				break;
> -			}
> -
> +			var t = new uint8[sizeof(SpiceProtocol.ForeignMenu.Msg)];
> +			yield input_stream_read (c.input_stream, t);
>  			var msg = (SpiceProtocol.ForeignMenu.Msg*)t;
> +			debug ("new message " + msg.id.to_string () + "size " + msg.size.to_string ());
> +
>  			if (warn_if (msg.size < sizeof (SpiceProtocol.ForeignMenu.Msg)))
>  				break;
>  
>  			if (msg.size > sizeof (SpiceProtocol.ForeignMenu.Msg)) {
>  				t.resize ((int)msg.size);
>  				msg = (SpiceProtocol.ForeignMenu.Msg*)t;
> -				read = yield c.input_stream.read_async (t[sizeof(SpiceProtocol.ForeignMenu.Msg):msg.size]);
> -				if (read == 0)
> -					break;
> -				if (warn_if (read != msg.size - sizeof(SpiceProtocol.ForeignMenu.Msg)))
> -					break;
> +
> +				yield input_stream_read (c.input_stream, t[sizeof(SpiceProtocol.ForeignMenu.Msg):msg.size]);
>  			}
>  
>  			handle_message (msg);
> diff --git a/gtk/controller/util.vala b/gtk/controller/util.vala
> new file mode 100644
> index 0000000..16f546c
> --- /dev/null
> +++ b/gtk/controller/util.vala
> @@ -0,0 +1,42 @@
> +// Copyright (C) 2012 Red Hat, Inc.
> +
> +// This library is free software; you can redistribute it and/or
> +// modify it under the terms of the GNU Lesser General Public
> +// License as published by the Free Software Foundation; either
> +// version 2.1 of the License, or (at your option) any later version.
> +
> +// This library is distributed in the hope that it will be useful,
> +// but WITHOUT ANY WARRANTY; without even the implied warranty of
> +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
> +// Lesser General Public License for more details.
> +
> +// You should have received a copy of the GNU Lesser General Public
> +// License along with this library; if not, see <http://www.gnu.org/licenses/>.
> +
> +namespace SpiceCtrl {
> +
> +	public async void input_stream_read (InputStream stream, uint8[] buffer) throws GLib.IOError {
> +		var length = buffer.length;
> +		ssize_t i = 0;
> +
> +		while (i < length) {
> +			var n = yield stream.read_async (buffer[i:length]);
> +			if (n == 0)
> +				throw new GLib.IOError.CLOSED ("closed stream") ;
> +			i += n;
> +		}
> +	}
> +
> +	public async void output_stream_write (OutputStream stream, owned uint8[] buffer) throws GLib.IOError {
> +		var length = buffer.length;
> +		ssize_t i = 0;
> +
> +		while (i < length) {
> +			var n = yield stream.write_async (buffer[i:length]);
> +			if (n == 0)
> +				throw new GLib.IOError.CLOSED ("closed stream") ;
> +			i += n;
> +		}
> +	}
> +
> +}
> \ No newline at end of file
> -- 
> 1.7.10.2
> 
> _______________________________________________
> Spice-devel mailing list
> Spice-devel@xxxxxxxxxxxxxxxxxxxxx
> http://lists.freedesktop.org/mailman/listinfo/spice-devel
_______________________________________________
Spice-devel mailing list
Spice-devel@xxxxxxxxxxxxxxxxxxxxx
http://lists.freedesktop.org/mailman/listinfo/spice-devel



[Index of Archives]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Announce]     [Security]     [Bugtraq]     [Linux]     [Linux OMAP]     [Linux MIPS]     [ECOS]     [Asterisk Internet PBX]     [Linux API]     [Monitors]