On Thu, Jan 30, 2020 at 03:51:05PM +0100, Pavel Hrdina wrote: > On Tue, Jan 28, 2020 at 01:11:15PM +0000, Daniel P. Berrangé wrote: > > To eliminate the dependancy on GNULIB's poll impl, we need > > to change the RPC client code to use GMainLoop. We don't > > really want to use GIOChannel, but it provides the most > > convenient way to do socket event watches with Windows > > portability. The other alternative would be to use GSocket > > but that is a much more complex change affecting libvirt > > more broadly. > > > > Signed-off-by: Daniel P. Berrangé <berrange@xxxxxxxxxx> > > --- > > src/rpc/virnetclient.c | 215 ++++++++++++++++++++++------------------- > > 1 file changed, 113 insertions(+), 102 deletions(-) > > > > diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c > > index 031a99711f..9069c57113 100644 > > --- a/src/rpc/virnetclient.c > > +++ b/src/rpc/virnetclient.c > > @@ -800,11 +791,7 @@ static void virNetClientCloseInternal(virNetClientPtr client, > > * queue and close the client because we set client->wantClose. > > */ > > This comment should be probably updated to not reference threads. I'm not sure what you mean here, as the comment looks still accurate to me. > > > if (client->haveTheBuck) { > > - char ignore = 1; > > - size_t len = sizeof(ignore); > > - > > - if (safewrite(client->wakeupSendFD, &ignore, len) != len) > > - VIR_ERROR(_("failed to wake up polling thread")); > > + g_main_loop_quit(client->eventLoop); > > } else { > > virNetClientIOEventLoopPassTheBuck(client, NULL); > > } > > @@ -831,13 +818,70 @@ void virNetClientSetSASLSession(virNetClientPtr client, > > #endif > > > > > > +static gboolean > > +virNetClientIOEventTLS(int fd, > > + GIOCondition ev, > > + gpointer opaque); > > + > > +static gboolean > > +virNetClientTLSHandshake(virNetClientPtr client) > > +{ > > + GIOCondition ev; > > + int ret; > > + > > + ret = virNetTLSSessionHandshake(client->tls); > > + > > + if (ret <= 0) > > + return FALSE; > > + > > + if (virNetTLSSessionGetHandshakeStatus(client->tls) == > > + VIR_NET_TLS_HANDSHAKE_RECVING) > > + ev = G_IO_IN; > > + else > > + ev = G_IO_OUT; > > + > > + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), > > + ev, > > + client->eventCtx, > > + virNetClientIOEventTLS, client, NULL); > > + > > + return TRUE; > > +} > > + > > + > > +static gboolean > > +virNetClientIOEventTLS(int fd G_GNUC_UNUSED, > > + GIOCondition ev G_GNUC_UNUSED, > > + gpointer opaque) > > +{ > > + virNetClientPtr client = opaque; > > + > > + if (!virNetClientTLSHandshake(client)) > > + g_main_loop_quit(client->eventLoop); > > + > > + return G_SOURCE_REMOVE; > > +} > > + > > + > > +static gboolean > > +virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED, > > + GIOCondition ev G_GNUC_UNUSED, > > + gpointer opaque) > > +{ > > + virNetClientPtr client = opaque; > > + > > + g_main_loop_quit(client->eventLoop); > > + > > + return G_SOURCE_REMOVE; > > +} > > + > > + > > int virNetClientSetTLSSession(virNetClientPtr client, > > virNetTLSContextPtr tls) > > { > > int ret; > > char buf[1]; > > int len; > > - struct pollfd fds[1]; > > > > #ifndef WIN32 > > sigset_t oldmask, blockedsigs; > > @@ -860,22 +904,8 @@ int virNetClientSetTLSSession(virNetClientPtr client, > > > > virNetSocketSetTLSSession(client->sock, client->tls); > > > > - for (;;) { > > - ret = virNetTLSSessionHandshake(client->tls); > > - > > - if (ret < 0) > > - goto error; > > - if (ret == 0) > > - break; > > - > > - fds[0].fd = virNetSocketGetFD(client->sock); > > - fds[0].revents = 0; > > - if (virNetTLSSessionGetHandshakeStatus(client->tls) == > > - VIR_NET_TLS_HANDSHAKE_RECVING) > > - fds[0].events = POLLIN; > > - else > > - fds[0].events = POLLOUT; > > - > > + virResetLastError(); > > + if (virNetClientTLSHandshake(client)) { > > #ifndef WIN32 > > /* Block SIGWINCH from interrupting poll in curses programs, > > * then restore the original signal mask again immediately > > @@ -885,16 +915,16 @@ int virNetClientSetTLSSession(virNetClientPtr client, > > ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); > > #endif /* !WIN32 */ > > > > - repoll: > > - ret = poll(fds, G_N_ELEMENTS(fds), -1); > > - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) > > - goto repoll; > > + g_main_loop_run(client->eventLoop); > > > > #ifndef WIN32 > > ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); > > #endif /* !WIN32 */ > > } > > > > + if (virGetLastErrorCode() != VIR_ERR_OK) > > + goto error; > > + > > ret = virNetTLSContextCheckCertificate(tls, client->tls); > > > > if (ret < 0) > > @@ -904,19 +934,17 @@ int virNetClientSetTLSSession(virNetClientPtr client, > > * etc. If we make the grade, it will send us a '\1' byte. > > */ > > > > - fds[0].fd = virNetSocketGetFD(client->sock); > > - fds[0].revents = 0; > > - fds[0].events = POLLIN; > > + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), > > + G_IO_IN, > > + client->eventCtx, > > + virNetClientIOEventTLSConfirm, client, NULL); > > > > #ifndef WIN32 > > /* Block SIGWINCH from interrupting poll in curses programs */ > > ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); > > #endif /* !WIN32 */ > > > > - repoll2: > > - ret = poll(fds, G_N_ELEMENTS(fds), -1); > > - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) > > - goto repoll2; > > + g_main_loop_run(client->eventLoop); > > > > #ifndef WIN32 > > ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); > > @@ -1451,12 +1479,12 @@ virNetClientIOHandleInput(virNetClientPtr client) > > static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call, > > void *opaque) > > { > > - struct pollfd *fd = opaque; > > + GIOCondition *ev = opaque; > > > > if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX) > > - fd->events |= POLLIN; > > + *ev |= G_IO_IN; > > if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX) > > - fd->events |= POLLOUT; > > + *ev |= G_IO_OUT; > > > > return false; > > } > > @@ -1552,6 +1580,18 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, > > } > > > > > > +static gboolean > > +virNetClientIOEventFD(int fd G_GNUC_UNUSED, > > + GIOCondition ev, > > + gpointer opaque) > > +{ > > + GIOCondition *rev = opaque; > > + *rev = ev; > > + > > + return G_SOURCE_REMOVE; > > +} > > + > > + > > /* > > * Process all calls pending dispatch/receive until we > > * get a reply to our own call. Then quit and pass the buck > > @@ -1563,21 +1603,17 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, > > static int virNetClientIOEventLoop(virNetClientPtr client, > > virNetClientCallPtr thiscall) > > { > > - struct pollfd fds[2]; > > bool error = false; > > int closeReason; > > - int ret; > > - > > - fds[0].fd = virNetSocketGetFD(client->sock); > > - fds[1].fd = client->wakeupReadFD; > > > > for (;;) { > > - char ignore; > > #ifndef WIN32 > > sigset_t oldmask, blockedsigs; > > #endif /* !WIN32 */ > > int timeout = -1; > > virNetMessagePtr msg = NULL; > > + GIOCondition ev = 0; > > + GIOCondition rev = 0; > > > > /* If we have existing SASL decoded data we don't want to sleep in > > * the poll(), just check if any other FDs are also ready. > > @@ -1595,22 +1631,22 @@ static int virNetClientIOEventLoop(virNetClientPtr client, > > if (timeout == -1) > > timeout = virKeepAliveTimeout(client->keepalive); > > > > - fds[0].events = fds[0].revents = 0; > > - fds[1].events = fds[1].revents = 0; > > - > > - fds[1].events = POLLIN; > > - > > /* Calculate poll events for calls */ > > virNetClientCallMatchPredicate(client->waitDispatch, > > virNetClientIOEventLoopPollEvents, > > - &fds[0]); > > + &ev); > > > > /* We have to be prepared to receive stream data > > * regardless of whether any of the calls waiting > > * for dispatch are for streams. > > */ > > if (client->nstreams) > > - fds[0].events |= POLLIN; > > + ev |= G_IO_IN; > > + > > + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), > > + ev, > > + client->eventCtx, > > + virNetClientIOEventFD, &rev, NULL); > > > > /* Release lock while poll'ing so other threads > > * can stuff themselves on the queue */ > > @@ -1630,13 +1666,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, > > sigaddset(&blockedsigs, SIGCHLD); > > # endif > > sigaddset(&blockedsigs, SIGPIPE); > > + > > ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); > > #endif /* !WIN32 */ > > > > - repoll: > > - ret = poll(fds, G_N_ELEMENTS(fds), timeout); > > - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) > > - goto repoll; > > + while (!rev) > > + g_main_context_iteration(client->eventCtx, TRUE); > > Is there a reason why we don't use g_main_loop_run() here and use > g_main_loop_quit() in virNetClientIOEventFD() the same way we use it > in virNetClientIOEventTLSConfirm() ? > > If I'm looking at the code correctly the call to g_main_loop_quit() from > virNetClientIO() where we want to force other threads from poll would be > ignored by the g_main_context_iteration(). This would be a change in > behavior from the old core where the write to "client->wakeupSendFD" > would make the poll() function wake since it is listening on > "client->wakeupReadFD" as well. Yeah you are right here. Regards, Daniel -- |: https://berrange.com -o- https://www.flickr.com/photos/dberrange :| |: https://libvirt.org -o- https://fstop138.berrange.com :| |: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|