This makes the Stream class a native citizen of the Java API. It can be used with the NIO Channel API, as well as (In,Out)putStream's using the java.nio.channels.Channels convenience wrappers. Signed-off-by: Claudio Bley <cbley@xxxxxxxxxx> --- src/main/java/org/libvirt/Stream.java | 183 ++++++++++++++++++++++++++-- src/main/java/org/libvirt/jna/Libvirt.java | 4 +- 2 files changed, 178 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/libvirt/Stream.java b/src/main/java/org/libvirt/Stream.java index c36ed70..71a2a6e 100644 --- a/src/main/java/org/libvirt/Stream.java +++ b/src/main/java/org/libvirt/Stream.java @@ -1,12 +1,46 @@ package org.libvirt; +import java.io.IOException; + +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.NonReadableChannelException; +import java.nio.channels.NonWritableChannelException; + import org.libvirt.jna.Libvirt; import org.libvirt.jna.SizeT; import org.libvirt.jna.StreamPointer; import static org.libvirt.Library.libvirt; import static org.libvirt.ErrorHandler.processError; -public class Stream { +/** + * The Stream class is used to transfer data between a libvirt daemon + * and a client. + * <p> + * It implements the ByteChannel interface. + * <p> + * Basic usage: + * + * <pre> + * {@code + * ByteBuffer buf = ByteBuffer.allocate(1024); + * Stream str = conn.streamNew(0); + * + * ... // open the stream e.g. calling Domain.screenshot + * + * while (str.read(buf) != -1) { + * buf.flip(); + * ... // do something with the data + * buf.compact(); + * }}</pre> + * <p> + * If you want to use this class as an InputStream or OutputStream, + * convert it using the {@link java.nio.channels.Channels#newInputStream + * Channels.newInputStream} and {@link java.nio.channels.Channels#newOutputStream + * Channels.newOutputStream} respectively. + */ +public class Stream implements ByteChannel { public static int VIR_STREAM_NONBLOCK = (1 << 0); @@ -20,6 +54,56 @@ public class Stream { */ private Connect virConnect; + private final static int CLOSED = 0; + private final static int READABLE = 1; + private final static int WRITABLE = 2; + private final static int OPEN = READABLE | WRITABLE; + private final static int EOF = 4; + + /* The status of the stream. A stream starts its live in the + * "CLOSED" state. + * + * It will be opened for input / output by another libvirt + * operation (e.g. virStorageVolDownload), which means it will + * be in state "READABLE" or "WRITABLE", exclusively. + * + * It will reach state "EOF", if {@link finish()} is called. + * + * It will be in the "CLOSED" state again, after calling abort() + * or close(). + */ + private int state = CLOSED; + + void markReadable() { + assert !isWritable() + : "A Stream cannot be readable and writable at the same time"; + + state |= READABLE; + } + + void markWritable() { + assert !isReadable() + : "A Stream cannot be readable and writable at the same time"; + + state |= WRITABLE; + } + + boolean isReadable() { + return (state & READABLE) != 0; + } + + boolean isWritable() { + return (state & WRITABLE) != 0; + } + + protected boolean isEOF() { + return (state & EOF) != 0; + } + + private void markEOF() { + state |= EOF; + } + Stream(Connect virConnect, StreamPointer VSP) { this.virConnect = virConnect; this.VSP = VSP; @@ -32,7 +116,9 @@ public class Stream { * @return <em>ignore</em> (always 0) */ public int abort() throws LibvirtException { - return processError(libvirt.virStreamAbort(VSP)); + int returnValue = processError(libvirt.virStreamAbort(VSP)); + this.state = CLOSED; + return returnValue; } /** @@ -66,7 +152,9 @@ public class Stream { * @throws LibvirtException */ public int finish() throws LibvirtException { - return processError(libvirt.virStreamFinish(VSP)); + int returnValue = processError(libvirt.virStreamFinish(VSP)); + markEOF(); + return returnValue; } /** @@ -79,7 +167,8 @@ public class Stream { public int free() throws LibvirtException { int success = 0; if (VSP != null) { - processError(libvirt.virStreamFree(VSP)); + closeStream(); + success = processError(libvirt.virStreamFree(VSP)); VSP = null; } @@ -95,7 +184,80 @@ public class Stream { * @throws LibvirtException */ public int receive(byte[] data) throws LibvirtException { - return processError(libvirt.virStreamRecv(VSP, data, new SizeT(data.length))); + return receive(ByteBuffer.wrap(data)); + } + + protected int receive(ByteBuffer buffer) throws LibvirtException { + int returnValue = processError(libvirt.virStreamRecv(VSP, buffer, new SizeT(buffer.remaining()))); + buffer.position(buffer.position() + returnValue); + return returnValue; + } + + @Override + public int read(ByteBuffer buffer) throws IOException { + if (!isOpen()) throw new ClosedChannelException(); + if (!isReadable()) throw new NonReadableChannelException(); + if (isEOF()) return -1; + + try { + int ret = receive(buffer); + + switch (ret) { + case 0: + finish(); + return -1; + + case -2: + throw new UnsupportedOperationException("non-blocking I/O stream not yet supported"); + + default: + return ret; + } + } catch (LibvirtException e) { + throw new IOException("could not read from stream", e); + } + } + + @Override + public int write(ByteBuffer buffer) throws IOException { + if (!isOpen()) throw new ClosedChannelException(); + if (!isWritable()) throw new NonWritableChannelException(); + + int pos = buffer.position(); + + try { + while (buffer.hasRemaining()) { + int ret = send(buffer); + + if (ret == -2) + throw new UnsupportedOperationException("non-blocking I/O stream not yet supported"); + } + return buffer.position() - pos; + } catch (LibvirtException e) { + throw new IOException("could not write to stream", e); + } + } + + protected void closeStream() throws LibvirtException { + if (isOpen() && !isEOF()) { + if (isWritable()) finish(); + else if (isReadable()) abort(); + } + this.state = CLOSED; + } + + @Override + public void close() throws IOException { + try { + closeStream(); + } catch (LibvirtException e) { + throw new IOException("error while closing Stream", e); + } + } + + @Override + public boolean isOpen() { + return (this.state & OPEN) != 0; } /** @@ -131,8 +293,15 @@ public class Stream { * full * @throws LibvirtException */ - public int send(String data) throws LibvirtException { - return processError(libvirt.virStreamSend(VSP, data, new SizeT(data.length()))); + public int send(byte[] data) throws LibvirtException { + return send(ByteBuffer.wrap(data)); + } + + protected int send(ByteBuffer buffer) throws LibvirtException { + SizeT size = new SizeT(buffer.remaining()); + int returnValue = processError(libvirt.virStreamSend(VSP, buffer, size)); + buffer.position(buffer.position() + returnValue); + return returnValue; } /** diff --git a/src/main/java/org/libvirt/jna/Libvirt.java b/src/main/java/org/libvirt/jna/Libvirt.java index c8735d2..c383ba6 100644 --- a/src/main/java/org/libvirt/jna/Libvirt.java +++ b/src/main/java/org/libvirt/jna/Libvirt.java @@ -440,9 +440,9 @@ public interface Libvirt extends Library { int virStreamFinish(StreamPointer virStreamPtr) ; int virStreamFree(StreamPointer virStreamPtr) ; StreamPointer virStreamNew(ConnectionPointer virConnectPtr, int flags) ; - int virStreamSend(StreamPointer virStreamPtr, String data, SizeT size); + int virStreamSend(StreamPointer virStreamPtr, ByteBuffer data, SizeT size); int virStreamSendAll(StreamPointer virStreamPtr, Libvirt.VirStreamSourceFunc handler, Pointer opaque); - int virStreamRecv(StreamPointer virStreamPtr, byte[] data, SizeT length); + int virStreamRecv(StreamPointer virStreamPtr, ByteBuffer data, SizeT length); int virStreamRecvAll(StreamPointer virStreamPtr, Libvirt.VirStreamSinkFunc handler, Pointer opaque); //DomainSnapshot Methods -- 1.7.9.5 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list