Question about optimally receiving TCP streams via io_uring

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hello,

I would like to receive n 10 Gbps TCP or UDP streams (jumbo frames) as fast as possible and write each socket stream to a file on fast XFS storage. How can I implement this optimally with io_uring? I want to use io_uring for both network and file I/O, and the CPU load should be kept low. I would like to hear your opinions. My first naive implementation looks like this, but I can't get more than 1 Gbps through a single TCP stream:

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <unistd.h>

#include "liburing.h"

// Sizing constants for the server.
// MAX_CONNECTIONS: in the code shown here it only determines BUFFERS_COUNT.
#define MAX_CONNECTIONS     4096
// BACKLOG: backlog argument passed to listen().
#define BACKLOG             512
// MAX_MESSAGE_LEN: size in bytes of each entry of bufs, and the length
// requested by every recv queued from the main loop.
#define MAX_MESSAGE_LEN     9000
// BUFFERS_COUNT: number of entries in bufs, all handed to the ring at startup.
#define BUFFERS_COUNT       MAX_CONNECTIONS

// State of one stream server instance; filled in by stream_server_proc()
// and used by main_loop_process_cqes() through the global stream_server.
struct Stream_Server {
    // TCP port to listen on; must be set by the caller before stream_server_proc().
    int port;
    // Parameters passed to io_uring_queue_init_params() (zeroed first).
    struct io_uring_params ring_params;
    // The ring used for accept, recv, file write and provide-buffers requests.
    struct io_uring ring;
    // Listening socket created by stream_server_proc().
    int socket_listen_fd;
    // Address storage and its size handed to every queued accept request.
    struct sockaddr_in next_client_address;
    socklen_t next_client_address_size;
    // Number of CQEs handled so far by main_loop_process_cqes().
    uint64_t total_cqe_count;

    // Not referenced by the code shown here.
    struct Data_Analyzer *data_analyzer;
};

// Request kinds stored in conn_info.type so that a completion can be
// dispatched to the right handler in main_loop_process_cqes().
enum {
    // accept on the listening socket
    ACCEPT,
    // recv from a connected socket
    READ,
    // set by both the socket-send helper and the file-write helper
    WRITE,
    // a buffer being handed (back) to the ring
    PROVIDE_BUFFERS,
};

// Per-request tag stored in the 64-bit user_data field of an SQE and read
// back from the matching CQE. The three members add up to 8 bytes.
typedef struct conn_info {
    // socket the request belongs to (listening socket for ACCEPT, 0 for PROVIDE_BUFFERS)
    __u32 fd;
    // one of ACCEPT / READ / WRITE / PROVIDE_BUFFERS
    __u16 type;
    // index into bufs for WRITE requests; 0 for the other kinds
    __u16 bid;
} conn_info;

// Receive buffer pool: BUFFERS_COUNT buffers of MAX_MESSAGE_LEN bytes each,
// provided to the ring under socket_buffers_group_id; the buffer id reported
// in a completion is used as the first index.
char bufs[BUFFERS_COUNT][MAX_MESSAGE_LEN] = {0};
// Buffer group id used for all socket receives.
int socket_buffers_group_id = 1337;
// Not referenced by the code shown here.
int mdf_file_buffers_group_id = 1338;

/*
 * Queue an accept request on a listening socket.
 *
 * Despite the name, this function only prepares the SQE; it is sent to the
 * kernel by the caller's next io_uring_submit*() call.
 *
 * ring                 ring to take the SQE from
 * fd                   listening socket
 * client_address       storage passed to the accept request for the peer address
 * client_address_size  in/out size of client_address
 * flags                IOSQE_* flags applied to the SQE
 *
 * The completion is tagged with type ACCEPT and fd. If the submission queue
 * has no free entry the process is terminated with exit(1) instead of
 * dereferencing a NULL SQE.
 */
void make_accept_sqe_and_submit(struct io_uring *ring, int fd, struct sockaddr *client_address, socklen_t *client_address_size, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (sqe == NULL) {
        // io_uring_get_sqe() returns NULL when the SQ ring is full.
        fprintf(stderr, "make_accept_sqe_and_submit: submission queue is full\n");
        exit(1);
    }

    io_uring_prep_accept(sqe, fd, client_address, client_address_size, 0);
    io_uring_sqe_set_flags(sqe, flags);

    // Set the tag after the prep call and copy it in with memcpy: writing
    // through a conn_info pointer into the __u64 field breaks strict aliasing.
    conn_info tag = { .fd = (__u32)fd, .type = ACCEPT, .bid = 0 };
    memcpy(&sqe->user_data, &tag, sizeof(tag));
}

/*
 * Queue a recv on a connected socket using a buffer from a provided-buffer
 * group.
 *
 * Despite the name, this function only prepares the SQE; it is sent to the
 * kernel by the caller's next io_uring_submit*() call.
 *
 * ring          ring to take the SQE from
 * fd            connected socket to receive from
 * gid           buffer group id stored in the SQE's buf_group field
 * message_size  maximum number of bytes to receive
 * flags         IOSQE_* flags applied to the SQE; callers in this file pass
 *               IOSQE_BUFFER_SELECT, which is why no buffer pointer is given
 *
 * The completion is tagged with type READ and fd. If the submission queue
 * has no free entry the process is terminated with exit(1) instead of
 * dereferencing a NULL SQE.
 */
void make_socket_read_sqe_and_submit(struct io_uring *ring, int fd, unsigned gid, size_t message_size, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (sqe == NULL) {
        // io_uring_get_sqe() returns NULL when the SQ ring is full.
        fprintf(stderr, "make_socket_read_sqe_and_submit: submission queue is full\n");
        exit(1);
    }

    io_uring_prep_recv(sqe, fd, NULL, message_size, 0);
    io_uring_sqe_set_flags(sqe, flags);
    // Must follow the prep call, as in the original ordering.
    sqe->buf_group = gid;

    // Copy the tag in with memcpy: writing through a conn_info pointer into
    // the __u64 user_data field breaks strict aliasing.
    conn_info tag = { .fd = (__u32)fd, .type = READ, .bid = 0 };
    memcpy(&sqe->user_data, &tag, sizeof(tag));
}

/*
 * Queue a send of one pool buffer on a connected socket.
 *
 * Despite the name, this function only prepares the SQE; it is sent to the
 * kernel by the caller's next io_uring_submit*() call.
 *
 * ring          ring to take the SQE from
 * fd            connected socket to send on
 * bid           index of the buffer in bufs holding the data
 * message_size  number of bytes to send from that buffer
 * flags         IOSQE_* flags applied to the SQE
 *
 * The completion is tagged with type WRITE, fd and bid. If the submission
 * queue has no free entry the process is terminated with exit(1) instead of
 * dereferencing a NULL SQE.
 */
void make_socket_write_sqe_and_submit(struct io_uring *ring, int fd, __u16 bid, size_t message_size, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (sqe == NULL) {
        // io_uring_get_sqe() returns NULL when the SQ ring is full.
        fprintf(stderr, "make_socket_write_sqe_and_submit: submission queue is full\n");
        exit(1);
    }

    io_uring_prep_send(sqe, fd, &bufs[bid], message_size, 0);
    io_uring_sqe_set_flags(sqe, flags);

    // Copy the tag in with memcpy: writing through a conn_info pointer into
    // the __u64 user_data field breaks strict aliasing.
    conn_info tag = { .fd = (__u32)fd, .type = WRITE, .bid = bid };
    memcpy(&sqe->user_data, &tag, sizeof(tag));
}

// @Temporary:  support n file streams
// Single shared output file: opened in the main loop when a connection is
// accepted and used as the target of every queued file write.
int outfd = -1;
// Running byte position in the output file, advanced by each read completion.
off_t file_offset = 0;
// Incremented once per read completion; not otherwise read in the code shown here.
int file_index = 0;

/*
 * Queue a write of one pool buffer to the global output file outfd.
 *
 * Despite the name, this function only prepares the SQE; it is sent to the
 * kernel by the caller's next io_uring_submit*() call.
 *
 * ring          ring to take the SQE from
 * socket_fd     socket the data came from; only stored in the completion tag
 * bid           index of the buffer in bufs holding the data
 * message_size  number of bytes to write from that buffer
 * file_offset   file position to write at (shadows the global of the same name)
 * flags         IOSQE_* flags applied to the SQE
 *
 * The completion is tagged with type WRITE, socket_fd and bid. If the
 * submission queue has no free entry the process is terminated with exit(1)
 * instead of dereferencing a NULL SQE.
 */
void make_file_write_sqe_and_submit(struct io_uring *ring, int socket_fd, __u16 bid, size_t message_size, off_t file_offset, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (sqe == NULL) {
        // io_uring_get_sqe() returns NULL when the SQ ring is full.
        fprintf(stderr, "make_file_write_sqe_and_submit: submission queue is full\n");
        exit(1);
    }

    io_uring_prep_write(sqe, outfd, &bufs[bid], message_size, file_offset);
    io_uring_sqe_set_flags(sqe, flags);

    // Copy the tag in with memcpy: writing through a conn_info pointer into
    // the __u64 user_data field breaks strict aliasing.
    conn_info tag = { .fd = (__u32)socket_fd, .type = WRITE, .bid = bid };
    memcpy(&sqe->user_data, &tag, sizeof(tag));
}

/*
 * Queue a request that hands one buffer of the pool (back) to the ring.
 *
 * Despite the name, this function only prepares the SQE; it is sent to the
 * kernel by the caller's next io_uring_submit*() call.
 *
 * ring  ring to take the SQE from
 * bid   index of the buffer in bufs; also used as the buffer id
 * gid   buffer group the buffer is added to
 *
 * The completion is tagged with type PROVIDE_BUFFERS. If the submission
 * queue has no free entry the process is terminated with exit(1) instead of
 * dereferencing a NULL SQE.
 */
void make_provide_buffers_sqe_and_submit(struct io_uring *ring, __u16 bid, unsigned gid) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (sqe == NULL) {
        // io_uring_get_sqe() returns NULL when the SQ ring is full.
        fprintf(stderr, "make_provide_buffers_sqe_and_submit: submission queue is full\n");
        exit(1);
    }

    // One buffer of MAX_MESSAGE_LEN bytes, registered under id bid.
    io_uring_prep_provide_buffers(sqe, bufs[bid], MAX_MESSAGE_LEN, 1, gid, bid);

    // Copy the tag in with memcpy: writing through a conn_info pointer into
    // the __u64 user_data field breaks strict aliasing.
    conn_info tag = { .fd = 0, .type = PROVIDE_BUFFERS, .bid = 0 };
    memcpy(&sqe->user_data, &tag, sizeof(tag));
}

// The server instance the main loop operates on; set by stream_server_proc().
struct Stream_Server *stream_server = NULL;

/*
 * Completion loop of the server; never returns.
 *
 * Each iteration submits whatever SQEs were queued, waits for at least one
 * completion, and dispatches every available CQE on the type stored in its
 * user_data tag:
 *   ACCEPT          - open the output file, queue the first recv for the new
 *                     socket and re-arm the accept
 *   READ            - queue a file write of the received buffer at the
 *                     current file offset, or close the socket on EOF/error
 *   WRITE           - return the buffer to the pool and queue the next recv
 *   PROVIDE_BUFFERS - only checked for errors
 *
 * Operates on the global stream_server and on the single global output file
 * (outfd / file_offset), so only one stream at a time is written correctly.
 *
 * NOTE(review) on throughput: each connection has exactly one request in
 * flight at any time (recv -> file write -> recv ...), each moving at most
 * MAX_MESSAGE_LEN bytes, so receiving and writing never overlap for a stream.
 */
void main_loop_process_cqes(void) {
    struct io_uring *ring = &stream_server->ring;
    struct io_uring_cqe *cqe;
    unsigned head;

    // Every completion handled below queues at most two new SQEs and nothing
    // is submitted until the next loop iteration, so handling at most
    // sq_entries / 2 completions per batch keeps the SQ ring from filling up.
    // Assumes ring_params is the struct that was passed to
    // io_uring_queue_init_params(), as stream_server_proc() does.
    uint64_t max_batch = stream_server->ring_params.sq_entries / 2;
    if (max_batch == 0) {
        max_batch = 1;
    }

    stream_server->total_cqe_count = 0;

    while (1) {
        uint64_t cqe_count = 0;

        int submit_res = io_uring_submit_and_wait(ring, 1);
        // -EINTR, -EAGAIN and -EBUSY are retried after reaping completions;
        // anything else would otherwise turn this loop into a silent busy spin.
        if (submit_res < 0 && submit_res != -EINTR && submit_res != -EAGAIN && submit_res != -EBUSY) {
            fprintf(stderr, "io_uring_submit_and_wait: %s\n", strerror(-submit_res));
            exit(1);
        }

        io_uring_for_each_cqe(ring, head, cqe) {
            if (cqe_count >= max_batch) {
                // Leave the rest for the next iteration, after a submit.
                break;
            }
            cqe_count += 1;

            // Copy the tag out instead of reading the __u64 through a
            // conn_info pointer (strict aliasing).
            conn_info conn;
            memcpy(&conn, &cqe->user_data, sizeof(conn));

            if (cqe->res == -ENOBUFS) {
                fprintf(stdout, "bufs in automatic buffer selection empty, this should not happen...\n");
                fflush(stdout);
                exit(1);
            }

            switch (conn.type) {
            case PROVIDE_BUFFERS:
                if (cqe->res < 0) {
                    printf("cqe->res = %d\n", cqe->res);
                    exit(1);
                }
                break;

            case ACCEPT: {
                int sock_conn_fd = cqe->res;
                if (sock_conn_fd >= 0) {
                    // There is only one output file: release the previous
                    // descriptor and start the freshly truncated file at 0.
                    if (outfd >= 0) {
                        close(outfd);
                    }
                    file_offset = 0;

                    outfd = open("/brick_storage/test_io_file.out", O_WRONLY | O_CREAT | O_TRUNC, 0644);
                    if (outfd < 0) {
                        perror("open outfile");
                        exit(1);
                    }

                    make_socket_read_sqe_and_submit(ring, sock_conn_fd, socket_buffers_group_id, MAX_MESSAGE_LEN, IOSQE_BUFFER_SELECT);
                } else {
                    fprintf(stderr, "accept failed: %s\n", strerror(-sock_conn_fd));
                }

                // new connected client; read data from socket and re-add accept to monitor for new connections
                // The size is in/out for accept, so restore it before reuse.
                stream_server->next_client_address_size = sizeof(stream_server->next_client_address);
                make_accept_sqe_and_submit(ring, stream_server->socket_listen_fd, (struct sockaddr *)&stream_server->next_client_address, &stream_server->next_client_address_size, 0);
                break;
            }

            case READ: {
                int bytes_read = cqe->res;
                __u16 bid = (__u16)(cqe->flags >> IORING_CQE_BUFFER_SHIFT);

                if (bytes_read <= 0) {
                    // connection closed or error
                    if (bytes_read < 0) {
                        fprintf(stderr, "recv on fd %u failed: %s\n", conn.fd, strerror(-bytes_read));
                    }
                    // IORING_CQE_F_BUFFER means a buffer was taken from the
                    // group for this request even though no data arrived;
                    // hand it back so the pool does not shrink.
                    if (cqe->flags & IORING_CQE_F_BUFFER) {
                        make_provide_buffers_sqe_and_submit(ring, bid, socket_buffers_group_id);
                    }
                    shutdown((int)conn.fd, SHUT_RDWR);
                    close((int)conn.fd);
                } else {
                    // bytes have been read into bufs, now add the file write sqe
                    file_index += 1;
                    // Write at the current position, then advance: advancing
                    // first would leave a hole at the start of the file.
                    make_file_write_sqe_and_submit(ring, (int)conn.fd, bid, bytes_read, file_offset, 0);
                    file_offset += bytes_read;
                }
                break;
            }

            case WRITE:
                // The tag has no room for the requested length, so a short
                // write cannot be detected here; errors are at least reported.
                if (cqe->res < 0) {
                    fprintf(stderr, "file write for fd %u failed: %s\n", conn.fd, strerror(-cqe->res));
                }

                // write has been completed, first re-add the buffer
                make_provide_buffers_sqe_and_submit(ring, conn.bid, socket_buffers_group_id);

                // @Speed: Too late? What's the optimal way to keep receiving socket data as fast as possible?
                make_socket_read_sqe_and_submit(ring, (int)conn.fd, socket_buffers_group_id, MAX_MESSAGE_LEN, IOSQE_BUFFER_SELECT);
                break;

            default:
                // Unknown tags are ignored, as before.
                break;
            }
        }

        io_uring_cq_advance(ring, cqe_count);
        stream_server->total_cqe_count += cqe_count;
    }
}

/*
 * Set up and run the stream server described by _stream_server.
 *
 * Creates the listening TCP socket on _stream_server->port, initialises the
 * io_uring instance, hands the whole bufs pool to the ring as buffer group
 * socket_buffers_group_id, queues the first accept and enters
 * main_loop_process_cqes(), which does not return.
 *
 * _stream_server  server state; port must be set by the caller. The pointer
 *                 is stored in the global stream_server.
 *
 * Any setup failure is reported and terminates the process with exit(1).
 * Returns 0 only if the main loop were ever to return.
 */
int stream_server_proc(struct Stream_Server *_stream_server) {
    stream_server = _stream_server;
    stream_server->next_client_address_size = sizeof(stream_server->next_client_address);

    struct sockaddr_in serv_addr = { 0 };

    stream_server->socket_listen_fd = socket(AF_INET, SOCK_STREAM /* | SOCK_NONBLOCK */, 0);
    if (stream_server->socket_listen_fd < 0) {
        perror("Error creating socket");
        exit(1);
    }

    const int val = 1;
    if (setsockopt(stream_server->socket_listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
        // Not fatal: the server still works, only quick restarts may fail to bind.
        perror("setsockopt(SO_REUSEADDR)");
    }

    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(stream_server->port);
    serv_addr.sin_addr.s_addr = INADDR_ANY;

    // perror() appends ": <reason>\n" itself, so the messages carry no newline.
    if (bind(stream_server->socket_listen_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        perror("Error binding socket");
        exit(1);
    }
    if (listen(stream_server->socket_listen_fd, BACKLOG) < 0) {
        perror("Error listening on socket");
        exit(1);
    }
    printf("listening for connections on port: %d\n", stream_server->port);

    memset(&stream_server->ring_params, 0, sizeof(stream_server->ring_params));

    // liburing reports failures as a negative errno return value and does not
    // set errno, so perror() would print an unrelated message here.
    int ret = io_uring_queue_init_params(2048, &stream_server->ring, &stream_server->ring_params);
    if (ret < 0) {
        fprintf(stderr, "io_uring_init_failed: %s\n", strerror(-ret));
        exit(1);
    }

    struct io_uring_probe *probe;
    probe = io_uring_get_probe_ring(&stream_server->ring);
    if (!probe || !io_uring_opcode_supported(probe, IORING_OP_PROVIDE_BUFFERS)) {
        printf("Buffer select not supported, skipping...\n");
        // The server cannot run without it, so report failure to the caller's shell.
        exit(1);
    }
    free(probe);

    // first time, register buffers for buffer selection
    {
        struct io_uring_sqe *sqe;
        struct io_uring_cqe *cqe;

        sqe = io_uring_get_sqe(&stream_server->ring);
        if (sqe == NULL) {
            fprintf(stderr, "stream_server_proc: submission queue is full\n");
            exit(1);
        }
        io_uring_prep_provide_buffers(sqe, bufs, MAX_MESSAGE_LEN, BUFFERS_COUNT, socket_buffers_group_id, 0);

        ret = io_uring_submit(&stream_server->ring);
        if (ret < 0) {
            fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
            exit(1);
        }

        // cqe is only valid when the wait succeeded.
        ret = io_uring_wait_cqe(&stream_server->ring, &cqe);
        if (ret < 0) {
            fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
            exit(1);
        }
        if (cqe->res < 0) {
            printf("cqe->res = %d\n", cqe->res);
            exit(1);
        }
        io_uring_cqe_seen(&stream_server->ring, cqe);
    }

    // add first accept SQE to monitor for new incoming connections
    make_accept_sqe_and_submit(&stream_server->ring, stream_server->socket_listen_fd, (struct sockaddr *)&stream_server->next_client_address, &stream_server->next_client_address_size, 0);

    main_loop_process_cqes();

    // Not reached while the main loop runs forever; keeps the int function well-formed.
    return 0;
}

Grüße / Regards
Jens Frederich




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux