On Mon 2015-08-03 14:31:09, Steven Rostedt wrote:
> On Tue, 28 Jul 2015 16:39:26 +0200
> Petr Mladek <pmladek@xxxxxxxx> wrote:
>
> > It looks strange to initialize the completions repeatedly.
> >
> > This patch uses static initialization. It simplifies the code
> > and even helps to get rid of two memory barriers.
>
> There was a reason I did it this way and did not use static
> initializers. But I can't recall why I did that. :-/
>
> I'll have to think about this some more.

Heh, parallel programming is real fun. I tried to understand
the code in more detail and sometimes felt like Duane Dibbley.

Anyway, I found a few possible races related to the completions.
One scenario was opened by my previous fix b44754d8262d3aab8429
("ring_buffer: Allow to exit the ring buffer benchmark immediately").

The races can be fixed by the patch below. I still do not see any
scenario where the extra initialization of the two completions
is needed, but I am not brave enough to remove it after all ;-)


From ad75428b1e5e5127bf7dc6062f880ece11dbdbbf Mon Sep 17 00:00:00 2001
From: Petr Mladek <pmladek@xxxxxxxx>
Date: Fri, 28 Aug 2015 15:59:00 +0200
Subject: [PATCH 1/2] ring_buffer: Do no not complete benchmark reader too early

It seems that complete(&read_done) might be called too early
in some situations.

1st scenario:
-------------

CPU0                                    CPU1

ring_buffer_producer_thread()
  wake_up_process(consumer);
  wait_for_completion(&read_start);

                                        ring_buffer_consumer_thread()
                                          complete(&read_start);

  ring_buffer_producer()
    # producing data in
    # the do-while cycle

                                          ring_buffer_consumer();
                                            # reading data
                                            # got error
                                            # set kill_test = 1;
                                            set_current_state(
                                                TASK_INTERRUPTIBLE);
                                            if (reader_finish)  # false
                                            schedule();

    # producer still in the middle of
    # do-while cycle
    if (consumer && !(cnt % wakeup_interval))
      wake_up_process(consumer);

                                            # spurious wakeup
                                            while (!reader_finish &&
                                                   !kill_test)
                                            # leaving because
                                            # kill_test == 1
                                            reader_finish = 0;
                                            complete(&read_done);

1st BANG: We might access uninitialized "read_done" if this is
          the first round.

    # producer finally leaving
    # the do-while cycle because kill_test == 1;

    if (consumer) {
      reader_finish = 1;
      wake_up_process(consumer);
      wait_for_completion(&read_done);

2nd BANG: This will never complete because consumer already did
          the completion.

2nd scenario:
-------------

CPU0                                    CPU1

ring_buffer_producer_thread()
  wake_up_process(consumer);
  wait_for_completion(&read_start);

                                        ring_buffer_consumer_thread()
                                          complete(&read_start);

  ring_buffer_producer()
    # CPU3 removes the module     <--- difference from
    # and stops producer          <--- the 1st scenario
    if (kthread_should_stop())
      kill_test = 1;

                                          ring_buffer_consumer();
                                            while (!reader_finish &&
                                                   !kill_test)
                                            # kill_test == 1 => we never go
                                            # into the top level while()
                                            reader_finish = 0;
                                            complete(&read_done);

    # producer still in the middle of
    # do-while cycle
    if (consumer && !(cnt % wakeup_interval))
      wake_up_process(consumer);

                                            # spurious wakeup
                                            while (!reader_finish &&
                                                   !kill_test)
                                            # leaving because kill_test == 1
                                            reader_finish = 0;
                                            complete(&read_done);

BANG: We are in the same "bang" situations as in the 1st scenario.

Root of the problem:
--------------------

ring_buffer_consumer() must complete "read_done" only when the
"reader_finish" variable is set. It must not be skipped because of
other conditions.

Note that we still must keep the check for "reader_finish" in a loop
because there might be a spurious wakeup as described in the
above scenarios.

Solution:
---------

The top level cycle in ring_buffer_consumer() will finish only when
"reader_finish" is set. The data will be read in a "while-do" cycle
so that they are not read after an error (kill_test == 1) and
a spurious wakeup.

In addition, "reader_finish" is manipulated by the producer thread.
Therefore we add READ_ONCE() to make sure that the fresh value is
read in each cycle. Also we add the corresponding barrier
to synchronize the sleep check.

Next we set back the TASK_RUNNING state for the situation when we
did not sleep.

Just for paranoid reasons, we initialize both completions statically.
It should be safer if there is another race that we do not know of.

As a side effect we could remove the memory barrier from
ring_buffer_producer_thread(). IMHO, this was the reason for
the barrier. ring_buffer_reset() uses spin locks that should
provide the needed memory barrier for using the buffer.
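
A rough user-space sketch of the consumer rule described above, assuming
POSIX threads in place of the kernel primitives. The struct completion,
COMPLETION_INIT, wake_consumer() and run_scenario() below are hypothetical
stand-ins and not the kernel API; a mutex and condition variable replace
set_current_state()/schedule()/wake_up_process() and the memory barriers.
It only illustrates that the outer loop ends on "reader_finish" alone
while the inner loop stops reading once kill_test is set.

```c
#include <pthread.h>
#include <stdatomic.h>

/* Hypothetical user-space stand-in for the kernel's struct completion. */
struct completion {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	unsigned int done;
};

/* Static initializer, playing the role of DECLARE_COMPLETION(). */
#define COMPLETION_INIT \
	{ PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 }

static struct completion read_done = COMPLETION_INIT;
static pthread_mutex_t wake_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wake_cond = PTHREAD_COND_INITIALIZER;
static atomic_int reader_finish;
static atomic_int kill_test;
static atomic_int pending_events;
static atomic_int finish_seen_at_complete;

static void complete(struct completion *c)
{
	pthread_mutex_lock(&c->lock);
	c->done++;
	pthread_cond_signal(&c->cond);
	pthread_mutex_unlock(&c->lock);
}

static void wait_for_completion(struct completion *c)
{
	pthread_mutex_lock(&c->lock);
	while (!c->done)
		pthread_cond_wait(&c->cond, &c->lock);
	c->done--;
	pthread_mutex_unlock(&c->lock);
}

static void *consumer_thread(void *arg)
{
	(void)arg;
	pthread_mutex_lock(&wake_lock);
	/* Outer loop: leave only when the producer asked us to finish. */
	while (!atomic_load(&reader_finish)) {
		/* Inner loop: read data, but stop reading after an error. */
		while (atomic_load(&pending_events) > 0 &&
		       !atomic_load(&kill_test))
			atomic_fetch_sub(&pending_events, 1);
		/* Sleep; any wakeup sends us back to the outer check. */
		if (!atomic_load(&reader_finish))
			pthread_cond_wait(&wake_cond, &wake_lock);
	}
	pthread_mutex_unlock(&wake_lock);

	/* Record what we saw, then hand over to the producer. */
	atomic_store(&finish_seen_at_complete, atomic_load(&reader_finish));
	atomic_store(&reader_finish, 0);
	complete(&read_done);
	return NULL;
}

/* Wake the consumer without asking it to finish. */
static void wake_consumer(void)
{
	pthread_mutex_lock(&wake_lock);
	pthread_cond_signal(&wake_cond);
	pthread_mutex_unlock(&wake_lock);
}

/*
 * Replays the first scenario: error, wakeup without a finish request,
 * then the real finish request. Returns the value of reader_finish
 * that the consumer observed right before it completed read_done.
 */
static int run_scenario(void)
{
	pthread_t consumer;

	atomic_store(&pending_events, 3);
	pthread_create(&consumer, NULL, consumer_thread, NULL);

	atomic_store(&kill_test, 1);	/* the test got an error */
	wake_consumer();		/* wakeup with reader_finish == 0 */

	pthread_mutex_lock(&wake_lock);
	atomic_store(&reader_finish, 1);
	pthread_cond_signal(&wake_cond);
	pthread_mutex_unlock(&wake_lock);

	wait_for_completion(&read_done);
	pthread_join(consumer, NULL);
	return atomic_load(&finish_seen_at_complete);
}
```

Calling run_scenario() from a test program should return only after the
consumer has seen the finish request, regardless of how the two threads
interleave, because the flag is set and checked under the same lock.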

Signed-off-by: Petr Mladek <pmladek@xxxxxxxx>
---
 kernel/trace/ring_buffer_benchmark.c | 31 ++++++++++++++++++++++---------
 1 file changed, 22 insertions(+), 9 deletions(-)

diff --git a/kernel/trace/ring_buffer_benchmark.c b/kernel/trace/ring_buffer_benchmark.c
index a1503a027ee2..045e0a24c2a0 100644
--- a/kernel/trace/ring_buffer_benchmark.c
+++ b/kernel/trace/ring_buffer_benchmark.c
@@ -24,8 +24,8 @@ struct rb_page {
 static int wakeup_interval = 100;
 
 static int reader_finish;
-static struct completion read_start;
-static struct completion read_done;
+static DECLARE_COMPLETION(read_start);
+static DECLARE_COMPLETION(read_done);
 
 static struct ring_buffer *buffer;
 static struct task_struct *producer;
@@ -178,10 +178,14 @@ static void ring_buffer_consumer(void)
 	read_events ^= 1;
 
 	read = 0;
-	while (!reader_finish && !kill_test) {
-		int found;
+	/*
+	 * Always wait until we are asked to finish and the producer
+	 * is ready to wait for the completion.
+	 */
+	while (!READ_ONCE(reader_finish)) {
+		int found = 1;
 
-		do {
+		while (found && !kill_test) {
 			int cpu;
 
 			found = 0;
@@ -195,17 +199,29 @@ static void ring_buffer_consumer(void)
 
 				if (kill_test)
 					break;
+
 				if (stat == EVENT_FOUND)
 					found = 1;
+
 			}
-		} while (found && !kill_test);
+		}
 
+		/*
+		 * Sleep a bit. Producer with wake up us when some more data
+		 * are available or when we should finish reading.
+		 */
 		set_current_state(TASK_INTERRUPTIBLE);
+		/*
+		 * Make sure that we read the updated finish variable
+		 * before producer tries to wakeup us.
+		 */
+		smp_rmb();
 		if (reader_finish)
 			break;
 
 		schedule();
 	}
+	__set_current_state(TASK_RUNNING);
 	reader_finish = 0;
 	complete(&read_done);
 }
@@ -389,13 +405,10 @@ static int ring_buffer_consumer_thread(void *arg)
 
 static int ring_buffer_producer_thread(void *arg)
 {
-	init_completion(&read_start);
-
 	while (!kthread_should_stop() && !kill_test) {
 		ring_buffer_reset(buffer);
 
 		if (consumer) {
-			smp_wmb();
 			wake_up_process(consumer);
 			wait_for_completion(&read_start);
 		}
-- 
1.8.5.6
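
One way to picture why the timing of completion initialization matters,
as a small user-space sketch and not kernel code: struct user_completion,
DECLARE_USER_COMPLETION(), user_complete(), user_try_wait() and
user_reinit() are hypothetical names, loosely modelled on
DECLARE_COMPLETION(), complete(), try_wait_for_completion() and a
run-time re-initialization. A statically initialized object is valid
before any thread runs, whereas re-initializing at run time can discard
a completion that was already signalled.

```c
#include <pthread.h>

/* Hypothetical user-space stand-in for struct completion. */
struct user_completion {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	unsigned int done;
};

/* Build-time initializer, in the spirit of DECLARE_COMPLETION(). */
#define DECLARE_USER_COMPLETION(name)				\
	struct user_completion name = {				\
		PTHREAD_MUTEX_INITIALIZER,			\
		PTHREAD_COND_INITIALIZER, 0 }

static DECLARE_USER_COMPLETION(demo_done);

/* Signal one completion. */
static void user_complete(struct user_completion *c)
{
	pthread_mutex_lock(&c->lock);
	c->done++;
	pthread_cond_signal(&c->cond);
	pthread_mutex_unlock(&c->lock);
}

/* Non-blocking check: consume one completion if there is any. */
static int user_try_wait(struct user_completion *c)
{
	int ret = 0;

	pthread_mutex_lock(&c->lock);
	if (c->done) {
		c->done--;
		ret = 1;
	}
	pthread_mutex_unlock(&c->lock);
	return ret;
}

/* Run-time reset of the counter; a pending completion is dropped. */
static void user_reinit(struct user_completion *c)
{
	pthread_mutex_lock(&c->lock);
	c->done = 0;
	pthread_mutex_unlock(&c->lock);
}
```

In this model a user_complete() that happens before the waiter arrives is
remembered, but a user_reinit() that happens between the two forgets it.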