It is used when trans_ack changes, so that after the change we have a clean state and/or the previous state. Signed-off-by: Jan Friesse <jfriesse@xxxxxxxxxx> --- exec/totempg.c | 175 +++++++++++++++++++++++++++++++++---------------------- 1 files changed, 105 insertions(+), 70 deletions(-) diff --git a/exec/totempg.c b/exec/totempg.c index 0649cab..a6319ca 100644 --- a/exec/totempg.c +++ b/exec/totempg.c @@ -152,16 +152,6 @@ struct totempg_mcast { #define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \ sizeof (struct totempg_mcast)) -/* - * Local variables used for packing small messages - */ -static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX]; - -static int mcast_packed_msg_count = 0; - -static int totempg_reserved = 1; - -static unsigned int totempg_size_limit; /* * Function and data used to log messages @@ -211,24 +201,41 @@ DECLARE_LIST_INIT(assembly_list_free_trans); DECLARE_LIST_INIT(assembly_list_free); /* - * Staging buffer for packed messages. Messages are staged in this buffer - * before sending. Multiple messages may fit which cuts down on the - * number of mcasts sent. If a message doesn't completely fit, then - * the mcast header has a fragment bit set that says that there are more - * data to follow. fragment_size is an index into the buffer. It indicates - * the size of message data and where to place new message data. - * fragment_contuation indicates whether the first packed message in - * the buffer is a continuation of a previously packed fragment. + * Structure for storing totem_pg contexts so they can be switched */ -static unsigned char *fragmentation_data; +struct totempg_context { + /* + * Staging buffer for packed messages. Messages are staged in this buffer + * before sending. Multiple messages may fit which cuts down on the + * number of mcasts sent. If a message doesn't completely fit, then + * the mcast header has a fragment bit set that says that there are more + * data to follow. fragment_size is an index into the buffer. 
It indicates + * the size of message data and where to place new message data. + * fragment_continuation indicates whether the first packed message in + * the buffer is a continuation of a previously packed fragment. + */ + unsigned char *fragmentation_data; -static int fragment_size = 0; + int fragment_size; -static int fragment_continuation = 0; + int fragment_continuation; -static int totempg_waiting_transack = 0; + unsigned char next_fragment; -static unsigned int totempg_max_handle = 0; + /* + * Local variables used for packing small messages + */ + unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX]; + + int mcast_packed_msg_count; + + int totempg_reserved; +}; + +#define TOTEMPG_NO_CONTEXTS 2 + +static struct totempg_context totempg_contexts_array[TOTEMPG_NO_CONTEXTS]; +static struct totempg_context *totempg_active_context; struct totempg_group_instance { void (*deliver_fn) ( @@ -251,7 +258,9 @@ struct totempg_group_instance { DECLARE_HDB_DATABASE (totempg_groups_instance_database,NULL); -static unsigned char next_fragment = 1; +static unsigned int totempg_max_handle = 0; +static int totempg_waiting_transack = 0; +static unsigned int totempg_size_limit = 0; static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -273,12 +282,23 @@ static int msg_count_send_ok (int msg_count); static int byte_count_send_ok (int byte_count); +static void totempg_set_context(int context_no) +{ + totempg_active_context = &totempg_contexts_array[context_no]; +} + static void totempg_waiting_trans_ack_cb (int waiting_trans_ack) { log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack); totempg_waiting_transack = waiting_trans_ack; + if (!waiting_trans_ack) { + totempg_set_context(0); + } else { + totempg_set_context(1); + } } + static struct assembly *assembly_ref (unsigned int nodeid) { struct assembly *assembly; @@ -746,9 +766,10 @@ int callback_token_received_fn (enum totem_callback_token_type type, struct totempg_mcast mcast; struct iovec 
iovecs[3]; int res; + struct totempg_context *con = totempg_active_context; pthread_mutex_lock (&mcast_msg_mutex); - if (mcast_packed_msg_count == 0) { + if (con->mcast_packed_msg_count == 0) { pthread_mutex_unlock (&mcast_msg_mutex); return (0); } @@ -763,21 +784,21 @@ int callback_token_received_fn (enum totem_callback_token_type type, * Was the first message in this buffer a continuation of a * fragmented message? */ - mcast.continuation = fragment_continuation; - fragment_continuation = 0; + mcast.continuation = con->fragment_continuation; + con->fragment_continuation = 0; - mcast.msg_count = mcast_packed_msg_count; + mcast.msg_count = con->mcast_packed_msg_count; iovecs[0].iov_base = (void *)&mcast; iovecs[0].iov_len = sizeof (struct totempg_mcast); - iovecs[1].iov_base = (void *)mcast_packed_msg_lens; - iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short); - iovecs[2].iov_base = (void *)&fragmentation_data[0]; - iovecs[2].iov_len = fragment_size; + iovecs[1].iov_base = (void *)con->mcast_packed_msg_lens; + iovecs[1].iov_len = con->mcast_packed_msg_count * sizeof (unsigned short); + iovecs[2].iov_base = (void *)&con->fragmentation_data[0]; + iovecs[2].iov_len = con->fragment_size; res = totemmrp_mcast (iovecs, 3, 0); - mcast_packed_msg_count = 0; - fragment_size = 0; + con->mcast_packed_msg_count = 0; + con->fragment_size = 0; pthread_mutex_unlock (&mcast_msg_mutex); return (0); @@ -791,6 +812,8 @@ int totempg_initialize ( struct totem_config *totem_config) { int res; + int i; + struct totempg_context *con; totempg_totem_config = totem_config; totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security; @@ -801,9 +824,16 @@ int totempg_initialize ( totempg_log_printf = totem_config->totem_logging_configuration.log_printf; totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id; - fragmentation_data = malloc (TOTEMPG_PACKET_SIZE); - if (fragmentation_data == 0) { - return (-1); + for (i = 0; i < 
TOTEMPG_NO_CONTEXTS; i++) { + con = &totempg_contexts_array[i]; + memset(con, 0, sizeof(*con)); + + con->fragmentation_data = malloc (TOTEMPG_PACKET_SIZE); + if (con->fragmentation_data == 0) { + return (-1); + } + con->totempg_reserved = 1; + con->next_fragment = 1; } totemsrp_net_mtu_adjust (totem_config); @@ -855,6 +885,7 @@ static int mcast_msg ( int copy_len = 0; int copy_base = 0; int total_size = 0; + struct totempg_context *con = totempg_active_context; pthread_mutex_lock (&mcast_msg_mutex); totemmrp_event_signal (TOTEM_EVENT_NEW_MSG, 1); @@ -872,9 +903,9 @@ static int mcast_msg ( iov_len = dest; max_packet_size = TOTEMPG_PACKET_SIZE - - (sizeof (unsigned short) * (mcast_packed_msg_count + 1)); + (sizeof (unsigned short) * (con->mcast_packed_msg_count + 1)); - mcast_packed_msg_lens[mcast_packed_msg_count] = 0; + con->mcast_packed_msg_lens[con->mcast_packed_msg_count] = 0; /* * Check if we would overwrite new message queue @@ -884,7 +915,7 @@ static int mcast_msg ( } if (byte_count_send_ok (total_size + sizeof(unsigned short) * - (mcast_packed_msg_count)) == 0) { + (con->mcast_packed_msg_count)) == 0) { pthread_mutex_unlock (&mcast_msg_mutex); return(-1); @@ -893,7 +924,7 @@ static int mcast_msg ( mcast.header.version = 0; for (i = 0; i < iov_len; ) { mcast.fragmented = 0; - mcast.continuation = fragment_continuation; + mcast.continuation = con->fragment_continuation; copy_len = iovec[i].iov_len - copy_base; /* @@ -902,14 +933,14 @@ static int mcast_msg ( * fragment_buffer on exit so that max_packet_size + fragment_size * doesn't exceed the size of the fragment_buffer on the next call. 
*/ - if ((copy_len + fragment_size) < + if ((copy_len + con->fragment_size) < (max_packet_size - sizeof (unsigned short))) { - memcpy (&fragmentation_data[fragment_size], + memcpy (&con->fragmentation_data[con->fragment_size], (char *)iovec[i].iov_base + copy_base, copy_len); - fragment_size += copy_len; - mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len; - next_fragment = 1; + con->fragment_size += copy_len; + con->mcast_packed_msg_lens[con->mcast_packed_msg_count] += copy_len; + con->next_fragment = 1; copy_len = 0; copy_base = 0; i++; @@ -921,18 +952,18 @@ static int mcast_msg ( } else { unsigned char *data_ptr; - copy_len = min(copy_len, max_packet_size - fragment_size); + copy_len = min(copy_len, max_packet_size - con->fragment_size); if( copy_len == max_packet_size ) data_ptr = (unsigned char *)iovec[i].iov_base + copy_base; else { - data_ptr = fragmentation_data; - memcpy (&fragmentation_data[fragment_size], + data_ptr = con->fragmentation_data; + memcpy (&con->fragmentation_data[con->fragment_size], (unsigned char *)iovec[i].iov_base + copy_base, copy_len); } - memcpy (&fragmentation_data[fragment_size], + memcpy (&con->fragmentation_data[con->fragment_size], (unsigned char *)iovec[i].iov_base + copy_base, copy_len); - mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len; + con->mcast_packed_msg_lens[con->mcast_packed_msg_count] += copy_len; /* * if we're not on the last iovec or the iovec is too large to @@ -941,25 +972,25 @@ static int mcast_msg ( */ if ((i < (iov_len - 1)) || ((copy_base + copy_len) < iovec[i].iov_len)) { - if (!next_fragment) { - next_fragment++; + if (!con->next_fragment) { + con->next_fragment++; } - fragment_continuation = next_fragment; - mcast.fragmented = next_fragment++; - assert(fragment_continuation != 0); + con->fragment_continuation = con->next_fragment; + mcast.fragmented = con->next_fragment++; + assert(con->fragment_continuation != 0); assert(mcast.fragmented != 0); } else { - fragment_continuation = 0; + 
con->fragment_continuation = 0; } /* * assemble the message and send it */ - mcast.msg_count = ++mcast_packed_msg_count; + mcast.msg_count = ++con->mcast_packed_msg_count; iovecs[0].iov_base = (void *)&mcast; iovecs[0].iov_len = sizeof(struct totempg_mcast); - iovecs[1].iov_base = (void *)mcast_packed_msg_lens; - iovecs[1].iov_len = mcast_packed_msg_count * + iovecs[1].iov_base = (void *)con->mcast_packed_msg_lens; + iovecs[1].iov_len = con->mcast_packed_msg_count * sizeof(unsigned short); iovecs[2].iov_base = (void *)data_ptr; iovecs[2].iov_len = max_packet_size; @@ -972,9 +1003,9 @@ static int mcast_msg ( /* * Recalculate counts and indexes for the next. */ - mcast_packed_msg_lens[0] = 0; - mcast_packed_msg_count = 0; - fragment_size = 0; + con->mcast_packed_msg_lens[0] = 0; + con->mcast_packed_msg_count = 0; + con->fragment_size = 0; max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short)); /* @@ -999,8 +1030,8 @@ static int mcast_msg ( * the last buffer just fit into the fragmentation_data buffer * and we were at the last iovec. 
*/ - if (mcast_packed_msg_lens[mcast_packed_msg_count]) { - mcast_packed_msg_count++; + if (con->mcast_packed_msg_lens[con->mcast_packed_msg_count]) { + con->mcast_packed_msg_count++; } error_exit: @@ -1015,11 +1046,12 @@ static int msg_count_send_ok ( int msg_count) { int avail = 0; + struct totempg_context *con = totempg_active_context; avail = totemmrp_avail (); totempg_stats.msg_queue_avail = avail; - return ((avail - totempg_reserved) > msg_count); + return ((avail - con->totempg_reserved) > msg_count); } static int byte_count_send_ok ( @@ -1039,10 +1071,11 @@ static int send_reserve ( int msg_size) { unsigned int msg_count = 0; + struct totempg_context *con = totempg_active_context; msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1; - totempg_reserved += msg_count; - totempg_stats.msg_reserved = totempg_reserved; + con->totempg_reserved += msg_count; + totempg_stats.msg_reserved = con->totempg_reserved; return (msg_count); } @@ -1050,8 +1083,10 @@ static int send_reserve ( static void send_release ( int msg_count) { - totempg_reserved -= msg_count; - totempg_stats.msg_reserved = totempg_reserved; + struct totempg_context *con = totempg_active_context; + + con->totempg_reserved -= msg_count; + totempg_stats.msg_reserved = con->totempg_reserved; } int totempg_callback_token_create ( -- 1.7.1 _______________________________________________ discuss mailing list discuss@xxxxxxxxxxxx http://lists.corosync.org/mailman/listinfo/discuss