Since you are implementing new APIs here, have you considered doing an aio_sendfilev to be able to send a header with the data? Regards Suparna On Wed, Jan 17, 2007 at 09:30:35AM +0300, Evgeniy Polyakov wrote: > > Kevent based AIO (aio_sendfile()/aio_sendfile_path()). > > aio_sendfile()/aio_sendfile_path() consists of two major parts: AIO > state machine and page processing code. > The former is just a small subsystem, which allows to queue callbacks > for their invocation in process' context on behalf of pool of kernel > threads. It allows to queue caches of callbacks to the local thread > or to any other specified. Each cache of callbacks is processed while > there are callbacks in it, callbacks can requeue themselves into the > same cache. > > Real work is being done in page processing code - code which populates > pages into VFS cache and then sends pages to the destination socket > via ->sendpage(). Unlike previous aio_sendfile() implementation, new > one does not require low-level filesystem specific callbacks (->get_block()) > at all, instead I extended struct address_space_operations to contain new > member called ->aio_readpages(), which is exactly the same as ->readpages() > (read: mpage_readpages()) except different BIO allocation and submission > routines. I changed mpage_readpages() to provide mpage_alloc() and > mpage_bio_submit() to the new function called __mpage_readpages(), which is > exactly old mpage_readpages() with provided callback invocation instead of > usage for old functions. mpage_readpages_aio() provides kevent specific > callbacks, which call old functions, but with different destructor callbacks, > which are essentially the same, except that they reschedule AIO processing. > > aio_sendfile_path() is essentially aio_sendfile(), except that it takes > source filename as parameter and returns opened file descriptor.
> > Benchmark of the 100 1MB files transfer (files are in VFS already) using sync > sendfile() against aio_sendfile_path() shows about 10MB/sec performance win > (78 MB/s vs 66-72 MB/s over 1 Gb network, sendfile sending server is one-way > AMD Athlon 64 3500+) for aio_sendfile_path(). > > AIO state machine is a base for network AIO (which becomes > quite trivial), but I will not start implementation until > roadmap of kevent as a whole and AIO implementation become more clear. > > Signed-off-by: Evgeniy Polyakov <johnpol@xxxxxxxxxxx> > > diff --git a/fs/bio.c b/fs/bio.c > index 7618bcb..291e7e8 100644 > --- a/fs/bio.c > +++ b/fs/bio.c > @@ -120,7 +120,7 @@ void bio_free(struct bio *bio, struct bio_set *bio_set) > /* > * default destructor for a bio allocated with bio_alloc_bioset() > */ > -static void bio_fs_destructor(struct bio *bio) > +void bio_fs_destructor(struct bio *bio) > { > bio_free(bio, fs_bio_set); > } > diff --git a/fs/ext3/inode.c b/fs/ext3/inode.c > index beaf25f..f08c957 100644 > --- a/fs/ext3/inode.c > +++ b/fs/ext3/inode.c > @@ -1650,6 +1650,13 @@ ext3_readpages(struct file *file, struct address_space *mapping, > return mpage_readpages(mapping, pages, nr_pages, ext3_get_block); > } > > +static int > +ext3_readpages_aio(struct file *file, struct address_space *mapping, > + struct list_head *pages, unsigned nr_pages, void *priv) > +{ > + return mpage_readpages_aio(mapping, pages, nr_pages, ext3_get_block, priv); > +} > + > static void ext3_invalidatepage(struct page *page, unsigned long offset) > { > journal_t *journal = EXT3_JOURNAL(page->mapping->host); > @@ -1768,6 +1775,7 @@ static int ext3_journalled_set_page_dirty(struct page *page) > } > > static const struct address_space_operations ext3_ordered_aops = { > + .aio_readpages = ext3_readpages_aio, > .readpage = ext3_readpage, > .readpages = ext3_readpages, > .writepage = ext3_ordered_writepage, > diff --git a/fs/mpage.c b/fs/mpage.c > index 692a3e5..e5ba44b 100644 > --- a/fs/mpage.c > +++
b/fs/mpage.c > @@ -102,7 +102,7 @@ static struct bio *mpage_bio_submit(int rw, struct bio *bio) > static struct bio * > mpage_alloc(struct block_device *bdev, > sector_t first_sector, int nr_vecs, > - gfp_t gfp_flags) > + gfp_t gfp_flags, void *priv) > { > struct bio *bio; > > @@ -116,6 +116,7 @@ mpage_alloc(struct block_device *bdev, > if (bio) { > bio->bi_bdev = bdev; > bio->bi_sector = first_sector; > + bio->bi_private = priv; > } > return bio; > } > @@ -175,7 +176,10 @@ map_buffer_to_page(struct page *page, struct buffer_head *bh, int page_block) > static struct bio * > do_mpage_readpage(struct bio *bio, struct page *page, unsigned nr_pages, > sector_t *last_block_in_bio, struct buffer_head *map_bh, > - unsigned long *first_logical_block, get_block_t get_block) > + unsigned long *first_logical_block, get_block_t get_block, > + struct bio *(*alloc)(struct block_device *bdev, sector_t first_sector, > + int nr_vecs, gfp_t gfp_flags, void *priv), > + struct bio *(*submit)(int rw, struct bio *bio), void *priv) > { > struct inode *inode = page->mapping->host; > const unsigned blkbits = inode->i_blkbits; > @@ -302,25 +306,25 @@ do_mpage_readpage(struct bio *bio, struct page *page, unsigned nr_pages, > * This page will go to BIO. Do we need to send this BIO off first? 
> */ > if (bio && (*last_block_in_bio != blocks[0] - 1)) > - bio = mpage_bio_submit(READ, bio); > + bio = submit(READ, bio); > > alloc_new: > if (bio == NULL) { > - bio = mpage_alloc(bdev, blocks[0] << (blkbits - 9), > + bio = alloc(bdev, blocks[0] << (blkbits - 9), > min_t(int, nr_pages, bio_get_nr_vecs(bdev)), > - GFP_KERNEL); > + GFP_KERNEL, priv); > if (bio == NULL) > goto confused; > } > > length = first_hole << blkbits; > if (bio_add_page(bio, page, length, 0) < length) { > - bio = mpage_bio_submit(READ, bio); > + bio = submit(READ, bio); > goto alloc_new; > } > > if (buffer_boundary(map_bh) || (first_hole != blocks_per_page)) > - bio = mpage_bio_submit(READ, bio); > + bio = submit(READ, bio); > else > *last_block_in_bio = blocks[blocks_per_page - 1]; > out: > @@ -328,7 +332,7 @@ out: > > confused: > if (bio) > - bio = mpage_bio_submit(READ, bio); > + bio = submit(READ, bio); > if (!PageUptodate(page)) > block_read_full_page(page, get_block); > else > @@ -336,6 +340,48 @@ confused: > goto out; > } > > +int > +__mpage_readpages(struct address_space *mapping, struct list_head *pages, > + unsigned nr_pages, get_block_t get_block, > + struct bio *(*alloc)(struct block_device *bdev, sector_t first_sector, > + int nr_vecs, gfp_t gfp_flags, void *priv), > + struct bio *(*submit)(int rw, struct bio *bio), > + void *priv) > +{ > + struct bio *bio = NULL; > + unsigned page_idx; > + sector_t last_block_in_bio = 0; > + struct pagevec lru_pvec; > + struct buffer_head map_bh; > + unsigned long first_logical_block = 0; > + > + clear_buffer_mapped(&map_bh); > + pagevec_init(&lru_pvec, 0); > + for (page_idx = 0; page_idx < nr_pages; page_idx++) { > + struct page *page = list_entry(pages->prev, struct page, lru); > + > + prefetchw(&page->flags); > + list_del(&page->lru); > + if (!add_to_page_cache(page, mapping, > + page->index, GFP_KERNEL)) { > + bio = do_mpage_readpage(bio, page, > + nr_pages - page_idx, > + &last_block_in_bio, &map_bh, > + &first_logical_block, > + 
get_block, alloc, submit, priv); > + if (!pagevec_add(&lru_pvec, page)) > + __pagevec_lru_add(&lru_pvec); > + } else { > + page_cache_release(page); > + } > + } > + pagevec_lru_add(&lru_pvec); > + BUG_ON(!list_empty(pages)); > + if (bio) > + submit(READ, bio); > + return 0; > +} > + > /** > * mpage_readpages - populate an address space with some pages, and > * start reads against them. > @@ -386,40 +432,28 @@ int > mpage_readpages(struct address_space *mapping, struct list_head *pages, > unsigned nr_pages, get_block_t get_block) > { > - struct bio *bio = NULL; > - unsigned page_idx; > - sector_t last_block_in_bio = 0; > - struct pagevec lru_pvec; > - struct buffer_head map_bh; > - unsigned long first_logical_block = 0; > + return __mpage_readpages(mapping, pages, nr_pages, get_block, > + mpage_alloc, mpage_bio_submit, NULL); > +} > +EXPORT_SYMBOL(mpage_readpages); > > - clear_buffer_mapped(&map_bh); > - pagevec_init(&lru_pvec, 0); > - for (page_idx = 0; page_idx < nr_pages; page_idx++) { > - struct page *page = list_entry(pages->prev, struct page, lru); > +#ifdef CONFIG_KEVENT_AIO > +extern struct bio *kaio_mpage_alloc(struct block_device *bdev, sector_t first_sector, > + int nr_vecs, gfp_t gfp_flags, void *priv); > +extern struct bio *kaio_mpage_bio_submit(int rw, struct bio *bio); > +#else > +#define kaio_mpage_alloc mpage_alloc > +#define kaio_mpage_bio_submit mpage_bio_submit > +#endif > > - prefetchw(&page->flags); > - list_del(&page->lru); > - if (!add_to_page_cache(page, mapping, > - page->index, GFP_KERNEL)) { > - bio = do_mpage_readpage(bio, page, > - nr_pages - page_idx, > - &last_block_in_bio, &map_bh, > - &first_logical_block, > - get_block); > - if (!pagevec_add(&lru_pvec, page)) > - __pagevec_lru_add(&lru_pvec); > - } else { > - page_cache_release(page); > - } > - } > - pagevec_lru_add(&lru_pvec); > - BUG_ON(!list_empty(pages)); > - if (bio) > - mpage_bio_submit(READ, bio); > - return 0; > +int > +mpage_readpages_aio(struct address_space *mapping, 
struct list_head *pages, > + unsigned nr_pages, get_block_t get_block, void *priv) > +{ > + return __mpage_readpages(mapping, pages, nr_pages, get_block, > + kaio_mpage_alloc, kaio_mpage_bio_submit, priv); > } > -EXPORT_SYMBOL(mpage_readpages); > +EXPORT_SYMBOL(mpage_readpages_aio); > > /* > * This isn't called much at all > @@ -433,7 +467,8 @@ int mpage_readpage(struct page *page, get_block_t get_block) > > clear_buffer_mapped(&map_bh); > bio = do_mpage_readpage(bio, page, 1, &last_block_in_bio, > - &map_bh, &first_logical_block, get_block); > + &map_bh, &first_logical_block, get_block, > + mpage_alloc, mpage_bio_submit, NULL); > if (bio) > mpage_bio_submit(READ, bio); > return 0; > @@ -595,7 +630,7 @@ page_is_mapped: > alloc_new: > if (bio == NULL) { > bio = mpage_alloc(bdev, blocks[0] << (blkbits - 9), > - bio_get_nr_vecs(bdev), GFP_NOFS|__GFP_HIGH); > + bio_get_nr_vecs(bdev), GFP_NOFS|__GFP_HIGH, NULL); > if (bio == NULL) > goto confused; > } > diff --git a/include/linux/mpage.h b/include/linux/mpage.h > index cc5fb75..accdbdd 100644 > --- a/include/linux/mpage.h > +++ b/include/linux/mpage.h > @@ -16,6 +16,8 @@ typedef int (writepage_t)(struct page *page, struct writeback_control *wbc); > > int mpage_readpages(struct address_space *mapping, struct list_head *pages, > unsigned nr_pages, get_block_t get_block); > +int mpage_readpages_aio(struct address_space *mapping, struct list_head *pages, > + unsigned nr_pages, get_block_t get_block, void *priv); > int mpage_readpage(struct page *page, get_block_t get_block); > int mpage_writepages(struct address_space *mapping, > struct writeback_control *wbc, get_block_t get_block); > diff --git a/kernel/kevent/kevent_aio.c b/kernel/kevent/kevent_aio.c > new file mode 100644 > index 0000000..d4c1c5f > --- /dev/null > +++ b/kernel/kevent/kevent_aio.c > @@ -0,0 +1,881 @@ > +/* > + * 2006 Copyright (c) Evgeniy Polyakov <johnpol@xxxxxxxxxxx> > + * All rights reserved. 
> + * > + * This program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU General Public License as published by > + * the Free Software Foundation; either version 2 of the License, or > + * (at your option) any later version. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU General Public License for more details. > + * > + * You should have received a copy of the GNU General Public License > + * along with this program; if not, write to the Free Software > + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA > + */ > + > +#include <linux/kernel.h> > +#include <linux/module.h> > +#include <linux/types.h> > +#include <linux/spinlock.h> > +#include <linux/list.h> > +#include <linux/kthread.h> > +#include <linux/slab.h> > +#include <linux/bio.h> > +#include <linux/pagemap.h> > +#include <linux/file.h> > +#include <linux/swap.h> > +#include <linux/kevent.h> > + > +#define KAIO_CALL_NUM 8 > +#define KAIO_THREAD_NUM 8 > + > +//#define KEVENT_AIO_DEBUG > + > +#ifdef KEVENT_AIO_DEBUG > +#define dprintk(f, a...) printk(f, ##a) > +#else > +#define dprintk(f, a...) do {} while (0) > +#endif > + > +struct kaio_thread > +{ > + struct list_head req_list; > + spinlock_t req_lock; > + struct task_struct *thread; > + int refcnt; > + wait_queue_head_t wait; > +}; > + > +extern struct file_operations kevent_user_fops; > +static DEFINE_PER_CPU(u32, kaio_req_counter); > +static DEFINE_PER_CPU(int, kaio_req_cpu); > + > +/* > + * Array of working threads. > + * It can only be accessed under RCU protection, > + * so threads reference counters are not atomic. 
> + */ > +static struct kaio_thread *kaio_threads[KAIO_THREAD_NUM] __read_mostly; > +static struct kmem_cache *kaio_req_cache __read_mostly; > +static struct kmem_cache *kaio_priv_cache __read_mostly; > + > +struct kaio_req; > +typedef int (* kaio_callback)(struct kaio_req *req, int direct); > + > +#define KAIO_REQ_PENDING 0 > + > +/* > + * Cache of kaio request callbacks. > + * It is not allowed to change the same cache entry > + * simultaneously (for example it is forbidden to add entries > + * in parallel). > + * > + * When cache entry is scheduled for execution in one of the threads, > + * it is forbidden to access it, since it will be freed when > + * all callbacks have been invoked. > + * > + * It is possible to add callbacks into this cache from callbacks itself. > + */ > +struct kaio_req > +{ > + struct list_head req_entry; > + kaio_callback call[KAIO_CALL_NUM]; > + int read_idx, add_idx; > + int cpu; > + long flags; > + atomic_t refcnt; > + void (*destructor)(struct kaio_req *); > + void *priv; > +}; > + > +/* > + * Returns pointer to thread entry for given index. > + * Must be called under RCU protection. > + */ > +static struct kaio_thread *kaio_get_thread(int cpu) > +{ > + struct kaio_thread *th; > + > + if (cpu == -1) { > +#if 1 > + int *cnt = &__get_cpu_var(kaio_req_cpu); > + cpu = *cnt; > + > + *cnt = *cnt + 1; > + if (*cnt >= KAIO_THREAD_NUM) > + *cnt = 0; > +#else > + cpu = 0; > +#endif > + } > + > + if (unlikely(cpu >= KAIO_THREAD_NUM || !kaio_threads[cpu])) > + return NULL; > + > + th = kaio_threads[cpu]; > + th->refcnt++; > + > + return th; > +} > + > +/* > + * Drops reference counter for given thread. > + * Must be called under RCU protection. 
> + */ > +static inline void kaio_put_thread(struct kaio_thread *th) > +{ > + th->refcnt--; > +} > + > +void kaio_schedule_req(struct kaio_req *req) > +{ > + struct kaio_thread *th; > + unsigned long flags; > + > + rcu_read_lock(); > + th = kaio_get_thread(req->cpu); > + if (!th) { > + req->cpu = -1; > + th = kaio_get_thread(-1); > + BUG_ON(!th); > + } > + > + if (!test_and_set_bit(KAIO_REQ_PENDING, &req->flags)) { > + spin_lock_irqsave(&th->req_lock, flags); > + list_add_tail(&req->req_entry, &th->req_list); > + spin_unlock_irqrestore(&th->req_lock, flags); > + } > + > + wake_up(&th->wait); > + > + kaio_put_thread(th); > + rcu_read_unlock(); > +} > + > +EXPORT_SYMBOL_GPL(kaio_schedule_req); > + > +static inline void kaio_req_get(struct kaio_req *req) > +{ > + atomic_inc(&req->refcnt); > +} > + > +static inline int kaio_req_put(struct kaio_req *req) > +{ > + if (atomic_dec_and_test(&req->refcnt)) { > + dprintk("%s: freeing req: %p, priv: %p.\n", __func__, req, req->priv); > + if (req->destructor) > + req->destructor(req); > + kmem_cache_free(kaio_req_cache, req); > + return 1; > + } > + > + return 0; > +} > + > +/* > + * Append a call request into cache. > + * Returns -EOVERFLOW in case cache is full, and 0 otherwise. > + */ > +int kaio_append_call(struct kaio_req *req, kaio_callback call) > +{ > + if ((req->add_idx + 1 == req->read_idx) || > + ((req->add_idx + 1 == KAIO_CALL_NUM) && req->read_idx == 0)) > + return -EOVERFLOW; > + > + req->call[req->add_idx] = call; > + > + dprintk("%s: req: %p, read_idx: %d, add_idx: %d, call: %p [%p].\n", > + __func__, req, req->read_idx, req->add_idx, > + req->call[req->read_idx], req->call[req->add_idx]); > + if (++req->add_idx == KAIO_CALL_NUM) > + req->add_idx = 0; > + > + kaio_req_get(req); > + > + return 0; > +} > + > +EXPORT_SYMBOL_GPL(kaio_append_call); > + > +/* > + * Adds one call request into given cache. > + * If cache is NULL or full, allocate new one. 
> + */ > +struct kaio_req *kaio_add_call(struct kaio_req *req, kaio_callback call, int cpu, gfp_t gflags) > +{ > + if (req && !kaio_append_call(req, call)) { > + kaio_schedule_req(req); > + return req; > + } > + > + req = kmem_cache_alloc(kaio_req_cache, gflags); > + if (!req) > + return NULL; > + > + memset(req->call, 0, sizeof(req->call)); > + > + req->destructor = NULL; > + req->cpu = cpu; > + req->call[0] = call; > + req->add_idx = 1; > + req->read_idx = 0; > + req->flags = 0; > + atomic_set(&req->refcnt, 1); > + > + dprintk("%s: req: %p, call: %p [%p].\n", __func__, req, call, req->call[0]); > + > + return req; > +} > + > +EXPORT_SYMBOL_GPL(kaio_add_call); > + > +/* > + * Call appropriate callbacks in cache. > + * This can only be called by working threads, which means that cache > + * is filled (probably partially) and are not even accessible from > + * the originator of requests, which means that cache will be freed > + * when all callbacks are invoked. > + * > + * Callback itself can reschedule new callback into the same cache. > + * > + * If callback returns negative value, the whole cache will be freed. > + * If positive value is returned, then further processing is stopped, > + * so cache can be queued into the end of the processing FIFO by callback. > + * If zero is returned, next callback will be invoked if any. 
> + */ > +static int kaio_call(struct kaio_req *req) > +{ > + int err = -EINVAL; > + > + if (likely(req->add_idx != req->read_idx)) { > + dprintk("%s: req: %p, read_idx: %d, add_idx: %d, call: %p [%p].\n", > + __func__, req, req->read_idx, req->add_idx, > + req->call[req->read_idx], req->call[0]); > + err = (*req->call[req->read_idx])(req, 0); > + if (++req->read_idx == KAIO_CALL_NUM) > + req->read_idx = 0; > + > + if (kaio_req_put(req)) > + err = 0; > + } > + return err; > +} > + > +static int kaio_thread_process(void *data) > +{ > + struct kaio_thread *th = data; > + unsigned long flags; > + struct kaio_req *req, *first; > + DECLARE_WAITQUEUE(wait, current); > + int err; > + > + add_wait_queue_exclusive(&th->wait, &wait); > + > + while (!kthread_should_stop()) { > + first = req = NULL; > + do { > + req = NULL; > + spin_lock_irqsave(&th->req_lock, flags); > + if (!list_empty(&th->req_list)) { > + req = list_entry(th->req_list.prev, struct kaio_req, req_entry); > + if (first != req) > + list_del(&req->req_entry); > + } > + spin_unlock_irqrestore(&th->req_lock, flags); > + > + if (!first) > + first = req; > + else if (first == req) > + break; > + > + if (req) { > + err = 0; > + while ((req->read_idx != req->add_idx) && !kthread_should_stop()) { > + dprintk("%s: req: %p, read_idx: %d, add_idx: %d, err: %d.\n", > + __func__, req, req->read_idx, req->add_idx, err); > + err = kaio_call(req); > + if (err != 0) > + break; > + } > + > + if (err > 0) { > + spin_lock_irqsave(&th->req_lock, flags); > + list_add_tail(&req->req_entry, &th->req_list); > + spin_unlock_irqrestore(&th->req_lock, flags); > + } > + } > + } while (req); > + __set_current_state(TASK_INTERRUPTIBLE); > + schedule_timeout(HZ); > + __set_current_state(TASK_RUNNING); > + } > + > + remove_wait_queue(&th->wait, &wait); > + > + return 0; > +} > + > +struct kaio_private > +{ > + union { > + void *sptr; > + __u64 sdata; > + }; > + union { > + void *dptr; > + __u64 ddata; > + }; > + __u64 offset, processed; > + 
__u64 count, limit; > + struct kevent_user *kevent_user; > +}; > + > +extern void bio_fs_destructor(struct bio *bio); > + > +static void kaio_bio_destructor(struct bio *bio) > +{ > + dprintk("%s: bio=%p, num=%u.\n", __func__, bio, bio->bi_vcnt); > + bio_fs_destructor(bio); > +} > + > +static int kaio_read_send_pages(struct kaio_req *req, int direct); > + > +static int kaio_mpage_end_io_read(struct bio *bio, unsigned int bytes_done, int err) > +{ > + const int uptodate = test_bit(BIO_UPTODATE, &bio->bi_flags); > + struct bio_vec *bvec = bio->bi_io_vec + bio->bi_vcnt - 1; > + struct kaio_req *req = bio->bi_private; > + > + if (bio->bi_size) > + return 1; > + > + do { > + struct page *page = bvec->bv_page; > + > + if (--bvec >= bio->bi_io_vec) > + prefetchw(&bvec->bv_page->flags); > + > + if (uptodate) { > + SetPageUptodate(page); > + } else { > + ClearPageUptodate(page); > + SetPageError(page); > + } > + unlock_page(page); > + } while (bvec >= bio->bi_io_vec); > + > + dprintk("%s: bio: %p, req: %p, pending: %d.\n", > + __func__, bio, req, test_bit(KAIO_REQ_PENDING, &req->flags)); > + > + kaio_append_call(req, kaio_read_send_pages); > + kaio_req_put(req); > + kaio_schedule_req(req); > + > + bio_put(bio); > + return 0; > +} > + > +struct bio *kaio_mpage_bio_submit(int rw, struct bio *bio) > +{ > + if (bio) { > + bio->bi_end_io = kaio_mpage_end_io_read; > + dprintk("%s: bio=%p, num=%u.\n", __func__, bio, bio->bi_vcnt); > + submit_bio(READ, bio); > + } > + return NULL; > +} > + > +struct bio *kaio_mpage_alloc(struct block_device *bdev, > + sector_t first_sector, int nr_vecs, gfp_t gfp_flags, void *priv) > +{ > + struct bio *bio; > + > + bio = bio_alloc(gfp_flags, nr_vecs); > + > + if (bio == NULL && (current->flags & PF_MEMALLOC)) { > + while (!bio && (nr_vecs /= 2)) > + bio = bio_alloc(gfp_flags, nr_vecs); > + } > + > + if (bio) { > + struct kaio_req *req = priv; > + > + bio->bi_bdev = bdev; > + bio->bi_sector = first_sector; > + bio->bi_private = priv; > + 
bio->bi_destructor = kaio_bio_destructor; > + kaio_req_get(req); > + dprintk("%s: bio: %p, req: %p, num: %d.\n", __func__, bio, priv, nr_vecs); > + } > + return bio; > +} > + > +static ssize_t kaio_vfs_read_actor(struct kaio_private *priv, struct page *page, size_t len) > +{ > + struct socket *sock = priv->dptr; > + struct file *file = sock->file; > + > + return file->f_op->sendpage(file, page, 0, len, &file->f_pos, 1); > +} > + > +static int kaio_vfs_read(struct kaio_private *priv, > + ssize_t (*actor)(struct kaio_private *, struct page *, size_t)) > +{ > + struct address_space *mapping; > + struct file *file = priv->sptr; > + ssize_t actor_size; > + loff_t isize; > + int i = 0, pg_num; > + > + mapping = file->f_mapping; > + isize = i_size_read(file->f_dentry->d_inode); > + > + if (priv->processed >= isize) { > + priv->count = 0; > + return 0; > + } > + priv->count = isize - priv->processed; > + pg_num = ALIGN(min_t(u64, isize, priv->count), PAGE_SIZE) >> PAGE_SHIFT; > + > + dprintk("%s: start: priv: %p, ret: %d, num: %d, count: %Lu, offset: %Lu, processed: %Lu.\n", > + __func__, priv, i, pg_num, priv->count, priv->offset, priv->processed); > + > + for (i=0; i<pg_num && priv->count; ++i) { > + struct page *page; > + size_t nr = PAGE_CACHE_SIZE; > + > + page = find_get_page(mapping, priv->processed >> PAGE_CACHE_SHIFT); > + if (unlikely(page == NULL)) > + break; > + if (!PageUptodate(page)) { > + dprintk("%s: %2d: page=%p, processed=%Lu, count=%Lu not uptodate.\n", > + __func__, i, page, priv->processed, priv->count); > + page_cache_release(page); > + break; > + } > + > + if (mapping_writably_mapped(mapping)) > + flush_dcache_page(page); > + > + mark_page_accessed(page); > + > + if (nr + priv->processed > isize) > + nr = isize - priv->processed; > + if (nr > priv->count) > + nr = priv->count; > + > + actor_size = actor(priv, page, nr); > + if (actor_size < 0) { > + page_cache_release(page); > + i = (int)actor_size; > + break; > + } > + > + page_cache_release(page); 
> + > + priv->processed += actor_size; > + priv->count -= actor_size; > + } > + > + if (!priv->count) > + i = pg_num; > + > + dprintk("%s: end: priv: %p, ret: %d, num: %d, count: %Lu, offset: %Lu, processed: %Lu.\n", > + __func__, priv, i, pg_num, priv->count, priv->offset, priv->processed); > + > + return i; > +} > + > +static int kaio_read_send_pages(struct kaio_req *req, int direct) > +{ > + struct kaio_private *priv = req->priv; > + struct file *file = priv->sptr; > + struct address_space *mapping = file->f_mapping; > + struct page *page; > + int err, i, num; > + u64 offset; > + LIST_HEAD(page_pool); > + > + err = kaio_vfs_read(priv, &kaio_vfs_read_actor); > + if (err < 0) > + return err; > + > + if (err == 0) { > + priv->limit >>= 1; > + } else { > + if (priv->limit) > + priv->limit <<= 1; > + else > + priv->limit = 8; > + } > + > + if (priv->offset < priv->processed) > + priv->offset = priv->processed; > + > + if (!priv->count) { > + kevent_storage_ready(&priv->kevent_user->st, NULL, KEVENT_MASK_ALL); > + return 0; > + } > + > + if (priv->offset >= priv->processed + priv->count) { > + kaio_append_call(req, kaio_read_send_pages); > + return 0; > + } > + > + num = min_t(int, max_sane_readahead(priv->limit), > + ALIGN(priv->count, PAGE_SIZE) >> PAGE_SHIFT); > + > + offset = priv->offset; > + for (i=0; i<num; ++i) { > + page = page_cache_alloc_cold(mapping); > + if (!page) > + break; > + > + page->index = priv->offset >> PAGE_CACHE_SHIFT; > + list_add(&page->lru, &page_pool); > + > + priv->offset += PAGE_CACHE_SIZE; > + } > + > + dprintk("%s: submit: req: %p, priv: %p, offset: %Lu, num: %d, limit: %Lu.\n", > + __func__, req, priv, offset, i, priv->limit); > + > + err = mapping->a_ops->aio_readpages(file, mapping, &page_pool, i, req); > + if (err) { > + dprintk("%s: kevent_mpage_readpages failed: err=%d, count=%Lu.\n", > + __func__, err, priv->count); > + kaio_schedule_req(req); > + return err; > + } > + > + return 1; > +} > + > +static int kaio_add_kevent(int fd, 
struct kaio_req *req) > +{ > + struct ukevent uk; > + struct file *file; > + struct kevent_user *u; > + int err, need_fput = 0; > + u32 *cnt; > + > + file = fget_light(fd, &need_fput); > + if (!file) { > + err = -EBADF; > + goto err_out; > + } > + > + if (file->f_op != &kevent_user_fops) { > + err = -EINVAL; > + goto err_out_fput; > + } > + > + u = file->private_data; > + > + memset(&uk, 0, sizeof(struct ukevent)); > + > + uk.event = KEVENT_MASK_ALL; > + uk.type = KEVENT_AIO; > + > + preempt_disable(); > + uk.id.raw[0] = smp_processor_id(); > + cnt = &__get_cpu_var(kaio_req_counter); > + uk.id.raw[1] = *cnt; > + *cnt = *cnt + 1; > + preempt_enable(); > + > + uk.req_flags = KEVENT_REQ_ONESHOT | KEVENT_REQ_ALWAYS_QUEUE; > + uk.ptr = req; > + > + err = kevent_user_add_ukevent(&uk, u); > + if (err) > + goto err_out_fput; > + > + kevent_user_get(u); > + > + fput_light(file, need_fput); > + > + return 0; > + > +err_out_fput: > + fput_light(file, need_fput); > +err_out: > + return err; > +} > + > +static void kaio_destructor(struct kaio_req *req) > +{ > + struct kaio_private *priv = req->priv; > + struct socket *sock = priv->dptr; > + struct file *file = priv->sptr; > + > + fput(file); > + sockfd_put(sock); > + > + kevent_storage_ready(&priv->kevent_user->st, NULL, KEVENT_MASK_ALL); > + kevent_user_put(priv->kevent_user); > + > + kmem_cache_free(kaio_priv_cache, req->priv); > +} > + > +static struct kaio_req *kaio_sendfile(int kevent_fd, int sock_fd, struct file *file, off_t offset, size_t count) > +{ > + struct kaio_req *req; > + struct socket *sock; > + struct kaio_private *priv; > + int err; > + > + sock = sockfd_lookup(sock_fd, &err); > + if (!sock) > + goto err_out_exit; > + > + priv = kmem_cache_alloc(kaio_priv_cache, GFP_KERNEL); > + if (!priv) > + goto err_out_sput; > + > + priv->sptr = file; > + priv->dptr = sock; > + priv->offset = offset; > + priv->count = min_t(u64, i_size_read(file->f_dentry->d_inode), count); > + priv->processed = offset; > + priv->limit = 
128; > + > + req = kaio_add_call(NULL, &kaio_read_send_pages, -1, GFP_KERNEL); > + if (!req) > + goto err_out_free; > + > + req->destructor = kaio_destructor; > + req->priv = priv; > + > + err = kaio_add_kevent(kevent_fd, req); > + > + dprintk("%s: req: %p, priv: %p, call: %p [%p], offset: %Lu, processed: %Lu, count: %Lu, err: %d.\n", > + __func__, req, priv, &kaio_read_send_pages, > + kaio_read_send_pages, priv->offset, priv->processed, priv->count, err); > + > + if (err) > + goto err_out_remove; > + > + kaio_schedule_req(req); > + > + return req; > + > +err_out_remove: > + /* It is safe to just free the object since it is guaranteed that it was not > + * queued for processing. > + */ > + kmem_cache_free(kaio_req_cache, req); > +err_out_free: > + kmem_cache_free(kaio_priv_cache, priv); > +err_out_sput: > + sockfd_put(sock); > +err_out_exit: > + return NULL; > + > +} > + > +asmlinkage long sys_aio_sendfile(int kevent_fd, int sock_fd, int in_fd, off_t offset, size_t count) > +{ > + struct kaio_req *req; > + struct file *file; > + int err; > + > + file = fget(in_fd); > + if (!file) { > + err = -EBADF; > + goto err_out_exit; > + } > + > + req = kaio_sendfile(kevent_fd, sock_fd, file, offset, count); > + if (!req) { > + err = -EINVAL; > + goto err_out_fput; > + } > + > + return (long)req; > + > +err_out_fput: > + fput(file); > +err_out_exit: > + return err; > +} > + > +asmlinkage long sys_aio_sendfile_path(int kevent_fd, int sock_fd, char __user *filename, off_t offset, size_t count) > +{ > + char *tmp = getname(filename); > + int fd = PTR_ERR(tmp); > + int flags = O_RDONLY, err; > + struct nameidata nd; > + struct file *file; > + struct kaio_req *req; > + > + if (force_o_largefile()) > + flags = O_LARGEFILE; > + > + if (IS_ERR(tmp)) { > + err = fd; > + goto err_out_exit; > + } > + > + fd = get_unused_fd(); > + if (fd < 0) { > + err = fd; > + goto err_out_put_name; > + } > + > + if ((flags+1) & O_ACCMODE) > + flags++; > + > + err = open_namei(AT_FDCWD, tmp, flags, 
0400, &nd); > + if (err) > + goto err_out_fdput; > + > + file = nameidata_to_filp(&nd, flags); > + if (!file) > + goto err_out_fdput; > + > + /* One reference will be released in sys_close(), > + * second one through req->destructor() > + */ > + atomic_inc(&file->f_count); > + > + req = kaio_sendfile(kevent_fd, sock_fd, file, offset, count); > + if (!req) { > + err = -EINVAL; > + goto err_out_fput; > + } > + > + fd_install(fd, file); > + > + return fd; > + > +err_out_fput: > + fput(file); > + fput(file); > +err_out_fdput: > + put_unused_fd(fd); > +err_out_put_name: > + putname(tmp); > +err_out_exit: > + return err; > +} > + > +static int kevent_aio_callback(struct kevent *k) > +{ > + struct kaio_req *req = k->event.ptr; > + struct kaio_private *priv = req->priv; > + > + if (!priv->count) { > + __u32 *processed = (__u32 *)&priv->processed; > + k->event.ret_data[0] = processed[0]; > + k->event.ret_data[1] = processed[1]; > + return 1; > + } > + > + return 0; > +} > + > +int kevent_aio_enqueue(struct kevent *k) > +{ > + int err; > + struct kaio_req *req = k->event.ptr; > + struct kaio_private *priv = req->priv; > + > + err = kevent_storage_enqueue(&k->user->st, k); > + if (err) > + goto err_out_exit; > + > + priv->kevent_user = k->user; > + if (k->event.req_flags & KEVENT_REQ_ALWAYS_QUEUE) > + kevent_requeue(k); > + > + return 0; > + > +err_out_exit: > + return err; > +} > + > +int kevent_aio_dequeue(struct kevent *k) > +{ > + kevent_storage_dequeue(k->st, k); > + > + return 0; > +} > + > +static void kaio_thread_stop(struct kaio_thread *th) > +{ > + kthread_stop(th->thread); > + kfree(th); > +} > + > +static int kaio_thread_start(struct kaio_thread **thp, int num) > +{ > + struct kaio_thread *th; > + > + th = kzalloc(sizeof(struct kaio_thread), GFP_KERNEL); > + if (!th) > + return -ENOMEM; > + > + th->refcnt = 1; > + spin_lock_init(&th->req_lock); > + INIT_LIST_HEAD(&th->req_list); > + init_waitqueue_head(&th->wait); > + > + th->thread = 
kthread_run(kaio_thread_process, th, "kaio/%d", num); > + if (IS_ERR(th->thread)) { > + int err = PTR_ERR(th->thread); > + kfree(th); > + return err; > + } > + > + *thp = th; > + wmb(); > + > + return 0; > +} > + > +static int __init kevent_init_aio(void) > +{ > + struct kevent_callbacks sc = { > + .callback = &kevent_aio_callback, > + .enqueue = &kevent_aio_enqueue, > + .dequeue = &kevent_aio_dequeue, > + .flags = 0, > + }; > + int err, i; > + > + kaio_req_cache = kmem_cache_create("kaio_req", sizeof(struct kaio_req), > + 0, SLAB_PANIC, NULL, NULL); > + kaio_priv_cache = kmem_cache_create("kaio_priv", sizeof(struct kaio_private), > + 0, SLAB_PANIC, NULL, NULL); > + > + memset(kaio_threads, 0, sizeof(kaio_threads)); > + > + for (i=0; i<KAIO_THREAD_NUM; ++i) { > + err = kaio_thread_start(&kaio_threads[i], i); > + if (err) > + goto err_out_stop; > + } > + > + err = kevent_add_callbacks(&sc, KEVENT_AIO); > + if (err) > + goto err_out_stop; > + > + return 0; > + > +err_out_stop: > + while (--i >= 0) { > + struct kaio_thread *th = kaio_threads[i]; > + > + kaio_threads[i] = NULL; > + wmb(); > + > + kaio_thread_stop(th); > + } > + return err; > +} > +module_init(kevent_init_aio); > > - > To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in > the body of a message to majordomo@xxxxxxxxxxxxxxx > More majordomo info at http://vger.kernel.org/majordomo-info.html -- Suparna Bhattacharya (suparna@xxxxxxxxxx) Linux Technology Center IBM Software Lab, India - To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html