[libvirt-java] [PATCH 58/65] Implement interface ByteChannel for Stream class

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This makes the Stream class a native citizen of the Java API.

It can be used with the NIO Channel API, as well as with InputStreams and
OutputStreams via the java.nio.channels.Channels convenience wrappers.

Signed-off-by: Claudio Bley <cbley@xxxxxxxxxx>
---
 src/main/java/org/libvirt/Stream.java      |  183 ++++++++++++++++++++++++++--
 src/main/java/org/libvirt/jna/Libvirt.java |    4 +-
 2 files changed, 178 insertions(+), 9 deletions(-)

diff --git a/src/main/java/org/libvirt/Stream.java b/src/main/java/org/libvirt/Stream.java
index c36ed70..71a2a6e 100644
--- a/src/main/java/org/libvirt/Stream.java
+++ b/src/main/java/org/libvirt/Stream.java
@@ -1,12 +1,46 @@
 package org.libvirt;
 
+import java.io.IOException;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonReadableChannelException;
+import java.nio.channels.NonWritableChannelException;
+
 import org.libvirt.jna.Libvirt;
 import org.libvirt.jna.SizeT;
 import org.libvirt.jna.StreamPointer;
 import static org.libvirt.Library.libvirt;
 import static org.libvirt.ErrorHandler.processError;
 
-public class Stream {
+/**
+ * The Stream class is used to transfer data between a libvirt daemon
+ * and a client.
+ * <p>
+ * It implements the ByteChannel interface.
+ * <p>
+ * Basic usage:
+ *
+ * <pre>
+ * {@code
+ * ByteBuffer buf = ByteBuffer.allocate(1024);
+ * Stream str = conn.streamNew(0);
+ *
+ * ... // open the stream e.g. calling Domain.screenshot
+ *
+ * while (str.read(buf) != -1) {
+ *     buf.flip();
+ *     ... // do something with the data
+ *     buf.compact();
+ * }}</pre>
+ * <p>
+ * If you want to use this class as an InputStream or OutputStream,
+ * convert it using the {@link java.nio.channels.Channels#newInputStream
+ *  Channels.newInputStream} and {@link java.nio.channels.Channels#newOutputStream
+ *  Channels.newOutputStream} respectively.
+ */
+public class Stream implements ByteChannel {
 
     public static int VIR_STREAM_NONBLOCK = (1 << 0);
 
@@ -20,6 +54,56 @@ public class Stream {
      */
     private Connect virConnect;
 
+    private final static int CLOSED   =  0;
+    private final static int READABLE =  1;
+    private final static int WRITABLE =  2;
+    private final static int OPEN     = READABLE | WRITABLE;
+    private final static int EOF      =  4;
+
+    /* The status of the stream. A stream starts its life in the
+     * "CLOSED" state.
+     *
+     * It will be opened for input / output by another libvirt
+     * operation (e.g. virStorageVolDownload), which means it will
+     * be in state "READABLE" or "WRITABLE", exclusively.
+     *
+     * It will reach state "EOF" if {@link #finish()} is called.
+     *
+     * It will be in the "CLOSED" state again, after calling abort()
+     * or close().
+     */
+    private int state = CLOSED;
+
+    void markReadable() {
+        assert !isWritable()
+            : "A Stream cannot be readable and writable at the same time";
+
+        state |= READABLE;
+    }
+
+    void markWritable() {
+        assert !isReadable()
+            : "A Stream cannot be readable and writable at the same time";
+
+        state |= WRITABLE;
+    }
+
+    boolean isReadable() {
+        return (state & READABLE) != 0;
+    }
+
+    boolean isWritable() {
+        return (state & WRITABLE) != 0;
+    }
+
+    protected boolean isEOF() {
+        return (state & EOF) != 0;
+    }
+
+    private void markEOF() {
+        state |= EOF;
+    }
+
     Stream(Connect virConnect, StreamPointer VSP) {
         this.virConnect = virConnect;
         this.VSP = VSP;
@@ -32,7 +116,9 @@ public class Stream {
      * @return <em>ignore</em> (always 0)
      */
     public int abort() throws LibvirtException {
-        return processError(libvirt.virStreamAbort(VSP));
+        int returnValue = processError(libvirt.virStreamAbort(VSP));
+        this.state = CLOSED;
+        return returnValue;
     }
 
     /**
@@ -66,7 +152,9 @@ public class Stream {
      * @throws LibvirtException
      */
     public int finish() throws LibvirtException {
-        return processError(libvirt.virStreamFinish(VSP));
+        int returnValue = processError(libvirt.virStreamFinish(VSP));
+        markEOF();
+        return returnValue;
     }
 
     /**
@@ -79,7 +167,8 @@ public class Stream {
     public int free() throws LibvirtException {
         int success = 0;
         if (VSP != null) {
-            processError(libvirt.virStreamFree(VSP));
+            closeStream();
+            success = processError(libvirt.virStreamFree(VSP));
             VSP = null;
         }
 
@@ -95,7 +184,80 @@ public class Stream {
      * @throws LibvirtException
      */
     public int receive(byte[] data) throws LibvirtException {
-        return processError(libvirt.virStreamRecv(VSP, data, new SizeT(data.length)));
+        return receive(ByteBuffer.wrap(data));
+    }
+
+    protected int receive(ByteBuffer buffer) throws LibvirtException {
+        int returnValue = processError(libvirt.virStreamRecv(VSP, buffer, new SizeT(buffer.remaining())));
+        buffer.position(buffer.position() + returnValue);
+        return returnValue;
+    }
+
+    @Override
+    public int read(ByteBuffer buffer) throws IOException {
+        if (!isOpen()) throw new ClosedChannelException();
+        if (!isReadable()) throw new NonReadableChannelException();
+        if (isEOF()) return -1;
+
+        try {
+            int ret = receive(buffer);
+
+            switch (ret) {
+            case 0:
+                finish();
+                return -1;
+
+            case -2:
+                throw new UnsupportedOperationException("non-blocking I/O stream not yet supported");
+
+            default:
+                return ret;
+            }
+        } catch (LibvirtException e) {
+            throw new IOException("could not read from stream", e);
+        }
+    }
+
+    @Override
+    public int write(ByteBuffer buffer) throws IOException {
+        if (!isOpen()) throw new ClosedChannelException();
+        if (!isWritable()) throw new NonWritableChannelException();
+
+        int pos = buffer.position();
+
+        try {
+            while (buffer.hasRemaining()) {
+                int ret = send(buffer);
+
+                if (ret == -2)
+                    throw new UnsupportedOperationException("non-blocking I/O stream not yet supported");
+            }
+            return buffer.position() - pos;
+        } catch (LibvirtException e) {
+            throw new IOException("could not write to stream", e);
+        }
+    }
+
+    protected void closeStream() throws LibvirtException {
+        if (isOpen() && !isEOF()) {
+            if (isWritable()) finish();
+            else if (isReadable()) abort();
+        }
+        this.state = CLOSED;
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            closeStream();
+        } catch (LibvirtException e) {
+            throw new IOException("error while closing Stream", e);
+        }
+    }
+
+    @Override
+    public boolean isOpen() {
+        return (this.state & OPEN) != 0;
     }
 
     /**
@@ -131,8 +293,15 @@ public class Stream {
      *         full
      * @throws LibvirtException
      */
-    public int send(String data) throws LibvirtException {
-        return processError(libvirt.virStreamSend(VSP, data, new SizeT(data.length())));
+    public int send(byte[] data) throws LibvirtException {
+        return send(ByteBuffer.wrap(data));
+    }
+
+    protected int send(ByteBuffer buffer) throws LibvirtException {
+        SizeT size = new SizeT(buffer.remaining());
+        int returnValue = processError(libvirt.virStreamSend(VSP, buffer, size));
+        buffer.position(buffer.position() + returnValue);
+        return returnValue;
     }
 
     /**
diff --git a/src/main/java/org/libvirt/jna/Libvirt.java b/src/main/java/org/libvirt/jna/Libvirt.java
index c8735d2..c383ba6 100644
--- a/src/main/java/org/libvirt/jna/Libvirt.java
+++ b/src/main/java/org/libvirt/jna/Libvirt.java
@@ -440,9 +440,9 @@ public interface Libvirt extends Library {
     int virStreamFinish(StreamPointer virStreamPtr) ;
     int virStreamFree(StreamPointer virStreamPtr) ;
     StreamPointer virStreamNew(ConnectionPointer virConnectPtr, int flags) ;
-    int virStreamSend(StreamPointer virStreamPtr, String data, SizeT size);
+    int virStreamSend(StreamPointer virStreamPtr, ByteBuffer data, SizeT size);
     int virStreamSendAll(StreamPointer virStreamPtr, Libvirt.VirStreamSourceFunc handler, Pointer opaque);
-    int virStreamRecv(StreamPointer virStreamPtr, byte[] data, SizeT length);
+    int virStreamRecv(StreamPointer virStreamPtr, ByteBuffer data, SizeT length);
     int virStreamRecvAll(StreamPointer virStreamPtr, Libvirt.VirStreamSinkFunc handler, Pointer opaque);
 
     //DomainSnapshot Methods
-- 
1.7.9.5

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list




[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]