----- Original Message ----- > From: "jasowang" <jasowang@xxxxxxxxxx> > To: "Guo Zhi" <qtxuning1999@xxxxxxxxxxx>, "eperezma" <eperezma@xxxxxxxxxx>, "sgarzare" <sgarzare@xxxxxxxxxx>, "Michael > Tsirkin" <mst@xxxxxxxxxx> > Cc: "netdev" <netdev@xxxxxxxxxxxxxxx>, "linux-kernel" <linux-kernel@xxxxxxxxxxxxxxx>, "kvm list" <kvm@xxxxxxxxxxxxxxx>, > "virtualization" <virtualization@xxxxxxxxxxxxxxxxxxxxxxxxxx> > Sent: Thursday, August 25, 2022 3:44:41 PM > Subject: Re: [RFC v2 6/7] virtio: in order support for virtio_ring > 在 2022/8/17 21:57, Guo Zhi 写道: >> If in order feature negotiated, we can skip the used ring to get >> buffer's desc id sequentially. >> >> Signed-off-by: Guo Zhi <qtxuning1999@xxxxxxxxxxx> >> --- >> drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------ >> 1 file changed, 45 insertions(+), 8 deletions(-) >> >> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c >> index 1c1b3fa376a2..143184ebb5a1 100644 >> --- a/drivers/virtio/virtio_ring.c >> +++ b/drivers/virtio/virtio_ring.c >> @@ -144,6 +144,9 @@ struct vring_virtqueue { >> /* DMA address and size information */ >> dma_addr_t queue_dma_addr; >> size_t queue_size_in_bytes; >> + >> + /* In order feature batch begin here */ > > > We need tweak the comment, it's not easy for me to understand the > meaning here. > How about this: if in_order feature is negotiated, is the next head to consume. > >> + u16 next_desc_begin; >> } split; >> >> /* Available for packed ring */ >> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq, >> unsigned int head, >> } >> >> vring_unmap_one_split(vq, i); >> - vq->split.desc_extra[i].next = vq->free_head; >> - vq->free_head = head; >> + /* In order feature use desc in order, >> + * that means, the next desc will always be free >> + */ > > > Maybe we should add something like "The descriptors are prepared in order". > I will change it to this: The descriptors are made available in order if the in order feature is used. Since the free_head is already a circular list, it must consume it sequentially. > >> + if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) { >> + vq->split.desc_extra[i].next = vq->free_head; >> + vq->free_head = head; >> + } >> >> /* Plus final descriptor */ >> vq->vq.num_free++; >> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue >> *_vq, >> { >> struct vring_virtqueue *vq = to_vvq(_vq); >> void *ret; >> - unsigned int i; >> + unsigned int i, j; >> u16 last_used; >> >> START_USE(vq); >> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue >> *_vq, >> /* Only get used array entries after they have been exposed by host. */ >> virtio_rmb(vq->weak_barriers); >> >> - last_used = (vq->last_used_idx & (vq->split.vring.num - 1)); >> - i = virtio32_to_cpu(_vq->vdev, >> - vq->split.vring.used->ring[last_used].id); >> - *len = virtio32_to_cpu(_vq->vdev, >> - vq->split.vring.used->ring[last_used].len); >> + if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) { >> + /* Skip used ring and get used desc in order*/ >> + i = vq->split.next_desc_begin; >> + j = i; >> + /* Indirect only takes one descriptor in descriptor table */ >> + while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT)) >> + j = (j + 1) % vq->split.vring.num; > > > Let's move the expensive mod outside the loop. Or it's split so we can > use and here actually since the size is guaranteed to be power of the > two? Another question, is it better to store the next_desc in e.g > desc_extra? Thanks, I will use bit operation instead of mod. We only use one next_desc_begin at the same time, there is no need to store it in desc_extra. > > And this seems very expensive if the device doesn't do the batching > (which is not mandatory). > > We will judge whether the device batched the buffer or not, we will only use this way for the batched buffer. And Not in order is more expensive, because is following a linked list. >> + /* move to next */ >> + j = (j + 1) % vq->split.vring.num; >> + /* Next buffer will use this descriptor in order */ >> + vq->split.next_desc_begin = j; >> + if (!vq->indirect) { >> + *len = vq->split.desc_extra[i].len; >> + } else { >> + struct vring_desc *indir_desc = >> + vq->split.desc_state[i].indir_desc; >> + u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0; >> + >> + if (indir_desc) { >> + for (j = 0; j < indir_num / sizeof(struct vring_desc); j++) >> + buffer_len += indir_desc[j].len; > > > So I think we need to finalize this, then we can have much more stress > on the cache: > > https://lkml.org/lkml/2021/10/26/1300 > > It was reverted since it's too aggressive, we should instead: > > 1) do the validation only for morden device > > 2) fail only when we enable the validation via (e.g a module parameter). > > Thanks > > >> + } >> + >> + *len = buffer_len; >> + } >> + } else { >> + last_used = (vq->last_used_idx & (vq->split.vring.num - 1)); >> + i = virtio32_to_cpu(_vq->vdev, >> + vq->split.vring.used->ring[last_used].id); >> + *len = virtio32_to_cpu(_vq->vdev, >> + vq->split.vring.used->ring[last_used].len); >> + } >> >> if (unlikely(i >= vq->split.vring.num)) { >> BAD_RING(vq, "id %u out of range\n", i); >> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int >> index, >> vq->split.avail_flags_shadow = 0; >> vq->split.avail_idx_shadow = 0; >> >> + vq->split.next_desc_begin = 0; >> + >> /* No callback? Tell other side not to bother us. */ >> if (!callback) { >> vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;