On Thu, 2021-05-20 at 00:13 -0400, Olivier Langlois wrote: > I know that my test case isn't conclusive. It is a failed attempt to > capture what my program is doing. > > The priority of investigating my core dump issue dropped substantially > last week because I solved my primary issue (a leak of the buffers > provided to io_uring during disconnection). My program > ran for days, but it crashed this morning, again without any core dump. > It is a very frustrating situation because it would probably be a bug > that is trivial to diagnose and fix, but without the core, the logs are > opaque > and they give no clue about why the program crashed. > > A key characteristic of my program is that it generates at least > 1 > io-worker thread per io_uring instance. As I get more familiar with the io_uring source code, I have come to realize that it is practically impossible to end up with NO io-wq threads: they are created from io_uring_install_fd(), which is called for every instance created. I'm a bit lazy about rebooting my desktop, and I am still running 5.11.5 on it. I guess that with this kernel version, the io_uring threads weren't threads belonging to the user process, so they don't show up when ps lists a specific process's LWPs. I correctly see all the generated threads when I run the test program on an up-to-date server (5.12.4). I have rewritten the whole test program. It has now become an io_uring multi-connection HTTP client (it could make a nice io_uring example program, IMHO). Still no success in reproducing the problem with it. So, I am giving up on reproducing the problem with a showcase program unless I get some clue about how to trigger it. However, I can reproduce it at will with my real program. So, as Linus suggested, I'll investigate by searching for where PF_IO_WORKER is used. I'll keep the list updated if I discover something. Greetings,
/*
 * Test program to reproduce issue generating core dumps while using io_uring.
 *
 * It drives NUMCLIENTS concurrent HTTP/1.1 connections from a single io_uring
 * instance: a POLLOUT poll detects connect() completion, the request is sent
 * with a write, and replies are read with buffer-select reads from one
 * provided-buffer group.
 *
 * To compile: g++ -pthread test_io_uring_coredump.cpp -luring
 *
 */
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>      // for abort(), exit(), EXIT_FAILURE
#include <unistd.h>      // for close(), sleep()
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <string.h>      // for strerror()
#include <memory.h>      // for memset()
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>       // for getaddrinfo(), gai_strerror()
#include <stdint.h>      // for uint32_t, uint64_t
#include <pthread.h>
#include <sstream>
#include <string>

#define QD      256
#define BGID1   0
#define BGID2   1
#define PORT1   1975
#define PORT2   1976
/* Parenthesized so the macro is safe inside larger expressions. */
#define BUFSZ   (5 * 4096)

/* Operation tags stored in the top byte of each request's user_data. */
enum {
    IOURING_POLL_ADD,
    IOURING_POLL_REMOVE,
    IOURING_PROV_BUF,
    IOURING_BUF_READ,
    IOURING_WRITE
};

/*
 * Io_Uring_OpTypeToStr()
 *
 * Return a printable name for an IOURING_* operation tag, or "Unknown".
 */
static const char *Io_Uring_OpTypeToStr(char type)
{
    switch (type) {
    case IOURING_POLL_ADD:    return "IOURING_POLL_ADD";
    case IOURING_POLL_REMOVE: return "IOURING_POLL_REMOVE";
    case IOURING_PROV_BUF:    return "IOURING_PROV_BUF";
    case IOURING_BUF_READ:    return "IOURING_BUF_READ";
    case IOURING_WRITE:       return "IOURING_WRITE";
    default:                  return "Unknown";
    }
}

/* Sizing of the provided-buffer pool: THREAD_MAX_FD * BIO_RING_SZ buffers. */
enum {
    THREAD_MAX_FD = 86,
    BIO_RING_SZ   = 8
};

/*
 * iouring_build_user_data()
 *
 * Pack a request description into the 64-bit user_data value:
 *   bits  0..31  file descriptor
 *   bits 32..55  client id (only the low 24 bits are kept)
 *   bits 56..63  operation type (IOURING_* tag)
 *
 * NOTE(review): the value travels as a void *, so this assumes 64-bit pointers.
 */
static inline void *iouring_build_user_data(char type, int fd, uint32_t clientId)
{
    return (void *)((uint64_t)(uint32_t)fd |
                    ((uint64_t)(clientId & 0x00ffffffU) << 32) |
                    ((uint64_t)(unsigned char)type << 56));
}

/*
 * iouring_decode_user_data()
 *
 * Inverse of iouring_build_user_data(): split user_data back into its fields.
 */
static inline void iouring_decode_user_data(uint64_t data, char *type, int *fd, uint32_t *clientId)
{
    *type     = (char)(data >> 56);
    *fd       = (int)(data & 0xffffffffU);
    *clientId = (uint32_t)((data >> 32) & 0x00ffffffU);
}

/*
 * getSqe()
 *
 * Return a free submission queue entry. If the SQ ring is full, the pending
 * entries are submitted first to make room. Aborts if no entry can be
 * obtained, because every caller fills in the returned SQE unconditionally.
 */
static struct io_uring_sqe *getSqe(struct io_uring *ring)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    if (!sqe) {
        int ret = io_uring_submit(ring);

        if (ret < 0) {
            fprintf(stderr, "io_uring_submit() failed: (%d) %s\n", -ret, strerror(-ret));
            abort();
        }
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
            fprintf(stderr, "io_uring_get_sqe() failed: submission queue full\n");
            abort();
        }
    }
    return sqe;
}

/*
 * provideSimpleBuffer()
 *
 * Queue an IORING_OP_PROVIDE_BUFFERS request that hands `nr` buffers of `len`
 * bytes, starting at `addr` with ids bid..bid+nr-1, to buffer group `bgid`.
 *
 * With block == 0 the request is only queued and 0 is returned; its completion
 * is handled later by the main completion loop.
 * With block != 0 the request is submitted and the next completion is reaped.
 * This assumes no other completions are pending, because io_uring_wait_cqe()
 * returns whichever CQE comes next. The request's result is returned. On a
 * negative result, errno is also set to the positive error code.
 */
static int provideSimpleBuffer(struct io_uring *ring, void *addr, int len, int nr, int bgid, int bid, int block)
{
    struct io_uring_sqe *sqe = getSqe(ring);

    io_uring_prep_provide_buffers(sqe, addr, len, nr, bgid, bid);
    io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_PROV_BUF, 0, -1));

    if (!block) {
        return 0;
    }

    /* liburing reports failures as -errno return values. */
    int res = io_uring_submit(ring);
    if (res < 0) {
        errno = -res;
        return res;
    }

    struct io_uring_cqe *cqe = nullptr;
    res = io_uring_wait_cqe(ring, &cqe);
    if (res < 0) {
        errno = -res;
        return res;
    }

    res = cqe->res;
    if (res < 0) {
        errno = -res;
    }
    io_uring_cqe_seen(ring, cqe);
    return res;
}

/*
 * initBufSelectRead()
 *
 * Queue a read of up to max_read bytes on fd. The kernel picks the destination
 * from buffer group gid (IOSQE_BUFFER_SELECT). IOSQE_ASYNC forces the read to
 * be punted to an io-wq worker.
 */
static void initBufSelectRead(struct io_uring *ring, int fd, size_t max_read, int gid, uint32_t clientId)
{
    struct io_uring_sqe *sqe = getSqe(ring);

    io_uring_prep_read(sqe, fd, NULL, max_read, 0);
    io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT | IOSQE_ASYNC);
    sqe->buf_group = gid;
    io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_BUF_READ, fd, clientId));
}

/*
 * iouring_write()
 *
 * Queue a write of len bytes from addr to fd. addr must stay valid until the
 * write completes.
 */
static void iouring_write(struct io_uring *ring, int fd, const void *addr, int len, uint32_t clientId)
{
    struct io_uring_sqe *sqe = getSqe(ring);

    io_uring_prep_write(sqe, fd, addr, len, 0);
    io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_WRITE, fd, clientId));
}

/*
 * BufferManager
 *
 * Allocates m_nr buffers of m_max_sz bytes each as one block and provides all
 * of them to io_uring as buffer group m_gid at construction. Buffer id `bid`
 * refers to the bytes at m_bufferBase + bid * m_max_sz.
 */
class BufferManager
{
public:
    BufferManager(struct io_uring *ring, int nr, int max_sz, int gid);
    ~BufferManager();

    /* Non-copyable: a copy would alias the same kernel-registered memory. */
    BufferManager(const BufferManager &) = delete;
    BufferManager &operator=(const BufferManager &) = delete;

    char *getBuffer(int bid) { return m_bufferBase + bid * m_max_sz; }
    void releaseBuffer(int bid);
    int getMaxSz() const { return m_max_sz; }
    int getGid() const { return m_gid; }

private:
    struct io_uring *m_ring;
    char *m_bufferBase;
    int m_nr;
    int m_max_sz;
    int m_gid;
};

BufferManager::BufferManager(struct io_uring *ring, int nr, int max_sz, int gid)
: m_ring(ring), m_bufferBase(nullptr), m_nr(nr), m_max_sz(max_sz), m_gid(gid)
{
    int res = posix_memalign(reinterpret_cast<void **>(&m_bufferBase), 1024,
                             (size_t)m_nr * (size_t)m_max_sz);
    /* posix_memalign() returns the error code rather than setting errno. */
    if (res) {
        fprintf(stderr, "posix_memalign: (%d) %s\n", res, strerror(res));
        exit(EXIT_FAILURE);
    }
    if (provideSimpleBuffer(m_ring, m_bufferBase, m_max_sz, m_nr, gid, 0, 1) < 0) {
        fprintf(stderr, "provideSimpleBuffer() failed: (%d) %s\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }
}

BufferManager::~BufferManager()
{
    // We could remove the buffer from io_uring here but for the sake of the test we won't.
    // NOTE(review): m_bufferBase is deliberately not freed either. Freeing memory that may
    // still be registered with the ring would let the kernel write into released memory.
}

/*
 * releaseBuffer()
 *
 * Give buffer `bid` back to the buffer group. The request is only queued, and
 * its completion is reaped by the main loop.
 */
void BufferManager::releaseBuffer(int bid)
{
    provideSimpleBuffer(m_ring, getBuffer(bid), m_max_sz, 1, m_gid, bid, 0);
}

/*
 * HttpClient
 *
 * One plain-HTTP (port 80) connection driven through the shared ring:
 * DOWN -> CONNECTING (poll for POLLOUT) -> WAIT_REPLY (reads re-armed forever).
 */
class HttpClient
{
public:
    HttpClient();
    ~HttpClient();

    /* Non-copyable: the object owns m_fd. */
    HttpClient(const HttpClient &) = delete;
    HttpClient &operator=(const HttpClient &) = delete;

    /*
     * I don't want to write an URL parser for a test.
     * host and path must outlive the client; only the pointers are kept.
     */
    void initiate(struct io_uring *ring, const char *host, const char *path, uint32_t clientId);

    /*
     * As long as the read succeeds, we initiate a new read. Eventually, the server
     * is going to close its side of the connection and we will stop submitting operations.
     * When all the clients reach that state, the thread should block into io_uring_enter waiting
     * for new completion that will never come
     */
    void processCompletion(struct io_uring *ring, struct io_uring_cqe *cqe, BufferManager &bufmgr);

private:
    void sendRequest(struct io_uring *ring, uint32_t clientId);
    void recvReply(struct io_uring *ring, struct io_uring_cqe *cqe, BufferManager &bufmgr);
    void markDown();

    int m_fd;
    enum { DOWN, CONNECTING, WAIT_REPLY } m_state;
    const char *m_host;
    const char *m_path;
    std::string m_request;  /* kept alive until the async write completes */
};

HttpClient::HttpClient()
: m_fd(-1), m_state(DOWN), m_host(nullptr), m_path(nullptr)
{
}

HttpClient::~HttpClient()
{
    if (m_fd >= 0) {
        close(m_fd);
    }
    m_fd = -1;
}

/*
 * markDown()
 *
 * Close the socket (if any) and return the client to the DOWN state.
 */
void HttpClient::markDown()
{
    if (m_fd >= 0) {
        close(m_fd);
    }
    m_fd = -1;
    m_state = DOWN;
}

/*
 * initiate()
 *
 * Resolve `host` to its first IPv4 address, start a non-blocking connect() to
 * port 80, and queue a POLLOUT poll that fires once the connect attempt ends.
 * Exits the process on any setup failure.
 */
void HttpClient::initiate(struct io_uring *ring, const char *host, const char *path, uint32_t clientId)
{
    m_host = host;
    m_path = path;

    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_family   = AF_INET;

    struct addrinfo *result = nullptr;
    /* getaddrinfo() reports failure through a non-zero return code, not errno. */
    int gaiErr = getaddrinfo(m_host, nullptr, &hints, &result);
    if (gaiErr != 0) {
        fprintf(stderr, "getaddrinfo(%s) failed: (%d) %s\n", m_host, gaiErr, gai_strerror(gaiErr));
        exit(EXIT_FAILURE);
    }

    /* pick the first AF_INET (IPv4) result */
    const struct sockaddr_in *inAddr = nullptr;
    for (struct addrinfo *ai = result; ai && !inAddr; ai = ai->ai_next) {
        if (ai->ai_family == AF_INET) {
            inAddr = reinterpret_cast<const struct sockaddr_in *>(ai->ai_addr);
        }
    }
    if (!inAddr) {
        fprintf(stderr, "getaddrinfo(%s): no IPv4 address found\n", m_host);
        freeaddrinfo(result);
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in sockAddr;
    memset(&sockAddr, 0, sizeof(sockAddr));
    sockAddr.sin_family = AF_INET;
    sockAddr.sin_addr   = inAddr->sin_addr;
    sockAddr.sin_port   = htons(80);
    freeaddrinfo(result);

    m_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (m_fd < 0) {
        fprintf(stderr, "socket() failed: (%d) %s\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }

    /* Add O_NONBLOCK without clobbering any other file status flags. */
    int flags = fcntl(m_fd, F_GETFL);
    if (flags < 0 || fcntl(m_fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        fprintf(stderr, "fcntl() failed: (%d) %s\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }

    if (connect(m_fd, reinterpret_cast<struct sockaddr *>(&sockAddr), sizeof(sockAddr)) < 0 &&
        errno != EINPROGRESS) {
        fprintf(stderr, "connect() failed: (%d) %s\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }

    /* POLLOUT fires when the non-blocking connect() finishes, successfully or not. */
    struct io_uring_sqe *sqe = getSqe(ring);
    io_uring_prep_poll_add(sqe, m_fd, POLLOUT);
    io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_POLL_ADD, m_fd, clientId));
    m_state = CONNECTING;
}

/*
 * sendRequest()
 *
 * Queue an HTTP/1.1 GET for m_path, for example:
 *
 * GET / HTTP/1.1
 * Host: kernel.org:80
 */
void HttpClient::sendRequest(struct io_uring *ring, uint32_t clientId)
{
    std::ostringstream ost;
    ost << "GET " << m_path << " HTTP/1.1\r\n";
    ost << "Host: " << m_host << ":80\r\n";
    ost << "User-Agent: io_uring coredump test 1.0\r\n\r\n";

    /* Stored in a member because the write is asynchronous. */
    m_request = ost.str();
    iouring_write(ring, m_fd, m_request.c_str(), m_request.size(), clientId);
}

/*
 * recvReply()
 *
 * Print the bytes delivered by a successful buffer-select read, then queue
 * the selected buffer's return to its group.
 */
void HttpClient::recvReply(struct io_uring *ring, struct io_uring_cqe *cqe, BufferManager &bufmgr)
{
    char type;
    int fd;
    uint32_t clientId;
    iouring_decode_user_data(cqe->user_data, &type, &fd, &clientId);

    /* The buffer id is only meaningful when the kernel says one was selected. */
    if (!(cqe->flags & IORING_CQE_F_BUFFER)) {
        fprintf(stderr, "client %u: read completed without a selected buffer\n", clientId);
        abort();
    }

    int bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
    char *buf = bufmgr.getBuffer(bid);

    fprintf(stderr, "client %u reply:\n\n%.*s\n", clientId, cqe->res, buf);
    // returns the buffer back to io_uring.
    provideSimpleBuffer(ring, buf, bufmgr.getMaxSz(), 1, bufmgr.getGid(), bid, 0);
}

/*
 * processCompletion()
 *
 * Advance this client's state machine for one completion that carries its
 * client id. Unexpected completions abort the process.
 */
void HttpClient::processCompletion(struct io_uring *ring, struct io_uring_cqe *cqe, BufferManager &bufmgr)
{
    char type;
    int fd;
    uint32_t clientId;
    int res = cqe->res;

    iouring_decode_user_data(cqe->user_data, &type, &fd, &clientId);

    switch (type) {
    case IOURING_POLL_ADD: {
        if (m_state != CONNECTING) {
            fprintf(stderr, "client %u: unexpected IOURING_POLL_ADD completion\n", clientId);
            abort();
        }

        /* POLLOUT also signals a failed non-blocking connect(); SO_ERROR tells which. */
        int soErr = 0;
        socklen_t soLen = sizeof(soErr);
        if (getsockopt(m_fd, SOL_SOCKET, SO_ERROR, &soErr, &soLen) < 0) {
            soErr = errno;
        }
        if (res < 0 || soErr != 0) {
            int err = res < 0 ? -res : soErr;
            fprintf(stderr, "client %u: connect to %s failed: (%d) %s\n",
                    clientId, m_host, err, strerror(err));
            markDown();
            break;
        }

        // clear the O_NONBLOCK flag, keeping the other file status flags
        int flags = fcntl(m_fd, F_GETFL);
        if (flags < 0 || fcntl(m_fd, F_SETFL, flags & ~O_NONBLOCK) < 0) {
            fprintf(stderr, "client %u: fcntl() failed: (%d) %s\n", clientId, errno, strerror(errno));
        }

        /*
         * Setup the next read before sending the request.
         * Maybe it is having multiple concurrent operations on the same
         * fd that is triggering the io thread spawn.
         */
        initBufSelectRead(ring, m_fd, bufmgr.getMaxSz(), bufmgr.getGid(), clientId);
        sendRequest(ring, clientId);
        m_state = WAIT_REPLY;
        break;
    }
    case IOURING_POLL_REMOVE:
        fprintf(stderr, "client %u: unexpected IOURING_POLL_REMOVE completion\n", clientId);
        abort();
        break;
    case IOURING_PROV_BUF:
        if (res < 0) {
            fprintf(stderr, "client %u: unexpected IOURING_PROV_BUF failure: (%d) %s\n",
                    clientId, -res, strerror(-res));
            abort();
        }
        break;
    case IOURING_BUF_READ:
        if (res <= 0) {
            /* 0 means the peer closed the connection; the read is not re-armed. */
            fprintf(stderr, "client %u: unexpected IOURING_BUF_READ res: %d\n", clientId, res);
        } else {
            recvReply(ring, cqe, bufmgr);
            // Reload the read
            initBufSelectRead(ring, m_fd, bufmgr.getMaxSz(), bufmgr.getGid(), clientId);
        }
        break;
    case IOURING_WRITE:
        fprintf(stderr, "client %u: request tx completed: %d\n", clientId, res);
        break;
    default:
        fprintf(stderr, "client %u: unexpected completion type: %s\n",
                clientId, Io_Uring_OpTypeToStr(type));
        abort();
    }
}

#define NUMCLIENTS 6

/*
 * main()
 *
 * Set up one ring and its buffer pool, start NUMCLIENTS HTTP clients, then
 * reap completions forever. The loop only ends, and returns failure, if
 * io_uring_submit_and_wait() reports an error other than EINTR.
 */
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    struct io_uring_params params;
    struct io_uring ring;

    memset(&params, 0, sizeof(params));
    /* liburing returns -errno on failure; errno itself is not reliable here. */
    int res = io_uring_queue_init_params(QD, &ring, &params);
    if (res < 0) {
        fprintf(stderr, "io_uring_queue_init_params() failed: (%d) %s\n", -res, strerror(-res));
        return EXIT_FAILURE;
    }

    BufferManager bufmgr(&ring, THREAD_MAX_FD * BIO_RING_SZ, BUFSZ, BGID1);
    const char *servers[NUMCLIENTS] = {
        "google.com",
        "facebook.com",
        "twitter.com",
        "www.cloudflare.com",
        "kernel.org",
        "phoronix.com"
    };
    HttpClient clients[NUMCLIENTS];

    /*
     * Give the tester some time before any connection is started
     * (presumably to inspect or attach to the process; TODO confirm intent).
     */
    sleep(10);

    for (uint32_t i = 0; i < NUMCLIENTS; ++i) {
        clients[i].initiate(&ring, servers[i], "/", i);
    }

    for (;;) {
        res = io_uring_submit_and_wait(&ring, 1);
        if (res < 0) {
            if (res == -EINTR) {
                continue;
            }
            fprintf(stderr, "io_uring_submit_and_wait() failed: (%d) %s\n", -res, strerror(-res));
            break;
        }

        unsigned completed = 0;
        unsigned head;
        struct io_uring_cqe *cqe;

        io_uring_for_each_cqe(&ring, head, cqe) {
            ++completed;

            char type;
            int fd;
            uint32_t clientId;
            iouring_decode_user_data(cqe->user_data, &type, &fd, &clientId);

            if (clientId < NUMCLIENTS) {
                clients[clientId].processCompletion(&ring, cqe, bufmgr);
            } else if (type != IOURING_PROV_BUF) {
                /*
                 * The only operation for which we don't assign a client id is IORING_OP_PROVIDE_BUFFERS
                 */
                fprintf(stderr, "Unexpected completion event with no client association: %s\n",
                        Io_Uring_OpTypeToStr(type));
                abort();
            } else if (cqe->res < 0) {
                fprintf(stderr, "unexpected IOURING_PROV_BUF failure: (%d) %s\n",
                        -cqe->res, strerror(-cqe->res));
                abort();
            }
        }

        if (completed) {
            io_uring_cq_advance(&ring, completed);
        }
    }

    io_uring_queue_exit(&ring);
    return EXIT_FAILURE;
}