Reduce direct access to RedChannelClient, and get ready to convert to GObject. --- Changes in v2: - updated copyright dates - removed unused function - made red_channel_client_seamless_migration_done() static - removed NULL check from function that can't return NULL - renamed a couple functions from red_channel_ to red_channel_client_ - moved misplaced comment - removed a few unintentional style changes found by Frediano server/Makefile.am | 2 + server/cursor-channel.c | 2 + server/dcc-private.h | 1 + server/inputs-channel-client.c | 1 + server/inputs-channel.c | 1 + server/main-channel-client.c | 2 + server/main-channel.c | 1 + server/red-channel-client.c | 1626 +++++++++++++++++++++++++++++++++++ server/red-channel-client.h | 248 ++++++ server/red-channel.c | 1821 +++------------------------------------- server/red-channel.h | 207 +---- server/red-worker.h | 1 + server/reds.c | 1 + server/smartcard.c | 2 +- server/sound.c | 1 + server/spicevmc.c | 1 + 16 files changed, 2017 insertions(+), 1901 deletions(-) create mode 100644 server/red-channel-client.c create mode 100644 server/red-channel-client.h diff --git a/server/Makefile.am b/server/Makefile.am index 24e6e21..e275765 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -95,6 +95,8 @@ libserver_la_SOURCES = \ mjpeg-encoder.c \ red-channel.c \ red-channel.h \ + red-channel-client.c \ + red-channel-client.h \ red-common.h \ dispatcher.c \ dispatcher.h \ diff --git a/server/cursor-channel.c b/server/cursor-channel.c index cb3aa49..290f763 100644 --- a/server/cursor-channel.c +++ b/server/cursor-channel.c @@ -21,7 +21,9 @@ #include <glib.h> #include <common/generated_server_marshallers.h> + #include "cursor-channel.h" +#include "red-channel-client.h" #include "cache-item.h" #define CLIENT_CURSOR_CACHE_SIZE 256 diff --git a/server/dcc-private.h b/server/dcc-private.h index 02b51dd..d5aad3f 100644 --- a/server/dcc-private.h +++ b/server/dcc-private.h @@ -22,6 +22,7 @@ #include "dcc.h" #include 
"image-encoders.h" #include "stream.h" +#include "red-channel-client.h" struct DisplayChannelClient { RedChannelClient base; diff --git a/server/inputs-channel-client.c b/server/inputs-channel-client.c index bdbcf5c..ce21b9c 100644 --- a/server/inputs-channel-client.c +++ b/server/inputs-channel-client.c @@ -21,6 +21,7 @@ #include "inputs-channel-client.h" #include "inputs-channel.h" #include "migration-protocol.h" +#include "red-channel-client.h" struct InputsChannelClient { RedChannelClient base; diff --git a/server/inputs-channel.c b/server/inputs-channel.c index c28273a..da0f027 100644 --- a/server/inputs-channel.c +++ b/server/inputs-channel.c @@ -39,6 +39,7 @@ #include "reds.h" #include "reds-stream.h" #include "red-channel.h" +#include "red-channel-client.h" #include "inputs-channel-client.h" #include "main-channel-client.h" #include "inputs-channel.h" diff --git a/server/main-channel-client.c b/server/main-channel-client.c index 12151a7..bd339d0 100644 --- a/server/main-channel-client.c +++ b/server/main-channel-client.c @@ -23,7 +23,9 @@ #include "main-channel-client.h" #include <common/generated_server_marshallers.h> +#include "main-channel-client.h" #include "main-channel.h" +#include "red-channel-client.h" #include "reds.h" #define NET_TEST_WARMUP_BYTES 0 diff --git a/server/main-channel.c b/server/main-channel.c index 8bb874b..24c69a4 100644 --- a/server/main-channel.c +++ b/server/main-channel.c @@ -24,6 +24,7 @@ #include "red-common.h" #include "main-channel.h" #include "reds.h" +#include "red-channel-client.h" int main_channel_is_connected(MainChannel *main_chan) { diff --git a/server/red-channel-client.c b/server/red-channel-client.c new file mode 100644 index 0000000..b8555ff --- /dev/null +++ b/server/red-channel-client.c @@ -0,0 +1,1626 @@ +/* + Copyright (C) 2009-2016 Red Hat, Inc. 
+ + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. +*/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <glib.h> +#include <stdio.h> +#include <stdint.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> +#include <sys/ioctl.h> +#ifdef HAVE_LINUX_SOCKIOS_H +#include <linux/sockios.h> /* SIOCOUTQ */ +#endif +#include <common/generated_server_marshallers.h> + +#include "red-channel-client.h" +#include "red-channel.h" + +#define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15) +#define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10) + +enum QosPingState { + PING_STATE_NONE, + PING_STATE_TIMER, + PING_STATE_WARMUP, + PING_STATE_LATENCY, +}; + +enum ConnectivityState { + CONNECTIVITY_STATE_CONNECTED, + CONNECTIVITY_STATE_BLOCKED, + CONNECTIVITY_STATE_WAIT_PONG, + CONNECTIVITY_STATE_DISCONNECTED, +}; + +typedef struct RedEmptyMsgPipeItem { + RedPipeItem base; + int msg; +} RedEmptyMsgPipeItem; + +typedef struct MarkerPipeItem { + RedPipeItem base; + gboolean *item_in_pipe; +} MarkerPipeItem; + +static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout) +{ + if (!rcc->latency_monitor.timer) { + return; + } + if (rcc->latency_monitor.state != PING_STATE_NONE) { + return; + } + rcc->latency_monitor.state = PING_STATE_TIMER; + 
rcc->channel->core->timer_start(rcc->latency_monitor.timer, timeout); +} + +static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc) +{ + if (!rcc->latency_monitor.timer) { + return; + } + if (rcc->latency_monitor.state != PING_STATE_TIMER) { + return; + } + + rcc->channel->core->timer_cancel(rcc->latency_monitor.timer); + rcc->latency_monitor.state = PING_STATE_NONE; +} + +static void red_channel_client_restart_ping_timer(RedChannelClient *rcc) +{ + uint64_t passed, timeout; + + passed = (spice_get_monotonic_time_ns() - rcc->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC; + timeout = PING_TEST_IDLE_NET_TIMEOUT_MS; + if (passed < PING_TEST_TIMEOUT_MS) { + timeout += PING_TEST_TIMEOUT_MS - passed; + } + + red_channel_client_start_ping_timer(rcc, timeout); +} + +RedChannel* red_channel_client_get_channel(RedChannelClient *rcc) +{ + return rcc->channel; +} + +void red_channel_client_on_output(void *opaque, int n) +{ + RedChannelClient *rcc = opaque; + + if (rcc->connectivity_monitor.timer) { + rcc->connectivity_monitor.out_bytes += n; + } + stat_inc_counter(reds, rcc->channel->out_bytes_counter, n); +} + +void red_channel_client_on_input(void *opaque, int n) +{ + RedChannelClient *rcc = opaque; + + if (rcc->connectivity_monitor.timer) { + rcc->connectivity_monitor.in_bytes += n; + } +} + +int red_channel_client_get_out_msg_size(void *opaque) +{ + RedChannelClient *rcc = (RedChannelClient *)opaque; + + return rcc->send_data.size; +} + +void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, + int *vec_size, int pos) +{ + RedChannelClient *rcc = (RedChannelClient *)opaque; + + *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller, + vec, IOV_MAX, pos); +} + +void red_channel_client_on_out_block(void *opaque) +{ + RedChannelClient *rcc = (RedChannelClient *)opaque; + + rcc->send_data.blocked = TRUE; + rcc->channel->core->watch_update_mask(rcc->stream->watch, + SPICE_WATCH_EVENT_READ | + SPICE_WATCH_EVENT_WRITE); +} + 
+static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc) +{ + return (rcc->send_data.marshaller == rcc->send_data.urgent.marshaller); +} + +static void red_channel_client_reset_send_data(RedChannelClient *rcc) +{ + spice_marshaller_reset(rcc->send_data.marshaller); + rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller, + rcc->send_data.header.header_size); + spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size); + rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0); + rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0); + + /* Keeping the serial consecutive: resetting it if reset_send_data + * has been called before, but no message has been sent since then. + */ + if (rcc->send_data.last_sent_serial != rcc->send_data.serial) { + spice_assert(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1); + /* When the urgent marshaller is active, the serial was incremented by + * the call to reset_send_data that was made for the main marshaller. + * The urgent msg receives this serial, and the main msg serial is + * the following one. 
Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial) + * should be 1 in this case*/ + if (!red_channel_client_urgent_marshaller_is_active(rcc)) { + rcc->send_data.serial = rcc->send_data.last_sent_serial; + } + } + rcc->send_data.serial++; + + if (!rcc->is_mini_header) { + spice_assert(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller); + rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0); + rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); + } +} + +static void red_channel_client_send_set_ack(RedChannelClient *rcc) +{ + SpiceMsgSetAck ack; + + spice_assert(rcc); + red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL); + ack.generation = ++rcc->ack_data.generation; + ack.window = rcc->ack_data.client_window; + rcc->ack_data.messages_window = 0; + + spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack); + + red_channel_client_begin_send_message(rcc); +} + +static void red_channel_client_send_migrate(RedChannelClient *rcc) +{ + SpiceMsgMigrate migrate; + + red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, NULL); + migrate.flags = rcc->channel->migration_flags; + spice_marshall_msg_migrate(rcc->send_data.marshaller, &migrate); + if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_FLUSH) { + rcc->wait_migrate_flush_mark = TRUE; + } + + red_channel_client_begin_send_message(rcc); +} + +static void red_channel_client_send_ping(RedChannelClient *rcc) +{ + SpiceMsgPing ping; + + if (!rcc->latency_monitor.warmup_was_sent) { // latency test start + int delay_val; + socklen_t opt_size = sizeof(delay_val); + + rcc->latency_monitor.warmup_was_sent = TRUE; + /* + * When testing latency, TCP_NODELAY must be switched on, otherwise, + * sending the ping message is delayed by Nagle algorithm, and the + * roundtrip measurement is less accurate (bigger). 
+ */ + rcc->latency_monitor.tcp_nodelay = 1; + if (getsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, + &opt_size) == -1) { + spice_warning("getsockopt failed, %s", strerror(errno)); + } else { + rcc->latency_monitor.tcp_nodelay = delay_val; + if (!delay_val) { + delay_val = 1; + if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, + sizeof(delay_val)) == -1) { + if (errno != ENOTSUP) { + spice_warning("setsockopt failed, %s", strerror(errno)); + } + } + } + } + } + + red_channel_client_init_send_data(rcc, SPICE_MSG_PING, NULL); + ping.id = rcc->latency_monitor.id; + ping.timestamp = spice_get_monotonic_time_ns(); + spice_marshall_msg_ping(rcc->send_data.marshaller, &ping); + red_channel_client_begin_send_message(rcc); +} + +static void red_channel_client_send_empty_msg(RedChannelClient *rcc, RedPipeItem *base) +{ + RedEmptyMsgPipeItem *msg_pipe_item = SPICE_UPCAST(RedEmptyMsgPipeItem, base); + + red_channel_client_init_send_data(rcc, msg_pipe_item->msg, NULL); + red_channel_client_begin_send_message(rcc); +} + +static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item) +{ + spice_assert(red_channel_client_no_item_being_sent(rcc)); + red_channel_client_reset_send_data(rcc); + switch (item->type) { + case RED_PIPE_ITEM_TYPE_SET_ACK: + red_channel_client_send_set_ack(rcc); + break; + case RED_PIPE_ITEM_TYPE_MIGRATE: + red_channel_client_send_migrate(rcc); + break; + case RED_PIPE_ITEM_TYPE_EMPTY_MSG: + red_channel_client_send_empty_msg(rcc, item); + break; + case RED_PIPE_ITEM_TYPE_PING: + red_channel_client_send_ping(rcc); + break; + case RED_PIPE_ITEM_TYPE_MARKER: + break; + default: + rcc->channel->channel_cbs.send_item(rcc, item); + break; + } + red_pipe_item_unref(item); +} + +static inline void red_channel_client_release_sent_item(RedChannelClient *rcc) +{ + if (rcc->send_data.item) { + red_pipe_item_unref(rcc->send_data.item); + rcc->send_data.item = NULL; + } +} + +static void 
red_channel_client_restore_main_sender(RedChannelClient *rcc) +{ + spice_marshaller_reset(rcc->send_data.urgent.marshaller); + rcc->send_data.marshaller = rcc->send_data.main.marshaller; + rcc->send_data.header.data = rcc->send_data.main.header_data; + if (!rcc->is_mini_header) { + rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); + } + rcc->send_data.item = rcc->send_data.main.item; +} + +void red_channel_client_on_out_msg_done(void *opaque) +{ + RedChannelClient *rcc = (RedChannelClient *)opaque; + int fd; + + rcc->send_data.size = 0; + + if (spice_marshaller_get_fd(rcc->send_data.marshaller, &fd)) { + if (reds_stream_send_msgfd(rcc->stream, fd) < 0) { + perror("sendfd"); + red_channel_client_disconnect(rcc); + if (fd != -1) + close(fd); + return; + } + if (fd != -1) + close(fd); + } + + red_channel_client_release_sent_item(rcc); + if (rcc->send_data.blocked) { + rcc->send_data.blocked = FALSE; + rcc->channel->core->watch_update_mask(rcc->stream->watch, + SPICE_WATCH_EVENT_READ); + } + + if (red_channel_client_urgent_marshaller_is_active(rcc)) { + red_channel_client_restore_main_sender(rcc); + spice_assert(rcc->send_data.header.data != NULL); + red_channel_client_begin_send_message(rcc); + } else { + if (rcc->latency_monitor.timer && !rcc->send_data.blocked && rcc->pipe_size == 0) { + /* It is possible that the socket will become idle, so we may be able to test latency */ + red_channel_client_restart_ping_timer(rcc); + } + } + +} + +static void red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item) +{ + rcc->pipe_size--; + ring_remove(&item->link); +} + +static void red_channel_client_set_remote_caps(RedChannelClient* rcc, + int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps) +{ + rcc->remote_caps.num_common_caps = num_common_caps; + rcc->remote_caps.common_caps = spice_memdup(common_caps, num_common_caps * sizeof(uint32_t)); + + rcc->remote_caps.num_caps = num_caps; + 
rcc->remote_caps.caps = spice_memdup(caps, num_caps * sizeof(uint32_t)); +} + +static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc) +{ + rcc->remote_caps.num_common_caps = 0; + free(rcc->remote_caps.common_caps); + rcc->remote_caps.num_caps = 0; + free(rcc->remote_caps.caps); +} + +int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap) +{ + return test_capability(rcc->remote_caps.common_caps, + rcc->remote_caps.num_common_caps, + cap); +} + +int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap) +{ + return test_capability(rcc->remote_caps.caps, + rcc->remote_caps.num_caps, + cap); +} + +static void red_channel_client_push_ping(RedChannelClient *rcc) +{ + spice_assert(rcc->latency_monitor.state == PING_STATE_NONE); + rcc->latency_monitor.state = PING_STATE_WARMUP; + rcc->latency_monitor.warmup_was_sent = FALSE; + rcc->latency_monitor.id = rand(); + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); +} + +static void red_channel_client_ping_timer(void *opaque) +{ + RedChannelClient *rcc = opaque; + + spice_assert(rcc->latency_monitor.state == PING_STATE_TIMER); + red_channel_client_cancel_ping_timer(rcc); + +#ifdef HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. */ + { + int so_unsent_size = 0; + + /* retrieving the occupied size of the socket's tcp snd buffer (unacked + unsent) */ + if (ioctl(rcc->stream->socket, SIOCOUTQ, &so_unsent_size) == -1) { + spice_printerr("ioctl(SIOCOUTQ) failed, %s", strerror(errno)); + } + if (so_unsent_size > 0) { + /* tcp snd buffer is still occupied. 
rescheduling ping */ + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + } else { + red_channel_client_push_ping(rcc); + } + } +#else /* ifdef HAVE_LINUX_SOCKIOS_H */ + /* More portable alternative code path (less accurate but avoids bogus ioctls)*/ + red_channel_client_push_ping(rcc); +#endif /* ifdef HAVE_LINUX_SOCKIOS_H */ +} + +static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc) +{ + return (rcc->channel->handle_acks && + (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2)); +} + +/* + * When a connection is not alive (and we can't detect it via a socket error), we + * reach one of these 2 states: + * (1) Sending msgs is blocked: either writes return EAGAIN + * or we are missing MSGC_ACK from the client. + * (2) MSG_PING was sent without receiving a MSGC_PONG in reply. + * + * The connectivity_timer callback tests if the channel's state matches one of the above. + * In case it does, on the next time the timer is called, it checks if the connection has + * been idle during the time that passed since the previous timer call. If the connection + * has been idle, we consider the client as disconnected. 
+ */ +static void red_channel_client_connectivity_timer(void *opaque) +{ + RedChannelClient *rcc = opaque; + RedChannelClientConnectivityMonitor *monitor = &rcc->connectivity_monitor; + int is_alive = TRUE; + + if (monitor->state == CONNECTIVITY_STATE_BLOCKED) { + if (monitor->in_bytes == 0 && monitor->out_bytes == 0) { + if (!rcc->send_data.blocked && !red_channel_client_waiting_for_ack(rcc)) { + spice_error("mismatch between rcc-state and connectivity-state"); + } + spice_debug("rcc is blocked; connection is idle"); + is_alive = FALSE; + } + } else if (monitor->state == CONNECTIVITY_STATE_WAIT_PONG) { + if (monitor->in_bytes == 0) { + if (rcc->latency_monitor.state != PING_STATE_WARMUP && + rcc->latency_monitor.state != PING_STATE_LATENCY) { + spice_error("mismatch between rcc-state and connectivity-state"); + } + spice_debug("rcc waits for pong; connection is idle"); + is_alive = FALSE; + } + } + + if (is_alive) { + monitor->in_bytes = 0; + monitor->out_bytes = 0; + if (rcc->send_data.blocked || red_channel_client_waiting_for_ack(rcc)) { + monitor->state = CONNECTIVITY_STATE_BLOCKED; + } else if (rcc->latency_monitor.state == PING_STATE_WARMUP || + rcc->latency_monitor.state == PING_STATE_LATENCY) { + monitor->state = CONNECTIVITY_STATE_WAIT_PONG; + } else { + monitor->state = CONNECTIVITY_STATE_CONNECTED; + } + rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, + rcc->connectivity_monitor.timeout); + } else { + monitor->state = CONNECTIVITY_STATE_DISCONNECTED; + spice_warning("rcc %p on channel %d:%d has been unresponsive for more than %u ms, disconnecting", + rcc, rcc->channel->type, rcc->channel->id, monitor->timeout); + red_channel_client_disconnect(rcc); + } +} + +void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms) +{ + if (!red_channel_client_is_connected(rcc)) { + return; + } + spice_debug(NULL); + spice_assert(timeout_ms > 0); + /* + * If latency_monitor is not active, we activate it in order to 
enable + * periodic ping messages so that we will be able to identify a disconnected + * channel-client even if there are no ongoing channel specific messages + * on this channel. + */ + if (rcc->latency_monitor.timer == NULL) { + rcc->latency_monitor.timer = rcc->channel->core->timer_add( + rcc->channel->core, red_channel_client_ping_timer, rcc); + if (!red_client_during_migrate_at_target(rcc->client)) { + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + } + rcc->latency_monitor.roundtrip = -1; + } + if (rcc->connectivity_monitor.timer == NULL) { + rcc->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED; + rcc->connectivity_monitor.timer = rcc->channel->core->timer_add( + rcc->channel->core, red_channel_client_connectivity_timer, rcc); + rcc->connectivity_monitor.timeout = timeout_ms; + if (!red_client_during_migrate_at_target(rcc->client)) { + rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, + rcc->connectivity_monitor.timeout); + } + } +} + +static void red_channel_client_event(int fd, int event, void *data) +{ + RedChannelClient *rcc = (RedChannelClient *)data; + + red_channel_client_ref(rcc); + if (event & SPICE_WATCH_EVENT_READ) { + red_channel_client_receive(rcc); + } + if (event & SPICE_WATCH_EVENT_WRITE) { + red_channel_client_push(rcc); + } + red_channel_client_unref(rcc); +} + +static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header) +{ + return GUINT32_FROM_LE(((SpiceDataHeader *)header->data)->size); +} + +static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header) +{ + return GUINT32_FROM_LE(((SpiceMiniDataHeader *)header->data)->size); +} + +static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header) +{ + return GUINT16_FROM_LE(((SpiceDataHeader *)header->data)->type); +} + +static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header) +{ + return GUINT16_FROM_LE(((SpiceMiniDataHeader *)header->data)->type); +} + +static void 
full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) +{ + ((SpiceDataHeader *)header->data)->type = GUINT16_TO_LE(type); +} + +static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) +{ + ((SpiceMiniDataHeader *)header->data)->type = GUINT16_TO_LE(type); +} + +static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) +{ + ((SpiceDataHeader *)header->data)->size = GUINT32_TO_LE(size); +} + +static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) +{ + ((SpiceMiniDataHeader *)header->data)->size = GUINT32_TO_LE(size); +} + +static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) +{ + ((SpiceDataHeader *)header->data)->serial = GUINT64_TO_LE(serial); +} + +static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) +{ + spice_error("attempt to set header serial on mini header"); +} + +static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) +{ + ((SpiceDataHeader *)header->data)->sub_list = GUINT32_TO_LE(sub_list); +} + +static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) +{ + spice_error("attempt to set header sub list on mini header"); +} + +static const SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader), + full_header_set_msg_type, + full_header_set_msg_size, + full_header_set_msg_serial, + full_header_set_msg_sub_list, + full_header_get_msg_type, + full_header_get_msg_size}; + +static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader), + mini_header_set_msg_type, + mini_header_set_msg_size, + mini_header_set_msg_serial, + mini_header_set_msg_sub_list, + mini_header_get_msg_type, + mini_header_get_msg_size}; + +static int red_channel_client_pre_create_validate(RedChannel *channel, RedClient *client) +{ + if (red_client_get_channel(client, channel->type, channel->id)) { + 
spice_printerr("Error client %p: duplicate channel type %d id %d", + client, channel->type, channel->id); + return FALSE; + } + return TRUE; +} + +RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, + RedsStream *stream, + int monitor_latency, + int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps) +{ + RedChannelClient *rcc = NULL; + + pthread_mutex_lock(&client->lock); + if (!red_channel_client_pre_create_validate(channel, client)) { + goto error; + } + spice_assert(stream && channel && size >= sizeof(RedChannelClient)); + rcc = spice_malloc0(size); + rcc->stream = stream; + rcc->channel = channel; + rcc->client = client; + rcc->refs = 1; + rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + + // block flags) + rcc->ack_data.client_generation = ~0; + rcc->ack_data.client_window = CLIENT_ACK_WINDOW; + rcc->send_data.main.marshaller = spice_marshaller_new(); + rcc->send_data.urgent.marshaller = spice_marshaller_new(); + + rcc->send_data.marshaller = rcc->send_data.main.marshaller; + + rcc->incoming.opaque = rcc; + rcc->incoming.cb = &channel->incoming_cb; + + rcc->outgoing.opaque = rcc; + rcc->outgoing.cb = &channel->outgoing_cb; + rcc->outgoing.pos = 0; + rcc->outgoing.size = 0; + + red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); + if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { + rcc->incoming.header = mini_header_wrapper; + rcc->send_data.header = mini_header_wrapper; + rcc->is_mini_header = TRUE; + } else { + rcc->incoming.header = full_header_wrapper; + rcc->send_data.header = full_header_wrapper; + rcc->is_mini_header = FALSE; + } + + rcc->incoming.header.data = rcc->incoming.header_buf; + rcc->incoming.serial = 1; + + if (!channel->channel_cbs.config_socket(rcc)) { + goto error; + } + + ring_init(&rcc->pipe); + rcc->pipe_size = 0; + + stream->watch = 
channel->core->watch_add(channel->core, + stream->socket, + SPICE_WATCH_EVENT_READ, + red_channel_client_event, rcc); + rcc->id = g_list_length(channel->clients); + red_channel_add_client(channel, rcc); + red_client_add_channel(client, rcc); + red_channel_ref(channel); + pthread_mutex_unlock(&client->lock); + + if (monitor_latency && reds_stream_get_family(stream) != AF_UNIX) { + rcc->latency_monitor.timer = channel->core->timer_add( + channel->core, red_channel_client_ping_timer, rcc); + if (!client->during_target_migrate) { + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + } + rcc->latency_monitor.roundtrip = -1; + } + + return rcc; +error: + free(rcc); + reds_stream_free(stream); + pthread_mutex_unlock(&client->lock); + return NULL; +} + +RedChannelClient *red_channel_client_create_dummy(int size, + RedChannel *channel, + RedClient *client, + int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps) +{ + RedChannelClient *rcc = NULL; + + spice_assert(size >= sizeof(RedChannelClient)); + + pthread_mutex_lock(&client->lock); + if (!red_channel_client_pre_create_validate(channel, client)) { + goto error; + } + rcc = spice_malloc0(size); + rcc->refs = 1; + rcc->client = client; + rcc->channel = channel; + red_channel_ref(channel); + red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); + if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { + rcc->incoming.header = mini_header_wrapper; + rcc->send_data.header = mini_header_wrapper; + rcc->is_mini_header = TRUE; + } else { + rcc->incoming.header = full_header_wrapper; + rcc->send_data.header = full_header_wrapper; + rcc->is_mini_header = FALSE; + } + + rcc->incoming.header.data = rcc->incoming.header_buf; + rcc->incoming.serial = 1; + ring_init(&rcc->pipe); + + rcc->dummy = TRUE; + rcc->dummy_connected = TRUE; + red_channel_add_client(channel, rcc); + red_client_add_channel(client, rcc); + 
pthread_mutex_unlock(&client->lock); + return rcc; +error: + pthread_mutex_unlock(&client->lock); + return NULL; +} + +static void red_channel_client_seamless_migration_done(RedChannelClient *rcc) +{ + rcc->wait_migrate_data = FALSE; + + if (red_client_seamless_migration_done_for_channel(rcc->client)) { + if (rcc->latency_monitor.timer) { + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + } + if (rcc->connectivity_monitor.timer) { + rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, + rcc->connectivity_monitor.timeout); + } + } +} + +void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc) +{ + if (rcc->latency_monitor.timer) { + red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + } +} + +int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc) +{ + return rcc->wait_migrate_data; +} + +void red_channel_client_default_migrate(RedChannelClient *rcc) +{ + if (rcc->latency_monitor.timer) { + red_channel_client_cancel_ping_timer(rcc); + rcc->channel->core->timer_remove(rcc->latency_monitor.timer); + rcc->latency_monitor.timer = NULL; + } + if (rcc->connectivity_monitor.timer) { + rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer); + rcc->connectivity_monitor.timer = NULL; + } + red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MIGRATE); +} + +void red_channel_client_ref(RedChannelClient *rcc) +{ + rcc->refs++; +} + +void red_channel_client_unref(RedChannelClient *rcc) +{ + if (--rcc->refs != 0) { + return; + } + + spice_debug("destroy rcc=%p", rcc); + + reds_stream_free(rcc->stream); + rcc->stream = NULL; + + if (rcc->send_data.main.marshaller) { + spice_marshaller_destroy(rcc->send_data.main.marshaller); + } + + if (rcc->send_data.urgent.marshaller) { + spice_marshaller_destroy(rcc->send_data.urgent.marshaller); + } + + red_channel_client_destroy_remote_caps(rcc); + if (rcc->channel) { + red_channel_unref(rcc->channel); + } + free(rcc); +} + 
+void red_channel_client_destroy(RedChannelClient *rcc) +{ + rcc->destroying = TRUE; + red_channel_client_disconnect(rcc); + red_client_remove_channel(rcc); + red_channel_client_unref(rcc); +} + +void red_channel_client_shutdown(RedChannelClient *rcc) +{ + if (rcc->stream && !rcc->stream->shutdown) { + rcc->channel->core->watch_remove(rcc->stream->watch); + rcc->stream->watch = NULL; + shutdown(rcc->stream->socket, SHUT_RDWR); + rcc->stream->shutdown = TRUE; + } +} + +static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler) +{ + ssize_t n; + + if (!stream) { + return; + } + + if (handler->size == 0) { + handler->vec = handler->vec_buf; + handler->size = handler->cb->get_msg_size(handler->opaque); + if (!handler->size) { // nothing to be sent + return; + } + } + + for (;;) { + handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); + n = reds_stream_writev(stream, handler->vec, handler->vec_size); + if (n == -1) { + switch (errno) { + case EAGAIN: + handler->cb->on_block(handler->opaque); + return; + case EINTR: + continue; + case EPIPE: + handler->cb->on_error(handler->opaque); + return; + default: + spice_printerr("%s", strerror(errno)); + handler->cb->on_error(handler->opaque); + return; + } + } else { + handler->pos += n; + handler->cb->on_output(handler->opaque, n); + if (handler->pos == handler->size) { // finished writing data + /* reset handler before calling on_msg_done, since it + * can trigger another call to red_peer_handle_outgoing (when + * switching from the urgent marshaller to the main one) */ + handler->vec = handler->vec_buf; + handler->pos = 0; + handler->size = 0; + handler->cb->on_msg_done(handler->opaque); + return; + } + } + } +} + +/* return the number of bytes read. 
-1 in case of error */
+static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
+{
+    uint8_t *pos = buf;
+    while (size) {
+        int now;
+        if (stream->shutdown) {
+            return -1;
+        }
+        now = reds_stream_read(stream, pos, size);
+        if (now <= 0) {
+            /* a read of 0 bytes is reported as an error
+             * (presumably end-of-stream, i.e. the peer closed) */
+            if (now == 0) {
+                return -1;
+            }
+            spice_assert(now == -1);
+            if (errno == EAGAIN) {
+                /* no more data for now: return what was read so far
+                 * (possibly 0) */
+                break;
+            } else if (errno == EINTR) {
+                continue;
+            } else if (errno == EPIPE) {
+                return -1;
+            } else {
+                spice_printerr("%s", strerror(errno));
+                return -1;
+            }
+        } else {
+            size -= now;
+            pos += now;
+        }
+    }
+    return pos - buf;
+}
+
+// TODO: this implementation, as opposed to the old implementation in red_worker,
+// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer
+// arithmetic for the case where a single cb_read could return multiple messages. But
+// this is suboptimal potentially. Profile and consider fixing.
+/* Read and dispatch as many complete messages as are available on stream.
+ * Progress is kept in handler (header_pos, msg, msg_pos), so a partially
+ * received header or body is resumed on the next call. Any failure is
+ * reported through handler->cb->on_error(). */
+static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handler)
+{
+    int bytes_read;
+    uint8_t *parsed;
+    size_t parsed_size;
+    message_destructor_t parsed_free;
+    uint16_t msg_type;
+    uint32_t msg_size;
+
+    /* XXX: This needs further investigation as to the underlying cause, it happened
+     * after spicec disconnect (but not with spice-gtk) repeatedly. */
+    if (!stream) {
+        return;
+    }
+
+    for (;;) {
+        int ret_handle;
+        /* step 1: complete the message header */
+        if (handler->header_pos < handler->header.header_size) {
+            bytes_read = red_peer_receive(stream,
+                                          handler->header.data + handler->header_pos,
+                                          handler->header.header_size - handler->header_pos);
+            if (bytes_read == -1) {
+                handler->cb->on_error(handler->opaque);
+                return;
+            }
+            handler->cb->on_input(handler->opaque, bytes_read);
+            handler->header_pos += bytes_read;
+
+            if (handler->header_pos != handler->header.header_size) {
+                return;
+            }
+        }
+
+        /* step 2: complete the message body; the buffer is obtained from the
+         * channel on first use and kept in handler->msg across calls */
+        msg_size = handler->header.get_msg_size(&handler->header);
+        msg_type = handler->header.get_msg_type(&handler->header);
+        if (handler->msg_pos < msg_size) {
+            if (!handler->msg) {
+                handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size);
+                if (handler->msg == NULL) {
+                    spice_printerr("ERROR: channel refused to allocate buffer.");
+                    handler->cb->on_error(handler->opaque);
+                    return;
+                }
+            }
+
+            bytes_read = red_peer_receive(stream,
+                                          handler->msg + handler->msg_pos,
+                                          msg_size - handler->msg_pos);
+            if (bytes_read == -1) {
+                handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
+                handler->cb->on_error(handler->opaque);
+                return;
+            }
+            handler->cb->on_input(handler->opaque, bytes_read);
+            handler->msg_pos += bytes_read;
+            if (handler->msg_pos != msg_size) {
+                return;
+            }
+        }
+
+        /* step 3: dispatch, through the parser when the channel supplies one */
+        if (handler->cb->parser) {
+            parsed = handler->cb->parser(handler->msg,
+                handler->msg + msg_size, msg_type,
+                SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
+            if (parsed == NULL) {
+                spice_printerr("failed to parse message type %d", msg_type);
+                handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
+                handler->cb->on_error(handler->opaque);
+                return;
+            }
+            ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size,
+                                                    msg_type, parsed);
+            parsed_free(parsed);
+        } else {
+            ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size,
+                                                     handler->msg);
+        }
+        /* step 4: reset the per-message state before looking at the result */
+        handler->msg_pos = 0;
+        handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
+        handler->msg = NULL;
+        handler->header_pos = 0;
+
+        if (!ret_handle) {
+            handler->cb->on_error(handler->opaque);
+            return;
+        }
+    }
+}
+
+/* Process incoming data for rcc. A reference is held for the duration of the
+ * call (presumably so that a callback dropping a reference cannot free rcc
+ * underneath the handler). */
+void red_channel_client_receive(RedChannelClient *rcc)
+{
+    red_channel_client_ref(rcc);
+    red_peer_handle_incoming(rcc->stream, &rcc->incoming);
+    red_channel_client_unref(rcc);
+}
+
+/* Continue writing rcc's outgoing message, holding a reference on rcc for
+ * the duration of the call. */
+void red_channel_client_send(RedChannelClient *rcc)
+{
+    red_channel_client_ref(rcc);
+    red_peer_handle_outgoing(rcc->stream, &rcc->outgoing);
+    red_channel_client_unref(rcc);
+}
+
+/* Remove and return the item at the tail of the pipe.
+ * Returns NULL when rcc is NULL, the send is blocked, the client is waiting
+ * for an ack, or the pipe is empty. */
+static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc)
+{
+    RedPipeItem *item;
+
+    if (!rcc || rcc->send_data.blocked
+             || red_channel_client_waiting_for_ack(rcc)
+             || !(item = (RedPipeItem *)ring_get_tail(&rcc->pipe))) {
+        return NULL;
+    }
+    red_channel_client_pipe_remove(rcc, item);
+    return item;
+}
+
+/* Send as many queued pipe items as possible.
+ * The during_send flag makes a nested call return immediately. */
+void red_channel_client_push(RedChannelClient *rcc)
+{
+    RedPipeItem *pipe_item;
+
+    if (!rcc->during_send) {
+        rcc->during_send = TRUE;
+    } else {
+        return;
+    }
+    red_channel_client_ref(rcc);
+    /* first try to finish a message that previously blocked */
+    if (rcc->send_data.blocked) {
+        red_channel_client_send(rcc);
+    }
+
+    if (!red_channel_client_no_item_being_sent(rcc) && !rcc->send_data.blocked) {
+        rcc->send_data.blocked = TRUE;
+        spice_printerr("ERROR: an item waiting to be sent and not blocked");
+    }
+
+    while ((pipe_item = red_channel_client_pipe_item_get(rcc))) {
+        red_channel_client_send_item(rcc, pipe_item);
+    }
+    /* nothing left to write: watch the stream for read events only */
+    if (red_channel_client_no_item_being_sent(rcc) && ring_is_empty(&rcc->pipe)
+        && rcc->stream->watch) {
+        rcc->channel->core->watch_update_mask(rcc->stream->watch,
+                                              SPICE_WATCH_EVENT_READ);
+    }
+    rcc->during_send = FALSE;
+    red_channel_client_unref(rcc);
+}
+
+/* Return the measured roundtrip divided by NSEC_PER_MILLISEC.
+ * A negative stored value is returned unchanged. */
+int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc)
+{
+    if (rcc->latency_monitor.roundtrip < 0) {
+        return rcc->latency_monitor.roundtrip;
+    }
+    return rcc->latency_monitor.roundtrip / NSEC_PER_MILLISEC;
+}
+
+void
red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc)
+{
+    /* reset the count of unacknowledged messages, then try to send */
+    rcc->ack_data.messages_window = 0;
+    red_channel_client_push(rcc);
+}
+
+/* Handle a PONG reply from the client and update the latency estimate.
+ * Pongs whose id does not match the outstanding ping are ignored. A pong
+ * received in the WARMUP state only advances the state to LATENCY; a pong
+ * received in the LATENCY state updates the roundtrip and re-arms the ping
+ * timer. */
+static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping)
+{
+    uint64_t now;
+
+    /* ignoring unexpected pongs, or post-migration pongs for pings that
+     * started just before migration */
+    if (ping->id != rcc->latency_monitor.id) {
+        spice_warning("ping-id (%u)!= pong-id %u",
+                      rcc->latency_monitor.id, ping->id);
+        return;
+    }
+
+    now = spice_get_monotonic_time_ns();
+
+    if (rcc->latency_monitor.state == PING_STATE_WARMUP) {
+        rcc->latency_monitor.state = PING_STATE_LATENCY;
+        return;
+    } else if (rcc->latency_monitor.state != PING_STATE_LATENCY) {
+        spice_warning("unexpected");
+        return;
+    }
+
+    /* set TCP_NODELAY=0, in case we reverted it for the test */
+    if (!rcc->latency_monitor.tcp_nodelay) {
+        int delay_val = 0;
+
+        if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
+                       sizeof(delay_val)) == -1) {
+            /* ENOTSUP is not reported */
+            if (errno != ENOTSUP) {
+                spice_warning("setsockopt failed, %s", strerror(errno));
+            }
+        }
+    }
+
+    /*
+     * The real network latency shouldn't change during the connection. However,
+     * the measurements can be bigger than the real roundtrip due to other
+     * threads or processes that are utilizing the network. We update the roundtrip
+     * measurement with the minimal value we encountered till now.
+     */
+    if (rcc->latency_monitor.roundtrip < 0 ||
+        now - ping->timestamp < rcc->latency_monitor.roundtrip) {
+        rcc->latency_monitor.roundtrip = now - ping->timestamp;
+        spice_debug("update roundtrip %.2f(ms)", ((double)rcc->latency_monitor.roundtrip)/NSEC_PER_MILLISEC);
+    }
+
+    rcc->latency_monitor.last_pong_time = now;
+    rcc->latency_monitor.state = PING_STATE_NONE;
+    red_channel_client_start_ping_timer(rcc, PING_TEST_TIMEOUT_MS);
+}
+
+/* Forward a migrate-flush-mark to the channel's callback, if it has one. */
+static void red_channel_client_handle_migrate_flush_mark(RedChannelClient *rcc)
+{
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    if (channel->channel_cbs.handle_migrate_flush_mark) {
+        channel->channel_cbs.handle_migrate_flush_mark(rcc);
+    }
+}
+
+// TODO: the whole migration is broken with multiple clients. What do we want to do?
+// basically just
+//  1) source send mark to all
+//  2) source gets at various times the data (waits for all)
+//  3) source migrates to target
+//  4) target sends data to all
+// So need to make all the handlers work with per channel/client data (what data exactly?)
+/* Pass migration data received from the client to the channel.
+ * Nothing is done when the channel has no handle_migrate_data callback.
+ * Data that arrives while the client is not waiting for it, or that the
+ * channel callback rejects, is reported with spice_channel_client_error().
+ * On success the seamless migration is marked as done. */
+static void red_channel_client_handle_migrate_data(RedChannelClient *rcc,
+                                                   uint32_t size,
+                                                   void *message)
+{
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    spice_debug("channel type %d id %d rcc %p size %u",
+                channel->type, channel->id, rcc, size);
+    if (!channel->channel_cbs.handle_migrate_data) {
+        return;
+    }
+    if (!red_channel_client_is_waiting_for_migrate_data(rcc)) {
+        spice_channel_client_error(rcc, "unexpected");
+        return;
+    }
+    /* let the channel restore the message serial carried in the data */
+    if (channel->channel_cbs.handle_migrate_data_get_serial) {
+        red_channel_client_set_message_serial(rcc,
+            channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message));
+    }
+    if (!channel->channel_cbs.handle_migrate_data(rcc, size, message)) {
+        spice_channel_client_error(rcc, "handle_migrate_data failed");
+        return;
+    }
+    red_channel_client_seamless_migration_done(rcc);
+}
+
+
+/* Handle the messages that are common to all channels.
+ * Returns TRUE when the message was accepted and FALSE when it is malformed,
+ * unexpected or of an unknown type. */
+int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
+                                      uint16_t type, void *message)
+{
+    switch (type) {
+    case SPICE_MSGC_ACK_SYNC:
+        if (size != sizeof(uint32_t)) {
+            spice_printerr("bad message size");
+            return FALSE;
+        }
+        rcc->ack_data.client_generation = *(uint32_t *)(message);
+        break;
+    case SPICE_MSGC_ACK:
+        /* acks from a different generation are ignored */
+        if (rcc->ack_data.client_generation == rcc->ack_data.generation) {
+            rcc->ack_data.messages_window -= rcc->ack_data.client_window;
+            red_channel_client_push(rcc);
+        }
+        break;
+    case SPICE_MSGC_DISCONNECTING:
+        break;
+    case SPICE_MSGC_MIGRATE_FLUSH_MARK:
+        if (!rcc->wait_migrate_flush_mark) {
+            /* The message comes from the remote peer: log and reject it like
+             * the other malformed-message cases instead of using the fatal
+             * spice_error(), which would make the return below unreachable. */
+            spice_printerr("unexpected flush mark");
+            return FALSE;
+        }
+        red_channel_client_handle_migrate_flush_mark(rcc);
+        rcc->wait_migrate_flush_mark = FALSE;
+        break;
+    case SPICE_MSGC_MIGRATE_DATA:
+        red_channel_client_handle_migrate_data(rcc, size, message);
+        break;
+    case SPICE_MSGC_PONG:
+        red_channel_client_handle_pong(rcc, message);
+        break;
+    default:
+        spice_printerr("invalid message type %u", type);
+        return FALSE;
+    }
+    return TRUE;
+}
+
+void red_channel_client_init_send_data(RedChannelClient
*rcc, uint16_t msg_type, RedPipeItem *item)
+{
+    /* must not be called while a previous message is still being sent */
+    spice_assert(red_channel_client_no_item_being_sent(rcc));
+    spice_assert(msg_type != 0);
+    rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type);
+    rcc->send_data.item = item;
+    /* send_data keeps its own reference on the item (item may be NULL) */
+    if (item) {
+        red_pipe_item_ref(item);
+    }
+}
+
+/* Finalize the message built in the current marshaller and start sending it:
+ * fills in the size in the header, counts the message in the ack window and
+ * hands over to red_channel_client_send(). Refuses to send (and logs) when no
+ * message type was set. */
+void red_channel_client_begin_send_message(RedChannelClient *rcc)
+{
+    SpiceMarshaller *m = rcc->send_data.marshaller;
+
+    // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state)
+    if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) {
+        spice_printerr("BUG: header->type == 0");
+        return;
+    }
+
+    /* canceling the latency test timer till the network is idle */
+    red_channel_client_cancel_ping_timer(rcc);
+
+    spice_marshaller_flush(m);
+    rcc->send_data.size = spice_marshaller_get_total_size(m);
+    /* the size stored in the header excludes the header itself */
+    rcc->send_data.header.set_msg_size(&rcc->send_data.header,
+                                       rcc->send_data.size - rcc->send_data.header.header_size);
+    rcc->ack_data.messages_window++;
+    rcc->send_data.last_sent_serial = rcc->send_data.serial;
+    rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */
+    red_channel_client_send(rcc);
+}
+
+/* Save the header data and item of the message currently being prepared in
+ * send_data.main, switch send_data to the urgent marshaller, reset the send
+ * data and return the (urgent) marshaller now in use. */
+SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc)
+{
+    spice_assert(red_channel_client_no_item_being_sent(rcc));
+    spice_assert(rcc->send_data.header.data != NULL);
+    rcc->send_data.main.header_data = rcc->send_data.header.data;
+    rcc->send_data.main.item = rcc->send_data.item;
+
+    rcc->send_data.marshaller = rcc->send_data.urgent.marshaller;
+    rcc->send_data.item = NULL;
+    red_channel_client_reset_send_data(rcc);
+    return rcc->send_data.marshaller;
+}
+
+/* Return the current message serial. */
+uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc)
+{
+    return rcc->send_data.serial;
+}
+
+/* Set both the current and the last-sent message serial to serial. */
+void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial)
+{
+    rcc->send_data.last_sent_serial = serial;
+    rcc->send_data.serial = serial;
+}
+
+static inline gboolean
client_pipe_add(RedChannelClient *rcc, RedPipeItem *item, RingItem *pos)
+{
+    /* Common helper of the pipe_add variants: link item into the ring after
+     * pos. Returns FALSE, after dropping a reference on the item, when the
+     * client is not connected; TRUE once the item has been queued. */
+    spice_assert(rcc && item);
+    if (SPICE_UNLIKELY(!red_channel_client_is_connected(rcc))) {
+        spice_debug("rcc is disconnected %p", rcc);
+        red_pipe_item_unref(item);
+        return FALSE;
+    }
+    /* the pipe is about to become non-empty: also watch for write events */
+    if (ring_is_empty(&rcc->pipe) && rcc->stream->watch) {
+        rcc->channel->core->watch_update_mask(rcc->stream->watch,
+                                              SPICE_WATCH_EVENT_READ |
+                                              SPICE_WATCH_EVENT_WRITE);
+    }
+    rcc->pipe_size++;
+    ring_add(pos, &item->link);
+    return TRUE;
+}
+
+/* Queue item at the ring position &rcc->pipe (the ring head). */
+void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item)
+{
+
+    client_pipe_add(rcc, item, &rcc->pipe);
+}
+
+/* Queue item like red_channel_client_pipe_add() and then push the pipe. */
+void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item)
+{
+    red_channel_client_pipe_add(rcc, item);
+    red_channel_client_push(rcc);
+}
+
+/* Queue item in the ring right after pos, which must not be NULL. */
+void red_channel_client_pipe_add_after(RedChannelClient *rcc,
+                                       RedPipeItem *item,
+                                       RedPipeItem *pos)
+{
+    spice_assert(pos);
+    client_pipe_add(rcc, item, &pos->link);
+}
+
+/* Tell whether item is currently linked into a ring (rcc is not used). */
+int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc,
+                                           RedPipeItem *item)
+{
+    return ring_item_is_linked(&item->link);
+}
+
+/* Queue item after rcc->pipe.prev, i.e. at the opposite end of the ring from
+ * red_channel_client_pipe_add(). */
+void red_channel_client_pipe_add_tail(RedChannelClient *rcc,
+                                      RedPipeItem *item)
+{
+    client_pipe_add(rcc, item, rcc->pipe.prev);
+}
+
+/* Like red_channel_client_pipe_add_tail(), then push the pipe, but only when
+ * the item was actually queued. */
+void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item)
+{
+    if (client_pipe_add(rcc, item, rcc->pipe.prev)) {
+        red_channel_client_push(rcc);
+    }
+}
+
+/* Allocate a plain RedPipeItem of the given type, queue it and push. */
+void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type)
+{
+    RedPipeItem *item = spice_new(RedPipeItem, 1);
+
+    red_pipe_item_init(item, pipe_item_type);
+    red_channel_client_pipe_add(rcc, item);
+    red_channel_client_push(rcc);
+}
+
+/* Allocate a RED_PIPE_ITEM_TYPE_EMPTY_MSG item carrying msg_type, queue it
+ * and push. */
+void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type)
+{
+    RedEmptyMsgPipeItem *item = spice_new(RedEmptyMsgPipeItem, 1);
+
+    red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG);
+    item->msg = msg_type;
+    red_channel_client_pipe_add(rcc, &item->base);
+
red_channel_client_push(rcc);
+}
+
+/* TRUE when the pipe holds no items; also TRUE (with a GLib warning) when
+ * rcc is NULL. */
+gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc)
+{
+    g_return_val_if_fail(rcc != NULL, TRUE);
+    return (rcc->pipe_size == 0) && (ring_is_empty(&rcc->pipe));
+}
+
+/* Return the number of items counted in the pipe. */
+uint32_t red_channel_client_get_pipe_size(RedChannelClient *rcc)
+{
+    return rcc->pipe_size;
+}
+
+/* A regular client is connected while it has a channel and is present in
+ * that channel's client list; a dummy client tracks this in a flag. */
+int red_channel_client_is_connected(RedChannelClient *rcc)
+{
+    if (!rcc->dummy) {
+        return rcc->channel
+            && (g_list_find(rcc->channel->clients, rcc) != NULL);
+    } else {
+        return rcc->dummy_connected;
+    }
+}
+
+/* Release the item currently being sent and reset the send state so that
+ * nothing is considered blocked or in flight. */
+static void red_channel_client_clear_sent_item(RedChannelClient *rcc)
+{
+    red_channel_client_release_sent_item(rcc);
+    rcc->send_data.blocked = FALSE;
+    rcc->send_data.size = 0;
+}
+
+// TODO: again - what is the context exactly? this happens in channel disconnect. but our
+// current red_channel_shutdown also closes the socket - is there a socket to close?
+// are we reading from an fd here? arghh
+/* Drop the item being sent and every item still queued in the pipe.
+ * A NULL rcc is ignored. */
+static void red_channel_client_pipe_clear(RedChannelClient *rcc)
+{
+    RedPipeItem *item;
+
+    /* The original only guarded the first call with "if (rcc)" and then
+     * dereferenced rcc unconditionally; bail out early instead. */
+    if (!rcc) {
+        return;
+    }
+    red_channel_client_clear_sent_item(rcc);
+    while ((item = (RedPipeItem *)ring_get_head(&rcc->pipe))) {
+        ring_remove(&item->link);
+        red_pipe_item_unref(item);
+    }
+    rcc->pipe_size = 0;
+}
+
+/* Reset the count of messages sent since the last ack. */
+void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
+{
+    rcc->ack_data.messages_window = 0;
+}
+
+/* Set the amount subtracted from the messages window for every client ack. */
+void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window)
+{
+    rcc->ack_data.client_window = client_window;
+}
+
+/* Queue (and push) a RED_PIPE_ITEM_TYPE_SET_ACK item. */
+void red_channel_client_push_set_ack(RedChannelClient *rcc)
+{
+    red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_SET_ACK);
+}
+
+/* Disconnect a dummy client: remove it from its channel's client list, if it
+ * is there, and clear its connected flag. */
+static void red_channel_client_disconnect_dummy(RedChannelClient *rcc)
+{
+    RedChannel *channel = red_channel_client_get_channel(rcc);
+    GList *link;
+    spice_assert(rcc->dummy);
+    if (channel && (link = g_list_find(channel->clients, rcc))) {
+        spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel,
+                       channel->type, channel->id);
+        red_channel_remove_client(channel, link->data);
+    }
+    rcc->dummy_connected = FALSE;
+}
+
+/* Disconnect the client from its channel.
+ * Clears the pipe, removes the stream watch and the latency/connectivity
+ * timers, removes the client from the channel and finally invokes the
+ * channel's on_disconnect callback. Calling it on a client that is not
+ * connected is a no-op; dummy clients take a separate, simpler path. */
+void red_channel_client_disconnect(RedChannelClient *rcc)
+{
+    RedChannel *channel = rcc->channel;
+
+    if (rcc->dummy) {
+        red_channel_client_disconnect_dummy(rcc);
+        return;
+    }
+    if (!red_channel_client_is_connected(rcc)) {
+        return;
+    }
+    spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, channel,
+                   channel->type, channel->id);
+    red_channel_client_pipe_clear(rcc);
+    if (rcc->stream->watch) {
+        channel->core->watch_remove(rcc->stream->watch);
+        rcc->stream->watch = NULL;
+    }
+    if (rcc->latency_monitor.timer) {
+        channel->core->timer_remove(rcc->latency_monitor.timer);
+        rcc->latency_monitor.timer = NULL;
+    }
+    if (rcc->connectivity_monitor.timer) {
+        channel->core->timer_remove(rcc->connectivity_monitor.timer);
+        rcc->connectivity_monitor.timer = NULL;
+    }
+    red_channel_remove_client(channel, rcc);
+    channel->channel_cbs.on_disconnect(rcc);
+}
+
+/* Non-zero when rcc is non-NULL and its current send is blocked. */
+int red_channel_client_is_blocked(RedChannelClient *rcc)
+{
+    return rcc && rcc->send_data.blocked;
+}
+
+/* Non-zero when a message type has been set in the send header. */
+int red_channel_client_send_message_pending(RedChannelClient *rcc)
+{
+    return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0;
+}
+
+/* Return the marshaller currently used for outgoing data. */
+SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc)
+{
+    return rcc->send_data.marshaller;
+}
+
+/* Return the stream of this channel client. */
+RedsStream *red_channel_client_get_stream(RedChannelClient *rcc)
+{
+    return rcc->stream;
+}
+
+/* Return the RedClient this channel client belongs to. */
+RedClient *red_channel_client_get_client(RedChannelClient *rcc)
+{
+    return rcc->client;
+}
+
+/* Store sub_list in the header of the message being prepared. */
+void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list)
+{
+    rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list);
+}
+
+/* Free callback of a MarkerPipeItem: tells the waiter, through the
+ * item_in_pipe pointer (when still set), that the marker left the pipe. */
+static void marker_pipe_item_free(RedPipeItem *base)
+{
+    MarkerPipeItem *item = SPICE_UPCAST(MarkerPipeItem, base);
+
+    if (item->item_in_pipe) {
+        *item->item_in_pipe = FALSE;
+    }
+    free(item);
+}
+
+/* TODO: more evil sync stuff. anything with the word wait in its name.
*/
+/* Block until item has left the pipe and been sent, or timeout expires.
+ *
+ * A marker item is queued right after item; its free callback clears the
+ * local item_in_pipe flag once the marker is released. While the flag is set
+ * the function sleeps and drives receive/send/push. timeout is compared
+ * against spice_get_monotonic_time_ns(); -1 means no timeout.
+ * Returns TRUE when waiting succeeded and FALSE on timeout. */
+int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc,
+                                           RedPipeItem *item,
+                                           int64_t timeout)
+{
+    uint64_t end_time;
+    gboolean item_in_pipe;
+
+    spice_info(NULL);
+
+    if (timeout != -1) {
+        end_time = spice_get_monotonic_time_ns() + timeout;
+    } else {
+        end_time = UINT64_MAX;
+    }
+
+    MarkerPipeItem *mark_item = spice_new0(MarkerPipeItem, 1);
+
+    red_pipe_item_init_full(&mark_item->base, RED_PIPE_ITEM_TYPE_MARKER,
+                            marker_pipe_item_free);
+    item_in_pipe = TRUE;
+    mark_item->item_in_pipe = &item_in_pipe;
+    red_channel_client_pipe_add_after(rcc, &mark_item->base, item);
+
+    if (red_channel_client_is_blocked(rcc)) {
+        red_channel_client_receive(rcc);
+        red_channel_client_send(rcc);
+    }
+    red_channel_client_push(rcc);
+
+    while(item_in_pipe &&
+          (timeout == -1 || spice_get_monotonic_time_ns() < end_time)) {
+        usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
+        red_channel_client_receive(rcc);
+        red_channel_client_send(rcc);
+        red_channel_client_push(rcc);
+    }
+
+    if (item_in_pipe) {
+        // still on the queue, make sure won't overwrite the stack variable
+        mark_item->item_in_pipe = NULL;
+        spice_warning("timeout");
+        return FALSE;
+    } else {
+        int64_t remaining = -1;
+
+        /* The deadline may already have passed during the last loop
+         * iteration. Clamp to 0 instead of letting the unsigned subtraction
+         * wrap around: a wrapped value of exactly -1 would be read by the
+         * callee as "no timeout". */
+        if (timeout != -1) {
+            uint64_t now = spice_get_monotonic_time_ns();
+
+            remaining = now < end_time ? (int64_t)(end_time - now) : 0;
+        }
+        return red_channel_client_wait_outgoing_item(rcc, remaining);
+    }
+}
+
+/* Block until the message currently being sent is no longer blocked, or
+ * timeout expires (-1 means no timeout). Returns TRUE immediately when
+ * nothing is blocked, TRUE when the send completed, FALSE on timeout. */
+int red_channel_client_wait_outgoing_item(RedChannelClient *rcc,
+                                          int64_t timeout)
+{
+    uint64_t end_time;
+    int blocked;
+
+    if (!red_channel_client_is_blocked(rcc)) {
+        return TRUE;
+    }
+    if (timeout != -1) {
+        end_time = spice_get_monotonic_time_ns() + timeout;
+    } else {
+        end_time = UINT64_MAX;
+    }
+    spice_info("blocked");
+
+    do {
+        usleep(CHANNEL_BLOCKED_SLEEP_DURATION);
+        red_channel_client_receive(rcc);
+        red_channel_client_send(rcc);
+    } while ((blocked = red_channel_client_is_blocked(rcc)) &&
+             (timeout == -1 || spice_get_monotonic_time_ns() < end_time));
+
+    if (blocked) {
+        spice_warning("timeout");
+        return FALSE;
+    } else {
+        spice_assert(red_channel_client_no_item_being_sent(rcc));
+        return TRUE;
+    }
+}
+
+/* Disconnect the client when it is blocked or still has queued items. */
+void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc)
+{
+    if (red_channel_client_is_blocked(rcc) || rcc->pipe_size > 0) {
+        red_channel_client_disconnect(rcc);
+    } else {
+        spice_assert(red_channel_client_no_item_being_sent(rcc));
+    }
+}
+
+/* TRUE when rcc is NULL or no message is currently in flight
+ * (send_data.size == 0). */
+gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc)
+{
+    return !rcc || (rcc->send_data.size == 0);
+}
+
+/* Remove item from the pipe and drop a reference on it. */
+void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc,
+                                                RedPipeItem *item)
+{
+    red_channel_client_pipe_remove(rcc, item);
+    red_pipe_item_unref(item);
+}
+
+/* client mutex should be locked before this call */
+/* Mark the client as waiting for migration data when its channel has the
+ * SPICE_MIGRATE_NEED_DATA_TRANSFER flag; returns whether it did so. */
+gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc)
+{
+    gboolean ret = FALSE;
+
+    if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) {
+        rcc->wait_migrate_data = TRUE;
+        ret = TRUE;
+    }
+    spice_debug("channel type %d id %d rcc %p wait data %d", rcc->channel->type, rcc->channel->id, rcc,
+        rcc->wait_migrate_data);
+
+    return ret;
+}
+
+/* Flag the client as being destroyed. */
+void red_channel_client_set_destroying(RedChannelClient *rcc)
+{
+    rcc->destroying = TRUE;
+}
+
+/* Tell whether the client has been flagged as being destroyed. */
+gboolean red_channel_client_is_destroying(RedChannelClient *rcc)
+{
+    return rcc->destroying;
+}
diff --git
a/server/red-channel-client.h b/server/red-channel-client.h new file mode 100644 index 0000000..bef052b --- /dev/null +++ b/server/red-channel-client.h @@ -0,0 +1,248 @@ +/* + Copyright (C) 2009-2016 Red Hat, Inc. + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _H_RED_CHANNEL_CLIENT +#define _H_RED_CHANNEL_CLIENT + +#include <common/marshaller.h> + +#include "red-pipe-item.h" +#include "reds-stream.h" +#include "red-channel.h" + +typedef struct RedChannel RedChannel; +typedef struct RedClient RedClient; +typedef struct IncomingHandler IncomingHandler; + +typedef struct RedChannelClient RedChannelClient; + +/* + * When an error occurs over a channel, we treat it as a warning + * for spice-server and shutdown the channel. + */ +#define spice_channel_client_error(rcc, format, ...) 
\ + do { \ + RedChannel *_ch = red_channel_client_get_channel(rcc); \ + spice_warning("rcc %p type %u id %u: " format, rcc, \ + _ch->type, _ch->id, ## __VA_ARGS__); \ + red_channel_client_shutdown(rcc); \ + } while (0) + +RedChannelClient *red_channel_client_create(int size, RedChannel *channel, + RedClient *client, RedsStream *stream, + int monitor_latency, + int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps); + +RedChannelClient *red_channel_client_create_dummy(int size, + RedChannel *channel, + RedClient *client, + int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps); + +void red_channel_client_ref(RedChannelClient *rcc); +void red_channel_client_unref(RedChannelClient *rcc); + +int red_channel_client_is_connected(RedChannelClient *rcc); +void red_channel_client_default_migrate(RedChannelClient *rcc); +int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc); +void red_channel_client_destroy(RedChannelClient *rcc); +int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap); +int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap); +/* shutdown is the only safe thing to do out of the client/channel + * thread. It will not touch the rings, just shutdown the socket. + * It should be followed by some way to guarantee a disconnection. */ +void red_channel_client_shutdown(RedChannelClient *rcc); +/* handles general channel msgs from the client */ +int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, + uint16_t type, void *message); +/* when preparing send_data: should call init and then use marshaller */ +void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item); + +uint64_t red_channel_client_get_message_serial(RedChannelClient *channel); +void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t); + +/* When sending a msg.
Should first call red_channel_client_begin_send_message. + * It will first send the pending urgent data, if there is any, and then + * the rest of the data. + */ +void red_channel_client_begin_send_message(RedChannelClient *rcc); + +/* + * Stores the current send data, and switches to urgent send data. + * When it begins the actual send, it will send first the urgent data + * and afterward the rest of the data. + * Should be called only if, during the marshalling of one message, + * the need to send another message first arises. + * Important: the serial of the non-urgent sent data, will be succeeded. + * return: the urgent send data marshaller + */ +SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc); + +/* returns -1 if we don't have an estimation */ +int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc); + +/* Checks periodically if the connection is still alive */ +void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms); + +void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item); +void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item); +void red_channel_client_pipe_add_after(RedChannelClient *rcc, RedPipeItem *item, RedPipeItem *pos); +int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item); +void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item); +void red_channel_client_pipe_add_tail(RedChannelClient *rcc, RedPipeItem *item); +void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item); +/* for types that use this routine -> the pipe item should be freed */ +void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type); +void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type); +gboolean red_channel_client_pipe_is_empty(RedChannelClient *rcc); +uint32_t red_channel_client_get_pipe_size(RedChannelClient
*rcc); + +void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc); +void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window); +void red_channel_client_push_set_ack(RedChannelClient *rcc); + +gboolean red_channel_client_is_blocked(RedChannelClient *rcc); + +/* helper for channels that have complex logic that can possibly ready a send */ +int red_channel_client_send_message_pending(RedChannelClient *rcc); + +gboolean red_channel_client_no_item_being_sent(RedChannelClient *rcc); +void red_channel_client_push(RedChannelClient *rcc); +void red_channel_client_receive(RedChannelClient *rcc); +void red_channel_client_send(RedChannelClient *rcc); +void red_channel_client_disconnect(RedChannelClient *rcc); + +/* Note: the valid times to call red_channel_get_marshaller are just during send_item callback. */ +SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc); +RedsStream *red_channel_client_get_stream(RedChannelClient *rcc); +RedClient *red_channel_client_get_client(RedChannelClient *rcc); + +/* Note that the header is valid only between red_channel_reset_send_data and + * red_channel_begin_send_message.*/ +void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list); + +/* + * blocking functions. + * + * timeout is in nano sec. -1 for no timeout. + * + * Return: TRUE if waiting succeeded. FALSE if timeout expired. 
+ */ + +int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, + RedPipeItem *item, + int64_t timeout); +int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, + int64_t timeout); +void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc); + +RedChannel* red_channel_client_get_channel(RedChannelClient *rcc); + +void red_channel_client_on_output(void *opaque, int n); +void red_channel_client_on_input(void *opaque, int n); +int red_channel_client_get_out_msg_size(void *opaque); +void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, + int *vec_size, int pos); +void red_channel_client_on_out_block(void *opaque); +void red_channel_client_on_out_msg_done(void *opaque); + +void red_channel_client_semi_seamless_migration_complete(RedChannelClient *rcc); +void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc); + +gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc); +void red_channel_client_set_destroying(RedChannelClient *rcc); +gboolean red_channel_client_is_destroying(RedChannelClient *rcc); + +#define RED_CHANNEL_CLIENT(Client) ((RedChannelClient *)(Client)) + +typedef struct OutgoingHandler { + OutgoingHandlerInterface *cb; + void *opaque; + struct iovec vec_buf[IOV_MAX]; + int vec_size; + struct iovec *vec; + int pos; + int size; +} OutgoingHandler; + +typedef struct IncomingHandler { + IncomingHandlerInterface *cb; + void *opaque; + uint8_t header_buf[MAX_HEADER_SIZE]; + SpiceDataHeaderOpaque header; + uint32_t header_pos; + uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. 
+ uint32_t msg_pos; + uint64_t serial; +} IncomingHandler; + +struct RedChannelClient { + RedChannel *channel; + RedClient *client; + RedsStream *stream; + int dummy; + int dummy_connected; + + uint32_t refs; + + struct { + uint32_t generation; + uint32_t client_generation; + uint32_t messages_window; + uint32_t client_window; + } ack_data; + + struct { + SpiceMarshaller *marshaller; + SpiceDataHeaderOpaque header; + uint32_t size; + RedPipeItem *item; + int blocked; + uint64_t serial; + uint64_t last_sent_serial; + + struct { + SpiceMarshaller *marshaller; + uint8_t *header_data; + RedPipeItem *item; + } main; + + struct { + SpiceMarshaller *marshaller; + } urgent; + } send_data; + + OutgoingHandler outgoing; + IncomingHandler incoming; + int during_send; + int id; // debugging purposes + Ring pipe; + uint32_t pipe_size; + + RedChannelCapabilities remote_caps; + int is_mini_header; + gboolean destroying; + + int wait_migrate_data; + int wait_migrate_flush_mark; + + RedChannelClientLatencyMonitor latency_monitor; + RedChannelClientConnectivityMonitor connectivity_monitor; +}; + +#endif /* _H_RED_CHANNEL_CLIENT */ diff --git a/server/red-channel.c b/server/red-channel.c index bf290b1..474fe68 100644 --- a/server/red-channel.c +++ b/server/red-channel.c @@ -22,68 +22,15 @@ #include <config.h> #endif -#include <glib.h> -#include <stdio.h> -#include <stdint.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <fcntl.h> -#include <unistd.h> -#include <errno.h> -#include <sys/ioctl.h> -#ifdef HAVE_LINUX_SOCKIOS_H -#include <linux/sockios.h> /* SIOCOUTQ */ -#endif - -#include <common/generated_server_marshallers.h> #include <common/ring.h> #include "red-channel.h" +#include "red-channel-client.h" #include "reds.h" #include "reds-stream.h" #include "main-dispatcher.h" #include "utils.h" -typedef struct RedEmptyMsgPipeItem { - RedPipeItem base; - int msg; -} RedEmptyMsgPipeItem; - -typedef struct MarkerPipeItem { - RedPipeItem base; - gboolean *item_in_pipe; -} 
MarkerPipeItem; - -#define PING_TEST_TIMEOUT_MS (MSEC_PER_SEC * 15) -#define PING_TEST_IDLE_NET_TIMEOUT_MS (MSEC_PER_SEC / 10) - -#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro - -enum QosPingState { - PING_STATE_NONE, - PING_STATE_TIMER, - PING_STATE_WARMUP, - PING_STATE_LATENCY, -}; - -enum ConnectivityState { - CONNECTIVITY_STATE_CONNECTED, - CONNECTIVITY_STATE_BLOCKED, - CONNECTIVITY_STATE_WAIT_PONG, - CONNECTIVITY_STATE_DISCONNECTED, -}; - -static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout); -static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc); -static void red_channel_client_restart_ping_timer(RedChannelClient *rcc); - -static void red_channel_client_event(int fd, int event, void *data); -static void red_client_add_channel(RedClient *client, RedChannelClient *rcc); -static void red_client_remove_channel(RedChannelClient *rcc); -static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id); -static void red_channel_client_restore_main_sender(RedChannelClient *rcc); -static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc); - /* * Lifetime of RedChannel, RedChannelClient and RedClient: * RedChannel is created and destroyed by the calls to @@ -118,561 +65,23 @@ static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc); * If a call to red_channel_client_destroy is made from another location, it must be called * from the channel's thread. 
*/ -static void red_channel_ref(RedChannel *channel); -static void red_channel_unref(RedChannel *channel); - -static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header) -{ - return GUINT32_FROM_LE(((SpiceDataHeader *)header->data)->size); -} - -static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header) -{ - return GUINT32_FROM_LE(((SpiceMiniDataHeader *)header->data)->size); -} - -static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header) -{ - return GUINT16_FROM_LE(((SpiceDataHeader *)header->data)->type); -} - -static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header) -{ - return GUINT16_FROM_LE(((SpiceMiniDataHeader *)header->data)->type); -} - -static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) -{ - ((SpiceDataHeader *)header->data)->type = GUINT16_TO_LE(type); -} - -static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) -{ - ((SpiceMiniDataHeader *)header->data)->type = GUINT16_TO_LE(type); -} - -static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) -{ - ((SpiceDataHeader *)header->data)->size = GUINT32_TO_LE(size); -} - -static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) -{ - ((SpiceMiniDataHeader *)header->data)->size = GUINT32_TO_LE(size); -} - -static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) -{ - ((SpiceDataHeader *)header->data)->serial = GUINT64_TO_LE(serial); -} - -static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) -{ - spice_error("attempt to set header serial on mini header"); -} - -static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) -{ - ((SpiceDataHeader *)header->data)->sub_list = GUINT32_TO_LE(sub_list); -} - -static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) -{ - spice_error("attempt to set header sub list on mini header"); 
-} - -static const SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader), - full_header_set_msg_type, - full_header_set_msg_size, - full_header_set_msg_serial, - full_header_set_msg_sub_list, - full_header_get_msg_type, - full_header_get_msg_size}; - -static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader), - mini_header_set_msg_type, - mini_header_set_msg_size, - mini_header_set_msg_serial, - mini_header_set_msg_sub_list, - mini_header_get_msg_type, - mini_header_get_msg_size}; - -/* return the number of bytes read. -1 in case of error */ -static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size) -{ - uint8_t *pos = buf; - while (size) { - int now; - if (stream->shutdown) { - return -1; - } - now = reds_stream_read(stream, pos, size); - if (now <= 0) { - if (now == 0) { - return -1; - } - spice_assert(now == -1); - if (errno == EAGAIN) { - break; - } else if (errno == EINTR) { - continue; - } else if (errno == EPIPE) { - return -1; - } else { - spice_printerr("%s", strerror(errno)); - return -1; - } - } else { - size -= now; - pos += now; - } - } - return pos - buf; -} - -// TODO: this implementation, as opposed to the old implementation in red_worker, -// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer -// arithmetic for the case where a single cb_read could return multiple messages. But -// this is suboptimal potentially. Profile and consider fixing. -static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handler) -{ - int bytes_read; - uint8_t *parsed; - size_t parsed_size; - message_destructor_t parsed_free; - uint16_t msg_type; - uint32_t msg_size; - - /* XXX: This needs further investigation as to the underlying cause, it happened - * after spicec disconnect (but not with spice-gtk) repeatedly. 
*/ - if (!stream) { - return; - } - - for (;;) { - int ret_handle; - if (handler->header_pos < handler->header.header_size) { - bytes_read = red_peer_receive(stream, - handler->header.data + handler->header_pos, - handler->header.header_size - handler->header_pos); - if (bytes_read == -1) { - handler->cb->on_error(handler->opaque); - return; - } - handler->cb->on_input(handler->opaque, bytes_read); - handler->header_pos += bytes_read; - - if (handler->header_pos != handler->header.header_size) { - return; - } - } - - msg_size = handler->header.get_msg_size(&handler->header); - msg_type = handler->header.get_msg_type(&handler->header); - if (handler->msg_pos < msg_size) { - if (!handler->msg) { - handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size); - if (handler->msg == NULL) { - spice_printerr("ERROR: channel refused to allocate buffer."); - handler->cb->on_error(handler->opaque); - return; - } - } - - bytes_read = red_peer_receive(stream, - handler->msg + handler->msg_pos, - msg_size - handler->msg_pos); - if (bytes_read == -1) { - handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); - handler->cb->on_error(handler->opaque); - return; - } - handler->cb->on_input(handler->opaque, bytes_read); - handler->msg_pos += bytes_read; - if (handler->msg_pos != msg_size) { - return; - } - } - - if (handler->cb->parser) { - parsed = handler->cb->parser(handler->msg, - handler->msg + msg_size, msg_type, - SPICE_VERSION_MINOR, &parsed_size, &parsed_free); - if (parsed == NULL) { - spice_printerr("failed to parse message type %d", msg_type); - handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); - handler->cb->on_error(handler->opaque); - return; - } - ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size, - msg_type, parsed); - parsed_free(parsed); - } else { - ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size, - handler->msg); - } - handler->msg_pos = 0; 
- handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); - handler->msg = NULL; - handler->header_pos = 0; - - if (!ret_handle) { - handler->cb->on_error(handler->opaque); - return; - } - } -} - -void red_channel_client_receive(RedChannelClient *rcc) -{ - red_channel_client_ref(rcc); - red_peer_handle_incoming(rcc->stream, &rcc->incoming); - red_channel_client_unref(rcc); -} void red_channel_receive(RedChannel *channel) { g_list_foreach(channel->clients, (GFunc)red_channel_client_receive, NULL); } -static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler) -{ - ssize_t n; - - if (!stream) { - return; - } - - if (handler->size == 0) { - handler->vec = handler->vec_buf; - handler->size = handler->cb->get_msg_size(handler->opaque); - if (!handler->size) { // nothing to be sent - return; - } - } - - for (;;) { - handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); - n = reds_stream_writev(stream, handler->vec, handler->vec_size); - if (n == -1) { - switch (errno) { - case EAGAIN: - handler->cb->on_block(handler->opaque); - return; - case EINTR: - continue; - case EPIPE: - handler->cb->on_error(handler->opaque); - return; - default: - spice_printerr("%s", strerror(errno)); - handler->cb->on_error(handler->opaque); - return; - } - } else { - handler->pos += n; - handler->cb->on_output(handler->opaque, n); - if (handler->pos == handler->size) { // finished writing data - /* reset handler before calling on_msg_done, since it - * can trigger another call to red_peer_handle_outgoing (when - * switching from the urgent marshaller to the main one */ - handler->vec = handler->vec_buf; - handler->pos = 0; - handler->size = 0; - handler->cb->on_msg_done(handler->opaque); - return; - } - } - } -} - -static void red_channel_client_on_output(void *opaque, int n) -{ - RedChannelClient *rcc = opaque; - - if (rcc->connectivity_monitor.timer) { - rcc->connectivity_monitor.out_bytes += n; - } - 
stat_inc_counter(reds, rcc->channel->out_bytes_counter, n); -} - -static void red_channel_client_on_input(void *opaque, int n) -{ - RedChannelClient *rcc = opaque; - - if (rcc->connectivity_monitor.timer) { - rcc->connectivity_monitor.in_bytes += n; - } -} - static void red_channel_client_default_peer_on_error(RedChannelClient *rcc) { red_channel_client_disconnect(rcc); } -static int red_channel_client_get_out_msg_size(void *opaque) -{ - RedChannelClient *rcc = (RedChannelClient *)opaque; - - return rcc->send_data.size; -} - -static void red_channel_client_prepare_out_msg(void *opaque, struct iovec *vec, - int *vec_size, int pos) -{ - RedChannelClient *rcc = (RedChannelClient *)opaque; - - *vec_size = spice_marshaller_fill_iovec(rcc->send_data.marshaller, - vec, IOV_MAX, pos); -} - -static void red_channel_client_on_out_block(void *opaque) -{ - RedChannelClient *rcc = (RedChannelClient *)opaque; - - rcc->send_data.blocked = TRUE; - rcc->channel->core->watch_update_mask(rcc->stream->watch, - SPICE_WATCH_EVENT_READ | - SPICE_WATCH_EVENT_WRITE); -} - -static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc) -{ - return (rcc->send_data.marshaller == rcc->send_data.urgent.marshaller); -} - -static void red_channel_client_reset_send_data(RedChannelClient *rcc) -{ - spice_marshaller_reset(rcc->send_data.marshaller); - rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller, - rcc->send_data.header.header_size); - spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size); - rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0); - rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0); - - /* Keeping the serial consecutive: resetting it if reset_send_data - * has been called before, but no message has been sent since then. 
- */ - if (rcc->send_data.last_sent_serial != rcc->send_data.serial) { - spice_assert(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1); - /* When the urgent marshaller is active, the serial was incremented by - * the call to reset_send_data that was made for the main marshaller. - * The urgent msg receives this serial, and the main msg serial is - * the following one. Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial) - * should be 1 in this case*/ - if (!red_channel_client_urgent_marshaller_is_active(rcc)) { - rcc->send_data.serial = rcc->send_data.last_sent_serial; - } - } - rcc->send_data.serial++; - - if (!rcc->is_mini_header) { - spice_assert(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller); - rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0); - rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); - } -} - -void red_channel_client_push_set_ack(RedChannelClient *rcc) -{ - red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_SET_ACK); -} - -static void red_channel_client_send_set_ack(RedChannelClient *rcc) -{ - SpiceMsgSetAck ack; - - spice_assert(rcc); - red_channel_client_init_send_data(rcc, SPICE_MSG_SET_ACK, NULL); - ack.generation = ++rcc->ack_data.generation; - ack.window = rcc->ack_data.client_window; - rcc->ack_data.messages_window = 0; - - spice_marshall_msg_set_ack(rcc->send_data.marshaller, &ack); - - red_channel_client_begin_send_message(rcc); -} - -static void red_channel_client_send_migrate(RedChannelClient *rcc) -{ - SpiceMsgMigrate migrate; - - red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, NULL); - migrate.flags = rcc->channel->migration_flags; - spice_marshall_msg_migrate(rcc->send_data.marshaller, &migrate); - if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_FLUSH) { - rcc->wait_migrate_flush_mark = TRUE; - } - - red_channel_client_begin_send_message(rcc); -} - - -static void red_channel_client_send_empty_msg(RedChannelClient *rcc, 
RedPipeItem *base) -{ - RedEmptyMsgPipeItem *msg_pipe_item = SPICE_UPCAST(RedEmptyMsgPipeItem, base); - - red_channel_client_init_send_data(rcc, msg_pipe_item->msg, NULL); - red_channel_client_begin_send_message(rcc); -} - -static void red_channel_client_send_ping(RedChannelClient *rcc) -{ - SpiceMsgPing ping; - - if (!rcc->latency_monitor.warmup_was_sent) { // latency test start - int delay_val; - socklen_t opt_size = sizeof(delay_val); - - rcc->latency_monitor.warmup_was_sent = TRUE; - /* - * When testing latency, TCP_NODELAY must be switched on, otherwise, - * sending the ping message is delayed by Nagle algorithm, and the - * roundtrip measurement is less accurate (bigger). - */ - rcc->latency_monitor.tcp_nodelay = 1; - if (getsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, - &opt_size) == -1) { - spice_warning("getsockopt failed, %s", strerror(errno)); - } else { - rcc->latency_monitor.tcp_nodelay = delay_val; - if (!delay_val) { - delay_val = 1; - if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, - sizeof(delay_val)) == -1) { - if (errno != ENOTSUP) { - spice_warning("setsockopt failed, %s", strerror(errno)); - } - } - } - } - } - - red_channel_client_init_send_data(rcc, SPICE_MSG_PING, NULL); - ping.id = rcc->latency_monitor.id; - ping.timestamp = spice_get_monotonic_time_ns(); - spice_marshall_msg_ping(rcc->send_data.marshaller, &ping); - red_channel_client_begin_send_message(rcc); -} - -static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item) -{ - spice_assert(red_channel_client_no_item_being_sent(rcc)); - red_channel_client_reset_send_data(rcc); - switch (item->type) { - case RED_PIPE_ITEM_TYPE_SET_ACK: - red_channel_client_send_set_ack(rcc); - break; - case RED_PIPE_ITEM_TYPE_MIGRATE: - red_channel_client_send_migrate(rcc); - break; - case RED_PIPE_ITEM_TYPE_EMPTY_MSG: - red_channel_client_send_empty_msg(rcc, item); - break; - case RED_PIPE_ITEM_TYPE_PING: - 
red_channel_client_send_ping(rcc); - break; - case RED_PIPE_ITEM_TYPE_MARKER: - break; - default: - rcc->channel->channel_cbs.send_item(rcc, item); - break; - } - red_pipe_item_unref(item); -} - -static inline void red_channel_client_release_sent_item(RedChannelClient *rcc) -{ - if (rcc->send_data.item) { - red_pipe_item_unref(rcc->send_data.item); - rcc->send_data.item = NULL; - } -} - -static void red_channel_client_on_out_msg_done(void *opaque) -{ - RedChannelClient *rcc = (RedChannelClient *)opaque; - int fd; - - rcc->send_data.size = 0; - - if (spice_marshaller_get_fd(rcc->send_data.marshaller, &fd)) { - if (reds_stream_send_msgfd(rcc->stream, fd) < 0) { - perror("sendfd"); - red_channel_client_disconnect(rcc); - if (fd != -1) - close(fd); - return; - } - if (fd != -1) - close(fd); - } - - red_channel_client_release_sent_item(rcc); - if (rcc->send_data.blocked) { - rcc->send_data.blocked = FALSE; - rcc->channel->core->watch_update_mask(rcc->stream->watch, - SPICE_WATCH_EVENT_READ); - } - - if (red_channel_client_urgent_marshaller_is_active(rcc)) { - red_channel_client_restore_main_sender(rcc); - spice_assert(rcc->send_data.header.data != NULL); - red_channel_client_begin_send_message(rcc); - } else { - if (rcc->latency_monitor.timer && !rcc->send_data.blocked && rcc->pipe_size == 0) { - /* It is possible that the socket will become idle, so we may be able to test latency */ - red_channel_client_restart_ping_timer(rcc); - } - } - -} - -static void red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item) -{ - rcc->pipe_size--; - ring_remove(&item->link); -} - -static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc) +void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc) { spice_assert(rcc); channel->clients = g_list_prepend(channel->clients, rcc); } -static void red_channel_client_set_remote_caps(RedChannelClient* rcc, - int num_common_caps, uint32_t *common_caps, - int num_caps, uint32_t *caps) -{ - 
rcc->remote_caps.num_common_caps = num_common_caps; - rcc->remote_caps.common_caps = spice_memdup(common_caps, num_common_caps * sizeof(uint32_t)); - - rcc->remote_caps.num_caps = num_caps; - rcc->remote_caps.caps = spice_memdup(caps, num_caps * sizeof(uint32_t)); -} - -static void red_channel_client_destroy_remote_caps(RedChannelClient* rcc) -{ - rcc->remote_caps.num_common_caps = 0; - free(rcc->remote_caps.common_caps); - rcc->remote_caps.num_caps = 0; - free(rcc->remote_caps.caps); -} - -int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap) -{ - return test_capability(rcc->remote_caps.common_caps, - rcc->remote_caps.num_common_caps, - cap); -} - -int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap) -{ - return test_capability(rcc->remote_caps.caps, - rcc->remote_caps.num_caps, - cap); -} - int red_channel_test_remote_common_cap(RedChannel *channel, uint32_t cap) { GList *link, *next; @@ -699,230 +108,8 @@ int red_channel_test_remote_cap(RedChannel *channel, uint32_t cap) return TRUE; } -static int red_channel_client_pre_create_validate(RedChannel *channel, RedClient *client) -{ - if (red_client_get_channel(client, channel->type, channel->id)) { - spice_printerr("Error client %p: duplicate channel type %d id %d", - client, channel->type, channel->id); - return FALSE; - } - return TRUE; -} - -static void red_channel_client_push_ping(RedChannelClient *rcc) -{ - spice_assert(rcc->latency_monitor.state == PING_STATE_NONE); - rcc->latency_monitor.state = PING_STATE_WARMUP; - rcc->latency_monitor.warmup_was_sent = FALSE; - rcc->latency_monitor.id = rand(); - red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); - red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_PING); -} - -static void red_channel_client_ping_timer(void *opaque) -{ - RedChannelClient *rcc = opaque; - - spice_assert(rcc->latency_monitor.state == PING_STATE_TIMER); - red_channel_client_cancel_ping_timer(rcc); - -#ifdef 
HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. */ - { - int so_unsent_size = 0; - - /* retrieving the occupied size of the socket's tcp snd buffer (unacked + unsent) */ - if (ioctl(rcc->stream->socket, SIOCOUTQ, &so_unsent_size) == -1) { - spice_printerr("ioctl(SIOCOUTQ) failed, %s", strerror(errno)); - } - if (so_unsent_size > 0) { - /* tcp snd buffer is still occupied. rescheduling ping */ - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - } else { - red_channel_client_push_ping(rcc); - } - } -#else /* ifdef HAVE_LINUX_SOCKIOS_H */ - /* More portable alternative code path (less accurate but avoids bogus ioctls)*/ - red_channel_client_push_ping(rcc); -#endif /* ifdef HAVE_LINUX_SOCKIOS_H */ -} - -/* - * When a connection is not alive (and we can't detect it via a socket error), we - * reach one of these 2 states: - * (1) Sending msgs is blocked: either writes return EAGAIN - * or we are missing MSGC_ACK from the client. - * (2) MSG_PING was sent without receiving a MSGC_PONG in reply. - * - * The connectivity_timer callback tests if the channel's state matches one of the above. - * In case it does, on the next time the timer is called, it checks if the connection has - * been idle during the time that passed since the previous timer call. If the connection - * has been idle, we consider the client as disconnected. 
- */ -static void red_channel_client_connectivity_timer(void *opaque) -{ - RedChannelClient *rcc = opaque; - RedChannelClientConnectivityMonitor *monitor = &rcc->connectivity_monitor; - int is_alive = TRUE; - - if (monitor->state == CONNECTIVITY_STATE_BLOCKED) { - if (monitor->in_bytes == 0 && monitor->out_bytes == 0) { - if (!rcc->send_data.blocked && !red_channel_client_waiting_for_ack(rcc)) { - spice_error("mismatch between rcc-state and connectivity-state"); - } - spice_debug("rcc is blocked; connection is idle"); - is_alive = FALSE; - } - } else if (monitor->state == CONNECTIVITY_STATE_WAIT_PONG) { - if (monitor->in_bytes == 0) { - if (rcc->latency_monitor.state != PING_STATE_WARMUP && - rcc->latency_monitor.state != PING_STATE_LATENCY) { - spice_error("mismatch between rcc-state and connectivity-state"); - } - spice_debug("rcc waits for pong; connection is idle"); - is_alive = FALSE; - } - } - - if (is_alive) { - monitor->in_bytes = 0; - monitor->out_bytes = 0; - if (rcc->send_data.blocked || red_channel_client_waiting_for_ack(rcc)) { - monitor->state = CONNECTIVITY_STATE_BLOCKED; - } else if (rcc->latency_monitor.state == PING_STATE_WARMUP || - rcc->latency_monitor.state == PING_STATE_LATENCY) { - monitor->state = CONNECTIVITY_STATE_WAIT_PONG; - } else { - monitor->state = CONNECTIVITY_STATE_CONNECTED; - } - rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, - rcc->connectivity_monitor.timeout); - } else { - monitor->state = CONNECTIVITY_STATE_DISCONNECTED; - spice_warning("rcc %p on channel %d:%d has been unresponsive for more than %u ms, disconnecting", - rcc, rcc->channel->type, rcc->channel->id, monitor->timeout); - red_channel_client_disconnect(rcc); - } -} - -void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms) -{ - if (!red_channel_client_is_connected(rcc)) { - return; - } - spice_debug(NULL); - spice_assert(timeout_ms > 0); - /* - * If latency_monitor is not active, we activate it in order to 
enable - * periodic ping messages so that we will be be able to identify a disconnected - * channel-client even if there are no ongoing channel specific messages - * on this channel. - */ - if (rcc->latency_monitor.timer == NULL) { - rcc->latency_monitor.timer = rcc->channel->core->timer_add( - rcc->channel->core, red_channel_client_ping_timer, rcc); - if (!red_client_during_migrate_at_target(rcc->client)) { - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - } - rcc->latency_monitor.roundtrip = -1; - } - if (rcc->connectivity_monitor.timer == NULL) { - rcc->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED; - rcc->connectivity_monitor.timer = rcc->channel->core->timer_add( - rcc->channel->core, red_channel_client_connectivity_timer, rcc); - rcc->connectivity_monitor.timeout = timeout_ms; - if (!red_client_during_migrate_at_target(rcc->client)) { - rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, - rcc->connectivity_monitor.timeout); - } - } -} - -RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, - RedsStream *stream, - int monitor_latency, - int num_common_caps, uint32_t *common_caps, - int num_caps, uint32_t *caps) -{ - RedChannelClient *rcc = NULL; - - pthread_mutex_lock(&client->lock); - if (!red_channel_client_pre_create_validate(channel, client)) { - goto error; - } - spice_assert(stream && channel && size >= sizeof(RedChannelClient)); - rcc = spice_malloc0(size); - rcc->stream = stream; - rcc->channel = channel; - rcc->client = client; - rcc->refs = 1; - rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + - // block flags) - rcc->ack_data.client_generation = ~0; - rcc->ack_data.client_window = CLIENT_ACK_WINDOW; - rcc->send_data.main.marshaller = spice_marshaller_new(); - rcc->send_data.urgent.marshaller = spice_marshaller_new(); - - rcc->send_data.marshaller = rcc->send_data.main.marshaller; - - rcc->incoming.opaque = rcc; - 
rcc->incoming.cb = &channel->incoming_cb; - - rcc->outgoing.opaque = rcc; - rcc->outgoing.cb = &channel->outgoing_cb; - rcc->outgoing.pos = 0; - rcc->outgoing.size = 0; - - red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); - if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { - rcc->incoming.header = mini_header_wrapper; - rcc->send_data.header = mini_header_wrapper; - rcc->is_mini_header = TRUE; - } else { - rcc->incoming.header = full_header_wrapper; - rcc->send_data.header = full_header_wrapper; - rcc->is_mini_header = FALSE; - } - - rcc->incoming.header.data = rcc->incoming.header_buf; - rcc->incoming.serial = 1; - - if (!channel->channel_cbs.config_socket(rcc)) { - goto error; - } - - ring_init(&rcc->pipe); - rcc->pipe_size = 0; - - stream->watch = channel->core->watch_add(channel->core, - stream->socket, - SPICE_WATCH_EVENT_READ, - red_channel_client_event, rcc); - rcc->id = g_list_length(channel->clients); - red_channel_add_client(channel, rcc); - red_client_add_channel(client, rcc); - red_channel_ref(channel); - pthread_mutex_unlock(&client->lock); - - if (monitor_latency && reds_stream_get_family(stream) != AF_UNIX) { - rcc->latency_monitor.timer = channel->core->timer_add( - channel->core, red_channel_client_ping_timer, rcc); - if (!client->during_target_migrate) { - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - } - rcc->latency_monitor.roundtrip = -1; - } - - return rcc; -error: - free(rcc); - reds_stream_free(stream); - pthread_mutex_unlock(&client->lock); - return NULL; -} - /* returns TRUE If all channels are finished migrating, FALSE otherwise */ -static gboolean red_client_seamless_migration_done_for_channel(RedClient *client) +gboolean red_client_seamless_migration_done_for_channel(RedClient *client) { gboolean ret = FALSE; @@ -944,26 +131,6 @@ static gboolean red_client_seamless_migration_done_for_channel(RedClient *client return ret; } -static void 
red_channel_client_seamless_migration_done(RedChannelClient *rcc) -{ - rcc->wait_migrate_data = FALSE; - - if (red_client_seamless_migration_done_for_channel(rcc->client)) { - if (rcc->latency_monitor.timer) { - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - } - if (rcc->connectivity_monitor.timer) { - rcc->channel->core->timer_start(rcc->connectivity_monitor.timer, - rcc->connectivity_monitor.timeout); - } - } -} - -int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc) -{ - return rcc->wait_migrate_data; -} - int red_channel_is_waiting_for_migrate_data(RedChannel *channel) { RedChannelClient *rcc; @@ -995,20 +162,6 @@ static void red_channel_client_default_disconnect(RedChannelClient *base) red_channel_client_disconnect(base); } -void red_channel_client_default_migrate(RedChannelClient *rcc) -{ - if (rcc->latency_monitor.timer) { - red_channel_client_cancel_ping_timer(rcc); - rcc->channel->core->timer_remove(rcc->latency_monitor.timer); - rcc->latency_monitor.timer = NULL; - } - if (rcc->connectivity_monitor.timer) { - rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer); - rcc->connectivity_monitor.timer = NULL; - } - red_channel_client_pipe_add_type(rcc, RED_PIPE_ITEM_TYPE_MIGRATE); -} - RedChannel *red_channel_create(int size, RedsState *reds, const SpiceCoreInterfaceInternal *core, @@ -1121,575 +274,139 @@ RedChannel *red_channel_create_dummy(int size, RedsState *reds, uint32_t type, u static int do_nothing_handle_message(RedChannelClient *rcc, uint16_t type, uint32_t size, - uint8_t *msg) -{ - return TRUE; -} - -RedChannel *red_channel_create_parser(int size, - RedsState *reds, - const SpiceCoreInterfaceInternal *core, - uint32_t type, uint32_t id, - int handle_acks, - spice_parse_channel_func_t parser, - channel_handle_parsed_proc handle_parsed, - const ChannelCbs *channel_cbs, - uint32_t migration_flags) -{ - RedChannel *channel = red_channel_create(size, reds, core, type, id, - handle_acks, - 
do_nothing_handle_message, - channel_cbs, - migration_flags); - - if (channel == NULL) { - return NULL; - } - channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed; - channel->incoming_cb.parser = parser; - - return channel; -} - -void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat) -{ - spice_return_if_fail(channel != NULL); - spice_return_if_fail(channel->stat == 0); - -#ifdef RED_STATISTICS - channel->stat = stat; - channel->out_bytes_counter = stat_add_counter(channel->reds, stat, "out_bytes", TRUE); -#endif -} - -void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data) -{ - spice_assert(client_cbs->connect || channel->type == SPICE_CHANNEL_MAIN); - channel->client_cbs.connect = client_cbs->connect; - - if (client_cbs->disconnect) { - channel->client_cbs.disconnect = client_cbs->disconnect; - } - - if (client_cbs->migrate) { - channel->client_cbs.migrate = client_cbs->migrate; - } - channel->data = cbs_data; -} - -int test_capability(const uint32_t *caps, int num_caps, uint32_t cap) -{ - uint32_t index = cap / 32; - if (num_caps < index + 1) { - return FALSE; - } - - return (caps[index] & (1 << (cap % 32))) != 0; -} - -static void add_capability(uint32_t **caps, int *num_caps, uint32_t cap) -{ - int nbefore, n; - - nbefore = *num_caps; - n = cap / 32; - *num_caps = MAX(*num_caps, n + 1); - *caps = spice_renew(uint32_t, *caps, *num_caps); - memset(*caps + nbefore, 0, (*num_caps - nbefore) * sizeof(uint32_t)); - (*caps)[n] |= (1 << (cap % 32)); -} - -void red_channel_set_common_cap(RedChannel *channel, uint32_t cap) -{ - add_capability(&channel->local_caps.common_caps, &channel->local_caps.num_common_caps, cap); -} - -void red_channel_set_cap(RedChannel *channel, uint32_t cap) -{ - add_capability(&channel->local_caps.caps, &channel->local_caps.num_caps, cap); -} - -static void red_channel_ref(RedChannel *channel) -{ - channel->refs++; -} - -static void red_channel_unref(RedChannel 
*channel) -{ - if (!--channel->refs) { - if (channel->local_caps.num_common_caps) { - free(channel->local_caps.common_caps); - } - - if (channel->local_caps.num_caps) { - free(channel->local_caps.caps); - } - - free(channel); - } -} - -void red_channel_client_ref(RedChannelClient *rcc) -{ - rcc->refs++; -} - -void red_channel_client_unref(RedChannelClient *rcc) -{ - if (!--rcc->refs) { - spice_debug("destroy rcc=%p", rcc); - - reds_stream_free(rcc->stream); - rcc->stream = NULL; - - if (rcc->send_data.main.marshaller) { - spice_marshaller_destroy(rcc->send_data.main.marshaller); - } - - if (rcc->send_data.urgent.marshaller) { - spice_marshaller_destroy(rcc->send_data.urgent.marshaller); - } - - red_channel_client_destroy_remote_caps(rcc); - if (rcc->channel) { - red_channel_unref(rcc->channel); - } - free(rcc); - } -} - -void red_channel_client_destroy(RedChannelClient *rcc) -{ - rcc->destroying = 1; - red_channel_client_disconnect(rcc); - red_client_remove_channel(rcc); - red_channel_client_unref(rcc); -} - -void red_channel_destroy(RedChannel *channel) -{ - if (!channel) { - return; - } - - g_list_foreach(channel->clients, (GFunc)red_channel_client_destroy, NULL); - red_channel_unref(channel); -} - -void red_channel_client_shutdown(RedChannelClient *rcc) -{ - if (rcc->stream && !rcc->stream->shutdown) { - rcc->channel->core->watch_remove(rcc->stream->watch); - rcc->stream->watch = NULL; - shutdown(rcc->stream->socket, SHUT_RDWR); - rcc->stream->shutdown = TRUE; - } -} - -void red_channel_client_send(RedChannelClient *rcc) -{ - red_channel_client_ref(rcc); - red_peer_handle_outgoing(rcc->stream, &rcc->outgoing); - red_channel_client_unref(rcc); -} - -void red_channel_send(RedChannel *channel) -{ - g_list_foreach(channel->clients, (GFunc)red_channel_client_send, NULL); -} - -static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc) -{ - return (rcc->channel->handle_acks && - (rcc->ack_data.messages_window > rcc->ack_data.client_window * 2)); -} - 
-static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc) -{ - RedPipeItem *item; - - if (!rcc || rcc->send_data.blocked - || red_channel_client_waiting_for_ack(rcc) - || !(item = (RedPipeItem *)ring_get_tail(&rcc->pipe))) { - return NULL; - } - red_channel_client_pipe_remove(rcc, item); - return item; -} - -void red_channel_client_push(RedChannelClient *rcc) -{ - RedPipeItem *pipe_item; - - if (!rcc->during_send) { - rcc->during_send = TRUE; - } else { - return; - } - red_channel_client_ref(rcc); - if (rcc->send_data.blocked) { - red_channel_client_send(rcc); - } - - if (!red_channel_client_no_item_being_sent(rcc) && !rcc->send_data.blocked) { - rcc->send_data.blocked = TRUE; - spice_printerr("ERROR: an item waiting to be sent and not blocked"); - } - - while ((pipe_item = red_channel_client_pipe_item_get(rcc))) { - red_channel_client_send_item(rcc, pipe_item); - } - if (red_channel_client_no_item_being_sent(rcc) && ring_is_empty(&rcc->pipe) - && rcc->stream->watch) { - rcc->channel->core->watch_update_mask(rcc->stream->watch, - SPICE_WATCH_EVENT_READ); - } - rcc->during_send = FALSE; - red_channel_client_unref(rcc); -} - -void red_channel_push(RedChannel *channel) -{ - if (!channel) { - return; - } - - g_list_foreach(channel->clients, (GFunc)red_channel_client_push, NULL); -} - -int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc) -{ - if (rcc->latency_monitor.roundtrip < 0) { - return rcc->latency_monitor.roundtrip; - } - return rcc->latency_monitor.roundtrip / NSEC_PER_MILLISEC; -} - -static void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc) -{ - rcc->ack_data.messages_window = 0; - red_channel_client_push(rcc); -} - -// TODO: this function doesn't make sense because the window should be client (WAN/LAN) -// specific -void red_channel_init_outgoing_messages_window(RedChannel *channel) -{ - g_list_foreach(channel->clients, (GFunc)red_channel_client_init_outgoing_messages_window, NULL); -} - -static 
void red_channel_handle_migrate_flush_mark(RedChannelClient *rcc) -{ - if (rcc->channel->channel_cbs.handle_migrate_flush_mark) { - rcc->channel->channel_cbs.handle_migrate_flush_mark(rcc); - } -} - -// TODO: the whole migration is broken with multiple clients. What do we want to do? -// basically just -// 1) source send mark to all -// 2) source gets at various times the data (waits for all) -// 3) source migrates to target -// 4) target sends data to all -// So need to make all the handlers work with per channel/client data (what data exactly?) -static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size, void *message) -{ - spice_debug("channel type %d id %d rcc %p size %u", - rcc->channel->type, rcc->channel->id, rcc, size); - if (!rcc->channel->channel_cbs.handle_migrate_data) { - return; - } - if (!red_channel_client_is_waiting_for_migrate_data(rcc)) { - spice_channel_client_error(rcc, "unexpected"); - return; - } - if (rcc->channel->channel_cbs.handle_migrate_data_get_serial) { - red_channel_client_set_message_serial(rcc, - rcc->channel->channel_cbs.handle_migrate_data_get_serial(rcc, size, message)); - } - if (!rcc->channel->channel_cbs.handle_migrate_data(rcc, size, message)) { - spice_channel_client_error(rcc, "handle_migrate_data failed"); - return; - } - red_channel_client_seamless_migration_done(rcc); -} - -static void red_channel_client_restart_ping_timer(RedChannelClient *rcc) -{ - uint64_t passed, timeout; - - passed = (spice_get_monotonic_time_ns() - rcc->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC; - timeout = PING_TEST_IDLE_NET_TIMEOUT_MS; - if (passed < PING_TEST_TIMEOUT_MS) { - timeout += PING_TEST_TIMEOUT_MS - passed; - } - - red_channel_client_start_ping_timer(rcc, timeout); -} - -static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout) -{ - if (!rcc->latency_monitor.timer) { - return; - } - if (rcc->latency_monitor.state != PING_STATE_NONE) { - return; - } - 
rcc->latency_monitor.state = PING_STATE_TIMER; - rcc->channel->core->timer_start(rcc->latency_monitor.timer, timeout); -} - -static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc) -{ - if (!rcc->latency_monitor.timer) { - return; - } - if (rcc->latency_monitor.state != PING_STATE_TIMER) { - return; - } - - rcc->channel->core->timer_cancel(rcc->latency_monitor.timer); - rcc->latency_monitor.state = PING_STATE_NONE; -} - -static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping) -{ - uint64_t now; - - /* ignoring unexpected pongs, or post-migration pongs for pings that - * started just before migration */ - if (ping->id != rcc->latency_monitor.id) { - spice_warning("ping-id (%u)!= pong-id %u", - rcc->latency_monitor.id, ping->id); - return; - } - - now = spice_get_monotonic_time_ns(); - - if (rcc->latency_monitor.state == PING_STATE_WARMUP) { - rcc->latency_monitor.state = PING_STATE_LATENCY; - return; - } else if (rcc->latency_monitor.state != PING_STATE_LATENCY) { - spice_warning("unexpected"); - return; - } - - /* set TCP_NODELAY=0, in case we reverted it for the test*/ - if (!rcc->latency_monitor.tcp_nodelay) { - int delay_val = 0; - - if (setsockopt(rcc->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, - sizeof(delay_val)) == -1) { - if (errno != ENOTSUP) { - spice_warning("setsockopt failed, %s", strerror(errno)); - } - } - } - - /* - * The real network latency shouldn't change during the connection. However, - * the measurements can be bigger than the real roundtrip due to other - * threads or processes that are utilizing the network. We update the roundtrip - * measurement with the minimal value we encountered till now. 
- */ - if (rcc->latency_monitor.roundtrip < 0 || - now - ping->timestamp < rcc->latency_monitor.roundtrip) { - rcc->latency_monitor.roundtrip = now - ping->timestamp; - spice_debug("update roundtrip %.2f(ms)", ((double)rcc->latency_monitor.roundtrip)/NSEC_PER_MILLISEC); - } - - rcc->latency_monitor.last_pong_time = now; - rcc->latency_monitor.state = PING_STATE_NONE; - red_channel_client_start_ping_timer(rcc, PING_TEST_TIMEOUT_MS); -} - -int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, - uint16_t type, void *message) + uint8_t *msg) { - switch (type) { - case SPICE_MSGC_ACK_SYNC: - if (size != sizeof(uint32_t)) { - spice_printerr("bad message size"); - return FALSE; - } - rcc->ack_data.client_generation = *(uint32_t *)(message); - break; - case SPICE_MSGC_ACK: - if (rcc->ack_data.client_generation == rcc->ack_data.generation) { - rcc->ack_data.messages_window -= rcc->ack_data.client_window; - red_channel_client_push(rcc); - } - break; - case SPICE_MSGC_DISCONNECTING: - break; - case SPICE_MSGC_MIGRATE_FLUSH_MARK: - if (!rcc->wait_migrate_flush_mark) { - spice_error("unexpected flush mark"); - return FALSE; - } - red_channel_handle_migrate_flush_mark(rcc); - rcc->wait_migrate_flush_mark = FALSE; - break; - case SPICE_MSGC_MIGRATE_DATA: - red_channel_handle_migrate_data(rcc, size, message); - break; - case SPICE_MSGC_PONG: - red_channel_client_handle_pong(rcc, message); - break; - default: - spice_printerr("invalid message type %u", type); - return FALSE; - } return TRUE; } -static void red_channel_client_event(int fd, int event, void *data) +RedChannel *red_channel_create_parser(int size, + RedsState *reds, + const SpiceCoreInterfaceInternal *core, + uint32_t type, uint32_t id, + int handle_acks, + spice_parse_channel_func_t parser, + channel_handle_parsed_proc handle_parsed, + const ChannelCbs *channel_cbs, + uint32_t migration_flags) { - RedChannelClient *rcc = (RedChannelClient *)data; + RedChannel *channel = red_channel_create(size, 
reds, core, type, id, + handle_acks, + do_nothing_handle_message, + channel_cbs, + migration_flags); + channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed; + channel->incoming_cb.parser = parser; - red_channel_client_ref(rcc); - if (event & SPICE_WATCH_EVENT_READ) { - red_channel_client_receive(rcc); - } - if (event & SPICE_WATCH_EVENT_WRITE) { - red_channel_client_push(rcc); - } - red_channel_client_unref(rcc); + return channel; } -void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item) +void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat) { - spice_assert(red_channel_client_no_item_being_sent(rcc)); - spice_assert(msg_type != 0); - rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type); - rcc->send_data.item = item; - if (item) { - red_pipe_item_ref(item); - } + spice_return_if_fail(channel != NULL); + spice_return_if_fail(channel->stat == 0); + +#ifdef RED_STATISTICS + channel->stat = stat; + channel->out_bytes_counter = stat_add_counter(channel->reds, stat, "out_bytes", TRUE); +#endif } -void red_channel_client_begin_send_message(RedChannelClient *rcc) +void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data) { - SpiceMarshaller *m = rcc->send_data.marshaller; + spice_assert(client_cbs->connect || channel->type == SPICE_CHANNEL_MAIN); + channel->client_cbs.connect = client_cbs->connect; - // TODO - better check: type in channel_allowed_types. 
Better: type in channel_allowed_types(channel_state) - if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) { - spice_printerr("BUG: header->type == 0"); - return; + if (client_cbs->disconnect) { + channel->client_cbs.disconnect = client_cbs->disconnect; } - /* canceling the latency test timer till the nework is idle */ - red_channel_client_cancel_ping_timer(rcc); - - spice_marshaller_flush(m); - rcc->send_data.size = spice_marshaller_get_total_size(m); - rcc->send_data.header.set_msg_size(&rcc->send_data.header, - rcc->send_data.size - rcc->send_data.header.header_size); - rcc->ack_data.messages_window++; - rcc->send_data.last_sent_serial = rcc->send_data.serial; - rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */ - red_channel_client_send(rcc); + if (client_cbs->migrate) { + channel->client_cbs.migrate = client_cbs->migrate; + } + channel->data = cbs_data; } -SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc) +int test_capability(const uint32_t *caps, int num_caps, uint32_t cap) { - spice_assert(red_channel_client_no_item_being_sent(rcc)); - spice_assert(rcc->send_data.header.data != NULL); - rcc->send_data.main.header_data = rcc->send_data.header.data; - rcc->send_data.main.item = rcc->send_data.item; - - rcc->send_data.marshaller = rcc->send_data.urgent.marshaller; - rcc->send_data.item = NULL; - red_channel_client_reset_send_data(rcc); - return rcc->send_data.marshaller; + uint32_t index = cap / 32; + if (num_caps < index + 1) { + return FALSE; + } + + return (caps[index] & (1 << (cap % 32))) != 0; } -static void red_channel_client_restore_main_sender(RedChannelClient *rcc) +static void add_capability(uint32_t **caps, int *num_caps, uint32_t cap) { - spice_marshaller_reset(rcc->send_data.urgent.marshaller); - rcc->send_data.marshaller = rcc->send_data.main.marshaller; - rcc->send_data.header.data = rcc->send_data.main.header_data; - if (!rcc->is_mini_header) { - 
rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); - } - rcc->send_data.item = rcc->send_data.main.item; + int nbefore, n; + + nbefore = *num_caps; + n = cap / 32; + *num_caps = MAX(*num_caps, n + 1); + *caps = spice_renew(uint32_t, *caps, *num_caps); + memset(*caps + nbefore, 0, (*num_caps - nbefore) * sizeof(uint32_t)); + (*caps)[n] |= (1 << (cap % 32)); } -uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc) +void red_channel_set_common_cap(RedChannel *channel, uint32_t cap) { - return rcc->send_data.serial; + add_capability(&channel->local_caps.common_caps, &channel->local_caps.num_common_caps, cap); } -void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial) +void red_channel_set_cap(RedChannel *channel, uint32_t cap) { - rcc->send_data.last_sent_serial = serial; - rcc->send_data.serial = serial; + add_capability(&channel->local_caps.caps, &channel->local_caps.num_caps, cap); } -static inline gboolean client_pipe_add(RedChannelClient *rcc, RedPipeItem *item, RingItem *pos) +void red_channel_ref(RedChannel *channel) { - spice_assert(rcc && item); - if (SPICE_UNLIKELY(!red_channel_client_is_connected(rcc))) { - spice_debug("rcc is disconnected %p", rcc); - red_pipe_item_unref(item); - return FALSE; - } - if (ring_is_empty(&rcc->pipe) && rcc->stream->watch) { - rcc->channel->core->watch_update_mask(rcc->stream->watch, - SPICE_WATCH_EVENT_READ | - SPICE_WATCH_EVENT_WRITE); - } - rcc->pipe_size++; - ring_add(pos, &item->link); - return TRUE; + channel->refs++; } -void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item) +void red_channel_unref(RedChannel *channel) { + if (--channel->refs == 0) { + if (channel->local_caps.num_common_caps) { + free(channel->local_caps.common_caps); + } - client_pipe_add(rcc, item, &rcc->pipe); -} + if (channel->local_caps.num_caps) { + free(channel->local_caps.caps); + } -void red_channel_client_pipe_add_push(RedChannelClient *rcc, 
RedPipeItem *item) -{ - red_channel_client_pipe_add(rcc, item); - red_channel_client_push(rcc); + free(channel); + } } -void red_channel_client_pipe_add_after(RedChannelClient *rcc, - RedPipeItem *item, - RedPipeItem *pos) +void red_channel_destroy(RedChannel *channel) { - spice_assert(pos); - client_pipe_add(rcc, item, &pos->link); -} + if (!channel) { + return; + } -int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, - RedPipeItem *item) -{ - return ring_item_is_linked(&item->link); + g_list_foreach(channel->clients, (GFunc)red_channel_client_destroy, NULL); + red_channel_unref(channel); } -static void red_channel_client_pipe_add_tail(RedChannelClient *rcc, - RedPipeItem *item) +void red_channel_send(RedChannel *channel) { - client_pipe_add(rcc, item, rcc->pipe.prev); + g_list_foreach(channel->clients, (GFunc)red_channel_client_send, NULL); } -void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item) +void red_channel_push(RedChannel *channel) { - if (client_pipe_add(rcc, item, rcc->pipe.prev)) { - red_channel_client_push(rcc); + if (!channel) { + return; } + + g_list_foreach(channel->clients, (GFunc)red_channel_client_push, NULL); } -void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type) +// TODO: this function doesn't make sense because the window should be client (WAN/LAN) +// specific +void red_channel_init_outgoing_messages_window(RedChannel *channel) { - RedPipeItem *item = spice_new(RedPipeItem, 1); - - red_pipe_item_init(item, pipe_item_type); - red_channel_client_pipe_add(rcc, item); - red_channel_client_push(rcc); + g_list_foreach(channel->clients, (GFunc)red_channel_client_init_outgoing_messages_window, NULL); } static void red_channel_client_pipe_add_type_proxy(gpointer data, gpointer user_data) @@ -1704,16 +421,6 @@ void red_channel_pipes_add_type(RedChannel *channel, int pipe_item_type) GINT_TO_POINTER(pipe_item_type)); } -void 
red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type) -{ - RedEmptyMsgPipeItem *item = spice_new(RedEmptyMsgPipeItem, 1); - - red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_EMPTY_MSG); - item->msg = msg_type; - red_channel_client_pipe_add(rcc, &item->base); - red_channel_client_push(rcc); -} - static void red_channel_client_pipe_add_empty_msg_proxy(gpointer data, gpointer user_data) { int type = GPOINTER_TO_INT(user_data); @@ -1725,117 +432,38 @@ void red_channel_pipes_add_empty_msg(RedChannel *channel, int msg_type) g_list_foreach(channel->clients, red_channel_client_pipe_add_empty_msg_proxy, GINT_TO_POINTER(msg_type)); } -int red_channel_client_is_connected(RedChannelClient *rcc) -{ - if (!rcc->dummy) { - return rcc->channel - && (g_list_find(rcc->channel->clients, rcc) != NULL); - } else { - return rcc->dummy_connected; - } -} - int red_channel_is_connected(RedChannel *channel) { return channel && channel->clients; } -static void red_channel_client_clear_sent_item(RedChannelClient *rcc) -{ - red_channel_client_release_sent_item(rcc); - rcc->send_data.blocked = FALSE; - rcc->send_data.size = 0; -} - -void red_channel_client_pipe_clear(RedChannelClient *rcc) -{ - RedPipeItem *item; - - if (rcc) { - red_channel_client_clear_sent_item(rcc); - } - while ((item = (RedPipeItem *)ring_get_head(&rcc->pipe))) { - ring_remove(&item->link); - red_pipe_item_unref(item); - } - rcc->pipe_size = 0; -} - -void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc) -{ - rcc->ack_data.messages_window = 0; -} - -void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window) -{ - rcc->ack_data.client_window = client_window; -} - -static void red_channel_remove_client(RedChannelClient *rcc) +void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc) { GList *link; + g_return_if_fail(channel == red_channel_client_get_channel(rcc)); - if (!pthread_equal(pthread_self(), rcc->channel->thread_id)) { + if 
(!pthread_equal(pthread_self(), channel->thread_id)) { spice_warning("channel type %d id %d - " "channel->thread_id (0x%lx) != pthread_self (0x%lx)." "If one of the threads is != io-thread && != vcpu-thread, " "this might be a BUG", - rcc->channel->type, rcc->channel->id, - rcc->channel->thread_id, pthread_self()); + channel->type, channel->id, + channel->thread_id, pthread_self()); } - spice_return_if_fail(rcc->channel); - link = g_list_find(rcc->channel->clients, rcc); + spice_return_if_fail(channel); + link = g_list_find(channel->clients, rcc); spice_return_if_fail(link != NULL); - rcc->channel->clients = g_list_remove_link(rcc->channel->clients, link); + channel->clients = g_list_remove_link(channel->clients, link); // TODO: should we set rcc->channel to NULL??? } -static void red_client_remove_channel(RedChannelClient *rcc) -{ - pthread_mutex_lock(&rcc->client->lock); - rcc->client->channels = g_list_remove(rcc->client->channels, rcc); - pthread_mutex_unlock(&rcc->client->lock); -} - -static void red_channel_client_disconnect_dummy(RedChannelClient *rcc) -{ - GList *link; - spice_assert(rcc->dummy); - if (rcc->channel && (link = g_list_find(rcc->channel->clients, rcc))) { - spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel, - rcc->channel->type, rcc->channel->id); - red_channel_remove_client(link->data); - } - rcc->dummy_connected = FALSE; -} - -void red_channel_client_disconnect(RedChannelClient *rcc) +void red_client_remove_channel(RedChannelClient *rcc) { - if (rcc->dummy) { - red_channel_client_disconnect_dummy(rcc); - return; - } - if (!red_channel_client_is_connected(rcc)) { - return; - } - spice_printerr("rcc=%p (channel=%p type=%d id=%d)", rcc, rcc->channel, - rcc->channel->type, rcc->channel->id); - red_channel_client_pipe_clear(rcc); - if (rcc->stream->watch) { - rcc->channel->core->watch_remove(rcc->stream->watch); - rcc->stream->watch = NULL; - } - if (rcc->latency_monitor.timer) { - 
rcc->channel->core->timer_remove(rcc->latency_monitor.timer); - rcc->latency_monitor.timer = NULL; - } - if (rcc->connectivity_monitor.timer) { - rcc->channel->core->timer_remove(rcc->connectivity_monitor.timer); - rcc->connectivity_monitor.timer = NULL; - } - red_channel_remove_client(rcc); - rcc->channel->channel_cbs.on_disconnect(rcc); + RedClient *client = red_channel_client_get_client(rcc); + pthread_mutex_lock(&client->lock); + client->channels = g_list_remove(client->channels, rcc); + pthread_mutex_unlock(&client->lock); } void red_channel_disconnect(RedChannel *channel) @@ -1843,51 +471,6 @@ void red_channel_disconnect(RedChannel *channel) g_list_foreach(channel->clients, (GFunc)red_channel_client_disconnect, NULL); } -RedChannelClient *red_channel_client_create_dummy(int size, - RedChannel *channel, - RedClient *client, - int num_common_caps, uint32_t *common_caps, - int num_caps, uint32_t *caps) -{ - RedChannelClient *rcc = NULL; - - spice_assert(size >= sizeof(RedChannelClient)); - - pthread_mutex_lock(&client->lock); - if (!red_channel_client_pre_create_validate(channel, client)) { - goto error; - } - rcc = spice_malloc0(size); - rcc->refs = 1; - rcc->client = client; - rcc->channel = channel; - red_channel_ref(channel); - red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); - if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { - rcc->incoming.header = mini_header_wrapper; - rcc->send_data.header = mini_header_wrapper; - rcc->is_mini_header = TRUE; - } else { - rcc->incoming.header = full_header_wrapper; - rcc->send_data.header = full_header_wrapper; - rcc->is_mini_header = FALSE; - } - - rcc->incoming.header.data = rcc->incoming.header_buf; - rcc->incoming.serial = 1; - ring_init(&rcc->pipe); - - rcc->dummy = TRUE; - rcc->dummy_connected = TRUE; - red_channel_add_client(channel, rcc); - red_client_add_channel(client, rcc); - pthread_mutex_unlock(&client->lock); - return rcc; -error: - 
pthread_mutex_unlock(&client->lock); - return NULL; -} - void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb) { g_list_foreach(channel->clients, (GFunc)cb, NULL); @@ -1928,49 +511,18 @@ int red_channel_any_blocked(RedChannel *channel) return FALSE; } -int red_channel_client_is_blocked(RedChannelClient *rcc) -{ - return rcc && rcc->send_data.blocked; -} - -int red_channel_client_send_message_pending(RedChannelClient *rcc) -{ - return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0; -} - -/* accessors for RedChannelClient */ -SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc) -{ - return rcc->send_data.marshaller; -} - -RedsStream *red_channel_client_get_stream(RedChannelClient *rcc) -{ - return rcc->stream; -} - -RedClient *red_channel_client_get_client(RedChannelClient *rcc) -{ - return rcc->client; -} - -void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list) -{ - rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list); -} - -/* end of accessors */ - int red_channel_get_first_socket(RedChannel *channel) { RedChannelClient *rcc; + RedsStream *stream; if (!channel || !channel->clients) { return -1; } rcc = g_list_nth_data(channel->clients, 0); + stream = red_channel_client_get_stream(rcc); - return rcc->stream->socket; + return stream->socket; } int red_channel_no_item_being_sent(RedChannel *channel) @@ -1986,18 +538,6 @@ int red_channel_no_item_being_sent(RedChannel *channel) return TRUE; } -int red_channel_client_no_item_being_sent(RedChannelClient *rcc) -{ - return !rcc || (rcc->send_data.size == 0); -} - -void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, - RedPipeItem *item) -{ - red_channel_client_pipe_remove(rcc, item); - red_pipe_item_unref(item); -} - /* * RedClient implementation - kept in red-channel.c because they are * pretty tied together. 
@@ -2035,21 +575,6 @@ RedClient *red_client_unref(RedClient *client) return client; } -/* client mutex should be locked before this call */ -static gboolean red_channel_client_set_migration_seamless(RedChannelClient *rcc) -{ - gboolean ret = FALSE; - - if (rcc->channel->migration_flags & SPICE_MIGRATE_NEED_DATA_TRANSFER) { - rcc->wait_migrate_data = TRUE; - ret = TRUE; - } - spice_debug("channel type %d id %d rcc %p wait data %d", rcc->channel->type, rcc->channel->id, rcc, - rcc->wait_migrate_data); - - return ret; -} - void red_client_set_migration_seamless(RedClient *client) // dest { GList *link; @@ -2118,9 +643,8 @@ void red_client_destroy(RedClient *client) // TODO: should we go back to async. For this we need to use // ref count for channel clients. channel->client_cbs.disconnect(rcc); - spice_assert(ring_is_empty(&rcc->pipe)); - spice_assert(rcc->pipe_size == 0); - spice_assert(rcc->send_data.size == 0); + spice_assert(red_channel_client_pipe_is_empty(rcc)); + spice_assert(red_channel_client_no_item_being_sent(rcc)); red_channel_client_destroy(rcc); link = next; } @@ -2128,7 +652,7 @@ void red_client_destroy(RedClient *client) } /* client->lock should be locked */ -static RedChannelClient *red_client_get_channel(RedClient *client, int type, int id) +RedChannelClient *red_client_get_channel(RedClient *client, int type, int id) { GList *link; RedChannelClient *rcc; @@ -2147,7 +671,7 @@ static RedChannelClient *red_client_get_channel(RedClient *client, int type, int } /* client->lock should be locked */ -static void red_client_add_channel(RedClient *client, RedChannelClient *rcc) +void red_client_add_channel(RedClient *client, RedChannelClient *rcc) { spice_assert(rcc && client); client->channels = g_list_prepend(client->channels, rcc); @@ -2179,11 +703,7 @@ void red_client_semi_seamless_migrate_complete(RedClient *client) link = client->channels; while (link) { next = link->next; - RedChannelClient *rcc = link->data; - - if (rcc->latency_monitor.timer) { - 
red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - } + red_channel_client_semi_seamless_migration_complete(link->data); link = next; } pthread_mutex_unlock(&client->lock); @@ -2275,8 +795,10 @@ uint32_t red_channel_max_pipe_size(RedChannel *channel) uint32_t pipe_size = 0; for (link = channel->clients; link != NULL; link = link->next) { + uint32_t new_size; rcc = link->data; - pipe_size = MAX(pipe_size, rcc->pipe_size); + new_size = red_channel_client_get_pipe_size(rcc); + pipe_size = MAX(pipe_size, new_size); } return pipe_size; } @@ -2288,7 +810,9 @@ uint32_t red_channel_min_pipe_size(RedChannel *channel) uint32_t pipe_size = ~0; FOREACH_CLIENT(channel, link, next, rcc) { - pipe_size = MIN(pipe_size, rcc->pipe_size); + uint32_t new_size; + new_size = red_channel_client_get_pipe_size(rcc); + pipe_size = MIN(pipe_size, new_size); } return pipe_size == ~0 ? 0 : pipe_size; } @@ -2300,102 +824,11 @@ uint32_t red_channel_sum_pipes_size(RedChannel *channel) uint32_t sum = 0; FOREACH_CLIENT(channel, link, next, rcc) { - sum += rcc->pipe_size; + sum += red_channel_client_get_pipe_size(rcc); } return sum; } -int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, - int64_t timeout) -{ - uint64_t end_time; - int blocked; - - if (!red_channel_client_is_blocked(rcc)) { - return TRUE; - } - if (timeout != -1) { - end_time = spice_get_monotonic_time_ns() + timeout; - } else { - end_time = UINT64_MAX; - } - spice_info("blocked"); - - do { - usleep(CHANNEL_BLOCKED_SLEEP_DURATION); - red_channel_client_receive(rcc); - red_channel_client_send(rcc); - } while ((blocked = red_channel_client_is_blocked(rcc)) && - (timeout == -1 || spice_get_monotonic_time_ns() < end_time)); - - if (blocked) { - spice_warning("timeout"); - return FALSE; - } else { - spice_assert(red_channel_client_no_item_being_sent(rcc)); - return TRUE; - } -} - -static void marker_pipe_item_free(RedPipeItem *base) -{ - MarkerPipeItem *item = SPICE_UPCAST(MarkerPipeItem, base); - - if 
(item->item_in_pipe) { - *item->item_in_pipe = FALSE; - } - free(item); -} - -/* TODO: more evil sync stuff. anything with the word wait in it's name. */ -int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, - RedPipeItem *item, - int64_t timeout) -{ - uint64_t end_time; - gboolean item_in_pipe; - - spice_info(NULL); - - if (timeout != -1) { - end_time = spice_get_monotonic_time_ns() + timeout; - } else { - end_time = UINT64_MAX; - } - - MarkerPipeItem *mark_item = spice_new0(MarkerPipeItem, 1); - - red_pipe_item_init_full(&mark_item->base, RED_PIPE_ITEM_TYPE_MARKER, - marker_pipe_item_free); - item_in_pipe = TRUE; - mark_item->item_in_pipe = &item_in_pipe; - red_channel_client_pipe_add_after(rcc, &mark_item->base, item); - - if (red_channel_client_is_blocked(rcc)) { - red_channel_client_receive(rcc); - red_channel_client_send(rcc); - } - red_channel_client_push(rcc); - - while(item_in_pipe && - (timeout == -1 || spice_get_monotonic_time_ns() < end_time)) { - usleep(CHANNEL_BLOCKED_SLEEP_DURATION); - red_channel_client_receive(rcc); - red_channel_client_send(rcc); - red_channel_client_push(rcc); - } - - if (item_in_pipe) { - // still on the queue, make sure won't overwrite the stack variable - mark_item->item_in_pipe = NULL; - spice_warning("timeout"); - return FALSE; - } else { - return red_channel_client_wait_outgoing_item(rcc, - timeout == -1 ? 
-1 : end_time - spice_get_monotonic_time_ns()); - } -} - int red_channel_wait_all_sent(RedChannel *channel, int64_t timeout) { @@ -2430,31 +863,7 @@ int red_channel_wait_all_sent(RedChannel *channel, } } -void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc) -{ - if (red_channel_client_is_blocked(rcc) || rcc->pipe_size > 0) { - red_channel_client_disconnect(rcc); - } else { - spice_assert(red_channel_client_no_item_being_sent(rcc)); - } -} - RedsState* red_channel_get_server(RedChannel *channel) { return channel->reds; } - -RedChannel* red_channel_client_get_channel(RedChannelClient *rcc) -{ - return rcc->channel; -} - -gboolean red_channel_client_is_destroying(RedChannelClient *rcc) -{ - return rcc->destroying; -} - -void red_channel_client_set_destroying(RedChannelClient *rcc) -{ - rcc->destroying = TRUE; -} diff --git a/server/red-channel.h b/server/red-channel.h index 370f6a7..68bfc7a 100644 --- a/server/red-channel.h +++ b/server/red-channel.h @@ -89,17 +89,6 @@ typedef struct IncomingHandlerInterface { on_input_proc on_input; } IncomingHandlerInterface; -typedef struct IncomingHandler { - IncomingHandlerInterface *cb; - void *opaque; - uint8_t header_buf[MAX_HEADER_SIZE]; - SpiceDataHeaderOpaque header; - uint32_t header_pos; - uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. 
- uint32_t msg_pos; - uint64_t serial; -} IncomingHandler; - typedef int (*get_outgoing_msg_size_proc)(void *opaque); typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos); typedef void (*on_outgoing_error_proc)(void *opaque); @@ -116,16 +105,6 @@ typedef struct OutgoingHandlerInterface { on_output_proc on_output; } OutgoingHandlerInterface; -typedef struct OutgoingHandler { - OutgoingHandlerInterface *cb; - void *opaque; - struct iovec vec_buf[IOV_MAX]; - int vec_size; - struct iovec *vec; - int pos; - int size; -} OutgoingHandler; - /* Red Channel interface */ typedef struct RedChannel RedChannel; @@ -231,62 +210,6 @@ typedef struct RedChannelClientConnectivityMonitor { SpiceTimer *timer; } RedChannelClientConnectivityMonitor; -struct RedChannelClient { - RedChannel *channel; - RedClient *client; - RedsStream *stream; - int dummy; - int dummy_connected; - - uint32_t refs; - - struct { - uint32_t generation; - uint32_t client_generation; - uint32_t messages_window; - uint32_t client_window; - } ack_data; - - struct { - SpiceMarshaller *marshaller; - SpiceDataHeaderOpaque header; - uint32_t size; - RedPipeItem *item; - int blocked; - uint64_t serial; - uint64_t last_sent_serial; - - struct { - SpiceMarshaller *marshaller; - uint8_t *header_data; - RedPipeItem *item; - } main; - - struct { - SpiceMarshaller *marshaller; - } urgent; - } send_data; - - OutgoingHandler outgoing; - IncomingHandler incoming; - int during_send; - int id; // debugging purposes - Ring pipe; - uint32_t pipe_size; - - RedChannelCapabilities remote_caps; - int is_mini_header; - int destroying; - - int wait_migrate_data; - int wait_migrate_flush_mark; - - RedChannelClientLatencyMonitor latency_monitor; - RedChannelClientConnectivityMonitor connectivity_monitor; -}; - -#define RED_CHANNEL_CLIENT(Client) ((RedChannelClient *)(Client)) - struct RedChannel { uint32_t type; uint32_t id; @@ -339,17 +262,6 @@ struct RedChannel { #define RED_CHANNEL(Channel) 
((RedChannel *)(Channel)) -/* - * When an error occurs over a channel, we treat it as a warning - * for spice-server and shutdown the channel. - */ -#define spice_channel_client_error(rcc, format, ...) \ - do { \ - spice_warning("rcc %p type %u id %u: " format, rcc, \ - rcc->channel->type, rcc->channel->id, ## __VA_ARGS__); \ - red_channel_client_shutdown(rcc); \ - } while (0) - /* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't * explicitly destroy the channel */ RedChannel *red_channel_create(int size, @@ -372,6 +284,11 @@ RedChannel *red_channel_create_parser(int size, channel_handle_parsed_proc handle_parsed, const ChannelCbs *channel_cbs, uint32_t migration_flags); +void red_channel_ref(RedChannel *channel); +void red_channel_unref(RedChannel *channel); +void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc); +void red_channel_remove_client(RedChannel *channel, RedChannelClient *rcc); + void red_channel_set_stat_node(RedChannel *channel, StatNodeRef stat); void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *client_cbs, gpointer cbs_data); @@ -379,26 +296,13 @@ void red_channel_register_client_cbs(RedChannel *channel, const ClientCbs *clien void red_channel_set_common_cap(RedChannel *channel, uint32_t cap); void red_channel_set_cap(RedChannel *channel, uint32_t cap); -RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, - RedsStream *stream, - int monitor_latency, - int num_common_caps, uint32_t *common_caps, - int num_caps, uint32_t *caps); // TODO: tmp, for channels that don't use RedChannel yet (e.g., snd channel), but // do use the client callbacks. 
So the channel clients are not connected (the channel doesn't // have list of them, but they do have a link to the channel, and the client has a list of them) RedChannel *red_channel_create_dummy(int size, RedsState *reds, uint32_t type, uint32_t id); -RedChannelClient *red_channel_client_create_dummy(int size, - RedChannel *channel, - RedClient *client, - int num_common_caps, uint32_t *common_caps, - int num_caps, uint32_t *caps); int red_channel_is_connected(RedChannel *channel); -int red_channel_client_is_connected(RedChannelClient *rcc); -void red_channel_client_default_migrate(RedChannelClient *rcc); -int red_channel_client_is_waiting_for_migrate_data(RedChannelClient *rcc); /* seamless migration is supported for only one client. This routine * checks if the only channel client associated with channel is * waiting for migration data */ @@ -411,61 +315,15 @@ int red_channel_is_waiting_for_migrate_data(RedChannel *channel); * from red_client_destroy. */ -void red_channel_client_destroy(RedChannelClient *rcc); void red_channel_destroy(RedChannel *channel); -void red_channel_client_ref(RedChannelClient *rcc); -void red_channel_client_unref(RedChannelClient *rcc); - -int red_channel_client_test_remote_common_cap(RedChannelClient *rcc, uint32_t cap); -int red_channel_client_test_remote_cap(RedChannelClient *rcc, uint32_t cap); /* return true if all the channel clients support the cap */ int red_channel_test_remote_common_cap(RedChannel *channel, uint32_t cap); int red_channel_test_remote_cap(RedChannel *channel, uint32_t cap); -/* shutdown is the only safe thing to do out of the client/channel - * thread. It will not touch the rings, just shutdown the socket. - * It should be followed by some way to gurantee a disconnection. 
*/ -void red_channel_client_shutdown(RedChannelClient *rcc); - /* should be called when a new channel is ready to send messages */ void red_channel_init_outgoing_messages_window(RedChannel *channel); -/* handles general channel msgs from the client */ -int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, - uint16_t type, void *message); - -/* when preparing send_data: should call init and then use marshaller */ -void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, RedPipeItem *item); - -uint64_t red_channel_client_get_message_serial(RedChannelClient *channel); -void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t); - -/* When sending a msg. Should first call red_channel_client_begin_send_message. - * It will first send the pending urgent data, if there is any, and then - * the rest of the data. - */ -void red_channel_client_begin_send_message(RedChannelClient *rcc); - -/* - * Stores the current send data, and switches to urgent send data. - * When it begins the actual send, it will send first the urgent data - * and afterward the rest of the data. - * Should be called only if during the marshalling of on message, - * the need to send another message, before, rises. - * Important: the serial of the non-urgent sent data, will be succeeded. - * return: the urgent send data marshaller - */ -SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc); - -/* returns -1 if we don't have an estimation */ -int red_channel_client_get_roundtrip_ms(RedChannelClient *rcc); - -/* - * Checks periodically if the connection is still alive - */ -void red_channel_client_start_connectivity_monitoring(RedChannelClient *rcc, uint32_t timeout_ms); - // TODO: add back the channel_pipe_add functionality - by adding reference counting // to the RedPipeItem. 
@@ -475,23 +333,10 @@ int red_channel_pipes_new_add_push(RedChannel *channel, new_pipe_item_t creator, void red_channel_pipes_new_add(RedChannel *channel, new_pipe_item_t creator, void *data); void red_channel_pipes_new_add_tail(RedChannel *channel, new_pipe_item_t creator, void *data); -void red_channel_client_pipe_add_push(RedChannelClient *rcc, RedPipeItem *item); -void red_channel_client_pipe_add(RedChannelClient *rcc, RedPipeItem *item); -void red_channel_client_pipe_add_after(RedChannelClient *rcc, RedPipeItem *item, RedPipeItem *pos); -int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, RedPipeItem *item); -void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, RedPipeItem *item); -void red_channel_client_pipe_add_tail_and_push(RedChannelClient *rcc, RedPipeItem *item); -/* for types that use this routine -> the pipe item should be freed */ -void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type); void red_channel_pipes_add_type(RedChannel *channel, int pipe_item_type); -void red_channel_client_pipe_add_empty_msg(RedChannelClient *rcc, int msg_type); void red_channel_pipes_add_empty_msg(RedChannel *channel, int msg_type); -void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc); -void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_window); -void red_channel_client_push_set_ack(RedChannelClient *rcc); - int red_channel_get_first_socket(RedChannel *channel); /* return TRUE if all of the connected clients to this channel are blocked */ @@ -500,24 +345,13 @@ int red_channel_all_blocked(RedChannel *channel); /* return TRUE if any of the connected clients to this channel are blocked */ int red_channel_any_blocked(RedChannel *channel); -int red_channel_client_is_blocked(RedChannelClient *rcc); - -/* helper for channels that have complex logic that can possibly ready a send */ -int red_channel_client_send_message_pending(RedChannelClient *rcc); - int 
red_channel_no_item_being_sent(RedChannel *channel); -int red_channel_client_no_item_being_sent(RedChannelClient *rcc); // TODO: unstaticed for display/cursor channels. they do some specific pushes not through // adding elements or on events. but not sure if this is actually required (only result // should be that they ""try"" a little harder, but if the event system is correct it // should not make any difference. void red_channel_push(RedChannel *channel); -void red_channel_client_push(RedChannelClient *rcc); -// TODO: again - what is the context exactly? this happens in channel disconnect. but our -// current red_channel_shutdown also closes the socket - is there a socket to close? -// are we reading from an fd here? arghh -void red_channel_client_pipe_clear(RedChannelClient *rcc); // Again, used in various places outside of event handler context (or in other event handler // contexts): // flush_display_commands/flush_cursor_commands @@ -526,23 +360,10 @@ void red_channel_client_pipe_clear(RedChannelClient *rcc); // red_wait_pipe_item_sent // handle_channel_events - this is the only one that was used before, and was in red-channel.c void red_channel_receive(RedChannel *channel); -void red_channel_client_receive(RedChannelClient *rcc); // For red_worker void red_channel_send(RedChannel *channel); -void red_channel_client_send(RedChannelClient *rcc); // For red_worker void red_channel_disconnect(RedChannel *channel); -void red_channel_client_disconnect(RedChannelClient *rcc); - -/* accessors for RedChannelClient */ -/* Note: the valid times to call red_channel_get_marshaller are just during send_item callback. 
*/ -SpiceMarshaller *red_channel_client_get_marshaller(RedChannelClient *rcc); -RedsStream *red_channel_client_get_stream(RedChannelClient *rcc); -RedClient *red_channel_client_get_client(RedChannelClient *rcc); - -/* Note that the header is valid only between red_channel_reset_send_data and - * red_channel_begin_send_message.*/ -void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list); /* return the sum of all the rcc pipe size */ uint32_t red_channel_max_pipe_size(RedChannel *channel); @@ -596,6 +417,11 @@ RedClient *red_client_ref(RedClient *client); */ RedClient *red_client_unref(RedClient *client); +/* client->lock should be locked */ +void red_client_add_channel(RedClient *client, RedChannelClient *rcc); +void red_client_remove_channel(RedChannelClient *rcc); +RedChannelClient *red_client_get_channel(RedClient *client, int type, int id); + MainChannelClient *red_client_get_main(RedClient *client); // main should be set once before all the other channels are created void red_client_set_main(RedClient *client, MainChannelClient *mcc); @@ -611,7 +437,7 @@ void red_client_semi_seamless_migrate_complete(RedClient *client); /* dst side * int red_client_during_migrate_at_target(RedClient *client); void red_client_migrate(RedClient *client); - +gboolean red_client_seamless_migration_done_for_channel(RedClient *client); /* * blocking functions. * @@ -620,16 +446,9 @@ void red_client_migrate(RedClient *client); * Return: TRUE if waiting succeeded. FALSE if timeout expired. 
*/ -int red_channel_client_wait_pipe_item_sent(RedChannelClient *rcc, - RedPipeItem *item, - int64_t timeout); -int red_channel_client_wait_outgoing_item(RedChannelClient *rcc, - int64_t timeout); int red_channel_wait_all_sent(RedChannel *channel, int64_t timeout); -void red_channel_client_disconnect_if_pending_send(RedChannelClient *rcc); -RedChannel* red_channel_client_get_channel(RedChannelClient *rcc); -gboolean red_channel_client_is_destroying(RedChannelClient *rcc); -void red_channel_client_set_destroying(RedChannelClient *rcc); + +#define CHANNEL_BLOCKED_SLEEP_DURATION 10000 //micro #endif diff --git a/server/red-worker.h b/server/red-worker.h index 368bfa1..a0a327a 100644 --- a/server/red-worker.h +++ b/server/red-worker.h @@ -21,6 +21,7 @@ #include "red-common.h" #include "red-qxl.h" #include "red-parse-qxl.h" +#include "red-channel-client.h" typedef struct RedWorker RedWorker; diff --git a/server/reds.c b/server/reds.c index f95af14..837b2eb 100644 --- a/server/reds.c +++ b/server/reds.c @@ -72,6 +72,7 @@ #include "reds-private.h" #include "video-encoder.h" +#include "red-channel-client.h" static void reds_client_monitors_config(RedsState *reds, VDAgentMonitorsConfig *monitors_config); static gboolean reds_use_client_monitors_config(RedsState *reds); diff --git a/server/smartcard.c b/server/smartcard.c index a8a16c7..74c2b18 100644 --- a/server/smartcard.c +++ b/server/smartcard.c @@ -30,7 +30,7 @@ #include "reds.h" #include "char-device.h" -#include "red-channel.h" +#include "red-channel-client.h" #include "smartcard.h" #include "migration-protocol.h" diff --git a/server/sound.c b/server/sound.c index 84cbab4..52fe1c3 100644 --- a/server/sound.c +++ b/server/sound.c @@ -34,6 +34,7 @@ #include "main-channel.h" #include "reds.h" #include "red-qxl.h" +#include "red-channel-client.h" #include "sound.h" #include <common/snd_codec.h> #include "demarshallers.h" diff --git a/server/spicevmc.c b/server/spicevmc.c index c79e7bb..3533f3f 100644 --- 
a/server/spicevmc.c +++ b/server/spicevmc.c @@ -32,6 +32,7 @@ #include "char-device.h" #include "red-channel.h" +#include "red-channel-client.h" #include "reds.h" #include "migration-protocol.h" #ifdef USE_LZ4 -- 2.7.4 _______________________________________________ Spice-devel mailing list Spice-devel@xxxxxxxxxxxxxxxxxxxxx https://lists.freedesktop.org/mailman/listinfo/spice-devel