Re: [RFC v2 3/5] vhost/vsock: add support for vhost dgram.

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, Sep 14, 2021 at 05:54:36AM +0000, Jiang Wang wrote:
> This patch supports dgram on vhost side, including
> tx and rx. The vhost send packets asynchronously.

How exactly is fairness ensured? What prevents one socket
from monopolizing the device? The spec proposal seems to
leave it to the implementation.

> Also, ignore vq errors when vq number is larger than 2,
> so it will be comptaible with old versions.

This needs more explanation.

> Signed-off-by: Jiang Wang <jiang.wang@xxxxxxxxxxxxx>



> ---
>  drivers/vhost/vsock.c | 217 ++++++++++++++++++++++++++++++++++++------
>  1 file changed, 189 insertions(+), 28 deletions(-)
> 
> diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
> index c79789af0365..a8755cbebd40 100644
> --- a/drivers/vhost/vsock.c
> +++ b/drivers/vhost/vsock.c
> @@ -28,7 +28,10 @@
>   * small pkts.
>   */
>  #define VHOST_VSOCK_PKT_WEIGHT 256
> +#define VHOST_VSOCK_DGRM_MAX_PENDING_PKT 128
>  
> +/* Max wait time in busy poll in microseconds */
> +#define VHOST_VSOCK_BUSY_POLL_TIMEOUT 20

While adding busy polling isn't out of the question, I would think
it should be kept separate from initial dgram support.


>  enum {
>  	VHOST_VSOCK_FEATURES = VHOST_FEATURES |
>  			       (1ULL << VIRTIO_F_ACCESS_PLATFORM) |
> @@ -46,7 +49,7 @@ static DEFINE_READ_MOSTLY_HASHTABLE(vhost_vsock_hash, 8);
>  
>  struct vhost_vsock {
>  	struct vhost_dev dev;
> -	struct vhost_virtqueue vqs[2];
> +	struct vhost_virtqueue vqs[4];
>  
>  	/* Link to global vhost_vsock_hash, writes use vhost_vsock_mutex */
>  	struct hlist_node hash;
> @@ -55,6 +58,11 @@ struct vhost_vsock {
>  	spinlock_t send_pkt_list_lock;
>  	struct list_head send_pkt_list;	/* host->guest pending packets */
>  
> +	spinlock_t dgram_send_pkt_list_lock;
> +	struct list_head dgram_send_pkt_list;	/* host->guest pending packets */
> +	struct vhost_work dgram_send_pkt_work;
> +	int  dgram_used; /*pending packets to be send */

to be sent?

> +
>  	atomic_t queued_replies;
>  
>  	u32 guest_cid;
> @@ -92,10 +100,22 @@ static void
>  vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  			    struct vhost_virtqueue *vq)
>  {
> -	struct vhost_virtqueue *tx_vq = &vsock->vqs[VSOCK_VQ_TX];
> +	struct vhost_virtqueue *tx_vq;
>  	int pkts = 0, total_len = 0;
>  	bool added = false;
>  	bool restart_tx = false;
> +	spinlock_t *lock;
> +	struct list_head *send_pkt_list;
> +
> +	if (vq == &vsock->vqs[VSOCK_VQ_RX]) {
> +		tx_vq = &vsock->vqs[VSOCK_VQ_TX];
> +		lock = &vsock->send_pkt_list_lock;
> +		send_pkt_list = &vsock->send_pkt_list;
> +	} else {
> +		tx_vq = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
> +		lock = &vsock->dgram_send_pkt_list_lock;
> +		send_pkt_list = &vsock->dgram_send_pkt_list;
> +	}
>  
>  	mutex_lock(&vq->mutex);
>  
> @@ -116,36 +136,48 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  		size_t iov_len, payload_len;
>  		int head;
>  		bool restore_flag = false;
> +		bool is_dgram = false;
>  
> -		spin_lock_bh(&vsock->send_pkt_list_lock);
> -		if (list_empty(&vsock->send_pkt_list)) {
> -			spin_unlock_bh(&vsock->send_pkt_list_lock);
> +		spin_lock_bh(lock);
> +		if (list_empty(send_pkt_list)) {
> +			spin_unlock_bh(lock);
>  			vhost_enable_notify(&vsock->dev, vq);
>  			break;
>  		}
>  
> -		pkt = list_first_entry(&vsock->send_pkt_list,
> +		pkt = list_first_entry(send_pkt_list,
>  				       struct virtio_vsock_pkt, list);
>  		list_del_init(&pkt->list);
> -		spin_unlock_bh(&vsock->send_pkt_list_lock);
> +		spin_unlock_bh(lock);
> +
> +		if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM)
> +			is_dgram = true;
>  
>  		head = vhost_get_vq_desc(vq, vq->iov, ARRAY_SIZE(vq->iov),
>  					 &out, &in, NULL, NULL);
>  		if (head < 0) {
> -			spin_lock_bh(&vsock->send_pkt_list_lock);
> -			list_add(&pkt->list, &vsock->send_pkt_list);
> -			spin_unlock_bh(&vsock->send_pkt_list_lock);
> +			spin_lock_bh(lock);
> +			list_add(&pkt->list, send_pkt_list);
> +			spin_unlock_bh(lock);
>  			break;
>  		}
>  
>  		if (head == vq->num) {
> -			spin_lock_bh(&vsock->send_pkt_list_lock);
> -			list_add(&pkt->list, &vsock->send_pkt_list);
> -			spin_unlock_bh(&vsock->send_pkt_list_lock);
> +			if (is_dgram) {
> +				virtio_transport_free_pkt(pkt);
> +				vq_err(vq, "Dgram virtqueue is full!");
> +				spin_lock_bh(lock);
> +				vsock->dgram_used--;
> +				spin_unlock_bh(lock);
> +				break;
> +			}
> +			spin_lock_bh(lock);
> +			list_add(&pkt->list, send_pkt_list);
> +			spin_unlock_bh(lock);
>  
>  			/* We cannot finish yet if more buffers snuck in while
> -			 * re-enabling notify.
> -			 */
> +			* re-enabling notify.
> +			*/

It looks like you are breaking the alignment of this comment (the leading
space before each '*' was dropped).

>  			if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
>  				vhost_disable_notify(&vsock->dev, vq);
>  				continue;
> @@ -156,6 +188,12 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  		if (out) {
>  			virtio_transport_free_pkt(pkt);
>  			vq_err(vq, "Expected 0 output buffers, got %u\n", out);
> +			if (is_dgram) {
> +				spin_lock_bh(lock);
> +				vsock->dgram_used--;
> +				spin_unlock_bh(lock);
> +			}
> +
>  			break;
>  		}
>  
> @@ -163,6 +201,18 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  		if (iov_len < sizeof(pkt->hdr)) {
>  			virtio_transport_free_pkt(pkt);
>  			vq_err(vq, "Buffer len [%zu] too small\n", iov_len);
> +			if (is_dgram) {
> +				spin_lock_bh(lock);
> +				vsock->dgram_used--;
> +				spin_unlock_bh(lock);
> +			}
> +			break;
> +		}
> +
> +		if (iov_len < pkt->len - pkt->off &&
> +			vq == &vsock->vqs[VSOCK_VQ_DGRAM_RX]) {
> +			virtio_transport_free_pkt(pkt);
> +			vq_err(vq, "Buffer len [%zu] too small for dgram\n", iov_len);
>  			break;
>  		}
>  
> @@ -199,6 +249,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  		if (nbytes != sizeof(pkt->hdr)) {
>  			virtio_transport_free_pkt(pkt);
>  			vq_err(vq, "Faulted on copying pkt hdr\n");
> +			if (is_dgram) {
> +				spin_lock_bh(lock);
> +				vsock->dgram_used--;
> +				spin_unlock_bh(lock);
> +			}
>  			break;
>  		}
>  
> @@ -224,19 +279,19 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  		/* If we didn't send all the payload we can requeue the packet
>  		 * to send it with the next available buffer.
>  		 */
> -		if (pkt->off < pkt->len) {
> +		if ((pkt->off < pkt->len)
> +			&& (vq == &vsock->vqs[VSOCK_VQ_RX])) {

The && belongs at the end of the previous line, and there is no need for
the extra parentheses.

>  			if (restore_flag)
>  				pkt->hdr.flags |= cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
> -

No need to drop this blank line, IMHO.

>  			/* We are queueing the same virtio_vsock_pkt to handle
>  			 * the remaining bytes, and we want to deliver it
>  			 * to monitoring devices in the next iteration.
>  			 */
>  			pkt->tap_delivered = false;
>  
> -			spin_lock_bh(&vsock->send_pkt_list_lock);
> -			list_add(&pkt->list, &vsock->send_pkt_list);
> -			spin_unlock_bh(&vsock->send_pkt_list_lock);
> +			spin_lock_bh(lock);
> +			list_add(&pkt->list, send_pkt_list);
> +			spin_unlock_bh(lock);
>  		} else {
>  			if (pkt->reply) {
>  				int val;
> @@ -251,6 +306,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  			}
>  
>  			virtio_transport_free_pkt(pkt);
> +			if (is_dgram) {
> +				spin_lock_bh(lock);
> +				vsock->dgram_used--;
> +				spin_unlock_bh(lock);
> +			}
>  		}
>  	} while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
>  	if (added)
> @@ -274,11 +334,25 @@ static void vhost_transport_send_pkt_work(struct vhost_work *work)
>  	vhost_transport_do_send_pkt(vsock, vq);
>  }
>  
> +static void vhost_transport_dgram_send_pkt_work(struct vhost_work *work)
> +{
> +	struct vhost_virtqueue *vq;
> +	struct vhost_vsock *vsock;
> +
> +	vsock = container_of(work, struct vhost_vsock, dgram_send_pkt_work);
> +	vq = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
> +
> +	vhost_transport_do_send_pkt(vsock, vq);
> +}
> +
>  static int
>  vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
>  {
>  	struct vhost_vsock *vsock;
>  	int len = pkt->len;
> +	spinlock_t *lock;
> +	struct list_head *send_pkt_list;
> +	struct vhost_work *work;
>  
>  	rcu_read_lock();
>  
> @@ -290,14 +364,39 @@ vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
>  		return -ENODEV;
>  	}
>  
> +	if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM ||
> +	     le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET) {
> +		lock = &vsock->send_pkt_list_lock;
> +		send_pkt_list = &vsock->send_pkt_list;
> +		work = &vsock->send_pkt_work;
> +	} else if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
> +		lock = &vsock->dgram_send_pkt_list_lock;
> +		send_pkt_list = &vsock->dgram_send_pkt_list;
> +		work = &vsock->dgram_send_pkt_work;
> +	} else {
> +		rcu_read_unlock();
> +		virtio_transport_free_pkt(pkt);
> +		return -EINVAL;
> +	}
> +
> +
>  	if (pkt->reply)
>  		atomic_inc(&vsock->queued_replies);
>  
> -	spin_lock_bh(&vsock->send_pkt_list_lock);
> -	list_add_tail(&pkt->list, &vsock->send_pkt_list);
> -	spin_unlock_bh(&vsock->send_pkt_list_lock);
> +	spin_lock_bh(lock);
> +	if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
> +		if (vsock->dgram_used  == VHOST_VSOCK_DGRM_MAX_PENDING_PKT)
> +			len = -ENOMEM;
> +		else {
> +			vsock->dgram_used++;
> +			list_add_tail(&pkt->list, send_pkt_list);
> +		}
> +	} else
> +		list_add_tail(&pkt->list, send_pkt_list);
>  
> -	vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
> +	spin_unlock_bh(lock);
> +
> +	vhost_work_queue(&vsock->dev, work);
>  
>  	rcu_read_unlock();
>  	return len;
> @@ -487,6 +586,18 @@ static bool vhost_transport_seqpacket_allow(u32 remote_cid)
>  	return seqpacket_allow;
>  }
>  
> +static inline unsigned long busy_clock(void)
> +{
> +	return local_clock() >> 10;
> +}
> +
> +static bool vhost_can_busy_poll(unsigned long endtime)
> +{
> +	return likely(!need_resched() && !time_after(busy_clock(), endtime) &&
> +		      !signal_pending(current));
> +}
> +
> +
>  static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
>  {
>  	struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
> @@ -497,6 +608,8 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
>  	int head, pkts = 0, total_len = 0;
>  	unsigned int out, in;
>  	bool added = false;
> +	unsigned long busyloop_timeout = VHOST_VSOCK_BUSY_POLL_TIMEOUT;
> +	unsigned long endtime;
>  
>  	mutex_lock(&vq->mutex);
>  
> @@ -506,11 +619,14 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
>  	if (!vq_meta_prefetch(vq))
>  		goto out;
>  
> +	endtime = busy_clock() + busyloop_timeout;
>  	vhost_disable_notify(&vsock->dev, vq);
> +	preempt_disable();

That's quite a bit of work done with preempt disabled. Why?
What's going on here?

>  	do {
>  		u32 len;
>  
> -		if (!vhost_vsock_more_replies(vsock)) {
> +		if (vq == &vsock->vqs[VSOCK_VQ_TX]
> +			&& !vhost_vsock_more_replies(vsock)) {
>  			/* Stop tx until the device processes already
>  			 * pending replies.  Leave tx virtqueue
>  			 * callbacks disabled.
> @@ -524,6 +640,11 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
>  			break;
>  
>  		if (head == vq->num) {
> +			if (vhost_can_busy_poll(endtime)) {
> +				cpu_relax();
> +				continue;
> +			}
> +
>  			if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
>  				vhost_disable_notify(&vsock->dev, vq);
>  				continue;
> @@ -555,6 +676,7 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
>  		total_len += len;
>  		added = true;
>  	} while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
> +	preempt_enable();
>  
>  no_more_replies:
>  	if (added)
> @@ -593,14 +715,30 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)
>  
>  		if (!vhost_vq_access_ok(vq)) {
>  			ret = -EFAULT;
> +			/* when running with old guest and qemu, vq 2 may
> +			 * not exist, so return 0 in this case.

Can't the new stuff be gated by a feature bit as usual?

> +			 */
> +			if (i == 2) {
> +				mutex_unlock(&vq->mutex);
> +				break;
> +			}
>  			goto err_vq;
>  		}
>  
>  		if (!vhost_vq_get_backend(vq)) {
>  			vhost_vq_set_backend(vq, vsock);
>  			ret = vhost_vq_init_access(vq);
> -			if (ret)
> -				goto err_vq;
> +			if (ret) {
> +				mutex_unlock(&vq->mutex);
> +				/* when running with old guest and qemu, vq 2 may
> +				 * not exist, so return 0 in this case.
> +				 */
> +				if (i == 2) {
> +					mutex_unlock(&vq->mutex);
> +					break;
> +				}
> +				goto err;
> +			}
>  		}
>  
>  		mutex_unlock(&vq->mutex);
> @@ -610,6 +748,7 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)
>  	 * let's kick the send worker to send them.
>  	 */
>  	vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
> +	vhost_work_queue(&vsock->dev, &vsock->dgram_send_pkt_work);
>  
>  	mutex_unlock(&vsock->dev.mutex);
>  	return 0;
> @@ -684,8 +823,14 @@ static int vhost_vsock_dev_open(struct inode *inode, struct file *file)
>  
>  	vqs[VSOCK_VQ_TX] = &vsock->vqs[VSOCK_VQ_TX];
>  	vqs[VSOCK_VQ_RX] = &vsock->vqs[VSOCK_VQ_RX];
> +	vqs[VSOCK_VQ_DGRAM_TX] = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
> +	vqs[VSOCK_VQ_DGRAM_RX] = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
>  	vsock->vqs[VSOCK_VQ_TX].handle_kick = vhost_vsock_handle_tx_kick;
>  	vsock->vqs[VSOCK_VQ_RX].handle_kick = vhost_vsock_handle_rx_kick;
> +	vsock->vqs[VSOCK_VQ_DGRAM_TX].handle_kick =
> +						vhost_vsock_handle_tx_kick;
> +	vsock->vqs[VSOCK_VQ_DGRAM_RX].handle_kick =
> +						vhost_vsock_handle_rx_kick;
>  
>  	vhost_dev_init(&vsock->dev, vqs, ARRAY_SIZE(vsock->vqs),
>  		       UIO_MAXIOV, VHOST_VSOCK_PKT_WEIGHT,
> @@ -695,6 +840,11 @@ static int vhost_vsock_dev_open(struct inode *inode, struct file *file)
>  	spin_lock_init(&vsock->send_pkt_list_lock);
>  	INIT_LIST_HEAD(&vsock->send_pkt_list);
>  	vhost_work_init(&vsock->send_pkt_work, vhost_transport_send_pkt_work);
> +	spin_lock_init(&vsock->dgram_send_pkt_list_lock);
> +	INIT_LIST_HEAD(&vsock->dgram_send_pkt_list);
> +	vhost_work_init(&vsock->dgram_send_pkt_work,
> +			vhost_transport_dgram_send_pkt_work);
> +
>  	return 0;
>  
>  out:
> @@ -769,6 +919,17 @@ static int vhost_vsock_dev_release(struct inode *inode, struct file *file)
>  	}
>  	spin_unlock_bh(&vsock->send_pkt_list_lock);
>  
> +	spin_lock_bh(&vsock->dgram_send_pkt_list_lock);
> +	while (!list_empty(&vsock->dgram_send_pkt_list)) {
> +		struct virtio_vsock_pkt *pkt;
> +
> +		pkt = list_first_entry(&vsock->dgram_send_pkt_list,
> +				struct virtio_vsock_pkt, list);
> +		list_del_init(&pkt->list);
> +		virtio_transport_free_pkt(pkt);
> +	}
> +	spin_unlock_bh(&vsock->dgram_send_pkt_list_lock);
> +
>  	vhost_dev_cleanup(&vsock->dev);
>  	kfree(vsock->dev.vqs);
>  	vhost_vsock_free(vsock);
> @@ -954,7 +1115,7 @@ static int __init vhost_vsock_init(void)
>  	int ret;
>  
>  	ret = vsock_core_register(&vhost_transport.transport,
> -				  VSOCK_TRANSPORT_F_H2G);
> +				  VSOCK_TRANSPORT_F_H2G | VSOCK_TRANSPORT_F_DGRAM);
>  	if (ret < 0)
>  		return ret;
>  	return misc_register(&vhost_vsock_misc);
> -- 
> 2.20.1




[Index of Archives]     [KVM ARM]     [KVM ia64]     [KVM ppc]     [Virtualization Tools]     [Spice Development]     [Libvirt]     [Libvirt Users]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite Questions]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux