[RFC v9 10/27] virtio-blk: Stop data plane thread cleanly

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

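
Add a stop notifier to EventPoll so that data_plane_stop() can make the
data plane thread return from event_poll_run() and then join it, replacing
the previous for (;;) loop that could never be stopped.

This patch also wires up request processing: virtqueue requests become
Linux AIO reads, writes and flushes through IOQueue, completions are pushed
back onto the vring before notifying the guest, and a reset handler stops
the data plane.
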
Signed-off-by: Stefan Hajnoczi <stefanha@xxxxxxxxxxxxxxxxxx>
---
 hw/dataplane/event-poll.h |   79 ++++++++++++++++-------
 hw/dataplane/ioq.h        |   65 +++++++++++++------
 hw/dataplane/vring.h      |    6 +-
 hw/virtio-blk.c           |  154 +++++++++++++++++++++++++++++++++++++++------
 4 files changed, 243 insertions(+), 61 deletions(-)

diff --git a/hw/dataplane/event-poll.h b/hw/dataplane/event-poll.h
index f38e969..acd85e1 100644
--- a/hw/dataplane/event-poll.h
+++ b/hw/dataplane/event-poll.h
@@ -5,17 +5,40 @@
 #include "event_notifier.h"
 
 typedef struct EventHandler EventHandler;
-typedef void EventCallback(EventHandler *handler);
+typedef bool EventCallback(EventHandler *handler);
 struct EventHandler
 {
-    EventNotifier *notifier;    /* eventfd */
-    EventCallback *callback;    /* callback function */
+    EventNotifier *notifier;        /* eventfd */
+    EventCallback *callback;        /* callback function */
 };
 
 typedef struct {
-    int epoll_fd;               /* epoll(2) file descriptor */
+    int epoll_fd;                   /* epoll(2) file descriptor */
+    EventNotifier stop_notifier;    /* stop poll notifier */
+    EventHandler stop_handler;      /* stop poll handler */
 } EventPoll;
 
+/* Add an event notifier and its callback for polling */
+static void event_poll_add(EventPoll *poll, EventHandler *handler, EventNotifier *notifier, EventCallback *callback)
+{
+    struct epoll_event event = {
+        .events = EPOLLIN,
+        .data.ptr = handler,
+    };
+    handler->notifier = notifier;
+    handler->callback = callback;
+    if (epoll_ctl(poll->epoll_fd, EPOLL_CTL_ADD, event_notifier_get_fd(notifier), &event) != 0) {
+        fprintf(stderr, "failed to add event handler to epoll: %m\n");
+        exit(1);
+    }
+}
+
+/* Event callback for stopping the event_poll_run() loop */
+static bool handle_stop(EventHandler *handler)
+{
+    return false; /* stop event loop */
+}
+
 static void event_poll_init(EventPoll *poll)
 {
     /* Create epoll file descriptor */
@@ -24,35 +47,29 @@ static void event_poll_init(EventPoll *poll)
         fprintf(stderr, "epoll_create1 failed: %m\n");
         exit(1);
     }
+
+    /* Set up stop notifier */
+    if (event_notifier_init(&poll->stop_notifier, 0) < 0) {
+        fprintf(stderr, "failed to init stop notifier\n");
+        exit(1);
+    }
+    event_poll_add(poll, &poll->stop_handler,
+                   &poll->stop_notifier, handle_stop);
 }
 
 static void event_poll_cleanup(EventPoll *poll)
 {
+    event_notifier_cleanup(&poll->stop_notifier);
     close(poll->epoll_fd);
     poll->epoll_fd = -1;
 }
 
-/* Add an event notifier and its callback for polling */
-static void event_poll_add(EventPoll *poll, EventHandler *handler, EventNotifier *notifier, EventCallback *callback)
-{
-    struct epoll_event event = {
-        .events = EPOLLIN,
-        .data.ptr = handler,
-    };
-    handler->notifier = notifier;
-    handler->callback = callback;
-    if (epoll_ctl(poll->epoll_fd, EPOLL_CTL_ADD, event_notifier_get_fd(notifier), &event) != 0) {
-        fprintf(stderr, "failed to add event handler to epoll: %m\n");
-        exit(1);
-    }
-}
-
 /* Block until the next event and invoke its callback
  *
  * Signals must be masked, EINTR should never happen.  This is true for QEMU
  * threads.
  */
-static void event_poll(EventPoll *poll)
+static bool event_poll(EventPoll *poll)
 {
     EventHandler *handler;
     struct epoll_event event;
@@ -73,7 +90,27 @@ static void event_poll(EventPoll *poll)
     event_notifier_test_and_clear(handler->notifier);
 
     /* Handle the event */
-    handler->callback(handler);
+    return handler->callback(handler);
+}
+
+static void event_poll_run(EventPoll *poll)
+{
+    while (event_poll(poll)) {
+        /* do nothing */
+    }
+}
+
+/* Stop the event_poll_run() loop
+ *
+ * This function can be used from another thread.
+ */
+static void event_poll_stop(EventPoll *poll)
+{
+    uint64_t dummy = 1;
+    int eventfd = event_notifier_get_fd(&poll->stop_notifier);
+    ssize_t unused __attribute__((unused));
+
+    unused = write(eventfd, &dummy, sizeof dummy);
 }
 
 #endif /* EVENT_POLL_H */
diff --git a/hw/dataplane/ioq.h b/hw/dataplane/ioq.h
index 26ca307..7200e87 100644
--- a/hw/dataplane/ioq.h
+++ b/hw/dataplane/ioq.h
@@ -3,10 +3,10 @@
 
 typedef struct {
     int fd;                         /* file descriptor */
-    unsigned int maxreqs;           /* max length of freelist and queue */
+    unsigned int max_reqs;          /* max length of freelist and queue */
 
     io_context_t io_ctx;            /* Linux AIO context */
-    EventNotifier notifier;         /* Linux AIO eventfd */
+    EventNotifier io_notifier;      /* Linux AIO eventfd */
 
     /* Requests can complete in any order so a free list is necessary to manage
      * available iocbs.
@@ -19,25 +19,28 @@ typedef struct {
     unsigned int queue_idx;
 } IOQueue;
 
-static void ioq_init(IOQueue *ioq, int fd, unsigned int maxreqs)
+static void ioq_init(IOQueue *ioq, int fd, unsigned int max_reqs)
 {
+    int rc;
+
     ioq->fd = fd;
-    ioq->maxreqs = maxreqs;
+    ioq->max_reqs = max_reqs;
 
-    if (io_setup(maxreqs, &ioq->io_ctx) != 0) {
-        fprintf(stderr, "ioq io_setup failed\n");
+    memset(&ioq->io_ctx, 0, sizeof ioq->io_ctx);
+    if ((rc = io_setup(max_reqs, &ioq->io_ctx)) != 0) {
+        fprintf(stderr, "ioq io_setup failed %d\n", rc);
         exit(1);
     }
 
-    if (event_notifier_init(&ioq->notifier, 0) != 0) {
-        fprintf(stderr, "ioq io event notifier creation failed\n");
+    if ((rc = event_notifier_init(&ioq->io_notifier, 0)) != 0) {
+        fprintf(stderr, "ioq io event notifier creation failed %d\n", rc);
         exit(1);
     }
 
-    ioq->freelist = g_malloc0(sizeof ioq->freelist[0] * maxreqs);
+    ioq->freelist = g_malloc0(sizeof ioq->freelist[0] * max_reqs);
     ioq->freelist_idx = 0;
 
-    ioq->queue = g_malloc0(sizeof ioq->queue[0] * maxreqs);
+    ioq->queue = g_malloc0(sizeof ioq->queue[0] * max_reqs);
     ioq->queue_idx = 0;
 }
 
@@ -46,13 +49,13 @@ static void ioq_cleanup(IOQueue *ioq)
     g_free(ioq->freelist);
     g_free(ioq->queue);
 
-    event_notifier_cleanup(&ioq->notifier);
+    event_notifier_cleanup(&ioq->io_notifier);
     io_destroy(ioq->io_ctx);
 }
 
 static EventNotifier *ioq_get_notifier(IOQueue *ioq)
 {
-    return &ioq->notifier;
+    return &ioq->io_notifier;
 }
 
 static struct iocb *ioq_get_iocb(IOQueue *ioq)
@@ -63,18 +66,19 @@ static struct iocb *ioq_get_iocb(IOQueue *ioq)
     }
     struct iocb *iocb = ioq->freelist[--ioq->freelist_idx];
     ioq->queue[ioq->queue_idx++] = iocb;
+    return iocb;
 }
 
-static __attribute__((unused)) void ioq_put_iocb(IOQueue *ioq, struct iocb *iocb)
+static void ioq_put_iocb(IOQueue *ioq, struct iocb *iocb)
 {
-    if (unlikely(ioq->freelist_idx == ioq->maxreqs)) {
+    if (unlikely(ioq->freelist_idx == ioq->max_reqs)) {
         fprintf(stderr, "ioq overflow\n");
         exit(1);
     }
     ioq->freelist[ioq->freelist_idx++] = iocb;
 }
 
-static __attribute__((unused)) void ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov, unsigned int count, long long offset)
+static struct iocb *ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov, unsigned int count, long long offset)
 {
     struct iocb *iocb = ioq_get_iocb(ioq);
 
@@ -83,22 +87,45 @@ static __attribute__((unused)) void ioq_rdwr(IOQueue *ioq, bool read, struct iov
     } else {
         io_prep_pwritev(iocb, ioq->fd, iov, count, offset);
     }
-    io_set_eventfd(iocb, event_notifier_get_fd(&ioq->notifier));
+    io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier));
+    return iocb;
 }
 
-static __attribute__((unused)) void ioq_fdsync(IOQueue *ioq)
+static struct iocb *ioq_fdsync(IOQueue *ioq)
 {
     struct iocb *iocb = ioq_get_iocb(ioq);
 
     io_prep_fdsync(iocb, ioq->fd);
-    io_set_eventfd(iocb, event_notifier_get_fd(&ioq->notifier));
+    io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier));
+    return iocb;
 }
 
-static __attribute__((unused)) int ioq_submit(IOQueue *ioq)
+static int ioq_submit(IOQueue *ioq)
 {
     int rc = io_submit(ioq->io_ctx, ioq->queue_idx, ioq->queue);
     ioq->queue_idx = 0; /* reset */
     return rc;
 }
 
+typedef void IOQueueCompletion(struct iocb *iocb, ssize_t ret, void *opaque);
+static int ioq_run_completion(IOQueue *ioq, IOQueueCompletion *completion, void *opaque)
+{
+    struct io_event events[ioq->max_reqs];
+    int nevents, i;
+
+    nevents = io_getevents(ioq->io_ctx, 0, ioq->max_reqs, events, NULL);
+    if (unlikely(nevents < 0)) {
+        fprintf(stderr, "io_getevents failed %d\n", nevents);
+        exit(1);
+    }
+
+    for (i = 0; i < nevents; i++) {
+        ssize_t ret = ((uint64_t)events[i].res2 << 32) | events[i].res;
+
+        completion(events[i].obj, ret, opaque);
+        ioq_put_iocb(ioq, events[i].obj);
+    }
+    return nevents;
+}
+
 #endif /* IO_QUEUE_H */
diff --git a/hw/dataplane/vring.h b/hw/dataplane/vring.h
index b07d4f6..70675e5 100644
--- a/hw/dataplane/vring.h
+++ b/hw/dataplane/vring.h
@@ -56,8 +56,8 @@ static void vring_setup(Vring *vring, VirtIODevice *vdev, int n)
     vring_init(&vring->vr, virtio_queue_get_num(vdev, n),
                phys_to_host(vring, virtio_queue_get_ring_addr(vdev, n)), 4096);
 
-    vring->last_avail_idx = vring->vr.avail->idx;
-    vring->last_used_idx = vring->vr.used->idx;
+    vring->last_avail_idx = 0;
+    vring->last_used_idx = 0;
 
     fprintf(stderr, "vring physical=%#lx desc=%p avail=%p used=%p\n",
             virtio_queue_get_ring_addr(vdev, n),
@@ -176,7 +176,7 @@ static unsigned int vring_pop(Vring *vring,
  *
  * Stolen from linux-2.6/drivers/vhost/vhost.c.
  */
-static __attribute__((unused)) void vring_push(Vring *vring, unsigned int head, int len)
+static void vring_push(Vring *vring, unsigned int head, int len)
 {
 	struct vring_used_elem *used;
 
diff --git a/hw/virtio-blk.c b/hw/virtio-blk.c
index 5e1ed79..52ea601 100644
--- a/hw/virtio-blk.c
+++ b/hw/virtio-blk.c
@@ -29,8 +29,13 @@ enum {
     REQ_MAX = VRING_MAX / 2,        /* maximum number of requests in the vring */
 };
 
-typedef struct VirtIOBlock
-{
+typedef struct {
+    struct iocb iocb;               /* Linux AIO control block */
+    unsigned char *status;          /* virtio block status code */
+    unsigned int head;              /* vring descriptor index */
+} VirtIOBlockRequest;
+
+typedef struct {
     VirtIODevice vdev;
     BlockDriverState *bs;
     VirtQueue *vq;
@@ -44,11 +49,12 @@ typedef struct VirtIOBlock
 
     Vring vring;                    /* virtqueue vring */
 
-    IOQueue ioqueue;                /* Linux AIO queue (should really be per dataplane thread) */
-
     EventPoll event_poll;           /* event poller */
     EventHandler io_handler;        /* Linux AIO completion handler */
     EventHandler notify_handler;    /* virtqueue notify handler */
+
+    IOQueue ioqueue;                /* Linux AIO queue (should really be per dataplane thread) */
+    VirtIOBlockRequest requests[REQ_MAX]; /* pool of requests, managed by the queue */
 } VirtIOBlock;
 
 static VirtIOBlock *to_virtio_blk(VirtIODevice *vdev)
@@ -56,12 +62,40 @@ static VirtIOBlock *to_virtio_blk(VirtIODevice *vdev)
     return (VirtIOBlock *)vdev;
 }
 
-static void handle_io(EventHandler *handler)
+static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
 {
-    fprintf(stderr, "io completion happened\n");
+    VirtIOBlock *s = opaque;
+    VirtIOBlockRequest *req = container_of(iocb, VirtIOBlockRequest, iocb);
+    int len;
+
+    if (likely(ret >= 0)) {
+        *req->status = VIRTIO_BLK_S_OK;
+        len = ret;
+    } else {
+        *req->status = VIRTIO_BLK_S_IOERR;
+        len = 0;
+    }
+
+    /* According to the virtio specification len should be the number of bytes
+     * written to, but for virtio-blk it seems to be the number of bytes
+     * transferred plus the status byte (*req->status, not the pointer).
+     */
+    vring_push(&s->vring, req->head, len + sizeof *req->status);
 }
 
-static void process_request(struct iovec iov[], unsigned int out_num, unsigned int in_num)
+static bool handle_io(EventHandler *handler)
+{
+    VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler);
+
+    if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
+        /* TODO is this thread-safe and can it be done faster? */
+        virtio_irq(s->vq);
+    }
+
+    return true;
+}
+
+static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_num, unsigned int in_num, unsigned int head)
 {
     /* Virtio block requests look like this: */
     struct virtio_blk_outhdr *outhdr; /* iov[0] */
@@ -78,11 +112,54 @@ static void process_request(struct iovec iov[], unsigned int out_num, unsigned i
     outhdr = iov[0].iov_base;
     inhdr = iov[out_num + in_num - 1].iov_base;
 
+    /*
     fprintf(stderr, "virtio-blk request type=%#x sector=%#lx\n",
             outhdr->type, outhdr->sector);
+    */
+
+    if (unlikely(outhdr->type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) {
+        fprintf(stderr, "virtio-blk unsupported request type %#x\n", outhdr->type);
+        exit(1);
+    }
+
+    struct iocb *iocb;
+    switch (outhdr->type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) {
+    case VIRTIO_BLK_T_IN:
+        if (unlikely(out_num != 1)) {
+            fprintf(stderr, "virtio-blk invalid read request\n");
+            exit(1);
+        }
+        iocb = ioq_rdwr(ioq, true, &iov[1], in_num - 1, outhdr->sector * 512UL); /* TODO is it always 512? */
+        break;
+
+    case VIRTIO_BLK_T_OUT:
+        if (unlikely(in_num != 1)) {
+            fprintf(stderr, "virtio-blk invalid write request\n");
+            exit(1);
+        }
+        iocb = ioq_rdwr(ioq, false, &iov[1], out_num - 1, outhdr->sector * 512UL); /* TODO is it always 512? */
+        break;
+
+    case VIRTIO_BLK_T_FLUSH:
+        if (unlikely(in_num != 1 || out_num != 1)) {
+            fprintf(stderr, "virtio-blk invalid flush request\n");
+            exit(1);
+        }
+        iocb = ioq_fdsync(ioq);
+        break;
+
+    default:
+        fprintf(stderr, "virtio-blk multiple request type bits set\n");
+        exit(1);
+    }
+
+    /* Fill in virtio block metadata needed for completion */
+    VirtIOBlockRequest *req = container_of(iocb, VirtIOBlockRequest, iocb);
+    req->head = head;
+    req->status = &inhdr->status;
 }
 
-static void handle_notify(EventHandler *handler)
+static bool handle_notify(EventHandler *handler)
 {
     VirtIOBlock *s = container_of(handler, VirtIOBlock, notify_handler);
 
@@ -114,19 +191,29 @@ static void handle_notify(EventHandler *handler)
             break; /* no more requests */
         }
 
-        fprintf(stderr, "head=%u out_num=%u in_num=%u\n", head, out_num, in_num);
+        /*
+        fprintf(stderr, "out_num=%u in_num=%u head=%u\n", out_num, in_num, head);
+        */
 
-        process_request(iov, out_num, in_num);
+        process_request(&s->ioqueue, iov, out_num, in_num, head);
     }
+
+    /* Submit requests, if any */
+    if (likely(iov != iovec)) {
+        if (unlikely(ioq_submit(&s->ioqueue) < 0)) {
+            fprintf(stderr, "ioq_submit failed\n");
+            exit(1);
+        }
+    }
+
+    return true;
 }
 
 static void *data_plane_thread(void *opaque)
 {
     VirtIOBlock *s = opaque;
 
-    for (;;) {
-        event_poll(&s->event_poll);
-    }
+    event_poll_run(&s->event_poll);
     return NULL;
 }
 
@@ -140,10 +227,13 @@ static int get_raw_posix_fd_hack(VirtIOBlock *s)
 
 static void data_plane_start(VirtIOBlock *s)
 {
+    int i;
+
     vring_setup(&s->vring, &s->vdev, 0);
 
     event_poll_init(&s->event_poll);
 
+    /* Set up virtqueue notify */
     if (s->vdev.binding->set_host_notifier(s->vdev.binding_opaque, 0, true) != 0) {
         fprintf(stderr, "virtio-blk failed to set host notifier, ensure -enable-kvm is set\n");
         exit(1);
@@ -152,8 +242,11 @@ static void data_plane_start(VirtIOBlock *s)
                    virtio_queue_get_host_notifier(s->vq),
                    handle_notify);
 
+    /* Set up ioqueue */
     ioq_init(&s->ioqueue, get_raw_posix_fd_hack(s), REQ_MAX);
-    /* TODO populate ioqueue freelist */
+    for (i = 0; i < ARRAY_SIZE(s->requests); i++) {
+        ioq_put_iocb(&s->ioqueue, &s->requests[i].iocb);
+    }
     event_poll_add(&s->event_poll, &s->io_handler, ioq_get_notifier(&s->ioqueue), handle_io);
 
     qemu_thread_create(&s->data_plane_thread, data_plane_thread, s, QEMU_THREAD_JOINABLE);
@@ -165,7 +258,9 @@ static void data_plane_stop(VirtIOBlock *s)
 {
     s->data_plane_started = false;
 
-    /* TODO stop data plane thread */
+    /* Tell data plane thread to stop and then wait for it to return */
+    event_poll_stop(&s->event_poll);
+    qemu_thread_join(&s->data_plane_thread);
 
     ioq_cleanup(&s->ioqueue);
 
@@ -183,6 +278,10 @@ static void virtio_blk_set_status(VirtIODevice *vdev, uint8_t val)
         return;
     }
 
+    /*
+    fprintf(stderr, "virtio_blk_set_status %#x\n", val);
+    */
+
     if (val & VIRTIO_CONFIG_S_DRIVER_OK) {
         data_plane_start(s);
     } else {
@@ -190,11 +289,29 @@ static void virtio_blk_set_status(VirtIODevice *vdev, uint8_t val)
     }
 }
 
+static void virtio_blk_reset(VirtIODevice *vdev)
+{
+    virtio_blk_set_status(vdev, 0);
+}
+
 static void virtio_blk_handle_output(VirtIODevice *vdev, VirtQueue *vq)
 {
-    fprintf(stderr, "virtio_blk_handle_output: should never get here, "
-                    "data plane thread should process requests\n");
-    exit(1);
+    VirtIOBlock *s = to_virtio_blk(vdev);
+
+    if (s->data_plane_started) {
+        fprintf(stderr, "virtio_blk_handle_output: should never get here, "
+                        "data plane thread should process requests\n");
+        exit(1);
+    }
+
+    /* Linux seems to notify before the driver comes up.  This needs more
+     * investigation.  Just use a hack for now.
+     */
+    virtio_blk_set_status(vdev, VIRTIO_CONFIG_S_DRIVER_OK); /* start the thread */
+
+    /* Now kick the thread */
+    uint64_t dummy = 1;
+    ssize_t unused __attribute__((unused)) = write(event_notifier_get_fd(virtio_queue_get_host_notifier(s->vq)), &dummy, sizeof dummy);
 }
 
 /* coalesce internal state, copy to pci i/o region 0
@@ -273,6 +390,7 @@ VirtIODevice *virtio_blk_init(DeviceState *dev, BlockConf *conf,
     s->vdev.get_config = virtio_blk_update_config;
     s->vdev.get_features = virtio_blk_get_features;
     s->vdev.set_status = virtio_blk_set_status;
+    s->vdev.reset = virtio_blk_reset;
     s->bs = conf->bs;
     s->conf = conf;
     s->serial = *serial;
-- 
1.7.10.4

--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [KVM ARM]     [KVM ia64]     [KVM ppc]     [Virtualization Tools]     [Spice Development]     [Libvirt]     [Libvirt Users]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite Questions]     [Linux Kernel]     [Linux SCSI]     [XFree86]
  Powered by Linux