On 12/10/19 1:56 PM, Alex Kogan wrote:
> ----- longman@xxxxxxxxxx wrote:
>
>> On 11/25/19 4:07 PM, Alex Kogan wrote:
>>> @@ -234,12 +263,13 @@ __always_inline u32 cna_pre_scan(struct qspinlock *lock,
>>>  	struct cna_node *cn = (struct cna_node *)node;
>>>  
>>>  	/*
>>> -	 * setting @pre_scan_result to 1 indicates that no post-scan
>>> +	 * setting @pre_scan_result to 1 or 2 indicates that no post-scan
>>>  	 * should be made in cna_pass_lock()
>>>  	 */
>>>  	cn->pre_scan_result =
>>> -		cn->intra_count == intra_node_handoff_threshold ?
>>> -			1 : cna_scan_main_queue(node, node);
>>> +		(node->locked <= 1 && probably(SHUFFLE_REDUCTION_PROB_ARG)) ?
>>> +			1 : cn->intra_count == intra_node_handoff_threshold ?
>>> +				2 : cna_scan_main_queue(node, node);
>>>  
>>>  	return 0;
>>>  }
>>> @@ -253,12 +283,15 @@ static inline void cna_pass_lock(struct mcs_spinlock *node,
>>>  
>>>  	u32 scan = cn->pre_scan_result;
>>>  
>>> +	if (scan == 1)
>>> +		goto pass_lock;
>>> +
>>>  	/*
>>>  	 * check if a successor from the same numa node has not been found in
>>>  	 * pre-scan, and if so, try to find it in post-scan starting from the
>>>  	 * node where pre-scan stopped (stored in @pre_scan_result)
>>>  	 */
>>> -	if (scan > 1)
>>> +	if (scan > 2)
>>>  		scan = cna_scan_main_queue(node, decode_tail(scan));
>>>  
>>>  	if (!scan) {	/* if found a successor from the same numa node */
>>> @@ -281,5 +314,6 @@ static inline void cna_pass_lock(struct mcs_spinlock *node,
>>>  		tail_2nd->next = next;
>>>  	}
>>>  
>>> +pass_lock:
>>>  	arch_mcs_pass_lock(&next_holder->locked, val);
>>>  }
>>
>> I think you might have mishandled the proper accounting of intra_count.
>> How about something like:
>>
>> diff --git a/kernel/locking/qspinlock_cna.h b/kernel/locking/qspinlock_cna.h
>> index f1eef6bece7b..03f8fdec2b80 100644
>> --- a/kernel/locking/qspinlock_cna.h
>> +++ b/kernel/locking/qspinlock_cna.h
>> @@ -268,7 +268,7 @@ __always_inline u32 cna_pre_scan(struct qspinlock *lock,
>>  	 */
>>  	cn->pre_scan_result =
>>  		(node->locked <= 1 && probably(SHUFFLE_REDUCTION_PROB_ARG)) ?
>> -			1 : cn->intra_count == intra_node_handoff_threshold ?
>> +			1 : cn->intra_count >= intra_node_handoff_threshold ?
> We reset 'intra_count' in cna_init_node(), which is called before we enter
> the slow path, and set it at most once when we pass the internal (CNA) lock
> by taking the owner's value + 1. Only after we get the internal lock, we
> call this cna_pre_scan() function, where we check the threshold.
> IOW, having 'intra_count > intra_node_handoff_threshold' would mean a bug,
> and having '>=' would mask it.
> Perhaps I can add WARN_ON(cn->intra_count > intra_node_handoff_threshold)
> here instead, although I'm not sure if that is a good idea performance-wise.

The code that I added below could have the possibility of making
intra_count > intra_node_handoff_threshold. I agree with your assessment
of the current code. This conditional check is fine if no further change
is made.

>>  				2 : cna_scan_main_queue(node, node);
>>  
>>  	return 0;
>> @@ -283,9 +283,6 @@ static inline void cna_pass_lock(struct mcs_spinlock *node,
>>  
>>  	u32 scan = cn->pre_scan_result;
>>  
>> -	if (scan == 1)
>> -		goto pass_lock;
>> -
> The thing is that we want to avoid as much of the shuffling-related overhead
> as we can when the spinlock is only lightly contended. That's why we have this
> early exit here that avoids the rest of the logic of triaging through possible
> 'scan' values.

That is a valid point. Maybe you can document the fact that you are
optimizing for performance instead of better correctness.
>>  	/*
>>  	 * check if a successor from the same numa node has not been found in
>>  	 * pre-scan, and if so, try to find it in post-scan starting from the
>> @@ -294,7 +291,13 @@ static inline void cna_pass_lock(struct mcs_spinlock *node,
>>  	if (scan > 2)
>>  		scan = cna_scan_main_queue(node, decode_tail(scan));
>>  
>> -	if (!scan) {	/* if found a successor from the same numa node */
>> +	if (scan <= 1) {	/* if found a successor from the same numa node */
>> +		/* inc @intra_count if the secondary queue is not empty */
>> +		((struct cna_node *)next_holder)->intra_count =
>> +			cn->intra_count + (node->locked > 1);
>> +		if (scan == 1)
>> +			goto pass_lock;
>> +
> Hmm, I am not sure this makes the code any better/more readable,
> while this does add the overhead of going through 3 branches before
> jumping to 'pass_lock'.

This is just a suggestion for improving the correctness of the code. I am
fine if you opt for better performance.

Cheers,
Longman