This makes the Stream class a native citizen of the Java API. It can be used with the NIO Channel API, as well as with InputStreams and OutputStreams via the java.nio.channels.Channels convenience wrappers. --- src/main/java/org/libvirt/Stream.java | 171 ++++++++++++++++++++++++++++- src/main/java/org/libvirt/jna/Libvirt.java | 6 +- 2 files changed, 172 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/libvirt/Stream.java b/src/main/java/org/libvirt/Stream.java index 404c9a0..975e1b6 100644 --- a/src/main/java/org/libvirt/Stream.java +++ b/src/main/java/org/libvirt/Stream.java @@ -1,12 +1,48 @@ package org.libvirt; +import java.io.IOException; + +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.NonReadableChannelException; +import java.nio.channels.NonWritableChannelException; + import org.libvirt.jna.Libvirt; import org.libvirt.jna.StreamPointer; import static org.libvirt.Library.libvirt; import com.sun.jna.NativeLong; -public class Stream { +/** + * The Stream class is used to transfer data between a libvirt daemon + * and a client. + * <p> + * It implements the ByteChannel interface. + * <p> + * Basic usage: + * + * <pre> + * {@code + * ByteBuffer buf = ByteBuffer.allocate(1024); + * Stream str = conn.streamNew(0); + * + * ... // open the stream e.g. calling Domain.screenshot + * + * while (str.read(buf) != -1) { + * buf.flip(); + * ... // do something with the data + * buf.compact(); + * } + * } + * </pre> + * <p> + * If you want to use this class as an InputStream or OutputStream, + * convert it using the {@link java.nio.channels.Channels#newInputStream + * Channels.newInputStream} and {@link java.nio.channels.Channels#newOutputStream + * Channels.newOutputStream} respectively. 
+ */ +public class Stream implements ByteChannel { public static int VIR_STREAM_NONBLOCK = (1 << 0); @@ -20,6 +56,56 @@ public class Stream { */ private Connect virConnect; + private final static int CLOSED = 0; + private final static int READABLE = 1; + private final static int WRITABLE = 2; + private final static int OPEN = READABLE | WRITABLE; + private final static int EOF = 4; + + /* The status of the stream. A stream starts its life in the + * "CLOSED" state. + * + * It will be opened for input / output by another libvirt + * operation (e.g. virStorageVolDownload), which means it will + * be in state "READABLE" or "WRITABLE", exclusively. + * + * It will reach state "EOF", if {@link finish()} is called. + * + * It will be in the "CLOSED" state again, after calling abort() + * or close(). + */ + private int state = CLOSED; + + void markReadable() { + assert !isWritable() + : "A Stream cannot be readable and writable at the same time"; + + state |= READABLE; + } + + void markWritable() { + assert !isReadable() + : "A Stream cannot be readable and writable at the same time"; + + state |= WRITABLE; + } + + boolean isReadable() { + return (state & READABLE) != 0; + } + + boolean isWritable() { + return (state & WRITABLE) != 0; + } + + protected boolean isEOF() { + return (state & EOF) != 0; + } + + private void markEOF() { + state |= EOF; + } + Stream(Connect virConnect, StreamPointer VSP) { this.virConnect = virConnect; this.VSP = VSP; @@ -32,6 +118,7 @@ public class Stream { public int abort() throws LibvirtException { int returnValue = libvirt.virStreamAbort(VSP); processError(); + this.state = CLOSED; return returnValue; } @@ -70,6 +157,7 @@ public class Stream { public int finish() throws LibvirtException { int returnValue = libvirt.virStreamFinish(VSP); processError(); + markEOF(); return returnValue; } @@ -83,6 +171,7 @@ public class Stream { public int free() throws LibvirtException { int success = 0; if (VSP != null) { + closeStream(); success = 
libvirt.virStreamFree(VSP); processError(); VSP = null; @@ -108,11 +197,82 @@ public class Stream { * @throws LibvirtException */ public int receive(byte[] data) throws LibvirtException { - int returnValue = libvirt.virStreamRecv(VSP, data, new NativeLong(data.length)); + return receive(ByteBuffer.wrap(data)); + } + + protected int receive(ByteBuffer buffer) throws LibvirtException { + int returnValue = libvirt.virStreamRecv(VSP, buffer, new NativeLong(buffer.remaining())); processError(); + buffer.position(buffer.position() + returnValue); return returnValue; } + @Override + public int read(ByteBuffer buffer) throws IOException { + if (!isOpen()) throw new ClosedChannelException(); + if (!isReadable()) throw new NonReadableChannelException(); + if (isEOF()) return -1; + + try { + int ret = receive(buffer); + + switch (ret) { + case 0: + finish(); + return -1; + + case -2: + throw new UnsupportedOperationException("non-blocking I/O stream not yet supported"); + + default: + return ret; + } + } catch (LibvirtException e) { + throw new IOException("could not read from stream", e); + } + } + + @Override + public int write(ByteBuffer buffer) throws IOException { + if (!isOpen()) throw new ClosedChannelException(); + if (!isWritable()) throw new NonWritableChannelException(); + + int pos = buffer.position(); + + try { + while (buffer.hasRemaining()) { + int ret = send(buffer); + + if (ret == -2) + throw new UnsupportedOperationException("non-blocking I/O stream not yet supported"); + } + return buffer.position() - pos; + } catch (LibvirtException e) { + throw new IOException("could not write to stream", e); + } + } + + protected void closeStream() throws LibvirtException { + if (isOpen() && !isEOF()) { + abort(); + } + this.state = CLOSED; + } + + @Override + public void close() throws IOException { + try { + closeStream(); + } catch (LibvirtException e) { + throw new IOException("error while closing Stream", e); + } + } + + @Override + public boolean isOpen() { + 
return (this.state & OPEN) != 0; + } + /** * Batch receive method * @@ -174,8 +334,13 @@ public class Stream { * @since 1.5.2 */ public int send(byte[] data) throws LibvirtException { - int returnValue = libvirt.virStreamSend(VSP, data, new NativeLong(data.length)); + return send(ByteBuffer.wrap(data)); + } + + protected int send(ByteBuffer buffer) throws LibvirtException { + int returnValue = libvirt.virStreamSend(VSP, buffer, new NativeLong(buffer.remaining())); processError(); + buffer.position(buffer.position() + returnValue); return returnValue; } diff --git a/src/main/java/org/libvirt/jna/Libvirt.java b/src/main/java/org/libvirt/jna/Libvirt.java index 98f2125..fe74087 100644 --- a/src/main/java/org/libvirt/jna/Libvirt.java +++ b/src/main/java/org/libvirt/jna/Libvirt.java @@ -8,6 +8,8 @@ import com.sun.jna.Pointer; import com.sun.jna.ptr.IntByReference; import com.sun.jna.ptr.LongByReference; +import java.nio.ByteBuffer; + /** * The libvirt interface which is exposed via JNA. The complete API is * documented at http://www.libvirt.org/html/libvirt-libvirt.html. @@ -368,9 +370,9 @@ public interface Libvirt extends Library { int virStreamFinish(StreamPointer virStreamPtr) ; int virStreamFree(StreamPointer virStreamPtr) ; StreamPointer virStreamNew(ConnectionPointer virConnectPtr, int flags) ; - int virStreamSend(StreamPointer virStreamPtr, byte[] data, NativeLong size); + int virStreamSend(StreamPointer virStreamPtr, ByteBuffer data, NativeLong size); int virStreamSendAll(StreamPointer virStreamPtr, Libvirt.VirStreamSourceFunc handler, Pointer opaque); - int virStreamRecv(StreamPointer virStreamPtr, byte[] data, NativeLong length); + int virStreamRecv(StreamPointer virStreamPtr, ByteBuffer data, NativeLong length); int virStreamRecvAll(StreamPointer virStreamPtr, Libvirt.VirStreamSinkFunc handler, Pointer opaque); //DomainSnapshot Methods -- 1.8.5.2.msysgit.0 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list