27.05.2014 10:38, Jay Sorg wrote: One big question up front, and sorry for not asking it earlier: why do you need to invent a custom protocol to communicate with xrdp, instead of implementing something more standard inside xrdp? This is especially puzzling since your custom protocol is essentially RTP over a stream socket minus sequence numbers and timestamps, or ESD minus authentication plus an 8-byte header. For now I am only pointing out the obvious findings; I will review again more thoroughly tomorrow. > +static int close_send(struct userdata *u) { > + struct xrdp_header h; > + > + pa_log("close_send:"); > + if (u->fd == 0) { > + return 0; > + } > + h.code = 1; Magic value. Please define an enum for all of these message codes. > + h.bytes = 8; This looks like sizeof(struct xrdp_header), which is not guaranteed to be 8. Use uint32_t and friends in the definition of that structure. > + if (send(u->fd, &h, 8, 0) != 8) { This can fail even when nothing is actually wrong: partial writes are common on sockets. > + pa_log("close_send: send failed"); > + close(u->fd); > + u->fd = 0; > + return 0; > + } else { > + pa_log_debug("close_send: sent header ok"); > + } > + return 8; > +} > + > +static int sink_process_msg(pa_msgobject *o, int code, void *data, > + int64_t offset, pa_memchunk *chunk) { > + > + struct userdata *u = PA_SINK(o)->userdata; > + pa_usec_t now; > + long lat; > + > + pa_log_debug("sink_process_msg: code %d", code); Useless debug message. > + > + switch (code) { > + > + case PA_SINK_MESSAGE_SET_VOLUME: /* 3 */ > + break; > + > + case PA_SINK_MESSAGE_SET_MUTE: /* 6 */ > + break; > + These cases should be removed. > + case PA_SINK_MESSAGE_GET_LATENCY: /* 7 */ > + now = pa_rtclock_now(); > + lat = u->timestamp > now ? u->timestamp - now : 0ULL; > + pa_log_debug("sink_process_msg: lat %ld", lat); > + *((pa_usec_t*) data) = lat; > + return 0; This (together with the fact that later you add and subtract things based on the number of samples) seems to be based on the assumption that the wall clock and the client's audio clock tick at the same rate.
If xrdp eats samples from the socket too slowly due to a misbehaving client, a difference will accumulate and lead to unpleasant results such as a blocked socket. Please use only one of these two clock sources. > + > + case PA_SINK_MESSAGE_GET_REQUESTED_LATENCY: /* 8 */ > + break; Also useless. > + > + case PA_SINK_MESSAGE_SET_STATE: /* 9 */ > + if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING) /* 0 */ { > + pa_log("sink_process_msg: running"); > + > + u->timestamp = pa_rtclock_now(); > + } else { > + pa_log("sink_process_msg: not running"); > + close_send(u); > + } > + break; > + > + } > + > + return pa_sink_process_msg(o, code, data, offset, chunk); > +} > + > +static void sink_update_requested_latency_cb(pa_sink *s) { > + struct userdata *u; > + size_t nbytes; > + > + pa_sink_assert_ref(s); > + pa_assert_se(u = s->userdata); > + > + u->block_usec = BLOCK_USEC; > + > + u->got_max_latency = 0; > + if (u->block_usec == (pa_usec_t) -1) { > + u->block_usec = s->thread_info.max_latency; > + u->got_max_latency = 1; > + } > + > + nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec); > + pa_sink_set_max_rewind_within_thread(s, nbytes); > + pa_sink_set_max_request_within_thread(s, nbytes); > +} > + > +static void process_rewind(struct userdata *u, pa_usec_t now) { > + size_t rewind_nbytes, in_buffer; > + pa_usec_t delay; > + > + pa_assert(u); > + > + /* Figure out how much we shall rewind and reset the counter */ > + rewind_nbytes = u->sink->thread_info.rewind_nbytes; > + u->sink->thread_info.rewind_nbytes = 0; > + > + pa_assert(rewind_nbytes > 0); > + pa_log_debug("Requested to rewind %lu bytes.", > + (unsigned long) rewind_nbytes); > + > + if (u->timestamp <= now) > + goto do_nothing; > + > + delay = u->timestamp - now; > + in_buffer = pa_usec_to_bytes(delay, &u->sink->sample_spec); > + > + if (in_buffer <= 0) > + goto do_nothing; > + > + if (rewind_nbytes > in_buffer) > + rewind_nbytes = in_buffer; > + > + pa_sink_process_rewind(u->sink, rewind_nbytes); > + 
u->timestamp -= pa_bytes_to_usec(rewind_nbytes, &u->sink->sample_spec); > + u->skip_bytes += rewind_nbytes; > + > + pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes); > + return; > + > +do_nothing: > + > + pa_sink_process_rewind(u->sink, 0); > +} I will test this separately tomorrow with a deliberately written evil PulseAudio client if you give me instructions on how to set up xrdp. Rewinding is an area that nobody tests, and that nobody gets right on the first try. In fact, I would prefer that your sink had no rewind support at all (and a small fixed latency) rather than rewind support that only pretends to work. See module-jack-sink as a possibly relevant example. > + > +static int get_display_num_from_display(char *display_text) { > + int lindex; > + int mode; > + int host_index; > + int disp_index; > + int scre_index; > + int display_num; > + char host[256]; > + char disp[256]; > + char scre[256]; > + > + if (display_text == NULL) { > + return 0; > + } > + memset(host, 0, 256); > + memset(disp, 0, 256); > + memset(scre, 0, 256); > + > + lindex = 0; > + host_index = 0; > + disp_index = 0; > + scre_index = 0; > + mode = 0; > + > + while (display_text[lindex] != 0) { > + if (display_text[lindex] == ':') { > + mode = 1; > + } else if (display_text[lindex] == '.') { > + mode = 2; > + } else if (mode == 0) { > + host[host_index] = display_text[lindex]; > + host_index++; > + if (host_index > 255) { > + return 0; > + } > + } else if (mode == 1) { > + disp[disp_index] = display_text[lindex]; > + disp_index++; > + if (disp_index > 255) { > + return 0; > + } > + } else if (mode == 2) { > + scre[scre_index] = display_text[lindex]; > + scre_index++; > + if (scre_index > 255) { > + return 0; > + } > + } > + lindex++; > + } > + > + host[host_index] = 0; > + disp[disp_index] = 0; > + scre[scre_index] = 0; > + display_num = atoi(disp); > + return display_num; > +} > + > +static int data_send(struct userdata *u, pa_memchunk *chunk) { > + char *data; > + int bytes; > +
int sent; > + int fd; > + struct xrdp_header h; > + struct sockaddr_un s; > + > + if (u->fd == 0) { I'd rather use -1 as the invalid fd value, since 0 is a valid file descriptor. > + if (u->failed_connect_time != 0) { > + if (pa_rtclock_now() - u->failed_connect_time < 1000000) { > + return 0; > + } > + } > + fd = socket(PF_LOCAL, SOCK_STREAM, 0); > + memset(&s, 0, sizeof(s)); > + s.sun_family = AF_UNIX; > + bytes = sizeof(s.sun_path) - 1; > + snprintf(s.sun_path, bytes, CHANSRV_PORT_STR, u->display_num); > + pa_log_debug("trying to conenct to %s", s.sun_path); > + if (connect(fd, (struct sockaddr *)&s, > + sizeof(struct sockaddr_un)) != 0) { > + u->failed_connect_time = pa_rtclock_now(); > + pa_log_debug("Connected failed"); > + close(fd); > + return 0; > + } > + u->failed_connect_time = 0; > + pa_log("Connected ok fd %d", fd); > + u->fd = fd; > + } > + > + bytes = chunk->length; > + pa_log_debug("bytes %d", bytes); > + > + /* from rewind */ > + if (u->skip_bytes > 0) { > + if (bytes > u->skip_bytes) { > + bytes -= u->skip_bytes; > + u->skip_bytes = 0; > + } else { > + u->skip_bytes -= bytes; > + return bytes; > + } > + } > + > + h.code = 0; > + h.bytes = bytes + 8; > + if (send(u->fd, &h, 8, 0) != 8) { > + pa_log("data_send: send failed"); > + close(u->fd); > + u->fd = 0; > + return 0; > + } else { > + pa_log_debug("data_send: sent header ok bytes %d", bytes); > + } Sorry, I don't follow the logic here. You seem to accumulate the total rewound amount in u->skip_bytes and then simply not send that amount of newly rendered data. However, what guarantees that nobody will (rightfully) try to cancel data that has already been sent? In fact, process_render() renders eagerly until block_usec or more of extra samples are queued, so block_usec effectively becomes the non-rewindable latency. Moreover, block_usec is settable and can be set to an arbitrarily large value, which is bad. Cap it to ~150–200 ms if you really want it to be settable at all.
> + > + data = (char*)pa_memblock_acquire(chunk->memblock); > + data += chunk->index; > + sent = send(u->fd, data, bytes, 0); > + pa_memblock_release(chunk->memblock); > + > + if (sent != bytes) { Same issue as already mentioned for close_send(): partial writes are common on stream-oriented sockets. Please write yourself an LD_PRELOAD library that randomly caps the length passed to send(), so that this issue is easy to reproduce and debug. > + pa_log("data_send: send failed sent %d bytes %d", sent, bytes); > + close(u->fd); > + u->fd = 0; > + return 0; > + } > + > + return sent; > +} > + > +static void process_render(struct userdata *u, pa_usec_t now) { > + pa_memchunk chunk; > + int request_bytes; > + > + pa_assert(u); > + if (u->got_max_latency) { > + return; > + } > + pa_log_debug("process_render: u->block_usec %d", (int)(u->block_usec)); > + while (u->timestamp < now + u->block_usec) { > + request_bytes = u->sink->thread_info.max_request; > + request_bytes = MIN(request_bytes, 16 * 1024); Explain this magic value and put it into a #define.
> + pa_sink_render(u->sink, request_bytes, &chunk); > + data_send(u, &chunk); > + pa_memblock_unref(chunk.memblock); > + u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec); > + } > +} > + > +static void thread_func(void *userdata) { > + > + struct userdata *u = userdata; > + int ret; > + pa_usec_t now; > + > + pa_assert(u); > + > + pa_log_debug("Thread starting up"); > + > + pa_thread_mq_install(&u->thread_mq); > + > + u->timestamp = pa_rtclock_now(); > + > + for (;;) { > + > + if (u->sink->thread_info.state == PA_SINK_RUNNING) { > + > + now = pa_rtclock_now(); > + > + if (u->sink->thread_info.rewind_requested) { > + if (u->sink->thread_info.rewind_nbytes > 0) { > + process_rewind(u, now); > + } else { > + pa_sink_process_rewind(u->sink, 0); > + } > + } > + > + if (u->timestamp <= now) { > + pa_log_debug("thread_func: calling process_render"); > + process_render(u, now); > + } > + > + pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp); > + > + } else { > + pa_rtpoll_set_timer_disabled(u->rtpoll); > + } > + > + if ((ret = pa_rtpoll_run(u->rtpoll, true)) < 0) { > + goto fail; > + } > + > + if (ret == 0) { > + goto finish; > + } > + } > + > +fail: > + /* If this was no regular exit from the loop we have to continue > + * processing messages until we received PA_MESSAGE_SHUTDOWN */ > + pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), > + PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, > + NULL, NULL); > + pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); > + > +finish: > + pa_log_debug("Thread shutting down"); > +} > + > +int pa__init(pa_module*m) { > + struct userdata *u = NULL; > + pa_sample_spec ss; > + pa_channel_map map; > + pa_modargs *ma = NULL; > + pa_sink_new_data data; > + size_t nbytes; > + > + pa_assert(m); > + > + if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { > + pa_log("Failed to parse module arguments."); > + goto fail; > + } > + > + ss = m->core->default_sample_spec; > + map = 
m->core->default_channel_map; > + if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, > + PA_CHANNEL_MAP_DEFAULT) < 0) { > + pa_log("Invalid sample format specification or channel map"); > + goto fail; > + } > + > + m->userdata = u = pa_xnew0(struct userdata, 1); > + u->core = m->core; > + u->module = m; > + u->rtpoll = pa_rtpoll_new(); > + pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll); > + > + pa_sink_new_data_init(&data); > + data.driver = __FILE__; > + data.module = m; > + pa_sink_new_data_set_name(&data, > + pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME)); > + pa_sink_new_data_set_sample_spec(&data, &ss); > + pa_sink_new_data_set_channel_map(&data, &map); > + pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "xrdp sink"); > + pa_proplist_sets(data.proplist, PA_PROP_DEVICE_CLASS, "abstract"); > + > + if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, > + PA_UPDATE_REPLACE) < 0) { > + pa_log("Invalid properties"); > + pa_sink_new_data_done(&data); > + goto fail; > + } > + > + u->sink = pa_sink_new(m->core, &data, > + PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY); > + pa_sink_new_data_done(&data); > + > + if (!u->sink) { > + pa_log("Failed to create sink object."); > + goto fail; > + } > + > + u->sink->parent.process_msg = sink_process_msg; > + u->sink->update_requested_latency = sink_update_requested_latency_cb; > + u->sink->userdata = u; > + > + pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); > + pa_sink_set_rtpoll(u->sink, u->rtpoll); > + > + u->block_usec = BLOCK_USEC; > + nbytes = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec); > + pa_sink_set_max_rewind(u->sink, nbytes); > + pa_sink_set_max_request(u->sink, nbytes); > + > + u->display_num = get_display_num_from_display(getenv("DISPLAY")); > + > + if (!(u->thread = pa_thread_new("xrdp-sink", thread_func, u))) { > + pa_log("Failed to create thread."); > + goto fail; > + } > + > + pa_sink_put(u->sink); > + > + pa_modargs_free(ma); > + > + 
return 0; > + > +fail: > + if (ma) { > + pa_modargs_free(ma); > + } > + > + pa__done(m); > + > + return -1; > +} > + > +int pa__get_n_used(pa_module *m) { > + struct userdata *u; > + > + pa_assert(m); > + pa_assert_se(u = m->userdata); > + > + return pa_sink_linked_by(u->sink); > +} > + > +void pa__done(pa_module*m) { > + struct userdata *u; > + > + pa_assert(m); > + > + if (!(u = m->userdata)) { > + return; > + } > + > + if (u->sink) { > + pa_sink_unlink(u->sink); > + } > + > + if (u->thread) { > + pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, > + NULL, 0, NULL); > + pa_thread_free(u->thread); > + } > + > + pa_thread_mq_done(&u->thread_mq); > + > + if (u->sink) { > + pa_sink_unref(u->sink); > + } > + > + if (u->rtpoll) { > + pa_rtpoll_free(u->rtpoll); > + } > + > + pa_xfree(u); > +} > -- Alexander E. Patrakov