Re: [PATCH v4 15/16] virtio-fs: add virtiofs filesystem

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, Sep 03, 2019 at 09:55:49AM -0400, Michael S. Tsirkin wrote:
[..]
> What's with all of the TODOs? Some of these are really scary,
> looks like they need to be figured out before this is merged.

Hi Michael,

One of the issue I noticed is races w.r.t device removal and super
block initialization. I am about to post a set of patches which
take care of these races and also get rid of some of the scary
TODOs. Other TODOs like suspend/restore, multiqueue support etc
are improvements which we can do over a period of time.

[..]
> > +/* Per-virtqueue state */
> > +struct virtio_fs_vq {
> > +	spinlock_t lock;
> > +	struct virtqueue *vq;     /* protected by ->lock */
> > +	struct work_struct done_work;
> > +	struct list_head queued_reqs;
> > +	struct delayed_work dispatch_work;
> > +	struct fuse_dev *fud;
> > +	bool connected;
> > +	long in_flight;
> > +	char name[24];
> 
> I'd keep names somewhere separate as they are not used on data path.

Ok, this sounds like a nice to have. Will take care of this once base
patch gets merged.

[..]
> > +struct virtio_fs_forget {
> > +	struct fuse_in_header ih;
> > +	struct fuse_forget_in arg;
> 
> These structures are all native endian.
> 
> Passing them to host will make cross-endian setups painful to support,
> and hardware implementations impossible.
> 
> How about converting everything to LE?

So looks like endianness issue is now resolved (going by the other
emails). So I will not worry about it.

[..]
> > +/* Add a new instance to the list or return -EEXIST if tag name exists*/
> > +static int virtio_fs_add_instance(struct virtio_fs *fs)
> > +{
> > +	struct virtio_fs *fs2;
> > +	bool duplicate = false;
> > +
> > +	mutex_lock(&virtio_fs_mutex);
> > +
> > +	list_for_each_entry(fs2, &virtio_fs_instances, list) {
> > +		if (strcmp(fs->tag, fs2->tag) == 0)
> > +			duplicate = true;
> > +	}
> > +
> > +	if (!duplicate)
> > +		list_add_tail(&fs->list, &virtio_fs_instances);
> 
> 
> This is O(N^2) as it's presumably called for each istance.
> Doesn't scale - please switch to a tree or such.

This is O(N) and not O(N^2) right? Addition of device is O(N), search
during mount is O(N).

This is not a frequent event at all and number of virtiofs instances
per guest are expected to be fairly small (say less than 10). So I 
really don't think there is any value in converting this into a tree
(instead of a list).

[..]
> > +static void virtio_fs_free_devs(struct virtio_fs *fs)
> > +{
> > +	unsigned int i;
> > +
> > +	/* TODO lock */
> 
> Doesn't inspire confidence, does it?

Agreed. Getting rid of this in set of fixes I am about to post.

> 
> > +
> > +	for (i = 0; i < fs->nvqs; i++) {
> > +		struct virtio_fs_vq *fsvq = &fs->vqs[i];
> > +
> > +		if (!fsvq->fud)
> > +			continue;
> > +
> > +		flush_work(&fsvq->done_work);
> > +		flush_delayed_work(&fsvq->dispatch_work);
> > +
> > +		/* TODO need to quiesce/end_requests/decrement dev_count */
> 
> Indeed. Won't this crash if we don't?

Took care of this as well.

[..]
> > +static void virtio_fs_hiprio_dispatch_work(struct work_struct *work)
> > +{
> > +	struct virtio_fs_forget *forget;
> > +	struct virtio_fs_vq *fsvq = container_of(work, struct virtio_fs_vq,
> > +						 dispatch_work.work);
> > +	struct virtqueue *vq = fsvq->vq;
> > +	struct scatterlist sg;
> > +	struct scatterlist *sgs[] = {&sg};
> > +	bool notify;
> > +	int ret;
> > +
> > +	pr_debug("virtio-fs: worker %s called.\n", __func__);
> > +	while (1) {
> > +		spin_lock(&fsvq->lock);
> > +		forget = list_first_entry_or_null(&fsvq->queued_reqs,
> > +					struct virtio_fs_forget, list);
> > +		if (!forget) {
> > +			spin_unlock(&fsvq->lock);
> > +			return;
> > +		}
> > +
> > +		list_del(&forget->list);
> > +		if (!fsvq->connected) {
> > +			spin_unlock(&fsvq->lock);
> > +			kfree(forget);
> > +			continue;
> > +		}
> > +
> > +		sg_init_one(&sg, forget, sizeof(*forget));
> 
> This passes to host a structure including "struct list_head list";
> 
> Not a good idea.

Ok, host does not have to see "struct list_head list". Its needed for
guest. Will look into getting rid of this.

> 
> 
> > +
> > +		/* Enqueue the request */
> > +		dev_dbg(&vq->vdev->dev, "%s\n", __func__);
> > +		ret = virtqueue_add_sgs(vq, sgs, 1, 0, forget, GFP_ATOMIC);
> 
> 
> This is easier as add_outbuf.

Will look into it.

> 
> Also - why GFP_ATOMIC?

Hmm..., may be it can be GFP_KERNEL. I don't see atomic context here. Will
look into it.

> 
> > +		if (ret < 0) {
> > +			if (ret == -ENOMEM || ret == -ENOSPC) {
> > +				pr_debug("virtio-fs: Could not queue FORGET: err=%d. Will try later\n",
> > +					 ret);
> > +				list_add_tail(&forget->list,
> > +						&fsvq->queued_reqs);
> > +				schedule_delayed_work(&fsvq->dispatch_work,
> > +						msecs_to_jiffies(1));
> 
> Can't we we requeue after some buffers get consumed?

That's what dispatch work is doing. It tries to requeue the request after
a while.

[..]
> > +static int virtio_fs_probe(struct virtio_device *vdev)
> > +{
> > +	struct virtio_fs *fs;
> > +	int ret;
> > +
> > +	fs = devm_kzalloc(&vdev->dev, sizeof(*fs), GFP_KERNEL);
> > +	if (!fs)
> > +		return -ENOMEM;
> > +	vdev->priv = fs;
> > +
> > +	ret = virtio_fs_read_tag(vdev, fs);
> > +	if (ret < 0)
> > +		goto out;
> > +
> > +	ret = virtio_fs_setup_vqs(vdev, fs);
> > +	if (ret < 0)
> > +		goto out;
> > +
> > +	/* TODO vq affinity */
> > +	/* TODO populate notifications vq */
> 
> what's notifications vq?

It has not been implemented yet. At some point of time we want to have
a notion of notification queue so that host can send notifications to
guest. Will get rid of this comment for now.

[..]
> > +#ifdef CONFIG_PM_SLEEP
> > +static int virtio_fs_freeze(struct virtio_device *vdev)
> > +{
> > +	return 0; /* TODO */
> > +}
> > +
> > +static int virtio_fs_restore(struct virtio_device *vdev)
> > +{
> > +	return 0; /* TODO */
> > +}
> 
> Is this really a good idea? I'd rather it was implemented,
> but if not possible at all disabling PM seems better than just
> keep going.

I agree. Will look into disabling it.

> 
> > +#endif /* CONFIG_PM_SLEEP */
> > +
> > +const static struct virtio_device_id id_table[] = {
> > +	{ VIRTIO_ID_FS, VIRTIO_DEV_ANY_ID },
> > +	{},
> > +};
> > +
> > +const static unsigned int feature_table[] = {};
> > +
> > +static struct virtio_driver virtio_fs_driver = {
> > +	.driver.name		= KBUILD_MODNAME,
> > +	.driver.owner		= THIS_MODULE,
> > +	.id_table		= id_table,
> > +	.feature_table		= feature_table,
> > +	.feature_table_size	= ARRAY_SIZE(feature_table),
> > +	/* TODO validate config_get != NULL */
> 
> Why?

Don't know. Stefan, do you remember why did you put this comment? If not,
I will get rid of it.

> 
> > +	.probe			= virtio_fs_probe,
> > +	.remove			= virtio_fs_remove,
> > +#ifdef CONFIG_PM_SLEEP
> > +	.freeze			= virtio_fs_freeze,
> > +	.restore		= virtio_fs_restore,
> > +#endif
> > +};
> > +
> > +static void virtio_fs_wake_forget_and_unlock(struct fuse_iqueue *fiq)
> > +__releases(fiq->waitq.lock)
> > +{
> > +	struct fuse_forget_link *link;
> > +	struct virtio_fs_forget *forget;
> > +	struct scatterlist sg;
> > +	struct scatterlist *sgs[] = {&sg};
> > +	struct virtio_fs *fs;
> > +	struct virtqueue *vq;
> > +	struct virtio_fs_vq *fsvq;
> > +	bool notify;
> > +	u64 unique;
> > +	int ret;
> > +
> > +	link = fuse_dequeue_forget(fiq, 1, NULL);
> > +	unique = fuse_get_unique(fiq);
> > +
> > +	fs = fiq->priv;
> > +	fsvq = &fs->vqs[VQ_HIPRIO];
> > +	spin_unlock(&fiq->waitq.lock);
> > +
> > +	/* Allocate a buffer for the request */
> > +	forget = kmalloc(sizeof(*forget), GFP_NOFS | __GFP_NOFAIL);
> > +
> > +	forget->ih = (struct fuse_in_header){
> > +		.opcode = FUSE_FORGET,
> > +		.nodeid = link->forget_one.nodeid,
> > +		.unique = unique,
> > +		.len = sizeof(*forget),
> > +	};
> > +	forget->arg = (struct fuse_forget_in){
> > +		.nlookup = link->forget_one.nlookup,
> > +	};
> > +
> > +	sg_init_one(&sg, forget, sizeof(*forget));
> > +
> > +	/* Enqueue the request */
> > +	vq = fsvq->vq;
> > +	dev_dbg(&vq->vdev->dev, "%s\n", __func__);
> > +	spin_lock(&fsvq->lock);
> > +
> > +	ret = virtqueue_add_sgs(vq, sgs, 1, 0, forget, GFP_ATOMIC);
> > +	if (ret < 0) {
> > +		if (ret == -ENOMEM || ret == -ENOSPC) {
> > +			pr_debug("virtio-fs: Could not queue FORGET: err=%d. Will try later.\n",
> > +				 ret);
> > +			list_add_tail(&forget->list, &fsvq->queued_reqs);
> > +			schedule_delayed_work(&fsvq->dispatch_work,
> > +					msecs_to_jiffies(1));
> > +		} else {
> > +			pr_debug("virtio-fs: Could not queue FORGET: err=%d. Dropping it.\n",
> > +				 ret);
> > +			kfree(forget);
> > +		}
> > +		spin_unlock(&fsvq->lock);
> > +		goto out;
> > +	}
> 
> 
> code duplicated from virtio_fs_hiprio_dispatch_work

Will look into reusing the code.

> 
> > +
> > +	fsvq->in_flight++;
> > +	notify = virtqueue_kick_prepare(vq);
> > +
> > +	spin_unlock(&fsvq->lock);
> > +
> > +	if (notify)
> > +		virtqueue_notify(vq);
> > +out:
> > +	kfree(link);
> > +}
> > +
> > +static void virtio_fs_wake_interrupt_and_unlock(struct fuse_iqueue *fiq)
> > +__releases(fiq->waitq.lock)
> > +{
> > +	/* TODO */
> 
> what's needed?

We have not implemented this interrupt thing. It interrupts the existing
pending requests. Its not a must to have.

[..]
> > +static int virtio_fs_enqueue_req(struct virtqueue *vq, struct fuse_req *req)
> > +{
> > +	/* requests need at least 4 elements */
> > +	struct scatterlist *stack_sgs[6];
> > +	struct scatterlist stack_sg[ARRAY_SIZE(stack_sgs)];
> > +	struct scatterlist **sgs = stack_sgs;
> > +	struct scatterlist *sg = stack_sg;
> > +	struct virtio_fs_vq *fsvq;
> > +	unsigned int argbuf_used = 0;
> > +	unsigned int out_sgs = 0;
> > +	unsigned int in_sgs = 0;
> > +	unsigned int total_sgs;
> > +	unsigned int i;
> > +	int ret;
> > +	bool notify;
> > +
> > +	/* Does the sglist fit on the stack? */
> > +	total_sgs = sg_count_fuse_req(req);
> > +	if (total_sgs > ARRAY_SIZE(stack_sgs)) {
> > +		sgs = kmalloc_array(total_sgs, sizeof(sgs[0]), GFP_ATOMIC);
> > +		sg = kmalloc_array(total_sgs, sizeof(sg[0]), GFP_ATOMIC);
> > +		if (!sgs || !sg) {
> > +			ret = -ENOMEM;
> > +			goto out;
> > +		}
> > +	}
> > +
> > +	/* Use a bounce buffer since stack args cannot be mapped */
> > +	ret = copy_args_to_argbuf(req);
> > +	if (ret < 0)
> > +		goto out;
> > +
> > +	/* Request elements */
> > +	sg_init_one(&sg[out_sgs++], &req->in.h, sizeof(req->in.h));
> > +	out_sgs += sg_init_fuse_args(&sg[out_sgs], req,
> > +				     (struct fuse_arg *)req->in.args,
> > +				     req->in.numargs, req->in.argpages,
> > +				     req->argbuf, &argbuf_used);
> > +
> > +	/* Reply elements */
> > +	if (test_bit(FR_ISREPLY, &req->flags)) {
> > +		sg_init_one(&sg[out_sgs + in_sgs++],
> > +			    &req->out.h, sizeof(req->out.h));
> > +		in_sgs += sg_init_fuse_args(&sg[out_sgs + in_sgs], req,
> > +					    req->out.args, req->out.numargs,
> > +					    req->out.argpages,
> > +					    req->argbuf + argbuf_used, NULL);
> > +	}
> > +
> > +	WARN_ON(out_sgs + in_sgs != total_sgs);
> > +
> > +	for (i = 0; i < total_sgs; i++)
> > +		sgs[i] = &sg[i];
> > +
> > +	fsvq = vq_to_fsvq(vq);
> > +	spin_lock(&fsvq->lock);
> > +
> > +	ret = virtqueue_add_sgs(vq, sgs, out_sgs, in_sgs, req, GFP_ATOMIC);
> > +	if (ret < 0) {
> > +		/* TODO handle full virtqueue */
> 
> Indeed. What prevents this?

So for forget requests (VQ_HIPRIO), I already handled this by retrying 
submitting the request with the help of a worker.

For regular requests (VQ_REQUESTS), I have never run into virt queue
being full so far. That's why never worried about it.

So nothing prevents this. But we have not noticed it yet. So its a TODO
item. It will be nice to retry if virtuqueue gets full (instead of
returning error to caller).

[..]
> > +static void virtio_fs_wake_pending_and_unlock(struct fuse_iqueue *fiq)
> > +__releases(fiq->waitq.lock)
> > +{
> > +	unsigned int queue_id = VQ_REQUEST; /* TODO multiqueue */
> > +	struct virtio_fs *fs;
> > +	struct fuse_conn *fc;
> > +	struct fuse_req *req;
> > +	struct fuse_pqueue *fpq;
> > +	int ret;
> > +
> > +	WARN_ON(list_empty(&fiq->pending));
> > +	req = list_last_entry(&fiq->pending, struct fuse_req, list);
> > +	clear_bit(FR_PENDING, &req->flags);
> > +	list_del_init(&req->list);
> > +	WARN_ON(!list_empty(&fiq->pending));
> > +	spin_unlock(&fiq->waitq.lock);
> > +
> > +	fs = fiq->priv;
> > +	fc = fs->vqs[queue_id].fud->fc;
> > +
> > +	dev_dbg(&fs->vqs[queue_id].vq->vdev->dev,
> > +		"%s: opcode %u unique %#llx nodeid %#llx in.len %u out.len %u\n",
> > +		__func__, req->in.h.opcode, req->in.h.unique, req->in.h.nodeid,
> > +		req->in.h.len, fuse_len_args(req->out.numargs, req->out.args));
> > +
> > +	fpq = &fs->vqs[queue_id].fud->pq;
> > +	spin_lock(&fpq->lock);
> > +	if (!fpq->connected) {
> > +		spin_unlock(&fpq->lock);
> > +		req->out.h.error = -ENODEV;
> > +		pr_err("virtio-fs: %s disconnected\n", __func__);
> > +		fuse_request_end(fc, req);
> > +		return;
> > +	}
> > +	list_add_tail(&req->list, fpq->processing);
> > +	spin_unlock(&fpq->lock);
> > +	set_bit(FR_SENT, &req->flags);
> > +	/* matches barrier in request_wait_answer() */
> > +	smp_mb__after_atomic();
> > +	/* TODO check for FR_INTERRUPTED? */
> 
> 
> ?

hmm... we don't support FR_INTERRUPTED. Stefan, do you remember why
this TODO is here. If not, I will get rid of it.

> 
> > +
> > +retry:
> > +	ret = virtio_fs_enqueue_req(fs->vqs[queue_id].vq, req);
> > +	if (ret < 0) {
> > +		if (ret == -ENOMEM || ret == -ENOSPC) {
> > +			/* Virtqueue full. Retry submission */
> > +			usleep_range(20, 30);
> 
> again, why not respond to completion?

Using usleep() was a quick fix. Will look into using completion in second
round of cleanup. In first round I am taking care of more scary TODOs
which deal with races w.r.t device removal.

> 
> > +			goto retry;
> > +		}
> > +		req->out.h.error = ret;
> > +		pr_err("virtio-fs: virtio_fs_enqueue_req() failed %d\n", ret);
> > +		fuse_request_end(fc, req);
> > +		return;
> > +	}
> > +}
> > +
> > +static void virtio_fs_flush_hiprio_queue(struct virtio_fs_vq *fsvq)
> > +{
> > +	struct virtio_fs_forget *forget;
> > +
> > +	WARN_ON(fsvq->in_flight < 0);
> > +
> > +	/* Go through pending forget requests and free them */
> > +	spin_lock(&fsvq->lock);
> > +	while (1) {
> > +		forget = list_first_entry_or_null(&fsvq->queued_reqs,
> > +					struct virtio_fs_forget, list);
> > +		if (!forget)
> > +			break;
> > +		list_del(&forget->list);
> > +		kfree(forget);
> > +	}
> > +
> > +	spin_unlock(&fsvq->lock);
> > +
> > +	/* Wait for in flight requests to finish.*/
> > +	while (1) {
> > +		spin_lock(&fsvq->lock);
> > +		if (!fsvq->in_flight) {
> > +			spin_unlock(&fsvq->lock);
> > +			break;
> > +		}
> > +		spin_unlock(&fsvq->lock);
> > +		usleep_range(1000, 2000);
> 
> Same thing here. Can we use e.g. a completion and not usleep?

Second round cleanup.

> 
> > +	}
> > +}
> > +
> > +const static struct fuse_iqueue_ops virtio_fs_fiq_ops = {
> > +	.wake_forget_and_unlock		= virtio_fs_wake_forget_and_unlock,
> > +	.wake_interrupt_and_unlock	= virtio_fs_wake_interrupt_and_unlock,
> > +	.wake_pending_and_unlock	= virtio_fs_wake_pending_and_unlock,
> > +};
> > +
> > +static int virtio_fs_fill_super(struct super_block *sb)
> > +{
> > +	struct fuse_conn *fc = get_fuse_conn_super(sb);
> > +	struct virtio_fs *fs = fc->iq.priv;
> > +	unsigned int i;
> > +	int err;
> > +	struct fuse_req *init_req;
> > +	struct fuse_fs_context ctx = {
> > +		.rootmode = S_IFDIR,
> > +		.default_permissions = 1,
> > +		.allow_other = 1,
> > +		.max_read = UINT_MAX,
> > +		.blksize = 512,
> > +		.destroy = true,
> > +		.no_control = true,
> > +		.no_force_umount = true,
> > +	};
> > +
> > +	/* TODO lock */
> 
> what does this refer to?

Took care of in first round of cleanup.

> 
> > +	if (fs->vqs[VQ_REQUEST].fud) {
> > +		pr_err("virtio-fs: device already in use\n");
> > +		err = -EBUSY;
> > +		goto err;
> > +	}
> > +
> > +	err = -ENOMEM;
> > +	/* Allocate fuse_dev for hiprio and notification queues */
> > +	for (i = 0; i < VQ_REQUEST; i++) {
> > +		struct virtio_fs_vq *fsvq = &fs->vqs[i];
> > +
> > +		fsvq->fud = fuse_dev_alloc();
> > +		if (!fsvq->fud)
> > +			goto err_free_fuse_devs;
> > +	}
> > +
> > +	init_req = fuse_request_alloc(0);
> > +	if (!init_req)
> > +		goto err_free_fuse_devs;
> > +	__set_bit(FR_BACKGROUND, &init_req->flags);
> > +
> > +	ctx.fudptr = (void **)&fs->vqs[VQ_REQUEST].fud;
> > +	err = fuse_fill_super_common(sb, &ctx);
> > +	if (err < 0)
> > +		goto err_free_init_req;
> > +
> > +	fc = fs->vqs[VQ_REQUEST].fud->fc;
> > +
> > +	/* TODO take fuse_mutex around this loop? */
> 
> Don't we need to figure this kind of thing out before this
> code lands upstream?

I think we don't need this TODO. fuse_connection object and associated
super block are still being formed. And my cleanup has taken care of
the additional locking.

Thanks
Vivek



[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]

  Powered by Linux