These APIs are used by both client and server RPC layer to handle processing of keepalive messages. --- po/POTFILES.in | 1 + src/Makefile.am | 3 +- src/rpc/virkeepalive.c | 464 ++++++++++++++++++++++++++++++++++++++++++++++++ src/rpc/virkeepalive.h | 58 ++++++ 4 files changed, 525 insertions(+), 1 deletions(-) create mode 100644 src/rpc/virkeepalive.c create mode 100644 src/rpc/virkeepalive.h diff --git a/po/POTFILES.in b/po/POTFILES.in index 5ce35ae..71254dd 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -72,6 +72,7 @@ src/qemu/qemu_monitor_text.c src/qemu/qemu_process.c src/remote/remote_client_bodies.h src/remote/remote_driver.c +src/rpc/virkeepalive.c src/rpc/virnetclient.c src/rpc/virnetclientprogram.c src/rpc/virnetclientstream.c diff --git a/src/Makefile.am b/src/Makefile.am index ff890e1..d983d28 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1326,7 +1326,8 @@ libvirt_net_rpc_la_SOURCES = \ rpc/virnetprotocol.h rpc/virnetprotocol.c \ rpc/virnetsocket.h rpc/virnetsocket.c \ rpc/virnettlscontext.h rpc/virnettlscontext.c \ - rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c + rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c \ + rpc/virkeepalive.h rpc/virkeepalive.c if HAVE_SASL libvirt_net_rpc_la_SOURCES += \ rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c new file mode 100644 index 0000000..5536b61 --- /dev/null +++ b/src/rpc/virkeepalive.c @@ -0,0 +1,464 @@ +/* + * virkeepalive.c: keepalive handling + * + * Copyright (C) 2011 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. 
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Jiri Denemark <jdenemar@xxxxxxxxxx>
+ */
+
+#include <config.h>
+
+#include "memory.h"
+#include "threads.h"
+#include "virfile.h"
+#include "logging.h"
+#include "util.h"
+#include "virterror_internal.h"
+#include "virnetsocket.h"
+#include "virkeepaliveprotocol.h"
+#include "virkeepalive.h"
+
+#define VIR_FROM_THIS VIR_FROM_RPC
+#define virNetError(code, ...)                                    \
+    virReportErrorHelper(VIR_FROM_THIS, code, __FILE__,           \
+                         __FUNCTION__, __LINE__, __VA_ARGS__)
+
+/*
+ * Keepalive state attached to one client/connection object.
+ *
+ * Fields are modified with 'lock' held.  The lock is dropped while the
+ * sendCB and deadCB callbacks run (see virKeepAliveSend and
+ * virKeepAliveTimer).
+ */
+struct _virKeepAlive {
+    int refs;                   /* reference count; freed when it hits 0 */
+    virMutex lock;
+
+    int interval;               /* seconds between checks; <= 0 = disabled */
+    unsigned int count;         /* configured number of requests to try */
+    unsigned int countToDeath;  /* requests left before deadCB is called */
+    time_t lastPacketReceived;  /* when the last message was seen */
+    bool advertised;            /* advertisement was sent or received */
+    int timer;                  /* request timer id, -1 when not running */
+
+    virNetMessagePtr response;  /* queued keepalive response, or NULL */
+    int responseTimer;          /* timer that sends 'response', -1 if gone */
+
+    virKeepAliveSendFunc sendCB;
+    virKeepAliveDeadFunc deadCB;
+    virKeepAliveFreeFunc freeCB;
+    void *client;               /* opaque pointer handed to the callbacks */
+};
+
+
+/* Acquire the per-object mutex. */
+static void
+virKeepAliveLock(virKeepAlivePtr ka)
+{
+    virMutexLock(&ka->lock);
+}
+
+/* Release the per-object mutex. */
+static void
+virKeepAliveUnlock(virKeepAlivePtr ka)
+{
+    virMutexUnlock(&ka->lock);
+}
+
+
+/*
+ * Build an empty-payload keepalive message for procedure @proc
+ * (one of the KEEPALIVE_PROC_* values).
+ *
+ * Returns the encoded message, or NULL if allocation or encoding fails.
+ */
+static virNetMessagePtr
+virKeepAliveMessage(int proc)
+{
+    virNetMessagePtr msg;
+
+    if (!(msg = virNetMessageNew(false)))
+        return NULL;
+
+    msg->header.prog = KEEPALIVE_PROGRAM;
+    msg->header.vers = KEEPALIVE_VERSION;
+    msg->header.type = VIR_NET_MESSAGE;
+    msg->header.proc = proc;
+
+    if (virNetMessageEncodeHeader(msg) < 0 ||
+        virNetMessageEncodePayloadEmpty(msg) < 0) {
+        virNetMessageFree(msg);
+        return NULL;
+    }
+
+    return msg;
+}
+
+
+/*
+ * Pass @msg to the send callback.
+ *
+ * Must be called with @ka locked.  The lock is released while sendCB
+ * runs and re-acquired before returning; the reference count is bumped
+ * for the duration of the call so @ka is not freed while unlocked.
+ *
+ * When sendCB reports failure (< 0) @msg is freed here.
+ *
+ * Returns whatever sendCB returned.
+ */
+static int
+virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg)
+{
+    int ret;
+    const char *proc;
+    void *client = ka->client;
+    virKeepAliveSendFunc sendCB = ka->sendCB;
+
+    switch (msg->header.proc) {
+    case KEEPALIVE_PROC_ADVERTISE:
+        proc = "advertisement";
+        break;
+    case KEEPALIVE_PROC_PING:
+        proc = "request";
+        break;
+    case KEEPALIVE_PROC_PONG:
+        proc = "response";
+        break;
+    default:
+        /* keep 'proc' defined for the log calls below */
+        proc = "message";
+        break;
+    }
+
+    VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client);
+
+    ka->refs++;
+    virKeepAliveUnlock(ka);
+
+    if ((ret = sendCB(client, msg)) < 0) {
+        VIR_WARN("Failed to send keepalive %s to client %p", proc, client);
+        virNetMessageFree(msg);
+    }
+
+    virKeepAliveLock(ka);
+    ka->refs--;
+
+    return ret;
+}
+
+
+/*
+ * Send a keepalive advertisement to the peer.
+ *
+ * On success (send result 0) the object is marked as advertised, which
+ * virKeepAliveStart() requires.
+ *
+ * Returns -1 if the message cannot be created, otherwise the result of
+ * the send callback.
+ */
+int
+virKeepAliveAdvertise(virKeepAlivePtr ka)
+{
+    virNetMessagePtr msg;
+    int ret = -1;
+
+    virKeepAliveLock(ka);
+
+    VIR_DEBUG("Advertising keepalive support to client %p", ka->client);
+
+    if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_ADVERTISE))) {
+        VIR_WARN("Failed to generate keepalive advertisement");
+        goto cleanup;
+    }
+
+    if ((ret = virKeepAliveSend(ka, msg)) == 0)
+        ka->advertised = true;
+
+cleanup:
+    virKeepAliveUnlock(ka);
+    return ret;
+}
+
+
+/*
+ * Queue a keepalive response and make the response timer fire
+ * immediately.  Called with @ka locked.
+ *
+ * Does nothing when the response timer has been removed.  A response
+ * that is already queued is reused rather than allocated again.
+ */
+static void
+virKeepAliveScheduleResponse(virKeepAlivePtr ka)
+{
+    if (ka->responseTimer == -1)
+        return;
+
+    VIR_DEBUG("Scheduling keepalive response to client %p", ka->client);
+
+    if (!ka->response &&
+        !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) {
+        VIR_WARN("Failed to generate keepalive response");
+        return;
+    }
+
+    virEventUpdateTimeout(ka->responseTimer, 0);
+}
+
+
+/*
+ * Request timer callback.
+ *
+ * If a message was received less than (interval - 1) seconds ago, the
+ * timer is simply re-armed for the rest of the interval.  Otherwise,
+ * when no attempts are left the dead callback is invoked (with the lock
+ * dropped); else one attempt is consumed, a keepalive request is sent
+ * and the timer is re-armed for a full interval.
+ */
+static void
+virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virKeepAlivePtr ka = opaque;
+    time_t now = time(NULL);
+
+    virKeepAliveLock(ka);
+
+    /* time_t has no portable printf conversion, hence the cast */
+    VIR_DEBUG("ka=%p, client=%p, countToDeath=%u, lastPacketReceived=%lds ago",
+              ka, ka->client, ka->countToDeath,
+              (long) (now - ka->lastPacketReceived));
+
+    if (now - ka->lastPacketReceived < ka->interval - 1) {
+        int timeout = ka->interval - (now - ka->lastPacketReceived);
+        virEventUpdateTimeout(ka->timer, timeout * 1000);
+        goto cleanup;
+    }
+
+    if (ka->countToDeath == 0) {
+        virKeepAliveDeadFunc deadCB = ka->deadCB;
+        void *client = ka->client;
+
+        VIR_WARN("No response from client %p after %u keepalive messages in"
+                 " %d seconds",
+                 ka->client,
+                 ka->count,
+                 (int) (now - ka->lastPacketReceived));
+        /* hold a reference while the callback runs without the lock */
+        ka->refs++;
+        virKeepAliveUnlock(ka);
+        deadCB(client);
+        virKeepAliveLock(ka);
+        ka->refs--;
+    } else {
+        virNetMessagePtr msg;
+
+        ka->countToDeath--;
+        if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_PING)))
+            VIR_WARN("Failed to generate keepalive request");
+        else
+            ignore_value(virKeepAliveSend(ka, msg));
+        virEventUpdateTimeout(ka->timer, ka->interval * 1000);
+    }
+
+cleanup:
+    virKeepAliveUnlock(ka);
+}
+
+
+/*
+ * Response timer callback: send the queued response, if any.
+ *
+ * virKeepAliveSend() drops the lock, so a new response may have been
+ * queued in the meantime; in that case the timer is re-armed to fire
+ * immediately, otherwise it is disabled (-1).
+ */
+static void
+virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
+{
+    virKeepAlivePtr ka = opaque;
+    virNetMessagePtr msg;
+
+    virKeepAliveLock(ka);
+
+    VIR_DEBUG("ka=%p, client=%p, response=%p",
+              ka, ka->client, ka->response);
+
+    if (ka->response) {
+        msg = ka->response;
+        ka->response = NULL;
+        ignore_value(virKeepAliveSend(ka, msg));
+    }
+
+    virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1);
+
+    virKeepAliveUnlock(ka);
+}
+
+
+/* Free callback registered with both timers: drops the timer's reference. */
+static void
+virKeepAliveTimerFree(void *opaque)
+{
+    virKeepAliveFree(opaque);
+}
+
+
+/*
+ * Allocate a keepalive object for @client.
+ *
+ * @interval and @count seed the request timer configuration; the
+ * request timer itself is only created by virKeepAliveStart().  The
+ * response timer is registered here (disabled) and owns one reference,
+ * so a successfully created object starts with two references.
+ *
+ * Returns the new object, or NULL on failure.  Note that when the
+ * response timer cannot be registered the object is released through
+ * virKeepAliveFree(), which invokes @freeCB on @client.
+ */
+virKeepAlivePtr
+virKeepAliveNew(int interval,
+                unsigned int count,
+                void *client,
+                virKeepAliveSendFunc sendCB,
+                virKeepAliveDeadFunc deadCB,
+                virKeepAliveFreeFunc freeCB)
+{
+    virKeepAlivePtr ka;
+
+    VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count);
+
+    if (VIR_ALLOC(ka) < 0) {
+        virReportOOMError();
+        return NULL;
+    }
+
+    if (virMutexInit(&ka->lock) < 0) {
+        VIR_FREE(ka);
+        return NULL;
+    }
+
+    ka->refs = 1;
+    ka->interval = interval;
+    ka->count = count;
+    ka->countToDeath = count;
+    ka->timer = -1;
+    ka->client = client;
+    ka->sendCB = sendCB;
+    ka->deadCB = deadCB;
+    ka->freeCB = freeCB;
+
+    ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer,
+                                           ka, virKeepAliveTimerFree);
+    if (ka->responseTimer < 0) {
+        virKeepAliveFree(ka);
+        return NULL;
+    }
+    /* the timer now has a reference to ka */
+    ka->refs++;
+
+    return ka;
+}
+
+
+/* Take an additional reference on @ka. */
+void
+virKeepAliveRef(virKeepAlivePtr ka)
+{
+    virKeepAliveLock(ka);
+    ka->refs++;
+    VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs);
+    virKeepAliveUnlock(ka);
+}
+
+
+/*
+ * Drop one reference on @ka; NULL is ignored.
+ *
+ * When the last reference goes away any still-queued response is
+ * released, @freeCB is called on the client pointer and the object is
+ * freed.
+ */
+void
+virKeepAliveFree(virKeepAlivePtr ka)
+{
+    if (!ka)
+        return;
+
+    virKeepAliveLock(ka);
+    VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs);
+    if (--ka->refs > 0) {
+        virKeepAliveUnlock(ka);
+        return;
+    }
+
+    /* No other reference holder is left.  Release the lock before
+     * destroying it: destroying a locked mutex is undefined behaviour
+     * for POSIX mutexes. */
+    virKeepAliveUnlock(ka);
+    virMutexDestroy(&ka->lock);
+
+    /* A response queued by virKeepAliveScheduleResponse() may never have
+     * been sent (e.g. the timers were removed first); don't leak it. */
+    virNetMessageFree(ka->response);
+
+    ka->freeCB(ka->client);
+    VIR_FREE(ka);
+}
+
+
+/*
+ * Start sending keepalive requests.
+ *
+ * Fails unless keepalive support has been advertised.  A positive
+ * @interval configures the interval and @count, but only if no interval
+ * was configured before; passing @interval <= 0 keeps the values given
+ * to virKeepAliveNew().  The first timeout is shortened by the time
+ * already elapsed since the last received message.
+ *
+ * Returns 0 on success, including when the timer already runs or
+ * keepalive is disabled (interval <= 0); -1 on error.
+ */
+int
+virKeepAliveStart(virKeepAlivePtr ka,
+                  int interval,
+                  unsigned int count)
+{
+    int ret = -1;
+    time_t delay;
+    int timeout;
+
+    VIR_DEBUG("ka=%p, client=%p, interval=%d, count=%u",
+              ka, ka->client, interval, count);
+
+    virKeepAliveLock(ka);
+
+    if (!ka->advertised) {
+        virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                    _("keepalive support was not advertised to remote party"));
+        goto cleanup;
+    }
+
+    if (ka->timer >= 0) {
+        VIR_DEBUG("Keepalive messages already enabled");
+        ret = 0;
+        goto cleanup;
+    }
+
+    if (interval > 0) {
+        if (ka->interval > 0) {
+            virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
+                        _("keepalive interval already set"));
+            goto cleanup;
+        }
+        ka->interval = interval;
+        ka->count = count;
+        ka->countToDeath = count;
+    }
+
+    if (ka->interval <= 0) {
+        VIR_DEBUG("Keepalive messages disabled by configuration");
+        ret = 0;
+        goto cleanup;
+    }
+
+    VIR_DEBUG("Enabling keepalive messages; interval=%d, count=%u",
+              ka->interval, ka->count);
+
+    delay = time(NULL) - ka->lastPacketReceived;
+    if (delay > ka->interval)
+        timeout = 0;
+    else
+        timeout = ka->interval - delay;
+    ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer,
+                                   ka, virKeepAliveTimerFree);
+    if (ka->timer < 0)
+        goto cleanup;
+
+    /* the timer now has another reference to this object */
+    ka->refs++;
+    ret = 0;
+
+cleanup:
+    virKeepAliveUnlock(ka);
+    return ret;
+}
+
+
+/*
+ * Remove the request and response timers, if registered.
+ *
+ * Timer ids are compared with ">= 0" to match the rest of this file,
+ * which treats only negative ids as "no timer".
+ */
+void
+virKeepAliveStop(virKeepAlivePtr ka)
+{
+    VIR_DEBUG("ka=%p, client=%p", ka, ka->client);
+
+    virKeepAliveLock(ka);
+    if (ka->timer >= 0) {
+        virEventRemoveTimeout(ka->timer);
+        ka->timer = -1;
+    }
+    if (ka->responseTimer >= 0) {
+        virEventRemoveTimeout(ka->responseTimer);
+        ka->responseTimer = -1;
+    }
+    virKeepAliveUnlock(ka);
+}
+
+
+/*
+ * Inspect an incoming message.
+ *
+ * Any message counts as a sign of life: it resets the attempt counter,
+ * records the arrival time and re-arms the request timer if it is
+ * running.  Keepalive protocol messages are additionally handled here:
+ * an advertisement marks the peer as keepalive-capable and starts the
+ * request timer, a request schedules a response, a response needs no
+ * further action.
+ *
+ * Returns true if @msg was a keepalive protocol message, false
+ * otherwise or when @ka is NULL.
+ */
+bool
+virKeepAliveCheckMessage(virKeepAlivePtr ka,
+                         virNetMessagePtr msg)
+{
+    bool ret = false;
+    bool start = false;
+
+    VIR_DEBUG("ka=%p, client=%p, msg=%p",
+              ka, ka ? ka->client : NULL, msg);
+
+    if (!ka)
+        return false;
+
+    virKeepAliveLock(ka);
+
+    ka->countToDeath = ka->count;
+    ka->lastPacketReceived = time(NULL);
+
+    if (msg->header.prog == KEEPALIVE_PROGRAM &&
+        msg->header.vers == KEEPALIVE_VERSION &&
+        msg->header.type == VIR_NET_MESSAGE) {
+        ret = true;
+        switch (msg->header.proc) {
+        case KEEPALIVE_PROC_ADVERTISE:
+            VIR_DEBUG("Client %p advertises keepalive support", ka->client);
+            ka->advertised = true;
+            start = true;
+            break;
+
+        case KEEPALIVE_PROC_PING:
+            VIR_DEBUG("Got keepalive request from client %p", ka->client);
+            virKeepAliveScheduleResponse(ka);
+            break;
+
+        case KEEPALIVE_PROC_PONG:
+            VIR_DEBUG("Got keepalive response from client %p", ka->client);
+            break;
+
+        default:
+            VIR_DEBUG("Ignoring unknown keepalive message %d from client %p",
+                      msg->header.proc, ka->client);
+        }
+    }
+
+    if (ka->timer >= 0)
+        virEventUpdateTimeout(ka->timer, ka->interval * 1000);
+
+    virKeepAliveUnlock(ka);
+
+    /* virKeepAliveStart() takes the lock itself, so call it unlocked */
+    if (start && virKeepAliveStart(ka, 0, 0) < 0)
+        VIR_WARN("Failed to start keepalive protocol");
+
+    return ret;
+}
diff --git a/src/rpc/virkeepalive.h b/src/rpc/virkeepalive.h
new file mode 100644
index 0000000..ac96859
--- /dev/null
+++ b/src/rpc/virkeepalive.h
@@ -0,0 +1,58 @@
+/*
+ * virkeepalive.h: keepalive handling
+ *
+ * Copyright (C) 2011 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Jiri Denemark <jdenemar@xxxxxxxxxx>
+ */
+
+#ifndef __VIR_KEEPALIVE_H__
+# define __VIR_KEEPALIVE_H__
+
+# include "virnetmessage.h"
+
+typedef int (*virKeepAliveSendFunc)(void *client, virNetMessagePtr msg); /* send msg; < 0 means failure */
+typedef void (*virKeepAliveDeadFunc)(void *client); /* called when no attempts are left */
+typedef void (*virKeepAliveFreeFunc)(void *client); /* called when the last reference is dropped */
+
+typedef struct _virKeepAlive virKeepAlive;
+typedef virKeepAlive *virKeepAlivePtr;
+
+
+virKeepAlivePtr virKeepAliveNew(int interval,        /* seconds; <= 0 leaves keepalive unconfigured */
+                                unsigned int count,  /* requests sent before deadCB is called */
+                                void *client,        /* opaque pointer passed to every callback */
+                                virKeepAliveSendFunc sendCB,
+                                virKeepAliveDeadFunc deadCB,
+                                virKeepAliveFreeFunc freeCB)
+    ATTRIBUTE_NONNULL(3) ATTRIBUTE_NONNULL(4)
+    ATTRIBUTE_NONNULL(5) ATTRIBUTE_NONNULL(6);
+
+void virKeepAliveRef(virKeepAlivePtr ka);   /* take a reference */
+void virKeepAliveFree(virKeepAlivePtr ka);  /* drop a reference; NULL is ignored */
+
+int virKeepAliveAdvertise(virKeepAlivePtr ka); /* -1 if no message could be built, else sendCB's result */
+
+int virKeepAliveStart(virKeepAlivePtr ka,   /* 0 on success or when disabled, -1 on error */
+                      int interval,         /* > 0 sets the interval if none is set yet */
+                      unsigned int count);  /* used only together with interval > 0 */
+void virKeepAliveStop(virKeepAlivePtr ka);  /* remove the keepalive timers */
+
+bool virKeepAliveCheckMessage(virKeepAlivePtr ka,    /* may be NULL (returns false) */
+                              virNetMessagePtr msg); /* true if msg is a keepalive message */
+
+#endif /* __VIR_KEEPALIVE_H__ */
--
1.7.6.1

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list