From: "Daniel P. Berrange" <berrange@xxxxxxxxxx> Split the existing virNetClientSend into two parts virNetClientSend and virNetClientSendNoReply, instead of having a 'bool expectReply' parameter. Add a new virNetClientSendNonBlock which returns 2 on full send, 1 on partial send, 0 on no send, -1 on error If a partial send occurs, then a subsequent call to any of the virNetClientSend* APIs will finish any outstanding I/O. TODO: the virNetClientEvent event handler could be used to speed up completion of partial sends if an event loop is present. * src/rpc/virnetclientprogram.c, src/rpc/virnetclientstream.c: Update for changed API --- src/rpc/virnetclient.c | 249 +++++++++++++++++++++++++++++++++------- src/rpc/virnetclient.h | 12 ++- src/rpc/virnetclientprogram.c | 2 +- src/rpc/virnetclientstream.c | 11 ++- 4 files changed, 224 insertions(+), 50 deletions(-) diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 4b7d4a9..b0ed507 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -55,6 +55,8 @@ struct _virNetClientCall { virNetMessagePtr msg; bool expectReply; + bool nonBlock; + bool haveThread; virCond cond; @@ -86,8 +88,15 @@ struct _virNetClient { int wakeupSendFD; int wakeupReadFD; - /* List of threads currently waiting for dispatch */ + /* + * List of calls currently waiting for dispatch + * The calls should all have threads waiting for + * them, except possibly the first call in the list + * which might be a partially sent non-blocking call. + */ virNetClientCallPtr waitDispatch; + /* Whether a thread is dispatching */ + bool haveTheBuck; size_t nstreams; virNetClientStreamPtr *streams; @@ -555,7 +564,7 @@ virNetClientCallDispatchReply(virNetClientPtr client) virNetClientCallPtr thecall; /* Ok, definitely got an RPC reply now find - out who's been waiting for it */ + out which waiting call is associated with it */ thecall = client->waitDispatch; while (thecall && !(thecall->msg->header.prog == client->msg.header.prog && @@ -896,10 +905,31 @@ virNetClientIOHandleInput(virNetClientPtr client) } +static void virNetClientPassTheBuck(virNetClientPtr client) +{ + virNetClientCallPtr tmp = client->waitDispatch; + + /* See if someone else is still waiting + * and if so, then pass the buck ! */ + while (tmp) { + if (tmp->haveThread) { + VIR_DEBUG("Passing the buck to %p", tmp); + virCondSignal(&tmp->cond); + return; + } + tmp = tmp->next; + } + VIR_DEBUG("No thread to pass the buck to"); +} + + /* * Process all calls pending dispatch/receive until we * get a reply to our own call. Then quit and pass the buck * to someone else. + * + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientIOEventLoop(virNetClientPtr client, virNetClientCallPtr thiscall) @@ -924,6 +954,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (virNetSocketHasCachedData(client->sock)) timeout = 0; + /* If we're a non-blocking call, then we don't + * want to wait for I/O readyness + */ + if (thiscall->nonBlock) + timeout = 0; + fds[0].events = fds[0].revents = 0; fds[1].events = fds[1].revents = 0; @@ -975,8 +1011,34 @@ static int virNetClientIOEventLoop(virNetClientPtr client, /* If we have existing SASL decoded data, pretend * the socket became readable so we consume it */ - if (virNetSocketHasCachedData(client->sock)) + if (virNetSocketHasCachedData(client->sock)) { fds[0].revents |= POLLIN; + } else if (ret == 0 && thiscall->nonBlock) { + if (thiscall->msg->bufferOffset == 0) { + /* No data sent at all, remove ourselves from the list */ + tmp = client->waitDispatch; + prev = NULL; + while (tmp) { + if (tmp == thiscall) { + if (prev) { + prev->next = thiscall->next; + } else { + client->waitDispatch = thiscall->next; + } + break; + } + prev = tmp; + tmp = tmp->next; + } + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); + virNetClientPassTheBuck(client); + return 0; + } else { + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); + virNetClientPassTheBuck(client); + return 1; /* partial send */ + } + } if (fds[1].revents) { VIR_DEBUG("Woken up from poll by other thread"); @@ -988,6 +1050,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } if (ret < 0) { + /* XXX what's this dubious errno check doing ? */ if (errno == EWOULDBLOCK) continue; virReportSystemError(errno, @@ -1005,8 +1068,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client, goto error; } - /* Iterate through waiting threads and if - * any are complete then tell 'em to wakeup + /* Iterate through waiting calls and if any are + * complete, remove them from the dispatch list.. */ tmp = client->waitDispatch; prev = NULL; @@ -1019,13 +1082,25 @@ static int virNetClientIOEventLoop(virNetClientPtr client, else client->waitDispatch = tmp->next; - /* And wake them up.... - * ...they won't actually wakeup until + /* + * ...if the call being removed from the list + * still has a thread, then wake that thread up, + * otherwise free the call. The latter should + * only happen for calls without replies. + * + * ...the threads won't actually wakeup until * we release our mutex a short while * later... */ - VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch); - virCondSignal(&tmp->cond); + if (tmp->haveThread) { + VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch); + virCondSignal(&tmp->cond); + } else { + if (tmp->expectReply) + VIR_WARN("Got a call expecting a reply but without a waiting thread"); + ignore_value(virCondDestroy(&tmp->cond)); + VIR_FREE(tmp); + } } else { prev = tmp; } @@ -1039,13 +1114,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client, */ client->waitDispatch = thiscall->next; VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); - /* See if someone else is still waiting - * and if so, then pass the buck ! */ - if (client->waitDispatch) { - VIR_DEBUG("Passing the buck to %p", client->waitDispatch); - virCondSignal(&client->waitDispatch->cond); - } - return 0; + virNetClientPassTheBuck(client); + return 2; } @@ -1060,16 +1130,21 @@ static int virNetClientIOEventLoop(virNetClientPtr client, error: client->waitDispatch = thiscall->next; VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch); - /* See if someone else is still waiting - * and if so, then pass the buck ! */ - if (client->waitDispatch) { - VIR_DEBUG("Passing the buck to %p", client->waitDispatch); - virCondSignal(&client->waitDispatch->cond); - } + virNetClientPassTheBuck(client); return -1; } +static void +virNetClientUpdateIOCallback(virNetClientPtr client, bool enabled) +{ + int events = 0; + if (enabled) { + events |= VIR_EVENT_HANDLE_READABLE; + } + virNetSocketUpdateIOCallback(client->sock, events); +} + /* * This function sends a message to remote server and awaits a reply * @@ -1102,11 +1177,15 @@ error: * nation are blamed on another, providing an opportunity for war." * * NB(5) Don't Panic! + * + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error */ static int virNetClientIO(virNetClientPtr client, virNetClientCallPtr thiscall) { int rv = -1; + virNetClientCallPtr tmp; VIR_DEBUG("Outgoing message prog=%u version=%u serial=%u proc=%d type=%d length=%zu dispatch=%p", thiscall->msg->header.prog, @@ -1117,20 +1196,27 @@ static int virNetClientIO(virNetClientPtr client, thiscall->msg->bufferLength, client->waitDispatch); + /* Trivially detect blocking if someone else has the buck already */ + if (client->haveTheBuck && + thiscall->nonBlock) + return 0; + + /* Stick ourselves on the end of the wait queue */ + tmp = client->waitDispatch; + while (tmp && tmp->next) + tmp = tmp->next; + if (tmp) + tmp->next = thiscall; + else + client->waitDispatch = thiscall; + /* Check to see if another thread is dispatching */ - if (client->waitDispatch) { - /* Stick ourselves on the end of the wait queue */ - virNetClientCallPtr tmp = client->waitDispatch; + if (client->haveTheBuck) { char ignore = 1; - while (tmp && tmp->next) - tmp = tmp->next; - if (tmp) - tmp->next = thiscall; - else - client->waitDispatch = thiscall; /* Force other thread to wakeup from poll */ if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { + /* Something went wrong, so we need to remove that call we just added */ if (tmp) tmp->next = NULL; else @@ -1143,6 +1229,7 @@ static int virNetClientIO(virNetClientPtr client, VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall); /* Go to sleep while other thread is working... */ if (virCondWait(&thiscall->cond, &client->lock) < 0) { + /* Something went wrong, so we need to remove that call we previously added */ if (client->waitDispatch == thiscall) { client->waitDispatch = thiscall->next; } else { @@ -1167,7 +1254,7 @@ static int virNetClientIO(virNetClientPtr client, * our reply */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - rv = 0; + rv = 2; /* * We avoided catching the buck and our reply is ready ! * We've already had 'thiscall' removed from the list @@ -1177,12 +1264,10 @@ static int virNetClientIO(virNetClientPtr client, } /* Grr, someone passed the buck onto us ... */ - - } else { - /* We're first to catch the buck */ - client->waitDispatch = thiscall; } + client->haveTheBuck = true; + VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall); /* * The buck stops here! @@ -1198,17 +1283,19 @@ static int virNetClientIO(virNetClientPtr client, * cause the event loop thread to be blocked on the * mutex for the duration of the call */ - virNetSocketUpdateIOCallback(client->sock, 0); + virNetClientUpdateIOCallback(client, false); virResetLastError(); rv = virNetClientIOEventLoop(client, thiscall); - virNetSocketUpdateIOCallback(client->sock, VIR_EVENT_HANDLE_READABLE); + virNetClientUpdateIOCallback(client, true); if (rv == 0 && virGetLastError()) rv = -1; + client->haveTheBuck = false; + cleanup: VIR_DEBUG("All done with our call %p %p %d", client->waitDispatch, thiscall, rv); return rv; @@ -1227,7 +1314,7 @@ void virNetClientIncomingEvent(virNetSocketPtr sock, goto done; /* This should be impossible, but it doesn't hurt to check */ - if (client->waitDispatch) + if (client->haveTheBuck) goto done; VIR_DEBUG("Event fired %p %d", sock, events); @@ -1249,9 +1336,14 @@ done: } -int virNetClientSend(virNetClientPtr client, - virNetMessagePtr msg, - bool expectReply) +/* + * Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), + * 0 if nothing sent (only for nonBlock==true) and -1 on error + */ +static int virNetClientSendInternal(virNetClientPtr client, + virNetMessagePtr msg, + bool expectReply, + bool nonBlock) { virNetClientCallPtr call; int ret = -1; @@ -1270,6 +1362,12 @@ int virNetClientSend(virNetClientPtr client, return -1; } + if (expectReply && nonBlock) { + virNetError(VIR_ERR_INTERNAL_ERROR, "%s", + _("Attempt to send an non-blocking message with a synchronous reply")); + return -1; + } + if (VIR_ALLOC(call) < 0) { virReportOOMError(); return -1; @@ -1290,12 +1388,75 @@ int virNetClientSend(virNetClientPtr client, call->mode = VIR_NET_CLIENT_MODE_WAIT_RX; call->msg = msg; call->expectReply = expectReply; + call->nonBlock = nonBlock; + call->haveThread = true; ret = virNetClientIO(client, call); cleanup: - ignore_value(virCondDestroy(&call->cond)); - VIR_FREE(call); + /* If partially sent, then the call is still on the dispatch queue */ + if (ret == 1) { + call->haveThread = false; + } else { + ignore_value(virCondDestroy(&call->cond)); + VIR_FREE(call); + } virNetClientUnlock(client); return ret; } + + +/* + * @msg: a message allocated on heap or stack + * + * Send a message synchronously, and wait for the reply synchronously + * + * The caller is responsible for free'ing @msg if it was allocated + * on the heap + * + * Returns 0 on success, -1 on failure + */ +int virNetClientSend(virNetClientPtr client, + virNetMessagePtr msg) +{ + int ret = virNetClientSendInternal(client, msg, true, false); + if (ret < 0) + return -1; + return 0; +} + + +/* + * @msg: a message allocated on heap or stack + * + * Send a message synchronously, without any reply + * + * The caller is responsible for free'ing @msg if it was allocated + * on the heap + * + * Returns 0 on success, -1 on failure + */ +int virNetClientSendNoReply(virNetClientPtr client, + virNetMessagePtr msg) +{ + int ret = virNetClientSendInternal(client, msg, false, false); + if (ret < 0) + return -1; + return 0; +} + +/* + * @msg: a message allocated on the heap. + * + * Send a message asynchronously, without any reply + * + * The caller is responsible for free'ing @msg, *except* if + * this method returns -1. + * + * Returns 2 on full send, 1 on partial send, 0 on no send, -1 on error + */ +int virNetClientSendNonBlock(virNetClientPtr client, + virNetMessagePtr msg) +{ + return virNetClientSendInternal(client, msg, false, true); +} diff --git a/src/rpc/virnetclient.h b/src/rpc/virnetclient.h index fb679e8..d3c112a 100644 --- a/src/rpc/virnetclient.h +++ b/src/rpc/virnetclient.h @@ -67,9 +67,17 @@ int virNetClientAddStream(virNetClientPtr client, void virNetClientRemoveStream(virNetClientPtr client, virNetClientStreamPtr st); +/* Send a message and wait for reply */ int virNetClientSend(virNetClientPtr client, - virNetMessagePtr msg, - bool expectReply); + virNetMessagePtr msg); + +/* Send a message without needing a reply */ +int virNetClientSendNoReply(virNetClientPtr client, + virNetMessagePtr msg); + +/* Send a message without needing a reply, and don't block on I/O */ +int virNetClientSendNonBlock(virNetClientPtr client, + virNetMessagePtr msg); # ifdef HAVE_SASL void virNetClientSetSASLSession(virNetClientPtr client, diff --git a/src/rpc/virnetclientprogram.c b/src/rpc/virnetclientprogram.c index 36e2384..cb02a25 100644 --- a/src/rpc/virnetclientprogram.c +++ b/src/rpc/virnetclientprogram.c @@ -327,7 +327,7 @@ int virNetClientProgramCall(virNetClientProgramPtr prog, if (virNetMessageEncodePayload(msg, args_filter, args) < 0) goto error; - if (virNetClientSend(client, msg, true) < 0) + if (virNetClientSend(client, msg) < 0) goto error; /* None of these 3 should ever happen here, because diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index 7e2d9ae..309d48d 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -361,8 +361,13 @@ int virNetClientStreamSendPacket(virNetClientStreamPtr st, wantReply = true; } - if (virNetClientSend(client, msg, wantReply) < 0) - goto error; + if (wantReply) { + if (virNetClientSend(client, msg) < 0) + goto error; + } else { + if (virNetClientSendNoReply(client, msg) < 0) + goto error; + } virNetMessageFree(msg); @@ -407,7 +412,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, VIR_DEBUG("Dummy packet to wait for stream data"); virMutexUnlock(&st->lock); - ret = virNetClientSend(client, msg, true); + ret = virNetClientSend(client, msg); virMutexLock(&st->lock); virNetMessageFree(msg); -- 1.7.6.4 -- libvir-list mailing list libvir-list@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/libvir-list