On 15/10/2019 23.48, David Howells wrote:
> Convert pipes to use head and tail pointers for the buffer ring rather than
> pointer and length as the latter requires two atomic ops to update (or a
> combined op) whereas the former only requires one.
>
>  (1) The head pointer is the point at which production occurs and points to
>      the slot in which the next buffer will be placed.  This is equivalent
>      to pipe->curbuf + pipe->nrbufs.
>
>      The head pointer belongs to the write-side.
>
>  (2) The tail pointer is the point at which consumption occurs.  It points
>      to the next slot to be consumed.  This is equivalent to pipe->curbuf.
>
>      The tail pointer belongs to the read-side.
>
>  (3) head and tail are allowed to run to UINT_MAX and wrap naturally.  They
>      are only masked off when the array is being accessed, e.g.:
>
> 	pipe->bufs[head & mask]
>
>      This means that it is not necessary to have a dead slot in the ring as
>      head == tail isn't ambiguous.
>
>  (4) The ring is empty if "head == tail".
>
>  (5) The occupancy of the ring is "head - tail".
>
>  (6) The number of free slots in the ring is "(tail + pipe->ring_size) -
>      head".

Seems an odd way of writing pipe->ring_size - (head - tail) ; i.e. obviously
#free slots is #size minus #occupancy.

>  (7) The ring is full if "head >= (tail + pipe->ring_size)", which can also
>      be written as "head - tail >= pipe->ring_size".
>

No it cannot, it _must_ be written in the latter form. Assuming
sizeof(int)==1 for simplicity, consider ring_size = 16, tail = 240.
Regardless whether head is 240, 241, ..., 255, 0, tail + ring_size wraps to
0, so the former expression states the ring is full in all cases.

Better spell out somewhere that while head and tail are free-running, at any
point in time they satisfy the invariant head - tail <= pipe_size (and also
0 <= head - tail, but that's a tautology for unsigned ints...). Then it's a
matter of taste if one wants to write "full" as head-tail == pipe_size or
head-tail >= pipe_size.

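To make the wrap-around argument concrete, here is a minimal userspace
sketch of the two "full" tests, assuming 8-bit counters as in the
sizeof(int)==1 example. None of these names exist in the patch:
ring_idx_t, ring_full_sum(), ring_full_diff() and count_full() are made up
for illustration, and the explicit casts only mimic what plain unsigned int
arithmetic would do without integer promotion.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical 8-bit counter type standing in for unsigned int. */
typedef uint8_t ring_idx_t;

/* Form 1: "head >= tail + ring_size". The sum wraps, so this is unreliable. */
static bool ring_full_sum(ring_idx_t head, ring_idx_t tail, ring_idx_t ring_size)
{
	return head >= (ring_idx_t)(tail + ring_size);
}

/* Form 2: "head - tail >= ring_size". The difference is the occupancy mod 256. */
static bool ring_full_diff(ring_idx_t head, ring_idx_t tail, ring_idx_t ring_size)
{
	return (ring_idx_t)(head - tail) >= ring_size;
}

/* Walk every legal occupancy 0..ring_size and count how often "full" is reported. */
static unsigned int count_full(bool (*full)(ring_idx_t, ring_idx_t, ring_idx_t),
			       ring_idx_t tail, ring_idx_t ring_size)
{
	unsigned int n = 0;

	for (unsigned int used = 0; used <= ring_size; used++) {
		ring_idx_t head = (ring_idx_t)(tail + used);

		/* The invariant: head - tail never exceeds the ring size. */
		assert((ring_idx_t)(head - tail) <= ring_size);
		if (full(head, tail, ring_size))
			n++;
	}
	return n;
}
```

With tail = 240 and ring_size = 16, count_full(ring_full_sum, 240, 16)
reports all 17 states as full (including the empty ring), whereas
count_full(ring_full_diff, 240, 16) reports exactly one.
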
> Also split pipe->buffers into pipe->ring_size (which indicates the size of the
> ring) and pipe->max_usage (which restricts the amount of ring that write() is
> allowed to fill).  This allows for a pipe that is both writable by the kernel
> notification facility and by userspace, allowing plenty of ring space for
> notifications to be added whilst preventing userspace from being able to use
> up too much buffer space.

That seems like something that should be added in a separate patch - adding
->max_usage and switching appropriate users of ->ring_size over, so it's
more clear where you're using one or the other.

> @@ -1949,8 +1950,12 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>
>  	pipe_lock(pipe);
>
> -	bufs = kvmalloc_array(pipe->nrbufs, sizeof(struct pipe_buffer),
> -			      GFP_KERNEL);
> +	head = pipe->head;
> +	tail = pipe->tail;
> +	mask = pipe->ring_size - 1;
> +	count = head - tail;
> +
> +	bufs = kvmalloc_array(count, sizeof(struct pipe_buffer), GFP_KERNEL);
>  	if (!bufs) {
>  		pipe_unlock(pipe);
>  		return -ENOMEM;
> @@ -1958,8 +1963,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>
>  	nbuf = 0;
>  	rem = 0;
> -	for (idx = 0; idx < pipe->nrbufs && rem < len; idx++)
> -		rem += pipe->bufs[(pipe->curbuf + idx) & (pipe->buffers - 1)].len;
> +	for (idx = tail; idx < head && rem < len; idx++)
> +		rem += pipe->bufs[idx & mask].len;
>
>  	ret = -EINVAL;
>  	if (rem < len)
> @@ -1970,16 +1975,16 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>  		struct pipe_buffer *ibuf;
>  		struct pipe_buffer *obuf;
>
> -		BUG_ON(nbuf >= pipe->buffers);
> -		BUG_ON(!pipe->nrbufs);
> -		ibuf = &pipe->bufs[pipe->curbuf];
> +		BUG_ON(nbuf >= pipe->ring_size);
> +		BUG_ON(tail == head);
> +		ibuf = &pipe->bufs[tail];

I don't see where tail gets masked between tail = pipe->tail; above and
here, but I may be missing it.
In any case, how about seeding head and tail with something like 1<<20 when
creating the pipe so bugs like that are hit more quickly.

> @@ -515,17 +525,19 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
>  static long pipe_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
>  {
>  	struct pipe_inode_info *pipe = filp->private_data;
> -	int count, buf, nrbufs;
> +	int count, head, tail, mask;
>
>  	switch (cmd) {
>  	case FIONREAD:
>  		__pipe_lock(pipe);
>  		count = 0;
> -		buf = pipe->curbuf;
> -		nrbufs = pipe->nrbufs;
> -		while (--nrbufs >= 0) {
> -			count += pipe->bufs[buf].len;
> -			buf = (buf+1) & (pipe->buffers - 1);
> +		head = pipe->head;
> +		tail = pipe->tail;
> +		mask = pipe->ring_size - 1;
> +
> +		while (tail < head) {
> +			count += pipe->bufs[tail & mask].len;
> +			tail++;
>  		}

This is broken if head has wrapped but tail has not. It has to be "while
(head - tail)" or perhaps just "while (tail != head)" or something along
those lines.

> @@ -1086,17 +1104,21 @@ static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
>  	}
>
>  	/*
> -	 * We can shrink the pipe, if arg >= pipe->nrbufs.  Since we don't
> -	 * expect a lot of shrink+grow operations, just free and allocate
> -	 * again like we would do for growing.  If the pipe currently
> +	 * We can shrink the pipe, if arg is greater than the ring occupancy.
> +	 * Since we don't expect a lot of shrink+grow operations, just free and
> +	 * allocate again like we would do for growing.  If the pipe currently
>  	 * contains more buffers than arg, then return busy.
>  	 */
> -	if (nr_pages < pipe->nrbufs) {
> +	mask = pipe->ring_size - 1;
> +	head = pipe->head & mask;
> +	tail = pipe->tail & mask;
> +	n = pipe->head - pipe->tail;

I think it's confusing to "premask" head and tail here. Can you either drop
that (pipe_set_size should hardly be a hot path?), or perhaps call them
something else to avoid a future reader seeing an unmasked bufs[head] and
thinking that's a bug?

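Going back to the FIONREAD loop and the seeding idea, a rough userspace
sketch of what goes wrong, assuming unsigned int counters that start just
below the wrap point. Everything here is made up for illustration
(TOY_RING_SIZE, TOY_SEED, struct toy_buf and the two toy_bytes_*()
helpers); it is not code from the patch, only the two loop shapes side by
side.

```c
#include <assert.h>
#include <limits.h>

#define TOY_RING_SIZE	4u			/* hypothetical; must be a power of two */
#define TOY_SEED	(UINT_MAX - 1u)		/* hypothetical seed, two steps before the wrap */

struct toy_buf {
	unsigned int len;
};

/* Loop shape from the patch: orders the free-running counters with '<'. */
static unsigned int toy_bytes_lt(const struct toy_buf *bufs,
				 unsigned int head, unsigned int tail)
{
	unsigned int mask = TOY_RING_SIZE - 1u;
	unsigned int count = 0;

	assert((TOY_RING_SIZE & mask) == 0);	/* masking needs a power of two */
	while (tail < head) {			/* false at once if head has wrapped */
		count += bufs[tail & mask].len;
		tail++;
	}
	return count;
}

/* Suggested shape: only tests for inequality, so wrapping is harmless. */
static unsigned int toy_bytes_ne(const struct toy_buf *bufs,
				 unsigned int head, unsigned int tail)
{
	unsigned int mask = TOY_RING_SIZE - 1u;
	unsigned int count = 0;

	assert((TOY_RING_SIZE & mask) == 0);
	while (tail != head) {
		count += bufs[tail & mask].len;
		tail++;				/* wraps from UINT_MAX to 0 */
	}
	return count;
}
```

With tail = TOY_SEED and three buffers queued, head has already wrapped to
1, so the '<' loop never runs and reports 0 bytes, while the '!=' loop
visits all three slots. Starting the counters near the wrap point makes
that difference show up on the first few writes instead of after 2^32 of
them.
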
> @@ -1254,9 +1290,10 @@ static ssize_t pipe_get_pages(struct iov_iter *i,
>  		   struct page **pages, size_t maxsize, unsigned maxpages,
>  		   size_t *start)
>  {
> +	unsigned int p_tail;
> +	unsigned int i_head;
>  	unsigned npages;
>  	size_t capacity;
> -	int idx;
>
>  	if (!maxsize)
>  		return 0;
> @@ -1264,12 +1301,15 @@ static ssize_t pipe_get_pages(struct iov_iter *i,
>  	if (!sanity(i))
>  		return -EFAULT;
>
> -	data_start(i, &idx, start);
> -	/* some of this one + all after this one */
> -	npages = ((i->pipe->curbuf - idx - 1) & (i->pipe->buffers - 1)) + 1;
> -	capacity = min(npages,maxpages) * PAGE_SIZE - *start;
> +	data_start(i, &i_head, start);
> +	p_tail = i->pipe->tail;
> +	/* Amount of free space: some of this one + all after this one */
> +	npages = (p_tail + i->pipe->ring_size) - i_head;

Hm, it's not clear that this is equivalent to the old computation. Since it
seems repeated in a few places, could it be factored to a little helper
(before this patch) and the "some of this one + all after this one" comment
perhaps expanded to explain what is going on?

Rasmus