Tried to see the difference without the code move. I got this. A spelling mistake was introduced: measurement -> measurment Also, a patch that adds a "RedChannel *channel" parameter to red_channel_remove_client was introduced but was removed as a change. If it is too messy to revert, I'm fine with keeping the change. Didn't find any other move problems. There are some minor whitespace changes: -RedChannel* red_channel_client_get_channel(RedChannelClient *rcc) +RedChannel* red_channel_client_get_channel(RedChannelClient* rcc) -static void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, - int *vec_size, int pos) +void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, + int *vec_size, int pos) - _ch->type, _ch->id, ## __VA_ARGS__); \ + _ch->type, _ch->id, ## __VA_ARGS__); \ --- before.c 2016-09-02 16:32:28.164100097 +0100 +++ after.c 2016-09-02 10:17:54.521922589 +0100 @@ -14,21 +14,23 @@ You should have received a copy of the GNU Lesser General Public License along with this library; if not, see <http://www.gnu.org/licenses/>. */ #ifdef HAVE_CONFIG_H #include <config.h> #endif #include <glib.h> #include <common/generated_server_marshallers.h> + #include "cursor-channel.h" +#include "red-channel-client.h" #include "cache-item.h" #define CLIENT_CURSOR_CACHE_SIZE 256 #define CURSOR_CACHE_HASH_SHIFT 8 #define CURSOR_CACHE_HASH_SIZE (1 << CURSOR_CACHE_HASH_SHIFT) #define CURSOR_CACHE_HASH_MASK (CURSOR_CACHE_HASH_SIZE - 1) #define CURSOR_CACHE_HASH_KEY(id) ((id) & CURSOR_CACHE_HASH_MASK) typedef struct CursorChannelClient CursorChannelClient; @@ -538,20 +540,21 @@ You should have received a copy of the GNU Lesser General Public License along with this library; if not, see <http://www.gnu.org/licenses/>. 
*/ #ifndef DCC_PRIVATE_H_ #define DCC_PRIVATE_H_ #include "cache-item.h" #include "dcc.h" #include "image-encoders.h" #include "stream.h" +#include "red-channel-client.h" struct DisplayChannelClient { RedChannelClient base; int is_low_bandwidth; uint32_t id; SpiceImageCompression image_compression; spice_wan_compression_t jpeg_state; spice_wan_compression_t zlib_glz_state; ImageEncoders encoders; @@ -618,20 +621,21 @@ #include <common/marshaller.h> #include <common/messages.h> #include <common/generated_server_marshallers.h> #include "demarshallers.h" #include "spice.h" #include "red-common.h" #include "reds.h" #include "reds-stream.h" #include "red-channel.h" +#include "red-channel-client.h" #include "inputs-channel-client.h" #include "main-channel-client.h" #include "inputs-channel.h" #include "migration-protocol.h" #include "utils.h" // TODO: RECEIVE_BUF_SIZE used to be the same for inputs_channel and main_channel // since it was defined once in reds.c which contained both. // Now that they are split we can give a more fitting value for inputs - what // should it be? @@ -1278,20 +1282,21 @@ You should have received a copy of the GNU Lesser General Public License along with this library; if not, see <http://www.gnu.org/licenses/>. 
*/ #ifdef HAVE_CONFIG_H #include <config.h> #endif #include "inputs-channel-client.h" #include "inputs-channel.h" #include "migration-protocol.h" +#include "red-channel-client.h" struct InputsChannelClient { RedChannelClient base; uint16_t motion_count; }; RedChannelClient* inputs_channel_client_create(RedChannel *channel, RedClient *client, RedsStream *stream, int monitor_latency, @@ -1366,20 +1371,21 @@ */ #ifdef HAVE_CONFIG_H #include <config.h> #endif #include <common/ring.h> #include "red-common.h" #include "main-channel.h" #include "reds.h" +#include "red-channel-client.h" int main_channel_is_connected(MainChannel *main_chan) { return red_channel_is_connected(&main_chan->base); } /* * When the main channel is disconnected, disconnect the entire client. */ static void main_channel_client_on_disconnect(RedChannelClient *rcc) @@ -1785,21 +1791,23 @@ */ #ifdef HAVE_CONFIG_H #include <config.h> #endif #include <inttypes.h> #include "main-channel-client.h" #include <common/generated_server_marshallers.h> +#include "main-channel-client.h" #include "main-channel.h" +#include "red-channel-client.h" #include "reds.h" #define NET_TEST_WARMUP_BYTES 0 #define NET_TEST_BYTES (1024 * 250) enum NetTestStage { NET_TEST_STAGE_INVALID, NET_TEST_STAGE_WARMUP, NET_TEST_STAGE_LATENCY, NET_TEST_STAGE_RATE, @@ -2796,20 +2804,22 @@ inputs-channel-client.h \ jpeg-encoder.c \ jpeg-encoder.h \ main-channel.c \ main-channel.h \ main-channel-client.c \ main-channel-client.h \ mjpeg-encoder.c \ red-channel.c \ red-channel.h \ + red-channel-client.c \ + red-channel-client.h \ red-common.h \ dispatcher.c \ dispatcher.h \ red-qxl.c \ red-qxl.h \ main-dispatcher.c \ main-dispatcher.h \ migration-protocol.h \ memslot.c \ memslot.h \ @@ -2919,36 +2929,26 @@ Author: yhalperi@xxxxxxxxxx */ #ifdef HAVE_CONFIG_H #include <config.h> #endif #include <common/ring.h> #include "red-channel.h" +#include "red-channel-client.h" #include "reds.h" #include "reds-stream.h" #include "main-dispatcher.h" #include 
"utils.h" -static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout); -static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc); -static void red_channel_client_restart_ping_timer(RedChannelClient *rcc); - -static void red_channel_client_event(int fd, int event, void *data); -static void red_client_add_channel(RedClient *client, RedChannelClient *rcc); -static void red_client_remove_channel(RedChannelClient *rcc); -static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id); -static void red_channel_client_restore_main_sender(RedChannelClient *rcc); -static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc); - /* * Lifetime of RedChannel, RedChannelClient and RedClient: * RedChannel is created and destroyed by the calls to * red_channel_create.* and red_channel_destroy. The RedChannel resources * are deallocated only after red_channel_destroy is called and no RedChannelClient * refers to the channel. * RedChannelClient is created and destroyed by the calls to red_channel_client_create * and red_channel_client_destroy. RedChannelClient resources are deallocated only when * its refs == 0. The reference count of RedChannelClient can be increased by routines * that include calls that might destroy the red_channel_client. For example, @@ -2969,35 +2969,32 @@ * other references, they will not be completely released, until they are dereferenced. * * Note: red_channel_client_destroy is not thread safe, and still it is called from * red_client_destroy (from the client's thread). However, since before this call, * red_client_destroy calls rcc->channel->client_cbs.disconnect(rcc), which is synchronous, * we assume that if the channel is in another thread, it does no longer have references to * this channel client. * If a call to red_channel_client_destroy is made from another location, it must be called * from the channel's thread. 
*/ -static void red_channel_ref(RedChannel *channel); -static void red_channel_unref(RedChannel *channel); - void red_channel_receive(RedChannel *channel) { g_list_foreach(channel->clients, (GFunc)red_channel_client_receive, NULL); } static void red_channel_client_default_peer_on_error(RedChannelClient *rcc) { red_channel_client_disconnect(rcc); } -static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc) +void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc) { spice_assert(rcc); channel->clients = g_list_prepend(channel->clients, rcc); } int red_channel_test_remote_common_cap(RedChannel *channel, uint32_t cap) { GList *link, *next; RedChannelClient *rcc; @@ -3016,21 +3013,21 @@ FOREACH_CLIENT(channel, link, next, rcc) { if (!red_channel_client_test_remote_cap(rcc, cap)) { return FALSE; } } return TRUE; } /* returns TRUE If all channels are finished migrating, FALSE otherwise */ -static gboolean red_client_seamless_migration_done_for_channel(RedClient *client) +gboolean red_client_seamless_migration_done_for_channel(RedClient *client) { gboolean ret = FALSE; pthread_mutex_lock(&client->lock); client->num_migrated_channels--; /* we assume we always have at least one channel who has migration data transfer, * otherwise, this flag will never be set back to FALSE*/ if (!client->num_migrated_channels) { client->during_target_migrate = FALSE; client->seamless_migrate = FALSE; @@ -3269,26 +3266,26 @@ void red_channel_set_common_cap(RedChannel *channel, uint32_t cap) { add_capability(&channel->local_caps.common_caps, &channel->local_caps.num_common_caps, cap); } void red_channel_set_cap(RedChannel *channel, uint32_t cap) { add_capability(&channel->local_caps.caps, &channel->local_caps.num_caps, cap); } -static void red_channel_ref(RedChannel *channel) +void red_channel_ref(RedChannel *channel) { channel->refs++; } -static void red_channel_unref(RedChannel *channel) +void red_channel_unref(RedChannel *channel) { if (--channel->refs == 
0) { if (channel->local_caps.num_common_caps) { free(channel->local_caps.common_caps); } if (channel->local_caps.num_caps) { free(channel->local_caps.caps); } @@ -3348,45 +3345,47 @@ void red_channel_pipes_add_empty_msg(RedChannel *channel, int msg_type) { g_list_foreach(channel->clients, red_channel_client_pipe_add_empty_msg_proxy, GINT_TO_POINTER(msg_type)); } int red_channel_is_connected(RedChannel *channel) { return channel && channel->clients; } -static void red_channel_remove_client(RedChannelClient *rcc) +void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc) { GList *link; + g_return_if_fail(channel == red_channel_client_get_channel(rcc)); - if (!pthread_equal(pthread_self(), rcc->channel->thread_id)) { + if (!pthread_equal(pthread_self(), channel->thread_id)) { spice_warning("channel type %d id %d - " "channel->thread_id (0x%lx) != pthread_self (0x%lx)." "If one of the threads is != io-thread && != vcpu-thread, " "this might be a BUG", - rcc->channel->type, rcc->channel->id, - rcc->channel->thread_id, pthread_self()); + channel->type, channel->id, + channel->thread_id, pthread_self()); } - spice_return_if_fail(rcc->channel); - link = g_list_find(rcc->channel->clients, rcc); + spice_return_if_fail(channel); + link = g_list_find(channel->clients, rcc); spice_return_if_fail(link != NULL); - rcc->channel->clients = g_list_remove_link(rcc->channel->clients, link); + channel->clients = g_list_remove_link(channel->clients, link); // TODO: should we set rcc->channel to NULL??? 
} -static void red_client_remove_channel(RedChannelClient *rcc) +void red_client_remove_channel(RedChannelClient *rcc) { - pthread_mutex_lock(&rcc->client->lock); - rcc->client->channels = g_list_remove(rcc->client->channels, rcc); - pthread_mutex_unlock(&rcc->client->lock); + RedClient *client = red_channel_client_get_client(rcc); + pthread_mutex_lock(&client->lock); + client->channels = g_list_remove(client->channels, rcc); + pthread_mutex_unlock(&client->lock); } void red_channel_disconnect(RedChannel *channel) { g_list_foreach(channel->clients, (GFunc)red_channel_client_disconnect, NULL); } void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb) { g_list_foreach(channel->clients, (GFunc)cb, NULL); @@ -3423,27 +3422,29 @@ if (red_channel_client_is_blocked(rcc)) { return TRUE; } } return FALSE; } int red_channel_get_first_socket(RedChannel *channel) { RedChannelClient *rcc; + RedsStream *stream; if (!channel || !channel->clients) { return -1; } rcc = g_list_nth_data(channel->clients, 0); + stream = red_channel_client_get_stream(rcc); - return rcc->stream->socket; + return stream->socket; } int red_channel_no_item_being_sent(RedChannel *channel) { GList *link, *next; RedChannelClient *rcc; FOREACH_CLIENT(channel, link, next, rcc) { if (!red_channel_client_no_item_being_sent(rcc)) { return FALSE; @@ -3550,50 +3551,49 @@ // is not synchronous. rcc = link->data; channel = red_channel_client_get_channel(rcc); red_channel_client_set_destroying(rcc); // some channels may be in other threads. However we currently // assume disconnect is synchronous (we changed the dispatcher // to wait for disconnection) // TODO: should we go back to async. For this we need to use // ref count for channel clients. 
channel->client_cbs.disconnect(rcc); - spice_assert(ring_is_empty(&rcc->pipe)); - spice_assert(rcc->pipe_size == 0); - spice_assert(rcc->send_data.size == 0); + spice_assert(red_channel_client_pipe_is_empty(rcc)); + spice_assert(red_channel_client_no_item_being_sent(rcc)); red_channel_client_destroy(rcc); link = next; } red_client_unref(client); } /* client->lock should be locked */ -static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id) +RedChannelClient *red_client_get_channel(RedClient *client, int type, int id) { GList *link; RedChannelClient *rcc; RedChannelClient *ret = NULL; for (link = client->channels; link != NULL; link = link->next) { RedChannel *channel; rcc = link->data; channel = red_channel_client_get_channel(rcc); if (channel->type == type && channel->id == id) { ret = rcc; break; } } return ret; } /* client->lock should be locked */ -static void red_client_add_channel(RedClient *client, RedChannelClient *rcc) +void red_client_add_channel(RedClient *client, RedChannelClient *rcc) { spice_assert(rcc && client); client->channels = g_list_prepend(client->channels, rcc); if (client->during_target_migrate && client->seamless_migrate) { if (red_channel_client_set_migration_seamless(rcc)) client->num_migrated_channels++; } } MainChannelClient *red_client_get_main(RedClient *client) { @@ -3611,25 +3611,21 @@ pthread_mutex_lock(&client->lock); if (!client->during_target_migrate || client->seamless_migrate) { spice_error("unexpected"); pthread_mutex_unlock(&client->lock); return; } client->during_target_migrate = FALSE; link = client->channels; while (link) { next = link->next; - RedChannelClient *rcc = link->data; - - if (rcc->latency_monitor.timer) { - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - } + red_channel_client_semi_seamless_migration_complete(link->data); link = next; } pthread_mutex_unlock(&client->lock); reds_on_client_semi_seamless_migrate_complete(client->reds, client); } /* should be 
called only from the main thread */ int red_client_during_migrate_at_target(RedClient *client) { int ret; @@ -3707,46 +3703,50 @@ red_channel_client_pipe_add_tail); } uint32_t red_channel_max_pipe_size(RedChannel *channel) { GList *link; RedChannelClient *rcc; uint32_t pipe_size = 0; for (link = channel->clients; link != NULL; link = link->next) { + uint32_t new_size; rcc = link->data; - pipe_size = MAX(pipe_size, rcc->pipe_size); + new_size = red_channel_client_get_pipe_size(rcc); + pipe_size = MAX(pipe_size, new_size); } return pipe_size; } uint32_t red_channel_min_pipe_size(RedChannel *channel) { GList *link, *next; RedChannelClient *rcc; uint32_t pipe_size = ~0; FOREACH_CLIENT(channel, link, next, rcc) { - pipe_size = MIN(pipe_size, rcc->pipe_size); + uint32_t new_size; + new_size = red_channel_client_get_pipe_size(rcc); + pipe_size = MIN(pipe_size, new_size); } return pipe_size == ~0 ? 0 : pipe_size; } uint32_t red_channel_sum_pipes_size(RedChannel *channel) { GList *link, *next; RedChannelClient *rcc; uint32_t sum = 0; FOREACH_CLIENT(channel, link, next, rcc) { - sum += rcc->pipe_size; + sum += red_channel_client_get_pipe_size(rcc); } return sum; } int red_channel_wait_all_sent(RedChannel *channel, int64_t timeout) { uint64_t end_time; uint32_t max_pipe_size; int blocked = FALSE; @@ -3810,26 +3810,26 @@ #include <netinet/tcp.h> #include <fcntl.h> #include <unistd.h> #include <errno.h> #include <sys/ioctl.h> #ifdef HAVE_LINUX_SOCKIOS_H #include <linux/sockios.h> /* SIOCOUTQ */ #endif #include <common/generated_server_marshallers.h> +#include "red-channel-client.h" +#include "red-channel.h" #define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15) #define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10) -#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro - enum QosPingState { PING_STATE_NONE, PING_STATE_TIMER, PING_STATE_WARMUP, PING_STATE_LATENCY, }; enum ConnectivityState { CONNECTIVITY_STATE_CONNECTED, CONNECTIVITY_STATE_BLOCKED, @@ -3878,61 +3878,66 @@ passed = 
(spice_get_monotonic_time_ns() - rcc->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC; timeout = PING_TEST_IDLE_NET_TIMEOUT_MS; if (passed < PING_TEST_TIMEOUT_MS) { timeout += PING_TEST_TIMEOUT_MS - passed; } red_channel_client_start_ping_timer(rcc, timeout); } -RedChannel* red_channel_client_get_channel(RedChannelClient *rcc) +RedChannel* red_channel_client_get_channel(RedChannelClient* rcc) { return rcc->channel; } -static void red_channel_client_on_output(void *opaque, int n) +IncomingHandler* red_channel_client_get_incoming_handler(RedChannelClient *rcc) +{ + return &rcc->incoming; +} + +void red_channel_client_on_output(void *opaque, int n) { RedChannelClient *rcc = opaque; if (rcc->connectivity_monitor.timer) { rcc->connectivity_monitor.out_bytes += n; } stat_inc_counter(reds, rcc->channel->out_bytes_counter, n); } -static void red_channel_client_on_input(void *opaque, int n) +void red_channel_client_on_input(void *opaque, int n) { RedChannelClient *rcc = opaque; if (rcc->connectivity_monitor.timer) { rcc->connectivity_monitor.in_bytes += n; } } -static int red_channel_client_get_out_msg_size(void *opaque) +int red_channel_client_get_out_msg_size(void *opaque) { RedChannelClient *rcc = (RedChannelClient *)opaque; return rcc->send_data.size; } -static void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, - int *vec_size, int pos) +void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, + int *vec_size, int pos) { RedChannelClient *rcc = (RedChannelClient *)opaque; *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller, vec, IOV_MAX, pos); } -static void red_channel_client_on_out_block(void *opaque) +void red_channel_client_on_out_block(void *opaque) { RedChannelClient *rcc = (RedChannelClient *)opaque; rcc->send_data.blocked = TRUE; rcc->channel->core->watch_update_mask(rcc->stream->watch, SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE); } static inline int 
red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc) @@ -4006,21 +4011,21 @@ SpiceMsgPing ping; if (!rcc->latency_monitor.warmup_was_sent) { // latency test start int delay_val; socklen_t opt_size = sizeof(delay_val); rcc->latency_monitor.warmup_was_sent = TRUE; /* * When testing latency, TCP_NODELAY must be switched on, otherwise, * sending the ping message is delayed by Nagle algorithm, and the - * roundtrip measurement is less accurate (bigger). + * roundtrip measurment is less accurate (bigger). */ rcc->latency_monitor.tcp_nodelay = 1; if (getsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, &opt_size) == -1) { spice_warning("getsockopt failed, %s", strerror(errno)); } else { rcc->latency_monitor.tcp_nodelay = delay_val; if (!delay_val) { delay_val = 1; if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, @@ -4086,21 +4091,21 @@ { spice_marshaller_reset(rcc->send_data.urgent.marshaller); rcc->send_data.marshaller = rcc->send_data.main.marshaller; rcc->send_data.header.data = rcc->send_data.main.header_data; if (!rcc->is_mini_header) { rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); } rcc->send_data.item = rcc->send_data.main.item; } -static void red_channel_client_on_out_msg_done(void *opaque) +void red_channel_client_on_out_msg_done(void *opaque) { RedChannelClient *rcc = (RedChannelClient *)opaque; int fd; rcc->send_data.size = 0; if (spice_marshaller_get_fd(rcc->send_data.marshaller, &fd)) { if (reds_stream_send_msgfd(rcc->stream, fd) < 0) { perror("sendfd"); red_channel_client_disconnect(rcc); @@ -4527,35 +4532,42 @@ rcc->dummy_connected = TRUE; red_channel_add_client(channel, rcc); red_client_add_channel(client, rcc); pthread_mutex_unlock(&client->lock); return rcc; error: pthread_mutex_unlock(&client->lock); return NULL; } -static void red_channel_client_seamless_migration_done(RedChannelClient *rcc) +void red_channel_client_seamless_migration_done(RedChannelClient 
*rcc) { rcc->wait_migrate_data = FALSE; if (red_client_seamless_migration_done_for_channel(rcc->client)) { if (rcc->latency_monitor.timer) { red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); } if (rcc->connectivity_monitor.timer) { rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, rcc->connectivity_monitor.timeout); } } } +void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc) +{ + if (rcc->latency_monitor.timer) { + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + } +} + int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc) { return rcc->wait_migrate_data; } void red_channel_client_default_migrate(RedChannelClient *rcc) { if (rcc->latency_monitor.timer) { red_channel_client_cancel_ping_timer(rcc); rcc->channel->core->timer_remove(rcc->latency_monitor.timer); @@ -4568,45 +4580,47 @@ red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MIGRATE); } void red_channel_client_ref(RedChannelClient *rcc) { rcc->refs++; } void red_channel_client_unref(RedChannelClient *rcc) { - if (!--rcc->refs) { - spice_debug("destroy rcc=%p", rcc); + if (--rcc->refs != 0) { + return; + } - reds_stream_free(rcc->stream); - rcc->stream = NULL; + spice_debug("destroy rcc=%p", rcc); - if (rcc->send_data.main.marshaller) { - spice_marshaller_destroy(rcc->send_data.main.marshaller); - } + reds_stream_free(rcc->stream); + rcc->stream = NULL; - if (rcc->send_data.urgent.marshaller) { - spice_marshaller_destroy(rcc->send_data.urgent.marshaller); - } + if (rcc->send_data.main.marshaller) { + spice_marshaller_destroy(rcc->send_data.main.marshaller); + } - red_channel_client_destroy_remote_caps(rcc); - if (rcc->channel) { - red_channel_unref(rcc->channel); - } - free(rcc); + if (rcc->send_data.urgent.marshaller) { + spice_marshaller_destroy(rcc->send_data.urgent.marshaller); } + + red_channel_client_destroy_remote_caps(rcc); + if (rcc->channel) { + red_channel_unref(rcc->channel); + } + 
free(rcc); } void red_channel_client_destroy(RedChannelClient *rcc) { - rcc->destroying = 1; + rcc->destroying = TRUE; red_channel_client_disconnect(rcc); red_client_remove_channel(rcc); red_channel_client_unref(rcc); } void red_channel_client_shutdown(RedChannelClient *rcc) { if (rcc->stream && !rcc->stream->shutdown) { rcc->channel->core->watch_remove(rcc->stream->watch); rcc->stream->watch = NULL; @@ -4851,21 +4865,21 @@ } int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc) { if (rcc->latency_monitor.roundtrip < 0) { return rcc->latency_monitor.roundtrip; } return rcc->latency_monitor.roundtrip / NSEC_PER_MILLISEC; } -static void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc) +void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc) { rcc->ack_data.messages_window = 0; red_channel_client_push(rcc); } static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping) { uint64_t now; /* ignoring unexpected pongs, or post-migration pongs for pings that @@ -4910,48 +4924,50 @@ spice_debug("update roundtrip %.2f(ms)", ((double)rcc->latency_monitor.roundtrip)/NSEC_PER_MILLISEC); } rcc->latency_monitor.last_pong_time = now; rcc->latency_monitor.state = PING_STATE_NONE; red_channel_client_start_ping_timer(rcc, PING_TEST_TIMEOUT_MS); } static void red_channel_handle_migrate_flush_mark(RedChannelClient *rcc) { - if (rcc->channel->channel_cbs.handle_migrate_flush_mark) { - rcc->channel->channel_cbs.handle_migrate_flush_mark(rcc); + RedChannel *channel = red_channel_client_get_channel(rcc); + if (channel->channel_cbs.handle_migrate_flush_mark) { + channel->channel_cbs.handle_migrate_flush_mark(rcc); } } // TODO: the whole migration is broken with multiple clients. What do we want to do? 
// basically just // 1) source send mark to all // 2) source gets at various times the data (waits for all) // 3) source migrates to target // 4) target sends data to all // So need to make all the handlers work with per channel/client data (what data exactly?) static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message) { + RedChannel *channel = red_channel_client_get_channel(rcc); spice_debug("channel type %d id %d rcc %p size %u", - rcc->channel->type, rcc->channel->id, rcc, size); - if (!rcc->channel->channel_cbs.handle_migrate_data) { + channel->type, channel->id, rcc, size); + if (!channel->channel_cbs.handle_migrate_data) { return; } if (!red_channel_client_is_waiting_for_migrate_data(rcc)) { spice_channel_client_error(rcc, "unexpected"); return; } - if (rcc->channel->channel_cbs.handle_migrate_data_get_serial) { + if (channel->channel_cbs.handle_migrate_data_get_serial) { red_channel_client_set_message_serial(rcc, - rcc->channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message)); + channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message)); } - if (!rcc->channel->channel_cbs.handle_migrate_data(rcc, size, message)) { + if (!channel->channel_cbs.handle_migrate_data(rcc, size, message)) { spice_channel_client_error(rcc, "handle_migrate_data failed"); return; } red_channel_client_seamless_migration_done(rcc); } int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, uint16_t type, void *message) { @@ -5087,22 +5103,22 @@ spice_assert(pos); client_pipe_add(rcc, item, &pos->link); } int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item) { return ring_item_is_linked(&item->link); } -static void red_channel_client_pipe_add_tail(RedChannelClient *rcc, - RedPipeItem *item) +void red_channel_client_pipe_add_tail(RedChannelClient *rcc, + RedPipeItem *item) { client_pipe_add(rcc, item, rcc->pipe.prev); } void 
red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item) { if (client_pipe_add(rcc, item, rcc->pipe.prev)) { red_channel_client_push(rcc); } } @@ -5119,38 +5135,49 @@ void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type) { RedEmptyMsgPipeItem *item = spice_new(RedEmptyMsgPipeItem, 1); red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG); item->msg = msg_type; red_channel_client_pipe_add(rcc, &item->base); red_channel_client_push(rcc); } +gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc) +{ + g_return_val_if_fail(rcc != NULL, TRUE); + return (rcc->pipe_size == 0) && (ring_is_empty(&rcc->pipe)); +} + +uint32_t red_channel_client_get_pipe_size(RedChannelClient *rcc) +{ + return rcc->pipe_size; +} + int red_channel_client_is_connected(RedChannelClient *rcc) { if (!rcc->dummy) { return rcc->channel && (g_list_find(rcc->channel->clients, rcc) != NULL); } else { return rcc->dummy_connected; } } static void red_channel_client_clear_sent_item(RedChannelClient *rcc) { red_channel_client_release_sent_item(rcc); rcc->send_data.blocked = FALSE; rcc->send_data.size = 0; } -void red_channel_client_pipe_clear(RedChannelClient *rcc) +static void red_channel_client_pipe_clear(RedChannelClient *rcc) { RedPipeItem *item; if (rcc) { red_channel_client_clear_sent_item(rcc); } while ((item = (RedPipeItem *)ring_get_head(&rcc->pipe))) { ring_remove(&item->link); red_pipe_item_unref(item); } @@ -5167,91 +5194,91 @@ rcc->ack_data.client_window = client_window; } void red_channel_client_push_set_ack(RedChannelClient *rcc) { red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_SET_ACK); } static void red_channel_client_disconnect_dummy(RedChannelClient *rcc) { + RedChannel *channel = red_channel_client_get_channel(rcc); GList *link; spice_assert(rcc->dummy); - if (rcc->channel && (link = g_list_find(rcc->channel->clients, rcc))) { - spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel, - 
rcc->channel->type, rcc->channel->id); - red_channel_remove_client(link->data); + if (channel && (link = g_list_find(channel->clients, rcc))) { + spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel, + channel->type, channel->id); + red_channel_remove_client(channel, link->data); } rcc->dummy_connected = FALSE; } void red_channel_client_disconnect(RedChannelClient *rcc) { + RedChannel *channel = rcc->channel; + if (rcc->dummy) { red_channel_client_disconnect_dummy(rcc); return; } if (!red_channel_client_is_connected(rcc)) { return; } - spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel, - rcc->channel->type, rcc->channel->id); + spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel, + channel->type, channel->id); red_channel_client_pipe_clear(rcc); if (rcc->stream->watch) { - rcc->channel->core->watch_remove(rcc->stream->watch); + channel->core->watch_remove(rcc->stream->watch); rcc->stream->watch = NULL; } if (rcc->latency_monitor.timer) { - rcc->channel->core->timer_remove(rcc->latency_monitor.timer); + channel->core->timer_remove(rcc->latency_monitor.timer); rcc->latency_monitor.timer = NULL; } if (rcc->connectivity_monitor.timer) { - rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer); + channel->core->timer_remove(rcc->connectivity_monitor.timer); rcc->connectivity_monitor.timer = NULL; } - red_channel_remove_client(rcc); - rcc->channel->channel_cbs.on_disconnect(rcc); + red_channel_remove_client(channel, rcc); + channel->channel_cbs.on_disconnect(rcc); } int red_channel_client_is_blocked(RedChannelClient *rcc) { return rcc && rcc->send_data.blocked; } int red_channel_client_send_message_pending(RedChannelClient *rcc) { return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0; } -/* accessors for RedChannelClient */ SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc) { return rcc->send_data.marshaller; } RedsStream *red_channel_client_get_stream(RedChannelClient *rcc) { 
return rcc->stream; } RedClient *red_channel_client_get_client(RedChannelClient *rcc) { return rcc->client; } void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list) { rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list); } -/* end of accessors */ - static void marker_pipe_item_free(RedPipeItem *base) { MarkerPipeItem *item = SPICE_UPCAST(MarkerPipeItem, base); if (item->item_in_pipe) { *item->item_in_pipe = FALSE; } free(item); } @@ -5338,34 +5365,34 @@ void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc) { if (red_channel_client_is_blocked(rcc) || rcc->pipe_size > 0) { red_channel_client_disconnect(rcc); } else { spice_assert(red_channel_client_no_item_being_sent(rcc)); } } -int red_channel_client_no_item_being_sent(RedChannelClient *rcc) +gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc) { return !rcc || (rcc->send_data.size == 0); } void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item) { red_channel_client_pipe_remove(rcc, item); red_pipe_item_unref(item); } /* client mutex should be locked before this call */ -static gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc) +gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc) { gboolean ret = FALSE; if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) { rcc->wait_migrate_data = TRUE; ret = TRUE; } spice_debug("channel type %d id %d rcc %p wait data %d", rcc->channel->type, rcc->channel->id, rcc, rcc->wait_migrate_data); @@ -5400,29 +5427,35 @@ #ifndef _H_RED_CHANNEL_CLIENT #define _H_RED_CHANNEL_CLIENT #include <common/marshaller.h> #include "red-pipe-item.h" #include "reds-stream.h" #include "red-channel.h" +typedef struct RedChannel RedChannel; +typedef struct RedClient RedClient; +typedef struct IncomingHandler IncomingHandler; + +typedef struct RedChannelClient RedChannelClient; + /* * When an error occurs over a channel, we 
treat it as a warning * for spice-server and shutdown the channel. */ #define spice_channel_client_error(rcc, format, ...) \ do { \ RedChannel *_ch = red_channel_client_get_channel(rcc); \ spice_warning("rcc %p type %u id %u: " format, rcc, \ - _ch->type, _ch->id, ## __VA_ARGS__); \ + _ch->type, _ch->id, ## __VA_ARGS__); \ red_channel_client_shutdown(rcc); \ } while (0) RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, RedsStream *stream, int monitor_latency, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps); RedChannelClient *red_channel_client_create_dummy(int size, @@ -5474,60 +5507,86 @@ int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc); /* Checks periodically if the connection is still alive */ void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms); void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item); void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item); void red_channel_client_pipe_add_after(RedChannelClient *rcc, RedPipeItem *item, RedPipeItem *pos); int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item); void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item); +void red_channel_client_pipe_add_tail(RedChannelClient *rcc, RedPipeItem *item); void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item); /* for types that use this routine -> the pipe item should be freed */ void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type); void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type); +gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc); +uint32_t red_channel_client_get_pipe_size(RedChannelClient *rcc); + void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc); void 
red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window); void red_channel_client_push_set_ack(RedChannelClient *rcc); -int red_channel_client_is_blocked(RedChannelClient *rcc); +gboolean red_channel_client_is_blocked(RedChannelClient *rcc); /* helper for channels that have complex logic that can possibly ready a send */ int red_channel_client_send_message_pending(RedChannelClient *rcc); -int red_channel_client_no_item_being_sent(RedChannelClient *rcc); +gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc); void red_channel_client_push(RedChannelClient *rcc); // TODO: again - what is the context exactly? this happens in channel disconnect. but our // current red_channel_shutdown also closes the socket - is there a socket to close? // are we reading from an fd here? arghh -void red_channel_client_pipe_clear(RedChannelClient *rcc); void red_channel_client_receive(RedChannelClient *rcc); void red_channel_client_send(RedChannelClient *rcc); void red_channel_client_disconnect(RedChannelClient *rcc); -/* accessors for RedChannelClient */ /* Note: the valid times to call red_channel_get_marshaller are just during send_item callback. */ SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc); RedsStream *red_channel_client_get_stream(RedChannelClient *rcc); RedClient *red_channel_client_get_client(RedChannelClient *rcc); /* Note that the header is valid only between red_channel_reset_send_data and * red_channel_begin_send_message.*/ void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list); + +/* + * blocking functions. + * + * timeout is in nano sec. -1 for no timeout. + * + * Return: TRUE if waiting succeeded. FALSE if timeout expired. 
+ */ + int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, RedPipeItem *item, int64_t timeout); int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, int64_t timeout); void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc); -RedChannel* red_channel_client_get_channel(RedChannelClient *rcc); +RedChannel* red_channel_client_get_channel(RedChannelClient* rcc); +IncomingHandler* red_channel_client_get_incoming_handler(RedChannelClient *rcc); + +void red_channel_client_on_output(void *opaque, int n); +void red_channel_client_on_input(void *opaque, int n); +int red_channel_client_get_out_msg_size(void *opaque); +void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, + int *vec_size, int pos); +void red_channel_client_on_out_block(void *opaque); +void red_channel_client_on_out_msg_done(void *opaque); + +void red_channel_client_seamless_migration_done(RedChannelClient *rcc); +void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc); +void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc); + +gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc); void red_channel_client_set_destroying(RedChannelClient *rcc); gboolean red_channel_client_is_destroying(RedChannelClient *rcc); #define RED_CHANNEL_CLIENT(Client) ((RedChannelClient *)(Client)) typedef struct OutgoingHandler { OutgoingHandlerInterface *cb; void *opaque; struct iovec vec_buf[IOV_MAX]; int vec_size; @@ -5585,21 +5644,21 @@ OutgoingHandler outgoing; IncomingHandler incoming; int during_send; int id; // debugging purposes Ring pipe; uint32_t pipe_size; RedChannelCapabilities remote_caps; int is_mini_header; - int destroying; + gboolean destroying; int wait_migrate_data; int wait_migrate_flush_mark; RedChannelClientLatencyMonitor latency_monitor; RedChannelClientConnectivityMonitor connectivity_monitor; }; #endif /* _H_RED_CHANNEL_CLIENT */ /* @@ -5881,20 +5940,25 @@ * will become default 
eventually */ RedChannel *red_channel_create_parser(int size, RedsState *reds, const SpiceCoreInterfaceInternal *core, uint32_t type, uint32_t id, int handle_acks, spice_parse_channel_func_t parser, channel_handle_parsed_proc handle_parsed, const ChannelCbs *channel_cbs, uint32_t migration_flags); +void red_channel_ref(RedChannel *channel); +void red_channel_unref(RedChannel *channel); +void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc); +void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc); + void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat); void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data); // caps are freed when the channel is destroyed void red_channel_set_common_cap(RedChannel *channel, uint32_t cap); void red_channel_set_cap(RedChannel *channel, uint32_t cap); // TODO: tmp, for channels that don't use RedChannel yet (e.g., snd channel), but // do use the client callbacks. So the channel clients are not connected (the channel doesn't // have list of them, but they do have a link to the channel, and the client has a list of them) @@ -6009,47 +6073,54 @@ RedClient *red_client_ref(RedClient *client); /* * releases the client resources when refs == 0. * We assume the red_client_derstroy was called before * we reached refs==0 */ RedClient *red_client_unref(RedClient *client); +/* client->lock should be locked */ +void red_client_add_channel(RedClient *client, RedChannelClient *rcc); +void red_client_remove_channel(RedChannelClient *rcc); +RedChannelClient *red_client_get_channel(RedClient *client, int type, int id); + MainChannelClient *red_client_get_main(RedClient *client); // main should be set once before all the other channels are created void red_client_set_main(RedClient *client, MainChannelClient *mcc); /* called when the migration handshake results in seamless migration (dst side). 
* By default we assume semi-seamless */ void red_client_set_migration_seamless(RedClient *client); void red_client_semi_seamless_migrate_complete(RedClient *client); /* dst side */ /* TRUE if the migration is seamless and there are still channels that wait from migration data. * Or, during semi-seamless migration, and the main channel still waits for MIGRATE_END * from the client. * Note: Call it only from the main thread */ int red_client_during_migrate_at_target(RedClient *client); void red_client_migrate(RedClient *client); - +gboolean red_client_seamless_migration_done_for_channel(RedClient *client); /* * blocking functions. * * timeout is in nano sec. -1 for no timeout. * * Return: TRUE if waiting succeeded. FALSE if timeout expired. */ int red_channel_wait_all_sent(RedChannel *channel, int64_t timeout); +#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro + #endif /* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* Copyright (C) 2009 Red Hat, Inc. This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. 
@@ -6111,20 +6182,21 @@ #include "char-device.h" #include "migration-protocol.h" #ifdef USE_SMARTCARD #include "smartcard.h" #endif #include "reds-stream.h" #include "utils.h" #include "reds-private.h" #include "video-encoder.h" +#include "red-channel-client.h" static void reds_client_monitors_config(RedsState *reds, VDAgentMonitorsConfig *monitors_config); static gboolean reds_use_client_monitors_config(RedsState *reds); static SpiceCoreInterface *core_public = NULL; static SpiceTimer *adapter_timer_add(const SpiceCoreInterfaceInternal *iface, SpiceTimerFunc func, void *opaque) { return core_public->timer_add(func, opaque); } @@ -10592,20 +10664,21 @@ You should have received a copy of the GNU Lesser General Public License along with this library; if not, see <http://www.gnu.org/licenses/>. */ #ifndef _H_REDWORKER #define _H_REDWORKER #include "red-common.h" #include "red-qxl.h" #include "red-parse-qxl.h" +#include "red-channel-client.h" typedef struct RedWorker RedWorker; int common_channel_config_socket(RedChannelClient *rcc); #define COMMON_CLIENT_TIMEOUT (NSEC_PER_SEC * 30) #define CHANNEL_RECEIVE_BUF_SIZE 1024 typedef struct CommonGraphicsChannel { RedChannel base; // Must be the first thing @@ -10699,21 +10772,21 @@ #ifdef USE_SMARTCARD_012 #include <vscard_common.h> #else #ifdef USE_SMARTCARD #include <libcacard.h> #endif #endif #include "reds.h" #include "char-device.h" -#include "red-channel.h" +#include "red-channel-client.h" #include "smartcard.h" #include "migration-protocol.h" /* * TODO: the code doesn't really support multiple readers. * For example: smartcard_char_device_add_to_readers calls smartcard_init, * which can be called only once. * We should allow different readers, at least one reader per client. 
* In addition the implementation should be changed: instead of one channel for * all readers, we need to have different channles for different readers, @@ -11555,20 +11628,21 @@ #include <netinet/tcp.h> #include <common/marshaller.h> #include <common/generated_server_marshallers.h> #include "spice.h" #include "red-common.h" #include "main-channel.h" #include "reds.h" #include "red-qxl.h" +#include "red-channel-client.h" #include "sound.h" #include <common/snd_codec.h> #include "demarshallers.h" #ifndef IOV_MAX #define IOV_MAX 1024 #endif #define SND_RECEIVE_BUF_SIZE (16 * 1024 * 2) #define RECORD_SAMPLES_SIZE (SND_RECEIVE_BUF_SIZE >> 2) @@ -13196,20 +13270,21 @@ #include <assert.h> #include <errno.h> #include <string.h> #include <netinet/in.h> // IPPROTO_TCP #include <netinet/tcp.h> // TCP_NODELAY #include <common/generated_server_marshallers.h> #include "char-device.h" #include "red-channel.h" +#include "red-channel-client.h" #include "reds.h" #include "migration-protocol.h" #ifdef USE_LZ4 #include <lz4.h> #endif /* todo: add flow control. i.e., * (a) limit the tokens available for the client * (b) limit the tokens available for the server */ _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel