Re: [PATCH] Allow cpg to send large messages

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 16/02/15 14:10, Christine Caulfield wrote:
> On 13/02/15 12:27, Jan Friesse wrote:
>> Chrissie,
>>
>> Christine Caulfield napsal(a):
>>> It occurs to me that, as this has the potential to break Virtual
>>> Synchrony, there should be an option to either disable message
>>
>> it took me a while until I found what exactly you meant by break EVS.
>> This is something we (or at least I) totally forgot but it looks like
>> HUGE problem and I don't even think that pcmk is able to handle this
>> situation (I believe you are talking about the situation where one node is
>> sending a long message, and another node leaves and then joins the
>> membership again while the long message is being sent, so it will not
>> receive that message).
>>
>>> fragmentation or to some indication of the maximum message size that
>>> will not be fragmented.
>>>
>>> Thoughts?
>>
>> I'm thinking about following solutions:
>> - implement deferral of delivery of membership change to client
>> - some kind of recovery... Both of them are like reimplementing totem
>> inside libcpg.
>> - Another solution may be to add extra callback parameter and deliver
>> also list of nodes who received message.
>>
>> Generally, I'm really not very happy with breaking EVS. Yes, loooong
>> messages use case is weird and not so common outside pcmk and yes,
>> satellite nodes will break EVS anyway, but for needle we should stay
>> very conservative.
>>
> 
> Agreed, I didn't realise how bad it could be until late into the
> development here. The 'sledgehammer' solution would be to flag when a
> confchg has happened during the sending of a long message and if that
> happens, invalidate the whole send. It would then mean retransmitting
> the whole message again from the start.
> 


... and here it is!

I stopped short of checking the ring state when the message is finally
delivered; it just checks it at each transmission stage, which is what
happens with normal sends, of course.

Chrissie



diff --git a/exec/cpg.c b/exec/cpg.c
index 3d83982..0bf587b 100644
--- a/exec/cpg.c
+++ b/exec/cpg.c
@@ -157,6 +157,8 @@ struct cpg_pd {
 	enum cpd_state cpd_state;
 	unsigned int flags;
 	int initial_totem_conf_sent;
+	uint64_t transition_counter; /* These two are used when sending fragmented messages */
+	uint64_t initial_transition_counter;
 	struct list_head list;
 	struct list_head iteration_instance_list_head;
 	struct list_head zcb_mapped_list_head;
@@ -767,6 +769,7 @@ static int notify_lib_joinlist(
 					cpd->cpd_state == CPD_STATE_LEAVE_STARTED) {
 
 					api->ipc_dispatch_send (cpd->conn, buf, size);
+					cpd->transition_counter++;
 				}
 				if (left_list_entries) {
 					if (left_list[0].pid == cpd->pid &&
@@ -1974,6 +1977,7 @@ static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *m
 
 	struct iovec req_exec_cpg_iovec[2];
 	struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
+	struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
 	int msglen = req_lib_cpg_mcast->fraglen;
 	int result;
 	cs_error_t error = CS_ERR_NOT_EXIST;
@@ -1996,6 +2000,16 @@ static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *m
 		break;
 	}
 
+	res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
+	res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;
+
+	if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
+		cpd->initial_transition_counter = cpd->transition_counter;
+	}
+	if (cpd->transition_counter != cpd->initial_transition_counter) {
+	        error = CS_ERR_INTERRUPT;
+	}
+
 	if (error == CS_OK) {
 		req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
 		req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
@@ -2019,6 +2033,10 @@ static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *m
 		log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
 			conn, group_name.value, cpd->cpd_state, error);
 	}
+
+	res_lib_cpg_partial_send.header.error = error;
+	api->ipc_response_send (conn, &res_lib_cpg_partial_send,
+				sizeof (res_lib_cpg_partial_send));
 }
 
 /* Mcast message from the library */
diff --git a/include/corosync/cpg.h b/include/corosync/cpg.h
index 55fc4b8..f66fb14 100644
--- a/include/corosync/cpg.h
+++ b/include/corosync/cpg.h
@@ -186,6 +186,13 @@ cs_error_t cpg_fd_get (
 	int *fd);
 
 /**
+ * Get maximum size of a message that will not be fragmented
+ */
+cs_error_t cpg_max_atomic_msgsize_get (
+	cpg_handle_t handle,
+	uint32_t *size);
+
+/**
  * Get contexts for a CPG handle
  */
 cs_error_t cpg_context_get (
diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
index d7000a5..3b2ae63 100644
--- a/include/corosync/ipc_cpg.h
+++ b/include/corosync/ipc_cpg.h
@@ -77,6 +77,7 @@ enum res_cpg_types {
 	MESSAGE_RES_CPG_ZC_FREE = 15,
 	MESSAGE_RES_CPG_ZC_EXECUTE = 16,
 	MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK = 17,
+	MESSAGE_RES_CPG_PARTIAL_SEND = 18,
 };
 
 enum lib_cpg_confchg_reason {
@@ -208,6 +209,10 @@ struct res_lib_cpg_local_get {
 	mar_uint32_t local_nodeid __attribute__((aligned(8)));
 };
 
+struct res_lib_cpg_partial_send {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+};
+
 struct req_lib_cpg_mcast {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 	mar_uint32_t guarantee __attribute__((aligned(8)));
diff --git a/lib/cpg.c b/lib/cpg.c
index 376914c..38f3c43 100644
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -219,7 +219,7 @@ cs_error_t cpg_model_initialize (
 	}
 
 	/* Allow space for corosync internal headers */
-	cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;	
+	cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
 	cpg_inst->model_data.model = model;
 	cpg_inst->context = context;
 
@@ -301,6 +301,25 @@ cs_error_t cpg_fd_get (
 	return (error);
 }
 
+cs_error_t cpg_max_atomic_msgsize_get (
+	cpg_handle_t handle,
+	uint32_t *size)
+{
+	cs_error_t error;
+	struct cpg_inst *cpg_inst;
+
+	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
+	if (error != CS_OK) {
+		return (error);
+	}
+
+	*size = cpg_inst->max_msg_size;
+
+	hdb_handle_put (&cpg_handle_t_db, handle);
+
+	return (error);
+}
+
 cs_error_t cpg_context_get (
 	cpg_handle_t handle,
 	void **context)
@@ -1022,6 +1041,7 @@ static cs_error_t send_fragments (
 	cs_error_t error = CS_OK;
 	struct iovec iov[2];
 	struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
+	struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
 	size_t sent = 0;
 	size_t iov_sent = 0;
 
@@ -1038,42 +1058,46 @@ static cs_error_t send_fragments (
 
 	while (error == CS_OK && sent < msg_len) {
 
-	  if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
-			        iov[1].iov_len = cpg_inst->max_msg_size;
-			}
-			else {
-			        iov[1].iov_len = iovec[i].iov_len - iov_sent;
-			}
+		if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
+			iov[1].iov_len = cpg_inst->max_msg_size;
+		}
+		else {
+		    iov[1].iov_len = iovec[i].iov_len - iov_sent;
+		}
 
-			if (sent == 0) {
-			        req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
-			}
-			else if ((sent + iov[1].iov_len) == msg_len) {
-			        req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
-			}
-			else {
-			        req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
-			}
-	  		  
-			req_lib_cpg_mcast.fraglen = iov[1].iov_len;
-			req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
-			iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
+		if (sent == 0) {
+		    req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
+		}
+		else if ((sent + iov[1].iov_len) == msg_len) {
+		    req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
+		}
+		else {
+		    req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
+		}
+
+		req_lib_cpg_mcast.fraglen = iov[1].iov_len;
+		req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
+		iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
 
 	resend:
-			error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, 2));
-			if (error == CS_ERR_TRY_AGAIN) {
-			        usleep(10000);
-					goto resend;
-			}
+		error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
+							 &res_lib_cpg_partial_send,
+							 sizeof (res_lib_cpg_partial_send));
+
+		if (error == CS_ERR_TRY_AGAIN) {
+			usleep(10000);
+			goto resend;
+		}
 
-			iov_sent += iov[1].iov_len;
-			sent += iov[1].iov_len;
+		iov_sent += iov[1].iov_len;
+		sent += iov[1].iov_len;
 
-			/* Next iovec */
-			if (iov_sent >= iovec[i].iov_len) {
-			        i++;
-					iov_sent = 0;
-			}
+		/* Next iovec */
+		if (iov_sent >= iovec[i].iov_len) {
+			i++;
+			iov_sent = 0;
+		}
+		error = res_lib_cpg_partial_send.header.error;
 	}
 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
 
diff --git a/test/cpghum.c b/test/cpghum.c
index 0a096db..a46addb 100644
--- a/test/cpghum.c
+++ b/test/cpghum.c
@@ -279,7 +279,7 @@ static void usage(char *cmd)
 int main (int argc, char *argv[]) {
 	int i;
 	unsigned int res;
-	const char *cpgname = "cpghum";
+	uint32_t maxsize;
 	int opt;
 	int bs;
 	int write_size = 4096;
@@ -378,7 +378,7 @@ int main (int argc, char *argv[]) {
 	if (listen_only) {
 	        int secs;
 		if (!quiet) {
-		        printf("-- Listening on CPG %s\n", cpgname);
+		        printf("-- Listening on CPG %s\n", group_name.value);
 			printf("-- Ignore any starting \"counters don't match\" error while we catch up\n");
 		}
 
@@ -397,10 +397,10 @@ int main (int argc, char *argv[]) {
 		}
 	}
 	else {
-	  /* We're a test app .. we 'know' that IPC_SIZE is 1 Meg. Don't do this at home. */
-	  if ( write_size > ONE_MEG - 1024) {
-	        fprintf(stderr, "INFO: packet size (%d) is larger than IPC_REQUEST_SIZE-1K (%d), libcpg will fragment\n",
-			write_size, ONE_MEG-1024);
+	    cpg_max_atomic_msgsize_get (handle, &maxsize);
+	    if ( write_size > maxsize) {
+	        fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n",
+			write_size, maxsize);
 	    }
 	    for (i = 0; i < repetitions && !stopped; i++) {
 		cpg_test (handle, write_size, delay_time, print_time);
_______________________________________________
discuss mailing list
discuss@xxxxxxxxxxxx
http://lists.corosync.org/mailman/listinfo/discuss

[Index of Archives]     [Linux Clusters]     [Corosync Project]     [Linux USB Devel]     [Linux Audio Users]     [Photo]     [Yosemite News]    [Yosemite Photos]    [Linux Kernel]     [Linux SCSI]     [X.Org]

  Powered by Linux