[PATCH] chunkd: add cp command, for local intra-table copies

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



The following patch, against current hail.git, adds the "CP" command to
chunkd, permitting copying from object->object inside a single table.

It also adds a worker thread pool that may be used by other background
tasks as well.


diff --git a/chunkd/chunkd.h b/chunkd/chunkd.h
index e019f0d..f1f7d04 100644
--- a/chunkd/chunkd.h
+++ b/chunkd/chunkd.h
@@ -104,6 +104,8 @@ struct client {
 	unsigned int		req_used;	/* amount of req_buf in use */
 	void			*req_ptr;	/* start of unexamined data */
 	uint16_t		key_len;
+	unsigned int		var_len;
+	bool			second_var;
 
 	char			*hdr_start;	/* current hdr start */
 	char			*hdr_end;	/* current hdr end (so far) */
@@ -124,6 +126,7 @@ struct client {
 	char			netbuf_out[CLI_DATA_BUF_SZ];
 	char			key[CHD_KEY_SZ];
 	char			table[CHD_KEY_SZ];
+	char			key2[CHD_KEY_SZ];
 };
 
 struct backend_obj {
@@ -162,6 +165,16 @@ struct volume_entry {
 	char			*owner;		/* obj owner username */
 };
 
+enum chunk_worker_op {
+	CW_CP,					/* local obj copy */
+};
+
+struct worker_info {
+	enum chunk_worker_op	op;
+	struct client		*cli;
+	enum chunk_errcode	err;
+};
+
 struct server_stats {
 	unsigned long		poll;		/* number polls */
 	unsigned long		event;		/* events dispatched */
@@ -209,6 +222,10 @@ struct server {
 
 	GHashTable		*fd_info;
 
+	GThreadPool		*workers;
+	int			max_workers;
+	int			worker_pipe[2];
+
 	struct list_head	wr_trash;
 	unsigned int		trash_sz;
 
@@ -278,6 +295,9 @@ extern int fs_obj_do_sum(const char *fn, unsigned int klen, char **csump);
 extern bool object_del(struct client *cli);
 extern bool object_put(struct client *cli);
 extern bool object_get(struct client *cli, bool want_body);
+extern bool object_cp(struct client *cli);
+extern void worker_cp_thr(struct worker_info *wi);
+extern void worker_cp_pipe(struct worker_info *wi);
 extern bool cli_evt_data_in(struct client *cli, unsigned int events);
 extern void cli_out_end(struct client *cli);
 extern void cli_in_end(struct client *cli);
@@ -314,12 +334,14 @@ extern bool cli_err(struct client *cli, enum chunk_errcode code, bool recycle_ok
 extern int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
 		     cli_write_func cb, void *cb_data);
 extern bool cli_wr_sendfile(struct client *, cli_write_func);
+extern bool cli_rd_set_poll(struct client *cli, bool readable);
 extern void cli_wr_set_poll(struct client *cli, bool writable);
 extern bool cli_cb_free(struct client *cli, struct client_write *wr,
 			bool done);
 extern bool cli_write_start(struct client *cli);
 extern int cli_req_avail(struct client *cli);
 extern int cli_poll_mod(struct client *cli);
+extern bool tcp_cli_event(int fd, short events, void *userdata);
 extern void resp_init_req(struct chunksrv_resp *resp,
 		   const struct chunksrv_req *req);
 
diff --git a/chunkd/object.c b/chunkd/object.c
index 116792f..105eb84 100644
--- a/chunkd/object.c
+++ b/chunkd/object.c
@@ -25,6 +25,7 @@
 #include <unistd.h>
 #include <string.h>
 #include <errno.h>
+#include <poll.h>
 #include <stdio.h>
 #include <syslog.h>
 #include <glib.h>
@@ -356,3 +357,102 @@ start_write:
 	return cli_write_start(cli);
 }
 
+bool object_cp(struct client *cli)
+{
+	enum chunk_errcode err = che_InternalError;
+	struct worker_info *wi;
+
+	wi = calloc(1, sizeof(*wi));
+	if (!wi)
+		return cli_err(cli, err, false);
+
+	wi->op = CW_CP;
+	wi->cli = cli;
+
+	g_thread_pool_push(chunkd_srv.workers, wi, NULL);
+
+	cli_rd_set_poll(cli, false);
+
+	return false;
+}
+
+void worker_cp_thr(struct worker_info *wi)
+{
+	unsigned char md[SHA_DIGEST_LENGTH];
+	char hashstr[50];
+	struct client *cli = wi->cli;
+	struct backend_obj *obj = NULL, *out_obj = NULL;
+	enum chunk_errcode err = che_InternalError;
+	void *buf;
+	static const unsigned bufsz = 1 * 1024 * 1024;
+
+	buf = malloc(bufsz);
+	if (!buf)
+		goto out;
+
+	cli->in_obj = obj = fs_obj_open(cli->table_id, cli->user, cli->key2,
+					cli->var_len, &err);
+	if (!obj)
+		goto out;
+
+	cli->in_len = obj->size;
+
+	cli->out_bo = out_obj = fs_obj_new(cli->table_id,
+					   cli->key, cli->key_len, &err);
+	if (!cli->out_bo)
+		goto out;
+
+	SHA1_Init(&cli->out_hash);
+
+	while (cli->in_len > 0) {
+		ssize_t rrc, wrc;
+
+		rrc = fs_obj_read(obj, buf, MIN(cli->in_len, bufsz));
+		if (rrc < 0)
+			goto err_out;
+		if (rrc == 0)
+			break;
+
+		SHA1_Update(&cli->out_hash, buf, rrc);
+
+		wrc = fs_obj_write(out_obj, buf, rrc);
+		if (wrc < 0)
+			goto err_out;
+
+		cli->in_len -= wrc;
+	}
+
+	SHA1_Final(md, &cli->out_hash);
+	hexstr(md, SHA_DIGEST_LENGTH, hashstr);
+
+	if (!fs_obj_write_commit(out_obj, cli->user, hashstr, false))
+		goto err_out;
+
+	err = che_Success;
+
+out:
+	if (buf)
+		free(buf);
+	if (obj)
+		fs_obj_free(obj);
+	if (out_obj)
+		fs_obj_free(out_obj);
+	wi->err = err;
+	write(chunkd_srv.worker_pipe[1], &wi, sizeof(wi));
+
+err_out:
+	/* FIXME: remove half-written destination object */
+	goto out;
+}
+
+void worker_cp_pipe(struct worker_info *wi)
+{
+	struct client *cli = wi->cli;
+	bool rcb;
+
+	cli_rd_set_poll(cli, true);
+
+	rcb = cli_err(cli, wi->err, (wi->err == che_Success) ? true : false);
+	if (rcb)
+		tcp_cli_event(cli->fd, POLLIN | POLLOUT, cli);
+}
diff --git a/chunkd/server.c b/chunkd/server.c
index abd29ad..0fec33a 100644
--- a/chunkd/server.c
+++ b/chunkd/server.c
@@ -418,6 +418,16 @@ static bool cli_evt_recycle(struct client *cli, unsigned int events)
 	return true;
 }
 
+bool cli_rd_set_poll(struct client *cli, bool readable)
+{
+	if (readable)
+		srv_poll_mask(cli->fd, POLLIN, 0);
+	else
+		srv_poll_mask(cli->fd, 0, POLLIN);
+	
+	return true;
+}
+
 void cli_wr_set_poll(struct client *cli, bool writable)
 {
 	if (writable)
@@ -1049,6 +1059,7 @@ static const char *op2str(enum chunksrv_ops op)
 	case CHO_CHECK_START:	return "CHO_CHECK_START";
 	case CHO_CHECK_STATUS:	return "CHO_CHECK_STATUS";
 	case CHO_START_TLS:	return "CHO_START_TLS";
+	case CHO_CP:		return "CHO_CP";
 
 	default:
 		return "BUG/UNKNOWN!";
@@ -1143,6 +1154,9 @@ static bool cli_evt_exec_req(struct client *cli, unsigned int events)
 	case CHO_DEL:
 		rcb = object_del(cli);
 		break;
+	case CHO_CP:
+		rcb = object_cp(cli);
+		break;
 	case CHO_LIST:
 		rcb = volume_list(cli);
 		break;
@@ -1226,8 +1240,10 @@ static bool cli_evt_read_fixed(struct client *cli, unsigned int events)
 
 	/* otherwise, go to read-variable-len-record state */
 	cli->req_ptr = &cli->key;
+	cli->var_len = cli->key_len;
 	cli->req_used = 0;
 	cli->state = evt_read_var;
+	cli->second_var = false;
 
 	return true;
 }
@@ -1235,7 +1251,7 @@ static bool cli_evt_read_fixed(struct client *cli, unsigned int events)
 static bool cli_evt_read_var(struct client *cli, unsigned int events)
 {
 	int rc = cli_read_data(cli, cli->req_ptr,
-			       cli->key_len - cli->req_used);
+			       cli->var_len - cli->req_used);
 	if (rc < 0) {
 		cli->state = evt_dispose;
 		return true;
@@ -1245,10 +1261,17 @@ static bool cli_evt_read_var(struct client *cli, unsigned int events)
 	cli->req_used += rc;
 
 	/* poll for more, if variable-length record not yet received */
-	if (cli->req_used < cli->key_len)
+	if (cli->req_used < cli->var_len)
 		return false;
 
-	cli->state = evt_exec_req;
+	if (cli->creq.op == CHO_CP && !cli->second_var) {
+		cli->req_ptr = &cli->key2;
+		cli->var_len = le64_to_cpu(cli->creq.data_len);
+		cli->req_used = 0;
+		cli->state = evt_read_var;
+		cli->second_var = true;
+	} else
+		cli->state = evt_exec_req;
 
 	return true;
 }
@@ -1301,7 +1324,7 @@ static void tcp_cli_wr_event(int fd, short events, void *userdata)
 		cli_writable(cli);
 }
 
-static bool tcp_cli_event(int fd, short events, void *userdata)
+bool tcp_cli_event(int fd, short events, void *userdata)
 {
 	struct client *cli = userdata;
 	bool loop = false, disposing = false;
@@ -1419,6 +1442,25 @@ static int net_write_port(const char *port_file, const char *port_str)
 	return 0;
 }
 
+static bool pipe_watch(int pipe_fd_0, 
+		       bool (*cb)(int fd, short events, void *userdata),
+		       void *userdata)
+{
+	struct server_poll *sp;
+
+	sp = calloc(1, sizeof(*sp));
+	if (!sp)
+		return false;
+
+	sp->events = POLLIN;
+	sp->cb = cb;
+	sp->userdata = userdata;
+
+	g_hash_table_insert(chunkd_srv.fd_info, GINT_TO_POINTER(pipe_fd_0), sp);
+
+	return true;
+}
+
 static int net_open_socket(const struct listen_cfg *cfg,
 			   int addr_fam, int sock_type, int sock_prot,
 			   int addr_len, void *addr_ptr)
@@ -1634,6 +1676,33 @@ static int net_open(struct listen_cfg *cfg)
 		return net_open_known(cfg);
 }
 
+static void worker_thread(gpointer data, gpointer userdata)
+{
+	struct worker_info *wi = data;
+
+	switch (wi->op) {
+	case CW_CP:
+		worker_cp_thr(wi);
+		break;
+	}
+}
+
+static bool worker_pipe_evt(int fd, short events, void *userdata)
+{
+	struct worker_info *wi = NULL;
+
+	if (read(fd, &wi, sizeof(wi)) != sizeof(wi))
+		return false;
+
+	switch (wi->op) {
+	case CW_CP:
+		worker_cp_pipe(wi);
+		break;
+	}
+
+	return true;
+}
+
 static void fill_poll_arr(gpointer key, gpointer val, gpointer userdata)
 {
 	int fd = GPOINTER_TO_INT(key);
@@ -1830,21 +1899,38 @@ int main (int argc, char *argv[])
 		goto err_out_session;
 	}
 
+	chunkd_srv.max_workers = 10;
+	chunkd_srv.workers = g_thread_pool_new(worker_thread, NULL,
+					       chunkd_srv.max_workers,
+					       FALSE, NULL);
+	if (!chunkd_srv.workers) {
+		rc = 1;
+		goto err_out_fd_info;
+	}
+
 	if (objcache_init(&chunkd_srv.actives) != 0) {
 		rc = 1;
-		goto err_out_objcache;
+		goto err_out_workers;
 	}
 
 	chunkd_srv.trash_sz = 0;
 
 	if (pipe(chunkd_srv.chk_pipe) < 0) {
 		rc = 1;
-		goto err_out_pipe;
+		goto err_out_objcache;
+	}
+	if (pipe(chunkd_srv.worker_pipe) < 0) {
+		rc = 1;
+		goto err_out_chk_pipe;
+	}
+	if (!pipe_watch(chunkd_srv.worker_pipe[0], worker_pipe_evt, NULL)) {
+		rc = 1;
+		goto err_out_chk_pipe;
 	}
 
 	if (fs_open()) {
 		rc = 1;
-		goto err_out_fs;
+		goto err_out_worker_pipe;
 	}
 
 	/* set up server networking */
@@ -1874,13 +1960,17 @@ err_out_cld:
 	/* net_close(); */
 err_out_listen:
 	fs_close();
-err_out_fs:
+err_out_worker_pipe:
+err_out_chk_pipe:
 	cmd = CHK_CMD_EXIT;
 	write(chunkd_srv.chk_pipe[1], &cmd, 1);
 	close(chunkd_srv.chk_pipe[1]);
-err_out_pipe:
-	objcache_fini(&chunkd_srv.actives);
 err_out_objcache:
+	objcache_fini(&chunkd_srv.actives);
+err_out_workers:
+	if (strict_free)
+		g_thread_pool_free(chunkd_srv.workers, TRUE, FALSE);
+err_out_fd_info:
 	if (strict_free)
 		g_hash_table_destroy(chunkd_srv.fd_info);
 err_out_session:
diff --git a/include/chunk_msg.h b/include/chunk_msg.h
index 4a3c15d..3ee1c6a 100644
--- a/include/chunk_msg.h
+++ b/include/chunk_msg.h
@@ -48,6 +48,8 @@ enum chunksrv_ops {
 	 * functions' success/failure is sufficient indication.
 	 */
 	CHO_START_TLS		= 10,	/* Encrypt all subsequent msgs */
+
+	CHO_CP			= 11,	/* local object copy (intra-table) */
 };
 
 enum chunk_errcode {
diff --git a/include/chunkc.h b/include/chunkc.h
index 683992e..005ba58 100644
--- a/include/chunkc.h
+++ b/include/chunkc.h
@@ -85,6 +85,9 @@ extern bool stc_put_sync(struct st_client *stc);
 extern bool stc_put_inline(struct st_client *stc, const void *key,
 			   size_t key_len, void *data, uint64_t len,
 			   uint32_t flags);
+extern bool stc_cp(struct st_client *stc,
+		   const void *dest_key, size_t dest_key_len,
+		   const void *src_key, size_t src_key_len);
 
 extern bool stc_del(struct st_client *stc, const void *key, size_t key_len);
 extern bool stc_ping(struct st_client *stc);
@@ -133,4 +136,11 @@ static inline bool stc_table_openz(struct st_client *stc, const char *key,
 	return stc_table_open(stc, key, strlen(key) + 1, flags);
 }
 
+static inline bool stc_cpz(struct st_client *stc,
+			   const char *dest_key, const char *src_key)
+{
+	return stc_cp(stc, dest_key, strlen(dest_key),
+		      src_key, strlen(src_key));
+}
+
 #endif /* __STC_H__ */
diff --git a/lib/chunkdc.c b/lib/chunkdc.c
index 7441662..260f4d9 100644
--- a/lib/chunkdc.c
+++ b/lib/chunkdc.c
@@ -1091,6 +1091,62 @@ bool stc_check_status(struct st_client *stc, struct chunk_check_status *out)
 	return true;
 }
 
+bool stc_cp(struct st_client *stc,
+	    const void *dest_key, size_t dest_key_len,
+	    const void *src_key, size_t src_key_len)
+{
+	struct chunksrv_resp resp;
+	struct chunksrv_req *req;
+	void *p;
+	bool rcb = false;
+	size_t alloc_len;
+
+	if (stc->verbose)
+		fprintf(stderr, "libstc: CP\n");
+
+	alloc_len = sizeof(*req) + src_key_len + dest_key_len;
+	req = malloc(alloc_len);
+	if (!req)
+		return false;
+
+	/* initialize request */
+	req_init(stc, req);
+	req->op = CHO_CP;
+	req->data_len = cpu_to_le64(src_key_len);
+
+	/* store destination (new) key in key (1st) buffer area */
+	req_set_key(req, dest_key, dest_key_len);
+
+	/* store source (old) key in data (2nd) buffer area */
+	p = (req + 1);
+	p += dest_key_len;
+	memcpy(p, src_key, src_key_len);
+
+	/* sign request */
+	chreq_sign(req, stc->key, req->sig);
+
+	/* write request */
+	if (!net_write(stc, req, alloc_len))
+		goto out;
+
+	/* read response header */
+	if (!net_read(stc, &resp, sizeof(resp)))
+		goto out;
+
+	/* check response code */
+	if (resp.resp_code != che_Success) {
+		if (stc->verbose)
+			fprintf(stderr, "CP resp code: %d\n", resp.resp_code);
+		goto out;
+	}
+
+	rcb = true;
+
+out:
+	free(req);
+	return rcb;
+}
+
 /*
  * For extra safety, call stc_init after g_thread_init, if present.
  * Currently we just call srand(), but since we use GLib, we may need
diff --git a/test/chunkd/Makefile.am b/test/chunkd/Makefile.am
index 1cb9875..390462b 100644
--- a/test/chunkd/Makefile.am
+++ b/test/chunkd/Makefile.am
@@ -24,13 +24,14 @@ TESTS =				\
 	nop			\
 	basic-object		\
 	auth			\
+	cp			\
 	large-object		\
 	lotsa-objects		\
 	selfcheck-unit		\
 	stop-daemon		\
 	clean-db
 
-check_PROGRAMS		= auth basic-object it-works large-object \
+check_PROGRAMS		= auth basic-object cp it-works large-object \
 			  lotsa-objects nop objcache-unit selfcheck-unit
 
 TESTLDADD		= ../../lib/libhail.la	\
@@ -38,6 +39,7 @@ TESTLDADD		= ../../lib/libhail.la	\
 			  @GLIB_LIBS@ @CRYPTO_LIBS@ \
 			  @XML_LIBS@ @SSL_LIBS@
 basic_object_LDADD	= $(TESTLDADD)
+cp_LDADD		= $(TESTLDADD)
 auth_LDADD		= $(TESTLDADD)
 it_works_LDADD		= $(TESTLDADD)
 large_object_LDADD	= $(TESTLDADD)
diff --git a/test/chunkd/cp.c b/test/chunkd/cp.c
new file mode 100644
index 0000000..85f61c4
--- /dev/null
+++ b/test/chunkd/cp.c
@@ -0,0 +1,116 @@
+
+/*
+ * Copyright 2009-2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING.  If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "hail-config.h"
+
+#include <sys/types.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <time.h>
+#include <string.h>
+#include <locale.h>
+#include <chunkc.h>
+#include "test.h"
+
+static void test(bool do_encrypt)
+{
+	struct st_object *obj;
+	struct st_keylist *klist;
+	struct st_client *stc;
+	int port;
+	bool rcb;
+	char val[] = "my first value";
+	char key[64] = "deadbeef";
+	char key2[64] = "deadcab0";
+	size_t len = 0;
+	void *mem;
+
+	port = stc_readport(TEST_PORTFILE);
+	OK(port > 0);
+
+	stc = stc_new(TEST_HOST, port, TEST_USER, TEST_USER_KEY, do_encrypt);
+	OK(stc);
+
+	rcb = stc_table_openz(stc, TEST_TABLE, 0);
+	OK(rcb);
+
+	/* store object */
+	rcb = stc_put_inlinez(stc, key, val, strlen(val), 0);
+	OK(rcb);
+
+	/* make sure object appears in list of volume keys */
+	klist = stc_keys(stc);
+	OK(klist);
+	OK(klist->contents);
+	OK(klist->contents->next == NULL);
+
+	obj = klist->contents->data;
+	OK(obj);
+	OK(obj->name);
+	OK(!strcmp(obj->name, key));
+	OK(obj->time_mod);
+	OK(obj->etag);
+	OK(obj->size == strlen(val));
+	OK(obj->owner);
+
+	stc_free_keylist(klist);
+
+	/* get object */
+	mem = stc_get_inlinez(stc, key, &len);
+	OK(mem);
+	OK(len == strlen(val));
+	OK(!memcmp(val, mem, strlen(val)));
+
+	free(mem);
+
+	/* copy object */
+	rcb = stc_cpz(stc, key2, key);
+	OK(rcb);
+
+	/* get object copy */
+	mem = stc_get_inlinez(stc, key2, &len);
+	OK(mem);
+	OK(len == strlen(val));
+	OK(!memcmp(val, mem, strlen(val)));
+
+	free(mem);
+
+	/* delete objects */
+	rcb = stc_delz(stc, key);
+	OK(rcb);
+	rcb = stc_delz(stc, key2);
+	OK(rcb);
+
+	stc_free(stc);
+}
+
+int main(int argc, char *argv[])
+{
+	setlocale(LC_ALL, "C");
+
+	stc_init();
+	SSL_library_init();
+	SSL_load_error_strings();
+
+	test(false);
+	test(true);
+
+	return 0;
+}
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Fedora Clound]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux