When a client wants to send a keepalive message, it needs to do so in a non-blocking way to avoid blocking its event loop. This patch adds a dontBlock flag which says that the call should be processed without blocking. Such calls do not have a thread waiting for the result associated with them. This means that sending such a call fails if no thread is dispatching and writing to the socket would block. In case there is a thread waiting for its (normal) call to finish, sending a non-blocking call just pushes it into the queue and lets the dispatching thread send it. The thread which has the buck tries to send all non-blocking calls in the queue in a best-effort way---if sending them would block or there's an error on the socket, non-blocking calls are simply removed from the queue and discarded. In case a non-blocking call is partially sent but sending the rest of it would block, it is moved into the client's unfinishedCall and left for future delivery. Every sending attempt first sends the rest of unfinishedCall and then continues with other queued calls.
--- Notes: Version 5: - partially sent non-blocking calls now work even for SASL (or other transports that cache data internally) - fixed several other bugs in that area Version 4: - correctly handle partially sent non-blocking calls that would block Version 3: - no changes Version 2: - no changes src/rpc/virnetclient.c | 285 ++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 229 insertions(+), 56 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 2b5f67c..66d86e0 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -55,6 +55,8 @@ struct _virNetClientCall { virNetMessagePtr msg; bool expectReply; + bool dontBlock; + bool sending; virCond cond; @@ -86,6 +88,9 @@ struct _virNetClient { int wakeupSendFD; int wakeupReadFD; + /* Unfinished call that needs to be finished before any of the calls in + * the queue can be processed */ + virNetClientCallPtr unfinishedCall; /* List of threads currently waiting for dispatch */ virNetClientCallPtr waitDispatch; @@ -94,6 +99,11 @@ struct _virNetClient { }; +static int virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool dontBlock); + static void virNetClientLock(virNetClientPtr client) { virMutexLock(&client->lock); @@ -739,6 +749,7 @@ virNetClientIOWriteMessage(virNetClientPtr client, { ssize_t ret; + thecall->sending = true; ret = virNetSocketWrite(client->sock, thecall->msg->buffer + thecall->msg->bufferOffset, thecall->msg->bufferLength - thecall->msg->bufferOffset); @@ -754,6 +765,7 @@ virNetClientIOWriteMessage(virNetClientPtr client, return -1; } thecall->msg->bufferOffset = thecall->msg->bufferLength = 0; + thecall->sending = false; if (thecall->expectReply) thecall->mode = VIR_NET_CLIENT_MODE_WAIT_RX; else @@ -768,26 +780,45 @@ static ssize_t virNetClientIOHandleOutput(virNetClientPtr client) { virNetClientCallPtr thecall = client->waitDispatch; + ssize_t ret = -1; while (thecall && thecall->mode != 
VIR_NET_CLIENT_MODE_WAIT_TX) thecall = thecall->next; + /* If there is an unfinished non-blocking call, process it first */ + if (client->unfinishedCall) { + client->unfinishedCall->next = thecall; + thecall = client->unfinishedCall; + } + if (!thecall) - return -1; /* Shouldn't happen, but you never know... */ + goto cleanup; /* Shouldn't happen, but you never know... */ while (thecall) { - ssize_t ret = virNetClientIOWriteMessage(client, thecall); + ret = virNetClientIOWriteMessage(client, thecall); if (ret < 0) - return ret; + goto cleanup; + + if (thecall->mode == VIR_NET_CLIENT_MODE_WAIT_TX) { + /* Blocking write, go back to event loop */ + ret = 0; + goto cleanup; + } - if (thecall->mode == VIR_NET_CLIENT_MODE_WAIT_TX) - return 0; /* Blocking write, to back to event loop */ + if (thecall == client->unfinishedCall) + VIR_DEBUG("Nonblocking call %p finished", thecall); thecall = thecall->next; } - return 0; /* No more calls to send, all done */ + ret = 0; /* No more calls to send, all done */ + +cleanup: + if (client->unfinishedCall) + client->unfinishedCall->next = NULL; + + return ret; } static ssize_t @@ -870,6 +901,89 @@ virNetClientIOHandleInput(virNetClientPtr client) } +static void +virNetClientDiscardNonBlocking(virNetClientPtr client, + virNetClientCallPtr thiscall, + bool error) +{ + virNetClientCallPtr call = client->waitDispatch; + virNetClientCallPtr prev = NULL; + + if (client->unfinishedCall) { + client->unfinishedCall->next = call; + call = client->unfinishedCall; + } + + while (call) { + virNetClientCallPtr next = call->next; + + if (!call->dontBlock) { + prev = call; + goto skip; + } + + /* We can't remove nonblocking call which was already partially sent + * to the remote party (unless there was an error in which case we + * won't be able to send anything anymore anyway); we store it in + * unfinishedCall and when someone needs to send something in the + * future, it will first send the rest of the unfinishedCall. 
+ */ + if (!error && call->sending) { + VIR_DEBUG("Can't finish nonblocking call %p without blocking", + call); + if (call == client->unfinishedCall) + goto skip; + + client->unfinishedCall = call; + goto next; + } + + /* We should never free thiscall since it will be freed by the caller. + * We shouldn't remove thiscall from the queue either since that is + * handled elsewhere. + */ + if (call == thiscall) { + prev = call; + goto skip; + } + + /* Remove and free completed calls or calls that we didn't even get to + * without blocking or error. + */ + if (call->mode != VIR_NET_CLIENT_MODE_COMPLETE) { + const char *action; + if (call->sending > 0) + action = "finish"; + else + action = "send"; + + VIR_DEBUG("Can't %s nonblocking call %p without %s", + action, call, error ? "error" : "blocking"); + } + + if (call == client->unfinishedCall) { + client->unfinishedCall = NULL; + virNetMessageFree(call->msg); + VIR_FREE(call); + goto skip; + } + + virNetMessageFree(call->msg); + VIR_FREE(call); + +next: + if (prev) + prev->next = next; + else + client->waitDispatch = next; +skip: + call = next; + } + + if (client->unfinishedCall) + client->unfinishedCall->next = NULL; +} + /* * Process all calls pending dispatch/receive until we * get a reply to our own call. 
Then quit and pass the buck @@ -879,7 +993,9 @@ static int virNetClientIOEventLoop(virNetClientPtr client, virNetClientCallPtr thiscall) { struct pollfd fds[2]; - int ret; + int pollret; + bool error; + int ret = -1; fds[0].fd = virNetSocketGetFD(client->sock); fds[1].fd = client->wakeupReadFD; @@ -902,11 +1018,19 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[1].events = fds[1].revents = 0; fds[1].events = POLLIN; + if (client->unfinishedCall) + fds[0].events = POLLOUT; + while (tmp) { if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_RX) fds[0].events |= POLLIN; if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX) fds[0].events |= POLLOUT; + /* We don't want to sleep in poll if any of the calls is + * non-blocking + */ + if (tmp->dontBlock) + timeout = 0; tmp = tmp->next; } @@ -938,8 +1062,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client, ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); repoll: - ret = poll(fds, ARRAY_CARDINALITY(fds), timeout); - if (ret < 0 && errno == EAGAIN) + pollret = poll(fds, ARRAY_CARDINALITY(fds), timeout); + if (pollret < 0 && errno == EAGAIN) goto repoll; ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); @@ -961,7 +1085,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } } - if (ret < 0) { + if (pollret < 0) { if (errno == EWOULDBLOCK) continue; virReportSystemError(errno, @@ -979,8 +1103,14 @@ static int virNetClientIOEventLoop(virNetClientPtr client, goto error; } - /* Iterate through waiting threads and if - * any are complete then tell 'em to wakeup + /* All calls in the queue have been sent or sending would block, remove + * nonblocking calls since we did all we could for them. + */ + error = !!(fds[0].revents & (POLLHUP | POLLERR)); + virNetClientDiscardNonBlocking(client, thiscall, error); + + /* Iterate through waiting calls and if any are complete, tell + * their threads to wake up. 
*/ tmp = client->waitDispatch; prev = NULL; @@ -1006,41 +1136,48 @@ static int virNetClientIOEventLoop(virNetClientPtr client, tmp = tmp->next; } - /* Now see if *we* are done */ + /* Now see if *we* are done or deferred */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - /* We're at head of the list already, so - * remove us - */ - client->waitDispatch = thiscall->next; - VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); - /* See if someone else is still waiting - * and if so, then pass the buck ! */ - if (client->waitDispatch) { - VIR_DEBUG("Passing the buck to %p", client->waitDispatch); - virCondSignal(&client->waitDispatch->cond); - } - return 0; + ret = 0; + goto pass; } - if (fds[0].revents & (POLLHUP | POLLERR)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("received hangup / error event on socket")); goto error; } - } + if (thiscall->dontBlock) { + if (thiscall == client->unfinishedCall) { + ret = 0; + } else { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Can't send nonblocking call without blocking")); + } + goto pass; + } + } error: - client->waitDispatch = thiscall->next; - VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch); + virNetClientDiscardNonBlocking(client, thiscall, true); + VIR_DEBUG("Giving up the buck due to I/O error"); + +pass: + if (thiscall != client->unfinishedCall) + client->waitDispatch = thiscall->next; + else if (ret != 0) + client->unfinishedCall = NULL; + + VIR_DEBUG("Giving up the buck call=%p unfinishedCall=%p waitDispatch=%p", + thiscall, client->unfinishedCall, client->waitDispatch); /* See if someone else is still waiting * and if so, then pass the buck ! 
*/ if (client->waitDispatch) { VIR_DEBUG("Passing the buck to %p", client->waitDispatch); virCondSignal(&client->waitDispatch->cond); } - return -1; + return ret; } @@ -1082,39 +1219,43 @@ static int virNetClientIO(virNetClientPtr client, { int rv = -1; - VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d length=%zu dispatch=%p", + VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d" + " length=%zu dontBlock=%d dispatch=%p", thiscall->msg->header.prog, thiscall->msg->header.vers, thiscall->msg->header.serial, thiscall->msg->header.proc, thiscall->msg->header.type, thiscall->msg->bufferLength, + thiscall->dontBlock, client->waitDispatch); /* Check to see if another thread is dispatching */ if (client->waitDispatch) { - /* Stick ourselves on the end of the wait queue */ - virNetClientCallPtr tmp = client->waitDispatch; + virNetClientCallPtr tmp; char ignore = 1; - while (tmp && tmp->next) - tmp = tmp->next; - if (tmp) - tmp->next = thiscall; - else - client->waitDispatch = thiscall; /* Force other thread to wakeup from poll */ if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { - if (tmp) - tmp->next = NULL; - else - client->waitDispatch = NULL; virReportSystemError(errno, "%s", _("failed to wake up polling thread")); return -1; } - VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall); + /* Stick ourselves on the end of the wait queue */ + tmp = client->waitDispatch; + while (tmp->next) + tmp = tmp->next; + tmp->next = thiscall; + + if (thiscall->dontBlock) { + VIR_DEBUG("Sending non-blocking call while another thread is" + " dispatching; it will send the call for us"); + return 0; + } + + VIR_DEBUG("Going to sleep call=%p unfinishedCall=%p waitDispatch=%p", + thiscall, client->unfinishedCall, client->waitDispatch); /* Go to sleep while other thread is working... 
*/ if (virCondWait(&thiscall->cond, &client->lock) < 0) { if (client->waitDispatch == thiscall) { @@ -1133,7 +1274,7 @@ static int virNetClientIO(virNetClientPtr client, return -1; } - VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall); + VIR_DEBUG("Woken up from sleep %p %p", client->waitDispatch, thiscall); /* Two reasons we can be woken up * 1. Other thread has got our reply ready for us * 2. Other thread is all done, and it is our turn to @@ -1157,7 +1298,8 @@ static int virNetClientIO(virNetClientPtr client, client->waitDispatch = thiscall; } - VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall); + VIR_DEBUG("We have the buck call=%p unfinishedCall=%p waitDispatch=%p", + thiscall, client->unfinishedCall, client->waitDispatch); /* * The buck stops here! * @@ -1184,7 +1326,8 @@ static int virNetClientIO(virNetClientPtr client, rv = -1; cleanup: - VIR_DEBUG("All done with our call %p %p %d", client->waitDispatch, thiscall, rv); + VIR_DEBUG("All done with our call %p %d unfinishedCall=%p waitDispatch=%p", + thiscall, rv, client->unfinishedCall, client->waitDispatch); return rv; } @@ -1223,9 +1366,11 @@ done: } -int virNetClientSend(virNetClientPtr client, - virNetMessagePtr msg, - bool expectReply) +static int +virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool dontBlock) { virNetClientCallPtr call; int ret = -1; @@ -1238,7 +1383,7 @@ int virNetClientSend(virNetClientPtr client, if (expectReply && (msg->bufferLength != 0) && - (msg->header.status == VIR_NET_CONTINUE)) { + (msg->header.status == VIR_NET_CONTINUE || dontBlock)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("Attempt to send an asynchronous message with a synchronous reply")); return -1; @@ -1251,10 +1396,15 @@ int virNetClientSend(virNetClientPtr client, virNetClientLock(client); - if (virCondInit(&call->cond) < 0) { - virNetError(VIR_ERR_INTERNAL_ERROR, "%s", - _("cannot initialize condition variable")); - goto 
cleanup; + /* We don't need call->cond for non-blocking calls since there's no + * thread to be woken up anyway + */ + if (!dontBlock) { + if (virCondInit(&call->cond) < 0) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("cannot initialize condition variable")); + goto cleanup; + } } if (msg->bufferLength) @@ -1263,12 +1413,35 @@ int virNetClientSend(virNetClientPtr client, call->mode = VIR_NET_CLIENT_MODE_WAIT_RX; call->msg = msg; call->expectReply = expectReply; + call->dontBlock = dontBlock; ret = virNetClientIO(client, call); cleanup: ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call); + if (ret != 0) { + VIR_FREE(call); + } else if (dontBlock) { + /* Only free the call if it was completed since otherwise it was just + * queued up and will be processed later. + */ + if (call->mode == VIR_NET_CLIENT_MODE_COMPLETE) { + /* We need to free the message as well since no-one is waiting for + * it. + */ + virNetMessageFree(msg); + VIR_FREE(call); + } + } else { + VIR_FREE(call); + } virNetClientUnlock(client); return ret; } + +int virNetClientSend(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply) +{ + return virNetClientSendInternal(client, msg, expectReply, false); +} -- 1.7.7.2 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list