From: "Daniel P. Berrange" <berrange@xxxxxxxxxx> Split the existing virNetClientSend into two parts virNetClientSend and virNetClientSendNoReply, instead of having a 'bool expectReply' parameter. Add a new virNetClientSendNonBlock which returns 2 on full send, 1 on partial send, 0 on no send, -1 on error If a partial send occurs, then a subsequent call to any of the virNetClientSend* APIs will finish any outstanding I/O. TODO: the virNetClientEvent event handler could be used to speed up completion of partial sends if an event loop is present. * src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c: Update for changed API --- src/rpc/virnetclient.c | 197 +++++++++++++++++++++++++++++++++++++++++++----- src/rpc/virnetclient.h | 4 + src/rpc/virnetsocket.c | 13 +++ src/rpc/virnetsocket.h | 1 + 4 files changed, 197 insertions(+), 18 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 96d1886..9891f55 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -55,6 +55,9 @@ struct _virNetClientCall { virNetMessagePtr msg; bool expectReply; + bool nonBlock; + bool haveThread; + bool sentSomeData; virCond cond; @@ -86,7 +89,12 @@ struct _virNetClient { int wakeupSendFD; int wakeupReadFD; - /* List of threads currently waiting for dispatch */ + /* + * List of calls currently waiting for dispatch + * The calls should all have threads waiting for + * them, except possibly the first call in the list + * which might be a partially sent non-blocking call. + */ virNetClientCallPtr waitDispatch; /* True if a thread holds the buck */ bool haveTheBuck; @@ -648,7 +656,7 @@ virNetClientCallDispatchReply(virNetClientPtr client) virNetClientCallPtr thecall; /* Ok, definitely got an RPC reply now find - out who's been waiting for it */ + out which waiting call is associated with it */ thecall = client->waitDispatch; while (thecall && !(thecall->msg->header.prog == client->msg.header.prog && @@ -824,6 +832,8 @@ virNetClientIOWriteMessage(virNetClientPtr client, ret = virNetSocketWrite(client->sock, thecall->msg->buffer + thecall->msg->bufferOffset, thecall->msg->bufferLength - thecall->msg->bufferOffset); + if (ret || virNetSocketHasPendingData(client->sock)) + thecall->sentSomeData = true; if (ret <= 0) return ret; @@ -1015,17 +1025,66 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call, return false; /* - * ...they won't actually wakeup until + * ...if the call being removed from the list + * still has a thread, then wake that thread up, + * otherwise free the call. The latter should + * only happen for calls without replies. + * + * ...the threads won't actually wakeup until * we release our mutex a short while * later... */ - VIR_DEBUG("Waking up sleeping call %p", call); - virCondSignal(&call->cond); + if (call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + virCondSignal(&call->cond); + } else { + if (call->expectReply) + VIR_WARN("Got a call expecting a reply but without a waiting thread"); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call); + } return true; } +static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call, + void *opaque) +{ + virNetClientCallPtr thiscall = opaque; + + if (call == thiscall) + return false; + + if (!call->nonBlock) + return false; + + /* + * ...if the call being removed from the list + * still has a thread, then wake that thread up, + * otherwise free the call. The latter should + * only happen for calls without replies. + * + * ...the threads won't actually wakeup until + * we release our mutex a short while + * later... + */ + if (call->haveThread) { + VIR_DEBUG("Waking up sleep %p", call); + virCondSignal(&call->cond); + return true; + } else if (!call->sentSomeData) { + if (call->expectReply) + VIR_WARN("Got a call expecting a reply but without a waiting thread"); + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call); + return true; + } + + return false; +} + + static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall) { VIR_DEBUG("Giving up the buck %p", thiscall); @@ -1033,19 +1092,29 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli /* See if someone else is still waiting * and if so, then pass the buck ! */ while (tmp) { - if (tmp != thiscall) { + if (tmp != thiscall && tmp->haveThread) { VIR_DEBUG("Passing the buck to %p", tmp); virCondSignal(&tmp->cond); break; } tmp = tmp->next; } + VIR_DEBUG("No thread to pass the buck to"); +} + + +static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED) +{ + return call->nonBlock; } /* * Process all calls pending dispatch/receive until we * get a reply to our own call. Then quit and pass the buck * to someone else. + * + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientIOEventLoop(virNetClientPtr client, virNetClientCallPtr thiscall) @@ -1068,6 +1137,14 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock)) timeout = 0; + /* If there are any non-blocking calls in the queue, + * then we don't want to sleep in poll() + */ + if (virNetClientCallMatchPredicate(client->waitDispatch, + virNetClientIOEventLoopWantNonBlock, + NULL)) + timeout = 0; + fds[0].events = fds[0].revents = 0; fds[1].events = fds[1].revents = 0; @@ -1116,8 +1193,9 @@ static int virNetClientIOEventLoop(virNetClientPtr client, /* If we have existing SASL decoded data, pretend * the socket became readable so we consume it */ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock)) { fds[0].revents |= POLLIN; + } if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); @@ -1129,6 +1207,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } if (ret < 0) { + /* XXX what's this dubious errno check doing ? */ if (errno == EWOULDBLOCK) continue; virReportSystemError(errno, @@ -1146,20 +1225,33 @@ static int virNetClientIOEventLoop(virNetClientPtr client, goto error; } - /* Iterate through waiting threads and if - * any are complete then tell 'em to wakeup + /* Iterate through waiting calls and if any are + * complete, remove them from the dispatch list.. */ virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientIOEventLoopRemoveDone, thiscall); + /* Iterate through waiting calls and if any are + * non-blocking, remove them from the dispatch list... + */ + virNetClientCallRemovePredicate(&client->waitDispatch, + virNetClientIOEventLoopRemoveNonBlocking, + thiscall); + /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { virNetClientCallRemove(&client->waitDispatch, thiscall); virNetClientIOEventLoopPassTheBuck(client, thiscall); - return 0; + return 2; } + /* We're not done, but we're non-blocking */ + if (thiscall->nonBlock) { + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); + virNetClientIOEventLoopPassTheBuck(client, thiscall); + return thiscall->sentSomeData ? 1 : 0; + } if (fds[0].revents & (POLLHUP | POLLERR)) { virNetError(VIR_ERR_INTERNAL_ERROR, "%s", @@ -1218,7 +1310,31 @@ static void virNetClientIOUpdateCallback(virNetClientPtr client, * a strategy in power politics when the actions of one country/ * nation are blamed on another, providing an opportunity for war." * - * NB(5) Don't Panic! + * NB(5) If the 'thiscall' has the 'nonBlock' flag set, the caller + * must *NOT* free it, if this returns '1' (ie partial send). + * + * NB(6) The following input states are valid if *no* threads + * are currently executing this method + * + * - waitDispatch == NULL, + * - waitDispatch != NULL, waitDispatch.nonBlock == true + * + * The following input states are valid, if n threads are currently + * executing + * + * - waitDispatch != NULL + * - 0 or 1 waitDispatch.nonBlock == false, without any threads + * - 0 or more waitDispatch.nonBlock == false, with threads + * + * The following output states are valid when all threads are done + * + * - waitDispatch == NULL, + * - waitDispatch != NULL, waitDispatch.nonBlock == true + * + * NB(7) Don't Panic! + * + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientIO(virNetClientPtr client, virNetClientCallPtr thiscall) @@ -1259,14 +1375,15 @@ static int virNetClientIO(virNetClientPtr client, } VIR_DEBUG("Wokeup from sleep %p %p", client->waitDispatch, thiscall); - /* Two reasons we can be woken up + /* Three reasons we can be woken up * 1. Other thread has got our reply ready for us * 2. Other thread is all done, and it is our turn to * be the dispatcher to finish waiting for * our reply + * 3. I/O was expected to block */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - rv = 0; + rv = 2; /* * We avoided catching the buck and our reply is ready ! * We've already had 'thiscall' removed from the list @@ -1275,6 +1392,15 @@ static int virNetClientIO(virNetClientPtr client, goto cleanup; } + /* If we're non-blocking, get outta here */ + if (thiscall->nonBlock) { + if (thiscall->sentSomeData) + rv = 1; /* In progress */ + else + rv = 0; /* none at all */ + goto cleanup; + } + /* Grr, someone passed the buck onto us ... */ } @@ -1348,9 +1474,14 @@ done: } +/* + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error + */ static int virNetClientSendInternal(virNetClientPtr client, virNetMessagePtr msg, - bool expectReply) + bool expectReply, + bool nonBlock) { virNetClientCallPtr call; int ret = -1; @@ -1369,6 +1500,12 @@ static int virNetClientSendInternal(virNetClientPtr client, return -1; } + if (expectReply && nonBlock) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Attempt to send an non-blocking message with a synchronous reply")); + return -1; + } + if (VIR_ALLOC(call) < 0) { virReportOOMError(); return -1; @@ -1389,16 +1526,24 @@ static int virNetClientSendInternal(virNetClientPtr client, call->mode = VIR_NET_CLIENT_MODE_WAIT_RX; call->msg = msg; call->expectReply = expectReply; + call->nonBlock = nonBlock; + call->haveThread = true; ret = virNetClientIO(client, call); cleanup: - ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call); + /* If partially sent, then the call is still on the dispatch queue */ + if (ret == 1) { + call->haveThread = false; + } else { + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call); + } virNetClientUnlock(client); return ret; } + /* * @msg: a message allocated on heap or stack * @@ -1412,7 +1557,7 @@ cleanup: int virNetClientSendWithReply(virNetClientPtr client, virNetMessagePtr msg) { - int ret = virNetClientSendInternal(client, msg, true); + int ret = virNetClientSendInternal(client, msg, true, false); if (ret < 0) return -1; return 0; @@ -1432,8 +1577,24 @@ int virNetClientSendWithReply(virNetClientPtr client, int virNetClientSendNoReply(virNetClientPtr client, virNetMessagePtr msg) { - int ret = virNetClientSendInternal(client, msg, false); + int ret = virNetClientSendInternal(client, msg, false, false); if (ret < 0) return -1; return 0; } + +/* + * @msg: a message allocated on the heap. + * + * Send a message asynchronously, without any reply + * + * The caller is responsible for free'ing @msg, *except* if + * this method returns -1. + * + * Returns 2 on full send, 1 on partial send, 0 on no send, -1 on error + */ +int virNetClientSendNonBlock(virNetClientPtr client, + virNetMessagePtr msg) +{ + return virNetClientSendInternal(client, msg, false, true); +} diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index eef3eb3..71db543 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -73,6 +73,10 @@ int virNetClientSendWithReply(virNetClientPtr client, int virNetClientSendNoReply(virNetClientPtr client, virNetMessagePtr msg); +int virNetClientSendNonBlock(virNetClientPtr client, + virNetMessagePtr msg); + + # ifdef HAVE_SASL void virNetClientSetSASLSession(virNetClientPtr client, virNetSASLSessionPtr sasl); diff --git a/src/rpc/virnetsocket.c b/src/rpc/virnetsocket.c index 30b8fe6..2449353 100644 --- a/src/rpc/virnetsocket.c +++ b/src/rpc/virnetsocket.c @@ -931,6 +931,19 @@ bool virNetSocketHasCachedData(virNetSocketPtr sock ATTRIBUTE_UNUSED) } +bool virNetSocketHasPendingData(virNetSocketPtr sock ATTRIBUTE_UNUSED) +{ + bool hasPending = false; + virMutexLock(&sock->lock); +#if HAVE_SASL + if (sock->saslEncoded) + hasPending = true; +#endif + virMutexUnlock(&sock->lock); + return hasPending; +} + + static ssize_t virNetSocketReadWire(virNetSocketPtr sock, char *buf, size_t len) { char *errout = NULL; diff --git a/src/rpc/virnetsocket.h b/src/rpc/virnetsocket.h index e444aef..508dd47 100644 --- a/src/rpc/virnetsocket.h +++ b/src/rpc/virnetsocket.h @@ -106,6 +106,7 @@ void virNetSocketSetSASLSession(virNetSocketPtr sock, virNetSASLSessionPtr sess); # endif bool virNetSocketHasCachedData(virNetSocketPtr sock); +bool virNetSocketHasPendingData(virNetSocketPtr sock); void virNetSocketRef(virNetSocketPtr sock); void virNetSocketFree(virNetSocketPtr sock); -- 1.7.6.4 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list