Attached is the XDR working tree, which follows commit [master 2a73620] "libcldc: cosmetic changes, preparing for upcoming XDR merge".
diff -X /garz/tmp/dontdiff -urNp cld/.gitignore cld.rpcgen/.gitignore --- cld/.gitignore 2010-02-04 01:20:51.000000000 -0500 +++ cld.rpcgen/.gitignore 2010-01-22 18:15:07.000000000 -0500 @@ -32,5 +32,9 @@ cld-config.h* cscope.* ncscope.* +# XDR files +*_rpc.h +*_rpc_xdr.c + # ignore Doxygen output directory gendoc diff -X /garz/tmp/dontdiff -urNp cld/include/cldc.h cld.rpcgen/include/cldc.h --- cld/include/cldc.h 2010-02-04 19:18:46.000000000 -0500 +++ cld.rpcgen/include/cldc.h 2010-02-04 19:07:36.000000000 -0500 @@ -22,7 +22,7 @@ #include <sys/types.h> #include <stdbool.h> #include <glib.h> -#include <cld_msg.h> +#include <cld_msg_rpc.h> #include <cld_common.h> #include <hail_log.h> @@ -35,23 +35,17 @@ struct cldc_call_opts { void *private; /* private; lib-owned */ - union { - struct { - const char *buf; - unsigned int size; - char inode_name[CLD_INODE_NAME_MAX + 1]; - } get; - } u; struct cld_msg_get_resp resp; }; struct cldc_pkt_info { int pkt_len; + int hdr_len; int retries; + char user[CLD_MAX_USERNAME]; /* must be at end of struct */ - struct cld_packet pkt; - uint8_t data[0]; + char data[0]; }; /** an outgoing message, from client to server */ @@ -70,18 +64,15 @@ struct cldc_msg { time_t expire_time; - int data_len; int n_pkts; - uint8_t *data; - /* must be at end of struct */ struct cldc_pkt_info *pkt_info[0]; }; /** an open file handle associated with a session */ struct cldc_fh { - uint64_t fh_le; /* fh id, LE */ + uint64_t fh; struct cldc_session *sess; bool valid; }; @@ -128,8 +119,11 @@ struct cldc_session { bool confirmed; + enum cld_msg_op msg_buf_op; unsigned int msg_buf_len; char msg_buf[CLD_MAX_MSG_SZ]; + char payload[CLD_MAX_PAYLOAD_SZ]; + char inode_name_temp[CLD_INODE_NAME_MAX]; }; /** Information for a single CLD server host */ @@ -209,7 +203,7 @@ extern void cldc_dirent_cur_init(struct extern void cldc_dirent_cur_fini(struct cld_dirent_cur *dc); extern char *cldc_dirent_name(struct cld_dirent_cur *dc); extern void 
cldc_call_opts_get_data(const struct cldc_call_opts *copts, - const char **data, size_t *data_len); + char **data, size_t *data_len); /* cldc-udp */ extern void cldc_udp_free(struct cldc_udp *udp); diff -X /garz/tmp/dontdiff -urNp cld/include/cld_common.h cld.rpcgen/include/cld_common.h --- cld/include/cld_common.h 2010-02-04 19:09:35.000000000 -0500 +++ cld.rpcgen/include/cld_common.h 2010-02-04 19:09:56.000000000 -0500 @@ -24,6 +24,7 @@ #include <stdbool.h> #include <string.h> #include <time.h> +#include <cld_msg_rpc.h> #define CLD_ALIGN8(n) ((8 - ((n) & 7)) & 7) diff -X /garz/tmp/dontdiff -urNp cld/include/cld_msg.h cld.rpcgen/include/cld_msg.h --- cld/include/cld_msg.h 2010-02-04 19:08:09.000000000 -0500 +++ cld.rpcgen/include/cld_msg.h 1969-12-31 19:00:00.000000000 -0500 @@ -1,245 +0,0 @@ -#ifndef __CLD_MSG_H__ -#define __CLD_MSG_H__ - -/* - * Copyright 2009 Red Hat, Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; see the file COPYING. If not, write to - * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. - * - */ - -#include <stdint.h> - -#define CLD_PKT_MAGIC "CLDc1pkt" -#define CLD_MSG_MAGIC "CLDc1msg" - -enum { - CLD_MAGIC_SZ = 8, /**< length of magic number */ - CLD_SID_SZ = 8, /**< length of session id */ - - CLD_INODE_NAME_MAX = 256, /**< max total pathname len */ - - CLD_MAX_USERNAME = 32, /**< includes req. nul */ - CLD_MAX_SECRET_KEY = 128, /**< includes req. 
nul */ - - CLD_MAX_PKT_MSG_SZ = 1024, - CLD_MAX_PAYLOAD_SZ = 131072, /**< maximum size of data that users - can GET or PUT */ - CLD_MAX_MSG_SZ = 196608, /**< maximum total - msg size, including all packets */ -}; - -/** available RPC operations */ -enum cld_msg_op { - /* client -> server */ - CMO_NOP = 0, /**< no op */ - CMO_NEW_SESS = 1, /**< new session */ - CMO_OPEN = 2, /**< open file */ - CMO_GET_META = 3, /**< get metadata */ - CMO_GET = 4, /**< get metadata + data */ - CMO_PUT = 6, /**< put data */ - CMO_CLOSE = 7, /**< close file */ - CMO_DEL = 8, /**< delete file */ - CMO_LOCK = 9, /**< lock */ - CMO_UNLOCK = 10, /**< unlock */ - CMO_TRYLOCK = 11, /**< trylock */ - CMO_ACK = 12, /**< ack of seqid rx'd */ - CMO_END_SESS = 13, /**< end session */ - - /* server -> client */ - CMO_PING = 30, /**< server to client ping */ - CMO_NOT_MASTER = 31, /**< I am not the master! */ - CMO_EVENT = 32, /**< server->cli async event */ - CMO_ACK_FRAG = 33, /**< ack partial msg */ -}; - -/** CLD error codes */ -enum cle_err_codes { - CLE_OK = 0, /**< success / no error */ - CLE_SESS_EXISTS = 1, /**< session exists */ - CLE_SESS_INVAL = 2, /**< session doesn't exist */ - CLE_DB_ERR = 3, /**< db error */ - CLE_BAD_PKT = 4, /**< invalid/corrupted packet */ - CLE_INODE_INVAL = 5, /**< inode doesn't exist */ - CLE_NAME_INVAL = 6, /**< inode name invalid */ - CLE_OOM = 7, /**< server out of memory */ - CLE_FH_INVAL = 8, /**< file handle invalid */ - CLE_DATA_INVAL = 9, /**< invalid data pkt */ - CLE_LOCK_INVAL = 10, /**< invalid lock */ - CLE_LOCK_CONFLICT = 11, /**< conflicting lock held */ - CLE_LOCK_PENDING = 12, /**< lock waiting to be acq. */ - CLE_MODE_INVAL = 13, /**< op incompat. 
w/ file mode */ - CLE_INODE_EXISTS = 14, /**< inode exists */ - CLE_DIR_NOTEMPTY = 15, /**< dir not empty */ - CLE_INTERNAL_ERR = 16, /**< nonspecific internal err */ - CLE_TIMEOUT = 17, /**< session timed out */ - CLE_SIG_INVAL = 18, /**< HMAC sig bad / auth failed */ -}; - -/** availble OPEN mode flags */ -enum cld_open_modes { - COM_READ = (1 << 0), /**< read */ - COM_WRITE = (1 << 1), /**< write */ - COM_LOCK = (1 << 2), /**< lock */ - COM_ACL = (1 << 3), /**< ACL update */ - COM_CREATE = (1 << 4), /**< create file, if not exist */ - COM_EXCL = (1 << 5), /**< fail create if file exists */ - COM_DIRECTORY = (1 << 6), /**< operate on a directory */ -}; - -/** potential events client may receive */ -enum cld_events { - CE_UPDATED = (1 << 0), /**< contents updated */ - CE_DELETED = (1 << 1), /**< inode deleted */ - CE_LOCKED = (1 << 2), /**< lock acquired */ - CE_MASTER_FAILOVER = (1 << 3), /**< master failover */ - CE_SESS_FAILED = (1 << 4), -}; - -/** LOCK flags */ -enum cld_lock_flags { - CLF_SHARED = (1 << 0), /**< a shared (read) lock */ -}; - -/** CLD packet flags */ -enum cld_packet_flags { - CPF_FIRST = (1 << 0), /**< first fragment */ - CPF_LAST = (1 << 1), /**< last fragment */ -}; - -/** header for each packet */ -struct cld_packet { - uint8_t magic[CLD_MAGIC_SZ]; /**< magic number; constant */ - uint64_t seqid; /**< sequence id */ - uint8_t sid[CLD_SID_SZ]; /**< client id */ - uint32_t flags; /**< CPF_xxx flags */ - uint8_t res[4]; - char user[CLD_MAX_USERNAME]; /**< authenticated user */ -}; - -/** header for each message */ -struct cld_msg_hdr { - uint8_t magic[CLD_MAGIC_SZ]; /**< magic number; constant */ - uint64_t xid; /**< opaque message id */ - uint8_t op; /**< operation code */ - uint8_t res1[7]; -}; - -/** standard response for each message */ -struct cld_msg_resp { - struct cld_msg_hdr hdr; - - uint32_t code; /**< error code, CLE_xxx */ - uint32_t rsv; /**< reserved */ - uint64_t xid_in; /**< C->S xid */ -}; - -/** ACK-FRAG message */ -struct 
cld_msg_ack_frag { - struct cld_msg_hdr hdr; - - uint64_t seqid; /**< sequence id to ack */ -}; - -/** OPEN message */ -struct cld_msg_open { - struct cld_msg_hdr hdr; - - uint32_t mode; /**< open mode, COM_xxx */ - uint32_t events; /**< events mask, CE_xxx */ - uint16_t name_len; /**< length of file name */ - uint8_t res[6]; - /* inode name */ -}; - -/** OPEN message response */ -struct cld_msg_open_resp { - struct cld_msg_resp resp; - - uint64_t fh; /**< handle opened */ -}; - -/** GET message */ -struct cld_msg_get { - struct cld_msg_hdr hdr; - - uint64_t fh; /**< open file handle */ -}; - -/** GET message response */ -struct cld_msg_get_resp { - struct cld_msg_resp resp; - - uint64_t inum; /**< unique inode number */ - uint32_t ino_len; /**< inode name len */ - uint32_t size; /**< data size */ - uint64_t version; /**< inode version */ - uint64_t time_create; /**< creation time */ - uint64_t time_modify; /**< last modification time */ - uint32_t flags; /**< inode flags; CIFL_xxx */ - uint8_t res[4]; - /* inode name */ -}; - -/** PUT message */ -struct cld_msg_put { - struct cld_msg_hdr hdr; - - uint64_t fh; /**< open file handle */ - uint32_t data_size; /**< total size of data */ - uint8_t res[4]; -}; - -/** CLOSE message */ -struct cld_msg_close { - struct cld_msg_hdr hdr; - - uint64_t fh; /**< open file handle */ -}; - -/** DEL message */ -struct cld_msg_del { - struct cld_msg_hdr hdr; - - uint16_t name_len; /**< length of file name */ - uint8_t res[6]; - /* inode name */ -}; - -/** UNLOCK message */ -struct cld_msg_unlock { - struct cld_msg_hdr hdr; - - uint64_t fh; /**< open file handle */ -}; - -/** LOCK message */ -struct cld_msg_lock { - struct cld_msg_hdr hdr; - - uint64_t fh; /**< open file handle */ - uint32_t flags; /**< CLF_xxx */ - uint8_t res[4]; -}; - -/** Server-to-client EVENT message */ -struct cld_msg_event { - struct cld_msg_hdr hdr; - - uint64_t fh; /**< open file handle */ - uint32_t events; /**< CE_xxx */ - uint8_t res[4]; -}; - -#endif /* 
__CLD_MSG_H__ */ diff -X /garz/tmp/dontdiff -urNp cld/include/cld_pkt.h cld.rpcgen/include/cld_pkt.h --- cld/include/cld_pkt.h 1969-12-31 19:00:00.000000000 -0500 +++ cld.rpcgen/include/cld_pkt.h 2010-02-03 00:12:58.000000000 -0500 @@ -0,0 +1,56 @@ +#ifndef __CLD_PKT_H__ +#define __CLD_PKT_H__ + +/* + * Copyright 2010, Colin McCabe + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; see the file COPYING. If not, write to + * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include <openssl/sha.h> +#include <cld_msg_rpc.h> +#include <stdbool.h> + +/* @file cld_pkt.h + * + * This file has some definitions and helper functions pertaining to the CLD + * network protocol. Unlike cld_msg.x, it's not an XDR file. + */ + +/* Returns a string representation of a packet header + * + * @param scratch (out param) buffer of length + * PKT_HDR_TO_STR_SCRATCH_LEN + * @param pkt_hdr packet header + * @param pkt_len length of packet + * + * @return pointer to 'scratch' + */ +const char *__cld_pkt_hdr_to_str(char *scratch, + const char *pkt_hdr, size_t pkt_len); + +void __cld_dump_buf(const void *buf, size_t len); + +/** Footer that appears at the end of each packet */ +struct __attribute__((packed)) cld_pkt_ftr { + uint64_t seqid; /**< packet sequence ID */ + char sha[SHA_DIGEST_LENGTH]; /**< packet signature */ +}; + +/** Length of the packet footer. 
This size is fixed */ +#define CLD_PKT_FTR_LEN sizeof(struct cld_pkt_ftr) + +#define PKT_HDR_TO_STR_SCRATCH_LEN 128 + +#endif diff -X /garz/tmp/dontdiff -urNp cld/include/Makefile.am cld.rpcgen/include/Makefile.am --- cld/include/Makefile.am 2010-02-04 01:20:51.000000000 -0500 +++ cld.rpcgen/include/Makefile.am 2010-02-02 22:51:08.000000000 -0500 @@ -1,5 +1,5 @@ EXTRA_DIST = cld-private.h -include_HEADERS = cldc.h hail_log.h cld_common.h cld_msg.h +include_HEADERS = cldc.h hail_log.h cld_common.h cld_pkt.h diff -X /garz/tmp/dontdiff -urNp cld/lib/cldc.c cld.rpcgen/lib/cldc.c --- cld/lib/cldc.c 2010-02-04 19:46:11.000000000 -0500 +++ cld.rpcgen/lib/cldc.c 2010-02-04 19:35:13.000000000 -0500 @@ -35,6 +35,8 @@ #include <glib.h> #include <cld-private.h> #include <cldc.h> +#include <cld_pkt.h> +#include <cld_msg_rpc.h> #include <syslog.h> enum { @@ -47,12 +49,7 @@ enum { static const char *user_key(struct cldc_session *sess, const char *user); static int sess_send_pkt(struct cldc_session *sess, - const struct cld_packet *pkt, size_t pkt_len); - -static const struct cld_msg_hdr def_msg_ack = { - .magic = CLD_MSG_MAGIC, - .op = CMO_ACK, -}; + const void *pkt, size_t pkt_len); #ifndef HAVE_STRNLEN static size_t strnlen(const char *s, size_t maxlen) @@ -81,10 +78,10 @@ static size_t strnlen(const char *s, siz #endif void cldc_call_opts_get_data(const struct cldc_call_opts *copts, - const char **data, size_t *data_len) + char **data, size_t *data_len) { - *data = copts->u.get.buf; - *data_len = copts->u.get.size; + *data = copts->resp.data.data_val; + *data_len = copts->resp.data.data_len; } static void cldc_errlog(int prio, const char *fmt, ...) 
@@ -100,49 +97,72 @@ static void cldc_errlog(int prio, const static int ack_seqid(struct cldc_session *sess, uint64_t seqid_le) { - struct cld_packet *pkt; - struct cld_msg_hdr *resp; - size_t pkt_len; + XDR xdrs; + size_t hdr_len, total_len; + char buf[CLD_MAX_PKT_MSG_SZ]; + struct cld_pkt_hdr pkt; + struct cld_pkt_ftr *foot; int ret; + static const char * const magic = CLD_PKT_MAGIC; const char *secret_key; - pkt_len = sizeof(*pkt) + sizeof(*resp) + SHA_DIGEST_LENGTH; - pkt = alloca(pkt_len); - memset(pkt, 0, pkt_len); - /* Construct ACK packet */ - memcpy(pkt->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ); - pkt->seqid = seqid_le; - memcpy(pkt->sid, sess->sid, CLD_SID_SZ); - pkt->flags = cpu_to_le32(CPF_FIRST | CPF_LAST); - strncpy(pkt->user, sess->user, CLD_MAX_USERNAME - 1); + memset(&pkt, 0, sizeof(struct cld_pkt_hdr)); + memcpy(&pkt.magic, magic, sizeof(pkt.magic)); + memcpy(&pkt.sid, sess->sid, CLD_SID_SZ); + pkt.user = sess->user; + pkt.mi.order = CLD_PKT_ORD_FIRST_LAST; + pkt.mi.cld_pkt_msg_info_u.mi.xid = 0; + pkt.mi.cld_pkt_msg_info_u.mi.op = CMO_ACK; + + /* Serialize packet */ + xdrmem_create(&xdrs, (char *)buf, + sizeof(buf) - CLD_PKT_FTR_LEN, XDR_ENCODE); + if (!xdr_cld_pkt_hdr(&xdrs, &pkt)) { + HAIL_DEBUG(&sess->log, "%s: failed to encode header " + "for ack_seqid %llu", + __func__, + (unsigned long long) seqid_le); + xdr_destroy(&xdrs); + return -1009; + } - resp = (struct cld_msg_hdr *) (pkt + 1); - memcpy(resp, &def_msg_ack, sizeof(*resp)); + /* Fill in footer */ + hdr_len = xdr_getpos(&xdrs); + total_len = hdr_len + CLD_PKT_FTR_LEN; + foot = (struct cld_pkt_ftr *)(buf + hdr_len); + foot->seqid = seqid_le; + xdr_destroy(&xdrs); secret_key = user_key(sess, sess->user); ret = __cld_authsign(&sess->log, secret_key, - pkt, pkt_len - SHA_DIGEST_LENGTH, - (uint8_t *)pkt + pkt_len - SHA_DIGEST_LENGTH); + buf, total_len - SHA_DIGEST_LENGTH, foot->sha); if (ret) { HAIL_ERR(&sess->log, "%s: authsign failed: %d", __func__, ret); return ret; } - return 
sess_send_pkt(sess, pkt, pkt_len); + return sess_send_pkt(sess, buf, total_len); } static int rxmsg_generic(struct cldc_session *sess, - const struct cld_packet *pkt, - const void *msgbuf, size_t buflen) + const struct cld_pkt_hdr *pkt, + const struct cld_pkt_ftr *foot) { - const struct cld_msg_resp *resp = msgbuf; + XDR xdrs; + struct cld_msg_generic_resp resp; struct cldc_msg *req = NULL; GList *tmp; - if (buflen < sizeof(*resp)) + xdrmem_create(&xdrs, sess->msg_buf, sess->msg_buf_len, XDR_DECODE); + if (!xdr_cld_msg_generic_resp(&xdrs, &resp)) { + HAIL_DEBUG(&sess->log, "%s: failed to decode " + "cld_msg_generic_resp", __func__); + xdr_destroy(&xdrs); return -1008; + } + xdr_destroy(&xdrs); /* Find out which outbound message this was a response to */ tmp = sess->out_msg; @@ -152,10 +172,10 @@ static int rxmsg_generic(struct cldc_ses HAIL_DEBUG(&sess->log, "%s: comparing req->xid (%llu) " "with resp.xid_in (%llu)", __func__, - (unsigned long long) le64_to_cpu(req->xid), - (unsigned long long) le64_to_cpu(resp->xid_in)); + (unsigned long long) req->xid, + (unsigned long long) resp.xid_in); - if (req->xid == resp->xid_in) + if (req->xid == resp.xid_in) break; tmp = tmp->next; } @@ -163,7 +183,7 @@ static int rxmsg_generic(struct cldc_ses HAIL_DEBUG(&sess->log, "%s: no match found with " "xid_in %llu", __func__, - (unsigned long long) le64_to_cpu(resp->xid_in)); + (unsigned long long) resp.xid_in); return -1005; } @@ -176,28 +196,37 @@ static int rxmsg_generic(struct cldc_ses req->done = true; if (req->cb) { - ssize_t rc = req->cb(req, msgbuf, buflen, resp->code); + ssize_t rc = req->cb(req, sess->msg_buf, + sess->msg_buf_len, resp.code); if (rc < 0) return rc; } } - return ack_seqid(sess, pkt->seqid); + return ack_seqid(sess, foot->seqid); } static int rxmsg_ack_frag(struct cldc_session *sess, - const struct cld_packet *pkt, - const void *msgbuf, size_t buflen) + const struct cld_pkt_hdr *pkt, + const struct cld_pkt_ftr *foot) { - const struct cld_msg_ack_frag 
*ack_msg = msgbuf; + XDR xdrs; + struct cld_msg_ack_frag ack_msg; GList *tmp; - if (buflen < sizeof(*ack_msg)) + xdrmem_create(&xdrs, sess->msg_buf, sess->msg_buf_len, XDR_DECODE); + memset(&ack_msg, 0, sizeof(ack_msg)); + if (!xdr_cld_msg_ack_frag(&xdrs, &ack_msg)) { + HAIL_INFO(&sess->log, "%s: failed to decode ack_msg", + __func__); + xdr_destroy(&xdrs); return -1008; + } + xdr_destroy(&xdrs); HAIL_INFO(&sess->log, "%s: seqid %llu, want to ack", __func__, - (unsigned long long) ack_msg->seqid); + (unsigned long long) ack_msg.seqid); tmp = sess->out_msg; while (tmp) { @@ -209,18 +238,21 @@ static int rxmsg_ack_frag(struct cldc_se for (i = 0; i < req->n_pkts; i++) { struct cldc_pkt_info *pi; + struct cld_pkt_ftr *f; uint64_t seqid; pi = req->pkt_info[i]; if (!pi) continue; - seqid = pi->pkt.seqid; - if (seqid != ack_msg->seqid) + f = (struct cld_pkt_ftr *) + pi->data + (pi->pkt_len - CLD_PKT_FTR_LEN); + seqid = le64_to_cpu(f->seqid); + if (seqid != ack_msg.seqid) continue; HAIL_DEBUG(&sess->log, "%s: seqid %llu, expiring", __func__, - (unsigned long long) ack_msg->seqid); + (unsigned long long) ack_msg.seqid); req->pkt_info[i] = NULL; free(pi); @@ -231,19 +263,26 @@ static int rxmsg_ack_frag(struct cldc_se } static int rxmsg_event(struct cldc_session *sess, - const struct cld_packet *pkt, - const void *msgbuf, size_t buflen) + const struct cld_pkt_hdr *pkt, + const struct cld_pkt_ftr *foot) { - const struct cld_msg_event *ev = msgbuf; + XDR xdrs; + struct cld_msg_event ev; struct cldc_fh *fh = NULL; int i; - if (buflen < sizeof(*ev)) + xdrmem_create(&xdrs, sess->msg_buf, sess->msg_buf_len, XDR_DECODE); + if (!xdr_cld_msg_event(&xdrs, &ev)) { + HAIL_INFO(&sess->log, "%s: failed to decode cld_msg_event", + __func__); + xdr_destroy(&xdrs); return -1008; + } + xdr_destroy(&xdrs); for (i = 0; i < sess->fh->len; i++) { fh = &g_array_index(sess->fh, struct cldc_fh, i); - if (fh->fh_le == ev->fh) + if (fh->fh == ev.fh) break; else fh = NULL; @@ -252,7 +291,7 @@ static int 
rxmsg_event(struct cldc_sessi if (!fh) return -1011; - sess->ops->event(sess->private, sess, fh, le32_to_cpu(ev->events)); + sess->ops->event(sess->private, sess, fh, ev.events); return 0; } @@ -264,8 +303,6 @@ static void cldc_msg_free(struct cldc_ms if (!msg) return; - free(msg->data); - for (i = 0; i < msg->n_pkts; i++) free(msg->pkt_info[i]); @@ -304,51 +341,27 @@ static const char *user_key(struct cldc_ return sess->secret_key; } -static int cldc_receive_msg(struct cldc_session *sess, - const struct cld_packet *pkt, - size_t pkt_len) +static int rx_complete(struct cldc_session *sess, + const struct cld_pkt_hdr *pkt, + const struct cld_pkt_ftr *foot) { - const struct cld_msg_hdr *msg = (struct cld_msg_hdr *) sess->msg_buf; - size_t msglen = sess->msg_buf_len; - - if (memcmp(msg->magic, CLD_MSG_MAGIC, sizeof(msg->magic))) { - HAIL_DEBUG(&sess->log, "%s: bad msg magic", __func__); - return -EPROTO; - } - - switch(msg->op) { + switch (sess->msg_buf_op) { case CMO_ACK: HAIL_INFO(&sess->log, "%s: received unexpected ACK", __func__); return -EBADRQC; case CMO_PING: /* send out an ACK */ - return ack_seqid(sess, pkt->seqid); - case CMO_NOP: - case CMO_CLOSE: - case CMO_DEL: - case CMO_LOCK: - case CMO_UNLOCK: - case CMO_TRYLOCK: - case CMO_PUT: - case CMO_NEW_SESS: - case CMO_END_SESS: - case CMO_OPEN: - case CMO_GET_META: - case CMO_GET: - return rxmsg_generic(sess, pkt, msg, msglen); + return ack_seqid(sess, foot->seqid); case CMO_NOT_MASTER: HAIL_ERR(&sess->log, "FIXME: not-master message received"); return -1055; /* FIXME */ case CMO_EVENT: - return rxmsg_event(sess, pkt, msg, msglen); + return rxmsg_event(sess, pkt, foot); case CMO_ACK_FRAG: - return rxmsg_ack_frag(sess, pkt, msg, msglen); + return rxmsg_ack_frag(sess, pkt, foot); default: - break; + return rxmsg_generic(sess, pkt, foot); } - - /* unknown op code */ - return -EBADRQC; } /** Accepts a packet's sequence ID. 
@@ -362,11 +375,8 @@ static int cldc_receive_msg(struct cldc_ * @return 0 on success; error code otherwise */ static int accept_seqid(struct cldc_session *sess, uint64_t seqid, - enum cld_msg_op op, bool *dupe) + enum cld_msg_op op) { - - *dupe = false; - switch (op) { case CMO_NEW_SESS: /* CMO_NEW_SESS initializes the session's sequence id */ @@ -393,7 +403,6 @@ static int accept_seqid(struct cldc_sess if (seqid_in_range(seqid, sess->next_seqid_in_tr, sess->next_seqid_in)) { - *dupe = true; return 0; } @@ -405,81 +414,47 @@ int cldc_receive_pkt(struct cldc_session const void *net_addr, size_t net_addrlen, const void *pktbuf, size_t pkt_len) { - const struct cld_packet *pkt = pktbuf; - const struct cld_msg_hdr *msg = (struct cld_msg_hdr *) (pkt + 1); const char *secret_key; - size_t msglen; struct timeval tv; time_t current_time; + struct cld_pkt_hdr pkt; + unsigned int hdr_len, msg_len; + const struct cld_pkt_ftr *foot; uint64_t seqid; - uint32_t pkt_flags; - bool first_frag, last_frag; - bool dupe = false; + XDR xdrs; int ret; - enum cld_msg_op msg_buf_op = CMO_ACK_FRAG; gettimeofday(&tv, NULL); current_time = tv.tv_sec; - if (pkt_len < (sizeof(*pkt) + SHA_DIGEST_LENGTH)) { - HAIL_DEBUG(&sess->log, "%s: msg too short", __func__); + /* Decode the packet header */ + if (pkt_len < CLD_PKT_FTR_LEN) { + HAIL_DEBUG(&sess->log, "%s: packet too short to have a " + "well-formed footer", __func__); return -EPROTO; } - - msglen = pkt_len - sizeof(*pkt) - SHA_DIGEST_LENGTH; - - pkt_flags = le32_to_cpu(pkt->flags); - first_frag = pkt_flags & CPF_FIRST; - last_frag = pkt_flags & CPF_LAST; - if (first_frag) - msg_buf_op = msg->op; - - if (sess->log.verbose) { - if (msg_buf_op == CMO_GET) { - struct cld_msg_get_resp *dp; - dp = (struct cld_msg_get_resp *) msg; - HAIL_DEBUG(&sess->log, "%s(len %u, op %s" - ", seqid %llu, user %s, size %u)", - __func__, - (unsigned int) pkt_len, - __cld_opstr(msg->op), - (unsigned long long) le64_to_cpu(pkt->seqid), - pkt->user, - 
le32_to_cpu(dp->size)); - } else if (msg_buf_op == CMO_NEW_SESS) { - struct cld_msg_resp *dp; - dp = (struct cld_msg_resp *) msg; - HAIL_DEBUG(&sess->log, "%s(len %u, op %s" - ", seqid %llu, user %s, xid_in %llu)", - __func__, - (unsigned int) pkt_len, - __cld_opstr(msg->op), - (unsigned long long) le64_to_cpu(pkt->seqid), - pkt->user, - (unsigned long long) le64_to_cpu(dp->xid_in)); - } else { - HAIL_DEBUG(&sess->log, "%s(len %u, " - "flags %s%s, op %s, seqid %llu, user %s)", - __func__, - (unsigned int) pkt_len, - first_frag ? "F" : "", - last_frag ? "L" : "", - first_frag ? __cld_opstr(msg->op) : "n/a", - (unsigned long long) le64_to_cpu(pkt->seqid), - pkt->user); - } + xdrmem_create(&xdrs, (void *)pktbuf, + pkt_len - CLD_PKT_FTR_LEN, XDR_DECODE); + memset(&pkt, 0, sizeof(pkt)); + if (!xdr_cld_pkt_hdr(&xdrs, &pkt)) { + HAIL_DEBUG(&sess->log, "%s: failed to decode packet header", + __func__); + xdr_destroy(&xdrs); + return -EPROTO; } - - if (memcmp(pkt->magic, CLD_PKT_MAGIC, sizeof(pkt->magic))) { + hdr_len = xdr_getpos(&xdrs); + xdr_destroy(&xdrs); + if (memcmp(&pkt.magic, CLD_PKT_MAGIC, sizeof(pkt.magic))) { HAIL_DEBUG(&sess->log, "%s: bad pkt magic", __func__); return -EPROTO; } /* check HMAC signature */ - secret_key = user_key(sess, pkt->user); + foot = (const struct cld_pkt_ftr *) + (((char *)pktbuf) + (pkt_len - CLD_PKT_FTR_LEN)); + secret_key = user_key(sess, pkt.user); ret = __cld_authcheck(&sess->log, secret_key, - pkt, pkt_len - SHA_DIGEST_LENGTH, - (uint8_t *)pkt + pkt_len - SHA_DIGEST_LENGTH); + pktbuf, pkt_len - SHA_DIGEST_LENGTH, foot->sha); if (ret) { HAIL_DEBUG(&sess->log, "%s: invalid auth (ret=%d)", __func__, ret); @@ -497,34 +472,40 @@ int cldc_receive_pkt(struct cldc_session if (current_time >= sess->msg_scan_time) sess_expire_outmsg(sess, current_time); - if (first_frag) - sess->msg_buf_len = 0; - - if ((sess->msg_buf_len + msglen) > CLD_MAX_MSG_SZ) { - HAIL_DEBUG(&sess->log, "%s: bad pkt length", __func__); - return -EPROTO; + if 
(pkt.mi.order & CLD_PKT_IS_FIRST) { + /* This packet begins a new message. + * Determine the new message's op */ + sess->msg_buf_op = pkt.mi.cld_pkt_msg_info_u.mi.op; } - memcpy(sess->msg_buf + sess->msg_buf_len, msg, msglen); - sess->msg_buf_len += msglen; - /* verify (or set, for new-sess) sequence id */ - seqid = le64_to_cpu(pkt->seqid); - ret = accept_seqid(sess, seqid, msg_buf_op, &dupe); + seqid = le64_to_cpu(foot->seqid); + ret = accept_seqid(sess, seqid, sess->msg_buf_op); if (ret) { HAIL_DEBUG(&sess->log, "%s: bad seqid %llu", __func__, (unsigned long long) seqid); return ret; } - if (dupe) - return ack_seqid(sess, pkt->seqid); + if (pkt.mi.order & CLD_PKT_IS_FIRST) + sess->msg_buf_len = 0; + msg_len = pkt_len - hdr_len - CLD_PKT_FTR_LEN; + if ((sess->msg_buf_len + msg_len) > CLD_MAX_MSG_SZ) { + HAIL_DEBUG(&sess->log, "%s: message too long", __func__); + return -EPROTO; + } + memcpy(sess->msg_buf + sess->msg_buf_len, pktbuf + hdr_len, msg_len); + sess->msg_buf_len += msg_len; sess->expire_time = current_time + CLDC_SESS_EXPIRE; - if (!last_frag) - return sess ? 
ack_seqid(sess, pkt->seqid) : 0; - - return cldc_receive_msg(sess, pkt, pkt_len); + if (pkt.mi.order & CLD_PKT_IS_LAST) { + HAIL_DEBUG(&sess->log, "%s: receiving complete message of " + "op %s", __func__, + __cld_opstr(sess->msg_buf_op)); + return rx_complete(sess, &pkt, foot); + } else { + return ack_seqid(sess, foot->seqid); + } } static void sess_next_seqid(struct cldc_session *sess, uint64_t *seqid) @@ -533,19 +514,49 @@ static void sess_next_seqid(struct cldc_ *seqid = rc; } +/** + * creates a new cldc_msg + * + * @param sess The session + * @param copts The call options + * @param op The op of message to create + * @param xdrproc The XDR function to use to create the message body + * @param data The data to pass to xdrproc + * + * @return The cldc message, or NULL on error, + */ static struct cldc_msg *cldc_new_msg(struct cldc_session *sess, const struct cldc_call_opts *copts, enum cld_msg_op op, - size_t msg_len) + xdrproc_t xdrproc, const void *data) { struct cldc_msg *msg; - struct cld_msg_hdr *hdr; struct timeval tv; - int n_pkts, i, data_left; - void *p; + size_t i, body_len, n_pkts; + char *body; + XDR xbdy; + + /* Encode the message body */ + body_len = xdr_sizeof(xdrproc, (void *)data); + body = alloca(body_len); + xdrmem_create(&xbdy, body, body_len, XDR_ENCODE); + if (!xdrproc(&xbdy, (void *)data)) { + HAIL_DEBUG(&sess->log, "%s: failed to encode " + "message", __func__); + xdr_destroy(&xbdy); + return NULL; + } + xdr_destroy(&xbdy); - n_pkts = msg_len / CLD_MAX_PKT_MSG_SZ; - n_pkts += ((msg_len % CLD_MAX_PKT_MSG_SZ) ? 1 : 0); + if (body_len == 0) + /* Some packets (like ACKS) just have a header, and no message + * body. 
*/ + n_pkts = 1; + else { + /* round up */ + n_pkts = (body_len + CLD_MAX_PKT_MSG_SZ - 1) / + CLD_MAX_PKT_MSG_SZ; + } /* Create cldc_msg */ msg = calloc(1, sizeof(*msg) + @@ -553,57 +564,65 @@ static struct cldc_msg *cldc_new_msg(str if (!msg) return NULL; - msg->data = calloc(1, msg_len); - if (!msg->data) { - free(msg); - return NULL; - } - msg->n_pkts = n_pkts; __cld_rand64(&msg->xid); msg->op = op; msg->sess = sess; - if (copts) memcpy(&msg->copts, copts, sizeof(msg->copts)); - gettimeofday(&tv, NULL); msg->expire_time = tv.tv_sec + CLDC_MSG_EXPIRE; - msg->data_len = msg_len; - - p = msg->data; - data_left = msg_len; for (i = 0; i < msg->n_pkts; i++) { + XDR xhdr; + struct cld_pkt_hdr pkt; struct cldc_pkt_info *pi; - int pkt_len; + int hdr_len, body_chunk_len, pkt_len; - pkt_len = MIN(data_left, CLD_MAX_PKT_MSG_SZ); + /* Set up packet header */ + memcpy(&pkt.magic, CLD_PKT_MAGIC, sizeof(pkt.magic)); + memcpy(&pkt.sid, sess->sid, CLD_SID_SZ); + pkt.user = sess->user; + if (i == 0) { + if (i == (msg->n_pkts - 1)) + pkt.mi.order = CLD_PKT_ORD_FIRST_LAST; + else + pkt.mi.order = CLD_PKT_ORD_FIRST; + pkt.mi.cld_pkt_msg_info_u.mi.xid = msg->xid; + pkt.mi.cld_pkt_msg_info_u.mi.op = op; + } else { + if (i == (msg->n_pkts - 1)) + pkt.mi.order = CLD_PKT_ORD_LAST; + else + pkt.mi.order = CLD_PKT_ORD_MID; + } - pi = calloc(1, sizeof(*pi) + pkt_len + SHA_DIGEST_LENGTH); + /* Allocate memory */ + hdr_len = xdr_sizeof((xdrproc_t)xdr_cld_pkt_hdr, &pkt); + body_chunk_len = MIN(body_len, CLD_MAX_PKT_MSG_SZ); + pkt_len = hdr_len + body_chunk_len + CLD_PKT_FTR_LEN; + pi = calloc(1, sizeof(*pi) + pkt_len); if (!pi) goto err_out; - pi->pkt_len = pkt_len; + msg->pkt_info[i] = pi; + strncpy(pi->user, sess->user, CLD_MAX_USERNAME - 1); - memcpy(pi->pkt.magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ); - memcpy(pi->pkt.sid, sess->sid, CLD_SID_SZ); - strncpy(pi->pkt.user, sess->user, CLD_MAX_USERNAME - 1); - - if (i == 0) - pi->pkt.flags |= cpu_to_le32(CPF_FIRST); - if (i == (msg->n_pkts - 1)) - 
pi->pkt.flags |= cpu_to_le32(CPF_LAST); + /* Fill in the packet header */ + xdrmem_create(&xhdr, (char *)pi->data, hdr_len, XDR_ENCODE); + if (!xdr_cld_pkt_hdr(&xhdr, &pkt)) { + HAIL_DEBUG(&sess->log, "%s: failed to encode header " + "for packet %zu", __func__, i); + xdr_destroy(&xhdr); + goto err_out; + } - msg->pkt_info[i] = pi; - data_left -= pkt_len; + /* Fill in the body */ + memcpy(pi->data + hdr_len, body, body_chunk_len); + body += body_chunk_len; + body_len -= body_chunk_len; } - hdr = (struct cld_msg_hdr *) msg->data; - memcpy(&hdr->magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ); - hdr->op = op; - hdr->xid = msg->xid; - return msg; err_out: @@ -642,32 +661,8 @@ static void sess_expire(struct cldc_sess } static int sess_send_pkt(struct cldc_session *sess, - const struct cld_packet *pkt, size_t pkt_len) + const void *pkt, size_t pkt_len) { - if (sess->log.verbose) { - uint32_t flags = le32_to_cpu(pkt->flags); - bool first = (flags & CPF_FIRST); - bool last = (flags & CPF_LAST); - uint8_t op = CMO_NOP; - - if (first) { - struct cld_msg_hdr *hdr; - - hdr = (struct cld_msg_hdr *) (pkt + 1); - op = hdr->op; - } - - HAIL_DEBUG(&sess->log, - "%s(len %zu, flags %s%s, " - "op %s, seqid %llu)", - __func__, - pkt_len, - first ? "F" : "", - last ? "L" : "", - first ? 
__cld_opstr(op) : "n/a", - (unsigned long long) le64_to_cpu(pkt->seqid)); - } - return sess->ops->pkt_send(sess->private, sess->addr, sess->addr_len, pkt, pkt_len); @@ -696,18 +691,12 @@ static int sess_timer(struct cldc_sessio for (i = 0; i < msg->n_pkts; i++) { struct cldc_pkt_info *pi; - int total_pkt_len; pi = msg->pkt_info[i]; if (!pi) continue; - - total_pkt_len = sizeof(struct cld_packet) + - pi->pkt_len + SHA_DIGEST_LENGTH; - pi->retries++; - - sess_send_pkt(sess, &pi->pkt, total_pkt_len); + sess_send_pkt(sess, pi->data, pi->pkt_len); } } @@ -719,40 +708,31 @@ static int sess_timer(struct cldc_sessio static int sess_send(struct cldc_session *sess, struct cldc_msg *msg) { int ret, i; - int data_left; - void *p; const char *secret_key; secret_key = user_key(sess, sess->user); - p = msg->data; - data_left = msg->data_len; for (i = 0; i < msg->n_pkts; i++) { struct cldc_pkt_info *pi; - int total_pkt_len; + struct cld_pkt_ftr *foot; pi = msg->pkt_info[i]; - memcpy(pi->data, p, pi->pkt_len); - - total_pkt_len = sizeof(struct cld_packet) + - pi->pkt_len + SHA_DIGEST_LENGTH; /* Add the sequence number to the end of the packet */ - sess_next_seqid(sess, &pi->pkt.seqid); - - p += pi->pkt_len; - data_left -= pi->pkt_len; + foot = (struct cld_pkt_ftr *) + (pi->data + pi->pkt_len - CLD_PKT_FTR_LEN); + memset(foot, 0, CLD_PKT_FTR_LEN); + sess_next_seqid(sess, &foot->seqid); /* Add the signature to the end of the packet */ ret = __cld_authsign(&sess->log, secret_key, - &pi->pkt, total_pkt_len-SHA_DIGEST_LENGTH, - ((uint8_t *)&pi->pkt + total_pkt_len) - - SHA_DIGEST_LENGTH); + pi->data, + pi->pkt_len - SHA_DIGEST_LENGTH,foot->sha); if (ret) return ret; /* attempt first send */ - if (sess_send_pkt(sess, &pi->pkt, total_pkt_len) < 0) + if (sess_send_pkt(sess, pi->data, pi->pkt_len) < 0) return -EIO; } @@ -802,7 +782,7 @@ int cldc_end_sess(struct cldc_session *s /* create END-SESS message */ msg = cldc_new_msg(sess, copts, CMO_END_SESS, - sizeof(struct cld_msg_hdr)); + 
(xdrproc_t)xdr_void, NULL); if (!msg) return -ENOMEM; @@ -814,10 +794,8 @@ int cldc_end_sess(struct cldc_session *s static ssize_t new_sess_cb(struct cldc_msg *msg, const void *resp_p, size_t resp_len, enum cle_err_codes resp_rc) { - struct cldc_session *sess = msg->sess; - if (resp_rc == CLE_OK) - sess->confirmed = true; + msg->sess->confirmed = true; if (msg->copts.cb) return msg->copts.cb(&msg->copts, resp_rc); @@ -870,7 +848,7 @@ int cldc_new_sess(const struct cldc_ops /* create NEW-SESS message */ msg = cldc_new_msg(sess, copts, CMO_NEW_SESS, - sizeof(struct cld_msg_hdr)); + (xdrproc_t)xdr_void, NULL); if (!msg) { sess_free(sess); return -ENOMEM; @@ -917,7 +895,7 @@ int cldc_nop(struct cldc_session *sess, /* create NOP message */ msg = cldc_new_msg(sess, copts, CMO_NOP, - sizeof(struct cld_msg_hdr)); + (xdrproc_t)xdr_void, NULL); if (!msg) return -ENOMEM; @@ -930,8 +908,7 @@ int cldc_del(struct cldc_session *sess, const char *pathname) { struct cldc_msg *msg; - struct cld_msg_del *del; - void *p; + struct cld_msg_del del; size_t plen; if (!sess->confirmed) @@ -946,33 +923,33 @@ int cldc_del(struct cldc_session *sess, return -EINVAL; /* create DEL message */ + del.inode_name = (char *)pathname; msg = cldc_new_msg(sess, copts, CMO_DEL, - sizeof(struct cld_msg_del) + strlen(pathname)); + (xdrproc_t)xdr_cld_msg_del, &del); if (!msg) return -ENOMEM; msg->cb = generic_end_cb; - /* fill in DEL-specific name_len, name info */ - del = (struct cld_msg_del *) msg->data; - del->name_len = cpu_to_le16(plen); - p = del; - p += sizeof(struct cld_msg_del); - memcpy(p, pathname, plen); - return sess_send(sess, msg); } static ssize_t open_end_cb(struct cldc_msg *msg, const void *resp_p, size_t resp_len, enum cle_err_codes resp_rc) { - const struct cld_msg_open_resp *resp = resp_p; - struct cldc_fh *fh = msg->cb_private; - if (resp_rc == CLE_OK) { - if (resp_len < sizeof(*resp)) - return -1010; - fh->fh_le = resp->fh; + struct cldc_fh *fh = msg->cb_private; + XDR xdrs; + struct 
cld_msg_open_resp resp; + + xdrmem_create(&xdrs, (void *)resp_p, resp_len, XDR_DECODE); + memset(&resp, 0, sizeof(resp)); + if (!xdr_cld_msg_open_resp(&xdrs, &resp)) { + xdr_destroy(&xdrs); + return -1009; + } + + fh->fh = resp.fh; fh->valid = true; } @@ -988,9 +965,8 @@ int cldc_open(struct cldc_session *sess, uint32_t events, struct cldc_fh **fh_out) { struct cldc_msg *msg; - struct cld_msg_open *open; + struct cld_msg_open open; struct cldc_fh fh, *fhtmp; - void *p; size_t plen; int fh_idx; @@ -1008,8 +984,11 @@ int cldc_open(struct cldc_session *sess, return -EINVAL; /* create OPEN message */ + open.mode = open_mode; + open.events = events; + open.inode_name = (char *)pathname; msg = cldc_new_msg(sess, copts, CMO_OPEN, - sizeof(struct cld_msg_open) + strlen(pathname)); + (xdrproc_t)xdr_cld_msg_open, &open); if (!msg) return -ENOMEM; @@ -1024,15 +1003,6 @@ int cldc_open(struct cldc_session *sess, msg->cb = open_end_cb; msg->cb_private = fhtmp; - /* fill in OPEN-specific info */ - open = (struct cld_msg_open *) msg->data; - open->mode = cpu_to_le32(open_mode); - open->events = cpu_to_le32(events); - open->name_len = cpu_to_le16(plen); - p = open; - p += sizeof(struct cld_msg_open); - memcpy(p, pathname, plen); - *fh_out = fhtmp; return sess_send(sess, msg); @@ -1042,7 +1012,7 @@ int cldc_close(struct cldc_fh *fh, const { struct cldc_session *sess; struct cldc_msg *msg; - struct cld_msg_close *close_msg; + struct cld_msg_close close_msg; if (!fh->valid) return -EINVAL; @@ -1050,8 +1020,9 @@ int cldc_close(struct cldc_fh *fh, const sess = fh->sess; /* create CLOSE message */ + close_msg.fh = fh->fh; msg = cldc_new_msg(sess, copts, CMO_CLOSE, - sizeof(struct cld_msg_close)); + (xdrproc_t)xdr_cld_msg_close, &close_msg); if (!msg) return -ENOMEM; @@ -1060,10 +1031,6 @@ int cldc_close(struct cldc_fh *fh, const msg->cb = generic_end_cb; - /* fill in CLOSE-specific fh info */ - close_msg = (struct cld_msg_close *) msg->data; - close_msg->fh = fh->fh_le; - return 
sess_send(sess, msg); } @@ -1072,7 +1039,7 @@ int cldc_lock(struct cldc_fh *fh, const { struct cldc_session *sess; struct cldc_msg *msg; - struct cld_msg_lock *lock; + struct cld_msg_lock lock; if (!fh->valid) return -EINVAL; @@ -1080,19 +1047,16 @@ int cldc_lock(struct cldc_fh *fh, const sess = fh->sess; /* create LOCK message */ + lock.fh = fh->fh; + lock.flags = lock_flags; msg = cldc_new_msg(sess, copts, wait_for_lock ? CMO_LOCK : CMO_TRYLOCK, - sizeof(struct cld_msg_lock)); + (xdrproc_t)xdr_cld_msg_lock, &lock); if (!msg) return -ENOMEM; msg->cb = generic_end_cb; - /* fill in LOCK-specific info */ - lock = (struct cld_msg_lock *) msg->data; - lock->fh = fh->fh_le; - lock->flags = cpu_to_le32(lock_flags); - return sess_send(sess, msg); } @@ -1100,7 +1064,7 @@ int cldc_unlock(struct cldc_fh *fh, cons { struct cldc_session *sess; struct cldc_msg *msg; - struct cld_msg_unlock *unlock; + struct cld_msg_unlock unlock; if (!fh->valid) return -EINVAL; @@ -1108,17 +1072,14 @@ int cldc_unlock(struct cldc_fh *fh, cons sess = fh->sess; /* create UNLOCK message */ + unlock.fh = fh->fh; msg = cldc_new_msg(sess, copts, CMO_UNLOCK, - sizeof(struct cld_msg_unlock)); + (xdrproc_t)xdr_cld_msg_unlock, &unlock); if (!msg) return -ENOMEM; msg->cb = generic_end_cb; - /* fill in UNLOCK-specific info */ - unlock = (struct cld_msg_unlock *) msg->data; - unlock->fh = fh->fh_le; - return sess_send(sess, msg); } @@ -1127,7 +1088,7 @@ int cldc_put(struct cldc_fh *fh, const s { struct cldc_session *sess; struct cldc_msg *msg; - struct cld_msg_put *put; + struct cld_msg_put put; if (!data || !data_len || data_len > CLD_MAX_PAYLOAD_SZ) return -EINVAL; @@ -1138,17 +1099,14 @@ int cldc_put(struct cldc_fh *fh, const s sess = fh->sess; /* create PUT message */ + put.fh = fh->fh; + put.data.data_len = data_len; + put.data.data_val = (char *)data; msg = cldc_new_msg(sess, copts, CMO_PUT, - sizeof(struct cld_msg_put) + data_len); + (xdrproc_t)xdr_cld_msg_put, &put); if (!msg) return -ENOMEM; - put = 
(struct cld_msg_put *) msg->data; - put->fh = fh->fh_le; - put->data_size = cpu_to_le32(data_len); - - memcpy((put + 1), data, data_len); - msg->cb = generic_end_cb; sess_send(sess, msg); @@ -1156,67 +1114,39 @@ int cldc_put(struct cldc_fh *fh, const s return 0; } -#undef XC32 -#undef XC64 -#define XC32(name) \ - o->name = le32_to_cpu(resp->name) -#define XC64(name) \ - o->name = le64_to_cpu(resp->name) - static ssize_t get_end_cb(struct cldc_msg *msg, const void *resp_p, size_t resp_len, enum cle_err_codes resp_rc) { - const struct cld_msg_get_resp *resp = resp_p; - struct cld_msg_get_resp *o = NULL; - if (resp_rc == CLE_OK) { - bool get_body; - - o = &msg->copts.resp; - - get_body = (resp->resp.hdr.op == CMO_GET); - msg->op = CMO_GET; + XDR xin; + struct cld_msg_get_resp *resp = &msg->copts.resp; - /* copy-and-swap */ - XC64(inum); - XC32(ino_len); - XC32(size); - XC64(version); - XC64(time_create); - XC64(time_modify); - XC32(flags); - - /* copy inode name */ - if (o->ino_len <= CLD_INODE_NAME_MAX) { - size_t diffsz; - const void *p; - - p = (resp + 1); - memcpy(&msg->copts.u.get.inode_name, p, o->ino_len); - - p += o->ino_len; - diffsz = p - resp_p; - - /* point to internal buffer holding GET data */ - msg->copts.u.get.buf = msg->sess->msg_buf + diffsz; - msg->copts.u.get.size = msg->sess->msg_buf_len - diffsz; - } else { - o->ino_len = 0; /* Probably full of garbage */ + /* Parse GET response. + * Avoid memory allocation in xdr_string by pointing + * variable-length elements at static buffers. 
*/ + xdrmem_create(&xin, (void *)resp_p, resp_len, XDR_DECODE); + memset(resp, 0, sizeof(struct cld_msg_get_resp)); + resp->inode_name = msg->sess->inode_name_temp; + resp->data.data_val = msg->sess->payload; + resp->data.data_len = 0; + if (!xdr_cld_msg_get_resp(&xin, resp)) { + xdr_destroy(&xin); + return -1009; } + xdr_destroy(&xin); } if (msg->copts.cb) return msg->copts.cb(&msg->copts, resp_rc); return 0; } -#undef XC int cldc_get(struct cldc_fh *fh, const struct cldc_call_opts *copts, bool metadata_only) { struct cldc_session *sess; struct cldc_msg *msg; - struct cld_msg_get *get; + struct cld_msg_get get; if (!fh->valid) return -EINVAL; @@ -1224,17 +1154,14 @@ int cldc_get(struct cldc_fh *fh, const s sess = fh->sess; /* create GET message */ + get.fh = fh->fh; msg = cldc_new_msg(sess, copts, CMO_GET, - sizeof(struct cld_msg_get)); + (xdrproc_t)xdr_cld_msg_get, &get); if (!msg) return -ENOMEM; msg->cb = get_end_cb; - /* fill in GET-specific info */ - get = (struct cld_msg_get *) msg->data; - get->fh = fh->fh_le; - return sess_send(sess, msg); } diff -X /garz/tmp/dontdiff -urNp cld/lib/cld_msg_rpc.x cld.rpcgen/lib/cld_msg_rpc.x --- cld/lib/cld_msg_rpc.x 1969-12-31 19:00:00.000000000 -0500 +++ cld.rpcgen/lib/cld_msg_rpc.x 2010-02-03 16:03:36.000000000 -0500 @@ -0,0 +1,218 @@ +/* + * Copyright 2010, Colin McCabe + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; see the file COPYING. If not, write to + * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 
+ */ + +const CLD_PKT_MAGIC = "CLDc1pkt"; +const CLD_SID_SZ = 8; + +const CLD_INODE_NAME_MAX = 256; /**< max total pathname len */ + +const CLD_MAX_USERNAME = 32; + +const CLD_MAX_PKT_MSG_SZ = 1024; /**< The maximum number of message bytes we'll + put in a single packet */ + +const CLD_MAX_PAYLOAD_SZ = 131072; /**< Maximum length of the data that can be + sent with get or put. In some sense, + this is part of cld's API, and + shouldn't be changed lightly. */ + +const CLD_MAX_MSG_SZ = 196608; /**< Maximum size of a single message + including all packets. */ + +const CLD_MAX_SECRET_KEY = 128; /**< includes req. nul */ + +/** available RPC operations */ +enum cld_msg_op { + /* client -> server */ + CMO_NOP = 0, /**< no op */ + CMO_NEW_SESS = 1, /**< new session */ + CMO_OPEN = 2, /**< open file */ + CMO_GET_META = 3, /**< get metadata */ + CMO_GET = 4, /**< get metadata + data */ + CMO_PUT = 6, /**< put data */ + CMO_CLOSE = 7, /**< close file */ + CMO_DEL = 8, /**< delete file */ + CMO_LOCK = 9, /**< lock */ + CMO_UNLOCK = 10, /**< unlock */ + CMO_TRYLOCK = 11, /**< trylock */ + CMO_ACK = 12, /**< ack of seqid rx'd */ + CMO_END_SESS = 13, /**< end session */ + + /* server -> client */ + CMO_PING = 14, /**< server to client ping */ + CMO_NOT_MASTER = 15, /**< I am not the master! 
*/ + CMO_EVENT = 16, /**< server->cli async event */ + CMO_ACK_FRAG = 17, /**< ack partial msg */ + + CMO_AFTER_LAST +}; + +/** CLD error codes */ +enum cle_err_codes { + CLE_OK = 0, /**< success / no error */ + CLE_SESS_EXISTS = 1, /**< session exists */ + CLE_SESS_INVAL = 2, /**< session doesn't exist */ + CLE_DB_ERR = 3, /**< db error */ + CLE_BAD_PKT = 4, /**< invalid/corrupted packet */ + CLE_INODE_INVAL = 5, /**< inode doesn't exist */ + CLE_NAME_INVAL = 6, /**< inode name invalid */ + CLE_OOM = 7, /**< server out of memory */ + CLE_FH_INVAL = 8, /**< file handle invalid */ + CLE_DATA_INVAL = 9, /**< invalid data pkt */ + CLE_LOCK_INVAL = 10, /**< invalid lock */ + CLE_LOCK_CONFLICT = 11, /**< conflicting lock held */ + CLE_LOCK_PENDING = 12, /**< lock waiting to be acq. */ + CLE_MODE_INVAL = 13, /**< op incompat. w/ file mode */ + CLE_INODE_EXISTS = 14, /**< inode exists */ + CLE_DIR_NOTEMPTY = 15, /**< dir not empty */ + CLE_INTERNAL_ERR = 16, /**< nonspecific internal err */ + CLE_TIMEOUT = 17, /**< session timed out */ + CLE_SIG_INVAL = 18 /**< HMAC sig bad / auth failed */ +}; + +/** available OPEN mode flags */ +enum cld_open_modes { + COM_READ = 0x01, /**< read */ + COM_WRITE = 0x02, /**< write */ + COM_LOCK = 0x04, /**< lock */ + COM_ACL = 0x08, /**< ACL update */ + COM_CREATE = 0x10, /**< create file, if not exist */ + COM_EXCL = 0x20, /**< fail create if file exists */ + COM_DIRECTORY = 0x40 /**< operate on a directory */ +}; + +/** potential events client may receive */ +enum cld_events { + CE_UPDATED = 0x01, /**< contents updated */ + CE_DELETED = 0x02, /**< inode deleted */ + CE_LOCKED = 0x04, /**< lock acquired */ + CE_MASTER_FAILOVER = 0x08, /**< master failover */ + CE_SESS_FAILED = 0x10 +}; + +/** LOCK flags */ +enum cld_lock_flags { + CLF_SHARED = 0x01 /**< a shared (read) lock */ +}; + +/** Describes whether a packet begins, continues, or ends a message.
*/ +enum cld_pkt_order_t { + CLD_PKT_ORD_MID = 0x0, + CLD_PKT_ORD_FIRST = 0x1, + CLD_PKT_ORD_LAST = 0x2, + CLD_PKT_ORD_FIRST_LAST = 0x3 +}; +const CLD_PKT_IS_FIRST = 0x1; +const CLD_PKT_IS_LAST = 0x2; + +/** Information that appears only in the first packet */ +struct cld_pkt_msg_infos { + hyper xid; /**< opaque message id */ + enum cld_msg_op op; /**< message operation */ +}; + +/** Information about the message contained in this packet */ +union cld_pkt_msg_info switch (enum cld_pkt_order_t order) { + case CLD_PKT_ORD_MID: + case CLD_PKT_ORD_LAST: + void; + case CLD_PKT_ORD_FIRST: + case CLD_PKT_ORD_FIRST_LAST: + struct cld_pkt_msg_infos mi; +}; + +/** header for each packet */ +struct cld_pkt_hdr { + hyper magic; /**< magic number; constant */ + hyper sid; /**< client id */ + string user<CLD_MAX_USERNAME>; /**< authenticated user */ + struct cld_pkt_msg_info mi; +}; + +/** generic response for PUT, CLOSE, DEL, LOCK, UNLOCK */ +struct cld_msg_generic_resp { + enum cle_err_codes code; /**< error code, CLE_xxx */ + hyper xid_in; /**< C->S xid */ +}; + +/** ACK-FRAG message */ +struct cld_msg_ack_frag { + hyper seqid; /**< sequence id to ack */ +}; + +/** OPEN message */ +struct cld_msg_open { + int mode; /**< open mode, COM_xxx */ + int events; /**< events mask, CE_xxx */ + string inode_name<CLD_INODE_NAME_MAX>; +}; + +/** OPEN message response */ +struct cld_msg_open_resp { + struct cld_msg_generic_resp msg; + hyper fh; /**< handle opened */ +}; + +/** GET message */ +struct cld_msg_get { + hyper fh; /**< open file handle */ +}; + +/** GET message response */ +struct cld_msg_get_resp { + struct cld_msg_generic_resp msg; + hyper inum; /**< unique inode number */ + hyper vers; /**< inode version */ + hyper time_create; /**< creation time */ + hyper time_modify; /**< last modification time */ + int flags; /**< inode flags; CIFL_xxx */ + string inode_name<CLD_INODE_NAME_MAX>; + opaque data<CLD_MAX_PAYLOAD_SZ>; +}; + +/** PUT message */ +struct cld_msg_put { + hyper 
fh; /**< open file handle */ + opaque data<CLD_MAX_PAYLOAD_SZ>; +}; + +/** CLOSE message */ +struct cld_msg_close { + hyper fh; /**< open file handle */ +}; + +/** DEL message */ +struct cld_msg_del { + string inode_name<CLD_INODE_NAME_MAX>; +}; + +/** UNLOCK message */ +struct cld_msg_unlock { + uint64_t fh; /**< open file handle */ +}; + +/** LOCK message */ +struct cld_msg_lock { + hyper fh; /**< open file handle */ + int flags; /**< CLF_xxx */ +}; + +/** Server-to-client EVENT message */ +struct cld_msg_event { + hyper fh; /**< open file handle */ + int events; /**< CE_xxx */ +}; diff -X /garz/tmp/dontdiff -urNp cld/lib/common.c cld.rpcgen/lib/common.c --- cld/lib/common.c 2010-01-29 00:36:25.000000000 -0500 +++ cld.rpcgen/lib/common.c 2010-01-22 18:15:07.000000000 -0500 @@ -26,7 +26,11 @@ #include <errno.h> #include <glib.h> #include <cld-private.h> -#include "cld_msg.h" +#include <openssl/sha.h> +#include <openssl/hmac.h> +#include "cld_msg_rpc.h" +#include <hail_log.h> +#include <syslog.h> /* duplicated from tools/cldcli.c; put in common header somewhere? 
*/ #define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0])) diff -X /garz/tmp/dontdiff -urNp cld/lib/Makefile.am cld.rpcgen/lib/Makefile.am --- cld/lib/Makefile.am 2010-02-04 01:20:51.000000000 -0500 +++ cld.rpcgen/lib/Makefile.am 2010-02-02 23:10:51.000000000 -0500 @@ -1,10 +1,21 @@ +BUILT_SOURCES = cld_msg_rpc.h + EXTRA_DIST = libcldc.pc.in libcldc-uninstalled.pc.in INCLUDES = -I$(top_srcdir)/include \ @GLIB_CFLAGS@ +mostlyclean-local: + rm -f *_rpc.h *_rpc_xdr.c + +%_rpc.h: %_rpc.x + rpcgen -h $< > $@ + +%_rpc_xdr.c: %_rpc.x + rpcgen -c $< > $@ + LINK = $(LIBTOOL) --mode=link $(CC) $(CFLAGS) $(LDFLAGS) -o $@ lib_LTLIBRARIES = libcldc.la @@ -15,7 +26,8 @@ libcldc_la_SOURCES = \ cldc-dns.c \ common.c \ libtimer.c \ - pkt.c + pkt.c \ + cld_msg_rpc_xdr.c libcldc_la_LDFLAGS = \ -version-info $(LIBCLDC_CURRENT):$(LIBCLDC_REVISION):$(LIBCLDC_AGE) \ @@ -25,3 +37,4 @@ libcldc_la_LDFLAGS = \ pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = libcldc.pc +include_HEADERS = cld_msg_rpc.h diff -X /garz/tmp/dontdiff -urNp cld/lib/pkt.c cld.rpcgen/lib/pkt.c --- cld/lib/pkt.c 2010-02-04 19:08:09.000000000 -0500 +++ cld.rpcgen/lib/pkt.c 2010-02-03 16:27:08.000000000 -0500 @@ -17,11 +17,16 @@ */ #include <string.h> +#include <stdio.h> #include <errno.h> +#include <glib.h> #include <syslog.h> #include <openssl/sha.h> #include <openssl/hmac.h> -#include <cldc.h> +#include <cld-private.h> +#include "cld_pkt.h" +#include "cld_msg_rpc.h" +#include <hail_log.h> int __cld_authcheck(struct hail_log *log, const char *key, const void *buf, size_t buf_len, const void *sha) @@ -92,3 +97,96 @@ const char *__cld_opstr(enum cld_msg_op } } +const char *__cld_pkt_hdr_to_str(char *scratch, + const char *pkt_hdr, size_t pkt_len) +{ + XDR xin; + struct cld_pkt_hdr pkt; + bool bad_magic; + char temp[50], temp2[50]; + uint64_t seqid; + struct cld_pkt_ftr *foot; + size_t hdr_len; + + temp[0] = '\0'; + temp2[0] = '\0'; + foot = (struct cld_pkt_ftr *)(pkt_hdr + pkt_len - CLD_PKT_FTR_LEN); + seqid = 
le64_to_cpu(foot->seqid); + + if (pkt_len <= CLD_PKT_FTR_LEN) { + snprintf(scratch, PKT_HDR_TO_STR_SCRATCH_LEN, + "[MALFORMED: only %zu bytes]", pkt_len); + return scratch; + } + xdrmem_create(&xin, (void *)pkt_hdr, pkt_len - CLD_PKT_FTR_LEN, + XDR_DECODE); + memset(&pkt, 0, sizeof(pkt)); + if (!xdr_cld_pkt_hdr(&xin, &pkt)) { + xdr_destroy(&xin); + snprintf(scratch, PKT_HDR_TO_STR_SCRATCH_LEN, + "[MALFORMED: can't parse]"); + return scratch; + } + hdr_len = xdr_getpos(&xin); + xdr_destroy(&xin); + + bad_magic = !!(memcmp(&pkt.magic, CLD_PKT_MAGIC, sizeof(pkt.magic))); + if (pkt.mi.order & CLD_PKT_IS_FIRST) { + struct cld_pkt_msg_infos *infos = + &pkt.mi.cld_pkt_msg_info_u.mi; + snprintf(temp, sizeof(temp), "[TYPE:%s, XID:%llx]", + __cld_opstr(infos->op), + (unsigned long long) infos->xid); + switch (infos->op) { + case CMO_ACK_FRAG: { + XDR x; + struct cld_msg_ack_frag ack; + memset(&ack, 0, sizeof(ack)); + xdrmem_create(&x, ((char *)pkt_hdr) + hdr_len, + pkt_len - hdr_len - CLD_PKT_FTR_LEN, + XDR_DECODE); + if (!xdr_cld_msg_ack_frag(&x, &ack)) { + xdr_destroy(&x); + snprintf(temp2, sizeof(temp2), "{MALFORMED}"); + break; + } + snprintf(temp2, sizeof(temp2), "{seqid:%llx}", + (unsigned long long) ack.seqid); + xdr_destroy(&x); + break; + } + default: + break; + } + } else { + snprintf(temp, sizeof(temp), "[CONT]"); + } + + snprintf(scratch, PKT_HDR_TO_STR_SCRATCH_LEN, + "<%s%s%s> " + "%s USER:'%s' SEQID:%llx %s", + ((pkt.mi.order & CLD_PKT_IS_FIRST) ? "1st" : ""), + ((pkt.mi.order & CLD_PKT_IS_LAST) ? "End" : ""), + (bad_magic ? 
"B" : ""), + temp, pkt.user, + (unsigned long long) seqid, + temp2); + xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); + return scratch; +} + +void __cld_dump_buf(const void *buf, size_t len) +{ + const unsigned char *buff = buf; + size_t off = 0; + do { + int i; + for (i = 0; i < 8; i++) { + if (!len) + break; + printf("%02x ", buff[off++]); + len--; + } + printf("\n"); + } while (len); +} diff -X /garz/tmp/dontdiff -urNp cld/server/cldb.h cld.rpcgen/server/cldb.h --- cld/server/cldb.h 2010-02-04 01:54:16.000000000 -0500 +++ cld.rpcgen/server/cldb.h 2010-01-22 18:29:11.000000000 -0500 @@ -23,7 +23,7 @@ #include <stdbool.h> #include <db.h> #include <cld-private.h> -#include <cld_msg.h> +#include <cld_msg_rpc.h> typedef uint64_t cldino_t; diff -X /garz/tmp/dontdiff -urNp cld/server/cld.h cld.rpcgen/server/cld.h --- cld/server/cld.h 2010-02-04 01:54:16.000000000 -0500 +++ cld.rpcgen/server/cld.h 2010-02-03 16:46:41.000000000 -0500 @@ -25,7 +25,7 @@ #include <poll.h> #include <glib.h> #include "cldb.h" -#include <cld_msg.h> +#include <cld_msg_rpc.h> #include <cld_common.h> #include <hail_log.h> @@ -72,20 +72,12 @@ struct session { bool dead; /* session has ended */ /* huge buffer should always come last */ + enum cld_msg_op msg_op; + uint64_t msg_xid; unsigned int msg_buf_len; char msg_buf[CLD_MAX_MSG_SZ]; }; -struct msg_params { - int sock_fd; - const struct client *cli; - struct session *sess; - - const struct cld_packet *pkt; - const void *msg; - size_t msg_len; -}; - struct server_stats { unsigned long poll; /* num. 
polls */ unsigned long event; /* events dispatched */ @@ -122,29 +114,59 @@ struct server { struct server_stats stats; /* global statistics */ }; +struct pkt_info { + struct cld_pkt_hdr *pkt; + struct session *sess; + uint64_t seqid; + uint64_t xid; + enum cld_msg_op op; + size_t hdr_len; +}; + /* msg.c */ extern int inode_lock_rescan(DB_TXN *txn, cldino_t inum); -extern void msg_open(struct msg_params *); -extern void msg_put(struct msg_params *); -extern void msg_close(struct msg_params *); -extern void msg_del(struct msg_params *); -extern void msg_unlock(struct msg_params *); -extern void msg_lock(struct msg_params *, bool); -extern void msg_ack(struct msg_params *); -extern void msg_get(struct msg_params *, bool); +extern void msg_get(struct session *sess, const void *v); +extern void msg_open(struct session *sess, const void *v); +extern void msg_put(struct session *sess, const void *v); +extern void msg_close(struct session *sess, const void *v); +extern void msg_del(struct session *sess, const void *v); +extern void msg_unlock(struct session *sess, const void *v); +extern void msg_lock(struct session *sess, const void *v); +extern void msg_ack(struct session *sess, uint64_t seqid); /* session.c */ extern uint64_t next_seqid_le(uint64_t *seq); -extern void pkt_init_pkt(struct cld_packet *dest, const struct cld_packet *src); extern guint sess_hash(gconstpointer v); extern gboolean sess_equal(gconstpointer _a, gconstpointer _b); -extern void msg_new_sess(struct msg_params *, const struct client *); -extern void msg_end_sess(struct msg_params *, const struct client *); +extern void msg_new_sess(int sock_fd, const struct client *cli, + const struct pkt_info *info); +extern void msg_end_sess(struct session *sess, uint64_t xid); extern struct raw_session *session_new_raw(const struct session *sess); extern void sessions_free(void); -extern bool sess_sendmsg(struct session *sess, const void *msg_, size_t msglen, - void (*done_cb)(struct session_outpkt *), - void 
*done_data); + +/** Send a message as part of a session. + * + * @param sess The session + * @param xdrproc The XDR function to use to serialize the data + * @param xdrdata The message data + * @param op The op of the message + * @param done_cb The callback to call when the message has been acked + * @param done_data The data to give to done_cb + * + * @return true only if the message was sent + */ +extern bool sess_sendmsg(struct session *sess, + xdrproc_t xdrproc, const void *xdrdata, enum cld_msg_op op, + void (*done_cb)(struct session_outpkt *), void *done_data); + +/** Send a generic response message. + * + * @param sess The session + * @param code The error code to send + */ +extern void sess_sendresp_generic(struct session *sess, + enum cle_err_codes code); + extern int session_dispose(DB_TXN *txn, struct session *sess); extern int session_remove_locks(DB_TXN *txn, uint8_t *sid, uint64_t fh, cldino_t inum, bool *waiter); @@ -156,12 +178,27 @@ extern struct hail_log srv_log; extern struct timeval current_time; extern int udp_tx(int sock_fd, struct sockaddr *, socklen_t, const void *, size_t); -extern void resp_copy(struct cld_msg_resp *resp, const struct cld_msg_hdr *src); -extern void resp_err(struct session *sess, - const struct cld_msg_hdr *src, enum cle_err_codes errcode); -extern void resp_ok(struct session *sess, const struct cld_msg_hdr *src); extern const char *user_key(const char *user); +/** Transmit a single packet. + * + * This function doesn't provide error-retransmission logic. + * It can't handle messages that are bigger than a single packet. + * + * @param fd Socket to send the response on + * @param cli Client address data + * @param sid The session-id to use. 
Must be of length CLD_SID_SZ + * @param seqid The sequence id to use + * @param xdrproc The XDR function to use to serialize the data + * @param xdrdata The message data + * @param op The op of message to send + * + * @return nothing; the function is declared void, so no status is reported to the caller + */ +extern void simple_sendmsg(int fd, const struct client *cli, + uint64_t sid, const char *username, uint64_t seqid, + xdrproc_t xdrproc, const void *xdrdata, enum cld_msg_op op); + /* util.c */ extern int write_pid_file(const char *pid_fn); extern void syslogerr(const char *prefix); diff -X /garz/tmp/dontdiff -urNp cld/server/Makefile.am cld.rpcgen/server/Makefile.am --- cld/server/Makefile.am 2010-02-04 01:20:51.000000000 -0500 +++ cld.rpcgen/server/Makefile.am 2010-02-02 23:10:54.000000000 -0500 @@ -1,5 +1,6 @@ INCLUDES = -I$(top_srcdir)/include \ + -I$(top_srcdir)/lib \ @GLIB_CFLAGS@ \ -DCLD_LIBDIR=\""$(libdir)"\" \ -DCLD_LOCAL_STATE_DIR="\"$(localstatedir)\"" @@ -8,7 +9,8 @@ sbin_PROGRAMS = cld cldbadm cld_SOURCES = cldb.h cld.h \ ../lib/common.c ../lib/libtimer.c ../lib/pkt.c \ - cldb.c msg.c server.c session.c util.c + cldb.c msg.c server.c session.c util.c \ + ../lib/cld_msg_rpc_xdr.c cld_LDADD = @CRYPTO_LIBS@ @GLIB_LIBS@ @DB4_LIBS@ cldbadm_SOURCES = cldb.h cldbadm.c diff -X /garz/tmp/dontdiff -urNp cld/server/msg.c cld.rpcgen/server/msg.c --- cld/server/msg.c 2010-02-04 01:20:51.000000000 -0500 +++ cld.rpcgen/server/msg.c 2010-02-03 01:23:17.000000000 -0500 @@ -25,6 +25,8 @@ #include <syslog.h> #include <openssl/sha.h> #include <cld-private.h> +#include <cld_common.h> +#include <cld_msg_rpc.h> #include "cld.h" enum { @@ -247,12 +249,11 @@ static int inode_notify(DB_TXN *txn, cld } memset(&me, 0, sizeof(me)); - memcpy(me.hdr.magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ); - me.hdr.op = CMO_EVENT; - me.fh = h.fh; - me.events = cpu_to_le32(deleted ? CE_DELETED : CE_UPDATED); + me.fh = le64_to_cpu(h.fh); + me.events = deleted ?
CE_DELETED : CE_UPDATED; - if (!sess_sendmsg(sess, &me, sizeof(me), NULL, NULL)) + if (!sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_event, + (void *)&me, CMO_EVENT, NULL, NULL)) break; } @@ -375,12 +376,11 @@ int inode_lock_rescan(DB_TXN *txn, cldin } memset(&me, 0, sizeof(me)); - memcpy(me.hdr.magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ); - me.hdr.op = CMO_EVENT; - me.fh = lock.fh; - me.events = cpu_to_le32(CE_LOCKED); + me.fh = le64_to_cpu(lock.fh); + me.events = CE_LOCKED; - if (!sess_sendmsg(sess, &me, sizeof(me), NULL, NULL)) + if (!sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_event, + (void *)&me, CMO_EVENT, NULL, NULL)) break; } @@ -388,12 +388,10 @@ int inode_lock_rescan(DB_TXN *txn, cldin return rc; } -void msg_get(struct msg_params *mp, bool metadata_only) +void msg_get(struct session *sess, const void *v) { - const struct cld_msg_get *msg = mp->msg; - struct cld_msg_get_resp *resp; - size_t resp_len; - uint64_t fh; + const struct cld_msg_get *get = v; + struct cld_msg_get_resp resp; struct raw_handle *h = NULL; struct raw_inode *inode = NULL; enum cle_err_codes resp_rc = CLE_OK; @@ -401,17 +399,10 @@ void msg_get(struct msg_params *mp, bool uint32_t name_len, inode_size; uint32_t omode; int rc; - struct session *sess = mp->sess; DB_ENV *dbenv = cld_srv.cldb.env; DB_TXN *txn; - void *p; - - /* make sure input data as large as expected */ - if (mp->msg_len < sizeof(*msg)) - return; - - /* get filehandle from input msg */ - fh = le64_to_cpu(msg->fh); + void *data_mem; + char *inode_name; rc = dbenv->txn_begin(dbenv, NULL, &txn, 0); if (rc) { @@ -421,7 +412,7 @@ void msg_get(struct msg_params *mp, bool } /* read handle from db */ - rc = cldb_handle_get(txn, sess->sid, fh, &h, 0); + rc = cldb_handle_get(txn, sess->sid, get->fh, &h, 0); if (rc) { resp_rc = CLE_FH_INVAL; goto err_out; @@ -442,42 +433,30 @@ void msg_get(struct msg_params *mp, bool goto err_out; } - name_len = le32_to_cpu(inode->ino_len); inode_size = le32_to_cpu(inode->size); - - resp_len = sizeof(*resp) + 
name_len + - (metadata_only ? 0 : inode_size); - resp = alloca(resp_len); - if (!resp) { - resp_rc = CLE_OOM; - goto err_out; - } - - HAIL_DEBUG(&srv_log, "%s: sizeof(resp) %zu, name_len %u, " - "inode->size %u, resp_len %zu", - __func__, - sizeof(*resp), name_len, - inode_size, resp_len); + HAIL_DEBUG(&srv_log, "GET-DEBUG: inode->size %u\n", inode_size); /* return response containing inode metadata */ - memset(resp, 0, resp_len); - resp_copy(&resp->resp, mp->msg); - resp->inum = inode->inum; - resp->ino_len = inode->ino_len; - resp->size = inode->size; - resp->version = inode->version; - resp->time_create = inode->time_create; - resp->time_modify = inode->time_modify; - resp->flags = inode->flags; + memset(&resp, 0, sizeof(resp)); + resp.msg.code = CLE_OK; + resp.msg.xid_in = sess->msg_xid; + resp.inum = le64_to_cpu(inode->inum); + resp.vers = le64_to_cpu(inode->version); + resp.time_create = le64_to_cpu(inode->time_create); + resp.time_modify = le64_to_cpu(inode->time_modify); + resp.flags = le32_to_cpu(inode->flags); - p = (resp + 1); - memcpy(p, (inode + 1), name_len); + name_len = le32_to_cpu(inode->ino_len); + inode_name = alloca(name_len + 1); + snprintf(inode_name, name_len + 1, "%s", (char *)(inode + 1)); + resp.inode_name = inode_name; - p += name_len; + resp.data.data_len = 0; + resp.data.data_val = NULL; /* send data, if requested */ - if (!metadata_only) { - void *data_mem; + data_mem = NULL; + if (sess->msg_op == CMO_GET) { size_t data_mem_len; rc = cldb_data_get(txn, inum, &data_mem, &data_mem_len, @@ -486,22 +465,23 @@ void msg_get(struct msg_params *mp, bool /* treat not-found as zero length file, as we may * not yet have created the data record */ - if (rc == DB_NOTFOUND) { - resp->size = 0; - resp_len -= inode_size; - } else if (rc || (data_mem_len != inode_size)) { - if (!rc) - free(data_mem); - resp_rc = CLE_DB_ERR; - goto err_out; - } else { - memcpy(p, data_mem, data_mem_len); - - free(data_mem); + if (rc != DB_NOTFOUND) { + if (rc || 
(data_mem_len != inode_size)) { + if (!rc) + free(data_mem); + resp_rc = CLE_DB_ERR; + goto err_out; + } else { + resp.data.data_len = data_mem_len; + resp.data.data_val = data_mem; + } } } - sess_sendmsg(sess, resp, resp_len, NULL, NULL); + sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_get_resp, + (void *)&resp, CMO_GET, NULL, NULL); + if (data_mem) + free(data_mem); rc = txn->commit(txn, 0); if (rc) @@ -516,15 +496,14 @@ err_out: if (rc) dbenv->err(dbenv, rc, "msg_get txn abort"); err_out_noabort: - resp_err(sess, mp->msg, resp_rc); + sess_sendresp_generic(sess, resp_rc); free(h); free(inode); } -void msg_open(struct msg_params *mp) +void msg_open(struct session *sess, const void *v) { - const struct cld_msg_open *msg = mp->msg; - struct session *sess = mp->sess; + const struct cld_msg_open *open = v; struct cld_msg_open_resp resp; const char *name; struct raw_session *raw_sess = NULL; @@ -535,24 +514,12 @@ void msg_open(struct msg_params *mp) struct pathname_info pinfo; void *parent_data = NULL; size_t parent_len; - uint32_t msg_mode, msg_events; uint64_t fh; cldino_t inum; enum cle_err_codes resp_rc = CLE_OK; DB_ENV *dbenv = cld_srv.cldb.env; DB_TXN *txn; - /* make sure input data as large as expected */ - if (mp->msg_len < sizeof(*msg)) - return; - - msg_mode = le32_to_cpu(msg->mode); - msg_events = le32_to_cpu(msg->events); - name_len = le16_to_cpu(msg->name_len); - - if (mp->msg_len < (sizeof(*msg) + name_len)) - return; - rc = dbenv->txn_begin(dbenv, NULL, &txn, 0); if (rc) { dbenv->err(dbenv, rc, "DB_ENV->txn_begin"); @@ -560,11 +527,12 @@ void msg_open(struct msg_params *mp) goto err_out_noabort; } - name = mp->msg + sizeof(*msg); + name = open->inode_name; + name_len = strlen(name); - create = msg_mode & COM_CREATE; - excl = msg_mode & COM_EXCL; - do_dir = msg_mode & COM_DIRECTORY; + create = open->mode & COM_CREATE; + excl = open->mode & COM_EXCL; + do_dir = open->mode & COM_DIRECTORY; if (!valid_inode_name(name, name_len) || (create && name_len < 2)) { 
resp_rc = CLE_NAME_INVAL; @@ -662,7 +630,7 @@ void msg_open(struct msg_params *mp) inum = cldino_from_le(inode->inum); /* alloc & init new handle; updates session's next_fh */ - h = cldb_handle_new(sess, inum, msg_mode, msg_events); + h = cldb_handle_new(sess, inum, open->mode, open->events); if (!h) { HAIL_CRIT(&srv_log, "cannot allocate handle"); resp_rc = CLE_OOM; @@ -717,11 +685,12 @@ void msg_open(struct msg_params *mp) free(raw_sess); free(h); - resp_copy(&resp.resp, mp->msg); - resp.resp.code = cpu_to_le32(CLE_OK); - resp.fh = cpu_to_le64(fh); - sess_sendmsg(sess, &resp, sizeof(resp), NULL, NULL); - + memset(&resp, 0, sizeof(resp)); + resp.msg.xid_in = sess->msg_xid; + resp.msg.code = CLE_OK; + resp.fh = fh; + sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_open_resp, + (void *)&resp, CMO_OPEN, NULL, NULL); return; err_out: @@ -729,7 +698,7 @@ err_out: if (rc) dbenv->err(dbenv, rc, "msg_open txn abort"); err_out_noabort: - resp_err(mp->sess, mp->msg, resp_rc); + sess_sendresp_generic(sess, resp_rc); free(parent_data); free(parent); free(inode); @@ -737,45 +706,18 @@ err_out_noabort: free(h); } -void msg_put(struct msg_params *mp) +void msg_put(struct session *sess, const void *v) { - const struct cld_msg_put *msg = mp->msg; - struct session *sess = mp->sess; - uint64_t fh; + const struct cld_msg_put *put = v; struct raw_handle *h = NULL; struct raw_inode *inode = NULL; enum cle_err_codes resp_rc = CLE_OK; - const void *mem; int rc; cldino_t inum; - uint32_t omode, data_size; + uint32_t omode; DB_ENV *dbenv = cld_srv.cldb.env; DB_TXN *txn; - /* make sure input data as large as message header */ - if (mp->msg_len < sizeof(*msg)) - return; - - /* make sure additional input data as large as expected */ - data_size = le32_to_cpu(msg->data_size); - if (data_size > CLD_MAX_PAYLOAD_SZ) { - HAIL_ERR(&srv_log, "%s: can't PUT %d bytes of data: " - "%d is the maximum.\n", - __func__, data_size, CLD_MAX_PAYLOAD_SZ); - resp_rc = CLE_BAD_PKT; - goto err_out_noabort; - } - if 
(mp->msg_len != (data_size + sizeof(*msg))) { - HAIL_INFO(&srv_log, "PUT len mismatch: msg len %zu, " - "wanted %zu + %u (== %zu)", - mp->msg_len, - sizeof(*msg), data_size, data_size + sizeof(*msg)); - resp_rc = CLE_BAD_PKT; - goto err_out_noabort; - } - - fh = le64_to_cpu(msg->fh); - rc = dbenv->txn_begin(dbenv, NULL, &txn, 0); if (rc) { dbenv->err(dbenv, rc, "DB_ENV->txn_begin"); @@ -784,7 +726,7 @@ void msg_put(struct msg_params *mp) } /* read handle from db */ - rc = cldb_handle_get(txn, sess->sid, fh, &h, 0); + rc = cldb_handle_get(txn, sess->sid, put->fh, &h, 0); if (rc) { resp_rc = CLE_FH_INVAL; goto err_out; @@ -807,15 +749,14 @@ void msg_put(struct msg_params *mp) } /* store contig. data area in db */ - mem = (msg + 1); rc = cldb_data_put(txn, inum, - mem, data_size, 0); + put->data.data_val, put->data.data_len, 0); if (rc) { resp_rc = CLE_DB_ERR; goto err_out; } - inode->size = cpu_to_le32(data_size); + inode->size = cpu_to_le32(put->data.data_len); /* update inode */ rc = inode_touch(txn, inode); @@ -831,7 +772,7 @@ void msg_put(struct msg_params *mp) goto err_out_noabort; } - resp_ok(sess, mp->msg); + sess_sendresp_generic(sess, CLE_OK); free(h); free(inode); @@ -842,31 +783,23 @@ err_out: if (rc) dbenv->err(dbenv, rc, "msg_put txn abort"); err_out_noabort: - resp_err(sess, mp->msg, resp_rc); + sess_sendresp_generic(sess, resp_rc); free(h); free(inode); } -void msg_close(struct msg_params *mp) +void msg_close(struct session *sess, const void *v) { - const struct cld_msg_close *msg = mp->msg; - uint64_t fh; + const struct cld_msg_close *close = v; int rc; enum cle_err_codes resp_rc = CLE_OK; struct raw_handle *h = NULL; cldino_t lock_inum = 0; bool waiter = false; - struct session *sess = mp->sess; DB_ENV *dbenv = cld_srv.cldb.env; DB_TXN *txn; - /* make sure input data as large as expected */ - if (mp->msg_len < sizeof(*msg)) - return; - - fh = le64_to_cpu(msg->fh); - rc = dbenv->txn_begin(dbenv, NULL, &txn, 0); if (rc) { dbenv->err(dbenv, rc, 
"DB_ENV->txn_begin"); @@ -875,7 +808,7 @@ void msg_close(struct msg_params *mp) } /* read handle from db */ - rc = cldb_handle_get(txn, sess->sid, fh, &h, DB_RMW); + rc = cldb_handle_get(txn, sess->sid, close->fh, &h, DB_RMW); if (rc) { if (rc == DB_NOTFOUND) resp_rc = CLE_FH_INVAL; @@ -888,7 +821,7 @@ void msg_close(struct msg_params *mp) lock_inum = cldino_from_le(h->inum); /* delete handle from db */ - rc = cldb_handle_del(txn, sess->sid, fh); + rc = cldb_handle_del(txn, sess->sid, close->fh); if (rc) { resp_rc = CLE_DB_ERR; goto err_out; @@ -896,7 +829,7 @@ void msg_close(struct msg_params *mp) /* remove locks, if any */ rc = session_remove_locks(txn, sess->sid, - fh, lock_inum, &waiter); + close->fh, lock_inum, &waiter); if (rc) { resp_rc = CLE_DB_ERR; goto err_out; @@ -918,7 +851,7 @@ void msg_close(struct msg_params *mp) goto err_out_noabort; } - resp_ok(sess, mp->msg); + sess_sendresp_generic(sess, CLE_OK); free(h); return; @@ -927,16 +860,15 @@ err_out: if (rc) dbenv->err(dbenv, rc, "msg_close txn abort"); err_out_noabort: - resp_err(sess, mp->msg, resp_rc); + sess_sendresp_generic(sess, resp_rc); free(h); } -void msg_del(struct msg_params *mp) +void msg_del(struct session *sess, const void *v) { - const struct cld_msg_del *msg = mp->msg; + const struct cld_msg_del *del = v; enum cle_err_codes resp_rc = CLE_OK; int rc, name_len; - const char *name; struct pathname_info pinfo; struct raw_inode *parent = NULL, *ino = NULL; void *parent_data = NULL; @@ -949,23 +881,13 @@ void msg_del(struct msg_params *mp) DB_ENV *dbenv = cld_srv.cldb.env; DB_TXN *txn; - /* make sure input data as large as expected */ - if (mp->msg_len < sizeof(*msg)) - return; - - name_len = le16_to_cpu(msg->name_len); - - if (mp->msg_len < (sizeof(*msg) + name_len)) - return; - - name = mp->msg + sizeof(*msg); - - if (!valid_inode_name(name, name_len) || (name_len < 2)) { + name_len = strlen(del->inode_name); + if (!valid_inode_name(del->inode_name, name_len) || (name_len < 2)) { resp_rc = 
CLE_NAME_INVAL; goto err_out_noabort; } - pathname_parse(name, name_len, &pinfo); + pathname_parse(del->inode_name, name_len, &pinfo); rc = dbenv->txn_begin(dbenv, NULL, &txn, 0); if (rc) { @@ -991,7 +913,8 @@ void msg_del(struct msg_params *mp) } /* read inode to be deleted */ - rc = cldb_inode_get_byname(txn, name, name_len, &ino, false, 0); + rc = cldb_inode_get_byname(txn, del->inode_name, name_len, + &ino, false, 0); if (rc) { if (rc == DB_NOTFOUND) resp_rc = CLE_NAME_INVAL; @@ -1100,7 +1023,7 @@ void msg_del(struct msg_params *mp) goto err_out_noabort; } - resp_ok(mp->sess, mp->msg); + sess_sendresp_generic(sess, CLE_OK); free(ino); free(parent); free(parent_data); @@ -1111,31 +1034,23 @@ err_out: if (rc) dbenv->err(dbenv, rc, "msg_del txn abort"); err_out_noabort: - resp_err(mp->sess, mp->msg, resp_rc); + sess_sendresp_generic(sess, resp_rc); free(ino); free(parent); free(parent_data); } -void msg_unlock(struct msg_params *mp) +void msg_unlock(struct session *sess, const void *v) { - const struct cld_msg_unlock *msg = mp->msg; - uint64_t fh; + const struct cld_msg_unlock *unlock = v; struct raw_handle *h = NULL; cldino_t inum; int rc; enum cle_err_codes resp_rc = CLE_OK; uint32_t omode; - struct session *sess = mp->sess; DB_ENV *dbenv = cld_srv.cldb.env; DB_TXN *txn; - /* make sure input data as large as expected */ - if (mp->msg_len < sizeof(*msg)) - return; - - fh = le64_to_cpu(msg->fh); - rc = dbenv->txn_begin(dbenv, NULL, &txn, 0); if (rc) { dbenv->err(dbenv, rc, "DB_ENV->txn_begin"); @@ -1144,7 +1059,7 @@ void msg_unlock(struct msg_params *mp) } /* read handle from db */ - rc = cldb_handle_get(txn, sess->sid, fh, &h, 0); + rc = cldb_handle_get(txn, sess->sid, unlock->fh, &h, 0); if (rc) { resp_rc = CLE_FH_INVAL; goto err_out; @@ -1159,7 +1074,7 @@ void msg_unlock(struct msg_params *mp) } /* attempt to given lock on filehandle */ - rc = cldb_lock_del(txn, sess->sid, fh, inum); + rc = cldb_lock_del(txn, sess->sid, unlock->fh, inum); if (rc) { resp_rc = 
CLE_LOCK_INVAL; goto err_out; @@ -1172,7 +1087,7 @@ void msg_unlock(struct msg_params *mp) goto err_out_noabort; } - resp_ok(sess, mp->msg); + sess_sendresp_generic(sess, CLE_OK); free(h); return; @@ -1181,30 +1096,22 @@ err_out: if (rc) dbenv->err(dbenv, rc, "msg_unlock txn abort"); err_out_noabort: - resp_err(sess, mp->msg, resp_rc); + sess_sendresp_generic(sess, resp_rc); free(h); } -void msg_lock(struct msg_params *mp, bool wait) +void msg_lock(struct session *sess, const void *v) { - const struct cld_msg_lock *msg = mp->msg; - uint64_t fh; + const struct cld_msg_lock *lock = v; + bool wait = (sess->msg_op == CMO_LOCK); struct raw_handle *h = NULL; cldino_t inum; int rc; enum cle_err_codes resp_rc = CLE_OK; - uint32_t lock_flags, omode; + uint32_t omode; bool acquired = false; DB_ENV *dbenv = cld_srv.cldb.env; DB_TXN *txn; - struct session *sess = mp->sess; - - /* make sure input data as large as expected */ - if (mp->msg_len < sizeof(*msg)) - return; - - fh = le64_to_cpu(msg->fh); - lock_flags = le32_to_cpu(msg->flags); rc = dbenv->txn_begin(dbenv, NULL, &txn, 0); if (rc) { @@ -1214,7 +1121,7 @@ void msg_lock(struct msg_params *mp, boo } /* read handle from db */ - rc = cldb_handle_get(txn, sess->sid, fh, &h, 0); + rc = cldb_handle_get(txn, sess->sid, lock->fh, &h, 0); if (rc) { resp_rc = CLE_FH_INVAL; goto err_out; @@ -1229,8 +1136,8 @@ void msg_lock(struct msg_params *mp, boo } /* attempt to add lock */ - rc = cldb_lock_add(txn, sess->sid, fh, inum, - lock_flags & CLF_SHARED, wait, &acquired); + rc = cldb_lock_add(txn, sess->sid, lock->fh, inum, + lock->flags & CLF_SHARED, wait, &acquired); if (rc) { if (rc == DB_KEYEXIST) resp_rc = CLE_LOCK_CONFLICT; @@ -1253,7 +1160,7 @@ void msg_lock(struct msg_params *mp, boo } /* lock was acquired immediately */ - resp_ok(mp->sess, mp->msg); + sess_sendresp_generic(sess, CLE_OK); free(h); return; @@ -1262,7 +1169,7 @@ err_out: if (rc) dbenv->err(dbenv, rc, "msg_lock txn abort"); err_out_noabort: - resp_err(mp->sess, 
mp->msg, resp_rc); + sess_sendresp_generic(sess, resp_rc); free(h); } diff -X /garz/tmp/dontdiff -urNp cld/server/server.c cld.rpcgen/server/server.c --- cld/server/server.c 2010-02-04 01:54:16.000000000 -0500 +++ cld.rpcgen/server/server.c 2010-02-03 16:49:16.000000000 -0500 @@ -37,6 +37,7 @@ #include <openssl/hmac.h> #include <cld-private.h> #include "cld.h" +#include <cld_pkt.h> #define PROGRAM_NAME "cld" @@ -125,8 +126,6 @@ int udp_tx(int sock_fd, struct sockaddr { ssize_t src; - HAIL_DEBUG(&srv_log, "%s, fd %d", __func__, sock_fd); - src = sendto(sock_fd, data, data_len, 0, addr, addr_len); if (src < 0 && errno != EAGAIN) HAIL_ERR(&srv_log, "%s sendto (fd %d, data_len %u): %s", @@ -139,36 +138,6 @@ int udp_tx(int sock_fd, struct sockaddr return 0; } -void resp_copy(struct cld_msg_resp *resp, const struct cld_msg_hdr *src) -{ - memcpy(&resp->hdr, src, sizeof(*src)); - resp->code = 0; - resp->rsv = 0; - resp->xid_in = src->xid; -} - -void resp_err(struct session *sess, - const struct cld_msg_hdr *src, enum cle_err_codes errcode) -{ - struct cld_msg_resp resp; - - resp_copy(&resp, src); - __cld_rand64(&resp.hdr.xid); - resp.code = cpu_to_le32(errcode); - - if (sess->sock_fd <= 0) { - HAIL_ERR(&srv_log, "Nul sock in response"); - return; - } - - sess_sendmsg(sess, &resp, sizeof(resp), NULL, NULL); -} - -void resp_ok(struct session *sess, const struct cld_msg_hdr *src) -{ - resp_err(sess, src, CLE_OK); -} - const char *user_key(const char *user) { /* TODO: better auth scheme. 
@@ -181,266 +150,426 @@ const char *user_key(const char *user) return user; /* our secret key */ } -static void show_msg(const struct cld_msg_hdr *msg) +static int udp_rx_handle(struct session *sess, + void (*msg_handler)(struct session *sess, const void *), + xdrproc_t xdrproc, void *xdrdata) +{ + XDR xin; + xdrmem_create(&xin, sess->msg_buf, sess->msg_buf_len, XDR_DECODE); + if (!xdrproc(&xin, xdrdata)) { + HAIL_DEBUG(&srv_log, "%s: couldn't parse %s message", + __func__, __cld_opstr(sess->msg_op)); + xdr_destroy(&xin); + return CLE_BAD_PKT; + } + msg_handler(sess, xdrdata); + xdr_free(xdrproc, xdrdata); + xdr_destroy(&xin); + return 0; +} + +/** Receive a UDP packet + * + * @param sock_fd The UDP socket we received the packet on + * @param cli Client address data + * @param info Packet information + * @param raw_pkt The raw packet buffer + * @param raw_len Length of the raw packet buffer + * + * @return An error code if we should send an error message + * response. CLE_OK if we are done. 
+ */ +static enum cle_err_codes udp_rx(int sock_fd, + const struct client *cli, struct pkt_info *info, + const char *raw_pkt, size_t raw_len) { - switch (msg->op) { - case CMO_NOP: - case CMO_NEW_SESS: - case CMO_OPEN: - case CMO_GET_META: + struct cld_pkt_hdr *pkt = info->pkt; + struct session *sess = info->sess; + + if (sess) { + size_t msg_len; + + /* advance sequence id's and update last-contact timestamp */ + sess->last_contact = current_time.tv_sec; + sess->sock_fd = sock_fd; + + if (info->op != CMO_ACK) { + /* received message - update session */ + sess->next_seqid_in++; + } + + /* copy message fragment into reassembly buffer */ + if (pkt->mi.order & CLD_PKT_IS_FIRST) { + sess->msg_op = info->op; + sess->msg_xid = info->xid; + sess->msg_buf_len = 0; + } + msg_len = raw_len - info->hdr_len - CLD_PKT_FTR_LEN; + if ((sess->msg_buf_len + msg_len) > CLD_MAX_MSG_SZ) + return CLE_BAD_PKT; + + memcpy(sess->msg_buf + sess->msg_buf_len, + raw_pkt + info->hdr_len, msg_len); + sess->msg_buf_len += msg_len; + } + + if (!(pkt->mi.order & CLD_PKT_IS_LAST)) { + struct cld_msg_ack_frag ack; + ack.seqid = info->seqid; + + /* transmit ack-partial-msg response (once, without retries) */ + simple_sendmsg(sock_fd, cli, pkt->sid, + pkt->user, 0xdeadbeef, + (xdrproc_t)xdr_cld_msg_ack_frag, (void *)&ack, + CMO_ACK_FRAG); + return CLE_OK; + } + + /* Handle a complete message */ + switch (info->op) { case CMO_GET: - case CMO_PUT: - case CMO_CLOSE: - case CMO_DEL: - case CMO_LOCK: - case CMO_UNLOCK: + /* fall through */ + case CMO_GET_META: { + struct cld_msg_get get = {0}; + return udp_rx_handle(sess, msg_get, + (xdrproc_t)xdr_cld_msg_get, &get); + } + case CMO_OPEN: { + struct cld_msg_open open = {0}; + return udp_rx_handle(sess, msg_open, + (xdrproc_t)xdr_cld_msg_open, &open); + } + case CMO_PUT: { + struct cld_msg_put put = {0}; + return udp_rx_handle(sess, msg_put, + (xdrproc_t)xdr_cld_msg_put, &put); + } + case CMO_CLOSE: { + struct cld_msg_close close = {0}; + return 
udp_rx_handle(sess, msg_close, + (xdrproc_t)xdr_cld_msg_close, &close); + } + case CMO_DEL: { + struct cld_msg_del del = {0}; + return udp_rx_handle(sess, msg_del, + (xdrproc_t)xdr_cld_msg_del, &del); + } + case CMO_UNLOCK: { + struct cld_msg_unlock unlock = {0}; + return udp_rx_handle(sess, msg_unlock, + (xdrproc_t)xdr_cld_msg_unlock, &unlock); + } case CMO_TRYLOCK: + /* fall through */ + case CMO_LOCK: { + struct cld_msg_lock lock = {0}; + return udp_rx_handle(sess, msg_lock, + (xdrproc_t)xdr_cld_msg_lock, &lock); + } case CMO_ACK: + msg_ack(sess, info->seqid); + return 0; + case CMO_NOP: + sess_sendresp_generic(sess, CLE_OK); + return 0; + case CMO_NEW_SESS: + msg_new_sess(sock_fd, cli, info); + return 0; case CMO_END_SESS: - case CMO_PING: - case CMO_NOT_MASTER: - case CMO_EVENT: - case CMO_ACK_FRAG: - HAIL_DEBUG(&srv_log, "msg: op %s, xid %llu", - __cld_opstr(msg->op), - (unsigned long long) le64_to_cpu(msg->xid)); - break; + msg_end_sess(sess, info->xid); + return 0; + default: + HAIL_DEBUG(&srv_log, "%s: unexpected %s packet", + __func__, __cld_opstr(info->op)); + /* do nothing */ + return 0; } } -static void udp_rx_msg(const struct client *cli, const struct cld_packet *pkt, - const struct cld_msg_hdr *msg, struct msg_params *mp) +/** Parse a packet's header. Verify that the magic number is correct. 
+ * + * @param raw_pkt Pointer to the packet data + * @param raw_len Length of the raw data + * @param pkt (out param) the packet header + * @param hdr_len (out param) the length of the packet header + * + * @return true on success; false if this packet is garbage + */ +static bool parse_pkt_header(const char *raw_pkt, int raw_len, + struct cld_pkt_hdr *pkt, ssize_t *hdr_len) { - struct session *sess = mp->sess; + XDR xin; + static const char * const magic = CLD_PKT_MAGIC; - if (srv_log.verbose) - show_msg(msg); + if (raw_len <= CLD_PKT_FTR_LEN) { + HAIL_DEBUG(&srv_log, "%s: packet is too short: only " + "%d bytes", __func__, raw_len); + return false; + } + xdrmem_create(&xin, (void *)raw_pkt, raw_len - CLD_PKT_FTR_LEN, + XDR_DECODE); + memset(pkt, 0, sizeof(*pkt)); + if (!xdr_cld_pkt_hdr(&xin, pkt)) { + HAIL_DEBUG(&srv_log, "%s: couldn't parse packet header", + __func__); + xdr_destroy(&xin); + return false; + } + *hdr_len = xdr_getpos(&xin); + xdr_destroy(&xin); + + if (memcmp((void *)&pkt->magic, magic, sizeof(pkt->magic))) { + HAIL_DEBUG(&srv_log, "%s: bad magic number", __func__); + xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)pkt); + return false; + } + + return true; +} + +/** Look up some information about a packet, including its session and the + * type of message it carries. 
+ * + * @param pkt The packet's header + * @param raw_pkt Pointer to the raw packet data + * @param raw_len Length of the raw packet data + * @param info (out param) Information about the packet + * + * @return true on success; false if this packet is garbage + */ +static bool get_pkt_info(struct cld_pkt_hdr *pkt, + const char *raw_pkt, size_t raw_len, + size_t hdr_len, struct pkt_info *info) +{ + struct cld_pkt_ftr *foot; + struct session *s; + + memset(info, 0, sizeof(info)); + info->pkt = pkt; + info->sess = s = g_hash_table_lookup(cld_srv.sessions, &pkt->sid); + foot = (struct cld_pkt_ftr *) + (raw_pkt + (raw_len - CLD_PKT_FTR_LEN)); + info->seqid = le64_to_cpu(foot->seqid); + + if (pkt->mi.order & CLD_PKT_IS_FIRST) { + info->xid = pkt->mi.cld_pkt_msg_info_u.mi.xid; + info->op = pkt->mi.cld_pkt_msg_info_u.mi.op; + } else { + if (!s) { + HAIL_DEBUG(&srv_log, "%s: packet is not first, " + "but also not part of an existing session. " + "Protocol error.", __func__); + return false; + } + info->xid = s->msg_xid; + info->op = s->msg_op; + } + info->hdr_len = hdr_len; + return true; +} - switch(msg->op) { - case CMO_NOP: - resp_ok(sess, msg); - break; +/** Verify that the client session matches IP and username + * + * @param info Packet information + * @param cli Client address data + * + * @return 0 on success; error code otherwise + */ +static enum cle_err_codes validate_pkt_session(const struct pkt_info *info, + const struct client *cli) +{ + struct session *sess = info->sess; - case CMO_NEW_SESS: msg_new_sess(mp, cli); break; - case CMO_END_SESS: msg_end_sess(mp, cli); break; - case CMO_OPEN: msg_open(mp); break; - case CMO_GET: msg_get(mp, false); break; - case CMO_GET_META: msg_get(mp, true); break; - case CMO_PUT: msg_put(mp); break; - case CMO_CLOSE: msg_close(mp); break; - case CMO_DEL: msg_del(mp); break; - case CMO_UNLOCK: msg_unlock(mp); break; - case CMO_LOCK: msg_lock(mp, true); break; - case CMO_TRYLOCK: msg_lock(mp, false); break; - case CMO_ACK: 
msg_ack(mp); break; + if (!sess) { + /* Packets that don't belong to a session must be new-session + * packets attempting to establish a session. */ + if (info->op != CMO_NEW_SESS) { + HAIL_DEBUG(&srv_log, "%s: packet doesn't belong to a " + "session,but has type %d", + __func__, info->op); + return CLE_SESS_INVAL; + } + return 0; + } - default: - /* do nothing */ - break; + if (info->op == CMO_NEW_SESS) { + HAIL_DEBUG(&srv_log, "%s: Tried to create a new session, " + "but a session with that ID already exists.", + __func__); + return CLE_SESS_EXISTS; } -} -static void pkt_ack_frag(int sock_fd, - const struct client *cli, - const struct cld_packet *pkt) -{ - size_t alloc_len; - struct cld_packet *outpkt; - struct cld_msg_ack_frag *ack_msg; - void *p; - const char *secret_key; + /* verify that client session matches IP */ + if ((sess->addr_len != cli->addr_len) || + memcmp(&sess->addr, &cli->addr, sess->addr_len)) { + HAIL_DEBUG(&srv_log, "%s: sess->addr doesn't match packet " + "addr", __func__); + return CLE_SESS_INVAL; + } + + /* verify that client session matches username */ + if (strncmp(info->pkt->user, sess->user, CLD_MAX_USERNAME)) { + HAIL_DEBUG(&srv_log, "%s: session doesn't match packet's " + "username", __func__); + return CLE_SESS_INVAL; + } + + if (sess->dead) { + HAIL_DEBUG(&srv_log, "%s: packet session is dead", + __func__); + return CLE_SESS_INVAL; + } + + return 0; +} - alloc_len = sizeof(*outpkt) + sizeof(*ack_msg) + SHA_DIGEST_LENGTH; - outpkt = alloca(alloc_len); - ack_msg = (struct cld_msg_ack_frag *) (outpkt + 1); - memset(outpkt, 0, alloc_len); - - pkt_init_pkt(outpkt, pkt); - - memcpy(ack_msg->hdr.magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ); - __cld_rand64(&ack_msg->hdr.xid); - ack_msg->hdr.op = CMO_ACK_FRAG; - ack_msg->seqid = pkt->seqid; - - p = outpkt; - secret_key = user_key(outpkt->user); - __cld_authsign(&srv_log, secret_key, p, alloc_len - SHA_DIGEST_LENGTH, - p + alloc_len - SHA_DIGEST_LENGTH); - - HAIL_DEBUG(&srv_log, "%s: " - "sid " SIDFMT 
", op %s, seqid %llu", - __func__, - SIDARG(outpkt->sid), __cld_opstr(ack_msg->hdr.op), - (unsigned long long) le64_to_cpu(outpkt->seqid)); - - /* transmit ack-partial-msg response (once, without retries) */ - udp_tx(sock_fd, (struct sockaddr *) &cli->addr, cli->addr_len, - outpkt, alloc_len); -} - -static void udp_rx(int sock_fd, - const struct client *cli, - const void *raw_pkt, size_t pkt_len) -{ - const struct cld_packet *pkt = raw_pkt; - struct cld_packet *outpkt; - const struct cld_msg_hdr *msg = (struct cld_msg_hdr *) (pkt + 1); - struct session *sess = NULL; - enum cle_err_codes resp_rc = CLE_OK; - struct cld_msg_resp *resp; - struct msg_params mp; - size_t alloc_len; - uint32_t pkt_flags; - bool first_frag, last_frag, have_new_sess, have_ack, have_put; +/** Check a packet's cryptographic signature + * + * @param raw_pkt Pointer to the packet data + * @param raw_len Length of the raw data + * @param pkt the packet header + * + * @return 0 on success; error code otherwise + */ +static enum cle_err_codes validate_pkt_signature(const char *raw_pkt, + int raw_len, const struct cld_pkt_hdr *pkt) +{ + struct cld_pkt_ftr *foot; const char *secret_key; int auth_rc; - void *p; + foot = (struct cld_pkt_ftr *) + (raw_pkt + (raw_len - CLD_PKT_FTR_LEN)); secret_key = user_key(pkt->user); - /* verify pkt data integrity and credentials via HMAC signature */ auth_rc = __cld_authcheck(&srv_log, secret_key, raw_pkt, - pkt_len - SHA_DIGEST_LENGTH, - raw_pkt + pkt_len - SHA_DIGEST_LENGTH); + raw_len - SHA_DIGEST_LENGTH, + foot->sha); if (auth_rc) { HAIL_DEBUG(&srv_log, "auth failed, code %d", auth_rc); - resp_rc = CLE_SIG_INVAL; - goto err_out; - } - - pkt_flags = le32_to_cpu(pkt->flags); - first_frag = pkt_flags & CPF_FIRST; - last_frag = pkt_flags & CPF_LAST; - have_new_sess = first_frag && (msg->op == CMO_NEW_SESS); - have_ack = first_frag && (msg->op == CMO_ACK); - have_put = first_frag && (msg->op == CMO_PUT); - - /* look up client session, verify it matches IP and 
username */ - sess = g_hash_table_lookup(cld_srv.sessions, pkt->sid); - if (sess && - ((sess->addr_len != cli->addr_len) || - memcmp(&sess->addr, &cli->addr, sess->addr_len) || - strncmp(pkt->user, sess->user, CLD_MAX_USERNAME) || - sess->dead)) { - resp_rc = CLE_SESS_INVAL; - goto err_out; + return CLE_SIG_INVAL; } - mp.sock_fd = sock_fd; - mp.cli = cli; - mp.sess = sess; - mp.pkt = pkt; - mp.msg = msg; - mp.msg_len = pkt_len - sizeof(*pkt) - SHA_DIGEST_LENGTH; - - HAIL_DEBUG(&srv_log, "%s pkt: len %zu, seqid %llu, sid " SIDFMT ", " - "flags %s%s, user %s", - __func__, - pkt_len, (unsigned long long) le64_to_cpu(pkt->seqid), - SIDARG(pkt->sid), - first_frag ? "F" : "", last_frag ? "L" : "", - pkt->user); - - /* advance sequence id's and update last-contact timestamp */ - if (!have_new_sess) { - if (!sess) { - resp_rc = CLE_SESS_INVAL; - goto err_out; - } - - sess->last_contact = current_time.tv_sec; - sess->sock_fd = sock_fd; - - if (!have_ack) { - /* eliminate duplicates; do not return any response */ - if (le64_to_cpu(pkt->seqid) != sess->next_seqid_in) { - HAIL_DEBUG(&srv_log, "%s: dropping dup", __func__); - return; - } + return 0; +} - /* received message - update session */ - sess->next_seqid_in++; - } - } else { - if (sess) { - /* eliminate duplicates; do not return any response */ - if (le64_to_cpu(pkt->seqid) != sess->next_seqid_in) { - HAIL_DEBUG(&srv_log, "%s: dropping dup", __func__); - return; - } +/** Check if this packet is a duplicate + * + * @param info Packet info + * + * @return true only if the packet is a duplicate + */ +static bool packet_is_dupe(const struct pkt_info *info) +{ + if (!info->sess) + return false; + if (info->op == CMO_ACK) + return false; - resp_rc = CLE_SESS_EXISTS; - goto err_out; - } + /* Check sequence IDs to discover if this packet is a dupe */ + if (info->seqid != info->sess->next_seqid_in) { + HAIL_DEBUG(&srv_log, "dropping dup with seqid %llu " + "(expected %llu).", + (unsigned long long) info->seqid, + (unsigned long 
long) info->sess->next_seqid_in); + return true; } - /* copy message fragment into reassembly buffer */ - if (sess) { - if (first_frag) - sess->msg_buf_len = 0; - - if ((sess->msg_buf_len + mp.msg_len) > CLD_MAX_MSG_SZ) { - resp_rc = CLE_BAD_PKT; - goto err_out; - } - - memcpy(&sess->msg_buf[sess->msg_buf_len], msg, mp.msg_len); - sess->msg_buf_len += mp.msg_len; - - if (!last_frag) { - pkt_ack_frag(sock_fd, cli, pkt); - return; - } + return false; +} - mp.msg = msg = (struct cld_msg_hdr *) sess->msg_buf; - mp.msg_len = sess->msg_buf_len; +void simple_sendmsg(int fd, const struct client *cli, + uint64_t sid, const char *user, uint64_t seqid, + xdrproc_t xdrproc, const void *xdrdata, enum cld_msg_op op) +{ + XDR xhdr, xmsg; + struct cld_pkt_hdr pkt; + struct cld_pkt_msg_infos *infos; + struct cld_pkt_ftr *foot; + const char *secret_key; + char *buf; + size_t msg_len, hdr_len, buf_len; + int auth_rc; - if ((srv_log.verbose > 1) && !first_frag) - HAIL_DEBUG(&srv_log, " final message size %u", - sess->msg_buf_len); + /* Set up the packet header */ + memset(&pkt, 0, sizeof(cld_pkt_hdr)); + memcpy(&pkt.magic, CLD_PKT_MAGIC, sizeof(pkt.magic)); + pkt.sid = sid; + pkt.user = (char *)user; + pkt.mi.order = CLD_PKT_ORD_FIRST_LAST; + infos = &pkt.mi.cld_pkt_msg_info_u.mi; + __cld_rand64(&infos->xid); + infos->op = op; + + /* Determine sizes */ + msg_len = xdr_sizeof(xdrproc, (void *)xdrdata); + if (msg_len > CLD_MAX_MSG_SZ) { + HAIL_ERR(&srv_log, "%s: tried to put %d message bytes in a " + "single packet. 
Maximum message bytes per packet " + "is %d", __func__, msg_len, CLD_MAX_PKT_MSG_SZ); + return; } + hdr_len = xdr_sizeof((xdrproc_t)xdr_cld_pkt_hdr, &pkt); + buf_len = msg_len + hdr_len + CLD_PKT_FTR_LEN; + buf = alloca(buf_len); + + /* Serialize data */ + xdrmem_create(&xhdr, buf, hdr_len, XDR_ENCODE); + if (!xdr_cld_pkt_hdr(&xhdr, &pkt)) { + xdr_destroy(&xhdr); + HAIL_ERR(&srv_log, "%s: xdr_cld_pkt_hdr failed", + __func__); + return; + } + xdr_destroy(&xhdr); + xdrmem_create(&xmsg, buf + hdr_len, msg_len, XDR_ENCODE); + if (!xdrproc(&xmsg, (void *)xdrdata)) { + xdr_destroy(&xmsg); + HAIL_ERR(&srv_log, "%s: xdrproc failed", __func__); + return; + } + xdr_destroy(&xmsg); - if (last_frag) - udp_rx_msg(cli, pkt, msg, &mp); - return; - -err_out: - /* transmit error response (once, without retries) */ - alloc_len = sizeof(*outpkt) + sizeof(*resp) + SHA_DIGEST_LENGTH; - outpkt = alloca(alloc_len); - resp = (struct cld_msg_resp *) (outpkt + 1); - memset(outpkt, 0, alloc_len); - - pkt_init_pkt(outpkt, pkt); - - resp_copy(resp, msg); - resp->code = cpu_to_le32(resp_rc); - - p = outpkt; - secret_key = user_key(outpkt->user); - __cld_authsign(&srv_log, secret_key, p, alloc_len - SHA_DIGEST_LENGTH, - p + alloc_len - SHA_DIGEST_LENGTH); - - HAIL_DEBUG(&srv_log, "%s err: " - "sid " SIDFMT ", op %s, seqid %llu, code %d", - __func__, - SIDARG(outpkt->sid), __cld_opstr(resp->hdr.op), - (unsigned long long) le64_to_cpu(outpkt->seqid), - resp_rc); - - udp_tx(sock_fd, (struct sockaddr *) &cli->addr, cli->addr_len, - outpkt, alloc_len); + foot = (struct cld_pkt_ftr *) + (buf + (buf_len - SHA_DIGEST_LENGTH)); + foot->seqid = cpu_to_le64(seqid); + secret_key = user_key(user); + + auth_rc =__cld_authsign(&srv_log, secret_key, buf, + buf_len - SHA_DIGEST_LENGTH, + foot->sha); + if (auth_rc) + HAIL_ERR(&srv_log, "%s: authsign failed: %d", + __func__, auth_rc); + + udp_tx(fd, (struct sockaddr *) &cli->addr, cli->addr_len, + buf, buf_len); +} + +static void simple_sendresp(int sock_fd, const 
struct client *cli, + const struct pkt_info *info, enum cle_err_codes code) +{ + const struct cld_pkt_hdr *pkt = info->pkt; + struct cld_msg_generic_resp resp; + resp.code = code; + resp.xid_in = info->xid; + + simple_sendmsg(sock_fd, cli, pkt->sid, pkt->user, info->seqid, + (xdrproc_t)xdr_cld_msg_generic_resp, (void *)&resp, + info->op); } static bool udp_srv_event(int fd, short events, void *userdata) { struct client cli; char host[64]; - ssize_t rrc; + ssize_t rrc, hdr_len; struct msghdr hdr; struct iovec iov[2]; - uint8_t raw_pkt[CLD_RAW_MSG_SZ], ctl_msg[CLD_RAW_MSG_SZ]; - struct cld_packet *pkt = (struct cld_packet *) raw_pkt; + char raw_pkt[CLD_RAW_MSG_SZ], ctl_msg[CLD_RAW_MSG_SZ]; + struct cld_pkt_hdr pkt; + struct pkt_info info; + enum cle_err_codes err; memset(&cli, 0, sizeof(cli)); @@ -470,46 +599,52 @@ static bool udp_srv_event(int fd, short HAIL_DEBUG(&srv_log, "client %s message (%d bytes)", host, (int) rrc); - /* if it is complete garbage, drop immediately */ - if ((rrc < (sizeof(*pkt) + SHA_DIGEST_LENGTH)) || - (memcmp(pkt->magic, CLD_PKT_MAGIC, sizeof(pkt->magic)))) { + if (!parse_pkt_header(raw_pkt, rrc, &pkt, &hdr_len)) { cld_srv.stats.garbage++; - return true; /* continue main loop; do NOT terminate server */ + return true; } - if (cld_srv.cldb.is_master && cld_srv.cldb.up) - udp_rx(fd, &cli, raw_pkt, rrc); + if (!get_pkt_info(&pkt, raw_pkt, rrc, hdr_len, &info)) { + xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); + cld_srv.stats.garbage++; + return true; + } + + if (packet_is_dupe(&info)) { + /* silently drop dupes */ + xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); + return true; + } - else { - struct cld_packet *outpkt; - struct cld_msg_hdr *msg = (struct cld_msg_hdr *) (pkt + 1); - struct cld_msg_resp *resp; - size_t alloc_len; - const char *secret_key; - void *p; - - alloc_len = sizeof(*outpkt) + sizeof(*resp) + SHA_DIGEST_LENGTH; - outpkt = alloca(alloc_len); - memset(outpkt, 0, alloc_len); - - pkt_init_pkt(outpkt, pkt); - - /* 
transmit not-master error msg */ - resp = (struct cld_msg_resp *) (outpkt + 1); - resp_copy(resp, msg); - resp->hdr.op = CMO_NOT_MASTER; - - p = outpkt; - secret_key = user_key(outpkt->user); - __cld_authsign(&srv_log, secret_key, p, - alloc_len - SHA_DIGEST_LENGTH, - p + alloc_len - SHA_DIGEST_LENGTH); + err = validate_pkt_session(&info, &cli); + if (err) { + simple_sendresp(fd, &cli, &info, err); + xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); + return true; + } - udp_tx(fd, (struct sockaddr *) &cli.addr, cli.addr_len, - outpkt, alloc_len); + err = validate_pkt_signature(raw_pkt, rrc, &pkt); + if (err) { + simple_sendresp(fd, &cli, &info, err); + xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); + return true; } - return true; /* continue main loop; do NOT terminate server */ + if (!(cld_srv.cldb.is_master && cld_srv.cldb.up)) { + simple_sendmsg(fd, &cli, pkt.sid, pkt.user, 0xdeadbeef, + (xdrproc_t)xdr_void, NULL, CMO_NOT_MASTER); + xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); + return true; + } + + err = udp_rx(fd, &cli, &info, raw_pkt, rrc); + if (err) { + simple_sendresp(fd, &cli, &info, err); + xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); + return true; + } + xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); + return true; } static void add_chkpt_timer(void) diff -X /garz/tmp/dontdiff -urNp cld/server/session.c cld.rpcgen/server/session.c --- cld/server/session.c 2010-02-04 01:20:51.000000000 -0500 +++ cld.rpcgen/server/session.c 2010-02-03 16:49:55.000000000 -0500 @@ -29,15 +29,15 @@ #include <openssl/sha.h> #include <cld-private.h> #include "cld.h" +#include <cld_pkt.h> struct session_outpkt { struct session *sess; - struct cld_packet *pkt; + char *pkt_data; size_t pkt_len; uint64_t next_retry; - uint64_t src_seqid; unsigned int refs; void (*done_cb)(struct session_outpkt *); @@ -60,26 +60,6 @@ uint64_t next_seqid_le(uint64_t *seq) return rc; } -void pkt_init_pkt(struct cld_packet *dest, const struct cld_packet *src) -{ - 
memset(dest, 0, sizeof(*dest)); - memcpy(dest->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ); - dest->seqid = cpu_to_le64(0xdeadbeef); - memcpy(dest->sid, src->sid, CLD_SID_SZ); - dest->flags = cpu_to_le32(CPF_FIRST | CPF_LAST); - strncpy(dest->user, src->user, CLD_MAX_USERNAME - 1); -} - -static void pkt_init_sess(struct cld_packet *dest, struct session *sess) -{ - memset(dest, 0, sizeof(*dest)); - memcpy(dest->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ); - dest->seqid = next_seqid_le(&sess->next_seqid_out); - memcpy(dest->sid, sess->sid, CLD_SID_SZ); - dest->flags = 0; - strncpy(dest->user, sess->user, CLD_MAX_USERNAME - 1); -} - guint sess_hash(gconstpointer v) { const struct session *sess = v; @@ -397,20 +377,6 @@ static void session_ping_done(struct ses outpkt->sess->ping_open = false; } -static void session_ping(struct session *sess) -{ - struct cld_msg_hdr resp; - - memset(&resp, 0, sizeof(resp)); - memcpy(resp.magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ); - __cld_rand64(&resp.xid); - resp.op = CMO_PING; - - sess->ping_open = true; - - sess_sendmsg(sess, &resp, sizeof(resp), session_ping_done, NULL); -} - static void session_timeout(struct cld_timer *timer) { struct session *sess = timer->userdata; @@ -424,8 +390,12 @@ static void session_timeout(struct cld_t if (!sess->dead && (sess_expire > now)) { if (!sess->ping_open && (sess_expire > (sess->last_contact + (CLD_SESS_TIMEOUT / 2) && - (sess->sock_fd > 0)))) - session_ping(sess); + (sess->sock_fd > 0)))) { + sess->ping_open = true; + sess_sendmsg(sess, + (xdrproc_t)xdr_void, NULL, CMO_PING, + session_ping_done, NULL); + } cld_timer_add(&cld_srv.timers, &sess->timer, now + ((sess_expire - now) / 2) + 1); @@ -519,8 +489,8 @@ static struct session_outpkt *op_alloc(s if (!op) return NULL; - op->pkt = calloc(1, pkt_len); - if (!op->pkt) { + op->pkt_data = calloc(1, pkt_len); + if (!op->pkt_data) { free(op); return NULL; } @@ -542,7 +512,7 @@ static void op_unref(struct session_outp return; } - free(op->pkt); + free(op->pkt_data); 
free(op); } @@ -556,18 +526,10 @@ static int sess_retry_output(struct sess tmp = sess->out_q; while (tmp) { - struct cld_packet *outpkt; - struct cld_msg_hdr *outmsg; struct session_outpkt *op; - GList *tmp1; - - tmp1 = tmp; + op = tmp->data; tmp = tmp->next; - op = tmp1->data; - outpkt = op->pkt; - outmsg = (struct cld_msg_hdr *) (outpkt + 1); - if (!next_retry || (op->next_retry < next_retry)) *next_retry_out = next_retry = op->next_retry; @@ -575,15 +537,15 @@ static int sess_retry_output(struct sess continue; if (srv_log.verbose) { - HAIL_DEBUG(&srv_log, "%s: retrying: sid " SIDFMT ", " - "op %s, seqid %llu", + char scratch[PKT_HDR_TO_STR_SCRATCH_LEN]; + HAIL_DEBUG(&srv_log, "%s: retrying %s", __func__, - SIDARG(outpkt->sid), __cld_opstr(outmsg->op), - (unsigned long long) le64_to_cpu(outpkt->seqid)); + __cld_pkt_hdr_to_str(scratch, op->pkt_data, + op->pkt_len)); } rc = udp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr, - sess->addr_len, op->pkt, op->pkt_len); + sess->addr_len, op->pkt_data, op->pkt_len); if (rc) break; @@ -616,153 +578,161 @@ static void session_outq(struct session sess->out_q = g_list_concat(sess->out_q, new_pkts); } -bool sess_sendmsg(struct session *sess, const void *msg_, size_t msglen, - void (*done_cb)(struct session_outpkt *), - void *done_data) -{ - struct cld_packet *outpkt; - unsigned int n_pkts, i; - size_t pkt_len, msg_left = msglen; - struct session_outpkt **pkts, *op; - GList *tmp_root = NULL; - const void *p; - bool first_frag = true; - - if (msglen > CLD_MAX_MSG_SZ) { - HAIL_ERR(&srv_log, "%s: message too big (%zu bytes)\n", - __func__, msglen); - return false; - } - - n_pkts = (msglen / CLD_MAX_PKT_MSG_SZ); - n_pkts += (msglen % CLD_MAX_PKT_MSG_SZ) ? 
1 : 0; - pkts = alloca(sizeof(struct session_outpkt *) * n_pkts); - - if (srv_log.verbose) { - const struct cld_msg_hdr *hdr = msg_; - const struct cld_msg_resp *rsp; - - switch (hdr->op) { - /* This is the command set that gets to cldc_rx_generic */ - case CMO_NOP: - case CMO_CLOSE: - case CMO_DEL: - case CMO_LOCK: - case CMO_UNLOCK: - case CMO_TRYLOCK: - case CMO_PUT: - case CMO_NEW_SESS: - case CMO_END_SESS: - case CMO_OPEN: - case CMO_GET_META: - case CMO_GET: - rsp = (struct cld_msg_resp *) msg_; - HAIL_DEBUG(&srv_log, "%s: " - "sid " SIDFMT ", op %s, msglen %u, code %u, " - "xid %llu, xid_in %llu", - __func__, - SIDARG(sess->sid), - __cld_opstr(hdr->op), - (unsigned int) msglen, - le32_to_cpu(rsp->code), - (unsigned long long) le64_to_cpu(hdr->xid), - (unsigned long long) le64_to_cpu(rsp->xid_in)); - break; - default: - HAIL_DEBUG(&srv_log, "%s: " - "sid " SIDFMT ", op %s, msglen %u", - __func__, - SIDARG(sess->sid), - __cld_opstr(hdr->op), - (unsigned int) msglen); - } - } +bool sess_sendmsg(struct session *sess, + xdrproc_t xdrproc, const void *xdrdata, enum cld_msg_op msg_op, + void (*done_cb)(struct session_outpkt *), void *done_data) +{ + XDR xmsg; + size_t msg_rem, msg_len, msg_chunk_len; + char *msg_bytes, *msg_cur; + GList *tmp_list, *new_pkts = NULL; + int first, last; + const char *secret_key; - /* pass 1: perform allocations */ - for (i = 0; i < n_pkts; i++) { - pkts[i] = op = op_alloc(sizeof(*outpkt) + - CLD_MAX_PKT_MSG_SZ + - SHA_DIGEST_LENGTH); - if (!op) - goto err_out; + secret_key = user_key(sess->user); - tmp_root = g_list_append(tmp_root, op); + /* Use XDR to serialize the message */ + msg_len = xdr_sizeof(xdrproc, (void *)xdrdata); + if (msg_len > CLD_MAX_MSG_SZ) + return false; + msg_bytes = alloca(msg_len); + xdrmem_create(&xmsg, msg_bytes, msg_len, XDR_ENCODE); + if (!xdrproc(&xmsg, (void *)xdrdata)) { + xdr_destroy(&xmsg); + HAIL_ERR(&srv_log, "%s: xdrproc failed", __func__); + return false; } + xdr_destroy(&xmsg); - /* pass 2: fill 
packets */ - p = msg_; - for (i = 0; i < n_pkts; i++) { - struct cld_msg_hdr *outmsg; - void *outmsg_mem; - size_t copy_len; - void *out_p; - const char *secret_key; - - op = pkts[i]; + /* Break the message into packets */ + first = 1; + msg_rem = msg_len; + msg_cur = msg_bytes; + do { + XDR xout; + struct cld_pkt_hdr pkt; + size_t hdr_len; + struct session_outpkt *op; + if (msg_rem <= CLD_MAX_PKT_MSG_SZ) { + msg_chunk_len = msg_rem; + last = 1; + } else { + msg_chunk_len = CLD_MAX_PKT_MSG_SZ; + last = 0; + } + + /* Set up packet header */ + memset(&pkt, 0, sizeof(pkt)); + memcpy(&pkt.magic, CLD_PKT_MAGIC, sizeof(pkt.magic)); + memcpy(&pkt.sid, sess->sid, CLD_SID_SZ); + pkt.user = sess->user; + if (first) { + struct cld_pkt_msg_infos *infos = + &pkt.mi.cld_pkt_msg_info_u.mi; + if (last) + pkt.mi.order = CLD_PKT_ORD_FIRST_LAST; + else + pkt.mi.order = CLD_PKT_ORD_FIRST; + __cld_rand64(&infos->xid); + infos->op = msg_op; + } else { + if (last) + pkt.mi.order = CLD_PKT_ORD_LAST; + else + pkt.mi.order = CLD_PKT_ORD_MID; + } + + /* Allocate space and initialize session_outpkt structure */ + hdr_len = xdr_sizeof((xdrproc_t)xdr_cld_pkt_hdr, (void *)&pkt); + op = op_alloc(hdr_len + msg_chunk_len + CLD_PKT_FTR_LEN); + if (!op) { + HAIL_DEBUG(&srv_log, "%s: op_alloc failed", + __func__); + goto err_out; + } op->sess = sess; - - outpkt = op->pkt; - pkt_len = op->pkt_len; - - outmsg_mem = (outpkt + 1); - outmsg = outmsg_mem; - - /* init packet header */ - pkt_init_sess(outpkt, sess); - - if (first_frag) { - first_frag = false; - outpkt->flags |= cpu_to_le32(CPF_FIRST); + op->next_retry = current_time.tv_sec + CLD_RETRY_START; + op->done_cb = done_cb; + op->done_data = done_data; + xdrmem_create(&xout, op->pkt_data, hdr_len, XDR_ENCODE); + if (!xdr_cld_pkt_hdr(&xout, &pkt)) { + xdr_destroy(&xout); + HAIL_ERR(&srv_log, "%s: xdr_cld_pkt_hdr failed", + __func__); + goto err_out; } + xdr_destroy(&xout); - copy_len = MIN(pkt_len - sizeof(*outpkt) - SHA_DIGEST_LENGTH, - msg_left); - 
memcpy(outmsg_mem, p, copy_len); - - p += copy_len; - msg_left -= copy_len; - - op->pkt_len = - pkt_len = sizeof(*outpkt) + copy_len + SHA_DIGEST_LENGTH; - - if (!msg_left) { - op->done_cb = done_cb; - op->done_data = done_data; - - outpkt->flags |= cpu_to_le32(CPF_LAST); + /* Fill in data */ + memcpy(op->pkt_data + hdr_len, msg_cur, msg_chunk_len); + msg_cur += msg_chunk_len; + msg_rem -= msg_chunk_len; + first = 0; + + new_pkts = g_list_prepend(new_pkts, op); + } while (!last); + + /* add sequence IDs and SHAs */ + new_pkts = g_list_reverse(new_pkts); + for (tmp_list = g_list_first(new_pkts); + tmp_list; + tmp_list = g_list_next(tmp_list)) { + struct session_outpkt *op = + (struct session_outpkt *) tmp_list->data; + struct cld_pkt_ftr *foot = (struct cld_pkt_ftr *) + (op->pkt_data + (op->pkt_len - CLD_PKT_FTR_LEN)); + int ret; + + foot->seqid = next_seqid_le(&sess->next_seqid_out); + ret = __cld_authsign(&srv_log, secret_key, + op->pkt_data, op->pkt_len - SHA_DIGEST_LENGTH, + foot->sha); + if (ret) { + HAIL_ERR(&srv_log, "%s: authsign failed: %d", + __func__, ret); + goto err_out; } + } - op->next_retry = current_time.tv_sec + CLD_RETRY_START; - - out_p = outpkt; - secret_key = user_key(outpkt->user); - if (__cld_authsign(&srv_log, secret_key, out_p, - pkt_len - SHA_DIGEST_LENGTH, - out_p + pkt_len - SHA_DIGEST_LENGTH)) - goto err_out; /* FIXME: we free all pkts -- wrong! 
*/ - + /* send packets */ + for (tmp_list = g_list_first(new_pkts); + tmp_list; + tmp_list = g_list_next(tmp_list)) { + struct session_outpkt *op = + (struct session_outpkt *) tmp_list->data; udp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr, - sess->addr_len, outpkt, pkt_len); + sess->addr_len, op->pkt_data, op->pkt_len); } - session_outq(sess, tmp_root); + session_outq(sess, new_pkts); return true; err_out: - for (i = 0; i < n_pkts; i++) - op_unref(pkts[i]); - g_list_free(tmp_root); + for (tmp_list = g_list_first(new_pkts); tmp_list; + tmp_list = g_list_next(tmp_list)) { + struct session_outpkt *op; + op = (struct session_outpkt *)tmp_list->data; + op_unref(op); + } + g_list_free(new_pkts); return false; } -void msg_ack(struct msg_params *mp) +void sess_sendresp_generic(struct session *sess, enum cle_err_codes code) +{ + struct cld_msg_generic_resp resp; + resp.code = code; + resp.xid_in = sess->msg_xid; + + sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_generic_resp, + (void *)&resp, sess->msg_op, NULL, NULL); +} + +void msg_ack(struct session *sess, uint64_t seqid) { - struct cld_packet *outpkt; - struct cld_msg_hdr *outmsg; GList *tmp, *tmp1; - struct session *sess = mp->sess; struct session_outpkt *op; if (!sess->out_q) @@ -771,19 +741,22 @@ void msg_ack(struct msg_params *mp) /* look through output queue */ tmp = sess->out_q; while (tmp) { + uint64_t op_seqid; + struct cld_pkt_ftr *foot; tmp1 = tmp; tmp = tmp->next; op = tmp1->data; - outpkt = op->pkt; - outmsg = (struct cld_msg_hdr *) (outpkt + 1); + foot = (struct cld_pkt_ftr *) + (op->pkt_data + (op->pkt_len - CLD_PKT_FTR_LEN)); + op_seqid = le64_to_cpu(foot->seqid); /* if matching seqid found, we ack'd a message in out_q */ - if (mp->pkt->seqid != outpkt->seqid) + if (seqid != op_seqid) continue; HAIL_DEBUG(&srv_log, " expiring seqid %llu", - (unsigned long long) le64_to_cpu(outpkt->seqid)); + (unsigned long long) op_seqid); /* remove and delete the ack'd msg; call ack'd callback */ sess->out_q = 
g_list_delete_link(sess->out_q, tmp1); @@ -797,19 +770,17 @@ void msg_ack(struct msg_params *mp) cld_timer_del(&cld_srv.timers, &sess->retry_timer); } -void msg_new_sess(struct msg_params *mp, const struct client *cli) +void msg_new_sess(int sock_fd, const struct client *cli, + const struct pkt_info *info) { + const struct cld_pkt_hdr *pkt = info->pkt; DB *db = cld_srv.cldb.sessions; struct raw_session raw_sess; struct session *sess = NULL; DBT key, val; int rc; enum cle_err_codes resp_rc = CLE_OK; - struct cld_msg_resp *resp; - struct cld_packet *outpkt; - size_t alloc_len; - const char *secret_key; - void *p; + struct cld_msg_generic_resp resp; sess = session_new(); if (!sess) { @@ -818,17 +789,17 @@ void msg_new_sess(struct msg_params *mp, } /* build raw_session database record */ - memcpy(sess->sid, mp->pkt->sid, sizeof(sess->sid)); + memcpy(sess->sid, &pkt->sid, sizeof(sess->sid)); memcpy(&sess->addr, &cli->addr, sizeof(sess->addr)); - strncpy(sess->user, mp->pkt->user, sizeof(sess->user)); - sess->user[sizeof(sess->user) - 1] = 0; + snprintf(sess->user, sizeof(sess->user), "%s", + pkt->user); - sess->sock_fd = mp->sock_fd; + sess->sock_fd = sock_fd; sess->addr_len = cli->addr_len; strncpy(sess->ipaddr, cli->addr_host, sizeof(sess->ipaddr)); sess->last_contact = current_time.tv_sec; - sess->next_seqid_in = le64_to_cpu(mp->pkt->seqid) + 1; + sess->next_seqid_in = info->seqid + 1; session_encode(&raw_sess, sess); @@ -865,34 +836,26 @@ void msg_new_sess(struct msg_params *mp, cld_timer_add(&cld_srv.timers, &sess->timer, time(NULL) + (CLD_SESS_TIMEOUT / 2)); - resp_ok(sess, mp->msg); + /* send new-sess reply */ + resp.code = CLE_OK; + resp.xid_in = info->xid; + sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_generic_resp, + (void *)&resp, CMO_NEW_SESS, NULL, NULL); + return; err_out: session_free(sess, true); - alloc_len = sizeof(*outpkt) + sizeof(*resp) + SHA_DIGEST_LENGTH; - outpkt = alloca(alloc_len); - memset(outpkt, 0, alloc_len); - - pkt_init_pkt(outpkt, mp->pkt); 
- - resp = (struct cld_msg_resp *) (outpkt + 1); - resp_copy(resp, mp->msg); - resp->code = cpu_to_le32(resp_rc); - - p = outpkt; - secret_key = user_key(outpkt->user); - __cld_authsign(&srv_log, secret_key, p, alloc_len - SHA_DIGEST_LENGTH, - p + alloc_len - SHA_DIGEST_LENGTH); - - HAIL_DEBUG(&srv_log, "%s err: sid " SIDFMT ", op %s, seqid %llu", + HAIL_DEBUG(&srv_log, "%s err: sid " SIDFMT ", op %s", __func__, - SIDARG(outpkt->sid), __cld_opstr(resp->hdr.op), - (unsigned long long) le64_to_cpu(outpkt->seqid)); + pkt->sid, __cld_opstr(CMO_NEW_SESS)); - udp_tx(mp->sock_fd, (struct sockaddr *) &mp->cli->addr, - mp->cli->addr_len, outpkt, alloc_len); + resp.code = resp_rc; + resp.xid_in = info->xid; + simple_sendmsg(sock_fd, cli, pkt->sid, pkt->user, 0xdeadbeef, + (xdrproc_t)xdr_cld_msg_generic_resp, (void *)&resp, + CMO_NEW_SESS); HAIL_DEBUG(&srv_log, "NEW-SESS failed: %d", resp_rc); } @@ -902,18 +865,18 @@ static void end_sess_done(struct session session_trash(outpkt->sess); } -void msg_end_sess(struct msg_params *mp, const struct client *cli) +void msg_end_sess(struct session *sess, uint64_t xid) { - struct session *sess = mp->sess; - struct cld_msg_resp resp; + struct cld_msg_generic_resp resp; /* do nothing; let message acknowledgement via * end_sess_done mark session dead */ - - memset(&resp, 0, sizeof(resp)); - resp_copy(&resp, mp->msg); - sess_sendmsg(sess, &resp, sizeof(resp), end_sess_done, NULL); + resp.code = CLE_OK; + resp.xid_in = xid; + sess_sendmsg(sess, (xdrproc_t)xdr_cld_msg_generic_resp, + &resp, CMO_END_SESS, + end_sess_done, NULL); } /* diff -X /garz/tmp/dontdiff -urNp cld/test/load-file-event.c cld.rpcgen/test/load-file-event.c --- cld/test/load-file-event.c 2010-02-04 18:44:07.000000000 -0500 +++ cld.rpcgen/test/load-file-event.c 2010-02-04 18:45:07.000000000 -0500 @@ -146,7 +146,7 @@ static int read_1_cb(struct cldc_call_op { struct run *rp = coptarg->private; struct cldc_call_opts copts; - const char *data; + char *data; size_t data_len; int 
rc; diff -X /garz/tmp/dontdiff -urNp cld/test/Makefile.am cld.rpcgen/test/Makefile.am --- cld/test/Makefile.am 2010-02-04 01:20:51.000000000 -0500 +++ cld.rpcgen/test/Makefile.am 2010-02-02 22:30:24.000000000 -0500 @@ -1,5 +1,6 @@ INCLUDES = -I$(top_srcdir)/include \ + -I$(top_srcdir)/lib \ @GLIB_CFLAGS@ EXTRA_DIST = \ diff -X /garz/tmp/dontdiff -urNp cld/tools/cldcli.c cld.rpcgen/tools/cldcli.c --- cld/tools/cldcli.c 2010-02-04 18:44:18.000000000 -0500 +++ cld.rpcgen/tools/cldcli.c 2010-02-03 17:23:17.000000000 -0500 @@ -217,7 +217,7 @@ static int cb_ls_2(struct cldc_call_opts struct cldc_call_opts copts = { NULL, }; struct cld_dirent_cur dc; int rc, i; - const char *data; + char *data; size_t data_len; bool first = true; @@ -297,7 +297,7 @@ static int cb_cat_2(struct cldc_call_opt { struct cresp cresp = { .tcode = TC_FAILED, }; struct cldc_call_opts copts = { NULL, }; - const char *data; + char *data; size_t data_len; if (errc != CLE_OK) { @@ -345,7 +345,7 @@ static int cb_cp_cf_2(struct cldc_call_o { struct cresp cresp = { .tcode = TC_FAILED, }; struct cldc_call_opts copts = { NULL, }; - const char *data; + char *data; size_t data_len; if (errc != CLE_OK) { diff -X /garz/tmp/dontdiff -urNp cld/tools/Makefile.am cld.rpcgen/tools/Makefile.am --- cld/tools/Makefile.am 2010-01-29 00:36:25.000000000 -0500 +++ cld.rpcgen/tools/Makefile.am 2010-01-22 18:28:08.000000000 -0500 @@ -1,5 +1,6 @@ INCLUDES = -I$(top_srcdir)/include \ + -I$(top_srcdir)/lib \ @GLIB_CFLAGS@ bin_PROGRAMS = cldcli