Re: [PATCH 1/7] virCommand: Introduce virCommandDoAsyncIO

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 01/23/13 10:41, Michal Privoznik wrote:
Currently, if we want to feed stdin, or catch stdout or stderr of a
virCommand we have to use virCommandRun(). When using virCommandRunAsync()
we have to register FD handles by hand. This may lead to code duplication.
Hence, introduce an internal API, which does this automatically within
virCommandRunAsync(). The intended usage looks like this:

     virCommandPtr cmd = virCommandNew*(...);
     char *buf = NULL;

     ...

     virCommandSetOutputBuffer(cmd, &buf);
     virCommandDoAsyncIO(cmd);

     if (virCommandRunAsync(cmd, NULL) < 0)
         goto cleanup;

     ...

     if (virCommandWait(cmd, NULL) < 0)
         goto cleanup;

     /* @buf now contains @cmd's stdout */
     VIR_DEBUG("STDOUT: %s", NULLSTR(buf));

     ...

cleanup:
     VIR_FREE(buf);
     virCommandFree(cmd);

Note, that both stdout and stderr buffers may change until virCommandWait()
returns.
---
  src/libvirt_private.syms |   1 +
  src/util/vircommand.c    | 214 +++++++++++++++++++++++++++++++++++++++++++++--
  src/util/vircommand.h    |   1 +
  3 files changed, 208 insertions(+), 8 deletions(-)

diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms
index fc23adc..f89d1aa 100644
--- a/src/libvirt_private.syms
+++ b/src/libvirt_private.syms
@@ -142,6 +142,7 @@ virCommandAddEnvString;
  virCommandAllowCap;
  virCommandClearCaps;
  virCommandDaemonize;
+virCommandDoAsyncIO;
  virCommandExec;
  virCommandFree;
  virCommandHandshakeNotify;
diff --git a/src/util/vircommand.c b/src/util/vircommand.c
index 8566d1a..5d67bd2 100644
--- a/src/util/vircommand.c
+++ b/src/util/vircommand.c
@@ -47,11 +47,12 @@

  /* Flags for virExecWithHook */
  enum {
-    VIR_EXEC_NONE   = 0,
-    VIR_EXEC_NONBLOCK = (1 << 0),
-    VIR_EXEC_DAEMON = (1 << 1),
+    VIR_EXEC_NONE       = 0,
+    VIR_EXEC_NONBLOCK   = (1 << 0),
+    VIR_EXEC_DAEMON     = (1 << 1),
      VIR_EXEC_CLEAR_CAPS = (1 << 2),
-    VIR_EXEC_RUN_SYNC = (1 << 3),
+    VIR_EXEC_RUN_SYNC   = (1 << 3),
+    VIR_EXEC_ASYNC_IO   = (1 << 4),
  };

  struct _virCommand {
@@ -84,6 +85,11 @@ struct _virCommand {
      int *outfdptr;
      int *errfdptr;

+    size_t inbufOffset;
+    int inWatch;
+    int outWatch;
+    int errWatch;
+
      bool handshake;
      int handshakeWait[2];
      int handshakeNotify[2];
@@ -779,6 +785,7 @@ virCommandNewArgs(const char *const*args)
      cmd->handshakeNotify[1] = -1;

      cmd->infd = cmd->outfd = cmd->errfd = -1;
+    cmd->inWatch = cmd->outWatch = cmd->errWatch = -1;
      cmd->pid = -1;

      virCommandAddArgSet(cmd, args);
@@ -2122,6 +2129,152 @@ virCommandHook(void *data)
  }


+static void
+virCommandHandleReadWrite(int watch, int fd, int events, void *opaque)
+{
+    virCommandPtr cmd = (virCommandPtr) opaque;
+    char **bufptr = NULL;
+    char buf[1024];
+    ssize_t nread;
+    size_t len = 0;
+    int *watchPtr = NULL;
+
+    VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events);
+
+    if (watch == cmd->inWatch) {
+        watchPtr = &cmd->inWatch;
+
+        if (events & VIR_EVENT_HANDLE_WRITABLE) {
+            len = strlen(cmd->inbuf);

I suppose this isn't intended to work on non-string data. It is worth documenting this so that it doesn't surprise someone.

+
+            while ((nread = write(fd, cmd->inbuf + cmd->inbufOffset,
+                                  len - cmd->inbufOffset))) {

Hm, what's the difference between len and inbuffOffset here?

+                if (nread < 0) {
+                    if (errno != EAGAIN)
+                        virReportSystemError(errno,
+                                             _("Unable to write command's "
+                                               "input to FD %d"),
+                                             fd);
+                    break;
+                }
+
+                cmd->inbufOffset += nread;
+                if (cmd->inbufOffset == len)
+                    VIR_FORCE_CLOSE(cmd->infd);
+            }
+        }
+    } else {
+        if (watch == cmd->outWatch) {
+            watchPtr = &cmd->outWatch;
+            bufptr = cmd->outbuf;
+        }
+
+        if (watch == cmd->errWatch) {
+            watchPtr = &cmd->errWatch;
+            bufptr = cmd->errbuf;
+        }
+
+        if (bufptr && events & VIR_EVENT_HANDLE_READABLE) {
+            if (*bufptr)
+                len = strlen(*bufptr);
+
+            while ((nread = read(fd, buf, sizeof(buf)))) {
+                if (nread < 0) {
+                    if (errno != EAGAIN)
+                        virReportSystemError(errno,
+                                             _("unable to read command's "
+                                               "output from FD %d"),
+                                             fd);
+                    break;
+                }
+
+                if (VIR_REALLOC_N(*bufptr, len + nread + 1) < 0) {
+                    virReportOOMError();
+                    break;
+                }
+
+                memcpy(*bufptr + len, buf, nread);
+                (*bufptr)[len + nread] = '\0';
+            }
+        }
+    }
+
+    if (events & VIR_EVENT_HANDLE_HANGUP) {
+        *watchPtr = -1;
+        virEventRemoveHandle(watch);
+    }
+}
+
+
+static int
+virCommandRegisterEventLoop(virCommandPtr cmd)
+{
+    int ret = -1;
+
+    if (cmd->inbuf) {
+        if (virSetNonBlock(cmd->infd) < 0) {
+            virReportSystemError(errno,
+                                 _("Failed to set non-blocking flag on FD %d"),
+                                 cmd->infd);
+            goto cleanup;
+        }
+
+        if ((cmd->inWatch = virEventAddHandle(cmd->infd,
+                                              VIR_EVENT_HANDLE_WRITABLE,
+                                              virCommandHandleReadWrite,
+                                              cmd, NULL)) < 0) {
+            virReportError(VIR_ERR_INTERNAL_ERROR,
+                           _("Unable to register infd %d in the event loop"),
+                           cmd->infd);
+            goto cleanup;
+        }
+    }
+
+    if (cmd->outbuf && cmd->outfdptr == &cmd->outfd) {
+        if (virSetNonBlock(cmd->outfd) < 0) {
+            virReportSystemError(errno,
+                                 _("Failed to set non-blocking flag on FD %d"),
+                                 cmd->outfd);
+            goto cleanup;
+        }
+
+        if ((cmd->outWatch = virEventAddHandle(cmd->outfd,
+                                               VIR_EVENT_HANDLE_READABLE,
+                                               virCommandHandleReadWrite,
+                                               cmd, NULL)) < 0) {
+            virReportError(VIR_ERR_INTERNAL_ERROR,
+                           _("Unable to register outfd %d in the event loop"),
+                           cmd->outfd);
+            goto cleanup;
+        }
+    }
+
+    if (cmd->errbuf && cmd->errfdptr == &cmd->errfd) {
+        if (virSetNonBlock(cmd->errfd) < 0) {
+            virReportSystemError(errno,
+                                 _("Failed to set non-blocking flag on FD %d"),
+                                 cmd->errfd);
+            goto cleanup;
+        }
+
+        if ((cmd->errWatch = virEventAddHandle(cmd->errfd,
+                                               VIR_EVENT_HANDLE_READABLE,
+                                               virCommandHandleReadWrite,
+                                               cmd, NULL)) < 0) {
+            virReportError(VIR_ERR_INTERNAL_ERROR,
+                           _("Unable to register errfd %d in the event loop"),
+                           cmd->errfd);
+            goto cleanup;
+        }
+    }
+
+    ret = 0;
+
+cleanup:
+    return ret;
+}
+
+
  /**
   * virCommandRunAsync:
   * @cmd: command to start
@@ -2149,6 +2302,7 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
      char *str;
      int i;
      bool synchronous = false;
+    int infd[2] = {-1, -1};

      if (!cmd || cmd->has_error == ENOMEM) {
          virReportOOMError();
@@ -2163,10 +2317,23 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
      synchronous = cmd->flags & VIR_EXEC_RUN_SYNC;
      cmd->flags &= ~VIR_EXEC_RUN_SYNC;

-    /* Buffer management can only be requested via virCommandRun.  */
-    if ((cmd->inbuf && cmd->infd == -1) ||
-        (cmd->outbuf && cmd->outfdptr != &cmd->outfd) ||
-        (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) {
+    /* Buffer management can only be requested via virCommandRun, unless help
+     * from the event loop has been requested via virCommandDoAsyncIO. */
+    if (cmd->flags & VIR_EXEC_ASYNC_IO) {
+        /* If we have an input buffer, we need
+         * a pipe to feed the data to the child */
+        if (cmd->inbuf && cmd->infd == -1) {
+            if (pipe2(infd, O_CLOEXEC) < 0) {
+                virReportSystemError(errno, "%s",
+                                     _("unable to open pipe"));
+                cmd->has_error = -1;
+                return -1;
+            }
+            cmd->infd = infd[0];
+        }
+    } else if ((cmd->inbuf && cmd->infd == -1) ||
+         (cmd->outbuf && cmd->outfdptr != &cmd->outfd) ||
+         (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) {
          virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                         _("cannot mix string I/O with asynchronous command"));
          return -1;
@@ -2228,6 +2395,16 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
      else
          cmd->reap = true;

+    if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) {
+        cmd->flags &= ~VIR_EXEC_ASYNC_IO;
+        if (cmd->inbuf && cmd->infd != -1) {
+            /* close the read end of infd and replace it with the write end */
+            VIR_FORCE_CLOSE(cmd->infd);
+            cmd->infd = infd[1];
+        }
+        ret = virCommandRegisterEventLoop(cmd);
+    }
+
      return ret;
  }

@@ -2265,6 +2442,11 @@ virCommandWait(virCommandPtr cmd, int *exitstatus)
          return -1;
      }

+    while (cmd->inWatch != -1 &&
+           cmd->outWatch != -1 &&
+           cmd->errWatch != -1)
+        usleep(100);

Hm, is there a possibility to avoid this active wait loop?

+
      /* If virProcessWait reaps pid but then returns failure because
       * exitstatus was NULL, then a second virCommandWait would risk
       * calling waitpid on an unrelated process.  Besides, that error
@@ -2516,8 +2698,24 @@ virCommandFree(virCommandPtr cmd)
      if (cmd->reap)
          virCommandAbort(cmd);

+    if (cmd->inWatch != -1)
+        virEventRemoveHandle(cmd->inWatch);
+    if (cmd->outWatch != -1)
+        virEventRemoveHandle(cmd->outWatch);
+    if (cmd->errWatch != -1)
+        virEventRemoveHandle(cmd->errWatch);
+
      VIR_FREE(cmd->transfer);
      VIR_FREE(cmd->preserve);

      VIR_FREE(cmd);
  }
+
+void
+virCommandDoAsyncIO(virCommandPtr cmd)
+{
+   if (!cmd || cmd->has_error)
+       return;
+
+   cmd->flags |= VIR_EXEC_ASYNC_IO;
+}
diff --git a/src/util/vircommand.h b/src/util/vircommand.h
index 9b7117d..c1a2e24 100644
--- a/src/util/vircommand.h
+++ b/src/util/vircommand.h
@@ -163,4 +163,5 @@ void virCommandAbort(virCommandPtr cmd);

  void virCommandFree(virCommandPtr cmd);

+void virCommandDoAsyncIO(virCommandPtr cmd);
  #endif /* __VIR_COMMAND_H__ */


Peter

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list


[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]