On Tue, Jan 28, 2020 at 01:11:15PM +0000, Daniel P. Berrangé wrote: > To eliminate the dependancy on GNULIB's poll impl, we need > to change the RPC client code to use GMainLoop. We don't > really want to use GIOChannel, but it provides the most > convenient way to do socket event watches with Windows > portability. The other alternative would be to use GSocket > but that is a much more complex change affecting libvirt > more broadly. > > Signed-off-by: Daniel P. Berrangé <berrange@xxxxxxxxxx> > --- > src/rpc/virnetclient.c | 215 ++++++++++++++++++++++------------------- > 1 file changed, 113 insertions(+), 102 deletions(-) > > diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c > index 031a99711f..9069c57113 100644 > --- a/src/rpc/virnetclient.c > +++ b/src/rpc/virnetclient.c > @@ -800,11 +791,7 @@ static void virNetClientCloseInternal(virNetClientPtr client, > * queue and close the client because we set client->wantClose. > */ This comment should be probably updated to not reference threads. > if (client->haveTheBuck) { > - char ignore = 1; > - size_t len = sizeof(ignore); > - > - if (safewrite(client->wakeupSendFD, &ignore, len) != len) > - VIR_ERROR(_("failed to wake up polling thread")); > + g_main_loop_quit(client->eventLoop); > } else { > virNetClientIOEventLoopPassTheBuck(client, NULL); > } > @@ -831,13 +818,70 @@ void virNetClientSetSASLSession(virNetClientPtr client, > #endif > > > +static gboolean > +virNetClientIOEventTLS(int fd, > + GIOCondition ev, > + gpointer opaque); > + > +static gboolean > +virNetClientTLSHandshake(virNetClientPtr client) > +{ > + GIOCondition ev; > + int ret; > + > + ret = virNetTLSSessionHandshake(client->tls); > + > + if (ret <= 0) > + return FALSE; > + > + if (virNetTLSSessionGetHandshakeStatus(client->tls) == > + VIR_NET_TLS_HANDSHAKE_RECVING) > + ev = G_IO_IN; > + else > + ev = G_IO_OUT; > + > + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), > + ev, > + client->eventCtx, > + virNetClientIOEventTLS, client, NULL); > + > + return TRUE; > +} > + > + > +static gboolean > +virNetClientIOEventTLS(int fd G_GNUC_UNUSED, > + GIOCondition ev G_GNUC_UNUSED, > + gpointer opaque) > +{ > + virNetClientPtr client = opaque; > + > + if (!virNetClientTLSHandshake(client)) > + g_main_loop_quit(client->eventLoop); > + > + return G_SOURCE_REMOVE; > +} > + > + > +static gboolean > +virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED, > + GIOCondition ev G_GNUC_UNUSED, > + gpointer opaque) > +{ > + virNetClientPtr client = opaque; > + > + g_main_loop_quit(client->eventLoop); > + > + return G_SOURCE_REMOVE; > +} > + > + > int virNetClientSetTLSSession(virNetClientPtr client, > virNetTLSContextPtr tls) > { > int ret; > char buf[1]; > int len; > - struct pollfd fds[1]; > > #ifndef WIN32 > sigset_t oldmask, blockedsigs; > @@ -860,22 +904,8 @@ int virNetClientSetTLSSession(virNetClientPtr client, > > virNetSocketSetTLSSession(client->sock, client->tls); > > - for (;;) { > - ret = virNetTLSSessionHandshake(client->tls); > - > - if (ret < 0) > - goto error; > - if (ret == 0) > - break; > - > - fds[0].fd = virNetSocketGetFD(client->sock); > - fds[0].revents = 0; > - if (virNetTLSSessionGetHandshakeStatus(client->tls) == > - VIR_NET_TLS_HANDSHAKE_RECVING) > - fds[0].events = POLLIN; > - else > - fds[0].events = POLLOUT; > - > + virResetLastError(); > + if (virNetClientTLSHandshake(client)) { > #ifndef WIN32 > /* Block SIGWINCH from interrupting poll in curses programs, > * then restore the original signal mask again immediately > @@ -885,16 +915,16 @@ int virNetClientSetTLSSession(virNetClientPtr client, > ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); > #endif /* !WIN32 */ > > - repoll: > - ret = poll(fds, G_N_ELEMENTS(fds), -1); > - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) > - goto repoll; > + g_main_loop_run(client->eventLoop); > > #ifndef WIN32 > ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); > #endif /* !WIN32 */ > } > > + if (virGetLastErrorCode() != VIR_ERR_OK) > + goto error; > + > ret = virNetTLSContextCheckCertificate(tls, client->tls); > > if (ret < 0) > @@ -904,19 +934,17 @@ int virNetClientSetTLSSession(virNetClientPtr client, > * etc. If we make the grade, it will send us a '\1' byte. > */ > > - fds[0].fd = virNetSocketGetFD(client->sock); > - fds[0].revents = 0; > - fds[0].events = POLLIN; > + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), > + G_IO_IN, > + client->eventCtx, > + virNetClientIOEventTLSConfirm, client, NULL); > > #ifndef WIN32 > /* Block SIGWINCH from interrupting poll in curses programs */ > ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); > #endif /* !WIN32 */ > > - repoll2: > - ret = poll(fds, G_N_ELEMENTS(fds), -1); > - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) > - goto repoll2; > + g_main_loop_run(client->eventLoop); > > #ifndef WIN32 > ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); > @@ -1451,12 +1479,12 @@ virNetClientIOHandleInput(virNetClientPtr client) > static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call, > void *opaque) > { > - struct pollfd *fd = opaque; > + GIOCondition *ev = opaque; > > if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX) > - fd->events |= POLLIN; > + *ev |= G_IO_IN; > if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX) > - fd->events |= POLLOUT; > + *ev |= G_IO_OUT; > > return false; > } > @@ -1552,6 +1580,18 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, > } > > > +static gboolean > +virNetClientIOEventFD(int fd G_GNUC_UNUSED, > + GIOCondition ev, > + gpointer opaque) > +{ > + GIOCondition *rev = opaque; > + *rev = ev; > + > + return G_SOURCE_REMOVE; > +} > + > + > /* > * Process all calls pending dispatch/receive until we > * get a reply to our own call. Then quit and pass the buck > @@ -1563,21 +1603,17 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, > static int virNetClientIOEventLoop(virNetClientPtr client, > virNetClientCallPtr thiscall) > { > - struct pollfd fds[2]; > bool error = false; > int closeReason; > - int ret; > - > - fds[0].fd = virNetSocketGetFD(client->sock); > - fds[1].fd = client->wakeupReadFD; > > for (;;) { > - char ignore; > #ifndef WIN32 > sigset_t oldmask, blockedsigs; > #endif /* !WIN32 */ > int timeout = -1; > virNetMessagePtr msg = NULL; > + GIOCondition ev = 0; > + GIOCondition rev = 0; > > /* If we have existing SASL decoded data we don't want to sleep in > * the poll(), just check if any other FDs are also ready. > @@ -1595,22 +1631,22 @@ static int virNetClientIOEventLoop(virNetClientPtr client, > if (timeout == -1) > timeout = virKeepAliveTimeout(client->keepalive); > > - fds[0].events = fds[0].revents = 0; > - fds[1].events = fds[1].revents = 0; > - > - fds[1].events = POLLIN; > - > /* Calculate poll events for calls */ > virNetClientCallMatchPredicate(client->waitDispatch, > virNetClientIOEventLoopPollEvents, > - &fds[0]); > + &ev); > > /* We have to be prepared to receive stream data > * regardless of whether any of the calls waiting > * for dispatch are for streams. > */ > if (client->nstreams) > - fds[0].events |= POLLIN; > + ev |= G_IO_IN; > + > + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), > + ev, > + client->eventCtx, > + virNetClientIOEventFD, &rev, NULL); > > /* Release lock while poll'ing so other threads > * can stuff themselves on the queue */ > @@ -1630,13 +1666,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, > sigaddset(&blockedsigs, SIGCHLD); > # endif > sigaddset(&blockedsigs, SIGPIPE); > + > ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); > #endif /* !WIN32 */ > > - repoll: > - ret = poll(fds, G_N_ELEMENTS(fds), timeout); > - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) > - goto repoll; > + while (!rev) > + g_main_context_iteration(client->eventCtx, TRUE); Is there a reason why we don't use g_main_loop_run() here and use g_main_loop_quit() in virNetClientIOEventFD() the same way we use it in virNetClientIOEventTLSConfirm() ? If I'm looking at the code correctly the call to g_main_loop_quit() from virNetClientIO() where we want to force other threads from poll would be ignored by the g_main_context_iteration(). This would be a change in behavior from the old core where the write to "client->wakeupSendFD" would make the poll() function wake since it is listening on "client->wakeupReadFD" as well. Otherwise looks good. Pavel > > #ifndef WIN32 > ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); > @@ -1644,12 +1679,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client, > > virObjectLock(client); > > - if (ret < 0) { > - virReportSystemError(errno, > - "%s", _("poll on socket failed")); > - goto error; > - } > - > if (virKeepAliveTrigger(client->keepalive, &msg)) { > virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_KEEPALIVE); > } else if (msg && virNetClientQueueNonBlocking(client, msg) < 0) { > @@ -1661,7 +1690,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, > * the socket became readable so we consume it > */ > if (virNetSocketHasCachedData(client->sock)) > - fds[0].revents |= POLLIN; > + rev |= G_IO_IN; > > /* If wantClose flag is set, pretend there was an error on the socket, > * but still read and process any data we received so far. > @@ -1669,23 +1698,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, > if (client->wantClose) > error = true; > > - if (fds[1].revents) { > - VIR_DEBUG("Woken up from poll by other thread"); > - if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { > - virReportSystemError(errno, "%s", > - _("read on wakeup fd failed")); > - virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_ERROR); > - error = true; > - /* Fall through to process any pending data. */ > - } > - } > - > - if (fds[0].revents & POLLHUP) > + if (rev & G_IO_HUP) > closeReason = VIR_CONNECT_CLOSE_REASON_EOF; > else > closeReason = VIR_CONNECT_CLOSE_REASON_ERROR; > > - if (fds[0].revents & POLLOUT) { > + if (rev & G_IO_OUT) { > if (virNetClientIOHandleOutput(client) < 0) { > virNetClientMarkClose(client, closeReason); > error = true; > @@ -1693,7 +1711,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, > } > } > > - if (fds[0].revents & POLLIN) { > + if (rev & G_IO_IN) { > if (virNetClientIOHandleInput(client) < 0) { > virNetClientMarkClose(client, closeReason); > error = true; > @@ -1725,13 +1743,13 @@ static int virNetClientIOEventLoop(virNetClientPtr client, > if (error) > goto error; > > - if (fds[0].revents & POLLHUP) { > + if (rev & G_IO_HUP) { > virReportError(VIR_ERR_INTERNAL_ERROR, "%s", > _("received hangup event on socket")); > virNetClientMarkClose(client, closeReason); > goto error; > } > - if (fds[0].revents & POLLERR) { > + if (rev & G_IO_ERR) { > virReportError(VIR_ERR_INTERNAL_ERROR, "%s", > _("received error event on socket")); > virNetClientMarkClose(client, closeReason); > @@ -1858,15 +1876,8 @@ static int virNetClientIO(virNetClientPtr client, > > /* Check to see if another thread is dispatching */ > if (client->haveTheBuck) { > - char ignore = 1; > - > /* Force other thread to wakeup from poll */ > - if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { > - virNetClientCallRemove(&client->waitDispatch, thiscall); > - virReportSystemError(errno, "%s", > - _("failed to wake up polling thread")); > - return -1; > - } > + g_main_loop_quit(client->eventLoop); > > /* If we are non-blocking, detach the thread and keep the call in the > * queue. */ > -- > 2.24.1 >
Attachment:
signature.asc
Description: PGP signature