On Tue, May 28, 2019 at 6:03 PM David Howells <dhowells@xxxxxxxxxx> wrote:
> Implement a misc device that implements a general notification queue as a
> ring buffer that can be mmap()'d from userspace.
[...]
> +receive notifications from the kernel. This is can be used in conjunction

typo: s/is can/can/

[...]
> +Overview
> +========
> +
> +This facility appears as a misc device file that is opened and then mapped and
> +polled. Each time it is opened, it creates a new buffer specific to the
> +returned file descriptor. Then, when the opening process sets watches, it
> +indicates that particular buffer it wants notifications from that watch to be
> +written into. Note that there are no read() and write() methods (except for

s/that particular buffer/the particular buffer/

> +debugging). The user is expected to access the ring directly and to use poll
> +to wait for new data.
[...]
> +/**
> + * __post_watch_notification - Post an event notification
> + * @wlist: The watch list to post the event to.
> + * @n: The notification record to post.
> + * @cred: The creds of the process that triggered the notification.
> + * @id: The ID to match on the watch.
> + *
> + * Post a notification of an event into a set of watch queues and let the users
> + * know.
> + *
> + * If @n is NULL then WATCH_INFO_LENGTH will be set on the next event posted.
> + *
> + * The size of the notification should be set in n->info & WATCH_INFO_LENGTH and
> + * should be in units of sizeof(*n).
> + */
> +void __post_watch_notification(struct watch_list *wlist,
> +                               struct watch_notification *n,
> +                               const struct cred *cred,
> +                               u64 id)
> +{
> +        const struct watch_filter *wf;
> +        struct watch_queue *wqueue;
> +        struct watch *watch;
> +
> +        rcu_read_lock();
> +
> +        hlist_for_each_entry_rcu(watch, &wlist->watchers, list_node) {
> +                if (watch->id != id)
> +                        continue;
> +                n->info &= ~(WATCH_INFO_ID | WATCH_INFO_OVERRUN);
> +                n->info |= watch->info_id;
> +
> +                wqueue = rcu_dereference(watch->queue);
> +                wf = rcu_dereference(wqueue->filter);
> +                if (wf && !filter_watch_notification(wf, n))
> +                        continue;
> +
> +                post_one_notification(wqueue, n, cred);
> +        }
> +
> +        rcu_read_unlock();
> +}
> +EXPORT_SYMBOL(__post_watch_notification);
[...]
> +static vm_fault_t watch_queue_fault(struct vm_fault *vmf)
> +{
> +        struct watch_queue *wqueue = vmf->vma->vm_file->private_data;
> +        struct page *page;
> +
> +        page = wqueue->pages[vmf->pgoff];

I don't see you setting any special properties on the VMA that would
prevent userspace from extending its size via mremap() - no
VM_DONTEXPAND or VM_PFNMAP. So I think you might get an out-of-bounds
access here?

> +        get_page(page);
> +        if (!lock_page_or_retry(page, vmf->vma->vm_mm, vmf->flags)) {
> +                put_page(page);
> +                return VM_FAULT_RETRY;
> +        }
> +        vmf->page = page;
> +        return VM_FAULT_LOCKED;
> +}
> +
> +static void watch_queue_map_pages(struct vm_fault *vmf,
> +                                  pgoff_t start_pgoff, pgoff_t end_pgoff)
> +{
> +        struct watch_queue *wqueue = vmf->vma->vm_file->private_data;
> +        struct page *page;
> +
> +        rcu_read_lock();
> +
> +        do {
> +                page = wqueue->pages[start_pgoff];

Same as above.

> +                if (trylock_page(page)) {
> +                        vm_fault_t ret;
> +                        get_page(page);
> +                        ret = alloc_set_pte(vmf, NULL, page);
> +                        if (ret != 0)
> +                                put_page(page);
> +
> +                        unlock_page(page);
> +                }
> +        } while (++start_pgoff < end_pgoff);
> +
> +        rcu_read_unlock();
> +}
[...]
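Something like the following might be enough to close that hole - an
untested sketch against the names in your patch (a range check in the
fault handler, plus VM_DONTEXPAND on the VMA so mremap() can't grow the
mapping):

static vm_fault_t watch_queue_fault(struct vm_fault *vmf)
{
        struct watch_queue *wqueue = vmf->vma->vm_file->private_data;
        struct page *page;

        /* Never reach past the pages that were actually allocated. */
        if (vmf->pgoff >= wqueue->nr_pages)
                return VM_FAULT_SIGBUS;

        page = wqueue->pages[vmf->pgoff];
        ...
}

and in watch_queue_mmap():

        /* Stop mremap() from growing the mapping past the buffer. */
        vma->vm_flags |= VM_DONTEXPAND;

(watch_queue_map_pages() would want the same bound check on start_pgoff.)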
> +static int watch_queue_mmap(struct file *file, struct vm_area_struct *vma)
> +{
> +        struct watch_queue *wqueue = file->private_data;
> +
> +        if (vma->vm_pgoff != 0 ||
> +            vma->vm_end - vma->vm_start > wqueue->nr_pages * PAGE_SIZE ||
> +            !(pgprot_val(vma->vm_page_prot) & pgprot_val(PAGE_SHARED)))
> +                return -EINVAL;

This thing should probably have locking against concurrent
watch_queue_set_size()?

> +        vma->vm_ops = &watch_queue_vm_ops;
> +
> +        vma_interval_tree_insert(vma, &wqueue->mapping.i_mmap);
> +        return 0;
> +}
> +
> +/*
> + * Allocate the required number of pages.
> + */
> +static long watch_queue_set_size(struct watch_queue *wqueue, unsigned long nr_pages)
> +{
> +        struct watch_queue_buffer *buf;
> +        u32 len;
> +        int i;
> +
> +        if (nr_pages == 0 ||
> +            nr_pages > 16 ||    /* TODO: choose a better hard limit */
> +            !is_power_of_2(nr_pages))
> +                return -EINVAL;
> +
> +        wqueue->pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
> +        if (!wqueue->pages)
> +                goto err;
> +
> +        for (i = 0; i < nr_pages; i++) {
> +                wqueue->pages[i] = alloc_page(GFP_KERNEL | __GFP_ZERO);
> +                if (!wqueue->pages[i])
> +                        goto err_some_pages;
> +                wqueue->pages[i]->mapping = &wqueue->mapping;
> +                SetPageUptodate(wqueue->pages[i]);
> +        }
> +
> +        buf = vmap(wqueue->pages, nr_pages, VM_MAP, PAGE_SHARED);
> +        if (!buf)
> +                goto err_some_pages;
> +
> +        wqueue->buffer = buf;
> +        wqueue->nr_pages = nr_pages;
> +        wqueue->size = ((nr_pages * PAGE_SIZE) / sizeof(struct watch_notification));
> +
> +        /* The first four slots in the buffer contain metadata about the ring,
> +         * including the head and tail indices and mask.
> +         */
> +        len = sizeof(buf->meta) / sizeof(buf->slots[0]);
> +        buf->meta.watch.info = len << WATCH_LENGTH_SHIFT;
> +        buf->meta.watch.type = WATCH_TYPE_META;
> +        buf->meta.watch.subtype = WATCH_META_SKIP_NOTIFICATION;
> +        buf->meta.tail = len;
> +        buf->meta.mask = wqueue->size - 1;
> +        smp_store_release(&buf->meta.head, len);

Why is this an smp_store_release()? The entire buffer should not be
visible to userspace before this setup is complete, right?

> +        return 0;
> +
> +err_some_pages:
> +        for (i--; i >= 0; i--) {
> +                ClearPageUptodate(wqueue->pages[i]);
> +                wqueue->pages[i]->mapping = NULL;
> +                put_page(wqueue->pages[i]);
> +        }
> +
> +        kfree(wqueue->pages);
> +        wqueue->pages = NULL;
> +err:
> +        return -ENOMEM;
> +}
[...]
> +
> +/*
> + * Set parameters.
> + */
> +static long watch_queue_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
> +{
> +        struct watch_queue *wqueue = file->private_data;
> +        struct inode *inode = file_inode(file);
> +        long ret;
> +
> +        switch (cmd) {
> +        case IOC_WATCH_QUEUE_SET_SIZE:
> +                if (wqueue->buffer)
> +                        return -EBUSY;

The preceding check occurs without any locks held and therefore has no
reliable effect. It should probably be moved below the inode_lock(...).

> +                inode_lock(inode);
> +                ret = watch_queue_set_size(wqueue, arg);
> +                inode_unlock(inode);
> +                return ret;
> +
> +        case IOC_WATCH_QUEUE_SET_FILTER:
> +                inode_lock(inode);
> +                ret = watch_queue_set_filter(
> +                        inode, wqueue,
> +                        (struct watch_notification_filter __user *)arg);
> +                inode_unlock(inode);
> +                return ret;
> +
> +        default:
> +                return -EOPNOTSUPP;
> +        }
> +}
[...]
> +static void free_watch(struct rcu_head *rcu)
> +{
> +        struct watch *watch = container_of(rcu, struct watch, rcu);
> +
> +        put_watch_queue(rcu_access_pointer(watch->queue));

This should be rcu_dereference_protected(..., 1).
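I.e. something like this - same behaviour, it just documents that nothing
can change ->queue any more by the time the RCU callback runs (untested,
based on the code as posted):

static void free_watch(struct rcu_head *rcu)
{
        struct watch *watch = container_of(rcu, struct watch, rcu);

        /* The watch is no longer reachable here, so no concurrent
         * updates of ->queue are possible.
         */
        put_watch_queue(rcu_dereference_protected(watch->queue, 1));
}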
> +/**
> + * remove_watch_from_object - Remove a watch or all watches from an object.
> + * @wlist: The watch list to remove from
> + * @wq: The watch queue of interest (ignored if @all is true)
> + * @id: The ID of the watch to remove (ignored if @all is true)
> + * @all: True to remove all objects
> + *
> + * Remove a specific watch or all watches from an object. A notification is
> + * sent to the watcher to tell them that this happened.
> + */
> +int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,
> +                             u64 id, bool all)
> +{
> +        struct watch_notification n;
> +        struct watch_queue *wqueue;
> +        struct watch *watch;
> +        int ret = -EBADSLT;
> +
> +        rcu_read_lock();
> +
> +again:
> +        spin_lock(&wlist->lock);
> +        hlist_for_each_entry(watch, &wlist->watchers, list_node) {
> +                if (all ||
> +                    (watch->id == id && rcu_access_pointer(watch->queue) == wq))
> +                        goto found;
> +        }
> +        spin_unlock(&wlist->lock);
> +        goto out;
> +
> +found:
> +        ret = 0;
> +        hlist_del_init_rcu(&watch->list_node);
> +        rcu_assign_pointer(watch->watch_list, NULL);
> +        spin_unlock(&wlist->lock);
> +
> +        n.type = WATCH_TYPE_META;
> +        n.subtype = WATCH_META_REMOVAL_NOTIFICATION;
> +        n.info = watch->info_id | sizeof(n);
> +
> +        wqueue = rcu_dereference(watch->queue);
> +        post_one_notification(wqueue, &n, wq ? wq->cred : NULL);
> +
> +        /* We don't need the watch list lock for the next bit as RCU is
> +         * protecting everything from being deallocated.

Does "everything" mean "the wqueue" or more than that?

> +         */
> +        if (wqueue) {
> +                spin_lock_bh(&wqueue->lock);
> +
> +                if (!hlist_unhashed(&watch->queue_node)) {
> +                        hlist_del_init_rcu(&watch->queue_node);
> +                        put_watch(watch);
> +                }
> +
> +                spin_unlock_bh(&wqueue->lock);
> +        }
> +
> +        if (wlist->release_watch) {
> +                rcu_read_unlock();
> +                wlist->release_watch(wlist, watch);
> +                rcu_read_lock();
> +        }
> +        put_watch(watch);
> +
> +        if (all && !hlist_empty(&wlist->watchers))
> +                goto again;
> +out:
> +        rcu_read_unlock();
> +        return ret;
> +}
> +EXPORT_SYMBOL(remove_watch_from_object);
> +
> +/*
> + * Remove all the watches that are contributory to a queue. This will
> + * potentially race with removal of the watches by the destruction of the
> + * objects being watched or the distribution of notifications.
> + */
> +static void watch_queue_clear(struct watch_queue *wqueue)
> +{
> +        struct watch_list *wlist;
> +        struct watch *watch;
> +        bool release;
> +
> +        rcu_read_lock();
> +        spin_lock_bh(&wqueue->lock);
> +
> +        /* Prevent new additions and prevent notifications from happening */
> +        wqueue->defunct = true;
> +
> +        while (!hlist_empty(&wqueue->watches)) {
> +                watch = hlist_entry(wqueue->watches.first, struct watch, queue_node);
> +                hlist_del_init_rcu(&watch->queue_node);
> +                spin_unlock_bh(&wqueue->lock);
> +
> +                /* We can't do the next bit under the queue lock as we need to
> +                 * get the list lock - which would cause a deadlock if someone
> +                 * was removing from the opposite direction at the same time or
> +                 * posting a notification.
> +                 */
> +                wlist = rcu_dereference(watch->watch_list);
> +                if (wlist) {
> +                        spin_lock(&wlist->lock);
> +
> +                        release = !hlist_unhashed(&watch->list_node);
> +                        if (release) {
> +                                hlist_del_init_rcu(&watch->list_node);
> +                                rcu_assign_pointer(watch->watch_list, NULL);
> +                        }
> +
> +                        spin_unlock(&wlist->lock);
> +
> +                        if (release) {
> +                                if (wlist->release_watch) {
> +                                        rcu_read_unlock();
> +                                        /* This might need to call dput(), so
> +                                         * we have to drop all the locks.
> +                                         */
> +                                        wlist->release_watch(wlist, watch);

How are you holding a reference to `wlist` here?
You got the reference through rcu_dereference(), you've dropped the RCU
read lock, and I don't see anything that stabilizes the reference.

> +                                        rcu_read_lock();
> +                                }
> +                                put_watch(watch);
> +                        }
> +                }
> +
> +                put_watch(watch);
> +                spin_lock_bh(&wqueue->lock);
> +        }
> +
> +        spin_unlock_bh(&wqueue->lock);
> +        rcu_read_unlock();
> +}
> +
> +/*
> + * Release the file.
> + */
> +static int watch_queue_release(struct inode *inode, struct file *file)
> +{
> +        struct watch_filter *wfilter;
> +        struct watch_queue *wqueue = file->private_data;
> +        int i, pgref;
> +
> +        watch_queue_clear(wqueue);
> +
> +        if (wqueue->pages && wqueue->pages[0])
> +                WARN_ON(page_ref_count(wqueue->pages[0]) != 1);

Is there a reason why there couldn't still be references to the pages
from get_user_pages()/get_user_pages_fast()?

> +        if (wqueue->buffer)
> +                vfree(wqueue->buffer);
> +        for (i = 0; i < wqueue->nr_pages; i++) {
> +                ClearPageUptodate(wqueue->pages[i]);
> +                wqueue->pages[i]->mapping = NULL;
> +                pgref = page_ref_count(wqueue->pages[i]);
> +                WARN(pgref != 1,
> +                     "FREE PAGE[%d] refcount %d\n", i, page_ref_count(wqueue->pages[i]));
> +                __free_page(wqueue->pages[i]);
> +        }
> +
> +        wfilter = rcu_access_pointer(wqueue->filter);

Again, rcu_dereference_protected(..., 1).

> +        if (wfilter)
> +                kfree_rcu(wfilter, rcu);
> +        kfree(wqueue->pages);
> +        put_cred(wqueue->cred);
> +        put_watch_queue(wqueue);
> +        return 0;
> +}
> +
> +#ifdef DEBUG_WITH_WRITE
> +static ssize_t watch_queue_write(struct file *file,
> +                                 const char __user *_buf, size_t len, loff_t *pos)
> +{
> +        struct watch_notification *n;
> +        struct watch_queue *wqueue = file->private_data;
> +        ssize_t ret;
> +
> +        if (!wqueue->buffer)
> +                return -ENOBUFS;
> +
> +        if (len & ~WATCH_INFO_LENGTH || len == 0 || !_buf)
> +                return -EINVAL;
> +
> +        n = memdup_user(_buf, len);
> +        if (IS_ERR(n))
> +                return PTR_ERR(n);
> +
> +        ret = -EINVAL;
> +        if ((n->info & WATCH_INFO_LENGTH) != len)
> +                goto error;
> +        n->info &= (WATCH_INFO_LENGTH | WATCH_INFO_TYPE_FLAGS | WATCH_INFO_ID);

Should the non-atomic updates of wqueue->debug below be protected by some
lock? (n itself is a private copy from memdup_user(), so modifying n->info
here looks harmless, but two concurrent write() calls could race on
->debug.)

> +        if (post_one_notification(wqueue, n, file->f_cred))
> +                wqueue->debug = 0;
> +        else
> +                wqueue->debug++;
> +        ret = len;
> +        if (wqueue->debug > 20)
> +                ret = -EIO;
> +
> +error:
> +        kfree(n);
> +        return ret;
> +}
> +#endif
[...]
> +#define IOC_WATCH_QUEUE_SET_SIZE     _IO('s', 0x01)  /* Set the size in pages */
> +#define IOC_WATCH_QUEUE_SET_FILTER   _IO('s', 0x02)  /* Set the filter */

Should these ioctl numbers be registered in
Documentation/ioctl/ioctl-number.txt?
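If so, I think that's just an extra line in the table there - something
along these lines, assuming the UAPI definitions end up in
linux/watch_queue.h (adjust the header and comment as appropriate):

's'     01-02   linux/watch_queue.h     watch_queue notification device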