[PATCH 6/8] Introduce generic RPC client objects

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



To facilitate creation of new clients using XDR RPC services,
pull a lot of the remote driver code into a set of reusable
objects.

 - virNetClient: Encapsulates a socket connection to a
   remote RPC server. Handles all the network I/O for
   reading/writing RPC messages. Delegates RPC encoding
   and decoding to the registered programs

 - virNetClientProgram: Handles processing and dispatch
   of RPC messages for a single RPC (program,version).
   A program can register to receive async events
   from a client
 - virNetClientSASLContext: Handles everything to do with
   SASL authentication and encryption. The callers no
   longer need to directly call any cyrus-sasl APIs, which
   means error handling is simpler & alternative SASL
   impls can be provided for Win32

Each new client program now merely needs to define the list of
RPC procedures & events it wants and their handlers. It does
not need to deal with any of the network I/O functionality at
all.
---
 src/Makefile.am                   |   14 +-
 src/rpc/virnetclient.c            | 1237 +++++++++++++++++++++++++++++++++++++
 src/rpc/virnetclient.h            |   60 ++
 src/rpc/virnetclientprogram.c     |  258 ++++++++
 src/rpc/virnetclientprogram.h     |   71 +++
 src/rpc/virnetclientsaslcontext.c |  246 ++++++++
 src/rpc/virnetclientsaslcontext.h |   66 ++
 7 files changed, 1951 insertions(+), 1 deletions(-)
 create mode 100644 src/rpc/virnetclient.c
 create mode 100644 src/rpc/virnetclient.h
 create mode 100644 src/rpc/virnetclientprogram.c
 create mode 100644 src/rpc/virnetclientprogram.h
 create mode 100644 src/rpc/virnetclientsaslcontext.c
 create mode 100644 src/rpc/virnetclientsaslcontext.h

diff --git a/src/Makefile.am b/src/Makefile.am
index e78a0af..4c6efa8 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1117,7 +1117,7 @@ libvirt_qemu_la_LIBADD = libvirt.la $(CYGWIN_EXTRA_LIBADD)
 EXTRA_DIST += $(LIBVIRT_QEMU_SYMBOL_FILE)
 
 
-noinst_LTLIBRARIES += libvirt-net-rpc.la libvirt-net-rpc-server.la
+noinst_LTLIBRARIES += libvirt-net-rpc.la libvirt-net-rpc-server.la libvirt-net-rpc-client.la
 
 libvirt_net_rpc_la_SOURCES = \
 	../daemon/event.c \
@@ -1153,6 +1153,18 @@ libvirt_net_server_la_LDFLAGS = \
 libvirt_net_server_la_LIBADD = \
 			$(CYGWIN_EXTRA_LIBADD)
 
+# Variable prefix must be the canonicalized name of libvirt-net-rpc-client.la
+# (as listed in noinst_LTLIBRARIES), otherwise automake ignores these settings.
+libvirt_net_rpc_client_la_SOURCES = \
+	rpc/virnetclientsaslcontext.h rpc/virnetclientsaslcontext.c \
+	rpc/virnetclientprogram.h rpc/virnetclientprogram.c \
+	rpc/virnetclient.h rpc/virnetclient.c
+libvirt_net_rpc_client_la_CFLAGS = \
+			$(AM_CFLAGS)
+libvirt_net_rpc_client_la_LDFLAGS = \
+			$(AM_LDFLAGS) \
+			$(CYGWIN_EXTRA_LDFLAGS) \
+			$(MINGW_EXTRA_LDFLAGS)
+libvirt_net_rpc_client_la_LIBADD = \
+			$(CYGWIN_EXTRA_LIBADD)
 
 libexec_PROGRAMS =
 
diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
new file mode 100644
index 0000000..d3a8740
--- /dev/null
+++ b/src/rpc/virnetclient.c
@@ -0,0 +1,1237 @@
+
+
+#include <config.h>
+
+#include <unistd.h>
+#include <poll.h>
+#include <signal.h>
+
+#include "virnetclient.h"
+#include "virnetsocket.h"
+#include "memory.h"
+#include "threads.h"
+#include "files.h"
+#include "logging.h"
+#include "util.h"
+#include "virterror_internal.h"
+
+/* Error domain used for errors reported from this file */
+#define VIR_FROM_THIS VIR_FROM_RPC
+
+/*
+ * Report an error in the VIR_FROM_RPC domain, tagged with the current
+ * file, function and line. 'code' is followed by a printf-style
+ * format string and its arguments.
+ */
+#define virNetError(code, ...)                                    \
+    virReportErrorHelper(NULL, VIR_FROM_RPC, code, __FILE__,      \
+                         __FUNCTION__, __LINE__, __VA_ARGS__)
+
+/* On WIN32 map pipe() onto _pipe() with a 4096 byte buffer in binary mode */
+#ifdef WIN32
+# define pipe(fds) _pipe(fds,4096, _O_BINARY)
+#endif
+
+typedef struct _virNetClientCall virNetClientCall;
+typedef virNetClientCall *virNetClientCallPtr;
+
+/* Progress of a queued call; stored in virNetClientCall.mode */
+enum {
+    VIR_NET_CLIENT_MODE_WAIT_TX,    /* message still has data waiting to be written */
+    VIR_NET_CLIENT_MODE_WAIT_RX,    /* fully written, waiting for the reply */
+    VIR_NET_CLIENT_MODE_COMPLETE,   /* reply received, or written with no reply expected */
+};
+
+/* One outstanding call, linked into the client's waitDispatch list */
+struct _virNetClientCall {
+    int mode;               /* one of VIR_NET_CLIENT_MODE_* */
+
+    virNetMessagePtr msg;   /* message to send; the matching reply is copied over it */
+    int expectReply;        /* non-zero if the call only completes once a reply arrives */
+
+    virCond cond;           /* signalled by the thread running the I/O event loop */
+
+
+/*    remote_error err; */
+
+    virNetClientCallPtr next;   /* next call in the waitDispatch list */
+};
+
+
+/* State of one connection to a remote RPC server */
+struct _virNetClient {
+    int refs;                   /* reference count, see virNetClientRef/virNetClientFree */
+
+    virMutex lock;              /* taken via virNetClientLock/virNetClientUnlock */
+
+    virNetSocketPtr sock;       /* socket to the server; released in virNetClientFree */
+
+    virNetTLSSessionPtr tls;    /* TLS session, or NULL when TLS is not in use */
+    char *hostname;             /* copy of the name given at creation (may be NULL);
+                                 * handed to virNetTLSSessionNew */
+
+    virNetClientProgramPtr *programs;   /* programs added by virNetClientAddProgram */
+    size_t nprograms;                   /* number of entries in 'programs' */
+
+    /* For incoming message packets */
+    virNetMessage msg;
+
+#if HAVE_SASL
+    virNetClientSaslContextPtr sasl;    /* SASL context, or NULL when SASL is not in use */
+
+    /* Decoded incoming data not yet copied into 'msg' */
+    const char *saslDecoded;
+    size_t saslDecodedLength;
+    size_t saslDecodedOffset;
+
+    /* Encoded outgoing data not yet written to the socket */
+    const char *saslEncoded;
+    size_t saslEncodedLength;
+    size_t saslEncodedOffset;
+#endif
+
+    /* Self-pipe to wakeup threads waiting in poll() */
+    int wakeupSendFD;
+    int wakeupReadFD;
+
+    /* List of threads currently waiting for dispatch */
+    virNetClientCallPtr waitDispatch;
+};
+
+
+/* Acquire the client's mutex */
+static void virNetClientLock(virNetClientPtr client)
+{
+    virMutexLock(&client->lock);
+}
+
+
+/* Release the client's mutex */
+static void virNetClientUnlock(virNetClientPtr client)
+{
+    virMutexUnlock(&client->lock);
+}
+
+/*
+ * Write callback handed to virNetTLSSessionNew: forwards 'len' bytes
+ * from 'buf' to the client's socket. 'opaque' is the virNetClientPtr.
+ */
+static ssize_t
+virNetClientTLSWriteFunc(const char *buf, size_t len, void *opaque)
+{
+    virNetClientPtr self = opaque;
+    virNetSocketPtr sock = self->sock;
+
+    return virNetSocketWrite(sock, buf, len);
+}
+
+
+/*
+ * Read callback handed to virNetTLSSessionNew: reads up to 'len' bytes
+ * from the client's socket into 'buf'. 'opaque' is the virNetClientPtr.
+ */
+static ssize_t
+virNetClientTLSReadFunc(char *buf, size_t len, void *opaque)
+{
+    virNetClientPtr self = opaque;
+    virNetSocketPtr sock = self->sock;
+
+    return virNetSocketRead(sock, buf, len);
+}
+
+
+static void virNetClientIncomingEvent(virNetSocketPtr sock,
+                                      int events,
+                                      void *opaque);
+
+/*
+ * Create a client object around an already connected socket.
+ *
+ * @sock: connected socket; ownership passes to this function, so it is
+ *        released here on every failure path and by virNetClientFree()
+ *        once the client has been created
+ * @hostname: name to remember for the TLS session, or NULL
+ *
+ * Returns the new client holding one reference, or NULL on error
+ * (an error has been reported).
+ */
+static virNetClientPtr virNetClientNew(virNetSocketPtr sock,
+                                       const char *hostname)
+{
+    /* Must start out NULL: the error path hands it to virNetClientFree() */
+    virNetClientPtr client = NULL;
+    int wakeupFD[2] = { -1, -1 };
+
+    if (pipe(wakeupFD) < 0) {
+        virReportSystemError(errno, "%s",
+                             _("unable to make pipe"));
+        goto error;
+    }
+
+    if (VIR_ALLOC(client) < 0)
+        goto no_memory;
+
+    client->refs = 1;
+
+    if (virMutexInit(&client->lock) < 0) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("unable to initialize mutex"));
+        /* virNetClientFree() would lock the uninitialized mutex and
+         * close the still zeroed wakeup FDs, so release it directly */
+        VIR_FREE(client);
+        client = NULL;
+        goto error;
+    }
+
+    /* From here on virNetClientFree() releases the socket and the pipe */
+    client->sock = sock;
+    sock = NULL;
+    client->wakeupReadFD = wakeupFD[0];
+    client->wakeupSendFD = wakeupFD[1];
+    wakeupFD[0] = wakeupFD[1] = -1;
+
+    if (hostname &&
+        !(client->hostname = strdup(hostname)))
+        goto no_memory;
+
+    /* Set up a callback to listen on the socket data */
+    if (virNetSocketAddIOCallback(client->sock,
+                                  VIR_EVENT_HANDLE_READABLE,
+                                  virNetClientIncomingEvent,
+                                  client) < 0)
+        VIR_DEBUG0("Failed to add event watch, disabling events");
+
+    return client;
+
+no_memory:
+    virReportOOMError();
+error:
+    VIR_FORCE_CLOSE(wakeupFD[0]);
+    VIR_FORCE_CLOSE(wakeupFD[1]);
+    /* Only non-NULL if the client never took over the socket */
+    if (sock)
+        virNetSocketFree(sock);
+    virNetClientFree(client);
+    return NULL;
+}
+
+
+/*
+ * Create a client on top of the socket produced by
+ * virNetSocketNewConnectUNIX(path, spawnDaemon, binary).
+ * No hostname is recorded for the client.
+ *
+ * Returns the new client, or NULL on failure.
+ */
+virNetClientPtr virNetClientNewUNIX(const char *path,
+                                    bool spawnDaemon,
+                                    const char *binary)
+{
+    virNetSocketPtr sock;
+    int rc;
+
+    rc = virNetSocketNewConnectUNIX(path, spawnDaemon, binary, &sock);
+    if (rc >= 0)
+        return virNetClientNew(sock, NULL);
+
+    return NULL;
+}
+
+
+/*
+ * Create a client on top of the socket produced by
+ * virNetSocketNewConnectTCP(nodename, service). 'nodename' is also
+ * recorded as the client's hostname.
+ *
+ * Returns the new client, or NULL on failure.
+ */
+virNetClientPtr virNetClientNewTCP(const char *nodename,
+                                   const char *service)
+{
+    virNetSocketPtr sock;
+    int rc;
+
+    rc = virNetSocketNewConnectTCP(nodename, service, &sock);
+    if (rc >= 0)
+        return virNetClientNew(sock, nodename);
+
+    return NULL;
+}
+
+/*
+ * Create a client on top of the socket produced by
+ * virNetSocketNewConnectSSH(); all arguments are passed through to it
+ * unchanged. No hostname is recorded for the client.
+ *
+ * Returns the new client, or NULL on failure.
+ */
+virNetClientPtr virNetClientNewSSH(const char *nodename,
+                                   const char *service,
+                                   const char *binary,
+                                   const char *username,
+                                   bool noTTY,
+                                   const char *netcat,
+                                   const char *path)
+{
+    virNetSocketPtr sock;
+    int rc;
+
+    rc = virNetSocketNewConnectSSH(nodename, service, binary, username,
+                                   noTTY, netcat, path, &sock);
+    if (rc >= 0)
+        return virNetClientNew(sock, NULL);
+
+    return NULL;
+}
+
+/*
+ * Create a client on top of the socket produced by
+ * virNetSocketNewConnectCommand(cmdargv, cmdenv).
+ * No hostname is recorded for the client.
+ *
+ * Returns the new client, or NULL on failure.
+ */
+virNetClientPtr virNetClientNewCommand(const char **cmdargv,
+                                       const char **cmdenv)
+{
+    virNetSocketPtr sock;
+    int rc;
+
+    rc = virNetSocketNewConnectCommand(cmdargv, cmdenv, &sock);
+    if (rc >= 0)
+        return virNetClientNew(sock, NULL);
+
+    return NULL;
+}
+
+
+/* Take an additional reference on the client; drop it with virNetClientFree() */
+void virNetClientRef(virNetClientPtr client)
+{
+    virNetClientLock(client);
+    client->refs++;
+    virNetClientUnlock(client);
+}
+
+
+/*
+ * Drop one reference on the client. When the last reference goes away
+ * the registered programs, wakeup pipe, hostname, socket, TLS session
+ * and (if built with SASL) SASL context are released and the object
+ * itself is freed. Passing NULL is a no-op.
+ */
+void virNetClientFree(virNetClientPtr client)
+{
+    size_t i;   /* same type as nprograms, avoids a signed/unsigned compare */
+
+    if (!client)
+        return;
+
+    virNetClientLock(client);
+    client->refs--;
+    if (client->refs > 0) {
+        virNetClientUnlock(client);
+        return;
+    }
+
+    for (i = 0 ; i < client->nprograms ; i++)
+        virNetClientProgramFree(client->programs[i]);
+    VIR_FREE(client->programs);
+
+    VIR_FORCE_CLOSE(client->wakeupSendFD);
+    VIR_FORCE_CLOSE(client->wakeupReadFD);
+
+    VIR_FREE(client->hostname);
+
+    virNetSocketRemoveIOCallback(client->sock);
+    virNetSocketFree(client->sock);
+    virNetTLSSessionFree(client->tls);
+#if HAVE_SASL
+    /* The 'sasl' member only exists in SASL enabled builds */
+    virNetClientSaslContextFree(client->sasl);
+#endif
+    virNetClientUnlock(client);
+    virMutexDestroy(&client->lock);
+
+    VIR_FREE(client);
+}
+
+
+/*
+ * Attach a SASL context to the client, taking a reference on it.
+ *
+ * NOTE(review): a context that was set previously is overwritten
+ * without being released here - confirm callers only set it once.
+ */
+void virNetClientSetSASLContext(virNetClientPtr client,
+                                virNetClientSaslContextPtr ctxt)
+{
+    virNetClientLock(client);
+    virNetClientSaslContextRef(ctxt);
+    client->sasl = ctxt;
+    virNetClientUnlock(client);
+}
+
+
+/*
+ * Start a TLS session on the client's socket and run it to completion:
+ * perform the handshake, check the certificate against the context,
+ * then wait for the single '\1' byte with which the server confirms
+ * that it accepted us.
+ *
+ * @client: client to switch to TLS
+ * @tls: TLS context the session is created from
+ *
+ * Returns 0 on success, -1 on failure (client->tls is reset to NULL).
+ */
+int virNetClientSetTLSSession(virNetClientPtr client,
+                              virNetTLSContextPtr tls)
+{
+    int ret;
+    char buf[1];
+    int len;
+    struct pollfd fds[1];
+#ifdef HAVE_PTHREAD_SIGMASK
+    sigset_t oldmask, blockedsigs;
+
+    sigemptyset (&blockedsigs);
+    sigaddset (&blockedsigs, SIGWINCH);
+    sigaddset (&blockedsigs, SIGCHLD);
+    sigaddset (&blockedsigs, SIGPIPE);
+#endif
+
+    virNetClientLock(client);
+
+    if (!(client->tls = virNetTLSSessionNew(tls,
+                                            client->hostname,
+                                            virNetClientTLSWriteFunc,
+                                            virNetClientTLSReadFunc,
+                                            client)))
+        goto error;
+
+    for (;;) {
+        ret = virNetTLSSessionHandshake(client->tls);
+
+        if (ret < 0)
+            goto error;
+        if (ret == 0)
+            break;
+
+        /* Handshake needs more I/O: wait for the direction it asks for */
+        fds[0].fd = virNetSocketFD(client->sock);
+        fds[0].revents = 0;
+        if (virNetTLSSessionHandshakeDirection(client->tls) == 0)
+            fds[0].events = POLLIN;
+        else
+            fds[0].events = POLLOUT;
+
+        /* Block SIGWINCH from interrupting poll in curses programs,
+         * then restore the original signal mask again immediately
+         * after the call (RHBZ#567931).  Same for SIGCHLD and SIGPIPE
+         * at the suggestion of Paolo Bonzini and Daniel Berrange.
+         */
+#ifdef HAVE_PTHREAD_SIGMASK
+        ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
+#endif
+
+    repoll:
+        ret = poll(fds, ARRAY_CARDINALITY(fds), -1);
+        /* Signals we did not block can still interrupt poll() */
+        if (ret < 0 && (errno == EAGAIN || errno == EINTR))
+            goto repoll;
+
+#ifdef HAVE_PTHREAD_SIGMASK
+        /* SIG_SETMASK, not SIG_BLOCK: the latter would leave the
+         * signals blocked for the rest of the thread's life */
+        ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
+#endif
+    }
+
+    ret = virNetTLSContextCheckCertificate(tls, client->tls);
+
+    if (ret < 0)
+        goto error;
+
+    /* At this point, the server is verifying _our_ certificate, IP address,
+     * etc.  If we make the grade, it will send us a '\1' byte.
+     */
+
+    fds[0].fd = virNetSocketFD(client->sock);
+    fds[0].revents = 0;
+    fds[0].events = POLLIN;
+
+#ifdef HAVE_PTHREAD_SIGMASK
+    /* Block SIGWINCH from interrupting poll in curses programs */
+    ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
+#endif
+
+    repoll2:
+    ret = poll(fds, ARRAY_CARDINALITY(fds), -1);
+    if (ret < 0 && (errno == EAGAIN || errno == EINTR))
+        goto repoll2;
+
+#ifdef HAVE_PTHREAD_SIGMASK
+    /* Restore the mask saved above (see the handshake loop) */
+    ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
+#endif
+
+    len = virNetTLSSessionRead(client->tls, buf, 1);
+    if (len < 0) {
+        virReportSystemError(errno, "%s",
+                             _("Unable to read TLS confirmation"));
+        goto error;
+    }
+    if (len != 1 || buf[0] != '\1') {
+        virNetError(VIR_ERR_RPC, "%s",
+                    _("server verification (of our certificate or IP "
+                      "address) failed"));
+        goto error;
+    }
+
+    virNetClientUnlock(client);
+    return 0;
+
+error:
+    virNetTLSSessionFree(client->tls);
+    client->tls = NULL;
+    virNetClientUnlock(client);
+    return -1;
+}
+
+/*
+ * Report whether a TLS session or (in SASL enabled builds) a SASL
+ * context is attached to the client.
+ */
+bool virNetClientIsEncrypted(virNetClientPtr client)
+{
+    bool ret;
+    virNetClientLock(client);
+    ret = client->tls ? true : false;
+#if HAVE_SASL
+    /* The 'sasl' member only exists in SASL enabled builds */
+    if (client->sasl)
+        ret = true;
+#endif
+    virNetClientUnlock(client);
+    return ret;
+}
+
+
+/*
+ * Append 'prog' to the client's list of programs, taking a reference
+ * on it.
+ *
+ * Returns 0 on success, -1 if the list could not be grown (an OOM
+ * error is reported).
+ */
+int virNetClientAddProgram(virNetClientPtr client,
+                           virNetClientProgramPtr prog)
+{
+    int ret = -1;
+
+    virNetClientLock(client);
+
+    if (VIR_EXPAND_N(client->programs, client->nprograms, 1) < 0) {
+        virReportOOMError();
+        goto cleanup;
+    }
+
+    /* The new slot is the last one */
+    client->programs[client->nprograms-1] = prog;
+    virNetClientProgramRef(prog);
+    ret = 0;
+
+cleanup:
+    virNetClientUnlock(client);
+    return ret;
+}
+
+
+/* Return the socket's local address string, as given by
+ * virNetSocketLocalAddrString(). The client lock is not taken. */
+const char *virNetClientLocalAddrString(virNetClientPtr client)
+{
+    return virNetSocketLocalAddrString(client->sock);
+}
+
+/* Return the socket's remote address string, as given by
+ * virNetSocketRemoteAddrString(). The client lock is not taken. */
+const char *virNetClientRemoteAddrString(virNetClientPtr client)
+{
+    return virNetSocketRemoteAddrString(client->sock);
+}
+
+/*
+ * Return the key size reported by the client's TLS session,
+ * or 0 when no TLS session is attached.
+ */
+int virNetClientGetTLSKeySize(virNetClientPtr client)
+{
+    int keysize;
+
+    virNetClientLock(client);
+    keysize = client->tls ? virNetTLSSessionGetKeySize(client->tls) : 0;
+    virNetClientUnlock(client);
+
+    return keysize;
+}
+
+/*
+ * Hand the reply sitting in client->msg to the queued call whose
+ * header has the same prog, vers and serial: the reply's buffer and
+ * header are copied into the call's message and the call is marked
+ * complete.
+ *
+ * Returns 0 on success, -1 (error reported) if no queued call matches.
+ */
+static int
+virNetClientCallDispatchReply(virNetClientPtr client)
+{
+    virNetClientCallPtr waiter;
+
+    /* Ok, definitely got an RPC reply now find
+       out who's been waiting for it */
+    for (waiter = client->waitDispatch; waiter; waiter = waiter->next) {
+        if (waiter->msg->header.prog == client->msg.header.prog &&
+            waiter->msg->header.vers == client->msg.header.vers &&
+            waiter->msg->header.serial == client->msg.header.serial)
+            break;
+    }
+
+    if (!waiter) {
+        virNetError(VIR_ERR_RPC,
+                    _("no call waiting for reply with prog %d vers %d serial %d"),
+                    client->msg.header.prog, client->msg.header.vers, client->msg.header.serial);
+        return -1;
+    }
+
+    /* Give the waiter its own copy of the reply */
+    memcpy(waiter->msg->buffer, client->msg.buffer, sizeof(client->msg.buffer));
+    memcpy(&waiter->msg->header, &client->msg.header, sizeof(client->msg.header));
+    waiter->msg->bufferLength = client->msg.bufferLength;
+    waiter->msg->bufferOffset = client->msg.bufferOffset;
+
+    waiter->mode = VIR_NET_CLIENT_MODE_COMPLETE;
+
+    return 0;
+}
+
+/*
+ * Pass the message in client->msg to the first registered program
+ * for which virNetClientProgramMatches() succeeds.
+ *
+ * Returns 0 once a program has been asked to dispatch the message,
+ * -1 if no registered program matches.
+ */
+static int virNetClientCallDispatchMessage(virNetClientPtr client)
+{
+    size_t i;   /* same type as nprograms, avoids a signed/unsigned compare */
+    virNetClientProgramPtr prog = NULL;
+
+    for (i = 0 ; i < client->nprograms ; i++) {
+        if (virNetClientProgramMatches(client->programs[i],
+                                       &client->msg)) {
+            prog = client->programs[i];
+            break;
+        }
+    }
+    if (!prog) {
+        VIR_DEBUG("No program found for event with prog=%d vers=%d",
+                  client->msg.header.prog, client->msg.header.vers);
+        return -1;
+    }
+
+    /* The result of the program's dispatch is not checked here */
+    virNetClientProgramDispatch(prog, client, &client->msg);
+
+    return 0;
+}
+
+/*
+ * Placeholder for stream packet handling. The whole body is compiled
+ * out with '#if 0', so at present this accepts every stream packet and
+ * returns 0. The disabled code refers to names (hdr, xdr, client->streams,
+ * REMOTE_*) that are not defined in this file.
+ */
+static int virNetClientCallDispatchStream(virNetClientPtr client ATTRIBUTE_UNUSED)
+{
+#if 0
+    struct private_stream_data *privst;
+    virNetClientCallPtr thecall;
+
+    /* Try and find a matching stream */
+    privst = client->streams;
+    while (privst &&
+           privst->serial != hdr->serial &&
+           privst->proc_nr != hdr->proc)
+        privst = privst->next;
+
+    if (!privst) {
+        VIR_DEBUG("No registered stream matching serial=%d, proc=%d",
+                  hdr->serial, hdr->proc);
+        return -1;
+    }
+
+    /* See if there's also a (optional) call waiting for this reply */
+    thecall = client->waitDispatch;
+    while (thecall &&
+           thecall->serial != hdr->serial)
+        thecall = thecall->next;
+
+
+    /* Status is either REMOTE_OK (meaning that what follows is a ret
+     * structure), or REMOTE_ERROR (and what follows is a remote_error
+     * structure).
+     */
+    switch (hdr->status) {
+    case REMOTE_CONTINUE: {
+        int avail = privst->incomingLength - privst->incomingOffset;
+        int need = client->bufferLength - client->bufferOffset;
+        VIR_DEBUG0("Got a stream data packet");
+
+        /* XXX flag stream as complete somewhere if need==0 */
+
+        if (need > avail) {
+            int extra = need - avail;
+            if (VIR_REALLOC_N(privst->incoming,
+                              privst->incomingLength + extra) < 0) {
+                VIR_DEBUG0("Out of memory handling stream data");
+                return -1;
+            }
+            privst->incomingLength += extra;
+        }
+
+        memcpy(privst->incoming + privst->incomingOffset,
+               client->buffer + client->bufferOffset,
+               client->bufferLength - client->bufferOffset);
+        privst->incomingOffset += (client->bufferLength - client->bufferOffset);
+
+        if (thecall && thecall->want_reply) {
+            VIR_DEBUG("Got sync data packet offset=%d", privst->incomingOffset);
+            thecall->mode = REMOTE_MODE_COMPLETE;
+        } else {
+            VIR_DEBUG("Got aysnc data packet offset=%d", privst->incomingOffset);
+            remoteStreamEventTimerUpdate(privst);
+        }
+        return 0;
+    }
+
+    case REMOTE_OK:
+        VIR_DEBUG0("Got a synchronous confirm");
+        if (!thecall) {
+            VIR_DEBUG0("Got unexpected stream finish confirmation");
+            return -1;
+        }
+        thecall->mode = REMOTE_MODE_COMPLETE;
+        return 0;
+
+    case REMOTE_ERROR:
+        if (thecall && thecall->want_reply) {
+            VIR_DEBUG0("Got a synchronous error");
+            /* Give the error straight to this call */
+            memset (&thecall->err, 0, sizeof thecall->err);
+            if (!xdr_remote_error (xdr, &thecall->err)) {
+                remoteError(VIR_ERR_RPC, "%s", _("unmarshalling remote_error"));
+                return -1;
+            }
+            thecall->mode = REMOTE_MODE_ERROR;
+        } else {
+            VIR_DEBUG0("Got a asynchronous error");
+            /* No call, so queue the error against the stream */
+            if (privst->has_error) {
+                VIR_DEBUG0("Got unexpected duplicate stream error");
+                return -1;
+            }
+            privst->has_error = 1;
+            memset (&privst->err, 0, sizeof privst->err);
+            if (!xdr_remote_error (xdr, &privst->err)) {
+                VIR_DEBUG0("Failed to unmarshall error");
+                return -1;
+            }
+        }
+        return 0;
+
+    default:
+        VIR_WARN("Stream with unexpected serial=%d, proc=%d, status=%d",
+                 hdr->serial, hdr->proc, hdr->status);
+        return -1;
+    }
+#endif
+    return 0;
+}
+
+
+/*
+ * Decode the header of the message in client->msg and route it by
+ * type to the reply, async message or stream handler.
+ *
+ * Returns the handler's result, or -1 if the header cannot be decoded
+ * or the message type is not one of the three handled here.
+ */
+static int
+virNetClientCallDispatch(virNetClientPtr client)
+{
+    int rc = -1;
+
+    if (virNetMessageDecodeHeader(&client->msg) < 0)
+        return rc;
+
+    switch (client->msg.header.type) {
+    case VIR_NET_REPLY: /* Normal RPC replies */
+        rc = virNetClientCallDispatchReply(client);
+        break;
+
+    case VIR_NET_MESSAGE: /* Async notifications */
+        rc = virNetClientCallDispatchMessage(client);
+        break;
+
+    case VIR_NET_STREAM: /* Stream protocol */
+        rc = virNetClientCallDispatchStream(client);
+        break;
+
+    default:
+        virNetError(VIR_ERR_RPC,
+                    _("got unexpected RPC call prog %d vers %d proc %d type %d"),
+                    client->msg.header.prog, client->msg.header.vers,
+                    client->msg.header.proc, client->msg.header.type);
+        break;
+    }
+
+    return rc;
+}
+
+
+/*
+ * Write up to 'len' bytes from 'bytes' through the TLS session if one
+ * is attached, otherwise straight to the socket. The write is retried
+ * when it fails with EINTR.
+ *
+ * Returns the number of bytes written, 0 if the write failed with
+ * EAGAIN, or -1 on any other error (which is reported).
+ */
+static ssize_t
+virNetClientIOWriteBuffer(virNetClientPtr client,
+                          const char *bytes, size_t len)
+{
+    for (;;) {
+        ssize_t done;
+
+        if (client->tls)
+            done = virNetTLSSessionWrite(client->tls, bytes, len);
+        else
+            done = virNetSocketWrite(client->sock, bytes, len);
+
+        if (done >= 0)
+            return done;
+
+        if (errno == EINTR)
+            continue;
+        if (errno == EAGAIN)
+            return 0;
+
+        virReportSystemError(errno, "%s", _("cannot send data"));
+        return -1;
+    }
+}
+
+
+/*
+ * Read up to 'len' bytes into 'bytes' through the TLS session if one
+ * is attached, otherwise straight from the socket. The read is retried
+ * when it fails with EINTR.
+ *
+ * Returns the number of bytes read, 0 if the read failed with EAGAIN,
+ * or -1 on any other error or when the server closed the connection
+ * (both are reported).
+ */
+static ssize_t
+virNetClientIOReadBuffer(virNetClientPtr client,
+                         char *bytes, size_t len)
+{
+    /* Must be signed: with an unsigned type the error checks below
+     * can never trigger and failures are returned unreported */
+    ssize_t ret;
+
+resend:
+    if (client->tls)
+        ret = virNetTLSSessionRead(client->tls, bytes, len);
+    else
+        ret = virNetSocketRead(client->sock, bytes, len);
+    if (ret <= 0) {
+        if (ret == -1) {
+            if (errno == EINTR)
+                goto resend;
+            if (errno == EAGAIN)
+                return 0;
+
+            virReportSystemError(errno, "%s",
+                                 _("cannot recv data"));
+        } else {
+            /* A zero length read means end of file */
+            virNetError(VIR_ERR_SYSTEM_ERROR, "%s",
+                        _("server closed connection"));
+        }
+        return -1;
+    }
+
+    return ret;
+}
+
+
+/*
+ * Write as much as possible of the message belonging to 'thecall'.
+ *
+ * With a SASL context attached, the unsent part of the message is
+ * encoded once into client->saslEncoded and that buffer is then
+ * written out, possibly over several invocations. Without SASL the
+ * message buffer is written directly. Once everything has been
+ * written the call moves to WAIT_RX if a reply is expected, otherwise
+ * to COMPLETE.
+ *
+ * Returns 0 on success (including a partial write), negative on error.
+ */
+static ssize_t
+virNetClientIOWriteMessage(virNetClientPtr client,
+                           virNetClientCallPtr thecall)
+{
+#if HAVE_SASL
+    if (client->sasl) {
+        const char *output;
+        size_t outputlen;
+        ssize_t ret;
+
+        /* Nothing pending from an earlier attempt: encode this message */
+        if (!client->saslEncoded) {
+            if (virNetClientSaslContextEncode(client->sasl,
+                                              thecall->msg->buffer + thecall->msg->bufferOffset,
+                                              thecall->msg->bufferLength - thecall->msg->bufferOffset,
+                                              &output, &outputlen) < 0)
+                return -1;
+
+            client->saslEncoded = output;
+            client->saslEncodedLength = outputlen;
+            client->saslEncodedOffset = 0;
+
+            /* The plain message is fully consumed by the encoder */
+            thecall->msg->bufferOffset = thecall->msg->bufferLength;
+        }
+
+        ret = virNetClientIOWriteBuffer(client,
+                                        client->saslEncoded + client->saslEncodedOffset,
+                                        client->saslEncodedLength - client->saslEncodedOffset);
+        if (ret < 0)
+            return ret;
+        client->saslEncodedOffset += ret;
+
+        /* Encoded data fully written: the call is sent */
+        if (client->saslEncodedOffset == client->saslEncodedLength) {
+            client->saslEncoded = NULL;
+            client->saslEncodedOffset = client->saslEncodedLength = 0;
+            if (thecall->expectReply)
+                thecall->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
+            else
+                thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE;
+        }
+    } else {
+#endif
+        ssize_t ret;
+        ret = virNetClientIOWriteBuffer(client,
+                                        thecall->msg->buffer + thecall->msg->bufferOffset,
+                                        thecall->msg->bufferLength - thecall->msg->bufferOffset);
+        if (ret < 0)
+            return ret;
+        thecall->msg->bufferOffset += ret;
+
+        /* Message fully written: reset the buffer and advance the call */
+        if (thecall->msg->bufferOffset == thecall->msg->bufferLength) {
+            thecall->msg->bufferOffset = thecall->msg->bufferLength = 0;
+            if (thecall->expectReply)
+                thecall->mode = VIR_NET_CLIENT_MODE_WAIT_RX;
+            else
+                thecall->mode = VIR_NET_CLIENT_MODE_COMPLETE;
+        }
+#if HAVE_SASL
+    }
+#endif
+    return 0;
+}
+
+
+/*
+ * Send queued calls: start at the first call still in WAIT_TX and
+ * write it and the calls queued after it, stopping as soon as one of
+ * them could not be written completely.
+ *
+ * Returns 0 on success, negative if no call is waiting to be sent or
+ * a write failed.
+ */
+static ssize_t
+virNetClientIOHandleOutput(virNetClientPtr client)
+{
+    virNetClientCallPtr cur;
+
+    /* Skip ahead to the first call that still needs sending */
+    for (cur = client->waitDispatch; cur; cur = cur->next) {
+        if (cur->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
+            break;
+    }
+
+    if (!cur)
+        return -1; /* Shouldn't happen, but you never know... */
+
+    for (; cur; cur = cur->next) {
+        ssize_t rc = virNetClientIOWriteMessage(client, cur);
+
+        if (rc < 0)
+            return rc;
+
+        if (cur->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
+            return 0; /* Blocking write, to back to event loop */
+    }
+
+    return 0; /* No more calls to send, all done */
+}
+
+/*
+ * Read more of the incoming message into client->msg.
+ *
+ * A bufferLength of 0 means a new message is starting, in which case
+ * the 4 byte length word is read first. With a SASL context attached,
+ * data is read and decoded in chunks into client->saslDecoded and then
+ * copied out of that buffer; otherwise it is read straight into the
+ * message buffer.
+ *
+ * Returns the number of bytes added to client->msg, 0 if no data was
+ * available, or -1 on error.
+ */
+static ssize_t
+virNetClientIOReadMessage(virNetClientPtr client)
+{
+    size_t wantData;
+
+    /* Start by reading length word */
+    if (client->msg.bufferLength == 0)
+        client->msg.bufferLength = 4;
+
+    wantData = client->msg.bufferLength - client->msg.bufferOffset;
+
+#if HAVE_SASL
+    if (client->sasl) {
+        /* No decoded data left over: fetch and decode another chunk */
+        if (client->saslDecoded == NULL) {
+            char encoded[8192];
+            ssize_t ret;
+            ret = virNetClientIOReadBuffer(client, encoded, sizeof(encoded));
+            if (ret < 0)
+                return -1;
+            if (ret == 0)
+                return 0;
+
+            if (virNetClientSaslContextDecode(client->sasl,
+                                              encoded,
+                                              ret,
+                                              &client->saslDecoded,
+                                              &client->saslDecodedLength) < 0)
+                return -1;
+
+            client->saslDecodedOffset = 0;
+        }
+
+        /* Never copy more than the decoded buffer still holds */
+        if ((client->saslDecodedLength - client->saslDecodedOffset) < wantData)
+            wantData = (client->saslDecodedLength - client->saslDecodedOffset);
+
+        memcpy(client->msg.buffer + client->msg.bufferOffset,
+               client->saslDecoded + client->saslDecodedOffset,
+               wantData);
+        client->saslDecodedOffset += wantData;
+        client->msg.bufferOffset += wantData;
+        /* Decoded buffer exhausted: forget it so the next call reads again */
+        if (client->saslDecodedOffset == client->saslDecodedLength) {
+            client->saslDecodedOffset = client->saslDecodedLength = 0;
+            client->saslDecoded = NULL;
+        }
+
+        return wantData;
+    } else {
+#endif
+        ssize_t ret;
+
+        ret = virNetClientIOReadBuffer(client,
+                                       client->msg.buffer + client->msg.bufferOffset,
+                                       wantData);
+        if (ret < 0)
+            return -1;
+        if (ret == 0)
+            return 0;
+
+        client->msg.bufferOffset += ret;
+
+        return ret;
+#if HAVE_SASL
+    }
+#endif
+}
+
+
+/*
+ * Read incoming data until either nothing more is available or one
+ * complete message has been received and dispatched.
+ *
+ * Returns 0 when the read would block, the result of
+ * virNetClientCallDispatch() after a complete message, or -1 on a
+ * read or length decoding error.
+ */
+static ssize_t
+virNetClientIOHandleInput(virNetClientPtr client)
+{
+    /* Read as much data as is available, until we get
+     * EAGAIN
+     */
+    for (;;) {
+        ssize_t rc = virNetClientIOReadMessage(client);
+
+        if (rc < 0)
+            return -1;
+        if (rc == 0)
+            return 0;  /* Blocking on read */
+
+        /* Still short of what we are currently waiting for */
+        if (client->msg.bufferOffset != client->msg.bufferLength)
+            continue;
+
+        if (client->msg.bufferOffset == 4) {
+            /*
+             * Got the length word. Carry on around the loop to
+             * immediately process the message body, because it has
+             * probably already arrived. Worst case, we'll get EAGAIN
+             * on next iteration.
+             */
+            if (virNetMessageDecodeLength(&client->msg) < 0)
+                return -1;
+            continue;
+        }
+
+        /*
+         * We've completed one call, so return even though there
+         * might still be more data on the wire. We need to actually
+         * let the caller deal with this arrived message to keep good
+         * response, and also to correctly handle EOF.
+         */
+        rc = virNetClientCallDispatch(client);
+        client->msg.bufferOffset = client->msg.bufferLength = 0;
+        return rc;
+    }
+}
+
+
+/*
+ * Process all calls pending dispatch/receive until we
+ * get a reply to our own call. Then quit and pass the buck
+ * to someone else.
+ *
+ * @client: client whose socket is serviced; its lock is dropped
+ *          around poll() and re-acquired afterwards
+ * @thiscall: the caller's own call, expected at the head of
+ *            client->waitDispatch
+ *
+ * Other threads' calls that complete while we run are unlinked from
+ * the list and their condition variable is signalled.
+ *
+ * Returns 0 once thiscall is complete, -1 on I/O error. In both cases
+ * thiscall is removed from the list and the next waiter, if any, is
+ * signalled.
+ */
+static int virNetClientIOEventLoop(virNetClientPtr client,
+                                   virNetClientCallPtr thiscall)
+{
+    struct pollfd fds[2];
+    int ret;
+
+    fds[0].fd = virNetSocketFD(client->sock);
+    fds[1].fd = client->wakeupReadFD;
+
+    for (;;) {
+        virNetClientCallPtr tmp = client->waitDispatch;
+        virNetClientCallPtr prev;
+        char ignore;
+#ifdef HAVE_PTHREAD_SIGMASK
+        sigset_t oldmask, blockedsigs;
+#endif
+
+        fds[0].events = fds[0].revents = 0;
+        fds[1].events = fds[1].revents = 0;
+
+        /* Poll for whatever the queued calls are waiting on */
+        fds[1].events = POLLIN;
+        while (tmp) {
+            if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
+                fds[0].events |= POLLIN;
+            if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
+                fds[0].events |= POLLOUT;
+
+            tmp = tmp->next;
+        }
+
+#if 0
+        XXX
+        if (client->streams)
+            fds[0].events |= POLLIN;
+#endif
+
+        /* Release lock while poll'ing so other threads
+         * can stuff themselves on the queue */
+        virNetClientUnlock(client);
+
+        /* Block SIGWINCH from interrupting poll in curses programs,
+         * then restore the original signal mask again immediately
+         * after the call (RHBZ#567931).  Same for SIGCHLD and SIGPIPE
+         * at the suggestion of Paolo Bonzini and Daniel Berrange.
+         */
+#ifdef HAVE_PTHREAD_SIGMASK
+        sigemptyset (&blockedsigs);
+        sigaddset (&blockedsigs, SIGWINCH);
+        sigaddset (&blockedsigs, SIGCHLD);
+        sigaddset (&blockedsigs, SIGPIPE);
+        ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
+#endif
+
+    repoll:
+        ret = poll(fds, ARRAY_CARDINALITY(fds), -1);
+        /* Any signal we did not block can still interrupt poll();
+         * that must not fail the RPC call, so just try again */
+        if (ret < 0 && (errno == EAGAIN || errno == EINTR))
+            goto repoll;
+
+#ifdef HAVE_PTHREAD_SIGMASK
+        ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
+#endif
+
+        virNetClientLock(client);
+
+        if (fds[1].revents) {
+            VIR_DEBUG0("Woken up from poll by other thread");
+            if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
+                virReportSystemError(errno, "%s",
+                                     _("read on wakeup fd failed"));
+                goto error;
+            }
+        }
+
+        if (ret < 0) {
+            if (errno == EWOULDBLOCK)
+                continue;
+            virReportSystemError(errno,
+                                 "%s", _("poll on socket failed"));
+            goto error;
+        }
+
+        if (fds[0].revents & POLLOUT) {
+            if (virNetClientIOHandleOutput(client) < 0)
+                goto error;
+        }
+
+        if (fds[0].revents & POLLIN) {
+            if (virNetClientIOHandleInput(client) < 0)
+                goto error;
+        }
+
+        /* Iterate through waiting threads and if
+         * any are complete then tell 'em to wakeup
+         */
+        tmp = client->waitDispatch;
+        prev = NULL;
+        while (tmp) {
+            if (tmp != thiscall &&
+                tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
+                /* Take them out of the list */
+                if (prev)
+                    prev->next = tmp->next;
+                else
+                    client->waitDispatch = tmp->next;
+
+                /* And wake them up....
+                 * ...they won't actually wakeup until
+                 * we release our mutex a short while
+                 * later...
+                 */
+                VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch);
+                virCondSignal(&tmp->cond);
+            } else {
+                /* Only a node that stays in the list may become the
+                 * predecessor; otherwise the next removal would be
+                 * linked through an already unlinked entry */
+                prev = tmp;
+            }
+            tmp = tmp->next;
+        }
+
+        /* Now see if *we* are done */
+        if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
+            /* We're at head of the list already, so
+             * remove us
+             */
+            client->waitDispatch = thiscall->next;
+            VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch);
+            /* See if someone else is still waiting
+             * and if so, then pass the buck ! */
+            if (client->waitDispatch) {
+                VIR_DEBUG("Passing the buck to %p", client->waitDispatch);
+                virCondSignal(&client->waitDispatch->cond);
+            }
+            return 0;
+        }
+
+
+        if (fds[0].revents & (POLLHUP | POLLERR)) {
+            virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("received hangup / error event on socket"));
+            goto error;
+        }
+    }
+
+
+error:
+    client->waitDispatch = thiscall->next;
+    VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch);
+    /* See if someone else is still waiting
+     * and if so, then pass the buck ! */
+    if (client->waitDispatch) {
+        VIR_DEBUG("Passing the buck to %p", client->waitDispatch);
+        virCondSignal(&client->waitDispatch->cond);
+    }
+    return -1;
+}
+
+
+/*
+ * Run one RPC call ('thiscall') to completion on 'client', either by
+ * becoming the thread that drives the socket I/O or by sleeping until
+ * another thread has done the work on our behalf.
+ *
+ * Returns 0 if another thread completed the call while we slept,
+ * -1 if the dispatching thread could not be woken up or waiting on
+ * the condition variable failed, and otherwise whatever
+ * virNetClientIOEventLoop() returns.
+ *
+ * NB. This does not free the args structure (not desirable, since you
+ * often want this allocated on the stack or else it contains strings
+ * which come from the user).  It does however free any intermediate
+ * results, eg. the error structure if there is one.
+ *
+ * NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling,
+ * else Bad Things will happen in the XDR code.
+ *
+ * NOTE(review): NB and NB(2) talk about 'args' and 'ret', neither of
+ * which is a parameter of this function - presumably these notes were
+ * carried over from an older caller-facing API; confirm and prune.
+ *
+ * NB(3) You must have the client lock before calling this
+ *
+ * NB(4) This is very complicated. Multiple threads are allowed to
+ * use the client for RPC at the same time. Obviously only one of
+ * them can. So if someone's using the socket, other threads are put
+ * to sleep on condition variables. The existing thread may completely
+ * send & receive their RPC call/reply while they're asleep. Or it
+ * may only get around to dealing with sending the call. Or it may
+ * get around to neither. So upon waking up from slumber, the other
+ * thread may or may not have more work to do.
+ *
+ * We call this dance  'passing the buck'
+ *
+ *      http://en.wikipedia.org/wiki/Passing_the_buck
+ *
+ *   "Buck passing or passing the buck is the action of transferring
+ *    responsibility or blame unto another person. It is also used as
+ *    a strategy in power politics when the actions of one country/
+ *    nation are blamed on another, providing an opportunity for war."
+ *
+ * NB(5) Don't Panic!
+ */
+static int virNetClientIO(virNetClientPtr client,
+                          virNetClientCallPtr thiscall)
+{
+    int rv = -1;
+
+    VIR_DEBUG("program=%u version=%u serial=%u proc=%d type=%d length=%d dispatach=%p",
+              thiscall->msg->header.prog,
+              thiscall->msg->header.vers,
+              thiscall->msg->header.serial,
+              thiscall->msg->header.proc,
+              thiscall->msg->header.type,
+              thiscall->msg->bufferLength,
+              client->waitDispatch);
+
+    /* Check to see if another thread is dispatching: a non-empty
+     * waitDispatch list means some call already owns the socket */
+    if (client->waitDispatch) {
+        /* Stick ourselves on the end of the wait queue.
+         * client->waitDispatch is non-NULL in this branch, so 'tmp'
+         * always ends up pointing at the current tail and the
+         * 'else' arms below are purely defensive. */
+        virNetClientCallPtr tmp = client->waitDispatch;
+        char ignore = 1;
+        while (tmp && tmp->next)
+            tmp = tmp->next;
+        if (tmp)
+            tmp->next = thiscall;
+        else
+            client->waitDispatch = thiscall;
+
+        /* Force other thread to wakeup from poll by writing one byte
+         * to the wakeup fd. If that fails, unlink ourselves from the
+         * queue again before reporting the error. */
+        if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
+            if (tmp)
+                tmp->next = NULL;
+            else
+                client->waitDispatch = NULL;
+            virReportSystemError(errno, "%s",
+                                 _("failed to wake up polling thread"));
+            return -1;
+        }
+
+        VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall);
+        /* Go to sleep while other thread is working...
+         * The wait is performed against client->lock. If the wait
+         * itself fails, 'thiscall' is removed from wherever it
+         * currently sits in the queue before bailing out. */
+        if (virCondWait(&thiscall->cond, &client->lock) < 0) {
+            if (client->waitDispatch == thiscall) {
+                client->waitDispatch = thiscall->next;
+            } else {
+                tmp = client->waitDispatch;
+                while (tmp && tmp->next &&
+                       tmp->next != thiscall) {
+                    tmp = tmp->next;
+                }
+                if (tmp && tmp->next == thiscall)
+                    tmp->next = thiscall->next;
+            }
+            virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("failed to wait on condition"));
+            return -1;
+        }
+
+        VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall);
+        /* Two reasons we can be woken up
+         *  1. Other thread has got our reply ready for us
+         *  2. Other thread is all done, and it is our turn to
+         *     be the dispatcher to finish waiting for
+         *     our reply
+         */
+        if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
+            rv = 0;
+            /*
+             * We avoided catching the buck and our reply is ready !
+             * We've already had 'thiscall' removed from the list
+             * so just need to (maybe) handle errors & free it
+             */
+            goto cleanup;
+        }
+
+        /* Grr, someone passed the buck onto us ... */
+
+    } else {
+        /* We're first to catch the buck */
+        client->waitDispatch = thiscall;
+    }
+
+    VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall);
+    /*
+     * The buck stops here!
+     *
+     * At this point we're about to own the dispatch
+     * process...
+     */
+
+    /*
+     * Avoid needless wake-ups of the event loop in the
+     * case where this call is being made from a different
+     * thread than the event loop. These wake-ups would
+     * cause the event loop thread to be blocked on the
+     * mutex for the duration of the call.
+     *
+     * The socket callback is switched back to
+     * VIR_EVENT_HANDLE_READABLE as soon as our own
+     * event loop returns.
+     */
+    virNetSocketUpdateIOCallback(client->sock, 0);
+
+    rv = virNetClientIOEventLoop(client, thiscall);
+
+    virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE);
+
+cleanup:
+    VIR_DEBUG("All done with our call %p %p %d", client->waitDispatch, thiscall, rv);
+    return rv;
+}
+
+
+/*
+ * Socket event callback: 'opaque' is the virNetClientPtr that owns
+ * 'sock'. Runs with the client lock held for its whole duration.
+ *
+ * Nothing is done while some call is queued on waitDispatch. On
+ * hangup/error the I/O callback is removed from the socket;
+ * otherwise pending input is processed, with failures only logged.
+ */
+void virNetClientIncomingEvent(virNetSocketPtr sock,
+                               int events,
+                               void *opaque)
+{
+    virNetClientPtr client = opaque;
+    const int failureMask = VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR;
+
+    virNetClientLock(client);
+
+    /* A queued call should be impossible here, but it doesn't hurt to check */
+    if (!client->waitDispatch) {
+        VIR_DEBUG("Event fired %p %d", sock, events);
+
+        if (events & failureMask) {
+            VIR_DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or "
+                      "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__);
+            virNetSocketRemoveIOCallback(sock);
+        } else if (virNetClientIOHandleInput(client) < 0) {
+            VIR_DEBUG0("Something went wrong during async message processing");
+        }
+    }
+
+    virNetClientUnlock(client);
+}
+
+
+/*
+ * Send 'msg' over 'client' and block until virNetClientIO() has
+ * finished processing it.
+ *
+ * @client: client to send on; its lock is taken and released here
+ * @msg: message to transmit; only referenced, never freed here
+ * @expectReply: recorded on the call so the I/O code knows whether
+ *               a reply is expected
+ *
+ * Returns the result of virNetClientIO(), or -1 if the per-call
+ * tracking structure could not be allocated or initialised.
+ */
+int virNetClientSend(virNetClientPtr client,
+                     virNetMessagePtr msg,
+                     bool expectReply)
+{
+    virNetClientCallPtr call;
+    int ret = -1;
+
+    if (VIR_ALLOC(call) < 0) {
+        virReportOOMError();
+        return -1;
+    }
+
+    virNetClientLock(client);
+
+    if (virCondInit(&call->cond) < 0) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("cannot initialize condition variable"));
+        /* Nothing to destroy: the condition was never set up */
+        goto cleanup;
+    }
+
+    call->mode = VIR_NET_CLIENT_MODE_WAIT_TX;
+    call->msg = msg;
+    call->expectReply = expectReply;
+
+    ret = virNetClientIO(client, call);
+
+    /* Release the condition variable initialised above; previously it
+     * was leaked on every call because only the memory was freed */
+    ignore_value(virCondDestroy(&call->cond));
+
+cleanup:
+    VIR_FREE(call);
+    virNetClientUnlock(client);
+    return ret;
+}
diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h
new file mode 100644
index 0000000..cd6a20f
--- /dev/null
+++ b/src/rpc/virnetclient.h
@@ -0,0 +1,60 @@
+
+
+#ifndef __VIR_NET_CLIENT_H__
+#define __VIR_NET_CLIENT_H__
+
+#include <stdbool.h>
+
+#include "virnettlscontext.h"
+#include "virnetmessage.h"
+#if HAVE_SASL
+#include "virnetclientsaslcontext.h"
+#endif
+#include "virnetclientprogram.h"
+
+/*
+ * virNetClient: encapsulates a socket connection to a remote RPC
+ * server. It handles the network I/O for reading/writing RPC
+ * messages and delegates RPC encoding and decoding to the
+ * registered programs.
+ */
+virNetClientPtr virNetClientNewUNIX(const char *path,
+                                    bool spawnDaemon,
+                                    const char *daemon);
+
+virNetClientPtr virNetClientNewTCP(const char *nodename,
+                                   const char *service);
+
+virNetClientPtr virNetClientNewSSH(const char *nodename,
+                                   const char *service,
+                                   const char *binary,
+                                   const char *username,
+                                   bool noTTY,
+                                   const char *netcat,
+                                   const char *path);
+
+virNetClientPtr virNetClientNewCommand(const char **cmdargv,
+                                       const char **cmdenv);
+
+void virNetClientRef(virNetClientPtr client);
+
+int virNetClientAddProgram(virNetClientPtr client,
+                           virNetClientProgramPtr prog);
+/*
+ * Send 'msg' on 'client' and block until the call has been
+ * processed. The client lock is acquired internally. Returns -1
+ * if the call could not be set up, otherwise the result of the
+ * client's I/O processing.
+ */
+int virNetClientSend(virNetClientPtr client,
+                     virNetMessagePtr msg,
+                     bool expectReply);
+
+#if HAVE_SASL
+void virNetClientSetSASLContext(virNetClientPtr client,
+                                virNetClientSaslContextPtr ctxt);
+#endif
+
+int virNetClientSetTLSSession(virNetClientPtr client,
+                              virNetTLSContextPtr tls);
+
+bool virNetClientIsEncrypted(virNetClientPtr client);
+
+const char *virNetClientLocalAddrString(virNetClientPtr client);
+const char *virNetClientRemoteAddrString(virNetClientPtr client);
+
+int virNetClientGetTLSKeySize(virNetClientPtr client);
+
+void virNetClientFree(virNetClientPtr client);
+
+#endif /* __VIR_NET_CLIENT_H__ */
diff --git a/src/rpc/virnetclientprogram.c b/src/rpc/virnetclientprogram.c
new file mode 100644
index 0000000..eb918f0
--- /dev/null
+++ b/src/rpc/virnetclientprogram.c
@@ -0,0 +1,258 @@
+
+#include <config.h>
+
+#include "virnetclientprogram.h"
+#include "virnetclient.h"
+#include "virnetprotocol.h"
+
+#include "memory.h"
+#include "virterror_internal.h"
+#include "logging.h"
+
+#define VIR_FROM_THIS VIR_FROM_RPC
+
+#define virNetError(code, ...)                                    \
+    virReportErrorHelper(NULL, VIR_FROM_RPC, code, __FILE__,      \
+                         __FUNCTION__, __LINE__, __VA_ARGS__)
+
+struct _virNetClientProgram {  /* state for one RPC (program, version) pair */
+    int refs;                   /* reference count: 1 on creation, freed when it drops to 0 */
+
+    unsigned program;           /* compared against msg->header.prog */
+    unsigned version;           /* compared against msg->header.vers */
+    virNetClientProgramEventPtr events; /* caller-supplied event table; pointer stored, not copied */
+    size_t nevents;             /* number of entries in 'events' */
+    void *eventOpaque;          /* passed through to every event handler */
+    virNetClientProgramErrorHanderPtr err; /* describes how to decode and report error replies */
+};
+
+/*
+ * Allocate a program object holding one reference.
+ *
+ * The 'events' table and 'err' handler are stored by pointer, not
+ * copied. Returns NULL (after reporting OOM) if allocation fails.
+ */
+virNetClientProgramPtr virNetClientProgramNew(unsigned program,
+                                              unsigned version,
+                                              virNetClientProgramEventPtr events,
+                                              size_t nevents,
+                                              void *eventOpaque,
+                                              virNetClientProgramErrorHanderPtr err)
+{
+    virNetClientProgramPtr newprog;
+
+    if (VIR_ALLOC(newprog) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    /* Identity of the RPC program */
+    newprog->program = program;
+    newprog->version = version;
+
+    /* Asynchronous event dispatch table and its opaque data */
+    newprog->events = events;
+    newprog->nevents = nevents;
+    newprog->eventOpaque = eventOpaque;
+
+    /* Error reply handling */
+    newprog->err = err;
+
+    /* The caller owns the initial reference */
+    newprog->refs = 1;
+
+    return newprog;
+}
+
+
+/* Take one more reference on 'prog'; pair with virNetClientProgramFree() */
+void virNetClientProgramRef(virNetClientProgramPtr prog)
+{
+    prog->refs += 1;
+}
+
+
+/*
+ * Drop one reference on 'prog' and release it once no references
+ * remain. A NULL 'prog' is accepted and ignored.
+ */
+void virNetClientProgramFree(virNetClientProgramPtr prog)
+{
+    if (prog == NULL)
+        return;
+
+    /* Other holders still exist: only the count changes */
+    if (--prog->refs > 0)
+        return;
+
+    VIR_FREE(prog);
+}
+
+
+/*
+ * Returns 1 when the header of 'msg' carries the same program and
+ * version numbers as 'prog', and 0 otherwise.
+ */
+int virNetClientProgramMatches(virNetClientProgramPtr prog,
+                               virNetMessagePtr msg)
+{
+    if (prog->program != msg->header.prog)
+        return 0;
+
+    if (prog->version != msg->header.vers)
+        return 0;
+
+    return 1;
+}
+
+
+/*
+ * Decode the payload of an error reply and hand it to the
+ * program's registered error handler.
+ *
+ * A scratch buffer of prog->err->len bytes is allocated, filled by
+ * the XDR filter prog->err->filter, passed to prog->err->func and
+ * then released again, including anything the XDR decoder
+ * allocated inside it.
+ *
+ * Returns 0 if the error was decoded and reported, -1 if no handler
+ * is registered, memory ran out, or the payload could not be decoded.
+ */
+static int virNetClientProgramDispatchError(virNetClientProgramPtr prog,
+                                            virNetMessagePtr msg)
+{
+    char *err = NULL;
+    int ret = -1;
+
+    /* The handler is optional at construction time; do not crash
+     * if a server sends an error to a program without one */
+    if (!prog->err) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("no error handler registered for RPC program"));
+        return -1;
+    }
+
+    if (VIR_ALLOC_N(err, prog->err->len) < 0) {
+        virReportOOMError();
+        return -1;
+    }
+
+    if (virNetMessageDecodePayload(msg, prog->err->filter, err) < 0)
+        goto cleanup;
+
+    prog->err->func(prog, err);
+
+    /* Free whatever the XDR decoder allocated inside the error,
+     * mirroring what the event dispatch path does for event data.
+     * NOTE(review): assumes the handler copies what it needs and
+     * does not keep pointers into 'err' - confirm against handlers. */
+    xdr_free(prog->err->filter, err);
+
+    ret = 0;
+
+cleanup:
+    VIR_FREE(err);
+    return ret;
+}
+
+/*
+ * Look up the event table entry registered for 'procedure'.
+ *
+ * Returns a pointer into prog->events, or NULL when no entry has a
+ * matching procedure number.
+ */
+static virNetClientProgramEventPtr virNetClientProgramGetEvent(virNetClientProgramPtr prog,
+                                                               int procedure)
+{
+    /* size_t to match the type of prog->nevents and avoid a
+     * signed/unsigned comparison */
+    size_t i;
+
+    for (i = 0 ; i < prog->nevents ; i++) {
+        if (prog->events[i].proc == procedure)
+            return &prog->events[i];
+    }
+
+    return NULL;
+}
+
+
+/*
+ * Dispatch an asynchronous event message to the handler that
+ * 'prog' registered for its procedure number.
+ *
+ * The message header must carry this program's number and version,
+ * status VIR_NET_OK and type VIR_NET_MESSAGE. The payload is decoded
+ * into a scratch buffer of event->msg_len bytes, which is passed to
+ * the handler and released afterwards.
+ *
+ * Returns -1 if the header does not match, no handler is registered
+ * for the procedure, or memory runs out. Returns 0 otherwise; a
+ * payload that fails to decode is dropped without being treated as
+ * a dispatch failure.
+ */
+int virNetClientProgramDispatch(virNetClientProgramPtr prog,
+                                virNetClientPtr client,
+                                virNetMessagePtr msg)
+{
+    virNetClientProgramEventPtr event;
+    char *evdata;
+
+    VIR_DEBUG("prog=%d ver=%d type=%d status=%d serial=%d proc=%d",
+              msg->header.prog, msg->header.vers, msg->header.type,
+              msg->header.status, msg->header.serial, msg->header.proc);
+
+    /* Check version, etc. */
+    if (msg->header.prog != prog->program) {
+        VIR_ERROR(_("program mismatch in event (actual %x, expected %x)"),
+                  msg->header.prog, prog->program);
+        return -1;
+    }
+
+    if (msg->header.vers != prog->version) {
+        VIR_ERROR(_("version mismatch in event (actual %x, expected %x)"),
+                  msg->header.vers, prog->version);
+        return -1;
+    }
+
+    if (msg->header.status != VIR_NET_OK) {
+        VIR_ERROR(_("status mismatch in event (actual %x, expected %x)"),
+                  msg->header.status, VIR_NET_OK);
+        return -1;
+    }
+
+    if (msg->header.type != VIR_NET_MESSAGE) {
+        VIR_ERROR(_("type mismatch in event (actual %x, expected %x)"),
+                  msg->header.type, VIR_NET_MESSAGE);
+        return -1;
+    }
+
+    event = virNetClientProgramGetEvent(prog, msg->header.proc);
+
+    if (!event) {
+        VIR_ERROR(_("No event expected with procedure %x"),
+                  msg->header.proc);
+        return -1;
+    }
+
+    if (VIR_ALLOC_N(evdata, event->msg_len) < 0) {
+        virReportOOMError();
+        return -1;
+    }
+
+    if (virNetMessageDecodePayload(msg, event->msg_filter, evdata) < 0)
+        goto cleanup;
+
+    /* Hand over the decoded buffer itself. Passing '&evdata' would
+     * give the handler the address of this local pointer variable
+     * rather than the decoded event structure. */
+    event->func(prog, client, evdata, prog->eventOpaque);
+
+    /* Release anything the XDR decoder allocated inside the buffer */
+    xdr_free(event->msg_filter, evdata);
+
+cleanup:
+    VIR_FREE(evdata);
+    return 0;
+}
+
+
+/*
+ * Perform a synchronous RPC call for 'prog' over 'client'.
+ *
+ * @serial: serial number placed in the call header and expected
+ *          back in the reply
+ * @proc: procedure number to invoke
+ * @args_filter / @args: XDR filter and data used to encode the call
+ * @ret_filter / @ret: XDR filter and storage used to decode a
+ *                     successful reply
+ *
+ * Returns 0 once a VIR_NET_OK reply has been decoded into 'ret'.
+ * Returns -1 on allocation, encoding, send or decode failure, on an
+ * unexpected reply header, or when the server answered with
+ * VIR_NET_ERROR (which is first passed to the program's error
+ * handler).
+ */
+int virNetClientProgramCall(virNetClientProgramPtr prog,
+                            virNetClientPtr client,
+                            unsigned serial,
+                            int proc,
+                            xdrproc_t args_filter, void *args,
+                            xdrproc_t ret_filter, void *ret)
+{
+    virNetMessagePtr msg;
+    int rv = -1;
+
+    if (VIR_ALLOC(msg) < 0) {
+        virReportOOMError();
+        return -1;
+    }
+
+    msg->header.prog = prog->program;
+    msg->header.vers = prog->version;
+    msg->header.status = VIR_NET_OK;
+    msg->header.type = VIR_NET_CALL;
+    msg->header.serial = serial;
+    msg->header.proc = proc;
+
+    if (virNetMessageEncodeHeader(msg) < 0)
+        goto cleanup;
+
+    if (virNetMessageEncodePayload(msg, args_filter, args) < 0)
+        goto cleanup;
+
+    if (virNetClientSend(client, msg, true) < 0)
+        goto cleanup;
+
+    /* None of these 3 should ever happen here, because
+     * virNetClientSend should have validated the reply,
+     * but it doesn't hurt to check again.
+     */
+    if (msg->header.type != VIR_NET_REPLY) {
+        virNetError(VIR_ERR_INTERNAL_ERROR,
+                    _("Unexpected message type %d"), msg->header.type);
+        goto cleanup;
+    }
+    if (msg->header.proc != proc) {
+        virNetError(VIR_ERR_INTERNAL_ERROR,
+                    _("Unexpected message proc %d != %d"),
+                    msg->header.proc, proc);
+        goto cleanup;
+    }
+    if (msg->header.serial != serial) {
+        /* Serial numbers are unsigned, so print them as such */
+        virNetError(VIR_ERR_INTERNAL_ERROR,
+                    _("Unexpected message serial %u != %u"),
+                    msg->header.serial, serial);
+        goto cleanup;
+    }
+
+    switch (msg->header.status) {
+    case VIR_NET_OK:
+        if (virNetMessageDecodePayload(msg, ret_filter, ret) < 0)
+            goto cleanup;
+        break;
+
+    case VIR_NET_ERROR:
+        /* The call failed either way, so the outcome of reporting
+         * the remote error does not change our return value */
+        virNetClientProgramDispatchError(prog, msg);
+        goto cleanup;
+
+    default:
+        virNetError(VIR_ERR_RPC,
+                    _("Unexpected message status %d"), msg->header.status);
+        goto cleanup;
+    }
+
+    rv = 0;
+
+cleanup:
+    /* Single exit path: the message is ours on success and failure */
+    VIR_FREE(msg);
+    return rv;
+}
diff --git a/src/rpc/virnetclientprogram.h b/src/rpc/virnetclientprogram.h
new file mode 100644
index 0000000..5a5b937
--- /dev/null
+++ b/src/rpc/virnetclientprogram.h
@@ -0,0 +1,71 @@
+
+#ifndef __VIR_NET_CLIENT_PROGRAM_H__
+#define __VIR_NET_CLIENT_PROGRAM_H__
+
+#include <rpc/types.h>
+#include <rpc/xdr.h>
+
+#include "virnetmessage.h"
+/*
+ * virNetClientProgram: handles processing and dispatch of RPC
+ * messages for a single RPC (program, version). A program can
+ * register to receive async events from a client.
+ *
+ * Note that the error handler typedefs below are spelled
+ * 'ErrorHander' (sic); those names are part of the interface.
+ */
+typedef struct _virNetClient virNetClient;
+typedef virNetClient *virNetClientPtr;
+
+typedef struct _virNetClientProgram virNetClientProgram;
+typedef virNetClientProgram *virNetClientProgramPtr;
+
+typedef struct _virNetClientProgramEvent virNetClientProgramEvent;
+typedef virNetClientProgramEvent *virNetClientProgramEventPtr;
+
+typedef struct _virNetClientProgramErrorHandler virNetClientProgramErrorHander;
+typedef virNetClientProgramErrorHander *virNetClientProgramErrorHanderPtr;
+
+typedef int (*virNetClientProgramErrorFunc)(virNetClientProgramPtr prog,
+                                            void *rerr);
+
+struct _virNetClientProgramErrorHandler {
+    virNetClientProgramErrorFunc func; /* called with the decoded error reply */
+    size_t len;                        /* size in bytes of the buffer the error is decoded into */
+    xdrproc_t filter;                  /* XDR filter used to decode the error payload */
+};
+
+
+typedef void (*virNetClientProgramDispatchFunc)(virNetClientProgramPtr prog,
+                                                virNetClientPtr client,
+                                                void *msg,
+                                                void *opaque);
+
+struct _virNetClientProgramEvent {
+    int proc;                             /* procedure number matched against the message header */
+    virNetClientProgramDispatchFunc func; /* handler invoked by virNetClientProgramDispatch() */
+    size_t msg_len;                       /* size in bytes of the buffer the payload is decoded into */
+    xdrproc_t msg_filter;                 /* XDR filter used to decode (and later free) the payload */
+};
+/*
+ * Create a program object with one reference. 'events' and 'err'
+ * are stored by pointer, not copied. Returns NULL on allocation
+ * failure.
+ */
+virNetClientProgramPtr virNetClientProgramNew(unsigned program,
+                                              unsigned version,
+                                              virNetClientProgramEventPtr events,
+                                              size_t nevents,
+                                              void *eventOpaque,
+                                              virNetClientProgramErrorHanderPtr err);
+/*
+ * Ref adds a reference; Free drops one and releases the object
+ * when none remain (Free accepts NULL).
+ */
+void virNetClientProgramRef(virNetClientProgramPtr prog);
+
+void virNetClientProgramFree(virNetClientProgramPtr prog);
+/*
+ * Returns 1 if the program and version in the header of 'msg'
+ * equal those of 'prog', 0 otherwise.
+ */
+int virNetClientProgramMatches(virNetClientProgramPtr prog,
+                               virNetMessagePtr msg);
+/*
+ * Dispatch an async event message to the matching registered
+ * handler. Returns -1 if the header does not match the program,
+ * no handler is registered for the procedure, or memory runs out;
+ * 0 otherwise.
+ */
+int virNetClientProgramDispatch(virNetClientProgramPtr prog,
+                                virNetClientPtr client,
+                                virNetMessagePtr msg);
+/*
+ * Encode 'args' as a call to procedure 'proc', send it on 'client'
+ * expecting a reply, and decode a successful reply into 'ret'.
+ * Returns 0 on success, -1 on any failure including an error
+ * reply from the server.
+ */
+int virNetClientProgramCall(virNetClientProgramPtr prog,
+                            virNetClientPtr client,
+                            unsigned serial,
+                            int proc,
+                            xdrproc_t args_filter, void *args,
+                            xdrproc_t ret_filter, void *ret);
+
+
+
+#endif /* __VIR_NET_CLIENT_PROGRAM_H__ */
diff --git a/src/rpc/virnetclientsaslcontext.c b/src/rpc/virnetclientsaslcontext.c
new file mode 100644
index 0000000..757cd72
--- /dev/null
+++ b/src/rpc/virnetclientsaslcontext.c
@@ -0,0 +1,246 @@
+
+#include <config.h>
+
+#include "virnetclientsaslcontext.h"
+
+#include "virterror_internal.h"
+#include "memory.h"
+#include "logging.h"
+
+#define VIR_FROM_THIS VIR_FROM_RPC
+
+#define virNetError(code, ...)                                    \
+    virReportErrorHelper(NULL, VIR_FROM_RPC, code, __FILE__,      \
+                         __FUNCTION__, __LINE__, __VA_ARGS__)
+
+
+struct _virNetClientSaslContext {  /* reference-counted wrapper around one SASL connection */
+    sasl_conn_t *conn;  /* created with sasl_client_new(), released with sasl_dispose() */
+    int refs;           /* reference count: 1 on creation, freed when it drops to 0 */
+};
+
+/*
+ * Initialise the SASL client library and create a new client
+ * context holding one reference.
+ *
+ * All arguments are forwarded to sasl_client_new(), which is asked
+ * for SASL_SUCCESS_DATA. Returns NULL after reporting an error if
+ * the library cannot be initialised, memory runs out, or the SASL
+ * connection cannot be created.
+ */
+virNetClientSaslContextPtr virNetClientSaslContextNew(const char *service,
+                                                      const char *hostname,
+                                                      const char *localAddr,
+                                                      const char *remoteAddr,
+                                                      const sasl_callback_t *cbs)
+{
+    virNetClientSaslContextPtr ctxt = NULL;
+    int rc;
+
+    rc = sasl_client_init(NULL);
+    if (rc != SASL_OK) {
+        virNetError(VIR_ERR_AUTH_FAILED,
+                    _("failed to initialize SASL library: %d (%s)"),
+                    rc, sasl_errstring(rc, NULL, NULL));
+        return NULL;
+    }
+
+    if (VIR_ALLOC(ctxt) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    ctxt->refs = 1;
+
+    rc = sasl_client_new(service, hostname,
+                         localAddr, remoteAddr,
+                         cbs, SASL_SUCCESS_DATA,
+                         &ctxt->conn);
+    if (rc == SASL_OK)
+        return ctxt;
+
+    virNetError(VIR_ERR_AUTH_FAILED,
+                _("Failed to create SASL client context: %d (%s)"),
+                rc, sasl_errstring(rc, NULL, NULL));
+
+    /* Drops the only reference, releasing the half-built context */
+    virNetClientSaslContextFree(ctxt);
+    return NULL;
+}
+
+/* Take one more reference on 'sasl'; pair with virNetClientSaslContextFree() */
+void virNetClientSaslContextRef(virNetClientSaslContextPtr sasl)
+{
+    sasl->refs += 1;
+}
+
+/*
+ * Set the SASL_SSF_EXTERNAL property of the connection to 'ssf'.
+ * Returns 0 on success, -1 (with an error reported) on failure.
+ */
+int virNetClientSaslContextExtKeySize(virNetClientSaslContextPtr sasl,
+                                      int ssf)
+{
+    int rc = sasl_setprop(sasl->conn, SASL_SSF_EXTERNAL, &ssf);
+
+    if (rc == SASL_OK)
+        return 0;
+
+    virNetError(VIR_ERR_INTERNAL_ERROR,
+                _("cannot set external SSF %d (%s)"),
+                rc, sasl_errstring(rc, NULL, NULL));
+    return -1;
+}
+
+/*
+ * Query the SASL_SSF property of the connection.
+ * Returns the value read, or -1 (with an error reported) if the
+ * property cannot be queried.
+ */
+int virNetClientSaslContextGetKeySize(virNetClientSaslContextPtr sasl)
+{
+    const void *prop;
+    int rc = sasl_getprop(sasl->conn, SASL_SSF, &prop);
+
+    if (rc != SASL_OK) {
+        virNetError(VIR_ERR_AUTH_FAILED,
+                    _("cannot query SASL ssf on connection %d (%s)"),
+                    rc, sasl_errstring(rc, NULL, NULL));
+        return -1;
+    }
+
+    /* The property is handed back as a pointer to the value */
+    return *(const int *)prop;
+}
+
+/*
+ * Install the security properties for the connection: the allowed
+ * SSF range, a maximum buffer size of 100000, and - unless
+ * 'allowAnonymous' is set - the flags forbidding anonymous and
+ * plaintext mechanisms.
+ *
+ * Returns 0 on success, -1 (with an error reported) on failure.
+ */
+int virNetClientSaslContextSecProps(virNetClientSaslContextPtr sasl,
+                                    int minSSF,
+                                    int maxSSF,
+                                    bool allowAnonymous)
+{
+    sasl_security_properties_t props;
+    int rc;
+
+    memset(&props, 0, sizeof(props));
+
+    props.maxbufsize = 100000;
+    props.min_ssf = minSSF;
+    props.max_ssf = maxSSF;
+    if (allowAnonymous)
+        props.security_flags = 0;
+    else
+        props.security_flags = SASL_SEC_NOANONYMOUS | SASL_SEC_NOPLAINTEXT;
+
+    rc = sasl_setprop(sasl->conn, SASL_SEC_PROPS, &props);
+    if (rc == SASL_OK)
+        return 0;
+
+    virNetError(VIR_ERR_INTERNAL_ERROR,
+                _("cannot set security props %d (%s)"),
+                rc, sasl_errstring(rc, NULL, NULL));
+    return -1;
+}
+
+
+/*
+ * Begin SASL negotiation using one of the mechanisms in 'mechlist'.
+ *
+ * On return 'clientout'/'clientoutlen' describe the data produced by
+ * the SASL library and 'mech' the chosen mechanism; 'prompt_need'
+ * is filled in when interaction is required.
+ *
+ * Returns VIR_NET_CLIENT_SASL_COMPLETE, _CONTINUE or _INTERACT
+ * according to the SASL result, or -1 (with an error reported) on
+ * any other result.
+ */
+int virNetClientSaslContextStart(virNetClientSaslContextPtr sasl,
+                                 const char *mechlist,
+                                 sasl_interact_t **prompt_need,
+                                 const char **clientout,
+                                 size_t *clientoutlen,
+                                 const char **mech)
+{
+    /* The SASL API reports lengths as 'unsigned'. Casting the
+     * caller's size_t pointer would leave part of the size_t
+     * unwritten where the two types differ in width, so go via
+     * a local of the right type. */
+    unsigned outlen = 0;
+    int err = sasl_client_start(sasl->conn,
+                                mechlist,
+                                prompt_need,
+                                clientout,
+                                &outlen,
+                                mech);
+
+    *clientoutlen = outlen;
+
+    switch (err) {
+    case SASL_OK:
+        return VIR_NET_CLIENT_SASL_COMPLETE;
+    case SASL_CONTINUE:
+        return VIR_NET_CLIENT_SASL_CONTINUE;
+    case SASL_INTERACT:
+        return VIR_NET_CLIENT_SASL_INTERACT;
+
+    default:
+        virNetError(VIR_ERR_AUTH_FAILED,
+                    _("Failed to start SASL negotiation: %d (%s)"),
+                    err, sasl_errdetail(sasl->conn));
+        return -1;
+    }
+}
+
+
+/*
+ * Perform one further step of SASL negotiation, feeding the
+ * server's data ('serverin', 'serverinlen') to the SASL library.
+ *
+ * On return 'clientout'/'clientoutlen' describe the data produced
+ * by the SASL library; 'prompt_need' is filled in when interaction
+ * is required.
+ *
+ * Returns VIR_NET_CLIENT_SASL_COMPLETE, _CONTINUE or _INTERACT
+ * according to the SASL result, or -1 (with an error reported) if
+ * the input is too large for the SASL API or on any other result.
+ */
+int virNetClientSaslContextStep(virNetClientSaslContextPtr sasl,
+                                const char *serverin,
+                                size_t serverinlen,
+                                sasl_interact_t **prompt_need,
+                                const char **clientout,
+                                size_t *clientoutlen)
+{
+    /* The SASL API works with 'unsigned' lengths: refuse input that
+     * would be silently truncated, and collect the output length in
+     * a local rather than aliasing the caller's size_t. */
+    unsigned inlen = (unsigned)serverinlen;
+    unsigned outlen = 0;
+    int err;
+
+    if (inlen != serverinlen) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("SASL server data is too large"));
+        return -1;
+    }
+
+    err = sasl_client_step(sasl->conn,
+                           serverin,
+                           inlen,
+                           prompt_need,
+                           clientout,
+                           &outlen);
+
+    *clientoutlen = outlen;
+
+    switch (err) {
+    case SASL_OK:
+        return VIR_NET_CLIENT_SASL_COMPLETE;
+    case SASL_CONTINUE:
+        return VIR_NET_CLIENT_SASL_CONTINUE;
+    case SASL_INTERACT:
+        return VIR_NET_CLIENT_SASL_INTERACT;
+
+    default:
+        /* This is the step phase, not the start of negotiation */
+        virNetError(VIR_ERR_AUTH_FAILED,
+                    _("Failed to step SASL negotiation: %d (%s)"),
+                    err, sasl_errdetail(sasl->conn));
+        return -1;
+    }
+}
+
+/*
+ * Encode 'inputLen' bytes from 'input' with the negotiated SASL
+ * security layer.
+ *
+ * 'output' and 'outputlen' receive the buffer and length produced
+ * by sasl_encode().
+ *
+ * Returns 0 on success, -1 (with an error reported) if the input is
+ * too large for the SASL API or encoding fails.
+ */
+ssize_t virNetClientSaslContextEncode(virNetClientSaslContextPtr sasl,
+                                      const char *input,
+                                      size_t inputLen,
+                                      const char **output,
+                                      size_t *outputlen)
+{
+    /* sasl_encode() takes and returns 'unsigned' lengths: reject
+     * input that would be truncated and avoid aliasing the caller's
+     * size_t through an 'unsigned *'. */
+    unsigned inlen = (unsigned)inputLen;
+    unsigned outlen = 0;
+    int err;
+
+    if (inlen != inputLen) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("SASL data to encode is too large"));
+        return -1;
+    }
+
+    err = sasl_encode(sasl->conn,
+                      input, inlen,
+                      output, &outlen);
+    *outputlen = outlen;
+
+    if (err != SASL_OK) {
+        virNetError(VIR_ERR_INTERNAL_ERROR,
+                    _("failed to encode SASL data: %d (%s)"),
+                    err, sasl_errstring(err, NULL, NULL));
+        return -1;
+    }
+    return 0;
+}
+
+/*
+ * Decode 'inputLen' bytes from 'input' that were protected with the
+ * negotiated SASL security layer.
+ *
+ * 'output' and 'outputlen' receive the buffer and length produced
+ * by sasl_decode().
+ *
+ * Returns 0 on success, -1 (with an error reported) if the input is
+ * too large for the SASL API or decoding fails.
+ */
+ssize_t virNetClientSaslContextDecode(virNetClientSaslContextPtr sasl,
+                                      const char *input,
+                                      size_t inputLen,
+                                      const char **output,
+                                      size_t *outputlen)
+{
+    /* sasl_decode() takes and returns 'unsigned' lengths: reject
+     * input that would be truncated and avoid aliasing the caller's
+     * size_t through an 'unsigned *'. */
+    unsigned inlen = (unsigned)inputLen;
+    unsigned outlen = 0;
+    int err;
+
+    if (inlen != inputLen) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("SASL data to decode is too large"));
+        return -1;
+    }
+
+    err = sasl_decode(sasl->conn,
+                      input, inlen,
+                      output, &outlen);
+    *outputlen = outlen;
+
+    if (err != SASL_OK) {
+        virNetError(VIR_ERR_INTERNAL_ERROR,
+                    _("failed to decode SASL data: %d (%s)"),
+                    err, sasl_errstring(err, NULL, NULL));
+        return -1;
+    }
+    return 0;
+}
+
+/*
+ * Drop one reference on 'sasl'. When the last reference goes away
+ * the SASL connection (if any) is disposed of and the context is
+ * freed. A NULL 'sasl' is accepted and ignored.
+ */
+void virNetClientSaslContextFree(virNetClientSaslContextPtr sasl)
+{
+    if (sasl == NULL)
+        return;
+
+    /* Other holders still exist: only the count changes */
+    if (--sasl->refs > 0)
+        return;
+
+    if (sasl->conn != NULL)
+        sasl_dispose(&sasl->conn);
+
+    VIR_FREE(sasl);
+}
+
diff --git a/src/rpc/virnetclientsaslcontext.h b/src/rpc/virnetclientsaslcontext.h
new file mode 100644
index 0000000..043ae58
--- /dev/null
+++ b/src/rpc/virnetclientsaslcontext.h
@@ -0,0 +1,66 @@
+
+#ifndef __VIR_NET_CLIENT_SASL_CONTEXT_H__
+# define __VIR_NET_CLIENT_SASL_CONTEXT_H__
+
+# include <sasl/sasl.h>
+
+# include <sys/types.h>
+
+#include "virnetsocket.h"
+/*
+ * virNetClientSASLContext: handles everything to do with SASL
+ * authentication and encryption, so that callers no longer need to
+ * call any cyrus-sasl APIs directly.
+ */
+typedef struct _virNetClientSaslContext virNetClientSaslContext;
+typedef virNetClientSaslContext *virNetClientSaslContextPtr;
+/*
+ * Non-error results of virNetClientSaslContextStart() and
+ * virNetClientSaslContextStep().
+ */
+enum {
+    VIR_NET_CLIENT_SASL_COMPLETE,   /* SASL library returned SASL_OK */
+    VIR_NET_CLIENT_SASL_CONTINUE,   /* SASL library returned SASL_CONTINUE */
+    VIR_NET_CLIENT_SASL_INTERACT,   /* SASL library returned SASL_INTERACT */
+};
+
+virNetClientSaslContextPtr virNetClientSaslContextNew(const char *service,
+                                                      const char *hostname,
+                                                      const char *localAddr,
+                                                      const char *remoteAddr,
+                                                      const sasl_callback_t *cbs);
+
+void virNetClientSaslContextRef(virNetClientSaslContextPtr sasl);
+
+int virNetClientSaslContextExtKeySize(virNetClientSaslContextPtr sasl,
+                                      int ssf);
+
+int virNetClientSaslContextGetKeySize(virNetClientSaslContextPtr sasl);
+
+int virNetClientSaslContextSecProps(virNetClientSaslContextPtr sasl,
+                                    int minSSF,
+                                    int maxSSF,
+                                    bool allowAnonymous);
+/*
+ * Start and Step return one of the VIR_NET_CLIENT_SASL_* values
+ * above, or -1 after reporting an error.
+ */
+int virNetClientSaslContextStart(virNetClientSaslContextPtr sasl,
+                                 const char *mechlist,
+                                 sasl_interact_t **prompt_need,
+                                 const char **clientout,
+                                 size_t *clientoutlen,
+                                 const char **mech);
+
+int virNetClientSaslContextStep(virNetClientSaslContextPtr sasl,
+                                const char *serverin,
+                                size_t serverinlen,
+                                sasl_interact_t **prompt_need,
+                                const char **clientout,
+                                size_t *clientoutlen);
+/*
+ * Encode and Decode return 0 on success and -1 after reporting an
+ * error; the result is handed back through 'output'/'outputlen'.
+ */
+ssize_t virNetClientSaslContextEncode(virNetClientSaslContextPtr sasl,
+                                      const char *input,
+                                      size_t inputLen,
+                                      const char **output,
+                                      size_t *outputlen);
+
+ssize_t virNetClientSaslContextDecode(virNetClientSaslContextPtr sasl,
+                                      const char *input,
+                                      size_t inputLen,
+                                      const char **output,
+                                      size_t *outputlen);
+
+void virNetClientSaslContextFree(virNetClientSaslContextPtr sasl);
+
+#endif /* __VIR_NET_CLIENT_SASL_CONTEXT_H__ */
-- 
1.7.2.3

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list


[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]