commit f2493782233b8b581edd5975c3b2019d80ca6552 Author: Jeff Garzik <jeff@xxxxxxxxxx> Date: Tue Sep 14 22:59:50 2010 -0400 chunk: add get-partial operation Signed-off-by: Jeff Garzik <jgarzik@xxxxxxxxxx> chunkd/be-fs.c | 25 +++++++++- chunkd/chunkd.h | 6 +- chunkd/object.c | 113 +++++++++++++++++++++++++++++++++++++++++++++++ chunkd/server.c | 14 +++++ doc/chcli.8 | 4 + include/chunk_msg.h | 17 ++++++- include/chunkc.h | 33 +++++++++++++ lib/chunkdc.c | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/chunksrv.c | 3 + tools/chcli.c | 102 ++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 435 insertions(+), 7 deletions(-) diff --git a/chunkd/be-fs.c b/chunkd/be-fs.c index 24baad7..c2dc901 100644 --- a/chunkd/be-fs.c +++ b/chunkd/be-fs.c @@ -55,6 +55,8 @@ struct fs_obj { off_t in_pos; off_t sendfile_ofs; + off_t value_ofs; + off_t tail_pos; size_t tail_len; @@ -396,12 +398,13 @@ struct backend_obj *fs_obj_new(uint32_t table_id, /* calculate size of front-of-file metadata area */ skip_len = sizeof(struct be_fs_obj_hdr) + key_len + csum_bytes; + obj->value_ofs = skip_len; /* position file pointer where object data (as in, not metadata) * will begin */ errno = 0; - if (lseek(obj->out_fd, skip_len, SEEK_SET) != skip_len) { + if (lseek(obj->out_fd, obj->value_ofs, SEEK_SET) != obj->value_ofs) { applog(LOG_ERR, "obj hdr seek(%s) failed: %s", fn, strerror(errno)); goto err_out; @@ -500,9 +503,10 @@ struct backend_obj *fs_obj_open(uint32_t table_id, const char *user, csum_bytes = obj->n_blk * CHD_CSUM_SZ; obj->tail_pos = value_len & ~(CHUNK_BLK_SZ - 1); obj->tail_len = value_len & (CHUNK_BLK_SZ - 1); + obj->value_ofs = sizeof(hdr) + key_len + csum_bytes; /* verify file size large enough to contain value */ - tmp64 = value_len + sizeof(hdr) + key_len + csum_bytes; + tmp64 = obj->value_ofs + value_len; if (G_UNLIKELY(st.st_size < tmp64)) { applog(LOG_ERR, "obj(%s) size error, too small", obj->in_fn); goto err_out; @@ -596,6 +600,23 @@ static bool
can_csum_blk(struct fs_obj *obj, size_t len) return false; } +int fs_obj_seek(struct backend_obj *bo, off_t ofs) +{ + struct fs_obj *obj = bo->private; + off_t rc; + + rc = lseek(obj->in_fd, obj->value_ofs + ofs, SEEK_SET); + if (rc == (off_t)-1) { + int err = errno; /* save before applog() can clobber it */ + applog(LOG_ERR, "obj seek(%s) failed: %s", obj->in_fn, strerror(err)); + return -err; + } + + obj->in_pos = ofs; /* value-relative; file pos is value_ofs + ofs */ + + return 0; +} + ssize_t fs_obj_read(struct backend_obj *bo, void *ptr, size_t len) { struct fs_obj *obj = bo->private; diff --git a/chunkd/chunkd.h b/chunkd/chunkd.h index 7da2c5f..15e96be 100644 --- a/chunkd/chunkd.h +++ b/chunkd/chunkd.h @@ -36,9 +36,6 @@ #endif enum { - CHUNK_BLK_ORDER = 16, /* 64k blocks */ - CHUNK_BLK_SZ = 1 << CHUNK_BLK_ORDER, - CLI_DATA_BUF_SZ = CHUNK_BLK_SZ, CHD_TRASH_MAX = 1000, @@ -95,6 +92,7 @@ struct client { bool writing; struct chunksrv_req creq; + struct chunksrv_req_getpart creq_getpart; unsigned int req_used; /* amount of req_buf in use */ void *req_ptr; /* start of unexamined data */ uint16_t key_len; @@ -264,6 +262,7 @@ extern struct backend_obj *fs_obj_open(uint32_t table_id, const char *user, enum chunk_errcode *err_code); extern ssize_t fs_obj_write(struct backend_obj *bo, const void *ptr, size_t len); extern ssize_t fs_obj_read(struct backend_obj *bo, void *ptr, size_t len); +extern int fs_obj_seek(struct backend_obj *bo, off_t ofs); extern void fs_obj_free(struct backend_obj *bo); extern bool fs_obj_write_commit(struct backend_obj *bo, const char *user, unsigned char *md, bool sync_data); @@ -290,6 +289,7 @@ extern int fs_obj_do_sum(const char *fn, unsigned int klen, unsigned char *md); extern bool object_del(struct client *cli); extern bool object_put(struct client *cli); extern bool object_get(struct client *cli, bool want_body); +extern bool object_get_part(struct client *cli); extern bool object_cp(struct client *cli); extern bool cli_evt_data_in(struct client *cli, unsigned int events); extern void cli_out_end(struct client *cli); diff --git
a/chunkd/object.c b/chunkd/object.c index d7d3cb6..3242ba0 100644 --- a/chunkd/object.c +++ b/chunkd/object.c @@ -354,6 +354,119 @@ start_write: return cli_write_start(cli); } +bool object_get_part(struct client *cli) +{ + static const uint64_t max_getpart = CHUNK_MAX_GETPART * CHUNK_BLK_SZ; + int rc; + enum chunk_errcode err = che_InternalError; + struct backend_obj *obj; + struct chunksrv_resp_get *get_resp = NULL; + uint64_t offset, length, remain; + uint64_t aligned_ofs, aligned_len, aligned_rem; + ssize_t rrc; + void *mem = NULL; + + get_resp = calloc(1, sizeof(*get_resp)); + if (!get_resp) { + cli->state = evt_dispose; + return true; + } + + resp_init_req(&get_resp->resp, &cli->creq); + + cli->in_obj = obj = fs_obj_open(cli->table_id, cli->user, cli->key, + cli->key_len, &err); + if (!obj) { + free(get_resp); + return cli_err(cli, err, true); + } + + cli->in_len = obj->size; + + /* obtain requested offset */ + offset = le64_to_cpu(cli->creq_getpart.offset); + if (offset > obj->size) { + err = che_InvalidSeek; + free(get_resp); + return cli_err(cli, err, true); + } + + /* align to block boundary */ + aligned_ofs = offset & ~CHUNK_BLK_MASK; + remain = obj->size - offset; + aligned_rem = obj->size - aligned_ofs; + + /* obtain requested length; 0 == "until end of object" */ + length = le64_to_cpu(cli->creq.data_len); + if (length == 0 || length > remain) + length = remain; + if (length > max_getpart) + length = max_getpart; + + /* widen to whole blocks covering the request, clamped to end of object */ + aligned_len = length + (offset - aligned_ofs); + if (aligned_len & CHUNK_BLK_MASK) + aligned_len += (CHUNK_BLK_SZ - (aligned_len & CHUNK_BLK_MASK)); + if (aligned_len > aligned_rem) + aligned_len = aligned_rem; + + if (length) { + /* seek to offset */ + rc = fs_obj_seek(obj, aligned_ofs); + if (rc) { + err = che_InvalidSeek; + free(get_resp); + return cli_err(cli, err, true); + } + + /* allocate buffer for the whole block-aligned span */ + mem = malloc(aligned_len); + if (!mem) { +
free(get_resp); + return cli_err(cli, err, true); + } + + /* read requested data in its entirety */ + rrc = fs_obj_read(obj, mem, aligned_len); + if (rrc != aligned_len) { + free(mem); + free(get_resp); + return cli_err(cli, err, true); + } + } + + /* fill in response; hash only the bytes returned (mem NULL if length 0) */ + if (length == remain) + get_resp->resp.flags |= CHF_GET_PART_LAST; + get_resp->resp.data_len = cpu_to_le64(length); + SHA1(length ? mem + (offset - aligned_ofs) : NULL, length, get_resp->resp.hash); + get_resp->mtime = cpu_to_le64(obj->mtime); + + /* write response header */ + rc = cli_writeq(cli, get_resp, sizeof(*get_resp), cli_cb_free, get_resp); + if (rc) { + free(mem); + free(get_resp); + return true; + } + + if (length) { + /* write response data */ + rc = cli_writeq(cli, mem + (offset - aligned_ofs), + length, cli_cb_free, mem); + if (rc) { + /* get_resp is already queued and owned by its + * cli_cb_free callback; release only mem here */ + free(mem); + return true; + } + } + + cli_in_end(cli); + + return cli_write_start(cli); +} + static void worker_cp_thr(struct worker_info *wi) { void *buf = NULL; diff --git a/chunkd/server.c b/chunkd/server.c index d3b593f..b816416 100644 --- a/chunkd/server.c +++ b/chunkd/server.c @@ -149,6 +149,10 @@ static struct { [che_KeyExists] = { "che_KeyExists", 403, "Key already exists" }, + + [che_InvalidSeek] = + { "che_InvalidSeek", 404, + "Invalid seek" }, }; void applog(int prio, const char *fmt, ...)
@@ -1063,6 +1067,7 @@ static const char *op2str(enum chunksrv_ops op) case CHO_CHECK_STATUS: return "CHO_CHECK_STATUS"; case CHO_START_TLS: return "CHO_START_TLS"; case CHO_CP: return "CHO_CP"; + case CHO_GET_PART: return "CHO_GET_PART"; default: return "BUG/UNKNOWN!"; @@ -1151,6 +1156,9 @@ static bool cli_evt_exec_req(struct client *cli, unsigned int events) case CHO_GET_META: rcb = object_get(cli, false); break; + case CHO_GET_PART: + rcb = object_get_part(cli); + break; case CHO_PUT: rcb = object_put(cli); break; @@ -1273,6 +1281,12 @@ static bool cli_evt_read_var(struct client *cli, unsigned int events) cli->req_used = 0; cli->state = evt_read_var; cli->second_var = true; + } else if (cli->creq.op == CHO_GET_PART && !cli->second_var) { + cli->req_ptr = &cli->creq_getpart; + cli->var_len = sizeof(cli->creq_getpart); + cli->req_used = 0; + cli->state = evt_read_var; + cli->second_var = true; } else cli->state = evt_exec_req; diff --git a/doc/chcli.8 b/doc/chcli.8 index 0de059b..3092fca 100644 --- a/doc/chcli.8 +++ b/doc/chcli.8 @@ -123,6 +123,10 @@ The following commands are available: .B GET key Retrieve the data object associated with the specified key. .TP +.B GETPART key offset length +Retrieve up to length bytes (0 = to end of object; server max 256k) of the +data object associated with the specified key, starting at byte offset. +.TP .B PUT key val Store data object associated with the specified key.
.TP diff --git a/include/chunk_msg.h b/include/chunk_msg.h index 4c170e4..91ca1fb 100644 --- a/include/chunk_msg.h +++ b/include/chunk_msg.h @@ -31,6 +31,13 @@ enum { CHD_SIG_SZ = 64, }; +enum { + CHUNK_BLK_ORDER = 16, /* 64k blocks */ + CHUNK_BLK_SZ = 1ULL << CHUNK_BLK_ORDER, + CHUNK_BLK_MASK = CHUNK_BLK_SZ - 1ULL, /* offset-within-block bits */ + CHUNK_MAX_GETPART = 4, /* max GET_PART len, in blocks (256k) */ +}; + enum chunksrv_ops { CHO_NOP = 0, /* No-op (ping server) */ CHO_GET = 1, /* GET object */ @@ -50,6 +57,7 @@ enum chunksrv_ops { CHO_START_TLS = 10, /* Encrypt all subsequent msgs */ CHO_CP = 11, /* local object copy (intra-table) */ + CHO_GET_PART = 12, /* GET subset of object */ }; enum chunk_errcode { @@ -64,12 +72,14 @@ enum chunk_errcode { che_InvalidTable = 8, che_Busy = 9, che_KeyExists = 10, + che_InvalidSeek = 11, }; enum chunk_flags { CHF_SYNC = (1 << 0), /* force write to media */ CHF_TBL_CREAT = (1 << 1), /* create tbl, if needed */ CHF_TBL_EXCL = (1 << 2), /* fail, if tbl exists */ + CHF_GET_PART_LAST = (1 << 3), /* resp: part reaches end-of-obj */ }; struct chunksrv_req { @@ -84,10 +94,15 @@ struct chunksrv_req { /* variable-length key */ }; +struct chunksrv_req_getpart { + uint64_t offset; /* GET_PART start offset (bytes, LE) */ +}; + struct chunksrv_resp { uint8_t magic[CHD_MAGIC_SZ]; /* CHUNKD_MAGIC */ uint8_t resp_code; /* chunk_errcode's */ - uint8_t rsv1[3]; + uint8_t flags; /* CHF_xxx */ + uint8_t rsv1[2]; uint32_t nonce; /* txn id, copied from request */ uint64_t data_len; /* len of addn'l data */ unsigned char hash[CHD_CSUM_SZ]; /* SHA1 checksum */ diff --git a/include/chunkc.h b/include/chunkc.h index 1fd7066..e3c2bb7 100644 --- a/include/chunkc.h +++ b/include/chunkc.h @@ -51,7 +51,8 @@ struct st_client { SSL_CTX *ssl_ctx; SSL *ssl; - char req_buf[sizeof(struct chunksrv_req) + CHD_KEY_SZ]; + char req_buf[sizeof(struct chunksrv_req) + CHD_KEY_SZ + + sizeof(struct chunksrv_req_getpart)]; }; extern void stc_free(struct st_client *stc); @@ -74,6 +75,19 @@ extern bool stc_get_start(struct st_client *stc,
const void *key, size_t key_len,int *pfd, uint64_t *len); extern size_t stc_get_recv(struct st_client *stc, void *data, size_t len); +extern bool stc_get_part(struct st_client *stc, const void *key, size_t key_len, + uint64_t offset, uint64_t max_len, + size_t (*write_cb)(void *, size_t, size_t, void *), + void *user_data); +extern void *stc_get_part_inline(struct st_client *stc, + const void *key, size_t key_len, + uint64_t offset, uint64_t max_len, + size_t *len); +extern bool stc_get_part_start(struct st_client *stc, const void *key, + size_t key_len, + uint64_t offset, uint64_t max_len, + int *pfd, uint64_t *len); + extern bool stc_put(struct st_client *stc, const void *key, size_t key_len, size_t (*read_cb)(void *, size_t, size_t, void *), uint64_t len, void *user_data, uint32_t flags); @@ -113,6 +127,23 @@ static inline bool stc_get_startz(struct st_client *stc, const char *key, return stc_get_start(stc, key, strlen(key) + 1, pfd, len); } +static inline void *stc_get_part_inlinez(struct st_client *stc, + const char *key, + uint64_t offset, uint64_t max_len, + size_t *len) +{ + return stc_get_part_inline(stc, key, strlen(key) + 1, offset, max_len, + len); +} + +static inline bool stc_get_part_startz(struct st_client *stc, const char *key, + uint64_t offset, uint64_t max_len, + int *pfd, uint64_t *len) +{ + return stc_get_part_start(stc, key, strlen(key) + 1, + offset, max_len, pfd, len); +} + static inline bool stc_put_inlinez(struct st_client *stc, const char *key, void *data, uint64_t len, uint32_t flags) { diff --git a/lib/chunkdc.c b/lib/chunkdc.c index 13f5d36..6ee9fc4 100644 --- a/lib/chunkdc.c +++ b/lib/chunkdc.c @@ -521,6 +521,131 @@ size_t stc_get_recv(struct st_client *stc, void *data, size_t data_len) return done_cnt; } +/* + * Send a GET_PART request; on success, *plen = length of returned data.
+ */ +static bool stc_get_part_req(struct st_client *stc, const void *key, + size_t key_len, uint64_t offset, + uint64_t max_len, uint64_t *plen) +{ + struct chunksrv_resp_get get_resp; + struct chunksrv_req *req = (struct chunksrv_req *) stc->req_buf; + struct chunksrv_req_getpart gpr; + + if (stc->verbose) + fprintf(stderr, "libstc: GET_PART(%u)\n", (unsigned int) key_len); + + if (!key_valid(key, key_len)) + return false; + + /* initialize request */ + req_init(stc, req); + req->op = CHO_GET_PART; + req->data_len = cpu_to_le64(max_len); + req_set_key(req, key, key_len); + + gpr.offset = cpu_to_le64(offset); + memcpy(stc->req_buf + sizeof(struct chunksrv_req) + key_len, + &gpr, sizeof(struct chunksrv_req_getpart)); + + /* sign request */ + chreq_sign(req, stc->key, req->sig); + + /* write request */ + if (!net_write(stc, req, req_len(req))) + return false; + + /* read response header */ + if (!resp_read(stc, &get_resp.resp)) + return false; + + /* check response code */ + if (get_resp.resp.resp_code != che_Success) { + if (stc->verbose) + fprintf(stderr, "GET_PART resp code: %d\n", get_resp.resp.resp_code); + return false; + } + + /* read rest of response header */ + if (!net_read(stc, &get_resp.mtime, + sizeof(get_resp) - sizeof(get_resp.resp))) + return false; + + *plen = le64_to_cpu(get_resp.resp.data_len); + return true; +} + +bool stc_get_part(struct st_client *stc, const void *key, size_t key_len, + uint64_t offset, uint64_t max_len, + size_t (*write_cb)(void *, size_t, size_t, void *), + void *user_data) +{ + char netbuf[4096]; + uint64_t content_len; + + if (!stc_get_part_req(stc, key, key_len, offset, max_len, &content_len)) + return false; + + /* read response data */ + while (content_len) { + size_t xfer_len; + + xfer_len = MIN(content_len, sizeof(netbuf)); + if (!net_read(stc, netbuf, xfer_len)) + return false; + + write_cb(netbuf, xfer_len, 1, user_data); + content_len -= xfer_len; + } + + return true; +} + +/* + * Set stc to be used for streaming transfers.
+ * In chunkd protocol, this delivers in *psize the length of the requested + * part, and clients are expected to read exactly *psize bytes from *pfd. + */ +bool stc_get_part_start(struct st_client *stc, const void *key, size_t key_len, + uint64_t offset, uint64_t max_len, + int *pfd, uint64_t *psize) +{ + + if (!stc_get_part_req(stc, key, key_len, offset, max_len, psize)) + return false; + + *pfd = stc->fd; + return true; +} + +void *stc_get_part_inline(struct st_client *stc, const void *key, + size_t key_len, uint64_t offset, uint64_t max_len, + size_t *len) +{ + bool rcb; + void *mem; + GByteArray *all_data; + + all_data = g_byte_array_new(); + if (!all_data) + return NULL; + + rcb = stc_get_part(stc, key, key_len, offset, max_len, + all_data_cb, all_data); + if (!rcb) { + g_byte_array_free(all_data, TRUE); + return NULL; + } + + if (len) + *len = all_data->len; + + mem = all_data->data; + + g_byte_array_free(all_data, FALSE); + return mem; +} + bool stc_table_open(struct st_client *stc, const void *key, size_t key_len, uint32_t flags) { diff --git a/lib/chunksrv.c b/lib/chunksrv.c index a9bfc46..6bda870 100644 --- a/lib/chunksrv.c +++ b/lib/chunksrv.c @@ -32,6 +32,9 @@ size_t req_len(const struct chunksrv_req *req) len = sizeof(struct chunksrv_req) + GUINT16_FROM_LE(req->key_len); + if (req->op == CHO_GET_PART) + len += sizeof(struct chunksrv_req_getpart); + return len; } diff --git a/tools/chcli.c b/tools/chcli.c index 160af18..c310a06 100644 --- a/tools/chcli.c +++ b/tools/chcli.c @@ -94,6 +94,7 @@ enum chcli_cmd { CHC_CHECKSTATUS, CHC_CHECKSTART, CHC_CP, + CHC_GET_PART, }; struct chcli_host { @@ -187,6 +188,7 @@ static void show_cmds(void) "Supported chcli commands:\n" "\n" "GET key Retrieve key, send to output (def: stdout)\n" +"GETPART key offset length Retrieve key subset, send to output (def: stdout)\n" "PUT key val Store key\n" "DEL key Delete key\n" "PING Ping server\n" @@ -341,6 +343,8 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state) cmd_mode =
CHC_CHECKSTART; else if (!strcasecmp(arg, "cp")) cmd_mode = CHC_CP; + else if (!strcasecmp(arg, "getpart")) + cmd_mode = CHC_GET_PART; else argp_usage(state); /* invalid cmd */ break; @@ -714,6 +718,103 @@ static int cmd_get(void) return 0; } +static int cmd_get_part(void) +{ + struct st_client *stc; + int rfd = -1, wfd; + unsigned long long pofs = 0, plen = 0; + uint64_t ofs, len; + + /* if key data not supplied via file, absorb first cmd arg */ + if (!key_data) { + if (!n_cmd_args) { + fprintf(stderr, "GETPART requires key arg\n"); + return 1; + } + + key_data = cmd_args[0]; + key_data_len = strlen(cmd_args[0]) + 1; + + cmd_args++; + n_cmd_args--; + } + + /* parse offset, length required args */ + if (n_cmd_args != 2) { + fprintf(stderr, "GETPART requires offset, length args\n"); + return 1; + } + if ((sscanf(cmd_args[0], "%llu", &pofs) != 1) || + (sscanf(cmd_args[1], "%llu", &plen) != 1)) { + fprintf(stderr, "invalid GETPART offset and/or length arg\n"); + return 1; + } + ofs = pofs; + len = plen; + + if (key_data_len < 1 || key_data_len > CHD_KEY_SZ) { + fprintf(stderr, "GET: invalid key size %u\n", + (unsigned int) key_data_len); + return 1; + } + + stc = chcli_stc_new(); + if (!stc) + return 1; + + if (!stc_get_part_start(stc, key_data, key_data_len, + ofs, len, &rfd, &get_len)) { + fprintf(stderr, "GET initiation failed\n"); + stc_free(stc); + return 1; + } + + if (!output_fn || !strcmp(output_fn, "-")) + wfd = STDOUT_FILENO; + else { + wfd = open(output_fn, O_CREAT | O_TRUNC | O_WRONLY, 0666); + if (wfd < 0) { + fprintf(stderr, "GET output file %s open failed: %s\n", + output_fn, + strerror(errno)); + stc_free(stc); + return 1; + } + } + + while (get_len > 0) { + size_t need_len; + ssize_t rc; + + need_len = MIN(GET_BUFSZ, get_len); + + if (!recv_buf(stc, rfd, get_buf, need_len)) { + fprintf(stderr, "GET buffer failed\n"); + stc_free(stc); + return 1; + } + + rc = write(wfd, get_buf, need_len); + if (rc != (ssize_t) need_len) { + fprintf(stderr, "GET write to output failed:
%s\n", + strerror(rc < 0 ? errno : EIO)); + if (wfd != STDOUT_FILENO) + unlink(output_fn); + stc_free(stc); + return 1; + } + + get_len -= rc; + } + + if (wfd != STDOUT_FILENO) + close(wfd); + + stc_free(stc); + + return 0; +} + static int cmd_check_start(void) { struct st_client *stc; @@ -830,6 +931,8 @@ int main (int argc, char *argv[]) return 1; case CHC_GET: return cmd_get(); + case CHC_GET_PART: + return cmd_get_part(); case CHC_PUT: return cmd_put(); case CHC_DEL: -- To unsubscribe from this list: send the line "unsubscribe hail-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html