[PATCH RFC 41/48] iohelper_message: Implement formatted write

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This is nearly the same story as formatted read, with one
exception. Due to the recurring pattern of calling write
functions in our code:

  repeat {
    nwritten = write(dest, buf + offset, buflen - offset);
    offset += nwritten;
  }

we have to be cautious about the return value of iohelperWrite().
If we returned the number of bytes partially written, we would be
called again with a shifted offset. But at the beginning of
iohelperWrite() we flush the output queue, so the remaining data
is written then. Moreover, there's no way for us to determine
whether this is the case or whether we are being called with fresh
data, completely unrelated to the first call.
Therefore, we must keep claiming EAGAIN, even though the message
is being partially written, and only on the last iteration claim
success and return the size of the data we were requested to write
in the first place, even if this means that in the last iteration
just one byte was written to complete the message.

Signed-off-by: Michal Privoznik <mprivozn@xxxxxxxxxx>
---
 src/iohelper/iohelper_message.c | 105 ++++++++++++++++++++++++++++++++++++++--
 1 file changed, 100 insertions(+), 5 deletions(-)

diff --git a/src/iohelper/iohelper_message.c b/src/iohelper/iohelper_message.c
index fe2304b..d900c2f 100644
--- a/src/iohelper/iohelper_message.c
+++ b/src/iohelper/iohelper_message.c
@@ -39,9 +39,11 @@ struct iohelperCtl {
     bool blocking;
     virNetMessagePtr msg;
     bool msgReadyRead;
+    bool msgReadyWrite;
 };
 
 typedef ssize_t (*readfunc)(int fd, void *buf, size_t count);
+typedef ssize_t (*writefunc)(int fd, const void *buf, size_t count);
 
 static virClassPtr iohelperCtlClass;
 
@@ -84,6 +86,7 @@ iohelperCtlNew(int fd,
     ret->fd = fd;
     ret->blocking = blocking;
     ret->msgReadyRead = false;
+    ret->msgReadyWrite = true;
 
     return ret;
 
@@ -98,6 +101,7 @@ messageClear(iohelperCtlPtr ctl)
 {
     virNetMessageClear(ctl->msg);
     ctl->msgReadyRead = false;
+    ctl->msgReadyWrite = true;
 }
 
 
@@ -107,6 +111,11 @@ messageReadyRead(iohelperCtlPtr ctl)
     return ctl->msgReadyRead;
 }
 
+static inline bool
+messageReadyWrite(iohelperCtlPtr ctl)
+{
+    return ctl->msgReadyWrite; /* false while a message is only partly sent */
+}
 
 static ssize_t
 messageRecv(iohelperCtlPtr ctl)
@@ -165,6 +174,47 @@ messageRecv(iohelperCtlPtr ctl)
 }
 
 
+static ssize_t
+messageSend(iohelperCtlPtr ctl)
+{
+    virNetMessagePtr msg = ctl->msg;
+    writefunc writeF = ctl->blocking ? safewrite : write;
+
+    ctl->msgReadyWrite = false; /* stays false until the buffer is fully sent */
+
+    while (true) {
+        ssize_t nwritten;
+        size_t want;
+
+        want = msg->bufferLength - msg->bufferOffset; /* bytes still unsent */
+
+     rewrite:
+        errno = 0;
+        nwritten = writeF(ctl->fd,
+                          msg->buffer + msg->bufferOffset,
+                          want);
+
+        if (nwritten < 0) {
+            if (errno == EINTR)
+                goto rewrite;
+            if (errno == EAGAIN)
+                return 0; /* would block; bufferOffset keeps the progress */
+            return -1;
+        } else if (nwritten == 0) {
+            /* Zero bytes written; return without marking the message sent */
+            return 0;
+        } else {
+            msg->bufferOffset += nwritten;
+        }
+
+        if (msg->bufferOffset == msg->bufferLength) {
+            ctl->msgReadyWrite = true;
+            return msg->bufferLength; /* whole buffer, not just the last chunk */
+        }
+    }
+}
+
+
 ssize_t
 iohelperRead(iohelperCtlPtr ctl,
              char *bytes,
@@ -206,12 +256,57 @@ iohelperRead(iohelperCtlPtr ctl,
 
 
 ssize_t
-iohelperWrite(iohelperCtlPtr ctl ATTRIBUTE_UNUSED,
-              const char *bytes ATTRIBUTE_UNUSED,
-              size_t nbytes ATTRIBUTE_UNUSED)
+iohelperWrite(iohelperCtlPtr ctl,
+              const char *bytes,
+              size_t nbytes)
 {
-    virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                   _("sparse stream not supported"));
+    size_t headerLen;
+    ssize_t nwritten, totalNwritten = 0;
+
+    virNetMessagePtr msg = ctl->msg;
+
+    if (!messageReadyWrite(ctl)) {
+        /* A previous message is still only partially sent. Try to
+         * finish sending it and recheck. */
+        if ((nwritten = messageSend(ctl)) < 0)
+            return -1;
+
+        if (!nwritten && errno != EAGAIN) /* errno as left by messageSend() */
+            return 0;
+
+        if (!messageReadyWrite(ctl)) {
+            errno = EAGAIN;
+            return -2; /* with errno == EAGAIN: old message still pending */
+        }
+
+        totalNwritten += nwritten; /* NOTE(review): old message's length - intended? */
+    }
+
+    memset(&msg->header, 0, sizeof(msg->header));
+    msg->header.type = VIR_NET_STREAM;
+    msg->header.status = nbytes ? VIR_NET_CONTINUE : VIR_NET_OK;
+
+    /* Failure to encode a message is fatal, so discard any
+     * partially encoded message. */
+    if (virNetMessageEncodeHeader(msg) < 0)
+        goto error;
+
+    headerLen = msg->bufferOffset; /* presumably the encoded header size */
+
+    if (virNetMessageEncodePayloadRaw(msg, bytes, nbytes) < 0)
+        goto error;
+
+    /* The message is now fully encoded; don't discard it if sending fails.
+     * NOTE(review): if messageSend() returns 0 the sum below goes negative. */
+    if ((nwritten = messageSend(ctl)) < 0)
+        return -1;
+
+    totalNwritten += nwritten - headerLen;
+
+    return totalNwritten;
+
+ error:
+    messageClear(ctl);
     return -1;
 }
 
-- 
2.8.4

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list



[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]