[PATCH 2/9] Implement common keepalive handling

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



These APIs are used by both client and server RPC layer to handle
processing of keepalive messages.
---
 po/POTFILES.in         |    1 +
 src/Makefile.am        |    3 +-
 src/rpc/virkeepalive.c |  464 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/rpc/virkeepalive.h |   58 ++++++
 4 files changed, 525 insertions(+), 1 deletions(-)
 create mode 100644 src/rpc/virkeepalive.c
 create mode 100644 src/rpc/virkeepalive.h

diff --git a/po/POTFILES.in b/po/POTFILES.in
index 5ce35ae..71254dd 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -72,6 +72,7 @@ src/qemu/qemu_monitor_text.c
 src/qemu/qemu_process.c
 src/remote/remote_client_bodies.h
 src/remote/remote_driver.c
+src/rpc/virkeepalive.c
 src/rpc/virnetclient.c
 src/rpc/virnetclientprogram.c
 src/rpc/virnetclientstream.c
diff --git a/src/Makefile.am b/src/Makefile.am
index ff890e1..d983d28 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1326,7 +1326,8 @@ libvirt_net_rpc_la_SOURCES = \
 	rpc/virnetprotocol.h rpc/virnetprotocol.c \
 	rpc/virnetsocket.h rpc/virnetsocket.c \
 	rpc/virnettlscontext.h rpc/virnettlscontext.c \
-	rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c
+	rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c \
+	rpc/virkeepalive.h rpc/virkeepalive.c
 if HAVE_SASL
 libvirt_net_rpc_la_SOURCES += \
 	rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c
diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c
new file mode 100644
index 0000000..5536b61
--- /dev/null
+++ b/src/rpc/virkeepalive.c
@@ -0,0 +1,464 @@
+/*
+ * virkeepalive.c: keepalive handling
+ *
+ * Copyright (C) 2011 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Jiri Denemark <jdenemar@xxxxxxxxxx>
+ */
+
+#include <config.h>
+
+#include "memory.h"
+#include "threads.h"
+#include "virfile.h"
+#include "logging.h"
+#include "util.h"
+#include "virterror_internal.h"
+#include "virnetsocket.h"
+#include "virkeepaliveprotocol.h"
+#include "virkeepalive.h"
+
+#define VIR_FROM_THIS VIR_FROM_RPC
+#define virNetError(code, ...)                                    \
+    virReportErrorHelper(VIR_FROM_THIS, code, __FILE__,           \
+                         __FUNCTION__, __LINE__, __VA_ARGS__)
+
+struct _virKeepAlive {
+    int refs;
+    virMutex lock;
+
+    int interval;
+    unsigned int count;
+    unsigned int countToDeath;
+    time_t lastPacketReceived;
+    bool advertised;
+    int timer;
+
+    virNetMessagePtr response;
+    int responseTimer;
+
+    virKeepAliveSendFunc sendCB;
+    virKeepAliveDeadFunc deadCB;
+    virKeepAliveFreeFunc freeCB;
+    void *client;
+};
+
+
+static void
+virKeepAliveLock(virKeepAlivePtr ka)
+{
+    virMutexLock(&ka->lock);
+}
+
+static void
+virKeepAliveUnlock(virKeepAlivePtr ka)
+{
+    virMutexUnlock(&ka->lock);
+}
+
+
+static virNetMessagePtr
+virKeepAliveMessage(int proc)
+{
+    virNetMessagePtr msg;
+
+    if (!(msg = virNetMessageNew(false)))
+        return NULL;
+
+    msg->header.prog = KEEPALIVE_PROGRAM;
+    msg->header.vers = KEEPALIVE_VERSION;
+    msg->header.type = VIR_NET_MESSAGE;
+    msg->header.proc = proc;
+
+    if (virNetMessageEncodeHeader(msg) < 0 ||
+        virNetMessageEncodePayloadEmpty(msg) < 0) {
+        virNetMessageFree(msg);
+        return NULL;
+    }
+
+    return msg;
+}
+
+
+static int
+virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg)
+{
+    int ret;
+    const char *proc;
+    void *client = ka->client;
+    virKeepAliveSendFunc sendCB = ka->sendCB;
+
+    switch (msg->header.proc) {
+    case KEEPALIVE_PROC_ADVERTISE:
+        proc = "advertisement";
+        break;
+    case KEEPALIVE_PROC_PING:
+        proc = "request";
+        break;
+    case KEEPALIVE_PROC_PONG:
+        proc = "response";
+        break;
+    }
+
+    VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client);
+
+    ka->refs++;
+    virKeepAliveUnlock(ka);
+
+    if ((ret = sendCB(client, msg)) < 0) {
+        VIR_WARN("Failed to send keepalive %s to client %p", proc, client);
+        virNetMessageFree(msg);
+    }
+
+    virKeepAliveLock(ka);
+    ka->refs--;
+
+    return ret;
+}
+
+
+int
+virKeepAliveAdvertise(virKeepAlivePtr ka)
+{
+    virNetMessagePtr msg;
+    int ret = -1;
+
+    virKeepAliveLock(ka);
+
+    VIR_DEBUG("Advertising keepalive support to client %p", ka->client);
+
+    if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_ADVERTISE))) {
+        VIR_WARN("Failed to generate keepalive advertisement");
+        goto cleanup;
+    }
+
+    if ((ret = virKeepAliveSend(ka, msg)) == 0)
+        ka->advertised = true;
+
+cleanup:
+    virKeepAliveUnlock(ka);
+    return ret;
+}
+
+
+static void
+virKeepAliveScheduleResponse(virKeepAlivePtr ka)
+{
+    if (ka->responseTimer == -1)
+        return;
+
+    VIR_DEBUG("Scheduling keepalive response to client %p", ka->client);
+
+    if (!ka->response &&
+        !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) {
+        VIR_WARN("Failed to generate keepalive response");
+        return;
+    }
+
+    virEventUpdateTimeout(ka->responseTimer, 0);
+}
+
+
+static void
+virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virKeepAlivePtr ka = opaque;
+    time_t now = time(NULL);
+
+    virKeepAliveLock(ka);
+
+    VIR_DEBUG("ka=%p, client=%p, countToDeath=%d, lastPacketReceived=%lds ago",
+              ka, ka->client, ka->countToDeath, now - ka->lastPacketReceived);
+
+    if (now - ka->lastPacketReceived < ka->interval - 1) {
+        int timeout = ka->interval - (now - ka->lastPacketReceived);
+        virEventUpdateTimeout(ka->timer, timeout * 1000);
+        goto cleanup;
+    }
+
+    if (ka->countToDeath == 0) {
+        virKeepAliveDeadFunc deadCB = ka->deadCB;
+        void *client = ka->client;
+
+        VIR_WARN("No response from client %p after %d keepalive messages in"
+                 " %d seconds",
+                 ka->client,
+                 ka->count,
+                 (int) (now - ka->lastPacketReceived));
+        ka->refs++;
+        virKeepAliveUnlock(ka);
+        deadCB(client);
+        virKeepAliveLock(ka);
+        ka->refs--;
+    } else {
+        virNetMessagePtr msg;
+
+        ka->countToDeath--;
+        if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_PING)))
+            VIR_WARN("Failed to generate keepalive request");
+        else
+            ignore_value(virKeepAliveSend(ka, msg));
+        virEventUpdateTimeout(ka->timer, ka->interval * 1000);
+    }
+
+cleanup:
+    virKeepAliveUnlock(ka);
+}
+
+
+static void
+virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virKeepAlivePtr ka = opaque;
+    virNetMessagePtr msg;
+
+    virKeepAliveLock(ka);
+
+    VIR_DEBUG("ka=%p, client=%p, response=%p",
+              ka, ka->client, ka->response);
+
+    if (ka->response) {
+        msg = ka->response;
+        ka->response = NULL;
+        ignore_value(virKeepAliveSend(ka, msg));
+    }
+
+    virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1);
+
+    virKeepAliveUnlock(ka);
+}
+
+
+static void
+virKeepAliveTimerFree(void *opaque)
+{
+    virKeepAliveFree(opaque);
+}
+
+
+virKeepAlivePtr
+virKeepAliveNew(int interval,
+                unsigned int count,
+                void *client,
+                virKeepAliveSendFunc sendCB,
+                virKeepAliveDeadFunc deadCB,
+                virKeepAliveFreeFunc freeCB)
+{
+    virKeepAlivePtr ka;
+
+    VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count);
+
+    if (VIR_ALLOC(ka) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    if (virMutexInit(&ka->lock) < 0) {
+        VIR_FREE(ka);
+        return NULL;
+    }
+
+    ka->refs = 1;
+    ka->interval = interval;
+    ka->count = count;
+    ka->countToDeath = count;
+    ka->timer = -1;
+    ka->client = client;
+    ka->sendCB = sendCB;
+    ka->deadCB = deadCB;
+    ka->freeCB = freeCB;
+
+    ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer,
+                                           ka, virKeepAliveTimerFree);
+    if (ka->responseTimer < 0) {
+        virKeepAliveFree(ka);
+        return NULL;
+    }
+    /* the timer now has a reference to ka */
+    ka->refs++;
+
+    return ka;
+}
+
+
+void
+virKeepAliveRef(virKeepAlivePtr ka)
+{
+    virKeepAliveLock(ka);
+    ka->refs++;
+    VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs);
+    virKeepAliveUnlock(ka);
+}
+
+
+void
+virKeepAliveFree(virKeepAlivePtr ka)
+{
+    if (!ka)
+        return;
+
+    virKeepAliveLock(ka);
+    VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs);
+    if (--ka->refs > 0) {
+        virKeepAliveUnlock(ka);
+        return;
+    }
+
+    virMutexDestroy(&ka->lock);
+    ka->freeCB(ka->client);
+    VIR_FREE(ka);
+}
+
+
+int
+virKeepAliveStart(virKeepAlivePtr ka,
+                  int interval,
+                  unsigned int count)
+{
+    int ret = -1;
+    time_t delay;
+    int timeout;
+
+    VIR_DEBUG("ka=%p, client=%p, interval=%d, count=%u",
+              ka, ka->client, interval, count);
+
+    virKeepAliveLock(ka);
+
+    if (!ka->advertised) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("keepalive support was not advertised to remote party"));
+        goto cleanup;
+    }
+
+    if (ka->timer >= 0) {
+        VIR_DEBUG("Keepalive messages already enabled");
+        ret = 0;
+        goto cleanup;
+    }
+
+    if (interval > 0) {
+        if (ka->interval > 0) {
+            virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("keepalive interval already set"));
+            goto cleanup;
+        }
+        ka->interval = interval;
+        ka->count = count;
+        ka->countToDeath = count;
+    }
+
+    if (ka->interval <= 0) {
+        VIR_DEBUG("Keepalive messages disabled by configuration");
+        ret = 0;
+        goto cleanup;
+    }
+
+    VIR_DEBUG("Enabling keepalive messages; interval=%d, count=%u",
+              ka->interval, ka->count);
+
+    delay = time(NULL) - ka->lastPacketReceived;
+    if (delay > ka->interval)
+        timeout = 0;
+    else
+        timeout = ka->interval - delay;
+    ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer,
+                                   ka, virKeepAliveTimerFree);
+    if (ka->timer < 0)
+        goto cleanup;
+
+    /* the timer now has another reference to this object */
+    ka->refs++;
+    ret = 0;
+
+cleanup:
+    virKeepAliveUnlock(ka);
+    return ret;
+}
+
+
+void
+virKeepAliveStop(virKeepAlivePtr ka)
+{
+    VIR_DEBUG("ka=%p, client=%p", ka, ka->client);
+
+    virKeepAliveLock(ka);
+    if (ka->timer > 0) {
+        virEventRemoveTimeout(ka->timer);
+        ka->timer = -1;
+    }
+    if (ka->responseTimer > 0) {
+        virEventRemoveTimeout(ka->responseTimer);
+        ka->responseTimer = -1;
+    }
+    virKeepAliveUnlock(ka);
+}
+
+
+bool
+virKeepAliveCheckMessage(virKeepAlivePtr ka,
+                         virNetMessagePtr msg)
+{
+    bool ret = false;
+    bool start = false;
+
+    VIR_DEBUG("ka=%p, client=%p, msg=%p",
+              ka, ka ? ka->client : "(null)", msg);
+
+    if (!ka)
+        return false;
+
+    virKeepAliveLock(ka);
+
+    ka->countToDeath = ka->count;
+    ka->lastPacketReceived = time(NULL);
+
+    if (msg->header.prog == KEEPALIVE_PROGRAM &&
+        msg->header.vers == KEEPALIVE_VERSION &&
+        msg->header.type == VIR_NET_MESSAGE) {
+        ret = true;
+        switch (msg->header.proc) {
+        case KEEPALIVE_PROC_ADVERTISE:
+            VIR_DEBUG("Client %p advertises keepalive support", ka->client);
+            ka->advertised = true;
+            start = true;
+            break;
+
+        case KEEPALIVE_PROC_PING:
+            VIR_DEBUG("Got keepalive request from client %p", ka->client);
+            virKeepAliveScheduleResponse(ka);
+            break;
+
+        case KEEPALIVE_PROC_PONG:
+            VIR_DEBUG("Got keepalive response from client %p", ka->client);
+            break;
+
+        default:
+            VIR_DEBUG("Ignoring unknown keepalive message %d from client %p",
+                      msg->header.proc, ka->client);
+        }
+    }
+
+    if (ka->timer >= 0)
+        virEventUpdateTimeout(ka->timer, ka->interval * 1000);
+
+    virKeepAliveUnlock(ka);
+
+    if (start && virKeepAliveStart(ka, 0, 0) < 0)
+        VIR_WARN("Failed to start keepalive protocol");
+
+    return ret;
+}
diff --git a/src/rpc/virkeepalive.h b/src/rpc/virkeepalive.h
new file mode 100644
index 0000000..ac96859
--- /dev/null
+++ b/src/rpc/virkeepalive.h
@@ -0,0 +1,58 @@
+/*
+ * virkeepalive.h: keepalive handling
+ *
+ * Copyright (C) 2011 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
+ *
+ * Author: Jiri Denemark <jdenemar@xxxxxxxxxx>
+ */
+
+#ifndef __VIR_KEEPALIVE_H__
+# define __VIR_KEEPALIVE_H__
+
+# include "virnetmessage.h"
+
+typedef int (*virKeepAliveSendFunc)(void *client, virNetMessagePtr msg);
+typedef void (*virKeepAliveDeadFunc)(void *client);
+typedef void (*virKeepAliveFreeFunc)(void *client);
+
+typedef struct _virKeepAlive virKeepAlive;
+typedef virKeepAlive *virKeepAlivePtr;
+
+
+virKeepAlivePtr virKeepAliveNew(int interval,
+                                unsigned int count,
+                                void *client,
+                                virKeepAliveSendFunc sendCB,
+                                virKeepAliveDeadFunc deadCB,
+                                virKeepAliveFreeFunc freeCB)
+                                ATTRIBUTE_NONNULL(3) ATTRIBUTE_NONNULL(4)
+                                ATTRIBUTE_NONNULL(5) ATTRIBUTE_NONNULL(6);
+
+void virKeepAliveRef(virKeepAlivePtr ka);
+void virKeepAliveFree(virKeepAlivePtr ka);
+
+int virKeepAliveAdvertise(virKeepAlivePtr ka);
+
+int virKeepAliveStart(virKeepAlivePtr ka,
+                      int interval,
+                      unsigned int count);
+void virKeepAliveStop(virKeepAlivePtr ka);
+
+bool virKeepAliveCheckMessage(virKeepAlivePtr ka,
+                              virNetMessagePtr msg);
+
+#endif /* __VIR_KEEPALIVE_H__ */
-- 
1.7.6.1

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list


[Index of Archives]     [Virt Tools]     [Libvirt Users]     [Lib OS Info]     [Fedora Users]     [Fedora Desktop]     [Fedora SELinux]     [Big List of Linux Books]     [Yosemite News]     [KDE Users]     [Fedora Tools]