[PATCH 3/3] Add waiting_trans_ack also to fragmentation layer

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



The patch adding support for waiting_trans_ack may fail if a
synchronization happens in the middle of the delivery of a fragmented
message. In such a situation, the fragmentation layer keeps waiting for
a message with the correct number, but that message will never arrive.

The solution is to handle changes of waiting_trans_ack (via a callback)
and to use a different queue while waiting_trans_ack is set.

Signed-off-by: Jan Friesse <jfriesse@xxxxxxxxxx>
---
 exec/totemmrp.c |    7 +++-
 exec/totemmrp.h |    4 ++-
 exec/totempg.c  |   87 +++++++++++++++++++++++++++++++++++++++++++++++-------
 exec/totemsrp.c |   12 +++++++-
 exec/totemsrp.h |    4 ++-
 5 files changed, 97 insertions(+), 17 deletions(-)

diff --git a/exec/totemmrp.c b/exec/totemmrp.c
index 6ca6093..6166f32 100644
--- a/exec/totemmrp.c
+++ b/exec/totemmrp.c
@@ -130,7 +130,9 @@ int totemmrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id))
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack))
 {
 	int result;
 	pg_deliver_fn = deliver_fn;
@@ -143,7 +145,8 @@ int totemmrp_initialize (
 		totem_config,
 		stats->mrp,
 		totemmrp_deliver_fn,
-		totemmrp_confchg_fn);
+		totemmrp_confchg_fn,
+		waiting_trans_ack_cb_fn);
 
 	return (result);
 }
diff --git a/exec/totemmrp.h b/exec/totemmrp.h
index c900d45..2988c8f 100644
--- a/exec/totemmrp.h
+++ b/exec/totemmrp.h
@@ -74,7 +74,9 @@ extern int totemmrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id));
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack));
 
 extern void totemmrp_finalize (void);
 
diff --git a/exec/totempg.c b/exec/totempg.c
index bb8840b..29780b0 100644
--- a/exec/totempg.c
+++ b/exec/totempg.c
@@ -214,6 +214,10 @@ DECLARE_LIST_INIT(assembly_list_inuse);
 
 DECLARE_LIST_INIT(assembly_list_free);
 
+DECLARE_LIST_INIT(assembly_list_inuse_trans);
+
+DECLARE_LIST_INIT(assembly_list_free_trans);
+
 DECLARE_LIST_INIT(totempg_groups_list);
 
 /*
@@ -234,6 +238,8 @@ static int fragment_continuation = 0;
 
 static struct iovec iov_delv;
 
+static int totempg_waiting_transack = 0;
+
 struct totempg_group_instance {
 	void (*deliver_fn) (
 		unsigned int nodeid,
@@ -276,16 +282,32 @@ static int msg_count_send_ok (int msg_count);
 
 static int byte_count_send_ok (int byte_count);
 
+static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
+{
+	log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
+	totempg_waiting_transack = waiting_trans_ack;
+}
+
 static struct assembly *assembly_ref (unsigned int nodeid)
 {
 	struct assembly *assembly;
 	struct list_head *list;
+	struct list_head *active_assembly_list_inuse;
+	struct list_head *active_assembly_list_free;
+
+	if (totempg_waiting_transack) {
+		active_assembly_list_inuse = &assembly_list_inuse_trans;
+		active_assembly_list_free = &assembly_list_free_trans;
+	} else {
+		active_assembly_list_inuse = &assembly_list_inuse;
+		active_assembly_list_free = &assembly_list_free;
+	}
 
 	/*
 	 * Search inuse list for node id and return assembly buffer if found
 	 */
-	for (list = assembly_list_inuse.next;
-		list != &assembly_list_inuse;
+	for (list = active_assembly_list_inuse->next;
+		list != active_assembly_list_inuse;
 		list = list->next) {
 
 		assembly = list_entry (list, struct assembly, list);
@@ -298,10 +320,10 @@ static struct assembly *assembly_ref (unsigned int nodeid)
 	/*
 	 * Nothing found in inuse list get one from free list if available
 	 */
-	if (list_empty (&assembly_list_free) == 0) {
-		assembly = list_entry (assembly_list_free.next, struct assembly, list);
+	if (list_empty (active_assembly_list_free) == 0) {
+		assembly = list_entry (active_assembly_list_free->next, struct assembly, list);
 		list_del (&assembly->list);
-		list_add (&assembly->list, &assembly_list_inuse);
+		list_add (&assembly->list, active_assembly_list_inuse);
 		assembly->nodeid = nodeid;
 		assembly->index = 0;
 		assembly->last_frag_num = 0;
@@ -323,15 +345,56 @@ static struct assembly *assembly_ref (unsigned int nodeid)
 	assembly->last_frag_num = 0;
 	assembly->throw_away_mode = THROW_AWAY_INACTIVE;
 	list_init (&assembly->list);
-	list_add (&assembly->list, &assembly_list_inuse);
+	list_add (&assembly->list, active_assembly_list_inuse);
 
 	return (assembly);
 }
 
 static void assembly_deref (struct assembly *assembly)
 {
+	struct list_head *active_assembly_list_free;
+
+	if (totempg_waiting_transack) {
+		active_assembly_list_free = &assembly_list_free_trans;
+	} else {
+		active_assembly_list_free = &assembly_list_free;
+	}
+
 	list_del (&assembly->list);
-	list_add (&assembly->list, &assembly_list_free);
+	list_add (&assembly->list, active_assembly_list_free);
+}
+
+static void assembly_deref_from_normal_and_trans (int nodeid)
+{
+	int j;
+	struct list_head *list, *list_next;
+	struct list_head *active_assembly_list_inuse;
+	struct list_head *active_assembly_list_free;
+	struct assembly *assembly;
+
+	for (j = 0; j < 2; j++) {
+		if (j == 0) {
+			active_assembly_list_inuse = &assembly_list_inuse;
+			active_assembly_list_free = &assembly_list_free;
+		} else {
+			active_assembly_list_inuse = &assembly_list_inuse_trans;
+			active_assembly_list_free = &assembly_list_free_trans;
+		}
+
+		for (list = active_assembly_list_inuse->next;
+			list != active_assembly_list_inuse;
+			list = list_next) {
+
+			list_next = list->next;
+			assembly = list_entry (list, struct assembly, list);
+
+			if (nodeid == assembly->nodeid) {
+				list_del (&assembly->list);
+				list_add (&assembly->list, active_assembly_list_free);
+			}
+		}
+	}
+
 }
 
 static inline void app_confchg_fn (
@@ -343,7 +406,6 @@ static inline void app_confchg_fn (
 {
 	int i;
 	struct totempg_group_instance *instance;
-	struct assembly *assembly;
 	struct list_head *list;
 
 	/*
@@ -352,9 +414,7 @@ static inline void app_confchg_fn (
 	 * In the leaving processor's assembly buffer.
 	 */
 	for (i = 0; i < left_list_entries; i++) {
-		assembly = assembly_ref (left_list[i]);
-		list_del (&assembly->list);
-		list_add (&assembly->list, &assembly_list_free);
+		assembly_deref_from_normal_and_trans (left_list[i]);
 	}
 
 	for (list = totempg_groups_list.next;
@@ -648,6 +708,8 @@ static void totempg_deliver_fn (
 				}
 			}
 		} else {
+			log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
+					continuation, assembly->last_frag_num);
 			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
 		}
 	}
@@ -762,7 +824,8 @@ int totempg_initialize (
 		totem_config,
 		&totempg_stats,
 		totempg_deliver_fn,
-		totempg_confchg_fn);
+		totempg_confchg_fn,
+		totempg_waiting_trans_ack_cb);
 
 	totemmrp_callback_token_create (
 		&callback_token_received_handle,
diff --git a/exec/totemsrp.c b/exec/totemsrp.c
index 7fcee9b..97dc88c 100644
--- a/exec/totemsrp.c
+++ b/exec/totemsrp.c
@@ -463,6 +463,9 @@ struct totemsrp_instance {
 
         void (*totemsrp_service_ready_fn) (void);
 
+	void (*totemsrp_waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack);
+
 	int global_seqno;
 
 	int my_token_held;
@@ -785,7 +788,9 @@ int totemsrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id))
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack))
 {
 	struct totemsrp_instance *instance;
 	unsigned int res;
@@ -812,6 +817,9 @@ int totemsrp_initialize (
 
 	totemsrp_instance_initialize (instance);
 
+	instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
+	instance->totemsrp_waiting_trans_ack_cb_fn (1);
+
 	stats->srp = &instance->stats;
 	instance->stats.latest_token = 0;
 	instance->stats.earliest_token = 0;
@@ -1837,6 +1845,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
 		left_list, instance->my_left_memb_entries,
 		0, 0, &instance->my_ring_id);
 	instance->waiting_trans_ack = 1;
+	instance->totemsrp_waiting_trans_ack_cb_fn (1);
 
 // TODO we need to filter to ensure we only deliver those
 // messages which are part of instance->my_deliver_memb
@@ -4618,4 +4627,5 @@ void totemsrp_trans_ack (void *context)
 	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
 
 	instance->waiting_trans_ack = 0;
+	instance->totemsrp_waiting_trans_ack_cb_fn (0);
 }
diff --git a/exec/totemsrp.h b/exec/totemsrp.h
index 4ad4466..185276f 100644
--- a/exec/totemsrp.h
+++ b/exec/totemsrp.h
@@ -65,7 +65,9 @@ int totemsrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id));
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack));
 
 void totemsrp_finalize (void *srp_context);
 
-- 
1.7.1

_______________________________________________
discuss mailing list
discuss@xxxxxxxxxxxx
http://lists.corosync.org/mailman/listinfo/discuss


[Index of Archives]     [Linux Clusters]     [Corosync Project]     [Linux USB Devel]     [Linux Audio Users]     [Photo]     [Yosemite News]    [Yosemite Photos]    [Linux Kernel]     [Linux SCSI]     [X.Org]

  Powered by Linux