[PATCH] Allow cpg to send large messages

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



As we discussed at the cluster summit, increasing the message size
inside corosync itself is not only dangerous, but is also needed for
only a few corner cases, all of which involve CPG.

So, to allow large CPG messages (which are needed), I have added an extra
facility to libcpg that will fragment messages that are too large for
corosync's internal buffers. It does this transparently to the
application. Zero-copy sends are NOT supported for this feature.

I've also included a test program 'cpghum' that can test this facility
with message sequence numbers and checksums.

Signed-Off-By: Christine Caulfield <ccaulfie@xxxxxxxxxx>

diff --git a/exec/cpg.c b/exec/cpg.c
index 1c6fbb9..3d83982 100644
--- a/exec/cpg.c
+++ b/exec/cpg.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -83,7 +83,8 @@ enum cpg_message_req_types {
 	MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
 	MESSAGE_REQ_EXEC_CPG_MCAST = 3,
 	MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
-	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5
+	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5,
+	MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6,
 };
 
 struct zcb_mapped {
@@ -224,6 +225,10 @@ static void message_handler_req_exec_cpg_mcast (
 	const void *message,
 	unsigned int nodeid);
 
+static void message_handler_req_exec_cpg_partial_mcast (
+	const void *message,
+	unsigned int nodeid);
+
 static void message_handler_req_exec_cpg_downlist_old (
 	const void *message,
 	unsigned int nodeid);
@@ -238,6 +243,8 @@ static void exec_cpg_joinlist_endian_convert (void *msg);
 
 static void exec_cpg_mcast_endian_convert (void *msg);
 
+static void exec_cpg_partial_mcast_endian_convert (void *msg);
+
 static void exec_cpg_downlist_endian_convert_old (void *msg);
 
 static void exec_cpg_downlist_endian_convert (void *msg);
@@ -250,6 +257,8 @@ static void message_handler_req_lib_cpg_finalize (void *conn, const void *messag
 
 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
 
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
+
 static void message_handler_req_lib_cpg_membership (void *conn,
 						    const void *message);
 
@@ -383,7 +392,10 @@ static struct corosync_lib_handler cpg_lib_engine[] =
 		.lib_handler_fn				= message_handler_req_lib_cpg_zc_execute,
 		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
 	},
-
+	{ /* 12 */
+		.lib_handler_fn				= message_handler_req_lib_cpg_partial_mcast,
+		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
+	},
 
 };
 
@@ -413,6 +425,10 @@ static struct corosync_exec_handler cpg_exec_engine[] =
 		.exec_handler_fn	= message_handler_req_exec_cpg_downlist,
 		.exec_endian_convert_fn	= exec_cpg_downlist_endian_convert
 	},
+	{ /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
+		.exec_handler_fn	= message_handler_req_exec_cpg_partial_mcast,
+		.exec_endian_convert_fn	= exec_cpg_partial_mcast_endian_convert
+	},
 };
 
 struct corosync_service_engine cpg_service_engine = {
@@ -457,6 +473,17 @@ struct req_exec_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct req_exec_cpg_partial_mcast {
+	struct qb_ipc_request_header header __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+        mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_message_source_t source __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct req_exec_cpg_downlist_old {
 	struct qb_ipc_request_header header __attribute__((aligned(8)));
 	mar_uint32_t left_nodes __attribute__((aligned(8)));
@@ -1186,6 +1213,19 @@ static void exec_cpg_mcast_endian_convert (void *msg)
 	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
 }
 
+static void exec_cpg_partial_mcast_endian_convert (void *msg)
+{
+	struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = msg;
+
+	swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
+	swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
+	req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
+	req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
+	req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
+	req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
+	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
+}
+
 static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
 	struct list_head *iter;
 
@@ -1452,6 +1492,67 @@ static void message_handler_req_exec_cpg_mcast (
 		}
 	}
 }
+static void message_handler_req_exec_cpg_partial_mcast (
+	const void *message,
+	unsigned int nodeid)
+{
+	const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
+	struct res_lib_cpg_partial_deliver_callback res_lib_cpg_mcast;
+	int msglen = req_exec_cpg_mcast->fraglen;
+	struct list_head *iter, *pi_iter;
+	struct cpg_pd *cpd;
+	struct iovec iovec[2];
+	int known_node = 0;
+
+	log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
+
+	res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK;
+	res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
+	res_lib_cpg_mcast.fraglen = msglen;
+	res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
+	res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
+	res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
+	res_lib_cpg_mcast.nodeid = nodeid;
+
+	memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
+		sizeof(mar_cpg_name_t));
+	iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
+	iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
+
+	iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
+	iovec[1].iov_len = msglen;
+
+	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
+		cpd = list_entry(iter, struct cpg_pd, list);
+		iter = iter->next;
+
+		if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
+			&& (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
+
+			if (!known_node) {
+				/* Try to find, if we know the node */
+				for (pi_iter = process_info_list_head.next;
+					pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
+
+					struct process_info *pi = list_entry (pi_iter, struct process_info, list);
+
+					if (pi->nodeid == nodeid &&
+						mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
+						known_node = 1;
+						break;
+					}
+				}
+			}
+
+			if (!known_node) {
+				log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
+				return ;
+			}
+
+			api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
+		}
+	}
+}
 
 
 static int cpg_exec_send_downlist(void)
@@ -1864,6 +1965,62 @@ static void message_handler_req_lib_cpg_zc_free (
 		res_header.size);
 }
 
+/* Fragmented mcast message from the library */
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
+{
+	const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
+	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
+	mar_cpg_name_t group_name = cpd->group_name;
+
+	struct iovec req_exec_cpg_iovec[2];
+	struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
+	int msglen = req_lib_cpg_mcast->fraglen;
+	int result;
+	cs_error_t error = CS_ERR_NOT_EXIST;
+
+	log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
+	log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
+
+	switch (cpd->cpd_state) {
+	case CPD_STATE_UNJOINED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_LEAVE_STARTED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_JOIN_STARTED:
+		error = CS_OK;
+		break;
+	case CPD_STATE_JOIN_COMPLETED:
+		error = CS_OK;
+		break;
+	}
+
+	if (error == CS_OK) {
+		req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
+		req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
+			MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST);
+		req_exec_cpg_mcast.pid = cpd->pid;
+		req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
+		req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
+		req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
+		api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
+		memcpy(&req_exec_cpg_mcast.group_name, &group_name,
+			sizeof(mar_cpg_name_t));
+
+		req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
+		req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
+		req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
+		req_exec_cpg_iovec[1].iov_len = msglen;
+
+		result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
+		assert(result == 0);
+	} else {
+		log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
+			conn, group_name.value, cpd->cpd_state, error);
+	}
+}
+
 /* Mcast message from the library */
 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
 {
diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
index a95335a..d7000a5 100644
--- a/include/corosync/ipc_cpg.h
+++ b/include/corosync/ipc_cpg.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2006-2011 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -55,6 +55,7 @@ enum req_cpg_types {
 	MESSAGE_REQ_CPG_ZC_ALLOC = 9,
 	MESSAGE_REQ_CPG_ZC_FREE = 10,
 	MESSAGE_REQ_CPG_ZC_EXECUTE = 11,
+	MESSAGE_REQ_CPG_PARTIAL_MCAST = 12,
 };
 
 enum res_cpg_types {
@@ -75,6 +76,7 @@ enum res_cpg_types {
 	MESSAGE_RES_CPG_ZC_ALLOC = 14,
 	MESSAGE_RES_CPG_ZC_FREE = 15,
 	MESSAGE_RES_CPG_ZC_EXECUTE = 16,
+	MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK = 17,
 };
 
 enum lib_cpg_confchg_reason {
@@ -85,6 +87,12 @@ enum lib_cpg_confchg_reason {
 	CONFCHG_CPG_REASON_PROCDOWN = 5
 };
 
+enum lib_cpg_partial_types {
+	LIBCPG_PARTIAL_FIRST = 1,
+	LIBCPG_PARTIAL_CONTINUED = 2,
+	LIBCPG_PARTIAL_LAST = 3,
+};
+
 typedef struct {
 	uint32_t length __attribute__((aligned(8)));
 	char value[CPG_MAX_NAME_LENGTH] __attribute__((aligned(8)));
@@ -207,6 +215,15 @@ struct req_lib_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct req_lib_cpg_partial_mcast {	/* library -> daemon: one fragment of a message too large for a single IPC request */
+	struct qb_ipc_request_header header __attribute__((aligned(8)));	/* this is a request (MESSAGE_REQ_CPG_PARTIAL_MCAST), so it carries the request header */
+	mar_uint32_t guarantee __attribute__((aligned(8)));	/* cpg_guarantee_t passed to cpg_mcast_joined() */
+	mar_uint32_t msglen __attribute__((aligned(8)));	/* length of the complete, unfragmented message */
+	mar_uint32_t fraglen __attribute__((aligned(8)));	/* number of bytes of message[] in this fragment */
+	mar_uint32_t type __attribute__((aligned(8)));	/* one of enum lib_cpg_partial_types */
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct res_lib_cpg_mcast {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 };
@@ -223,6 +240,17 @@ struct res_lib_cpg_deliver_callback {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct res_lib_cpg_partial_deliver_callback {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+    	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t nodeid __attribute__((aligned(8)));
+	mar_uint32_t pid __attribute__((aligned(8)));
+    	mar_uint32_t type __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct res_lib_cpg_flowcontrol_callback {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 	mar_uint32_t flow_control_state __attribute__((aligned(8)));
diff --git a/lib/cpg.c b/lib/cpg.c
index 4b92f44..376914c 100644
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -1,7 +1,7 @@
 /*
  * vi: set autoindent tabstop=4 shiftwidth=4 :
  *
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -83,6 +83,14 @@ struct cpg_inst {
 		cpg_model_v1_data_t model_v1_data;
 	};
 	struct list_head iteration_list_head;
+    uint32_t max_msg_size;
+    char *assembly_buf;
+    uint32_t assembly_buf_ptr;
+    int assembling; /* Flag that says we have started assembling a message.
+					 * It's here to catch the situation where a node joins
+					 * the cluster/group in the middle of a CPG message send
+					 * so we don't pass on a partial message to the client.
+					 */
 };
 static void cpg_inst_free (void *inst);
 
@@ -210,6 +218,8 @@ cs_error_t cpg_model_initialize (
 		}
 	}
 
+	/* Allow space for corosync internal headers */
+	cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;	
 	cpg_inst->model_data.model = model;
 	cpg_inst->context = context;
 
@@ -339,6 +349,7 @@ cs_error_t cpg_dispatch (
 	struct cpg_inst *cpg_inst;
 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
+	struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
 	struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
 	struct cpg_inst cpg_inst_copy;
 	struct qb_ipc_response_header *dispatch_data;
@@ -361,7 +372,7 @@ cs_error_t cpg_dispatch (
 
 	/*
 	 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
-	 * wait indefinately for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
+	 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
 	 */
 	if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
 		timeout = 0;
@@ -428,6 +439,43 @@ cs_error_t cpg_dispatch (
 					res_cpg_deliver_callback->msglen);
 				break;
 
+			case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
+				res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
+
+				marshall_from_mar_cpg_name_t (
+					&group_name,
+					&res_cpg_partial_deliver_callback->group_name);
+
+				if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
+					    /*
+						 * Allocate a buffer to contain a full message.
+						 */
+					    cpg_inst->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
+						if (!cpg_inst->assembly_buf) {
+							    error = CS_ERR_NO_MEMORY;
+								goto error_put;
+						}
+						cpg_inst->assembling = 1;
+						cpg_inst->assembly_buf_ptr = 0;
+				}
+				if (cpg_inst->assembling) {
+				        memcpy(cpg_inst->assembly_buf + cpg_inst->assembly_buf_ptr,
+							   res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
+						cpg_inst->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
+
+						if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
+						        cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
+																			&group_name,
+																			res_cpg_partial_deliver_callback->nodeid,
+																			res_cpg_partial_deliver_callback->pid,
+																			cpg_inst->assembly_buf,
+																			res_cpg_partial_deliver_callback->msglen);
+								free(cpg_inst->assembly_buf);
+								cpg_inst->assembling = 0;
+						}
+				}
+				break;
+
 			case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
 				if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
 					break;
@@ -921,6 +969,12 @@ cs_error_t cpg_zcb_mcast_joined (
 	if (error != CS_OK) {
 		return (error);
 	}
+
+	if (msg_len > IPC_REQUEST_SIZE) {
+	        error = CS_ERR_TOO_BIG;
+		goto error_exit;
+	}
+
 	req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
 	req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
 		msg_len;
@@ -957,6 +1011,76 @@ error_exit:
 	return (error);
 }
 
+static cs_error_t send_fragments (
+	struct cpg_inst *cpg_inst,
+	cpg_guarantee_t guarantee,
+	size_t msg_len,
+	const struct iovec *iovec,
+	unsigned int iov_len)
+{
+	int i;
+	cs_error_t error = CS_OK;
+	struct iovec iov[2];
+	struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
+	size_t sent = 0;
+	size_t iov_sent = 0;
+
+	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
+	req_lib_cpg_mcast.guarantee = guarantee;
+	req_lib_cpg_mcast.msglen = msg_len;
+
+	iov[0].iov_base = (void *)&req_lib_cpg_mcast;
+	iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
+
+	i=0;
+	iov_sent = 0 ;
+	qb_ipcc_fc_enable_max_set(cpg_inst->c,  2);
+
+	while (error == CS_OK && sent < msg_len) {
+
+	  if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
+			        iov[1].iov_len = cpg_inst->max_msg_size;
+			}
+			else {
+			        iov[1].iov_len = iovec[i].iov_len - iov_sent;
+			}
+
+			if (sent == 0) {
+			        req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
+			}
+			else if ((sent + iov[1].iov_len) == msg_len) {
+			        req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
+			}
+			else {
+			        req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
+			}
+	  		  
+			req_lib_cpg_mcast.fraglen = iov[1].iov_len;
+			req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
+			iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
+
+	resend:
+			error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, 2));
+			if (error == CS_ERR_TRY_AGAIN) {
+			        usleep(10000);
+					goto resend;
+			}
+
+			iov_sent += iov[1].iov_len;
+			sent += iov[1].iov_len;
+
+			/* Next iovec */
+			if (iov_sent >= iovec[i].iov_len) {
+			        i++;
+					iov_sent = 0;
+			}
+	}
+	qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
+
+	return error;
+}
+
+
 cs_error_t cpg_mcast_joined (
 	cpg_handle_t handle,
 	cpg_guarantee_t guarantee,
@@ -979,6 +1103,11 @@ cs_error_t cpg_mcast_joined (
 		msg_len += iovec[i].iov_len;
 	}
 
+	if (msg_len > cpg_inst->max_msg_size) {
+	      error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
+		  goto error_exit;
+	}
+
 	req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
 		msg_len;
 
@@ -994,6 +1123,7 @@ cs_error_t cpg_mcast_joined (
 	error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
 
+error_exit:
 	hdb_handle_put (&cpg_handle_t_db, handle);
 
 	return (error);
diff --git a/test/Makefile.am b/test/Makefile.am
index c19e506..bb11518 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -34,7 +34,7 @@ MAINTAINERCLEANFILES	= Makefile.in
 
 EXTRA_DIST		= ploadstart.sh
 
-noinst_PROGRAMS		= cpgverify testcpg testcpg2 cpgbench \
+noinst_PROGRAMS		= cpgverify testcpg testcpg2 cpgbench cpghum \
 			  testquorum testvotequorum1 testvotequorum2	\
 			  stress_cpgfdget stress_cpgcontext cpgbound testsam \
 			  testcpgzc cpgbenchzc testzcgc stress_cpgzc
@@ -48,6 +48,7 @@ testzcgc_LDADD		= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgzc_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgfdget_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgcontext_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
+cpghum_LDADD            = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la -lz
 testquorum_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libquorum.la
 testvotequorum1_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
 testvotequorum2_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
diff --git a/test/cpghum.c b/test/cpghum.c
new file mode 100644
index 0000000..0a096db
--- /dev/null
+++ b/test/cpghum.c
@@ -0,0 +1,432 @@
+/*
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Christine Caulfield <ccaulfie@xxxxxxxxxx>
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+#include <errno.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#include <zlib.h>
+#include <libgen.h>
+
+#include <qb/qblog.h>
+#include <qb/qbutil.h>
+
+#include <corosync/corotypes.h>
+#include <corosync/cpg.h>
+
+static cpg_handle_t handle;
+
+static pthread_t thread;
+
+#ifndef timersub
+#define timersub(a, b, result)						\
+	do {								\
+		(result)->tv_sec = (a)->tv_sec - (b)->tv_sec;		\
+		(result)->tv_usec = (a)->tv_usec - (b)->tv_usec;	\
+		if ((result)->tv_usec < 0) {				\
+			--(result)->tv_sec;				\
+			(result)->tv_usec += 1000000;			\
+		}							\
+	} while (0)
+#endif /* timersub */
+
+static int alarm_notice;
+#define ONE_MEG 1048576
+#define DATASIZE (ONE_MEG*20)
+static char data[DATASIZE];
+static int send_counter = 0;
+static int do_syslog = 0;
+static int quiet = 0;
+static volatile int stopped;
+
+// stats
+static unsigned int length_errors=0;
+static unsigned int crc_errors=0;
+static unsigned int sequence_errors=0;
+static unsigned int packets_sent=0;
+static unsigned int packets_recvd=0;
+static unsigned int send_retries=0;
+static unsigned int send_fails=0;
+
+static void cpg_bm_confchg_fn (
+	cpg_handle_t handle_in,
+	const struct cpg_name *group_name,
+	const struct cpg_address *member_list, size_t member_list_entries,
+	const struct cpg_address *left_list, size_t left_list_entries,
+	const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+}
+
+static unsigned int g_recv_count;
+static unsigned int g_recv_length;
+static unsigned int g_write_size;
+static int g_recv_counter = 0;
+
+static void cpg_bm_deliver_fn (
+	cpg_handle_t handle_in,
+	const struct cpg_name *group_name,
+	uint32_t nodeid,
+	uint32_t pid,
+	void *msg,
+	size_t msg_len)
+{
+	int *value = msg;
+	uLong crc=0;
+	uLong recv_crc = value[1] & 0xFFFFFFFF;
+	// Deliver callback: value[0] is checked as a sequence number, value[1] as the CRC32 of the bytes from value[2] on
+	packets_recvd++;
+	g_recv_length = msg_len;
+
+	// Basic check, packets should all be the right size (skipped when g_write_size is 0)
+	if (g_write_size && (msg_len != g_write_size)) {
+		length_errors++;
+		fprintf(stderr, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
+		}
+	}
+
+	// Sequence counters are incrementing in step?
+	if (*value != g_recv_counter) {
+		sequence_errors++;
+		fprintf(stderr, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
+		}
+		// Catch up or we'll be printing errors for ever
+		g_recv_counter = *value +1;
+	} else {
+		g_recv_counter++;
+	}
+
+	// Check crc: recompute over everything after the two leading ints and compare with the value carried in the message
+	crc = crc32(0, NULL, 0);
+	crc = crc32(crc, (Bytef *)&value[2], msg_len-sizeof(int)*2) & 0xFFFFFFFF;
+	if (crc != recv_crc) {
+		crc_errors++;
+		fprintf(stderr, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
+		}
+	}
+
+	g_recv_count++;
+
+}
+
+static cpg_model_v1_data_t model1_data = {
+	.cpg_deliver_fn 	= cpg_bm_deliver_fn,
+	.cpg_confchg_fn		= cpg_bm_confchg_fn,
+};
+
+static cpg_callbacks_t callbacks = {
+	.cpg_deliver_fn 	= cpg_bm_deliver_fn,
+	.cpg_confchg_fn		= cpg_bm_confchg_fn
+};
+
+static struct cpg_name group_name = {
+	.value = "cpghum",
+	.length = 7
+};
+
+static void cpg_test (
+	cpg_handle_t handle_in,
+	int write_size,
+	int delay_time,
+	int print_time)
+{
+	struct timeval tv1, tv2, tv_elapsed;
+	struct iovec iov;
+	unsigned int res;
+	int i;
+	unsigned int *dataint = (unsigned int *)data;
+	uLong crc;
+
+	alarm_notice = 0;
+	iov.iov_base = data;
+	iov.iov_len = write_size;
+
+	g_recv_count = 0;
+	alarm (print_time);
+
+	gettimeofday (&tv1, NULL);
+	do {
+		dataint[0] = send_counter++;
+		for (i=2; i<(DATASIZE-sizeof(int)*2)/4; i++) {
+		  dataint[i] = rand();
+		}
+		crc = crc32(0, NULL, 0);
+		dataint[1] = crc32(crc, (Bytef*)&dataint[2], write_size-sizeof(int)*2);
+	resend:
+		res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
+		if (res == CS_ERR_TRY_AGAIN) {
+		    usleep(10000);
+		    send_retries++;
+		    goto resend;
+		}
+		if (res != CS_OK) {
+		    fprintf(stderr, "send failed: %d\n", res);
+		    send_fails++;
+		}
+		else {
+		    packets_sent++;
+		}
+		usleep(delay_time*1000);
+	} while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0);
+	gettimeofday (&tv2, NULL);
+	timersub (&tv2, &tv1, &tv_elapsed);
+
+	if (!quiet) {
+	        printf ("%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
+		printf ("%5d bytes per write\n", write_size);
+	}
+
+}
+
+static void sigalrm_handler (int num)
+{
+	alarm_notice = 1;
+}
+
+static void sigint_handler (int num)
+{
+	stopped = 1;
+}
+
+static void* dispatch_thread (void *arg)
+{
+	cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
+	return NULL;
+}
+
+static void usage(char *cmd)
+{
+        fprintf(stderr, "%s [OPTIONS]\n", cmd);
+        fprintf(stderr, "\n");
+	fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd);
+	fprintf(stderr, "The messages have a sequence number and a CRC so that missing or\n");
+	fprintf(stderr, "corrupted messages will be detected and reported.\n");
+        fprintf(stderr, "\n");
+        fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd);
+        fprintf(stderr, "so that there is another node in the cluster connected to the CPG.\n");
+        fprintf(stderr, "\n");
+        fprintf(stderr, "When -l is present, packet size is only checked if specified by -w or -W\n");
+        fprintf(stderr, "and it, obviously, must match that of the sender.\n");
+        fprintf(stderr, "\n");
+        fprintf(stderr, "Multiple copies, in different CPGs, can also be run on the same or\n");
+        fprintf(stderr, "different nodes by using the -n option.\n");
+        fprintf(stderr, "\n");
+        fprintf(stderr, "%s can't handle more than 1 sender in the same CPG as it messes with the\n", cmd);
+        fprintf(stderr, "sequence numbers.\n");
+        fprintf(stderr, "\n");
+        fprintf(stderr, "       -w    Write size in Kbytes, default 4\n");
+        fprintf(stderr, "       -W    Write size in bytes, default 4096\n");
+        fprintf(stderr, "       -n    CPG name to use, default 'cpghum'\n");
+        fprintf(stderr, "       -d    Delay between sending packets (mS), default 1000\n");
+        fprintf(stderr, "       -r    Number of repetitions, default 100\n");
+        fprintf(stderr, "       -p    Delay between printing output(S), default 10s\n");
+	fprintf(stderr, "       -l    Listen and check CRCs only, don't send (^C to quit)\n");
+	fprintf(stderr, "       -m    cpg_initialise() model. Default 1.\n");
+	fprintf(stderr, "       -s    Also send errors to syslog (for daemon log correlation).\n");
+	fprintf(stderr, "       -q    Quiet. Don't print messages every 10 seconds (see also -p)\n");
+        fprintf(stderr, "\n");
+}
+
+int main (int argc, char *argv[]) {	// Parse options, join the CPG, then either listen or send test messages; prints stats on exit
+	int i;
+	unsigned int res;
+	const char *cpgname = group_name.value;	// follows -n, so the name printed below is the group actually joined
+	int opt;
+	int bs;
+	int write_size = 4096;
+	int delay_time = 1000;
+	int repetitions = 100;
+	int print_time = 10;
+	int have_size = 0;
+	int listen_only = 0;
+	int model = 1;
+
+	while ( (opt = getopt(argc, argv, "qlsn:d:r:p:m:w:W:")) != -1 ) {
+		switch (opt) {
+		case 'w': // Write size in K
+			bs = atoi(optarg);
+			if (bs > 0 && bs <= DATASIZE/1024) { // must fit in data[]; out-of-range values are ignored
+				write_size = bs*1024;
+				have_size = 1;
+			}
+			break;
+		case 'W': // Write size in bytes
+			bs = atoi(optarg);
+			if (bs >= (int)(2*sizeof(int)) && bs <= DATASIZE) { // room for counter + CRC, and must fit in data[]
+				write_size = bs;
+				have_size = 1;
+			}
+			break;
+		case 'n':
+			snprintf(group_name.value, sizeof(group_name.value), "%s", optarg); // bounded copy, always NUL-terminated
+			group_name.length = strlen(group_name.value);
+			break;
+		case 'd':
+			delay_time = atoi(optarg);
+			break;
+		case 'r':
+			repetitions = atoi(optarg);
+			break;
+		case 'p':
+			print_time = atoi(optarg);
+			break;
+		case 'l':
+			listen_only = 1;
+			break;
+		case 's':
+			do_syslog = 1;
+			break;
+		case 'q':
+			quiet = 1;
+			break;
+		case 'm':
+			model = atoi(optarg);
+			if (model < 0 || model > 1) {
+				fprintf(stderr, "%s: Model must be 0-1\n", argv[0]);
+				exit(1);
+			}
+			break;
+		case '?':
+			usage(basename(argv[0]));
+			exit(0);
+		}
+	}
+
+	qb_log_init("cpghum", LOG_USER, LOG_EMERG);
+	qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
+	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
+			  QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
+	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
+
+	g_write_size = write_size;
+
+	signal (SIGALRM, sigalrm_handler);
+	signal (SIGINT, sigint_handler);
+	switch (model) {
+	case 0:
+		res = cpg_initialize (&handle, &callbacks);
+		break;
+	case 1:
+		res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL);
+		break;
+	default:
+		res=999; // can't get here but it keeps the compiler happy
+		break;
+	}
+
+	if (res != CS_OK) {
+		printf ("cpg_initialize failed with result %d\n", res);
+		exit (1);
+	}
+	pthread_create (&thread, NULL, dispatch_thread, NULL);
+
+	res = cpg_join (handle, &group_name);
+	if (res != CS_OK) {
+		printf ("cpg_join failed with result %d\n", res);
+		exit (1);
+	}
+
+	if (listen_only) {
+		int secs = 0; // seconds since the last progress line; must start at zero
+		if (!quiet) {
+			printf("-- Listening on CPG %s\n", cpgname);
+			printf("-- Ignore any starting \"counters don't match\" error while we catch up\n");
+		}
+
+		/* Only check packet size if specified on the command-line */
+		if (!have_size) {
+			g_write_size = 0;
+		}
+
+		while (!stopped) {
+			sleep(1);
+			if (++secs > print_time && !quiet) {
+				printf ("%s: %5d message%s received. %d bytes\n", group_name.value, g_recv_count, g_recv_count==1?"":"s", g_recv_length);
+				secs = 0;
+				g_recv_count = 0;
+			}
+		}
+	}
+	else {
+		/* We're a test app .. we 'know' that IPC_SIZE is 1 Meg. Don't do this at home. */
+		if ( write_size > ONE_MEG - 1024) {
+			fprintf(stderr, "INFO: packet size (%d) is larger than IPC_REQUEST_SIZE-1K (%d), libcpg will fragment\n",
+				write_size, ONE_MEG-1024);
+		}
+		for (i = 0; i < repetitions && !stopped; i++) {
+			cpg_test (handle, write_size, delay_time, print_time);
+			signal (SIGALRM, sigalrm_handler);
+		}
+	}
+
+	res = cpg_finalize (handle);
+	if (res != CS_OK) {
+		printf ("cpg_finalize failed with result %d\n", res);
+		exit (1);
+	}
+
+	printf("\n");
+	printf("Stats:\n");
+	if (!listen_only) {
+		printf("   packets sent:    %d\n", packets_sent);
+		printf("   send failures:   %d\n", send_fails);
+		printf("   send retries:    %d\n", send_retries);
+	}
+	if (have_size) {
+		printf("   length errors:   %d\n", length_errors);
+	}
+	printf("   packets recvd:   %d\n", packets_recvd);
+	printf("   sequence errors: %d\n", sequence_errors);
+	printf("   crc errors:      %d\n", crc_errors);
+	printf("\n");
+	return (0);
+}
_______________________________________________
discuss mailing list
discuss@xxxxxxxxxxxx
http://lists.corosync.org/mailman/listinfo/discuss

[Index of Archives]     [Linux Clusters]     [Corosync Project]     [Linux USB Devel]     [Linux Audio Users]     [Photo]     [Yosemite News]    [Yosemite Photos]    [Linux Kernel]     [Linux SCSI]     [X.Org]

  Powered by Linux