On Wed, 11 Mar 2009 09:01:50 +0100 Tomasz Chmielewski <mangoo@xxxxxxxx> wrote: > FUJITA Tomonori schrieb: > > RHEL 5.3 release note says (I think that some of you guys already know > > this): > > > > = > > iSCSI target capability > > > > The iSCSI target capability, delivered as part of the Linux Target > > (tgt) framework, moves from Technology Preview to full support in Red > > (...) > > > http://www.redhat.com/docs/manuals/enterprise/ > > > > > > We need to be more serious. :) Hopefully, this would attract more > > users to tgt. > > > > Feel feel to post what features you want, what areas need to be fixed, > > etc. > > The only thing that bothers me is target/lun limit, caused by libc > pthread limit. I thought it only affects 32 bit targets, but now I see > it happens also with 64 bits. > > I understand it's a major design problem - do you think it could be > addressed? I.e., use prefork model used by Apache and just launch more > tgtd processes? I'm not sure Apache uses such a model ('client per process' with preforking), because that model scales poorly (I guess Apache used it in the past): http://www.kegel.com/c10k.html The only reason we have to use pthreads is that Linux AIO is broken. I've been waiting for the new AIO mechanism: http://lwn.net/Articles/316193/ It would improve performance too. Anyway, the target/LUN limit problem is bad, so here's a patch to fix it. I haven't tested it much, so let me know how it works. 
= From: FUJITA Tomonori <fujita.tomonori@xxxxxxxxxxxxx> Subject: [PATCH] add async threads Signed-off-by: FUJITA Tomonori <fujita.tomonori@xxxxxxxxxxxxx> --- usr/Makefile | 2 +- usr/async.c | 318 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ usr/async.h | 18 +++ usr/bs.c | 245 +++++++++++++++++++++++------------------- usr/scsi_cmnd.h | 4 + usr/tgtd.c | 7 ++ 6 files changed, 481 insertions(+), 113 deletions(-) create mode 100644 usr/async.c create mode 100644 usr/async.h diff --git a/usr/Makefile b/usr/Makefile index 3fc848e..743a2b6 100644 --- a/usr/Makefile +++ b/usr/Makefile @@ -61,7 +61,7 @@ LIBS += -lpthread PROGRAMS += tgtd tgtadm tgtimg SCRIPTS += ../scripts/tgt-setup-lun ../scripts/tgt-admin -TGTD_OBJS += tgtd.o mgmt.o target.o scsi.o log.o driver.o util.o work.o \ +TGTD_OBJS += tgtd.o mgmt.o target.o scsi.o log.o driver.o util.o work.o async.o \ parser.o spc.o sbc.o mmc.o osd.o scc.o smc.o \ ssc.o bs_ssc.o libssc.o \ bs_null.o bs_sg.o bs.o libcrc32c.o diff --git a/usr/async.c b/usr/async.c new file mode 100644 index 0000000..85fdbee --- /dev/null +++ b/usr/async.c @@ -0,0 +1,318 @@ +/* + * infrastructure to perform synchronous functions asynchronously + * + * Copyright (C) 2008 FUJITA Tomonori <tomof@xxxxxxx> + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, version 2 of the + * License. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA + */ +#include <errno.h> +#include <string.h> +#include <inttypes.h> +#include <pthread.h> +#include <sys/epoll.h> + +#include "list.h" +#include "util.h" +#include "tgtd.h" +#include "async.h" + +#define NR_THREADS 8 + +struct async_task_info { + pthread_t ack_thread; + pthread_t worker_thread[NR_THREADS]; + + /* protected by pipe */ + struct list_head ack_list; + + pthread_cond_t finished_cond; + pthread_mutex_t finished_lock; + struct list_head finished_list; + + /* wokers sleep on this and signaled by tgtd */ + pthread_cond_t pending_cond; + /* locked by tgtd and workers */ + pthread_mutex_t pending_lock; + /* protected by pending_lock */ + struct list_head pending_list; + + pthread_mutex_t startup_lock; + + int command_fd[2]; + int done_fd[2]; + + int stop; +}; + +struct async_task_info *info, __info; + +static void *atask_ack_fn(void *arg) +{ + struct async_task_info *info = arg; + int command, ret, nr; + struct atask *task; + +retry: + ret = read(info->command_fd[0], &command, sizeof(command)); + if (ret < 0) { + eprintf("ack pthread will be dead, %m\n"); + if (errno == EAGAIN || errno == EINTR) + goto retry; + + goto out; + } + + if (info->stop) + goto out; + + pthread_mutex_lock(&info->finished_lock); +retest: + if (list_empty(&info->finished_list)) { + pthread_cond_wait(&info->finished_cond, &info->finished_lock); + goto retest; + } + + while (!list_empty(&info->finished_list)) { + task = list_first_entry(&info->finished_list, + struct atask, at_list); + + list_del(&task->at_list); + list_add(&task->at_list, &info->ack_list); + } + + pthread_mutex_unlock(&info->finished_lock); + + nr = 1; +rewrite: + ret = write(info->done_fd[1], &nr, sizeof(nr)); + if (ret < 0) { + eprintf("can't ack tgtd, %m\n"); + if (errno == EAGAIN || errno == EINTR) + 
goto rewrite; + + goto out; + } + + goto retry; +out: + pthread_exit(NULL); +} + +static void *bs_thread_worker_fn(void *arg) +{ + struct async_task_info *info = arg; + struct atask *task; + + pthread_mutex_lock(&info->startup_lock); + dprintf("started this thread\n"); + pthread_mutex_unlock(&info->startup_lock); + + while (!info->stop) { + pthread_mutex_lock(&info->pending_lock); + retest: + if (list_empty(&info->pending_list)) { + pthread_cond_wait(&info->pending_cond, &info->pending_lock); + if (info->stop) { + pthread_mutex_unlock(&info->pending_lock); + pthread_exit(NULL); + } + goto retest; + } + + task = list_first_entry(&info->pending_list, + struct atask, at_list); + + + list_del(&task->at_list); + pthread_mutex_unlock(&info->pending_lock); + + task->fn(task); + + pthread_mutex_lock(&info->finished_lock); + list_add(&task->at_list, &info->finished_list); + pthread_mutex_unlock(&info->finished_lock); + + pthread_cond_signal(&info->finished_cond); + } + + pthread_exit(NULL); +} + +static void atask_done_fn(int fd, int events, void *data) +{ + struct async_task_info *info = data; + struct atask *task; + int nr_events, ret; + + ret = read(info->done_fd[0], &nr_events, sizeof(nr_events)); + if (ret < 0) { + eprintf("wrong wakeup\n"); + return; + } + + while (!list_empty(&info->ack_list)) { + task = list_first_entry(&info->ack_list, + struct atask, at_list); + + list_del(&task->at_list); + task->done(task); + } + +rewrite: + ret = write(info->command_fd[1], &nr_events, sizeof(nr_events)); + if (ret < 0) { + eprintf("can't write done, %m\n"); + if (errno == EAGAIN || errno == EINTR) + goto rewrite; + + return; + } +} + +int init_async(void) +{ + int i, ret; + + info = &__info; + + INIT_LIST_HEAD(&info->ack_list); + INIT_LIST_HEAD(&info->finished_list); + INIT_LIST_HEAD(&info->pending_list); + + pthread_cond_init(&info->finished_cond, NULL); + pthread_cond_init(&info->pending_cond, NULL); + + pthread_mutex_init(&info->finished_lock, NULL); + 
pthread_mutex_init(&info->pending_lock, NULL); + pthread_mutex_init(&info->startup_lock, NULL); + + ret = pipe(info->command_fd); + if (ret) { + eprintf("failed to create command pipe, %m\n"); + goto destroy_cond_mutex; + } + + ret = pipe(info->done_fd); + if (ret) { + eprintf("failed to done command pipe, %m\n"); + goto close_command_fd; + } + + ret = tgt_event_add(info->done_fd[0], EPOLLIN, atask_done_fn, info); + if (ret) { + eprintf("failed to add epoll event\n"); + goto close_done_fd; + } + + ret = pthread_create(&info->ack_thread, NULL, atask_ack_fn, info); + if (ret) { + eprintf("failed to create an ack thread, %s\n", strerror(ret)); + goto event_del; + } + + pthread_mutex_lock(&info->startup_lock); + for (i = 0; i < ARRAY_SIZE(info->worker_thread); i++) { + ret = pthread_create(&info->worker_thread[i], NULL, + bs_thread_worker_fn, info); + + if (ret) { + eprintf("failed to create a worker thread, %d %s\n", + i, strerror(ret)); + if (ret) + goto destroy_threads; + } + } + pthread_mutex_unlock(&info->startup_lock); +rewrite: + ret = write(info->command_fd[1], &ret, sizeof(ret)); + if (ret < 0) { + eprintf("can't write done, %m\n"); + if (errno == EAGAIN || errno == EINTR) + goto rewrite; + } + + return 0; +destroy_threads: + info->stop = 1; + write(info->command_fd[1], &ret, sizeof(ret)); + pthread_join(info->ack_thread, NULL); + + eprintf("stopped the ack thread\n"); + + pthread_mutex_unlock(&info->startup_lock); + for (; i > 0; i--) { + pthread_join(info->worker_thread[i - 1], NULL); + eprintf("stopped the worker thread %d\n", i - 1); + } +event_del: + tgt_event_del(info->done_fd[0]); +close_done_fd: + close(info->done_fd[0]); + close(info->done_fd[1]); +close_command_fd: + close(info->command_fd[0]); + close(info->command_fd[1]); +destroy_cond_mutex: + pthread_cond_destroy(&info->finished_cond); + pthread_cond_destroy(&info->pending_cond); + pthread_mutex_destroy(&info->finished_lock); + pthread_mutex_destroy(&info->pending_lock); + 
pthread_mutex_destroy(&info->startup_lock); + + return 1; +} + +void exit_async(void) +{ + int i; + + pthread_cancel(info->ack_thread); + pthread_join(info->ack_thread, NULL); + + info->stop = 1; + pthread_cond_broadcast(&info->pending_cond); + + for (i = 0; info->worker_thread[i] && + i < ARRAY_SIZE(info->worker_thread); i++) + pthread_join(info->worker_thread[i], NULL); + + pthread_cond_destroy(&info->finished_cond); + pthread_cond_destroy(&info->pending_cond); + pthread_mutex_destroy(&info->finished_lock); + pthread_mutex_destroy(&info->pending_lock); + pthread_mutex_destroy(&info->startup_lock); + + tgt_event_del(info->done_fd[0]); + + close(info->done_fd[0]); + close(info->done_fd[1]); + + close(info->command_fd[0]); + close(info->command_fd[1]); + + info->stop = 0; +} + +void async_submit(struct atask *task) +{ + pthread_mutex_lock(&info->pending_lock); + + list_add(&task->at_list, &info->pending_list); + + pthread_mutex_unlock(&info->pending_lock); + + pthread_cond_signal(&info->pending_cond); +} diff --git a/usr/async.h b/usr/async.h new file mode 100644 index 0000000..0a0960d --- /dev/null +++ b/usr/async.h @@ -0,0 +1,18 @@ +#ifndef __ASYNC_H__ +#define __ASYNC_H__ + +struct atask; + +typedef void (async_task_func_t) (struct atask *); + +struct atask { + struct list_head at_list; + async_task_func_t *fn; + async_task_func_t *done; +}; + +int init_async(void); +void exit_async(void); +void async_submit(struct atask *task); + +#endif diff --git a/usr/bs.c b/usr/bs.c index cd19b86..3876188 100644 --- a/usr/bs.c +++ b/usr/bs.c @@ -30,6 +30,7 @@ #include "tgtadm_error.h" #include "util.h" #include "bs_thread.h" +#include "async.h" static LIST_HEAD(bst_list); @@ -184,145 +185,165 @@ static void *bs_thread_worker_fn(void *arg) int bs_thread_open(struct bs_thread_info *info, request_func_t *rfn, int nr_threads) { - int i, ret; +/* int i, ret; */ info->request_fn = rfn; - INIT_LIST_HEAD(&info->ack_list); - INIT_LIST_HEAD(&info->finished_list); - 
INIT_LIST_HEAD(&info->pending_list); +/* INIT_LIST_HEAD(&info->ack_list); */ +/* INIT_LIST_HEAD(&info->finished_list); */ +/* INIT_LIST_HEAD(&info->pending_list); */ + +/* pthread_cond_init(&info->finished_cond, NULL); */ +/* pthread_cond_init(&info->pending_cond, NULL); */ + +/* pthread_mutex_init(&info->finished_lock, NULL); */ +/* pthread_mutex_init(&info->pending_lock, NULL); */ +/* pthread_mutex_init(&info->startup_lock, NULL); */ + +/* ret = pipe(info->command_fd); */ +/* if (ret) { */ +/* eprintf("failed to create command pipe, %m\n"); */ +/* goto destroy_cond_mutex; */ +/* } */ + +/* ret = pipe(info->done_fd); */ +/* if (ret) { */ +/* eprintf("failed to done command pipe, %m\n"); */ +/* goto close_command_fd; */ +/* } */ + +/* ret = tgt_event_add(info->done_fd[0], EPOLLIN, bs_thread_request_done, info); */ +/* if (ret) { */ +/* eprintf("failed to add epoll event\n"); */ +/* goto close_done_fd; */ +/* } */ + +/* ret = pthread_create(&info->ack_thread, NULL, bs_thread_ack_fn, info); */ +/* if (ret) { */ +/* eprintf("failed to create an ack thread, %s\n", strerror(ret)); */ +/* goto event_del; */ +/* } */ + +/* if (nr_threads > ARRAY_SIZE(info->worker_thread)) { */ +/* eprintf("too many threads %d\n", nr_threads); */ +/* nr_threads = ARRAY_SIZE(info->worker_thread); */ +/* } */ + +/* pthread_mutex_lock(&info->startup_lock); */ +/* for (i = 0; i < nr_threads; i++) { */ +/* ret = pthread_create(&info->worker_thread[i], NULL, */ +/* bs_thread_worker_fn, info); */ + +/* if (ret) { */ +/* eprintf("failed to create a worker thread, %d %s\n", */ +/* i, strerror(ret)); */ +/* if (ret) */ +/* goto destroy_threads; */ +/* } */ +/* } */ +/* pthread_mutex_unlock(&info->startup_lock); */ +/* rewrite: */ +/* ret = write(info->command_fd[1], &ret, sizeof(ret)); */ +/* if (ret < 0) { */ +/* eprintf("can't write done, %m\n"); */ +/* if (errno == EAGAIN || errno == EINTR) */ +/* goto rewrite; */ +/* } */ - pthread_cond_init(&info->finished_cond, NULL); - 
pthread_cond_init(&info->pending_cond, NULL); + return 0; +/* destroy_threads: */ +/* info->stop = 1; */ +/* write(info->command_fd[1], &ret, sizeof(ret)); */ +/* pthread_join(info->ack_thread, NULL); */ + +/* eprintf("stopped the ack thread\n"); */ + +/* pthread_mutex_unlock(&info->startup_lock); */ +/* for (; i > 0; i--) { */ +/* pthread_join(info->worker_thread[i - 1], NULL); */ +/* eprintf("stopped the worker thread %d\n", i - 1); */ +/* } */ +/* event_del: */ +/* tgt_event_del(info->done_fd[0]); */ +/* close_done_fd: */ +/* close(info->done_fd[0]); */ +/* close(info->done_fd[1]); */ +/* close_command_fd: */ +/* close(info->command_fd[0]); */ +/* close(info->command_fd[1]); */ +/* destroy_cond_mutex: */ +/* pthread_cond_destroy(&info->finished_cond); */ +/* pthread_cond_destroy(&info->pending_cond); */ +/* pthread_mutex_destroy(&info->finished_lock); */ +/* pthread_mutex_destroy(&info->pending_lock); */ +/* pthread_mutex_destroy(&info->startup_lock); */ + +/* return TGTADM_NOMEM; */ +} - pthread_mutex_init(&info->finished_lock, NULL); - pthread_mutex_init(&info->pending_lock, NULL); - pthread_mutex_init(&info->startup_lock, NULL); +void bs_thread_close(struct bs_thread_info *info) +{ +/* int i; */ - ret = pipe(info->command_fd); - if (ret) { - eprintf("failed to create command pipe, %m\n"); - goto destroy_cond_mutex; - } +/* pthread_cancel(info->ack_thread); */ +/* pthread_join(info->ack_thread, NULL); */ - ret = pipe(info->done_fd); - if (ret) { - eprintf("failed to done command pipe, %m\n"); - goto close_command_fd; - } +/* info->stop = 1; */ +/* pthread_cond_broadcast(&info->pending_cond); */ - ret = tgt_event_add(info->done_fd[0], EPOLLIN, bs_thread_request_done, info); - if (ret) { - eprintf("failed to add epoll event\n"); - goto close_done_fd; - } +/* for (i = 0; info->worker_thread[i] && */ +/* i < ARRAY_SIZE(info->worker_thread); i++) */ +/* pthread_join(info->worker_thread[i], NULL); */ - ret = pthread_create(&info->ack_thread, NULL, bs_thread_ack_fn, 
info); - if (ret) { - eprintf("failed to create an ack thread, %s\n", strerror(ret)); - goto event_del; - } +/* pthread_cond_destroy(&info->finished_cond); */ +/* pthread_cond_destroy(&info->pending_cond); */ +/* pthread_mutex_destroy(&info->finished_lock); */ +/* pthread_mutex_destroy(&info->pending_lock); */ +/* pthread_mutex_destroy(&info->startup_lock); */ - if (nr_threads > ARRAY_SIZE(info->worker_thread)) { - eprintf("too many threads %d\n", nr_threads); - nr_threads = ARRAY_SIZE(info->worker_thread); - } +/* tgt_event_del(info->done_fd[0]); */ - pthread_mutex_lock(&info->startup_lock); - for (i = 0; i < nr_threads; i++) { - ret = pthread_create(&info->worker_thread[i], NULL, - bs_thread_worker_fn, info); - - if (ret) { - eprintf("failed to create a worker thread, %d %s\n", - i, strerror(ret)); - if (ret) - goto destroy_threads; - } - } - pthread_mutex_unlock(&info->startup_lock); -rewrite: - ret = write(info->command_fd[1], &ret, sizeof(ret)); - if (ret < 0) { - eprintf("can't write done, %m\n"); - if (errno == EAGAIN || errno == EINTR) - goto rewrite; - } +/* close(info->done_fd[0]); */ +/* close(info->done_fd[1]); */ - return 0; -destroy_threads: - info->stop = 1; - write(info->command_fd[1], &ret, sizeof(ret)); - pthread_join(info->ack_thread, NULL); - - eprintf("stopped the ack thread\n"); +/* close(info->command_fd[0]); */ +/* close(info->command_fd[1]); */ - pthread_mutex_unlock(&info->startup_lock); - for (; i > 0; i--) { - pthread_join(info->worker_thread[i - 1], NULL); - eprintf("stopped the worker thread %d\n", i - 1); - } -event_del: - tgt_event_del(info->done_fd[0]); -close_done_fd: - close(info->done_fd[0]); - close(info->done_fd[1]); -close_command_fd: - close(info->command_fd[0]); - close(info->command_fd[1]); -destroy_cond_mutex: - pthread_cond_destroy(&info->finished_cond); - pthread_cond_destroy(&info->pending_cond); - pthread_mutex_destroy(&info->finished_lock); - pthread_mutex_destroy(&info->pending_lock); - 
pthread_mutex_destroy(&info->startup_lock); - - return TGTADM_NOMEM; +/* info->stop = 0; */ } -void bs_thread_close(struct bs_thread_info *info) +static void bs_fn(struct atask *task) { - int i; - - pthread_cancel(info->ack_thread); - pthread_join(info->ack_thread, NULL); - - info->stop = 1; - pthread_cond_broadcast(&info->pending_cond); - - for (i = 0; info->worker_thread[i] && - i < ARRAY_SIZE(info->worker_thread); i++) - pthread_join(info->worker_thread[i], NULL); - - pthread_cond_destroy(&info->finished_cond); - pthread_cond_destroy(&info->pending_cond); - pthread_mutex_destroy(&info->finished_lock); - pthread_mutex_destroy(&info->pending_lock); - pthread_mutex_destroy(&info->startup_lock); - - tgt_event_del(info->done_fd[0]); + struct scsi_cmd *cmd = container_of(task, struct scsi_cmd, at); + struct scsi_lu *lu = cmd->dev; + struct bs_thread_info *info = BS_THREAD_I(lu); - close(info->done_fd[0]); - close(info->done_fd[1]); + info->request_fn(cmd); +} - close(info->command_fd[0]); - close(info->command_fd[1]); +static void bs_done(struct atask *task) +{ + struct scsi_cmd *cmd = container_of(task, struct scsi_cmd, at); - info->stop = 0; + cmd->scsi_cmd_done(cmd, scsi_get_result(cmd)); } int bs_thread_cmd_submit(struct scsi_cmd *cmd) { - struct scsi_lu *lu = cmd->dev; - struct bs_thread_info *info = BS_THREAD_I(lu); + struct atask *task = &cmd->at; + + task->fn = bs_fn; + task->done = bs_done; + + async_submit(task); - pthread_mutex_lock(&info->pending_lock); +/* pthread_mutex_lock(&info->pending_lock); */ - list_add(&cmd->bs_list, &info->pending_list); +/* list_add(&cmd->bs_list, &info->pending_list); */ - pthread_mutex_unlock(&info->pending_lock); +/* pthread_mutex_unlock(&info->pending_lock); */ - pthread_cond_signal(&info->pending_cond); +/* pthread_cond_signal(&info->pending_cond); */ set_cmd_async(cmd); diff --git a/usr/scsi_cmnd.h b/usr/scsi_cmnd.h index 011f3e6..e496fde 100644 --- a/usr/scsi_cmnd.h +++ b/usr/scsi_cmnd.h @@ -1,3 +1,5 @@ +#include 
"async.h" + struct target; struct mgmt_req; @@ -51,6 +53,8 @@ struct scsi_cmd { struct it_nexus *it_nexus; struct it_nexus_lu_info *itn_lu_info; + + struct atask at; }; #define scsi_cmnd_accessor(field, type) \ diff --git a/usr/tgtd.c b/usr/tgtd.c index 8569d41..e57373d 100644 --- a/usr/tgtd.c +++ b/usr/tgtd.c @@ -37,6 +37,7 @@ #include "tgtd.h" #include "driver.h" #include "work.h" +#include "async.h" #include "util.h" unsigned long pagesize, pageshift, pagemask; @@ -388,6 +389,10 @@ int main(int argc, char **argv) } } + err = init_async(); + if (err) + exit(1); + event_loop(); lld_exit(); @@ -396,5 +401,7 @@ int main(int argc, char **argv) log_close(); + exit_async(); + return 0; } -- 1.5.6.5 -- To unsubscribe from this list: send the line "unsubscribe stgt" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html