[Patch 2/3] tabled: use argument of a thread

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



We replace a comment with code to better show the intent. Once we have
several threads, it will be easier to plug TLS into this.

Signed-off-by: Pete Zaitcev <zaitcev@xxxxxxxxxx>

---
 server/replica.c |   69 ++++++++++++++++++++++++++-------------------
 1 file changed, 41 insertions(+), 28 deletions(-)

Not sure if we actually want this at this stage, but I coded it up
yesterday after we talked about argument passing and TLS, to see just
how many rep_xxx functions need extra arguments. It was not too bad.

commit 32c8b8072c901e549ecbd8f1a29581b37f6cec16
Author: Master <zaitcev@xxxxxxxxxxxxxxxxxx>
Date:   Tue Dec 15 21:20:22 2009 -0700

    Pass arguments to a thread by official means.

diff --git a/server/replica.c b/server/replica.c
index 067accb..83b559d 100644
--- a/server/replica.c
+++ b/server/replica.c
@@ -27,11 +27,16 @@
 #include <elist.h>
 #include "tabled.h"
 
+struct rep_arg {
+	struct event_base *evbase;
+};
+
 /*
  * Replication Job
  */
 struct rep_job {
 	struct list_head jlink;
+	struct rep_arg *arg;
 
 	uint64_t oid;
 	uint64_t size;		/* all of the object */
@@ -57,11 +62,17 @@ static struct rep_jobs active = { 0, LIST_HEAD_INIT(active.jlist) };
 static struct rep_jobs queue = { 0, LIST_HEAD_INIT(queue.jlist) };
 static struct rep_jobs done = { 0, LIST_HEAD_INIT(done.jlist) };
 
-static struct event_base *evbase;
+/*
+ * These should actually be thread-local, but we only have one thread.
+ */
 static struct event kscan_timer;	/* db4 key rescan timer */
+static time_t kscan_last;
+
+/*
+ * These are module-scope things: global locks and flags, thread list, etc.
+ */
 static bool kscan_enabled = false;
 static GThread *scan_thread;
-static time_t kscan_last;
 
 static void job_dispatch(void);
 
@@ -253,7 +264,7 @@ static void job_dispatch()
 	if (!job->buf)
 		goto err_malloc;
 
-	rc = stor_open(&job->in_ce, job->src, evbase);
+	rc = stor_open(&job->in_ce, job->src, job->arg->evbase);
 	if (rc) {
 		applog(LOG_WARNING, "Cannot open input chunk, nid %u (%d)",
 		       job->src->id, rc);
@@ -261,7 +272,7 @@ static void job_dispatch()
 	}
 	job->in_ce.cli = job;
 
-	rc = stor_open(&job->out_ce, job->dst, evbase);
+	rc = stor_open(&job->out_ce, job->dst, job->arg->evbase);
 	if (rc) {
 		applog(LOG_WARNING, "Cannot open output chunk, nid %u (%d)",
 		       job->dst->id, rc);
@@ -366,7 +377,8 @@ static struct rep_job *job_find_by_oid(uint64_t oid)
 }
 
 /* start replicating the key somewhere */
-static void rep_job_start(size_t klen, struct db_obj_key *key,
+static void rep_job_start(struct rep_arg *arg,
+			  size_t klen, struct db_obj_key *key,
 			  uint64_t oid, uint64_t objsize,
 			  int nnum, struct storage_node *nvec[])
 {
@@ -386,18 +398,15 @@ static void rep_job_start(size_t klen, struct db_obj_key *key,
 	job = job_alloc(klen, key);
 	if (!job)
 		goto err_alloc;
+	job->arg = arg;
 	job->oid = oid;
 	job->size = objsize;
 	job->src = job_select_src(nnum, nvec);
-	if (!job->src) {
-		/* P3 */ applog(LOG_INFO, "no src oid %llX", (long long) oid);
+	if (!job->src)
 		goto err_src;
-	}
 	job->dst = job_select_dst(nnum, nvec);
-	if (!job->dst) {
-		/* P3 */ applog(LOG_INFO, "no dst oid %llX", (long long) oid);
+	if (!job->dst)
 		goto err_dst;
-	}
 	if (job->src->id == job->dst->id) {
 		/* Is this bad enough to invoke exit(1) right here? */
 		applog(LOG_ERR, "Internal error, copy from/to nid %u",
@@ -540,7 +549,8 @@ static int rep_scan_parse(struct cursor *cp, struct db_obj_ent *obj)
 }
 
 /* meat of scan - check if replication is need on the key */
-static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj)
+static void rep_scan_verify(struct rep_arg *arg,
+			    struct cursor *cp, struct db_obj_ent *obj)
 {
 	char bucket_name[65];
 	char object_name[1025];
@@ -595,9 +605,8 @@ static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj)
 	       allcnt, redcnt);
 
 	if (redcnt < MAXWAY) {		/* maybe have MINWAY too? */
-		rep_job_start(cp->klen, cp->key, oid,
-			      GUINT64_FROM_LE(obj->size),
-			      redcnt, redvec);
+		rep_job_start(arg, cp->klen, cp->key, oid,
+			      GUINT64_FROM_LE(obj->size), redcnt, redvec);
 	}
 
 	for (i = 0; i < redcnt; i++)
@@ -723,7 +732,7 @@ static void rep_retire(void)
 	}
 }
 
-static void rep_scan(void)
+static void rep_scan(struct rep_arg *arg)
 {
 	struct cursor cur;
 	struct db_obj_ent *obj;
@@ -767,7 +776,7 @@ static void rep_scan(void)
 		}
 
 		if (!GUINT32_FROM_LE(obj->flags) & DB_OBJ_INLINE)
-			rep_scan_verify(&cur, obj);
+			rep_scan_verify(arg, &cur, obj);
 
 		free(obj);
 		kcnt++;
@@ -792,17 +801,20 @@ static void add_kscan_timer(void)
 
 static void tdb_keyscan(int fd, short events, void *userdata)
 {
+	struct rep_arg *arg = userdata;
+
 	if (kscan_enabled)
-		rep_scan();
+		rep_scan(arg);
 	add_kscan_timer();
 }
 
 static gpointer rep_thread_func(gpointer data)
 {
+	struct rep_arg *arg = data;
 	int rc;
 
-	evtimer_set(&kscan_timer, tdb_keyscan, NULL);
-	event_base_set(evbase, &kscan_timer);
+	evtimer_set(&kscan_timer, tdb_keyscan, arg);
+	event_base_set(arg->evbase, &kscan_timer);
 
 	/*
 	 * We must add an event now, or else event_base_dispatch will
@@ -811,7 +823,7 @@ static gpointer rep_thread_func(gpointer data)
 	add_kscan_timer();
 
 	for (;;) {
-		rc = event_base_dispatch(evbase);
+		rc = event_base_dispatch(arg->evbase);
 		applog(LOG_ERR, "rep event_base_dispatch exits (%d)", rc);
 		sleep(300);	/* Should not happen, so maybe exit(1)? */
 	}
@@ -821,15 +833,16 @@ static gpointer rep_thread_func(gpointer data)
 void rep_init(struct event_base *ev_base)
 {
 	GError *error;
+	struct rep_arg *arg;
 
-	/* We could pass this event_base as an arg to our replica thread
-	 * via g_thread_create(), but that seems pointless given that
-	 * we are storing the event base as a module-local static
-	 * anyway.
-	 */
-	evbase = ev_base;
+	arg = malloc(sizeof(struct rep_arg));
+	if (!arg) {
+		applog(LOG_ERR, "No core");
+		exit(1);
+	}
+	arg->evbase = ev_base;
 
-	scan_thread = g_thread_create(rep_thread_func, NULL, FALSE, &error);
+	scan_thread = g_thread_create(rep_thread_func, arg, FALSE, &error);
 	if (scan_thread == NULL) {
 		applog(LOG_ERR, "Failed to start replication thread: %s",
 		       error->message);
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Fedora Cloud]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux