On Thu, Oct 24, 2019 at 11:49 AM David Howells <dhowells@xxxxxxxxxx> wrote:
>
> Convert pipes to use head and tail pointers for the buffer ring rather than
> pointer and length as the latter requires two atomic ops to update (or a
> combined op) whereas the former only requires one.
>
>  (1) The head pointer is the point at which production occurs and points to
>      the slot in which the next buffer will be placed.  This is equivalent
>      to pipe->curbuf + pipe->nrbufs.
>
>      The head pointer belongs to the write-side.
>
>  (2) The tail pointer is the point at which consumption occurs.  It points
>      to the next slot to be consumed.  This is equivalent to pipe->curbuf.
>
>      The tail pointer belongs to the read-side.
>
>  (3) head and tail are allowed to run to UINT_MAX and wrap naturally.  They
>      are only masked off when the array is being accessed, e.g.:
>
>         pipe->bufs[head & mask]
>
>      This means that it is not necessary to have a dead slot in the ring as
>      head == tail isn't ambiguous.
>
>  (4) The ring is empty if "head == tail".
>
>      A helper, pipe_empty(), is provided for this.
>
>  (5) The occupancy of the ring is "head - tail".
>
>      A helper, pipe_occupancy(), is provided for this.
>
>  (6) The number of free slots in the ring is "pipe->ring_size - occupancy".
>
>      A helper, pipe_space_for_user() is provided to indicate how many slots
>      userspace may use.
>
>  (7) The ring is full if "head - tail >= pipe->ring_size".
>
>      A helper, pipe_full(), is provided for this.
>
> Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
> ---
>
>  fs/fuse/dev.c             |   31 +++--
>  fs/pipe.c                 |  169 ++++++++++++++++-------------
>  fs/splice.c               |  188 ++++++++++++++++++++------------
>  include/linux/pipe_fs_i.h |   86 ++++++++++++++-
>  include/linux/uio.h       |    4 -
>  lib/iov_iter.c            |  266 +++++++++++++++++++++++++--------------------
>  6 files changed, 464 insertions(+), 280 deletions(-)
>
> diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
> index dadd617d826c..1e4bc27573cc 100644
> --- a/fs/fuse/dev.c
> +++ b/fs/fuse/dev.c
> @@ -703,7 +703,7 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
>  			cs->pipebufs++;
>  			cs->nr_segs--;
>  		} else {
> -			if (cs->nr_segs == cs->pipe->buffers)
> +			if (cs->nr_segs >= cs->pipe->ring_size)
>  				return -EIO;
>
>  			page = alloc_page(GFP_HIGHUSER);
> @@ -879,7 +879,7 @@ static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page,
>  	struct pipe_buffer *buf;
>  	int err;
>
> -	if (cs->nr_segs == cs->pipe->buffers)
> +	if (cs->nr_segs >= cs->pipe->ring_size)
>  		return -EIO;
>
>  	err = unlock_request(cs->req);
> @@ -1341,7 +1341,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
>  	if (!fud)
>  		return -EPERM;
>
> -	bufs = kvmalloc_array(pipe->buffers, sizeof(struct pipe_buffer),
> +	bufs = kvmalloc_array(pipe->ring_size, sizeof(struct pipe_buffer),
>  			      GFP_KERNEL);
>  	if (!bufs)
>  		return -ENOMEM;
> @@ -1353,7 +1353,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
>  	if (ret < 0)
>  		goto out;
>
> -	if (pipe->nrbufs + cs.nr_segs > pipe->buffers) {
> +	if (pipe_occupancy(pipe->head, pipe->tail) + cs.nr_segs > pipe->ring_size) {
>  		ret = -EIO;
>  		goto out;
>  	}
> @@ -1935,6 +1935,7 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>  				     struct file *out, loff_t *ppos,
>  				     size_t len, unsigned int flags)
>  {
> +	unsigned int head, tail, mask, count;
>  	unsigned nbuf;
>  	unsigned idx;
>  	struct pipe_buffer *bufs;
> @@ -1949,8 +1950,12 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>
>  	pipe_lock(pipe);
>
> -	bufs = kvmalloc_array(pipe->nrbufs, sizeof(struct pipe_buffer),
> -			      GFP_KERNEL);
> +	head = pipe->head;
> +	tail = pipe->tail;
> +	mask = pipe->ring_size - 1;
> +	count = head - tail;
> +
> +	bufs = kvmalloc_array(count, sizeof(struct pipe_buffer), GFP_KERNEL);
>  	if (!bufs) {
>  		pipe_unlock(pipe);
>  		return -ENOMEM;
> @@ -1958,8 +1963,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>
>  	nbuf = 0;
>  	rem = 0;
> -	for (idx = 0; idx < pipe->nrbufs && rem < len; idx++)
> -		rem += pipe->bufs[(pipe->curbuf + idx) & (pipe->buffers - 1)].len;
> +	for (idx = tail; idx < head && rem < len; idx++)
> +		rem += pipe->bufs[idx & mask].len;
>
>  	ret = -EINVAL;
>  	if (rem < len)
> @@ -1970,16 +1975,16 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>  		struct pipe_buffer *ibuf;
>  		struct pipe_buffer *obuf;
>
> -		BUG_ON(nbuf >= pipe->buffers);
> -		BUG_ON(!pipe->nrbufs);
> -		ibuf = &pipe->bufs[pipe->curbuf];
> +		BUG_ON(nbuf >= pipe->ring_size);
> +		BUG_ON(tail == head);
> +		ibuf = &pipe->bufs[tail & mask];
>  		obuf = &bufs[nbuf];
>
>  		if (rem >= ibuf->len) {
>  			*obuf = *ibuf;
>  			ibuf->ops = NULL;
> -			pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
> -			pipe->nrbufs--;
> +			tail++;
> +			pipe_commit_read(pipe, tail);
>  		} else {
>  			if (!pipe_buf_get(pipe, ibuf))
>  				goto out_free;
> diff --git a/fs/pipe.c b/fs/pipe.c
> index 8a2ab2f974bd..8a0806fe12d3 100644
> --- a/fs/pipe.c
> +++ b/fs/pipe.c
> @@ -43,10 +43,11 @@ unsigned long pipe_user_pages_hard;
>  unsigned long pipe_user_pages_soft = PIPE_DEF_BUFFERS * INR_OPEN_CUR;
>
>  /*
> - * We use a start+len construction, which provides full use of the
> - * allocated memory.
> - * -- Florian Coosmann (FGC)
> - *
> + * We use head and tail indices that aren't masked off, except at the point of
> + * dereference, but rather they're allowed to wrap naturally.  This means there
> + * isn't a dead spot in the buffer, provided the ring size < INT_MAX.
> + * -- David Howells 2019-09-23.

Hi David,

Is the "ring size < INT_MAX" constraint correct?

I've never had to implement this free-running indices scheme, but
the way I've always visualized it is that the top bit of the index is
used as a lap (as in a race) indicator, leaving 31 bits to work with
(in the case of unsigned ints).  Should that be

  ring size <= 2^31

or more precisely

  ring size is a power of two <= 2^31

or am I missing something?

Thanks,

                Ilya
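
To make the question concrete, here is a minimal user-space sketch of the
free-running index arithmetic described in the patch. It is not the kernel
code: the ring_* names are hypothetical stand-ins for pipe_empty(),
pipe_occupancy() and pipe_full(), and it assumes a 32-bit unsigned int and a
power-of-two ring size.

```c
#include <assert.h>
#include <limits.h>
#include <stdbool.h>

/*
 * Hypothetical user-space stand-ins for the helpers named in the patch
 * description; these are not the kernel implementations.
 */
static inline bool ring_empty(unsigned int head, unsigned int tail)
{
	return head == tail;
}

static inline unsigned int ring_occupancy(unsigned int head, unsigned int tail)
{
	/* Unsigned subtraction is modulo 2^N, so this survives head wrapping. */
	return head - tail;
}

static inline bool ring_full(unsigned int head, unsigned int tail,
			     unsigned int ring_size)
{
	return ring_occupancy(head, tail) >= ring_size;
}

/*
 * Exercise the arithmetic across the UINT_MAX wrap and at a ring size of
 * 2^31 (assumption: unsigned int is 32 bits wide).
 */
static void ring_selftest(void)
{
	unsigned int ring_size = 16, mask = ring_size - 1;
	unsigned int tail = UINT_MAX - 2;	/* about to wrap */
	unsigned int head = tail;

	assert(ring_empty(head, tail));

	head += ring_size;			/* fill the ring; head wraps past 0 */
	assert(head < tail);			/* numerically smaller after the wrap */
	assert(ring_occupancy(head, tail) == ring_size);
	assert(ring_full(head, tail, ring_size));
	assert((head & mask) == (tail & mask));	/* same slot, yet not "empty" */

	ring_size = 1u << 31;			/* 2^31 slots */
	head = tail + ring_size;
	assert(!ring_empty(head, tail));
	assert(ring_full(head, tail, ring_size));
}
```

If ring_selftest() is called from a main(), all assertions should pass, which
suggests that a power-of-two ring size up to 2^31 keeps full and empty
distinguishable. A ring of 2^32 slots could not, since a full ring would then
give head - tail == 0, the same as empty.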