From: Steven Dake <sdake@xxxxxxxxxx> This patch creates a special message queue for synchronization messages. This prevents a situation in which messages are queued in the new_message_queue but have not yet been originated from corrupting the synchronization process. Signed-off-by: Steven Dake <sdake@xxxxxxxxxx> --- exec/main.c | 4 +++ exec/totemmrp.c | 5 +++ exec/totemmrp.h | 2 + exec/totempg.c | 5 +++ exec/totemsrp.c | 56 +++++++++++++++++++++++++++++++++---- exec/totemsrp.h | 3 ++ include/corosync/totem/totempg.h | 2 + 7 files changed, 71 insertions(+), 6 deletions(-) diff --git a/exec/main.c b/exec/main.c index 3dac5fb..91911a8 100644 --- a/exec/main.c +++ b/exec/main.c @@ -275,6 +275,10 @@ static void corosync_sync_completed (void) cs_ipcs_sync_state_changed(sync_in_process); cs_ipc_allow_connections(1); + /* + * Inform totem to start using new message queue again + */ + totempg_trans_ack(); } static int corosync_sync_callbacks_retrieve ( diff --git a/exec/totemmrp.c b/exec/totemmrp.c index 84ad031..6ca6093 100644 --- a/exec/totemmrp.c +++ b/exec/totemmrp.c @@ -276,3 +276,8 @@ void totemmrp_threaded_mode_enable (void) { totemsrp_threaded_mode_enable (totemsrp_context); } + +void totemmrp_trans_ack (void) +{ + totemsrp_trans_ack (totemsrp_context); +} diff --git a/exec/totemmrp.h b/exec/totemmrp.h index 1977918..c900d45 100644 --- a/exec/totemmrp.h +++ b/exec/totemmrp.h @@ -131,4 +131,6 @@ extern int totemmrp_member_remove ( void totemmrp_threaded_mode_enable (void); +void totemmrp_trans_ack (void); + #endif /* TOTEMMRP_H_DEFINED */ diff --git a/exec/totempg.c b/exec/totempg.c index 3b89e3a..bb8840b 100644 --- a/exec/totempg.c +++ b/exec/totempg.c @@ -1477,3 +1477,8 @@ void totempg_threaded_mode_enable (void) totemmrp_threaded_mode_enable (); } +void totempg_trans_ack (void) +{ + totemmrp_trans_ack (); +} + diff --git a/exec/totemsrp.c b/exec/totemsrp.c index a4cc19a..b0a0560 100644 --- a/exec/totemsrp.c +++ b/exec/totemsrp.c @@ -369,6 +369,8 @@ struct totemsrp_instance { */ struct cs_queue new_message_queue; + struct cs_queue new_message_queue_trans; + struct cs_queue retrans_message_queue; struct sq regular_sort_queue; @@ -502,6 +504,8 @@ struct totemsrp_instance { uint32_t orf_token_discard; uint32_t threaded_mode_enabled; + + uint32_t waiting_trans_ack; void * token_recv_event_handle; void * token_sent_event_handle; @@ -679,6 +683,8 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance) instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage; instance->my_id.no_addrs = INTERFACE_MAX; + + instance->waiting_trans_ack = 1; } static void main_token_seqid_get ( @@ -950,6 +956,10 @@ int totemsrp_initialize ( MESSAGE_QUEUE_MAX, sizeof (struct message_item), instance->threaded_mode_enabled); + cs_queue_init (&instance->new_message_queue_trans, + MESSAGE_QUEUE_MAX, + sizeof (struct message_item), instance->threaded_mode_enabled); + totemsrp_callback_token_create (instance, &instance->token_recv_event_handle, TOTEM_CALLBACK_TOKEN_RECEIVED, @@ -981,6 +991,7 @@ void totemsrp_finalize ( memb_leave_message_send (instance); totemrrp_finalize (instance->totemrrp_context); cs_queue_free (&instance->new_message_queue); + cs_queue_free (&instance->new_message_queue_trans); cs_queue_free (&instance->retrans_message_queue); sq_free (&instance->regular_sort_queue); sq_free (&instance->recovery_sort_queue); @@ -1825,6 +1836,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance) trans_memb_list_totemip, instance->my_trans_memb_entries, left_list, instance->my_left_memb_entries, 0, 0, &instance->my_ring_id); + instance->waiting_trans_ack = 1; // TODO we need to filter to ensure we only deliver those // messages which are part of instance->my_deliver_memb @@ -2265,8 +2277,15 @@ int totemsrp_mcast ( struct message_item message_item; char *addr; unsigned int addr_idx; + struct cs_queue *queue_use; - if (cs_queue_is_full (&instance->new_message_queue)) { + if (instance->waiting_trans_ack) { + queue_use = &instance->new_message_queue_trans; + } else { + queue_use = &instance->new_message_queue; + } + + if (cs_queue_is_full (queue_use)) { log_printf (instance->totemsrp_log_level_debug, "queue full"); return (-1); } @@ -2305,7 +2324,7 @@ int totemsrp_mcast ( log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue"); instance->stats.mcast_tx++; - cs_queue_item_add (&instance->new_message_queue, &message_item); + cs_queue_item_add (queue_use, &message_item); return (0); @@ -2320,8 +2339,14 @@ int totemsrp_avail (void *srp_context) { struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; int avail; + struct cs_queue *queue_use; - cs_queue_avail (&instance->new_message_queue, &avail); + if (instance->waiting_trans_ack) { + queue_use = &instance->new_message_queue_trans; + } else { + queue_use = &instance->new_message_queue; + } + cs_queue_avail (queue_use, &avail); return (avail); } @@ -2483,7 +2508,12 @@ static int orf_token_mcast ( sort_queue = &instance->recovery_sort_queue; reset_token_retransmit_timeout (instance); // REVIEWED } else { - mcast_queue = &instance->new_message_queue; + if (instance->waiting_trans_ack) { + mcast_queue = &instance->new_message_queue_trans; + } else { + mcast_queue = &instance->new_message_queue; + } + sort_queue = &instance->regular_sort_queue; } @@ -3372,13 +3402,20 @@ static void token_callbacks_execute ( static unsigned int backlog_get (struct totemsrp_instance *instance) { unsigned int backlog = 0; + struct cs_queue *queue_use = NULL; if (instance->memb_state == MEMB_STATE_OPERATIONAL) { - backlog = cs_queue_used (&instance->new_message_queue); + if (instance->waiting_trans_ack) { + queue_use = &instance->new_message_queue_trans; + } else { + queue_use = &instance->new_message_queue; + } } else if (instance->memb_state == MEMB_STATE_RECOVERY) { - backlog = cs_queue_used (&instance->retrans_message_queue); + queue_use = &instance->retrans_message_queue; } + backlog = cs_queue_used (queue_use); + instance->stats.token[instance->stats.latest_token].backlog_calc = backlog; return (backlog); } @@ -4572,3 +4609,10 @@ void totemsrp_threaded_mode_enable (void *context) instance->threaded_mode_enabled = 1; } + +void totemsrp_trans_ack (void *context) +{ + struct totemsrp_instance *instance = (struct totemsrp_instance *)context; + + instance->waiting_trans_ack = 0; +} diff --git a/exec/totemsrp.h b/exec/totemsrp.h index d29aa3a..4ad4466 100644 --- a/exec/totemsrp.h +++ b/exec/totemsrp.h @@ -138,4 +138,7 @@ extern int totemsrp_member_remove ( void totemsrp_threaded_mode_enable ( void *srp_context); +void totemsrp_trans_ack ( + void *srp_context); + #endif /* TOTEMSRP_H_DEFINED */ diff --git a/include/corosync/totem/totempg.h b/include/corosync/totem/totempg.h index 6ab9fd0..9938e5b 100644 --- a/include/corosync/totem/totempg.h +++ b/include/corosync/totem/totempg.h @@ -184,6 +184,8 @@ extern void totempg_queue_level_register_callback (totem_queue_level_changed_fn) extern void totempg_threaded_mode_enable (void); +extern void totempg_trans_ack (void); + #ifdef __cplusplus } #endif -- 1.7.1 _______________________________________________ discuss mailing list discuss@xxxxxxxxxxxx http://lists.corosync.org/mailman/listinfo/discuss