So far, we were dropping non-blocking calls whenever sending them would block. In case a client is sending lots of stream calls (which are not supposed to generate any reply), the assumption that having other calls in a queue is sufficient to get a reply from the server doesn't work. I tried to fix this in b1e374a7ac56927cfe62435179bf0bba1e08b372 but failed and reverted that commit. With this patch, non-blocking calls are never dropped (unless the connection is being closed) and will always be sent. --- src/rpc/virnetclient.c | 164 +++++++++++++++++++++--------------------------- 1 file changed, 71 insertions(+), 93 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 3e661d2..614b469 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -58,7 +58,6 @@ struct _virNetClientCall { bool expectReply; bool nonBlock; bool haveThread; - bool sentSomeData; virCond cond; @@ -108,6 +107,10 @@ struct _virNetClient { }; +static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, + virNetClientCallPtr thiscall); + + static void virNetClientLock(virNetClientPtr client) { virMutexLock(&client->lock); @@ -525,19 +528,21 @@ void virNetClientClose(virNetClientPtr client) virNetClientLock(client); - /* If there is a thread polling for data on the socket, set wantClose flag - * and wake the thread up or just immediately close the socket when no-one - * is polling on it. + client->wantClose = true; + + /* If there is a thread polling for data on the socket, wake the thread up + * otherwise try to pass the buck to a possibly waiting thread. If no + * thread is waiting, virNetClientIOEventLoopPassTheBuck will clean the + * queue and close the client because we set client->wantClose. */ - if (client->waitDispatch) { + if (client->haveTheBuck) { char ignore = 1; size_t len = sizeof(ignore); - client->wantClose = true; if (safewrite(client->wakeupSendFD, &ignore, len) != len) VIR_ERROR(_("failed to wake up polling thread")); } else { - virNetClientCloseLocked(client); + virNetClientIOEventLoopPassTheBuck(client, NULL); } virNetClientUnlock(client); @@ -972,8 +977,6 @@ virNetClientIOWriteMessage(virNetClientPtr client, ret = virNetSocketWrite(client->sock, thecall->msg->buffer + thecall->msg->bufferOffset, thecall->msg->bufferLength - thecall->msg->bufferOffset); - if (ret > 0 || virNetSocketHasPendingData(client->sock)) - thecall->sentSomeData = true; if (ret <= 0) return ret; @@ -1197,71 +1200,43 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call, } -static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call, - void *opaque) +static bool +virNetClientIOEventLoopDetachNonBlocking(virNetClientCallPtr call, + void *opaque) { virNetClientCallPtr thiscall = opaque; - if (call == thiscall) - return false; - - if (!call->nonBlock) - return false; - - if (call->sentSomeData) { - /* - * If some data has been sent we must keep it in the list, - * but still wakeup any thread - */ - if (call->haveThread) { - VIR_DEBUG("Waking up sleep %p", call); - virCondSignal(&call->cond); - } else { - VIR_DEBUG("Keeping unfinished call %p in the list", call); - } - return false; - } else { - /* - * If no data has been sent, we can remove it from the list. - * Wakup any thread, otherwise free the caller ourselves - */ - if (call->haveThread) { - VIR_DEBUG("Waking up sleep %p", call); - virCondSignal(&call->cond); - } else { - VIR_DEBUG("Removing call %p", call); - if (call->expectReply) - VIR_WARN("Got a call expecting a reply but without a waiting thread"); - ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call->msg); - VIR_FREE(call); - } + if (call != thiscall && call->nonBlock && call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + call->haveThread = false; + virCondSignal(&call->cond); return true; } + + return false; } -static void -virNetClientIOEventLoopRemoveAll(virNetClientPtr client, - virNetClientCallPtr thiscall) +static bool +virNetClientIOEventLoopRemoveAll(virNetClientCallPtr call, + void *opaque) { - if (!client->waitDispatch) - return; + virNetClientCallPtr thiscall = opaque; - if (client->waitDispatch == thiscall) { - /* just pretend nothing was sent and the caller will free the call */ - thiscall->sentSomeData = false; - } else { - virNetClientCallPtr call = client->waitDispatch; - virNetClientCallRemove(&client->waitDispatch, call); - ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call->msg); - VIR_FREE(call); - } + if (call == thiscall) + return false; + + VIR_DEBUG("Removing call %p", call); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call->msg); + VIR_FREE(call); + return true; } -static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall) +static void +virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, + virNetClientCallPtr thiscall) { VIR_DEBUG("Giving up the buck %p", thiscall); virNetClientCallPtr tmp = client->waitDispatch; @@ -1280,14 +1255,18 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli VIR_DEBUG("No thread to pass the buck to"); if (client->wantClose) { virNetClientCloseLocked(client); - virNetClientIOEventLoopRemoveAll(client, thiscall); + virNetClientCallRemovePredicate(&client->waitDispatch, + virNetClientIOEventLoopRemoveAll, + thiscall); } } -static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED) +static bool +virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, + void *opaque ATTRIBUTE_UNUSED) { - return call->nonBlock; + return call->nonBlock && call->haveThread; } /* @@ -1320,8 +1299,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock) || client->wantClose) timeout = 0; - /* If there are any non-blocking calls in the queue, - * then we don't want to sleep in poll() + /* If there are any non-blocking calls with an associated thread + * in the queue, then we don't want to sleep in poll() */ if (virNetClientCallMatchPredicate(client->waitDispatch, virNetClientIOEventLoopWantNonBlock, @@ -1394,12 +1373,15 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } /* If we were woken up because a new non-blocking call was queued, - * we need to re-poll to check if we can send it. + * we need to re-poll to check if we can send it. To be precise, we + * will re-poll even if a blocking call arrived when unhandled + * non-blocking calls are still in the queue. But this can't hurt. */ if (virNetClientCallMatchPredicate(client->waitDispatch, virNetClientIOEventLoopWantNonBlock, NULL)) { - VIR_DEBUG("New non-blocking call arrived; repolling"); + VIR_DEBUG("The queue contains new non-blocking call(s);" + " repolling"); continue; } } @@ -1424,18 +1406,18 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } /* Iterate through waiting calls and if any are - * complete, remove them from the dispatch list.. + * complete, remove them from the dispatch list. */ virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientIOEventLoopRemoveDone, thiscall); - /* Iterate through waiting calls and if any are - * non-blocking, remove them from the dispatch list... + /* Iterate through waiting calls and wake up and detach threads + * attached to non-blocking calls. */ - virNetClientCallRemovePredicate(&client->waitDispatch, - virNetClientIOEventLoopRemoveNonBlocking, - thiscall); + virNetClientCallMatchPredicate(client->waitDispatch, + virNetClientIOEventLoopDetachNonBlocking, + thiscall); /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { @@ -1444,15 +1426,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, return 2; } - /* We're not done, but we're non-blocking */ + /* We're not done, but we're non-blocking; keep the call queued */ if (thiscall->nonBlock) { + thiscall->haveThread = false; virNetClientIOEventLoopPassTheBuck(client, thiscall); - if (thiscall->sentSomeData) { - return 1; - } else { - virNetClientCallRemove(&client->waitDispatch, thiscall); - return 0; - } + return 1; } if (fds[0].revents & (POLLHUP | POLLERR)) { @@ -1462,7 +1440,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } } - error: virNetClientCallRemove(&client->waitDispatch, thiscall); virNetClientIOEventLoopPassTheBuck(client, thiscall); @@ -1614,9 +1591,11 @@ static int virNetClientIO(virNetClientPtr client, goto cleanup; } - /* If we're non-blocking, get outta here */ + /* If we're non-blocking, we were either queued (and detached) or the + * call was not sent because of an error. + */ if (thiscall->nonBlock) { - if (thiscall->sentSomeData) + if (!thiscall->haveThread) rv = 1; /* In progress */ else rv = 0; /* none at all */ @@ -1708,7 +1687,7 @@ done: /* - * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * Returns 2 if fully sent, 1 if queued (only for nonBlock==true), * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientSendInternal(virNetClientPtr client, @@ -1768,16 +1747,15 @@ static int virNetClientSendInternal(virNetClientPtr client, ret = virNetClientIO(client, call); - /* If partially sent, then the call is still on the dispatch queue */ - if (ret == 1) { - call->haveThread = false; - } else { - ignore_value(virCondDestroy(&call->cond)); - } + /* If queued, the call will be finished and freed later by another thread; + * we're done. */ + if (ret == 1) + return 1; + + ignore_value(virCondDestroy(&call->cond)); cleanup: - if (ret != 1) - VIR_FREE(call); + VIR_FREE(call); return ret; } -- 1.7.10.2 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list