We replace a comment with code to better show the intent. Once we have several threads, we can more easily plug TLS (thread-local storage) into this. Signed-off-by: Pete Zaitcev <zaitcev@xxxxxxxxxx> --- server/replica.c | 69 ++++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 28 deletions(-) Not sure if we actually want this at this stage, but I coded it up yesterday after we talked about argument passing and TLS, to see just how many rep_xxx functions need extra arguments. It was not too bad. commit 32c8b8072c901e549ecbd8f1a29581b37f6cec16 Author: Master <zaitcev@xxxxxxxxxxxxxxxxxx> Date: Tue Dec 15 21:20:22 2009 -0700 Pass arguments to a thread by official means. diff --git a/server/replica.c b/server/replica.c index 067accb..83b559d 100644 --- a/server/replica.c +++ b/server/replica.c @@ -27,11 +27,16 @@ #include <elist.h> #include "tabled.h" +struct rep_arg { + struct event_base *evbase; +}; + /* * Replication Job */ struct rep_job { struct list_head jlink; + struct rep_arg *arg; uint64_t oid; uint64_t size; /* all of the object */ @@ -57,11 +62,17 @@ static struct rep_jobs active = { 0, LIST_HEAD_INIT(active.jlist) }; static struct rep_jobs queue = { 0, LIST_HEAD_INIT(queue.jlist) }; static struct rep_jobs done = { 0, LIST_HEAD_INIT(done.jlist) }; -static struct event_base *evbase; +/* + * These should actually be thread-local, but we only have one thread. + */ static struct event kscan_timer; /* db4 key rescan timer */ +static time_t kscan_last; + +/* + * These are module-scope things: global locks and flags, thread list, etc. 
+ */ static bool kscan_enabled = false; static GThread *scan_thread; -static time_t kscan_last; static void job_dispatch(void); @@ -253,7 +264,7 @@ static void job_dispatch() if (!job->buf) goto err_malloc; - rc = stor_open(&job->in_ce, job->src, evbase); + rc = stor_open(&job->in_ce, job->src, job->arg->evbase); if (rc) { applog(LOG_WARNING, "Cannot open input chunk, nid %u (%d)", job->src->id, rc); @@ -261,7 +272,7 @@ static void job_dispatch() } job->in_ce.cli = job; - rc = stor_open(&job->out_ce, job->dst, evbase); + rc = stor_open(&job->out_ce, job->dst, job->arg->evbase); if (rc) { applog(LOG_WARNING, "Cannot open output chunk, nid %u (%d)", job->dst->id, rc); @@ -366,7 +377,8 @@ static struct rep_job *job_find_by_oid(uint64_t oid) } /* start replicating the key somewhere */ -static void rep_job_start(size_t klen, struct db_obj_key *key, +static void rep_job_start(struct rep_arg *arg, + size_t klen, struct db_obj_key *key, uint64_t oid, uint64_t objsize, int nnum, struct storage_node *nvec[]) { @@ -386,18 +398,15 @@ static void rep_job_start(size_t klen, struct db_obj_key *key, job = job_alloc(klen, key); if (!job) goto err_alloc; + job->arg = arg; job->oid = oid; job->size = objsize; job->src = job_select_src(nnum, nvec); - if (!job->src) { - /* P3 */ applog(LOG_INFO, "no src oid %llX", (long long) oid); + if (!job->src) goto err_src; - } job->dst = job_select_dst(nnum, nvec); - if (!job->dst) { - /* P3 */ applog(LOG_INFO, "no dst oid %llX", (long long) oid); + if (!job->dst) goto err_dst; - } if (job->src->id == job->dst->id) { /* Is this bad enough to invoke exit(1) right here? 
*/ applog(LOG_ERR, "Internal error, copy from/to nid %u", @@ -540,7 +549,8 @@ static int rep_scan_parse(struct cursor *cp, struct db_obj_ent *obj) } /* meat of scan - check if replication is need on the key */ -static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj) +static void rep_scan_verify(struct rep_arg *arg, + struct cursor *cp, struct db_obj_ent *obj) { char bucket_name[65]; char object_name[1025]; @@ -595,9 +605,8 @@ static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj) allcnt, redcnt); if (redcnt < MAXWAY) { /* maybe have MINWAY too? */ - rep_job_start(cp->klen, cp->key, oid, - GUINT64_FROM_LE(obj->size), - redcnt, redvec); + rep_job_start(arg, cp->klen, cp->key, oid, + GUINT64_FROM_LE(obj->size), redcnt, redvec); } for (i = 0; i < redcnt; i++) @@ -723,7 +732,7 @@ static void rep_retire(void) } } -static void rep_scan(void) +static void rep_scan(struct rep_arg *arg) { struct cursor cur; struct db_obj_ent *obj; @@ -767,7 +776,7 @@ static void rep_scan(void) } if (!GUINT32_FROM_LE(obj->flags) & DB_OBJ_INLINE) - rep_scan_verify(&cur, obj); + rep_scan_verify(arg, &cur, obj); free(obj); kcnt++; @@ -792,17 +801,20 @@ static void add_kscan_timer(void) static void tdb_keyscan(int fd, short events, void *userdata) { + struct rep_arg *arg = userdata; + if (kscan_enabled) - rep_scan(); + rep_scan(arg); add_kscan_timer(); } static gpointer rep_thread_func(gpointer data) { + struct rep_arg *arg = data; int rc; - evtimer_set(&kscan_timer, tdb_keyscan, NULL); - event_base_set(evbase, &kscan_timer); + evtimer_set(&kscan_timer, tdb_keyscan, arg); + event_base_set(arg->evbase, &kscan_timer); /* * We must add an event now, or else event_base_dispatch will @@ -811,7 +823,7 @@ static gpointer rep_thread_func(gpointer data) add_kscan_timer(); for (;;) { - rc = event_base_dispatch(evbase); + rc = event_base_dispatch(arg->evbase); applog(LOG_ERR, "rep event_base_dispatch exits (%d)", rc); sleep(300); /* Should not happen, so maybe exit(1)? 
*/ } @@ -821,15 +833,16 @@ static gpointer rep_thread_func(gpointer data) void rep_init(struct event_base *ev_base) { GError *error; + struct rep_arg *arg; - /* We could pass this event_base as an arg to our replica thread - * via g_thread_create(), but that seems pointless given that - * we are storing the event base as a module-local static - * anyway. - */ - evbase = ev_base; + arg = malloc(sizeof(struct rep_arg)); + if (!arg) { + applog(LOG_ERR, "No core"); + exit(1); + } + arg->evbase = ev_base; - scan_thread = g_thread_create(rep_thread_func, NULL, FALSE, &error); + scan_thread = g_thread_create(rep_thread_func, arg, FALSE, &error); if (scan_thread == NULL) { applog(LOG_ERR, "Failed to start replication thread: %s", error->message); -- To unsubscribe from this list: send the line "unsubscribe hail-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html