From: Pavel Begunkov <asml.silence@xxxxxxxxx> Currently, if the user fails to keep up with the network and doesn't refill the buffer ring fast enough, the NIC/driver will start dropping packets. That might be too punishing, so let's fall back to a non-zerocopy version by allowing the driver to do normal kernel allocations. Later, when we're in the task context doing zc_rx_recv_skb(), we'll detect such pages and copy them into user-specified buffers. This patch implements the second (copy) part. It'll facilitate adoption and help the user to strike a balance between allocating the right amount of zerocopy buffers and being resilient to surges in traffic. Note, due to technical reasons, for now we're only using buffers from ->freelist, which is unreliable and is likely to fail over time. It'll be revised in later patches. Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx> Signed-off-by: David Wei <dw@xxxxxxxxxxx> --- io_uring/zc_rx.c | 115 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 105 insertions(+), 10 deletions(-) diff --git a/io_uring/zc_rx.c b/io_uring/zc_rx.c index c1502ec3e629..c2ed600f0951 100644 --- a/io_uring/zc_rx.c +++ b/io_uring/zc_rx.c @@ -498,6 +498,26 @@ static void io_zc_rx_refill_cache(struct io_zc_rx_ifq *ifq, int count) pool->cache_count += filled; } +static struct io_zc_rx_buf *io_zc_get_buf_task_safe(struct io_zc_rx_ifq *ifq) +{ + struct io_zc_rx_pool *pool = ifq->pool; + struct io_zc_rx_buf *buf = NULL; + u32 pgid; + + if (!READ_ONCE(pool->free_count)) + return NULL; + + spin_lock_bh(&pool->freelist_lock); + if (pool->free_count) { + pool->free_count--; + pgid = pool->freelist[pool->free_count]; + buf = &pool->bufs[pgid]; + atomic_set(&buf->refcount, 1); + } + spin_unlock_bh(&pool->freelist_lock); + return buf; +} + struct io_zc_rx_buf *io_zc_rx_get_buf(struct io_zc_rx_ifq *ifq) { struct io_zc_rx_pool *pool = ifq->pool; @@ -576,6 +596,11 @@ static struct io_zc_rx_ifq *io_zc_rx_ifq_skb(struct sk_buff *skb) return NULL; } +static
inline void io_zc_return_rbuf_cqe(struct io_zc_rx_ifq *ifq) +{ + ifq->cached_cq_tail--; +} + static inline struct io_uring_rbuf_cqe *io_zc_get_rbuf_cqe(struct io_zc_rx_ifq *ifq) { struct io_uring_rbuf_cqe *cqe; @@ -595,6 +620,51 @@ static inline struct io_uring_rbuf_cqe *io_zc_get_rbuf_cqe(struct io_zc_rx_ifq * return cqe; } +static ssize_t zc_rx_copy_chunk(struct io_zc_rx_ifq *ifq, void *data, + unsigned int offset, size_t len) +{ + size_t copy_size, copied = 0; + struct io_uring_rbuf_cqe *cqe; + struct io_zc_rx_buf *buf; + unsigned int pgid; + int ret = 0, off = 0; + u8 *vaddr; + + do { + cqe = io_zc_get_rbuf_cqe(ifq); + if (!cqe) { + ret = ENOBUFS; + break; + } + buf = io_zc_get_buf_task_safe(ifq); + if (!buf) { + io_zc_return_rbuf_cqe(ifq); + ret = -ENOMEM; + break; + } + + vaddr = kmap_local_page(buf->page); + copy_size = min_t(size_t, PAGE_SIZE, len); + memcpy(vaddr, data + offset, copy_size); + kunmap_local(vaddr); + + pgid = page_private(buf->page) & 0xffffffff; + io_zc_rx_get_buf_uref(ifq->pool, pgid); + io_zc_rx_put_buf(ifq, buf); + + cqe->region = 0; + cqe->off = pgid * PAGE_SIZE + off; + cqe->len = copy_size; + cqe->flags = 0; + + offset += copy_size; + len -= copy_size; + copied += copy_size; + } while (offset < len); + + return copied ? copied : ret; +} + static int zc_rx_recv_frag(struct io_zc_rx_ifq *ifq, const skb_frag_t *frag, int off, int len, bool zc_skb) { @@ -618,9 +688,21 @@ static int zc_rx_recv_frag(struct io_zc_rx_ifq *ifq, const skb_frag_t *frag, cqe->len = len; cqe->flags = 0; } else { - /* TODO: copy frags that aren't backed by zc pages */ - WARN_ON_ONCE(1); - return -ENOMEM; + u32 p_off, p_len, t, copied = 0; + u8 *vaddr; + int ret = 0; + + skb_frag_foreach_page(frag, off, len, + page, p_off, p_len, t) { + vaddr = kmap_local_page(page); + ret = zc_rx_copy_chunk(ifq, vaddr, p_off, p_len); + kunmap_local(vaddr); + + if (ret < 0) + return copied ? 
copied : ret; + copied += ret; + } + len = copied; } return len; @@ -633,7 +715,7 @@ zc_rx_recv_skb(read_descriptor_t *desc, struct sk_buff *skb, struct io_zc_rx_ifq *ifq = desc->arg.data; struct io_zc_rx_ifq *skb_ifq; struct sk_buff *frag_iter; - unsigned start, start_off; + unsigned start, start_off = offset; int i, copy, end, off; bool zc_skb = true; int ret = 0; @@ -643,14 +725,27 @@ zc_rx_recv_skb(read_descriptor_t *desc, struct sk_buff *skb, zc_skb = false; if (WARN_ON_ONCE(skb_ifq)) return -EFAULT; - pr_debug("non zerocopy pages are not supported\n"); - return -EFAULT; } - start = skb_headlen(skb); - start_off = offset; - // TODO: copy payload in skb linear data */ - WARN_ON_ONCE(offset < start); + if (unlikely(offset < skb_headlen(skb))) { + ssize_t copied; + size_t to_copy; + + to_copy = min_t(size_t, skb_headlen(skb) - offset, len); + copied = zc_rx_copy_chunk(ifq, skb->data, offset, to_copy); + if (copied < 0) { + ret = copied; + goto out; + } + offset += copied; + len -= copied; + if (!len) + goto out; + if (offset != skb_headlen(skb)) + goto out; + } + + start = skb_headlen(skb); for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) { const skb_frag_t *frag; -- 2.39.3