Re: [PATCH] Allow cpg to send large messages

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 05/03/15 13:42, Jan Friesse wrote:
> Chrissie,
> patch looks generally good. Can you please fix indentation so:
> - Allocate a buffer to contain a full message. comment is not aligned
> - cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle, &group_name, ...
>   looks like 6 tabs + 8 spaces on first line and 19 tabs on second and
> following lines
> - iov[1].iov_len = iovec[i].iov_len - iov_sent is aligned with 4 spaces
> instead of one tab
> ?
> 
> Also (and this is bigger problem) I'm not entirely happy with:
> 
> usleep(10000);
> goto resend;
> 
> part. I mean, yes, it makes sense; on the other hand, it may cause the app
> to block potentially forever. What do you think about a limited number of
> retries?
> 
> The last thing is cpghum_LDADD. I'm not entirely sure whether libz is really
> a dependency of some library corosync is using. If so, we don't
> necessarily need to check its presence, but if not, we have to check it
> in the configure script... Actually, it would be nice to test it anyway.
> 


I've checked all the indentation, and fixed my .emacs file so it should
remain sane from now on, I hope. Here's a revised patch with fixed
indentation, a retry count in send (value determined empirically with
slow VMs and large, 8M, messages) and a test for libz in configure
(which doesn't seem to be used elsewhere).

Signed-Off-By: Christine Caulfield <ccaulfie@xxxxxxxxxx>


Chrissie



diff --git a/configure.ac b/configure.ac
index 0c371aa..b394329 100644
--- a/configure.ac
+++ b/configure.ac
@@ -163,6 +163,7 @@ AC_CHECK_LIB([pthread], [pthread_create])
 AC_CHECK_LIB([socket], [socket])
 AC_CHECK_LIB([nsl], [t_open])
 AC_CHECK_LIB([rt], [sched_getscheduler])
+AC_CHECK_LIB([z], [crc32])
 
 # Checks for library functions.
 AC_FUNC_ALLOCA
diff --git a/exec/cpg.c b/exec/cpg.c
index 1c6fbb9..a18b850 100644
--- a/exec/cpg.c
+++ b/exec/cpg.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -83,7 +83,8 @@ enum cpg_message_req_types {
 	MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
 	MESSAGE_REQ_EXEC_CPG_MCAST = 3,
 	MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
-	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5
+	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5,
+	MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6,
 };
 
 struct zcb_mapped {
@@ -156,6 +157,8 @@ struct cpg_pd {
 	enum cpd_state cpd_state;
 	unsigned int flags;
 	int initial_totem_conf_sent;
+	uint64_t transition_counter; /* These two are used when sending fragmented messages */
+	uint64_t initial_transition_counter;
 	struct list_head list;
 	struct list_head iteration_instance_list_head;
 	struct list_head zcb_mapped_list_head;
@@ -224,6 +227,10 @@ static void message_handler_req_exec_cpg_mcast (
 	const void *message,
 	unsigned int nodeid);
 
+static void message_handler_req_exec_cpg_partial_mcast (
+	const void *message,
+	unsigned int nodeid);
+
 static void message_handler_req_exec_cpg_downlist_old (
 	const void *message,
 	unsigned int nodeid);
@@ -238,6 +245,8 @@ static void exec_cpg_joinlist_endian_convert (void *msg);
 
 static void exec_cpg_mcast_endian_convert (void *msg);
 
+static void exec_cpg_partial_mcast_endian_convert (void *msg);
+
 static void exec_cpg_downlist_endian_convert_old (void *msg);
 
 static void exec_cpg_downlist_endian_convert (void *msg);
@@ -250,6 +259,8 @@ static void message_handler_req_lib_cpg_finalize (void *conn, const void *messag
 
 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
 
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
+
 static void message_handler_req_lib_cpg_membership (void *conn,
 						    const void *message);
 
@@ -383,7 +394,10 @@ static struct corosync_lib_handler cpg_lib_engine[] =
 		.lib_handler_fn				= message_handler_req_lib_cpg_zc_execute,
 		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
 	},
-
+	{ /* 12 */
+		.lib_handler_fn				= message_handler_req_lib_cpg_partial_mcast,
+		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
+	},
 
 };
 
@@ -413,6 +427,10 @@ static struct corosync_exec_handler cpg_exec_engine[] =
 		.exec_handler_fn	= message_handler_req_exec_cpg_downlist,
 		.exec_endian_convert_fn	= exec_cpg_downlist_endian_convert
 	},
+	{ /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
+		.exec_handler_fn	= message_handler_req_exec_cpg_partial_mcast,
+		.exec_endian_convert_fn	= exec_cpg_partial_mcast_endian_convert
+	},
 };
 
 struct corosync_service_engine cpg_service_engine = {
@@ -457,6 +475,17 @@ struct req_exec_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct req_exec_cpg_partial_mcast {
+	struct qb_ipc_request_header header __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_message_source_t source __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct req_exec_cpg_downlist_old {
 	struct qb_ipc_request_header header __attribute__((aligned(8)));
 	mar_uint32_t left_nodes __attribute__((aligned(8)));
@@ -740,6 +769,7 @@ static int notify_lib_joinlist(
 					cpd->cpd_state == CPD_STATE_LEAVE_STARTED) {
 
 					api->ipc_dispatch_send (cpd->conn, buf, size);
+					cpd->transition_counter++;
 				}
 				if (left_list_entries) {
 					if (left_list[0].pid == cpd->pid &&
@@ -1186,6 +1216,19 @@ static void exec_cpg_mcast_endian_convert (void *msg)
 	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
 }
 
+static void exec_cpg_partial_mcast_endian_convert (void *msg)
+{
+	struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = msg;
+
+	swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
+	swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
+	req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
+	req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
+	req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
+	req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
+	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
+}
+
 static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
 	struct list_head *iter;
 
@@ -1453,6 +1496,68 @@ static void message_handler_req_exec_cpg_mcast (
 	}
 }
 
+static void message_handler_req_exec_cpg_partial_mcast (
+	const void *message,
+	unsigned int nodeid)
+{
+	const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
+	struct res_lib_cpg_partial_deliver_callback res_lib_cpg_mcast;
+	int msglen = req_exec_cpg_mcast->fraglen;
+	struct list_head *iter, *pi_iter;
+	struct cpg_pd *cpd;
+	struct iovec iovec[2];
+	int known_node = 0;
+
+	log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
+
+	res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK;
+	res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
+	res_lib_cpg_mcast.fraglen = msglen;
+	res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
+	res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
+	res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
+	res_lib_cpg_mcast.nodeid = nodeid;
+
+	memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
+	       sizeof(mar_cpg_name_t));
+	iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
+	iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
+
+	iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
+	iovec[1].iov_len = msglen;
+
+	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
+		cpd = list_entry(iter, struct cpg_pd, list);
+		iter = iter->next;
+
+		if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
+		    && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
+
+			if (!known_node) {
+				/* Try to find, if we know the node */
+				for (pi_iter = process_info_list_head.next;
+				     pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
+
+					struct process_info *pi = list_entry (pi_iter, struct process_info, list);
+
+					if (pi->nodeid == nodeid &&
+					    mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
+						known_node = 1;
+						break;
+					}
+				}
+			}
+
+			if (!known_node) {
+				log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
+				return ;
+			}
+
+			api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
+		}
+	}
+}
+
 
 static int cpg_exec_send_downlist(void)
 {
@@ -1864,6 +1969,77 @@ static void message_handler_req_lib_cpg_zc_free (
 		res_header.size);
 }
 
+/* Fragmented mcast message from the library */
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
+{
+	const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
+	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
+	mar_cpg_name_t group_name = cpd->group_name;
+
+	struct iovec req_exec_cpg_iovec[2];
+	struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
+	struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
+	int msglen = req_lib_cpg_mcast->fraglen;
+	int result;
+	cs_error_t error = CS_ERR_NOT_EXIST;
+
+	log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
+	log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
+
+	switch (cpd->cpd_state) {
+	case CPD_STATE_UNJOINED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_LEAVE_STARTED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_JOIN_STARTED:
+		error = CS_OK;
+		break;
+	case CPD_STATE_JOIN_COMPLETED:
+		error = CS_OK;
+		break;
+	}
+
+	res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
+	res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;
+
+	if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
+		cpd->initial_transition_counter = cpd->transition_counter;
+	}
+	if (cpd->transition_counter != cpd->initial_transition_counter) {
+		error = CS_ERR_INTERRUPT;
+	}
+
+	if (error == CS_OK) {
+		req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
+		req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
+							       MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST);
+		req_exec_cpg_mcast.pid = cpd->pid;
+		req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
+		req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
+		req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
+		api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
+		memcpy(&req_exec_cpg_mcast.group_name, &group_name,
+		       sizeof(mar_cpg_name_t));
+
+		req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
+		req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
+		req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
+		req_exec_cpg_iovec[1].iov_len = msglen;
+
+		result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
+		assert(result == 0);
+	} else {
+		log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
+			   conn, group_name.value, cpd->cpd_state, error);
+	}
+
+	res_lib_cpg_partial_send.header.error = error;
+	api->ipc_response_send (conn, &res_lib_cpg_partial_send,
+				sizeof (res_lib_cpg_partial_send));
+}
+
 /* Mcast message from the library */
 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
 {
diff --git a/exec/main.c b/exec/main.c
index cd972b5..e6a0b08 100644
--- a/exec/main.c
+++ b/exec/main.c
@@ -1200,7 +1200,7 @@ int main (int argc, char **argv, char **envp)
 	/* default configuration
 	 */
 	background = 1;
-	setprio = 1;
+	setprio = 0;
 	testonly = 0;
 
 	while ((ch = getopt (argc, argv, "fprtv")) != EOF) {
diff --git a/include/corosync/cpg.h b/include/corosync/cpg.h
index 55fc4b8..f66fb14 100644
--- a/include/corosync/cpg.h
+++ b/include/corosync/cpg.h
@@ -186,6 +186,13 @@ cs_error_t cpg_fd_get (
 	int *fd);
 
 /**
+ * Get maximum size of a message that will not be fragmented
+ */
+cs_error_t cpg_max_atomic_msgsize_get (
+	cpg_handle_t handle,
+	uint32_t *size);
+
+/**
  * Get contexts for a CPG handle
  */
 cs_error_t cpg_context_get (
diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
index a95335a..5008acf 100644
--- a/include/corosync/ipc_cpg.h
+++ b/include/corosync/ipc_cpg.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2006-2011 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -55,6 +55,7 @@ enum req_cpg_types {
 	MESSAGE_REQ_CPG_ZC_ALLOC = 9,
 	MESSAGE_REQ_CPG_ZC_FREE = 10,
 	MESSAGE_REQ_CPG_ZC_EXECUTE = 11,
+	MESSAGE_REQ_CPG_PARTIAL_MCAST = 12,
 };
 
 enum res_cpg_types {
@@ -75,6 +76,8 @@ enum res_cpg_types {
 	MESSAGE_RES_CPG_ZC_ALLOC = 14,
 	MESSAGE_RES_CPG_ZC_FREE = 15,
 	MESSAGE_RES_CPG_ZC_EXECUTE = 16,
+	MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK = 17,
+	MESSAGE_RES_CPG_PARTIAL_SEND = 18,
 };
 
 enum lib_cpg_confchg_reason {
@@ -85,6 +88,12 @@ enum lib_cpg_confchg_reason {
 	CONFCHG_CPG_REASON_PROCDOWN = 5
 };
 
+enum lib_cpg_partial_types {
+	LIBCPG_PARTIAL_FIRST = 1,
+	LIBCPG_PARTIAL_CONTINUED = 2,
+	LIBCPG_PARTIAL_LAST = 3,
+};
+
 typedef struct {
 	uint32_t length __attribute__((aligned(8)));
 	char value[CPG_MAX_NAME_LENGTH] __attribute__((aligned(8)));
@@ -200,6 +209,10 @@ struct res_lib_cpg_local_get {
 	mar_uint32_t local_nodeid __attribute__((aligned(8)));
 };
 
+struct res_lib_cpg_partial_send {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+};
+
 struct req_lib_cpg_mcast {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 	mar_uint32_t guarantee __attribute__((aligned(8)));
@@ -207,6 +220,15 @@ struct req_lib_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct req_lib_cpg_partial_mcast {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+	mar_uint32_t guarantee __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct res_lib_cpg_mcast {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 };
@@ -223,6 +245,17 @@ struct res_lib_cpg_deliver_callback {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct res_lib_cpg_partial_deliver_callback {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t nodeid __attribute__((aligned(8)));
+	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct res_lib_cpg_flowcontrol_callback {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 	mar_uint32_t flow_control_state __attribute__((aligned(8)));
diff --git a/lib/cpg.c b/lib/cpg.c
index 4b92f44..037e8a9 100644
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -1,7 +1,7 @@
 /*
  * vi: set autoindent tabstop=4 shiftwidth=4 :
  *
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -70,6 +70,12 @@
 #endif
 
 /*
+ * Maximum number of times to retry a send when transmitting
+ * a large message fragment
+ */
+#define MAX_RETRIES 100
+
+/*
  * ZCB files have following umask (umask is same as used in libqb)
  */
 #define CPG_MEMORY_MAP_UMASK		077
@@ -83,6 +89,14 @@ struct cpg_inst {
 		cpg_model_v1_data_t model_v1_data;
 	};
 	struct list_head iteration_list_head;
+    uint32_t max_msg_size;
+    char *assembly_buf;
+    uint32_t assembly_buf_ptr;
+    int assembling; /* Flag that says we have started assembling a message.
+					 * It's here to catch the situation where a node joins
+					 * the cluster/group in the middle of a CPG message send
+					 * so we don't pass on a partial message to the client.
+					 */
 };
 static void cpg_inst_free (void *inst);
 
@@ -210,6 +224,8 @@ cs_error_t cpg_model_initialize (
 		}
 	}
 
+	/* Allow space for corosync internal headers */
+	cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
 	cpg_inst->model_data.model = model;
 	cpg_inst->context = context;
 
@@ -291,6 +307,25 @@ cs_error_t cpg_fd_get (
 	return (error);
 }
 
+cs_error_t cpg_max_atomic_msgsize_get (
+	cpg_handle_t handle,
+	uint32_t *size)
+{
+	cs_error_t error;
+	struct cpg_inst *cpg_inst;
+
+	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
+	if (error != CS_OK) {
+		return (error);
+	}
+
+	*size = cpg_inst->max_msg_size;
+
+	hdb_handle_put (&cpg_handle_t_db, handle);
+
+	return (error);
+}
+
 cs_error_t cpg_context_get (
 	cpg_handle_t handle,
 	void **context)
@@ -339,6 +374,7 @@ cs_error_t cpg_dispatch (
 	struct cpg_inst *cpg_inst;
 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
+	struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
 	struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
 	struct cpg_inst cpg_inst_copy;
 	struct qb_ipc_response_header *dispatch_data;
@@ -361,7 +397,7 @@ cs_error_t cpg_dispatch (
 
 	/*
 	 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
-	 * wait indefinately for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
+	 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
 	 */
 	if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
 		timeout = 0;
@@ -428,6 +464,43 @@ cs_error_t cpg_dispatch (
 					res_cpg_deliver_callback->msglen);
 				break;
 
+			case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
+				res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
+
+				marshall_from_mar_cpg_name_t (
+					&group_name,
+					&res_cpg_partial_deliver_callback->group_name);
+
+				if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
+					/*
+					 * Allocate a buffer to contain a full message.
+					 */
+					cpg_inst->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
+					if (!cpg_inst->assembly_buf) {
+						error = CS_ERR_NO_MEMORY;
+						goto error_put;
+					}
+					cpg_inst->assembling = 1;
+					cpg_inst->assembly_buf_ptr = 0;
+				}
+				if (cpg_inst->assembling) {
+					memcpy(cpg_inst->assembly_buf + cpg_inst->assembly_buf_ptr,
+					       res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
+					cpg_inst->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
+
+					if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
+						cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
+							&group_name,
+							res_cpg_partial_deliver_callback->nodeid,
+							res_cpg_partial_deliver_callback->pid,
+							cpg_inst->assembly_buf,
+							res_cpg_partial_deliver_callback->msglen);
+						free(cpg_inst->assembly_buf);
+						cpg_inst->assembling = 0;
+					}
+				}
+				break;
+
 			case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
 				if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
 					break;
@@ -921,6 +994,12 @@ cs_error_t cpg_zcb_mcast_joined (
 	if (error != CS_OK) {
 		return (error);
 	}
+
+	if (msg_len > IPC_REQUEST_SIZE) {
+		error = CS_ERR_TOO_BIG;
+		goto error_exit;
+	}
+
 	req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
 	req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
 		msg_len;
@@ -957,6 +1036,88 @@ error_exit:
 	return (error);
 }
 
+/*
+ * Send a message larger than max_msg_size as a series of partial mcast
+ * requests (FIRST, CONTINUED..., LAST), each carrying at most max_msg_size
+ * bytes of one iovec entry. A fragment that gets CS_ERR_TRY_AGAIN is resent
+ * up to MAX_RETRIES times. Returns the first IPC or reply error, else CS_OK.
+ */
+static cs_error_t send_fragments (
+	struct cpg_inst *cpg_inst,
+	cpg_guarantee_t guarantee,
+	size_t msg_len,
+	const struct iovec *iovec,
+	unsigned int iov_len)
+{
+	int i = 0;
+	cs_error_t error = CS_OK;
+	struct iovec iov[2];
+	struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
+	struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
+	size_t sent = 0;
+	size_t iov_sent = 0;
+	int retry_count;
+
+	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
+	req_lib_cpg_mcast.guarantee = guarantee;
+	req_lib_cpg_mcast.msglen = msg_len;
+
+	iov[0].iov_base = (void *)&req_lib_cpg_mcast;
+	iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
+
+	qb_ipcc_fc_enable_max_set(cpg_inst->c,  2);
+
+	while (error == CS_OK && sent < msg_len) {
+		/* Entry fully consumed (or empty): move on to the next one */
+		if (iov_sent >= iovec[i].iov_len) {
+			i++;
+			iov_sent = 0;
+			continue;
+		}
+
+		iov[1].iov_len = iovec[i].iov_len - iov_sent;
+		if (iov[1].iov_len > cpg_inst->max_msg_size) {
+			iov[1].iov_len = cpg_inst->max_msg_size;
+		}
+
+		if (sent == 0) {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
+		} else if ((sent + iov[1].iov_len) == msg_len) {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
+		} else {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
+		}
+
+		req_lib_cpg_mcast.fraglen = iov[1].iov_len;
+		req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
+		iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
+
+		retry_count = 0;
+	resend:
+		error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
+							 &res_lib_cpg_partial_send,
+							 sizeof (res_lib_cpg_partial_send));
+
+		if (error == CS_ERR_TRY_AGAIN && ++retry_count <= MAX_RETRIES) {
+			usleep(10000);
+			goto resend;
+		}
+		if (error != CS_OK) {
+			/* No valid reply was received, so don't read its header */
+			goto error_exit;
+		}
+
+		iov_sent += iov[1].iov_len;
+		sent += iov[1].iov_len;
+		error = res_lib_cpg_partial_send.header.error;
+	}
+error_exit:
+	qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
+
+	return error;
+}
+
+
 cs_error_t cpg_mcast_joined (
 	cpg_handle_t handle,
 	cpg_guarantee_t guarantee,
@@ -979,6 +1140,11 @@ cs_error_t cpg_mcast_joined (
 		msg_len += iovec[i].iov_len;
 	}
 
+	if (msg_len > cpg_inst->max_msg_size) {
+		error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
+		goto error_exit;
+	}
+
 	req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
 		msg_len;
 
@@ -994,6 +1160,7 @@ cs_error_t cpg_mcast_joined (
 	error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
 
+error_exit:
 	hdb_handle_put (&cpg_handle_t_db, handle);
 
 	return (error);
diff --git a/test/Makefile.am b/test/Makefile.am
index c19e506..bb11518 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -34,7 +34,7 @@ MAINTAINERCLEANFILES	= Makefile.in
 
 EXTRA_DIST		= ploadstart.sh
 
-noinst_PROGRAMS		= cpgverify testcpg testcpg2 cpgbench \
+noinst_PROGRAMS		= cpgverify testcpg testcpg2 cpgbench cpghum \
 			  testquorum testvotequorum1 testvotequorum2	\
 			  stress_cpgfdget stress_cpgcontext cpgbound testsam \
 			  testcpgzc cpgbenchzc testzcgc stress_cpgzc
@@ -48,6 +48,7 @@ testzcgc_LDADD		= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgzc_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgfdget_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgcontext_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
+cpghum_LDADD            = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la -lz
 testquorum_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libquorum.la
 testvotequorum1_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
 testvotequorum2_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
diff --git a/test/cpghum.c b/test/cpghum.c
new file mode 100644
index 0000000..79184e8
--- /dev/null
+++ b/test/cpghum.c
@@ -0,0 +1,432 @@
+/*
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Christine Caulfield <ccaulfie@xxxxxxxxxx>
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+#include <errno.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#include <zlib.h>
+#include <libgen.h>
+
+#include <qb/qblog.h>
+#include <qb/qbutil.h>
+
+#include <corosync/corotypes.h>
+#include <corosync/cpg.h>
+
+static cpg_handle_t handle;
+
+static pthread_t thread;
+
+#ifndef timersub
+#define timersub(a, b, result)						\
+	do {								\
+		(result)->tv_sec = (a)->tv_sec - (b)->tv_sec;		\
+		(result)->tv_usec = (a)->tv_usec - (b)->tv_usec;	\
+		if ((result)->tv_usec < 0) {				\
+			--(result)->tv_sec;				\
+			(result)->tv_usec += 1000000;			\
+		}							\
+	} while (0)
+#endif /* timersub */
+
+static volatile sig_atomic_t alarm_notice;	/* set by sigalrm_handler */
+#define ONE_MEG 1048576
+#define DATASIZE (ONE_MEG*20)
+static char data[DATASIZE];
+static int send_counter = 0;
+static int do_syslog = 0;
+static int quiet = 0;
+static volatile sig_atomic_t stopped;		/* set by sigint_handler */
+
+// stats
+static unsigned int length_errors=0;
+static unsigned int crc_errors=0;
+static unsigned int sequence_errors=0;
+static unsigned int packets_sent=0;
+static unsigned int packets_recvd=0;
+static unsigned int send_retries=0;
+static unsigned int send_fails=0;
+
+static void cpg_bm_confchg_fn (
+	cpg_handle_t handle_in,
+	const struct cpg_name *group_name,
+	const struct cpg_address *member_list, size_t member_list_entries,
+	const struct cpg_address *left_list, size_t left_list_entries,
+	const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+}
+
+static unsigned int g_recv_count;
+static unsigned int g_recv_length;
+static unsigned int g_write_size;
+static int g_recv_counter = 0;
+
+/* Delivery callback: verify size, sequence number and CRC of each message */
+static void cpg_bm_deliver_fn (
+	cpg_handle_t handle_in,
+	const struct cpg_name *group_name,
+	uint32_t nodeid,
+	uint32_t pid,
+	void *msg,
+	size_t msg_len)
+{
+	int *value = msg;
+	uLong crc = crc32(0, NULL, 0);
+
+	packets_recvd++;
+	g_recv_count++;
+	g_recv_length = msg_len;
+
+	// Basic check, packets should all be the right size
+	if (g_write_size && (msg_len != g_write_size)) {
+		length_errors++;
+		fprintf(stderr, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
+		}
+	}
+	// Too short to hold the counter and CRC words: nothing more to check
+	if (msg_len < sizeof(int)*2) {
+		return;
+	}
+	uLong recv_crc = value[1] & 0xFFFFFFFF;
+	// Sequence counters are incrementing in step?
+	if (*value != g_recv_counter) {
+		sequence_errors++;
+		fprintf(stderr, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
+		}
+		// Catch up or we'll be printing errors for ever
+		g_recv_counter = *value +1;
+	} else {
+		g_recv_counter++;
+	}
+	// Check crc of the payload that follows the counter and CRC words
+	crc = crc32(crc, (Bytef *)&value[2], msg_len-sizeof(int)*2) & 0xFFFFFFFF;
+	if (crc != recv_crc) {
+		crc_errors++;
+		fprintf(stderr, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
+		if (do_syslog) {
+			syslog(LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
+		}
+	}
+}
+
+static cpg_model_v1_data_t model1_data = {
+	.cpg_deliver_fn		= cpg_bm_deliver_fn,
+	.cpg_confchg_fn		= cpg_bm_confchg_fn,
+};
+
+static cpg_callbacks_t callbacks = {
+	.cpg_deliver_fn		= cpg_bm_deliver_fn,
+	.cpg_confchg_fn		= cpg_bm_confchg_fn
+};
+
+static struct cpg_name group_name = {
+	.value = "cpghum",
+	.length = 7
+};
+
+static void cpg_test (
+	cpg_handle_t handle_in,
+	int write_size,
+	int delay_time,
+	int print_time)
+{
+	struct timeval tv1, tv2, tv_elapsed;
+	struct iovec iov;
+	unsigned int res;
+	int i;
+	unsigned int *dataint = (unsigned int *)data;
+	uLong crc;
+
+	alarm_notice = 0;
+	iov.iov_base = data;
+	iov.iov_len = write_size;
+
+	g_recv_count = 0;
+	alarm (print_time);
+
+	gettimeofday (&tv1, NULL);
+	do {
+		dataint[0] = send_counter++;
+		for (i=2; i<(DATASIZE-sizeof(int)*2)/4; i++) {
+			dataint[i] = rand();
+		}
+		crc = crc32(0, NULL, 0);
+		dataint[1] = crc32(crc, (Bytef*)&dataint[2], write_size-sizeof(int)*2);
+	resend:
+		res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
+		if (res == CS_ERR_TRY_AGAIN) {
+			usleep(10000);
+			send_retries++;
+			goto resend;
+		}
+		if (res != CS_OK) {
+			fprintf(stderr, "send failed: %d\n", res);
+			send_fails++;
+		}
+		else {
+			packets_sent++;
+		}
+		usleep(delay_time*1000);
+	} while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0);
+	gettimeofday (&tv2, NULL);
+	timersub (&tv2, &tv1, &tv_elapsed);
+
+	if (!quiet) {
+		printf ("%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
+		printf ("%5d bytes per write\n", write_size);
+	}
+
+}
+
+static void sigalrm_handler (int num)
+{
+	alarm_notice = 1;
+}
+
+static void sigint_handler (int num)
+{
+	stopped = 1;
+}
+
+static void* dispatch_thread (void *arg)
+{
+	cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
+	return NULL;
+}
+
+static void usage(char *cmd)
+{	/* Write the help text to stderr; cmd is the program name substituted into it. */
+	fprintf(stderr, "%s [OPTIONS]\n", cmd);
+	fputs("\n", stderr);
+	fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd);
+	fputs("The messages have a sequence number and a CRC so that missing or\n", stderr);
+	fputs("corrupted messages will be detected and reported.\n", stderr);
+	fputs("\n", stderr);
+	fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd);
+	fputs("so that there is another node in the cluster connected to the CPG.\n", stderr);
+	fputs("\n", stderr);
+	fputs("When -l is present, packet size is only checked if specified by -w or -W\n", stderr);
+	fputs("and it, obviously, must match that of the sender.\n", stderr);
+	fputs("\n", stderr);
+	fputs("Multiple copies, in different CPGs, can also be run on the same or\n", stderr);
+	fputs("different nodes by using the -n option.\n", stderr);
+	fputs("\n", stderr);
+	fprintf(stderr, "%s can't handle more than 1 sender in the same CPG as it messes with the\n", cmd);
+	fputs("sequence numbers.\n", stderr);
+	fputs("\n", stderr);
+	fputs("	-w    Write size in Kbytes, default 4\n", stderr);
+	fputs("	-W    Write size in bytes, default 4096\n", stderr);
+	fputs("	-n    CPG name to use, default 'cpghum'\n", stderr);
+	fputs("	-d    Delay between sending packets (mS), default 1000\n", stderr);
+	fputs("	-r    Number of repetitions, default 100\n", stderr);
+	fputs("	-p    Delay between printing output(S), default 10s\n", stderr);
+	fputs("	-l    Listen and check CRCs only, don't send (^C to quit)\n", stderr);
+	fputs("	-m    cpg_initialise() model. Default 1.\n", stderr);
+	fputs("	-s    Also send errors to syslog (for daemon log correlation).\n", stderr);
+	fputs("	-q    Quiet. Don't print messages every 10 seconds (see also -p)\n", stderr);
+	fputs("\n", stderr);
+}
+
+int main (int argc, char *argv[]) { /* Parse options, initialise and join the CPG, then listen or send, and finally print the stats. */
+	int i;
+	unsigned int res;
+	uint32_t maxsize;
+	int opt;
+	int bs;
+	int write_size = 4096;
+	int delay_time = 1000;
+	int repetitions = 100;
+	int print_time = 10;
+	int have_size = 0;
+	int listen_only = 0;
+	int model = 1;
+
+	while ( (opt = getopt(argc, argv, "qlsn:d:r:p:m:w:W:")) != -1 ) {
+		switch (opt) {
+		case 'w': // Write size in K
+			bs = atoi(optarg);
+			if (bs > 0) {
+				write_size = bs*1024;
+				have_size = 1;
+			}
+			break;
+		case 'W': // Write size in bytes
+			bs = atoi(optarg);
+			if (bs > 0) {
+				write_size = bs;
+				have_size = 1;
+			}
+			break;
+		case 'n':
+			snprintf(group_name.value, sizeof(group_name.value), "%s", optarg); /* bounded copy: an over-long name is truncated instead of overflowing the buffer */
+			group_name.length = strlen(group_name.value);
+			break;
+		case 'd':
+			delay_time = atoi(optarg);
+			break;
+		case 'r':
+			repetitions = atoi(optarg);
+			break;
+		case 'p':
+			print_time = atoi(optarg);
+			break;
+		case 'l':
+			listen_only = 1;
+			break;
+		case 's':
+			do_syslog = 1;
+			break;
+		case 'q':
+			quiet = 1;
+			break;
+		case 'm':
+			model = atoi(optarg);
+			if (model < 0 || model > 1) {
+				fprintf(stderr, "%s: Model must be 0-1\n", argv[0]);
+				exit(1);
+			}
+			break;
+		case '?':
+			usage(basename(argv[0]));
+			exit(0);
+		}
+	}
+
+	qb_log_init("cpghum", LOG_USER, LOG_EMERG);
+	qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
+	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
+			  QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
+	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
+
+	g_write_size = write_size;
+
+	signal (SIGALRM, sigalrm_handler);
+	signal (SIGINT, sigint_handler);
+	switch (model) {
+	case 0:
+		res = cpg_initialize (&handle, &callbacks);
+		break;
+	case 1:
+		res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL);
+		break;
+	default:
+		res=999; // can't get here but it keeps the compiler happy
+		break;
+	}
+
+	if (res != CS_OK) {
+		printf ("cpg_initialize failed with result %d\n", res);
+		exit (1);
+	}
+	pthread_create (&thread, NULL, dispatch_thread, NULL);
+
+	res = cpg_join (handle, &group_name);
+	if (res != CS_OK) {
+		printf ("cpg_join failed with result %d\n", res);
+		exit (1);
+	}
+
+	if (listen_only) {
+		int secs = 0; /* seconds since the last progress line; must start at zero because the loop pre-increments it */
+		if (!quiet) {
+			printf("-- Listening on CPG %s\n", group_name.value);
+			printf("-- Ignore any starting \"counters don't match\" error while we catch up\n");
+		}
+
+		/* Only check packet size if specified on the command-line */
+		if (!have_size) {
+			g_write_size = 0;
+		}
+
+		while (!stopped) {
+			sleep(1);
+			if (++secs > print_time && !quiet) {
+				printf ("%s: %5d message%s received. %d bytes\n", group_name.value, g_recv_count, g_recv_count==1?"":"s", g_recv_length);
+				secs = 0;
+				g_recv_count = 0;
+			}
+		}
+	}
+	else {
+		res = cpg_max_atomic_msgsize_get (handle, &maxsize); /* maxsize is only meaningful when the call succeeds */
+		if (res == CS_OK && (uint32_t)write_size > maxsize) {
+			fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%u), libcpg will fragment\n",
+				write_size, (unsigned int)maxsize);
+		}
+		for (i = 0; i < repetitions && !stopped; i++) {
+			cpg_test (handle, write_size, delay_time, print_time);
+			signal (SIGALRM, sigalrm_handler);
+		}
+	}
+
+	res = cpg_finalize (handle);
+	if (res != CS_OK) {
+		printf ("cpg_finalize failed with result %d\n", res);
+		exit (1);
+	}
+
+	printf("\n");
+	printf("Stats:\n");
+	if (!listen_only) {
+		printf("   packets sent:    %d\n", packets_sent);
+		printf("   send failures:   %d\n", send_fails);
+		printf("   send retries:    %d\n", send_retries);
+	}
+	if (have_size) {
+		printf("   length errors:   %d\n", length_errors);
+	}
+	printf("   packets recvd:   %d\n", packets_recvd);
+	printf("   sequence errors: %d\n", sequence_errors);
+	printf("   crc errors:	    %d\n", crc_errors);
+	printf("\n");
+	return (0);
+}
_______________________________________________
discuss mailing list
discuss@xxxxxxxxxxxx
http://lists.corosync.org/mailman/listinfo/discuss

[Index of Archives]     [Linux Clusters]     [Corosync Project]     [Linux USB Devel]     [Linux Audio Users]     [Photo]     [Yosemite News]    [Yosemite Photos]    [Linux Kernel]     [Linux SCSI]     [X.Org]

  Powered by Linux