On Wed, Oct 12, 2011 at 07:16:20AM +0200, Jiri Denemark wrote: > These APIs are used by both client and server RPC layer to handle > processing of keepalive messages. > --- > Notes: > Version 3: > - remove ADVERTISE message handling > > Version 2: > - no change > > po/POTFILES.in | 1 + > src/Makefile.am | 3 +- > src/rpc/virkeepalive.c | 426 ++++++++++++++++++++++++++++++++++++++++++++++++ > src/rpc/virkeepalive.h | 56 +++++++ > 4 files changed, 485 insertions(+), 1 deletions(-) > create mode 100644 src/rpc/virkeepalive.c > create mode 100644 src/rpc/virkeepalive.h > > diff --git a/po/POTFILES.in b/po/POTFILES.in > index 5ce35ae..71254dd 100644 > --- a/po/POTFILES.in > +++ b/po/POTFILES.in > @@ -72,6 +72,7 @@ src/qemu/qemu_monitor_text.c > src/qemu/qemu_process.c > src/remote/remote_client_bodies.h > src/remote/remote_driver.c > +src/rpc/virkeepalive.c > src/rpc/virnetclient.c > src/rpc/virnetclientprogram.c > src/rpc/virnetclientstream.c > diff --git a/src/Makefile.am b/src/Makefile.am > index af07020..944629c 100644 > --- a/src/Makefile.am > +++ b/src/Makefile.am > @@ -1370,7 +1370,8 @@ libvirt_net_rpc_la_SOURCES = \ > rpc/virnetprotocol.h rpc/virnetprotocol.c \ > rpc/virnetsocket.h rpc/virnetsocket.c \ > rpc/virnettlscontext.h rpc/virnettlscontext.c \ > - rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c > + rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c \ > + rpc/virkeepalive.h rpc/virkeepalive.c > if HAVE_SASL > libvirt_net_rpc_la_SOURCES += \ > rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c > diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c > new file mode 100644 > index 0000000..44cc322 > --- /dev/null > +++ b/src/rpc/virkeepalive.c > @@ -0,0 +1,426 @@ > +/* > + * virkeepalive.c: keepalive handling > + * > + * Copyright (C) 2011 Red Hat, Inc. > + * > + * This library is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License as published by the Free Software Foundation; either > + * version 2.1 of the License, or (at your option) any later version. > + * > + * This library is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + * Lesser General Public License for more details. > + * > + * You should have received a copy of the GNU Lesser General Public > + * License along with this library; if not, write to the Free Software > + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA > + * > + * Author: Jiri Denemark <jdenemar@xxxxxxxxxx> > + */ > + > +#include <config.h> > + > +#include "memory.h" > +#include "threads.h" > +#include "virfile.h" > +#include "logging.h" > +#include "util.h" > +#include "virterror_internal.h" > +#include "virnetsocket.h" > +#include "virkeepaliveprotocol.h" > +#include "virkeepalive.h" > + > +#define VIR_FROM_THIS VIR_FROM_RPC > +#define virNetError(code, ...) \ > + virReportErrorHelper(VIR_FROM_THIS, code, __FILE__, \ > + __FUNCTION__, __LINE__, __VA_ARGS__) > + > +struct _virKeepAlive { > + int refs; > + virMutex lock; > + > + int interval; > + unsigned int count; > + unsigned int countToDeath; > + time_t lastPacketReceived; > + int timer; > + > + virNetMessagePtr response; > + int responseTimer; > + > + virKeepAliveSendFunc sendCB; > + virKeepAliveDeadFunc deadCB; > + virKeepAliveFreeFunc freeCB; > + void *client; > +}; > + > + > +static void > +virKeepAliveLock(virKeepAlivePtr ka) > +{ > + virMutexLock(&ka->lock); > +} > + > +static void > +virKeepAliveUnlock(virKeepAlivePtr ka) > +{ > + virMutexUnlock(&ka->lock); > +} > + > + > +static virNetMessagePtr > +virKeepAliveMessage(int proc) > +{ > + virNetMessagePtr msg; > + > + if (!(msg = virNetMessageNew(false))) > + return NULL; > + > + msg->header.prog = KEEPALIVE_PROGRAM; > + msg->header.vers = KEEPALIVE_VERSION; > + msg->header.type = VIR_NET_MESSAGE; > + msg->header.proc = proc; > + > + if (virNetMessageEncodeHeader(msg) < 0 || > + virNetMessageEncodePayloadEmpty(msg) < 0) { > + virNetMessageFree(msg); > + return NULL; > + } > + > + return msg; > +} > + > + > +static int > +virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg) > +{ > + int ret; > + const char *proc = NULL; > + void *client = ka->client; > + virKeepAliveSendFunc sendCB = ka->sendCB; > + > + switch (msg->header.proc) { > + case KEEPALIVE_PROC_PING: > + proc = "request"; > + break; > + case KEEPALIVE_PROC_PONG: > + proc = "response"; > + break; > + } > + > + if (!proc) { > + VIR_WARN("Refusing to send unknown keepalive message: %d", > + msg->header.proc); > + return -1; This exit path requires the caller to free 'msg' > + } > + > + VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client); > + > + ka->refs++; > + virKeepAliveUnlock(ka); > + > + if ((ret = sendCB(client, msg)) < 0) { > + VIR_WARN("Failed to send keepalive %s to client %p", proc, client); > + virNetMessageFree(msg); Where as this exit path free's the msg itself. > + } > + > + virKeepAliveLock(ka); > + ka->refs--; > + > + return ret; > +} > + > + > +static void > +virKeepAliveScheduleResponse(virKeepAlivePtr ka) > +{ > + if (ka->responseTimer == -1) > + return; > + > + VIR_DEBUG("Scheduling keepalive response to client %p", ka->client); > + > + if (!ka->response && > + !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) { > + VIR_WARN("Failed to generate keepalive response"); > + return; > + } > + > + virEventUpdateTimeout(ka->responseTimer, 0); > +} > + > + > +static void > +virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque) > +{ > + virKeepAlivePtr ka = opaque; > + time_t now = time(NULL); > + > + virKeepAliveLock(ka); > + > + VIR_DEBUG("ka=%p, client=%p, countToDeath=%d, lastPacketReceived=%lds ago", > + ka, ka->client, ka->countToDeath, now - ka->lastPacketReceived); > + > + if (now - ka->lastPacketReceived < ka->interval - 1) { > + int timeout = ka->interval - (now - ka->lastPacketReceived); > + virEventUpdateTimeout(ka->timer, timeout * 1000); > + goto cleanup; > + } > + > + if (ka->countToDeath == 0) { > + virKeepAliveDeadFunc deadCB = ka->deadCB; > + void *client = ka->client; > + > + VIR_WARN("No response from client %p after %d keepalive messages in" > + " %d seconds", > + ka->client, > + ka->count, > + (int) (now - ka->lastPacketReceived)); > + ka->refs++; > + virKeepAliveUnlock(ka); > + deadCB(client); > + virKeepAliveLock(ka); > + ka->refs--; > + } else { > + virNetMessagePtr msg; > + > + ka->countToDeath--; > + if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_PING))) > + VIR_WARN("Failed to generate keepalive request"); > + else > + ignore_value(virKeepAliveSend(ka, msg)); This might need to change depending on how you fix the return handling of this method wrt free'ing msg. > + virEventUpdateTimeout(ka->timer, ka->interval * 1000); > + } > + > +cleanup: > + virKeepAliveUnlock(ka); > +} > + > + > +static void > +virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque) > +{ > + virKeepAlivePtr ka = opaque; > + virNetMessagePtr msg; > + > + virKeepAliveLock(ka); > + > + VIR_DEBUG("ka=%p, client=%p, response=%p", > + ka, ka->client, ka->response); > + > + if (ka->response) { > + msg = ka->response; > + ka->response = NULL; > + ignore_value(virKeepAliveSend(ka, msg)); Likewise possible change needed here. > + } > + > + virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1); > + > + virKeepAliveUnlock(ka); > +} > + > + > +static void > +virKeepAliveTimerFree(void *opaque) > +{ > + virKeepAliveFree(opaque); > +} > + > + > +virKeepAlivePtr > +virKeepAliveNew(int interval, > + unsigned int count, > + void *client, > + virKeepAliveSendFunc sendCB, > + virKeepAliveDeadFunc deadCB, > + virKeepAliveFreeFunc freeCB) > +{ > + virKeepAlivePtr ka; > + > + VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count); > + > + if (VIR_ALLOC(ka) < 0) { > + virReportOOMError(); > + return NULL; > + } > + > + if (virMutexInit(&ka->lock) < 0) { > + VIR_FREE(ka); > + return NULL; > + } > + > + ka->refs = 1; > + ka->interval = interval; > + ka->count = count; > + ka->countToDeath = count; > + ka->timer = -1; > + ka->client = client; > + ka->sendCB = sendCB; > + ka->deadCB = deadCB; > + ka->freeCB = freeCB; > + > + ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer, > + ka, virKeepAliveTimerFree); > + if (ka->responseTimer < 0) { > + virKeepAliveFree(ka); > + return NULL; > + } > + /* the timer now has a reference to ka */ > + ka->refs++; > + > + return ka; > +} > + > + > +void > +virKeepAliveRef(virKeepAlivePtr ka) > +{ > + virKeepAliveLock(ka); > + ka->refs++; > + VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs); > + virKeepAliveUnlock(ka); > +} > + > + > +void > +virKeepAliveFree(virKeepAlivePtr ka) > +{ > + if (!ka) > + return; > + > + virKeepAliveLock(ka); > + VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs); > + if (--ka->refs > 0) { > + virKeepAliveUnlock(ka); > + return; > + } > + > + virMutexDestroy(&ka->lock); > + ka->freeCB(ka->client); > + VIR_FREE(ka); > +} > + > + > +int > +virKeepAliveStart(virKeepAlivePtr ka, > + int interval, > + unsigned int count) > +{ > + int ret = -1; > + time_t delay; > + int timeout; > + > + VIR_DEBUG("ka=%p, client=%p, interval=%d, count=%u", > + ka, ka->client, interval, count); > + > + virKeepAliveLock(ka); > + > + if (ka->timer >= 0) { > + VIR_DEBUG("Keepalive messages already enabled"); > + ret = 0; > + goto cleanup; > + } > + > + if (interval > 0) { > + if (ka->interval > 0) { > + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", > + _("keepalive interval already set")); > + goto cleanup; > + } > + ka->interval = interval; > + ka->count = count; > + ka->countToDeath = count; > + } > + > + if (ka->interval <= 0) { > + VIR_DEBUG("Keepalive messages disabled by configuration"); > + ret = 0; > + goto cleanup; > + } > + > + VIR_DEBUG("Enabling keepalive messages; interval=%d, count=%u", > + ka->interval, ka->count); > + > + delay = time(NULL) - ka->lastPacketReceived; > + if (delay > ka->interval) > + timeout = 0; > + else > + timeout = ka->interval - delay; > + ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer, > + ka, virKeepAliveTimerFree); > + if (ka->timer < 0) > + goto cleanup; > + > + /* the timer now has another reference to this object */ > + ka->refs++; > + ret = 0; > + > +cleanup: > + virKeepAliveUnlock(ka); > + return ret; > +} > + > + > +void > +virKeepAliveStop(virKeepAlivePtr ka) > +{ > + VIR_DEBUG("ka=%p, client=%p", ka, ka->client); > + > + virKeepAliveLock(ka); > + if (ka->timer > 0) { > + virEventRemoveTimeout(ka->timer); > + ka->timer = -1; > + } > + if (ka->responseTimer > 0) { > + virEventRemoveTimeout(ka->responseTimer); > + ka->responseTimer = -1; > + } > + virKeepAliveUnlock(ka); > +} Do we need to clear any dangling 'ka->response' object ? ACK if those questions are cleared up. Bonus points if you fancy inserting some DTrace/SystemTAP probes to the code as a later patch. Daniel -- |: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :| |: http://libvirt.org -o- http://virt-manager.org :| |: http://autobuild.org -o- http://search.cpan.org/~danberr/ :| |: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :| -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list