Hi, I did not see any issues. A few small comments below. On Wed, Aug 31, 2016 at 11:54:38AM -0500, Jonathon Jongsma wrote: > Reduce direct access to RedChannelClient, and get ready to convert to > GObject. > --- > server/Makefile.am | 2 + > server/cursor-channel.c | 2 + > server/dcc-private.h | 1 + > server/inputs-channel-client.c | 1 + > server/inputs-channel.c | 1 + > server/main-channel-client.c | 2 + > server/main-channel.c | 1 + > server/red-channel-client.c | 1626 ++++++++++++++++++++++++++++++++++++ > server/red-channel-client.h | 253 ++++++ > server/red-channel.c | 1809 +++------------------------------------- > server/red-channel.h | 207 +---- > server/red-worker.h | 1 + > server/reds.c | 1 + > server/smartcard.c | 2 +- > server/sound.c | 1 + > server/spicevmc.c | 1 + > 16 files changed, 2018 insertions(+), 1893 deletions(-) > create mode 100644 server/red-channel-client.c > create mode 100644 server/red-channel-client.h > > diff --git a/server/Makefile.am b/server/Makefile.am > index 24e6e21..e275765 100644 > --- a/server/Makefile.am > +++ b/server/Makefile.am > @@ -95,6 +95,8 @@ libserver_la_SOURCES = \ > mjpeg-encoder.c \ > red-channel.c \ > red-channel.h \ > + red-channel-client.c \ > + red-channel-client.h \ [Reviewer: the indentation of these two added lines looks off.] > red-common.h \ > dispatcher.c \ > dispatcher.h \ > diff --git a/server/cursor-channel.c b/server/cursor-channel.c > index cb3aa49..290f763 100644 > --- a/server/cursor-channel.c > +++ b/server/cursor-channel.c > @@ -21,7 +21,9 @@ > > #include <glib.h> > #include <common/generated_server_marshallers.h> > + > #include "cursor-channel.h" > +#include "red-channel-client.h" > #include "cache-item.h" > > #define CLIENT_CURSOR_CACHE_SIZE 256 > diff --git a/server/dcc-private.h b/server/dcc-private.h > index 02b51dd..d5aad3f 100644 > --- a/server/dcc-private.h > +++ b/server/dcc-private.h > @@ -22,6 +22,7 @@ > #include "dcc.h" > #include "image-encoders.h" > #include "stream.h" > +#include "red-channel-client.h" > > struct DisplayChannelClient { > 
RedChannelClient base; > diff --git a/server/inputs-channel-client.c b/server/inputs-channel-client.c > index bdbcf5c..ce21b9c 100644 > --- a/server/inputs-channel-client.c > +++ b/server/inputs-channel-client.c > @@ -21,6 +21,7 @@ > #include "inputs-channel-client.h" > #include "inputs-channel.h" > #include "migration-protocol.h" > +#include "red-channel-client.h" > > struct InputsChannelClient { > RedChannelClient base; > diff --git a/server/inputs-channel.c b/server/inputs-channel.c > index c28273a..da0f027 100644 > --- a/server/inputs-channel.c > +++ b/server/inputs-channel.c > @@ -39,6 +39,7 @@ > #include "reds.h" > #include "reds-stream.h" > #include "red-channel.h" > +#include "red-channel-client.h" > #include "inputs-channel-client.h" > #include "main-channel-client.h" > #include "inputs-channel.h" > diff --git a/server/main-channel-client.c b/server/main-channel-client.c > index 12151a7..bd339d0 100644 > --- a/server/main-channel-client.c > +++ b/server/main-channel-client.c > @@ -23,7 +23,9 @@ > #include "main-channel-client.h" > #include <common/generated_server_marshallers.h> > > +#include "main-channel-client.h" > #include "main-channel.h" > +#include "red-channel-client.h" > #include "reds.h" > > #define NET_TEST_WARMUP_BYTES 0 > diff --git a/server/main-channel.c b/server/main-channel.c > index 8bb874b..24c69a4 100644 > --- a/server/main-channel.c > +++ b/server/main-channel.c > @@ -24,6 +24,7 @@ > #include "red-common.h" > #include "main-channel.h" > #include "reds.h" > +#include "red-channel-client.h" > > int main_channel_is_connected(MainChannel *main_chan) > { > diff --git a/server/red-channel-client.c b/server/red-channel-client.c > new file mode 100644 > index 0000000..253e30e > --- /dev/null > +++ b/server/red-channel-client.c > @@ -0,0 +1,1626 @@ > +/* > + Copyright (C) 2009-2015 Red Hat, Inc. [Reviewer: should the copyright range be 2009-2016?] 
> + > + This library is free software; you can redistribute it and/or > + modify it under the terms of the GNU Lesser General Public > + License as published by the Free Software Foundation; either > + version 2.1 of the License, or (at your option) any later version. > + > + This library is distributed in the hope that it will be useful, > + but WITHOUT ANY WARRANTY; without even the implied warranty of > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + Lesser General Public License for more details. > + > + You should have received a copy of the GNU Lesser General Public > + License along with this library; if not, see <http://www.gnu.org/licenses/>. > +*/ > + > +#ifdef HAVE_CONFIG_H > +#include <config.h> > +#endif > + > +#include <glib.h> > +#include <stdio.h> > +#include <stdint.h> > +#include <netinet/in.h> > +#include <netinet/tcp.h> > +#include <fcntl.h> > +#include <unistd.h> > +#include <errno.h> > +#include <sys/ioctl.h> > +#ifdef HAVE_LINUX_SOCKIOS_H > +#include <linux/sockios.h> /* SIOCOUTQ */ > +#endif > +#include <common/generated_server_marshallers.h> > + > +#include "red-channel-client.h" > +#include "red-channel.h" > + > +#define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15) > +#define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10) > + > +enum QosPingState { > + PING_STATE_NONE, > + PING_STATE_TIMER, > + PING_STATE_WARMUP, > + PING_STATE_LATENCY, > +}; > + > +enum ConnectivityState { > + CONNECTIVITY_STATE_CONNECTED, > + CONNECTIVITY_STATE_BLOCKED, > + CONNECTIVITY_STATE_WAIT_PONG, > + CONNECTIVITY_STATE_DISCONNECTED, > +}; > + > +typedef struct RedEmptyMsgPipeItem { > + RedPipeItem base; > + int msg; > +} RedEmptyMsgPipeItem; > + > +typedef struct MarkerPipeItem { > + RedPipeItem base; > + gboolean *item_in_pipe; > +} MarkerPipeItem; > + > +static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout) > +{ > + if (!rcc->latency_monitor.timer) { > + return; > + } > + if (rcc->latency_monitor.state != 
PING_STATE_NONE) { > + return; > + } > + rcc->latency_monitor.state = PING_STATE_TIMER; > + rcc->channel->core->timer_start(rcc->latency_monitor.timer, timeout); > +} > + > +static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc) > +{ > + if (!rcc->latency_monitor.timer) { > + return; > + } > + if (rcc->latency_monitor.state != PING_STATE_TIMER) { > + return; > + } > + > + rcc->channel->core->timer_cancel(rcc->latency_monitor.timer); > + rcc->latency_monitor.state = PING_STATE_NONE; > +} > + > +static void red_channel_client_restart_ping_timer(RedChannelClient *rcc) > +{ > + uint64_t passed, timeout; > + > + passed = (spice_get_monotonic_time_ns() - rcc->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC; > + timeout = PING_TEST_IDLE_NET_TIMEOUT_MS; > + if (passed < PING_TEST_TIMEOUT_MS) { > + timeout += PING_TEST_TIMEOUT_MS - passed; > + } > + > + red_channel_client_start_ping_timer(rcc, timeout); > +} > + > +RedChannel* red_channel_client_get_channel(RedChannelClient* rcc) > +{ > + return rcc->channel; > +} > + > +IncomingHandler* red_channel_client_get_incoming_handler(RedChannelClient *rcc) [Reviewer: this function appears to be never used; maybe add a "/* FIXME: Never used */" comment?] 
> +{ > + return &rcc->incoming; > +} > + > +void red_channel_client_on_output(void *opaque, int n) > +{ > + RedChannelClient *rcc = opaque; > + > + if (rcc->connectivity_monitor.timer) { > + rcc->connectivity_monitor.out_bytes += n; > + } > + stat_inc_counter(reds, rcc->channel->out_bytes_counter, n); > +} > + > +void red_channel_client_on_input(void *opaque, int n) > +{ > + RedChannelClient *rcc = opaque; > + > + if (rcc->connectivity_monitor.timer) { > + rcc->connectivity_monitor.in_bytes += n; > + } > +} > + > +int red_channel_client_get_out_msg_size(void *opaque) > +{ > + RedChannelClient *rcc = (RedChannelClient *)opaque; > + > + return rcc->send_data.size; > +} > + > +void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, > + int *vec_size, int pos) > +{ > + RedChannelClient *rcc = (RedChannelClient *)opaque; > + > + *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller, > + vec, IOV_MAX, pos); > +} > + > +void red_channel_client_on_out_block(void *opaque) > +{ > + RedChannelClient *rcc = (RedChannelClient *)opaque; > + > + rcc->send_data.blocked = TRUE; > + rcc->channel->core->watch_update_mask(rcc->stream->watch, > + SPICE_WATCH_EVENT_READ | > + SPICE_WATCH_EVENT_WRITE); > +} > + > +static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc) > +{ > + return (rcc->send_data.marshaller == rcc->send_data.urgent.marshaller); > +} > + > +static void red_channel_client_reset_send_data(RedChannelClient *rcc) > +{ > + spice_marshaller_reset(rcc->send_data.marshaller); > + rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller, > + rcc->send_data.header.header_size); > + spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size); > + rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0); > + rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0); > + > + /* Keeping the serial consecutive: resetting it if reset_send_data > + * has been 
called before, but no message has been sent since then. > + */ > + if (rcc->send_data.last_sent_serial != rcc->send_data.serial) { > + spice_assert(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1); > + /* When the urgent marshaller is active, the serial was incremented by > + * the call to reset_send_data that was made for the main marshaller. > + * The urgent msg receives this serial, and the main msg serial is > + * the following one. Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial) > + * should be 1 in this case*/ > + if (!red_channel_client_urgent_marshaller_is_active(rcc)) { > + rcc->send_data.serial = rcc->send_data.last_sent_serial; > + } > + } > + rcc->send_data.serial++; > + > + if (!rcc->is_mini_header) { > + spice_assert(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller); > + rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0); > + rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); > + } > +} > + > +static void red_channel_client_send_set_ack(RedChannelClient *rcc) > +{ > + SpiceMsgSetAck ack; > + > + spice_assert(rcc); > + red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL); > + ack.generation = ++rcc->ack_data.generation; > + ack.window = rcc->ack_data.client_window; > + rcc->ack_data.messages_window = 0; > + > + spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack); > + > + red_channel_client_begin_send_message(rcc); > +} > + > +static void red_channel_client_send_migrate(RedChannelClient *rcc) > +{ > + SpiceMsgMigrate migrate; > + > + red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, NULL); > + migrate.flags = rcc->channel->migration_flags; > + spice_marshall_msg_migrate(rcc->send_data.marshaller, &migrate); > + if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_FLUSH) { > + rcc->wait_migrate_flush_mark = TRUE; > + } > + > + red_channel_client_begin_send_message(rcc); > +} > + > +static void 
red_channel_client_send_ping(RedChannelClient *rcc) > +{ > + SpiceMsgPing ping; > + > + if (!rcc->latency_monitor.warmup_was_sent) { // latency test start > + int delay_val; > + socklen_t opt_size = sizeof(delay_val); > + > + rcc->latency_monitor.warmup_was_sent = TRUE; > + /* > + * When testing latency, TCP_NODELAY must be switched on, otherwise, > + * sending the ping message is delayed by Nagle algorithm, and the > + * roundtrip measurment is less accurate (bigger). > + */ > + rcc->latency_monitor.tcp_nodelay = 1; > + if (getsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, > + &opt_size) == -1) { > + spice_warning("getsockopt failed, %s", strerror(errno)); > + } else { > + rcc->latency_monitor.tcp_nodelay = delay_val; > + if (!delay_val) { > + delay_val = 1; > + if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, > + sizeof(delay_val)) == -1) { > + if (errno != ENOTSUP) { > + spice_warning("setsockopt failed, %s", strerror(errno)); > + } > + } > + } > + } > + } > + > + red_channel_client_init_send_data(rcc, SPICE_MSG_PING, NULL); > + ping.id = rcc->latency_monitor.id; > + ping.timestamp = spice_get_monotonic_time_ns(); > + spice_marshall_msg_ping(rcc->send_data.marshaller, &ping); > + red_channel_client_begin_send_message(rcc); > +} > + > +static void red_channel_client_send_empty_msg(RedChannelClient *rcc, RedPipeItem *base) > +{ > + RedEmptyMsgPipeItem *msg_pipe_item = SPICE_UPCAST(RedEmptyMsgPipeItem, base); > + > + red_channel_client_init_send_data(rcc, msg_pipe_item->msg, NULL); > + red_channel_client_begin_send_message(rcc); > +} > + > +static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item) > +{ > + spice_assert(red_channel_client_no_item_being_sent(rcc)); > + red_channel_client_reset_send_data(rcc); > + switch (item->type) { > + case RED_PIPE_ITEM_TYPE_SET_ACK: > + red_channel_client_send_set_ack(rcc); > + break; > + case RED_PIPE_ITEM_TYPE_MIGRATE: > + 
red_channel_client_send_migrate(rcc); > + break; > + case RED_PIPE_ITEM_TYPE_EMPTY_MSG: > + red_channel_client_send_empty_msg(rcc, item); > + break; > + case RED_PIPE_ITEM_TYPE_PING: > + red_channel_client_send_ping(rcc); > + break; > + case RED_PIPE_ITEM_TYPE_MARKER: > + break; > + default: > + rcc->channel->channel_cbs.send_item(rcc, item); > + break; > + } > + red_pipe_item_unref(item); > +} > + > +static inline void red_channel_client_release_sent_item(RedChannelClient *rcc) > +{ > + if (rcc->send_data.item) { > + red_pipe_item_unref(rcc->send_data.item); > + rcc->send_data.item = NULL; > + } > +} > + > +static void red_channel_client_restore_main_sender(RedChannelClient *rcc) > +{ > + spice_marshaller_reset(rcc->send_data.urgent.marshaller); > + rcc->send_data.marshaller = rcc->send_data.main.marshaller; > + rcc->send_data.header.data = rcc->send_data.main.header_data; > + if (!rcc->is_mini_header) { > + rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); > + } > + rcc->send_data.item = rcc->send_data.main.item; > +} > + > +void red_channel_client_on_out_msg_done(void *opaque) > +{ > + RedChannelClient *rcc = (RedChannelClient *)opaque; > + int fd; > + > + rcc->send_data.size = 0; > + > + if (spice_marshaller_get_fd(rcc->send_data.marshaller, &fd)) { > + if (reds_stream_send_msgfd(rcc->stream, fd) < 0) { > + perror("sendfd"); > + red_channel_client_disconnect(rcc); > + if (fd != -1) > + close(fd); > + return; > + } > + if (fd != -1) > + close(fd); > + } > + > + red_channel_client_release_sent_item(rcc); > + if (rcc->send_data.blocked) { > + rcc->send_data.blocked = FALSE; > + rcc->channel->core->watch_update_mask(rcc->stream->watch, > + SPICE_WATCH_EVENT_READ); > + } > + > + if (red_channel_client_urgent_marshaller_is_active(rcc)) { > + red_channel_client_restore_main_sender(rcc); > + spice_assert(rcc->send_data.header.data != NULL); > + red_channel_client_begin_send_message(rcc); > + } else { > + if (rcc->latency_monitor.timer 
&& !rcc->send_data.blocked && rcc->pipe_size == 0) { > + /* It is possible that the socket will become idle, so we may be able to test latency */ > + red_channel_client_restart_ping_timer(rcc); > + } > + } > + > +} > + > +static void red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item) > +{ > + rcc->pipe_size--; > + ring_remove(&item->link); > +} > + > +static void red_channel_client_set_remote_caps(RedChannelClient* rcc, > + int num_common_caps, uint32_t *common_caps, > + int num_caps, uint32_t *caps) > +{ > + rcc->remote_caps.num_common_caps = num_common_caps; > + rcc->remote_caps.common_caps = spice_memdup(common_caps, num_common_caps * sizeof(uint32_t)); > + > + rcc->remote_caps.num_caps = num_caps; > + rcc->remote_caps.caps = spice_memdup(caps, num_caps * sizeof(uint32_t)); > +} > + > +static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc) > +{ > + rcc->remote_caps.num_common_caps = 0; > + free(rcc->remote_caps.common_caps); > + rcc->remote_caps.num_caps = 0; > + free(rcc->remote_caps.caps); > +} > + > +int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap) > +{ > + return test_capability(rcc->remote_caps.common_caps, > + rcc->remote_caps.num_common_caps, > + cap); > +} > + > +int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap) > +{ > + return test_capability(rcc->remote_caps.caps, > + rcc->remote_caps.num_caps, > + cap); > +} > + > +static void red_channel_client_push_ping(RedChannelClient *rcc) > +{ > + spice_assert(rcc->latency_monitor.state == PING_STATE_NONE); > + rcc->latency_monitor.state = PING_STATE_WARMUP; > + rcc->latency_monitor.warmup_was_sent = FALSE; > + rcc->latency_monitor.id = rand(); > + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); > + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); > +} > + > +static void red_channel_client_ping_timer(void *opaque) > +{ > + RedChannelClient *rcc = opaque; > + > + 
spice_assert(rcc->latency_monitor.state == PING_STATE_TIMER); > + red_channel_client_cancel_ping_timer(rcc); > + > +#ifdef HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. */ > + { > + int so_unsent_size = 0; > + > + /* retrieving the occupied size of the socket's tcp snd buffer (unacked + unsent) */ > + if (ioctl(rcc->stream->socket, SIOCOUTQ, &so_unsent_size) == -1) { > + spice_printerr("ioctl(SIOCOUTQ) failed, %s", strerror(errno)); > + } > + if (so_unsent_size > 0) { > + /* tcp snd buffer is still occupied. rescheduling ping */ > + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); > + } else { > + red_channel_client_push_ping(rcc); > + } > + } > +#else /* ifdef HAVE_LINUX_SOCKIOS_H */ > + /* More portable alternative code path (less accurate but avoids bogus ioctls)*/ > + red_channel_client_push_ping(rcc); > +#endif /* ifdef HAVE_LINUX_SOCKIOS_H */ > +} > + > +static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc) > +{ > + return (rcc->channel->handle_acks && > + (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2)); > +} > + > +/* > + * When a connection is not alive (and we can't detect it via a socket error), we > + * reach one of these 2 states: > + * (1) Sending msgs is blocked: either writes return EAGAIN > + * or we are missing MSGC_ACK from the client. > + * (2) MSG_PING was sent without receiving a MSGC_PONG in reply. > + * > + * The connectivity_timer callback tests if the channel's state matches one of the above. > + * In case it does, on the next time the timer is called, it checks if the connection has > + * been idle during the time that passed since the previous timer call. If the connection > + * has been idle, we consider the client as disconnected. 
> + */ > +static void red_channel_client_connectivity_timer(void *opaque) > +{ > + RedChannelClient *rcc = opaque; > + RedChannelClientConnectivityMonitor *monitor = &rcc->connectivity_monitor; > + int is_alive = TRUE; > + > + if (monitor->state == CONNECTIVITY_STATE_BLOCKED) { > + if (monitor->in_bytes == 0 && monitor->out_bytes == 0) { > + if (!rcc->send_data.blocked && !red_channel_client_waiting_for_ack(rcc)) { > + spice_error("mismatch between rcc-state and connectivity-state"); > + } > + spice_debug("rcc is blocked; connection is idle"); > + is_alive = FALSE; > + } > + } else if (monitor->state == CONNECTIVITY_STATE_WAIT_PONG) { > + if (monitor->in_bytes == 0) { > + if (rcc->latency_monitor.state != PING_STATE_WARMUP && > + rcc->latency_monitor.state != PING_STATE_LATENCY) { > + spice_error("mismatch between rcc-state and connectivity-state"); > + } > + spice_debug("rcc waits for pong; connection is idle"); > + is_alive = FALSE; > + } > + } > + > + if (is_alive) { > + monitor->in_bytes = 0; > + monitor->out_bytes = 0; > + if (rcc->send_data.blocked || red_channel_client_waiting_for_ack(rcc)) { > + monitor->state = CONNECTIVITY_STATE_BLOCKED; > + } else if (rcc->latency_monitor.state == PING_STATE_WARMUP || > + rcc->latency_monitor.state == PING_STATE_LATENCY) { > + monitor->state = CONNECTIVITY_STATE_WAIT_PONG; > + } else { > + monitor->state = CONNECTIVITY_STATE_CONNECTED; > + } > + rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, > + rcc->connectivity_monitor.timeout); > + } else { > + monitor->state = CONNECTIVITY_STATE_DISCONNECTED; > + spice_warning("rcc %p on channel %d:%d has been unresponsive for more than %u ms, disconnecting", > + rcc, rcc->channel->type, rcc->channel->id, monitor->timeout); > + red_channel_client_disconnect(rcc); > + } > +} > + > +void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms) > +{ > + if (!red_channel_client_is_connected(rcc)) { > + return; > + } > + 
spice_debug(NULL); > + spice_assert(timeout_ms > 0); > + /* > + * If latency_monitor is not active, we activate it in order to enable > + * periodic ping messages so that we will be be able to identify a disconnected > + * channel-client even if there are no ongoing channel specific messages > + * on this channel. > + */ > + if (rcc->latency_monitor.timer == NULL) { > + rcc->latency_monitor.timer = rcc->channel->core->timer_add( > + rcc->channel->core, red_channel_client_ping_timer, rcc); > + if (!red_client_during_migrate_at_target(rcc->client)) { > + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); > + } > + rcc->latency_monitor.roundtrip = -1; > + } > + if (rcc->connectivity_monitor.timer == NULL) { > + rcc->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED; > + rcc->connectivity_monitor.timer = rcc->channel->core->timer_add( > + rcc->channel->core, red_channel_client_connectivity_timer, rcc); > + rcc->connectivity_monitor.timeout = timeout_ms; > + if (!red_client_during_migrate_at_target(rcc->client)) { > + rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, > + rcc->connectivity_monitor.timeout); > + } > + } > +} > + > +static void red_channel_client_event(int fd, int event, void *data) > +{ > + RedChannelClient *rcc = (RedChannelClient *)data; > + > + red_channel_client_ref(rcc); > + if (event & SPICE_WATCH_EVENT_READ) { > + red_channel_client_receive(rcc); > + } > + if (event & SPICE_WATCH_EVENT_WRITE) { > + red_channel_client_push(rcc); > + } > + red_channel_client_unref(rcc); > +} > + > +static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header) > +{ > + return GUINT32_FROM_LE(((SpiceDataHeader *)header->data)->size); > +} > + > +static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header) > +{ > + return GUINT32_FROM_LE(((SpiceMiniDataHeader *)header->data)->size); > +} > + > +static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header) > +{ > + return 
GUINT16_FROM_LE(((SpiceDataHeader *)header->data)->type); > +} > + > +static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header) > +{ > + return GUINT16_FROM_LE(((SpiceMiniDataHeader *)header->data)->type); > +} > + > +static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) > +{ > + ((SpiceDataHeader *)header->data)->type = GUINT16_TO_LE(type); > +} > + > +static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) > +{ > + ((SpiceMiniDataHeader *)header->data)->type = GUINT16_TO_LE(type); > +} > + > +static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) > +{ > + ((SpiceDataHeader *)header->data)->size = GUINT32_TO_LE(size); > +} > + > +static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) > +{ > + ((SpiceMiniDataHeader *)header->data)->size = GUINT32_TO_LE(size); > +} > + > +static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) > +{ > + ((SpiceDataHeader *)header->data)->serial = GUINT64_TO_LE(serial); > +} > + > +static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) > +{ > + spice_error("attempt to set header serial on mini header"); > +} > + > +static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) > +{ > + ((SpiceDataHeader *)header->data)->sub_list = GUINT32_TO_LE(sub_list); > +} > + > +static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) > +{ > + spice_error("attempt to set header sub list on mini header"); > +} > + > +static const SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader), > + full_header_set_msg_type, > + full_header_set_msg_size, > + full_header_set_msg_serial, > + full_header_set_msg_sub_list, > + full_header_get_msg_type, > + full_header_get_msg_size}; > + > +static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader), > + 
mini_header_set_msg_type, > + mini_header_set_msg_size, > + mini_header_set_msg_serial, > + mini_header_set_msg_sub_list, > + mini_header_get_msg_type, > + mini_header_get_msg_size}; > + > +static int red_channel_client_pre_create_validate(RedChannel *channel, RedClient *client) > +{ > + if (red_client_get_channel(client, channel->type, channel->id)) { > + spice_printerr("Error client %p: duplicate channel type %d id %d", > + client, channel->type, channel->id); > + return FALSE; > + } > + return TRUE; > +} > + > +RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, > + RedsStream *stream, > + int monitor_latency, > + int num_common_caps, uint32_t *common_caps, > + int num_caps, uint32_t *caps) > +{ > + RedChannelClient *rcc = NULL; > + > + pthread_mutex_lock(&client->lock); > + if (!red_channel_client_pre_create_validate(channel, client)) { > + goto error; > + } > + spice_assert(stream && channel && size >= sizeof(RedChannelClient)); > + rcc = spice_malloc0(size); > + rcc->stream = stream; > + rcc->channel = channel; > + rcc->client = client; > + rcc->refs = 1; > + rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + > + // block flags) > + rcc->ack_data.client_generation = ~0; > + rcc->ack_data.client_window = CLIENT_ACK_WINDOW; > + rcc->send_data.main.marshaller = spice_marshaller_new(); > + rcc->send_data.urgent.marshaller = spice_marshaller_new(); > + > + rcc->send_data.marshaller = rcc->send_data.main.marshaller; > + > + rcc->incoming.opaque = rcc; > + rcc->incoming.cb = &channel->incoming_cb; > + > + rcc->outgoing.opaque = rcc; > + rcc->outgoing.cb = &channel->outgoing_cb; > + rcc->outgoing.pos = 0; > + rcc->outgoing.size = 0; > + > + red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); > + if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { > + rcc->incoming.header = mini_header_wrapper; > + rcc->send_data.header = 
mini_header_wrapper; > + rcc->is_mini_header = TRUE; > + } else { > + rcc->incoming.header = full_header_wrapper; > + rcc->send_data.header = full_header_wrapper; > + rcc->is_mini_header = FALSE; > + } > + > + rcc->incoming.header.data = rcc->incoming.header_buf; > + rcc->incoming.serial = 1; > + > + if (!channel->channel_cbs.config_socket(rcc)) { > + goto error; > + } > + > + ring_init(&rcc->pipe); > + rcc->pipe_size = 0; > + > + stream->watch = channel->core->watch_add(channel->core, > + stream->socket, > + SPICE_WATCH_EVENT_READ, > + red_channel_client_event, rcc); > + rcc->id = g_list_length(channel->clients); > + red_channel_add_client(channel, rcc); > + red_client_add_channel(client, rcc); > + red_channel_ref(channel); > + pthread_mutex_unlock(&client->lock); > + > + if (monitor_latency && reds_stream_get_family(stream) != AF_UNIX) { > + rcc->latency_monitor.timer = channel->core->timer_add( > + channel->core, red_channel_client_ping_timer, rcc); > + if (!client->during_target_migrate) { > + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); > + } > + rcc->latency_monitor.roundtrip = -1; > + } > + > + return rcc; > +error: > + free(rcc); > + reds_stream_free(stream); > + pthread_mutex_unlock(&client->lock); > + return NULL; > +} > + > +RedChannelClient *red_channel_client_create_dummy(int size, > + RedChannel *channel, > + RedClient *client, > + int num_common_caps, uint32_t *common_caps, > + int num_caps, uint32_t *caps) > +{ > + RedChannelClient *rcc = NULL; > + > + spice_assert(size >= sizeof(RedChannelClient)); > + > + pthread_mutex_lock(&client->lock); > + if (!red_channel_client_pre_create_validate(channel, client)) { > + goto error; > + } > + rcc = spice_malloc0(size); > + rcc->refs = 1; > + rcc->client = client; > + rcc->channel = channel; > + red_channel_ref(channel); > + red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); > + if (red_channel_client_test_remote_common_cap(rcc, 
SPICE_COMMON_CAP_MINI_HEADER)) { > + rcc->incoming.header = mini_header_wrapper; > + rcc->send_data.header = mini_header_wrapper; > + rcc->is_mini_header = TRUE; > + } else { > + rcc->incoming.header = full_header_wrapper; > + rcc->send_data.header = full_header_wrapper; > + rcc->is_mini_header = FALSE; > + } > + > + rcc->incoming.header.data = rcc->incoming.header_buf; > + rcc->incoming.serial = 1; > + ring_init(&rcc->pipe); > + > + rcc->dummy = TRUE; > + rcc->dummy_connected = TRUE; > + red_channel_add_client(channel, rcc); > + red_client_add_channel(client, rcc); > + pthread_mutex_unlock(&client->lock); > + return rcc; > +error: > + pthread_mutex_unlock(&client->lock); > + return NULL; > +} > + > +void red_channel_client_seamless_migration_done(RedChannelClient *rcc) [Reviewer: this can be static, as it is only used in red_channel_handle_migrate_data().] > +{ > + rcc->wait_migrate_data = FALSE; > + > + if (red_client_seamless_migration_done_for_channel(rcc->client)) { > + if (rcc->latency_monitor.timer) { > + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); > + } > + if (rcc->connectivity_monitor.timer) { > + rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, > + rcc->connectivity_monitor.timeout); > + } > + } > +} > + > +void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc) > +{ > + if (rcc->latency_monitor.timer) { > + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); > + } > +} > + > +int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc) > +{ > + return rcc->wait_migrate_data; > +} > + > +void red_channel_client_default_migrate(RedChannelClient *rcc) > +{ > + if (rcc->latency_monitor.timer) { > + red_channel_client_cancel_ping_timer(rcc); > + rcc->channel->core->timer_remove(rcc->latency_monitor.timer); > + rcc->latency_monitor.timer = NULL; > + } > + if (rcc->connectivity_monitor.timer) { > + rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer); > + 
rcc->connectivity_monitor.timer = NULL; > + } > + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MIGRATE); > +} > + > +void red_channel_client_ref(RedChannelClient *rcc) > +{ > + rcc->refs++; > +} > + > +void red_channel_client_unref(RedChannelClient *rcc) > +{ > + if (--rcc->refs != 0) { > + return; > + } > + > + spice_debug("destroy rcc=%p", rcc); > + > + reds_stream_free(rcc->stream); > + rcc->stream = NULL; > + > + if (rcc->send_data.main.marshaller) { > + spice_marshaller_destroy(rcc->send_data.main.marshaller); > + } > + > + if (rcc->send_data.urgent.marshaller) { > + spice_marshaller_destroy(rcc->send_data.urgent.marshaller); > + } > + > + red_channel_client_destroy_remote_caps(rcc); > + if (rcc->channel) { > + red_channel_unref(rcc->channel); > + } > + free(rcc); > +} > + > +void red_channel_client_destroy(RedChannelClient *rcc) > +{ > + rcc->destroying = TRUE; > + red_channel_client_disconnect(rcc); > + red_client_remove_channel(rcc); > + red_channel_client_unref(rcc); > +} > + > +void red_channel_client_shutdown(RedChannelClient *rcc) > +{ > + if (rcc->stream && !rcc->stream->shutdown) { > + rcc->channel->core->watch_remove(rcc->stream->watch); > + rcc->stream->watch = NULL; > + shutdown(rcc->stream->socket, SHUT_RDWR); > + rcc->stream->shutdown = TRUE; > + } > +} > + > +static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler) > +{ > + ssize_t n; > + > + if (!stream) { > + return; > + } > + > + if (handler->size == 0) { > + handler->vec = handler->vec_buf; > + handler->size = handler->cb->get_msg_size(handler->opaque); > + if (!handler->size) { // nothing to be sent > + return; > + } > + } > + > + for (;;) { > + handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); > + n = reds_stream_writev(stream, handler->vec, handler->vec_size); > + if (n == -1) { > + switch (errno) { > + case EAGAIN: > + handler->cb->on_block(handler->opaque); > + return; > + case EINTR: > + continue; > + case 
EPIPE: > + handler->cb->on_error(handler->opaque); > + return; > + default: > + spice_printerr("%s", strerror(errno)); > + handler->cb->on_error(handler->opaque); > + return; > + } > + } else { > + handler->pos += n; > + handler->cb->on_output(handler->opaque, n); > + if (handler->pos == handler->size) { // finished writing data > + /* reset handler before calling on_msg_done, since it > + * can trigger another call to red_peer_handle_outgoing (when > + * switching from the urgent marshaller to the main one */ > + handler->vec = handler->vec_buf; > + handler->pos = 0; > + handler->size = 0; > + handler->cb->on_msg_done(handler->opaque); > + return; > + } > + } > + } > +} > + > +/* return the number of bytes read. -1 in case of error */ > +static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size) > +{ > + uint8_t *pos = buf; > + while (size) { > + int now; > + if (stream->shutdown) { > + return -1; > + } > + now = reds_stream_read(stream, pos, size); > + if (now <= 0) { > + if (now == 0) { > + return -1; > + } > + spice_assert(now == -1); > + if (errno == EAGAIN) { > + break; > + } else if (errno == EINTR) { > + continue; > + } else if (errno == EPIPE) { > + return -1; > + } else { > + spice_printerr("%s", strerror(errno)); > + return -1; > + } > + } else { > + size -= now; > + pos += now; > + } > + } > + return pos - buf; > +} > + > +// TODO: this implementation, as opposed to the old implementation in red_worker, > +// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer > +// arithmetic for the case where a single cb_read could return multiple messages. But > +// this is suboptimal potentially. Profile and consider fixing. 
> +static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handler) > +{ > + int bytes_read; > + uint8_t *parsed; > + size_t parsed_size; > + message_destructor_t parsed_free; > + uint16_t msg_type; > + uint32_t msg_size; > + > + /* XXX: This needs further investigation as to the underlying cause, it happened > + * after spicec disconnect (but not with spice-gtk) repeatedly. */ > + if (!stream) { > + return; > + } > + > + for (;;) { > + int ret_handle; > + if (handler->header_pos < handler->header.header_size) { > + bytes_read = red_peer_receive(stream, > + handler->header.data + handler->header_pos, > + handler->header.header_size - handler->header_pos); > + if (bytes_read == -1) { > + handler->cb->on_error(handler->opaque); > + return; > + } > + handler->cb->on_input(handler->opaque, bytes_read); > + handler->header_pos += bytes_read; > + > + if (handler->header_pos != handler->header.header_size) { > + return; > + } > + } > + > + msg_size = handler->header.get_msg_size(&handler->header); > + msg_type = handler->header.get_msg_type(&handler->header); > + if (handler->msg_pos < msg_size) { > + if (!handler->msg) { > + handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size); > + if (handler->msg == NULL) { > + spice_printerr("ERROR: channel refused to allocate buffer."); > + handler->cb->on_error(handler->opaque); > + return; > + } > + } > + > + bytes_read = red_peer_receive(stream, > + handler->msg + handler->msg_pos, > + msg_size - handler->msg_pos); > + if (bytes_read == -1) { > + handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); > + handler->cb->on_error(handler->opaque); > + return; > + } > + handler->cb->on_input(handler->opaque, bytes_read); > + handler->msg_pos += bytes_read; > + if (handler->msg_pos != msg_size) { > + return; > + } > + } > + > + if (handler->cb->parser) { > + parsed = handler->cb->parser(handler->msg, > + handler->msg + msg_size, msg_type, > + SPICE_VERSION_MINOR, 
&parsed_size, &parsed_free); > + if (parsed == NULL) { > + spice_printerr("failed to parse message type %d", msg_type); > + handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); > + handler->cb->on_error(handler->opaque); > + return; > + } > + ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size, > + msg_type, parsed); > + parsed_free(parsed); > + } else { > + ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size, > + handler->msg); > + } > + handler->msg_pos = 0; > + handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); > + handler->msg = NULL; > + handler->header_pos = 0; > + > + if (!ret_handle) { > + handler->cb->on_error(handler->opaque); > + return; > + } > + } > +} > + > +void red_channel_client_receive(RedChannelClient *rcc) > +{ > + red_channel_client_ref(rcc); > + red_peer_handle_incoming(rcc->stream, &rcc->incoming); > + red_channel_client_unref(rcc); > +} > + > +void red_channel_client_send(RedChannelClient *rcc) > +{ > + red_channel_client_ref(rcc); > + red_peer_handle_outgoing(rcc->stream, &rcc->outgoing); > + red_channel_client_unref(rcc); > +} > + > +static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc) > +{ > + RedPipeItem *item; > + > + if (!rcc || rcc->send_data.blocked > + || red_channel_client_waiting_for_ack(rcc) > + || !(item = (RedPipeItem *)ring_get_tail(&rcc->pipe))) { > + return NULL; > + } > + red_channel_client_pipe_remove(rcc, item); > + return item; > +} > + > +void red_channel_client_push(RedChannelClient *rcc) > +{ > + RedPipeItem *pipe_item; > + > + if (!rcc->during_send) { > + rcc->during_send = TRUE; > + } else { > + return; > + } > + red_channel_client_ref(rcc); > + if (rcc->send_data.blocked) { > + red_channel_client_send(rcc); > + } > + > + if (!red_channel_client_no_item_being_sent(rcc) && !rcc->send_data.blocked) { > + rcc->send_data.blocked = TRUE; > + spice_printerr("ERROR: an item waiting to be sent 
and not blocked"); > + } > + > + while ((pipe_item = red_channel_client_pipe_item_get(rcc))) { > + red_channel_client_send_item(rcc, pipe_item); > + } > + if (red_channel_client_no_item_being_sent(rcc) && ring_is_empty(&rcc->pipe) > + && rcc->stream->watch) { > + rcc->channel->core->watch_update_mask(rcc->stream->watch, > + SPICE_WATCH_EVENT_READ); > + } > + rcc->during_send = FALSE; > + red_channel_client_unref(rcc); > +} > + > +int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc) > +{ > + if (rcc->latency_monitor.roundtrip < 0) { > + return rcc->latency_monitor.roundtrip; > + } > + return rcc->latency_monitor.roundtrip / NSEC_PER_MILLISEC; > +} > + > +void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc) > +{ > + rcc->ack_data.messages_window = 0; > + red_channel_client_push(rcc); > +} > + > +static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping) > +{ > + uint64_t now; > + > + /* ignoring unexpected pongs, or post-migration pongs for pings that > + * started just before migration */ > + if (ping->id != rcc->latency_monitor.id) { > + spice_warning("ping-id (%u)!= pong-id %u", > + rcc->latency_monitor.id, ping->id); > + return; > + } > + > + now = spice_get_monotonic_time_ns(); > + > + if (rcc->latency_monitor.state == PING_STATE_WARMUP) { > + rcc->latency_monitor.state = PING_STATE_LATENCY; > + return; > + } else if (rcc->latency_monitor.state != PING_STATE_LATENCY) { > + spice_warning("unexpected"); > + return; > + } > + > + /* set TCP_NODELAY=0, in case we reverted it for the test*/ > + if (!rcc->latency_monitor.tcp_nodelay) { > + int delay_val = 0; > + > + if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, > + sizeof(delay_val)) == -1) { > + if (errno != ENOTSUP) { > + spice_warning("setsockopt failed, %s", strerror(errno)); > + } > + } > + } > + > + /* > + * The real network latency shouldn't change during the connection. 
However, > + * the measurements can be bigger than the real roundtrip due to other > + * threads or processes that are utilizing the network. We update the roundtrip > + * measurement with the minimal value we encountered till now. > + */ > + if (rcc->latency_monitor.roundtrip < 0 || > + now - ping->timestamp < rcc->latency_monitor.roundtrip) { > + rcc->latency_monitor.roundtrip = now - ping->timestamp; > + spice_debug("update roundtrip %.2f(ms)", ((double)rcc->latency_monitor.roundtrip)/NSEC_PER_MILLISEC); > + } > + > + rcc->latency_monitor.last_pong_time = now; > + rcc->latency_monitor.state = PING_STATE_NONE; > + red_channel_client_start_ping_timer(rcc, PING_TEST_TIMEOUT_MS); > +} > + > +static void red_channel_handle_migrate_flush_mark(RedChannelClient *rcc) > +{ > + RedChannel *channel = red_channel_client_get_channel(rcc); > + if (channel->channel_cbs.handle_migrate_flush_mark) { > + channel->channel_cbs.handle_migrate_flush_mark(rcc); > + } > +} > + > +// TODO: the whole migration is broken with multiple clients. What do we want to do? > +// basically just > +// 1) source send mark to all > +// 2) source gets at various times the data (waits for all) > +// 3) source migrates to target > +// 4) target sends data to all > +// So need to make all the handlers work with per channel/client data (what data exactly?) 
> +static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message) > +{ > + RedChannel *channel = red_channel_client_get_channel(rcc); > + spice_debug("channel type %d id %d rcc %p size %u", > + channel->type, channel->id, rcc, size); > + if (!channel->channel_cbs.handle_migrate_data) { > + return; > + } > + if (!red_channel_client_is_waiting_for_migrate_data(rcc)) { > + spice_channel_client_error(rcc, "unexpected"); > + return; > + } > + if (channel->channel_cbs.handle_migrate_data_get_serial) { > + red_channel_client_set_message_serial(rcc, > + channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message)); > + } > + if (!channel->channel_cbs.handle_migrate_data(rcc, size, message)) { > + spice_channel_client_error(rcc, "handle_migrate_data failed"); > + return; > + } > + red_channel_client_seamless_migration_done(rcc); > +} > + > + > +int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, > + uint16_t type, void *message) > +{ > + switch (type) { > + case SPICE_MSGC_ACK_SYNC: > + if (size != sizeof(uint32_t)) { > + spice_printerr("bad message size"); > + return FALSE; > + } > + rcc->ack_data.client_generation = *(uint32_t *)(message); > + break; > + case SPICE_MSGC_ACK: > + if (rcc->ack_data.client_generation == rcc->ack_data.generation) { > + rcc->ack_data.messages_window -= rcc->ack_data.client_window; > + red_channel_client_push(rcc); > + } > + break; > + case SPICE_MSGC_DISCONNECTING: > + break; > + case SPICE_MSGC_MIGRATE_FLUSH_MARK: > + if (!rcc->wait_migrate_flush_mark) { > + spice_error("unexpected flush mark"); > + return FALSE; > + } > + red_channel_handle_migrate_flush_mark(rcc); > + rcc->wait_migrate_flush_mark = FALSE; > + break; > + case SPICE_MSGC_MIGRATE_DATA: > + red_channel_handle_migrate_data(rcc, size, message); > + break; > + case SPICE_MSGC_PONG: > + red_channel_client_handle_pong(rcc, message); > + break; > + default: > + spice_printerr("invalid message type %u", type); > + 
return FALSE; > + } > + return TRUE; > +} > + > +void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item) > +{ > + spice_assert(red_channel_client_no_item_being_sent(rcc)); > + spice_assert(msg_type != 0); > + rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type); > + rcc->send_data.item = item; > + if (item) { > + red_pipe_item_ref(item); > + } > +} > + > +void red_channel_client_begin_send_message(RedChannelClient *rcc) > +{ > + SpiceMarshaller *m = rcc->send_data.marshaller; > + > + // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state) > + if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) { > + spice_printerr("BUG: header->type == 0"); > + return; > + } > + > + /* canceling the latency test timer till the nework is idle */ > + red_channel_client_cancel_ping_timer(rcc); > + > + spice_marshaller_flush(m); > + rcc->send_data.size = spice_marshaller_get_total_size(m); > + rcc->send_data.header.set_msg_size(&rcc->send_data.header, > + rcc->send_data.size - rcc->send_data.header.header_size); > + rcc->ack_data.messages_window++; > + rcc->send_data.last_sent_serial = rcc->send_data.serial; > + rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */ > + red_channel_client_send(rcc); > +} > + > +SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc) > +{ > + spice_assert(red_channel_client_no_item_being_sent(rcc)); > + spice_assert(rcc->send_data.header.data != NULL); > + rcc->send_data.main.header_data = rcc->send_data.header.data; > + rcc->send_data.main.item = rcc->send_data.item; > + > + rcc->send_data.marshaller = rcc->send_data.urgent.marshaller; > + rcc->send_data.item = NULL; > + red_channel_client_reset_send_data(rcc); > + return rcc->send_data.marshaller; > +} > + > +uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc) > +{ > + return 
rcc->send_data.serial; > +} > + > +void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial) > +{ > + rcc->send_data.last_sent_serial = serial; > + rcc->send_data.serial = serial; > +} > + > +static inline gboolean client_pipe_add(RedChannelClient *rcc, RedPipeItem *item, RingItem *pos) > +{ > + spice_assert(rcc && item); > + if (SPICE_UNLIKELY(!red_channel_client_is_connected(rcc))) { > + spice_debug("rcc is disconnected %p", rcc); > + red_pipe_item_unref(item); > + return FALSE; > + } > + if (ring_is_empty(&rcc->pipe) && rcc->stream->watch) { > + rcc->channel->core->watch_update_mask(rcc->stream->watch, > + SPICE_WATCH_EVENT_READ | > + SPICE_WATCH_EVENT_WRITE); > + } > + rcc->pipe_size++; > + ring_add(pos, &item->link); > + return TRUE; > +} > + > +void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item) > +{ > + > + client_pipe_add(rcc, item, &rcc->pipe); > +} > + > +void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item) > +{ > + red_channel_client_pipe_add(rcc, item); > + red_channel_client_push(rcc); > +} > + > +void red_channel_client_pipe_add_after(RedChannelClient *rcc, > + RedPipeItem *item, > + RedPipeItem *pos) > +{ > + spice_assert(pos); > + client_pipe_add(rcc, item, &pos->link); > +} > + > +int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, > + RedPipeItem *item) > +{ > + return ring_item_is_linked(&item->link); > +} > + > +void red_channel_client_pipe_add_tail(RedChannelClient *rcc, > + RedPipeItem *item) > +{ > + client_pipe_add(rcc, item, rcc->pipe.prev); > +} > + > +void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item) > +{ > + if (client_pipe_add(rcc, item, rcc->pipe.prev)) { > + red_channel_client_push(rcc); > + } > +} > + > +void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type) > +{ > + RedPipeItem *item = spice_new(RedPipeItem, 1); > + > + red_pipe_item_init(item, pipe_item_type); > + 
red_channel_client_pipe_add(rcc, item); > + red_channel_client_push(rcc); > +} > + > +void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type) > +{ > + RedEmptyMsgPipeItem *item = spice_new(RedEmptyMsgPipeItem, 1); > + > + red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG); > + item->msg = msg_type; > + red_channel_client_pipe_add(rcc, &item->base); > + red_channel_client_push(rcc); > +} > + > +gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc) > +{ > + g_return_val_if_fail(rcc != NULL, TRUE); > + return (rcc->pipe_size == 0) && (ring_is_empty(&rcc->pipe)); > +} > + > +uint32_t red_channel_client_get_pipe_size(RedChannelClient *rcc) > +{ > + return rcc->pipe_size; > +} > + > +int red_channel_client_is_connected(RedChannelClient *rcc) > +{ > + if (!rcc->dummy) { > + return rcc->channel > + && (g_list_find(rcc->channel->clients, rcc) != NULL); > + } else { > + return rcc->dummy_connected; > + } > +} > + > +static void red_channel_client_clear_sent_item(RedChannelClient *rcc) > +{ > + red_channel_client_release_sent_item(rcc); > + rcc->send_data.blocked = FALSE; > + rcc->send_data.size = 0; > +} > + > +static void red_channel_client_pipe_clear(RedChannelClient *rcc) > +{ > + RedPipeItem *item; > + > + if (rcc) { > + red_channel_client_clear_sent_item(rcc); > + } > + while ((item = (RedPipeItem *)ring_get_head(&rcc->pipe))) { > + ring_remove(&item->link); > + red_pipe_item_unref(item); > + } > + rcc->pipe_size = 0; > +} > + > +void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc) > +{ > + rcc->ack_data.messages_window = 0; > +} > + > +void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window) > +{ > + rcc->ack_data.client_window = client_window; > +} > + > +void red_channel_client_push_set_ack(RedChannelClient *rcc) > +{ > + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_SET_ACK); > +} > + > +static void red_channel_client_disconnect_dummy(RedChannelClient *rcc) > 
+{ > + RedChannel *channel = red_channel_client_get_channel(rcc); > + GList *link; > + spice_assert(rcc->dummy); > + if (channel && (link = g_list_find(channel->clients, rcc))) { > + spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel, > + channel->type, channel->id); > + red_channel_remove_client(channel, link->data); > + } > + rcc->dummy_connected = FALSE; > +} > + > +void red_channel_client_disconnect(RedChannelClient *rcc) > +{ > + RedChannel *channel = rcc->channel; > + > + if (rcc->dummy) { > + red_channel_client_disconnect_dummy(rcc); > + return; > + } > + if (!red_channel_client_is_connected(rcc)) { > + return; > + } > + spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel, > + channel->type, channel->id); > + red_channel_client_pipe_clear(rcc); > + if (rcc->stream->watch) { > + channel->core->watch_remove(rcc->stream->watch); > + rcc->stream->watch = NULL; > + } > + if (rcc->latency_monitor.timer) { > + channel->core->timer_remove(rcc->latency_monitor.timer); > + rcc->latency_monitor.timer = NULL; > + } > + if (rcc->connectivity_monitor.timer) { > + channel->core->timer_remove(rcc->connectivity_monitor.timer); > + rcc->connectivity_monitor.timer = NULL; > + } > + red_channel_remove_client(channel, rcc); > + channel->channel_cbs.on_disconnect(rcc); > +} > + > +int red_channel_client_is_blocked(RedChannelClient *rcc) > +{ > + return rcc && rcc->send_data.blocked; > +} > + > +int red_channel_client_send_message_pending(RedChannelClient *rcc) > +{ > + return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0; > +} > + > +SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc) > +{ > + return rcc->send_data.marshaller; > +} > + > +RedsStream *red_channel_client_get_stream(RedChannelClient *rcc) > +{ > + return rcc->stream; > +} > + > +RedClient *red_channel_client_get_client(RedChannelClient *rcc) > +{ > + return rcc->client; > +} > + > +void red_channel_client_set_header_sub_list(RedChannelClient *rcc, 
uint32_t sub_list) > +{ > + rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list); > +} > + > +static void marker_pipe_item_free(RedPipeItem *base) > +{ > + MarkerPipeItem *item = SPICE_UPCAST(MarkerPipeItem, base); > + > + if (item->item_in_pipe) { > + *item->item_in_pipe = FALSE; > + } > + free(item); > +} > + > +/* TODO: more evil sync stuff. anything with the word wait in it's name. */ > +int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, > + RedPipeItem *item, > + int64_t timeout) > +{ > + uint64_t end_time; > + gboolean item_in_pipe; > + > + spice_info(NULL); > + > + if (timeout != -1) { > + end_time = spice_get_monotonic_time_ns() + timeout; > + } else { > + end_time = UINT64_MAX; > + } > + > + MarkerPipeItem *mark_item = spice_new0(MarkerPipeItem, 1); > + > + red_pipe_item_init_full(&mark_item->base, RED_PIPE_ITEM_TYPE_MARKER, > + marker_pipe_item_free); > + item_in_pipe = TRUE; > + mark_item->item_in_pipe = &item_in_pipe; > + red_channel_client_pipe_add_after(rcc, &mark_item->base, item); > + > + if (red_channel_client_is_blocked(rcc)) { > + red_channel_client_receive(rcc); > + red_channel_client_send(rcc); > + } > + red_channel_client_push(rcc); > + > + while(item_in_pipe && > + (timeout == -1 || spice_get_monotonic_time_ns() < end_time)) { > + usleep(CHANNEL_BLOCKED_SLEEP_DURATION); > + red_channel_client_receive(rcc); > + red_channel_client_send(rcc); > + red_channel_client_push(rcc); > + } > + > + if (item_in_pipe) { > + // still on the queue, make sure won't overwrite the stack variable > + mark_item->item_in_pipe = NULL; > + spice_warning("timeout"); > + return FALSE; > + } else { > + return red_channel_client_wait_outgoing_item(rcc, > + timeout == -1 ? 
-1 : end_time - spice_get_monotonic_time_ns()); > + } > +} > + > +int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, > + int64_t timeout) > +{ > + uint64_t end_time; > + int blocked; > + > + if (!red_channel_client_is_blocked(rcc)) { > + return TRUE; > + } > + if (timeout != -1) { > + end_time = spice_get_monotonic_time_ns() + timeout; > + } else { > + end_time = UINT64_MAX; > + } > + spice_info("blocked"); > + > + do { > + usleep(CHANNEL_BLOCKED_SLEEP_DURATION); > + red_channel_client_receive(rcc); > + red_channel_client_send(rcc); > + } while ((blocked = red_channel_client_is_blocked(rcc)) && > + (timeout == -1 || spice_get_monotonic_time_ns() < end_time)); > + > + if (blocked) { > + spice_warning("timeout"); > + return FALSE; > + } else { > + spice_assert(red_channel_client_no_item_being_sent(rcc)); > + return TRUE; > + } > +} > + > +void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc) > +{ > + if (red_channel_client_is_blocked(rcc) || rcc->pipe_size > 0) { > + red_channel_client_disconnect(rcc); > + } else { > + spice_assert(red_channel_client_no_item_being_sent(rcc)); > + } > +} > + > +gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc) > +{ > + return !rcc || (rcc->send_data.size == 0); > +} > + > +void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, > + RedPipeItem *item) > +{ > + red_channel_client_pipe_remove(rcc, item); > + red_pipe_item_unref(item); > +} > + > +/* client mutex should be locked before this call */ > +gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc) > +{ > + gboolean ret = FALSE; > + > + if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) { > + rcc->wait_migrate_data = TRUE; > + ret = TRUE; > + } > + spice_debug("channel type %d id %d rcc %p wait data %d", rcc->channel->type, rcc->channel->id, rcc, > + rcc->wait_migrate_data); > + > + return ret; > +} > + > +void red_channel_client_set_destroying(RedChannelClient *rcc) > +{ > + 
rcc->destroying = TRUE; > +} > + > +gboolean red_channel_client_is_destroying(RedChannelClient *rcc) > +{ > + return rcc->destroying; > +} > diff --git a/server/red-channel-client.h b/server/red-channel-client.h > new file mode 100644 > index 0000000..bacfeb5 > --- /dev/null > +++ b/server/red-channel-client.h > @@ -0,0 +1,253 @@ > +/* > + Copyright (C) 2009-2015 Red Hat, Inc. -2016 ? > + > + This library is free software; you can redistribute it and/or > + modify it under the terms of the GNU Lesser General Public > + License as published by the Free Software Foundation; either > + version 2.1 of the License, or (at your option) any later version. > + > + This library is distributed in the hope that it will be useful, > + but WITHOUT ANY WARRANTY; without even the implied warranty of > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + Lesser General Public License for more details. > + > + You should have received a copy of the GNU Lesser General Public > + License along with this library; if not, see <http://www.gnu.org/licenses/>. > +*/ > + > +#ifndef _H_RED_CHANNEL_CLIENT > +#define _H_RED_CHANNEL_CLIENT > + > +#include <common/marshaller.h> > + > +#include "red-pipe-item.h" > +#include "reds-stream.h" > +#include "red-channel.h" > + > +typedef struct RedChannel RedChannel; > +typedef struct RedClient RedClient; > +typedef struct IncomingHandler IncomingHandler; > + > +typedef struct RedChannelClient RedChannelClient; > + > +/* > + * When an error occurs over a channel, we treat it as a warning > + * for spice-server and shutdown the channel. > + */ > +#define spice_channel_client_error(rcc, format, ...) 
\ This is only used in red-channel-client.c now, but I see that we could replace spice_warning + _client_shutdown() with this macro in at least one place (reds.c) > + do { \ > + RedChannel *_ch = red_channel_client_get_channel(rcc); \ > + spice_warning("rcc %p type %u id %u: " format, rcc, \ > + _ch->type, _ch->id, ## __VA_ARGS__); \ Might be okay to indent the last '\' > + red_channel_client_shutdown(rcc); \ > + } while (0) > + > +RedChannelClient *red_channel_client_create(int size, RedChannel *channel, > + RedClient *client, RedsStream *stream, > + int monitor_latency, > + int num_common_caps, uint32_t *common_caps, > + int num_caps, uint32_t *caps); > + > +RedChannelClient *red_channel_client_create_dummy(int size, > + RedChannel *channel, > + RedClient *client, > + int num_common_caps, uint32_t *common_caps, > + int num_caps, uint32_t *caps); > + > +void red_channel_client_ref(RedChannelClient *rcc); > +void red_channel_client_unref(RedChannelClient *rcc); > + > +int red_channel_client_is_connected(RedChannelClient *rcc); > +void red_channel_client_default_migrate(RedChannelClient *rcc); > +int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc); > +void red_channel_client_destroy(RedChannelClient *rcc); > +int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap); > +int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap); > +/* shutdown is the only safe thing to do out of the client/channel > + * thread. It will not touch the rings, just shutdown the socket. > + * It should be followed by some way to gurantee a disconnection.
*/ > +void red_channel_client_shutdown(RedChannelClient *rcc); > +/* handles general channel msgs from the client */ > +int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, > + uint16_t type, void *message); > +/* when preparing send_data: should call init and then use marshaller */ > +void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item); > + > +uint64_t red_channel_client_get_message_serial(RedChannelClient *channel); > +void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t); > + > +/* When sending a msg. Should first call red_channel_client_begin_send_message. > + * It will first send the pending urgent data, if there is any, and then > + * the rest of the data. > + */ > +void red_channel_client_begin_send_message(RedChannelClient *rcc); > + > +/* > + * Stores the current send data, and switches to urgent send data. > + * When it begins the actual send, it will send first the urgent data > + * and afterward the rest of the data. > + * Should be called only if during the marshalling of on message, > + * the need to send another message, before, rises. > + * Important: the serial of the non-urgent sent data, will be succeeded. 
> + * return: the urgent send data marshaller > + */ > +SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc); > + > +/* returns -1 if we don't have an estimation */ > +int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc); > + > +/* Checks periodically if the connection is still alive */ > +void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms); > + > +void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item); > +void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item); > +void red_channel_client_pipe_add_after(RedChannelClient *rcc, RedPipeItem *item, RedPipeItem *pos); > +int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item); > +void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item); > +void red_channel_client_pipe_add_tail(RedChannelClient *rcc, RedPipeItem *item); > +void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item); > +/* for types that use this routine -> the pipe item should be freed */ > +void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type); > +void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type); > +gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc); > +uint32_t red_channel_client_get_pipe_size(RedChannelClient *rcc); > + > +void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc); > +void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window); > +void red_channel_client_push_set_ack(RedChannelClient *rcc); > + > +gboolean red_channel_client_is_blocked(RedChannelClient *rcc); > + > +/* helper for channels that have complex logic that can possibly ready a send */ > +int red_channel_client_send_message_pending(RedChannelClient *rcc); > + > +gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc); > +void 
red_channel_client_push(RedChannelClient *rcc); > +// TODO: again - what is the context exactly? this happens in channel disconnect. but our > +// current red_channel_shutdown also closes the socket - is there a socket to close? > +// are we reading from an fd here? arghh > +void red_channel_client_receive(RedChannelClient *rcc); > +void red_channel_client_send(RedChannelClient *rcc); > +void red_channel_client_disconnect(RedChannelClient *rcc); > + > +/* Note: the valid times to call red_channel_get_marshaller are just during send_item callback. */ > +SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc); > +RedsStream *red_channel_client_get_stream(RedChannelClient *rcc); > +RedClient *red_channel_client_get_client(RedChannelClient *rcc); > + > +/* Note that the header is valid only between red_channel_reset_send_data and > + * red_channel_begin_send_message.*/ > +void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list); > + > +/* > + * blocking functions. > + * > + * timeout is in nano sec. -1 for no timeout. > + * > + * Return: TRUE if waiting succeeded. FALSE if timeout expired. 
> + */ > + > +int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, > + RedPipeItem *item, > + int64_t timeout); > +int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, > + int64_t timeout); > +void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc); > + > +RedChannel* red_channel_client_get_channel(RedChannelClient* rcc); > +IncomingHandler* red_channel_client_get_incoming_handler(RedChannelClient *rcc); > + > +void red_channel_client_on_output(void *opaque, int n); > +void red_channel_client_on_input(void *opaque, int n); > +int red_channel_client_get_out_msg_size(void *opaque); > +void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, > + int *vec_size, int pos); > +void red_channel_client_on_out_block(void *opaque); > +void red_channel_client_on_out_msg_done(void *opaque); > + > +void red_channel_client_seamless_migration_done(RedChannelClient *rcc); > +void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc); > +void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc); > + > +gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc); > +void red_channel_client_set_destroying(RedChannelClient *rcc); > +gboolean red_channel_client_is_destroying(RedChannelClient *rcc); > + > +#define RED_CHANNEL_CLIENT(Client) ((RedChannelClient *)(Client)) > + > +typedef struct OutgoingHandler { > + OutgoingHandlerInterface *cb; > + void *opaque; > + struct iovec vec_buf[IOV_MAX]; > + int vec_size; > + struct iovec *vec; > + int pos; > + int size; > +} OutgoingHandler; > + > +typedef struct IncomingHandler { > + IncomingHandlerInterface *cb; > + void *opaque; > + uint8_t header_buf[MAX_HEADER_SIZE]; > + SpiceDataHeaderOpaque header; > + uint32_t header_pos; > + uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. 
> + uint32_t msg_pos; > + uint64_t serial; > +} IncomingHandler; > + > +struct RedChannelClient { > + RedChannel *channel; > + RedClient *client; > + RedsStream *stream; > + int dummy; > + int dummy_connected; > + > + uint32_t refs; > + > + struct { > + uint32_t generation; > + uint32_t client_generation; > + uint32_t messages_window; > + uint32_t client_window; > + } ack_data; > + > + struct { > + SpiceMarshaller *marshaller; > + SpiceDataHeaderOpaque header; > + uint32_t size; > + RedPipeItem *item; > + int blocked; > + uint64_t serial; > + uint64_t last_sent_serial; > + > + struct { > + SpiceMarshaller *marshaller; > + uint8_t *header_data; > + RedPipeItem *item; > + } main; > + > + struct { > + SpiceMarshaller *marshaller; > + } urgent; > + } send_data; > + > + OutgoingHandler outgoing; > + IncomingHandler incoming; > + int during_send; > + int id; // debugging purposes > + Ring pipe; > + uint32_t pipe_size; > + > + RedChannelCapabilities remote_caps; > + int is_mini_header; > + gboolean destroying; > + > + int wait_migrate_data; > + int wait_migrate_flush_mark; > + > + RedChannelClientLatencyMonitor latency_monitor; > + RedChannelClientConnectivityMonitor connectivity_monitor; > +}; > + > +#endif /* _H_RED_CHANNEL_CLIENT */ > diff --git a/server/red-channel.c b/server/red-channel.c > index bf290b1..03338aa 100644 > --- a/server/red-channel.c > +++ b/server/red-channel.c > @@ -22,68 +22,15 @@ > #include <config.h> > #endif > > -#include <glib.h> > -#include <stdio.h> > -#include <stdint.h> > -#include <netinet/in.h> > -#include <netinet/tcp.h> > -#include <fcntl.h> > -#include <unistd.h> > -#include <errno.h> > -#include <sys/ioctl.h> > -#ifdef HAVE_LINUX_SOCKIOS_H > -#include <linux/sockios.h> /* SIOCOUTQ */ > -#endif > - > -#include <common/generated_server_marshallers.h> > #include <common/ring.h> > > #include "red-channel.h" > +#include "red-channel-client.h" > #include "reds.h" > #include "reds-stream.h" > #include "main-dispatcher.h" > #include "utils.h" 
> > -typedef struct RedEmptyMsgPipeItem { > - RedPipeItem base; > - int msg; > -} RedEmptyMsgPipeItem; > - > -typedef struct MarkerPipeItem { > - RedPipeItem base; > - gboolean *item_in_pipe; > -} MarkerPipeItem; > - > -#define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15) > -#define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10) > - > -#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro > - > -enum QosPingState { > - PING_STATE_NONE, > - PING_STATE_TIMER, > - PING_STATE_WARMUP, > - PING_STATE_LATENCY, > -}; > - > -enum ConnectivityState { > - CONNECTIVITY_STATE_CONNECTED, > - CONNECTIVITY_STATE_BLOCKED, > - CONNECTIVITY_STATE_WAIT_PONG, > - CONNECTIVITY_STATE_DISCONNECTED, > -}; > - > -static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout); > -static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc); > -static void red_channel_client_restart_ping_timer(RedChannelClient *rcc); > - > -static void red_channel_client_event(int fd, int event, void *data); > -static void red_client_add_channel(RedClient *client, RedChannelClient *rcc); > -static void red_client_remove_channel(RedChannelClient *rcc); > -static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id); > -static void red_channel_client_restore_main_sender(RedChannelClient *rcc); > -static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc); > - > /* > * Lifetime of RedChannel, RedChannelClient and RedClient: > * RedChannel is created and destroyed by the calls to > @@ -118,561 +65,23 @@ static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc); > * If a call to red_channel_client_destroy is made from another location, it must be called > * from the channel's thread. 
> */ > -static void red_channel_ref(RedChannel *channel); > -static void red_channel_unref(RedChannel *channel); > - > -static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header) > -{ > - return GUINT32_FROM_LE(((SpiceDataHeader *)header->data)->size); > -} > - > -static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header) > -{ > - return GUINT32_FROM_LE(((SpiceMiniDataHeader *)header->data)->size); > -} > - > -static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header) > -{ > - return GUINT16_FROM_LE(((SpiceDataHeader *)header->data)->type); > -} > - > -static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header) > -{ > - return GUINT16_FROM_LE(((SpiceMiniDataHeader *)header->data)->type); > -} > - > -static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) > -{ > - ((SpiceDataHeader *)header->data)->type = GUINT16_TO_LE(type); > -} > - > -static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) > -{ > - ((SpiceMiniDataHeader *)header->data)->type = GUINT16_TO_LE(type); > -} > - > -static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) > -{ > - ((SpiceDataHeader *)header->data)->size = GUINT32_TO_LE(size); > -} > - > -static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) > -{ > - ((SpiceMiniDataHeader *)header->data)->size = GUINT32_TO_LE(size); > -} > - > -static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) > -{ > - ((SpiceDataHeader *)header->data)->serial = GUINT64_TO_LE(serial); > -} > - > -static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) > -{ > - spice_error("attempt to set header serial on mini header"); > -} > - > -static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) > -{ > - ((SpiceDataHeader *)header->data)->sub_list = GUINT32_TO_LE(sub_list); > -} > - > -static void 
mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) > -{ > - spice_error("attempt to set header sub list on mini header"); > -} > - > -static const SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader), > - full_header_set_msg_type, > - full_header_set_msg_size, > - full_header_set_msg_serial, > - full_header_set_msg_sub_list, > - full_header_get_msg_type, > - full_header_get_msg_size}; > - > -static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader), > - mini_header_set_msg_type, > - mini_header_set_msg_size, > - mini_header_set_msg_serial, > - mini_header_set_msg_sub_list, > - mini_header_get_msg_type, > - mini_header_get_msg_size}; > - > -/* return the number of bytes read. -1 in case of error */ > -static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size) > -{ > - uint8_t *pos = buf; > - while (size) { > - int now; > - if (stream->shutdown) { > - return -1; > - } > - now = reds_stream_read(stream, pos, size); > - if (now <= 0) { > - if (now == 0) { > - return -1; > - } > - spice_assert(now == -1); > - if (errno == EAGAIN) { > - break; > - } else if (errno == EINTR) { > - continue; > - } else if (errno == EPIPE) { > - return -1; > - } else { > - spice_printerr("%s", strerror(errno)); > - return -1; > - } > - } else { > - size -= now; > - pos += now; > - } > - } > - return pos - buf; > -} > - > -// TODO: this implementation, as opposed to the old implementation in red_worker, > -// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer > -// arithmetic for the case where a single cb_read could return multiple messages. But > -// this is suboptimal potentially. Profile and consider fixing. 
> -static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handler) > -{ > - int bytes_read; > - uint8_t *parsed; > - size_t parsed_size; > - message_destructor_t parsed_free; > - uint16_t msg_type; > - uint32_t msg_size; > - > - /* XXX: This needs further investigation as to the underlying cause, it happened > - * after spicec disconnect (but not with spice-gtk) repeatedly. */ > - if (!stream) { > - return; > - } > - > - for (;;) { > - int ret_handle; > - if (handler->header_pos < handler->header.header_size) { > - bytes_read = red_peer_receive(stream, > - handler->header.data + handler->header_pos, > - handler->header.header_size - handler->header_pos); > - if (bytes_read == -1) { > - handler->cb->on_error(handler->opaque); > - return; > - } > - handler->cb->on_input(handler->opaque, bytes_read); > - handler->header_pos += bytes_read; > - > - if (handler->header_pos != handler->header.header_size) { > - return; > - } > - } > - > - msg_size = handler->header.get_msg_size(&handler->header); > - msg_type = handler->header.get_msg_type(&handler->header); > - if (handler->msg_pos < msg_size) { > - if (!handler->msg) { > - handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size); > - if (handler->msg == NULL) { > - spice_printerr("ERROR: channel refused to allocate buffer."); > - handler->cb->on_error(handler->opaque); > - return; > - } > - } > - > - bytes_read = red_peer_receive(stream, > - handler->msg + handler->msg_pos, > - msg_size - handler->msg_pos); > - if (bytes_read == -1) { > - handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); > - handler->cb->on_error(handler->opaque); > - return; > - } > - handler->cb->on_input(handler->opaque, bytes_read); > - handler->msg_pos += bytes_read; > - if (handler->msg_pos != msg_size) { > - return; > - } > - } > - > - if (handler->cb->parser) { > - parsed = handler->cb->parser(handler->msg, > - handler->msg + msg_size, msg_type, > - SPICE_VERSION_MINOR, 
&parsed_size, &parsed_free); > - if (parsed == NULL) { > - spice_printerr("failed to parse message type %d", msg_type); > - handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); > - handler->cb->on_error(handler->opaque); > - return; > - } > - ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size, > - msg_type, parsed); > - parsed_free(parsed); > - } else { > - ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size, > - handler->msg); > - } > - handler->msg_pos = 0; > - handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); > - handler->msg = NULL; > - handler->header_pos = 0; > - > - if (!ret_handle) { > - handler->cb->on_error(handler->opaque); > - return; > - } > - } > -} > - > -void red_channel_client_receive(RedChannelClient *rcc) > -{ > - red_channel_client_ref(rcc); > - red_peer_handle_incoming(rcc->stream, &rcc->incoming); > - red_channel_client_unref(rcc); > -} > > void red_channel_receive(RedChannel *channel) > { > g_list_foreach(channel->clients, (GFunc)red_channel_client_receive, NULL); > } > > -static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler) > -{ > - ssize_t n; > - > - if (!stream) { > - return; > - } > - > - if (handler->size == 0) { > - handler->vec = handler->vec_buf; > - handler->size = handler->cb->get_msg_size(handler->opaque); > - if (!handler->size) { // nothing to be sent > - return; > - } > - } > - > - for (;;) { > - handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); > - n = reds_stream_writev(stream, handler->vec, handler->vec_size); > - if (n == -1) { > - switch (errno) { > - case EAGAIN: > - handler->cb->on_block(handler->opaque); > - return; > - case EINTR: > - continue; > - case EPIPE: > - handler->cb->on_error(handler->opaque); > - return; > - default: > - spice_printerr("%s", strerror(errno)); > - handler->cb->on_error(handler->opaque); > - return; > - } > - } else { > - 
handler->pos += n; > - handler->cb->on_output(handler->opaque, n); > - if (handler->pos == handler->size) { // finished writing data > - /* reset handler before calling on_msg_done, since it > - * can trigger another call to red_peer_handle_outgoing (when > - * switching from the urgent marshaller to the main one */ > - handler->vec = handler->vec_buf; > - handler->pos = 0; > - handler->size = 0; > - handler->cb->on_msg_done(handler->opaque); > - return; > - } > - } > - } > -} > - > -static void red_channel_client_on_output(void *opaque, int n) > -{ > - RedChannelClient *rcc = opaque; > - > - if (rcc->connectivity_monitor.timer) { > - rcc->connectivity_monitor.out_bytes += n; > - } > - stat_inc_counter(reds, rcc->channel->out_bytes_counter, n); > -} > - > -static void red_channel_client_on_input(void *opaque, int n) > -{ > - RedChannelClient *rcc = opaque; > - > - if (rcc->connectivity_monitor.timer) { > - rcc->connectivity_monitor.in_bytes += n; > - } > -} > - > static void red_channel_client_default_peer_on_error(RedChannelClient *rcc) > { > red_channel_client_disconnect(rcc); > } > > -static int red_channel_client_get_out_msg_size(void *opaque) > -{ > - RedChannelClient *rcc = (RedChannelClient *)opaque; > - > - return rcc->send_data.size; > -} > - > -static void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, > - int *vec_size, int pos) > -{ > - RedChannelClient *rcc = (RedChannelClient *)opaque; > - > - *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller, > - vec, IOV_MAX, pos); > -} > - > -static void red_channel_client_on_out_block(void *opaque) > -{ > - RedChannelClient *rcc = (RedChannelClient *)opaque; > - > - rcc->send_data.blocked = TRUE; > - rcc->channel->core->watch_update_mask(rcc->stream->watch, > - SPICE_WATCH_EVENT_READ | > - SPICE_WATCH_EVENT_WRITE); > -} > - > -static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc) > -{ > - return (rcc->send_data.marshaller == 
rcc->send_data.urgent.marshaller); > -} > - > -static void red_channel_client_reset_send_data(RedChannelClient *rcc) > -{ > - spice_marshaller_reset(rcc->send_data.marshaller); > - rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller, > - rcc->send_data.header.header_size); > - spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size); > - rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0); > - rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0); > - > - /* Keeping the serial consecutive: resetting it if reset_send_data > - * has been called before, but no message has been sent since then. > - */ > - if (rcc->send_data.last_sent_serial != rcc->send_data.serial) { > - spice_assert(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1); > - /* When the urgent marshaller is active, the serial was incremented by > - * the call to reset_send_data that was made for the main marshaller. > - * The urgent msg receives this serial, and the main msg serial is > - * the following one. 
Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial) > - * should be 1 in this case*/ > - if (!red_channel_client_urgent_marshaller_is_active(rcc)) { > - rcc->send_data.serial = rcc->send_data.last_sent_serial; > - } > - } > - rcc->send_data.serial++; > - > - if (!rcc->is_mini_header) { > - spice_assert(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller); > - rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0); > - rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); > - } > -} > - > -void red_channel_client_push_set_ack(RedChannelClient *rcc) > -{ > - red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_SET_ACK); > -} > - > -static void red_channel_client_send_set_ack(RedChannelClient *rcc) > -{ > - SpiceMsgSetAck ack; > - > - spice_assert(rcc); > - red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL); > - ack.generation = ++rcc->ack_data.generation; > - ack.window = rcc->ack_data.client_window; > - rcc->ack_data.messages_window = 0; > - > - spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack); > - > - red_channel_client_begin_send_message(rcc); > -} > - > -static void red_channel_client_send_migrate(RedChannelClient *rcc) > -{ > - SpiceMsgMigrate migrate; > - > - red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, NULL); > - migrate.flags = rcc->channel->migration_flags; > - spice_marshall_msg_migrate(rcc->send_data.marshaller, &migrate); > - if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_FLUSH) { > - rcc->wait_migrate_flush_mark = TRUE; > - } > - > - red_channel_client_begin_send_message(rcc); > -} > - > - > -static void red_channel_client_send_empty_msg(RedChannelClient *rcc, RedPipeItem *base) > -{ > - RedEmptyMsgPipeItem *msg_pipe_item = SPICE_UPCAST(RedEmptyMsgPipeItem, base); > - > - red_channel_client_init_send_data(rcc, msg_pipe_item->msg, NULL); > - red_channel_client_begin_send_message(rcc); > -} > - > -static void 
red_channel_client_send_ping(RedChannelClient *rcc) > -{ > - SpiceMsgPing ping; > - > - if (!rcc->latency_monitor.warmup_was_sent) { // latency test start > - int delay_val; > - socklen_t opt_size = sizeof(delay_val); > - > - rcc->latency_monitor.warmup_was_sent = TRUE; > - /* > - * When testing latency, TCP_NODELAY must be switched on, otherwise, > - * sending the ping message is delayed by Nagle algorithm, and the > - * roundtrip measurement is less accurate (bigger). > - */ > - rcc->latency_monitor.tcp_nodelay = 1; > - if (getsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, > - &opt_size) == -1) { > - spice_warning("getsockopt failed, %s", strerror(errno)); > - } else { > - rcc->latency_monitor.tcp_nodelay = delay_val; > - if (!delay_val) { > - delay_val = 1; > - if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, > - sizeof(delay_val)) == -1) { > - if (errno != ENOTSUP) { > - spice_warning("setsockopt failed, %s", strerror(errno)); > - } > - } > - } > - } > - } > - > - red_channel_client_init_send_data(rcc, SPICE_MSG_PING, NULL); > - ping.id = rcc->latency_monitor.id; > - ping.timestamp = spice_get_monotonic_time_ns(); > - spice_marshall_msg_ping(rcc->send_data.marshaller, &ping); > - red_channel_client_begin_send_message(rcc); > -} > - > -static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item) > -{ > - spice_assert(red_channel_client_no_item_being_sent(rcc)); > - red_channel_client_reset_send_data(rcc); > - switch (item->type) { > - case RED_PIPE_ITEM_TYPE_SET_ACK: > - red_channel_client_send_set_ack(rcc); > - break; > - case RED_PIPE_ITEM_TYPE_MIGRATE: > - red_channel_client_send_migrate(rcc); > - break; > - case RED_PIPE_ITEM_TYPE_EMPTY_MSG: > - red_channel_client_send_empty_msg(rcc, item); > - break; > - case RED_PIPE_ITEM_TYPE_PING: > - red_channel_client_send_ping(rcc); > - break; > - case RED_PIPE_ITEM_TYPE_MARKER: > - break; > - default: > - rcc->channel->channel_cbs.send_item(rcc, 
item); > - break; > - } > - red_pipe_item_unref(item); > -} > - > -static inline void red_channel_client_release_sent_item(RedChannelClient *rcc) > -{ > - if (rcc->send_data.item) { > - red_pipe_item_unref(rcc->send_data.item); > - rcc->send_data.item = NULL; > - } > -} > - > -static void red_channel_client_on_out_msg_done(void *opaque) > -{ > - RedChannelClient *rcc = (RedChannelClient *)opaque; > - int fd; > - > - rcc->send_data.size = 0; > - > - if (spice_marshaller_get_fd(rcc->send_data.marshaller, &fd)) { > - if (reds_stream_send_msgfd(rcc->stream, fd) < 0) { > - perror("sendfd"); > - red_channel_client_disconnect(rcc); > - if (fd != -1) > - close(fd); > - return; > - } > - if (fd != -1) > - close(fd); > - } > - > - red_channel_client_release_sent_item(rcc); > - if (rcc->send_data.blocked) { > - rcc->send_data.blocked = FALSE; > - rcc->channel->core->watch_update_mask(rcc->stream->watch, > - SPICE_WATCH_EVENT_READ); > - } > - > - if (red_channel_client_urgent_marshaller_is_active(rcc)) { > - red_channel_client_restore_main_sender(rcc); > - spice_assert(rcc->send_data.header.data != NULL); > - red_channel_client_begin_send_message(rcc); > - } else { > - if (rcc->latency_monitor.timer && !rcc->send_data.blocked && rcc->pipe_size == 0) { > - /* It is possible that the socket will become idle, so we may be able to test latency */ > - red_channel_client_restart_ping_timer(rcc); > - } > - } > - > -} > - > -static void red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item) > -{ > - rcc->pipe_size--; > - ring_remove(&item->link); > -} > - > -static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc) > +void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc) > { > spice_assert(rcc); > channel->clients = g_list_prepend(channel->clients, rcc); > } > > -static void red_channel_client_set_remote_caps(RedChannelClient* rcc, > - int num_common_caps, uint32_t *common_caps, > - int num_caps, uint32_t *caps) > -{ > - 
rcc->remote_caps.num_common_caps = num_common_caps; > - rcc->remote_caps.common_caps = spice_memdup(common_caps, num_common_caps * sizeof(uint32_t)); > - > - rcc->remote_caps.num_caps = num_caps; > - rcc->remote_caps.caps = spice_memdup(caps, num_caps * sizeof(uint32_t)); > -} > - > -static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc) > -{ > - rcc->remote_caps.num_common_caps = 0; > - free(rcc->remote_caps.common_caps); > - rcc->remote_caps.num_caps = 0; > - free(rcc->remote_caps.caps); > -} > - > -int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap) > -{ > - return test_capability(rcc->remote_caps.common_caps, > - rcc->remote_caps.num_common_caps, > - cap); > -} > - > -int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap) > -{ > - return test_capability(rcc->remote_caps.caps, > - rcc->remote_caps.num_caps, > - cap); > -} > - > int red_channel_test_remote_common_cap(RedChannel *channel, uint32_t cap) > { > GList *link, *next; > @@ -699,230 +108,8 @@ int red_channel_test_remote_cap(RedChannel *channel, uint32_t cap) > return TRUE; > } > > -static int red_channel_client_pre_create_validate(RedChannel *channel, RedClient *client) > -{ > - if (red_client_get_channel(client, channel->type, channel->id)) { > - spice_printerr("Error client %p: duplicate channel type %d id %d", > - client, channel->type, channel->id); > - return FALSE; > - } > - return TRUE; > -} > - > -static void red_channel_client_push_ping(RedChannelClient *rcc) > -{ > - spice_assert(rcc->latency_monitor.state == PING_STATE_NONE); > - rcc->latency_monitor.state = PING_STATE_WARMUP; > - rcc->latency_monitor.warmup_was_sent = FALSE; > - rcc->latency_monitor.id = rand(); > - red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); > - red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); > -} > - > -static void red_channel_client_ping_timer(void *opaque) > -{ > - RedChannelClient *rcc = opaque; > - > - 
spice_assert(rcc->latency_monitor.state == PING_STATE_TIMER); > - red_channel_client_cancel_ping_timer(rcc); > - > -#ifdef HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. */ > - { > - int so_unsent_size = 0; > - > - /* retrieving the occupied size of the socket's tcp snd buffer (unacked + unsent) */ > - if (ioctl(rcc->stream->socket, SIOCOUTQ, &so_unsent_size) == -1) { > - spice_printerr("ioctl(SIOCOUTQ) failed, %s", strerror(errno)); > - } > - if (so_unsent_size > 0) { > - /* tcp snd buffer is still occupied. rescheduling ping */ > - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); > - } else { > - red_channel_client_push_ping(rcc); > - } > - } > -#else /* ifdef HAVE_LINUX_SOCKIOS_H */ > - /* More portable alternative code path (less accurate but avoids bogus ioctls)*/ > - red_channel_client_push_ping(rcc); > -#endif /* ifdef HAVE_LINUX_SOCKIOS_H */ > -} > - > -/* > - * When a connection is not alive (and we can't detect it via a socket error), we > - * reach one of these 2 states: > - * (1) Sending msgs is blocked: either writes return EAGAIN > - * or we are missing MSGC_ACK from the client. > - * (2) MSG_PING was sent without receiving a MSGC_PONG in reply. > - * > - * The connectivity_timer callback tests if the channel's state matches one of the above. > - * In case it does, on the next time the timer is called, it checks if the connection has > - * been idle during the time that passed since the previous timer call. If the connection > - * has been idle, we consider the client as disconnected. 
> - */ > -static void red_channel_client_connectivity_timer(void *opaque) > -{ > - RedChannelClient *rcc = opaque; > - RedChannelClientConnectivityMonitor *monitor = &rcc->connectivity_monitor; > - int is_alive = TRUE; > - > - if (monitor->state == CONNECTIVITY_STATE_BLOCKED) { > - if (monitor->in_bytes == 0 && monitor->out_bytes == 0) { > - if (!rcc->send_data.blocked && !red_channel_client_waiting_for_ack(rcc)) { > - spice_error("mismatch between rcc-state and connectivity-state"); > - } > - spice_debug("rcc is blocked; connection is idle"); > - is_alive = FALSE; > - } > - } else if (monitor->state == CONNECTIVITY_STATE_WAIT_PONG) { > - if (monitor->in_bytes == 0) { > - if (rcc->latency_monitor.state != PING_STATE_WARMUP && > - rcc->latency_monitor.state != PING_STATE_LATENCY) { > - spice_error("mismatch between rcc-state and connectivity-state"); > - } > - spice_debug("rcc waits for pong; connection is idle"); > - is_alive = FALSE; > - } > - } > - > - if (is_alive) { > - monitor->in_bytes = 0; > - monitor->out_bytes = 0; > - if (rcc->send_data.blocked || red_channel_client_waiting_for_ack(rcc)) { > - monitor->state = CONNECTIVITY_STATE_BLOCKED; > - } else if (rcc->latency_monitor.state == PING_STATE_WARMUP || > - rcc->latency_monitor.state == PING_STATE_LATENCY) { > - monitor->state = CONNECTIVITY_STATE_WAIT_PONG; > - } else { > - monitor->state = CONNECTIVITY_STATE_CONNECTED; > - } > - rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, > - rcc->connectivity_monitor.timeout); > - } else { > - monitor->state = CONNECTIVITY_STATE_DISCONNECTED; > - spice_warning("rcc %p on channel %d:%d has been unresponsive for more than %u ms, disconnecting", > - rcc, rcc->channel->type, rcc->channel->id, monitor->timeout); > - red_channel_client_disconnect(rcc); > - } > -} > - > -void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms) > -{ > - if (!red_channel_client_is_connected(rcc)) { > - return; > - } > - 
spice_debug(NULL); > - spice_assert(timeout_ms > 0); > - /* > - * If latency_monitor is not active, we activate it in order to enable > - * periodic ping messages so that we will be be able to identify a disconnected > - * channel-client even if there are no ongoing channel specific messages > - * on this channel. > - */ > - if (rcc->latency_monitor.timer == NULL) { > - rcc->latency_monitor.timer = rcc->channel->core->timer_add( > - rcc->channel->core, red_channel_client_ping_timer, rcc); > - if (!red_client_during_migrate_at_target(rcc->client)) { > - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); > - } > - rcc->latency_monitor.roundtrip = -1; > - } > - if (rcc->connectivity_monitor.timer == NULL) { > - rcc->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED; > - rcc->connectivity_monitor.timer = rcc->channel->core->timer_add( > - rcc->channel->core, red_channel_client_connectivity_timer, rcc); > - rcc->connectivity_monitor.timeout = timeout_ms; > - if (!red_client_during_migrate_at_target(rcc->client)) { > - rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, > - rcc->connectivity_monitor.timeout); > - } > - } > -} > - > -RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, > - RedsStream *stream, > - int monitor_latency, > - int num_common_caps, uint32_t *common_caps, > - int num_caps, uint32_t *caps) > -{ > - RedChannelClient *rcc = NULL; > - > - pthread_mutex_lock(&client->lock); > - if (!red_channel_client_pre_create_validate(channel, client)) { > - goto error; > - } > - spice_assert(stream && channel && size >= sizeof(RedChannelClient)); > - rcc = spice_malloc0(size); > - rcc->stream = stream; > - rcc->channel = channel; > - rcc->client = client; > - rcc->refs = 1; > - rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + > - // block flags) > - rcc->ack_data.client_generation = ~0; > - rcc->ack_data.client_window = CLIENT_ACK_WINDOW; > 
- rcc->send_data.main.marshaller = spice_marshaller_new(); > - rcc->send_data.urgent.marshaller = spice_marshaller_new(); > - > - rcc->send_data.marshaller = rcc->send_data.main.marshaller; > - > - rcc->incoming.opaque = rcc; > - rcc->incoming.cb = &channel->incoming_cb; > - > - rcc->outgoing.opaque = rcc; > - rcc->outgoing.cb = &channel->outgoing_cb; > - rcc->outgoing.pos = 0; > - rcc->outgoing.size = 0; > - > - red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); > - if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { > - rcc->incoming.header = mini_header_wrapper; > - rcc->send_data.header = mini_header_wrapper; > - rcc->is_mini_header = TRUE; > - } else { > - rcc->incoming.header = full_header_wrapper; > - rcc->send_data.header = full_header_wrapper; > - rcc->is_mini_header = FALSE; > - } > - > - rcc->incoming.header.data = rcc->incoming.header_buf; > - rcc->incoming.serial = 1; > - > - if (!channel->channel_cbs.config_socket(rcc)) { > - goto error; > - } > - > - ring_init(&rcc->pipe); > - rcc->pipe_size = 0; > - > - stream->watch = channel->core->watch_add(channel->core, > - stream->socket, > - SPICE_WATCH_EVENT_READ, > - red_channel_client_event, rcc); > - rcc->id = g_list_length(channel->clients); > - red_channel_add_client(channel, rcc); > - red_client_add_channel(client, rcc); > - red_channel_ref(channel); > - pthread_mutex_unlock(&client->lock); > - > - if (monitor_latency && reds_stream_get_family(stream) != AF_UNIX) { > - rcc->latency_monitor.timer = channel->core->timer_add( > - channel->core, red_channel_client_ping_timer, rcc); > - if (!client->during_target_migrate) { > - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); > - } > - rcc->latency_monitor.roundtrip = -1; > - } > - > - return rcc; > -error: > - free(rcc); > - reds_stream_free(stream); > - pthread_mutex_unlock(&client->lock); > - return NULL; > -} > - > /* returns TRUE If all channels are finished 
migrating, FALSE otherwise */ > -static gboolean red_client_seamless_migration_done_for_channel(RedClient *client) > +gboolean red_client_seamless_migration_done_for_channel(RedClient *client) > { > gboolean ret = FALSE; > > @@ -944,26 +131,6 @@ static gboolean red_client_seamless_migration_done_for_channel(RedClient *client > return ret; > } > > -static void red_channel_client_seamless_migration_done(RedChannelClient *rcc) > -{ > - rcc->wait_migrate_data = FALSE; > - > - if (red_client_seamless_migration_done_for_channel(rcc->client)) { > - if (rcc->latency_monitor.timer) { > - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); > - } > - if (rcc->connectivity_monitor.timer) { > - rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, > - rcc->connectivity_monitor.timeout); > - } > - } > -} > - > -int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc) > -{ > - return rcc->wait_migrate_data; > -} > - > int red_channel_is_waiting_for_migrate_data(RedChannel *channel) > { > RedChannelClient *rcc; > @@ -995,20 +162,6 @@ static void red_channel_client_default_disconnect(RedChannelClient *base) > red_channel_client_disconnect(base); > } > > -void red_channel_client_default_migrate(RedChannelClient *rcc) > -{ > - if (rcc->latency_monitor.timer) { > - red_channel_client_cancel_ping_timer(rcc); > - rcc->channel->core->timer_remove(rcc->latency_monitor.timer); > - rcc->latency_monitor.timer = NULL; > - } > - if (rcc->connectivity_monitor.timer) { > - rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer); > - rcc->connectivity_monitor.timer = NULL; > - } > - red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MIGRATE); > -} > - > RedChannel *red_channel_create(int size, > RedsState *reds, > const SpiceCoreInterfaceInternal *core, > @@ -1128,568 +281,136 @@ static int do_nothing_handle_message(RedChannelClient *rcc, > > RedChannel *red_channel_create_parser(int size, > RedsState *reds, > - const 
SpiceCoreInterfaceInternal *core, > - uint32_t type, uint32_t id, > - int handle_acks, > - spice_parse_channel_func_t parser, > - channel_handle_parsed_proc handle_parsed, > - const ChannelCbs *channel_cbs, > - uint32_t migration_flags) > -{ > - RedChannel *channel = red_channel_create(size, reds, core, type, id, > - handle_acks, > - do_nothing_handle_message, > - channel_cbs, > - migration_flags); > - > - if (channel == NULL) { > - return NULL; > - } > - channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed; > - channel->incoming_cb.parser = parser; > - > - return channel; > -} > - > -void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat) > -{ > - spice_return_if_fail(channel != NULL); > - spice_return_if_fail(channel->stat == 0); > - > -#ifdef RED_STATISTICS > - channel->stat = stat; > - channel->out_bytes_counter = stat_add_counter(channel->reds, stat, "out_bytes", TRUE); > -#endif > -} > - > -void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data) > -{ > - spice_assert(client_cbs->connect || channel->type == SPICE_CHANNEL_MAIN); > - channel->client_cbs.connect = client_cbs->connect; > - > - if (client_cbs->disconnect) { > - channel->client_cbs.disconnect = client_cbs->disconnect; > - } > - > - if (client_cbs->migrate) { > - channel->client_cbs.migrate = client_cbs->migrate; > - } > - channel->data = cbs_data; > -} > - > -int test_capability(const uint32_t *caps, int num_caps, uint32_t cap) > -{ > - uint32_t index = cap / 32; > - if (num_caps < index + 1) { > - return FALSE; > - } > - > - return (caps[index] & (1 << (cap % 32))) != 0; > -} > - > -static void add_capability(uint32_t **caps, int *num_caps, uint32_t cap) > -{ > - int nbefore, n; > - > - nbefore = *num_caps; > - n = cap / 32; > - *num_caps = MAX(*num_caps, n + 1); > - *caps = spice_renew(uint32_t, *caps, *num_caps); > - memset(*caps + nbefore, 0, (*num_caps - nbefore) * sizeof(uint32_t)); > - (*caps)[n] |= (1 << (cap % 
32)); > -} > - > -void red_channel_set_common_cap(RedChannel *channel, uint32_t cap) > -{ > - add_capability(&channel->local_caps.common_caps, &channel->local_caps.num_common_caps, cap); > -} > - > -void red_channel_set_cap(RedChannel *channel, uint32_t cap) > -{ > - add_capability(&channel->local_caps.caps, &channel->local_caps.num_caps, cap); > -} > - > -static void red_channel_ref(RedChannel *channel) > -{ > - channel->refs++; > -} > - > -static void red_channel_unref(RedChannel *channel) > -{ > - if (!--channel->refs) { > - if (channel->local_caps.num_common_caps) { > - free(channel->local_caps.common_caps); > - } > - > - if (channel->local_caps.num_caps) { > - free(channel->local_caps.caps); > - } > - > - free(channel); > - } > -} > - > -void red_channel_client_ref(RedChannelClient *rcc) > -{ > - rcc->refs++; > -} > - > -void red_channel_client_unref(RedChannelClient *rcc) > -{ > - if (!--rcc->refs) { > - spice_debug("destroy rcc=%p", rcc); > - > - reds_stream_free(rcc->stream); > - rcc->stream = NULL; > - > - if (rcc->send_data.main.marshaller) { > - spice_marshaller_destroy(rcc->send_data.main.marshaller); > - } > - > - if (rcc->send_data.urgent.marshaller) { > - spice_marshaller_destroy(rcc->send_data.urgent.marshaller); > - } > - > - red_channel_client_destroy_remote_caps(rcc); > - if (rcc->channel) { > - red_channel_unref(rcc->channel); > - } > - free(rcc); > - } > -} > - > -void red_channel_client_destroy(RedChannelClient *rcc) > -{ > - rcc->destroying = 1; > - red_channel_client_disconnect(rcc); > - red_client_remove_channel(rcc); > - red_channel_client_unref(rcc); > -} > - > -void red_channel_destroy(RedChannel *channel) > -{ > - if (!channel) { > - return; > - } > - > - g_list_foreach(channel->clients, (GFunc)red_channel_client_destroy, NULL); > - red_channel_unref(channel); > -} > - > -void red_channel_client_shutdown(RedChannelClient *rcc) > -{ > - if (rcc->stream && !rcc->stream->shutdown) { > - rcc->channel->core->watch_remove(rcc->stream->watch); 
> - rcc->stream->watch = NULL; > - shutdown(rcc->stream->socket, SHUT_RDWR); > - rcc->stream->shutdown = TRUE; > - } > -} > - > -void red_channel_client_send(RedChannelClient *rcc) > -{ > - red_channel_client_ref(rcc); > - red_peer_handle_outgoing(rcc->stream, &rcc->outgoing); > - red_channel_client_unref(rcc); > -} > - > -void red_channel_send(RedChannel *channel) > -{ > - g_list_foreach(channel->clients, (GFunc)red_channel_client_send, NULL); > -} > - > -static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc) > -{ > - return (rcc->channel->handle_acks && > - (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2)); > -} > - > -static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc) > -{ > - RedPipeItem *item; > - > - if (!rcc || rcc->send_data.blocked > - || red_channel_client_waiting_for_ack(rcc) > - || !(item = (RedPipeItem *)ring_get_tail(&rcc->pipe))) { > - return NULL; > - } > - red_channel_client_pipe_remove(rcc, item); > - return item; > -} > - > -void red_channel_client_push(RedChannelClient *rcc) > -{ > - RedPipeItem *pipe_item; > - > - if (!rcc->during_send) { > - rcc->during_send = TRUE; > - } else { > - return; > - } > - red_channel_client_ref(rcc); > - if (rcc->send_data.blocked) { > - red_channel_client_send(rcc); > - } > - > - if (!red_channel_client_no_item_being_sent(rcc) && !rcc->send_data.blocked) { > - rcc->send_data.blocked = TRUE; > - spice_printerr("ERROR: an item waiting to be sent and not blocked"); > - } > - > - while ((pipe_item = red_channel_client_pipe_item_get(rcc))) { > - red_channel_client_send_item(rcc, pipe_item); > - } > - if (red_channel_client_no_item_being_sent(rcc) && ring_is_empty(&rcc->pipe) > - && rcc->stream->watch) { > - rcc->channel->core->watch_update_mask(rcc->stream->watch, > - SPICE_WATCH_EVENT_READ); > - } > - rcc->during_send = FALSE; > - red_channel_client_unref(rcc); > -} > - > -void red_channel_push(RedChannel *channel) > -{ > - if (!channel) { > - 
return; > - } > - > - g_list_foreach(channel->clients, (GFunc)red_channel_client_push, NULL); > -} > - > -int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc) > -{ > - if (rcc->latency_monitor.roundtrip < 0) { > - return rcc->latency_monitor.roundtrip; > - } > - return rcc->latency_monitor.roundtrip / NSEC_PER_MILLISEC; > -} > - > -static void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc) > -{ > - rcc->ack_data.messages_window = 0; > - red_channel_client_push(rcc); > -} > - > -// TODO: this function doesn't make sense because the window should be client (WAN/LAN) > -// specific > -void red_channel_init_outgoing_messages_window(RedChannel *channel) > -{ > - g_list_foreach(channel->clients, (GFunc)red_channel_client_init_outgoing_messages_window, NULL); > -} > - > -static void red_channel_handle_migrate_flush_mark(RedChannelClient *rcc) > -{ > - if (rcc->channel->channel_cbs.handle_migrate_flush_mark) { > - rcc->channel->channel_cbs.handle_migrate_flush_mark(rcc); > - } > -} > - > -// TODO: the whole migration is broken with multiple clients. What do we want to do? > -// basically just > -// 1) source send mark to all > -// 2) source gets at various times the data (waits for all) > -// 3) source migrates to target > -// 4) target sends data to all > -// So need to make all the handlers work with per channel/client data (what data exactly?) 
> -static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message) > -{ > - spice_debug("channel type %d id %d rcc %p size %u", > - rcc->channel->type, rcc->channel->id, rcc, size); > - if (!rcc->channel->channel_cbs.handle_migrate_data) { > - return; > - } > - if (!red_channel_client_is_waiting_for_migrate_data(rcc)) { > - spice_channel_client_error(rcc, "unexpected"); > - return; > - } > - if (rcc->channel->channel_cbs.handle_migrate_data_get_serial) { > - red_channel_client_set_message_serial(rcc, > - rcc->channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message)); > - } > - if (!rcc->channel->channel_cbs.handle_migrate_data(rcc, size, message)) { > - spice_channel_client_error(rcc, "handle_migrate_data failed"); > - return; > - } > - red_channel_client_seamless_migration_done(rcc); > -} > - > -static void red_channel_client_restart_ping_timer(RedChannelClient *rcc) > -{ > - uint64_t passed, timeout; > - > - passed = (spice_get_monotonic_time_ns() - rcc->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC; > - timeout = PING_TEST_IDLE_NET_TIMEOUT_MS; > - if (passed < PING_TEST_TIMEOUT_MS) { > - timeout += PING_TEST_TIMEOUT_MS - passed; > - } > - > - red_channel_client_start_ping_timer(rcc, timeout); > -} > - > -static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout) > -{ > - if (!rcc->latency_monitor.timer) { > - return; > - } > - if (rcc->latency_monitor.state != PING_STATE_NONE) { > - return; > - } > - rcc->latency_monitor.state = PING_STATE_TIMER; > - rcc->channel->core->timer_start(rcc->latency_monitor.timer, timeout); > -} > - > -static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc) > -{ > - if (!rcc->latency_monitor.timer) { > - return; > - } > - if (rcc->latency_monitor.state != PING_STATE_TIMER) { > - return; > - } > - > - rcc->channel->core->timer_cancel(rcc->latency_monitor.timer); > - rcc->latency_monitor.state = PING_STATE_NONE; > -} > - > -static void 
red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping) > -{ > - uint64_t now; > - > - /* ignoring unexpected pongs, or post-migration pongs for pings that > - * started just before migration */ > - if (ping->id != rcc->latency_monitor.id) { > - spice_warning("ping-id (%u)!= pong-id %u", > - rcc->latency_monitor.id, ping->id); > - return; > - } > - > - now = spice_get_monotonic_time_ns(); > - > - if (rcc->latency_monitor.state == PING_STATE_WARMUP) { > - rcc->latency_monitor.state = PING_STATE_LATENCY; > - return; > - } else if (rcc->latency_monitor.state != PING_STATE_LATENCY) { > - spice_warning("unexpected"); > - return; > - } > - > - /* set TCP_NODELAY=0, in case we reverted it for the test*/ > - if (!rcc->latency_monitor.tcp_nodelay) { > - int delay_val = 0; > - > - if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, > - sizeof(delay_val)) == -1) { > - if (errno != ENOTSUP) { > - spice_warning("setsockopt failed, %s", strerror(errno)); > - } > - } > - } > + const SpiceCoreInterfaceInternal *core, > + uint32_t type, uint32_t id, > + int handle_acks, > + spice_parse_channel_func_t parser, > + channel_handle_parsed_proc handle_parsed, > + const ChannelCbs *channel_cbs, > + uint32_t migration_flags) > +{ > + RedChannel *channel = red_channel_create(size, reds, core, type, id, > + handle_acks, > + do_nothing_handle_message, > + channel_cbs, > + migration_flags); > > - /* > - * The real network latency shouldn't change during the connection. However, > - * the measurements can be bigger than the real roundtrip due to other > - * threads or processes that are utilizing the network. We update the roundtrip > - * measurement with the minimal value we encountered till now. 
> - */ > - if (rcc->latency_monitor.roundtrip < 0 || > - now - ping->timestamp < rcc->latency_monitor.roundtrip) { > - rcc->latency_monitor.roundtrip = now - ping->timestamp; > - spice_debug("update roundtrip %.2f(ms)", ((double)rcc->latency_monitor.roundtrip)/NSEC_PER_MILLISEC); > + if (channel == NULL) { > + return NULL; > } I know this patch's intention is just to move code, but as a quick note: I can't see how channel can be NULL after red_channel_create(), so this check could be removed in the future? Maybe add a FIXME? > + channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed; > + channel->incoming_cb.parser = parser; > > - rcc->latency_monitor.last_pong_time = now; > - rcc->latency_monitor.state = PING_STATE_NONE; > - red_channel_client_start_ping_timer(rcc, PING_TEST_TIMEOUT_MS); > + return channel; > } > > -int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, > - uint16_t type, void *message) > +void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat) > { > - switch (type) { > - case SPICE_MSGC_ACK_SYNC: > - if (size != sizeof(uint32_t)) { > - spice_printerr("bad message size"); > - return FALSE; > - } > - rcc->ack_data.client_generation = *(uint32_t *)(message); > - break; > - case SPICE_MSGC_ACK: > - if (rcc->ack_data.client_generation == rcc->ack_data.generation) { > - rcc->ack_data.messages_window -= rcc->ack_data.client_window; > - red_channel_client_push(rcc); > - } > - break; > - case SPICE_MSGC_DISCONNECTING: > - break; > - case SPICE_MSGC_MIGRATE_FLUSH_MARK: > - if (!rcc->wait_migrate_flush_mark) { > - spice_error("unexpected flush mark"); > - return FALSE; > - } > - red_channel_handle_migrate_flush_mark(rcc); > - rcc->wait_migrate_flush_mark = FALSE; > - break; > - case SPICE_MSGC_MIGRATE_DATA: > - red_channel_handle_migrate_data(rcc, size, message); > - break; > - case SPICE_MSGC_PONG: > - red_channel_client_handle_pong(rcc, message); > - break; > - default: > - spice_printerr("invalid message type %u",
type); > - return FALSE; > - } > - return TRUE; > + spice_return_if_fail(channel != NULL); > + spice_return_if_fail(channel->stat == 0); > + > +#ifdef RED_STATISTICS > + channel->stat = stat; > + channel->out_bytes_counter = stat_add_counter(channel->reds, stat, "out_bytes", TRUE); > +#endif > } > > -static void red_channel_client_event(int fd, int event, void *data) > +void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data) > { > - RedChannelClient *rcc = (RedChannelClient *)data; > + spice_assert(client_cbs->connect || channel->type == SPICE_CHANNEL_MAIN); > + channel->client_cbs.connect = client_cbs->connect; > > - red_channel_client_ref(rcc); > - if (event & SPICE_WATCH_EVENT_READ) { > - red_channel_client_receive(rcc); > - } > - if (event & SPICE_WATCH_EVENT_WRITE) { > - red_channel_client_push(rcc); > + if (client_cbs->disconnect) { > + channel->client_cbs.disconnect = client_cbs->disconnect; > } > - red_channel_client_unref(rcc); > -} > > -void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item) > -{ > - spice_assert(red_channel_client_no_item_being_sent(rcc)); > - spice_assert(msg_type != 0); > - rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type); > - rcc->send_data.item = item; > - if (item) { > - red_pipe_item_ref(item); > + if (client_cbs->migrate) { > + channel->client_cbs.migrate = client_cbs->migrate; > } > + channel->data = cbs_data; > } > > -void red_channel_client_begin_send_message(RedChannelClient *rcc) > +int test_capability(const uint32_t *caps, int num_caps, uint32_t cap) > { > - SpiceMarshaller *m = rcc->send_data.marshaller; > - > - // TODO - better check: type in channel_allowed_types. 
Better: type in channel_allowed_types(channel_state) > - if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) { > - spice_printerr("BUG: header->type == 0"); > - return; > + uint32_t index = cap / 32; > + if (num_caps < index + 1) { > + return FALSE; > } > > - /* canceling the latency test timer till the nework is idle */ > - red_channel_client_cancel_ping_timer(rcc); > - > - spice_marshaller_flush(m); > - rcc->send_data.size = spice_marshaller_get_total_size(m); > - rcc->send_data.header.set_msg_size(&rcc->send_data.header, > - rcc->send_data.size - rcc->send_data.header.header_size); > - rcc->ack_data.messages_window++; > - rcc->send_data.last_sent_serial = rcc->send_data.serial; > - rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */ > - red_channel_client_send(rcc); > + return (caps[index] & (1 << (cap % 32))) != 0; > } > > -SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc) > +static void add_capability(uint32_t **caps, int *num_caps, uint32_t cap) > { > - spice_assert(red_channel_client_no_item_being_sent(rcc)); > - spice_assert(rcc->send_data.header.data != NULL); > - rcc->send_data.main.header_data = rcc->send_data.header.data; > - rcc->send_data.main.item = rcc->send_data.item; > - > - rcc->send_data.marshaller = rcc->send_data.urgent.marshaller; > - rcc->send_data.item = NULL; > - red_channel_client_reset_send_data(rcc); > - return rcc->send_data.marshaller; > -} > + int nbefore, n; > > -static void red_channel_client_restore_main_sender(RedChannelClient *rcc) > -{ > - spice_marshaller_reset(rcc->send_data.urgent.marshaller); > - rcc->send_data.marshaller = rcc->send_data.main.marshaller; > - rcc->send_data.header.data = rcc->send_data.main.header_data; > - if (!rcc->is_mini_header) { > - rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); > - } > - rcc->send_data.item = rcc->send_data.main.item; > + nbefore = *num_caps; > + n = cap / 
32; > + *num_caps = MAX(*num_caps, n + 1); > + *caps = spice_renew(uint32_t, *caps, *num_caps); > + memset(*caps + nbefore, 0, (*num_caps - nbefore) * sizeof(uint32_t)); > + (*caps)[n] |= (1 << (cap % 32)); > } > > -uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc) > +void red_channel_set_common_cap(RedChannel *channel, uint32_t cap) > { > - return rcc->send_data.serial; > + add_capability(&channel->local_caps.common_caps, &channel->local_caps.num_common_caps, cap); > } > > -void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial) > +void red_channel_set_cap(RedChannel *channel, uint32_t cap) > { > - rcc->send_data.last_sent_serial = serial; > - rcc->send_data.serial = serial; > + add_capability(&channel->local_caps.caps, &channel->local_caps.num_caps, cap); > } > > -static inline gboolean client_pipe_add(RedChannelClient *rcc, RedPipeItem *item, RingItem *pos) > +void red_channel_ref(RedChannel *channel) > { > - spice_assert(rcc && item); > - if (SPICE_UNLIKELY(!red_channel_client_is_connected(rcc))) { > - spice_debug("rcc is disconnected %p", rcc); > - red_pipe_item_unref(item); > - return FALSE; > - } > - if (ring_is_empty(&rcc->pipe) && rcc->stream->watch) { > - rcc->channel->core->watch_update_mask(rcc->stream->watch, > - SPICE_WATCH_EVENT_READ | > - SPICE_WATCH_EVENT_WRITE); > - } > - rcc->pipe_size++; > - ring_add(pos, &item->link); > - return TRUE; > + channel->refs++; > } > > -void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item) > +void red_channel_unref(RedChannel *channel) > { > + if (--channel->refs == 0) { > + if (channel->local_caps.num_common_caps) { > + free(channel->local_caps.common_caps); > + } > > - client_pipe_add(rcc, item, &rcc->pipe); > -} > + if (channel->local_caps.num_caps) { > + free(channel->local_caps.caps); > + } > > -void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item) > -{ > - red_channel_client_pipe_add(rcc, item); > - 
red_channel_client_push(rcc); > + free(channel); > + } > } > > -void red_channel_client_pipe_add_after(RedChannelClient *rcc, > - RedPipeItem *item, > - RedPipeItem *pos) > +void red_channel_destroy(RedChannel *channel) > { > - spice_assert(pos); > - client_pipe_add(rcc, item, &pos->link); > -} > + if (!channel) { > + return; > + } > > -int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, > - RedPipeItem *item) > -{ > - return ring_item_is_linked(&item->link); > + g_list_foreach(channel->clients, (GFunc)red_channel_client_destroy, NULL); > + red_channel_unref(channel); > } > > -static void red_channel_client_pipe_add_tail(RedChannelClient *rcc, > - RedPipeItem *item) > +void red_channel_send(RedChannel *channel) > { > - client_pipe_add(rcc, item, rcc->pipe.prev); > + g_list_foreach(channel->clients, (GFunc)red_channel_client_send, NULL); > } > > -void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item) > +void red_channel_push(RedChannel *channel) > { > - if (client_pipe_add(rcc, item, rcc->pipe.prev)) { > - red_channel_client_push(rcc); > + if (!channel) { > + return; > } > + > + g_list_foreach(channel->clients, (GFunc)red_channel_client_push, NULL); > } > > -void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type) > +// TODO: this function doesn't make sense because the window should be client (WAN/LAN) > +// specific > +void red_channel_init_outgoing_messages_window(RedChannel *channel) > { > - RedPipeItem *item = spice_new(RedPipeItem, 1); > - > - red_pipe_item_init(item, pipe_item_type); > - red_channel_client_pipe_add(rcc, item); > - red_channel_client_push(rcc); > + g_list_foreach(channel->clients, (GFunc)red_channel_client_init_outgoing_messages_window, NULL); > } > > static void red_channel_client_pipe_add_type_proxy(gpointer data, gpointer user_data) > @@ -1704,16 +425,6 @@ void red_channel_pipes_add_type(RedChannel *channel, int pipe_item_type) > GINT_TO_POINTER(pipe_item_type)); > } > 
> -void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type) > -{ > - RedEmptyMsgPipeItem *item = spice_new(RedEmptyMsgPipeItem, 1); > - > - red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG); > - item->msg = msg_type; > - red_channel_client_pipe_add(rcc, &item->base); > - red_channel_client_push(rcc); > -} > - > static void red_channel_client_pipe_add_empty_msg_proxy(gpointer data, gpointer user_data) > { > int type = GPOINTER_TO_INT(user_data); > @@ -1725,117 +436,38 @@ void red_channel_pipes_add_empty_msg(RedChannel *channel, int msg_type) > g_list_foreach(channel->clients, red_channel_client_pipe_add_empty_msg_proxy, GINT_TO_POINTER(msg_type)); > } > > -int red_channel_client_is_connected(RedChannelClient *rcc) > -{ > - if (!rcc->dummy) { > - return rcc->channel > - && (g_list_find(rcc->channel->clients, rcc) != NULL); > - } else { > - return rcc->dummy_connected; > - } > -} > - > int red_channel_is_connected(RedChannel *channel) > { > return channel && channel->clients; > } > > -static void red_channel_client_clear_sent_item(RedChannelClient *rcc) > -{ > - red_channel_client_release_sent_item(rcc); > - rcc->send_data.blocked = FALSE; > - rcc->send_data.size = 0; > -} > - > -void red_channel_client_pipe_clear(RedChannelClient *rcc) > -{ > - RedPipeItem *item; > - > - if (rcc) { > - red_channel_client_clear_sent_item(rcc); > - } > - while ((item = (RedPipeItem *)ring_get_head(&rcc->pipe))) { > - ring_remove(&item->link); > - red_pipe_item_unref(item); > - } > - rcc->pipe_size = 0; > -} > - > -void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc) > -{ > - rcc->ack_data.messages_window = 0; > -} > - > -void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window) > -{ > - rcc->ack_data.client_window = client_window; > -} > - > -static void red_channel_remove_client(RedChannelClient *rcc) > +void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc) > { > GList *link; > + 
g_return_if_fail(channel == red_channel_client_get_channel(rcc)); > > - if (!pthread_equal(pthread_self(), rcc->channel->thread_id)) { > + if (!pthread_equal(pthread_self(), channel->thread_id)) { > spice_warning("channel type %d id %d - " > "channel->thread_id (0x%lx) != pthread_self (0x%lx)." > "If one of the threads is != io-thread && != vcpu-thread, " > "this might be a BUG", > - rcc->channel->type, rcc->channel->id, > - rcc->channel->thread_id, pthread_self()); > + channel->type, channel->id, > + channel->thread_id, pthread_self()); > } > - spice_return_if_fail(rcc->channel); > - link = g_list_find(rcc->channel->clients, rcc); > + spice_return_if_fail(channel); > + link = g_list_find(channel->clients, rcc); > spice_return_if_fail(link != NULL); > > - rcc->channel->clients = g_list_remove_link(rcc->channel->clients, link); > + channel->clients = g_list_remove_link(channel->clients, link); > // TODO: should we set rcc->channel to NULL??? > } > > -static void red_client_remove_channel(RedChannelClient *rcc) > -{ > - pthread_mutex_lock(&rcc->client->lock); > - rcc->client->channels = g_list_remove(rcc->client->channels, rcc); > - pthread_mutex_unlock(&rcc->client->lock); > -} > - > -static void red_channel_client_disconnect_dummy(RedChannelClient *rcc) > -{ > - GList *link; > - spice_assert(rcc->dummy); > - if (rcc->channel && (link = g_list_find(rcc->channel->clients, rcc))) { > - spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel, > - rcc->channel->type, rcc->channel->id); > - red_channel_remove_client(link->data); > - } > - rcc->dummy_connected = FALSE; > -} > - > -void red_channel_client_disconnect(RedChannelClient *rcc) > +void red_client_remove_channel(RedChannelClient *rcc) > { > - if (rcc->dummy) { > - red_channel_client_disconnect_dummy(rcc); > - return; > - } > - if (!red_channel_client_is_connected(rcc)) { > - return; > - } > - spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel, > - rcc->channel->type, 
rcc->channel->id); > - red_channel_client_pipe_clear(rcc); > - if (rcc->stream->watch) { > - rcc->channel->core->watch_remove(rcc->stream->watch); > - rcc->stream->watch = NULL; > - } > - if (rcc->latency_monitor.timer) { > - rcc->channel->core->timer_remove(rcc->latency_monitor.timer); > - rcc->latency_monitor.timer = NULL; > - } > - if (rcc->connectivity_monitor.timer) { > - rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer); > - rcc->connectivity_monitor.timer = NULL; > - } > - red_channel_remove_client(rcc); > - rcc->channel->channel_cbs.on_disconnect(rcc); > + RedClient *client = red_channel_client_get_client(rcc); > + pthread_mutex_lock(&client->lock); > + client->channels = g_list_remove(client->channels, rcc); > + pthread_mutex_unlock(&client->lock); > } > > void red_channel_disconnect(RedChannel *channel) > @@ -1843,51 +475,6 @@ void red_channel_disconnect(RedChannel *channel) > g_list_foreach(channel->clients, (GFunc)red_channel_client_disconnect, NULL); > } > > -RedChannelClient *red_channel_client_create_dummy(int size, > - RedChannel *channel, > - RedClient *client, > - int num_common_caps, uint32_t *common_caps, > - int num_caps, uint32_t *caps) > -{ > - RedChannelClient *rcc = NULL; > - > - spice_assert(size >= sizeof(RedChannelClient)); > - > - pthread_mutex_lock(&client->lock); > - if (!red_channel_client_pre_create_validate(channel, client)) { > - goto error; > - } > - rcc = spice_malloc0(size); > - rcc->refs = 1; > - rcc->client = client; > - rcc->channel = channel; > - red_channel_ref(channel); > - red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); > - if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { > - rcc->incoming.header = mini_header_wrapper; > - rcc->send_data.header = mini_header_wrapper; > - rcc->is_mini_header = TRUE; > - } else { > - rcc->incoming.header = full_header_wrapper; > - rcc->send_data.header = full_header_wrapper; > - rcc->is_mini_header = 
FALSE; > - } > - > - rcc->incoming.header.data = rcc->incoming.header_buf; > - rcc->incoming.serial = 1; > - ring_init(&rcc->pipe); > - > - rcc->dummy = TRUE; > - rcc->dummy_connected = TRUE; > - red_channel_add_client(channel, rcc); > - red_client_add_channel(client, rcc); > - pthread_mutex_unlock(&client->lock); > - return rcc; > -error: > - pthread_mutex_unlock(&client->lock); > - return NULL; > -} > - > void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb) > { > g_list_foreach(channel->clients, (GFunc)cb, NULL); > @@ -1928,49 +515,18 @@ int red_channel_any_blocked(RedChannel *channel) > return FALSE; > } > > -int red_channel_client_is_blocked(RedChannelClient *rcc) > -{ > - return rcc && rcc->send_data.blocked; > -} > - > -int red_channel_client_send_message_pending(RedChannelClient *rcc) > -{ > - return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0; > -} > - > -/* accessors for RedChannelClient */ > -SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc) > -{ > - return rcc->send_data.marshaller; > -} > - > -RedsStream *red_channel_client_get_stream(RedChannelClient *rcc) > -{ > - return rcc->stream; > -} > - > -RedClient *red_channel_client_get_client(RedChannelClient *rcc) > -{ > - return rcc->client; > -} > - > -void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list) > -{ > - rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list); > -} > - > -/* end of accessors */ > - > int red_channel_get_first_socket(RedChannel *channel) > { > RedChannelClient *rcc; > + RedsStream *stream; > > if (!channel || !channel->clients) { > return -1; > } > rcc = g_list_nth_data(channel->clients, 0); > + stream = red_channel_client_get_stream(rcc); > > - return rcc->stream->socket; > + return stream->socket; > } > > int red_channel_no_item_being_sent(RedChannel *channel) > @@ -1986,18 +542,6 @@ int red_channel_no_item_being_sent(RedChannel *channel) > return TRUE; > } > 
> -int red_channel_client_no_item_being_sent(RedChannelClient *rcc) > -{ > - return !rcc || (rcc->send_data.size == 0); > -} > - > -void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, > - RedPipeItem *item) > -{ > - red_channel_client_pipe_remove(rcc, item); > - red_pipe_item_unref(item); > -} > - > /* > * RedClient implementation - kept in red-channel.c because they are > * pretty tied together. > @@ -2035,21 +579,6 @@ RedClient *red_client_unref(RedClient *client) > return client; > } > > -/* client mutex should be locked before this call */ > -static gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc) > -{ > - gboolean ret = FALSE; > - > - if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) { > - rcc->wait_migrate_data = TRUE; > - ret = TRUE; > - } > - spice_debug("channel type %d id %d rcc %p wait data %d", rcc->channel->type, rcc->channel->id, rcc, > - rcc->wait_migrate_data); > - > - return ret; > -} > - > void red_client_set_migration_seamless(RedClient *client) // dest > { > GList *link; > @@ -2118,9 +647,8 @@ void red_client_destroy(RedClient *client) > // TODO: should we go back to async. For this we need to use > // ref count for channel clients. 
> channel->client_cbs.disconnect(rcc); > - spice_assert(ring_is_empty(&rcc->pipe)); > - spice_assert(rcc->pipe_size == 0); > - spice_assert(rcc->send_data.size == 0); > + spice_assert(red_channel_client_pipe_is_empty(rcc)); > + spice_assert(red_channel_client_no_item_being_sent(rcc)); > red_channel_client_destroy(rcc); > link = next; > } > @@ -2128,7 +656,7 @@ void red_client_destroy(RedClient *client) > } > > /* client->lock should be locked */ > -static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id) > +RedChannelClient *red_client_get_channel(RedClient *client, int type, int id) > { > GList *link; > RedChannelClient *rcc; > @@ -2147,7 +675,7 @@ static RedChannelClient *red_client_get_channel(RedClient *client, int type, int > } > > /* client->lock should be locked */ > -static void red_client_add_channel(RedClient *client, RedChannelClient *rcc) > +void red_client_add_channel(RedClient *client, RedChannelClient *rcc) > { > spice_assert(rcc && client); > client->channels = g_list_prepend(client->channels, rcc); > @@ -2179,11 +707,7 @@ void red_client_semi_seamless_migrate_complete(RedClient *client) > link = client->channels; > while (link) { > next = link->next; > - RedChannelClient *rcc = link->data; > - > - if (rcc->latency_monitor.timer) { > - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); > - } > + red_channel_client_semi_seamless_migration_complete(link->data); > link = next; > } > pthread_mutex_unlock(&client->lock); > @@ -2275,8 +799,10 @@ uint32_t red_channel_max_pipe_size(RedChannel *channel) > uint32_t pipe_size = 0; > > for (link = channel->clients; link != NULL; link = link->next) { > + uint32_t new_size; > rcc = link->data; > - pipe_size = MAX(pipe_size, rcc->pipe_size); > + new_size = red_channel_client_get_pipe_size(rcc); > + pipe_size = MAX(pipe_size, new_size); > } > return pipe_size; > } > @@ -2288,7 +814,9 @@ uint32_t red_channel_min_pipe_size(RedChannel *channel) > uint32_t pipe_size = 
~0; > > FOREACH_CLIENT(channel, link, next, rcc) { > - pipe_size = MIN(pipe_size, rcc->pipe_size); > + uint32_t new_size; > + new_size = red_channel_client_get_pipe_size(rcc); > + pipe_size = MIN(pipe_size, new_size); > } > return pipe_size == ~0 ? 0 : pipe_size; > } > @@ -2300,102 +828,11 @@ uint32_t red_channel_sum_pipes_size(RedChannel *channel) > uint32_t sum = 0; > > FOREACH_CLIENT(channel, link, next, rcc) { > - sum += rcc->pipe_size; > + sum += red_channel_client_get_pipe_size(rcc); > } > return sum; > } > > -int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, > - int64_t timeout) > -{ > - uint64_t end_time; > - int blocked; > - > - if (!red_channel_client_is_blocked(rcc)) { > - return TRUE; > - } > - if (timeout != -1) { > - end_time = spice_get_monotonic_time_ns() + timeout; > - } else { > - end_time = UINT64_MAX; > - } > - spice_info("blocked"); > - > - do { > - usleep(CHANNEL_BLOCKED_SLEEP_DURATION); > - red_channel_client_receive(rcc); > - red_channel_client_send(rcc); > - } while ((blocked = red_channel_client_is_blocked(rcc)) && > - (timeout == -1 || spice_get_monotonic_time_ns() < end_time)); > - > - if (blocked) { > - spice_warning("timeout"); > - return FALSE; > - } else { > - spice_assert(red_channel_client_no_item_being_sent(rcc)); > - return TRUE; > - } > -} > - > -static void marker_pipe_item_free(RedPipeItem *base) > -{ > - MarkerPipeItem *item = SPICE_UPCAST(MarkerPipeItem, base); > - > - if (item->item_in_pipe) { > - *item->item_in_pipe = FALSE; > - } > - free(item); > -} > - > -/* TODO: more evil sync stuff. anything with the word wait in it's name. 
*/ > -int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, > - RedPipeItem *item, > - int64_t timeout) > -{ > - uint64_t end_time; > - gboolean item_in_pipe; > - > - spice_info(NULL); > - > - if (timeout != -1) { > - end_time = spice_get_monotonic_time_ns() + timeout; > - } else { > - end_time = UINT64_MAX; > - } > - > - MarkerPipeItem *mark_item = spice_new0(MarkerPipeItem, 1); > - > - red_pipe_item_init_full(&mark_item->base, RED_PIPE_ITEM_TYPE_MARKER, > - marker_pipe_item_free); > - item_in_pipe = TRUE; > - mark_item->item_in_pipe = &item_in_pipe; > - red_channel_client_pipe_add_after(rcc, &mark_item->base, item); > - > - if (red_channel_client_is_blocked(rcc)) { > - red_channel_client_receive(rcc); > - red_channel_client_send(rcc); > - } > - red_channel_client_push(rcc); > - > - while(item_in_pipe && > - (timeout == -1 || spice_get_monotonic_time_ns() < end_time)) { > - usleep(CHANNEL_BLOCKED_SLEEP_DURATION); > - red_channel_client_receive(rcc); > - red_channel_client_send(rcc); > - red_channel_client_push(rcc); > - } > - > - if (item_in_pipe) { > - // still on the queue, make sure won't overwrite the stack variable > - mark_item->item_in_pipe = NULL; > - spice_warning("timeout"); > - return FALSE; > - } else { > - return red_channel_client_wait_outgoing_item(rcc, > - timeout == -1 ? 
-1 : end_time - spice_get_monotonic_time_ns()); > - } > -} > - > int red_channel_wait_all_sent(RedChannel *channel, > int64_t timeout) > { > @@ -2430,31 +867,7 @@ int red_channel_wait_all_sent(RedChannel *channel, > } > } > > -void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc) > -{ > - if (red_channel_client_is_blocked(rcc) || rcc->pipe_size > 0) { > - red_channel_client_disconnect(rcc); > - } else { > - spice_assert(red_channel_client_no_item_being_sent(rcc)); > - } > -} > - > RedsState* red_channel_get_server(RedChannel *channel) > { > return channel->reds; > } > - > -RedChannel* red_channel_client_get_channel(RedChannelClient *rcc) > -{ > - return rcc->channel; > -} > - > -gboolean red_channel_client_is_destroying(RedChannelClient *rcc) > -{ > - return rcc->destroying; > -} > - > -void red_channel_client_set_destroying(RedChannelClient *rcc) > -{ > - rcc->destroying = TRUE; > -} > diff --git a/server/red-channel.h b/server/red-channel.h > index 370f6a7..68bfc7a 100644 > --- a/server/red-channel.h > +++ b/server/red-channel.h > @@ -89,17 +89,6 @@ typedef struct IncomingHandlerInterface { > on_input_proc on_input; > } IncomingHandlerInterface; > > -typedef struct IncomingHandler { > - IncomingHandlerInterface *cb; > - void *opaque; > - uint8_t header_buf[MAX_HEADER_SIZE]; > - SpiceDataHeaderOpaque header; > - uint32_t header_pos; > - uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. 
> - uint32_t msg_pos; > - uint64_t serial; > -} IncomingHandler; > - > typedef int (*get_outgoing_msg_size_proc)(void *opaque); > typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos); > typedef void (*on_outgoing_error_proc)(void *opaque); > @@ -116,16 +105,6 @@ typedef struct OutgoingHandlerInterface { > on_output_proc on_output; > } OutgoingHandlerInterface; > > -typedef struct OutgoingHandler { > - OutgoingHandlerInterface *cb; > - void *opaque; > - struct iovec vec_buf[IOV_MAX]; > - int vec_size; > - struct iovec *vec; > - int pos; > - int size; > -} OutgoingHandler; > - > /* Red Channel interface */ > > typedef struct RedChannel RedChannel; > @@ -231,62 +210,6 @@ typedef struct RedChannelClientConnectivityMonitor { > SpiceTimer *timer; > } RedChannelClientConnectivityMonitor; > > -struct RedChannelClient { > - RedChannel *channel; > - RedClient *client; > - RedsStream *stream; > - int dummy; > - int dummy_connected; > - > - uint32_t refs; > - > - struct { > - uint32_t generation; > - uint32_t client_generation; > - uint32_t messages_window; > - uint32_t client_window; > - } ack_data; > - > - struct { > - SpiceMarshaller *marshaller; > - SpiceDataHeaderOpaque header; > - uint32_t size; > - RedPipeItem *item; > - int blocked; > - uint64_t serial; > - uint64_t last_sent_serial; > - > - struct { > - SpiceMarshaller *marshaller; > - uint8_t *header_data; > - RedPipeItem *item; > - } main; > - > - struct { > - SpiceMarshaller *marshaller; > - } urgent; > - } send_data; > - > - OutgoingHandler outgoing; > - IncomingHandler incoming; > - int during_send; > - int id; // debugging purposes > - Ring pipe; > - uint32_t pipe_size; > - > - RedChannelCapabilities remote_caps; > - int is_mini_header; > - int destroying; > - > - int wait_migrate_data; > - int wait_migrate_flush_mark; > - > - RedChannelClientLatencyMonitor latency_monitor; > - RedChannelClientConnectivityMonitor connectivity_monitor; > -}; > - > -#define 
RED_CHANNEL_CLIENT(Client) ((RedChannelClient *)(Client)) > - > struct RedChannel { > uint32_t type; > uint32_t id; > @@ -339,17 +262,6 @@ struct RedChannel { > > #define RED_CHANNEL(Channel) ((RedChannel *)(Channel)) > > -/* > - * When an error occurs over a channel, we treat it as a warning > - * for spice-server and shutdown the channel. > - */ > -#define spice_channel_client_error(rcc, format, ...) \ > - do { \ > - spice_warning("rcc %p type %u id %u: " format, rcc, \ > - rcc->channel->type, rcc->channel->id, ## __VA_ARGS__); \ > - red_channel_client_shutdown(rcc); \ > - } while (0) > - > /* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't > * explicitly destroy the channel */ > RedChannel *red_channel_create(int size, > @@ -372,6 +284,11 @@ RedChannel *red_channel_create_parser(int size, > channel_handle_parsed_proc handle_parsed, > const ChannelCbs *channel_cbs, > uint32_t migration_flags); > +void red_channel_ref(RedChannel *channel); > +void red_channel_unref(RedChannel *channel); > +void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc); > +void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc); > + > void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat); > > void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data); > @@ -379,26 +296,13 @@ void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *clien > void red_channel_set_common_cap(RedChannel *channel, uint32_t cap); > void red_channel_set_cap(RedChannel *channel, uint32_t cap); > > -RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, > - RedsStream *stream, > - int monitor_latency, > - int num_common_caps, uint32_t *common_caps, > - int num_caps, uint32_t *caps); > // TODO: tmp, for channels that don't use RedChannel yet (e.g., snd channel), but > // do use the client callbacks. 
So the channel clients are not connected (the channel doesn't > // have list of them, but they do have a link to the channel, and the client has a list of them) > RedChannel *red_channel_create_dummy(int size, RedsState *reds, uint32_t type, uint32_t id); > -RedChannelClient *red_channel_client_create_dummy(int size, > - RedChannel *channel, > - RedClient *client, > - int num_common_caps, uint32_t *common_caps, > - int num_caps, uint32_t *caps); > > int red_channel_is_connected(RedChannel *channel); > -int red_channel_client_is_connected(RedChannelClient *rcc); > > -void red_channel_client_default_migrate(RedChannelClient *rcc); > -int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc); > /* seamless migration is supported for only one client. This routine > * checks if the only channel client associated with channel is > * waiting for migration data */ > @@ -411,61 +315,15 @@ int red_channel_is_waiting_for_migrate_data(RedChannel *channel); > * from red_client_destroy. > */ > > -void red_channel_client_destroy(RedChannelClient *rcc); > void red_channel_destroy(RedChannel *channel); > -void red_channel_client_ref(RedChannelClient *rcc); > -void red_channel_client_unref(RedChannelClient *rcc); > - > -int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap); > -int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap); > > /* return true if all the channel clients support the cap */ > int red_channel_test_remote_common_cap(RedChannel *channel, uint32_t cap); > int red_channel_test_remote_cap(RedChannel *channel, uint32_t cap); > > -/* shutdown is the only safe thing to do out of the client/channel > - * thread. It will not touch the rings, just shutdown the socket. > - * It should be followed by some way to gurantee a disconnection. 
*/ > -void red_channel_client_shutdown(RedChannelClient *rcc); > - > /* should be called when a new channel is ready to send messages */ > void red_channel_init_outgoing_messages_window(RedChannel *channel); > > -/* handles general channel msgs from the client */ > -int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, > - uint16_t type, void *message); > - > -/* when preparing send_data: should call init and then use marshaller */ > -void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item); > - > -uint64_t red_channel_client_get_message_serial(RedChannelClient *channel); > -void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t); > - > -/* When sending a msg. Should first call red_channel_client_begin_send_message. > - * It will first send the pending urgent data, if there is any, and then > - * the rest of the data. > - */ > -void red_channel_client_begin_send_message(RedChannelClient *rcc); > - > -/* > - * Stores the current send data, and switches to urgent send data. > - * When it begins the actual send, it will send first the urgent data > - * and afterward the rest of the data. > - * Should be called only if during the marshalling of on message, > - * the need to send another message, before, rises. > - * Important: the serial of the non-urgent sent data, will be succeeded. > - * return: the urgent send data marshaller > - */ > -SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc); > - > -/* returns -1 if we don't have an estimation */ > -int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc); > - > -/* > - * Checks periodically if the connection is still alive > - */ > -void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms); > - > // TODO: add back the channel_pipe_add functionality - by adding reference counting > // to the RedPipeItem. 
> > @@ -475,23 +333,10 @@ int red_channel_pipes_new_add_push(RedChannel *channel, new_pipe_item_t creator, > void red_channel_pipes_new_add(RedChannel *channel, new_pipe_item_t creator, void *data); > void red_channel_pipes_new_add_tail(RedChannel *channel, new_pipe_item_t creator, void *data); > > -void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item); > -void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item); > -void red_channel_client_pipe_add_after(RedChannelClient *rcc, RedPipeItem *item, RedPipeItem *pos); > -int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item); > -void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item); > -void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item); > -/* for types that use this routine -> the pipe item should be freed */ > -void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type); > void red_channel_pipes_add_type(RedChannel *channel, int pipe_item_type); > > -void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type); > void red_channel_pipes_add_empty_msg(RedChannel *channel, int msg_type); > > -void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc); > -void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window); > -void red_channel_client_push_set_ack(RedChannelClient *rcc); > - > int red_channel_get_first_socket(RedChannel *channel); > > /* return TRUE if all of the connected clients to this channel are blocked */ > @@ -500,24 +345,13 @@ int red_channel_all_blocked(RedChannel *channel); > /* return TRUE if any of the connected clients to this channel are blocked */ > int red_channel_any_blocked(RedChannel *channel); > > -int red_channel_client_is_blocked(RedChannelClient *rcc); > - > -/* helper for channels that have complex logic that can possibly ready a send */ > -int 
red_channel_client_send_message_pending(RedChannelClient *rcc); > - > int red_channel_no_item_being_sent(RedChannel *channel); > -int red_channel_client_no_item_being_sent(RedChannelClient *rcc); > > // TODO: unstaticed for display/cursor channels. they do some specific pushes not through > // adding elements or on events. but not sure if this is actually required (only result > // should be that they ""try"" a little harder, but if the event system is correct it > // should not make any difference. > void red_channel_push(RedChannel *channel); > -void red_channel_client_push(RedChannelClient *rcc); > -// TODO: again - what is the context exactly? this happens in channel disconnect. but our > -// current red_channel_shutdown also closes the socket - is there a socket to close? > -// are we reading from an fd here? arghh > -void red_channel_client_pipe_clear(RedChannelClient *rcc); > // Again, used in various places outside of event handler context (or in other event handler > // contexts): > // flush_display_commands/flush_cursor_commands > @@ -526,23 +360,10 @@ void red_channel_client_pipe_clear(RedChannelClient *rcc); > // red_wait_pipe_item_sent > // handle_channel_events - this is the only one that was used before, and was in red-channel.c > void red_channel_receive(RedChannel *channel); > -void red_channel_client_receive(RedChannelClient *rcc); > // For red_worker > void red_channel_send(RedChannel *channel); > -void red_channel_client_send(RedChannelClient *rcc); > // For red_worker > void red_channel_disconnect(RedChannel *channel); > -void red_channel_client_disconnect(RedChannelClient *rcc); > - > -/* accessors for RedChannelClient */ > -/* Note: the valid times to call red_channel_get_marshaller are just during send_item callback. 
*/ > -SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc); > -RedsStream *red_channel_client_get_stream(RedChannelClient *rcc); > -RedClient *red_channel_client_get_client(RedChannelClient *rcc); > - > -/* Note that the header is valid only between red_channel_reset_send_data and > - * red_channel_begin_send_message.*/ > -void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list); > > /* return the sum of all the rcc pipe size */ > uint32_t red_channel_max_pipe_size(RedChannel *channel); > @@ -596,6 +417,11 @@ RedClient *red_client_ref(RedClient *client); > */ > RedClient *red_client_unref(RedClient *client); > > +/* client->lock should be locked */ > +void red_client_add_channel(RedClient *client, RedChannelClient *rcc); > +void red_client_remove_channel(RedChannelClient *rcc); > +RedChannelClient *red_client_get_channel(RedClient *client, int type, int id); > + > MainChannelClient *red_client_get_main(RedClient *client); > // main should be set once before all the other channels are created > void red_client_set_main(RedClient *client, MainChannelClient *mcc); > @@ -611,7 +437,7 @@ void red_client_semi_seamless_migrate_complete(RedClient *client); /* dst side * > int red_client_during_migrate_at_target(RedClient *client); > > void red_client_migrate(RedClient *client); > - > +gboolean red_client_seamless_migration_done_for_channel(RedClient *client); > /* > * blocking functions. > * > @@ -620,16 +446,9 @@ void red_client_migrate(RedClient *client); > * Return: TRUE if waiting succeeded. FALSE if timeout expired. 
> */ > > -int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, > - RedPipeItem *item, > - int64_t timeout); > -int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, > - int64_t timeout); > int red_channel_wait_all_sent(RedChannel *channel, > int64_t timeout); > -void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc); > -RedChannel* red_channel_client_get_channel(RedChannelClient *rcc); > -gboolean red_channel_client_is_destroying(RedChannelClient *rcc); > -void red_channel_client_set_destroying(RedChannelClient *rcc); > + > +#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro > > #endif > diff --git a/server/red-worker.h b/server/red-worker.h > index 368bfa1..a0a327a 100644 > --- a/server/red-worker.h > +++ b/server/red-worker.h > @@ -21,6 +21,7 @@ > #include "red-common.h" > #include "red-qxl.h" > #include "red-parse-qxl.h" > +#include "red-channel-client.h" > > typedef struct RedWorker RedWorker; > > diff --git a/server/reds.c b/server/reds.c > index 74f7727..e75e1fd 100644 > --- a/server/reds.c > +++ b/server/reds.c > @@ -72,6 +72,7 @@ > > #include "reds-private.h" > #include "video-encoder.h" > +#include "red-channel-client.h" > > static void reds_client_monitors_config(RedsState *reds, VDAgentMonitorsConfig *monitors_config); > static gboolean reds_use_client_monitors_config(RedsState *reds); > diff --git a/server/smartcard.c b/server/smartcard.c > index a8a16c7..74c2b18 100644 > --- a/server/smartcard.c > +++ b/server/smartcard.c > @@ -30,7 +30,7 @@ > > #include "reds.h" > #include "char-device.h" > -#include "red-channel.h" > +#include "red-channel-client.h" > #include "smartcard.h" > #include "migration-protocol.h" > > diff --git a/server/sound.c b/server/sound.c > index 84cbab4..52fe1c3 100644 > --- a/server/sound.c > +++ b/server/sound.c > @@ -34,6 +34,7 @@ > #include "main-channel.h" > #include "reds.h" > #include "red-qxl.h" > +#include "red-channel-client.h" > #include "sound.h" > #include 
<common/snd_codec.h> > #include "demarshallers.h" > diff --git a/server/spicevmc.c b/server/spicevmc.c > index c79e7bb..3533f3f 100644 > --- a/server/spicevmc.c > +++ b/server/spicevmc.c > @@ -32,6 +32,7 @@ > > #include "char-device.h" > #include "red-channel.h" > +#include "red-channel-client.h" > #include "reds.h" > #include "migration-protocol.h" > #ifdef USE_LZ4 > -- > 2.7.4 Patch is quite big but looks ok to me. Reviewed-by: Victor Toso <victortoso@xxxxxxxxxx> > > _______________________________________________ > Spice-devel mailing list > Spice-devel@xxxxxxxxxxxxxxxxxxxxx > https://lists.freedesktop.org/mailman/listinfo/spice-devel _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel