Currently r->queue[] is cleared before r->consumer_head is moved
forward, which makes the __ptr_ring_empty() check called in
page_pool_refill_alloc_cache() unreliable if the check is done
after the r->queue[] clearing and before consumer_head is moved
forward.

Move the r->queue[] clearing after the consumer_head update to
make the __ptr_ring_empty() check more reliable.

As a side effect of the above change, a consumer_head check is
avoided in the likely case, which gives a noticeable performance
improvement when tested with the ptr_ring_test selftest added in
the previous patch.

Using "taskset -c 1 ./ptr_ring_test -s 1000 -m 0 -N 100000000"
to test the case of a single thread doing both the enqueuing and
dequeuing:

 arch     unpatched           patched             delta
arm64      4648 ms            4464 ms             +3.9%
x86        2562 ms            2401 ms             +6.2%

Using "taskset -c 1-2 ./ptr_ring_test -s 1000 -m 1 -N 100000000"
to test the case of one thread doing enqueuing and another thread
doing dequeuing concurrently, also known as single-producer/
single-consumer:

 arch      unpatched             patched             delta
arm64   3624 ms + 3624 ms   3462 ms + 3462 ms        +4.4%
x86     2758 ms + 2758 ms   2547 ms + 2547 ms        +7.6%

Signed-off-by: Yunsheng Lin <linyunsheng@xxxxxxxxxx>
---
V2: Add performance data.
---
 include/linux/ptr_ring.h | 25 ++++++++++++++++---------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 808f9d3..db9c282 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -261,8 +261,7 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
 	 * to work correctly.
 	 */
-	int consumer_head = r->consumer_head;
-	int head = consumer_head++;
+	int consumer_head = r->consumer_head + 1;
 
 	/* Once we have processed enough entries invalidate them in
 	 * the ring all at once so producer can reuse their space in the ring.
@@ -271,19 +270,27 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 	 */
 	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
 		     consumer_head >= r->size)) {
+		int tail = r->consumer_tail;
+
+		if (unlikely(consumer_head >= r->size)) {
+			r->consumer_tail = 0;
+			WRITE_ONCE(r->consumer_head, 0);
+		} else {
+			r->consumer_tail = consumer_head;
+			WRITE_ONCE(r->consumer_head, consumer_head);
+		}
+
 		/* Zero out entries in the reverse order: this way we touch the
 		 * cache line that producer might currently be reading the last;
 		 * producer won't make progress and touch other cache lines
 		 * besides the first one until we write out all entries.
 		 */
-		while (likely(head >= r->consumer_tail))
-			r->queue[head--] = NULL;
-		r->consumer_tail = consumer_head;
-	}
-	if (unlikely(consumer_head >= r->size)) {
-		consumer_head = 0;
-		r->consumer_tail = 0;
+		while (likely(--consumer_head >= tail))
+			r->queue[consumer_head] = NULL;
+
+		return;
 	}
+
 	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
 	WRITE_ONCE(r->consumer_head, consumer_head);
 }
-- 
2.7.4
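
For readers who want to step through the new ordering outside the kernel, here is a minimal single-threaded user-space sketch of the patched discard logic. It is only an illustration: struct ring_model, model_discard_one(), model_empty() and model_demo() are hypothetical names that are not part of this patch, plain assignments stand in for WRITE_ONCE()/READ_ONCE(), and the likely()/unlikely() hints are dropped. It shows consumer_head being published first and the consumed slots being zeroed afterwards, in reverse order.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical user-space model of the consumer side of a ptr_ring.
 * Field names follow the patch; this is not the kernel structure.
 */
struct ring_model {
	int consumer_head;
	int consumer_tail;
	int size;
	int batch;
	void **queue;
};

/* Same control flow as the patched __ptr_ring_discard_one(), with
 * plain stores in place of WRITE_ONCE() (assumption: single thread).
 */
static void model_discard_one(struct ring_model *r)
{
	int consumer_head = r->consumer_head + 1;

	if (consumer_head - r->consumer_tail >= r->batch ||
	    consumer_head >= r->size) {
		int tail = r->consumer_tail;

		/* Publish the new head (and tail) first ... */
		if (consumer_head >= r->size) {
			r->consumer_tail = 0;
			r->consumer_head = 0;
		} else {
			r->consumer_tail = consumer_head;
			r->consumer_head = consumer_head;
		}

		/* ... then zero the consumed slots in reverse order. */
		while (--consumer_head >= tail)
			r->queue[consumer_head] = NULL;

		return;
	}

	r->consumer_head = consumer_head;
}

/* Model of the emptiness test: the slot at consumer_head is NULL. */
static int model_empty(const struct ring_model *r)
{
	return r->size ? !r->queue[r->consumer_head] : 1;
}

/* Walk a full ring of 8 entries with batch 4.
 * Returns 0 if every step behaves as described, else a step number.
 */
static int model_demo(void)
{
	int items[8];
	void *slots[8];
	struct ring_model r = { 0, 0, 8, 4, slots };
	int i;

	for (i = 0; i < 8; i++)
		slots[i] = &items[i];

	/* Three discards: head advances, nothing is cleared yet. */
	for (i = 0; i < 3; i++)
		model_discard_one(&r);
	if (r.consumer_head != 3 || r.consumer_tail != 0 || !slots[0])
		return 1;

	/* Fourth discard completes a batch: slots 0..3 are cleared. */
	model_discard_one(&r);
	if (r.consumer_head != 4 || r.consumer_tail != 4)
		return 2;
	if (slots[0] || slots[3] || !slots[4] || model_empty(&r))
		return 3;

	/* Four more discards reach the end: wrap to 0, all cleared. */
	for (i = 0; i < 4; i++)
		model_discard_one(&r);
	if (r.consumer_head != 0 || r.consumer_tail != 0)
		return 4;
	if (slots[4] || slots[7] || !model_empty(&r))
		return 5;

	return 0;
}
```

Calling model_demo() from a test harness should return 0. The model does not exercise the concurrent case that motivates the patch; it only makes the per-call ordering of the head update and the slot clearing easy to inspect.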