Add ops table for storage back-ends, so that we can have several. Signed-off-by: Pete Zaitcev <zaitcev@xxxxxxxxxx> --- server/storage.c | 85 +++++++++++++++++++++--------------- server/tabled.h | 106 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 138 insertions(+), 53 deletions(-) commit d2362897aa9efde5a30803a621a7d60e5bf5fd24 Author: Pete Zaitcev <zaitcev@xxxxxxxxx> Date: Thu Nov 11 16:18:53 2010 -0700 Add ops. diff --git a/server/storage.c b/server/storage.c index 5c6e3a7..1d3d6a1 100644 --- a/server/storage.c +++ b/server/storage.c @@ -107,8 +107,8 @@ static void stor_write_event(int fd, short events, void *userdata) /* * Open *cep using stn, set up chunk session if needed. */ -int stor_open(struct open_chunk *cep, struct storage_node *stn, - struct event_base *ev_base) +static int chunk_open(struct open_chunk *cep, struct storage_node *stn, + struct event_base *ev_base) { int rc; @@ -126,8 +126,9 @@ int stor_open(struct open_chunk *cep, struct storage_node *stn, return 0; } -int stor_put_start(struct open_chunk *cep, void (*cb)(struct open_chunk *), - uint64_t key, uint64_t size) +static int chunk_put_start(struct open_chunk *cep, + void (*cb)(struct open_chunk *), + uint64_t key, uint64_t size) { char stckey[STOR_KEY_SLEN+1]; @@ -161,8 +162,9 @@ int stor_put_start(struct open_chunk *cep, void (*cb)(struct open_chunk *), return 0; } -int stor_open_read(struct open_chunk *cep, void (*cb)(struct open_chunk *), - uint64_t key, uint64_t *psize) +static int chunk_open_read(struct open_chunk *cep, + void (*cb)(struct open_chunk *), + uint64_t key, uint64_t *psize) { char stckey[STOR_KEY_SLEN+1]; uint64_t size; @@ -203,7 +205,7 @@ int stor_open_read(struct open_chunk *cep, void (*cb)(struct open_chunk *), /* * FIXME We don't cache sessions while tabled is being debugged. Maybe later. */ -void stor_close(struct open_chunk *cep) +static void chunk_close(struct open_chunk *cep) { if (cep->stc) { stor_node_put(cep->node); @@ -227,14 +229,11 @@ void stor_close(struct open_chunk *cep) } /* - * The stor_abort has an annoying convention of being possibly called - * on an unopened open_chunk. We deal with that. - * * There's no "abort" call for an existing transfer. We could complete * the transfer instead of trashing the whole session, but that may involve * sending or receiving gigabytes. So we just cycle the session. */ -void stor_abort(struct open_chunk *cep) +static void chunk_abort(struct open_chunk *cep) { char stckey[STOR_KEY_SLEN+1]; int rc; @@ -284,7 +283,7 @@ void stor_abort(struct open_chunk *cep) cep->key = 0; } -ssize_t stor_put_buf(struct open_chunk *cep, void *data, size_t len) +static ssize_t chunk_put_buf(struct open_chunk *cep, void *data, size_t len) { int rc; @@ -307,7 +306,7 @@ ssize_t stor_put_buf(struct open_chunk *cep, void *data, size_t len) return rc; } -bool stor_put_end(struct open_chunk *cep) +static bool chunk_put_end(struct open_chunk *cep) { if (!cep->stc) return true; @@ -323,7 +322,7 @@ bool stor_put_end(struct open_chunk *cep) * This saves the object.c from the trouble of arming and disarming it, * at the cost of rather subtle semantics. */ -ssize_t stor_get_buf(struct open_chunk *cep, void *data, size_t req_len) +static ssize_t chunk_get_buf(struct open_chunk *cep, void *data, size_t req_len) { size_t xfer_len; ssize_t ret; @@ -358,7 +357,7 @@ ssize_t stor_get_buf(struct open_chunk *cep, void *data, size_t req_len) return ret; } -int stor_obj_del(struct storage_node *stn, uint64_t key) +static int chunk_obj_del(struct storage_node *stn, uint64_t key) { struct st_client *stc; char stckey[STOR_KEY_SLEN+1]; @@ -379,7 +378,7 @@ int stor_obj_del(struct storage_node *stn, uint64_t key) /* * XXX WTF?! This accidentially tests a node instead of object! FIXME */ -bool stor_obj_test(struct open_chunk *cep, uint64_t key) +static bool chunk_obj_test(struct open_chunk *cep, uint64_t key) { struct st_keylist *klist; @@ -393,6 +392,27 @@ bool stor_obj_test(struct open_chunk *cep, uint64_t key) return true; } +/* Return 0 if the node checks out ok */ +static int chunk_node_check(struct storage_node *stn) +{ + struct st_client *stc; + int rc; + + if (!stn->hostname) + return -1; + + rc = stor_new_stc(stn, &stc); + if (rc < 0) { + applog(LOG_INFO, + "Error %d connecting to chunkd on host %s", + rc, stn->hostname); + return -1; + } + + stc_free(stc); + return 0; +} + static struct storage_node *_stor_node_by_nid(uint32_t nid) { struct storage_node *sn; @@ -416,6 +436,20 @@ struct storage_node *stor_node_by_nid(uint32_t nid) return sn; } +struct st_node_ops stor_ops_chunk = { + .open = chunk_open, + .open_read = chunk_open_read, + .close = chunk_close, + .abort = chunk_abort, + .put_start = chunk_put_start, + .put_buf = chunk_put_buf, + .put_end = chunk_put_end, + .get_buf = chunk_get_buf, + .obj_del = chunk_obj_del, + .obj_test = chunk_obj_test, + .node_check = chunk_node_check, +}; + static int stor_add_node_addr(struct storage_node *sn, const char *hostname, const char *portstr) { @@ -475,6 +509,7 @@ void stor_add_node(uint32_t nid, const char *hostname, const char *portstr, } memset(sn, 0, sizeof(struct storage_node)); sn->id = nid; + sn->ops = &stor_ops_chunk; if ((sn->hostname = strdup(hostname)) == NULL) { applog(LOG_WARNING, "No core"); @@ -498,24 +533,6 @@ void stor_add_node(uint32_t nid, const char *hostname, const char *portstr, g_mutex_unlock(tabled_srv.bigmutex); } -/* Return 0 if the node checks out ok */ -int stor_node_check(struct storage_node *stn) -{ - struct st_client *stc; - int rc; - - rc = stor_new_stc(stn, &stc); - if (rc < 0) { - applog(LOG_INFO, - "Error %d connecting to chunkd on host %s", - rc, stn->hostname); - return -1; - } - - stc_free(stc); - return 0; -} - void stor_stats() { struct storage_node *sn; diff --git a/server/tabled.h b/server/tabled.h index 00d8f84..6ab32a3 100644 --- a/server/tabled.h +++ b/server/tabled.h @@ -90,17 +90,40 @@ struct geo { char *rack; }; +struct storage_node; +struct open_chunk; + +struct st_node_ops { + int (*open)(struct open_chunk *cep, struct storage_node *stn, + struct event_base *ev_base); + int (*open_read)(struct open_chunk *cep, + void (*cb)(struct open_chunk *), + uint64_t key, uint64_t *psz); + void (*close)(struct open_chunk *cep); + void (*abort)(struct open_chunk *cep); + int (*put_start)(struct open_chunk *cep, + void (*cb)(struct open_chunk *), + uint64_t key, uint64_t size); + ssize_t (*put_buf)(struct open_chunk *cep, void *data, size_t len); + bool (*put_end)(struct open_chunk *cep); + ssize_t (*get_buf)(struct open_chunk *cep, void *data, size_t len); + int (*obj_del)(struct storage_node *stn, uint64_t key); + bool (*obj_test)(struct open_chunk *cep, uint64_t key); + int (*node_check)(struct storage_node *stn); +}; + struct storage_node { struct list_head all_link; uint32_t id; bool up; time_t last_up; + struct st_node_ops *ops; + + int ref; /* number of open_chunk or other */ unsigned alen; struct sockaddr_in6 addr; char *hostname; - - int ref; /* number of open_chunk or other */ }; typedef bool (*cli_evt_func)(struct client *, unsigned int, @@ -119,6 +142,8 @@ struct open_chunk { uint64_t size; uint64_t done; + struct st_node_ops *ops; + /* chunk */ int wfd; bool w_armed; @@ -404,25 +429,68 @@ extern void read_config(void); /* storage.c */ extern struct storage_node *stor_node_get(struct storage_node *stn); extern void stor_node_put(struct storage_node *stn); -extern int stor_open(struct open_chunk *cep, struct storage_node *stn, - struct event_base *ev_base); -extern int stor_open_read(struct open_chunk *cep, - void (*cb)(struct open_chunk *), - uint64_t key, uint64_t *psz); -extern void stor_close(struct open_chunk *cep); -extern void stor_abort(struct open_chunk *cep); -extern int stor_put_start(struct open_chunk *cep, +static inline int stor_open(struct open_chunk *cep, struct storage_node *stn, + struct event_base *ev_base) +{ + cep->ops = stn->ops; + return cep->ops->open(cep, stn, ev_base); +} +static inline int stor_open_read(struct open_chunk *cep, + void (*cb)(struct open_chunk *), + uint64_t key, uint64_t *psz) +{ + return cep->ops->open_read(cep, cb, key, psz); +} +/* + * The stor_abort and stor_close have an annoying convention of being possibly + * called on an unopened open_chunk. We deal with that. + */ +static inline void stor_close(struct open_chunk *cep) +{ + if (cep->ops) + cep->ops->close(cep); +} +static inline void stor_abort(struct open_chunk *cep) +{ + if (cep->ops) + cep->ops->abort(cep); +} +static inline int stor_put_start(struct open_chunk *cep, void (*cb)(struct open_chunk *), - uint64_t key, uint64_t size); -extern ssize_t stor_put_buf(struct open_chunk *cep, void *data, size_t len); -extern bool stor_put_end(struct open_chunk *cep); -extern ssize_t stor_get_buf(struct open_chunk *cep, void *data, size_t len); -extern int stor_obj_del(struct storage_node *stn, uint64_t key); -extern bool stor_obj_test(struct open_chunk *cep, uint64_t key); + uint64_t key, uint64_t size) +{ + return cep->ops->put_start(cep, cb, key, size); +} +static inline ssize_t stor_put_buf(struct open_chunk *cep, void *data, + size_t len) +{ + return cep->ops->put_buf(cep, data, len); +} +static inline bool stor_put_end(struct open_chunk *cep) +{ + return cep->ops->put_end(cep); +} +static inline ssize_t stor_get_buf(struct open_chunk *cep, void *data, + size_t len) +{ + return cep->ops->get_buf(cep, data, len); +} +static inline int stor_obj_del(struct storage_node *stn, uint64_t key) +{ + return stn->ops->obj_del(stn, key); +} +static inline bool stor_obj_test(struct open_chunk *cep, uint64_t key) +{ + return cep->ops->obj_test(cep, key); +} +static inline int stor_node_check(struct storage_node *stn) +{ + return stn->ops->node_check(stn); +} extern struct storage_node *stor_node_by_nid(uint32_t nid); -extern void stor_add_node(uint32_t nid, const char *hostname, - const char *portstr, struct geo *locp); -extern int stor_node_check(struct storage_node *stn); +extern void stor_add_node(uint32_t nid, + const char *hostname, const char *portstr, + struct geo *locp); extern void stor_stats(void); extern bool stor_status(struct client *cli, GList *content); -- To unsubscribe from this list: send the line "unsubscribe hail-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html