Re: [PATCH 1/3] Fix problem with sync operations under very rare circumstances

Jason,
you can test it with a simple reproducer:
- start corosync
- run testcpg
- pause corosync
- send some messages via testcpg
- unpause corosync
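
The pause and unpause steps are not spelled out above. One common way to do them, assumed here, is to stop the daemon with SIGSTOP and resume it with SIGCONT. A minimal C sketch of such a helper follows; the names proc_pause, proc_resume and proc_pause_for are made up for illustration and are not part of corosync.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helpers, not part of corosync. They assume that "pause"
 * in the reproducer means SIGSTOP and "unpause" means SIGCONT.
 */

/* Freeze the process so that it stops handling the token. */
int proc_pause (pid_t pid)
{
	assert (pid > 0);	/* refuse process groups and "every process" */
	return (kill (pid, SIGSTOP));
}

/* Let the process run again. */
int proc_resume (pid_t pid)
{
	assert (pid > 0);
	return (kill (pid, SIGCONT));
}

/* Steps 3 to 5 of the reproducer: pause, leave a window, unpause. */
int proc_pause_for (pid_t pid, unsigned int seconds)
{
	if (proc_pause (pid) != 0) {
		return (-1);
	}
	sleep (seconds);	/* window in which testcpg sends its messages */
	return (proc_resume (pid));
}
```

With the PID of corosync in a (hypothetical) variable corosync_pid, proc_pause_for (corosync_pid, 10) leaves ten seconds to send messages via testcpg. Presumably the pause has to outlast the token timeout, so that a new membership is formed on resume.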

What happens is that the regular messages are sent to the wire BEFORE the
sync messages, which is the wrong order. This actually has nothing to do
with totemsrp itself, but the fix is easiest to implement in the totemsrp
code. The stack (from the message point of view) looks like this:

- totemsrp - MTU fragments, has ringid, ...
- totempg - Fragmentation layer
- sync, cpg, ... - Services receiving, sending messages

But from the sync point of view the stack looks different, because
totemsrp handles resending (of fragments) and recovery (again of
fragments). When it has finished, it notifies all services which implement
the confchg call. In old openais we used confchg directly, but the
services were then not synced in the same order. That's why only the sync
service handles the confchg call, and the other services handle only the
sync_* calls.

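Sadly, from the message point of view, there is no difference between sync
messages and (for example) cpg messages, so regular messages that were
already queued before the membership change go out ahead of the sync
messages.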
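
To make the ordering problem and the fix more concrete, here is a toy model of the queue switch from the patch quoted below. Only the names new_message_queue, new_message_queue_trans and waiting_trans_ack come from the patch; the toy_* types and functions are invented for illustration and are much simpler than the real cs_queue and totemsrp code.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_QUEUE_MAX 16

/* Simplified FIFO; the real cs_queue in corosync is more elaborate. */
struct toy_queue {
	const char *items[TOY_QUEUE_MAX];
	size_t head;
	size_t used;
};

/* Only the fields that matter for the ordering problem. */
struct toy_instance {
	struct toy_queue new_message_queue;		/* regular traffic */
	struct toy_queue new_message_queue_trans;	/* traffic while sync runs */
	int waiting_trans_ack;	/* 1 from confchg until sync has completed */
};

/*
 * The real patch starts with waiting_trans_ack = 1; the toy starts at 0
 * to model a node that has already finished an earlier sync.
 */
void toy_init (struct toy_instance *instance)
{
	memset (instance, 0, sizeof (*instance));
}

/* Same selection as queue_use in the patch. */
static struct toy_queue *toy_queue_use (struct toy_instance *instance)
{
	return (instance->waiting_trans_ack ?
		&instance->new_message_queue_trans :
		&instance->new_message_queue);
}

/* Queue a message; returns -1 when the selected queue is full. */
int toy_mcast (struct toy_instance *instance, const char *msg)
{
	struct toy_queue *q = toy_queue_use (instance);

	if (q->used == TOY_QUEUE_MAX) {
		return (-1);
	}
	q->items[(q->head + q->used) % TOY_QUEUE_MAX] = msg;
	q->used++;
	return (0);
}

/* Token arrived: return the next message for the wire, or NULL if none. */
const char *toy_token_send (struct toy_instance *instance)
{
	struct toy_queue *q = toy_queue_use (instance);
	const char *msg;

	if (q->used == 0) {
		return (NULL);
	}
	msg = q->items[q->head];
	q->head = (q->head + 1) % TOY_QUEUE_MAX;
	q->used--;
	return (msg);
}

/* New membership delivered: sync is about to start. */
void toy_confchg (struct toy_instance *instance)
{
	instance->waiting_trans_ack = 1;
}

/* Sync finished: go back to the regular queue. */
void toy_trans_ack (struct toy_instance *instance)
{
	instance->waiting_trans_ack = 0;
}
```

If a cpg message is queued with toy_mcast() before toy_confchg() and a sync message after it, toy_token_send() hands out the sync message first. The cpg message stays in new_message_queue until toy_trans_ack() is called.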

jason wrote:
> Hi Jan,
> Sorry for digging up this old thread, but I was just about to switch
> to corosync-1.4.5, found this patch, and got confused.
> 
> Could you please explain more about this special message queue? Is it
> used for distinguishing the new SYNC messages from the old SYNC
> messages that were queued in the new_message_queue but had not been
> originated during the last configuration change? If so, instead of
> letting orf_token_mcast() set the message’s ring_id to my_ring_id, why
> not let totemsrp_mcast() set it immediately? Then the SYNC module code
> can filter out the old ring’s SYNC messages correctly without the help

We are not talking about sync messages, but about regular messages. Old
SYNC messages are correctly filtered.

> of a special message queue. Generally, I think Totem really should
> settle the ring_id for each message in new_message_queue rather than
> wait until it is put on the wire. Is my understanding theoretically

We must wait until we hold the token, because before that we don't know
what the ring_id will be.
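
A small sketch of that point: the ring can change between the moment a message is queued and the moment the token arrives, so the ring_id is only stamped at token time. The sketch_* names are hypothetical, and the ring identifier is reduced to a bare sequence number, which is an assumption made only to keep the example short.

```c
#include <assert.h>
#include <stdint.h>

/* Reduced ring identifier; the real one in corosync holds more than this. */
struct sketch_ring_id {
	uint64_t seq;
};

struct sketch_msg {
	const char *payload;
	struct sketch_ring_id ring_id;	/* not meaningful until originated */
};

struct sketch_node {
	struct sketch_ring_id my_ring_id;	/* ring the node is currently on */
};

/* Enqueue time: the ring used for sending is not known yet. */
void sketch_enqueue (struct sketch_msg *msg, const char *payload)
{
	msg->payload = payload;
	msg->ring_id.seq = 0;	/* placeholder only */
}

/* Token time: the node holds the token, so my_ring_id is the right ring. */
void sketch_originate (const struct sketch_node *node, struct sketch_msg *msg)
{
	msg->ring_id = node->my_ring_id;
}
```

A message queued while the node was on ring 4 and originated after a membership change to ring 8 ends up stamped with ring 8. Stamping it at enqueue time would have tagged it with a ring that no longer exists.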

> right?
> 
> 
> On 11/8/12, Jan Friesse <jfriesse@xxxxxxxxxx> wrote:
>> From: Steven Dake <sdake@xxxxxxxxxx>
>>
>> This patch creates a special message queue for synchronization messages.
>> This prevents a situation in which messages are queued in the
>> new_message_queue but have not yet been originated from corrupting the
>> synchronization process.
>>
>> Signed-off-by: Steven Dake <sdake@xxxxxxxxxx>
>> ---
>>  exec/main.c                      |    4 +++
>>  exec/totemmrp.c                  |    5 +++
>>  exec/totemmrp.h                  |    2 +
>>  exec/totempg.c                   |    5 +++
>>  exec/totemsrp.c                  |   56 +++++++++++++++++++++++++++++++++----
>>  exec/totemsrp.h                  |    3 ++
>>  include/corosync/totem/totempg.h |    2 +
>>  7 files changed, 71 insertions(+), 6 deletions(-)
>>
>> diff --git a/exec/main.c b/exec/main.c
>> index 3dac5fb..91911a8 100644
>> --- a/exec/main.c
>> +++ b/exec/main.c
>> @@ -275,6 +275,10 @@ static void corosync_sync_completed (void)
>>
>>  	cs_ipcs_sync_state_changed(sync_in_process);
>>  	cs_ipc_allow_connections(1);
>> +	/*
>> +	 * Inform totem to start using new message queue again
>> +	 */
>> +	totempg_trans_ack();
>>  }
>>
>>  static int corosync_sync_callbacks_retrieve (
>> diff --git a/exec/totemmrp.c b/exec/totemmrp.c
>> index 84ad031..6ca6093 100644
>> --- a/exec/totemmrp.c
>> +++ b/exec/totemmrp.c
>> @@ -276,3 +276,8 @@ void totemmrp_threaded_mode_enable (void)
>>  {
>>  	totemsrp_threaded_mode_enable (totemsrp_context);
>>  }
>> +
>> +void totemmrp_trans_ack (void)
>> +{
>> +	totemsrp_trans_ack (totemsrp_context);
>> +}
>> diff --git a/exec/totemmrp.h b/exec/totemmrp.h
>> index 1977918..c900d45 100644
>> --- a/exec/totemmrp.h
>> +++ b/exec/totemmrp.h
>> @@ -131,4 +131,6 @@ extern int totemmrp_member_remove (
>>
>>  void totemmrp_threaded_mode_enable (void);
>>
>> +void totemmrp_trans_ack (void);
>> +
>>  #endif /* TOTEMMRP_H_DEFINED */
>> diff --git a/exec/totempg.c b/exec/totempg.c
>> index 3b89e3a..bb8840b 100644
>> --- a/exec/totempg.c
>> +++ b/exec/totempg.c
>> @@ -1477,3 +1477,8 @@ void totempg_threaded_mode_enable (void)
>>  	totemmrp_threaded_mode_enable ();
>>  }
>>
>> +void totempg_trans_ack (void)
>> +{
>> +	totemmrp_trans_ack ();
>> +}
>> +
>> diff --git a/exec/totemsrp.c b/exec/totemsrp.c
>> index a4cc19a..b0a0560 100644
>> --- a/exec/totemsrp.c
>> +++ b/exec/totemsrp.c
>> @@ -369,6 +369,8 @@ struct totemsrp_instance {
>>  	 */
>>  	struct cs_queue new_message_queue;
>>
>> +	struct cs_queue new_message_queue_trans;
>> +
>>  	struct cs_queue retrans_message_queue;
>>
>>  	struct sq regular_sort_queue;
>> @@ -502,6 +504,8 @@ struct totemsrp_instance {
>>  	uint32_t orf_token_discard;
>>
>>  	uint32_t threaded_mode_enabled;
>> +
>> +	uint32_t waiting_trans_ack;
>>  	
>>  	void * token_recv_event_handle;
>>  	void * token_sent_event_handle;
>> @@ -679,6 +683,8 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
>>  	instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
>>
>>  	instance->my_id.no_addrs = INTERFACE_MAX;
>> +
>> +	instance->waiting_trans_ack = 1;
>>  }
>>
>>  static void main_token_seqid_get (
>> @@ -950,6 +956,10 @@ int totemsrp_initialize (
>>  		MESSAGE_QUEUE_MAX,
>>  		sizeof (struct message_item), instance->threaded_mode_enabled);
>>
>> +	cs_queue_init (&instance->new_message_queue_trans,
>> +		MESSAGE_QUEUE_MAX,
>> +		sizeof (struct message_item), instance->threaded_mode_enabled);
>> +
>>  	totemsrp_callback_token_create (instance,
>>  		&instance->token_recv_event_handle,
>>  		TOTEM_CALLBACK_TOKEN_RECEIVED,
>> @@ -981,6 +991,7 @@ void totemsrp_finalize (
>>  	memb_leave_message_send (instance);
>>  	totemrrp_finalize (instance->totemrrp_context);
>>  	cs_queue_free (&instance->new_message_queue);
>> +	cs_queue_free (&instance->new_message_queue_trans);
>>  	cs_queue_free (&instance->retrans_message_queue);
>>  	sq_free (&instance->regular_sort_queue);
>>  	sq_free (&instance->recovery_sort_queue);
>> @@ -1825,6 +1836,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
>>  		trans_memb_list_totemip, instance->my_trans_memb_entries,
>>  		left_list, instance->my_left_memb_entries,
>>  		0, 0, &instance->my_ring_id);
>> +	instance->waiting_trans_ack = 1;
>>
>>  // TODO we need to filter to ensure we only deliver those
>>  // messages which are part of instance->my_deliver_memb
>> @@ -2265,8 +2277,15 @@ int totemsrp_mcast (
>>  	struct message_item message_item;
>>  	char *addr;
>>  	unsigned int addr_idx;
>> +	struct cs_queue *queue_use;
>>
>> -	if (cs_queue_is_full (&instance->new_message_queue)) {
>> +	if (instance->waiting_trans_ack) {
>> +		queue_use = &instance->new_message_queue_trans;
>> +	} else {
>> +		queue_use = &instance->new_message_queue;
>> +	}
>> +
>> +	if (cs_queue_is_full (queue_use)) {
>>  		log_printf (instance->totemsrp_log_level_debug, "queue full");
>>  		return (-1);
>>  	}
>> @@ -2305,7 +2324,7 @@ int totemsrp_mcast (
>>
>>  	log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
>>  	instance->stats.mcast_tx++;
>> -	cs_queue_item_add (&instance->new_message_queue, &message_item);
>> +	cs_queue_item_add (queue_use, &message_item);
>>
>>  	return (0);
>>
>> @@ -2320,8 +2339,14 @@ int totemsrp_avail (void *srp_context)
>>  {
>>  	struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
>>  	int avail;
>> +	struct cs_queue *queue_use;
>>
>> -	cs_queue_avail (&instance->new_message_queue, &avail);
>> +	if (instance->waiting_trans_ack) {
>> +		queue_use = &instance->new_message_queue_trans;
>> +	} else {
>> +		queue_use = &instance->new_message_queue;
>> +	}
>> +	cs_queue_avail (queue_use, &avail);
>>
>>  	return (avail);
>>  }
>> @@ -2483,7 +2508,12 @@ static int orf_token_mcast (
>>  		sort_queue = &instance->recovery_sort_queue;
>>  		reset_token_retransmit_timeout (instance); // REVIEWED
>>  	} else {
>> -		mcast_queue = &instance->new_message_queue;
>> +		if (instance->waiting_trans_ack) {
>> +			mcast_queue = &instance->new_message_queue_trans;
>> +		} else {
>> +			mcast_queue = &instance->new_message_queue;
>> +		}
>> +
>>  		sort_queue = &instance->regular_sort_queue;
>>  	}
>>
>> @@ -3372,13 +3402,20 @@ static void token_callbacks_execute (
>>  static unsigned int backlog_get (struct totemsrp_instance *instance)
>>  {
>>  	unsigned int backlog = 0;
>> +	struct cs_queue *queue_use = NULL;
>>
>>  	if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
>> -		backlog = cs_queue_used (&instance->new_message_queue);
>> +		if (instance->waiting_trans_ack) {
>> +			queue_use = &instance->new_message_queue_trans;
>> +		} else {
>> +			queue_use = &instance->new_message_queue;
>> +		}
>>  	} else
>>  	if (instance->memb_state == MEMB_STATE_RECOVERY) {
>> -		backlog = cs_queue_used (&instance->retrans_message_queue);
>> +		queue_use = &instance->retrans_message_queue;
>>  	}
>> +	backlog = cs_queue_used (queue_use);
>> +
>>  	instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
>>  	return (backlog);
>>  }
>> @@ -4572,3 +4609,10 @@ void totemsrp_threaded_mode_enable (void *context)
>>
>>  	instance->threaded_mode_enabled = 1;
>>  }
>> +
>> +void totemsrp_trans_ack (void *context)
>> +{
>> +	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
>> +
>> +	instance->waiting_trans_ack = 0;
>> +}
>> diff --git a/exec/totemsrp.h b/exec/totemsrp.h
>> index d29aa3a..4ad4466 100644
>> --- a/exec/totemsrp.h
>> +++ b/exec/totemsrp.h
>> @@ -138,4 +138,7 @@ extern int totemsrp_member_remove (
>>  void totemsrp_threaded_mode_enable (
>>  	void *srp_context);
>>
>> +void totemsrp_trans_ack (
>> +	void *srp_context);
>> +
>>  #endif /* TOTEMSRP_H_DEFINED */
>> diff --git a/include/corosync/totem/totempg.h b/include/corosync/totem/totempg.h
>> index 6ab9fd0..9938e5b 100644
>> --- a/include/corosync/totem/totempg.h
>> +++ b/include/corosync/totem/totempg.h
>> @@ -184,6 +184,8 @@ extern void totempg_queue_level_register_callback (totem_queue_level_changed_fn)
>>
>>  extern void totempg_threaded_mode_enable (void);
>>
>> +extern void totempg_trans_ack (void);
>> +
>>  #ifdef __cplusplus
>>  }
>>  #endif
>> --
>> 1.7.1
>>
>> _______________________________________________
>> discuss mailing list
>> discuss@xxxxxxxxxxxx
>> http://lists.corosync.org/mailman/listinfo/discuss
>>
> 
> 
