On Tue, Jun 4, 2024 at 2:14 PM Gonglei <arei.gonglei@xxxxxxxxxx> wrote: > > From: Jialin Wang <wangjialin23@xxxxxxxxxx> > > It is not feasible to obtain RDMA completion queue notifications > through poll/ppoll on the rsocket fd. Therefore, we create a thread > named rpoller for each rsocket fd and two eventfds: pollin_eventfd > and pollout_eventfd. > > When using io_create_watch or io_set_aio_fd_handler waits for POLLIN > or POLLOUT events, it will actually poll/ppoll on the pollin_eventfd > and pollout_eventfd instead of the rsocket fd. > > The rpoller rpoll() on the rsocket fd to receive POLLIN and POLLOUT > events. > When a POLLIN event occurs, the rpoller write the pollin_eventfd, > and then poll/ppoll will return the POLLIN event. > When a POLLOUT event occurs, the rpoller read the pollout_eventfd, > and then poll/ppoll will return the POLLOUT event. > > For a non-blocking rsocket fd, if rread/rwrite returns EAGAIN, it will > read/write the pollin/pollout_eventfd, preventing poll/ppoll from > returning POLLIN/POLLOUT events. > > Known limitations: > > For a blocking rsocket fd, if we use io_create_watch to wait for > POLLIN or POLLOUT events, since the rsocket fd is blocking, we > cannot determine when it is not ready to read/write as we can with > non-blocking fds. Therefore, when an event occurs, it will occurs > always, potentially leave the qemu hanging. So we need be cautious > to avoid hanging when using io_create_watch . 
> > Luckily, channel-rdma works well in coroutines :) > > Signed-off-by: Jialin Wang <wangjialin23@xxxxxxxxxx> > Signed-off-by: Gonglei <arei.gonglei@xxxxxxxxxx> > --- > include/io/channel-rdma.h | 15 +- > io/channel-rdma.c | 363 +++++++++++++++++++++++++++++++++++++- > 2 files changed, 376 insertions(+), 2 deletions(-) > > diff --git a/include/io/channel-rdma.h b/include/io/channel-rdma.h > index 8cab2459e5..cb56127d76 100644 > --- a/include/io/channel-rdma.h > +++ b/include/io/channel-rdma.h > @@ -47,6 +47,18 @@ struct QIOChannelRDMA { > socklen_t localAddrLen; > struct sockaddr_storage remoteAddr; > socklen_t remoteAddrLen; > + > + /* private */ > + > + /* qemu g_poll/ppoll() POLLIN event on it */ > + int pollin_eventfd; > + /* qemu g_poll/ppoll() POLLOUT event on it */ > + int pollout_eventfd; > + > + /* the index in the rpoller's fds array */ > + int index; > + /* rpoller will rpoll() rpoll_events on the rsocket fd */ > + short int rpoll_events; > }; > > /** > @@ -147,6 +159,7 @@ void qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress *addr, > * > * Returns: the new client channel, or NULL on error > */ > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *ioc, Error **errp); > +QIOChannelRDMA *coroutine_mixed_fn qio_channel_rdma_accept(QIOChannelRDMA *ioc, > + Error **errp); > > #endif /* QIO_CHANNEL_RDMA_H */ > diff --git a/io/channel-rdma.c b/io/channel-rdma.c > index 92c362df52..9792add5cf 100644 > --- a/io/channel-rdma.c > +++ b/io/channel-rdma.c > @@ -23,10 +23,15 @@ > > #include "qemu/osdep.h" > #include "io/channel-rdma.h" > +#include "io/channel-util.h" > +#include "io/channel-watch.h" > #include "io/channel.h" > #include "qapi/clone-visitor.h" > #include "qapi/error.h" > #include "qapi/qapi-visit-sockets.h" > +#include "qemu/atomic.h" > +#include "qemu/error-report.h" > +#include "qemu/thread.h" > #include "trace.h" > #include <errno.h> > #include <netdb.h> > @@ -39,11 +44,274 @@ > #include <sys/poll.h> > #include <unistd.h> > 
> +typedef enum { > + CLEAR_POLLIN, > + CLEAR_POLLOUT, > + SET_POLLIN, > + SET_POLLOUT, > +} UpdateEvent; > + > +typedef enum { > + RP_CMD_ADD_IOC, > + RP_CMD_DEL_IOC, > + RP_CMD_UPDATE, > +} RpollerCMD; > + > +typedef struct { > + RpollerCMD cmd; > + QIOChannelRDMA *rioc; > +} RpollerMsg; > + > +/* > + * rpoll() on the rsocket fd with rpoll_events, when POLLIN/POLLOUT event > + * occurs, it will write/read the pollin_eventfd/pollout_eventfd to allow > + * qemu g_poll/ppoll() get the POLLIN/POLLOUT event > + */ > +static struct Rpoller { > + QemuThread thread; > + bool is_running; > + int sock[2]; > + int count; /* the number of rsocket fds being rpoll() */ > + int size; /* the size of fds/riocs */ > + struct pollfd *fds; > + QIOChannelRDMA **riocs; > +} rpoller; > + > +static void qio_channel_rdma_notify_rpoller(QIOChannelRDMA *rioc, > + RpollerCMD cmd) > +{ > + RpollerMsg msg; > + int ret; > + > + msg.cmd = cmd; > + msg.rioc = rioc; > + > + ret = RETRY_ON_EINTR(write(rpoller.sock[0], &msg, sizeof msg)); > + if (ret != sizeof msg) { > + error_report("%s: failed to send msg, errno: %d", __func__, errno); > + } > +} > + > +static void qio_channel_rdma_update_poll_event(QIOChannelRDMA *rioc, > + UpdateEvent action, > + bool notify_rpoller) > +{ > + /* An eventfd with the value of ULLONG_MAX - 1 is readable but unwritable */ > + unsigned long long buf = ULLONG_MAX - 1; > + > + switch (action) { > + /* only rpoller do SET_* action, to allow qemu ppoll() get the event */ > + case SET_POLLIN: > + RETRY_ON_EINTR(write(rioc->pollin_eventfd, &buf, sizeof buf)); > + rioc->rpoll_events &= ~POLLIN; > + break; > + case SET_POLLOUT: > + RETRY_ON_EINTR(read(rioc->pollout_eventfd, &buf, sizeof buf)); > + rioc->rpoll_events &= ~POLLOUT; > + break; > + > + /* the rsocket fd is not ready to rread/rwrite */ > + case CLEAR_POLLIN: > + RETRY_ON_EINTR(read(rioc->pollin_eventfd, &buf, sizeof buf)); > + rioc->rpoll_events |= POLLIN; > + break; > + case CLEAR_POLLOUT: > + 
RETRY_ON_EINTR(write(rioc->pollout_eventfd, &buf, sizeof buf)); > + rioc->rpoll_events |= POLLOUT; > + break; > + default: > + break; > + } > + > + /* notify rpoller to rpoll() POLLIN/POLLOUT events */ > + if (notify_rpoller) { > + qio_channel_rdma_notify_rpoller(rioc, RP_CMD_UPDATE); > + } > +} > + > +static void qio_channel_rdma_rpoller_add_rioc(QIOChannelRDMA *rioc) > +{ > + if (rioc->index != -1) { > + error_report("%s: rioc already exsits", __func__); > + return; > + } > + > + rioc->index = ++rpoller.count; > + > + if (rpoller.count + 1 > rpoller.size) { > + rpoller.size *= 2; > + rpoller.fds = g_renew(struct pollfd, rpoller.fds, rpoller.size); > + rpoller.riocs = g_renew(QIOChannelRDMA *, rpoller.riocs, rpoller.size); > + } > + > + rpoller.fds[rioc->index].fd = rioc->fd; > + rpoller.fds[rioc->index].events = rioc->rpoll_events; The allotment of rioc fds and events to rpoller slots is sequential, but making the deletion also sequential means that del_rioc needs to be called in the exact opposite order to the one in which the riocs were added (through add_rioc). Otherwise we leave holes in between, and re-additions might step on an already used slot. Does this setup make sure that the above restriction is satisfied, or am I missing something? > + rpoller.riocs[rioc->index] = rioc; > +} > + > +static void qio_channel_rdma_rpoller_del_rioc(QIOChannelRDMA *rioc) > +{ > + if (rioc->index == -1) { > + error_report("%s: rioc not exsits", __func__); > + return; > + } > + > + rpoller.fds[rioc->index] = rpoller.fds[rpoller.count]; Should this be rpoller.count-1? 
> + rpoller.riocs[rioc->index] = rpoller.riocs[rpoller.count]; > + rpoller.riocs[rioc->index]->index = rioc->index; > + rpoller.count--; > + > + close(rioc->pollin_eventfd); > + close(rioc->pollout_eventfd); > + rioc->index = -1; > + rioc->rpoll_events = 0; > +} > + > +static void qio_channel_rdma_rpoller_update_ioc(QIOChannelRDMA *rioc) > +{ > + if (rioc->index == -1) { > + error_report("%s: rioc not exsits", __func__); > + return; > + } > + > + rpoller.fds[rioc->index].fd = rioc->fd; > + rpoller.fds[rioc->index].events = rioc->rpoll_events; > +} > + > +static void qio_channel_rdma_rpoller_process_msg(void) > +{ > + RpollerMsg msg; > + int ret; > + > + ret = RETRY_ON_EINTR(read(rpoller.sock[1], &msg, sizeof msg)); > + if (ret != sizeof msg) { > + error_report("%s: rpoller failed to recv msg: %s", __func__, > + strerror(errno)); > + return; > + } > + > + switch (msg.cmd) { > + case RP_CMD_ADD_IOC: > + qio_channel_rdma_rpoller_add_rioc(msg.rioc); > + break; > + case RP_CMD_DEL_IOC: > + qio_channel_rdma_rpoller_del_rioc(msg.rioc); > + break; > + case RP_CMD_UPDATE: > + qio_channel_rdma_rpoller_update_ioc(msg.rioc); > + break; > + default: > + break; > + } > +} > + > +static void qio_channel_rdma_rpoller_cleanup(void) > +{ > + close(rpoller.sock[0]); > + close(rpoller.sock[1]); > + rpoller.sock[0] = -1; > + rpoller.sock[1] = -1; > + g_free(rpoller.fds); > + g_free(rpoller.riocs); > + rpoller.fds = NULL; > + rpoller.riocs = NULL; > + rpoller.count = 0; > + rpoller.size = 0; > + rpoller.is_running = false; > +} > + > +static void *qio_channel_rdma_rpoller_thread(void *opaque) > +{ > + int i, ret, error_events = POLLERR | POLLHUP | POLLNVAL; > + > + do { > + ret = rpoll(rpoller.fds, rpoller.count + 1, -1); > + if (ret < 0 && errno != -EINTR) { > + error_report("%s: rpoll() error: %s", __func__, strerror(errno)); > + break; > + } > + > + for (i = 1; i <= rpoller.count; i++) { > + if (rpoller.fds[i].revents & (POLLIN | error_events)) { > + 
qio_channel_rdma_update_poll_event(rpoller.riocs[i], SET_POLLIN, > + false); > + rpoller.fds[i].events &= ~POLLIN; > + } > + if (rpoller.fds[i].revents & (POLLOUT | error_events)) { > + qio_channel_rdma_update_poll_event(rpoller.riocs[i], > + SET_POLLOUT, false); > + rpoller.fds[i].events &= ~POLLOUT; > + } > + /* ignore this fd */ > + if (rpoller.fds[i].revents & (error_events)) { > + rpoller.fds[i].fd = -1; > + } > + } > + > + if (rpoller.fds[0].revents) { > + qio_channel_rdma_rpoller_process_msg(); > + } > + } while (rpoller.count >= 1); > + > + qio_channel_rdma_rpoller_cleanup(); > + > + return NULL; > +} > + > +static void qio_channel_rdma_rpoller_start(void) > +{ > + if (qatomic_xchg(&rpoller.is_running, true)) { > + return; > + } > + > + if (qemu_socketpair(AF_UNIX, SOCK_STREAM, 0, rpoller.sock)) { > + rpoller.is_running = false; > + error_report("%s: failed to create socketpair %s", __func__, > + strerror(errno)); > + return; > + } > + > + rpoller.count = 0; > + rpoller.size = 4; > + rpoller.fds = g_malloc0_n(rpoller.size, sizeof(struct pollfd)); > + rpoller.riocs = g_malloc0_n(rpoller.size, sizeof(QIOChannelRDMA *)); > + rpoller.fds[0].fd = rpoller.sock[1]; > + rpoller.fds[0].events = POLLIN; > + > + qemu_thread_create(&rpoller.thread, "qio-channel-rdma-rpoller", > + qio_channel_rdma_rpoller_thread, NULL, > + QEMU_THREAD_JOINABLE); > +} > + > +static void qio_channel_rdma_add_rioc_to_rpoller(QIOChannelRDMA *rioc) > +{ > + int flags = EFD_CLOEXEC | EFD_NONBLOCK; > + > + /* > + * A single eventfd is either readable or writable. A single eventfd cannot > + * represent a state where it is neither readable nor writable. so use two > + * eventfds here. 
> + */ > + rioc->pollin_eventfd = eventfd(0, flags); > + rioc->pollout_eventfd = eventfd(0, flags); > + /* pollout_eventfd with the value 0, means writable, make it unwritable */ > + qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, false); > + > + /* tell the rpoller to rpoll() events on rioc->socketfd */ > + rioc->rpoll_events = POLLIN | POLLOUT; > + qio_channel_rdma_notify_rpoller(rioc, RP_CMD_ADD_IOC); > +} > + > QIOChannelRDMA *qio_channel_rdma_new(void) > { > QIOChannelRDMA *rioc; > QIOChannel *ioc; > > + qio_channel_rdma_rpoller_start(); > + if (!rpoller.is_running) { > + return NULL; > + } > + > rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); > ioc = QIO_CHANNEL(rioc); > qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN); > @@ -125,6 +393,8 @@ retry: > goto out; > } > > + qio_channel_rdma_add_rioc_to_rpoller(rioc); > + > out: > if (ret) { > trace_qio_channel_rdma_connect_fail(rioc); > @@ -211,6 +481,8 @@ int qio_channel_rdma_listen_sync(QIOChannelRDMA *rioc, InetSocketAddress *addr, > qio_channel_set_feature(QIO_CHANNEL(rioc), QIO_CHANNEL_FEATURE_LISTEN); > trace_qio_channel_rdma_listen_complete(rioc, fd); > > + qio_channel_rdma_add_rioc_to_rpoller(rioc); > + > out: > if (ret) { > trace_qio_channel_rdma_listen_fail(rioc); > @@ -267,8 +539,10 @@ void qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress *addr, > qio_channel_listen_worker_free, context); > } > > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *rioc, Error **errp) > +QIOChannelRDMA *coroutine_mixed_fn qio_channel_rdma_accept(QIOChannelRDMA *rioc, > + Error **errp) > { > + QIOChannel *ioc = QIO_CHANNEL(rioc); > QIOChannelRDMA *cioc; > > cioc = qio_channel_rdma_new(); > @@ -283,6 +557,17 @@ retry: > if (errno == EINTR) { > goto retry; > } > + if (errno == EAGAIN) { > + if (!(rioc->rpoll_events & POLLIN)) { > + qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLIN, true); > + } > + if (qemu_in_coroutine()) { > + qio_channel_yield(ioc, G_IO_IN); > + 
} else { > + qio_channel_wait(ioc, G_IO_IN); > + } > + goto retry; > + } > error_setg_errno(errp, errno, "Unable to accept connection"); > goto error; > } > @@ -294,6 +579,8 @@ retry: > goto error; > } > > + qio_channel_rdma_add_rioc_to_rpoller(cioc); > + > trace_qio_channel_rdma_accept_complete(rioc, cioc, cioc->fd); > return cioc; > > @@ -307,6 +594,10 @@ static void qio_channel_rdma_init(Object *obj) > { > QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj); > ioc->fd = -1; > + ioc->pollin_eventfd = -1; > + ioc->pollout_eventfd = -1; > + ioc->index = -1; > + ioc->rpoll_events = 0; > } > > static void qio_channel_rdma_finalize(Object *obj) > @@ -314,6 +605,7 @@ static void qio_channel_rdma_finalize(Object *obj) > QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj); > > if (ioc->fd != -1) { > + qio_channel_rdma_notify_rpoller(ioc, RP_CMD_DEL_IOC); > rclose(ioc->fd); > ioc->fd = -1; > } > @@ -330,6 +622,12 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, const struct iovec *iov, > retry: > ret = rreadv(rioc->fd, iov, niov); > if (ret < 0) { > + if (errno == EAGAIN) { > + if (!(rioc->rpoll_events & POLLIN)) { > + qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLIN, true); > + } > + return QIO_CHANNEL_ERR_BLOCK; > + } > if (errno == EINTR) { > goto retry; > } > @@ -351,6 +649,12 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, const struct iovec *iov, > retry: > ret = rwritev(rioc->fd, iov, niov); > if (ret <= 0) { > + if (errno == EAGAIN) { > + if (!(rioc->rpoll_events & POLLOUT)) { > + qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, true); > + } > + return QIO_CHANNEL_ERR_BLOCK; > + } > if (errno == EINTR) { > goto retry; > } > @@ -361,6 +665,28 @@ retry: > return ret; > } > > +static int qio_channel_rdma_set_blocking(QIOChannel *ioc, bool enabled, > + Error **errp G_GNUC_UNUSED) > +{ > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); > + int flags, ret; > + > + flags = rfcntl(rioc->fd, F_GETFL); > + if (enabled) { > + flags &= ~O_NONBLOCK; > + } else { 
> + flags |= O_NONBLOCK; > + } > + > + ret = rfcntl(rioc->fd, F_SETFL, flags); > + if (ret) { > + error_setg_errno(errp, errno, > + "Unable to rfcntl rsocket fd with flags %d", flags); > + } > + > + return ret; > +} > + > static void qio_channel_rdma_set_delay(QIOChannel *ioc, bool enabled) > { > QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); > @@ -374,6 +700,7 @@ static int qio_channel_rdma_close(QIOChannel *ioc, Error **errp) > QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); > > if (rioc->fd != -1) { > + qio_channel_rdma_notify_rpoller(rioc, RP_CMD_DEL_IOC); > rclose(rioc->fd); > rioc->fd = -1; > } > @@ -408,6 +735,37 @@ static int qio_channel_rdma_shutdown(QIOChannel *ioc, QIOChannelShutdown how, > return 0; > } > > +static void > +qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, AioContext *read_ctx, > + IOHandler *io_read, AioContext *write_ctx, > + IOHandler *io_write, void *opaque) > +{ > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); > + > + qio_channel_util_set_aio_fd_handler(rioc->pollin_eventfd, read_ctx, io_read, > + rioc->pollout_eventfd, write_ctx, > + io_write, opaque); > +} > + > +static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, > + GIOCondition condition) > +{ > + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); > + > + switch (condition) { > + case G_IO_IN: > + return qio_channel_create_fd_watch(ioc, rioc->pollin_eventfd, > + condition); > + case G_IO_OUT: > + return qio_channel_create_fd_watch(ioc, rioc->pollout_eventfd, > + condition); > + default: > + error_report("%s: do not support watch 0x%x event", __func__, > + condition); > + return NULL; > + } > +} > + > static void qio_channel_rdma_class_init(ObjectClass *klass, > void *class_data G_GNUC_UNUSED) > { > @@ -415,9 +773,12 @@ static void qio_channel_rdma_class_init(ObjectClass *klass, > > ioc_klass->io_writev = qio_channel_rdma_writev; > ioc_klass->io_readv = qio_channel_rdma_readv; > + ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; > ioc_klass->io_close = 
qio_channel_rdma_close; > ioc_klass->io_shutdown = qio_channel_rdma_shutdown; > ioc_klass->io_set_delay = qio_channel_rdma_set_delay; > + ioc_klass->io_create_watch = qio_channel_rdma_create_watch; > + ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler; > } > > static const TypeInfo qio_channel_rdma_info = { > -- > 2.43.0 > >