Convert CLD network protocol from UDP to TCP. Server, client lib, and chunkd's cldu module are all updated. tabled's cldu module must also be updated. The original rationale for using UDP was to follow Google's lead, based on the advice in the original Chubby paper, which describes TCP's back-off policies and other behavior during times of high network congestion. This seems a bit dubious without further third-party evidence, and TCP vastly simplifies our lives. While the code remains open and modular enough to support other protocols (hopefully RDMA or SCTP one day), this upgrade from UDP to TCP promises to make the current codebase much easier to use, while avoiding the "reinvent TCP, by using UDP" problem, which was a rabbit hole threatening CLD. Signed-off-by: Jeff Garzik <jgarzik@xxxxxxxxxx> --- chunkd/cldu.c | 6 cld/cld.h | 43 ++++++ cld/msg.c | 4 cld/server.c | 356 ++++++++++++++++++++++++++++++++++++--------------- cld/session.c | 4 configure.ac | 1 include/Makefile.am | 2 include/cld_common.h | 4 include/cldc.h | 24 ++- include/ncld.h | 4 include/ubbp.h | 52 +++++++ lib/Makefile.am | 2 lib/cldc-dns.c | 2 lib/cldc-tcp.c | 185 ++++++++++++++++++++++++++ lib/cldc-udp.c | 141 -------------------- lib/cldc.c | 54 +++---- 16 files changed, 595 insertions(+), 289 deletions(-) diff --git a/chunkd/cldu.c b/chunkd/cldu.c index 026c523..41f94b5 100644 --- a/chunkd/cldu.c +++ b/chunkd/cldu.c @@ -165,7 +165,7 @@ static void cldu_sess_event(void *priv, uint32_t what) */ if (cs->nsess) { applog(LOG_ERR, "Session failed, sid " SIDFMT, - SIDARG(cs->nsess->udp->sess->sid)); + SIDARG(cs->nsess->tcp->sess->sid)); } else { applog(LOG_ERR, "Session open failed"); } @@ -177,7 +177,7 @@ static void cldu_sess_event(void *priv, uint32_t what) } else { if (cs) applog(LOG_INFO, "cldc event 0x%x sid " SIDFMT, - what, SIDARG(cs->nsess->udp->sess->sid)); + what, SIDARG(cs->nsess->tcp->sess->sid)); else applog(LOG_INFO, "cldc event 0x%x no sid", what); } @@ -372,7 +372,7 @@ static int 
cldu_set_cldc(struct cld_session *cs, int newactive) } applog(LOG_INFO, "New CLD session created, sid " SIDFMT, - SIDARG(cs->nsess->udp->sess->sid)); + SIDARG(cs->nsess->tcp->sess->sid)); /* * First, make sure the base directory exists. diff --git a/cld/cld.h b/cld/cld.h index 17f14b8..b1f9bbf 100644 --- a/cld/cld.h +++ b/cld/cld.h @@ -30,6 +30,7 @@ #include <cld_common.h> #include <hail_log.h> #include <hail_private.h> +#include <ubbp.h> struct client; struct session_outpkt; @@ -43,10 +44,39 @@ enum { SFL_FOREGROUND = (1 << 0), /* run in foreground */ }; +struct atcp_read { + void *buf; + unsigned int buf_size; + unsigned int bytes_wanted; + unsigned int bytes_read; + + void (*cb)(void *, bool); + void *cb_data; + + struct list_head node; +}; + +struct atcp_read_state { + struct list_head q; +}; + struct client { + int fd; + + struct event ev; + short ev_mask; /* EV_READ and/or EV_WRITE */ + struct sockaddr_in6 addr; /* inet address */ socklen_t addr_len; /* inet address len */ char addr_host[64]; /* ASCII version of inet addr */ + char addr_port[16]; /* ASCII version of inet addr */ + + struct atcp_read_state rst; + + struct ubbp_header ubbp; + + char raw_pkt[CLD_RAW_MSG_SZ]; + unsigned int raw_size; }; struct session { @@ -124,6 +154,17 @@ struct pkt_info { size_t hdr_len; }; +#define ___constant_swab32(x) ((uint32_t)( \ + (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \ + (((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \ + (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \ + (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24))) + +static inline uint32_t swab32(uint32_t v) +{ + return ___constant_swab32(v); +} + /* msg.c */ extern int inode_lock_rescan(DB_TXN *txn, cldino_t inum); extern void msg_get(struct session *sess, const void *v); @@ -178,7 +219,7 @@ extern int sess_load(GHashTable *ss); extern struct server cld_srv; extern struct hail_log srv_log; extern struct timeval current_time; -extern int udp_tx(int sock_fd, struct sockaddr *, socklen_t, 
+extern int tcp_tx(int sock_fd, struct sockaddr *, socklen_t, const void *, size_t); extern const char *user_key(const char *user); diff --git a/cld/msg.c b/cld/msg.c index dd8cf03..8d83e74 100644 --- a/cld/msg.c +++ b/cld/msg.c @@ -29,10 +29,6 @@ #include <cld_msg_rpc.h> #include "cld.h" -enum { - CLD_MAX_UDP_SEG = 1024, -}; - struct pathname_info { const char *dir; size_t dir_len; diff --git a/cld/server.c b/cld/server.c index aed501b..5a73e54 100644 --- a/cld/server.c +++ b/cld/server.c @@ -33,6 +33,7 @@ #include <netdb.h> #include <signal.h> #include <netinet/in.h> +#include <netinet/tcp.h> #include <openssl/sha.h> #include <openssl/hmac.h> #include <cld-private.h> @@ -46,10 +47,6 @@ const char *argp_program_version = PACKAGE_VERSION; -enum { - CLD_RAW_MSG_SZ = 4096, -}; - static struct argp_option options[] = { { "data", 'd', "DIRECTORY", 0, "Store database environment in DIRECTORY. Default: " @@ -61,7 +58,7 @@ static struct argp_option options[] = { { "foreground", 'F', NULL, 0, "Run in foreground, do not fork" }, { "port", 'p', "PORT", 0, - "Bind to UDP port PORT. Default: " CLD_DEF_PORT }, + "Bind to TCP port PORT. Default: " CLD_DEF_PORT }, { "pid", 'P', "FILE", 0, "Write daemon process id to FILE. Default: " CLD_DEF_PIDFN }, { "strict-free", 1001, NULL, 0, @@ -93,6 +90,11 @@ struct server cld_srv = { }; static void ensure_root(void); +static bool atcp_read(struct atcp_read_state *rst, + void *buf, unsigned int buf_size, + void (*cb)(void *, bool), void *cb_data); +static void cli_free(struct client *cli); +static void cli_rd_ubbp(void *userdata, bool success); static void applog(int prio, const char *fmt, ...) 
{ @@ -119,12 +121,28 @@ struct hail_log srv_log = { .func = applog, }; -int udp_tx(int sock_fd, struct sockaddr *addr, socklen_t addr_len, +int tcp_tx(int sock_fd, struct sockaddr *addr, socklen_t addr_len, const void *data, size_t data_len) { ssize_t src; + struct ubbp_header ubbp; + + memcpy(ubbp.magic, "CLD1", 4); + ubbp.op_size = (data_len << 8) | 2; +#ifdef WORDS_BIGENDIAN + swab32(ubbp.op_size); +#endif + + src = write(sock_fd, &ubbp, sizeof(ubbp)); + if (src < 0 && errno != EAGAIN) + HAIL_ERR(&srv_log, "%s sendto (fd %d, data_len %u): %s", + __func__, sock_fd, (unsigned int) data_len, + strerror(errno)); - src = sendto(sock_fd, data, data_len, 0, addr, addr_len); + if (src < 0) + return -errno; + + src = write(sock_fd, data, data_len); if (src < 0 && errno != EAGAIN) HAIL_ERR(&srv_log, "%s sendto (fd %d, data_len %u): %s", __func__, sock_fd, (unsigned int) data_len, @@ -148,7 +166,7 @@ const char *user_key(const char *user) return user; /* our secret key */ } -static int udp_rx_handle(struct session *sess, +static int tcp_rx_handle(struct session *sess, void (*msg_handler)(struct session *, const void *), xdrproc_t xdrproc, void *xdrdata) { @@ -167,9 +185,9 @@ static int udp_rx_handle(struct session *sess, return 0; } -/** Recieve a UDP packet +/** Recieve a TCP packet * - * @param sock_fd The UDP socket we received the packet on + * @param sock_fd The TCP socket we received the packet on * @param cli Client address data * @param info Packet information * @param raw_pkt The raw packet buffer @@ -178,7 +196,7 @@ static int udp_rx_handle(struct session *sess, * @return An error code if we should send an error message * response. CLE_OK if we are done. 
*/ -static enum cle_err_codes udp_rx(int sock_fd, const struct client *cli, +static enum cle_err_codes tcp_rx(int sock_fd, const struct client *cli, struct pkt_info *info, const char *raw_pkt, size_t raw_len) { @@ -230,39 +248,39 @@ static enum cle_err_codes udp_rx(int sock_fd, const struct client *cli, /* fall through */ case CMO_GET_META: { struct cld_msg_get get = {0}; - return udp_rx_handle(sess, msg_get, + return tcp_rx_handle(sess, msg_get, (xdrproc_t)xdr_cld_msg_get, &get); } case CMO_OPEN: { struct cld_msg_open open_msg = {0}; - return udp_rx_handle(sess, msg_open, + return tcp_rx_handle(sess, msg_open, (xdrproc_t)xdr_cld_msg_open, &open_msg); } case CMO_PUT: { struct cld_msg_put put = {0}; - return udp_rx_handle(sess, msg_put, + return tcp_rx_handle(sess, msg_put, (xdrproc_t)xdr_cld_msg_put, &put); } case CMO_CLOSE: { struct cld_msg_close close_msg = {0}; - return udp_rx_handle(sess, msg_close, + return tcp_rx_handle(sess, msg_close, (xdrproc_t)xdr_cld_msg_close, &close_msg); } case CMO_DEL: { struct cld_msg_del del = {0}; - return udp_rx_handle(sess, msg_del, + return tcp_rx_handle(sess, msg_del, (xdrproc_t)xdr_cld_msg_del, &del); } case CMO_UNLOCK: { struct cld_msg_unlock unlock = {0}; - return udp_rx_handle(sess, msg_unlock, + return tcp_rx_handle(sess, msg_unlock, (xdrproc_t)xdr_cld_msg_unlock, &unlock); } case CMO_TRYLOCK: /* fall through */ case CMO_LOCK: { struct cld_msg_lock lock = {0}; - return udp_rx_handle(sess, msg_lock, + return tcp_rx_handle(sess, msg_lock, (xdrproc_t)xdr_cld_msg_lock, &lock); } case CMO_ACK: @@ -452,31 +470,6 @@ static enum cle_err_codes pkt_chk_sig(const char *raw_pkt, int raw_len, return 0; } -/** Check if this packet is a duplicate - * - * @param info Packet info - * - * @return true only if the packet is a duplicate - */ -static bool packet_is_dupe(const struct pkt_info *info) -{ - if (!info->sess) - return false; - if (info->op == CMO_ACK) - return false; - - /* Check sequence IDs to discover if this packet is a dupe */ 
- if (info->seqid != info->sess->next_seqid_in) { - HAIL_DEBUG(&srv_log, "dropping dup with seqid %llu " - "(expected %llu).", - (unsigned long long) info->seqid, - (unsigned long long) info->sess->next_seqid_in); - return true; - } - - return false; -} - void simple_sendmsg(int fd, const struct client *cli, uint64_t sid, const char *user, uint64_t seqid, xdrproc_t xdrproc, const void *xdrdata, enum cld_msg_op op) @@ -541,7 +534,7 @@ void simple_sendmsg(int fd, const struct client *cli, HAIL_ERR(&srv_log, "%s: authsign failed: %d", __func__, auth_rc); - udp_tx(fd, (struct sockaddr *) &cli->addr, cli->addr_len, + tcp_tx(fd, (struct sockaddr *) &cli->addr, cli->addr_len, buf, buf_len); } @@ -559,91 +552,88 @@ static void simple_sendresp(int sock_fd, const struct client *cli, info->op); } -static void udp_srv_event(int fd, short events, void *userdata) +static void cli_rd_pkt(void *userdata, bool success) { - struct client cli; - char host[64]; + struct client *cli = userdata; + int fd = cli->fd; ssize_t rrc, hdr_len; - struct msghdr hdr; - struct iovec iov[2]; - char raw_pkt[CLD_RAW_MSG_SZ], ctl_msg[CLD_RAW_MSG_SZ]; struct cld_pkt_hdr pkt; struct pkt_info info; enum cle_err_codes err; - memset(&cli, 0, sizeof(cli)); - - iov[0].iov_base = raw_pkt; - iov[0].iov_len = sizeof(raw_pkt); - - hdr.msg_name = &cli.addr; - hdr.msg_namelen = sizeof(cli.addr); - hdr.msg_iov = iov; - hdr.msg_iovlen = 1; - hdr.msg_control = ctl_msg; - hdr.msg_controllen = sizeof(ctl_msg); - - rrc = recvmsg(fd, &hdr, 0); - if (rrc < 0) { - syslogerr("UDP recvmsg"); - return; - } - cli.addr_len = hdr.msg_namelen; - - /* pretty-print incoming cxn info */ - getnameinfo((struct sockaddr *) &cli.addr, cli.addr_len, - host, sizeof(host), NULL, 0, NI_NUMERICHOST); - host[sizeof(host) - 1] = 0; + rrc = cli->raw_size; - strcpy(cli.addr_host, host); + HAIL_DEBUG(&srv_log, "client %s message (%d bytes)", + cli->addr_host, (int) rrc); - HAIL_DEBUG(&srv_log, "client %s message (%d bytes)", host, (int) rrc); - - 
if (!parse_pkt_header(raw_pkt, rrc, &pkt, &hdr_len)) { + if (!parse_pkt_header(cli->raw_pkt, rrc, &pkt, &hdr_len)) { cld_srv.stats.garbage++; return; } - if (!get_pkt_info(&pkt, raw_pkt, rrc, hdr_len, &info)) { + if (!get_pkt_info(&pkt, cli->raw_pkt, rrc, hdr_len, &info)) { xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); cld_srv.stats.garbage++; return; } - if (packet_is_dupe(&info)) { - /* silently drop dupes */ - xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); - return; - } - - err = validate_pkt_session(&info, &cli); + err = validate_pkt_session(&info, cli); if (err) { - simple_sendresp(fd, &cli, &info, err); + simple_sendresp(fd, cli, &info, err); xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); return; } - err = pkt_chk_sig(raw_pkt, rrc, &pkt); + err = pkt_chk_sig(cli->raw_pkt, rrc, &pkt); if (err) { - simple_sendresp(fd, &cli, &info, err); + simple_sendresp(fd, cli, &info, err); xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); return; } if (!(cld_srv.cldb.is_master && cld_srv.cldb.up)) { - simple_sendmsg(fd, &cli, pkt.sid, pkt.user, 0xdeadbeef, + simple_sendmsg(fd, cli, pkt.sid, pkt.user, 0xdeadbeef, (xdrproc_t)xdr_void, NULL, CMO_NOT_MASTER); xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); return; } - err = udp_rx(fd, &cli, &info, raw_pkt, rrc); + err = tcp_rx(fd, cli, &info, cli->raw_pkt, rrc); if (err) { - simple_sendresp(fd, &cli, &info, err); + simple_sendresp(fd, cli, &info, err); xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); return; } xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt); + + atcp_read(&cli->rst, &cli->ubbp, sizeof(cli->ubbp), + cli_rd_ubbp, cli); +} + +static void cli_rd_ubbp(void *userdata, bool success) +{ + struct client *cli = userdata; + uint32_t sz; + +#ifdef WORDS_BIGENDIAN + swab32(cli->ubbp.op_size); +#endif + if (memcmp(cli->ubbp.magic, "CLD1", 4)) + goto err_out; + if (UBBP_OP(cli->ubbp.op_size) != 1) + goto err_out; + sz = UBBP_SIZE(cli->ubbp.op_size); + if (sz > CLD_RAW_MSG_SZ) + goto err_out; + + 
cli->raw_size = sz; + + atcp_read(&cli->rst, cli->raw_pkt, sz, cli_rd_pkt, cli); + + return; + +err_out: + cli_free(cli); } static void add_chkpt_timer(void) @@ -672,6 +662,170 @@ static void cldb_checkpoint(int fd, short events, void *userdata) add_chkpt_timer(); } +static void atcp_read_init(struct atcp_read_state *rst) +{ + memset(rst, 0, sizeof(*rst)); + INIT_LIST_HEAD(&rst->q); +} + +static bool atcp_read(struct atcp_read_state *rst, + void *buf, unsigned int buf_size, + void (*cb)(void *, bool), void *cb_data) +{ + struct atcp_read *rd; + + rd = calloc(1, sizeof(*rd)); + if (!rd) + goto err_out; + + rd->buf = buf; + rd->buf_size = buf_size; + rd->bytes_wanted = buf_size; + rd->cb = cb; + rd->cb_data = cb_data; + + INIT_LIST_HEAD(&rd->node); + + list_add_tail(&rd->node, &rst->q); + + return true; + +err_out: + cb(cb_data, false); + return false; +} + +static bool atcp_read_event(struct atcp_read_state *rst, int fd) +{ + struct atcp_read *tmp, *iter; + + list_for_each_entry_safe(tmp, iter, &rst->q, node) { + ssize_t rrc; + + rrc = read(fd, tmp->buf + tmp->bytes_read, + tmp->bytes_wanted); + if (rrc < 0) { + if (errno == EAGAIN) + return true; + return false; + } + if (rrc == 0) + break; + + tmp->bytes_read += rrc; + tmp->bytes_wanted -= rrc; + + if (tmp->bytes_read == tmp->buf_size) { + list_del_init(&tmp->node); + + tmp->cb(tmp->cb_data, true); + free(tmp); + } + } + + return true; +} + +static struct client *cli_alloc(void) +{ + struct client *cli; + + cli = calloc(1, sizeof(*cli)); + if (!cli) + return NULL; + + cli->addr_len = sizeof(cli->addr); + + atcp_read_init(&cli->rst); + + return cli; +} + +static void cli_free(struct client *cli) +{ + if (!cli) + return; + + if (cli->fd >= 0) { + event_del(&cli->ev); + close(cli->fd); + cli->fd = -1; + } + + free(cli); +} + +static void tcp_cli_event(int fd, short events, void *userdata) +{ + struct client *cli = userdata; + + atcp_read_event(&cli->rst, fd); +} + +static void tcp_srv_event(int fd, short events, void 
*userdata) +{ + struct server_socket *sock = userdata; + struct client *cli; + char host[64]; + char port[16]; + int on = 1; + + cli = cli_alloc(); + if (!cli) { + applog(LOG_ERR, "out of memory"); + server_running = false; + event_loopbreak(); + return; + } + + /* receive TCP connection from kernel */ + cli->fd = accept(sock->fd, (struct sockaddr *) &cli->addr, + &cli->addr_len); + if (cli->fd < 0) { + syslogerr("tcp accept"); + goto err_out; + } + + /* mark non-blocking, for upcoming poll use */ + if (fsetflags("tcp client", cli->fd, O_NONBLOCK) < 0) + goto err_out_fd; + + /* disable delay of small output packets */ + if (setsockopt(cli->fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) + applog(LOG_WARNING, "TCP_NODELAY failed: %s", + strerror(errno)); + + event_set(&cli->ev, cli->fd, EV_READ | EV_PERSIST, + tcp_cli_event, cli); + + /* pretty-print incoming cxn info */ + getnameinfo((struct sockaddr *) &cli->addr, cli->addr_len, + host, sizeof(host), port, sizeof(port), + NI_NUMERICHOST | NI_NUMERICSERV); + host[sizeof(host) - 1] = 0; + port[sizeof(port) - 1] = 0; + applog(LOG_INFO, "client host %s port %s connected%s", host, port, + /* cli->ssl ? 
" via SSL" : */ ""); + + strcpy(cli->addr_host, host); + strcpy(cli->addr_port, port); + + if (event_add(&cli->ev, NULL) < 0) { + applog(LOG_ERR, "unable to ready srv fd for polling"); + goto err_out_fd; + } + cli->ev_mask = EV_READ; + + atcp_read(&cli->rst, &cli->ubbp, sizeof(cli->ubbp), + cli_rd_ubbp, cli); + + return; + +err_out_fd: +err_out: + cli_free(cli); +} + static int net_write_port(const char *port_file, const char *port_str) { FILE *portf; @@ -717,17 +871,23 @@ static int net_open_socket(int addr_fam, int sock_type, int sock_prot, fd = socket(addr_fam, sock_type, sock_prot); if (fd < 0) { - syslogerr("udp socket"); + syslogerr("tcp socket"); return -errno; } if (bind(fd, addr_ptr, addr_len) < 0) { - syslogerr("udp bind"); + syslogerr("tcp bind"); + close(fd); + return -errno; + } + + if (listen(fd, 100) < 0) { + syslogerr("tcp listen"); close(fd); return -errno; } - rc = fsetflags("udp server", fd, O_NONBLOCK); + rc = fsetflags("tcp server", fd, O_NONBLOCK); if (rc) { close(fd); return -errno; @@ -743,7 +903,7 @@ static int net_open_socket(int addr_fam, int sock_type, int sock_prot, INIT_LIST_HEAD(&sock->sockets_node); event_set(&sock->ev, fd, EV_READ | EV_PERSIST, - udp_srv_event, sock); + tcp_srv_event, sock); if (event_add(&sock->ev, NULL) < 0) { free(sock); @@ -771,7 +931,7 @@ static int net_open_any(void) memset(&addr6, 0, sizeof(addr6)); addr6.sin6_family = AF_INET6; memcpy(&addr6.sin6_addr, &in6addr_any, sizeof(struct in6_addr)); - fd6 = net_open_socket(AF_INET6, SOCK_DGRAM, 0, sizeof(addr6), &addr6); + fd6 = net_open_socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, sizeof(addr6), &addr6); if (fd6 >= 0) { addr_len = sizeof(addr6); @@ -790,7 +950,7 @@ static int net_open_any(void) /* If IPv6 worked, we must use the same port number for IPv4 */ if (port) addr4.sin_port = port; - fd4 = net_open_socket(AF_INET, SOCK_DGRAM, 0, sizeof(addr4), &addr4); + fd4 = net_open_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP, sizeof(addr4), &addr4); if (!port) { if (fd4 < 0) 
@@ -824,7 +984,7 @@ static int net_open_known(const char *portstr) memset(&hints, 0, sizeof(hints)); hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; + hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; rc = getaddrinfo(NULL, portstr, &hints, &res0); diff --git a/cld/session.c b/cld/session.c index 9887aaa..a38874a 100644 --- a/cld/session.c +++ b/cld/session.c @@ -550,7 +550,7 @@ static int sess_retry_output(struct session *sess, time_t *next_retry_out) op->pkt_len)); } - rc = udp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr, + rc = tcp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr, sess->addr_len, op->pkt_data, op->pkt_len); if (rc) break; @@ -715,7 +715,7 @@ bool sess_sendmsg(struct session *sess, tmp_list = g_list_next(tmp_list)) { struct session_outpkt *op = (struct session_outpkt *) tmp_list->data; - udp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr, + tcp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr, sess->addr_len, op->pkt_data, op->pkt_len); } diff --git a/configure.ac b/configure.ac index 9cfad23..d93a9a5 100644 --- a/configure.ac +++ b/configure.ac @@ -69,6 +69,7 @@ AC_CHECK_HEADERS(sys/sendfile.h sys/filio.h) AC_CHECK_HEADER(db.h,[],exit 1) dnl Checks for typedefs, structures, and compiler characteristics. 
+AC_C_BIGENDIAN dnl AC_TYPE_SIZE_T dnl AC_TYPE_PID_T diff --git a/include/Makefile.am b/include/Makefile.am index 967352a..70f56a9 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -4,6 +4,6 @@ EXTRA_DIST = \ elist.h chunk_msg.h chunksrv.h chunk-private.h objcache.h include_HEADERS = \ - cldc.h cld_common.h ncld.h chunkc.h chunk_msg.h \ + ubbp.h cldc.h cld_common.h ncld.h chunkc.h chunk_msg.h \ hail_log.h hstor.h anet.h diff --git a/include/cld_common.h b/include/cld_common.h index 84b1ec6..efb5911 100644 --- a/include/cld_common.h +++ b/include/cld_common.h @@ -31,6 +31,10 @@ #define CLD_ALIGN8(n) ((8 - ((n) & 7)) & 7) +enum { + CLD_RAW_MSG_SZ = 4096, +}; + struct cld_timer { bool fired; bool on_list; diff --git a/include/cldc.h b/include/cldc.h index c64eef9..f98d151 100644 --- a/include/cldc.h +++ b/include/cldc.h @@ -25,6 +25,7 @@ #include <cld_msg_rpc.h> #include <cld_common.h> #include <hail_log.h> +#include <ubbp.h> struct cldc_session; @@ -142,13 +143,20 @@ struct cldc_host { unsigned short port; }; -/** A UDP implementation of the CLD client protocol */ -struct cldc_udp { +/** A TCP implementation of the CLD client protocol */ +struct cldc_tcp { uint8_t addr[64]; /* server address */ size_t addr_len; int fd; + struct ubbp_header ubbp; + unsigned int ubbp_read; + + char raw_pkt[CLD_RAW_MSG_SZ]; + unsigned int raw_size; + unsigned int raw_read; + struct cldc_session *sess; int (*cb)(struct cldc_session *, void *); @@ -215,12 +223,12 @@ extern void cldc_copts_get_data(const struct cldc_call_opts *copts, extern void cldc_copts_get_metadata(const struct cldc_call_opts *copts, struct cldc_node_metadata *md); -/* cldc-udp */ -extern void cldc_udp_free(struct cldc_udp *udp); -extern int cldc_udp_new(const char *hostname, int port, - struct cldc_udp **udp_out); -extern int cldc_udp_receive_pkt(struct cldc_udp *udp); -extern int cldc_udp_pkt_send(void *private, +/* cldc-tcp */ +extern void cldc_tcp_free(struct cldc_tcp *tcp); +extern int 
cldc_tcp_new(const char *hostname, int port, + struct cldc_tcp **tcp_out); +extern int cldc_tcp_receive_pkt_data(struct cldc_tcp *tcp); +extern int cldc_tcp_pkt_send(void *private, const void *addr, size_t addrlen, const void *buf, size_t buflen); diff --git a/include/ncld.h b/include/ncld.h index aad88b7..21b6e36 100644 --- a/include/ncld.h +++ b/include/ncld.h @@ -40,8 +40,8 @@ struct ncld_sess { int errc; GList *handles; int to_thread[2]; - struct cldc_udp *udp; - struct cld_timer udp_timer; + struct cldc_tcp *tcp; + struct cld_timer tcp_timer; struct cld_timer_list tlist; void (*event)(void *, unsigned int); void *event_arg; diff --git a/include/ubbp.h b/include/ubbp.h new file mode 100644 index 0000000..94dbbab --- /dev/null +++ b/include/ubbp.h @@ -0,0 +1,52 @@ +#ifndef __UBBP_H__ +#define __UBBP_H__ + +/* + + Universal Base Binary Protocol (UBBP) - a universal message header + + Copyright 2010 Red Hat, Inc. + + Permission is hereby granted, free of charge, to any person + obtaining a copy of this software and associated documentation + files (the "Software"), to deal in the Software without restriction, + including without limitation the rights to use, copy, modify, merge, + publish, distribute, sublicense, and/or sell copies of the Software, + and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY + KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN + AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR + IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. 
+ +*/ + + +#include <stdint.h> + +struct ubbp_header { + /* magic number of your protocol */ + unsigned char magic[4]; + + /* + * size: upper 24 bits (size = op_size >> 8) + * op: lower 8 bits (op = op_size & 0xff) + * + * Byte order: little endian + */ + uint32_t op_size; +}; + +#define UBBP_OP(op_size) ((op_size) & 0xff) +#define UBBP_SIZE(op_size) ((op_size) >> 8) + +#endif /* __UBBP_H__ */ + diff --git a/lib/Makefile.am b/lib/Makefile.am index 616b881..8be1835 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -23,7 +23,7 @@ lib_LTLIBRARIES = libhail.la libhail_la_SOURCES = \ atcp.c \ cldc.c \ - cldc-udp.c \ + cldc-tcp.c \ cldc-dns.c \ common.c \ libtimer.c \ diff --git a/lib/cldc-dns.c b/lib/cldc-dns.c index 92f875d..e987c01 100644 --- a/lib/cldc-dns.c +++ b/lib/cldc-dns.c @@ -63,7 +63,7 @@ int cldc_saveaddr(struct cldc_host *hp, memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = PF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; + hints.ai_socktype = SOCK_STREAM; rc = getaddrinfo(hostname, portstr, &hints, &res0); if (rc) { diff --git a/lib/cldc-tcp.c b/lib/cldc-tcp.c new file mode 100644 index 0000000..63a753b --- /dev/null +++ b/lib/cldc-tcp.c @@ -0,0 +1,185 @@ + +/* + * Copyright 2009 Red Hat, Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; see the file COPYING. If not, write to + * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 
+ * + */ + +#define _GNU_SOURCE +#include "hail-config.h" + +#include <sys/types.h> +#include <sys/socket.h> +#include <string.h> +#include <stdlib.h> +#include <unistd.h> +#include <stdio.h> +#include <errno.h> +#include <netinet/in.h> +#include <netdb.h> +#include <cldc.h> + +void cldc_tcp_free(struct cldc_tcp *tcp) +{ + if (!tcp) + return; + + if (tcp->fd >= 0) + close(tcp->fd); + + free(tcp); +} + +int cldc_tcp_new(const char *hostname, int port, + struct cldc_tcp **tcp_out) +{ + struct cldc_tcp *tcp; + struct addrinfo hints, *res, *rp; + char port_s[32]; + int rc, fd = -1; + + *tcp_out = NULL; + + sprintf(port_s, "%d", port); + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + rc = getaddrinfo(hostname, port_s, &hints, &res); + if (rc) + return -ENOENT; + + for (rp = res; rp; rp = rp->ai_next) { + fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (fd < 0) + continue; + + if (connect(fd, rp->ai_addr, rp->ai_addrlen) != -1) + break; /* success */ + + close(fd); + fd = -1; + } + + if (!rp) { + freeaddrinfo(res); + return -ENOENT; + } + + tcp = calloc(1, sizeof(*tcp)); + if (!tcp) { + freeaddrinfo(res); + close(fd); + return -ENOMEM; + } + + memcpy(tcp->addr, rp->ai_addr, rp->ai_addrlen); + tcp->addr_len = rp->ai_addrlen; + + tcp->fd = fd; + + freeaddrinfo(res); + + *tcp_out = tcp; + + return 0; +} + +int cldc_tcp_receive_pkt_data(struct cldc_tcp *tcp) +{ + static char buf[CLD_RAW_MSG_SZ]; /* BUG: static buf */ + ssize_t rc, crc; + void *p; + + if (tcp->ubbp_read < sizeof(tcp->ubbp)) { + p = &tcp->ubbp; + p += tcp->ubbp_read; + rc = read(tcp->fd, p, sizeof(tcp->ubbp) - tcp->ubbp_read); + if (rc < 0) { + if (errno != EAGAIN) + return -errno; + return 0; + } + + tcp->ubbp_read += rc; + if (tcp->ubbp_read == sizeof(tcp->ubbp)) { +#ifdef WORDS_BIGENDIAN + swab32(ubbp.op_size); +#endif + + if (memcmp(tcp->ubbp.magic, "CLD1", 4)) + return -EIO; + if 
(UBBP_OP(tcp->ubbp.op_size) != 2) + return -EIO; + tcp->raw_read = 0; + tcp->raw_size = UBBP_SIZE(tcp->ubbp.op_size); + if (tcp->raw_size > CLD_RAW_MSG_SZ) + return -EIO; + } + } + if (!tcp->raw_size) + return 0; + + p = buf; /* BUG: uses temp buffer */ + p += tcp->raw_read; + rc = read(tcp->fd, p, tcp->raw_size - tcp->raw_read); + if (rc < 0) { + if (errno != EAGAIN) + return -errno; + return 0; + } + + tcp->raw_read += rc; + + if (tcp->raw_read < tcp->raw_size) + return 0; + + tcp->ubbp_read = 0; + + crc = cldc_receive_pkt(tcp->sess, tcp->addr, tcp->addr_len, buf, + tcp->raw_size); + if (crc) + return crc; + + return 0; +} + +int cldc_tcp_pkt_send(void *private, + const void *addr, size_t addrlen, + const void *buf, size_t buflen) +{ + struct cldc_tcp *tcp = private; + ssize_t rc; + struct ubbp_header ubbp; + + memcpy(ubbp.magic, "CLD1", 4); + ubbp.op_size = (buflen << 8) | 1; +#ifdef WORDS_BIGENDIAN + swab32(ubbp.op_size); +#endif + + rc = write(tcp->fd, &ubbp, sizeof(ubbp)); + if (rc < 0) + return -errno; + + rc = write(tcp->fd, buf, buflen); + if (rc < 0) + return -errno; + + return 0; +} + diff --git a/lib/cldc-udp.c b/lib/cldc-udp.c deleted file mode 100644 index 3042746..0000000 --- a/lib/cldc-udp.c +++ /dev/null @@ -1,141 +0,0 @@ - -/* - * Copyright 2009 Red Hat, Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; see the file COPYING. If not, write to - * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 
- * - */ - -#define _GNU_SOURCE -#include "hail-config.h" - -#include <sys/types.h> -#include <sys/socket.h> -#include <string.h> -#include <stdlib.h> -#include <unistd.h> -#include <stdio.h> -#include <errno.h> -#include <netinet/in.h> -#include <netdb.h> -#include <cldc.h> - -void cldc_udp_free(struct cldc_udp *udp) -{ - if (!udp) - return; - - if (udp->fd >= 0) - close(udp->fd); - - free(udp); -} - -int cldc_udp_new(const char *hostname, int port, - struct cldc_udp **udp_out) -{ - struct cldc_udp *udp; - struct addrinfo hints, *res, *rp; - char port_s[32]; - int rc, fd = -1; - - *udp_out = NULL; - - sprintf(port_s, "%d", port); - - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_DGRAM; - hints.ai_protocol = IPPROTO_UDP; - - rc = getaddrinfo(hostname, port_s, &hints, &res); - if (rc) - return -ENOENT; - - for (rp = res; rp; rp = rp->ai_next) { - fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (fd < 0) - continue; - - if (connect(fd, rp->ai_addr, rp->ai_addrlen) != -1) - break; /* success */ - - close(fd); - fd = -1; - } - - if (!rp) { - freeaddrinfo(res); - return -ENOENT; - } - - udp = calloc(1, sizeof(*udp)); - if (!udp) { - freeaddrinfo(res); - close(fd); - return -ENOMEM; - } - - memcpy(udp->addr, rp->ai_addr, rp->ai_addrlen); - udp->addr_len = rp->ai_addrlen; - - udp->fd = fd; - - freeaddrinfo(res); - - *udp_out = udp; - - return 0; -} - -int cldc_udp_receive_pkt(struct cldc_udp *udp) -{ - char buf[2048]; - ssize_t rc, crc; - - rc = recv(udp->fd, buf, sizeof(buf), MSG_DONTWAIT); - if (rc < 0) { - if (errno != EAGAIN) - return -errno; - } - if (rc <= 0) - return 0; - - if (!udp->sess) - return -ENXIO; - - crc = cldc_receive_pkt(udp->sess, udp->addr, udp->addr_len, buf, rc); - if (crc) - return crc; - - return 0; -} - -int cldc_udp_pkt_send(void *private, - const void *addr, size_t addrlen, - const void *buf, size_t buflen) -{ - struct cldc_udp *udp = private; - ssize_t rc; - - /* we are connected, 
so we ignore addr and addrlen args */ - rc = send(udp->fd, buf, buflen, MSG_DONTWAIT); - if (rc < 0) - return -errno; - if (rc != buflen) - return -EILSEQ; - - return 0; -} - diff --git a/lib/cldc.c b/lib/cldc.c index 5209ea2..f0c3b59 100644 --- a/lib/cldc.c +++ b/lib/cldc.c @@ -1375,13 +1375,13 @@ static int ncld_gethost(char **hostp, unsigned short *portp, return 0; } -static void ncld_udp_timer_event(struct cld_timer *timer) +static void ncld_tcp_timer_event(struct cld_timer *timer) { struct ncld_sess *nsess = timer->userdata; - struct cldc_udp *udp = nsess->udp; + struct cldc_tcp *tcp = nsess->tcp; - if (udp->cb) - udp->cb(udp->sess, udp->cb_private); + if (tcp->cb) + tcp->cb(tcp->sess, tcp->cb_private); } enum { @@ -1447,7 +1447,7 @@ static gpointer ncld_sess_thr(gpointer data) memset(pfd, 0, sizeof(pfd)); pfd[0].fd = nsess->to_thread[0]; pfd[0].events = POLLIN; - pfd[1].fd = nsess->udp->fd; + pfd[1].fd = nsess->tcp->fd; pfd[1].events = POLLIN; rc = poll(pfd, 2, tmo ? tmo*1000 : -1); @@ -1464,7 +1464,7 @@ static gpointer ncld_sess_thr(gpointer data) ncld_thread_command(nsess); } else { g_mutex_lock(nsess->mutex); - cldc_udp_receive_pkt(nsess->udp); + cldc_tcp_receive_pkt_data(nsess->tcp); g_mutex_unlock(nsess->mutex); } } @@ -1491,14 +1491,14 @@ static bool ncld_p_timer_ctl(void *priv, bool add, void *cb_priv, time_t secs) { struct ncld_sess *nsess = priv; - struct cldc_udp *udp = nsess->udp; + struct cldc_tcp *tcp = nsess->tcp; if (add) { - udp->cb = cb; - udp->cb_private = cb_priv; - cld_timer_add(&nsess->tlist, &nsess->udp_timer, time(NULL) + secs); + tcp->cb = cb; + tcp->cb_private = cb_priv; + cld_timer_add(&nsess->tlist, &nsess->tcp_timer, time(NULL) + secs); } else { - cld_timer_del(&nsess->tlist, &nsess->udp_timer); + cld_timer_del(&nsess->tlist, &nsess->tcp_timer); } return true; } @@ -1507,7 +1507,7 @@ static int ncld_p_pkt_send(void *priv, const void *addr, size_t addrlen, const void *buf, size_t buflen) { struct ncld_sess *nsess = priv; - return 
cldc_udp_pkt_send(nsess->udp, addr, addrlen, buf, buflen); + return cldc_tcp_pkt_send(nsess->tcp, addr, addrlen, buf, buflen); } static void ncld_p_event(void *priv, struct cldc_session *csp, @@ -1517,7 +1517,7 @@ static void ncld_p_event(void *priv, struct cldc_session *csp, unsigned char cmd; if (what == CE_SESS_FAILED) { - if (nsess->udp->sess != csp) + if (nsess->tcp->sess != csp) abort(); if (!nsess->is_up) return; @@ -1542,7 +1542,7 @@ static void ncld_p_event(void *priv, struct cldc_session *csp, * Notice that we are already running on the context of the * thread that will deliver the event, so pipe really is not * needed: could as well set a flag and test it right after - * the call to cldc_udp_receive_pkt(). But pipe also provides + * the call to cldc_tcp_receive_pkt_data(). But pipe also provides * a queue of events, just in case. It's not like these events * are super-performance critical. */ @@ -1634,8 +1634,8 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error, if (!nsess) goto out_sesalloc; memset(nsess, 0, sizeof(struct ncld_sess)); - cld_timer_init(&nsess->udp_timer, "nsess-udp-timer", - ncld_udp_timer_event, nsess); + cld_timer_init(&nsess->tcp_timer, "nsess-tcp-timer", + ncld_tcp_timer_event, nsess); nsess->mutex = g_mutex_new(); if (!nsess->mutex) goto out_mutex; @@ -1661,9 +1661,9 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error, goto out_pipe_to; } - if (cldc_udp_new(nsess->host, nsess->port, &nsess->udp)) { + if (cldc_tcp_new(nsess->host, nsess->port, &nsess->tcp)) { err = 1023; - goto out_udp; + goto out_tcp; } nsess->thread = g_thread_create(ncld_sess_thr, nsess, TRUE, &gerr); @@ -1677,9 +1677,9 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error, copts.cb = ncld_new_sess; copts.private = nsess; if (cldc_new_sess_log(&ncld_ops, &copts, - nsess->udp->addr, nsess->udp->addr_len, + nsess->tcp->addr, nsess->tcp->addr_len, cld_user, cld_key, nsess, log, - &nsess->udp->sess)) 
{ + &nsess->tcp->sess)) { g_mutex_unlock(nsess->mutex); err = 1024; goto out_session; @@ -1695,12 +1695,12 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error, return nsess; out_start: - cldc_kill_sess(nsess->udp->sess); + cldc_kill_sess(nsess->tcp->sess); out_session: ncld_thr_end(nsess); out_thread: - cldc_udp_free(nsess->udp); -out_udp: + cldc_tcp_free(nsess->tcp); +out_tcp: close(nsess->to_thread[0]); close(nsess->to_thread[1]); out_pipe_to: @@ -1774,7 +1774,7 @@ struct ncld_fh *ncld_open(struct ncld_sess *nsess, const char *fname, memset(&copts, 0, sizeof(copts)); copts.cb = ncld_open_cb; copts.private = fh; - rc = cldc_open(nsess->udp->sess, &copts, fname, mode, events, &fh->fh); + rc = cldc_open(nsess->tcp->sess, &copts, fname, mode, events, &fh->fh); if (rc) { err = -rc; g_mutex_unlock(nsess->mutex); @@ -1850,7 +1850,7 @@ int ncld_del(struct ncld_sess *nsess, const char *fname) memset(&copts, 0, sizeof(copts)); copts.cb = ncld_del_cb; copts.private = &dpb; - rc = cldc_del(nsess->udp->sess, &copts, fname); + rc = cldc_del(nsess->tcp->sess, &copts, fname); if (rc) { g_mutex_unlock(nsess->mutex); return -rc; @@ -2266,9 +2266,9 @@ void ncld_sess_close(struct ncld_sess *nsess) ncld_close(p); g_list_free(nsess->handles); - cldc_kill_sess(nsess->udp->sess); + cldc_kill_sess(nsess->tcp->sess); ncld_thr_end(nsess); - cldc_udp_free(nsess->udp); + cldc_tcp_free(nsess->tcp); close(nsess->to_thread[0]); close(nsess->to_thread[1]); g_cond_free(nsess->cond); -- To unsubscribe from this list: send the line "unsubscribe hail-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html