This patch: * adds a local, intra-table copy operation to chunkd/libhail * illustrates which files need updating when adding a new op to chunk * adds some 'worker' infrastructure that should help with future ops, notably remote copy (RCP) * should assist tabled's implementation of S3 copy (x-amz-copy-source) chunkd/chunkd.h | 19 +++++++ chunkd/object.c | 117 ++++++++++++++++++++++++++++++++++++++++++++++ chunkd/server.c | 122 ++++++++++++++++++++++++++++++++++++++++++++---- doc/chcli.8 | 13 ++++- include/chunk_msg.h | 2 include/chunkc.h | 10 +++ lib/chunkdc.c | 56 ++++++++++++++++++++++ test/chunkd/Makefile.am | 5 + tools/chcli.c | 77 ++++++++++++++++++++++++++++++ 9 files changed, 409 insertions(+), 12 deletions(-) diff --git a/chunkd/chunkd.h b/chunkd/chunkd.h index e019f0d..5d39353 100644 --- a/chunkd/chunkd.h +++ b/chunkd/chunkd.h @@ -104,6 +104,8 @@ struct client { unsigned int req_used; /* amount of req_buf in use */ void *req_ptr; /* start of unexamined data */ uint16_t key_len; + unsigned int var_len; /* len of vari len record */ + bool second_var; /* inside 2nd vari len rec? 
*/ char *hdr_start; /* current hdr start */ char *hdr_end; /* current hdr end (so far) */ @@ -124,6 +126,7 @@ struct client { char netbuf_out[CLI_DATA_BUF_SZ]; char key[CHD_KEY_SZ]; char table[CHD_KEY_SZ]; + char key2[CHD_KEY_SZ]; }; struct backend_obj { @@ -162,6 +165,14 @@ struct volume_entry { char *owner; /* obj owner username */ }; +struct worker_info { + enum chunk_errcode err; /* error returned to pipe */ + struct client *cli; /* associated client conn */ + + void (*thr_ev)(struct worker_info *); + void (*pipe_ev)(struct worker_info *); +}; + struct server_stats { unsigned long poll; /* number polls */ unsigned long event; /* events dispatched */ @@ -209,6 +220,10 @@ struct server { GHashTable *fd_info; + GThreadPool *workers; /* global thread worker pool */ + int max_workers; + int worker_pipe[2]; + struct list_head wr_trash; unsigned int trash_sz; @@ -278,6 +293,7 @@ extern int fs_obj_do_sum(const char *fn, unsigned int klen, char **csump); extern bool object_del(struct client *cli); extern bool object_put(struct client *cli); extern bool object_get(struct client *cli, bool want_body); +extern bool object_cp(struct client *cli); extern bool cli_evt_data_in(struct client *cli, unsigned int events); extern void cli_out_end(struct client *cli); extern void cli_in_end(struct client *cli); @@ -314,12 +330,15 @@ extern bool cli_err(struct client *cli, enum chunk_errcode code, bool recycle_ok extern int cli_writeq(struct client *cli, const void *buf, unsigned int buflen, cli_write_func cb, void *cb_data); extern bool cli_wr_sendfile(struct client *, cli_write_func); +extern bool cli_rd_set_poll(struct client *cli, bool readable); extern void cli_wr_set_poll(struct client *cli, bool writable); extern bool cli_cb_free(struct client *cli, struct client_write *wr, bool done); extern bool cli_write_start(struct client *cli); extern int cli_req_avail(struct client *cli); extern int cli_poll_mod(struct client *cli); +extern bool worker_pipe_signal(struct worker_info 
*wi); +extern bool tcp_cli_event(int fd, short events, void *userdata); extern void resp_init_req(struct chunksrv_resp *resp, const struct chunksrv_req *req); diff --git a/chunkd/object.c b/chunkd/object.c index 116792f..af187b6 100644 --- a/chunkd/object.c +++ b/chunkd/object.c @@ -25,6 +25,7 @@ #include <unistd.h> #include <string.h> #include <errno.h> +#include <poll.h> #include <stdio.h> #include <syslog.h> #include <glib.h> @@ -356,3 +357,119 @@ start_write: return cli_write_start(cli); } +static void worker_cp_thr(struct worker_info *wi) +{ + static const unsigned bufsz = (1 * 1024 * 1024); + void *buf = NULL; + struct client *cli = wi->cli; + struct backend_obj *obj = NULL, *out_obj = NULL; + enum chunk_errcode err = che_InternalError; + unsigned char md[SHA_DIGEST_LENGTH]; + char hashstr[50]; + + buf = malloc(bufsz); + if (!buf) + goto out; + + cli->in_obj = obj = fs_obj_open(cli->table_id, cli->user, cli->key2, + cli->var_len, &err); + if (!obj) + goto out; + + cli->in_len = obj->size; + + cli->out_ce = objcache_get_dirty(&chunkd_srv.actives, + cli->key, cli->key_len); + if (!cli->out_ce) + goto out; + + cli->out_bo = out_obj = fs_obj_new(cli->table_id, + cli->key, cli->key_len, &err); + if (!cli->out_bo) + goto out; + + SHA1_Init(&cli->out_hash); + + while (cli->in_len > 0) { + ssize_t rrc, wrc; + + rrc = fs_obj_read(obj, buf, MIN(cli->in_len, bufsz)); + if (rrc < 0) + goto err_out; + if (rrc == 0) + break; + + SHA1_Update(&cli->out_hash, buf, rrc); + cli->in_len -= rrc; + + while (rrc > 0) { + wrc = fs_obj_write(out_obj, buf, rrc); + if (wrc < 0) + goto err_out; + + rrc -= wrc; + } + } + + SHA1_Final(md, &cli->out_hash); + hexstr(md, SHA_DIGEST_LENGTH, hashstr); + + if (!fs_obj_write_commit(out_obj, cli->user, hashstr, false)) + goto err_out; + + err = che_Success; + +out: + free(buf); + cli_in_end(cli); + cli_out_end(cli); + wi->err = err; + worker_pipe_signal(wi); + return; + +err_out: + /* FIXME: remove half-written destination object */ + goto out; 
+} + +static void worker_cp_pipe(struct worker_info *wi) +{ + struct client *cli = wi->cli; + bool rcb; + + cli_rd_set_poll(cli, true); + + rcb = cli_err(cli, wi->err, (wi->err == che_Success) ? true : false); + if (rcb) { + short events = POLLIN; + if (cli->writing) + events |= POLLOUT; + tcp_cli_event(cli->fd, events, cli); + } + + memset(wi, 0xffffffff, sizeof(*wi)); /* poison */ + free(wi); +} + +bool object_cp(struct client *cli) +{ + enum chunk_errcode err = che_InternalError; + struct worker_info *wi; + + cli_rd_set_poll(cli, false); + + wi = calloc(1, sizeof(*wi)); + if (!wi) { + cli_rd_set_poll(cli, true); + return cli_err(cli, err, false); + } + + wi->thr_ev = worker_cp_thr; + wi->pipe_ev = worker_cp_pipe; + wi->cli = cli; + + g_thread_pool_push(chunkd_srv.workers, wi, NULL); + + return false; +} + diff --git a/chunkd/server.c b/chunkd/server.c index 7dd2227..c3984e9 100644 --- a/chunkd/server.c +++ b/chunkd/server.c @@ -418,6 +418,16 @@ static bool cli_evt_recycle(struct client *cli, unsigned int events) return true; } +bool cli_rd_set_poll(struct client *cli, bool readable) +{ + if (readable) + srv_poll_mask(cli->fd, POLLIN, 0); + else + srv_poll_mask(cli->fd, 0, POLLIN); + + return true; +} + void cli_wr_set_poll(struct client *cli, bool writable) { if (writable) @@ -1052,6 +1062,7 @@ static const char *op2str(enum chunksrv_ops op) case CHO_CHECK_START: return "CHO_CHECK_START"; case CHO_CHECK_STATUS: return "CHO_CHECK_STATUS"; case CHO_START_TLS: return "CHO_START_TLS"; + case CHO_CP: return "CHO_CP"; default: return "BUG/UNKNOWN!"; @@ -1146,6 +1157,9 @@ static bool cli_evt_exec_req(struct client *cli, unsigned int events) case CHO_DEL: rcb = object_del(cli); break; + case CHO_CP: + rcb = object_cp(cli); + break; case CHO_LIST: rcb = volume_list(cli); break; @@ -1229,8 +1243,10 @@ static bool cli_evt_read_fixed(struct client *cli, unsigned int events) /* otherwise, go to read-variable-len-record state */ cli->req_ptr = &cli->key; + cli->var_len = 
cli->key_len; cli->req_used = 0; cli->state = evt_read_var; + cli->second_var = false; return true; } @@ -1238,7 +1254,7 @@ static bool cli_evt_read_fixed(struct client *cli, unsigned int events) static bool cli_evt_read_var(struct client *cli, unsigned int events) { int rc = cli_read_data(cli, cli->req_ptr, - cli->key_len - cli->req_used); + cli->var_len - cli->req_used); if (rc < 0) { cli->state = evt_dispose; return true; @@ -1248,10 +1264,17 @@ static bool cli_evt_read_var(struct client *cli, unsigned int events) cli->req_used += rc; /* poll for more, if variable-length record not yet received */ - if (cli->req_used < cli->key_len) + if (cli->req_used < cli->var_len) return false; - cli->state = evt_exec_req; + if (cli->creq.op == CHO_CP && !cli->second_var) { + cli->req_ptr = &cli->key2; + cli->var_len = le64_to_cpu(cli->creq.data_len); + cli->req_used = 0; + cli->state = evt_read_var; + cli->second_var = true; + } else + cli->state = evt_exec_req; return true; } @@ -1304,7 +1327,7 @@ static void tcp_cli_wr_event(int fd, short events, void *userdata) cli_writable(cli); } -static bool tcp_cli_event(int fd, short events, void *userdata) +bool tcp_cli_event(int fd, short events, void *userdata) { struct client *cli = userdata; bool loop = false, disposing = false; @@ -1422,6 +1445,25 @@ static int net_write_port(const char *port_file, const char *port_str) return 0; } +static bool pipe_watch(int pipe_fd_0, + bool (*cb)(int fd, short events, void *userdata), + void *userdata) +{ + struct server_poll *sp; + + sp = calloc(1, sizeof(*sp)); + if (!sp) + return false; + + sp->events = POLLIN; + sp->cb = cb; + sp->userdata = userdata; + + g_hash_table_insert(chunkd_srv.fd_info, GINT_TO_POINTER(pipe_fd_0), sp); + + return true; +} + static int net_open_socket(const struct listen_cfg *cfg, int addr_fam, int sock_type, int sock_prot, int addr_len, void *addr_ptr) @@ -1637,6 +1679,45 @@ static int net_open(struct listen_cfg *cfg) return net_open_known(cfg); } +static void 
worker_thread(gpointer data, gpointer userdata) +{ + struct worker_info *wi = data; + + wi->thr_ev(wi); +} + +bool worker_pipe_signal(struct worker_info *wi) +{ + ssize_t wrc; + + wrc = write(chunkd_srv.worker_pipe[1], &wi, sizeof(wi)); + if (wrc != sizeof(wi)) { + applog(LOG_ERR, "worker pipe output failed: %s", + strerror(errno)); + return false; + } + + return true; +} + +static bool worker_pipe_evt(int fd, short events, void *userdata) +{ + struct worker_info *wi = NULL; + + if (read(fd, &wi, sizeof(wi)) != sizeof(wi)) { + applog(LOG_ERR, "worker pipe input failed: %s", + strerror(errno)); + return false; + } + + wi->pipe_ev(wi); + + if (!srv_poll_ready(fd)) + applog(LOG_ERR, "unable to ready worker fd for polling"); + + return true; +} + static void fill_poll_arr(gpointer key, gpointer val, gpointer userdata) { int fd = GPOINTER_TO_INT(key); @@ -1833,21 +1914,38 @@ int main (int argc, char *argv[]) goto err_out_session; } + chunkd_srv.max_workers = 10; + chunkd_srv.workers = g_thread_pool_new(worker_thread, NULL, + chunkd_srv.max_workers, + FALSE, NULL); + if (!chunkd_srv.workers) { + rc = 1; + goto err_out_fd_info; + } + if (objcache_init(&chunkd_srv.actives) != 0) { rc = 1; - goto err_out_objcache; + goto err_out_workers; } chunkd_srv.trash_sz = 0; if (pipe(chunkd_srv.chk_pipe) < 0) { rc = 1; - goto err_out_pipe; + goto err_out_objcache; + } + if (pipe(chunkd_srv.worker_pipe) < 0) { + rc = 1; + goto err_out_chk_pipe; + } + if (!pipe_watch(chunkd_srv.worker_pipe[0], worker_pipe_evt, NULL)) { + rc = 1; + goto err_out_chk_pipe; } if (fs_open()) { rc = 1; - goto err_out_fs; + goto err_out_worker_pipe; } /* set up server networking */ @@ -1877,13 +1975,17 @@ err_out_cld: /* net_close(); */ err_out_listen: fs_close(); -err_out_fs: +err_out_worker_pipe: +err_out_chk_pipe: cmd = CHK_CMD_EXIT; write(chunkd_srv.chk_pipe[1], &cmd, 1); close(chunkd_srv.chk_pipe[1]); -err_out_pipe: - objcache_fini(&chunkd_srv.actives); err_out_objcache: + 
objcache_fini(&chunkd_srv.actives); +err_out_workers: + if (strict_free) + g_thread_pool_free(chunkd_srv.workers, TRUE, FALSE); +err_out_fd_info: if (strict_free) g_hash_table_destroy(chunkd_srv.fd_info); err_out_session: diff --git a/doc/chcli.8 b/doc/chcli.8 index 477d8d4..0de059b 100644 --- a/doc/chcli.8 +++ b/doc/chcli.8 @@ -86,7 +86,14 @@ than the command line. Obtain key portion of a key/value pair from the specified file, rather than the command line. Keys provided on the command line (as opposed to via -k) are stored with a C-style nul terminating -character appended, adding 1 byte to each key. +character appended, adding 1 byte to each key. If the command is copy (CP), +this represents the destination key. +.TP +.B \-s \-\-src +Obtain source-key portion of a source-key/source-value pair from the +specified file, rather than the command line. Keys provided on the +command line (as opposed to via -s) are stored with a C-style nul +terminating character appended, adding 1 byte to each key. .TP .B \-\-list-cmds List all supported commands, and a short command usage synopsis. @@ -131,6 +138,10 @@ Fetch status of server self-check .B CHECKSTART Begin server self-check .TP +.B CP dest-key src-key +Copy object represented by 'src-key' into new object referenced +by 'dest-key' +.TP Keys provided on the command line (as opposed to via -k) are stored with a C-style nul terminating character appended, adding 1 byte to each key. diff --git a/include/chunk_msg.h b/include/chunk_msg.h index 4a3c15d..3ee1c6a 100644 --- a/include/chunk_msg.h +++ b/include/chunk_msg.h @@ -48,6 +48,8 @@ enum chunksrv_ops { * functions' success/failure is sufficient indication. 
*/ CHO_START_TLS = 10, /* Encrypt all subsequent msgs */ + + CHO_CP = 11, /* local object copy (intra-table) */ }; enum chunk_errcode { diff --git a/include/chunkc.h b/include/chunkc.h index 683992e..1fd7066 100644 --- a/include/chunkc.h +++ b/include/chunkc.h @@ -85,6 +85,9 @@ extern bool stc_put_sync(struct st_client *stc); extern bool stc_put_inline(struct st_client *stc, const void *key, size_t key_len, void *data, uint64_t len, uint32_t flags); +extern bool stc_cp(struct st_client *stc, + const void *dest_key, size_t dest_key_len, + const void *src_key, size_t src_key_len); extern bool stc_del(struct st_client *stc, const void *key, size_t key_len); extern bool stc_ping(struct st_client *stc); @@ -133,4 +136,11 @@ static inline bool stc_table_openz(struct st_client *stc, const char *key, return stc_table_open(stc, key, strlen(key) + 1, flags); } +static inline bool stc_cpz(struct st_client *stc, + const char *dest_key, const char *src_key) +{ + return stc_cp(stc, dest_key, strlen(dest_key) + 1, + src_key, strlen(src_key) + 1); +} + #endif /* __STC_H__ */ diff --git a/lib/chunkdc.c b/lib/chunkdc.c index 7441662..260f4d9 100644 --- a/lib/chunkdc.c +++ b/lib/chunkdc.c @@ -1091,6 +1091,62 @@ bool stc_check_status(struct st_client *stc, struct chunk_check_status *out) return true; } +bool stc_cp(struct st_client *stc, + const void *dest_key, size_t dest_key_len, + const void *src_key, size_t src_key_len) +{ + struct chunksrv_resp resp; + struct chunksrv_req *req; + void *p; + bool rcb = false; + size_t alloc_len; + + if (stc->verbose) + fprintf(stderr, "libstc: CP\n"); + + alloc_len = sizeof(*req) + src_key_len + dest_key_len; + req = malloc(alloc_len); + if (!req) + return false; + + /* initialize request */ + req_init(stc, req); + req->op = CHO_CP; + req->data_len = cpu_to_le64(src_key_len); + + /* store destination (new) key in key (1st) buffer area */ + req_set_key(req, dest_key, dest_key_len); + + /* store source (old) key in data (2nd) buffer area */ + p = 
(req + 1); + p += dest_key_len; + memcpy(p, src_key, src_key_len); + + /* sign request */ + chreq_sign(req, stc->key, req->sig); + + /* write request */ + if (!net_write(stc, req, alloc_len)) + goto out; + + /* read response header */ + if (!net_read(stc, &resp, sizeof(resp))) + goto out; + + /* check response code */ + if (resp.resp_code != che_Success) { + if (stc->verbose) + fprintf(stderr, "CP resp code: %d\n", resp.resp_code); + goto out; + } + + rcb = true; + +out: + free(req); + return rcb; +} + /* * For extra safety, call stc_init after g_thread_init, if present. * Currently we just call srand(), but since we use GLib, we may need diff --git a/test/chunkd/Makefile.am b/test/chunkd/Makefile.am index 1cb9875..def9b36 100644 --- a/test/chunkd/Makefile.am +++ b/test/chunkd/Makefile.am @@ -1,5 +1,6 @@ INCLUDES = -I$(top_srcdir)/include \ + -I$(top_srcdir)/lib \ @GLIB_CFLAGS@ @XML_CPPFLAGS@ EXTRA_DIST = \ @@ -24,13 +25,14 @@ TESTS = \ nop \ basic-object \ auth \ + cp \ large-object \ lotsa-objects \ selfcheck-unit \ stop-daemon \ clean-db -check_PROGRAMS = auth basic-object it-works large-object \ +check_PROGRAMS = auth basic-object cp it-works large-object \ lotsa-objects nop objcache-unit selfcheck-unit TESTLDADD = ../../lib/libhail.la \ @@ -38,6 +40,7 @@ TESTLDADD = ../../lib/libhail.la \ @GLIB_LIBS@ @CRYPTO_LIBS@ \ @XML_LIBS@ @SSL_LIBS@ basic_object_LDADD = $(TESTLDADD) +cp_LDADD = $(TESTLDADD) auth_LDADD = $(TESTLDADD) it_works_LDADD = $(TESTLDADD) large_object_LDADD = $(TESTLDADD) diff --git a/tools/chcli.c b/tools/chcli.c index 55e7a7d..160af18 100644 --- a/tools/chcli.c +++ b/tools/chcli.c @@ -56,6 +56,8 @@ static struct argp_option options[] = { "Read value from FILE, rather than command line" }, { "output", 'o', "FILE", 0, "Send GET output to FILE, rather than stdout" }, + { "src", 's', "FILE", 0, + "Read source key from FILE, rather than command line" }, { "ssl", 'S', NULL, 0, "Enable SSL channel security" }, { "table", 't', "TABLE", 0, @@ -91,6 +93,7 
@@ enum chcli_cmd { CHC_PING, CHC_CHECKSTATUS, CHC_CHECKSTART, + CHC_CP, }; struct chcli_host { @@ -107,6 +110,9 @@ static char *output_fn; static char *key_data; static gsize key_data_len; static bool key_in_file; +static char *key2_data; +static gsize key2_data_len; +static bool key2_in_file; static char *input_fn; static bool value_in_file; static char *table_name; @@ -186,6 +192,7 @@ static void show_cmds(void) "PING Ping server\n" "CHECKSTATUS Fetch status of server self-check\n" "CHECKSTART Begin server self-check\n" +"CP dst src Copy object 'src' into new object 'dst'\n" "\n" "Keys provided on the command line (as opposed to via -k) are stored\n" "with a C-style nul terminating character appended, adding 1 byte to\n" @@ -283,6 +290,14 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) } key_in_file = true; break; + case 's': + if (!g_file_get_contents(arg, &key2_data, &key2_data_len, + NULL)) { + fprintf(stderr, "failed to read src file %s\n", arg); + argp_usage(state); + } + key2_in_file = true; + break; case 'i': input_fn = arg; value_in_file = true; @@ -324,6 +339,8 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) cmd_mode = CHC_CHECKSTATUS; else if (!strcasecmp(arg, "checkstart")) cmd_mode = CHC_CHECKSTART; + else if (!strcasecmp(arg, "cp")) + cmd_mode = CHC_CP; else argp_usage(state); /* invalid cmd */ break; @@ -386,6 +403,64 @@ static int cmd_ping(void) return 0; } +static int cmd_cp(void) +{ + struct st_client *stc; + + /* if dest key data not supplied via file, absorb first cmd arg */ + if (!key_data) { + if (!n_cmd_args) { + fprintf(stderr, "CP requires dst,src args\n"); + return 1; + } + + key_data = cmd_args[0]; + key_data_len = strlen(cmd_args[0]) + 1; + + cmd_args++; + n_cmd_args--; + } + + /* if src key data not supplied via file, absorb second cmd arg */ + if (!key2_data) { + if (!n_cmd_args) { + fprintf(stderr, "CP requires dst,src args\n"); + return 1; + } + + key2_data = cmd_args[0]; + 
key2_data_len = strlen(cmd_args[0]) + 1; + + cmd_args++; + n_cmd_args--; + } + + if (key_data_len < 1 || key_data_len > CHD_KEY_SZ) { + fprintf(stderr, "CP: invalid key size %u\n", + (unsigned int) key_data_len); + return 1; + } + if (key2_data_len < 1 || key2_data_len > CHD_KEY_SZ) { + fprintf(stderr, "CP: invalid key size %u\n", + (unsigned int) key2_data_len); + return 1; + } + + stc = chcli_stc_new(); + if (!stc) + return 1; + + if (!stc_cp(stc, key_data, key_data_len, key2_data, key2_data_len)) { + fprintf(stderr, "CP failed\n"); + stc_free(stc); + return 1; + } + + stc_free(stc); + + return 0; +} + static int cmd_del(void) { struct st_client *stc; @@ -765,6 +840,8 @@ int main (int argc, char *argv[]) return cmd_check_status(); case CHC_CHECKSTART: return cmd_check_start(); + case CHC_CP: + return cmd_cp(); } return 0; -- To unsubscribe from this list: send the line "unsubscribe hail-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html