Is there a way to process multiple shared-umem sockets on the same RX-queue in parallel?

*xdpsock_user.c* (https://github.com/torvalds/linux/blob/master/samples/bpf/xdpsock_user.c) implements multiple AF_XDP sockets on the same RX-queue using a shared umem.
The sockets are then processed sequentially in a for-loop like this (1:1 copy):

		struct pollfd fds[MAX_SOCKS] = {};
		int i, ret;

		for (i = 0; i < num_socks; i++) {
			fds[i].fd = xsk_socket__fd(xsks[i]->xsk);
			fds[i].events = POLLIN;
		}

		for (;;) {
			if (opt_poll) {
				ret = poll(fds, num_socks, opt_timeout);
				if (ret <= 0)
					continue;
			}

			for (i = 0; i < num_socks; i++)
				rx_drop(xsks[i], fds);

			if (benchmark_done)
				break;
		}

This approach works fine in my implementation as well, but I was wondering if it is possible to spawn a thread for every socket and let those threads receive and process packets in parallel.
After processing is done (right before a thread goes to sleep to wait for the next batch of packets), it adds an entry to a message queue, instructing another thread (which drains that queue) to release the consumed RX ring entries and submit the buffers back to the umem fill queue for that socket.
I know that the umem rings are single-producer/single-consumer, so I went into the implementation with the mindset that it probably wouldn't work out, and indeed it didn't. But now I am wondering whether this is impossible by design (meaning that every socket of an RX-queue using shared umem really has to be processed sequentially) or whether I simply did something wrong.
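
For context, the per-socket threads are spawned roughly like this (a simplified sketch; rx_thread_func and the xsk_socket_info type are stand-ins for my actual names):

		#include <pthread.h>

		/* sketch: one RX thread per AF_XDP socket; the loop body is shown below */
		static void *rx_thread_func(void *arg)
		{
			struct xsk_socket_info *xsk_socket = arg;

			while (!global_exit) {
				/* peek the RX ring, process packets, notify the release thread ... */
			}
			return NULL;
		}

		pthread_t threads[MAX_SOCKS];
		for (int i = 0; i < num_socks; i++)
			pthread_create(&threads[i], NULL, rx_thread_func, xsks[i]);
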
So this is how a thread processes incoming packets:

		uint32_t idx_rx = 0;
		const int rcvd = xsk_ring_cons__peek(&xsk_socket->rx, INT32_MAX, &idx_rx);
		if (rcvd == 0) {
			continue;
		}

		/* reserve the same number of slots in the shared fill queue;
		 * the real code keeps retrying while ret != rcvd ... */
		uint32_t idx_fq = 0;
		const int ret = xsk_ring_prod__reserve(&xsk_socket->umem->fq, rcvd, &idx_fq);
		if (ret == rcvd) {
			for (uint32_t i = idx_rx; i < (idx_rx + rcvd); i++) {
				const struct xdp_desc *desc = xsk_ring_cons__rx_desc(&xsk_socket->rx, i);
				uint64_t addr = desc->addr;
				const uint32_t len = desc->len;
				const uint64_t orig = xsk_umem__extract_addr(addr);

				addr = xsk_umem__add_offset_to_addr(addr);

				const int hdr_size = process_packet(xsk_socket, addr, len);
				/* recycle the buffer into the reserved fill queue slot */
				*xsk_ring_prod__fill_addr(&xsk_socket->umem->fq, idx_fq++) = orig;

				xsk_socket->stats.rx_bytes += (len - hdr_size);
			}

			xsk_socket->stats.rx_packets += rcvd;

			/* tell the release thread how much it can release and submit */
			struct xsk_thread_msg msg;
			msg.xsk_rx = &xsk_socket->rx;
			msg.xsk_rx_rcvd = rcvd;
			msg.xsk_prod_reserve_amnt = ret;
			if (msgsnd(msg_queue_id, (void *)(&msg), sizeof(struct xsk_thread_msg), 0) < 0) {
				fprintf(stderr, "Failed to add msg from xsk-socket: %s\n", strerror(errno));
			}
		}

and those messages are then received by another thread:

		struct xsk_thread_msg msg;
		while (!global_exit) {
			memset(&msg, 0, sizeof(struct xsk_thread_msg));
			if (msgrcv(msg_queue_id, (void *)(&msg), sizeof(struct xsk_thread_msg), 0, 0) == -1) {
				fprintf(stderr, "Failed to receive message: %s\n", strerror(errno));
			} else {
				/* release the consumed RX descriptors of that socket and
				 * submit the reserved slots to the (single, shared) fill queue */
				xsk_ring_cons__release(msg.xsk_rx, msg.xsk_rx_rcvd);
				xsk_ring_prod__submit(&umem_info->fq, msg.xsk_prod_reserve_amnt);
			}
		}
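
For completeness, xsk_thread_msg is along these lines (the leading mtype field and the size accounting that System V message queues expect are elided from the snippets above):

		/* rough layout of the message passed between the threads */
		struct xsk_thread_msg {
			long mtype;                   /* SysV: must be set > 0 before msgsnd() */
			struct xsk_ring_cons *xsk_rx; /* RX ring of the receiving socket */
			int xsk_rx_rcvd;              /* descriptors consumed from that RX ring */
			int xsk_prod_reserve_amnt;    /* slots reserved in the shared fill queue */
		};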

What I am observing is that it works for a few seconds, and then packet loss gradually increases to 100% (i.e. sequence numbers no longer match up) while the number of received packets stays the same.



