On 04 Mar 2013 07:10:00 +0100 NeilBrown <neilb@xxxxxxx> wrote: > We currently queue an upcall after setting CACHE_PENDING, > and dequeue after clearing CACHE_PENDING. > So a request should only be present when CACHE_PENDING is set. > > However, we don't combine the test and the enqueue/dequeue in > a protected region, so it is possible (if unlikely) for a race > to result in a request being queued without CACHE_PENDING set, > or a request to be absent despite CACHE_PENDING. > > So: include a test for CACHE_PENDING inside the regions of > enqueue and dequeue where queue_lock is held, and abort > the operation if the value is not as expected. > > Also remove the 'return' from cache_dequeue() to ensure it always > removes all entries: As there is no locking between setting > CACHE_PENDING and calling sunrpc_cache_pipe_upcall it is not > inconceivable for some other thread to clear CACHE_PENDING and then > someone else to set it and call sunrpc_cache_pipe_upcall, both before > the original thread completed the call. > > With this, it is perfectly safe and correct to: > - call cache_dequeue() if and only if we have just > cleared CACHE_PENDING > - call sunrpc_cache_pipe_upcall() (via cache_make_upcall) > if and only if we have just set CACHE_PENDING. 
> > Reported-by: Bodo Stroesser <bstroesser@xxxxxxxxxxxxxx> > Signed-off-by: NeilBrown <neilb@xxxxxxx> > --- > net/sunrpc/cache.c | 17 ++++++++++++++--- > 1 file changed, 14 insertions(+), 3 deletions(-) > > diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c > index 9afa439..0400a92 100644 > --- a/net/sunrpc/cache.c > +++ b/net/sunrpc/cache.c > @@ -1022,6 +1022,9 @@ static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch) > struct cache_request *cr = container_of(cq, struct cache_request, q); > if (cr->item != ch) > continue; > + if (test_bit(CACHE_PENDING, &ch->flags)) > + /* Lost a race and it is pending again */ > + break; > if (cr->readers != 0) > continue; > list_del(&cr->q.list); > @@ -1029,7 +1032,6 @@ static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch) > cache_put(cr->item, detail); > kfree(cr->buf); > kfree(cr); > - return; Just removing "return" is not enough. If the loop no longer stops after dequeueing the first entry found, some other changes are necessary also: 1) use list_for_each_entry_safe() instead of list_for_each_entry() 2) don't call spin_unlock() in the loop. Thus, at the end of this mail I added a revised patch. With this patch cache_dequeue() no longer frees the requests in the loop, but moves them to a temporary queue. After the loop it calls spin_unlock() and does the kfree() and cache_put() in a second loop. The patch is not tested on a mainline kernel. Instead, I ported it to SLES11 SP1 2.6.32.59-0.7.1. On SLES 11 the test is still running fine. 
Best Regards, Bodo > } > spin_unlock(&queue_lock); > } > @@ -1151,6 +1153,7 @@ int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h, > struct cache_request *crq; > char *bp; > int len; > + int ret = 0; > > if (!cache_listeners_exist(detail)) { > warn_no_listener(detail); > @@ -1182,10 +1185,18 @@ int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h, > crq->len = PAGE_SIZE - len; > crq->readers = 0; > spin_lock(&queue_lock); > - list_add_tail(&crq->q.list, &detail->queue); > + if (test_bit(CACHE_PENDING, &h->flags)) > + list_add_tail(&crq->q.list, &detail->queue); > + else > + /* Lost a race, no longer PENDING, so don't enqueue */ > + ret = -EAGAIN; > spin_unlock(&queue_lock); > wake_up(&queue_wait); > - return 0; > + if (ret == -EAGAIN) { > + kfree(buf); > + kfree(crq); > + } > + return ret; > } > EXPORT_SYMBOL_GPL(sunrpc_cache_pipe_upcall); > > ..... Reported-by: Bodo Stroesser <bstroesser@xxxxxxxxxxxxxx> Signed-off-by: NeilBrown <neilb@xxxxxxx> Signed-off-by: Bodo Stroesser <bstroesser@xxxxxxxxxxxxxx> --- --- a/net/sunrpc/cache.c 2013-02-20 14:05:08.000000000 +0100 +++ b/net/sunrpc/cache.c 2013-03-05 13:30:13.000000000 +0100 @@ -1015,23 +1015,31 @@ static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch) { - struct cache_queue *cq; + struct cache_request *cr, *tmp; + struct list_head dequeued; + + INIT_LIST_HEAD(&dequeued); spin_lock(&queue_lock); - list_for_each_entry(cq, &detail->queue, list) - if (!cq->reader) { - struct cache_request *cr = container_of(cq, struct cache_request, q); + list_for_each_entry_safe(cr, tmp, &detail->queue, q.list) + if (!cr->q.reader) { if (cr->item != ch) continue; + if (test_bit(CACHE_PENDING, &ch->flags)) + /* Lost a race and it is pending again */ + break; if (cr->readers != 0) continue; - list_del(&cr->q.list); - spin_unlock(&queue_lock); - cache_put(cr->item, detail); - kfree(cr->buf); - kfree(cr); - return; + list_move(&cr->q.list, &dequeued); } 
spin_unlock(&queue_lock); + + while (!list_empty(&dequeued)) { + cr = list_entry(dequeued.next, struct cache_request, q.list); + list_del(&cr->q.list); + cache_put(cr->item, detail); + kfree(cr->buf); + kfree(cr); + } } /* @@ -1151,6 +1159,7 @@ struct cache_request *crq; char *bp; int len; + int ret = 0; if (!cache_listeners_exist(detail)) { warn_no_listener(detail); @@ -1182,10 +1191,18 @@ crq->len = PAGE_SIZE - len; crq->readers = 0; spin_lock(&queue_lock); - list_add_tail(&crq->q.list, &detail->queue); + if (test_bit(CACHE_PENDING, &h->flags)) + list_add_tail(&crq->q.list, &detail->queue); + else + /* Lost a race, no longer PENDING, so don't enqueue */ + ret = -EAGAIN; spin_unlock(&queue_lock); wake_up(&queue_wait); - return 0; + if (ret == -EAGAIN) { + kfree(buf); + kfree(crq); + } + return ret; } EXPORT_SYMBOL_GPL(sunrpc_cache_pipe_upcall);