On Tue, Mar 16, 2010 at 05:32:17PM +0800, Xin Xiaohui wrote:
> The vhost-net backend now only supports synchronous send/recv
> operations. The patch provides multiple submits and asynchronous
> notifications. This is needed for zero-copy case.
>
> Signed-off-by: Xin Xiaohui <xiaohui.xin@xxxxxxxxx>
> ---
>
> Michael,
> I don't use the kiocb comes from the sendmsg/recvmsg,
> since I have embeded the kiocb in page_info structure,
> and allocate it when page_info allocated.

So what I suggested was that vhost allocates and tracks the iocbs, and
passes them to your device with sendmsg/recvmsg calls. This way your
device won't need to share structures and locking strategy with vhost:
you get an iocb, handle it, and invoke a callback to notify vhost about
completion. This also gets rid of the 'receiver' callback.

> Please have a review and thanks for the instruction
> for replying email which helps me a lot.
>
> Thanks,
> Xiaohui
>
>  drivers/vhost/net.c   |  159 +++++++++++++++++++++++++++++++++++++++++++++++--
>  drivers/vhost/vhost.h |   12 ++++
>  2 files changed, 166 insertions(+), 5 deletions(-)
>
> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> index 22d5fef..5483848 100644
> --- a/drivers/vhost/net.c
> +++ b/drivers/vhost/net.c
> @@ -17,11 +17,13 @@
>  #include <linux/workqueue.h>
>  #include <linux/rcupdate.h>
>  #include <linux/file.h>
> +#include <linux/aio.h>
>
>  #include <linux/net.h>
>  #include <linux/if_packet.h>
>  #include <linux/if_arp.h>
>  #include <linux/if_tun.h>
> +#include <linux/mpassthru.h>
>
>  #include <net/sock.h>
>
> @@ -91,6 +93,12 @@ static void tx_poll_start(struct vhost_net *net, struct socket *sock)
>  	net->tx_poll_state = VHOST_NET_POLL_STARTED;
>  }
>
> +static void handle_async_rx_events_notify(struct vhost_net *net,
> +	struct vhost_virtqueue *vq);
> +
> +static void handle_async_tx_events_notify(struct vhost_net *net,
> +	struct vhost_virtqueue *vq);
> +

A couple of style comments:

- It's better to arrange functions in such an order that forward
  declarations aren't necessary. Since we don't have recursion, this
  should always be possible.

- Continuation lines should be indented at least to the position of
  '(' on the previous line.

>  /* Expects to be always run from workqueue - which acts as
>   * read-size critical section for our kind of RCU. */
>  static void handle_tx(struct vhost_net *net)
> @@ -124,6 +132,8 @@ static void handle_tx(struct vhost_net *net)
>  		tx_poll_stop(net);
>  	hdr_size = vq->hdr_size;
>
> +	handle_async_tx_events_notify(net, vq);
> +
>  	for (;;) {
>  		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
>  					 ARRAY_SIZE(vq->iov),
> @@ -151,6 +161,12 @@ static void handle_tx(struct vhost_net *net)
>  		/* Skip header. TODO: support TSO. */
>  		s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, out);
>  		msg.msg_iovlen = out;
> +
> +		if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
> +			vq->head = head;
> +			msg.msg_control = (void *)vq;

So here the device gets a pointer to the vhost_virtqueue structure. If
it got an iocb and invoked a callback instead, it would not need to
care about vhost internals.
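Roughly, the flow I have in mind is the following (an untested sketch
only: the iocb cache, vhost_tx_done() and the reuse of ki_dtor as the
completion hook are all made up for illustration, not existing code,
and synchronization between the callback and the vq handler is
omitted):

	/* vhost side: vhost owns the iocb and hands it to the backend
	 * through the iocb argument that sendmsg already takes. */
	iocb = kmem_cache_alloc(net->iocb_cache, GFP_KERNEL);
	iocb->ki_pos = head;		/* descriptor to complete later */
	iocb->ki_user_data = (u64)(unsigned long)vq;
	iocb->ki_dtor = vhost_tx_done;	/* completion callback */
	err = sock->ops->sendmsg(iocb, sock, &msg, len);

	/* device side: when the transmit completes, record the result
	 * and fire the callback; no vhost structures or locks are
	 * touched here. */
	iocb->ki_nbytes = len;
	iocb->ki_dtor(iocb);

	/* vhost side again: the callback moves the descriptor to the
	 * used ring; the device never learns what a virtqueue is. */
	static void vhost_tx_done(struct kiocb *iocb)
	{
		struct vhost_virtqueue *vq = (struct vhost_virtqueue *)
				(unsigned long)iocb->ki_user_data;

		vhost_add_used_and_signal(vq->dev, vq, iocb->ki_pos, 0);
		/* recycle the iocb here */
	}

The same scheme works for recvmsg, and both the notifier list polling
and the 'receiver' hook go away.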
> +		}
> +
>  		len = iov_length(vq->iov, out);
>  		/* Sanity check */
>  		if (!len) {
> @@ -166,6 +182,10 @@ static void handle_tx(struct vhost_net *net)
>  			tx_poll_start(net, sock);
>  			break;
>  		}
> +
> +		if (vq->link_state == VHOST_VQ_LINK_ASYNC)
> +			continue;
> +
>  		if (err != len)
>  			pr_err("Truncated TX packet: "
>  			       " len %d != %zd\n", err, len);
> @@ -177,6 +197,8 @@ static void handle_tx(struct vhost_net *net)
>  		}
>  	}
>
> +	handle_async_tx_events_notify(net, vq);
> +
>  	mutex_unlock(&vq->mutex);
>  	unuse_mm(net->dev.mm);
>  }
> @@ -206,7 +228,8 @@ static void handle_rx(struct vhost_net *net)
>  	int err;
>  	size_t hdr_size;
>  	struct socket *sock = rcu_dereference(vq->private_data);
> -	if (!sock || skb_queue_empty(&sock->sk->sk_receive_queue))
> +	if (!sock || (skb_queue_empty(&sock->sk->sk_receive_queue) &&
> +			vq->link_state == VHOST_VQ_LINK_SYNC))
>  		return;
>
>  	use_mm(net->dev.mm);
> @@ -214,9 +237,18 @@ static void handle_rx(struct vhost_net *net)
>  	vhost_disable_notify(vq);
>  	hdr_size = vq->hdr_size;
>
> -	vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
> +	/* In async cases, for write logging, the simple way is to get
> +	 * the log info always, and really logging is decided later.
> +	 * Thus, when logging enabled, we can get log, and when logging
> +	 * disabled, we can get log disabled accordingly.
> +	 */
> +

This adds overhead and might be one of the reasons your patch does not
perform that well. A better way would be to flush outstanding requests
or reread the vq when logging is enabled.
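For example, with a flush on the feature-change path (again only a
sketch; it assumes vhost_net_flush() is sufficient to drain whatever
async requests the backend still holds):

	static int vhost_net_set_features(struct vhost_net *n, u64 features)
	{
		...
		mutex_lock(&n->dev.mutex);
		/* Drain requests submitted while logging was off, so
		 * that from here on the handlers only need to look at
		 * the feature bit for new submissions. */
		if (features & (1ULL << VHOST_F_LOG_ALL))
			vhost_net_flush(n);
		...
	}

Then handle_rx() could keep the synchronous version unchanged:

	vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
		vq->log : NULL;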
> +	vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) |
> +		(vq->link_state == VHOST_VQ_LINK_ASYNC) ?
>  		vq->log : NULL;
>
> +	handle_async_rx_events_notify(net, vq);
> +
>  	for (;;) {
>  		head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
>  					 ARRAY_SIZE(vq->iov),
> @@ -245,6 +277,11 @@ static void handle_rx(struct vhost_net *net)
>  		s = move_iovec_hdr(vq->iov, vq->hdr, hdr_size, in);
>  		msg.msg_iovlen = in;
>  		len = iov_length(vq->iov, in);
> +		if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
> +			vq->head = head;
> +			vq->_log = log;
> +			msg.msg_control = (void *)vq;
> +		}
>  		/* Sanity check */
>  		if (!len) {
>  			vq_err(vq, "Unexpected header len for RX: "
> @@ -259,6 +296,10 @@ static void handle_rx(struct vhost_net *net)
>  			vhost_discard_vq_desc(vq);
>  			break;
>  		}
> +
> +		if (vq->link_state == VHOST_VQ_LINK_ASYNC)
> +			continue;
> +
>  		/* TODO: Should check and handle checksum. */
>  		if (err > len) {
>  			pr_err("Discarded truncated rx packet: "
> @@ -284,10 +325,85 @@ static void handle_rx(struct vhost_net *net)
>  		}
>  	}
>
> +	handle_async_rx_events_notify(net, vq);
> +
>  	mutex_unlock(&vq->mutex);
>  	unuse_mm(net->dev.mm);
>  }
>
> +struct kiocb *notify_dequeue(struct vhost_virtqueue *vq)
> +{
> +	struct kiocb *iocb = NULL;
> +	unsigned long flags;
> +
> +	spin_lock_irqsave(&vq->notify_lock, flags);
> +	if (!list_empty(&vq->notifier)) {
> +		iocb = list_first_entry(&vq->notifier,
> +				struct kiocb, ki_list);
> +		list_del(&iocb->ki_list);
> +	}
> +	spin_unlock_irqrestore(&vq->notify_lock, flags);
> +	return iocb;
> +}
> +
> +static void handle_async_rx_events_notify(struct vhost_net *net,
> +	struct vhost_virtqueue *vq)
> +{
> +	struct kiocb *iocb = NULL;
> +	struct vhost_log *vq_log = NULL;
> +	int rx_total_len = 0;
> +	int log, size;
> +
> +	if (vq->link_state != VHOST_VQ_LINK_ASYNC)
> +		return;
> +	if (vq != &net->dev.vqs[VHOST_NET_VQ_RX])
> +		return;
> +
> +	if (vq->receiver)
> +		vq->receiver(vq);
> +	vq_log = unlikely(vhost_has_feature(
> +			&net->dev, VHOST_F_LOG_ALL)) ? vq->log : NULL;
> +	while ((iocb = notify_dequeue(vq)) != NULL) {
> +		vhost_add_used_and_signal(&net->dev, vq,
> +				iocb->ki_pos, iocb->ki_nbytes);
> +		log = (int)iocb->ki_user_data;
> +		size = iocb->ki_nbytes;
> +		rx_total_len += iocb->ki_nbytes;
> +		if (iocb->ki_dtor)
> +			iocb->ki_dtor(iocb);
> +		if (unlikely(vq_log))
> +			vhost_log_write(vq, vq_log, log, size);
> +		if (unlikely(rx_total_len >= VHOST_NET_WEIGHT)) {
> +			vhost_poll_queue(&vq->poll);
> +			break;
> +		}
> +	}
> +}
> +
> +static void handle_async_tx_events_notify(struct vhost_net *net,
> +	struct vhost_virtqueue *vq)
> +{
> +	struct kiocb *iocb = NULL;
> +	int tx_total_len = 0;
> +
> +	if (vq->link_state != VHOST_VQ_LINK_ASYNC)
> +		return;
> +	if (vq != &net->dev.vqs[VHOST_NET_VQ_TX])
> +		return;

It's hard to see why the second check is necessary: this function is
only ever called from handle_tx() with the TX vq.

> +	while ((iocb = notify_dequeue(vq)) != NULL) {
> +		vhost_add_used_and_signal(&net->dev, vq,
> +				iocb->ki_pos, 0);
> +		tx_total_len += iocb->ki_nbytes;
> +		if (iocb->ki_dtor)
> +			iocb->ki_dtor(iocb);
> +		if (unlikely(tx_total_len >= VHOST_NET_WEIGHT)) {
> +			vhost_poll_queue(&vq->poll);
> +			break;
> +		}
> +	}
> +}
> +
>  static void handle_tx_kick(struct work_struct *work)
>  {
>  	struct vhost_virtqueue *vq;
> @@ -462,7 +578,19 @@ static struct socket *get_tun_socket(int fd)
>  	return sock;
>  }
>
> -static struct socket *get_socket(int fd)
> +static struct socket *get_mp_socket(int fd)
> +{
> +	struct file *file = fget(fd);
> +	struct socket *sock;
> +	if (!file)
> +		return ERR_PTR(-EBADF);
> +	sock = mp_get_socket(file);
> +	if (IS_ERR(sock))
> +		fput(file);
> +	return sock;
> +}
> +
> +static struct socket *get_socket(struct vhost_virtqueue *vq, int fd)
>  {
>  	struct socket *sock;
>  	if (fd == -1)
> @@ -473,9 +601,26 @@ static struct socket *get_socket(int fd)
>  	sock = get_tun_socket(fd);
>  	if (!IS_ERR(sock))
>  		return sock;
> +	sock = get_mp_socket(fd);
> +	if (!IS_ERR(sock)) {
> +		vq->link_state = VHOST_VQ_LINK_ASYNC;
> +		return sock;
> +	}
>  	return ERR_PTR(-ENOTSOCK);
>  }
>
> +static void vhost_init_link_state(struct vhost_net *n, int index)
> +{
> +	struct vhost_virtqueue *vq = n->vqs + index;
> +
> +	WARN_ON(!mutex_is_locked(&vq->mutex));
> +	if (vq->link_state == VHOST_VQ_LINK_ASYNC) {
> +		vq->receiver = NULL;
> +		INIT_LIST_HEAD(&vq->notifier);
> +		spin_lock_init(&vq->notify_lock);
> +	}
> +}
> +
>  static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
>  {
>  	struct socket *sock, *oldsock;
> @@ -493,12 +638,15 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
>  	}
>  	vq = n->vqs + index;
>  	mutex_lock(&vq->mutex);
> -	sock = get_socket(fd);
> +	vq->link_state = VHOST_VQ_LINK_SYNC;
> +	sock = get_socket(vq, fd);
>  	if (IS_ERR(sock)) {
>  		r = PTR_ERR(sock);
>  		goto err;
>  	}
>
> +	vhost_init_link_state(n, index);
> +
>  	/* start polling new socket */
>  	oldsock = vq->private_data;
>  	if (sock == oldsock)
> @@ -507,8 +655,8 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
>  	vhost_net_disable_vq(n, vq);
>  	rcu_assign_pointer(vq->private_data, sock);
>  	vhost_net_enable_vq(n, vq);
> -	mutex_unlock(&vq->mutex);
>  done:
> +	mutex_unlock(&vq->mutex);
>  	mutex_unlock(&n->dev.mutex);
>  	if (oldsock) {
>  		vhost_net_flush_vq(n, index);
> @@ -516,6 +664,7 @@ done:
>  	}
>  	return r;
>  err:
> +	mutex_unlock(&vq->mutex);
>  	mutex_unlock(&n->dev.mutex);
>  	return r;
>  }
> diff --git a/drivers/vhost/vhost.h b/drivers/vhost/vhost.h
> index d1f0453..297af1c 100644
> --- a/drivers/vhost/vhost.h
> +++ b/drivers/vhost/vhost.h
> @@ -43,6 +43,11 @@ struct vhost_log {
>  	u64 len;
>  };
>
> +enum vhost_vq_link_state {
> +	VHOST_VQ_LINK_SYNC = 0,
> +	VHOST_VQ_LINK_ASYNC = 1,
> +};
> +
>  /* The virtqueue structure describes a queue attached to a device. */
>  struct vhost_virtqueue {
>  	struct vhost_dev *dev;
> @@ -96,6 +101,13 @@ struct vhost_virtqueue {
>  	/* Log write descriptors */
>  	void __user *log_base;
>  	struct vhost_log log[VHOST_NET_MAX_SG];
> +	/*Differiate async socket for 0-copy from normal*/
> +	enum vhost_vq_link_state link_state;
> +	int head;
> +	int _log;
> +	struct list_head notifier;
> +	spinlock_t notify_lock;
> +	void (*receiver)(struct vhost_virtqueue *);
>  };
>
>  struct vhost_dev {
> --
> 1.5.4.4