This patch creates a special message queue for synchronization messages. This prevents messages that are queued in the new_message_queue but have not yet been originated from corrupting the synchronization process. Honzaf agreed to test this patch since he has the reproducer in hand. Signed-off-by: Steven Dake <sdake@xxxxxxxxxx> --- exec/main.c | 4 +++ exec/totemmrp.c | 5 ++++ exec/totemmrp.h | 2 ++ exec/totempg.c | 5 ++++ exec/totemsrp.c | 56 +++++++++++++++++++++++++++++++++++----- exec/totemsrp.h | 3 +++ include/corosync/totem/totempg.h | 2 ++ 7 files changed, 71 insertions(+), 6 deletions(-) diff --git a/exec/main.c b/exec/main.c index 5bccb87..c4df331 100644 --- a/exec/main.c +++ b/exec/main.c @@ -275,6 +275,10 @@ static void corosync_sync_completed (void) cs_ipcs_sync_state_changed(sync_in_process); cs_ipc_allow_connections(1); + /* + * Inform totem to start using new message queue again + */ + totempg_trans_ack(); } static int corosync_sync_callbacks_retrieve ( diff --git a/exec/totemmrp.c b/exec/totemmrp.c index 84ad031..6ca6093 100644 --- a/exec/totemmrp.c +++ b/exec/totemmrp.c @@ -276,3 +276,8 @@ void totemmrp_threaded_mode_enable (void) { totemsrp_threaded_mode_enable (totemsrp_context); } + +void totemmrp_trans_ack (void) +{ + totemsrp_trans_ack (totemsrp_context); +} diff --git a/exec/totemmrp.h b/exec/totemmrp.h index 1977918..c900d45 100644 --- a/exec/totemmrp.h +++ b/exec/totemmrp.h @@ -131,4 +131,6 @@ extern int totemmrp_member_remove ( void totemmrp_threaded_mode_enable (void); +void totemmrp_trans_ack (void); + #endif /* TOTEMMRP_H_DEFINED */ diff --git a/exec/totempg.c b/exec/totempg.c index abaaf6b..922929e 100644 --- a/exec/totempg.c +++ b/exec/totempg.c @@ -1472,3 +1472,8 @@ void totempg_threaded_mode_enable (void) totemmrp_threaded_mode_enable (); } +void totempg_trans_ack (void) +{ + totemmrp_trans_ack (); +} + diff --git a/exec/totemsrp.c b/exec/totemsrp.c index 895a13e..4a7d65b 100644 --- a/exec/totemsrp.c +++ 
b/exec/totemsrp.c @@ -372,6 +372,8 @@ struct totemsrp_instance { */ struct cs_queue new_message_queue; + struct cs_queue new_message_queue_trans; + struct cs_queue retrans_message_queue; struct sq regular_sort_queue; @@ -503,6 +505,8 @@ struct totemsrp_instance { uint32_t orf_token_discard; uint32_t threaded_mode_enabled; + + uint32_t waiting_trans_ack; void * token_recv_event_handle; void * token_sent_event_handle; @@ -680,6 +684,8 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance) instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage; instance->my_id.no_addrs = INTERFACE_MAX; + + instance->waiting_trans_ack = 1; } static void main_token_seqid_get ( @@ -950,6 +956,10 @@ int totemsrp_initialize ( MESSAGE_QUEUE_MAX, sizeof (struct message_item), instance->threaded_mode_enabled); + cs_queue_init (&instance->new_message_queue_trans, + MESSAGE_QUEUE_MAX, + sizeof (struct message_item), instance->threaded_mode_enabled); + totemsrp_callback_token_create (instance, &instance->token_recv_event_handle, TOTEM_CALLBACK_TOKEN_RECEIVED, @@ -981,6 +991,7 @@ void totemsrp_finalize ( memb_leave_message_send (instance); totemrrp_finalize (instance->totemrrp_context); cs_queue_free (&instance->new_message_queue); + cs_queue_free (&instance->new_message_queue_trans); cs_queue_free (&instance->retrans_message_queue); sq_free (&instance->regular_sort_queue); sq_free (&instance->recovery_sort_queue); @@ -1815,6 +1826,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance) trans_memb_list_totemip, instance->my_trans_memb_entries, left_list, instance->my_left_memb_entries, 0, 0, &instance->my_ring_id); + instance->waiting_trans_ack = 1; // TODO we need to filter to ensure we only deliver those // messages which are part of instance->my_deliver_memb @@ -2263,8 +2275,15 @@ int totemsrp_mcast ( struct message_item message_item; char *addr; unsigned int addr_idx; + struct cs_queue *queue_use; - if 
(cs_queue_is_full (&instance->new_message_queue)) { + if (instance->waiting_trans_ack) { + queue_use = &instance->new_message_queue_trans; + } else { + queue_use = &instance->new_message_queue; + } + + if (cs_queue_is_full (queue_use)) { log_printf (instance->totemsrp_log_level_debug, "queue full"); return (-1); } @@ -2303,7 +2322,7 @@ int totemsrp_mcast ( log_printf (instance->totemsrp_log_level_debug, "mcasted message added to pending queue"); instance->stats.mcast_tx++; - cs_queue_item_add (&instance->new_message_queue, &message_item); + cs_queue_item_add (queue_use, &message_item); return (0); @@ -2318,8 +2337,14 @@ int totemsrp_avail (void *srp_context) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; int avail; + struct cs_queue *queue_use; - cs_queue_avail (&instance->new_message_queue, &avail); + if (instance->waiting_trans_ack) { + queue_use = &instance->new_message_queue_trans; + } else { + queue_use = &instance->new_message_queue; + } + cs_queue_avail (queue_use, &avail); return (avail); } @@ -2481,7 +2506,12 @@ static int orf_token_mcast ( sort_queue = &instance->recovery_sort_queue; reset_token_retransmit_timeout (instance); // REVIEWED } else { - mcast_queue = &instance->new_message_queue; + if (instance->waiting_trans_ack) { + mcast_queue = &instance->new_message_queue_trans; + } else { + mcast_queue = &instance->new_message_queue; + } + sort_queue = &instance->regular_sort_queue; } @@ -3370,13 +3400,20 @@ static void token_callbacks_execute ( static unsigned int backlog_get (struct totemsrp_instance *instance) { unsigned int backlog = 0; + struct cs_queue *queue_use = NULL; if (instance->memb_state == MEMB_STATE_OPERATIONAL) { - backlog = cs_queue_used (&instance->new_message_queue); + if (instance->waiting_trans_ack) { + queue_use = &instance->new_message_queue_trans; + } else { + queue_use = &instance->new_message_queue; + } } else if (instance->memb_state == MEMB_STATE_RECOVERY) { - backlog = cs_queue_used 
(&instance->retrans_message_queue); + queue_use = &instance->retrans_message_queue; } + backlog = cs_queue_used (queue_use); + instance->stats.token[instance->stats.latest_token].backlog_calc = backlog; return (backlog); } @@ -4565,3 +4602,10 @@ void totemsrp_threaded_mode_enable (void *context) instance->threaded_mode_enabled = 1; } + +void totemsrp_trans_ack (void *context) +{ + struct totemsrp_instance *instance = (struct totemsrp_instance *)context; + + instance->waiting_trans_ack = 0; +} diff --git a/exec/totemsrp.h b/exec/totemsrp.h index d29aa3a..4ad4466 100644 --- a/exec/totemsrp.h +++ b/exec/totemsrp.h @@ -138,4 +138,7 @@ extern int totemsrp_member_remove ( void totemsrp_threaded_mode_enable ( void *srp_context); +void totemsrp_trans_ack ( + void *srp_context); + #endif /* TOTEMSRP_H_DEFINED */ diff --git a/include/corosync/totem/totempg.h b/include/corosync/totem/totempg.h index 8bbecbf..bde4209 100644 --- a/include/corosync/totem/totempg.h +++ b/include/corosync/totem/totempg.h @@ -183,6 +183,8 @@ extern void totempg_queue_level_register_callback (totem_queue_level_changed_fn) extern void totempg_threaded_mode_enable (void); +extern void totempg_trans_ack (void); + #ifdef __cplusplus } #endif -- 1.7.11.2 _______________________________________________ discuss mailing list discuss@xxxxxxxxxxxx http://lists.corosync.org/mailman/listinfo/discuss