The objects are allocated with SLAB_TYPESAFE_BY_RCU, and hlist_add_head_rcu()
performs n->next = first, which modifies obj->obj_node.next, before the
rcu_assign_pointer(). We therefore need to make sure obj->key is updated
before obj->obj_node.next and obj->refcnt; atomic_set_release() is not enough
to provide the required memory barrier.
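
Concretely, the required pairing can be sketched as follows (a condensed
illustration of the hunks below; obj_for_each_entry_rcu() and try_get_ref()
are the documentation's pseudo-code helpers, not kernel APIs):

  /* Insert side, as updated below: */
  obj->key = key;
  smp_wmb();   /* order the key store before obj_node.next and refcnt */
  atomic_set(&obj->refcnt, 1);
  hlist_add_head_rcu(&obj->obj_node, list); /* n->next = first, then
                                               rcu_assign_pointer()   */

  /*
   * Lookup side: the smp_rmb() in obj_for_each_entry_rcu() pairs with
   * the smp_wmb() above, so a reader that observes the new
   * obj->obj_node.next also observes the new obj->key. A release store
   * to obj->refcnt would only order obj->key against the refcnt store
   * itself, not against the later n->next store inside
   * hlist_add_head_rcu().
   */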

In addition, the example was broken, as noted by Paul[1]. This patch also
fixes the broken example and adds wording to explain how the back-pointer[2]
works.

[1] https://lore.kernel.org/all/9eaf506f-cc14-4da6-9efc-057c0c3e56b0@paulmck-laptop/
[2] https://lore.kernel.org/all/02a84e4b-a322-4c43-ad9d-1832ce277c2f@paulmck-laptop/

Signed-off-by: Alan Huang <mmpgouride@xxxxxxxxx>
---
Paul, apologies for the long delay :)

Changelog:
V3 -> V4:
  Fix the broken example.
  Add wording to explain how the back-pointer works.
  Remove the _ONCE added before, since the access happens after we get
  the reference on the object.

V2 -> V3:
  Use compute_slot to detect whether the object has been moved.
  Remove the broken lockless_lookup, use a macro instead.

v1 -> v2:
  Use _ONCE to protect obj->key.

 Documentation/RCU/rculist_nulls.rst | 149 +++++++++++++++++++++-------
 1 file changed, 113 insertions(+), 36 deletions(-)

diff --git a/Documentation/RCU/rculist_nulls.rst b/Documentation/RCU/rculist_nulls.rst
index 21e40fcc08de..ee1f2458e07e 100644
--- a/Documentation/RCU/rculist_nulls.rst
+++ b/Documentation/RCU/rculist_nulls.rst
@@ -34,54 +34,54 @@ objects, which is having below type.
 
 ::
 
+  struct object *obj;
+  struct hlist_node *pos, *next;
+  slot = compute_slot(key);
+  head = &table[slot];
   begin:
   rcu_read_lock();
-  obj = lockless_lookup(key);
-  if (obj) {
+  obj_for_each_entry_rcu(obj, pos, head, next) {
     if (!try_get_ref(obj)) { // might fail for free objects
       rcu_read_unlock();
       goto begin;
     }
     /*
-     * Because a writer could delete object, and a writer could
-     * reuse these object before the RCU grace period, we
-     * must check key after getting the reference on object
-     */
-    if (obj->key != key) { // not the object we expected
+     * Because a writer could delete the object, and a writer could
+     * reuse the object before the RCU grace period, we must check
+     * whether this object has been moved. The smp_wmb() in the
+     * insertion algorithm and the smp_rmb() in obj_for_each_entry_rcu()
+     * ensure that if we read the new 'obj->obj_node.next' value, we
+     * will also read the new 'obj->key'.
+     */
+    if (compute_slot(obj->key) != slot) {
       put_ref(obj);
       rcu_read_unlock();
       goto begin;
     }
+    if (obj->key != key) // not the object we expected
+      put_ref(obj);
+    else
+      goto out;
   }
+  out:
   rcu_read_unlock();
 
-Beware that lockless_lookup(key) cannot use traditional hlist_for_each_entry_rcu()
+Beware that obj_for_each_entry_rcu() cannot use traditional hlist_for_each_entry_rcu()
 but a version with an additional memory barrier (smp_rmb())
 
 ::
 
-  lockless_lookup(key)
-  {
-    struct hlist_node *node, *next;
-    for (pos = rcu_dereference((head)->first);
-         pos && ({ next = pos->next; smp_rmb(); prefetch(next); 1; }) &&
-         ({ obj = hlist_entry(pos, typeof(*obj), obj_node); 1; });
-         pos = rcu_dereference(next))
-      if (obj->key == key)
-        return obj;
-    return NULL;
-  }
+  for (pos = rcu_dereference((head)->first);
+       pos && ({ next = READ_ONCE(pos->next); smp_rmb(); prefetch(next); 1; }) &&
+       ({ obj = hlist_entry(pos, typeof(*obj), obj_node); 1; });
+       pos = rcu_dereference(next))
 
 And note the traditional hlist_for_each_entry_rcu() misses this smp_rmb()::
 
-  struct hlist_node *node;
   for (pos = rcu_dereference((head)->first);
        pos && ({ prefetch(pos->next); 1; }) &&
        ({ obj = hlist_entry(pos, typeof(*obj), obj_node); 1; });
        pos = rcu_dereference(pos->next))
-    if (obj->key == key)
-      return obj;
-  return NULL;
 
 Quoting Corey Minyard::
 
@@ -112,7 +112,12 @@ detect the fact that it missed following items in original chain.
   obj = kmem_cache_alloc(...);
   lock_chain(); // typically a spin_lock()
   obj->key = key;
-  atomic_set_release(&obj->refcnt, 1); // key before refcnt
+  /*
+   * We need to make sure obj->key is updated before obj->obj_node.next
+   * and obj->refcnt.
+   */
+  smp_wmb();
+  atomic_set(&obj->refcnt, 1);
   hlist_add_head_rcu(&obj->obj_node, list);
   unlock_chain(); // typically a spin_unlock()
 
@@ -140,7 +145,7 @@ very very fast (before the end of RCU grace period)
 Avoiding extra smp_rmb()
 ========================
 
-With hlist_nulls we can avoid extra smp_rmb() in lockless_lookup().
+With hlist_nulls we can avoid extra smp_rmb() in obj_for_each_entry_rcu().
 
 For example, if we choose to store the slot number as the 'nulls'
 end-of-list marker for each slot of the hash table, we can detect
@@ -165,18 +170,14 @@ Note that using hlist_nulls means the type of 'obj_node' field of
   begin:
   rcu_read_lock();
   hlist_nulls_for_each_entry_rcu(obj, node, head, obj_node) {
-    if (obj->key == key) {
-      if (!try_get_ref(obj)) { // might fail for free objects
-        rcu_read_unlock();
-        goto begin;
-      }
-      if (obj->key != key) { // not the object we expected
-        put_ref(obj);
-        rcu_read_unlock();
-        goto begin;
-      }
-      goto out;
+    if (!try_get_ref(obj)) { // might fail for free objects
+      rcu_read_unlock();
+      goto begin;
     }
+    if (obj->key != key) // not the object we expected
+      put_ref(obj);
+    else
+      goto out;
   }
 
   // If the nulls value we got at the end of this lookup is
@@ -213,3 +214,79 @@ hlist_add_head_rcu().
    */
   hlist_nulls_add_head_rcu(&obj->obj_node, list);
   unlock_chain(); // typically a spin_unlock()
+
+
+
+--------------------------------------------------------------------------
+
+Using Back-pointer
+==================
+
+It is possible to detect a move mid-list, which can reduce the search time.
+To achieve this, we store a back-pointer in every element that points to the
+head of its chain. It is worth noting that we can also utilize 'compute_slot'
+to detect this, which involves a tradeoff between time and space.
+
+::
+
+  struct object {
+    struct hlist_nulls_node obj_node;
+    struct hlist_nulls_head *backptr;
+    atomic_t refcnt;
+    unsigned int key;
+  };
+
+
+1) Lookup algorithm
+-------------------
+
+::
+
+  head = &table[slot];
+  begin:
+  rcu_read_lock();
+  hlist_nulls_for_each_entry_rcu(obj, node, head, obj_node) {
+    if (!try_get_ref(obj)) { // might fail for free objects
+      rcu_read_unlock();
+      goto begin;
+    }
+    if (obj->backptr != head) { // object was moved
+      put_ref(obj);
+      rcu_read_unlock();
+      goto begin;
+    }
+    if (obj->key != key) // not the object we expected
+      put_ref(obj);
+    else
+      goto out;
+  }
+
+  if (get_nulls_value(node) != slot) {
+    put_ref(obj);
+    rcu_read_unlock();
+    goto begin;
+  }
+  obj = NULL;
+
+  out:
+  rcu_read_unlock();
+
+2) Insert algorithm
+-------------------
+
+::
+
+  /*
+   * Please note that new inserts are done at the head of list,
+   * not in the middle or end.
+   */
+  obj = kmem_cache_alloc(cachep);
+  lock_chain(); // typically a spin_lock()
+  obj->key = key;
+  obj->backptr = list;
+  atomic_set_release(&obj->refcnt, 1); // key and backptr before refcnt
+  /*
+   * insert obj in RCU way (readers might be traversing chain)
+   */
+  hlist_nulls_add_head_rcu(&obj->obj_node, list);
+  unlock_chain(); // typically a spin_unlock()
\ No newline at end of file
-- 
2.34.1