On 09/29, Paul E. McKenney wrote:
>
> On Sun, Sep 29, 2013 at 08:36:34PM +0200, Oleg Nesterov wrote:
> >
> > struct xxx_struct {
> > 	atomic_t counter;
> > };
> >
> > static inline bool xxx_is_idle(struct xxx_struct *xxx)
> > {
> > 	return atomic_read(&xxx->counter) == 0;
> > }
> >
> > static inline void xxx_enter(struct xxx_struct *xxx)
> > {
> > 	atomic_inc(&xxx->counter);
> > 	synchronize_sched();
> > }
> >
> > static inline void xxx_exit(struct xxx_struct *xxx)
> > {
> > 	synchronize_sched();
> > 	atomic_dec(&xxx->counter);
> > }
>
> But there is nothing for synchronize_sched() to wait for in the above.
> Presumably the caller of xxx_is_idle() is required to disable preemption
> or be under rcu_read_lock_sched()?

Yes, yes, sure, xxx_is_idle() should be called under preempt_disable()
(or rcu_read_lock() if xxx_enter() uses synchronize_rcu()).

> So you are trying to make something that abstracts the RCU-protected
> state-change pattern?  Or perhaps more accurately, the RCU-protected
> state-change-and-back pattern?

Yes, exactly.

> > struct xxx_struct {
> > 	int			gp_state;
> >
> > 	int			gp_count;
> > 	wait_queue_head_t	gp_waitq;
> >
> > 	int			cb_state;
> > 	struct rcu_head		cb_head;
>
> 	spinlock_t		xxx_lock;  /* ? */

See

	#define xxx_lock	gp_waitq.lock

in .c below, but we can add another spinlock.

> This spinlock might not make the big-system guys happy, but it appears to
> be needed below.

Only the writers use this spinlock, and they should synchronize with each
other anyway. I don't think this can really penalize, say, percpu_down_write
or cpu_hotplug_begin.

> > // .c -----------------------------------------------------------------------
> >
> > enum { GP_IDLE = 0, GP_PENDING, GP_PASSED };
> >
> > enum { CB_IDLE = 0, CB_PENDING, CB_REPLAY };
> >
> > #define xxx_lock	gp_waitq.lock
> >
> > void xxx_enter(struct xxx_struct *xxx)
> > {
> > 	bool need_wait, need_sync;
> >
> > 	spin_lock_irq(&xxx->xxx_lock);
> > 	need_wait = xxx->gp_count++;
> > 	need_sync = xxx->gp_state == GP_IDLE;
>
> Suppose ->gp_state is GP_PASSED.  It could transition to GP_IDLE at any
> time, right?

As you already pointed out below - no.

Once we have incremented ->gp_count, nobody can set GP_IDLE. And if the
caller is the "first" writer (need_sync == T) nobody else can change
->gp_state, so xxx_enter() sets GP_PASSED locklessly.

> > 	if (need_sync)
> > 		xxx->gp_state = GP_PENDING;
> > 	spin_unlock_irq(&xxx->xxx_lock);
> >
> > 	BUG_ON(need_wait && need_sync);
> >
> > 	if (need_sync) {
> > 		synchronize_sched();
> > 		xxx->gp_state = GP_PASSED;
> > 		wake_up_all(&xxx->gp_waitq);
> > 	} else if (need_wait) {
> > 		wait_event(xxx->gp_waitq, xxx->gp_state == GP_PASSED);
>
> Suppose the wakeup is delayed until after the state has been updated
> back to GP_IDLE?  Ah, presumably the non-zero ->gp_count prevents this.

Yes, exactly.

> > static void cb_rcu_func(struct rcu_head *rcu)
> > {
> > 	struct xxx_struct *xxx = container_of(rcu, struct xxx_struct, cb_head);
> > 	unsigned long flags;
> >
> > 	BUG_ON(xxx->gp_state != GP_PASSED);
> > 	BUG_ON(xxx->cb_state == CB_IDLE);
> >
> > 	spin_lock_irqsave(&xxx->xxx_lock, flags);
> > 	if (xxx->gp_count) {
> > 		xxx->cb_state = CB_IDLE;
> > 	} else if (xxx->cb_state == CB_REPLAY) {
> > 		xxx->cb_state = CB_PENDING;
> > 		call_rcu_sched(&xxx->cb_head, cb_rcu_func);
> > 	} else {
> > 		xxx->cb_state = CB_IDLE;
> > 		xxx->gp_state = GP_IDLE;
> > 	}
>
> It took me a bit to work out the above.  It looks like the intent is
> to have the last xxx_exit() put the state back to GP_IDLE, which appears
> to be the state in which readers can use a fastpath.

Yes, and we offload this work to the rcu callback so xxx_exit() doesn't block.

The only complication is the next writer which does xxx_enter() after
xxx_exit(). If there are no other writers, the next xxx_exit() should do

	rcu_cancel(&xxx->cb_head);
	call_rcu_sched(&xxx->cb_head, cb_rcu_func);

to "extend" the gp, but since we do not have rcu_cancel() it simply sets
CB_REPLAY to instruct cb_rcu_func() to reschedule itself.

> This works because if ->gp_count is non-zero and ->cb_state is CB_IDLE,
> there must be an xxx_exit() in our future.

Yes, but ->cb_state doesn't really matter if ->gp_count != 0 in xxx_exit()
or cb_rcu_func() (except it can't be CB_IDLE in cb_rcu_func()).

> > void xxx_exit(struct xxx_struct *xxx)
> > {
> > 	spin_lock_irq(&xxx->xxx_lock);
> > 	if (!--xxx->gp_count) {
> > 		if (xxx->cb_state == CB_IDLE) {
> > 			xxx->cb_state = CB_PENDING;
> > 			call_rcu_sched(&xxx->cb_head, cb_rcu_func);
> > 		} else if (xxx->cb_state == CB_PENDING) {
> > 			xxx->cb_state = CB_REPLAY;
> > 		}
> > 	}
> > 	spin_unlock_irq(&xxx->xxx_lock);
> > }
>
> Then we also have something like this?
>
> bool xxx_readers_fastpath_ok(struct xxx_struct *xxx)
> {
> 	BUG_ON(!rcu_read_lock_sched_held());
> 	return xxx->gp_state == GP_IDLE;
> }

Yes, this is what xxx_is_idle() does (ignoring BUG_ON). It actually
checks xxx->gp_state == 0; this is just to avoid the unnecessary export
of the GP_* enum.

Oleg.
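
For anyone who wants to replay the CB_REPLAY reasoning above by hand, here
is a rough single-threaded userspace model of the state machine. It is only
a sketch and not kernel code: the names xxx_model, model_*, cb_queued and
gp_waited are made up for illustration, synchronize_sched() is replaced by
a counter, call_rcu_sched() by a "callback queued" flag, and the spinlock
and wait queue are dropped because nothing runs concurrently here. BUG_ON()
is modelled with assert().

```c
#include <assert.h>
#include <stdbool.h>

enum { GP_IDLE = 0, GP_PENDING, GP_PASSED };
enum { CB_IDLE = 0, CB_PENDING, CB_REPLAY };

/* Hypothetical userspace stand-in for struct xxx_struct. */
struct xxx_model {
	int gp_state;
	int gp_count;
	int cb_state;
	bool cb_queued;	/* stand-in for an rcu_head queued via call_rcu_sched() */
	int gp_waited;	/* number of simulated synchronize_sched() calls */
};

/* Stand-in for call_rcu_sched(): an rcu_head must not be queued twice. */
static void model_call_rcu(struct xxx_model *m)
{
	assert(!m->cb_queued);
	m->cb_queued = true;
}

static void model_enter(struct xxx_model *m)
{
	bool need_wait = m->gp_count++ != 0;
	bool need_sync = m->gp_state == GP_IDLE;

	assert(!(need_wait && need_sync));
	if (need_sync) {
		m->gp_state = GP_PENDING;
		m->gp_waited++;		/* synchronize_sched() would block here */
		m->gp_state = GP_PASSED;
	}
	/* A real need_wait writer would sleep until GP_PASSED; in this
	 * single-threaded model the state is already GP_PASSED. */
	assert(m->gp_state == GP_PASSED);
}

static void model_exit(struct xxx_model *m)
{
	assert(m->gp_count > 0);
	if (--m->gp_count == 0) {
		if (m->cb_state == CB_IDLE) {
			m->cb_state = CB_PENDING;
			model_call_rcu(m);
		} else if (m->cb_state == CB_PENDING) {
			/* No rcu_cancel(): ask the callback to requeue itself. */
			m->cb_state = CB_REPLAY;
		}
	}
}

/* Mirrors cb_rcu_func() from the quoted code, minus locking. */
static void model_cb(struct xxx_model *m)
{
	assert(m->gp_state == GP_PASSED);
	assert(m->cb_state != CB_IDLE);

	if (m->gp_count) {
		m->cb_state = CB_IDLE;
	} else if (m->cb_state == CB_REPLAY) {
		m->cb_state = CB_PENDING;
		model_call_rcu(m);
	} else {
		m->cb_state = CB_IDLE;
		m->gp_state = GP_IDLE;
	}
}

/* Simulate one grace period elapsing: run the queued callback, if any.
 * Returns true if a callback ran. */
static bool model_grace_period(struct xxx_model *m)
{
	if (!m->cb_queued)
		return false;
	m->cb_queued = false;
	model_cb(m);
	return true;
}
```

Starting from a zeroed struct xxx_model, the sequence enter, exit, enter,
exit followed by two model_grace_period() calls should walk ->cb_state
through CB_PENDING, CB_REPLAY, CB_PENDING and back to CB_IDLE, with
->gp_state returning to GP_IDLE only after the second grace period and
only one simulated synchronize_sched() in total.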