Hi Daniel, > -----Original Message----- > From: Daniel P. Berrangé [mailto:berrange@xxxxxxxxxx] > Sent: Friday, June 7, 2024 5:04 PM > To: Gonglei (Arei) <arei.gonglei@xxxxxxxxxx> > Cc: qemu-devel@xxxxxxxxxx; peterx@xxxxxxxxxx; yu.zhang@xxxxxxxxx; > mgalaxy@xxxxxxxxxx; elmar.gerdes@xxxxxxxxx; zhengchuan > <zhengchuan@xxxxxxxxxx>; armbru@xxxxxxxxxx; lizhijian@xxxxxxxxxxx; > pbonzini@xxxxxxxxxx; mst@xxxxxxxxxx; Xiexiangyou > <xiexiangyou@xxxxxxxxxx>; linux-rdma@xxxxxxxxxxxxxxx; lixiao (H) > <lixiao91@xxxxxxxxxx>; jinpu.wang@xxxxxxxxx; Wangjialin > <wangjialin23@xxxxxxxxxx> > Subject: Re: [PATCH 3/6] io/channel-rdma: support working in coroutine > > On Tue, Jun 04, 2024 at 08:14:09PM +0800, Gonglei wrote: > > From: Jialin Wang <wangjialin23@xxxxxxxxxx> > > > > It is not feasible to obtain RDMA completion queue notifications > > through poll/ppoll on the rsocket fd. Therefore, we create a thread > > named rpoller for each rsocket fd and two eventfds: pollin_eventfd and > > pollout_eventfd. > > > > When using io_create_watch or io_set_aio_fd_handler waits for POLLIN > > or POLLOUT events, it will actually poll/ppoll on the pollin_eventfd > > and pollout_eventfd instead of the rsocket fd. > > > > The rpoller rpoll() on the rsocket fd to receive POLLIN and POLLOUT > > events. > > When a POLLIN event occurs, the rpoller write the pollin_eventfd, and > > then poll/ppoll will return the POLLIN event. > > When a POLLOUT event occurs, the rpoller read the pollout_eventfd, and > > then poll/ppoll will return the POLLOUT event. > > > > For a non-blocking rsocket fd, if rread/rwrite returns EAGAIN, it will > > read/write the pollin/pollout_eventfd, preventing poll/ppoll from > > returning POLLIN/POLLOUT events. > > > > Known limitations: > > > > For a blocking rsocket fd, if we use io_create_watch to wait for > > POLLIN or POLLOUT events, since the rsocket fd is blocking, we > > cannot determine when it is not ready to read/write as we can with > > non-blocking fds. 
Therefore, when an event occurs, it will occur > > always, potentially leaving the qemu hanging. So we need to be cautious > > to avoid hanging when using io_create_watch. > > > > Luckily, channel-rdma works well in coroutines :) > > > > Signed-off-by: Jialin Wang <wangjialin23@xxxxxxxxxx> > > Signed-off-by: Gonglei <arei.gonglei@xxxxxxxxxx> > > --- > > include/io/channel-rdma.h | 15 +- > > io/channel-rdma.c | 363 > +++++++++++++++++++++++++++++++++++++- > > 2 files changed, 376 insertions(+), 2 deletions(-) > > > > diff --git a/include/io/channel-rdma.h b/include/io/channel-rdma.h > > index 8cab2459e5..cb56127d76 100644 > > --- a/include/io/channel-rdma.h > > +++ b/include/io/channel-rdma.h > > @@ -47,6 +47,18 @@ struct QIOChannelRDMA { > > socklen_t localAddrLen; > > struct sockaddr_storage remoteAddr; > > socklen_t remoteAddrLen; > > + > > + /* private */ > > + > > + /* qemu g_poll/ppoll() POLLIN event on it */ > > + int pollin_eventfd; > > + /* qemu g_poll/ppoll() POLLOUT event on it */ > > + int pollout_eventfd; > > + > > + /* the index in the rpoller's fds array */ > > + int index; > > + /* rpoller will rpoll() rpoll_events on the rsocket fd */ > > + short int rpoll_events; > > }; > > > > /** > > @@ -147,6 +159,7 @@ void > qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress > *addr, > > * > > * Returns: the new client channel, or NULL on error > > */ > > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *ioc, > Error > > **errp); > > +QIOChannelRDMA *coroutine_mixed_fn > qio_channel_rdma_accept(QIOChannelRDMA *ioc, > > + > Error > > +**errp); > > > > #endif /* QIO_CHANNEL_RDMA_H */ > > diff --git a/io/channel-rdma.c b/io/channel-rdma.c index > > 92c362df52..9792add5cf 100644 > > --- a/io/channel-rdma.c > > +++ b/io/channel-rdma.c > > @@ -23,10 +23,15 @@ > > > > #include "qemu/osdep.h" > > #include "io/channel-rdma.h" > > +#include "io/channel-util.h" > > +#include "io/channel-watch.h" > > #include "io/channel.h" > > #include 
"qapi/clone-visitor.h" > > #include "qapi/error.h" > > #include "qapi/qapi-visit-sockets.h" > > +#include "qemu/atomic.h" > > +#include "qemu/error-report.h" > > +#include "qemu/thread.h" > > #include "trace.h" > > #include <errno.h> > > #include <netdb.h> > > @@ -39,11 +44,274 @@ > > #include <sys/poll.h> > > #include <unistd.h> > > > > +typedef enum { > > + CLEAR_POLLIN, > > + CLEAR_POLLOUT, > > + SET_POLLIN, > > + SET_POLLOUT, > > +} UpdateEvent; > > + > > +typedef enum { > > + RP_CMD_ADD_IOC, > > + RP_CMD_DEL_IOC, > > + RP_CMD_UPDATE, > > +} RpollerCMD; > > + > > +typedef struct { > > + RpollerCMD cmd; > > + QIOChannelRDMA *rioc; > > +} RpollerMsg; > > + > > +/* > > + * rpoll() on the rsocket fd with rpoll_events, when POLLIN/POLLOUT > > +event > > + * occurs, it will write/read the pollin_eventfd/pollout_eventfd to > > +allow > > + * qemu g_poll/ppoll() get the POLLIN/POLLOUT event */ static struct > > +Rpoller { > > + QemuThread thread; > > + bool is_running; > > + int sock[2]; > > + int count; /* the number of rsocket fds being rpoll() */ > > + int size; /* the size of fds/riocs */ > > + struct pollfd *fds; > > + QIOChannelRDMA **riocs; > > +} rpoller; > > + > > +static void qio_channel_rdma_notify_rpoller(QIOChannelRDMA *rioc, > > + RpollerCMD cmd) { > > + RpollerMsg msg; > > + int ret; > > + > > + msg.cmd = cmd; > > + msg.rioc = rioc; > > + > > + ret = RETRY_ON_EINTR(write(rpoller.sock[0], &msg, sizeof msg)); > > So this message is handled asynchronously by the poll thread, but you're not > acquiring any reference on the 'rioc' object. So there's the possibility that the > owner of the rioc calls 'unref' free'ing the last reference, before the poll thread > has finished processing the message. IMHO the poll thread must hold a > reference on the rioc for as long as it needs the object. > Yes. You're right. 
> > + if (ret != sizeof msg) { > > + error_report("%s: failed to send msg, errno: %d", __func__, > errno); > > + } > > I feel like this should be propagated to the caller via an Error **errp parameter. > OK. > > +} > > + > > +static void qio_channel_rdma_update_poll_event(QIOChannelRDMA *rioc, > > + UpdateEvent > action, > > + bool notify_rpoller) > { > > + /* An eventfd with the value of ULLONG_MAX - 1 is readable but > unwritable */ > > + unsigned long long buf = ULLONG_MAX - 1; > > + > > + switch (action) { > > + /* only rpoller do SET_* action, to allow qemu ppoll() get the event */ > > + case SET_POLLIN: > > + RETRY_ON_EINTR(write(rioc->pollin_eventfd, &buf, sizeof buf)); > > + rioc->rpoll_events &= ~POLLIN; > > + break; > > + case SET_POLLOUT: > > + RETRY_ON_EINTR(read(rioc->pollout_eventfd, &buf, sizeof buf)); > > + rioc->rpoll_events &= ~POLLOUT; > > + break; > > + > > + /* the rsocket fd is not ready to rread/rwrite */ > > + case CLEAR_POLLIN: > > + RETRY_ON_EINTR(read(rioc->pollin_eventfd, &buf, sizeof buf)); > > + rioc->rpoll_events |= POLLIN; > > + break; > > + case CLEAR_POLLOUT: > > + RETRY_ON_EINTR(write(rioc->pollout_eventfd, &buf, sizeof buf)); > > + rioc->rpoll_events |= POLLOUT; > > + break; > > + default: > > + break; > > + } > > + > > + /* notify rpoller to rpoll() POLLIN/POLLOUT events */ > > + if (notify_rpoller) { > > + qio_channel_rdma_notify_rpoller(rioc, RP_CMD_UPDATE); > > + } > > +} > > + > > +static void qio_channel_rdma_rpoller_add_rioc(QIOChannelRDMA *rioc) { > > + if (rioc->index != -1) { > > + error_report("%s: rioc already exsits", __func__); > > + return; > > + } > > + > > + rioc->index = ++rpoller.count; > > + > > + if (rpoller.count + 1 > rpoller.size) { > > + rpoller.size *= 2; > > + rpoller.fds = g_renew(struct pollfd, rpoller.fds, rpoller.size); > > + rpoller.riocs = g_renew(QIOChannelRDMA *, rpoller.riocs, > rpoller.size); > > + } > > + > > + rpoller.fds[rioc->index].fd = rioc->fd; > > + rpoller.fds[rioc->index].events = 
rioc->rpoll_events; > > + rpoller.riocs[rioc->index] = rioc; } > > + > > +static void qio_channel_rdma_rpoller_del_rioc(QIOChannelRDMA *rioc) { > > + if (rioc->index == -1) { > > + error_report("%s: rioc not exsits", __func__); > > + return; > > + } > > + > > + rpoller.fds[rioc->index] = rpoller.fds[rpoller.count]; > > + rpoller.riocs[rioc->index] = rpoller.riocs[rpoller.count]; > > + rpoller.riocs[rioc->index]->index = rioc->index; > > + rpoller.count--; > > + > > + close(rioc->pollin_eventfd); > > + close(rioc->pollout_eventfd); > > + rioc->index = -1; > > + rioc->rpoll_events = 0; > > +} > > + > > +static void qio_channel_rdma_rpoller_update_ioc(QIOChannelRDMA *rioc) > > +{ > > + if (rioc->index == -1) { > > + error_report("%s: rioc not exsits", __func__); > > + return; > > + } > > + > > + rpoller.fds[rioc->index].fd = rioc->fd; > > + rpoller.fds[rioc->index].events = rioc->rpoll_events; } > > + > > +static void qio_channel_rdma_rpoller_process_msg(void) > > +{ > > + RpollerMsg msg; > > + int ret; > > + > > + ret = RETRY_ON_EINTR(read(rpoller.sock[1], &msg, sizeof msg)); > > + if (ret != sizeof msg) { > > + error_report("%s: rpoller failed to recv msg: %s", __func__, > > + strerror(errno)); > > + return; > > + } > > + > > + switch (msg.cmd) { > > + case RP_CMD_ADD_IOC: > > + qio_channel_rdma_rpoller_add_rioc(msg.rioc); > > + break; > > + case RP_CMD_DEL_IOC: > > + qio_channel_rdma_rpoller_del_rioc(msg.rioc); > > + break; > > + case RP_CMD_UPDATE: > > + qio_channel_rdma_rpoller_update_ioc(msg.rioc); > > + break; > > + default: > > + break; > > + } > > +} > > + > > +static void qio_channel_rdma_rpoller_cleanup(void) > > +{ > > + close(rpoller.sock[0]); > > + close(rpoller.sock[1]); > > + rpoller.sock[0] = -1; > > + rpoller.sock[1] = -1; > > + g_free(rpoller.fds); > > + g_free(rpoller.riocs); > > + rpoller.fds = NULL; > > + rpoller.riocs = NULL; > > + rpoller.count = 0; > > + rpoller.size = 0; > > + rpoller.is_running = false; > > +} > > + > > +static void 
*qio_channel_rdma_rpoller_thread(void *opaque) { > > + int i, ret, error_events = POLLERR | POLLHUP | POLLNVAL; > > + > > + do { > > + ret = rpoll(rpoller.fds, rpoller.count + 1, -1); > > + if (ret < 0 && errno != -EINTR) { > > + error_report("%s: rpoll() error: %s", __func__, > strerror(errno)); > > + break; > > + } > > + > > + for (i = 1; i <= rpoller.count; i++) { > > + if (rpoller.fds[i].revents & (POLLIN | error_events)) { > > + qio_channel_rdma_update_poll_event(rpoller.riocs[i], > SET_POLLIN, > > + false); > > + rpoller.fds[i].events &= ~POLLIN; > > + } > > + if (rpoller.fds[i].revents & (POLLOUT | error_events)) { > > + qio_channel_rdma_update_poll_event(rpoller.riocs[i], > > + > SET_POLLOUT, false); > > + rpoller.fds[i].events &= ~POLLOUT; > > + } > > + /* ignore this fd */ > > + if (rpoller.fds[i].revents & (error_events)) { > > + rpoller.fds[i].fd = -1; > > + } > > + } > > + > > + if (rpoller.fds[0].revents) { > > + qio_channel_rdma_rpoller_process_msg(); > > + } > > + } while (rpoller.count >= 1); > > + > > + qio_channel_rdma_rpoller_cleanup(); > > + > > + return NULL; > > +} > > + > > +static void qio_channel_rdma_rpoller_start(void) > > +{ > > + if (qatomic_xchg(&rpoller.is_running, true)) { > > + return; > > + } > > + > > + if (qemu_socketpair(AF_UNIX, SOCK_STREAM, 0, rpoller.sock)) { > > + rpoller.is_running = false; > > + error_report("%s: failed to create socketpair %s", __func__, > > + strerror(errno)); > > + return; > > + } > > + > > + rpoller.count = 0; > > + rpoller.size = 4; > > + rpoller.fds = g_malloc0_n(rpoller.size, sizeof(struct pollfd)); > > + rpoller.riocs = g_malloc0_n(rpoller.size, sizeof(QIOChannelRDMA *)); > > + rpoller.fds[0].fd = rpoller.sock[1]; > > + rpoller.fds[0].events = POLLIN; > > + > > + qemu_thread_create(&rpoller.thread, "qio-channel-rdma-rpoller", > > + qio_channel_rdma_rpoller_thread, NULL, > > + QEMU_THREAD_JOINABLE); } > > + > > +static void qio_channel_rdma_add_rioc_to_rpoller(QIOChannelRDMA > > +*rioc) { > > + int 
flags = EFD_CLOEXEC | EFD_NONBLOCK; > > + > > + /* > > + * A single eventfd is either readable or writable. A single eventfd > cannot > > + * represent a state where it is neither readable nor writable. so use > two > > + * eventfds here. > > + */ > > + rioc->pollin_eventfd = eventfd(0, flags); > > + rioc->pollout_eventfd = eventfd(0, flags); > > + /* pollout_eventfd with the value 0, means writable, make it > unwritable */ > > + qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, false); > > + > > + /* tell the rpoller to rpoll() events on rioc->socketfd */ > > + rioc->rpoll_events = POLLIN | POLLOUT; > > + qio_channel_rdma_notify_rpoller(rioc, RP_CMD_ADD_IOC); } > > + > > QIOChannelRDMA *qio_channel_rdma_new(void) { > > QIOChannelRDMA *rioc; > > QIOChannel *ioc; > > > > + qio_channel_rdma_rpoller_start(); > > + if (!rpoller.is_running) { > > + return NULL; > > + } > > + > > rioc = > QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); > > ioc = QIO_CHANNEL(rioc); > > qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN); > @@ > > -125,6 +393,8 @@ retry: > > goto out; > > } > > > > + qio_channel_rdma_add_rioc_to_rpoller(rioc); > > + > > out: > > if (ret) { > > trace_qio_channel_rdma_connect_fail(rioc); > > @@ -211,6 +481,8 @@ int > qio_channel_rdma_listen_sync(QIOChannelRDMA *rioc, InetSocketAddress > *addr, > > qio_channel_set_feature(QIO_CHANNEL(rioc), > QIO_CHANNEL_FEATURE_LISTEN); > > trace_qio_channel_rdma_listen_complete(rioc, fd); > > > > + qio_channel_rdma_add_rioc_to_rpoller(rioc); > > + > > out: > > if (ret) { > > trace_qio_channel_rdma_listen_fail(rioc); > > @@ -267,8 +539,10 @@ void > qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress > *addr, > > qio_channel_listen_worker_free, > context); > > } > > > > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *rioc, > Error > > **errp) > > +QIOChannelRDMA *coroutine_mixed_fn > qio_channel_rdma_accept(QIOChannelRDMA *rioc, > > + > Error > > +**errp) > > { > > + QIOChannel *ioc 
= QIO_CHANNEL(rioc); > > QIOChannelRDMA *cioc; > > > > cioc = qio_channel_rdma_new(); > > @@ -283,6 +557,17 @@ retry: > > if (errno == EINTR) { > > goto retry; > > } > > + if (errno == EAGAIN) { > > + if (!(rioc->rpoll_events & POLLIN)) { > > + qio_channel_rdma_update_poll_event(rioc, > CLEAR_POLLIN, true); > > + } > > + if (qemu_in_coroutine()) { > > + qio_channel_yield(ioc, G_IO_IN); > > + } else { > > + qio_channel_wait(ioc, G_IO_IN); > > + } > > + goto retry; > > + } > > error_setg_errno(errp, errno, "Unable to accept connection"); > > goto error; > > } > > @@ -294,6 +579,8 @@ retry: > > goto error; > > } > > > > + qio_channel_rdma_add_rioc_to_rpoller(cioc); > > + > > trace_qio_channel_rdma_accept_complete(rioc, cioc, cioc->fd); > > return cioc; > > > > @@ -307,6 +594,10 @@ static void qio_channel_rdma_init(Object *obj) { > > QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj); > > ioc->fd = -1; > > + ioc->pollin_eventfd = -1; > > + ioc->pollout_eventfd = -1; > > + ioc->index = -1; > > + ioc->rpoll_events = 0; > > } > > > > static void qio_channel_rdma_finalize(Object *obj) @@ -314,6 +605,7 > > @@ static void qio_channel_rdma_finalize(Object *obj) > > QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj); > > > > if (ioc->fd != -1) { > > + qio_channel_rdma_notify_rpoller(ioc, RP_CMD_DEL_IOC); > > This is unsafe. > > When finalize runs, the object has dropped its last reference and is about to be > free()d. The notify_rpoller() method, however, sends an async message to the > poll thread, which the poll thread will end up processing after the rioc is free()d. > ie a use-after-free. > > If you take my earlier suggestion that the poll thread should hold its own > reference on the ioc, then it becomes impossible for the rioc to be freed while > there is still an active I/O watch, and thus this call can go away, and so will the > use after free. > Yes, it will be fixed in the next version. 
Regards, -Gonglei > > rclose(ioc->fd); > > ioc->fd = -1; > > } > > With regards, > Daniel > -- > |: https://berrange.com -o- > https://www.flickr.com/photos/dberrange :| > |: https://libvirt.org -o- > https://fstop138.berrange.com :| > |: https://entangle-photo.org -o- > https://www.instagram.com/dberrange :| >