On Thu, May 16, 2019 at 10:57:57AM +0200, Roman Penyaev wrote:
> When a new event comes for some epoll item, the kernel does the
> following:
>
>    struct epoll_uitem *uitem;
>
>    /* Each item has a bit (index in user items array), discussed later */
>    uitem = &user_header->items[epi->bit];
>
>    if (!atomic_fetch_or(&uitem->ready_events, pollflags)) {
>            i = atomic_fetch_add(&ep->user_header->tail, 1);

So this is where you increment tail.

>
>            item_idx = &user_index[i & index_mask];
>
>            /* Signal with a bit, user spins on index expecting value > 0 */
>            *item_idx = idx + 1;

IIUC, this is where you write the idx into shared memory, which is
_after_ tail has already been incremented.

>    }
>
> The important thing here is that the ring can't grow indefinitely and
> corrupt other elements, because the kernel always checks that the item
> was marked as ready, so userspace has to clear the ready_events field.
>
> On the user side, the following code should be used to consume events:
>
>    tail = READ_ONCE(header->tail);
>    for (; header->head != tail; header->head++) {
>            item_idx_ptr = &index[header->head & index_mask];
>
>            /*
>             * Spin here till we see a valid index
>             */
>            while (!(idx = __atomic_load_n(item_idx_ptr, __ATOMIC_ACQUIRE)))
>                    ;

Which you then try to fix up by busy-waiting for @idx to become !0 ?!

Why not write the idx first, then increment ->tail, such that when we
see ->tail, we already know idx must be correct?

>
>            item = &header->items[idx - 1];
>
>            /*
>             * Mark the index as invalid; that is for userspace only, the
>             * kernel does not care and will refill this pointer only when
>             * it observes that the event is cleared, which happens below.
>             */
>            *item_idx_ptr = 0;

That avoids this store too.

>
>            /*
>             * Fetch data first; if the event is cleared by the kernel we
>             * drop the data, returning false.
>             */
>            event->data = item->event.data;
>            event->events = __atomic_exchange_n(&item->ready_events, 0,
>                                                __ATOMIC_RELEASE);
>    }

Aside from that, you have to use READ_ONCE()/WRITE_ONCE() on ->head to
avoid load/store tearing.

That would give something like:

kernel:

	slot = atomic_fetch_inc(&ep->slot);
	item_idx = &user_index[slot & idx_mask];

	WRITE_ONCE(*item_idx, idx);
	smp_store_release(&ep->user_header->tail, slot + 1);

userspace:

	tail = smp_load_acquire(&header->tail);
	for (head = READ_ONCE(header->head); head != tail; head++) {
		idx = READ_ONCE(index[head & idx_mask]);
		itemp = &header->items[idx];

		...
	}
	smp_store_release(&header->head, head);
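
Untested, but the following stand-alone C11 program models that
ordering, with a thread standing in for the kernel side. The C11
atomics map onto the kernel primitives (release/acquire stores and
loads for smp_store_release()/smp_load_acquire(), relaxed accesses for
WRITE_ONCE()/READ_ONCE()). RING_SIZE, NR_EVENTS, the pthread
scaffolding and the head-based backpressure loop are made up for the
sketch, and it assumes a single producer; the real ep_poll_callback()
side can run concurrently on multiple CPUs and would need more care
when publishing ->tail.

	/* ring.c: single-producer/single-consumer model of the above.
	 *
	 * C11 atomics stand in for the kernel primitives:
	 *   smp_store_release()      -> atomic_store_explicit(..., memory_order_release)
	 *   smp_load_acquire()       -> atomic_load_explicit(..., memory_order_acquire)
	 *   WRITE_ONCE()/READ_ONCE() -> relaxed stores/loads
	 */
	#include <pthread.h>
	#include <stdatomic.h>
	#include <stdio.h>

	#define RING_SIZE	16			/* power of two */
	#define IDX_MASK	(RING_SIZE - 1)
	#define NR_EVENTS	100000

	static _Atomic unsigned int slot;		/* producer's ep->slot */
	static _Atomic unsigned int tail, head;		/* shared ring cursors */
	static _Atomic unsigned int user_index[RING_SIZE];

	static void *producer(void *arg)		/* the "kernel" side */
	{
		for (unsigned int idx = 0; idx < NR_EVENTS; idx++) {
			unsigned int s = atomic_fetch_add_explicit(&slot, 1,
							memory_order_relaxed);

			/* don't reuse a slot the consumer hasn't freed yet */
			while (s - atomic_load_explicit(&head,
					memory_order_acquire) >= RING_SIZE)
				;

			/* WRITE_ONCE(*item_idx, idx); */
			atomic_store_explicit(&user_index[s & IDX_MASK], idx,
					      memory_order_relaxed);

			/* smp_store_release(&user_header->tail, slot + 1); */
			atomic_store_explicit(&tail, s + 1,
					      memory_order_release);
		}
		(void)arg;
		return NULL;
	}

	static void *consumer(void *arg)		/* the userspace side */
	{
		unsigned int h = 0, seen = 0;

		while (seen < NR_EVENTS) {
			/* tail = smp_load_acquire(&header->tail); */
			unsigned int t = atomic_load_explicit(&tail,
						memory_order_acquire);

			for (; h != t; h++, seen++) {
				/* idx = READ_ONCE(index[head & idx_mask]); */
				unsigned int idx = atomic_load_explicit(
						&user_index[h & IDX_MASK],
						memory_order_relaxed);

				if (idx != seen)	/* never spins */
					fprintf(stderr, "bad idx %u != %u\n",
						idx, seen);
			}

			/* smp_store_release(&header->head, head); */
			atomic_store_explicit(&head, h, memory_order_release);
		}
		(void)arg;
		return NULL;
	}

	int main(void)
	{
		pthread_t p, c;

		pthread_create(&p, NULL, producer, NULL);
		pthread_create(&c, NULL, consumer, NULL);
		pthread_join(p, NULL);
		pthread_join(c, NULL);
		return 0;
	}

The point being that the consumer never has to spin on the index
itself: by the time it observes the new ->tail, the release/acquire
pair guarantees the index store is already visible.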