I saw WARN_ON(!list_empty(&dev->work_list)) trigger, so our custom flush is
not as airtight as it needs to be. This patch switches to a simple atomic
counter + SRCU instead of the custom locked queue + flush implementation.

This will slow down the setup ioctls, which should not matter - it's a slow
path anyway. We use the expedited flush to at least make sure flushing has a
sane time bound.

Works fine for me. I got reports that with many guests the work lock is
highly contended, and this patch should in theory fix that as well - but I
haven't tested it yet.

Signed-off-by: Michael S. Tsirkin <mst@xxxxxxxxxx>
---
 drivers/vhost/net.c   |   55 +++++--------------
 drivers/vhost/vhost.c |  140 ++++++++++++++++++++++---------------------------
 drivers/vhost/vhost.h |   47 +++++++++-------
 3 files changed, 103 insertions(+), 139 deletions(-)

diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index f13e56b..ee69c51 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -111,8 +111,9 @@ static void tx_poll_start(struct vhost_net *net, struct socket *sock)
 
 /* Expects to be always run from workqueue - which acts as
  * read-size critical section for our kind of RCU. */
-static void handle_tx(struct vhost_net *net)
+static void handle_tx(struct vhost_dev *dev)
 {
+	struct vhost_net *net = container_of(dev, struct vhost_net, dev);
 	struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_TX];
 	unsigned out, in, s;
 	int head;
@@ -127,7 +128,7 @@ static void handle_tx(struct vhost_net *net)
 	size_t len, total_len = 0;
 	int err, wmem;
 	size_t hdr_size;
-	struct socket *sock = rcu_dereference(vq->private_data);
+	struct socket *sock = vhost_vq_data(vq, &net->dev);
 
 	if (!sock)
 		return;
@@ -305,7 +306,7 @@ static void handle_rx_big(struct vhost_net *net)
 	size_t len, total_len = 0;
 	int err;
 	size_t hdr_size;
-	struct socket *sock = rcu_dereference(vq->private_data);
+	struct socket *sock = vhost_vq_data(vq, &net->dev);
 
 	if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
 		return;
@@ -416,7 +417,7 @@ static void handle_rx_mergeable(struct vhost_net *net)
 	int err, headcount;
 	size_t vhost_hlen, sock_hlen;
 	size_t vhost_len, sock_len;
-	struct socket *sock = rcu_dereference(vq->private_data);
+	struct socket *sock = vhost_vq_data(vq, &net->dev);
 
 	if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
 		return;
@@ -500,46 +501,15 @@ static void handle_rx_mergeable(struct vhost_net *net)
 	unuse_mm(net->dev.mm);
 }
 
-static void handle_rx(struct vhost_net *net)
+static void handle_rx(struct vhost_dev *dev)
 {
+	struct vhost_net *net = container_of(dev, struct vhost_net, dev);
 	if (vhost_has_feature(&net->dev, VIRTIO_NET_F_MRG_RXBUF))
 		handle_rx_mergeable(net);
 	else
 		handle_rx_big(net);
 }
 
-static void handle_tx_kick(struct vhost_work *work)
-{
-	struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
-						  poll.work);
-	struct vhost_net *net = container_of(vq->dev, struct vhost_net, dev);
-
-	handle_tx(net);
-}
-
-static void handle_rx_kick(struct vhost_work *work)
-{
-	struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
-						  poll.work);
-	struct vhost_net *net = container_of(vq->dev, struct vhost_net, dev);
-
-	handle_rx(net);
-}
-
-static void handle_tx_net(struct vhost_work *work)
-{
-	struct vhost_net *net = container_of(work, struct vhost_net,
-					     poll[VHOST_NET_VQ_TX].work);
-	handle_tx(net);
-}
-
-static void handle_rx_net(struct vhost_work *work)
-{
-	struct vhost_net *net = container_of(work, struct vhost_net,
-					     poll[VHOST_NET_VQ_RX].work);
-	handle_rx(net);
-}
-
 static int vhost_net_open(struct inode *inode, struct file *f)
 {
 	struct vhost_net *n = kmalloc(sizeof *n, GFP_KERNEL);
@@ -550,16 +520,18 @@ static int vhost_net_open(struct inode *inode, struct file *f)
 		return -ENOMEM;
 
 	dev = &n->dev;
-	n->vqs[VHOST_NET_VQ_TX].handle_kick = handle_tx_kick;
-	n->vqs[VHOST_NET_VQ_RX].handle_kick = handle_rx_kick;
+	vhost_work_set_fn(&n->vqs[VHOST_NET_VQ_TX].work, handle_tx);
+	vhost_work_set_fn(&n->vqs[VHOST_NET_VQ_RX].work, handle_rx);
 	r = vhost_dev_init(dev, n->vqs, VHOST_NET_VQ_MAX);
 	if (r < 0) {
 		kfree(n);
 		return r;
 	}
 
-	vhost_poll_init(n->poll + VHOST_NET_VQ_TX, handle_tx_net, POLLOUT, dev);
-	vhost_poll_init(n->poll + VHOST_NET_VQ_RX, handle_rx_net, POLLIN, dev);
+	vhost_poll_init(n->poll + VHOST_NET_VQ_TX,
+			&n->vqs[VHOST_NET_VQ_TX].work, POLLOUT, dev);
+	vhost_poll_init(n->poll + VHOST_NET_VQ_RX,
+			&n->vqs[VHOST_NET_VQ_RX].work, POLLIN, dev);
 	n->tx_poll_state = VHOST_NET_POLL_DISABLED;
 
 	f->private_data = n;
@@ -640,6 +612,7 @@ static int vhost_net_release(struct inode *inode, struct file *f)
 	/* We do an extra flush before freeing memory,
 	 * since jobs can re-queue themselves. */
 	vhost_net_flush(n);
+	vhost_dev_free(&n->dev);
 	kfree(n);
 	return 0;
 }
diff --git a/drivers/vhost/vhost.c b/drivers/vhost/vhost.c
index e05557d..daa95c8 100644
--- a/drivers/vhost/vhost.c
+++ b/drivers/vhost/vhost.c
@@ -60,22 +60,27 @@ static int vhost_poll_wakeup(wait_queue_t *wait, unsigned mode, int sync,
 	return 0;
 }
 
+/* Must be called for each vq before vhost_dev_init. */
+void vhost_work_set_fn(struct vhost_work *work, vhost_work_fn_t fn)
+{
+	work->fn = fn;
+}
+
+static void vhost_work_init(struct vhost_work *work)
+{
+	atomic_set(&work->queue_seq, 0);
+	work->done_seq = 0;
+}
+
 /* Init poll structure */
-void vhost_poll_init(struct vhost_poll *poll, vhost_work_fn_t fn,
+void vhost_poll_init(struct vhost_poll *poll, struct vhost_work *work,
 		     unsigned long mask, struct vhost_dev *dev)
 {
-	struct vhost_work *work = &poll->work;
-
+	poll->work = work;
 	init_waitqueue_func_entry(&poll->wait, vhost_poll_wakeup);
 	init_poll_funcptr(&poll->table, vhost_poll_func);
 	poll->mask = mask;
 	poll->dev = dev;
-
-	INIT_LIST_HEAD(&work->node);
-	work->fn = fn;
-	init_waitqueue_head(&work->done);
-	work->flushing = 0;
-	work->queue_seq = work->done_seq = 0;
 }
 
 /* Start polling a file. We add ourselves to file's wait queue. The caller must
@@ -99,40 +104,16 @@ void vhost_poll_stop(struct vhost_poll *poll)
  * locks that are also used by the callback. */
 void vhost_poll_flush(struct vhost_poll *poll)
 {
-	struct vhost_work *work = &poll->work;
-	unsigned seq;
-	int left;
-	int flushing;
-
-	spin_lock_irq(&poll->dev->work_lock);
-	seq = work->queue_seq;
-	work->flushing++;
-	spin_unlock_irq(&poll->dev->work_lock);
-	wait_event(work->done, ({
-		   spin_lock_irq(&poll->dev->work_lock);
-		   left = seq - work->done_seq <= 0;
-		   spin_unlock_irq(&poll->dev->work_lock);
-		   left;
-	}));
-	spin_lock_irq(&poll->dev->work_lock);
-	flushing = --work->flushing;
-	spin_unlock_irq(&poll->dev->work_lock);
-	BUG_ON(flushing < 0);
+	synchronize_srcu_expedited(&poll->dev->worker_srcu);
 }
 
 void vhost_poll_queue(struct vhost_poll *poll)
 {
 	struct vhost_dev *dev = poll->dev;
-	struct vhost_work *work = &poll->work;
-	unsigned long flags;
-
-	spin_lock_irqsave(&dev->work_lock, flags);
-	if (list_empty(&work->node)) {
-		list_add_tail(&work->node, &dev->work_list);
-		work->queue_seq++;
-		wake_up_process(dev->worker);
-	}
-	spin_unlock_irqrestore(&dev->work_lock, flags);
+	struct vhost_work *work = poll->work;
+
+	atomic_inc(&work->queue_seq);
+	wake_up_process(dev->worker);
 }
 
 static void vhost_vq_reset(struct vhost_dev *dev,
@@ -164,41 +145,39 @@ static void vhost_vq_reset(struct vhost_dev *dev,
 static int vhost_worker(void *data)
 {
 	struct vhost_dev *dev = data;
-	struct vhost_work *work = NULL;
-	unsigned uninitialized_var(seq);
+	struct vhost_work *uninitialized_var(work);
+	unsigned n, i, vq = 0;
+	int seq;
 
-	for (;;) {
-		/* mb paired w/ kthread_stop */
-		set_current_state(TASK_INTERRUPTIBLE);
+	n = dev->nvqs;
+repeat:
+	set_current_state(TASK_INTERRUPTIBLE); /* mb paired w/ kthread_stop */
 
-		spin_lock_irq(&dev->work_lock);
-		if (work) {
-			work->done_seq = seq;
-			if (work->flushing)
-				wake_up_all(&work->done);
-		}
+	if (kthread_should_stop()) {
+		__set_current_state(TASK_RUNNING);
+		return 0;
+	}
 
-		if (kthread_should_stop()) {
-			spin_unlock_irq(&dev->work_lock);
-			__set_current_state(TASK_RUNNING);
-			return 0;
+	for (i = 0; i < n; ++i) {
+		work = &dev->vqs[(vq + i) % n].work;
+		seq = atomic_read(&work->queue_seq);
+		if (seq != work->done_seq) {
+			work->done_seq = seq;
+			break;
 		}
-		if (!list_empty(&dev->work_list)) {
-			work = list_first_entry(&dev->work_list,
-						struct vhost_work, node);
-			list_del_init(&work->node);
-			seq = work->queue_seq;
-		} else
-			work = NULL;
-		spin_unlock_irq(&dev->work_lock);
+		work = NULL;
+	}
 
-		if (work) {
-			__set_current_state(TASK_RUNNING);
-			work->fn(work);
-		} else
-			schedule();
+	if (work) {
+		int idx;
+		__set_current_state(TASK_RUNNING);
+		idx = srcu_read_lock(&dev->worker_srcu);
+		work->fn(dev);
+		srcu_read_unlock(&dev->worker_srcu, idx);
+	} else
+		schedule();
 
-	}
+	goto repeat;
 }
 
 long vhost_dev_init(struct vhost_dev *dev,
@@ -213,20 +192,22 @@ long vhost_dev_init(struct vhost_dev *dev,
 	dev->log_file = NULL;
 	dev->memory = NULL;
 	dev->mm = NULL;
-	spin_lock_init(&dev->work_lock);
-	INIT_LIST_HEAD(&dev->work_list);
 	dev->worker = NULL;
 
 	for (i = 0; i < dev->nvqs; ++i) {
 		dev->vqs[i].dev = dev;
 		mutex_init(&dev->vqs[i].mutex);
 		vhost_vq_reset(dev, dev->vqs + i);
-		if (dev->vqs[i].handle_kick)
+		if (dev->vqs[i].work.fn)
 			vhost_poll_init(&dev->vqs[i].poll,
-					dev->vqs[i].handle_kick, POLLIN, dev);
+					&dev->vqs[i].work, POLLIN, dev);
 	}
+	return init_srcu_struct(&dev->worker_srcu);
+}
 
-	return 0;
+void vhost_dev_free(struct vhost_dev *dev)
+{
+	cleanup_srcu_struct(&dev->worker_srcu);
 }
 
 /* Caller should have device mutex */
@@ -240,7 +221,7 @@ long vhost_dev_check_owner(struct vhost_dev *dev)
 static long vhost_dev_set_owner(struct vhost_dev *dev)
 {
 	struct task_struct *worker;
-	int err;
+	int i, err;
 	/* Is there an owner already? */
 	if (dev->mm) {
 		err = -EBUSY;
@@ -258,6 +239,10 @@ static long vhost_dev_set_owner(struct vhost_dev *dev)
 	err = cgroup_attach_task_current_cg(worker);
 	if (err)
 		goto err_cgroup;
+
+	for (i = 0; i < dev->nvqs; ++i) {
+		vhost_work_init(&dev->vqs[i].work);
+	}
 	wake_up_process(worker);	/* avoid contributing to loadavg */
 
 	return 0;
@@ -293,7 +278,7 @@ void vhost_dev_cleanup(struct vhost_dev *dev)
 {
 	int i;
 	for (i = 0; i < dev->nvqs; ++i) {
-		if (dev->vqs[i].kick && dev->vqs[i].handle_kick) {
+		if (dev->vqs[i].kick && dev->vqs[i].work.fn) {
 			vhost_poll_stop(&dev->vqs[i].poll);
 			vhost_poll_flush(&dev->vqs[i].poll);
 		}
@@ -322,7 +307,6 @@ void vhost_dev_cleanup(struct vhost_dev *dev)
 		mmput(dev->mm);
 	dev->mm = NULL;
 
-	WARN_ON(!list_empty(&dev->work_list));
 	kthread_stop(dev->worker);
 }
 
@@ -644,7 +628,7 @@ static long vhost_set_vring(struct vhost_dev *d, int ioctl, void __user *argp)
 		r = -ENOIOCTLCMD;
 	}
 
-	if (pollstop && vq->handle_kick)
+	if (pollstop && vq->work.fn)
 		vhost_poll_stop(&vq->poll);
 
 	if (ctx)
@@ -652,12 +636,12 @@ static long vhost_set_vring(struct vhost_dev *d, int ioctl, void __user *argp)
 	if (filep)
 		fput(filep);
 
-	if (pollstart && vq->handle_kick)
+	if (pollstart && vq->work.fn)
 		vhost_poll_start(&vq->poll, vq->kick);
 
 	mutex_unlock(&vq->mutex);
 
-	if (pollstop && vq->handle_kick)
+	if (pollstop && vq->work.fn)
 		vhost_poll_flush(&vq->poll);
 	return r;
 }
diff --git a/drivers/vhost/vhost.h b/drivers/vhost/vhost.h
index afd7729..9c990ea 100644
--- a/drivers/vhost/vhost.h
+++ b/drivers/vhost/vhost.h
@@ -11,9 +11,10 @@
 #include <linux/uio.h>
 #include <linux/virtio_config.h>
 #include <linux/virtio_ring.h>
+#include <linux/srcu.h>
 #include <asm/atomic.h>
 
-struct vhost_device;
+struct vhost_dev;
 
 enum {
 	/* Enough place for all fragments, head, and virtio net header. */
@@ -21,29 +22,33 @@ enum {
 };
 
 struct vhost_work;
-typedef void (*vhost_work_fn_t)(struct vhost_work *work);
+typedef void (*vhost_work_fn_t)(struct vhost_dev *dev);
 
 struct vhost_work {
-	struct list_head node;
+	/* Callback function to execute. */
 	vhost_work_fn_t fn;
-	wait_queue_head_t done;
-	int flushing;
-	unsigned queue_seq;
-	unsigned done_seq;
+	/* Incremented to request callback execution.
+	 * Atomic to allow multiple writers. */
+	atomic_t queue_seq;
+	/* Used by worker to track execution requests.
+	 * Used from a single thread so no locking. */
+	int done_seq;
 };
 
+void vhost_work_set_fn(struct vhost_work *work, vhost_work_fn_t fn);
+
 /* Poll a file (eventfd or socket) */
 /* Note: there's nothing vhost specific about this structure. */
 struct vhost_poll {
 	poll_table table;
 	wait_queue_head_t *wqh;
 	wait_queue_t wait;
-	struct vhost_work work;
+	struct vhost_work *work;
 	unsigned long mask;
 	struct vhost_dev *dev;
 };
 
-void vhost_poll_init(struct vhost_poll *poll, vhost_work_fn_t fn,
+void vhost_poll_init(struct vhost_poll *poll, struct vhost_work* work,
 		     unsigned long mask, struct vhost_dev *dev);
 void vhost_poll_start(struct vhost_poll *poll, struct file *file);
 void vhost_poll_stop(struct vhost_poll *poll);
@@ -72,11 +77,12 @@ struct vhost_virtqueue {
 	struct eventfd_ctx *error_ctx;
 	struct eventfd_ctx *log_ctx;
 
+	/* The work to execute when the Guest kicks us,
+	 * on Host activity, or timeout. */
+	struct vhost_work work;
+
+	/* Poll Guest for kicks */
 	struct vhost_poll poll;
-	/* The routine to call when the Guest pings us, or timeout. */
-	vhost_work_fn_t handle_kick;
-
 	/* Last available index we saw. */
 	u16 last_avail_idx;
@@ -99,12 +105,7 @@ struct vhost_virtqueue {
 	size_t vhost_hlen;
 	size_t sock_hlen;
 	struct vring_used_elem heads[VHOST_NET_MAX_SG];
-	/* We use a kind of RCU to access private pointer.
-	 * All readers access it from worker, which makes it possible to
-	 * flush the vhost_work instead of synchronize_rcu. Therefore readers do
-	 * not need to call rcu_read_lock/rcu_read_unlock: the beginning of
-	 * vhost_work execution acts instead of rcu_read_lock() and the end of
-	 * vhost_work execution acts instead of rcu_read_lock().
+	/* Readers use worker_srcu in device to access private pointer.
 	 * Writers use virtqueue mutex. */
 	void *private_data;
 	/* Log write descriptors */
@@ -112,6 +113,12 @@ struct vhost_virtqueue {
 	struct vhost_log log[VHOST_NET_MAX_SG];
 };
 
+static inline void *vhost_vq_data(struct vhost_virtqueue *vq,
+				  struct vhost_dev *dev)
+{
+	return srcu_dereference(vq->private_data, &dev->worker_srcu);
+}
+
 struct vhost_dev {
 	/* Readers use RCU to access memory table pointer
 	 * log base pointer and features.
@@ -124,12 +131,12 @@ struct vhost_dev {
 	int nvqs;
 	struct file *log_file;
 	struct eventfd_ctx *log_ctx;
-	spinlock_t work_lock;
-	struct list_head work_list;
 	struct task_struct *worker;
+	struct srcu_struct worker_srcu;
 };
 
 long vhost_dev_init(struct vhost_dev *, struct vhost_virtqueue *vqs, int nvqs);
+void vhost_dev_free(struct vhost_dev *);
 long vhost_dev_check_owner(struct vhost_dev *);
 long vhost_dev_reset_owner(struct vhost_dev *);
 void vhost_dev_cleanup(struct vhost_dev *);
-- 
1.7.2.rc0.14.g41c1c
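
For readers who want to play with the scheme outside the kernel, below is a
condensed user-space analogue of the queue/flush pattern the patch switches
to. It is only an illustration, not part of the patch: all names are made up,
a pthread rwlock stands in for the SRCU grace period
(synchronize_srcu_expedited), and usleep() stands in for schedule() plus
wake_up_process(). As in the patch, the flush only waits for a callback that
is already inside its read-side section.

/* toy_worker.c - illustrative user-space sketch, NOT kernel code. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

struct toy_work {
	atomic_int queue_seq;	/* bumped by anyone requesting execution */
	int done_seq;		/* private to the worker, no locking */
	void (*fn)(void);	/* callback to run */
};

static pthread_rwlock_t grace = PTHREAD_RWLOCK_INITIALIZER;
static atomic_bool stop;

/* Analogue of vhost_poll_queue(): request execution, no lock taken. */
static void toy_work_queue(struct toy_work *w)
{
	atomic_fetch_add(&w->queue_seq, 1);
	/* the kernel patch also wakes the worker thread here */
}

/* Analogue of vhost_poll_flush(): wait until no callback is inside its
 * read-side section (synchronize_srcu_expedited in the patch). */
static void toy_work_flush(void)
{
	pthread_rwlock_wrlock(&grace);
	pthread_rwlock_unlock(&grace);
}

/* Analogue of vhost_worker(): compare the atomic request counter against
 * a worker-private done_seq and run the callback inside the read section. */
static void *toy_worker(void *arg)
{
	struct toy_work *w = arg;

	while (!atomic_load(&stop)) {
		int seq = atomic_load(&w->queue_seq);

		if (seq != w->done_seq) {
			w->done_seq = seq;
			pthread_rwlock_rdlock(&grace);	/* ~srcu_read_lock */
			w->fn();
			pthread_rwlock_unlock(&grace);	/* ~srcu_read_unlock */
		} else {
			usleep(1000);			/* ~schedule() */
		}
	}
	return NULL;
}

static void say_hello(void)
{
	puts("work ran");
}

int main(void)
{
	struct toy_work w = { .fn = say_hello };
	pthread_t t;

	pthread_create(&t, NULL, toy_worker, &w);
	toy_work_queue(&w);
	usleep(10000);		/* give the worker a chance to pick it up */
	toy_work_flush();	/* wait for the running callback to finish */
	atomic_store(&stop, true);
	pthread_join(t, NULL);
	return 0;
}

Build with something like "gcc -pthread toy_worker.c" (filename hypothetical).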