On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@xxxxxxxxx wrote: [...] (Some more comments/questions for the MP implementation...) > +static inline int ring_mp_put(Ring *ring, void *data) > +{ > + unsigned int index, in, in_next, out; > + > + do { > + in = atomic_read(&ring->in); > + out = atomic_read(&ring->out); [0] Do we need to fetch "out" with load_acquire()? Otherwise what's the pairing of below store_release() at [1]? This barrier exists in SP-SC case which makes sense to me, I assume that's also needed for MP-SC case, am I right? > + > + if (__ring_is_full(ring, in, out)) { > + if (atomic_read(&ring->in) == in && > + atomic_read(&ring->out) == out) { Why read again? After all the ring API seems to be designed as non-blocking. E.g., I see the poll at [2] below makes more sense since when reaches [2] it means that there must be a producer that is _doing_ the queuing, so polling is very possible to complete fast. However here it seems to be a pure busy poll without any hint. Then not sure whether we should just let the caller decide whether it wants to call ring_put() again. > + return -ENOBUFS; > + } > + > + /* a entry has been fetched out, retry. */ > + continue; > + } > + > + in_next = in + 1; > + } while (atomic_cmpxchg(&ring->in, in, in_next) != in); > + > + index = ring_index(ring, in); > + > + /* > + * smp_rmb() paired with the memory barrier of (A) in ring_mp_get() > + * is implied in atomic_cmpxchg() as we should read ring->out first > + * before fetching the entry, otherwise this assert will fail. Thanks for all these comments! These are really helpful for reviewers. However I'm not sure whether I understand it correctly here on MB of (A) for ring_mp_get() - AFAIU that should corresponds to a smp_rmb() at [0] above when reading the "out" variable rather than this assertion, and that's why I thought at [0] we should have something like a load_acquire() there (which contains a rmb()). >From content-wise, I think the code here is correct, since atomic_cmpxchg() should have one implicit smp_mb() after all so we don't need anything further barriers here. > + */ > + assert(!atomic_read(&ring->data[index])); > + > + /* > + * smp_mb() paired with the memory barrier of (B) in ring_mp_get() is > + * implied in atomic_cmpxchg(), that is needed here as we should read > + * ring->out before updating the entry, it is the same as we did in > + * __ring_put(). > + * > + * smp_wmb() paired with the memory barrier of (C) in ring_mp_get() > + * is implied in atomic_cmpxchg(), that is needed as we should increase > + * ring->in before updating the entry. > + */ > + atomic_set(&ring->data[index], data); > + > + return 0; > +} > + > +static inline void *ring_mp_get(Ring *ring) > +{ > + unsigned int index, in; > + void *data; > + > + do { > + in = atomic_read(&ring->in); > + > + /* > + * (C) should read ring->in first to make sure the entry pointed by this > + * index is available > + */ > + smp_rmb(); > + > + if (!__ring_is_empty(in, ring->out)) { > + break; > + } > + > + if (atomic_read(&ring->in) == in) { > + return NULL; > + } > + /* new entry has been added in, retry. */ > + } while (1); > + > + index = ring_index(ring, ring->out); > + > + do { > + data = atomic_read(&ring->data[index]); > + if (data) { > + break; > + } > + /* the producer is updating the entry, retry */ > + cpu_relax(); [2] > + } while (1); > + > + atomic_set(&ring->data[index], NULL); > + > + /* > + * (B) smp_mb() is needed as we should read the entry out before > + * updating ring->out as we did in __ring_get(). > + * > + * (A) smp_wmb() is needed as we should make the entry be NULL before > + * updating ring->out (which will make the entry be visible and usable). > + */ > + atomic_store_release(&ring->out, ring->out + 1); [1] > + > + return data; > +} > + > +static inline int ring_put(Ring *ring, void *data) > +{ > + if (ring->flags & RING_MULTI_PRODUCER) { > + return ring_mp_put(ring, data); > + } > + return __ring_put(ring, data); > +} > + > +static inline void *ring_get(Ring *ring) > +{ > + if (ring->flags & RING_MULTI_PRODUCER) { > + return ring_mp_get(ring); > + } > + return __ring_get(ring); > +} > +#endif > -- > 2.14.4 > Thanks, -- Peter Xu