--- Notes: Version 5: - rebased on top of DanB's non-blocking patches; this is the only part that required non-trivial rebase so I'm posting it for additional review Version 4: - no changes Version 3: - no changes Version 2: - no changes src/rpc/virnetclient.c | 99 +++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 90 insertions(+), 9 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 025d270..b4b2fe7 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -101,9 +101,13 @@ struct _virNetClient { size_t nstreams; virNetClientStreamPtr *streams; + + bool wantClose; }; +void virNetClientRequestClose(virNetClientPtr client); + static void virNetClientLock(virNetClientPtr client) { virMutexLock(&client->lock); @@ -409,12 +413,14 @@ void virNetClientFree(virNetClientPtr client) } -void virNetClientClose(virNetClientPtr client) +static void +virNetClientCloseLocked(virNetClientPtr client) { - if (!client) + VIR_DEBUG("client=%p, sock=%p", client, client->sock); + + if (!client->sock) return; - virNetClientLock(client); virNetSocketRemoveIOCallback(client->sock); virNetSocketFree(client->sock); client->sock = NULL; @@ -424,6 +430,41 @@ void virNetClientClose(virNetClientPtr client) virNetSASLSessionFree(client->sasl); client->sasl = NULL; #endif + client->wantClose = false; +} + +void virNetClientClose(virNetClientPtr client) +{ + if (!client) + return; + + virNetClientLock(client); + virNetClientCloseLocked(client); + virNetClientUnlock(client); +} + +void +virNetClientRequestClose(virNetClientPtr client) +{ + VIR_DEBUG("client=%p", client); + + virNetClientLock(client); + + /* If there is a thread polling for data on the socket, set wantClose flag + * and wake the thread up or just immediately close the socket when no-one + * is polling on it. + */ + if (client->waitDispatch) { + char ignore = 1; + int len = sizeof(ignore); + + client->wantClose = true; + if (safewrite(client->wakeupSendFD, &ignore, len) != len) + VIR_ERROR(_("failed to wake up polling thread")); + } else { + virNetClientCloseLocked(client); + } + virNetClientUnlock(client); } @@ -1096,6 +1137,26 @@ static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call, } +static void +virNetClientIOEventLoopRemoveAll(virNetClientPtr client, + virNetClientCallPtr thiscall) +{ + if (!client->waitDispatch) + return; + + if (client->waitDispatch == thiscall) { + /* just pretend nothing was sent and the caller will free the call */ + thiscall->sentSomeData = false; + } else { + virNetClientCallPtr call = client->waitDispatch; + virNetClientCallRemove(&client->waitDispatch, call); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call->msg); + VIR_FREE(call); + } +} + + static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall) { VIR_DEBUG("Giving up the buck %p", thiscall); @@ -1110,7 +1171,12 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli } tmp = tmp->next; } + VIR_DEBUG("No thread to pass the buck to"); + if (client->wantClose) { + virNetClientCloseLocked(client); + virNetClientIOEventLoopRemoveAll(client, thiscall); + } } @@ -1141,11 +1207,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, sigset_t oldmask, blockedsigs; int timeout = -1; - /* If we have existing SASL decoded data we - * don't want to sleep in the poll(), just - * check if any other FDs are also ready + /* If we have existing SASL decoded data we don't want to sleep in + * the poll(), just check if any other FDs are also ready. + * If the connection is going to be closed, we don't want to sleep in + * poll() either. */ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock) || client->wantClose) timeout = 0; /* If there are any non-blocking calls in the queue, @@ -1208,6 +1275,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[0].revents |= POLLIN; } + /* If wantClose flag is set, pretend there was an error on the socket + */ + if (client->wantClose) + fds[0].revents = POLLERR; + if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { @@ -1441,7 +1513,8 @@ static int virNetClientIO(virNetClientPtr client, virResetLastError(); rv = virNetClientIOEventLoop(client, thiscall); - virNetClientIOUpdateCallback(client, true); + if (client->sock) + virNetClientIOUpdateCallback(client, true); if (rv == 0 && virGetLastError()) @@ -1467,7 +1540,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock, goto done; /* This should be impossible, but it doesn't hurt to check */ - if (client->haveTheBuck) + if (client->haveTheBuck || client->wantClose) goto done; VIR_DEBUG("Event fired %p %d", sock, events); @@ -1528,6 +1601,12 @@ static int virNetClientSendInternal(virNetClientPtr client, virNetClientLock(client); + if (!client->sock || client->wantClose) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("client socket is closed")); + goto unlock; + } + if (virCondInit(&call->cond) < 0) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("cannot initialize condition variable")); @@ -1554,6 +1633,8 @@ cleanup: ignore_value(virCondDestroy(&call->cond)); VIR_FREE(call); } + +unlock: virNetClientUnlock(client); return ret; } -- 1.7.8.rc3 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list