The objects are allocated with SLAB_TYPESAFE_BY_RCU, and
hlist_add_head_rcu() executes n->next = first before
rcu_assign_pointer(), which modifies obj->obj_node.next. Readers may
still hold a reference to obj obtained in lockless_lookup(), and
because of SLAB_TYPESAFE_BY_RCU they can see the updater's change to
->next immediately.

The insertion algorithm requires two memory orderings: obj->key must
be updated before both obj->obj_node.next and obj->refcnt.
atomic_set_release() only orders the key store before the refcnt
store, so it does not provide the required memory barrier.

Also use compute_slot() to detect whether the object has been moved.

Signed-off-by: Alan Huang <mmpgouride@xxxxxxxxx>
---
Changelog:
  V2 -> V3:
    Use compute_slot() to detect whether the object has been moved.
    Remove the broken lockless_lookup(), use a macro instead.

  v1 -> v2:
    Use _ONCE to protect obj->key.

 Documentation/RCU/rculist_nulls.rst | 73 +++++++++++++++--------------
 1 file changed, 39 insertions(+), 34 deletions(-)

diff --git a/Documentation/RCU/rculist_nulls.rst b/Documentation/RCU/rculist_nulls.rst
index 21e40fcc08de..9d39394602b0 100644
--- a/Documentation/RCU/rculist_nulls.rst
+++ b/Documentation/RCU/rculist_nulls.rst
@@ -34,54 +34,54 @@ objects, which is having below type.
 
 ::
 
+  object *obj;
+  struct hlist_node *pos, *next;
+  slot = compute_slot(key);
+  head = &table[slot];
   begin:
   rcu_read_lock();
-  obj = lockless_lookup(key);
-  if (obj) {
-    if (!try_get_ref(obj)) { // might fail for free objects
+  obj_for_each_entry_rcu(obj, pos, head, next) {
+    if (compute_slot(READ_ONCE(obj->key)) != slot) { // object is moved
       rcu_read_unlock();
       goto begin;
     }
-    /*
-     * Because a writer could delete object, and a writer could
-     * reuse these object before the RCU grace period, we
-     * must check key after getting the reference on object
-     */
-    if (obj->key != key) { // not the object we expected
-      put_ref(obj);
-      rcu_read_unlock();
-      goto begin;
+    if (READ_ONCE(obj->key) == key) {
+      if (!try_get_ref(obj)) { // might fail for free objects
+        rcu_read_unlock();
+        goto begin;
+      }
+      /*
+       * Because a writer could delete object, and a writer could
+       * reuse these object before the RCU grace period, we
+       * must check key after getting the reference on object
+       */
+      if (READ_ONCE(obj->key) != key) { // not the object we expected
+        put_ref(obj);
+        rcu_read_unlock();
+        goto begin;
+      }
+      goto out;
     }
   }
+  out:
   rcu_read_unlock();
 
-Beware that lockless_lookup(key) cannot use traditional hlist_for_each_entry_rcu()
+Beware that obj_for_each_entry_rcu() cannot use traditional hlist_for_each_entry_rcu()
 but a version with an additional memory barrier (smp_rmb())
 
 ::
 
-  lockless_lookup(key)
-  {
-    struct hlist_node *node, *next;
-    for (pos = rcu_dereference((head)->first);
-         pos && ({ next = pos->next; smp_rmb(); prefetch(next); 1; }) &&
-         ({ obj = hlist_entry(pos, typeof(*obj), obj_node); 1; });
-         pos = rcu_dereference(next))
-      if (obj->key == key)
-        return obj;
-    return NULL;
-  }
+  for (pos = rcu_dereference((head)->first);
+       pos && ({ next = READ_ONCE(pos->next); smp_rmb(); prefetch(next); 1; }) &&
+       ({ obj = hlist_entry(pos, typeof(*obj), obj_node); 1; });
+       pos = rcu_dereference(next))
 
 And note the traditional hlist_for_each_entry_rcu() misses this smp_rmb()::
 
-  struct hlist_node *node;
   for (pos = rcu_dereference((head)->first);
        pos && ({ prefetch(pos->next); 1; }) &&
        ({ obj = hlist_entry(pos, typeof(*obj), obj_node); 1; });
        pos = rcu_dereference(pos->next))
-    if (obj->key == key)
-      return obj;
-  return NULL;
 
 Quoting Corey Minyard::
 
@@ -111,8 +111,13 @@ detect the fact that it missed following items in original chain.
    */
   obj = kmem_cache_alloc(...);
   lock_chain(); // typically a spin_lock()
-  obj->key = key;
-  atomic_set_release(&obj->refcnt, 1); // key before refcnt
+  WRITE_ONCE(obj->key, key);
+  /*
+   * We need to make sure obj->key is updated before obj->obj_node.next
+   * and obj->refcnt.
+   */
+  smp_wmb();
+  atomic_set(&obj->refcnt, 1);
   hlist_add_head_rcu(&obj->obj_node, list);
   unlock_chain(); // typically a spin_unlock()
 
@@ -140,7 +145,7 @@ very very fast (before the end of RCU grace period)
 Avoiding extra smp_rmb()
 ========================
 
-With hlist_nulls we can avoid extra smp_rmb() in lockless_lookup().
+With hlist_nulls we can avoid extra smp_rmb() in obj_for_each_entry_rcu().
 
 For example, if we choose to store the slot number as the 'nulls'
 end-of-list marker for each slot of the hash table, we can detect
@@ -165,12 +170,12 @@ Note that using hlist_nulls means the type of 'obj_node' field of
   begin:
   rcu_read_lock();
   hlist_nulls_for_each_entry_rcu(obj, node, head, obj_node) {
-    if (obj->key == key) {
+    if (READ_ONCE(obj->key) == key) {
       if (!try_get_ref(obj)) { // might fail for free objects
         rcu_read_unlock();
         goto begin;
       }
-      if (obj->key != key) { // not the object we expected
+      if (READ_ONCE(obj->key) != key) { // not the object we expected
         put_ref(obj);
         rcu_read_unlock();
         goto begin;
@@ -206,7 +211,7 @@ hlist_add_head_rcu().
    */
   obj = kmem_cache_alloc(cachep);
   lock_chain(); // typically a spin_lock()
-  obj->key = key;
+  WRITE_ONCE(obj->key, key);
   atomic_set_release(&obj->refcnt, 1); // key before refcnt
   /*
    * insert obj in RCU way (readers might be traversing chain)
-- 
2.34.1
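
For anyone who wants to experiment with the ordering outside the kernel, the following is a rough userspace sketch in C11 and is not part of the patch. It mirrors the insertion order from the changed hunk (key, then a barrier, then refcnt and ->next, then publication) and the reader's slot and key rechecks. Everything here is an assumption made for illustration: the struct layout, NSLOTS, the modulo compute_slot(), and insert()/lookup() are hypothetical, and the C11 release and acquire fences only approximate smp_wmb() and smp_rmb() (they are somewhat stronger). RCU grace periods, slab reuse and the chain lock are left out entirely.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define NSLOTS 4u /* hypothetical table size */

/* Hypothetical object; 'next' stands in for obj->obj_node.next. */
struct object {
    _Atomic unsigned int key;
    atomic_int refcnt;
    struct object *_Atomic next;
};

static struct object *_Atomic table[NSLOTS];

static unsigned int compute_slot(unsigned int key)
{
    return key % NSLOTS; /* assumed hash, for illustration only */
}

/* Writer side. The caller is assumed to hold the chain lock. */
static void insert(struct object *obj, unsigned int key)
{
    unsigned int slot = compute_slot(key);

    assert(obj != NULL);
    /* WRITE_ONCE(obj->key, key) */
    atomic_store_explicit(&obj->key, key, memory_order_relaxed);
    /* Stand-in for smp_wmb(): key is ordered before refcnt and ->next. */
    atomic_thread_fence(memory_order_release);
    /* atomic_set(&obj->refcnt, 1) */
    atomic_store_explicit(&obj->refcnt, 1, memory_order_relaxed);
    /* n->next = first, as done inside hlist_add_head_rcu() */
    atomic_store_explicit(&obj->next,
                          atomic_load_explicit(&table[slot], memory_order_relaxed),
                          memory_order_relaxed);
    /* rcu_assign_pointer(): publish the object at the head of the chain */
    atomic_store_explicit(&table[slot], obj, memory_order_release);
}

/* Take a reference unless the count is zero (object is free). */
static int try_get_ref(struct object *obj)
{
    int old = atomic_load_explicit(&obj->refcnt, memory_order_relaxed);

    while (old != 0) {
        if (atomic_compare_exchange_weak_explicit(&obj->refcnt, &old, old + 1,
                                                  memory_order_acquire,
                                                  memory_order_relaxed))
            return 1;
    }
    return 0;
}

static void put_ref(struct object *obj)
{
    atomic_fetch_sub_explicit(&obj->refcnt, 1, memory_order_release);
}

/* Reader side: restart when the object moved chains or was reused. */
static struct object *lookup(unsigned int key)
{
    unsigned int slot = compute_slot(key);
    struct object *obj, *next;

begin:
    for (obj = atomic_load_explicit(&table[slot], memory_order_acquire);
         obj; obj = next) {
        next = atomic_load_explicit(&obj->next, memory_order_relaxed);
        /* Stand-in for smp_rmb(): read ->next before reading the key. */
        atomic_thread_fence(memory_order_acquire);
        unsigned int k = atomic_load_explicit(&obj->key, memory_order_relaxed);

        if (compute_slot(k) != slot)
            goto begin;          /* object is moved */
        if (k != key)
            continue;
        if (!try_get_ref(obj))
            goto begin;          /* might fail for free objects */
        if (atomic_load_explicit(&obj->key, memory_order_relaxed) != key) {
            put_ref(obj);        /* not the object we expected */
            goto begin;
        }
        return obj;
    }
    return NULL;
}
```

A single-threaded run only checks the bookkeeping (two keys hashing to the same slot are both found, and a successful lookup takes a reference). It says nothing about the ordering itself, which would need a tool such as a memory-model checker or a stress test on weakly ordered hardware.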