Reduce direct access to RedChannelClient, and get ready to convert to GObject. --- server/Makefile.am | 3 + server/common-graphics-channel-client-private.h | 2 +- server/common-graphics-channel-client.c | 1 + server/dcc.h | 4 +- server/display-channel.c | 3 +- server/inputs-channel-client.c | 1 + server/inputs-channel.c | 13 +- server/main-channel-client.c | 2 +- server/main-channel.c | 60 +- server/red-channel-client-private.h | 78 + server/red-channel-client.c | 1622 ++++++++++++++++++++ server/red-channel-client.h | 176 +++ server/red-channel.c | 1845 ++--------------------- server/red-channel.h | 186 +-- server/red-qxl.c | 24 +- server/red-worker.c | 3 +- server/reds.c | 22 +- server/smartcard.c | 2 +- server/sound.c | 8 +- server/spicevmc.c | 40 +- server/stream.c | 11 +- 21 files changed, 2155 insertions(+), 1951 deletions(-) create mode 100644 server/red-channel-client-private.h create mode 100644 server/red-channel-client.c create mode 100644 server/red-channel-client.h diff --git a/server/Makefile.am b/server/Makefile.am index ed6c875..1643b93 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -97,6 +97,9 @@ libserver_la_SOURCES = \ mjpeg-encoder.h \ red-channel.c \ red-channel.h \ + red-channel-client.c \ + red-channel-client.h \ + red-channel-client-private.h \ red-common.h \ dispatcher.c \ dispatcher.h \ diff --git a/server/common-graphics-channel-client-private.h b/server/common-graphics-channel-client-private.h index 8a9ef76..92442ff 100644 --- a/server/common-graphics-channel-client-private.h +++ b/server/common-graphics-channel-client-private.h @@ -18,7 +18,7 @@ #define COMMON_GRAPHICS_CHANNEL_CLIENT_PRIVATE_H #include "common-graphics-channel-client.h" -#include "red-channel.h" +#include "red-channel-client-private.h" struct CommonGraphicsChannelClient { RedChannelClient base; diff --git a/server/common-graphics-channel-client.c b/server/common-graphics-channel-client.c index 1de4fe2..f25d737 100644 --- 
a/server/common-graphics-channel-client.c +++ b/server/common-graphics-channel-client.c @@ -20,6 +20,7 @@ #include "common-graphics-channel-client-private.h" #include "dcc.h" +#include "red-channel-client.h" void common_graphics_channel_client_set_low_bandwidth(CommonGraphicsChannelClient *self, gboolean low_bandwidth) diff --git a/server/dcc.h b/server/dcc.h index a56c4e2..76ac401 100644 --- a/server/dcc.h +++ b/server/dcc.h @@ -60,8 +60,8 @@ typedef struct FreeList { typedef struct DisplayChannelClient DisplayChannelClient; -#define DCC_TO_WORKER(dcc) ((RedWorker*)((CommonGraphicsChannel*)((RedChannelClient*)dcc)->channel)->worker) -#define DCC_TO_DC(dcc) ((DisplayChannel*)((RedChannelClient*)dcc)->channel) +#define DCC_TO_WORKER(dcc) ((RedWorker*)((CommonGraphicsChannel*)(red_channel_client_get_channel((RedChannelClient*)dcc)))->worker) +#define DCC_TO_DC(dcc) ((DisplayChannel*)red_channel_client_get_channel((RedChannelClient*)dcc)) #define RCC_TO_DCC(rcc) ((DisplayChannelClient*)rcc) typedef struct RedSurfaceCreateItem { diff --git a/server/display-channel.c b/server/display-channel.c index cca99eb..e6112b4 100644 --- a/server/display-channel.c +++ b/server/display-channel.c @@ -1987,8 +1987,7 @@ static void release_item(RedChannelClient *rcc, RedPipeItem *item, int item_push static int handle_migrate_flush_mark(RedChannelClient *rcc) { - DisplayChannel *display_channel = SPICE_CONTAINEROF(rcc->channel, DisplayChannel, common.base); - RedChannel *channel = RED_CHANNEL(display_channel); + RedChannel *channel = red_channel_client_get_channel(rcc); red_channel_pipes_add_type(channel, RED_PIPE_ITEM_TYPE_MIGRATE_DATA); return TRUE; diff --git a/server/inputs-channel-client.c b/server/inputs-channel-client.c index adcc5c6..e02fbc9 100644 --- a/server/inputs-channel-client.c +++ b/server/inputs-channel-client.c @@ -21,6 +21,7 @@ #include "inputs-channel-client.h" #include "inputs-channel.h" #include "migration-protocol.h" +#include "red-channel-client-private.h" 
struct InputsChannelClient { RedChannelClient base; diff --git a/server/inputs-channel.c b/server/inputs-channel.c index 8f548c6..237905e 100644 --- a/server/inputs-channel.c +++ b/server/inputs-channel.c @@ -39,6 +39,7 @@ #include "reds.h" #include "reds-stream.h" #include "red-channel.h" +#include "red-channel-client.h" #include "inputs-channel-client.h" #include "main-channel-client.h" #include "inputs-channel.h" @@ -151,7 +152,7 @@ static uint8_t *inputs_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, uint16_t type, uint32_t size) { - InputsChannel *inputs_channel = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base); + InputsChannel *inputs_channel = (InputsChannel*)red_channel_client_get_channel(rcc); if (size > RECEIVE_BUF_SIZE) { spice_printerr("error: too large incoming message"); @@ -275,7 +276,7 @@ static void inputs_channel_send_item(RedChannelClient *rcc, RedPipeItem *base) static int inputs_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type, void *message) { - InputsChannel *inputs_channel = (InputsChannel *)rcc->channel; + InputsChannel *inputs_channel = (InputsChannel *)red_channel_client_get_channel(rcc); InputsChannelClient *icc = (InputsChannelClient *)rcc; uint32_t i; RedsState *reds = red_channel_get_server(&inputs_channel->base); @@ -461,13 +462,13 @@ static void inputs_channel_on_disconnect(RedChannelClient *rcc) if (!rcc) { return; } - inputs_release_keys(SPICE_CONTAINEROF(rcc->channel, InputsChannel, base)); + inputs_release_keys((InputsChannel*)red_channel_client_get_channel(rcc)); } static void inputs_pipe_add_init(RedChannelClient *rcc) { RedInputsInitPipeItem *item = spice_malloc(sizeof(RedInputsInitPipeItem)); - InputsChannel *inputs = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base); + InputsChannel *inputs = (InputsChannel*)red_channel_client_get_channel(rcc); red_pipe_item_init(&item->base, RED_PIPE_ITEM_INPUTS_INIT); item->modifiers = kbd_get_leds(inputs_channel_get_keyboard(inputs)); @@ -518,7 +519,7 
@@ static void inputs_connect(RedChannel *channel, RedClient *client, static void inputs_migrate(RedChannelClient *rcc) { - InputsChannel *inputs = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base); + InputsChannel *inputs = (InputsChannel*)red_channel_client_get_channel(rcc); inputs->src_during_migrate = TRUE; red_channel_client_default_migrate(rcc); } @@ -555,7 +556,7 @@ static int inputs_channel_handle_migrate_data(RedChannelClient *rcc, void *message) { InputsChannelClient *icc = (InputsChannelClient*)rcc; - InputsChannel *inputs = SPICE_CONTAINEROF(rcc->channel, InputsChannel, base); + InputsChannel *inputs = (InputsChannel*)red_channel_client_get_channel(rcc); SpiceMigrateDataHeader *header; SpiceMigrateDataInputs *mig_data; diff --git a/server/main-channel-client.c b/server/main-channel-client.c index bed0f55..077bcef 100644 --- a/server/main-channel-client.c +++ b/server/main-channel-client.c @@ -20,8 +20,8 @@ #include <inttypes.h> #include "main-channel-client.h" -#include "main-channel-client.h" #include "main-channel.h" +#include "red-channel-client-private.h" #include "reds.h" #define NET_TEST_WARMUP_BYTES 0 diff --git a/server/main-channel.c b/server/main-channel.c index 824c65c..fbbb032 100644 --- a/server/main-channel.c +++ b/server/main-channel.c @@ -66,9 +66,10 @@ int main_channel_is_connected(MainChannel *main_chan) */ static void main_channel_client_on_disconnect(RedChannelClient *rcc) { - RedsState *reds = red_channel_get_server(rcc->channel); + RedsState *reds = red_channel_get_server(red_channel_client_get_channel(rcc)); spice_printerr("rcc=%p", rcc); - main_dispatcher_client_disconnect(reds_get_main_dispatcher(reds), rcc->client); + main_dispatcher_client_disconnect(reds_get_main_dispatcher(reds), + red_channel_client_get_client(rcc)); } RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t connection_id) @@ -81,7 +82,7 @@ RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t c rcc = 
link->data; mcc = (MainChannelClient*) rcc; if (main_channel_client_get_connection_id(mcc) == connection_id) { - return rcc->client; + return red_channel_client_get_client(rcc); } } return NULL; @@ -116,12 +117,13 @@ static RedPipeItem *main_multi_media_time_item_new(RedChannelClient *rcc, static void main_channel_push_channels(MainChannelClient *mcc) { - if (red_client_during_migrate_at_target((main_channel_client_get_base(mcc))->client)) { + RedChannelClient *rcc = main_channel_client_get_base(mcc); + if (red_client_during_migrate_at_target(red_channel_client_get_client(rcc))) { spice_printerr("warning: ignoring unexpected SPICE_MSGC_MAIN_ATTACH_CHANNELS" "during migration"); return; } - red_channel_client_pipe_add_type(main_channel_client_get_base(mcc), RED_PIPE_ITEM_TYPE_MAIN_CHANNELS_LIST); + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MAIN_CHANNELS_LIST); } static void main_channel_marshall_channels(RedChannelClient *rcc, @@ -129,9 +131,10 @@ static void main_channel_marshall_channels(RedChannelClient *rcc, RedPipeItem *item) { SpiceMsgChannels* channels_info; + RedChannel *channel = red_channel_client_get_channel(rcc); red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_CHANNELS_LIST, item); - channels_info = reds_msg_channels_new(rcc->channel->reds); + channels_info = reds_msg_channels_new(channel->reds); spice_marshall_msg_main_channels_list(m, channels_info); free(channels_info); } @@ -246,18 +249,20 @@ static void main_channel_marshall_migrate_data_item(RedChannelClient *rcc, SpiceMarshaller *m, RedPipeItem *item) { + RedChannel *channel = red_channel_client_get_channel(rcc); red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE_DATA, item); - reds_marshall_migrate_data(rcc->channel->reds, m); // TODO: from reds split. ugly separation. + reds_marshall_migrate_data(channel->reds, m); // TODO: from reds split. ugly separation. 
} static int main_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message) { + RedChannel *channel = red_channel_client_get_channel(rcc); MainChannelClient *mcc = (MainChannelClient*)rcc; SpiceMigrateDataHeader *header = (SpiceMigrateDataHeader *)message; /* not supported with multi-clients */ - spice_assert(rcc->channel->clients_num == 1); + spice_assert(channel->clients_num == 1); if (size < sizeof(SpiceMigrateDataHeader) + sizeof(SpiceMigrateDataMain)) { spice_printerr("bad message size %u", size); @@ -269,7 +274,7 @@ static int main_channel_handle_migrate_data(RedChannelClient *rcc, spice_error("bad header"); return FALSE; } - return reds_handle_migrate_data(rcc->channel->reds, mcc, (SpiceMigrateDataMain *)(header + 1), size); + return reds_handle_migrate_data(channel->reds, mcc, (SpiceMigrateDataMain *)(header + 1), size); } static void main_channel_marshall_init(RedChannelClient *rcc, @@ -277,7 +282,7 @@ static void main_channel_marshall_init(RedChannelClient *rcc, RedInitPipeItem *item) { SpiceMsgMainInit init; // TODO - remove this copy, make RedInitPipeItem reuse SpiceMsgMainInit - + RedChannel *channel = red_channel_client_get_channel(rcc); red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_INIT, &item->base); init.session_id = item->connection_id; @@ -287,7 +292,7 @@ static void main_channel_marshall_init(RedChannelClient *rcc, if (item->is_client_mouse_allowed) { init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT; } - init.agent_connected = reds_has_vdagent(rcc->channel->reds); + init.agent_connected = reds_has_vdagent(channel->reds); init.agent_tokens = REDS_AGENT_WINDOW_SIZE; init.multi_media_time = item->multi_media_time; init.ram_hint = item->ram_hint; @@ -329,11 +334,12 @@ static void main_channel_fill_migrate_dst_info(MainChannel *main_channel, static void main_channel_marshall_migrate_begin(SpiceMarshaller *m, RedChannelClient *rcc, RedPipeItem *item) { + RedChannel *channel = red_channel_client_get_channel(rcc); 
SpiceMsgMainMigrationBegin migrate; MainChannel *main_ch; red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_MIGRATE_BEGIN, item); - main_ch = SPICE_CONTAINEROF(rcc->channel, MainChannel, base); + main_ch = SPICE_CONTAINEROF(channel, MainChannel, base); main_channel_fill_migrate_dst_info(main_ch, &migrate.dst_info); spice_marshall_msg_main_migrate_begin(m, &migrate); } @@ -342,11 +348,12 @@ static void main_channel_marshall_migrate_begin_seamless(SpiceMarshaller *m, RedChannelClient *rcc, RedPipeItem *item) { + RedChannel *channel = red_channel_client_get_channel(rcc); SpiceMsgMainMigrateBeginSeamless migrate_seamless; MainChannel *main_ch; red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_MIGRATE_BEGIN_SEAMLESS, item); - main_ch = SPICE_CONTAINEROF(rcc->channel, MainChannel, base); + main_ch = SPICE_CONTAINEROF(channel, MainChannel, base); main_channel_fill_migrate_dst_info(main_ch, &migrate_seamless.dst_info); migrate_seamless.src_mig_version = SPICE_MIGRATION_PROTOCOL_VERSION; spice_marshall_msg_main_migrate_begin_seamless(m, &migrate_seamless); @@ -386,12 +393,13 @@ void main_channel_migrate_switch(MainChannel *main_chan, RedsMigSpice *mig_targe static void main_channel_marshall_migrate_switch(SpiceMarshaller *m, RedChannelClient *rcc, RedPipeItem *item) { + RedChannel *channel = red_channel_client_get_channel(rcc); SpiceMsgMainMigrationSwitchHost migrate; MainChannel *main_ch; spice_printerr(""); red_channel_client_init_send_data(rcc, SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST, item); - main_ch = SPICE_CONTAINEROF(rcc->channel, MainChannel, base); + main_ch = SPICE_CONTAINEROF(channel, MainChannel, base); migrate.port = main_ch->mig_target.port; migrate.sport = main_ch->mig_target.sport; migrate.host_size = strlen(main_ch->mig_target.host) + 1; @@ -431,7 +439,7 @@ static void main_channel_send_item(RedChannelClient *rcc, RedPipeItem *base) base->type != RED_PIPE_ITEM_TYPE_MAIN_INIT) { spice_printerr("Init msg for client %p was not sent yet " "(client is 
probably during semi-seamless migration). Ignoring msg type %d", - rcc->client, base->type); + red_channel_client_get_client(rcc), base->type); main_channel_release_pipe_item(rcc, base, FALSE); return; } @@ -527,7 +535,8 @@ static void main_channel_release_pipe_item(RedChannelClient *rcc, static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint16_t type, void *message) { - MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base); + RedChannel *channel = red_channel_client_get_channel(rcc); + MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base); MainChannelClient *mcc = (MainChannelClient*)rcc; switch (type) { @@ -539,18 +548,18 @@ static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint return FALSE; } tokens = (SpiceMsgcMainAgentStart *)message; - reds_on_main_agent_start(rcc->channel->reds, mcc, tokens->num_tokens); + reds_on_main_agent_start(channel->reds, mcc, tokens->num_tokens); break; } case SPICE_MSGC_MAIN_AGENT_DATA: { - reds_on_main_agent_data(rcc->channel->reds, mcc, message, size); + reds_on_main_agent_data(channel->reds, mcc, message, size); break; } case SPICE_MSGC_MAIN_AGENT_TOKEN: { SpiceMsgcMainAgentTokens *tokens; tokens = (SpiceMsgcMainAgentTokens *)message; - reds_on_main_agent_tokens(rcc->channel->reds, mcc, tokens->num_tokens); + reds_on_main_agent_tokens(channel->reds, mcc, tokens->num_tokens); break; } case SPICE_MSGC_MAIN_ATTACH_CHANNELS: @@ -574,7 +583,7 @@ static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint ((SpiceMsgcMainMigrateDstDoSeamless *)message)->src_version); break; case SPICE_MSGC_MAIN_MOUSE_MODE_REQUEST: - reds_on_main_mouse_mode_request(rcc->channel->reds, message, size); + reds_on_main_mouse_mode_request(channel->reds, message, size); break; case SPICE_MSGC_PONG: { main_channel_client_handle_pong(mcc, (SpiceMsgPing *)message, size); @@ -595,11 +604,12 @@ static uint8_t 
*main_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, uint16_t type, uint32_t size) { - MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base); + RedChannel *channel = red_channel_client_get_channel(rcc); + MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base); MainChannelClient *mcc = (MainChannelClient*)rcc; if (type == SPICE_MSGC_MAIN_AGENT_DATA) { - return reds_get_agent_data_buffer(rcc->channel->reds, mcc, size); + return reds_get_agent_data_buffer(channel->reds, mcc, size); } else { return main_chan->recv_buf; } @@ -610,8 +620,9 @@ static void main_channel_release_msg_rcv_buf(RedChannelClient *rcc, uint32_t size, uint8_t *msg) { + RedChannel *channel = red_channel_client_get_channel(rcc); if (type == SPICE_MSGC_MAIN_AGENT_DATA) { - reds_release_agent_data_buffer(rcc->channel->reds, msg); + reds_release_agent_data_buffer(channel->reds, msg); } } @@ -626,8 +637,9 @@ static void main_channel_hold_pipe_item(RedChannelClient *rcc, RedPipeItem *item static int main_channel_handle_migrate_flush_mark(RedChannelClient *rcc) { + RedChannel *channel = red_channel_client_get_channel(rcc); spice_debug(NULL); - main_channel_push_migrate_data_item(SPICE_CONTAINEROF(rcc->channel, + main_channel_push_migrate_data_item(SPICE_CONTAINEROF(channel, MainChannel, base)); return TRUE; } diff --git a/server/red-channel-client-private.h b/server/red-channel-client-private.h new file mode 100644 index 0000000..4ad4d90 --- /dev/null +++ b/server/red-channel-client-private.h @@ -0,0 +1,78 @@ +/* + Copyright (C) 2009-2015 Red Hat, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. 
+ + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _H_RED_CHANNEL_CLIENT_PRIVATE +#define _H_RED_CHANNEL_CLIENT_PRIVATE + +#include "red-channel-client.h" +#include "red-channel.h" + +struct RedChannelClient { + RedChannel *channel; + RedClient *client; + RedsStream *stream; + int dummy; + int dummy_connected; + + uint32_t refs; + + struct { + uint32_t generation; + uint32_t client_generation; + uint32_t messages_window; + uint32_t client_window; + } ack_data; + + struct { + SpiceMarshaller *marshaller; + SpiceDataHeaderOpaque header; + uint32_t size; + RedPipeItem *item; + int blocked; + uint64_t serial; + uint64_t last_sent_serial; + + struct { + SpiceMarshaller *marshaller; + uint8_t *header_data; + RedPipeItem *item; + } main; + + struct { + SpiceMarshaller *marshaller; + } urgent; + } send_data; + + OutgoingHandler outgoing; + IncomingHandler incoming; + int during_send; + int id; // debugging purposes + Ring pipe; + uint32_t pipe_size; + + RedChannelCapabilities remote_caps; + int is_mini_header; + gboolean destroying; + + int wait_migrate_data; + int wait_migrate_flush_mark; + + RedChannelClientLatencyMonitor latency_monitor; + RedChannelClientConnectivityMonitor connectivity_monitor; +}; + +#endif /* _H_RED_CHANNEL_CLIENT_PRIVATE */ diff --git a/server/red-channel-client.c b/server/red-channel-client.c new file mode 100644 index 0000000..02450df --- /dev/null +++ b/server/red-channel-client.c @@ -0,0 +1,1622 @@ +/* + Copyright (C) 2009-2015 Red Hat, Inc. 
+ + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. +*/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <glib.h> +#include <stdio.h> +#include <stdint.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> +#include <sys/ioctl.h> +#ifdef HAVE_LINUX_SOCKIOS_H +#include <linux/sockios.h> /* SIOCOUTQ */ +#endif + +#include "common/generated_server_marshallers.h" +#include "red-channel-client-private.h" +#include "red-channel.h" + +#define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15) +#define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10) + +enum QosPingState { + PING_STATE_NONE, + PING_STATE_TIMER, + PING_STATE_WARMUP, + PING_STATE_LATENCY, +}; + +enum ConnectivityState { + CONNECTIVITY_STATE_CONNECTED, + CONNECTIVITY_STATE_BLOCKED, + CONNECTIVITY_STATE_WAIT_PONG, + CONNECTIVITY_STATE_DISCONNECTED, +}; + +typedef struct RedEmptyMsgPipeItem { + RedPipeItem base; + int msg; +} RedEmptyMsgPipeItem; + +static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout) +{ + if (!rcc->latency_monitor.timer) { + return; + } + if (rcc->latency_monitor.state != PING_STATE_NONE) { + return; + } + rcc->latency_monitor.state = PING_STATE_TIMER; + rcc->channel->core->timer_start(rcc->latency_monitor.timer, timeout); +} + +static void red_channel_client_cancel_ping_timer(RedChannelClient 
*rcc) +{ + if (!rcc->latency_monitor.timer) { + return; + } + if (rcc->latency_monitor.state != PING_STATE_TIMER) { + return; + } + + rcc->channel->core->timer_cancel(rcc->latency_monitor.timer); + rcc->latency_monitor.state = PING_STATE_NONE; +} + +static void red_channel_client_restart_ping_timer(RedChannelClient *rcc) +{ + uint64_t passed, timeout; + + passed = (spice_get_monotonic_time_ns() - rcc->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC; + timeout = PING_TEST_IDLE_NET_TIMEOUT_MS; + if (passed < PING_TEST_TIMEOUT_MS) { + timeout += PING_TEST_TIMEOUT_MS - passed; + } + + red_channel_client_start_ping_timer(rcc, timeout); +} + +RedChannel* red_channel_client_get_channel(RedChannelClient* rcc) +{ + return rcc->channel; +} + +IncomingHandler* red_channel_client_get_incoming_handler(RedChannelClient *rcc) +{ + return &rcc->incoming; +} + +void red_channel_client_on_output(void *opaque, int n) +{ + RedChannelClient *rcc = opaque; + + if (rcc->connectivity_monitor.timer) { + rcc->connectivity_monitor.out_bytes += n; + } + stat_inc_counter(reds, rcc->channel->out_bytes_counter, n); +} + +void red_channel_client_on_input(void *opaque, int n) +{ + RedChannelClient *rcc = opaque; + + if (rcc->connectivity_monitor.timer) { + rcc->connectivity_monitor.in_bytes += n; + } +} + +int red_channel_client_get_out_msg_size(void *opaque) +{ + RedChannelClient *rcc = (RedChannelClient *)opaque; + + return rcc->send_data.size; +} + +void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, + int *vec_size, int pos) +{ + RedChannelClient *rcc = (RedChannelClient *)opaque; + + *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller, + vec, IOV_MAX, pos); +} + +void red_channel_client_on_out_block(void *opaque) +{ + RedChannelClient *rcc = (RedChannelClient *)opaque; + + rcc->send_data.blocked = TRUE; + rcc->channel->core->watch_update_mask(rcc->stream->watch, + SPICE_WATCH_EVENT_READ | + SPICE_WATCH_EVENT_WRITE); +} + +static inline int 
red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc) +{ + return (rcc->send_data.marshaller == rcc->send_data.urgent.marshaller); +} + +static void red_channel_client_reset_send_data(RedChannelClient *rcc) +{ + spice_marshaller_reset(rcc->send_data.marshaller); + rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller, + rcc->send_data.header.header_size); + spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size); + rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0); + rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0); + + /* Keeping the serial consecutive: resetting it if reset_send_data + * has been called before, but no message has been sent since then. + */ + if (rcc->send_data.last_sent_serial != rcc->send_data.serial) { + spice_assert(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1); + /* When the urgent marshaller is active, the serial was incremented by + * the call to reset_send_data that was made for the main marshaller. + * The urgent msg receives this serial, and the main msg serial is + * the following one. 
Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial) + * should be 1 in this case*/ + if (!red_channel_client_urgent_marshaller_is_active(rcc)) { + rcc->send_data.serial = rcc->send_data.last_sent_serial; + } + } + rcc->send_data.serial++; + + if (!rcc->is_mini_header) { + spice_assert(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller); + rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0); + rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); + } +} + +static void red_channel_client_send_set_ack(RedChannelClient *rcc) +{ + SpiceMsgSetAck ack; + + spice_assert(rcc); + red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL); + ack.generation = ++rcc->ack_data.generation; + ack.window = rcc->ack_data.client_window; + rcc->ack_data.messages_window = 0; + + spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack); + + red_channel_client_begin_send_message(rcc); +} + +static void red_channel_client_send_migrate(RedChannelClient *rcc) +{ + SpiceMsgMigrate migrate; + + red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, NULL); + migrate.flags = rcc->channel->migration_flags; + spice_marshall_msg_migrate(rcc->send_data.marshaller, &migrate); + if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_FLUSH) { + rcc->wait_migrate_flush_mark = TRUE; + } + + red_channel_client_begin_send_message(rcc); +} + +static void red_channel_client_send_ping(RedChannelClient *rcc) +{ + SpiceMsgPing ping; + + if (!rcc->latency_monitor.warmup_was_sent) { // latency test start + int delay_val; + socklen_t opt_size = sizeof(delay_val); + + rcc->latency_monitor.warmup_was_sent = TRUE; + /* + * When testing latency, TCP_NODELAY must be switched on, otherwise, + * sending the ping message is delayed by Nagle algorithm, and the + * roundtrip measurement is less accurate (bigger).
+ */ + rcc->latency_monitor.tcp_nodelay = 1; + if (getsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, + &opt_size) == -1) { + spice_warning("getsockopt failed, %s", strerror(errno)); + } else { + rcc->latency_monitor.tcp_nodelay = delay_val; + if (!delay_val) { + delay_val = 1; + if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, + sizeof(delay_val)) == -1) { + if (errno != ENOTSUP) { + spice_warning("setsockopt failed, %s", strerror(errno)); + } + } + } + } + } + + red_channel_client_init_send_data(rcc, SPICE_MSG_PING, NULL); + ping.id = rcc->latency_monitor.id; + ping.timestamp = spice_get_monotonic_time_ns(); + spice_marshall_msg_ping(rcc->send_data.marshaller, &ping); + red_channel_client_begin_send_message(rcc); +} + +static void red_channel_client_send_empty_msg(RedChannelClient *rcc, RedPipeItem *base) +{ + RedEmptyMsgPipeItem *msg_pipe_item = SPICE_CONTAINEROF(base, RedEmptyMsgPipeItem, base); + + red_channel_client_init_send_data(rcc, msg_pipe_item->msg, NULL); + red_channel_client_begin_send_message(rcc); +} + +static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item) +{ + spice_assert(red_channel_client_no_item_being_sent(rcc)); + red_channel_client_reset_send_data(rcc); + switch (item->type) { + case RED_PIPE_ITEM_TYPE_SET_ACK: + red_channel_client_send_set_ack(rcc); + break; + case RED_PIPE_ITEM_TYPE_MIGRATE: + red_channel_client_send_migrate(rcc); + break; + case RED_PIPE_ITEM_TYPE_EMPTY_MSG: + red_channel_client_send_empty_msg(rcc, item); + break; + case RED_PIPE_ITEM_TYPE_PING: + red_channel_client_send_ping(rcc); + break; + default: + rcc->channel->channel_cbs.send_item(rcc, item); + return; + } + free(item); +} + +static void red_channel_client_release_item(RedChannelClient *rcc, + RedPipeItem *item, + int item_pushed) +{ + switch (item->type) { + case RED_PIPE_ITEM_TYPE_SET_ACK: + case RED_PIPE_ITEM_TYPE_EMPTY_MSG: + case RED_PIPE_ITEM_TYPE_MIGRATE: + case RED_PIPE_ITEM_TYPE_PING: 
+ free(item); + break; + default: + rcc->channel->channel_cbs.release_item(rcc, item, item_pushed); + } +} + +static inline void red_channel_client_release_sent_item(RedChannelClient *rcc) +{ + if (rcc->send_data.item) { + red_channel_client_release_item(rcc, + rcc->send_data.item, TRUE); + rcc->send_data.item = NULL; + } +} + +static void red_channel_client_restore_main_sender(RedChannelClient *rcc) +{ + spice_marshaller_reset(rcc->send_data.urgent.marshaller); + rcc->send_data.marshaller = rcc->send_data.main.marshaller; + rcc->send_data.header.data = rcc->send_data.main.header_data; + if (!rcc->is_mini_header) { + rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); + } + rcc->send_data.item = rcc->send_data.main.item; +} + +void red_channel_client_on_out_msg_done(void *opaque) +{ + RedChannelClient *rcc = (RedChannelClient *)opaque; + int fd; + + rcc->send_data.size = 0; + + if (spice_marshaller_get_fd(rcc->send_data.marshaller, &fd)) { + if (reds_stream_send_msgfd(rcc->stream, fd) < 0) { + perror("sendfd"); + red_channel_client_disconnect(rcc); + if (fd != -1) + close(fd); + return; + } + if (fd != -1) + close(fd); + } + + red_channel_client_release_sent_item(rcc); + if (rcc->send_data.blocked) { + rcc->send_data.blocked = FALSE; + rcc->channel->core->watch_update_mask(rcc->stream->watch, + SPICE_WATCH_EVENT_READ); + } + + if (red_channel_client_urgent_marshaller_is_active(rcc)) { + red_channel_client_restore_main_sender(rcc); + spice_assert(rcc->send_data.header.data != NULL); + red_channel_client_begin_send_message(rcc); + } else { + if (rcc->latency_monitor.timer && !rcc->send_data.blocked && rcc->pipe_size == 0) { + /* It is possible that the socket will become idle, so we may be able to test latency */ + red_channel_client_restart_ping_timer(rcc); + } + } + +} + +static void red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item) +{ + rcc->pipe_size--; + ring_remove(&item->link); +} + +static void 
red_channel_client_set_remote_caps(RedChannelClient* rcc, + int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps) +{ + rcc->remote_caps.num_common_caps = num_common_caps; + rcc->remote_caps.common_caps = spice_memdup(common_caps, num_common_caps * sizeof(uint32_t)); + + rcc->remote_caps.num_caps = num_caps; + rcc->remote_caps.caps = spice_memdup(caps, num_caps * sizeof(uint32_t)); +} + +static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc) +{ + rcc->remote_caps.num_common_caps = 0; + free(rcc->remote_caps.common_caps); + rcc->remote_caps.num_caps = 0; + free(rcc->remote_caps.caps); +} + +int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap) +{ + return test_capability(rcc->remote_caps.common_caps, + rcc->remote_caps.num_common_caps, + cap); +} + +int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap) +{ + return test_capability(rcc->remote_caps.caps, + rcc->remote_caps.num_caps, + cap); +} + +static void red_channel_client_push_ping(RedChannelClient *rcc) +{ + spice_assert(rcc->latency_monitor.state == PING_STATE_NONE); + rcc->latency_monitor.state = PING_STATE_WARMUP; + rcc->latency_monitor.warmup_was_sent = FALSE; + rcc->latency_monitor.id = rand(); + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); +} + +static void red_channel_client_ping_timer(void *opaque) +{ + RedChannelClient *rcc = opaque; + + spice_assert(rcc->latency_monitor.state == PING_STATE_TIMER); + red_channel_client_cancel_ping_timer(rcc); + +#ifdef HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. 
*/ + { + int so_unsent_size = 0; + + /* retrieving the occupied size of the socket's tcp snd buffer (unacked + unsent) */ + if (ioctl(rcc->stream->socket, SIOCOUTQ, &so_unsent_size) == -1) { + spice_printerr("ioctl(SIOCOUTQ) failed, %s", strerror(errno)); + } + if (so_unsent_size > 0) { + /* tcp snd buffer is still occupied. rescheduling ping */ + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + } else { + red_channel_client_push_ping(rcc); + } + } +#else /* ifdef HAVE_LINUX_SOCKIOS_H */ + /* More portable alternative code path (less accurate but avoids bogus ioctls)*/ + red_channel_client_push_ping(rcc); +#endif /* ifdef HAVE_LINUX_SOCKIOS_H */ +} + +static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc) +{ + return (rcc->channel->handle_acks && + (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2)); +} + +/* + * When a connection is not alive (and we can't detect it via a socket error), we + * reach one of these 2 states: + * (1) Sending msgs is blocked: either writes return EAGAIN + * or we are missing MSGC_ACK from the client. + * (2) MSG_PING was sent without receiving a MSGC_PONG in reply. + * + * The connectivity_timer callback tests if the channel's state matches one of the above. + * In case it does, on the next time the timer is called, it checks if the connection has + * been idle during the time that passed since the previous timer call. If the connection + * has been idle, we consider the client as disconnected. 
+ */ +static void red_channel_client_connectivity_timer(void *opaque) +{ + RedChannelClient *rcc = opaque; + RedChannelClientConnectivityMonitor *monitor = &rcc->connectivity_monitor; + int is_alive = TRUE; + + if (monitor->state == CONNECTIVITY_STATE_BLOCKED) { + if (monitor->in_bytes == 0 && monitor->out_bytes == 0) { + if (!rcc->send_data.blocked && !red_channel_client_waiting_for_ack(rcc)) { + spice_error("mismatch between rcc-state and connectivity-state"); + } + spice_debug("rcc is blocked; connection is idle"); + is_alive = FALSE; + } + } else if (monitor->state == CONNECTIVITY_STATE_WAIT_PONG) { + if (monitor->in_bytes == 0) { + if (rcc->latency_monitor.state != PING_STATE_WARMUP && + rcc->latency_monitor.state != PING_STATE_LATENCY) { + spice_error("mismatch between rcc-state and connectivity-state"); + } + spice_debug("rcc waits for pong; connection is idle"); + is_alive = FALSE; + } + } + + if (is_alive) { + monitor->in_bytes = 0; + monitor->out_bytes = 0; + if (rcc->send_data.blocked || red_channel_client_waiting_for_ack(rcc)) { + monitor->state = CONNECTIVITY_STATE_BLOCKED; + } else if (rcc->latency_monitor.state == PING_STATE_WARMUP || + rcc->latency_monitor.state == PING_STATE_LATENCY) { + monitor->state = CONNECTIVITY_STATE_WAIT_PONG; + } else { + monitor->state = CONNECTIVITY_STATE_CONNECTED; + } + rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, + rcc->connectivity_monitor.timeout); + } else { + monitor->state = CONNECTIVITY_STATE_DISCONNECTED; + spice_warning("rcc %p on channel %d:%d has been unresponsive for more than %u ms, disconnecting", + rcc, rcc->channel->type, rcc->channel->id, monitor->timeout); + red_channel_client_disconnect(rcc); + } +} + +void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms) +{ + if (!red_channel_client_is_connected(rcc)) { + return; + } + spice_debug(NULL); + spice_assert(timeout_ms > 0); + /* + * If latency_monitor is not active, we activate it in order to 
enable + * periodic ping messages so that we will be be able to identify a disconnected + * channel-client even if there are no ongoing channel specific messages + * on this channel. + */ + if (rcc->latency_monitor.timer == NULL) { + rcc->latency_monitor.timer = rcc->channel->core->timer_add( + rcc->channel->core, red_channel_client_ping_timer, rcc); + if (!red_client_during_migrate_at_target(rcc->client)) { + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + } + rcc->latency_monitor.roundtrip = -1; + } + if (rcc->connectivity_monitor.timer == NULL) { + rcc->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED; + rcc->connectivity_monitor.timer = rcc->channel->core->timer_add( + rcc->channel->core, red_channel_client_connectivity_timer, rcc); + rcc->connectivity_monitor.timeout = timeout_ms; + if (!red_client_during_migrate_at_target(rcc->client)) { + rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, + rcc->connectivity_monitor.timeout); + } + } +} + +static void red_channel_client_event(int fd, int event, void *data) +{ + RedChannelClient *rcc = (RedChannelClient *)data; + + red_channel_client_ref(rcc); + if (event & SPICE_WATCH_EVENT_READ) { + red_channel_client_receive(rcc); + } + if (event & SPICE_WATCH_EVENT_WRITE) { + red_channel_client_push(rcc); + } + red_channel_client_unref(rcc); +} + +static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header) +{ + return GUINT32_FROM_LE(((SpiceDataHeader *)header->data)->size); +} + +static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header) +{ + return GUINT32_FROM_LE(((SpiceMiniDataHeader *)header->data)->size); +} + +static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header) +{ + return GUINT16_FROM_LE(((SpiceDataHeader *)header->data)->type); +} + +static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header) +{ + return GUINT16_FROM_LE(((SpiceMiniDataHeader *)header->data)->type); +} + +static void 
full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) +{ + ((SpiceDataHeader *)header->data)->type = GUINT16_TO_LE(type); +} + +static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) +{ + ((SpiceMiniDataHeader *)header->data)->type = GUINT16_TO_LE(type); +} + +static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) +{ + ((SpiceDataHeader *)header->data)->size = GUINT32_TO_LE(size); +} + +static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) +{ + ((SpiceMiniDataHeader *)header->data)->size = GUINT32_TO_LE(size); +} + +static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) +{ + ((SpiceDataHeader *)header->data)->serial = GUINT64_TO_LE(serial); +} + +static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) +{ + spice_error("attempt to set header serial on mini header"); +} + +static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) +{ + ((SpiceDataHeader *)header->data)->sub_list = GUINT32_TO_LE(sub_list); +} + +static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) +{ + spice_error("attempt to set header sub list on mini header"); +} + +static const SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader), + full_header_set_msg_type, + full_header_set_msg_size, + full_header_set_msg_serial, + full_header_set_msg_sub_list, + full_header_get_msg_type, + full_header_get_msg_size}; + +static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader), + mini_header_set_msg_type, + mini_header_set_msg_size, + mini_header_set_msg_serial, + mini_header_set_msg_sub_list, + mini_header_get_msg_type, + mini_header_get_msg_size}; + +static int red_channel_client_pre_create_validate(RedChannel *channel, RedClient *client) +{ + if (red_client_get_channel(client, channel->type, channel->id)) { + 
spice_printerr("Error client %p: duplicate channel type %d id %d", + client, channel->type, channel->id); + return FALSE; + } + return TRUE; +} + +RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, + RedsStream *stream, + int monitor_latency, + int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps) +{ + RedChannelClient *rcc = NULL; + + pthread_mutex_lock(&client->lock); + if (!red_channel_client_pre_create_validate(channel, client)) { + goto error; + } + spice_assert(stream && channel && size >= sizeof(RedChannelClient)); + rcc = spice_malloc0(size); + rcc->stream = stream; + rcc->channel = channel; + rcc->client = client; + rcc->refs = 1; + rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + + // block flags) + rcc->ack_data.client_generation = ~0; + rcc->ack_data.client_window = CLIENT_ACK_WINDOW; + rcc->send_data.main.marshaller = spice_marshaller_new(); + rcc->send_data.urgent.marshaller = spice_marshaller_new(); + + rcc->send_data.marshaller = rcc->send_data.main.marshaller; + + rcc->incoming.opaque = rcc; + rcc->incoming.cb = &channel->incoming_cb; + + rcc->outgoing.opaque = rcc; + rcc->outgoing.cb = &channel->outgoing_cb; + rcc->outgoing.pos = 0; + rcc->outgoing.size = 0; + + red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); + if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { + rcc->incoming.header = mini_header_wrapper; + rcc->send_data.header = mini_header_wrapper; + rcc->is_mini_header = TRUE; + } else { + rcc->incoming.header = full_header_wrapper; + rcc->send_data.header = full_header_wrapper; + rcc->is_mini_header = FALSE; + } + + rcc->incoming.header.data = rcc->incoming.header_buf; + rcc->incoming.serial = 1; + + if (!channel->channel_cbs.config_socket(rcc)) { + goto error; + } + + ring_init(&rcc->pipe); + rcc->pipe_size = 0; + + stream->watch = 
channel->core->watch_add(channel->core, + stream->socket, + SPICE_WATCH_EVENT_READ, + red_channel_client_event, rcc); + rcc->id = g_list_length(channel->clients); + red_channel_add_client(channel, rcc); + red_client_add_channel(client, rcc); + red_channel_ref(channel); + pthread_mutex_unlock(&client->lock); + + if (monitor_latency && reds_stream_get_family(stream) != AF_UNIX) { + rcc->latency_monitor.timer = channel->core->timer_add( + channel->core, red_channel_client_ping_timer, rcc); + if (!client->during_target_migrate) { + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + } + rcc->latency_monitor.roundtrip = -1; + } + + return rcc; +error: + free(rcc); + reds_stream_free(stream); + pthread_mutex_unlock(&client->lock); + return NULL; +} + +RedChannelClient *red_channel_client_create_dummy(int size, + RedChannel *channel, + RedClient *client, + int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps) +{ + RedChannelClient *rcc = NULL; + + spice_assert(size >= sizeof(RedChannelClient)); + + pthread_mutex_lock(&client->lock); + if (!red_channel_client_pre_create_validate(channel, client)) { + goto error; + } + rcc = spice_malloc0(size); + rcc->refs = 1; + rcc->client = client; + rcc->channel = channel; + red_channel_ref(channel); + red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); + if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { + rcc->incoming.header = mini_header_wrapper; + rcc->send_data.header = mini_header_wrapper; + rcc->is_mini_header = TRUE; + } else { + rcc->incoming.header = full_header_wrapper; + rcc->send_data.header = full_header_wrapper; + rcc->is_mini_header = FALSE; + } + + rcc->incoming.header.data = rcc->incoming.header_buf; + rcc->incoming.serial = 1; + ring_init(&rcc->pipe); + + rcc->dummy = TRUE; + rcc->dummy_connected = TRUE; + red_channel_add_client(channel, rcc); + red_client_add_channel(client, rcc); + 
pthread_mutex_unlock(&client->lock); + return rcc; +error: + pthread_mutex_unlock(&client->lock); + return NULL; +} + +void red_channel_client_seamless_migration_done(RedChannelClient *rcc) +{ + rcc->wait_migrate_data = FALSE; + + if (red_client_seamless_migration_done_for_channel(rcc->client, rcc)) { + if (rcc->latency_monitor.timer) { + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + } + if (rcc->connectivity_monitor.timer) { + rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, + rcc->connectivity_monitor.timeout); + } + } +} + +void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc) +{ + if (rcc->latency_monitor.timer) { + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + } +} + +int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc) +{ + return rcc->wait_migrate_data; +} + +void red_channel_client_default_migrate(RedChannelClient *rcc) +{ + if (rcc->latency_monitor.timer) { + red_channel_client_cancel_ping_timer(rcc); + rcc->channel->core->timer_remove(rcc->latency_monitor.timer); + rcc->latency_monitor.timer = NULL; + } + if (rcc->connectivity_monitor.timer) { + rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer); + rcc->connectivity_monitor.timer = NULL; + } + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MIGRATE); +} + +void red_channel_client_ref(RedChannelClient *rcc) +{ + rcc->refs++; +} + +void red_channel_client_unref(RedChannelClient *rcc) +{ + if (--rcc->refs != 0) { + return; + } + + spice_debug("destroy rcc=%p", rcc); + + reds_stream_free(rcc->stream); + rcc->stream = NULL; + + if (rcc->send_data.main.marshaller) { + spice_marshaller_destroy(rcc->send_data.main.marshaller); + } + + if (rcc->send_data.urgent.marshaller) { + spice_marshaller_destroy(rcc->send_data.urgent.marshaller); + } + + red_channel_client_destroy_remote_caps(rcc); + if (rcc->channel) { + red_channel_unref(rcc->channel); + } + free(rcc); +} + +void 
red_channel_client_destroy(RedChannelClient *rcc) +{ + rcc->destroying = TRUE; + red_channel_client_disconnect(rcc); + red_client_remove_channel(rcc); + red_channel_client_unref(rcc); +} + +void red_channel_client_shutdown(RedChannelClient *rcc) +{ + if (rcc->stream && !rcc->stream->shutdown) { + rcc->channel->core->watch_remove(rcc->stream->watch); + rcc->stream->watch = NULL; + shutdown(rcc->stream->socket, SHUT_RDWR); + rcc->stream->shutdown = TRUE; + } +} + +static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler) +{ + ssize_t n; + + if (!stream) { + return; + } + + if (handler->size == 0) { + handler->vec = handler->vec_buf; + handler->size = handler->cb->get_msg_size(handler->opaque); + if (!handler->size) { // nothing to be sent + return; + } + } + + for (;;) { + handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); + n = reds_stream_writev(stream, handler->vec, handler->vec_size); + if (n == -1) { + switch (errno) { + case EAGAIN: + handler->cb->on_block(handler->opaque); + return; + case EINTR: + continue; + case EPIPE: + handler->cb->on_error(handler->opaque); + return; + default: + spice_printerr("%s", strerror(errno)); + handler->cb->on_error(handler->opaque); + return; + } + } else { + handler->pos += n; + handler->cb->on_output(handler->opaque, n); + if (handler->pos == handler->size) { // finished writing data + /* reset handler before calling on_msg_done, since it + * can trigger another call to red_peer_handle_outgoing (when + * switching from the urgent marshaller to the main one */ + handler->vec = handler->vec_buf; + handler->pos = 0; + handler->size = 0; + handler->cb->on_msg_done(handler->opaque); + return; + } + } + } +} + +/* return the number of bytes read. 
-1 in case of error */ +static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size) +{ + uint8_t *pos = buf; + while (size) { + int now; + if (stream->shutdown) { + return -1; + } + now = reds_stream_read(stream, pos, size); + if (now <= 0) { + if (now == 0) { + return -1; + } + spice_assert(now == -1); + if (errno == EAGAIN) { + break; + } else if (errno == EINTR) { + continue; + } else if (errno == EPIPE) { + return -1; + } else { + spice_printerr("%s", strerror(errno)); + return -1; + } + } else { + size -= now; + pos += now; + } + } + return pos - buf; +} + +// TODO: this implementation, as opposed to the old implementation in red_worker, +// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer +// arithmetic for the case where a single cb_read could return multiple messages. But +// this is suboptimal potentially. Profile and consider fixing. +static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handler) +{ + int bytes_read; + uint8_t *parsed; + size_t parsed_size; + message_destructor_t parsed_free; + uint16_t msg_type; + uint32_t msg_size; + + /* XXX: This needs further investigation as to the underlying cause, it happened + * after spicec disconnect (but not with spice-gtk) repeatedly. 
*/ + if (!stream) { + return; + } + + for (;;) { + int ret_handle; + if (handler->header_pos < handler->header.header_size) { + bytes_read = red_peer_receive(stream, + handler->header.data + handler->header_pos, + handler->header.header_size - handler->header_pos); + if (bytes_read == -1) { + handler->cb->on_error(handler->opaque); + return; + } + handler->cb->on_input(handler->opaque, bytes_read); + handler->header_pos += bytes_read; + + if (handler->header_pos != handler->header.header_size) { + return; + } + } + + msg_size = handler->header.get_msg_size(&handler->header); + msg_type = handler->header.get_msg_type(&handler->header); + if (handler->msg_pos < msg_size) { + if (!handler->msg) { + handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size); + if (handler->msg == NULL) { + spice_printerr("ERROR: channel refused to allocate buffer."); + handler->cb->on_error(handler->opaque); + return; + } + } + + bytes_read = red_peer_receive(stream, + handler->msg + handler->msg_pos, + msg_size - handler->msg_pos); + if (bytes_read == -1) { + handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); + handler->cb->on_error(handler->opaque); + return; + } + handler->cb->on_input(handler->opaque, bytes_read); + handler->msg_pos += bytes_read; + if (handler->msg_pos != msg_size) { + return; + } + } + + if (handler->cb->parser) { + parsed = handler->cb->parser(handler->msg, + handler->msg + msg_size, msg_type, + SPICE_VERSION_MINOR, &parsed_size, &parsed_free); + if (parsed == NULL) { + spice_printerr("failed to parse message type %d", msg_type); + handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); + handler->cb->on_error(handler->opaque); + return; + } + ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size, + msg_type, parsed); + parsed_free(parsed); + } else { + ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size, + handler->msg); + } + handler->msg_pos = 0; 
+ handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); + handler->msg = NULL; + handler->header_pos = 0; + + if (!ret_handle) { + handler->cb->on_error(handler->opaque); + return; + } + } +} + +void red_channel_client_receive(RedChannelClient *rcc) +{ + red_channel_client_ref(rcc); + red_peer_handle_incoming(rcc->stream, &rcc->incoming); + red_channel_client_unref(rcc); +} + +void red_channel_client_send(RedChannelClient *rcc) +{ + red_channel_client_ref(rcc); + red_peer_handle_outgoing(rcc->stream, &rcc->outgoing); + red_channel_client_unref(rcc); +} + +static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc) +{ + RedPipeItem *item; + + if (!rcc || rcc->send_data.blocked + || red_channel_client_waiting_for_ack(rcc) + || !(item = (RedPipeItem *)ring_get_tail(&rcc->pipe))) { + return NULL; + } + red_channel_client_pipe_remove(rcc, item); + return item; +} + +void red_channel_client_push(RedChannelClient *rcc) +{ + RedPipeItem *pipe_item; + + if (!rcc->during_send) { + rcc->during_send = TRUE; + } else { + return; + } + red_channel_client_ref(rcc); + if (rcc->send_data.blocked) { + red_channel_client_send(rcc); + } + + if (!red_channel_client_no_item_being_sent(rcc) && !rcc->send_data.blocked) { + rcc->send_data.blocked = TRUE; + spice_printerr("ERROR: an item waiting to be sent and not blocked"); + } + + while ((pipe_item = red_channel_client_pipe_item_get(rcc))) { + red_channel_client_send_item(rcc, pipe_item); + } + if (red_channel_client_no_item_being_sent(rcc) && ring_is_empty(&rcc->pipe) + && rcc->stream->watch) { + rcc->channel->core->watch_update_mask(rcc->stream->watch, + SPICE_WATCH_EVENT_READ); + } + rcc->during_send = FALSE; + red_channel_client_unref(rcc); +} + +int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc) +{ + if (rcc->latency_monitor.roundtrip < 0) { + return rcc->latency_monitor.roundtrip; + } + return rcc->latency_monitor.roundtrip / NSEC_PER_MILLISEC; +} + +void 
red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc) +{ + rcc->ack_data.messages_window = 0; + red_channel_client_push(rcc); +} + +static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping) +{ + uint64_t now; + + /* ignoring unexpected pongs, or post-migration pongs for pings that + * started just before migration */ + if (ping->id != rcc->latency_monitor.id) { + spice_warning("ping-id (%u)!= pong-id %u", + rcc->latency_monitor.id, ping->id); + return; + } + + now = spice_get_monotonic_time_ns(); + + if (rcc->latency_monitor.state == PING_STATE_WARMUP) { + rcc->latency_monitor.state = PING_STATE_LATENCY; + return; + } else if (rcc->latency_monitor.state != PING_STATE_LATENCY) { + spice_warning("unexpected"); + return; + } + + /* set TCP_NODELAY=0, in case we reverted it for the test*/ + if (!rcc->latency_monitor.tcp_nodelay) { + int delay_val = 0; + + if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, + sizeof(delay_val)) == -1) { + if (errno != ENOTSUP) { + spice_warning("setsockopt failed, %s", strerror(errno)); + } + } + } + + /* + * The real network latency shouldn't change during the connection. However, + * the measurements can be bigger than the real roundtrip due to other + * threads or processes that are utilizing the network. We update the roundtrip + * measurement with the minimal value we encountered till now. 
+ */ + if (rcc->latency_monitor.roundtrip < 0 || + now - ping->timestamp < rcc->latency_monitor.roundtrip) { + rcc->latency_monitor.roundtrip = now - ping->timestamp; + spice_debug("update roundtrip %.2f(ms)", ((double)rcc->latency_monitor.roundtrip)/NSEC_PER_MILLISEC); + } + + rcc->latency_monitor.last_pong_time = now; + rcc->latency_monitor.state = PING_STATE_NONE; + red_channel_client_start_ping_timer(rcc, PING_TEST_TIMEOUT_MS); +} + +static void red_channel_handle_migrate_flush_mark(RedChannelClient *rcc) +{ + RedChannel *channel = red_channel_client_get_channel(rcc); + if (channel->channel_cbs.handle_migrate_flush_mark) { + channel->channel_cbs.handle_migrate_flush_mark(rcc); + } +} + +// TODO: the whole migration is broken with multiple clients. What do we want to do? +// basically just +// 1) source send mark to all +// 2) source gets at various times the data (waits for all) +// 3) source migrates to target +// 4) target sends data to all +// So need to make all the handlers work with per channel/client data (what data exactly?) 
+static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message) +{ + RedChannel *channel = red_channel_client_get_channel(rcc); + spice_debug("channel type %d id %d rcc %p size %u", + channel->type, channel->id, rcc, size); + if (!channel->channel_cbs.handle_migrate_data) { + return; + } + if (!red_channel_client_is_waiting_for_migrate_data(rcc)) { + spice_channel_client_error(rcc, "unexpected"); + return; + } + if (channel->channel_cbs.handle_migrate_data_get_serial) { + red_channel_client_set_message_serial(rcc, + channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message)); + } + if (!channel->channel_cbs.handle_migrate_data(rcc, size, message)) { + spice_channel_client_error(rcc, "handle_migrate_data failed"); + return; + } + red_channel_client_seamless_migration_done(rcc); +} + + +int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, + uint16_t type, void *message) +{ + switch (type) { + case SPICE_MSGC_ACK_SYNC: + if (size != sizeof(uint32_t)) { + spice_printerr("bad message size"); + return FALSE; + } + rcc->ack_data.client_generation = *(uint32_t *)(message); + break; + case SPICE_MSGC_ACK: + if (rcc->ack_data.client_generation == rcc->ack_data.generation) { + rcc->ack_data.messages_window -= rcc->ack_data.client_window; + red_channel_client_push(rcc); + } + break; + case SPICE_MSGC_DISCONNECTING: + break; + case SPICE_MSGC_MIGRATE_FLUSH_MARK: + if (!rcc->wait_migrate_flush_mark) { + spice_error("unexpected flush mark"); + return FALSE; + } + red_channel_handle_migrate_flush_mark(rcc); + rcc->wait_migrate_flush_mark = FALSE; + break; + case SPICE_MSGC_MIGRATE_DATA: + red_channel_handle_migrate_data(rcc, size, message); + break; + case SPICE_MSGC_PONG: + red_channel_client_handle_pong(rcc, message); + break; + default: + spice_printerr("invalid message type %u", type); + return FALSE; + } + return TRUE; +} + +void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, 
RedPipeItem *item) +{ + spice_assert(red_channel_client_no_item_being_sent(rcc)); + spice_assert(msg_type != 0); + rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type); + rcc->send_data.item = item; + if (item) { + rcc->channel->channel_cbs.hold_item(rcc, item); + } +} + +void red_channel_client_begin_send_message(RedChannelClient *rcc) +{ + SpiceMarshaller *m = rcc->send_data.marshaller; + + // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state) + if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) { + spice_printerr("BUG: header->type == 0"); + return; + } + + /* canceling the latency test timer till the nework is idle */ + red_channel_client_cancel_ping_timer(rcc); + + spice_marshaller_flush(m); + rcc->send_data.size = spice_marshaller_get_total_size(m); + rcc->send_data.header.set_msg_size(&rcc->send_data.header, + rcc->send_data.size - rcc->send_data.header.header_size); + rcc->ack_data.messages_window++; + rcc->send_data.last_sent_serial = rcc->send_data.serial; + rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */ + red_channel_client_send(rcc); +} + +SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc) +{ + spice_assert(red_channel_client_no_item_being_sent(rcc)); + spice_assert(rcc->send_data.header.data != NULL); + rcc->send_data.main.header_data = rcc->send_data.header.data; + rcc->send_data.main.item = rcc->send_data.item; + + rcc->send_data.marshaller = rcc->send_data.urgent.marshaller; + rcc->send_data.item = NULL; + red_channel_client_reset_send_data(rcc); + return rcc->send_data.marshaller; +} + +uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc) +{ + return rcc->send_data.serial; +} + +void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial) +{ + rcc->send_data.last_sent_serial = serial; + rcc->send_data.serial = serial; +} + +static inline gboolean 
client_pipe_add(RedChannelClient *rcc, RedPipeItem *item, RingItem *pos)
{
    spice_assert(rcc && item);

    /* a disconnected rcc releases the item right away instead of queuing it */
    if (SPICE_UNLIKELY(!red_channel_client_is_connected(rcc))) {
        spice_debug("rcc is disconnected %p", rcc);
        red_channel_client_release_item(rcc, item, FALSE);
        return FALSE;
    }

    /* the pipe is about to become non-empty: ask for write events as well */
    if (ring_is_empty(&rcc->pipe) && rcc->stream->watch) {
        rcc->channel->core->watch_update_mask(rcc->stream->watch,
                                              SPICE_WATCH_EVENT_READ |
                                              SPICE_WATCH_EVENT_WRITE);
    }
    rcc->pipe_size++;
    ring_add(pos, &item->link);
    return TRUE;
}

/* Queue item at the position given by the ring head of the pipe. */
void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item)
{
    client_pipe_add(rcc, item, &rcc->pipe);
}

/* Queue item and push; the push happens whether or not the add succeeded. */
void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item)
{
    red_channel_client_pipe_add(rcc, item);
    red_channel_client_push(rcc);
}

/* Queue item at the position given by pos, which must not be NULL. */
void red_channel_client_pipe_add_after(RedChannelClient *rcc,
                                       RedPipeItem *item,
                                       RedPipeItem *pos)
{
    spice_assert(pos);
    client_pipe_add(rcc, item, &pos->link);
}

int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc,
                                           RedPipeItem *item)
{
    return ring_item_is_linked(&item->link);
}

/* Queue item at the position given by the pipe's prev link. */
void red_channel_client_pipe_add_tail(RedChannelClient *rcc,
                                      RedPipeItem *item)
{
    client_pipe_add(rcc, item, rcc->pipe.prev);
}

/* Like red_channel_client_pipe_add_tail(), pushing only when the add succeeded. */
void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item)
{
    if (!client_pipe_add(rcc, item, rcc->pipe.prev)) {
        return;
    }
    red_channel_client_push(rcc);
}

/* Allocate a bare pipe item of the given type, queue it and push. */
void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type)
{
    RedPipeItem *item = spice_new(RedPipeItem, 1);

    red_pipe_item_init(item, pipe_item_type);
    red_channel_client_pipe_add(rcc, item);
    red_channel_client_push(rcc);
}

/* Allocate an EMPTY_MSG pipe item carrying msg_type, queue it and push. */
void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type)
{
    RedEmptyMsgPipeItem *item = spice_new(RedEmptyMsgPipeItem, 1);

    red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG);
    item->msg = msg_type;
    red_channel_client_pipe_add(rcc,
&item->base); + red_channel_client_push(rcc); +} + +gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc) +{ + g_return_val_if_fail(rcc != NULL, TRUE); + return (rcc->pipe_size == 0) && (ring_is_empty(&rcc->pipe)); +} + +uint32_t red_channel_client_get_pipe_size(RedChannelClient *rcc) +{ + return rcc->pipe_size; +} + +int red_channel_client_is_connected(RedChannelClient *rcc) +{ + if (!rcc->dummy) { + return rcc->channel + && (g_list_find(rcc->channel->clients, rcc) != NULL); + } else { + return rcc->dummy_connected; + } +} + +static void red_channel_client_clear_sent_item(RedChannelClient *rcc) +{ + if (rcc->send_data.item) { + red_channel_client_release_item(rcc, rcc->send_data.item, TRUE); + rcc->send_data.item = NULL; + } + rcc->send_data.blocked = FALSE; + rcc->send_data.size = 0; +} + +void red_channel_client_pipe_clear(RedChannelClient *rcc) +{ + RedPipeItem *item; + + if (rcc) { + red_channel_client_clear_sent_item(rcc); + } + while ((item = (RedPipeItem *)ring_get_head(&rcc->pipe))) { + ring_remove(&item->link); + red_channel_client_release_item(rcc, item, FALSE); + } + rcc->pipe_size = 0; +} + +void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc) +{ + rcc->ack_data.messages_window = 0; +} + +void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window) +{ + rcc->ack_data.client_window = client_window; +} + +void red_channel_client_push_set_ack(RedChannelClient *rcc) +{ + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_SET_ACK); +} + +static void red_channel_client_disconnect_dummy(RedChannelClient *rcc) +{ + RedChannel *channel = red_channel_client_get_channel(rcc); + GList *link; + spice_assert(rcc->dummy); + if (channel && (link = g_list_find(channel->clients, rcc))) { + spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel, + channel->type, channel->id); + red_channel_remove_client(channel, link->data); + } + rcc->dummy_connected = FALSE; +} + +void 
red_channel_client_disconnect(RedChannelClient *rcc) +{ + RedChannel *channel = rcc->channel; + + if (rcc->dummy) { + red_channel_client_disconnect_dummy(rcc); + return; + } + if (!red_channel_client_is_connected(rcc)) { + return; + } + spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel, + channel->type, channel->id); + red_channel_client_pipe_clear(rcc); + if (rcc->stream->watch) { + channel->core->watch_remove(rcc->stream->watch); + rcc->stream->watch = NULL; + } + if (rcc->latency_monitor.timer) { + channel->core->timer_remove(rcc->latency_monitor.timer); + rcc->latency_monitor.timer = NULL; + } + if (rcc->connectivity_monitor.timer) { + channel->core->timer_remove(rcc->connectivity_monitor.timer); + rcc->connectivity_monitor.timer = NULL; + } + red_channel_remove_client(channel, rcc); + channel->channel_cbs.on_disconnect(rcc); +} + +int red_channel_client_is_blocked(RedChannelClient *rcc) +{ + return rcc && rcc->send_data.blocked; +} + +int red_channel_client_send_message_pending(RedChannelClient *rcc) +{ + return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0; +} + +SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc) +{ + return rcc->send_data.marshaller; +} + +RedsStream *red_channel_client_get_stream(RedChannelClient *rcc) +{ + return rcc->stream; +} + +RedClient *red_channel_client_get_client(RedChannelClient *rcc) +{ + return rcc->client; +} + +void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list) +{ + rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list); +} + +/* TODO: more evil sync stuff. anything with the word wait in it's name. 
*/ +int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, + RedPipeItem *item, + int64_t timeout) +{ + uint64_t end_time; + int item_in_pipe; + + spice_info(NULL); + + if (timeout != -1) { + end_time = spice_get_monotonic_time_ns() + timeout; + } else { + end_time = UINT64_MAX; + } + + rcc->channel->channel_cbs.hold_item(rcc, item); + + if (red_channel_client_is_blocked(rcc)) { + red_channel_client_receive(rcc); + red_channel_client_send(rcc); + } + red_channel_client_push(rcc); + + while((item_in_pipe = ring_item_is_linked(&item->link)) && + (timeout == -1 || spice_get_monotonic_time_ns() < end_time)) { + usleep(CHANNEL_BLOCKED_SLEEP_DURATION); + red_channel_client_receive(rcc); + red_channel_client_send(rcc); + red_channel_client_push(rcc); + } + + red_channel_client_release_item(rcc, item, TRUE); + if (item_in_pipe) { + spice_warning("timeout"); + return FALSE; + } else { + return red_channel_client_wait_outgoing_item(rcc, + timeout == -1 ? -1 : end_time - spice_get_monotonic_time_ns()); + } +} + +int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, + int64_t timeout) +{ + uint64_t end_time; + int blocked; + + if (!red_channel_client_is_blocked(rcc)) { + return TRUE; + } + if (timeout != -1) { + end_time = spice_get_monotonic_time_ns() + timeout; + } else { + end_time = UINT64_MAX; + } + spice_info("blocked"); + + do { + usleep(CHANNEL_BLOCKED_SLEEP_DURATION); + red_channel_client_receive(rcc); + red_channel_client_send(rcc); + } while ((blocked = red_channel_client_is_blocked(rcc)) && + (timeout == -1 || spice_get_monotonic_time_ns() < end_time)); + + if (blocked) { + spice_warning("timeout"); + return FALSE; + } else { + spice_assert(red_channel_client_no_item_being_sent(rcc)); + return TRUE; + } +} + +void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc) +{ + if (red_channel_client_is_blocked(rcc) || rcc->pipe_size > 0) { + red_channel_client_disconnect(rcc); + } else { + 
spice_assert(red_channel_client_no_item_being_sent(rcc)); + } +} + +gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc) +{ + return !rcc || (rcc->send_data.size == 0); +} + +void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, + RedPipeItem *item) +{ + red_channel_client_pipe_remove(rcc, item); + red_channel_client_release_item(rcc, item, FALSE); +} + +/* client mutex should be locked before this call */ +gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc) +{ + gboolean ret = FALSE; + + if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) { + rcc->wait_migrate_data = TRUE; + ret = TRUE; + } + spice_debug("channel type %d id %d rcc %p wait data %d", rcc->channel->type, rcc->channel->id, rcc, + rcc->wait_migrate_data); + + return ret; +} + +void red_channel_client_set_destroying(RedChannelClient *rcc) +{ + rcc->destroying = TRUE; +} + +gboolean red_channel_client_is_destroying(RedChannelClient *rcc) +{ + return rcc->destroying; +} diff --git a/server/red-channel-client.h b/server/red-channel-client.h new file mode 100644 index 0000000..cf70abd --- /dev/null +++ b/server/red-channel-client.h @@ -0,0 +1,176 @@ +/* + Copyright (C) 2009-2015 Red Hat, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. 
+*/ + +#ifndef _H_RED_CHANNEL_CLIENT +#define _H_RED_CHANNEL_CLIENT + +#include "common/marshaller.h" +#include "red-pipe-item.h" +#include "reds-stream.h" + +typedef struct RedChannel RedChannel; +typedef struct RedClient RedClient; +typedef struct IncomingHandler IncomingHandler; + +typedef struct RedChannelClient RedChannelClient; + +/* + * When an error occurs over a channel, we treat it as a warning + * for spice-server and shut down the channel. + */ +#define spice_channel_client_error(rcc, format, ...) \ + do { \ + RedChannel *_ch = red_channel_client_get_channel(rcc); \ + spice_warning("rcc %p type %u id %u: " format, rcc, \ + _ch->type, _ch->id, ## __VA_ARGS__); \ + red_channel_client_shutdown(rcc); \ + } while (0) + +RedChannelClient *red_channel_client_create(int size, RedChannel *channel, + RedClient *client, RedsStream *stream, + int monitor_latency, + int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps); + +RedChannelClient *red_channel_client_create_dummy(int size, + RedChannel *channel, + RedClient *client, + int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps); + +void red_channel_client_ref(RedChannelClient *rcc); +void red_channel_client_unref(RedChannelClient *rcc); + +int red_channel_client_is_connected(RedChannelClient *rcc); +void red_channel_client_default_migrate(RedChannelClient *rcc); +int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc); +void red_channel_client_destroy(RedChannelClient *rcc); +int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap); +int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap); +/* shutdown is the only safe thing to do out of the client/channel + * thread. It will not touch the rings, just shut down the socket. + * It should be followed by some way to guarantee a disconnection.
*/ +void red_channel_client_shutdown(RedChannelClient *rcc); +/* handles general channel msgs from the client */ +int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, + uint16_t type, void *message); +/* when preparing send_data: should call init and then use marshaller */ +void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item); + +uint64_t red_channel_client_get_message_serial(RedChannelClient *channel); +void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t); + +/* When sending a msg. Should first call red_channel_client_begin_send_message. + * It will first send the pending urgent data, if there is any, and then + * the rest of the data. + */ +void red_channel_client_begin_send_message(RedChannelClient *rcc); + +/* + * Stores the current send data, and switches to urgent send data. + * When it begins the actual send, it will send first the urgent data + * and afterward the rest of the data. + * Should be called only if, during the marshalling of one message, + * the need to send another message before it arises. + * Important: the serial of the non-urgent sent data, will be succeeded.
+ * return: the urgent send data marshaller + */ +SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc); + +/* returns -1 if we don't have an estimation */ +int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc); + +/* Checks periodically if the connection is still alive */ +void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms); + +void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item); +void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item); +void red_channel_client_pipe_add_after(RedChannelClient *rcc, RedPipeItem *item, RedPipeItem *pos); +int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item); +void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item); +void red_channel_client_pipe_add_tail(RedChannelClient *rcc, RedPipeItem *item); +void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item); +/* for types that use this routine -> the pipe item should be freed */ +void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type); +void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type); +gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc); +uint32_t red_channel_client_get_pipe_size(RedChannelClient *rcc); + +void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc); +void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window); +void red_channel_client_push_set_ack(RedChannelClient *rcc); + +gboolean red_channel_client_is_blocked(RedChannelClient *rcc); + +/* helper for channels that have complex logic that can possibly ready a send */ +int red_channel_client_send_message_pending(RedChannelClient *rcc); + +gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc); +void red_channel_client_push(RedChannelClient *rcc); +// TODO: again - what is 
the context exactly? this happens in channel disconnect. but our +// current red_channel_shutdown also closes the socket - is there a socket to close? +// are we reading from an fd here? arghh +void red_channel_client_receive(RedChannelClient *rcc); +void red_channel_client_send(RedChannelClient *rcc); +void red_channel_client_disconnect(RedChannelClient *rcc); + +/* Note: the valid times to call red_channel_get_marshaller are just during send_item callback. */ +SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc); +RedsStream *red_channel_client_get_stream(RedChannelClient *rcc); +RedClient *red_channel_client_get_client(RedChannelClient *rcc); + +/* Note that the header is valid only between red_channel_reset_send_data and + * red_channel_begin_send_message.*/ +void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list); + +/* + * blocking functions. + * + * timeout is in nano sec. -1 for no timeout. + * + * Return: TRUE if waiting succeeded. FALSE if timeout expired. 
+ */ + +int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, + RedPipeItem *item, + int64_t timeout); +int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, + int64_t timeout); +void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc); + +RedChannel* red_channel_client_get_channel(RedChannelClient* rcc); +IncomingHandler* red_channel_client_get_incoming_handler(RedChannelClient *rcc); + +void red_channel_client_on_output(void *opaque, int n); +void red_channel_client_on_input(void *opaque, int n); +int red_channel_client_get_out_msg_size(void *opaque); +void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, + int *vec_size, int pos); +void red_channel_client_on_out_block(void *opaque); +void red_channel_client_on_out_msg_done(void *opaque); + +void red_channel_client_seamless_migration_done(RedChannelClient *rcc); +void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc); +void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc); + +gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc); +void red_channel_client_set_destroying(RedChannelClient *rcc); +gboolean red_channel_client_is_destroying(RedChannelClient *rcc); + +#define RED_CHANNEL_CLIENT(Client) ((RedChannelClient *)(Client)) + +#endif /* _H_RED_CHANNEL_CLIENT */ diff --git a/server/red-channel.c b/server/red-channel.c index c1b1e91..c714d90 100644 --- a/server/red-channel.c +++ b/server/red-channel.c @@ -22,20 +22,6 @@ #include <config.h> #endif -#include <glib.h> -#include <stdio.h> -#include <stdint.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <fcntl.h> -#include <unistd.h> -#include <errno.h> -#include <sys/ioctl.h> -#ifdef HAVE_LINUX_SOCKIOS_H -#include <linux/sockios.h> /* SIOCOUTQ */ -#endif - -#include "common/generated_server_marshallers.h" #include "common/ring.h" #include "red-channel.h" @@ -44,41 +30,6 @@ #include "main-dispatcher.h" #include "utils.h" 
-typedef struct RedEmptyMsgPipeItem { - RedPipeItem base; - int msg; -} RedEmptyMsgPipeItem; - -#define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15) -#define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10) - -#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro - -enum QosPingState { - PING_STATE_NONE, - PING_STATE_TIMER, - PING_STATE_WARMUP, - PING_STATE_LATENCY, -}; - -enum ConnectivityState { - CONNECTIVITY_STATE_CONNECTED, - CONNECTIVITY_STATE_BLOCKED, - CONNECTIVITY_STATE_WAIT_PONG, - CONNECTIVITY_STATE_DISCONNECTED, -}; - -static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout); -static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc); -static void red_channel_client_restart_ping_timer(RedChannelClient *rcc); - -static void red_channel_client_event(int fd, int event, void *data); -static void red_client_add_channel(RedClient *client, RedChannelClient *rcc); -static void red_client_remove_channel(RedChannelClient *rcc); -static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id); -static void red_channel_client_restore_main_sender(RedChannelClient *rcc); -static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc); - /* * Lifetime of RedChannel, RedChannelClient and RedClient: * RedChannel is created and destroyed by the calls to @@ -113,574 +64,23 @@ static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc); * If a call to red_channel_client_destroy is made from another location, it must be called * from the channel's thread. 
*/ -static void red_channel_ref(RedChannel *channel); -static void red_channel_unref(RedChannel *channel); - -static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header) -{ - return GUINT32_FROM_LE(((SpiceDataHeader *)header->data)->size); -} - -static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header) -{ - return GUINT32_FROM_LE(((SpiceMiniDataHeader *)header->data)->size); -} - -static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header) -{ - return GUINT16_FROM_LE(((SpiceDataHeader *)header->data)->type); -} - -static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header) -{ - return GUINT16_FROM_LE(((SpiceMiniDataHeader *)header->data)->type); -} - -static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) -{ - ((SpiceDataHeader *)header->data)->type = GUINT16_TO_LE(type); -} - -static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) -{ - ((SpiceMiniDataHeader *)header->data)->type = GUINT16_TO_LE(type); -} - -static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) -{ - ((SpiceDataHeader *)header->data)->size = GUINT32_TO_LE(size); -} - -static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) -{ - ((SpiceMiniDataHeader *)header->data)->size = GUINT32_TO_LE(size); -} - -static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) -{ - ((SpiceDataHeader *)header->data)->serial = GUINT64_TO_LE(serial); -} - -static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) -{ - spice_error("attempt to set header serial on mini header"); -} - -static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) -{ - ((SpiceDataHeader *)header->data)->sub_list = GUINT32_TO_LE(sub_list); -} - -static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) -{ - spice_error("attempt to set header sub list on mini header"); 
-} - -static const SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader), - full_header_set_msg_type, - full_header_set_msg_size, - full_header_set_msg_serial, - full_header_set_msg_sub_list, - full_header_get_msg_type, - full_header_get_msg_size}; - -static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader), - mini_header_set_msg_type, - mini_header_set_msg_size, - mini_header_set_msg_serial, - mini_header_set_msg_sub_list, - mini_header_get_msg_type, - mini_header_get_msg_size}; - -/* return the number of bytes read. -1 in case of error */ -static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size) -{ - uint8_t *pos = buf; - while (size) { - int now; - if (stream->shutdown) { - return -1; - } - now = reds_stream_read(stream, pos, size); - if (now <= 0) { - if (now == 0) { - return -1; - } - spice_assert(now == -1); - if (errno == EAGAIN) { - break; - } else if (errno == EINTR) { - continue; - } else if (errno == EPIPE) { - return -1; - } else { - spice_printerr("%s", strerror(errno)); - return -1; - } - } else { - size -= now; - pos += now; - } - } - return pos - buf; -} - -// TODO: this implementation, as opposed to the old implementation in red_worker, -// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer -// arithmetic for the case where a single cb_read could return multiple messages. But -// this is suboptimal potentially. Profile and consider fixing. -static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handler) -{ - int bytes_read; - uint8_t *parsed; - size_t parsed_size; - message_destructor_t parsed_free; - uint16_t msg_type; - uint32_t msg_size; - - /* XXX: This needs further investigation as to the underlying cause, it happened - * after spicec disconnect (but not with spice-gtk) repeatedly. 
*/ - if (!stream) { - return; - } - - for (;;) { - int ret_handle; - if (handler->header_pos < handler->header.header_size) { - bytes_read = red_peer_receive(stream, - handler->header.data + handler->header_pos, - handler->header.header_size - handler->header_pos); - if (bytes_read == -1) { - handler->cb->on_error(handler->opaque); - return; - } - handler->cb->on_input(handler->opaque, bytes_read); - handler->header_pos += bytes_read; - - if (handler->header_pos != handler->header.header_size) { - return; - } - } - - msg_size = handler->header.get_msg_size(&handler->header); - msg_type = handler->header.get_msg_type(&handler->header); - if (handler->msg_pos < msg_size) { - if (!handler->msg) { - handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size); - if (handler->msg == NULL) { - spice_printerr("ERROR: channel refused to allocate buffer."); - handler->cb->on_error(handler->opaque); - return; - } - } - - bytes_read = red_peer_receive(stream, - handler->msg + handler->msg_pos, - msg_size - handler->msg_pos); - if (bytes_read == -1) { - handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); - handler->cb->on_error(handler->opaque); - return; - } - handler->cb->on_input(handler->opaque, bytes_read); - handler->msg_pos += bytes_read; - if (handler->msg_pos != msg_size) { - return; - } - } - - if (handler->cb->parser) { - parsed = handler->cb->parser(handler->msg, - handler->msg + msg_size, msg_type, - SPICE_VERSION_MINOR, &parsed_size, &parsed_free); - if (parsed == NULL) { - spice_printerr("failed to parse message type %d", msg_type); - handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); - handler->cb->on_error(handler->opaque); - return; - } - ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size, - msg_type, parsed); - parsed_free(parsed); - } else { - ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size, - handler->msg); - } - handler->msg_pos = 0; 
- handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); - handler->msg = NULL; - handler->header_pos = 0; - - if (!ret_handle) { - handler->cb->on_error(handler->opaque); - return; - } - } -} - -void red_channel_client_receive(RedChannelClient *rcc) -{ - red_channel_client_ref(rcc); - red_peer_handle_incoming(rcc->stream, &rcc->incoming); - red_channel_client_unref(rcc); -} void red_channel_receive(RedChannel *channel) { g_list_foreach(channel->clients, (GFunc)red_channel_client_receive, NULL); } -static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler) -{ - ssize_t n; - - if (!stream) { - return; - } - - if (handler->size == 0) { - handler->vec = handler->vec_buf; - handler->size = handler->cb->get_msg_size(handler->opaque); - if (!handler->size) { // nothing to be sent - return; - } - } - - for (;;) { - handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); - n = reds_stream_writev(stream, handler->vec, handler->vec_size); - if (n == -1) { - switch (errno) { - case EAGAIN: - handler->cb->on_block(handler->opaque); - return; - case EINTR: - continue; - case EPIPE: - handler->cb->on_error(handler->opaque); - return; - default: - spice_printerr("%s", strerror(errno)); - handler->cb->on_error(handler->opaque); - return; - } - } else { - handler->pos += n; - handler->cb->on_output(handler->opaque, n); - if (handler->pos == handler->size) { // finished writing data - /* reset handler before calling on_msg_done, since it - * can trigger another call to red_peer_handle_outgoing (when - * switching from the urgent marshaller to the main one */ - handler->vec = handler->vec_buf; - handler->pos = 0; - handler->size = 0; - handler->cb->on_msg_done(handler->opaque); - return; - } - } - } -} - -static void red_channel_client_on_output(void *opaque, int n) -{ - RedChannelClient *rcc = opaque; - - if (rcc->connectivity_monitor.timer) { - rcc->connectivity_monitor.out_bytes += n; - } - 
stat_inc_counter(reds, rcc->channel->out_bytes_counter, n); -} - -static void red_channel_client_on_input(void *opaque, int n) -{ - RedChannelClient *rcc = opaque; - - if (rcc->connectivity_monitor.timer) { - rcc->connectivity_monitor.in_bytes += n; - } -} - static void red_channel_client_default_peer_on_error(RedChannelClient *rcc) { red_channel_client_disconnect(rcc); } -static int red_channel_client_peer_get_out_msg_size(void *opaque) -{ - RedChannelClient *rcc = (RedChannelClient *)opaque; - - return rcc->send_data.size; -} - -static void red_channel_client_peer_prepare_out_msg( - void *opaque, struct iovec *vec, int *vec_size, int pos) -{ - RedChannelClient *rcc = (RedChannelClient *)opaque; - - *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller, - vec, IOV_MAX, pos); -} - -static void red_channel_client_peer_on_out_block(void *opaque) -{ - RedChannelClient *rcc = (RedChannelClient *)opaque; - - rcc->send_data.blocked = TRUE; - rcc->channel->core->watch_update_mask(rcc->stream->watch, - SPICE_WATCH_EVENT_READ | - SPICE_WATCH_EVENT_WRITE); -} - -static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc) -{ - return (rcc->send_data.marshaller == rcc->send_data.urgent.marshaller); -} - -static void red_channel_client_reset_send_data(RedChannelClient *rcc) -{ - spice_marshaller_reset(rcc->send_data.marshaller); - rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller, - rcc->send_data.header.header_size); - spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size); - rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0); - rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0); - - /* Keeping the serial consecutive: resetting it if reset_send_data - * has been called before, but no message has been sent since then. 
- */ - if (rcc->send_data.last_sent_serial != rcc->send_data.serial) { - spice_assert(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1); - /* When the urgent marshaller is active, the serial was incremented by - * the call to reset_send_data that was made for the main marshaller. - * The urgent msg receives this serial, and the main msg serial is - * the following one. Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial) - * should be 1 in this case*/ - if (!red_channel_client_urgent_marshaller_is_active(rcc)) { - rcc->send_data.serial = rcc->send_data.last_sent_serial; - } - } - rcc->send_data.serial++; - - if (!rcc->is_mini_header) { - spice_assert(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller); - rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0); - rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); - } -} - -void red_channel_client_push_set_ack(RedChannelClient *rcc) -{ - red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_SET_ACK); -} - -static void red_channel_client_send_set_ack(RedChannelClient *rcc) -{ - SpiceMsgSetAck ack; - - spice_assert(rcc); - red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL); - ack.generation = ++rcc->ack_data.generation; - ack.window = rcc->ack_data.client_window; - rcc->ack_data.messages_window = 0; - - spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack); - - red_channel_client_begin_send_message(rcc); -} - -static void red_channel_client_send_migrate(RedChannelClient *rcc) -{ - SpiceMsgMigrate migrate; - - red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, NULL); - migrate.flags = rcc->channel->migration_flags; - spice_marshall_msg_migrate(rcc->send_data.marshaller, &migrate); - if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_FLUSH) { - rcc->wait_migrate_flush_mark = TRUE; - } - - red_channel_client_begin_send_message(rcc); -} - - -static void red_channel_client_send_empty_msg(RedChannelClient *rcc, 
RedPipeItem *base) -{ - RedEmptyMsgPipeItem *msg_pipe_item = SPICE_CONTAINEROF(base, RedEmptyMsgPipeItem, base); - - red_channel_client_init_send_data(rcc, msg_pipe_item->msg, NULL); - red_channel_client_begin_send_message(rcc); -} - -static void red_channel_client_send_ping(RedChannelClient *rcc) -{ - SpiceMsgPing ping; - - if (!rcc->latency_monitor.warmup_was_sent) { // latency test start - int delay_val; - socklen_t opt_size = sizeof(delay_val); - - rcc->latency_monitor.warmup_was_sent = TRUE; - /* - * When testing latency, TCP_NODELAY must be switched on, otherwise, - * sending the ping message is delayed by Nagle algorithm, and the - * roundtrip measurement is less accurate (bigger). - */ - rcc->latency_monitor.tcp_nodelay = 1; - if (getsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, - &opt_size) == -1) { - spice_warning("getsockopt failed, %s", strerror(errno)); - } else { - rcc->latency_monitor.tcp_nodelay = delay_val; - if (!delay_val) { - delay_val = 1; - if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, - sizeof(delay_val)) == -1) { - if (errno != ENOTSUP) { - spice_warning("setsockopt failed, %s", strerror(errno)); - } - } - } - } - } - - red_channel_client_init_send_data(rcc, SPICE_MSG_PING, NULL); - ping.id = rcc->latency_monitor.id; - ping.timestamp = spice_get_monotonic_time_ns(); - spice_marshall_msg_ping(rcc->send_data.marshaller, &ping); - red_channel_client_begin_send_message(rcc); -} - -static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item) -{ - spice_assert(red_channel_client_no_item_being_sent(rcc)); - red_channel_client_reset_send_data(rcc); - switch (item->type) { - case RED_PIPE_ITEM_TYPE_SET_ACK: - red_channel_client_send_set_ack(rcc); - break; - case RED_PIPE_ITEM_TYPE_MIGRATE: - red_channel_client_send_migrate(rcc); - break; - case RED_PIPE_ITEM_TYPE_EMPTY_MSG: - red_channel_client_send_empty_msg(rcc, item); - break; - case RED_PIPE_ITEM_TYPE_PING: - 
red_channel_client_send_ping(rcc); - break; - default: - rcc->channel->channel_cbs.send_item(rcc, item); - return; - } - free(item); -} - -static void red_channel_client_release_item(RedChannelClient *rcc, RedPipeItem *item, int item_pushed) -{ - switch (item->type) { - case RED_PIPE_ITEM_TYPE_SET_ACK: - case RED_PIPE_ITEM_TYPE_EMPTY_MSG: - case RED_PIPE_ITEM_TYPE_MIGRATE: - case RED_PIPE_ITEM_TYPE_PING: - free(item); - break; - default: - rcc->channel->channel_cbs.release_item(rcc, item, item_pushed); - } -} - -static inline void red_channel_client_release_sent_item(RedChannelClient *rcc) -{ - if (rcc->send_data.item) { - red_channel_client_release_item(rcc, - rcc->send_data.item, TRUE); - rcc->send_data.item = NULL; - } -} - -static void red_channel_client_on_out_msg_done(void *opaque) -{ - RedChannelClient *rcc = (RedChannelClient *)opaque; - int fd; - - rcc->send_data.size = 0; - - if (spice_marshaller_get_fd(rcc->send_data.marshaller, &fd)) { - if (reds_stream_send_msgfd(rcc->stream, fd) < 0) { - perror("sendfd"); - red_channel_client_disconnect(rcc); - if (fd != -1) - close(fd); - return; - } - if (fd != -1) - close(fd); - } - - red_channel_client_release_sent_item(rcc); - if (rcc->send_data.blocked) { - rcc->send_data.blocked = FALSE; - rcc->channel->core->watch_update_mask(rcc->stream->watch, - SPICE_WATCH_EVENT_READ); - } - - if (red_channel_client_urgent_marshaller_is_active(rcc)) { - red_channel_client_restore_main_sender(rcc); - spice_assert(rcc->send_data.header.data != NULL); - red_channel_client_begin_send_message(rcc); - } else { - if (rcc->latency_monitor.timer && !rcc->send_data.blocked && rcc->pipe_size == 0) { - /* It is possible that the socket will become idle, so we may be able to test latency */ - red_channel_client_restart_ping_timer(rcc); - } - } - -} - -static void red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item) -{ - rcc->pipe_size--; - ring_remove(&item->link); -} - -static void red_channel_add_client(RedChannel 
*channel, RedChannelClient *rcc) +void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc) { spice_assert(rcc); channel->clients = g_list_append(channel->clients, rcc); } -static void red_channel_client_set_remote_caps(RedChannelClient* rcc, - int num_common_caps, uint32_t *common_caps, - int num_caps, uint32_t *caps) -{ - rcc->remote_caps.num_common_caps = num_common_caps; - rcc->remote_caps.common_caps = spice_memdup(common_caps, num_common_caps * sizeof(uint32_t)); - - rcc->remote_caps.num_caps = num_caps; - rcc->remote_caps.caps = spice_memdup(caps, num_caps * sizeof(uint32_t)); -} - -static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc) -{ - rcc->remote_caps.num_common_caps = 0; - free(rcc->remote_caps.common_caps); - rcc->remote_caps.num_caps = 0; - free(rcc->remote_caps.caps); -} - -int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap) -{ - return test_capability(rcc->remote_caps.common_caps, - rcc->remote_caps.num_common_caps, - cap); -} - -int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap) -{ - return test_capability(rcc->remote_caps.caps, - rcc->remote_caps.num_caps, - cap); -} - int red_channel_test_remote_common_cap(RedChannel *channel, uint32_t cap) { GList *link; @@ -709,231 +109,9 @@ int red_channel_test_remote_cap(RedChannel *channel, uint32_t cap) return TRUE; } -static int red_channel_client_pre_create_validate(RedChannel *channel, RedClient *client) -{ - if (red_client_get_channel(client, channel->type, channel->id)) { - spice_printerr("Error client %p: duplicate channel type %d id %d", - client, channel->type, channel->id); - return FALSE; - } - return TRUE; -} - -static void red_channel_client_push_ping(RedChannelClient *rcc) -{ - spice_assert(rcc->latency_monitor.state == PING_STATE_NONE); - rcc->latency_monitor.state = PING_STATE_WARMUP; - rcc->latency_monitor.warmup_was_sent = FALSE; - rcc->latency_monitor.id = rand(); - 
red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); - red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); -} - -static void red_channel_client_ping_timer(void *opaque) -{ - RedChannelClient *rcc = opaque; - - spice_assert(rcc->latency_monitor.state == PING_STATE_TIMER); - red_channel_client_cancel_ping_timer(rcc); - -#ifdef HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. */ - { - int so_unsent_size = 0; - - /* retrieving the occupied size of the socket's tcp snd buffer (unacked + unsent) */ - if (ioctl(rcc->stream->socket, SIOCOUTQ, &so_unsent_size) == -1) { - spice_printerr("ioctl(SIOCOUTQ) failed, %s", strerror(errno)); - } - if (so_unsent_size > 0) { - /* tcp snd buffer is still occupied. rescheduling ping */ - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - } else { - red_channel_client_push_ping(rcc); - } - } -#else /* ifdef HAVE_LINUX_SOCKIOS_H */ - /* More portable alternative code path (less accurate but avoids bogus ioctls)*/ - red_channel_client_push_ping(rcc); -#endif /* ifdef HAVE_LINUX_SOCKIOS_H */ -} - -/* - * When a connection is not alive (and we can't detect it via a socket error), we - * reach one of these 2 states: - * (1) Sending msgs is blocked: either writes return EAGAIN - * or we are missing MSGC_ACK from the client. - * (2) MSG_PING was sent without receiving a MSGC_PONG in reply. - * - * The connectivity_timer callback tests if the channel's state matches one of the above. - * In case it does, on the next time the timer is called, it checks if the connection has - * been idle during the time that passed since the previous timer call. If the connection - * has been idle, we consider the client as disconnected. 
- */ -static void red_channel_client_connectivity_timer(void *opaque) -{ - RedChannelClient *rcc = opaque; - RedChannelClientConnectivityMonitor *monitor = &rcc->connectivity_monitor; - int is_alive = TRUE; - - if (monitor->state == CONNECTIVITY_STATE_BLOCKED) { - if (monitor->in_bytes == 0 && monitor->out_bytes == 0) { - if (!rcc->send_data.blocked && !red_channel_client_waiting_for_ack(rcc)) { - spice_error("mismatch between rcc-state and connectivity-state"); - } - spice_debug("rcc is blocked; connection is idle"); - is_alive = FALSE; - } - } else if (monitor->state == CONNECTIVITY_STATE_WAIT_PONG) { - if (monitor->in_bytes == 0) { - if (rcc->latency_monitor.state != PING_STATE_WARMUP && - rcc->latency_monitor.state != PING_STATE_LATENCY) { - spice_error("mismatch between rcc-state and connectivity-state"); - } - spice_debug("rcc waits for pong; connection is idle"); - is_alive = FALSE; - } - } - - if (is_alive) { - monitor->in_bytes = 0; - monitor->out_bytes = 0; - if (rcc->send_data.blocked || red_channel_client_waiting_for_ack(rcc)) { - monitor->state = CONNECTIVITY_STATE_BLOCKED; - } else if (rcc->latency_monitor.state == PING_STATE_WARMUP || - rcc->latency_monitor.state == PING_STATE_LATENCY) { - monitor->state = CONNECTIVITY_STATE_WAIT_PONG; - } else { - monitor->state = CONNECTIVITY_STATE_CONNECTED; - } - rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, - rcc->connectivity_monitor.timeout); - } else { - monitor->state = CONNECTIVITY_STATE_DISCONNECTED; - spice_warning("rcc %p on channel %d:%d has been unresponsive for more than %u ms, disconnecting", - rcc, rcc->channel->type, rcc->channel->id, monitor->timeout); - red_channel_client_disconnect(rcc); - } -} - -void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms) -{ - if (!red_channel_client_is_connected(rcc)) { - return; - } - spice_debug(NULL); - spice_assert(timeout_ms > 0); - /* - * If latency_monitor is not active, we activate it in order to 
enable - * periodic ping messages so that we will be be able to identify a disconnected - * channel-client even if there are no ongoing channel specific messages - * on this channel. - */ - if (rcc->latency_monitor.timer == NULL) { - rcc->latency_monitor.timer = rcc->channel->core->timer_add( - rcc->channel->core, red_channel_client_ping_timer, rcc); - if (!red_client_during_migrate_at_target(rcc->client)) { - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - } - rcc->latency_monitor.roundtrip = -1; - } - if (rcc->connectivity_monitor.timer == NULL) { - rcc->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED; - rcc->connectivity_monitor.timer = rcc->channel->core->timer_add( - rcc->channel->core, red_channel_client_connectivity_timer, rcc); - rcc->connectivity_monitor.timeout = timeout_ms; - if (!red_client_during_migrate_at_target(rcc->client)) { - rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, - rcc->connectivity_monitor.timeout); - } - } -} - -RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, - RedsStream *stream, - int monitor_latency, - int num_common_caps, uint32_t *common_caps, - int num_caps, uint32_t *caps) -{ - RedChannelClient *rcc = NULL; - - pthread_mutex_lock(&client->lock); - if (!red_channel_client_pre_create_validate(channel, client)) { - goto error; - } - spice_assert(stream && channel && size >= sizeof(RedChannelClient)); - rcc = spice_malloc0(size); - rcc->stream = stream; - rcc->channel = channel; - rcc->client = client; - rcc->refs = 1; - rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + - // block flags) - rcc->ack_data.client_generation = ~0; - rcc->ack_data.client_window = CLIENT_ACK_WINDOW; - rcc->send_data.main.marshaller = spice_marshaller_new(); - rcc->send_data.urgent.marshaller = spice_marshaller_new(); - - rcc->send_data.marshaller = rcc->send_data.main.marshaller; - - rcc->incoming.opaque = rcc; - 
rcc->incoming.cb = &channel->incoming_cb; - - rcc->outgoing.opaque = rcc; - rcc->outgoing.cb = &channel->outgoing_cb; - rcc->outgoing.pos = 0; - rcc->outgoing.size = 0; - - red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); - if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { - rcc->incoming.header = mini_header_wrapper; - rcc->send_data.header = mini_header_wrapper; - rcc->is_mini_header = TRUE; - } else { - rcc->incoming.header = full_header_wrapper; - rcc->send_data.header = full_header_wrapper; - rcc->is_mini_header = FALSE; - } - - rcc->incoming.header.data = rcc->incoming.header_buf; - rcc->incoming.serial = 1; - - if (!channel->channel_cbs.config_socket(rcc)) { - goto error; - } - - ring_init(&rcc->pipe); - rcc->pipe_size = 0; - - stream->watch = channel->core->watch_add(channel->core, - stream->socket, - SPICE_WATCH_EVENT_READ, - red_channel_client_event, rcc); - rcc->id = g_list_length(channel->clients); - red_channel_add_client(channel, rcc); - red_client_add_channel(client, rcc); - red_channel_ref(channel); - pthread_mutex_unlock(&client->lock); - - if (monitor_latency && reds_stream_get_family(stream) != AF_UNIX) { - rcc->latency_monitor.timer = channel->core->timer_add( - channel->core, red_channel_client_ping_timer, rcc); - if (!client->during_target_migrate) { - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - } - rcc->latency_monitor.roundtrip = -1; - } - - return rcc; -error: - free(rcc); - reds_stream_free(stream); - pthread_mutex_unlock(&client->lock); - return NULL; -} - /* returns TRUE If all channels are finished migrating, FALSE otherwise */ -static gboolean red_client_seamless_migration_done_for_channel(RedClient *client, - RedChannelClient *rcc) +gboolean red_client_seamless_migration_done_for_channel(RedClient *client, + RedChannelClient *rcc) { gboolean ret = FALSE; @@ -955,26 +133,6 @@ static gboolean 
red_client_seamless_migration_done_for_channel(RedClient *client return ret; } -static void red_channel_client_seamless_migration_done(RedChannelClient *rcc) -{ - rcc->wait_migrate_data = FALSE; - - if (red_client_seamless_migration_done_for_channel(rcc->client, rcc)) { - if (rcc->latency_monitor.timer) { - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - } - if (rcc->connectivity_monitor.timer) { - rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, - rcc->connectivity_monitor.timeout); - } - } -} - -int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc) -{ - return rcc->wait_migrate_data; -} - int red_channel_is_waiting_for_migrate_data(RedChannel *channel) { RedChannelClient *rcc; @@ -1006,20 +164,6 @@ static void red_channel_client_default_disconnect(RedChannelClient *base) red_channel_client_disconnect(base); } -void red_channel_client_default_migrate(RedChannelClient *rcc) -{ - if (rcc->latency_monitor.timer) { - red_channel_client_cancel_ping_timer(rcc); - rcc->channel->core->timer_remove(rcc->latency_monitor.timer); - rcc->latency_monitor.timer = NULL; - } - if (rcc->connectivity_monitor.timer) { - rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer); - rcc->connectivity_monitor.timer = NULL; - } - red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MIGRATE); -} - RedChannel *red_channel_create(int size, RedsState *reds, const SpiceCoreInterfaceInternal *core, @@ -1055,9 +199,9 @@ RedChannel *red_channel_create(int size, channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_client_default_peer_on_error; channel->incoming_cb.on_input = red_channel_client_on_input; - channel->outgoing_cb.get_msg_size = red_channel_client_peer_get_out_msg_size; - channel->outgoing_cb.prepare = red_channel_client_peer_prepare_out_msg; - channel->outgoing_cb.on_block = red_channel_client_peer_on_out_block; + channel->outgoing_cb.get_msg_size = red_channel_client_get_out_msg_size; + 
channel->outgoing_cb.prepare = red_channel_client_prepare_out_msg; + channel->outgoing_cb.on_block = red_channel_client_on_out_block; channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_client_default_peer_on_error; channel->outgoing_cb.on_msg_done = red_channel_client_on_out_msg_done; @@ -1120,587 +264,155 @@ RedChannel *red_channel_create_dummy(int size, RedsState *reds, uint32_t type, u red_channel_register_client_cbs(channel, &client_cbs, NULL); red_channel_set_common_cap(channel, SPICE_COMMON_CAP_MINI_HEADER); - channel->thread_id = pthread_self(); - spice_debug("channel type %d id %d thread_id 0x%lx", - channel->type, channel->id, channel->thread_id); - - channel->out_bytes_counter = 0; - - return channel; -} - -static int do_nothing_handle_message(RedChannelClient *rcc, - uint16_t type, - uint32_t size, - uint8_t *msg) -{ - return TRUE; -} - -RedChannel *red_channel_create_parser(int size, - RedsState *reds, - const SpiceCoreInterfaceInternal *core, - uint32_t type, uint32_t id, - int handle_acks, - spice_parse_channel_func_t parser, - channel_handle_parsed_proc handle_parsed, - const ChannelCbs *channel_cbs, - uint32_t migration_flags) -{ - RedChannel *channel = red_channel_create(size, reds, core, type, id, - handle_acks, - do_nothing_handle_message, - channel_cbs, - migration_flags); - - if (channel == NULL) { - return NULL; - } - channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed; - channel->incoming_cb.parser = parser; - - return channel; -} - -void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat) -{ - spice_return_if_fail(channel != NULL); - spice_return_if_fail(channel->stat == 0); - -#ifdef RED_STATISTICS - channel->stat = stat; - channel->out_bytes_counter = stat_add_counter(channel->reds, stat, "out_bytes", TRUE); -#endif -} - -void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data) -{ - spice_assert(client_cbs->connect || channel->type == 
SPICE_CHANNEL_MAIN); - channel->client_cbs.connect = client_cbs->connect; - - if (client_cbs->disconnect) { - channel->client_cbs.disconnect = client_cbs->disconnect; - } - - if (client_cbs->migrate) { - channel->client_cbs.migrate = client_cbs->migrate; - } - channel->data = cbs_data; -} - -int test_capability(const uint32_t *caps, int num_caps, uint32_t cap) -{ - uint32_t index = cap / 32; - if (num_caps < index + 1) { - return FALSE; - } - - return (caps[index] & (1 << (cap % 32))) != 0; -} - -static void add_capability(uint32_t **caps, int *num_caps, uint32_t cap) -{ - int nbefore, n; - - nbefore = *num_caps; - n = cap / 32; - *num_caps = MAX(*num_caps, n + 1); - *caps = spice_renew(uint32_t, *caps, *num_caps); - memset(*caps + nbefore, 0, (*num_caps - nbefore) * sizeof(uint32_t)); - (*caps)[n] |= (1 << (cap % 32)); -} - -void red_channel_set_common_cap(RedChannel *channel, uint32_t cap) -{ - add_capability(&channel->local_caps.common_caps, &channel->local_caps.num_common_caps, cap); -} - -void red_channel_set_cap(RedChannel *channel, uint32_t cap) -{ - add_capability(&channel->local_caps.caps, &channel->local_caps.num_caps, cap); -} - -static void red_channel_ref(RedChannel *channel) -{ - channel->refs++; -} - -static void red_channel_unref(RedChannel *channel) -{ - if (!--channel->refs) { - if (channel->local_caps.num_common_caps) { - free(channel->local_caps.common_caps); - } - - if (channel->local_caps.num_caps) { - free(channel->local_caps.caps); - } - - free(channel); - } -} - -void red_channel_client_ref(RedChannelClient *rcc) -{ - rcc->refs++; -} - -void red_channel_client_unref(RedChannelClient *rcc) -{ - if (!--rcc->refs) { - spice_debug("destroy rcc=%p", rcc); - - reds_stream_free(rcc->stream); - rcc->stream = NULL; - - if (rcc->send_data.main.marshaller) { - spice_marshaller_destroy(rcc->send_data.main.marshaller); - } - - if (rcc->send_data.urgent.marshaller) { - spice_marshaller_destroy(rcc->send_data.urgent.marshaller); - } - - 
red_channel_client_destroy_remote_caps(rcc); - if (rcc->channel) { - red_channel_unref(rcc->channel); - } - free(rcc); - } -} - -void red_channel_client_destroy(RedChannelClient *rcc) -{ - rcc->destroying = 1; - red_channel_client_disconnect(rcc); - red_client_remove_channel(rcc); - red_channel_client_unref(rcc); -} - -void red_channel_destroy(RedChannel *channel) -{ - if (!channel) { - return; - } - - g_list_foreach(channel->clients, (GFunc)red_channel_client_destroy, NULL); - red_channel_unref(channel); -} - -void red_channel_client_shutdown(RedChannelClient *rcc) -{ - if (rcc->stream && !rcc->stream->shutdown) { - rcc->channel->core->watch_remove(rcc->stream->watch); - rcc->stream->watch = NULL; - shutdown(rcc->stream->socket, SHUT_RDWR); - rcc->stream->shutdown = TRUE; - } -} - -void red_channel_client_send(RedChannelClient *rcc) -{ - red_channel_client_ref(rcc); - red_peer_handle_outgoing(rcc->stream, &rcc->outgoing); - red_channel_client_unref(rcc); -} - -void red_channel_send(RedChannel *channel) -{ - g_list_foreach(channel->clients, (GFunc)red_channel_client_send, NULL); -} - -static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc) -{ - return (rcc->channel->handle_acks && - (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2)); -} - -static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc) -{ - RedPipeItem *item; - - if (!rcc || rcc->send_data.blocked - || red_channel_client_waiting_for_ack(rcc) - || !(item = (RedPipeItem *)ring_get_tail(&rcc->pipe))) { - return NULL; - } - red_channel_client_pipe_remove(rcc, item); - return item; -} - -void red_channel_client_push(RedChannelClient *rcc) -{ - RedPipeItem *pipe_item; - - if (!rcc->during_send) { - rcc->during_send = TRUE; - } else { - return; - } - red_channel_client_ref(rcc); - if (rcc->send_data.blocked) { - red_channel_client_send(rcc); - } - - if (!red_channel_client_no_item_being_sent(rcc) && !rcc->send_data.blocked) { - rcc->send_data.blocked 
= TRUE; - spice_printerr("ERROR: an item waiting to be sent and not blocked"); - } - - while ((pipe_item = red_channel_client_pipe_item_get(rcc))) { - red_channel_client_send_item(rcc, pipe_item); - } - if (red_channel_client_no_item_being_sent(rcc) && ring_is_empty(&rcc->pipe) - && rcc->stream->watch) { - rcc->channel->core->watch_update_mask(rcc->stream->watch, - SPICE_WATCH_EVENT_READ); - } - rcc->during_send = FALSE; - red_channel_client_unref(rcc); -} - -void red_channel_push(RedChannel *channel) -{ - if (!channel) { - return; - } - - g_list_foreach(channel->clients, (GFunc)red_channel_client_push, NULL); -} - -int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc) -{ - if (rcc->latency_monitor.roundtrip < 0) { - return rcc->latency_monitor.roundtrip; - } - return rcc->latency_monitor.roundtrip / NSEC_PER_MILLISEC; -} - -static void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc) -{ - rcc->ack_data.messages_window = 0; - red_channel_client_push(rcc); -} - -// TODO: this function doesn't make sense because the window should be client (WAN/LAN) -// specific -void red_channel_init_outgoing_messages_window(RedChannel *channel) -{ - g_list_foreach(channel->clients, (GFunc)red_channel_client_init_outgoing_messages_window, NULL); -} - -static void red_channel_handle_migrate_flush_mark(RedChannelClient *rcc) -{ - if (rcc->channel->channel_cbs.handle_migrate_flush_mark) { - rcc->channel->channel_cbs.handle_migrate_flush_mark(rcc); - } -} - -// TODO: the whole migration is broken with multiple clients. What do we want to do? -// basically just -// 1) source send mark to all -// 2) source gets at various times the data (waits for all) -// 3) source migrates to target -// 4) target sends data to all -// So need to make all the handlers work with per channel/client data (what data exactly?) 
-static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message) -{ - spice_debug("channel type %d id %d rcc %p size %u", - rcc->channel->type, rcc->channel->id, rcc, size); - if (!rcc->channel->channel_cbs.handle_migrate_data) { - return; - } - if (!red_channel_client_is_waiting_for_migrate_data(rcc)) { - spice_channel_client_error(rcc, "unexpected"); - return; - } - if (rcc->channel->channel_cbs.handle_migrate_data_get_serial) { - red_channel_client_set_message_serial(rcc, - rcc->channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message)); - } - if (!rcc->channel->channel_cbs.handle_migrate_data(rcc, size, message)) { - spice_channel_client_error(rcc, "handle_migrate_data failed"); - return; - } - red_channel_client_seamless_migration_done(rcc); -} - -static void red_channel_client_restart_ping_timer(RedChannelClient *rcc) -{ - uint64_t passed, timeout; - - passed = (spice_get_monotonic_time_ns() - rcc->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC; - timeout = PING_TEST_IDLE_NET_TIMEOUT_MS; - if (passed < PING_TEST_TIMEOUT_MS) { - timeout += PING_TEST_TIMEOUT_MS - passed; - } - - red_channel_client_start_ping_timer(rcc, timeout); -} - -static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout) -{ - if (!rcc->latency_monitor.timer) { - return; - } - if (rcc->latency_monitor.state != PING_STATE_NONE) { - return; - } - rcc->latency_monitor.state = PING_STATE_TIMER; - rcc->channel->core->timer_start(rcc->latency_monitor.timer, timeout); -} - -static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc) -{ - if (!rcc->latency_monitor.timer) { - return; - } - if (rcc->latency_monitor.state != PING_STATE_TIMER) { - return; - } - - rcc->channel->core->timer_cancel(rcc->latency_monitor.timer); - rcc->latency_monitor.state = PING_STATE_NONE; -} - -static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping) -{ - uint64_t now; - - /* ignoring unexpected 
pongs, or post-migration pongs for pings that - * started just before migration */ - if (ping->id != rcc->latency_monitor.id) { - spice_warning("ping-id (%u)!= pong-id %u", - rcc->latency_monitor.id, ping->id); - return; - } - - now = spice_get_monotonic_time_ns(); - - if (rcc->latency_monitor.state == PING_STATE_WARMUP) { - rcc->latency_monitor.state = PING_STATE_LATENCY; - return; - } else if (rcc->latency_monitor.state != PING_STATE_LATENCY) { - spice_warning("unexpected"); - return; - } - - /* set TCP_NODELAY=0, in case we reverted it for the test*/ - if (!rcc->latency_monitor.tcp_nodelay) { - int delay_val = 0; - - if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, - sizeof(delay_val)) == -1) { - if (errno != ENOTSUP) { - spice_warning("setsockopt failed, %s", strerror(errno)); - } - } - } - - /* - * The real network latency shouldn't change during the connection. However, - * the measurements can be bigger than the real roundtrip due to other - * threads or processes that are utilizing the network. We update the roundtrip - * measurement with the minimal value we encountered till now. 
- */ - if (rcc->latency_monitor.roundtrip < 0 || - now - ping->timestamp < rcc->latency_monitor.roundtrip) { - rcc->latency_monitor.roundtrip = now - ping->timestamp; - spice_debug("update roundtrip %.2f(ms)", ((double)rcc->latency_monitor.roundtrip)/NSEC_PER_MILLISEC); - } + channel->thread_id = pthread_self(); + spice_debug("channel type %d id %d thread_id 0x%lx", + channel->type, channel->id, channel->thread_id); - rcc->latency_monitor.last_pong_time = now; - rcc->latency_monitor.state = PING_STATE_NONE; - red_channel_client_start_ping_timer(rcc, PING_TEST_TIMEOUT_MS); + channel->out_bytes_counter = 0; + + return channel; } -int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, - uint16_t type, void *message) +static int do_nothing_handle_message(RedChannelClient *rcc, + uint16_t type, + uint32_t size, + uint8_t *msg) { - switch (type) { - case SPICE_MSGC_ACK_SYNC: - if (size != sizeof(uint32_t)) { - spice_printerr("bad message size"); - return FALSE; - } - rcc->ack_data.client_generation = *(uint32_t *)(message); - break; - case SPICE_MSGC_ACK: - if (rcc->ack_data.client_generation == rcc->ack_data.generation) { - rcc->ack_data.messages_window -= rcc->ack_data.client_window; - red_channel_client_push(rcc); - } - break; - case SPICE_MSGC_DISCONNECTING: - break; - case SPICE_MSGC_MIGRATE_FLUSH_MARK: - if (!rcc->wait_migrate_flush_mark) { - spice_error("unexpected flush mark"); - return FALSE; - } - red_channel_handle_migrate_flush_mark(rcc); - rcc->wait_migrate_flush_mark = FALSE; - break; - case SPICE_MSGC_MIGRATE_DATA: - red_channel_handle_migrate_data(rcc, size, message); - break; - case SPICE_MSGC_PONG: - red_channel_client_handle_pong(rcc, message); - break; - default: - spice_printerr("invalid message type %u", type); - return FALSE; - } return TRUE; } -static void red_channel_client_event(int fd, int event, void *data) +RedChannel *red_channel_create_parser(int size, + RedsState *reds, + const SpiceCoreInterfaceInternal *core, + 
uint32_t type, uint32_t id, + int handle_acks, + spice_parse_channel_func_t parser, + channel_handle_parsed_proc handle_parsed, + const ChannelCbs *channel_cbs, + uint32_t migration_flags) { - RedChannelClient *rcc = (RedChannelClient *)data; + RedChannel *channel = red_channel_create(size, reds, core, type, id, + handle_acks, + do_nothing_handle_message, + channel_cbs, + migration_flags); - red_channel_client_ref(rcc); - if (event & SPICE_WATCH_EVENT_READ) { - red_channel_client_receive(rcc); - } - if (event & SPICE_WATCH_EVENT_WRITE) { - red_channel_client_push(rcc); + if (channel == NULL) { + return NULL; } - red_channel_client_unref(rcc); + channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed; + channel->incoming_cb.parser = parser; + + return channel; } -void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item) +void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat) { - spice_assert(red_channel_client_no_item_being_sent(rcc)); - spice_assert(msg_type != 0); - rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type); - rcc->send_data.item = item; - if (item) { - rcc->channel->channel_cbs.hold_item(rcc, item); - } + spice_return_if_fail(channel != NULL); + spice_return_if_fail(channel->stat == 0); + +#ifdef RED_STATISTICS + channel->stat = stat; + channel->out_bytes_counter = stat_add_counter(channel->reds, stat, "out_bytes", TRUE); +#endif } -void red_channel_client_begin_send_message(RedChannelClient *rcc) +void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data) { - SpiceMarshaller *m = rcc->send_data.marshaller; + spice_assert(client_cbs->connect || channel->type == SPICE_CHANNEL_MAIN); + channel->client_cbs.connect = client_cbs->connect; - // TODO - better check: type in channel_allowed_types. 
Better: type in channel_allowed_types(channel_state) - if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) { - spice_printerr("BUG: header->type == 0"); - return; + if (client_cbs->disconnect) { + channel->client_cbs.disconnect = client_cbs->disconnect; } - /* canceling the latency test timer till the nework is idle */ - red_channel_client_cancel_ping_timer(rcc); - - spice_marshaller_flush(m); - rcc->send_data.size = spice_marshaller_get_total_size(m); - rcc->send_data.header.set_msg_size(&rcc->send_data.header, - rcc->send_data.size - rcc->send_data.header.header_size); - rcc->ack_data.messages_window++; - rcc->send_data.last_sent_serial = rcc->send_data.serial; - rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */ - red_channel_client_send(rcc); + if (client_cbs->migrate) { + channel->client_cbs.migrate = client_cbs->migrate; + } + channel->data = cbs_data; } -SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc) +int test_capability(const uint32_t *caps, int num_caps, uint32_t cap) { - spice_assert(red_channel_client_no_item_being_sent(rcc)); - spice_assert(rcc->send_data.header.data != NULL); - rcc->send_data.main.header_data = rcc->send_data.header.data; - rcc->send_data.main.item = rcc->send_data.item; - - rcc->send_data.marshaller = rcc->send_data.urgent.marshaller; - rcc->send_data.item = NULL; - red_channel_client_reset_send_data(rcc); - return rcc->send_data.marshaller; + uint32_t index = cap / 32; + if (num_caps < index + 1) { + return FALSE; + } + + return (caps[index] & (1 << (cap % 32))) != 0; } -static void red_channel_client_restore_main_sender(RedChannelClient *rcc) +static void add_capability(uint32_t **caps, int *num_caps, uint32_t cap) { - spice_marshaller_reset(rcc->send_data.urgent.marshaller); - rcc->send_data.marshaller = rcc->send_data.main.marshaller; - rcc->send_data.header.data = rcc->send_data.main.header_data; - if (!rcc->is_mini_header) { - 
rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); - } - rcc->send_data.item = rcc->send_data.main.item; + int nbefore, n; + + nbefore = *num_caps; + n = cap / 32; + *num_caps = MAX(*num_caps, n + 1); + *caps = spice_renew(uint32_t, *caps, *num_caps); + memset(*caps + nbefore, 0, (*num_caps - nbefore) * sizeof(uint32_t)); + (*caps)[n] |= (1 << (cap % 32)); } -uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc) +void red_channel_set_common_cap(RedChannel *channel, uint32_t cap) { - return rcc->send_data.serial; + add_capability(&channel->local_caps.common_caps, &channel->local_caps.num_common_caps, cap); } -void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial) +void red_channel_set_cap(RedChannel *channel, uint32_t cap) { - rcc->send_data.last_sent_serial = serial; - rcc->send_data.serial = serial; + add_capability(&channel->local_caps.caps, &channel->local_caps.num_caps, cap); } -static inline gboolean client_pipe_add(RedChannelClient *rcc, RedPipeItem *item, RingItem *pos) +void red_channel_ref(RedChannel *channel) { - spice_assert(rcc && item); - if (SPICE_UNLIKELY(!red_channel_client_is_connected(rcc))) { - spice_debug("rcc is disconnected %p", rcc); - red_channel_client_release_item(rcc, item, FALSE); - return FALSE; - } - if (ring_is_empty(&rcc->pipe) && rcc->stream->watch) { - rcc->channel->core->watch_update_mask(rcc->stream->watch, - SPICE_WATCH_EVENT_READ | - SPICE_WATCH_EVENT_WRITE); - } - rcc->pipe_size++; - ring_add(pos, &item->link); - return TRUE; + channel->refs++; } -void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item) +void red_channel_unref(RedChannel *channel) { + if (--channel->refs == 0) { + if (channel->local_caps.num_common_caps) { + free(channel->local_caps.common_caps); + } - client_pipe_add(rcc, item, &rcc->pipe); -} + if (channel->local_caps.num_caps) { + free(channel->local_caps.caps); + } -void 
red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item) -{ - red_channel_client_pipe_add(rcc, item); - red_channel_client_push(rcc); + free(channel); + } } -void red_channel_client_pipe_add_after(RedChannelClient *rcc, - RedPipeItem *item, - RedPipeItem *pos) +void red_channel_destroy(RedChannel *channel) { - spice_assert(pos); - client_pipe_add(rcc, item, &pos->link); -} + if (!channel) { + return; + } -int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, - RedPipeItem *item) -{ - return ring_item_is_linked(&item->link); + g_list_foreach(channel->clients, (GFunc)red_channel_client_destroy, NULL); + red_channel_unref(channel); } -void red_channel_client_pipe_add_tail(RedChannelClient *rcc, - RedPipeItem *item) +void red_channel_send(RedChannel *channel) { - client_pipe_add(rcc, item, rcc->pipe.prev); + g_list_foreach(channel->clients, (GFunc)red_channel_client_send, NULL); } -void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item) +void red_channel_push(RedChannel *channel) { - if (client_pipe_add(rcc, item, rcc->pipe.prev)) { - red_channel_client_push(rcc); + if (!channel) { + return; } + + g_list_foreach(channel->clients, (GFunc)red_channel_client_push, NULL); } -void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type) +// TODO: this function doesn't make sense because the window should be client (WAN/LAN) +// specific +void red_channel_init_outgoing_messages_window(RedChannel *channel) { - RedPipeItem *item = spice_new(RedPipeItem, 1); - - red_pipe_item_init(item, pipe_item_type); - red_channel_client_pipe_add(rcc, item); - red_channel_client_push(rcc); + g_list_foreach(channel->clients, (GFunc)red_channel_client_init_outgoing_messages_window, NULL); } static void red_channel_client_pipe_add_type_proxy(gpointer data, gpointer user_data) @@ -1715,16 +427,6 @@ void red_channel_pipes_add_type(RedChannel *channel, int pipe_item_type) GINT_TO_POINTER(pipe_item_type)); } -void 
red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type) -{ - RedEmptyMsgPipeItem *item = spice_new(RedEmptyMsgPipeItem, 1); - - red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG); - item->msg = msg_type; - red_channel_client_pipe_add(rcc, &item->base); - red_channel_client_push(rcc); -} - static void red_channel_client_pipe_add_empty_msg_proxy(gpointer data, gpointer user_data) { int type = GPOINTER_TO_INT(user_data); @@ -1736,120 +438,38 @@ void red_channel_pipes_add_empty_msg(RedChannel *channel, int msg_type) g_list_foreach(channel->clients, red_channel_client_pipe_add_empty_msg_proxy, GINT_TO_POINTER(msg_type)); } -int red_channel_client_is_connected(RedChannelClient *rcc) -{ - if (!rcc->dummy) { - return rcc->channel - && (g_list_find(rcc->channel->clients, rcc) != NULL); - } else { - return rcc->dummy_connected; - } -} - int red_channel_is_connected(RedChannel *channel) { return channel && channel->clients; } -void red_channel_client_clear_sent_item(RedChannelClient *rcc) -{ - if (rcc->send_data.item) { - red_channel_client_release_item(rcc, rcc->send_data.item, TRUE); - rcc->send_data.item = NULL; - } - rcc->send_data.blocked = FALSE; - rcc->send_data.size = 0; -} - -void red_channel_client_pipe_clear(RedChannelClient *rcc) -{ - RedPipeItem *item; - - if (rcc) { - red_channel_client_clear_sent_item(rcc); - } - while ((item = (RedPipeItem *)ring_get_head(&rcc->pipe))) { - ring_remove(&item->link); - red_channel_client_release_item(rcc, item, FALSE); - } - rcc->pipe_size = 0; -} - -void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc) -{ - rcc->ack_data.messages_window = 0; -} - -void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window) -{ - rcc->ack_data.client_window = client_window; -} - -static void red_channel_remove_client(RedChannelClient *rcc) +void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc) { GList *link; + g_return_if_fail(channel == 
red_channel_client_get_channel(rcc)); - if (!pthread_equal(pthread_self(), rcc->channel->thread_id)) { + if (!pthread_equal(pthread_self(), channel->thread_id)) { spice_warning("channel type %d id %d - " "channel->thread_id (0x%lx) != pthread_self (0x%lx)." "If one of the threads is != io-thread && != vcpu-thread, " "this might be a BUG", - rcc->channel->type, rcc->channel->id, - rcc->channel->thread_id, pthread_self()); + channel->type, channel->id, + channel->thread_id, pthread_self()); } - spice_return_if_fail(rcc->channel); - link = g_list_find(rcc->channel->clients, rcc); + spice_return_if_fail(channel); + link = g_list_find(channel->clients, rcc); spice_return_if_fail(link != NULL); - rcc->channel->clients = g_list_remove_link(rcc->channel->clients, link); + channel->clients = g_list_remove_link(channel->clients, link); // TODO: should we set rcc->channel to NULL??? } -static void red_client_remove_channel(RedChannelClient *rcc) -{ - pthread_mutex_lock(&rcc->client->lock); - rcc->client->channels = g_list_remove(rcc->client->channels, rcc); - pthread_mutex_unlock(&rcc->client->lock); -} - -static void red_channel_client_disconnect_dummy(RedChannelClient *rcc) -{ - GList *link; - spice_assert(rcc->dummy); - if (rcc->channel && (link = g_list_find(rcc->channel->clients, rcc))) { - spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel, - rcc->channel->type, rcc->channel->id); - red_channel_remove_client(link->data); - } - rcc->dummy_connected = FALSE; -} - -void red_channel_client_disconnect(RedChannelClient *rcc) +void red_client_remove_channel(RedChannelClient *rcc) { - if (rcc->dummy) { - red_channel_client_disconnect_dummy(rcc); - return; - } - if (!red_channel_client_is_connected(rcc)) { - return; - } - spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel, - rcc->channel->type, rcc->channel->id); - red_channel_client_pipe_clear(rcc); - if (rcc->stream->watch) { - rcc->channel->core->watch_remove(rcc->stream->watch); - 
rcc->stream->watch = NULL; - } - if (rcc->latency_monitor.timer) { - rcc->channel->core->timer_remove(rcc->latency_monitor.timer); - rcc->latency_monitor.timer = NULL; - } - if (rcc->connectivity_monitor.timer) { - rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer); - rcc->connectivity_monitor.timer = NULL; - } - red_channel_remove_client(rcc); - rcc->channel->channel_cbs.on_disconnect(rcc); + RedClient *client = red_channel_client_get_client(rcc); + pthread_mutex_lock(&client->lock); + client->channels = g_list_remove(client->channels, rcc); + pthread_mutex_unlock(&client->lock); } void red_channel_disconnect(RedChannel *channel) @@ -1857,51 +477,6 @@ void red_channel_disconnect(RedChannel *channel) g_list_foreach(channel->clients, (GFunc)red_channel_client_disconnect, NULL); } -RedChannelClient *red_channel_client_create_dummy(int size, - RedChannel *channel, - RedClient *client, - int num_common_caps, uint32_t *common_caps, - int num_caps, uint32_t *caps) -{ - RedChannelClient *rcc = NULL; - - spice_assert(size >= sizeof(RedChannelClient)); - - pthread_mutex_lock(&client->lock); - if (!red_channel_client_pre_create_validate(channel, client)) { - goto error; - } - rcc = spice_malloc0(size); - rcc->refs = 1; - rcc->client = client; - rcc->channel = channel; - red_channel_ref(channel); - red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); - if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { - rcc->incoming.header = mini_header_wrapper; - rcc->send_data.header = mini_header_wrapper; - rcc->is_mini_header = TRUE; - } else { - rcc->incoming.header = full_header_wrapper; - rcc->send_data.header = full_header_wrapper; - rcc->is_mini_header = FALSE; - } - - rcc->incoming.header.data = rcc->incoming.header_buf; - rcc->incoming.serial = 1; - ring_init(&rcc->pipe); - - rcc->dummy = TRUE; - rcc->dummy_connected = TRUE; - red_channel_add_client(channel, rcc); - red_client_add_channel(client, 
rcc); - pthread_mutex_unlock(&client->lock); - return rcc; -error: - pthread_mutex_unlock(&client->lock); - return NULL; -} - void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb) { g_list_foreach(channel->clients, (GFunc)cb, NULL); @@ -1922,7 +497,7 @@ int red_channel_all_blocked(RedChannel *channel) } for (link = channel->clients; link != NULL; link = link->next) { rcc = link->data; - if (!rcc->send_data.blocked) { + if (!red_channel_client_is_blocked(rcc)) { return FALSE; } } @@ -1936,56 +511,25 @@ int red_channel_any_blocked(RedChannel *channel) for (link = channel->clients; link != NULL; link = link->next) { rcc = link->data; - if (rcc->send_data.blocked) { + if (red_channel_client_is_blocked(rcc)) { return TRUE; } } return FALSE; } -int red_channel_client_blocked(RedChannelClient *rcc) -{ - return rcc && rcc->send_data.blocked; -} - -int red_channel_client_send_message_pending(RedChannelClient *rcc) -{ - return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0; -} - -/* accessors for RedChannelClient */ -SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc) -{ - return rcc->send_data.marshaller; -} - -RedsStream *red_channel_client_get_stream(RedChannelClient *rcc) -{ - return rcc->stream; -} - -RedClient *red_channel_client_get_client(RedChannelClient *rcc) -{ - return rcc->client; -} - -void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list) -{ - rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list); -} - -/* end of accessors */ - int red_channel_get_first_socket(RedChannel *channel) { RedChannelClient *rcc; + RedsStream *stream; if (!channel || !channel->clients) { return -1; } rcc = channel->clients->data; + stream = red_channel_client_get_stream(rcc); - return rcc->stream->socket; + return stream->socket; } int red_channel_no_item_being_sent(RedChannel *channel) @@ -2002,18 +546,6 @@ int red_channel_no_item_being_sent(RedChannel *channel) return 
TRUE; } -int red_channel_client_no_item_being_sent(RedChannelClient *rcc) -{ - return !rcc || (rcc->send_data.size == 0); -} - -void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, - RedPipeItem *item) -{ - red_channel_client_pipe_remove(rcc, item); - red_channel_client_release_item(rcc, item, FALSE); -} - /* * RedClient implementation - kept in red-channel.c because they are * pretty tied together. @@ -2051,21 +583,6 @@ RedClient *red_client_unref(RedClient *client) return client; } -/* client mutex should be locked before this call */ -static gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc) -{ - gboolean ret = FALSE; - - if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) { - rcc->wait_migrate_data = TRUE; - ret = TRUE; - } - spice_debug("channel type %d id %d rcc %p wait data %d", rcc->channel->type, rcc->channel->id, rcc, - rcc->wait_migrate_data); - - return ret; -} - void red_client_set_migration_seamless(RedClient *client) // dest { GList *link; @@ -2085,6 +602,7 @@ void red_client_migrate(RedClient *client) { GList *link, *next; RedChannelClient *rcc; + RedChannel *channel; spice_printerr("migrate client with #channels %d", g_list_length(client->channels)); if (!pthread_equal(pthread_self(), client->thread_id)) { @@ -2097,8 +615,9 @@ void red_client_migrate(RedClient *client) while (link) { next = link->next; rcc = link->data; + channel = red_channel_client_get_channel(rcc); if (red_channel_client_is_connected(rcc)) { - rcc->channel->client_cbs.migrate(rcc); + channel->client_cbs.migrate(rcc); } link = next; } @@ -2119,20 +638,21 @@ void red_client_destroy(RedClient *client) } link = client->channels; while (link) { + RedChannel *channel; next = link->next; // some channels may be in other threads, so disconnection // is not synchronous. 
rcc = link->data; - rcc->destroying = 1; + channel = red_channel_client_get_channel(rcc); + red_channel_client_set_destroying(rcc); // some channels may be in other threads. However we currently // assume disconnect is synchronous (we changed the dispatcher // to wait for disconnection) // TODO: should we go back to async. For this we need to use // ref count for channel clients. - rcc->channel->client_cbs.disconnect(rcc); - spice_assert(ring_is_empty(&rcc->pipe)); - spice_assert(rcc->pipe_size == 0); - spice_assert(rcc->send_data.size == 0); + channel->client_cbs.disconnect(rcc); + spice_assert(red_channel_client_pipe_is_empty(rcc)); + spice_assert(red_channel_client_no_item_being_sent(rcc)); red_channel_client_destroy(rcc); link = next; } @@ -2140,15 +660,17 @@ void red_client_destroy(RedClient *client) } /* client->lock should be locked */ -static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id) +RedChannelClient *red_client_get_channel(RedClient *client, int type, int id) { GList *link; RedChannelClient *rcc; RedChannelClient *ret = NULL; for (link = client->channels; link != NULL; link = link->next) { + RedChannel *channel; rcc = link->data; - if (rcc->channel->type == type && rcc->channel->id == id) { + channel = red_channel_client_get_channel(rcc); + if (channel->type == type && channel->id == id) { ret = rcc; break; } @@ -2157,7 +679,7 @@ static RedChannelClient *red_client_get_channel(RedClient *client, int type, int } /* client->lock should be locked */ -static void red_client_add_channel(RedClient *client, RedChannelClient *rcc) +void red_client_add_channel(RedClient *client, RedChannelClient *rcc) { spice_assert(rcc && client); client->channels = g_list_append(client->channels, rcc); @@ -2189,11 +711,7 @@ void red_client_semi_seamless_migrate_complete(RedClient *client) link = client->channels; while (link) { next = link->next; - RedChannelClient *rcc = link->data; - - if (rcc->latency_monitor.timer) { - 
red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - } + red_channel_client_semi_seamless_migration_complete(link->data); link = next; } pthread_mutex_unlock(&client->lock); @@ -2289,8 +807,10 @@ uint32_t red_channel_max_pipe_size(RedChannel *channel) uint32_t pipe_size = 0; for (link = channel->clients; link != NULL; link = link->next) { + uint32_t new_size; rcc = link->data; - pipe_size = MAX(pipe_size, rcc->pipe_size); + new_size = red_channel_client_get_pipe_size(rcc); + pipe_size = MAX(pipe_size, new_size); } return pipe_size; } @@ -2302,8 +822,10 @@ uint32_t red_channel_min_pipe_size(RedChannel *channel) uint32_t pipe_size = ~0; for (link = channel->clients; link != NULL; link = link->next) { + uint32_t new_size; rcc = link->data; - pipe_size = MIN(pipe_size, rcc->pipe_size); + new_size = red_channel_client_get_pipe_size(rcc); + pipe_size = MIN(pipe_size, new_size); } return pipe_size == ~0 ? 0 : pipe_size; } @@ -2316,85 +838,11 @@ uint32_t red_channel_sum_pipes_size(RedChannel *channel) for (link = channel->clients; link != NULL; link = link->next) { rcc = link->data; - sum += rcc->pipe_size; + sum += red_channel_client_get_pipe_size(rcc); } return sum; } -int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, - int64_t timeout) -{ - uint64_t end_time; - int blocked; - - if (!red_channel_client_blocked(rcc)) { - return TRUE; - } - if (timeout != -1) { - end_time = spice_get_monotonic_time_ns() + timeout; - } else { - end_time = UINT64_MAX; - } - spice_info("blocked"); - - do { - usleep(CHANNEL_BLOCKED_SLEEP_DURATION); - red_channel_client_receive(rcc); - red_channel_client_send(rcc); - } while ((blocked = red_channel_client_blocked(rcc)) && - (timeout == -1 || spice_get_monotonic_time_ns() < end_time)); - - if (blocked) { - spice_warning("timeout"); - return FALSE; - } else { - spice_assert(red_channel_client_no_item_being_sent(rcc)); - return TRUE; - } -} - -/* TODO: more evil sync stuff. 
anything with the word wait in it's name. */ -int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, - RedPipeItem *item, - int64_t timeout) -{ - uint64_t end_time; - int item_in_pipe; - - spice_info(NULL); - - if (timeout != -1) { - end_time = spice_get_monotonic_time_ns() + timeout; - } else { - end_time = UINT64_MAX; - } - - rcc->channel->channel_cbs.hold_item(rcc, item); - - if (red_channel_client_blocked(rcc)) { - red_channel_client_receive(rcc); - red_channel_client_send(rcc); - } - red_channel_client_push(rcc); - - while((item_in_pipe = ring_item_is_linked(&item->link)) && - (timeout == -1 || spice_get_monotonic_time_ns() < end_time)) { - usleep(CHANNEL_BLOCKED_SLEEP_DURATION); - red_channel_client_receive(rcc); - red_channel_client_send(rcc); - red_channel_client_push(rcc); - } - - red_channel_client_release_item(rcc, item, TRUE); - if (item_in_pipe) { - spice_warning("timeout"); - return FALSE; - } else { - return red_channel_client_wait_outgoing_item(rcc, - timeout == -1 ? 
-1 : end_time - spice_get_monotonic_time_ns()); - } -} - int red_channel_wait_all_sent(RedChannel *channel, int64_t timeout) { @@ -2429,15 +877,6 @@ int red_channel_wait_all_sent(RedChannel *channel, } } -void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc) -{ - if (red_channel_client_blocked(rcc) || rcc->pipe_size > 0) { - red_channel_client_disconnect(rcc); - } else { - spice_assert(red_channel_client_no_item_being_sent(rcc)); - } -} - RedsState* red_channel_get_server(RedChannel *channel) { return channel->reds; diff --git a/server/red-channel.h b/server/red-channel.h index 7b4ec75..af6be18 100644 --- a/server/red-channel.h +++ b/server/red-channel.h @@ -34,6 +34,7 @@ #include "reds-stream.h" #include "stat.h" #include "red-pipe-item.h" +#include "red-channel-client.h" #define MAX_SEND_BUFS 1000 #define CLIENT_ACK_WINDOW 20 @@ -130,7 +131,6 @@ typedef struct OutgoingHandler { /* Red Channel interface */ typedef struct RedChannel RedChannel; -typedef struct RedChannelClient RedChannelClient; typedef struct RedClient RedClient; typedef struct MainChannelClient MainChannelClient; @@ -236,62 +236,6 @@ typedef struct RedChannelClientConnectivityMonitor { SpiceTimer *timer; } RedChannelClientConnectivityMonitor; -struct RedChannelClient { - RedChannel *channel; - RedClient *client; - RedsStream *stream; - int dummy; - int dummy_connected; - - uint32_t refs; - - struct { - uint32_t generation; - uint32_t client_generation; - uint32_t messages_window; - uint32_t client_window; - } ack_data; - - struct { - SpiceMarshaller *marshaller; - SpiceDataHeaderOpaque header; - uint32_t size; - RedPipeItem *item; - int blocked; - uint64_t serial; - uint64_t last_sent_serial; - - struct { - SpiceMarshaller *marshaller; - uint8_t *header_data; - RedPipeItem *item; - } main; - - struct { - SpiceMarshaller *marshaller; - } urgent; - } send_data; - - OutgoingHandler outgoing; - IncomingHandler incoming; - int during_send; - int id; // debugging purposes - Ring 
pipe; - uint32_t pipe_size; - - RedChannelCapabilities remote_caps; - int is_mini_header; - int destroying; - - int wait_migrate_data; - int wait_migrate_flush_mark; - - RedChannelClientLatencyMonitor latency_monitor; - RedChannelClientConnectivityMonitor connectivity_monitor; -}; - -#define RED_CHANNEL_CLIENT(Client) ((RedChannelClient *)(Client)) - struct RedChannel { uint32_t type; uint32_t id; @@ -335,17 +279,6 @@ struct RedChannel { #define RED_CHANNEL(Channel) ((RedChannel *)(Channel)) -/* - * When an error occurs over a channel, we treat it as a warning - * for spice-server and shutdown the channel. - */ -#define spice_channel_client_error(rcc, format, ...) \ - do { \ - spice_warning("rcc %p type %u id %u: " format, rcc, \ - rcc->channel->type, rcc->channel->id, ## __VA_ARGS__); \ - red_channel_client_shutdown(rcc); \ - } while (0) - /* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't * explicitly destroy the channel */ RedChannel *red_channel_create(int size, @@ -368,6 +301,11 @@ RedChannel *red_channel_create_parser(int size, channel_handle_parsed_proc handle_parsed, const ChannelCbs *channel_cbs, uint32_t migration_flags); +void red_channel_ref(RedChannel *channel); +void red_channel_unref(RedChannel *channel); +void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc); +void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc); + void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat); void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data); @@ -375,26 +313,13 @@ void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *clien void red_channel_set_common_cap(RedChannel *channel, uint32_t cap); void red_channel_set_cap(RedChannel *channel, uint32_t cap); -RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, - RedsStream *stream, - int monitor_latency, - int 
num_common_caps, uint32_t *common_caps, - int num_caps, uint32_t *caps); // TODO: tmp, for channels that don't use RedChannel yet (e.g., snd channel), but // do use the client callbacks. So the channel clients are not connected (the channel doesn't // have list of them, but they do have a link to the channel, and the client has a list of them) RedChannel *red_channel_create_dummy(int size, RedsState *reds, uint32_t type, uint32_t id); -RedChannelClient *red_channel_client_create_dummy(int size, - RedChannel *channel, - RedClient *client, - int num_common_caps, uint32_t *common_caps, - int num_caps, uint32_t *caps); int red_channel_is_connected(RedChannel *channel); -int red_channel_client_is_connected(RedChannelClient *rcc); -void red_channel_client_default_migrate(RedChannelClient *rcc); -int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc); /* seamless migration is supported for only one client. This routine * checks if the only channel client associated with channel is * waiting for migration data */ @@ -407,61 +332,15 @@ int red_channel_is_waiting_for_migrate_data(RedChannel *channel); * from red_client_destroy. */ -void red_channel_client_destroy(RedChannelClient *rcc); void red_channel_destroy(RedChannel *channel); -void red_channel_client_ref(RedChannelClient *rcc); -void red_channel_client_unref(RedChannelClient *rcc); - -int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap); -int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap); /* return true if all the channel clients support the cap */ int red_channel_test_remote_common_cap(RedChannel *channel, uint32_t cap); int red_channel_test_remote_cap(RedChannel *channel, uint32_t cap); -/* shutdown is the only safe thing to do out of the client/channel - * thread. It will not touch the rings, just shutdown the socket. - * It should be followed by some way to gurantee a disconnection. 
*/ -void red_channel_client_shutdown(RedChannelClient *rcc); - /* should be called when a new channel is ready to send messages */ void red_channel_init_outgoing_messages_window(RedChannel *channel); -/* handles general channel msgs from the client */ -int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, - uint16_t type, void *message); - -/* when preparing send_data: should call init and then use marshaller */ -void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item); - -uint64_t red_channel_client_get_message_serial(RedChannelClient *channel); -void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t); - -/* When sending a msg. Should first call red_channel_client_begin_send_message. - * It will first send the pending urgent data, if there is any, and then - * the rest of the data. - */ -void red_channel_client_begin_send_message(RedChannelClient *rcc); - -/* - * Stores the current send data, and switches to urgent send data. - * When it begins the actual send, it will send first the urgent data - * and afterward the rest of the data. - * Should be called only if during the marshalling of on message, - * the need to send another message, before, rises. - * Important: the serial of the non-urgent sent data, will be succeeded. - * return: the urgent send data marshaller - */ -SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc); - -/* returns -1 if we don't have an estimation */ -int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc); - -/* - * Checks periodically if the connection is still alive - */ -void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms); - // TODO: add back the channel_pipe_add functionality - by adding reference counting // to the RedPipeItem. 
@@ -471,23 +350,10 @@ int red_channel_pipes_new_add_push(RedChannel *channel, new_pipe_item_t creator, void red_channel_pipes_new_add(RedChannel *channel, new_pipe_item_t creator, void *data); void red_channel_pipes_new_add_tail(RedChannel *channel, new_pipe_item_t creator, void *data); -void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item); -void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item); -void red_channel_client_pipe_add_after(RedChannelClient *rcc, RedPipeItem *item, RedPipeItem *pos); -int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item); -void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item); -void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item); -/* for types that use this routine -> the pipe item should be freed */ -void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type); void red_channel_pipes_add_type(RedChannel *channel, int pipe_item_type); -void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type); void red_channel_pipes_add_empty_msg(RedChannel *channel, int msg_type); -void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc); -void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window); -void red_channel_client_push_set_ack(RedChannelClient *rcc); - int red_channel_get_first_socket(RedChannel *channel); /* return TRUE if all of the connected clients to this channel are blocked */ @@ -496,24 +362,13 @@ int red_channel_all_blocked(RedChannel *channel); /* return TRUE if any of the connected clients to this channel are blocked */ int red_channel_any_blocked(RedChannel *channel); -int red_channel_client_blocked(RedChannelClient *rcc); - -/* helper for channels that have complex logic that can possibly ready a send */ -int red_channel_client_send_message_pending(RedChannelClient *rcc); - int 
red_channel_no_item_being_sent(RedChannel *channel); -int red_channel_client_no_item_being_sent(RedChannelClient *rcc); // TODO: unstaticed for display/cursor channels. they do some specific pushes not through // adding elements or on events. but not sure if this is actually required (only result // should be that they ""try"" a little harder, but if the event system is correct it // should not make any difference. void red_channel_push(RedChannel *channel); -void red_channel_client_push(RedChannelClient *rcc); -// TODO: again - what is the context exactly? this happens in channel disconnect. but our -// current red_channel_shutdown also closes the socket - is there a socket to close? -// are we reading from an fd here? arghh -void red_channel_client_pipe_clear(RedChannelClient *rcc); // Again, used in various places outside of event handler context (or in other event handler // contexts): // flush_display_commands/flush_cursor_commands @@ -522,23 +377,10 @@ void red_channel_client_pipe_clear(RedChannelClient *rcc); // red_wait_pipe_item_sent // handle_channel_events - this is the only one that was used before, and was in red-channel.c void red_channel_receive(RedChannel *channel); -void red_channel_client_receive(RedChannelClient *rcc); // For red_worker void red_channel_send(RedChannel *channel); -void red_channel_client_send(RedChannelClient *rcc); // For red_worker void red_channel_disconnect(RedChannel *channel); -void red_channel_client_disconnect(RedChannelClient *rcc); - -/* accessors for RedChannelClient */ -/* Note: the valid times to call red_channel_get_marshaller are just during send_item callback. 
*/ -SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc); -RedsStream *red_channel_client_get_stream(RedChannelClient *rcc); -RedClient *red_channel_client_get_client(RedChannelClient *rcc); - -/* Note that the header is valid only between red_channel_reset_send_data and - * red_channel_begin_send_message.*/ -void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list); /* return the sum of all the rcc pipe size */ uint32_t red_channel_max_pipe_size(RedChannel *channel); @@ -592,6 +434,11 @@ RedClient *red_client_ref(RedClient *client); */ RedClient *red_client_unref(RedClient *client); +/* client->lock should be locked */ +void red_client_add_channel(RedClient *client, RedChannelClient *rcc); +void red_client_remove_channel(RedChannelClient *rcc); +RedChannelClient *red_client_get_channel(RedClient *client, int type, int id); + MainChannelClient *red_client_get_main(RedClient *client); // main should be set once before all the other channels are created void red_client_set_main(RedClient *client, MainChannelClient *mcc); @@ -607,7 +454,8 @@ void red_client_semi_seamless_migrate_complete(RedClient *client); /* dst side * int red_client_during_migrate_at_target(RedClient *client); void red_client_migrate(RedClient *client); - +gboolean red_client_seamless_migration_done_for_channel(RedClient *client, + RedChannelClient *rcc); /* * blocking functions. * @@ -616,13 +464,9 @@ void red_client_migrate(RedClient *client); * Return: TRUE if waiting succeeded. FALSE if timeout expired. 
*/ -int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, - RedPipeItem *item, - int64_t timeout); -int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, - int64_t timeout); int red_channel_wait_all_sent(RedChannel *channel, int64_t timeout); -void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc); + +#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro #endif diff --git a/server/red-qxl.c b/server/red-qxl.c index abde0ba..ef39f0e 100644 --- a/server/red-qxl.c +++ b/server/red-qxl.c @@ -102,12 +102,13 @@ static void red_qxl_disconnect_display_peer(RedChannelClient *rcc) { RedWorkerMessageDisplayDisconnect payload; Dispatcher *dispatcher; + RedChannel *channel = red_channel_client_get_channel(rcc); - if (!rcc->channel) { + if (!channel) { return; } - dispatcher = (Dispatcher *)rcc->channel->data; + dispatcher = (Dispatcher *)channel->data; spice_printerr(""); payload.rcc = rcc; @@ -123,11 +124,12 @@ static void red_qxl_display_migrate(RedChannelClient *rcc) { RedWorkerMessageDisplayMigrate payload; Dispatcher *dispatcher; - if (!rcc->channel) { + RedChannel *channel = red_channel_client_get_channel(rcc); + if (!channel) { return; } - dispatcher = (Dispatcher *)rcc->channel->data; - spice_printerr("channel type %u id %u", rcc->channel->type, rcc->channel->id); + dispatcher = (Dispatcher *)channel->data; + spice_printerr("channel type %u id %u", channel->type, channel->id); payload.rcc = rcc; dispatcher_send_message(dispatcher, RED_WORKER_MESSAGE_DISPLAY_MIGRATE, @@ -162,12 +164,13 @@ static void red_qxl_disconnect_cursor_peer(RedChannelClient *rcc) { RedWorkerMessageCursorDisconnect payload; Dispatcher *dispatcher; + RedChannel *channel = red_channel_client_get_channel(rcc); - if (!rcc->channel) { + if (!channel) { return; } - dispatcher = (Dispatcher *)rcc->channel->data; + dispatcher = (Dispatcher *)channel->data; spice_printerr(""); payload.rcc = rcc; @@ -180,12 +183,13 @@ static void 
red_qxl_cursor_migrate(RedChannelClient *rcc) { RedWorkerMessageCursorMigrate payload; Dispatcher *dispatcher; + RedChannel *channel = red_channel_client_get_channel(rcc); - if (!rcc->channel) { + if (!channel) { return; } - dispatcher = (Dispatcher *)rcc->channel->data; - spice_printerr("channel type %u id %u", rcc->channel->type, rcc->channel->id); + dispatcher = (Dispatcher *)channel->data; + spice_printerr("channel type %u id %u", channel->type, channel->id); payload.rcc = rcc; dispatcher_send_message(dispatcher, RED_WORKER_MESSAGE_CURSOR_MIGRATE, diff --git a/server/red-worker.c b/server/red-worker.c index 46fbc4d..8e3a90a 100644 --- a/server/red-worker.c +++ b/server/red-worker.c @@ -102,7 +102,8 @@ static int display_is_connected(RedWorker *worker) static uint8_t *common_alloc_recv_buf(RedChannelClient *rcc, uint16_t type, uint32_t size) { - CommonGraphicsChannel *common = SPICE_CONTAINEROF(rcc->channel, CommonGraphicsChannel, base); + RedChannel *channel = red_channel_client_get_channel(rcc); + CommonGraphicsChannel *common = SPICE_CONTAINEROF(channel, CommonGraphicsChannel, base); /* SPICE_MSGC_MIGRATE_DATA is the only client message whose size is dynamic */ if (type == SPICE_MSGC_MIGRATE_DATA) { diff --git a/server/reds.c b/server/reds.c index 5551db7..b89d6ee 100644 --- a/server/reds.c +++ b/server/reds.c @@ -1046,12 +1046,14 @@ void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t { RedCharDevice *dev_state = RED_CHAR_DEVICE(reds->agent_dev); RedChannelClient *rcc; + RedClient *client; if (!reds->vdagent) { return; } spice_assert(reds->vdagent->st && reds->vdagent->st == dev_state); rcc = main_channel_client_get_base(mcc); + client = red_channel_client_get_client(rcc); reds->agent_dev->priv->client_agent_started = TRUE; /* * Note that in older releases, send_tokens were set to ~0 on both client @@ -1060,11 +1062,11 @@ void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t * and vice versa, the sending 
from the server to the client won't have * flow control, but will have no other problem. */ - if (!red_char_device_client_exists(dev_state, rcc->client)) { + if (!red_char_device_client_exists(dev_state, client)) { int client_added; client_added = red_char_device_client_add(dev_state, - rcc->client, + client, TRUE, /* flow control */ REDS_VDI_PORT_NUM_RECEIVE_BUFFS, REDS_AGENT_WINDOW_SIZE, @@ -1078,7 +1080,7 @@ void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t } } else { red_char_device_send_to_client_tokens_set(dev_state, - rcc->client, + client, num_tokens); } @@ -1090,12 +1092,13 @@ void reds_on_main_agent_start(RedsState *reds, MainChannelClient *mcc, uint32_t void reds_on_main_agent_tokens(RedsState *reds, MainChannelClient *mcc, uint32_t num_tokens) { + RedClient *client = red_channel_client_get_client(main_channel_client_get_base(mcc)); if (!reds->vdagent) { return; } spice_assert(reds->vdagent->st); red_char_device_send_to_client_tokens_add(reds->vdagent->st, - main_channel_client_get_base(mcc)->client, + client, num_tokens); } @@ -1116,7 +1119,7 @@ uint8_t *reds_get_agent_data_buffer(RedsState *reds, MainChannelClient *mcc, siz } spice_assert(dev->priv->recv_from_client_buf == NULL); - client = main_channel_client_get_base(mcc)->client; + client = red_channel_client_get_client(main_channel_client_get_base(mcc)); dev->priv->recv_from_client_buf = red_char_device_write_buffer_get(RED_CHAR_DEVICE(dev), client, size + sizeof(VDIChunkHeader)); @@ -1481,9 +1484,9 @@ int reds_handle_migrate_data(RedsState *reds, MainChannelClient *mcc, } else { spice_debug("agent was not attached on the source host"); if (reds->vdagent) { + RedClient *client = red_channel_client_get_client(main_channel_client_get_base(mcc)); /* red_char_device_client_remove disables waiting for migration data */ - red_char_device_client_remove(RED_CHAR_DEVICE(agent_dev), - main_channel_client_get_base(mcc)->client); + 
red_char_device_client_remove(RED_CHAR_DEVICE(agent_dev), client); main_channel_push_agent_connected(reds->main_channel); } } @@ -1946,10 +1949,11 @@ int reds_on_migrate_dst_set_seamless(RedsState *reds, MainChannelClient *mcc, ui reds->dst_do_seamless_migrate = FALSE; } else { RedChannelClient *rcc = main_channel_client_get_base(mcc); + RedClient *client = red_channel_client_get_client(rcc); - red_client_set_migration_seamless(rcc->client); + red_client_set_migration_seamless(client); /* linking all the channels that have been connected before migration handshake */ - reds->dst_do_seamless_migrate = reds_link_mig_target_channels(reds, rcc->client); + reds->dst_do_seamless_migrate = reds_link_mig_target_channels(reds, client); } return reds->dst_do_seamless_migrate; } diff --git a/server/smartcard.c b/server/smartcard.c index a42bcd8..e544f19 100644 --- a/server/smartcard.c +++ b/server/smartcard.c @@ -30,7 +30,7 @@ #include "reds.h" #include "char-device.h" -#include "red-channel.h" +#include "red-channel-client-private.h" #include "smartcard.h" #include "migration-protocol.h" diff --git a/server/sound.c b/server/sound.c index d790c7a..05ce9b5 100644 --- a/server/sound.c +++ b/server/sound.c @@ -35,6 +35,7 @@ #include "main-channel-client.h" #include "reds.h" #include "red-qxl.h" +#include "red-channel-client-private.h" #include "sound.h" #include "common/snd_codec.h" #include "demarshallers.h" @@ -213,14 +214,16 @@ static void snd_disconnect_channel(SndChannel *channel) { SndWorker *worker; RedsState *reds; + RedChannel *red_channel; if (!channel || !channel->stream) { spice_debug("not connected"); return; } + red_channel = red_channel_client_get_channel(channel->channel_client); reds = snd_channel_get_server(channel); spice_debug("SndChannel=%p rcc=%p type=%d", - channel, channel->channel_client, channel->channel_client->channel->type); + channel, channel->channel_client, red_channel->type); worker = channel->worker; channel->cleanup(channel); 
red_channel_client_disconnect(worker->connection->channel_client); @@ -423,12 +426,13 @@ static int snd_record_handle_message(SndChannel *channel, size_t size, uint32_t static void snd_receive(SndChannel *channel) { SpiceDataHeaderOpaque *header; + IncomingHandler *incoming = red_channel_client_get_incoming_handler(channel->channel_client); if (!channel) { return; } - header = &channel->channel_client->incoming.header; + header = &incoming->header; for (;;) { ssize_t n; diff --git a/server/spicevmc.c b/server/spicevmc.c index dffdd9a..bfd93c1 100644 --- a/server/spicevmc.c +++ b/server/spicevmc.c @@ -34,6 +34,8 @@ #include "red-channel.h" #include "reds.h" #include "migration-protocol.h" +/* FIXME: only included for sizeof RedChannelClient */ +#include "red-channel-client-private.h" /* todo: add flow control. i.e., * (a) limit the tokens available for the client @@ -147,14 +149,15 @@ static void spicevmc_chardev_send_msg_to_client(RedPipeItem *msg, SpiceVmcState *state = opaque; RedVmcPipeItem *vmc_msg = (RedVmcPipeItem *)msg; - spice_assert(state->rcc->client == client); + spice_assert(red_channel_client_get_client(state->rcc) == client); red_pipe_item_ref(vmc_msg); red_channel_client_pipe_add_push(state->rcc, (RedPipeItem *)vmc_msg); } static void spicevmc_port_send_init(RedChannelClient *rcc) { - SpiceVmcState *state = SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel); + RedChannel *channel = red_channel_client_get_channel(rcc); + SpiceVmcState *state = SPICE_CONTAINEROF(channel, SpiceVmcState, channel); SpiceCharDeviceInstance *sin = state->chardev_sin; RedPortInitPipeItem *item = spice_malloc(sizeof(RedPortInitPipeItem)); @@ -185,7 +188,8 @@ static void spicevmc_char_dev_remove_client(RedClient *client, void *opaque) SpiceVmcState *state = opaque; spice_printerr("vmc state %p, client %p", state, client); - spice_assert(state->rcc && state->rcc->client == client); + spice_assert(state->rcc && + red_channel_client_get_client(state->rcc) == client); 
red_channel_client_shutdown(state->rcc); } @@ -194,8 +198,9 @@ static int spicevmc_red_channel_client_config_socket(RedChannelClient *rcc) { int delay_val = 1; RedsStream *stream = red_channel_client_get_stream(rcc); + RedChannel *channel = red_channel_client_get_channel(rcc); - if (rcc->channel->type == SPICE_CHANNEL_USBREDIR) { + if (channel->type == SPICE_CHANNEL_USBREDIR) { if (setsockopt(stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, sizeof(delay_val)) != 0) { if (errno != ENOTSUP && errno != ENOPROTOOPT) { @@ -212,12 +217,14 @@ static void spicevmc_red_channel_client_on_disconnect(RedChannelClient *rcc) { SpiceVmcState *state; SpiceCharDeviceInterface *sif; + RedChannel *channel = red_channel_client_get_channel(rcc); + RedClient *client = red_channel_client_get_client(rcc); if (!rcc) { return; } - state = SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel); + state = SPICE_CONTAINEROF(channel, SpiceVmcState, channel); if (state->recv_from_client_buf) { /* partial message which wasn't pushed to device */ red_char_device_write_buffer_release(state->chardev, state->recv_from_client_buf); @@ -225,17 +232,17 @@ static void spicevmc_red_channel_client_on_disconnect(RedChannelClient *rcc) } if (state->chardev) { - if (red_char_device_client_exists(state->chardev, rcc->client)) { - red_char_device_client_remove(state->chardev, rcc->client); + if (red_char_device_client_exists(state->chardev, client)) { + red_char_device_client_remove(state->chardev, client); } else { spice_printerr("client %p have already been removed from char dev %p", - rcc->client, state->chardev); + client, state->chardev); } } /* Don't destroy the rcc if it is already being destroyed, as then red_client_destroy/red_channel_client_destroy will already do this! 
*/ - if (!rcc->destroying) + if (!red_channel_client_is_destroying(rcc)) red_channel_client_destroy(rcc); state->rcc = NULL; @@ -247,7 +254,8 @@ static void spicevmc_red_channel_client_on_disconnect(RedChannelClient *rcc) static SpiceVmcState *spicevmc_red_channel_client_get_state(RedChannelClient *rcc) { - return SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel); + RedChannel *channel = red_channel_client_get_channel(rcc); + return SPICE_CONTAINEROF(channel, SpiceVmcState, channel); } static int spicevmc_channel_client_handle_migrate_flush_mark(RedChannelClient *rcc) @@ -316,15 +324,17 @@ static uint8_t *spicevmc_red_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, uint32_t size) { SpiceVmcState *state; + RedChannel *channel = red_channel_client_get_channel(rcc); + RedClient *client = red_channel_client_get_client(rcc); - state = SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel); + state = SPICE_CONTAINEROF(channel, SpiceVmcState, channel); switch (type) { case SPICE_MSGC_SPICEVMC_DATA: assert(!state->recv_from_client_buf); state->recv_from_client_buf = red_char_device_write_buffer_get(state->chardev, - rcc->client, + client, size); if (!state->recv_from_client_buf) { spice_error("failed to allocate write buffer"); @@ -344,8 +354,9 @@ static void spicevmc_red_channel_release_msg_rcv_buf(RedChannelClient *rcc, uint8_t *msg) { SpiceVmcState *state; + RedChannel *channel = red_channel_client_get_channel(rcc); - state = SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel); + state = SPICE_CONTAINEROF(channel, SpiceVmcState, channel); switch (type) { case SPICE_MSGC_SPICEVMC_DATA: @@ -380,8 +391,9 @@ static void spicevmc_red_channel_send_migrate_data(RedChannelClient *rcc, RedPipeItem *item) { SpiceVmcState *state; + RedChannel *channel = red_channel_client_get_channel(rcc); - state = SPICE_CONTAINEROF(rcc->channel, SpiceVmcState, channel); + state = SPICE_CONTAINEROF(channel, SpiceVmcState, channel); red_channel_client_init_send_data(rcc, 
SPICE_MSG_MIGRATE_DATA, item); spice_marshaller_add_uint32(m, SPICE_MIGRATE_DATA_SPICEVMC_MAGIC); spice_marshaller_add_uint32(m, SPICE_MIGRATE_DATA_SPICEVMC_VERSION); diff --git a/server/stream.c b/server/stream.c index ee3c0b0..f98177e 100644 --- a/server/stream.c +++ b/server/stream.c @@ -628,7 +628,7 @@ static uint64_t get_initial_bit_rate(DisplayChannelClient *dcc, Stream *stream) MainChannelClient *mcc; uint64_t net_test_bit_rate; - mcc = red_client_get_main(RED_CHANNEL_CLIENT(dcc)->client); + mcc = red_client_get_main(red_channel_client_get_client(RED_CHANNEL_CLIENT(dcc))); net_test_bit_rate = main_channel_client_is_network_info_initialized(mcc) ? main_channel_client_get_bitrate_per_sec(mcc) : 0; @@ -660,7 +660,8 @@ static uint32_t get_roundtrip_ms(void *opaque) roundtrip = red_channel_client_get_roundtrip_ms(RED_CHANNEL_CLIENT(agent->dcc)); if (roundtrip < 0) { - MainChannelClient *mcc = red_client_get_main(RED_CHANNEL_CLIENT(agent->dcc)->client); + MainChannelClient *mcc = + red_client_get_main(red_channel_client_get_client(RED_CHANNEL_CLIENT(agent->dcc))); /* * the main channel client roundtrip might not have been @@ -684,7 +685,9 @@ static void update_client_playback_delay(void *opaque, uint32_t delay_ms) { StreamAgent *agent = opaque; DisplayChannelClient *dcc = agent->dcc; - RedsState *reds = red_channel_get_server(((RedChannelClient*)dcc)->channel); + RedChannel *channel = red_channel_client_get_channel(RED_CHANNEL_CLIENT(dcc)); + RedClient *client = red_channel_client_get_client(RED_CHANNEL_CLIENT(dcc)); + RedsState *reds = red_channel_get_server(channel); dcc_update_streams_max_latency(dcc, agent); @@ -694,7 +697,7 @@ static void update_client_playback_delay(void *opaque, uint32_t delay_ms) } spice_debug("resetting client latency: %u", dcc_get_max_stream_latency(agent->dcc)); main_dispatcher_set_mm_time_latency(reds_get_main_dispatcher(reds), - RED_CHANNEL_CLIENT(agent->dcc)->client, + client, dcc_get_max_stream_latency(agent->dcc)); } -- 2.4.11 
_______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel