ACK On 11/8/2012 4:08 PM, Jan Friesse wrote: > The patch adding support for waiting_trans_ack may fail if a synchronization > happens between the deliveries of the parts of a fragmented message. In such a situation, > the fragmentation layer is waiting for a message with the correct number, but it > will never arrive. > > The solution is to handle changes of waiting_trans_ack (via a callback) and use > a different queue. > > Signed-off-by: Jan Friesse <jfriesse@xxxxxxxxxx> > --- > exec/totemmrp.c | 7 +++- > exec/totemmrp.h | 4 ++- > exec/totempg.c | 87 +++++++++++++++++++++++++++++++++++++++++++++++------- > exec/totemsrp.c | 12 +++++++- > exec/totemsrp.h | 4 ++- > 5 files changed, 97 insertions(+), 17 deletions(-) > > diff --git a/exec/totemmrp.c b/exec/totemmrp.c > index 6ca6093..6166f32 100644 > --- a/exec/totemmrp.c > +++ b/exec/totemmrp.c > @@ -130,7 +130,9 @@ int totemmrp_initialize ( > const unsigned int *member_list, size_t member_list_entries, > const unsigned int *left_list, size_t left_list_entries, > const unsigned int *joined_list, size_t joined_list_entries, > - const struct memb_ring_id *ring_id)) > + const struct memb_ring_id *ring_id), > + void (*waiting_trans_ack_cb_fn) ( > + int waiting_trans_ack)) > { > int result; > pg_deliver_fn = deliver_fn; > @@ -143,7 +145,8 @@ int totemmrp_initialize ( > totem_config, > stats->mrp, > totemmrp_deliver_fn, > - totemmrp_confchg_fn); > + totemmrp_confchg_fn, > + waiting_trans_ack_cb_fn); > > return (result); > } > diff --git a/exec/totemmrp.h b/exec/totemmrp.h > index c900d45..2988c8f 100644 > --- a/exec/totemmrp.h > +++ b/exec/totemmrp.h > @@ -74,7 +74,9 @@ extern int totemmrp_initialize ( > const unsigned int *member_list, size_t member_list_entries, > const unsigned int *left_list, size_t left_list_entries, > const unsigned int *joined_list, size_t joined_list_entries, > - const struct memb_ring_id *ring_id)); > + const struct memb_ring_id *ring_id), > + void (*waiting_trans_ack_cb_fn) ( > + int waiting_trans_ack)); > > extern void totemmrp_finalize 
(void); > > diff --git a/exec/totempg.c b/exec/totempg.c > index bb8840b..29780b0 100644 > --- a/exec/totempg.c > +++ b/exec/totempg.c > @@ -214,6 +214,10 @@ DECLARE_LIST_INIT(assembly_list_inuse); > > DECLARE_LIST_INIT(assembly_list_free); > > +DECLARE_LIST_INIT(assembly_list_inuse_trans); > + > +DECLARE_LIST_INIT(assembly_list_free_trans); > + > DECLARE_LIST_INIT(totempg_groups_list); > > /* > @@ -234,6 +238,8 @@ static int fragment_continuation = 0; > > static struct iovec iov_delv; > > +static int totempg_waiting_transack = 0; > + > struct totempg_group_instance { > void (*deliver_fn) ( > unsigned int nodeid, > @@ -276,16 +282,32 @@ static int msg_count_send_ok (int msg_count); > > static int byte_count_send_ok (int byte_count); > > +static void totempg_waiting_trans_ack_cb (int waiting_trans_ack) > +{ > + log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack); > + totempg_waiting_transack = waiting_trans_ack; > +} > + > static struct assembly *assembly_ref (unsigned int nodeid) > { > struct assembly *assembly; > struct list_head *list; > + struct list_head *active_assembly_list_inuse; > + struct list_head *active_assembly_list_free; > + > + if (totempg_waiting_transack) { > + active_assembly_list_inuse = &assembly_list_inuse_trans; > + active_assembly_list_free = &assembly_list_free_trans; > + } else { > + active_assembly_list_inuse = &assembly_list_inuse; > + active_assembly_list_free = &assembly_list_free; > + } > > /* > * Search inuse list for node id and return assembly buffer if found > */ > - for (list = assembly_list_inuse.next; > - list != &assembly_list_inuse; > + for (list = active_assembly_list_inuse->next; > + list != active_assembly_list_inuse; > list = list->next) { > > assembly = list_entry (list, struct assembly, list); > @@ -298,10 +320,10 @@ static struct assembly *assembly_ref (unsigned int nodeid) > /* > * Nothing found in inuse list get one from free list if available > */ > - if (list_empty (&assembly_list_free) == 0) 
{ > - assembly = list_entry (assembly_list_free.next, struct assembly, list); > + if (list_empty (active_assembly_list_free) == 0) { > + assembly = list_entry (active_assembly_list_free->next, struct assembly, list); > list_del (&assembly->list); > - list_add (&assembly->list, &assembly_list_inuse); > + list_add (&assembly->list, active_assembly_list_inuse); > assembly->nodeid = nodeid; > assembly->index = 0; > assembly->last_frag_num = 0; > @@ -323,15 +345,56 @@ static struct assembly *assembly_ref (unsigned int nodeid) > assembly->last_frag_num = 0; > assembly->throw_away_mode = THROW_AWAY_INACTIVE; > list_init (&assembly->list); > - list_add (&assembly->list, &assembly_list_inuse); > + list_add (&assembly->list, active_assembly_list_inuse); > > return (assembly); > } > > static void assembly_deref (struct assembly *assembly) > { > + struct list_head *active_assembly_list_free; > + > + if (totempg_waiting_transack) { > + active_assembly_list_free = &assembly_list_free_trans; > + } else { > + active_assembly_list_free = &assembly_list_free; > + } > + > list_del (&assembly->list); > - list_add (&assembly->list, &assembly_list_free); > + list_add (&assembly->list, active_assembly_list_free); > +} > + > +static void assembly_deref_from_normal_and_trans (int nodeid) > +{ > + int j; > + struct list_head *list, *list_next; > + struct list_head *active_assembly_list_inuse; > + struct list_head *active_assembly_list_free; > + struct assembly *assembly; > + > + for (j = 0; j < 2; j++) { > + if (j == 0) { > + active_assembly_list_inuse = &assembly_list_inuse; > + active_assembly_list_free = &assembly_list_free; > + } else { > + active_assembly_list_inuse = &assembly_list_inuse_trans; > + active_assembly_list_free = &assembly_list_free_trans; > + } > + > + for (list = active_assembly_list_inuse->next; > + list != active_assembly_list_inuse; > + list = list_next) { > + > + list_next = list->next; > + assembly = list_entry (list, struct assembly, list); > + > + if (nodeid == 
assembly->nodeid) { > + list_del (&assembly->list); > + list_add (&assembly->list, active_assembly_list_free); > + } > + } > + } > + > } > > static inline void app_confchg_fn ( > @@ -343,7 +406,6 @@ static inline void app_confchg_fn ( > { > int i; > struct totempg_group_instance *instance; > - struct assembly *assembly; > struct list_head *list; > > /* > @@ -352,9 +414,7 @@ static inline void app_confchg_fn ( > * In the leaving processor's assembly buffer. > */ > for (i = 0; i < left_list_entries; i++) { > - assembly = assembly_ref (left_list[i]); > - list_del (&assembly->list); > - list_add (&assembly->list, &assembly_list_free); > + assembly_deref_from_normal_and_trans (left_list[i]); > } > > for (list = totempg_groups_list.next; > @@ -648,6 +708,8 @@ static void totempg_deliver_fn ( > } > } > } else { > + log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u", > + continuation, assembly->last_frag_num); > assembly->throw_away_mode = THROW_AWAY_ACTIVE; > } > } > @@ -762,7 +824,8 @@ int totempg_initialize ( > totem_config, > &totempg_stats, > totempg_deliver_fn, > - totempg_confchg_fn); > + totempg_confchg_fn, > + totempg_waiting_trans_ack_cb); > > totemmrp_callback_token_create ( > &callback_token_received_handle, > diff --git a/exec/totemsrp.c b/exec/totemsrp.c > index 7fcee9b..97dc88c 100644 > --- a/exec/totemsrp.c > +++ b/exec/totemsrp.c > @@ -463,6 +463,9 @@ struct totemsrp_instance { > > void (*totemsrp_service_ready_fn) (void); > > + void (*totemsrp_waiting_trans_ack_cb_fn) ( > + int waiting_trans_ack); > + > int global_seqno; > > int my_token_held; > @@ -785,7 +788,9 @@ int totemsrp_initialize ( > const unsigned int *member_list, size_t member_list_entries, > const unsigned int *left_list, size_t left_list_entries, > const unsigned int *joined_list, size_t joined_list_entries, > - const struct memb_ring_id *ring_id)) > + const struct memb_ring_id *ring_id), > + void (*waiting_trans_ack_cb_fn) ( > + int 
waiting_trans_ack)) > { > struct totemsrp_instance *instance; > unsigned int res; > @@ -812,6 +817,9 @@ int totemsrp_initialize ( > > totemsrp_instance_initialize (instance); > > + instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn; > + instance->totemsrp_waiting_trans_ack_cb_fn (1); > + > stats->srp = &instance->stats; > instance->stats.latest_token = 0; > instance->stats.earliest_token = 0; > @@ -1837,6 +1845,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance) > left_list, instance->my_left_memb_entries, > 0, 0, &instance->my_ring_id); > instance->waiting_trans_ack = 1; > + instance->totemsrp_waiting_trans_ack_cb_fn (1); > > // TODO we need to filter to ensure we only deliver those > // messages which are part of instance->my_deliver_memb > @@ -4618,4 +4627,5 @@ void totemsrp_trans_ack (void *context) > struct totemsrp_instance *instance = (struct totemsrp_instance *)context; > > instance->waiting_trans_ack = 0; > + instance->totemsrp_waiting_trans_ack_cb_fn (0); > } > diff --git a/exec/totemsrp.h b/exec/totemsrp.h > index 4ad4466..185276f 100644 > --- a/exec/totemsrp.h > +++ b/exec/totemsrp.h > @@ -65,7 +65,9 @@ int totemsrp_initialize ( > const unsigned int *member_list, size_t member_list_entries, > const unsigned int *left_list, size_t left_list_entries, > const unsigned int *joined_list, size_t joined_list_entries, > - const struct memb_ring_id *ring_id)); > + const struct memb_ring_id *ring_id), > + void (*waiting_trans_ack_cb_fn) ( > + int waiting_trans_ack)); > > void totemsrp_finalize (void *srp_context); > > _______________________________________________ discuss mailing list discuss@xxxxxxxxxxxx http://lists.corosync.org/mailman/listinfo/discuss