ACK On 11/8/2012 4:08 PM, Jan Friesse wrote: > From: Steven Dake <sdake@xxxxxxxxxx> > > This patch creates a special message queue for synchronization messages. > This prevents a situation in which messages are queued in the > new_message_queue but have not yet been originated from corrupting the > synchronization process. > > Signed-off-by: Steven Dake <sdake@xxxxxxxxxx> > --- > exec/main.c | 4 +++ > exec/totemmrp.c | 5 +++ > exec/totemmrp.h | 2 + > exec/totempg.c | 5 +++ > exec/totemsrp.c | 56 +++++++++++++++++++++++++++++++++---- > exec/totemsrp.h | 3 ++ > include/corosync/totem/totempg.h | 2 + > 7 files changed, 71 insertions(+), 6 deletions(-) > > diff --git a/exec/main.c b/exec/main.c > index 3dac5fb..91911a8 100644 > --- a/exec/main.c > +++ b/exec/main.c > @@ -275,6 +275,10 @@ static void corosync_sync_completed (void) > > cs_ipcs_sync_state_changed(sync_in_process); > cs_ipc_allow_connections(1); > + /* > + * Inform totem to start using new message queue again > + */ > + totempg_trans_ack(); > } > > static int corosync_sync_callbacks_retrieve ( > diff --git a/exec/totemmrp.c b/exec/totemmrp.c > index 84ad031..6ca6093 100644 > --- a/exec/totemmrp.c > +++ b/exec/totemmrp.c > @@ -276,3 +276,8 @@ void totemmrp_threaded_mode_enable (void) > { > totemsrp_threaded_mode_enable (totemsrp_context); > } > + > +void totemmrp_trans_ack (void) > +{ > + totemsrp_trans_ack (totemsrp_context); > +} > diff --git a/exec/totemmrp.h b/exec/totemmrp.h > index 1977918..c900d45 100644 > --- a/exec/totemmrp.h > +++ b/exec/totemmrp.h > @@ -131,4 +131,6 @@ extern int totemmrp_member_remove ( > > void totemmrp_threaded_mode_enable (void); > > +void totemmrp_trans_ack (void); > + > #endif /* TOTEMMRP_H_DEFINED */ > diff --git a/exec/totempg.c b/exec/totempg.c > index 3b89e3a..bb8840b 100644 > --- a/exec/totempg.c > +++ b/exec/totempg.c > @@ -1477,3 +1477,8 @@ void totempg_threaded_mode_enable (void) > totemmrp_threaded_mode_enable (); > } > > +void totempg_trans_ack (void) > +{ > + totemmrp_trans_ack (); > +} > + > diff --git a/exec/totemsrp.c b/exec/totemsrp.c > index a4cc19a..b0a0560 100644 > --- a/exec/totemsrp.c > +++ b/exec/totemsrp.c > @@ -369,6 +369,8 @@ struct totemsrp_instance { > */ > struct cs_queue new_message_queue; > > + struct cs_queue new_message_queue_trans; > + > struct cs_queue retrans_message_queue; > > struct sq regular_sort_queue; > @@ -502,6 +504,8 @@ struct totemsrp_instance { > uint32_t orf_token_discard; > > uint32_t threaded_mode_enabled; > + > + uint32_t waiting_trans_ack; > > void * token_recv_event_handle; > void * token_sent_event_handle; > @@ -679,6 +683,8 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance) > instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage; > > instance->my_id.no_addrs = INTERFACE_MAX; > + > + instance->waiting_trans_ack = 1; > } > > static void main_token_seqid_get ( > @@ -950,6 +956,10 @@ int totemsrp_initialize ( > MESSAGE_QUEUE_MAX, > sizeof (struct message_item), instance->threaded_mode_enabled); > > + cs_queue_init (&instance->new_message_queue_trans, > + MESSAGE_QUEUE_MAX, > + sizeof (struct message_item), instance->threaded_mode_enabled); > + > totemsrp_callback_token_create (instance, > &instance->token_recv_event_handle, > TOTEM_CALLBACK_TOKEN_RECEIVED, > @@ -981,6 +991,7 @@ void totemsrp_finalize ( > memb_leave_message_send (instance); > totemrrp_finalize (instance->totemrrp_context); > cs_queue_free (&instance->new_message_queue); > + cs_queue_free (&instance->new_message_queue_trans); > cs_queue_free (&instance->retrans_message_queue); > sq_free (&instance->regular_sort_queue); > sq_free (&instance->recovery_sort_queue); > @@ -1825,6 +1836,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance) > trans_memb_list_totemip, instance->my_trans_memb_entries, > left_list, instance->my_left_memb_entries, > 0, 0, &instance->my_ring_id); > + instance->waiting_trans_ack = 1; > > // TODO we need to filter to ensure we only deliver those > // messages which are part of instance->my_deliver_memb > @@ -2265,8 +2277,15 @@ int totemsrp_mcast ( > struct message_item message_item; > char *addr; > unsigned int addr_idx; > + struct cs_queue *queue_use; > > - if (cs_queue_is_full (&instance->new_message_queue)) { > + if (instance->waiting_trans_ack) { > + queue_use = &instance->new_message_queue_trans; > + } else { > + queue_use = &instance->new_message_queue; > + } > + > + if (cs_queue_is_full (queue_use)) { > log_printf (instance->totemsrp_log_level_debug, "queue full"); > return (-1); > } > @@ -2305,7 +2324,7 @@ int totemsrp_mcast ( > > log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue"); > instance->stats.mcast_tx++; > - cs_queue_item_add (&instance->new_message_queue, &message_item); > + cs_queue_item_add (queue_use, &message_item); > > return (0); > > @@ -2320,8 +2339,14 @@ int totemsrp_avail (void *srp_context) > { > struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context; > int avail; > + struct cs_queue *queue_use; > > - cs_queue_avail (&instance->new_message_queue, &avail); > + if (instance->waiting_trans_ack) { > + queue_use = &instance->new_message_queue_trans; > + } else { > + queue_use = &instance->new_message_queue; > + } > + cs_queue_avail (queue_use, &avail); > > return (avail); > } > @@ -2483,7 +2508,12 @@ static int orf_token_mcast ( > sort_queue = &instance->recovery_sort_queue; > reset_token_retransmit_timeout (instance); // REVIEWED > } else { > - mcast_queue = &instance->new_message_queue; > + if (instance->waiting_trans_ack) { > + mcast_queue = &instance->new_message_queue_trans; > + } else { > + mcast_queue = &instance->new_message_queue; > + } > + > sort_queue = &instance->regular_sort_queue; > } > > @@ -3372,13 +3402,20 @@ static void token_callbacks_execute ( > static unsigned int backlog_get (struct totemsrp_instance *instance) > { > unsigned int backlog = 0; > + struct cs_queue *queue_use = NULL; > > if (instance->memb_state == MEMB_STATE_OPERATIONAL) { > - backlog = cs_queue_used (&instance->new_message_queue); > + if (instance->waiting_trans_ack) { > + queue_use = &instance->new_message_queue_trans; > + } else { > + queue_use = &instance->new_message_queue; > + } > } else > if (instance->memb_state == MEMB_STATE_RECOVERY) { > - backlog = cs_queue_used (&instance->retrans_message_queue); > + queue_use = &instance->retrans_message_queue; > } > + backlog = cs_queue_used (queue_use); > + > instance->stats.token[instance->stats.latest_token].backlog_calc = backlog; > return (backlog); > } > @@ -4572,3 +4609,10 @@ void totemsrp_threaded_mode_enable (void *context) > > instance->threaded_mode_enabled = 1; > } > + > +void totemsrp_trans_ack (void *context) > +{ > + struct totemsrp_instance *instance = (struct totemsrp_instance *)context; > + > + instance->waiting_trans_ack = 0; > +} > diff --git a/exec/totemsrp.h b/exec/totemsrp.h > index d29aa3a..4ad4466 100644 > --- a/exec/totemsrp.h > +++ b/exec/totemsrp.h > @@ -138,4 +138,7 @@ extern int totemsrp_member_remove ( > void totemsrp_threaded_mode_enable ( > void *srp_context); > > +void totemsrp_trans_ack ( > + void *srp_context); > + > #endif /* TOTEMSRP_H_DEFINED */ > diff --git a/include/corosync/totem/totempg.h b/include/corosync/totem/totempg.h > index 6ab9fd0..9938e5b 100644 > --- a/include/corosync/totem/totempg.h > +++ b/include/corosync/totem/totempg.h > @@ -184,6 +184,8 @@ extern void totempg_queue_level_register_callback (totem_queue_level_changed_fn) > > extern void totempg_threaded_mode_enable (void); > > +extern void totempg_trans_ack (void); > + > #ifdef __cplusplus > } > #endif > _______________________________________________ discuss mailing list discuss@xxxxxxxxxxxx http://lists.corosync.org/mailman/listinfo/discuss