Re: [PATCH 1/3] Fix problem with sync operations under very rare circumstances

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi Jan,
Sorry for digging up this old thread, but I was just about to switch
to corosync-1.4.5 when I found this patch and got confused by it.

Could you please explain this special message queue in more detail? Is
it used to distinguish new SYNC messages from old SYNC messages that
were queued in the new_message_queue but had not yet been originated
when the last configuration change happened? If so, why not have
totemsrp_mcast() set each message's ring_id immediately, instead of
letting orf_token_mcast() set it to my_ring_id later? The SYNC module
code could then filter out the old ring's SYNC messages correctly
without the help of a special message queue. In general, I think Totem
should fix the ring_id of each message at the moment it is added to
new_message_queue, rather than waiting until the message is put on the
wire. Is my understanding theoretically correct?


On 11/8/12, Jan Friesse <jfriesse@xxxxxxxxxx> wrote:
> From: Steven Dake <sdake@xxxxxxxxxx>
>
> This patch creates a special message queue for synchronization messages.
> This prevents messages that have been queued in the new_message_queue
> but not yet originated from corrupting the synchronization process.
>
> Signed-off-by: Steven Dake <sdake@xxxxxxxxxx>
> ---
>  exec/main.c                      |    4 +++
>  exec/totemmrp.c                  |    5 +++
>  exec/totemmrp.h                  |    2 +
>  exec/totempg.c                   |    5 +++
>  exec/totemsrp.c                  |   56 +++++++++++++++++++++++++++++++++----
>  exec/totemsrp.h                  |    3 ++
>  include/corosync/totem/totempg.h |    2 +
>  7 files changed, 71 insertions(+), 6 deletions(-)
>
> diff --git a/exec/main.c b/exec/main.c
> index 3dac5fb..91911a8 100644
> --- a/exec/main.c
> +++ b/exec/main.c
> @@ -275,6 +275,10 @@ static void corosync_sync_completed (void)
>
>  	cs_ipcs_sync_state_changed(sync_in_process);
>  	cs_ipc_allow_connections(1);
> +	/*
> +	 * Inform totem to start using new message queue again
> +	 */
> +	totempg_trans_ack();
>  }
>
>  static int corosync_sync_callbacks_retrieve (
> diff --git a/exec/totemmrp.c b/exec/totemmrp.c
> index 84ad031..6ca6093 100644
> --- a/exec/totemmrp.c
> +++ b/exec/totemmrp.c
> @@ -276,3 +276,8 @@ void totemmrp_threaded_mode_enable (void)
>  {
>  	totemsrp_threaded_mode_enable (totemsrp_context);
>  }
> +
> +void totemmrp_trans_ack (void)
> +{
> +	totemsrp_trans_ack (totemsrp_context);
> +}
> diff --git a/exec/totemmrp.h b/exec/totemmrp.h
> index 1977918..c900d45 100644
> --- a/exec/totemmrp.h
> +++ b/exec/totemmrp.h
> @@ -131,4 +131,6 @@ extern int totemmrp_member_remove (
>
>  void totemmrp_threaded_mode_enable (void);
>
> +void totemmrp_trans_ack (void);
> +
>  #endif /* TOTEMMRP_H_DEFINED */
> diff --git a/exec/totempg.c b/exec/totempg.c
> index 3b89e3a..bb8840b 100644
> --- a/exec/totempg.c
> +++ b/exec/totempg.c
> @@ -1477,3 +1477,8 @@ void totempg_threaded_mode_enable (void)
>  	totemmrp_threaded_mode_enable ();
>  }
>
> +void totempg_trans_ack (void)
> +{
> +	totemmrp_trans_ack ();
> +}
> +
> diff --git a/exec/totemsrp.c b/exec/totemsrp.c
> index a4cc19a..b0a0560 100644
> --- a/exec/totemsrp.c
> +++ b/exec/totemsrp.c
> @@ -369,6 +369,8 @@ struct totemsrp_instance {
>  	 */
>  	struct cs_queue new_message_queue;
>
> +	struct cs_queue new_message_queue_trans;
> +
>  	struct cs_queue retrans_message_queue;
>
>  	struct sq regular_sort_queue;
> @@ -502,6 +504,8 @@ struct totemsrp_instance {
>  	uint32_t orf_token_discard;
>
>  	uint32_t threaded_mode_enabled;
> +
> +	uint32_t waiting_trans_ack;
>  	
>  	void * token_recv_event_handle;
>  	void * token_sent_event_handle;
> @@ -679,6 +683,8 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
>  	instance->commit_token = (struct memb_commit_token
> *)instance->commit_token_storage;
>
>  	instance->my_id.no_addrs = INTERFACE_MAX;
> +
> +	instance->waiting_trans_ack = 1;
>  }
>
>  static void main_token_seqid_get (
> @@ -950,6 +956,10 @@ int totemsrp_initialize (
>  		MESSAGE_QUEUE_MAX,
>  		sizeof (struct message_item), instance->threaded_mode_enabled);
>
> +	cs_queue_init (&instance->new_message_queue_trans,
> +		MESSAGE_QUEUE_MAX,
> +		sizeof (struct message_item), instance->threaded_mode_enabled);
> +
>  	totemsrp_callback_token_create (instance,
>  		&instance->token_recv_event_handle,
>  		TOTEM_CALLBACK_TOKEN_RECEIVED,
> @@ -981,6 +991,7 @@ void totemsrp_finalize (
>  	memb_leave_message_send (instance);
>  	totemrrp_finalize (instance->totemrrp_context);
>  	cs_queue_free (&instance->new_message_queue);
> +	cs_queue_free (&instance->new_message_queue_trans);
>  	cs_queue_free (&instance->retrans_message_queue);
>  	sq_free (&instance->regular_sort_queue);
>  	sq_free (&instance->recovery_sort_queue);
> @@ -1825,6 +1836,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
>  		trans_memb_list_totemip, instance->my_trans_memb_entries,
>  		left_list, instance->my_left_memb_entries,
>  		0, 0, &instance->my_ring_id);
> +	instance->waiting_trans_ack = 1;
>
>  // TODO we need to filter to ensure we only deliver those
>  // messages which are part of instance->my_deliver_memb
> @@ -2265,8 +2277,15 @@ int totemsrp_mcast (
>  	struct message_item message_item;
>  	char *addr;
>  	unsigned int addr_idx;
> +	struct cs_queue *queue_use;
>
> -	if (cs_queue_is_full (&instance->new_message_queue)) {
> +	if (instance->waiting_trans_ack) {
> +		queue_use = &instance->new_message_queue_trans;
> +	} else {
> +		queue_use = &instance->new_message_queue;
> +	}
> +
> +	if (cs_queue_is_full (queue_use)) {
>  		log_printf (instance->totemsrp_log_level_debug, "queue full");
>  		return (-1);
>  	}
> @@ -2305,7 +2324,7 @@ int totemsrp_mcast (
>
>  	log_printf (instance->totemsrp_log_level_trace, "mcasted message added to
> pending queue");
>  	instance->stats.mcast_tx++;
> -	cs_queue_item_add (&instance->new_message_queue, &message_item);
> +	cs_queue_item_add (queue_use, &message_item);
>
>  	return (0);
>
> @@ -2320,8 +2339,14 @@ int totemsrp_avail (void *srp_context)
>  {
>  	struct totemsrp_instance *instance = (struct totemsrp_instance
> *)srp_context;
>  	int avail;
> +	struct cs_queue *queue_use;
>
> -	cs_queue_avail (&instance->new_message_queue, &avail);
> +	if (instance->waiting_trans_ack) {
> +		queue_use = &instance->new_message_queue_trans;
> +	} else {
> +		queue_use = &instance->new_message_queue;
> +	}
> +	cs_queue_avail (queue_use, &avail);
>
>  	return (avail);
>  }
> @@ -2483,7 +2508,12 @@ static int orf_token_mcast (
>  		sort_queue = &instance->recovery_sort_queue;
>  		reset_token_retransmit_timeout (instance); // REVIEWED
>  	} else {
> -		mcast_queue = &instance->new_message_queue;
> +		if (instance->waiting_trans_ack) {
> +			mcast_queue = &instance->new_message_queue_trans;
> +		} else {
> +			mcast_queue = &instance->new_message_queue;
> +		}
> +
>  		sort_queue = &instance->regular_sort_queue;
>  	}
>
> @@ -3372,13 +3402,20 @@ static void token_callbacks_execute (
>  static unsigned int backlog_get (struct totemsrp_instance *instance)
>  {
>  	unsigned int backlog = 0;
> +	struct cs_queue *queue_use = NULL;
>
>  	if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
> -		backlog = cs_queue_used (&instance->new_message_queue);
> +		if (instance->waiting_trans_ack) {
> +			queue_use = &instance->new_message_queue_trans;
> +		} else {
> +			queue_use = &instance->new_message_queue;
> +		}
>  	} else
>  	if (instance->memb_state == MEMB_STATE_RECOVERY) {
> -		backlog = cs_queue_used (&instance->retrans_message_queue);
> +		queue_use = &instance->retrans_message_queue;
>  	}
> +	backlog = cs_queue_used (queue_use);
> +
>  	instance->stats.token[instance->stats.latest_token].backlog_calc =
> backlog;
>  	return (backlog);
>  }
> @@ -4572,3 +4609,10 @@ void totemsrp_threaded_mode_enable (void *context)
>
>  	instance->threaded_mode_enabled = 1;
>  }
> +
> +void totemsrp_trans_ack (void *context)
> +{
> +	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
> +
> +	instance->waiting_trans_ack = 0;
> +}
> diff --git a/exec/totemsrp.h b/exec/totemsrp.h
> index d29aa3a..4ad4466 100644
> --- a/exec/totemsrp.h
> +++ b/exec/totemsrp.h
> @@ -138,4 +138,7 @@ extern int totemsrp_member_remove (
>  void totemsrp_threaded_mode_enable (
>  	void *srp_context);
>
> +void totemsrp_trans_ack (
> +	void *srp_context);
> +
>  #endif /* TOTEMSRP_H_DEFINED */
> diff --git a/include/corosync/totem/totempg.h b/include/corosync/totem/totempg.h
> index 6ab9fd0..9938e5b 100644
> --- a/include/corosync/totem/totempg.h
> +++ b/include/corosync/totem/totempg.h
> @@ -184,6 +184,8 @@ extern void totempg_queue_level_register_callback (totem_queue_level_changed_fn)
>
>  extern void totempg_threaded_mode_enable (void);
>
> +extern void totempg_trans_ack (void);
> +
>  #ifdef __cplusplus
>  }
>  #endif
> --
> 1.7.1
>
> _______________________________________________
> discuss mailing list
> discuss@xxxxxxxxxxxx
> http://lists.corosync.org/mailman/listinfo/discuss
>


-- 
Yours,
jason

_______________________________________________
discuss mailing list
discuss@xxxxxxxxxxxx
http://lists.corosync.org/mailman/listinfo/discuss



[Index of Archives]     [Linux Clusters]     [Corosync Project]     [Linux USB Devel]     [Linux Audio Users]     [Photo]     [Yosemite News]    [Yosemite Photos]    [Linux Kernel]     [Linux SCSI]     [X.Org]

  Powered by Linux