The RDMA MAD kernel module (ibcm) does not allow more than one MAD agent to
be registered for a given MAD class. This does not fit the requirements of
QEMU's pvrdma device, where each VM is a MAD agent. Fix it by adding an RDMA
MAD multiplexer service which, on one hand, registers as the sole MAD agent
with the kernel module and, on the other hand, serves more than one VM.

Design Overview:
----------------
A server process registers with the UMAD framework (for this to work the
rdma_cm kernel module needs to be unloaded) and creates a unix socket on
which it listens for incoming requests from clients. A client process (such
as QEMU) connects to this unix socket and registers with its own GID.

TX:
---
When a client needs to send an rdma_cm MAD message it constructs it the same
way as without this multiplexer, i.e. it creates a umad packet, but this
time it writes its content to the socket instead of calling umad_send().
The server, upon receiving such a message, fetches the local_comm_id from it
so that a context for this session can be maintained, and relays the message
to the UMAD layer by calling umad_send().

RX:
---
The server creates a worker thread to process incoming rdma_cm MAD messages.
When an incoming message arrives (umad_recv()), the server, depending on the
message type (attr_id), looks up the target client by searching either the
gid->fd table or the local_comm_id->fd table. With the extracted fd the
server relays the incoming message to the client.

Signed-off-by: Yuval Shaia <yuval.shaia@xxxxxxxxxx>
---
 Makefile                         |   3 +
 Makefile.objs                    |   1 +
 contrib/rdmacm-mux/Makefile.objs |   3 +
 contrib/rdmacm-mux/main.c        | 680 +++++++++++++++++++++++++++++++
 4 files changed, 687 insertions(+)
 create mode 100644 contrib/rdmacm-mux/Makefile.objs
 create mode 100644 contrib/rdmacm-mux/main.c

diff --git a/Makefile b/Makefile
index 2da686be33..9ef307ba6e 100644
--- a/Makefile
+++ b/Makefile
@@ -416,6 +416,7 @@ dummy := $(call unnest-vars,, \
                 qga-obj-y \
                 ivshmem-client-obj-y \
                 ivshmem-server-obj-y \
+                rdmacm-mux-obj-y \
                 libvhost-user-obj-y \
                 vhost-user-scsi-obj-y \
                 vhost-user-blk-obj-y \
@@ -717,6 +718,8 @@ vhost-user-scsi$(EXESUF): $(vhost-user-scsi-obj-y) libvhost-user.a
 	$(call LINK, $^)
 vhost-user-blk$(EXESUF): $(vhost-user-blk-obj-y) libvhost-user.a
 	$(call LINK, $^)
+rdmacm-mux$(EXESUF): $(rdmacm-mux-obj-y) $(COMMON_LDADDS)
+	$(call LINK, $^)
 
 module_block.h: $(SRC_PATH)/scripts/modules/module_block.py config-host.mak
 	$(call quiet-command,$(PYTHON) $< $@ \
diff --git a/Makefile.objs b/Makefile.objs
index 7a9828da28..8a7b6fc7b6 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -193,6 +193,7 @@ vhost-user-scsi.o-cflags := $(LIBISCSI_CFLAGS)
 vhost-user-scsi.o-libs := $(LIBISCSI_LIBS)
 vhost-user-scsi-obj-y = contrib/vhost-user-scsi/
 vhost-user-blk-obj-y = contrib/vhost-user-blk/
+rdmacm-mux-obj-y = contrib/rdmacm-mux/
 
 ######################################################################
 trace-events-subdirs =
diff --git a/contrib/rdmacm-mux/Makefile.objs b/contrib/rdmacm-mux/Makefile.objs
new file mode 100644
index 0000000000..416288fc36
--- /dev/null
+++ b/contrib/rdmacm-mux/Makefile.objs
@@ -0,0 +1,3 @@
+CFLAGS += -libumad -Wno-format-truncation
+rdmacm-mux-obj-y = main.o
+
diff --git a/contrib/rdmacm-mux/main.c b/contrib/rdmacm-mux/main.c
new file mode 100644
index 0000000000..cba9f48b00
--- /dev/null
+++ b/contrib/rdmacm-mux/main.c
@@ -0,0 +1,680 @@
+#include "qemu/osdep.h"
+#include "sys/poll.h"
+#include "sys/ioctl.h"
+#include "pthread.h"
+#include "syslog.h"
+
+#include "infiniband/verbs.h"
+#include "infiniband/umad.h"
+#include "infiniband/umad_types.h"
+
+#define SCALE_US 1000
+#define COMMID_TTL 2 /* How many SLEEP_SECS timeouts a MAD session context is kept */
+#define SLEEP_SECS 5 /* Used both as the poll() timeout and in the receiver thread */
+#define SERVER_LISTEN_BACKLOG 10
+#define MAX_CLIENTS 4096
+#define MAD_BUF_SIZE 256
+#define MAD_MGMT_CLASS 0x7
+#define MAD_MGMT_VERSION 2
+#define MAD_RMPP_VERSION 0
+#define MAD_METHOD_MASK0 0x8
+
+#define CM_REQ_ATTR_ID 0x0010
+#define CM_MRA_ATTR_ID 0x0011
+#define CM_REJ_ATTR_ID 0x0012
+#define CM_REP_ATTR_ID 0x0013
+#define CM_RTU_ATTR_ID 0x0014
+#define CM_DREQ_ATTR_ID 0x0015
+#define CM_DREP_ATTR_ID 0x0016
+#define CM_SIDR_REQ_ATTR_ID 0x0017
+#define CM_SIDR_REP_ATTR_ID 0x0018
+#define CM_LAP_ATTR_ID 0x0019
+#define CM_APR_ATTR_ID 0x001A
+
+#define CM_REQ_DGID_POS 80
+#define CM_SIDR_REQ_DGID_POS 44
+
+/* The defaults below can be overridden by command-line parameters */
+#define UNIX_SOCKET_PATH "/var/run/rdmacm-mux"
+#define RDMA_DEVICE "rxe0"
+#define RDMA_PORT_NUM 1
+
+typedef struct RdmaCmServerArgs {
+    char unix_socket_path[PATH_MAX];
+    char rdma_dev_name[NAME_MAX];
+    int rdma_port_num;
+} RdmaCMServerArgs;
+
+typedef struct CommId2FdEntry {
+    int fd;
+    int ttl; /* Initialized to COMMID_TTL, decremented on each timeout, deleted at 0 */
+} CommId2FdEntry;
+
+typedef struct RdmaCmUMadAgent {
+    int port_id;
+    int agent_id;
+    GHashTable *gid2fd; /* Used to find the fd of a given gid */
+    GHashTable *fd2gid; /* Used to find the gid of an fd before it is closed */
+    GHashTable *commid2fd; /* Used to find the fd of a given comm_id */
+} RdmaCmUMadAgent;
+
+typedef struct RdmaCmServer {
+    bool run;
+    RdmaCMServerArgs args;
+    struct pollfd fds[MAX_CLIENTS];
+    int nfds;
+    RdmaCmUMadAgent umad_agent;
+    pthread_t umad_recv_thread;
+    pthread_rwlock_t lock;
+} RdmaCMServer;
+
+typedef struct RdmaCmUMad {
+    struct ib_user_mad hdr;
+    char mad[MAD_BUF_SIZE];
+} RdmaCmUMad;
+
+RdmaCMServer server = {0};
+
+void signal_handler(int signo);
+
+static void usage(const char *progname)
+{
+    printf("Usage: %s [OPTION]...\n"
+           "\t-h                   Show this help\n"
+           "\t-s unix-socket-path  Path to the unix socket to listen on (default %s)\n"
+           "\t-d rdma-device-name  Name of the RDMA device to register with (default %s)\n"
+           "\t-p rdma-device-port  Port number of the RDMA device to register with (default %d)\n",
+           progname, UNIX_SOCKET_PATH, RDMA_DEVICE, RDMA_PORT_NUM);
+}
+
+static void help(const char *progname)
+{
+    fprintf(stderr, "Try '%s -h' for more information.\n", progname);
+}
+
+static void parse_args(int argc, char *argv[])
+{
+    int c;
+    char unix_socket_path[PATH_MAX];
+
+    strcpy(unix_socket_path, UNIX_SOCKET_PATH);
+    strcpy(server.args.rdma_dev_name, RDMA_DEVICE);
+    server.args.rdma_port_num = RDMA_PORT_NUM;
+
+    while ((c = getopt(argc, argv, "hs:d:p:")) != -1) {
+        switch (c) {
+        case 'h':
+            usage(argv[0]);
+            exit(0);
+            break;
+
+        case 's':
+            /* This is temporary; the final name is built below */
+            strcpy(unix_socket_path, optarg);
+            break;
+
+        case 'd':
+            strcpy(server.args.rdma_dev_name, optarg);
+            break;
+
+        case 'p':
+            server.args.rdma_port_num = atoi(optarg);
+            break;
+
+        default:
+            help(argv[0]);
+            exit(1);
+            break;
+        }
+    }
+
+    /* Build a unique unix-socket file name */
+    snprintf(server.args.unix_socket_path, PATH_MAX, "%s-%s-%d",
+             unix_socket_path, server.args.rdma_dev_name,
+             server.args.rdma_port_num);
+
+    syslog(LOG_INFO, "unix_socket_path=%s", server.args.unix_socket_path);
+    syslog(LOG_INFO, "rdma-device-name=%s", server.args.rdma_dev_name);
+    syslog(LOG_INFO, "rdma-device-port=%d", server.args.rdma_port_num);
+}
+
+static void hash_tbl_alloc(void)
+{
+    server.umad_agent.gid2fd = g_hash_table_new_full(g_int64_hash,
+                                                     g_int64_equal,
+                                                     g_free, g_free);
+    server.umad_agent.fd2gid = g_hash_table_new_full(g_int_hash, g_int_equal,
+                                                     g_free, g_free);
+    server.umad_agent.commid2fd = g_hash_table_new_full(g_int_hash,
+                                                        g_int_equal,
+                                                        g_free, g_free);
+}
+
+static void hash_tbl_free(void)
+{
+    if (server.umad_agent.commid2fd) {
+        g_hash_table_destroy(server.umad_agent.commid2fd);
+    }
+    if (server.umad_agent.fd2gid) {
+        g_hash_table_destroy(server.umad_agent.fd2gid);
+    }
+    if (server.umad_agent.gid2fd) {
+        g_hash_table_destroy(server.umad_agent.gid2fd);
+    }
+}
+
+static int hash_tbl_search_fd_by_gid(uint64_t gid)
+{
+    int *fd;
+
+    pthread_rwlock_rdlock(&server.lock);
+    fd = g_hash_table_lookup(server.umad_agent.gid2fd, &gid);
+    pthread_rwlock_unlock(&server.lock);
+
+    if (!fd) {
+        syslog(LOG_WARNING, "Can't find matching fd for gid 0x%lx\n", gid);
+        return -ENOENT;
+    }
+
+    return *fd;
+}
+
+static uint64_t hash_tbl_search_gid(int fd)
+{
+    uint64_t *gid;
+
+    pthread_rwlock_rdlock(&server.lock);
+    gid = g_hash_table_lookup(server.umad_agent.fd2gid, &fd);
+    pthread_rwlock_unlock(&server.lock);
+
+    if (!gid) {
+        syslog(LOG_WARNING, "Can't find matching gid for fd %d\n", fd);
+        return 0;
+    }
+
+    return *gid;
+}
+
+static int hash_tbl_search_fd_by_comm_id(uint32_t comm_id)
+{
+    CommId2FdEntry *fde;
+
+    pthread_rwlock_rdlock(&server.lock);
+    fde = g_hash_table_lookup(server.umad_agent.commid2fd, &comm_id);
+    pthread_rwlock_unlock(&server.lock);
+
+    if (!fde) {
+        syslog(LOG_WARNING, "Can't find matching fd for comm_id 0x%x\n", comm_id);
+        return 0;
+    }
+
+    return fde->fd;
+}
+
+static void hash_tbl_save_fd_gid_pair(int fd, uint64_t gid)
+{
+    pthread_rwlock_wrlock(&server.lock);
+    g_hash_table_insert(server.umad_agent.fd2gid, g_memdup(&fd, sizeof(fd)),
+                        g_memdup(&gid, sizeof(gid)));
+    g_hash_table_insert(server.umad_agent.gid2fd, g_memdup(&gid, sizeof(gid)),
+                        g_memdup(&fd, sizeof(fd)));
+    pthread_rwlock_unlock(&server.lock);
+}
+
+static void hash_tbl_save_fd_comm_id_pair(int fd, uint32_t comm_id)
+{
+    CommId2FdEntry fde = {fd, COMMID_TTL};
+
+    pthread_rwlock_wrlock(&server.lock);
+    g_hash_table_insert(server.umad_agent.commid2fd,
+                        g_memdup(&comm_id, sizeof(comm_id)),
+                        g_memdup(&fde, sizeof(fde)));
+    pthread_rwlock_unlock(&server.lock);
+}
+
+static gboolean remove_old_comm_ids(gpointer key, gpointer value,
+                                    gpointer user_data)
+{
+    CommId2FdEntry *fde = (CommId2FdEntry *)value;
+
+    return !fde->ttl--;
+}
+
+static void hash_tbl_remove_fd_gid_pair(int fd)
+{
+    uint64_t gid;
+
+    gid = hash_tbl_search_gid(fd);
+    if (!gid) {
+        return;
+    }
+
+    pthread_rwlock_wrlock(&server.lock);
+    g_hash_table_remove(server.umad_agent.fd2gid, &fd);
+    g_hash_table_remove(server.umad_agent.gid2fd, &gid);
+    pthread_rwlock_unlock(&server.lock);
+}
+
+static inline int get_fd(const char *mad)
+{
+    struct umad_hdr *hdr = (struct umad_hdr *)mad;
+    char *data = (char *)hdr + sizeof(*hdr);
+    int64_t gid;
+    int32_t comm_id;
+    uint16_t attr_id = be16toh(hdr->attr_id);
+
+    if (attr_id == CM_REQ_ATTR_ID) {
+        memcpy(&gid, data + CM_REQ_DGID_POS, sizeof(gid));
+        return hash_tbl_search_fd_by_gid(gid);
+    }
+
+    if (attr_id == CM_SIDR_REQ_ATTR_ID) {
+        memcpy(&gid, data + CM_SIDR_REQ_DGID_POS, sizeof(gid));
+        return hash_tbl_search_fd_by_gid(gid);
+    }
+
+    switch (attr_id) {
+    case CM_REP_ATTR_ID:
+        /* Fall through */
+    case CM_REJ_ATTR_ID:
+        /* Fall through */
+    case CM_DREQ_ATTR_ID:
+        /* Fall through */
+    case CM_DREP_ATTR_ID:
+        /* Fall through */
+    case CM_RTU_ATTR_ID:
+        data += sizeof(comm_id);
+        /* Fall through */
+    case CM_SIDR_REP_ATTR_ID:
+        memcpy(&comm_id, data, sizeof(comm_id));
+        return hash_tbl_search_fd_by_comm_id(comm_id);
+    }
+
+    syslog(LOG_WARNING, "Unsupported attr_id 0x%x\n", attr_id);
+
+    return 0;
+}
+
+static void *umad_recv_thread_func(void *args)
+{
+    int rc;
+    RdmaCmUMad umad = {0};
+    int len;
+    int fd;
+
+    while (server.run) {
+        do {
+            len = sizeof(umad.mad);
+            rc = umad_recv(server.umad_agent.port_id, &umad, &len,
+                           SLEEP_SECS * SCALE_US);
+            if ((rc == -EIO) || (rc == -EINVAL)) {
+                syslog(LOG_CRIT, "Fatal error while trying to read MAD");
+            }
+
+            if (rc == -ETIMEDOUT) {
+                g_hash_table_foreach_remove(server.umad_agent.commid2fd,
+                                            remove_old_comm_ids, NULL);
+            }
+        } while (rc && server.run);
+
+        if (server.run) {
+            fd = get_fd(umad.mad);
+            if (!fd) {
+                continue;
+            }
+
+            send(fd, &umad, sizeof(umad), 0);
+        }
+    }
+
+    return NULL;
+}
+
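+/*
+ * Process one message from a client socket: a message of exactly
+ * sizeof(umad.hdr) is a registration request carrying the client's GID,
+ * anything longer is a complete MAD that is relayed to the device with
+ * umad_send().
+ */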
+static int read_and_process(int fd)
+{
+    int rc;
+    RdmaCmUMad umad = {0};
+    struct umad_hdr *hdr;
+    uint32_t *comm_id;
+    uint16_t attr_id;
+
+    rc = recv(fd, &umad, sizeof(umad), 0);
+
+    if (rc < 0 && errno != EWOULDBLOCK) {
+        return EIO;
+    }
+
+    if (rc == 0) {
+        return EPIPE;
+    }
+
+    if (rc == sizeof(umad.hdr)) {
+        hash_tbl_save_fd_gid_pair(fd, umad.hdr.addr.ib_gid.global.interface_id);
+
+        syslog(LOG_INFO, "0x%llx registered on socket %d",
+               umad.hdr.addr.ib_gid.global.interface_id, fd);
+
+        return 0;
+    }
+
+    hdr = (struct umad_hdr *)umad.mad;
+    attr_id = be16toh(hdr->attr_id);
+
+    /* If this is a REQ or a REP then store the comm_id,fd pair to be used
+     * later for other messages where the gid is unknown */
+    if ((attr_id == CM_REQ_ATTR_ID) || (attr_id == CM_DREQ_ATTR_ID) ||
+        (attr_id == CM_SIDR_REQ_ATTR_ID) || (attr_id == CM_REP_ATTR_ID) ||
+        (attr_id == CM_DREP_ATTR_ID)) {
+        comm_id = (uint32_t *)(umad.mad + sizeof(*hdr));
+        hash_tbl_save_fd_comm_id_pair(fd, *comm_id);
+    }
+
+    rc = umad_send(server.umad_agent.port_id, server.umad_agent.agent_id, &umad,
+                   umad.hdr.length, 1, 0);
+    if (rc) {
+        syslog(LOG_WARNING, "Failed to send MAD message, err=%d", rc);
+    }
+
+    return rc;
+}
+
+static int accept_all(void)
+{
+    int fd, rc = 0;
+
+    pthread_rwlock_wrlock(&server.lock);
+
+    do {
+        if ((server.nfds + 1) > MAX_CLIENTS) {
+            syslog(LOG_WARNING, "Too many clients (%d)", server.nfds);
+            rc = EIO;
+            goto out;
+        }
+
+        fd = accept(server.fds[0].fd, NULL, NULL);
+        if (fd < 0) {
+            if (errno != EWOULDBLOCK) {
+                syslog(LOG_WARNING, "accept() failed");
+                rc = EIO;
+                goto out;
+            }
+            break;
+        }
+
+        server.fds[server.nfds].fd = fd;
+        server.fds[server.nfds].events = POLLIN;
+        server.nfds++;
+    } while (fd != -1);
+
+out:
+    pthread_rwlock_unlock(&server.lock);
+    return rc;
+}
+
+static void close_fd(int idx)
+{
+    int i;
+
+    close(server.fds[idx].fd);
+    syslog(LOG_INFO, "Socket %d closed\n", server.fds[idx].fd);
+    hash_tbl_remove_fd_gid_pair(server.fds[idx].fd);
+
+    pthread_rwlock_wrlock(&server.lock);
+
+    server.nfds--;
+    for (i = idx; i < server.nfds; i++) {
+        server.fds[i].fd = server.fds[i + 1].fd;
+    }
+
+    pthread_rwlock_unlock(&server.lock);
+}
+
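+/*
+ * Main poll loop: fds[0] is the listening unix socket, the remaining
+ * entries are connected clients. POLLIN on fds[0] means new clients to
+ * accept, POLLIN on a client socket means a registration or a MAD to relay.
+ */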
+static void run(void)
+{
+    int rc, nfds, i;
+
+    syslog(LOG_INFO, "Service started");
+
+    while (server.run) {
+        rc = poll(server.fds, server.nfds, SLEEP_SECS * SCALE_US);
+        if (rc < 0) {
+            if (errno != EINTR) {
+                syslog(LOG_WARNING, "poll() failed");
+            }
+            continue;
+        }
+
+        if (rc == 0) {
+            continue;
+        }
+
+        nfds = server.nfds;
+        for (i = 0; i < nfds; i++) {
+            if (server.fds[i].revents == 0) {
+                continue;
+            }
+
+            if (server.fds[i].revents != POLLIN) {
+                if (i == 0) {
+                    syslog(LOG_NOTICE, "Unexpected poll() event (0x%x)\n",
+                           server.fds[i].revents);
+                } else {
+                    close_fd(i);
+                    server.fds[i].revents = 0;
+                    i--; /* Adjust the index since we just closed this one */
+                }
+                continue;
+            }
+
+            if (i == 0) {
+                rc = accept_all();
+                if (rc) {
+                    continue;
+                }
+            } else {
+                rc = read_and_process(server.fds[i].fd);
+                if (rc) {
+                    close_fd(i);
+                    server.fds[i].revents = 0;
+                    i--; /* Adjust the index since we just closed this one */
+                }
+            }
+        }
+    }
+}
+
+static void fini_listener(void)
+{
+    int i;
+
+    if (server.fds[0].fd <= 0) {
+        return;
+    }
+
+    for (i = server.nfds - 1; i >= 0; i--) {
+        if (server.fds[i].fd) {
+            close(server.fds[i].fd);
+        }
+    }
+
+    unlink(server.args.unix_socket_path);
+}
+
+static void fini_umad(void)
+{
+    if (server.umad_agent.agent_id) {
+        umad_unregister(server.umad_agent.port_id, server.umad_agent.agent_id);
+    }
+
+    if (server.umad_agent.port_id) {
+        umad_close_port(server.umad_agent.port_id);
+    }
+
+    hash_tbl_free();
+}
+
+static void fini(void)
+{
+    if (server.umad_recv_thread) {
+        pthread_join(server.umad_recv_thread, NULL);
+        server.umad_recv_thread = 0;
+    }
+    fini_umad();
+    fini_listener();
+    pthread_rwlock_destroy(&server.lock);
+
+    syslog(LOG_INFO, "Service going down");
+}
+
+static int init_listener(void)
+{
+    struct sockaddr_un sun;
+    int rc, on = 1;
+
+    server.fds[0].fd = socket(AF_UNIX, SOCK_STREAM, 0);
+    if (server.fds[0].fd < 0) {
+        syslog(LOG_WARNING, "socket() failed");
+        return EIO;
+    }
+
+    rc = setsockopt(server.fds[0].fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on,
+                    sizeof(on));
+    if (rc < 0) {
+        syslog(LOG_WARNING, "setsockopt() failed");
+        return EIO;
+    }
+
+    rc = ioctl(server.fds[0].fd, FIONBIO, (char *)&on);
+    if (rc < 0) {
+        syslog(LOG_WARNING, "ioctl() failed");
+        return EIO;
+    }
+
+    sun.sun_family = AF_UNIX;
+    rc = snprintf(sun.sun_path, sizeof(sun.sun_path), "%s",
+                  server.args.unix_socket_path);
+    if (rc < 0 || rc >= sizeof(sun.sun_path)) {
+        syslog(LOG_DEBUG, "Could not copy unix socket path\n");
+        return EINVAL;
+    }
+
+    rc = bind(server.fds[0].fd, (struct sockaddr *)&sun, sizeof(sun));
+    if (rc < 0) {
+        syslog(LOG_WARNING, "bind() failed");
+        return EIO;
+    }
+
+    rc = listen(server.fds[0].fd, SERVER_LISTEN_BACKLOG);
+    if (rc < 0) {
+        syslog(LOG_WARNING, "listen() failed");
+        return EIO;
+    }
+
+    server.fds[0].events = POLLIN;
+    server.nfds = 1;
+    server.run = true;
+
+    return 0;
+}
+
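+/*
+ * Register with the UMAD framework as the sole agent for the CM management
+ * class; the CM MADs of all the clients are funneled through this one agent.
+ */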
+static int init_umad(void)
+{
+    long method_mask[1];
+
+    server.umad_agent.port_id = umad_open_port(server.args.rdma_dev_name,
+                                               server.args.rdma_port_num);
+
+    if (server.umad_agent.port_id < 0) {
+        syslog(LOG_WARNING, "umad_open_port() failed");
+        return EIO;
+    }
+
+    method_mask[0] = MAD_METHOD_MASK0;
+    server.umad_agent.agent_id = umad_register(server.umad_agent.port_id,
+                                               MAD_MGMT_CLASS, MAD_MGMT_VERSION,
+                                               MAD_RMPP_VERSION, method_mask);
+    if (server.umad_agent.agent_id < 0) {
+        syslog(LOG_WARNING, "umad_register() failed");
+        return EIO;
+    }
+
+    hash_tbl_alloc();
+
+    return 0;
+}
+
+void signal_handler(int signo)
+{
+    static bool warned = false;
+
+    /* Prevent stopping while clients are still connected */
+    if (server.nfds != 1) {
+        if (!warned) {
+            syslog(LOG_WARNING,
+                   "Can't stop while active clients exist, resend SIGINT to override");
+            warned = true;
+            return;
+        }
+    }
+
+    if (signo == SIGINT) {
+        server.run = false;
+        fini();
+    }
+
+    exit(0);
+}
+
+static int init(void)
+{
+    int rc;
+
+    rc = init_listener();
+    if (rc) {
+        return rc;
+    }
+
+    rc = init_umad();
+    if (rc) {
+        return rc;
+    }
+
+    pthread_rwlock_init(&server.lock, 0);
+
+    rc = pthread_create(&server.umad_recv_thread, NULL, umad_recv_thread_func,
+                        NULL);
+    if (rc) {
+        return rc;
+    }
+
+    return 0;
+}
+
+int main(int argc, char *argv[])
+{
+    int rc;
+
+    if (signal(SIGINT, signal_handler) == SIG_ERR) {
+        syslog(LOG_ERR, "Failed to install SIGINT handler\n");
+        return EAGAIN;
+    }
+
+    memset(&server, 0, sizeof(server));
+
+    parse_args(argc, argv);
+
+    rc = init();
+    if (rc) {
+        syslog(LOG_ERR, "Failed to initialize server (%d)\n", rc);
+        rc = EAGAIN;
+        goto out;
+    }
+
+    run();
+
+out:
+    fini();
+
+    return rc;
+}
-- 
2.17.1