[Patch] tabled: write objects to multiple chunks

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This is a small but self-contained step toward data redundancy.
The patch adds the ability to write data to several Chunk servers.
Along the way, it fixes things up so that we use asynchronous writes.

N.B. This patch changes the format of tabled's database in order to
reflect that object IDs are global in the cell. This is done so that
consistency checking, redundancy monitoring, and recovery can
be run without accessing tabled's database. In other words,
it's a flag day: the database must be re-created.

Signed-off-by: Pete Zaitcev <zaitcev@xxxxxxxxxx>

diff --git a/include/tdb.h b/include/tdb.h
index 51843ba..e12d00e 100644
--- a/include/tdb.h
+++ b/include/tdb.h
@@ -30,8 +30,8 @@
 #define DB_OBJ_INLINE        0x1
 
 struct db_obj_addr {
-	uint32_t	nid;			/* 0 == absent */
-	uint64_t	oid;
+	uint64_t oid;
+	uint32_t nidv[MAXWAY];			/* 0 == absent */
 };
 
 struct db_obj_key {
@@ -41,16 +41,16 @@ struct db_obj_key {
 
 struct db_obj_ent {
 	uint32_t	flags;
+	uint32_t	n_str;			/* # attached string pairs */
 	uint64_t	size;
 	uint64_t	mtime;		/* UNIX time, but in microseconds */
 	union {
-		struct db_obj_addr avec[MAXWAY];
+		struct db_obj_addr a;
 		unsigned char indata[INSIZE];
 	} d;
 	char		bucket[64];		/* bucket */
 	char		owner[64];		/* object owner */
 	char		md5[40];		/* data checksum */
-	uint32_t	n_str;			/* # attached string pairs */
 
 	/* array of uint16_t
 	   representing string lengths of HTTP headers.
diff --git a/server/bucket.c b/server/bucket.c
index 60e46bc..976d716 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -997,7 +997,7 @@ static bool bucket_list_keys(struct client *cli, const char *user,
 		memset(&v, 0, sizeof(v));
 		strcpy(v.md5, obj->md5);
 		if (!(GUINT32_FROM_LE(obj->flags) & DB_OBJ_INLINE))
-			memcpy(&v.addr, &obj->d.avec[0], sizeof(v.addr));
+			memcpy(&v.addr, &obj->d.a, sizeof(v.addr));
 		v.mtime = obj->mtime;
 		v.size = obj->size;
 		free(obj);
diff --git a/server/cldu.c b/server/cldu.c
index 706faeb..ea2266f 100644
--- a/server/cldu.c
+++ b/server/cldu.c
@@ -198,7 +198,6 @@ static void cldu_event(int fd, short events, void *userdata)
 #if 0
 		if (rc == -ECONNREFUSED) {	/* ICMP tells us */
 			int newactive;
-			/* P3 */ applog(LOG_INFO, "Restarting session");
 			// evtimer_del(&sp->tm);
 			cldc_kill_sess(sp->lib->sess);
 			sp->lib->sess = NULL;
diff --git a/server/object.c b/server/object.c
index f888d7c..0b6e4fb 100644
--- a/server/object.c
+++ b/server/object.c
@@ -132,17 +132,17 @@ static int object_unlink(struct db_obj_ent *obj)
 	if (GUINT32_FROM_LE(obj->flags) & DB_OBJ_INLINE)
 		return 0;
 	/*
-	 * FIXME Iterate over all of avec[] when redundancy is added;
+	 * FIXME Iterate over all of NIDs when redundancy is added;
 	 * use nid to locate node in all_stor.
 	 */
-	addr = &obj->d.avec[0];
+	addr = &obj->d.a;
 
 	if (list_empty(&tabled_srv.all_stor))
 		return -EIO;
 	stnode = list_entry(tabled_srv.all_stor.next,
 			    struct storage_node, all_link);
 
-	return stor_obj_del(stnode, addr->oid);
+	return stor_obj_del(stnode, GUINT64_FROM_LE(addr->oid));
 }
 
 bool object_del(struct client *cli, const char *user,
@@ -233,13 +233,25 @@ err_out:
 	return cli_err(cli, err);
 }
 
+static void cli_ochunk_free(struct list_head *olist)
+{
+	struct open_chunk *ochunk;
+
+	while (!list_empty(olist)) {
+		ochunk = list_entry(olist->next, struct open_chunk, link);
+		list_del(&ochunk->link);
+		stor_abort(ochunk);
+		stor_close(ochunk);
+		free(ochunk);
+	}
+}
+
 void cli_out_end(struct client *cli)
 {
 	if (!cli)
 		return;
 
-	stor_abort(&cli->out_ce);
-	stor_close(&cli->out_ce);
+	cli_ochunk_free(&cli->out_ch);
 
 	free(cli->out_bucket);
 	free(cli->out_key);
@@ -248,6 +260,9 @@ void cli_out_end(struct client *cli)
 	cli->out_user =
 	cli->out_bucket =
 	cli->out_key = NULL;
+
+	free(cli->out_buf);
+	cli->out_buf = NULL;
 }
 
 static const char *copy_headers[] = {
@@ -293,6 +308,8 @@ static bool object_put_end(struct client *cli)
 	char md5[33], timestr[64];
 	char *type, *hdr;
 	int rc, i;
+	struct list_head *pos, *tmp;
+	int nok;
 	enum errcode err = InternalError;
 	struct db_obj_ent *obj;
 	struct db_obj_key *obj_key;
@@ -314,22 +331,32 @@ static bool object_put_end(struct client *cli)
 	else
 		cli->state = evt_dispose;
 
-	if (!stor_put_end(&cli->out_ce)) {
-		applog(LOG_ERR, "Chunk sync failed");
-		goto err_out;
-	}
-
-	if (debugging) {
-		/* FIXME how do we test for inline objects here? */
-		if (!stor_obj_test(&cli->out_ce, cli->out_objid))
-			applog(LOG_ERR, "Stat (%llX) failed",
-			       (unsigned long long) cli->out_objid);
-		else
-			applog(LOG_DEBUG, "STORED %llX, size -",
-			       (unsigned long long) cli->out_objid);
+	nok = 0;
+	list_for_each_safe(pos, tmp, &cli->out_ch) {
+		struct open_chunk *ochunk;
+
+		ochunk = list_entry(pos, struct open_chunk, link);
+		if (!stor_put_end(ochunk)) {
+			applog(LOG_ERR, "Chunk sync failed");
+			/* stor_abort(ochunk); */
+		} else {
+			if (debugging) {
+				/* FIXME how do we test for inline objects here? */
+				if (!stor_obj_test(ochunk, cli->out_objid))
+					applog(LOG_ERR, "Stat (%llX) failed",
+					       (unsigned long long) cli->out_objid);
+				else
+					applog(LOG_DEBUG, "STORED %llX, size -",
+					       (unsigned long long) cli->out_objid);
+			}
+			nok++;
+		}
+		stor_close(ochunk);
+		list_del(&ochunk->link);
+		free(ochunk);
 	}
-
-	stor_close(&cli->out_ce);
+	if (!nok)
+		goto err_out;
 
 	MD5_Final(md, &cli->out_md5);
 
@@ -408,8 +435,8 @@ static bool object_put_end(struct client *cli)
 	/* encode object header */
 	obj->size = cli->out_size;
 	obj->mtime = (uint64_t)time(NULL) * 1000000;
-	obj->d.avec[0].nid = 1;		/* FIXME */
-	obj->d.avec[0].oid = cli->out_objid;
+	obj->d.a.nidv[0] = GUINT32_TO_LE(1);		/* FIXME */
+	obj->d.a.oid = GUINT64_TO_LE(cli->out_objid);
 	strncpy(obj->bucket, cli->out_bucket, sizeof(obj->bucket));
 	strncpy(obj->owner, cli->out_user, sizeof(obj->owner));
 	strncpy(obj->md5, md5, sizeof(obj->md5));
@@ -455,7 +482,7 @@ static bool object_put_end(struct client *cli)
 	 */
 	if (delobj && object_unlink(&oldobj) < 0) {
 		applog(LOG_ERR, "object data(%llX) orphaned",
-		       (unsigned long long) oldobj.d.avec[0].oid);
+		       (unsigned long long) GUINT64_FROM_LE(oldobj.d.a.oid));
 	}
 
 	free(cli->out_bucket);
@@ -499,16 +526,100 @@ err_out:
 	return cli_err(cli, err);
 }
 
+static void object_put_event(struct open_chunk *ochunk)
+{
+	struct client *cli = ochunk->cli;
+	ssize_t bytes;
+
+	if (ochunk->wcnt == 0) {
+		if (debugging)
+			applog(LOG_DEBUG, "spurious write notify");
+		return;
+	}
+
+	bytes = stor_put_buf(ochunk, cli->out_buf, cli->out_bcnt);
+	if (bytes < 0) {
+		if (debugging)
+			applog(LOG_DEBUG, "write(2) error: %s",
+			       strerror(-bytes));
+		if (!cli->out_nput) {
+			applog(LOG_INFO, "out_nput imbalance on error");
+		} else {
+			--cli->out_nput;
+		}
+		if (!cli->out_nput) {
+			if (!cli->ev_active) {
+				event_add(&cli->ev, NULL);
+				cli->ev_active = true;
+			}
+		}
+		list_del(&ochunk->link);
+		stor_abort(ochunk);
+		stor_close(ochunk);
+		free(ochunk);
+		return;
+	}
+	ochunk->wcnt -= bytes;
+
+	if (ochunk->wcnt == 0) {
+		if (!cli->out_nput) {
+			applog(LOG_INFO, "out_nput imbalance");
+		} else {
+			--cli->out_nput;
+		}
+
+		if (!cli->out_nput) {
+			if (!cli->ev_active) {
+				event_add(&cli->ev, NULL);
+				cli->ev_active = true;
+			}
+		}
+	}
+}
+
+static int object_put_buf(struct client *cli, struct open_chunk *ochunk,
+			  char *buf, size_t len)
+{
+	ssize_t bytes;
+
+	ochunk->wcnt = len;
+
+	bytes = stor_put_buf(ochunk, buf, len);
+	if (bytes < 0) {
+		if (debugging) {
+			applog(LOG_ERR, "write(2) error in HTTP data-in: %s",
+			       strerror(-bytes));
+		}
+		return -EIO;
+	}
+	ochunk->wcnt -= bytes;
+
+	if (ochunk->wcnt != 0)
+		cli->out_nput++;
+	return 0;
+}
+
 bool cli_evt_http_data_in(struct client *cli, unsigned int events)
 {
-	char buf[4096];
-	char *p = buf;
-	ssize_t avail, bytes;
+	ssize_t avail;
+	struct open_chunk *ochunk;
+	struct list_head *pos, *tmp;
+	int nok;
 
 	if (!cli->out_len)
 		return object_put_end(cli);
 
-	avail = read(cli->fd, buf, MIN(cli->out_len, sizeof(buf)));
+	if (cli->out_nput) {
+		if (cli->ev_active) {
+			event_del(&cli->ev);
+			cli->ev_active = false;
+		} else {
+			/* P3 temporary */ applog(LOG_INFO, "spurious ev");
+		}
+		return false;
+	}
+
+	avail = read(cli->fd, cli->out_buf, MIN(cli->out_len, CLI_DATA_BUF_SZ));
 	if (avail <= 0) {
 		if ((avail < 0) && (errno == EAGAIN))
 			return false;
@@ -521,27 +632,120 @@ bool cli_evt_http_data_in(struct client *cli, unsigned int events)
 			applog(LOG_ERR, "object read(2) unexpected EOF");
 		return cli_err(cli, InternalError);
 	}
+	cli->out_bcnt = avail;
+
+	MD5_Update(&cli->out_md5, cli->out_buf, avail);
+
+	nok = 0;
+	list_for_each_safe(pos, tmp, &cli->out_ch) {
+		ochunk = list_entry(pos, struct open_chunk, link);
+		if (object_put_buf(cli, ochunk, cli->out_buf, avail) < 0) {
+			list_del(&ochunk->link);
+			stor_abort(ochunk);
+			stor_close(ochunk);
+			free(ochunk);
+		} else {
+			nok++;
+		}
+	}
+	if (!nok) {
+		cli_out_end(cli);
+		/* if (debugging) */ /* P3 temporary */
+			applog(LOG_ERR, "data write-out error");
+		return cli_err(cli, InternalError);
+	}
 
-	while (avail > 0) {
-		bytes = stor_put_buf(&cli->out_ce, p, avail);
-		if (bytes < 0) {
-			cli_out_end(cli);
-			applog(LOG_ERR, "write(2) error in HTTP data-in: %s",
-				strerror(errno));
-			return cli_err(cli, InternalError);
+	if (!cli->out_nput) {
+		cli->out_len -= avail;
+		if (!cli->out_len)
+			return object_put_end(cli);
+	} else {
+		if (cli->ev_active) {
+			event_del(&cli->ev);
+			cli->ev_active = false;
 		}
+	}
 
-		MD5_Update(&cli->out_md5, p, bytes);
+	return false;
+}
+
+static struct open_chunk *open_chunk1(struct storage_node *stnode,
+				     uint64_t objid, long content_len)
+{
+	struct open_chunk *ochunk;
+	int rc;
 
-		cli->out_len -= bytes;
-		p += bytes;
-		avail -= bytes;
+	ochunk = calloc(1, sizeof(struct open_chunk));
+	if (!ochunk) {
+		applog(LOG_ERR, "OOM");
+		goto err_alloc;
 	}
 
-	if (!cli->out_len)
-		return object_put_end(cli);
+	rc = stor_open(ochunk, stnode);
+	if (rc != 0) {
+		applog(LOG_WARNING, "Cannot open output chunk, nid %u (%d)",
+		       stnode->id, rc);
+		goto err_open;
+	}
+
+	rc = stor_put_start(ochunk, object_put_event, objid, content_len);
+	if (rc != 0) {
+		applog(LOG_WARNING, "Cannot start putting for %llX (%d)",
+		       (unsigned long long) objid, rc);
+		goto err_start;
+	}
 
-	return (avail == sizeof(buf)) ? true : false;
+	return ochunk;
+
+ err_start:
+	stor_close(ochunk);
+ err_open:
+	free(ochunk);
+ err_alloc:
+	return NULL;
+}
+
+/*
+ * Open up to MAXWAY chunks from slist, pre-start writing on all of them,
+ * and put them on the olist.
+ *
+ * This is a very limited implementation for now (FIXME).
+ *  - we do not do any kind of sensible selection, like the least full node,
+ *    just first-forward
+ *  - we ignore all redundancy issues and only return an error if no nodes
+ *    were opened
+ */
+static int open_chunks(struct list_head *olist, struct list_head *slist,
+		       struct client *cli, uint64_t objid, long content_len)
+{
+	struct storage_node *stnode;
+	struct open_chunk *ochunk;
+	int n;
+
+	n = 0;
+	list_for_each_entry(stnode, slist, all_link) {
+		if (!stnode->up)
+			continue;
+		ochunk = open_chunk1(stnode, objid, content_len);
+		if (!ochunk)
+			continue;
+		ochunk->cli = cli;
+		list_add(&ochunk->link, olist);
+		n++;
+	}
+	if (n == 0) {
+		applog(LOG_ERR, "No chunk nodes");
+		goto err;
+	}
+	return 0;
+
+err:
+	/*
+	 * cli_free does the same cleanup for us, but let's be good for KISS
+	 * and possible client reuse.
+	 */
+	cli_ochunk_free(olist);
+	return -1;
 }
 
 bool object_put_body(struct client *cli, const char *user, const char *bucket,
@@ -549,34 +753,22 @@ bool object_put_body(struct client *cli, const char *user, const char *bucket,
 {
 	long avail;
 	uint64_t objid;
-	struct storage_node *stnode;
 	int rc;
 
 	if (!user || !has_access(user, bucket, NULL, "WRITE"))
 		return cli_err(cli, AccessDenied);
 
-	objid = objid_next();
-
-	/* FIXME picking the first node until the redundancy is implemented */
-	if (list_empty(&tabled_srv.all_stor)) {
-		applog(LOG_ERR, "No chunk nodes");
+	if (!cli->out_buf && !(cli->out_buf = malloc(CLI_DATA_BUF_SZ))) {
+		applog(LOG_ERR, "OOM (%ld)", (long)CLI_DATA_BUF_SZ);
 		return cli_err(cli, InternalError);
 	}
-	stnode = list_entry(tabled_srv.all_stor.next,
-			    struct storage_node, all_link);
 
-	rc = stor_open(&cli->out_ce, stnode);
-	if (rc != 0) {
-		applog(LOG_WARNING, "Cannot open chunk (%d)", rc);
-		return cli_err(cli, InternalError);
-	}
+	objid = objid_next();
 
-	rc = stor_put_start(&cli->out_ce, objid, content_len);
-	if (rc != 0) {
-		applog(LOG_WARNING, "Cannot start putting for %llX (%d)",
-		       (unsigned long long) objid, rc);
+	rc = open_chunks(&cli->out_ch, &tabled_srv.all_stor,
+			 cli, objid, content_len);
+	if (rc)
 		return cli_err(cli, InternalError);
-	}
 
 	cli->out_bucket = strdup(bucket);
 	cli->out_key = strdup(key);
@@ -592,37 +784,55 @@ bool object_put_body(struct client *cli, const char *user, const char *bucket,
 	if (expect_cont) {
 		char *cont;
 
-		/* FIXME check for err */
-		asprintf(&cont, "HTTP/%d.%d 100 Continue\r\n\r\n",
-			 cli->req.major, cli->req.minor);
+		if (asprintf(&cont, "HTTP/%d.%d 100 Continue\r\n\r\n",
+			     cli->req.major, cli->req.minor) == -1) {
+			cli_out_end(cli);
+			return cli_err(cli, InternalError);
+		}
 		cli_writeq(cli, cont, strlen(cont), cli_cb_free, cont);
 		cli_write_start(cli);
 	}
 
 	avail = MIN(cli_req_avail(cli), content_len);
 	if (avail) {
-		ssize_t bytes;
-
-		while (avail > 0) {
-			bytes = stor_put_buf(&cli->out_ce, cli->req_ptr, avail);
-			if (bytes < 0) {
-				cli_out_end(cli);
-				applog(LOG_ERR, "write(2) error in object_put: %s",
-					strerror(errno));
-				return cli_err(cli, InternalError);
+		struct list_head *pos, *tmp;
+		struct open_chunk *ochunk;
+		int nok;
+
+		cli->out_bcnt = avail;
+
+		MD5_Update(&cli->out_md5, cli->req_ptr, avail);
+
+		nok = 0;
+		list_for_each_safe(pos, tmp, &cli->out_ch) {
+			ochunk = list_entry(pos, struct open_chunk, link);
+			if (object_put_buf(cli, ochunk, cli->req_ptr, avail) < 0) {
+				list_del(&ochunk->link);
+				stor_abort(ochunk);
+				stor_close(ochunk);
+				free(ochunk);
+			} else {
+				nok++;
 			}
-
-			MD5_Update(&cli->out_md5, cli->req_ptr, bytes);
-
-			cli->out_len -= bytes;
-			cli->req_ptr += bytes;
-			avail -= bytes;
+		}
+		if (!nok) {
+			cli_out_end(cli);
+			if (debugging)
+				applog(LOG_ERR, "data pig-out error");
+			return cli_err(cli, InternalError);
 		}
 	}
 
-	if (!cli->out_len)
-		return object_put_end(cli);
-
+	if (!cli->out_nput) {
+		cli->out_len -= avail;
+		if (!cli->out_len)
+			return object_put_end(cli);
+	} else {
+		if (cli->ev_active) {
+			event_del(&cli->ev);
+			cli->ev_active = false;
+		}
+	}
 	cli->state = evt_http_data_in;
 	return true;
 }
@@ -936,7 +1146,7 @@ bool object_get_body(struct client *cli, const char *user, const char *bucket,
 		goto err_out_str;
 }
 
-	cli->in_objid = obj->d.avec[0].oid;
+	cli->in_objid = GUINT64_FROM_LE(obj->d.a.oid);
 
 	if (list_empty(&tabled_srv.all_stor)) {
 		applog(LOG_ERR, "No chunk nodes");
@@ -947,15 +1157,16 @@ bool object_get_body(struct client *cli, const char *user, const char *bucket,
 
 	rc = stor_open(&cli->in_ce, stnode);
 	if (rc < 0) {
-		applog(LOG_WARNING, "Cannot open chunk (%d)", rc);
+		applog(LOG_WARNING, "Cannot open input chunk, nid %u (%d)",
+		       stnode->id, rc);
 		goto err_out_str;
 	}
 
 	rc = stor_open_read(&cli->in_ce, object_get_event, cli->in_objid,
 			    &objsize);
 	if (rc < 0) {
-		applog(LOG_ERR, "open oid %llX failed (%d)",
-		       (unsigned long long) cli->in_objid, rc);
+		applog(LOG_ERR, "open oid %llX failed, nid %u (%d)",
+		       (unsigned long long) cli->in_objid, stnode->id, rc);
 		goto err_out_str;
 	}
 
diff --git a/server/server.c b/server/server.c
index 60d28df..f1c49be 100644
--- a/server/server.c
+++ b/server/server.c
@@ -418,8 +418,9 @@ static void cli_free(struct client *cli)
 
 	/* clean up network socket */
 	if (cli->fd >= 0) {
-		if (event_del(&cli->ev) < 0)
+		if (cli->ev_active && event_del(&cli->ev) < 0)
 			applog(LOG_WARNING, "TCP client event_del");
+		cli->ev_active = false;
 		close(cli->fd);
 	}
 
@@ -444,6 +445,7 @@ static struct client *cli_alloc(void)
 
 	cli->state = evt_read_req;
 	INIT_LIST_HEAD(&cli->write_q);
+	INIT_LIST_HEAD(&cli->out_ch);
 	cli->req_ptr = cli->req_buf;
 	memset(&cli->req, 0, sizeof(cli->req) - sizeof(cli->req.hdr));
 
@@ -1286,6 +1288,7 @@ static void tcp_srv_event(int fd, short events, void *userdata)
 		applog(LOG_WARNING, "tcp client event_add");
 		goto err_out_fd;
 	}
+	cli->ev_active = true;
 
 	/* pretty-print incoming cxn info */
 	memset(host, 0, sizeof(host));
@@ -1393,6 +1396,7 @@ int stor_update_cb(void)
 			if (debugging)
 				applog(LOG_DEBUG, " NID %u is up", stn->id);
 			num_up++;
+			stn->up = true;
 		}
 	}
 	if (num_up < 1) {
@@ -1400,7 +1404,7 @@ int stor_update_cb(void)
 		return num_up;
 	}
 	if (debugging)
-		applog(LOG_DEBUG, "Active storage node(s): %d", stn->id);
+		applog(LOG_DEBUG, "Active storage node(s): %d", num_up);
 
 	/*
 	 * We initiate operations even if there's no redundancy in order
diff --git a/server/storage.c b/server/storage.c
index 6d830e2..398e64a 100644
--- a/server/storage.c
+++ b/server/storage.c
@@ -48,11 +48,20 @@ static void stor_read_event(int fd, short events, void *userdata)
 {
 	struct open_chunk *cep = userdata;
 
-	cep->r_armed = 0;
+	cep->r_armed = 0;		/* no EV_PERSIST */
 	if (cep->rcb)
 		(*cep->rcb)(cep);
 }
 
+static void stor_write_event(int fd, short events, void *userdata)
+{
+	struct open_chunk *cep = userdata;
+
+	cep->w_armed = 0;		/* no EV_PERSIST */
+	if (cep->wcb)
+		(*cep->wcb)(cep);
+}
+
 /*
  * Open *cep using stn, set up chunk session if needed.
  */
@@ -63,11 +72,8 @@ int stor_open(struct open_chunk *cep, struct storage_node *stn)
 	if (cep->stc)
 		return 0;
 
-	if ((rc = stor_new_stc(stn, &cep->stc)) < 0) {
-		if (debugging)
-			applog(LOG_INFO, "Failed to open Chunk (%d)\n", rc);
+	if ((rc = stor_new_stc(stn, &cep->stc)) < 0)
 		return rc;
-	}
 
 	cep->node = stn;
 	stn->nchu++;
@@ -77,7 +83,8 @@ int stor_open(struct open_chunk *cep, struct storage_node *stn)
 	return 0;
 }
 
-int stor_put_start(struct open_chunk *cep, uint64_t key, uint64_t size)
+int stor_put_start(struct open_chunk *cep, void (*cb)(struct open_chunk *),
+		   uint64_t key, uint64_t size)
 {
 	char stckey[STOR_KEY_SLEN+1];
 
@@ -96,6 +103,9 @@ int stor_put_start(struct open_chunk *cep, uint64_t key, uint64_t size)
 	}
 	cep->wtogo = size;
 	cep->wkey = key;
+	cep->wcb = cb;
+	event_set(&cep->wevt, cep->wfd, EV_WRITE, stor_write_event, cep);
+
 	if (debugging)
 		applog(LOG_INFO, "stor put %s new for %lld",
 		       stckey, (long long) size);
@@ -157,6 +167,11 @@ void stor_close(struct open_chunk *cep)
 		cep->r_armed = 0;
 	}
 	cep->rsize = 0;
+
+	if (cep->w_armed) {
+		event_del(&cep->wevt);
+		cep->w_armed = false;
+	}
 }
 
 /*
@@ -176,7 +191,7 @@ void stor_abort(struct open_chunk *cep)
 		return;
 
 	if (debugging)
-		applog(LOG_INFO, "stor aborting\n");
+		applog(LOG_INFO, "stor aborting");
 
 	stc_free(cep->stc);
 	cep->stc = NULL;
@@ -187,7 +202,8 @@ void stor_abort(struct open_chunk *cep)
 		cep->node = NULL;
 
 		if (debugging)
-			applog(LOG_INFO, "Failed to reopen Chunk (%d)\n", rc);
+			applog(LOG_INFO, "Failed to reopen Chunk nid %u (%d)",
+			       cep->node->id, rc);
 		return;
 	}
 
@@ -202,10 +218,17 @@ void stor_abort(struct open_chunk *cep)
 		cep->r_armed = 0;
 	}
 	cep->rsize = 0;
+
+	if (cep->w_armed) {
+		event_del(&cep->wevt);
+		cep->w_armed = false;
+	}
 }
 
 ssize_t stor_put_buf(struct open_chunk *cep, void *data, size_t len)
 {
+	int rc;
+
 	if (len > cep->wtogo) {
 		applog(LOG_ERR, "Put size %ld remaining %ld",
 		       (long) len, (long) cep->wtogo);
@@ -217,15 +240,24 @@ ssize_t stor_put_buf(struct open_chunk *cep, void *data, size_t len)
 		cep->wtogo -= len;
 	}
 
-	if (cep->stc)
-		return stc_put_send(cep->stc, data, len);
-	return -EPIPE;
+	if (!cep->stc)
+		return -EPIPE;
+	rc = stc_put_send(cep->stc, data, len);
+	if (rc == 0 && !cep->w_armed) {
+		event_add(&cep->wevt, NULL);
+		cep->w_armed = true;
+	}
+	return rc;
 }
 
 bool stor_put_end(struct open_chunk *cep)
 {
 	if (!cep->stc)
 		return true;
+	if (cep->w_armed) {
+		event_del(&cep->wevt);
+		cep->w_armed = false;
+	}
 	return stc_put_sync(cep->stc);
 }
 
diff --git a/server/tabled.h b/server/tabled.h
index 5f74748..ff0e699 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -89,6 +89,7 @@ struct geo {
 struct storage_node {
 	struct list_head	all_link;
 	uint32_t		id;
+	bool			up;
 
 	unsigned		alen;
 	int			addr_af;
@@ -114,16 +115,22 @@ struct client_write {
 struct open_chunk {
 	struct st_client	*stc;
 	struct storage_node	*node;
+	struct list_head	link;
+	struct client		*cli;
 
 	uint64_t		wtogo;
 	uint64_t		wkey;
+	void (*wcb)(struct open_chunk *);
 	int			wfd;
+	bool			w_armed;
+	struct event		wevt;
+	size_t			wcnt;	/* in current buffer */
 
 	uint64_t		roff;
 	uint64_t		rsize;
 	void (*rcb)(struct open_chunk *);
 	int			rfd;
-	int			r_armed;
+	bool			r_armed;
 	struct event		revt;
 };
 
@@ -145,6 +152,7 @@ struct client {
 	struct sockaddr_in6	addr;		/* inet address */
 	char			addr_host[64];	/* ASCII version of inet addr */
 	int			fd;		/* socket */
+	bool			ev_active;
 	struct event		ev;
 	struct event		write_ev;
 
@@ -158,7 +166,7 @@ struct client {
 	char			*hdr_start;	/* current hdr start */
 	char			*hdr_end;	/* current hdr end (so far) */
 
-	struct open_chunk	out_ce;		/* just one for now FIXME */
+	struct list_head	out_ch;		/* open_chunk.link */
 	char			*out_bucket;
 	char			*out_key;
 	char			*out_user;
@@ -166,6 +174,9 @@ struct client {
 	long			out_len;
 	uint64_t		out_size;
 	uint64_t		out_objid;
+	char			*out_buf;
+	size_t			out_bcnt;	/* used length of out_buf */
+	int			out_nput;	/* number of users of out_buf */
 
 	struct open_chunk	in_ce;
 	unsigned char		*in_mem;
@@ -314,7 +325,9 @@ extern int stor_open_read(struct open_chunk *cep,
 			  uint64_t key, uint64_t *psz);
 extern void stor_close(struct open_chunk *cep);
 extern void stor_abort(struct open_chunk *cep);
-extern int stor_put_start(struct open_chunk *cep, uint64_t key, uint64_t size);
+extern int stor_put_start(struct open_chunk *cep,
+			  void (*cb)(struct open_chunk *),
+			  uint64_t key, uint64_t size);
 extern ssize_t stor_put_buf(struct open_chunk *cep, void *data, size_t len);
 extern bool stor_put_end(struct open_chunk *cep);
 extern ssize_t stor_get_buf(struct open_chunk *cep, void *data, size_t len);
diff --git a/server/tdbadm.c b/server/tdbadm.c
index e0e674e..823432a 100644
--- a/server/tdbadm.c
+++ b/server/tdbadm.c
@@ -437,21 +437,20 @@ static void print_obj(struct db_obj_ent *obj)
 			GUINT16_FROM_LE(obj->size),
 			n_str);
 	} else {
-		printf("%s\t%s\t%s",
+		printf("%s\t%s\t%s\t%llX",
 			obj->bucket,
 			obj->owner,
-			obj->md5);
+			obj->md5,
+			(long long) GUINT64_FROM_LE(obj->d.a.oid));
 		for (i = 0; i < MAXWAY; i++) {
 			if (i == 0) {
 				printf("\t");
 			} else {
 				printf(",");
 			}
-			printf("\t%d:%lld",
-			       GUINT32_FROM_LE(obj->d.avec[i].nid),
-			       (long long) GUINT64_FROM_LE(obj->d.avec[i].oid));
-			printf("%u\n", n_str);
+			printf("%d", GUINT32_FROM_LE(obj->d.a.nidv[i]));
 		}
+		printf(" %u\n", n_str);
 	}
 
 	p = obj;
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Fedora Cloud]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux