Re: [PATCH] io_thread/x86: don't reset 'cs', 'ss', 'ds' and 'es' registers for io_threads

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Thu, 2021-05-20 at 00:13 -0400, Olivier Langlois wrote:
> I know that my test case isn't conclusive. It is a failed attempt to
> capture what my program is doing.
> 
> The priority of investigating my core dump issue has substantially
> dropped last week because I did solve my primary issue (A buffer leak
> in the provided buffers to io_uring during disconnection). My program
> did run for days but it did crash this morning without any core dump
> again.
> It is a very frustrating situation because it would probably be a bug
> trivial to diagnose and fix but without the core, the logs are
> opaque
> and they just don't give any clue about why the program did crash.
> 
> A key characteristic of my program is that it generates at least
> 1
> io-worker thread per io_uring instance.

As I get more familiar with the io_uring source code, I have come to
realize that it is practically impossible to end up with NO io-wq
threads. They are created from io_uring_install_fd(), which is called
for every instance created.

I'm a bit too lazy to reboot my desktop and I am still running 5.11.5
on it. I guess that with this kernel version, the io_uring threads
weren't threads belonging to the user process and aren't showing up
with ps when searching for a specific process's LWPs.

I correctly see all the generated threads when I run the test program
on an up-to-date server (5.12.4).

I have rewritten the whole test program. It has now become an io_uring
multi-connection HTTP client (it could make a nice io_uring example
program, IMHO). Still no success in reproducing the problem with it.

So, I am giving up the idea of reproducing the problem with a showcase
program unless I have some clue about how to reproduce it.

However, I can reproduce it at will with my real program. So, as Linus
has suggested, I'll investigate by searching for where PF_IO_WORKER is
used.

I'll keep the list updated if I discover something.

Greetings,
/*
 * Test program to reproduce issue generating core dumps while using io_uring.
 *
 * To compile: g++ -pthread test_io_uring_coredump.cpp -luring
 *
 */

#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>   // for abort()
#include <unistd.h>   // for close()
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <string.h>   // for strerror()
#include <memory.h>   // for memset()
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdint.h>   // for uint32_t, uint64_t
#include <pthread.h>
#include <sstream>
#include <string>

/*
 * Tunables for the test program.
 *
 * QD    - number of entries requested when the ring is initialized in main().
 * BGID1 - buffer group id handed to the BufferManager created in main().
 * BUFSZ - size in bytes of each provided buffer.
 *
 * BGID2, PORT1 and PORT2 are not referenced by the code visible in this file;
 * they are kept unchanged.
 */
#define QD 256
#define BGID1 0
#define BGID2 1
#define PORT1 1975
#define PORT2 1976
/* Parenthesized so the macro expands correctly inside larger expressions. */
#define BUFSZ (5*4096)

/*
 * Operation tags. One of these is passed as the 'type' argument of
 * iouring_build_user_data() when a request is prepared, and is recovered from
 * the completion by iouring_decode_user_data().
 */
enum {
    IOURING_POLL_ADD    = 0,
    IOURING_POLL_REMOVE = 1,
    IOURING_PROV_BUF    = 2,
    IOURING_BUF_READ    = 3,
    IOURING_WRITE       = 4
};

/*
 * Io_Uring_OpTypeToStr()
 *
 * Maps an operation tag (one of the IOURING_* enumerators) to its name for
 * diagnostics. Any other value yields "Unknown".
 */
static
const char *Io_Uring_OpTypeToStr(char type)
{
    switch (type) {
        case IOURING_POLL_ADD:
            return "IOURING_POLL_ADD";
        case IOURING_POLL_REMOVE:
            return "IOURING_POLL_REMOVE";
        case IOURING_PROV_BUF:
            return "IOURING_PROV_BUF";
        case IOURING_BUF_READ:
            return "IOURING_BUF_READ";
        case IOURING_WRITE:
            return "IOURING_WRITE";
        default:
            return "Unknown";
    }
}

/*
 * Sizing constants. main() provides THREAD_MAX_FD*BIO_RING_SZ buffers to the
 * ring through the BufferManager.
 */
enum {
    THREAD_MAX_FD = 86,
    BIO_RING_SZ   = 8
};

/*
 * iouring_build_user_data()
 *
 * Packs an operation tag, a file descriptor and a client id into one 64-bit
 * value returned as a pointer, suitable for io_uring_sqe_set_data().
 *
 * Layout of the packed value:
 *   bits 63..56  type      (low 8 bits)
 *   bits 55..32  clientId  (low 24 bits only; higher bits are dropped)
 *   bits 31..0   fd
 *
 * iouring_decode_user_data() performs the inverse operation.
 *
 * Uses the <stdint.h> fixed-width types, like the decode counterpart, rather
 * than the kernel's __u64. The result is only lossless where a pointer is at
 * least 64 bits wide.
 */
inline
void *
iouring_build_user_data(char type, int fd, uint32_t clientId)
{
    const uint64_t fdBits     = static_cast<uint64_t>(static_cast<uint32_t>(fd));
    const uint64_t clientBits = static_cast<uint64_t>(clientId & 0x00ffffffU) << 32;
    const uint64_t typeBits   = static_cast<uint64_t>(static_cast<unsigned char>(type)) << 56;

    return reinterpret_cast<void *>(static_cast<uintptr_t>(fdBits | clientBits | typeBits));
}

/*
 * iouring_decode_user_data()
 *
 * Splits a 64-bit user_data value into its three fields:
 *   *type      <- bits 63..56
 *   *fd        <- bits 31..0
 *   *clientId  <- bits 55..32 (24 bits)
 */
inline
void
iouring_decode_user_data(uint64_t data, char *type, int *fd, uint32_t *clientId)
{
    const uint64_t tagField    = data >> 56;
    const uint64_t fdField     = data & 0xffffffffU;
    const uint64_t clientField = (data >> 32) & 0x00ffffffU;

    *type     = tagField;
    *fd       = fdField;
    *clientId = clientField;
}

/*
 * provideSimpleBuffer()
 *
 * Prepares an IORING_OP_PROVIDE_BUFFERS request handing 'nr' buffers of 'len'
 * bytes starting at 'addr' to buffer group 'bgid', with buffer ids starting
 * at 'bid'. The request is tagged IOURING_PROV_BUF with no client id.
 *
 * If 'block' is non-zero, the request is submitted and the function waits for
 * one completion, whose result it returns. It assumes that completion belongs
 * to this request, so no other request should be in flight on the ring.
 * If 'block' is zero, the request is only queued; the caller's next submit
 * sends it and the completion is handled by the caller's event loop.
 *
 * Returns 0 (or the completion result) on success. On failure returns a
 * negative errno value and also stores the positive value in errno. -EAGAIN
 * means no SQE was available because the submission queue is full.
 */
static int provideSimpleBuffer(struct io_uring *ring, void *addr, int len, int nr, int bgid, int bid, int block)
{
    // register buffers for buffer selection
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        // Submission queue is full: report it instead of dereferencing NULL.
        errno = EAGAIN;
        return -EAGAIN;
    }
    io_uring_prep_provide_buffers(sqe, addr, len, nr, bgid, bid);
    io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_PROV_BUF, 0, -1));
    if (!block) {
        return 0;
    }

    int res = io_uring_submit(ring);
    if (res < 0) {
        errno = -res;
        return res;
    }

    struct io_uring_cqe *cqe = nullptr;
    res = io_uring_wait_cqe(ring, &cqe);
    if (res < 0) {
        // cqe is not valid when the wait itself failed.
        errno = -res;
        return res;
    }

    res = cqe->res;
    if (res < 0) {
        errno = -res;
    }
    io_uring_cqe_seen(ring, cqe);
    return res;
}

/*
 * initBufSelectRead()
 *
 * Queues a read of up to 'max_read' bytes on 'fd' that lets io_uring pick the
 * destination buffer from buffer group 'gid' (IOSQE_BUFFER_SELECT). The
 * request also carries IOSQE_ASYNC. It is tagged IOURING_BUF_READ together
 * with 'fd' and 'clientId'. The request is only queued here; the caller's
 * next submit sends it.
 *
 * Aborts with a diagnostic if no SQE is available (submission queue full).
 */
static void initBufSelectRead(struct io_uring *ring, int fd, size_t max_read, int gid, uint32_t clientId)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        // io_uring_get_sqe() returns NULL when the submission queue is full.
        fprintf(stderr, "client %u: no SQE available to queue a read\n", clientId);
        abort();
    }
    // No destination address: the buffer is selected from the group.
    io_uring_prep_read(sqe, fd, NULL, max_read, 0);
    io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT|IOSQE_ASYNC);
    sqe->buf_group = gid;
    io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_BUF_READ, fd, clientId));
}

/*
 * iouring_write()
 *
 * Queues a write of 'len' bytes from 'addr' to 'fd', tagged IOURING_WRITE
 * together with 'fd' and 'clientId'. The request is only queued here, so
 * 'addr' must stay valid until the request has been submitted and completed.
 *
 * Aborts with a diagnostic if no SQE is available (submission queue full).
 */
static void
iouring_write(struct io_uring *ring, int fd, const void *addr, int len, uint32_t clientId)
{
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  if (!sqe) {
    // io_uring_get_sqe() returns NULL when the submission queue is full.
    fprintf(stderr, "client %u: no SQE available to queue a write\n", clientId);
    abort();
  }
  io_uring_prep_write(sqe, fd, addr, len, 0);
  io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_WRITE, fd, clientId));
}

/*
 * BufferManager
 *
 * Owns one contiguous allocation split into 'nr' slots of 'max_sz' bytes and
 * provides those slots to an io_uring instance as buffer group 'gid'.
 * Buffer id N refers to the slot at offset N*max_sz.
 *
 * The object is not copyable: it holds a raw pointer to the allocation and
 * the slots are registered with the ring, so a copy would alias both.
 */
class BufferManager
{
public:
    BufferManager(struct io_uring *ring, int nr, int max_sz, int gid);
    ~BufferManager();

    BufferManager(const BufferManager &) = delete;
    BufferManager &operator=(const BufferManager &) = delete;

    /* Address of the slot for buffer id 'bid' (no range check is done). */
    char *getBuffer(int bid)
    {
        // Compute the offset in size_t so large pools cannot overflow int.
        return m_bufferBase + static_cast<size_t>(bid) * static_cast<size_t>(m_max_sz);
    }
    /* Queues a request giving slot 'bid' back to the ring. */
    void  releaseBuffer(int bid);
    int getMaxSz() const { return m_max_sz; }
    int getGid()   const { return m_gid; }

private:
    struct io_uring *m_ring;   /* ring the buffers are provided to (not owned) */
    char *m_bufferBase;        /* start of the slot array */
    int   m_nr;                /* number of slots */
    int   m_max_sz;            /* size of one slot in bytes */
    int   m_gid;               /* buffer group id */
};

/*
 * Allocates nr*max_sz bytes aligned on 1024 bytes and provides them to 'ring'
 * as 'nr' buffers of 'max_sz' bytes in group 'gid', ids starting at 0. The
 * provide request is submitted and waited for synchronously.
 *
 * Any failure is fatal: a message is printed and the process exits with
 * EXIT_FAILURE.
 */
BufferManager::BufferManager(struct io_uring *ring, int nr, int max_sz, int gid)
: m_ring(ring),
  m_bufferBase(nullptr),
  m_nr(nr),
  m_max_sz(max_sz),
  m_gid(gid)
{
    // Multiply in size_t so a large pool cannot overflow int.
    const size_t poolBytes = static_cast<size_t>(m_nr) * static_cast<size_t>(m_max_sz);
    // posix_memalign() wants a void **; go through a real void * instead of
    // type-punning the char * member.
    void *base = nullptr;
    int res = posix_memalign(&base, 1024, poolBytes);
    if (res) {
        // posix_memalign() returns the error number; it does not set errno.
        fprintf(stderr, "posix_memalign: (%d) %s\n", res, strerror(res));
        exit(EXIT_FAILURE);
    }
    m_bufferBase = static_cast<char *>(base);
    if (provideSimpleBuffer(m_ring, m_bufferBase, m_max_sz, m_nr, gid, 0, 1) < 0) {
        fprintf(stderr, "provideSimpleBuffer() failed: (%d) %s\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }
}

/*
 * Destructor. Deliberately does nothing.
 */
BufferManager::~BufferManager()
{
    // We could remove the buffers from io_uring here, but for the sake of the
    // test we won't.
    // NOTE(review): the memory obtained with posix_memalign() in the
    // constructor is not freed here either. Freeing it while the buffers are
    // still provided to the ring looks unsafe, so removal would have to come
    // first - confirm before changing.
}

/*
 * releaseBuffer()
 *
 * Hands the single slot identified by 'bid' back to the ring's buffer group.
 * The request is only queued (non-blocking mode of provideSimpleBuffer());
 * its result is not examined here.
 */
void BufferManager::releaseBuffer(int bid)
{
    char *const slot      = getBuffer(bid);
    const int   oneBuffer = 1;
    const int   dontBlock = 0;

    provideSimpleBuffer(m_ring, slot, m_max_sz, oneBuffer, m_gid, bid, dontBlock);
}

/*
 * HttpClient
 *
 * One plain-HTTP connection driven through io_uring: connect, send a single
 * GET request, then keep reading the reply with provided buffers.
 *
 * The object owns its socket (closed in the destructor) and is therefore not
 * copyable: a copy would close the same descriptor twice.
 */
class HttpClient
{
public:
    HttpClient();
    ~HttpClient();

    HttpClient(const HttpClient &) = delete;
    HttpClient &operator=(const HttpClient &) = delete;

    /*
     * I don't want to write an URL parser for a test.
     *
     * 'host' and 'path' are stored as pointers, not copied, so they must
     * outlive the client.
     */
    void initiate(struct io_uring *ring, const char *host, const char *path, uint32_t clientId);
    /*
     * As long as the read succeeds, we initiate a new read. Eventually, the server
     * is going to close its side of the connection and we will stop submitting operations.
     * When all the clients reach that state, the thread should block into io_uring_enter waiting
     * for a new completion that will never come
     */
    void processCompletion(struct io_uring *ring, struct io_uring_cqe *cqe, BufferManager &bufmgr);

private:
    void sendRequest(struct io_uring *ring, uint32_t clientId);
    void recvReply(struct io_uring *ring, struct io_uring_cqe *cqe, BufferManager &bufmgr);
    int m_fd;                                        /* socket, -1 when none */
    enum { DOWN, CONNECTING, WAIT_REPLY } m_state;   /* connection progress */
    const char *m_host;                              /* not owned */
    const char *m_path;                              /* not owned */
    std::string m_request;                           /* backing store of the queued write */
};

HttpClient::HttpClient()
: m_fd(-1),
  m_state(DOWN),
  m_host(nullptr),
  m_path(nullptr)
{
}

/*
 * Closes the socket if one is open and marks the client as having none.
 */
HttpClient::~HttpClient()
{
    const bool hasSocket = (m_fd >= 0);
    if (hasSocket) {
        close(m_fd);
    }
    m_fd = -1;
}

/*
 * initiate()
 *
 * Resolves 'host' to an IPv4 address, starts a non-blocking connect() to port
 * 80 and queues a POLLOUT poll on the socket, tagged IOURING_POLL_ADD with
 * 'clientId'. The poll request is only queued; the caller's next submit sends
 * it. On return the client is in the CONNECTING state.
 *
 * 'host' and 'path' are stored as pointers, not copied.
 *
 * Any failure is fatal: a message is printed and the process exits with
 * EXIT_FAILURE.
 */
void HttpClient::initiate(struct io_uring *ring, const char *host, const char *path, uint32_t clientId)
{
    m_host = host;
    m_path = path;

    struct addrinfo hints;
    struct addrinfo *result = nullptr;
    memset(&hints, 0, sizeof(hints));
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_family = AF_INET;
    // getaddrinfo() reports failure with a non-zero EAI_* code, not through
    // errno, so test against 0 and decode with gai_strerror().
    const int gaiErr = getaddrinfo(m_host, nullptr, &hints, &result);
    if (gaiErr != 0) {
        fprintf(stderr, "getaddrinfo() failed: (%d) %s\n", gaiErr, gai_strerror(gaiErr));
        exit(EXIT_FAILURE);
    }

    /* pick the first AF_INET (IPv4) result */
    const struct sockaddr_in *found = nullptr;
    for (const struct addrinfo *ai = result; ai && !found; ai = ai->ai_next) {
        if (ai->ai_family == AF_INET && ai->ai_addr) {
            found = reinterpret_cast<const struct sockaddr_in *>(ai->ai_addr);
        }
    }
    if (!found) {
        fprintf(stderr, "getaddrinfo() returned no IPv4 address for %s\n", m_host);
        freeaddrinfo(result);
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in sockAddr;
    memset(&sockAddr, 0, sizeof(struct sockaddr_in));
    sockAddr.sin_family = AF_INET;
    sockAddr.sin_addr = found->sin_addr;   // copied before the list is freed
    sockAddr.sin_port = htons(80);
    freeaddrinfo(result);

    m_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (m_fd < 0) {
        fprintf(stderr, "socket() failed: (%d) %s\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }
    if (fcntl(m_fd, F_SETFL, O_NONBLOCK) < 0) {
        fprintf(stderr, "fcntl() failed: (%d) %s\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }
    // A non-blocking connect() normally returns EINPROGRESS; completion is
    // detected through the POLLOUT poll queued below.
    if (connect(m_fd, reinterpret_cast<struct sockaddr *>(&sockAddr), sizeof(struct sockaddr_in)) < 0 &&
        errno != EINPROGRESS) {
        fprintf(stderr, "connect() failed: (%d) %s\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }

    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        // io_uring_get_sqe() returns NULL when the submission queue is full.
        fprintf(stderr, "client %u: no SQE available to queue the connect poll\n", clientId);
        exit(EXIT_FAILURE);
    }
    io_uring_prep_poll_add(sqe, m_fd, POLLOUT);
    io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_POLL_ADD, m_fd, clientId));

    m_state = CONNECTING;
}

/*
 * sendRequest()
 *
 * Formats a minimal HTTP/1.1 GET for m_path on m_host, e.g.
 *
 *   GET / HTTP/1.1
 *   Host: kernel.org:80
 *   User-Agent: io_uring coredump test 1.0
 *
 * and queues it for writing on the socket. The text is kept in m_request so
 * that the queued write has a buffer that outlives this call.
 */
void HttpClient::sendRequest(struct io_uring *ring, uint32_t clientId)
{
    std::ostringstream builder;
    builder << "GET " << m_path << " HTTP/1.1\r\n"
            << "Host: " << m_host << ":80\r\n"
            << "User-Agent: io_uring coredump test 1.0\r\n\r\n";
    m_request = builder.str();

    const void *payload = m_request.c_str();
    iouring_write(ring, m_fd, payload, m_request.size(), clientId);
}

/*
 * recvReply()
 */
void HttpClient::recvReply(struct io_uring *ring, struct io_uring_cqe *cqe, BufferManager &bufmgr)
{
    char type;
    int  fd, bid;
    uint32_t clientId;
    int res = cqe->res;

    iouring_decode_user_data(cqe->user_data, &type, &fd, &clientId);
    bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
    char *buf = bufmgr.getBuffer(bid);
    fprintf(stderr, "client %d reply:\n\n%.*s\n", clientId, res, buf);
    
    // returns the buffer back to io_uring.
    provideSimpleBuffer(ring, buf, bufmgr.getMaxSz(), 1, bufmgr.getGid(), bid, 0);
}

/*
 * processCompletion()
 */
void HttpClient::processCompletion(struct io_uring *ring,
                                   struct io_uring_cqe *cqe,
                                   BufferManager &bufmgr)
{
    char type;
    int  fd, bid;
    uint32_t clientId;
    int res = cqe->res;

    iouring_decode_user_data(cqe->user_data, &type, &fd, &clientId);
    switch (type) {
        case IOURING_POLL_ADD:
            if (m_state != CONNECTING) {
                fprintf(stderr, "client %d: unexpected IOURING_POLL_ADD completion\n", clientId);
                abort();
            }
            // clear the O_NONBLOCK flag
            fcntl(m_fd, F_SETFL, 0);
            /*
             * Setup the next read before sending the request.
             * Maybe it is having multiple concurrent operations on the same
             * fd that is triggering the io thread spawn.
             */
            initBufSelectRead(ring, m_fd, bufmgr.getMaxSz(), bufmgr.getGid(), clientId);
            sendRequest(ring, clientId);
            m_state = WAIT_REPLY;
            break;
        case IOURING_POLL_REMOVE:
            fprintf(stderr, "client %d: unexpected IOURING_POLL_REMOVE completion\n", clientId);
            abort();
            break;
        case IOURING_PROV_BUF:
            if (res < 0) {
                fprintf(stderr, "client %d: unexpected IOURING_PROV_BUF failure: (%d) %s\n",
                        clientId, -res, strerror(-res));
                abort();
            }
            break;
        case IOURING_BUF_READ:
            if (res <= 0) {
                fprintf(stderr, "client %d: unexpected IOURING_BUF_READ res: %d\n",
                        clientId, res);
            }
            else {
                recvReply(ring, cqe, bufmgr);
                // Reload the read
                initBufSelectRead(ring, m_fd, bufmgr.getMaxSz(), bufmgr.getGid(), clientId);
            }
            break;
        case IOURING_WRITE:
            fprintf(stderr, "client %d: request tx completed: %d\n", clientId, res);
            break;
    }
}

/* Number of concurrent HTTP clients, one per entry of the server list. */
#define NUMCLIENTS 6

/*
 * main()
 *
 * Sets up one ring and one provided-buffer pool, starts NUMCLIENTS HTTP
 * clients, then loops forever submitting queued requests and dispatching
 * completions to the client encoded in each completion's user_data.
 *
 * The loop only ends on a fatal submit/wait error, so the process always
 * returns EXIT_FAILURE when it returns at all.
 */
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    struct io_uring_params params;
    struct io_uring ring;

    memset(&params, 0, sizeof(params));

    // liburing reports failure as a negative errno return value; it does not
    // set errno.
    int res = io_uring_queue_init_params(QD, &ring, &params);
    if (res < 0) {
        fprintf(stderr, "io_uring_queue_init_params() failed: (%d) %s\n", -res, strerror(-res));
        return EXIT_FAILURE;
    }
    BufferManager bufmgr(&ring, THREAD_MAX_FD*BIO_RING_SZ, BUFSZ, BGID1);

    const char *servers[NUMCLIENTS] = {
        "google.com",
        "facebook.com",
        "twitter.com",
        "www.cloudflare.com",
        "kernel.org",
        "phoronix.com"
    };

    HttpClient clients[NUMCLIENTS];

    /*
     * Give the tester some time before any connection is started.
     * NOTE(review): the original comment was cut short; presumably the pause
     * is there to inspect the process (e.g. its threads) - confirm.
     */
    sleep(10);

    for (uint32_t i = 0; i < NUMCLIENTS; ++i) {
        clients[i].initiate(&ring, servers[i], "/", i);
    }

    while (1) {
        res = io_uring_submit_and_wait(&ring, 1);
        if (res == -EINTR) {
            // Interrupted by a signal: simply retry.
            continue;
        }
        if (res < 0) {
            fprintf(stderr, "io_uring_submit_and_wait() failed: (%d) %s\n", -res, strerror(-res));
            break;
        }
        unsigned completed = 0;
        unsigned head;
        struct io_uring_cqe *cqe;
        io_uring_for_each_cqe(&ring, head, cqe) {
            ++completed;
            char type;
            int  fd;
            uint32_t clientId;
            iouring_decode_user_data(cqe->user_data, &type, &fd, &clientId);
            if (clientId < NUMCLIENTS) {
                clients[clientId].processCompletion(&ring, cqe, bufmgr);
            }
            else {
                /*
                 * The only operation for which we don't assign a client id is IORING_OP_PROVIDE_BUFFERS
                 */
                if (type != IOURING_PROV_BUF) {
                    fprintf(stderr, "Unexpected completion event with no client association: %s\n",
                            Io_Uring_OpTypeToStr(type));
                    abort();
                }
                else if (cqe->res < 0) {
                    fprintf(stderr, "unexpected IOURING_PROV_BUF failure: (%d) %s\n",
                            -cqe->res, strerror(-cqe->res));
                    abort();
                }
            }
        }
        // Mark the whole batch consumed in one step.
        if (completed) {
            io_uring_cq_advance(&ring, completed);
        }
    }

    io_uring_queue_exit(&ring);
    return EXIT_FAILURE;
}

[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux