So far we are relying that for a locking requrest reply always comes first and then notification comes in. Both of these travel on different virtqueues and there is no guarantee that client will always see/process them in this order. Though it is a very unlikely event, but it is possible that notification comes first and then reply to lock request comes. Hence take care of this possibility. If notification arrives and does not find a corresponding request waiting, queue up notification in a list. When request reply will come, it will check if corresponding notification has already arrived. If yes, it will finish the request otherwise wait for notification to arrive. Signed-off-by: Vivek Goyal <vgoyal@xxxxxxxxxx> --- fs/fuse/virtio_fs.c | 134 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 116 insertions(+), 18 deletions(-) diff --git a/fs/fuse/virtio_fs.c b/fs/fuse/virtio_fs.c index 1634ea2d0555..028ca9198bc4 100644 --- a/fs/fuse/virtio_fs.c +++ b/fs/fuse/virtio_fs.c @@ -42,6 +42,17 @@ enum { #define VQ_NAME_LEN 24 #define VQ_NOTIFY_ELEMS 16 /* Number of notification elements */ +/* + * virtio_fs_vq->lock spinlock nesting subclasses: + * + * 0: normal + * 1: nested + */ +enum { + VQ_LOCK_NORMAL, + VQ_LOCK_NESTED +}; + /* Per-virtqueue state */ struct virtio_fs_vq { spinlock_t lock; @@ -52,6 +63,8 @@ struct virtio_fs_vq { struct list_head end_reqs; /* End these requests */ struct virtio_fs_notify_node *notify_nodes; struct list_head notify_reqs; /* List for queuing notify requests */ + /* Notifications to be processed when request reply arrives. */ + struct list_head wait_notify; struct delayed_work dispatch_work; struct fuse_dev *fud; bool connected; @@ -576,13 +589,32 @@ static int virtio_fs_enqueue_all_notify(struct virtio_fs_vq *fsvq) return 0; } +/* + * Find a waiting process/request and complete it. If request and + * notification got re-ordered, it is possible that there is no + * waiting request yet. In that case queue up notification and + * return 1 so that caller does not try to reuse notifiy node yet. + * + * Return: + * negative value : in case of error + * 0 : If request processed successfully + * 1 : Could not find waiting processed. Request got queued. + * Do not reuse this notify node yet. + */ static int notify_complete_waiting_req(struct virtio_fs *vfs, struct fuse_notify_lock_out *out_args) { /* TODO: Handle multiqueue */ struct virtio_fs_vq *fsvq = &vfs->vqs[vfs->first_reqq_idx]; + struct virtio_fs_vq *notify_fsvq = &vfs->vqs[VQ_NOTIFY_IDX]; struct fuse_req *req, *next; + struct virtio_fs_notify *notify; + struct virtio_fs_notify_node *notifyn; bool found = false; + int ret = 0; + + notify = container_of((void *)out_args, struct virtio_fs_notify, outarg); + notifyn = container_of(notify, struct virtio_fs_notify_node, notify); /* Find waiting request with the unique number and end it */ spin_lock(&fsvq->lock); @@ -596,25 +628,39 @@ static int notify_complete_waiting_req(struct virtio_fs *vfs, break; } } - spin_unlock(&fsvq->lock); - /* - * TODO: It is possible that some re-ordering happens in notify - * comes before request is complete. Deal with it. + * It probably is a rare case of re-ordering where notification + * arrived/processed before reply. Queue up the notification. */ + spin_lock_nested(¬ify_fsvq->lock, VQ_LOCK_NESTED); + if (!found) { + list_add_tail(¬ifyn->list, ¬ify_fsvq->wait_notify); + ret = 1; + } + spin_unlock(¬ify_fsvq->lock); + spin_unlock(&fsvq->lock); + if (found) { end_req_dec_in_flight(req, fsvq); - } else - pr_debug("virtio-fs: Did not find waiting request with unique=0x%llx\n", - out_args->unique); + } - return 0; + return ret; +} + +static void notify_node_reuse(struct virtio_fs_vq *notify_fsvq, + struct virtio_fs_notify_node *notifyn) +{ + spin_lock(¬ify_fsvq->lock); + list_add_tail(¬ifyn->list, ¬ify_fsvq->notify_reqs); + spin_unlock(¬ify_fsvq->lock); } static int virtio_fs_handle_notify(struct virtio_fs *vfs, - struct virtio_fs_notify *notify) + struct virtio_fs_notify_node *notifyn) { - int ret = 0; + int ret = 0, no_reuse = 0; + struct virtio_fs_notify *notify = ¬ifyn->notify; + struct virtio_fs_vq *notify_fsvq = &vfs->vqs[VQ_NOTIFY_IDX]; struct fuse_out_header *oh = ¬ify->out_hdr; struct fuse_notify_lock_out *lo; @@ -625,11 +671,15 @@ static int virtio_fs_handle_notify(struct virtio_fs *vfs, switch (oh->error) { case FUSE_NOTIFY_LOCK: lo = (struct fuse_notify_lock_out *) ¬ify->outarg; - notify_complete_waiting_req(vfs, lo); + no_reuse = notify_complete_waiting_req(vfs, lo); break; default: pr_err("virtio-fs: Unexpected notification %d\n", oh->error); } + + if (!no_reuse) + notify_node_reuse(notify_fsvq, notifyn); + return ret; } @@ -659,12 +709,11 @@ static void virtio_fs_notify_done_work(struct work_struct *work) list_for_each_entry_safe(notifyn, next, &reqs, list) { oh = ¬ifyn->notify.out_hdr; WARN_ON(oh->unique); + list_del_init(¬ifyn->list); /* Handle notification */ - virtio_fs_handle_notify(vfs, ¬ifyn->notify); + virtio_fs_handle_notify(vfs, notifyn); spin_lock(&fsvq->lock); dec_in_flight_req(fsvq); - list_del_init(¬ifyn->list); - list_add_tail(¬ifyn->list, &fsvq->notify_reqs); spin_unlock(&fsvq->lock); } @@ -747,6 +796,57 @@ static void copy_args_from_argbuf(struct fuse_args *args, struct fuse_req *req) req->argbuf = NULL; } +static void virtio_fs_wait_or_complete_request(struct virtio_fs_vq *fsvq, + struct fuse_req *req) +{ + struct virtqueue *vq = fsvq->vq; + struct virtio_fs *vfs = vq->vdev->priv; + struct virtio_fs_vq *notify_fsvq = &vfs->vqs[VQ_NOTIFY_IDX]; + bool found = false; + struct virtio_fs_notify_node *notifyn, *next; + + /* + * Note, notification queue lock nests inside request queue + * lock and not otherwise. + */ + spin_lock(&fsvq->lock); + spin_lock_nested(¬ify_fsvq->lock, VQ_LOCK_NESTED); + /* + * Check if there is already a notification event in case of + * reply and notification event got re-ordered. Very unlikley. + */ + list_for_each_entry_safe(notifyn, next, ¬ify_fsvq->wait_notify, + list) { + struct fuse_notify_lock_out *lo; + + lo = (struct fuse_notify_lock_out *) ¬ifyn->notify.outarg; + + if (req->in.h.unique == lo->unique) { + list_del_init(¬ifyn->list); + clear_bit(FR_SENT, &req->flags); + /* Transfer error code from notify */ + req->out.h.error = lo->error; + found = true; + break; + } + } + + if (!found) { + /* Wait for notification to arrive. */ + list_add_tail(&req->list, &fsvq->wait_reqs); + } + + spin_unlock(¬ify_fsvq->lock); + spin_unlock(&fsvq->lock); + + if (found) { + end_req_dec_in_flight(req, fsvq); + notify_node_reuse(notify_fsvq, notifyn); + if (fsvq->connected) + virtio_fs_enqueue_all_notify(fsvq); + } +} + /* Work function for request completion */ static void virtio_fs_request_complete(struct fuse_req *req, struct virtio_fs_vq *fsvq) @@ -762,10 +862,7 @@ static void virtio_fs_request_complete(struct fuse_req *req, */ if (req->out.h.error == 1) { /* Wait for notification to complete request */ - spin_lock(&fsvq->lock); - list_add_tail(&req->list, &fsvq->wait_reqs); - spin_unlock(&fsvq->lock); - return; + return virtio_fs_wait_or_complete_request(fsvq, req); } args = req->args; @@ -863,6 +960,7 @@ static int virtio_fs_init_vq(struct virtio_fs *fs, struct virtio_fs_vq *fsvq, INIT_LIST_HEAD(&fsvq->wait_reqs); INIT_LIST_HEAD(&fsvq->end_reqs); INIT_LIST_HEAD(&fsvq->notify_reqs); + INIT_LIST_HEAD(&fsvq->wait_notify); init_completion(&fsvq->in_flight_zero); if (vq_type == VQ_TYPE_REQUEST) { -- 2.31.1