As we discussed at the cluster summit, increasing the message size inside corosync itself is not only dangerous but also needed for only a very few corner cases, all of which involve CPG. So, to allow large CPG messages (which are needed), I have added an extra facility to libcpg that fragments messages that are too large for corosync's internal buffers and reassembles them on delivery. It does this transparently to the application. Zero-copy sends are NOT supported for this feature: cpg_zcb_mcast_joined() returns CS_ERR_TOO_BIG for messages larger than IPC_REQUEST_SIZE. I've also included a test program, 'cpghum', that exercises this facility using message sequence numbers and checksums. Signed-off-by: Christine Caulfield <ccaulfie@xxxxxxxxxx>
diff --git a/exec/cpg.c b/exec/cpg.c
index 1c6fbb9..3d83982 100644
--- a/exec/cpg.c
+++ b/exec/cpg.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -83,7 +83,8 @@ enum cpg_message_req_types {
 	MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
 	MESSAGE_REQ_EXEC_CPG_MCAST = 3,
 	MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
-	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5
+	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5,
+	MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6,
 };
 
 struct zcb_mapped {
@@ -224,6 +225,10 @@ static void message_handler_req_exec_cpg_mcast (
 	const void *message,
 	unsigned int nodeid);
 
+static void message_handler_req_exec_cpg_partial_mcast (
+	const void *message,
+	unsigned int nodeid);
+
 static void message_handler_req_exec_cpg_downlist_old (
 	const void *message,
 	unsigned int nodeid);
@@ -238,6 +243,8 @@ static void exec_cpg_joinlist_endian_convert (void *msg);
 
 static void exec_cpg_mcast_endian_convert (void *msg);
 
+static void exec_cpg_partial_mcast_endian_convert (void *msg);
+
 static void exec_cpg_downlist_endian_convert_old (void *msg);
 
 static void exec_cpg_downlist_endian_convert (void *msg);
@@ -250,6 +257,8 @@ static void message_handler_req_lib_cpg_finalize (void *conn, const void *messag
 
 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
 
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
+
 static void message_handler_req_lib_cpg_membership (void *conn,
 						    const void *message);
 
@@ -383,7 +392,10 @@ static struct corosync_lib_handler cpg_lib_engine[] =
 		.lib_handler_fn				= message_handler_req_lib_cpg_zc_execute,
 		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
 	},
-
+	{ /* 12 */
+		.lib_handler_fn				= message_handler_req_lib_cpg_partial_mcast,
+		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
+	},
 };
 
 
@@ -413,6 +425,10 @@ static struct corosync_exec_handler cpg_exec_engine[] =
 		.exec_handler_fn	= message_handler_req_exec_cpg_downlist,
 		.exec_endian_convert_fn	= exec_cpg_downlist_endian_convert
 	},
+	{ /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
+		.exec_handler_fn	= message_handler_req_exec_cpg_partial_mcast,
+		.exec_endian_convert_fn	= exec_cpg_partial_mcast_endian_convert
+	},
 };
 
 struct corosync_service_engine cpg_service_engine = {
@@ -457,6 +473,17 @@ struct req_exec_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct req_exec_cpg_partial_mcast {
+	struct qb_ipc_request_header header __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_message_source_t source __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct req_exec_cpg_downlist_old {
 	struct qb_ipc_request_header header __attribute__((aligned(8)));
 	mar_uint32_t left_nodes __attribute__((aligned(8)));
@@ -1186,6 +1213,19 @@ static void exec_cpg_mcast_endian_convert (void *msg)
 	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
 }
 
+static void exec_cpg_partial_mcast_endian_convert (void *msg)
+{
+	struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = msg;
+
+	swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
+	swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
+	req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
+	req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
+	req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
+	req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
+	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
+}
+
 static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
 	struct list_head *iter;
 
@@ -1452,6 +1492,67 @@ static void message_handler_req_exec_cpg_mcast (
 		}
 	}
 }
+static void message_handler_req_exec_cpg_partial_mcast (
+	const void *message,
+	unsigned int nodeid)
+{
+	const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
+	struct res_lib_cpg_partial_deliver_callback res_lib_cpg_mcast;
+	int msglen = req_exec_cpg_mcast->fraglen;
+	struct list_head *iter, *pi_iter;
+	struct cpg_pd *cpd;
+	struct iovec iovec[2];
+	int known_node = 0;
+
+	log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
+
+	res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK;
+	res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
+	res_lib_cpg_mcast.fraglen = msglen;
+	res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
+	res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
+	res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
+	res_lib_cpg_mcast.nodeid = nodeid;
+
+	memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
+		sizeof(mar_cpg_name_t));
+	iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
+	iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
+
+	iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
+	iovec[1].iov_len = msglen;
+
+	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
+		cpd = list_entry(iter, struct cpg_pd, list);
+		iter = iter->next;
+
+		if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
+			&& (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
+
+			if (!known_node) {
+				/* Try to find, if we know the node */
+				for (pi_iter = process_info_list_head.next;
+					pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
+
+					struct process_info *pi = list_entry (pi_iter, struct process_info, list);
+
+					if (pi->nodeid == nodeid &&
+						mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
+						known_node = 1;
+						break;
+					}
+				}
+			}
+
+			if (!known_node) {
+				log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
+				return ;
+			}
+
+			api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
+		}
+	}
+}
 
 static int cpg_exec_send_downlist(void)
 {
@@ -1864,6 +1965,69 @@ static void message_handler_req_lib_cpg_zc_free (
 		res_header.size);
 }
 
+/* Fragmented mcast message from the library */
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
+{
+	const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
+	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
+	mar_cpg_name_t group_name = cpd->group_name;
+
+	struct iovec req_exec_cpg_iovec[2];
+	struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
+	int msglen = req_lib_cpg_mcast->fraglen;
+	int result;
+	cs_error_t error = CS_ERR_NOT_EXIST;
+
+	log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
+	log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
+
+	switch (cpd->cpd_state) {
+	case CPD_STATE_UNJOINED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_LEAVE_STARTED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_JOIN_STARTED:
+		error = CS_OK;
+		break;
+	case CPD_STATE_JOIN_COMPLETED:
+		error = CS_OK;
+		break;
+	}
+
+	/* fraglen comes from the client: never read more payload than the request declares */
+	if (error == CS_OK &&
+	    sizeof(*req_lib_cpg_mcast) + (size_t)req_lib_cpg_mcast->fraglen >
+	    (size_t)req_lib_cpg_mcast->header.size) {
+		error = CS_ERR_INVALID_PARAM;
+	}
+
+	if (error == CS_OK) {
+		req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
+		req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
+			MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST);
+		req_exec_cpg_mcast.pid = cpd->pid;
+		req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
+		req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
+		req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
+		api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
+		memcpy(&req_exec_cpg_mcast.group_name, &group_name,
+			sizeof(mar_cpg_name_t));
+
+		req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
+		req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
+		req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
+		req_exec_cpg_iovec[1].iov_len = msglen;
+
+		result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
+		assert(result == 0);
+	} else {
+		log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
+			conn, group_name.value, cpd->cpd_state, error);
+	}
+}
+
 /* Mcast message from the library */
 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
 {
diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
index a95335a..d7000a5 100644
--- a/include/corosync/ipc_cpg.h
+++ b/include/corosync/ipc_cpg.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2006-2011 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -55,6 +55,7 @@ enum req_cpg_types {
 	MESSAGE_REQ_CPG_ZC_ALLOC = 9,
 	MESSAGE_REQ_CPG_ZC_FREE = 10,
 	MESSAGE_REQ_CPG_ZC_EXECUTE = 11,
+	MESSAGE_REQ_CPG_PARTIAL_MCAST = 12,
 };
 
 enum res_cpg_types {
@@ -75,6 +76,7 @@ enum res_cpg_types {
 	MESSAGE_RES_CPG_ZC_ALLOC = 14,
 	MESSAGE_RES_CPG_ZC_FREE = 15,
 	MESSAGE_RES_CPG_ZC_EXECUTE = 16,
+	MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK = 17,
 };
 
 enum lib_cpg_confchg_reason {
@@ -85,6 +87,12 @@ enum lib_cpg_confchg_reason {
 	CONFCHG_CPG_REASON_PROCDOWN = 5
 };
 
+enum lib_cpg_partial_types {
+	LIBCPG_PARTIAL_FIRST = 1,
+	LIBCPG_PARTIAL_CONTINUED = 2,
+	LIBCPG_PARTIAL_LAST = 3,
+};
+
 typedef struct {
 	uint32_t length __attribute__((aligned(8)));
 	char value[CPG_MAX_NAME_LENGTH] __attribute__((aligned(8)));
@@ -207,6 +215,15 @@ struct req_lib_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct req_lib_cpg_partial_mcast {
+	struct qb_ipc_request_header header __attribute__((aligned(8)));
+	mar_uint32_t guarantee __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct res_lib_cpg_mcast {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 };
@@ -223,6 +240,17 @@ struct res_lib_cpg_deliver_callback {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct res_lib_cpg_partial_deliver_callback {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t nodeid __attribute__((aligned(8)));
+	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct res_lib_cpg_flowcontrol_callback {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 	mar_uint32_t flow_control_state __attribute__((aligned(8)));
diff --git a/lib/cpg.c b/lib/cpg.c
index 4b92f44..376914c 100644
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -1,7 +1,7 @@
 /*
  * vi: set autoindent tabstop=4 shiftwidth=4 :
  *
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -83,6 +83,22 @@ struct cpg_inst {
 		cpg_model_v1_data_t model_v1_data;
 	};
 	struct list_head iteration_list_head;
+	struct list_head assembly_list_head; /* open reassemblies, one per sender */
+	uint32_t max_msg_size;
 };
+
+/*
+ * Reassembly state for one fragmented message. Totem may interleave the
+ * fragments of messages from different senders, so one of these is kept
+ * per (nodeid, pid) on cpg_inst->assembly_list_head.
+ */
+struct cpg_assembly_data {
+	struct list_head list;
+	uint32_t nodeid;
+	uint32_t pid;
+	uint32_t msglen;
+	char *assembly_buf;
+	uint32_t assembly_buf_ptr;
+};
 
 static void cpg_inst_free (void *inst);
 
@@ -210,6 +226,9 @@ cs_error_t cpg_model_initialize (
 		}
 	}
 
+	/* Allow space for corosync internal headers */
+	cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
+	list_init (&cpg_inst->assembly_list_head);
 	cpg_inst->model_data.model = model;
 	cpg_inst->context = context;
 
@@ -339,6 +358,9 @@ cs_error_t cpg_dispatch (
 	struct cpg_inst *cpg_inst;
 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
+	struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
+	struct cpg_assembly_data *assembly_data;
+	struct list_head *asm_iter;
 	struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
 	struct cpg_inst cpg_inst_copy;
 	struct qb_ipc_response_header *dispatch_data;
@@ -361,7 +383,7 @@ cs_error_t cpg_dispatch (
 
 	/*
 	 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
-	 * wait indefinately for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
+	 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
 	 */
 	if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
 		timeout = 0;
@@ -428,6 +450,103 @@ cs_error_t cpg_dispatch (
 				res_cpg_deliver_callback->msglen);
 			break;
 
+		case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
+			res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
+
+			marshall_from_mar_cpg_name_t (
+				&group_name,
+				&res_cpg_partial_deliver_callback->group_name);
+
+			/*
+			 * Fragments from different senders may be interleaved, so find the
+			 * reassembly in progress for this (nodeid, pid), if there is one.
+			 */
+			assembly_data = NULL;
+			for (asm_iter = cpg_inst->assembly_list_head.next;
+			    asm_iter != &cpg_inst->assembly_list_head;
+			    asm_iter = asm_iter->next) {
+				struct cpg_assembly_data *cur = list_entry (asm_iter,
+					struct cpg_assembly_data, list);
+
+				if (cur->nodeid == res_cpg_partial_deliver_callback->nodeid &&
+				    cur->pid == res_cpg_partial_deliver_callback->pid) {
+					assembly_data = cur;
+					break;
+				}
+			}
+
+			if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
+				/*
+				 * A new message from this sender: any unfinished reassembly
+				 * left over from it can never complete, so discard it.
+				 */
+				if (assembly_data) {
+					list_del (&assembly_data->list);
+					free (assembly_data->assembly_buf);
+					free (assembly_data);
+				}
+
+				/*
+				 * Allocate a buffer to contain a full message.
+				 */
+				assembly_data = malloc (sizeof (*assembly_data));
+				if (!assembly_data) {
+					error = CS_ERR_NO_MEMORY;
+					goto error_put;
+				}
+				assembly_data->assembly_buf = malloc (res_cpg_partial_deliver_callback->msglen);
+				if (!assembly_data->assembly_buf) {
+					free (assembly_data);
+					error = CS_ERR_NO_MEMORY;
+					goto error_put;
+				}
+				assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
+				assembly_data->pid = res_cpg_partial_deliver_callback->pid;
+				assembly_data->msglen = res_cpg_partial_deliver_callback->msglen;
+				assembly_data->assembly_buf_ptr = 0;
+				list_init (&assembly_data->list);
+				list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
+			}
+
+			/*
+			 * No reassembly in progress means we joined the group part-way
+			 * through this message; drop its remaining fragments.
+			 */
+			if (assembly_data == NULL) {
+				break;
+			}
+
+			if (res_cpg_partial_deliver_callback->fraglen >
+			    assembly_data->msglen - assembly_data->assembly_buf_ptr) {
+				/* Fragment would overrun the announced message length */
+				list_del (&assembly_data->list);
+				free (assembly_data->assembly_buf);
+				free (assembly_data);
+				break;
+			}
+
+			memcpy (assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
+				res_cpg_partial_deliver_callback->message,
+				res_cpg_partial_deliver_callback->fraglen);
+			assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
+
+			if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
+				/* Unlink before the callback runs, free once it returns */
+				list_del (&assembly_data->list);
+				if (assembly_data->assembly_buf_ptr == assembly_data->msglen &&
+				    cpg_inst_copy.model_v1_data.cpg_deliver_fn != NULL) {
+					cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
+						&group_name,
+						assembly_data->nodeid,
+						assembly_data->pid,
+						assembly_data->assembly_buf,
+						assembly_data->msglen);
+				}
+				free (assembly_data->assembly_buf);
+				free (assembly_data);
+			}
+			break;
+
 		case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
 			if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
 				break;
@@ -921,6 +1040,12 @@ cs_error_t cpg_zcb_mcast_joined (
 	if (error != CS_OK) {
 		return (error);
 	}
+
+	if (msg_len > IPC_REQUEST_SIZE) {
+		error = CS_ERR_TOO_BIG;
+		goto error_exit;
+	}
+
 	req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
 	req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
 		msg_len;
@@ -957,6 +1082,81 @@ error_exit:
 	return (error);
 }
 
+static cs_error_t send_fragments (
+	struct cpg_inst *cpg_inst,
+	cpg_guarantee_t guarantee,
+	size_t msg_len,
+	const struct iovec *iovec,
+	unsigned int iov_len)
+{
+	int i;
+	cs_error_t error = CS_OK;
+	struct iovec iov[2];
+	struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
+	size_t sent = 0;
+	size_t iov_sent = 0;
+
+	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
+	req_lib_cpg_mcast.guarantee = guarantee;
+	req_lib_cpg_mcast.msglen = msg_len;
+
+	iov[0].iov_base = (void *)&req_lib_cpg_mcast;
+	iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
+
+	i=0;
+	iov_sent = 0 ;
+	qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
+
+	while (error == CS_OK && sent < msg_len) {
+
+		/* Skip empty iovecs (one with data remains while sent < msg_len) */
+		while (iovec[i].iov_len == 0) {
+			i++;
+		}
+
+		if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
+			iov[1].iov_len = cpg_inst->max_msg_size;
+		}
+		else {
+			iov[1].iov_len = iovec[i].iov_len - iov_sent;
+		}
+
+		if (sent == 0) {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
+		}
+		else if ((sent + iov[1].iov_len) == msg_len) {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
+		}
+		else {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
+		}
+
+		req_lib_cpg_mcast.fraglen = iov[1].iov_len;
+		req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
+		iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
+
+	resend:
+		error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, 2));
+		if (error == CS_ERR_TRY_AGAIN) {
+			usleep(10000);
+			goto resend;
+		}
+
+		iov_sent += iov[1].iov_len;
+		sent += iov[1].iov_len;
+
+		/* Next iovec */
+		if (iov_sent >= iovec[i].iov_len) {
+			i++;
+			iov_sent = 0;
+		}
+	}
+	qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
+
+	return error;
+}
+
+
 cs_error_t cpg_mcast_joined (
 	cpg_handle_t handle,
 	cpg_guarantee_t guarantee,
@@ -979,6 +1179,11 @@ cs_error_t cpg_mcast_joined (
 		msg_len += iovec[i].iov_len;
 	}
 
+	if (msg_len > cpg_inst->max_msg_size) {
+		error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
+		goto error_exit;
+	}
+
 	req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
 		msg_len;
 
@@ -994,6 +1199,7 @@ cs_error_t cpg_mcast_joined (
 	error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
 	qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
 
+error_exit:
 	hdb_handle_put (&cpg_handle_t_db, handle);
 
 	return (error);
diff --git a/test/Makefile.am b/test/Makefile.am
index c19e506..bb11518 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -34,7 +34,7 @@ MAINTAINERCLEANFILES = Makefile.in
 
 EXTRA_DIST = ploadstart.sh
 
-noinst_PROGRAMS = cpgverify testcpg testcpg2 cpgbench \
+noinst_PROGRAMS = cpgverify testcpg testcpg2 cpgbench cpghum \
 		testquorum testvotequorum1 testvotequorum2 \
 		stress_cpgfdget stress_cpgcontext cpgbound testsam \
 		testcpgzc cpgbenchzc testzcgc stress_cpgzc
@@ -48,6 +48,7 @@ testzcgc_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgzc_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgfdget_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgcontext_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
+cpghum_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la -lz
 testquorum_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libquorum.la
 testvotequorum1_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
 testvotequorum2_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
diff --git a/test/cpghum.c b/test/cpghum.c
new file mode 100644
index 0000000..0a096db
--- /dev/null
+++ b/test/cpghum.c
@@ -0,0 +1,457 @@
+/*
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Christine Caulfield <ccaulfie@xxxxxxxxxx>
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+#include <errno.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <syslog.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#include <zlib.h>
+#include <libgen.h>
+
+#include <qb/qblog.h>
+#include <qb/qbutil.h>
+
+#include <corosync/corotypes.h>
+#include <corosync/cpg.h>
+
+static cpg_handle_t handle;
+
+static pthread_t thread;
+
+#ifndef timersub
+#define timersub(a, b, result)					\
+	do {							\
+		(result)->tv_sec = (a)->tv_sec - (b)->tv_sec;	\
+		(result)->tv_usec = (a)->tv_usec - (b)->tv_usec;	\
+		if ((result)->tv_usec < 0) {			\
+			--(result)->tv_sec;			\
+			(result)->tv_usec += 1000000;		\
+		}						\
+	} while (0)
+#endif /* timersub */
+
+static volatile sig_atomic_t alarm_notice;
+#define ONE_MEG 1048576
+#define DATASIZE (ONE_MEG*20)
+static char data[DATASIZE];
+static int send_counter = 0;
+static int do_syslog = 0;
+static int quiet = 0;
+static volatile sig_atomic_t stopped;
+
+// stats
+static unsigned int length_errors=0;
+static unsigned int crc_errors=0;
+static unsigned int sequence_errors=0;
+static unsigned int packets_sent=0;
+static unsigned int packets_recvd=0;
+static unsigned int send_retries=0;
+static unsigned int send_fails=0;
+
+static void cpg_bm_confchg_fn (
+	cpg_handle_t handle_in,
+	const struct cpg_name *group_name,
+	const struct cpg_address *member_list, size_t member_list_entries,
+	const struct cpg_address *left_list, size_t left_list_entries,
+	const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+}
+
+static unsigned int g_recv_count;
+static unsigned int g_recv_length;
+static unsigned int g_write_size;
+static int g_recv_counter = 0;
+
+static void cpg_bm_deliver_fn (
+	cpg_handle_t handle_in,
+	const struct cpg_name *group_name,
+	uint32_t nodeid,
+	uint32_t pid,
+	void *msg,
+	size_t msg_len)
+{
+	int *value = msg;
+	uLong crc=0;
+	uLong recv_crc;
+
+	packets_recvd++;
+	g_recv_length = msg_len;
+
+	// Too short to even hold the sequence number and the CRC
+	if (msg_len < sizeof(int) * 2) {
+		length_errors++;
+		fprintf(stderr, "%s: message too short. got %zu bytes\n", group_name->value, msg_len);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: message too short. got %zu bytes\n", group_name->value, msg_len);
+		}
+		return;
+	}
+	recv_crc = value[1] & 0xFFFFFFFF;
+
+	// Basic check, packets should all be the right size
+	if (g_write_size && (msg_len != g_write_size)) {
+		length_errors++;
+		fprintf(stderr, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
+		}
+	}
+
+	// Sequence counters are incrementing in step?
+	if (*value != g_recv_counter) {
+		sequence_errors++;
+		fprintf(stderr, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
+		}
+		// Catch up or we'll be printing errors for ever
+		g_recv_counter = *value +1;
+	} else {
+		g_recv_counter++;
+	}
+
+	// Check crc
+	crc = crc32(0, NULL, 0);
+	crc = crc32(crc, (Bytef *)&value[2], msg_len-sizeof(int)*2) & 0xFFFFFFFF;
+	if (crc != recv_crc) {
+		crc_errors++;
+		fprintf(stderr, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
+		}
+	}
+
+	g_recv_count++;
+
+}
+
+static cpg_model_v1_data_t model1_data = {
+	.cpg_deliver_fn =            cpg_bm_deliver_fn,
+	.cpg_confchg_fn =            cpg_bm_confchg_fn,
+};
+
+static cpg_callbacks_t callbacks = {
+	.cpg_deliver_fn =            cpg_bm_deliver_fn,
+	.cpg_confchg_fn =            cpg_bm_confchg_fn
+};
+
+static struct cpg_name group_name = {
+	.value = "cpghum",
+	.length = 7
+};
+
+static void cpg_test (
+	cpg_handle_t handle_in,
+	int write_size,
+	int delay_time,
+	int print_time)
+{
+	struct timeval tv1, tv2, tv_elapsed;
+	struct iovec iov;
+	unsigned int res;
+	int i;
+	unsigned int *dataint = (unsigned int *)data;
+	uLong crc;
+
+	alarm_notice = 0;
+	iov.iov_base = data;
+	iov.iov_len = write_size;
+
+	g_recv_count = 0;
+	alarm (print_time);
+
+	gettimeofday (&tv1, NULL);
+	do {
+		dataint[0] = send_counter++;
+		// Only the bytes that are actually sent need fresh random data
+		for (i=2; i<(write_size+(int)sizeof(int)-1)/(int)sizeof(int); i++) {
+			dataint[i] = rand();
+		}
+		crc = crc32(0, NULL, 0);
+		dataint[1] = crc32(crc, (Bytef*)&dataint[2], write_size-sizeof(int)*2);
+	resend:
+		res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
+		if (res == CS_ERR_TRY_AGAIN) {
+			usleep(10000);
+			send_retries++;
+			goto resend;
+		}
+		if (res != CS_OK) {
+			fprintf(stderr, "send failed: %d\n", res);
+			send_fails++;
+		}
+		else {
+			packets_sent++;
+		}
+		usleep(delay_time*1000);
+	} while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0);
+	gettimeofday (&tv2, NULL);
+	timersub (&tv2, &tv1, &tv_elapsed);
+
+	if (!quiet) {
+		printf ("%s: %5u message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
+		printf ("%5d bytes per write\n", write_size);
+	}
+
+}
+
+static void sigalrm_handler (int num)
+{
+	alarm_notice = 1;
+}
+
+static void sigint_handler (int num)
+{
+	stopped = 1;
+}
+
+static void* dispatch_thread (void *arg)
+{
+	cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
+	return NULL;
+}
+
+static void usage(char *cmd)
+{
+	fprintf(stderr, "%s [OPTIONS]\n", cmd);
+	fprintf(stderr, "\n");
+	fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd);
+	fprintf(stderr, "The messages have a sequence number and a CRC so that missing or\n");
+	fprintf(stderr, "corrupted messages will be detected and reported.\n");
+	fprintf(stderr, "\n");
+	fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd);
+	fprintf(stderr, "so that there is another node in the cluster connected to the CPG.\n");
+	fprintf(stderr, "\n");
+	fprintf(stderr, "When -l is present, packet size is only checked if specified by -w or -W\n");
+	fprintf(stderr, "and it, obviously, must match that of the sender.\n");
+	fprintf(stderr, "\n");
+	fprintf(stderr, "Multiple copies, in different CPGs, can also be run on the same or\n");
+	fprintf(stderr, "different nodes by using the -n option.\n");
+	fprintf(stderr, "\n");
+	fprintf(stderr, "%s can't handle more than 1 sender in the same CPG as it messes with the\n", cmd);
+	fprintf(stderr, "sequence numbers.\n");
+	fprintf(stderr, "\n");
+	fprintf(stderr, " -w Write size in Kbytes, default 4\n");
+	fprintf(stderr, " -W Write size in bytes, default 4096\n");
+	fprintf(stderr, " -n CPG name to use, default 'cpghum'\n");
+	fprintf(stderr, " -d Delay between sending packets (mS), default 1000\n");
+	fprintf(stderr, " -r Number of repetitions, default 100\n");
+	fprintf(stderr, " -p Delay between printing output(S), default 10s\n");
+	fprintf(stderr, " -l Listen and check CRCs only, don't send (^C to quit)\n");
+	fprintf(stderr, " -m cpg_initialise() model. Default 1.\n");
+	fprintf(stderr, " -s Also send errors to syslog (for daemon log correlation).\n");
+	fprintf(stderr, " -q Quiet. Don't print messages every 10 seconds (see also -p)\n");
+	fprintf(stderr, "\n");
+}
+
+int main (int argc, char *argv[]) {
+	int i;
+	unsigned int res;
+	int opt;
+	int bs;
+	int write_size = 4096;
+	int delay_time = 1000;
+	int repetitions = 100;
+	int print_time = 10;
+	int have_size = 0;
+	int listen_only = 0;
+	int model = 1;
+
+	while ( (opt = getopt(argc, argv, "qlsn:d:r:p:m:w:W:")) != -1 ) {
+		switch (opt) {
+		case 'w': // Write size in K
+			bs = atoi(optarg);
+			if (bs > DATASIZE / 1024) {
+				fprintf(stderr, "%s: Write size must be at most %dK\n", argv[0], DATASIZE / 1024);
+				exit(1);
+			}
+			if (bs > 0) {
+				write_size = bs*1024;
+				have_size = 1;
+			}
+			break;
+		case 'W': // Write size in bytes
+			bs = atoi(optarg);
+			if (bs > 0) {
+				write_size = bs;
+				have_size = 1;
+			}
+			break;
+		case 'n':
+			if (strlen(optarg) >= sizeof(group_name.value)) {
+				fprintf(stderr, "%s: CPG name too long\n", argv[0]);
+				exit(1);
+			}
+			strcpy(group_name.value, optarg);
+			group_name.length = strlen(group_name.value) + 1; // count the NUL, as the default name does
+			break;
+		case 'd':
+			delay_time = atoi(optarg);
+			break;
+		case 'r':
+			repetitions = atoi(optarg);
+			break;
+		case 'p':
+			print_time = atoi(optarg);
+			break;
+		case 'l':
+			listen_only = 1;
+			break;
+		case 's':
+			do_syslog = 1;
+			break;
+		case 'q':
+			quiet = 1;
+			break;
+		case 'm':
+			model = atoi(optarg);
+			if (model < 0 || model > 1) {
+				fprintf(stderr, "%s: Model must be 0-1\n", argv[0]);
+				exit(1);
+			}
+			break;
+		case '?':
+			usage(basename(argv[0]));
+			exit(1);
+		}
+	}
+
+	// Each message carries a sequence number and a CRC, and is built in data[]
+	if (write_size < (int)(sizeof(int) * 2) || write_size > DATASIZE) {
+		fprintf(stderr, "%s: Write size must be between %d and %d bytes\n",
+			argv[0], (int)(sizeof(int) * 2), DATASIZE);
+		exit(1);
+	}
+
+	qb_log_init("cpghum", LOG_USER, LOG_EMERG);
+	qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
+	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
+			  QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
+	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
+
+	g_write_size = write_size;
+
+	signal (SIGALRM, sigalrm_handler);
+	signal (SIGINT, sigint_handler);
+	switch (model) {
+	case 0:
+		res = cpg_initialize (&handle, &callbacks);
+		break;
+	case 1:
+		res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL);
+		break;
+	default:
+		res=999; // can't get here but it keeps the compiler happy
+		break;
+	}
+
+	if (res != CS_OK) {
+		printf ("cpg_initialize failed with result %d\n", res);
+		exit (1);
+	}
+	pthread_create (&thread, NULL, dispatch_thread, NULL);
+
+	res = cpg_join (handle, &group_name);
+	if (res != CS_OK) {
+		printf ("cpg_join failed with result %d\n", res);
+		exit (1);
+	}
+
+	if (listen_only) {
+		int secs = 0;
+		if (!quiet) {
+			printf("-- Listening on CPG %s\n", group_name.value);
+			printf("-- Ignore any starting \"counters don't match\" error while we catch up\n");
+		}
+
+		/* Only check packet size if specified on the command-line */
+		if (!have_size) {
+			g_write_size = 0;
+		}
+
+		while (!stopped) {
+			sleep(1);
+			if (++secs > print_time && !quiet) {
+				printf ("%s: %5u message%s received. %u bytes\n", group_name.value, g_recv_count, g_recv_count==1?"":"s", g_recv_length);
+				secs = 0;
+				g_recv_count = 0;
+			}
+		}
+	}
+	else {
+		/* We're a test app .. we 'know' that IPC_SIZE is 1 Meg. Don't do this at home. */
+		if ( write_size > ONE_MEG - 1024) {
+			fprintf(stderr, "INFO: packet size (%d) is larger than IPC_REQUEST_SIZE-1K (%d), libcpg will fragment\n",
+				write_size, ONE_MEG-1024);
+		}
+		for (i = 0; i < repetitions && !stopped; i++) {
+			cpg_test (handle, write_size, delay_time, print_time);
+			signal (SIGALRM, sigalrm_handler);
+		}
+	}
+
+	res = cpg_finalize (handle);
+	if (res != CS_OK) {
+		printf ("cpg_finalize failed with result %d\n", res);
+		exit (1);
+	}
+
+	printf("\n");
+	printf("Stats:\n");
+	if (!listen_only) {
+		printf(" packets sent: %u\n", packets_sent);
+		printf(" send failures: %u\n", send_fails);
+		printf(" send retries: %u\n", send_retries);
+	}
+	if (have_size) {
+		printf(" length errors: %u\n", length_errors);
+	}
+	printf(" packets recvd: %u\n", packets_recvd);
+	printf(" sequence errors: %u\n", sequence_errors);
+	printf(" crc errors: %u\n", crc_errors);
+	printf("\n");
+	return (0);
+}
_______________________________________________ discuss mailing list discuss@xxxxxxxxxxxx http://lists.corosync.org/mailman/listinfo/discuss