Turns out sinks and sources must be 'put' right after they've been created, otherwise they'll receive messages whose handlers assert 'put'. --- src/modules/module-tunnel-sink-new.c | 283 ++++++++++++++++---------- src/modules/module-tunnel-source-new.c | 355 +++++++++++++++++++-------------- src/modules/module-tunnel.c | 190 +++++++++++------- 3 files changed, 499 insertions(+), 329 deletions(-) diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c index fd24e690..e25e3931 100644 --- a/src/modules/module-tunnel-sink-new.c +++ b/src/modules/module-tunnel-sink-new.c @@ -60,16 +60,23 @@ PA_MODULE_USAGE( #define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC) #define TUNNEL_THREAD_FAILED_MAINLOOP 1 -enum { - SINK_MESSAGE_PUT = PA_SINK_MESSAGE_MAX, -}; - static void stream_state_cb(pa_stream *stream, void *userdata); static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata); static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata); static void context_state_cb(pa_context *c, void *userdata); static void sink_update_requested_latency_cb(pa_sink *s); +enum { + TUNNEL_MESSAGE_CREATE_SINK, + TUNNEL_MESSAGE_CREATE_STREAM, +}; + +typedef struct { + pa_msgobject parent; +} tunnel_msg; + +PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject); + struct userdata { pa_module *module; pa_sink *sink; @@ -85,12 +92,20 @@ struct userdata { bool update_stream_bufferattr_after_connect; bool connected; + bool created; char *cookie_file; char *remote_server; char *remote_sink_name; + + tunnel_msg *msg; + + pa_sink_new_data *sink_data; }; +static int stream_create(struct userdata *u); +static int sink_create(struct userdata *u); + static const char* const valid_modargs[] = { "sink_name", "sink_properties", @@ -144,6 +159,42 @@ static pa_proplist* tunnel_new_proplist(struct userdata *u) { return proplist; } +static int tunnel_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { +
struct userdata *u = data; + + pa_assert(u); + + switch (code) { + + /* Delivered from the IO thread, handled in the main thread. */ + case TUNNEL_MESSAGE_CREATE_SINK: + pa_log_debug("Creating sink."); + if (sink_create(u) < 0) { + pa_module_unload_request(u->module, true); + return -1; + } + pa_sink_new_data_done(u->sink_data); + pa_xfree(u->sink_data); + u->sink_data = NULL; + pa_asyncmsgq_post(u->thread_mq->inq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_STREAM, u, 0, NULL, NULL); + break; + + /* Delivered from the main thread, handled in the IO thread. */ + case TUNNEL_MESSAGE_CREATE_STREAM: + u->created = true; + pa_log_debug("Creating stream."); + if (stream_create(u) < 0) { + u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); + return -1; + } + u->connected = true; + break; + + } + + return 0; +} + static void thread_func(void *userdata) { struct userdata *u = userdata; pa_proplist *proplist; @@ -173,7 +224,7 @@ static void thread_func(void *userdata) { u->remote_server, PA_CONTEXT_NOAUTOSPAWN, NULL) < 0) { - pa_log("Failed to connect libpulse context"); + pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context))); goto fail; } @@ -187,7 +238,7 @@ static void thread_func(void *userdata) { goto fail; } - if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) + if (PA_UNLIKELY(u->created && u->sink->thread_info.rewind_requested)) pa_sink_process_rewind(u->sink, 0); if (u->connected && @@ -244,6 +295,34 @@ finish: pa_log_debug("Thread shutting down"); } +static void context_state_cb(pa_context *c, void *userdata) { + struct userdata *u = userdata; + pa_assert(u); + + switch (pa_context_get_state(c)) { + case PA_CONTEXT_UNCONNECTED: + case PA_CONTEXT_CONNECTING: + case PA_CONTEXT_AUTHORIZING: + case PA_CONTEXT_SETTING_NAME: + break; + case PA_CONTEXT_READY: { + pa_log_debug("Context successfully connected."); + pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), 
TUNNEL_MESSAGE_CREATE_SINK, u, 0, NULL, NULL); + break; + } + case PA_CONTEXT_FAILED: + pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context))); + u->connected = false; + u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); + break; + case PA_CONTEXT_TERMINATED: + pa_log_debug("Context terminated."); + u->connected = false; + u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); + break; + } +} + static void stream_state_cb(pa_stream *stream, void *userdata) { struct userdata *u = userdata; @@ -251,7 +330,7 @@ static void stream_state_cb(pa_stream *stream, void *userdata) { switch (pa_stream_get_state(stream)) { case PA_STREAM_FAILED: - pa_log_error("Stream failed."); + pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context))); u->connected = false; u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); break; @@ -291,78 +370,53 @@ static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *user stream_changed_buffer_attr_cb(stream, userdata); } -static void context_state_cb(pa_context *c, void *userdata) { - struct userdata *u = userdata; - pa_assert(u); +/* Handled in the IO thread. */ +static int stream_create(struct userdata *u) { + pa_proplist *proplist; + pa_buffer_attr bufferattr; + pa_usec_t requested_latency; + char *username = pa_get_user_name_malloc(); + char *hostname = pa_get_host_name_malloc(); + /* TODO: old tunnel put here the remote sink_name into stream name e.g. 
'Null Output for lynxis at lazus' */ + char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname); + pa_xfree(hostname); + pa_xfree(username); - switch (pa_context_get_state(c)) { - case PA_CONTEXT_UNCONNECTED: - case PA_CONTEXT_CONNECTING: - case PA_CONTEXT_AUTHORIZING: - case PA_CONTEXT_SETTING_NAME: - break; - case PA_CONTEXT_READY: { - pa_proplist *proplist; - pa_buffer_attr bufferattr; - pa_usec_t requested_latency; - char *username = pa_get_user_name_malloc(); - char *hostname = pa_get_host_name_malloc(); - /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis at lazus' */ - char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname); - pa_xfree(hostname); - pa_xfree(username); - - pa_log_debug("Connection successful. Creating stream."); - pa_assert(!u->stream); - - proplist = tunnel_new_proplist(u); - u->stream = pa_stream_new_with_proplist(u->context, - stream_name, - &u->sink->sample_spec, - &u->sink->channel_map, - proplist); - pa_proplist_free(proplist); - pa_xfree(stream_name); + pa_assert(!u->stream); - if (!u->stream) { - pa_log_error("Could not create a stream."); - u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); - return; - } + proplist = tunnel_new_proplist(u); + u->stream = pa_stream_new_with_proplist(u->context, + stream_name, + &u->sink->sample_spec, + &u->sink->channel_map, + proplist); + pa_proplist_free(proplist); + pa_xfree(stream_name); - requested_latency = pa_sink_get_requested_latency_within_thread(u->sink); - if (requested_latency == (pa_usec_t) -1) - requested_latency = u->sink->thread_info.max_latency; - - reset_bufferattr(&bufferattr); - bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec); - - pa_stream_set_state_callback(u->stream, stream_state_cb, userdata); - pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, userdata); - if 
(pa_stream_connect_playback(u->stream, - u->remote_sink_name, - &bufferattr, - PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE, - NULL, - NULL) < 0) { - pa_log_error("Could not connect stream."); - u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); - } - u->connected = true; - pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PUT, NULL, 0, NULL, NULL); - break; - } - case PA_CONTEXT_FAILED: - pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context))); - u->connected = false; - u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); - break; - case PA_CONTEXT_TERMINATED: - pa_log_debug("Context terminated."); - u->connected = false; - u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); - break; + if (!u->stream) { + pa_log_error("Could not create stream: %s", pa_strerror(pa_context_errno(u->context))); + return -1; + } + + requested_latency = pa_sink_get_requested_latency_within_thread(u->sink); + if (requested_latency == (pa_usec_t) -1) + requested_latency = u->sink->thread_info.max_latency; + + reset_bufferattr(&bufferattr); + bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec); + + pa_stream_set_state_callback(u->stream, stream_state_cb, u); + pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, u); + if (pa_stream_connect_playback(u->stream, + u->remote_sink_name, + &bufferattr, + PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE, + NULL, + NULL) < 0) { + pa_log_error("Could not connect stream: %s", pa_strerror(pa_context_errno(u->context))); + return -1; } + return 0; } static void sink_update_requested_latency_cb(pa_sink *s) { @@ -435,13 +489,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of *((int64_t*) data) = 
remote_latency; return 0; } - - /* Delivered from the IO thread, handled in the main thread. */ - case SINK_MESSAGE_PUT: { - if (u->connected) - pa_sink_put(u->sink); - return 0; - } } return pa_sink_process_msg(o, code, data, offset, chunk); } @@ -480,10 +527,31 @@ static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, return 0; } +/* Handled in the main thread. */ +static int sink_create(struct userdata *u) { + if (!(u->sink = pa_sink_new(u->module->core, u->sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) { + pa_log("Failed to create sink."); + return -1; + } + + u->sink->userdata = u; + u->sink->parent.process_msg = sink_process_msg_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; + u->sink->update_requested_latency = sink_update_requested_latency_cb; + pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC); + + /* set thread message queue */ + pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq); + pa_sink_set_rtpoll(u->sink, u->rtpoll); + + pa_sink_put(u->sink); + + return 0; +} + int pa__init(pa_module *m) { struct userdata *u = NULL; pa_modargs *ma = NULL; - pa_sink_new_data sink_data; pa_sample_spec ss; pa_channel_map map; const char *remote_server = NULL; @@ -539,47 +607,36 @@ int pa__init(pa_module *m) { * with module-tunnel-sink-new. 
*/ u->rtpoll = pa_rtpoll_new(); - /* Create sink */ - pa_sink_new_data_init(&sink_data); - sink_data.driver = __FILE__; - sink_data.module = m; + /* Create sink data */ + u->sink_data = pa_xnew(pa_sink_new_data, 1); + pa_sink_new_data_init(u->sink_data); + u->sink_data->driver = __FILE__; + u->sink_data->module = m; default_sink_name = pa_sprintf_malloc("tunnel-sink-new.%s", remote_server); sink_name = pa_modargs_get_value(ma, "sink_name", default_sink_name); - pa_sink_new_data_set_name(&sink_data, sink_name); - pa_sink_new_data_set_sample_spec(&sink_data, &ss); - pa_sink_new_data_set_channel_map(&sink_data, &map); + pa_sink_new_data_set_name(u->sink_data, sink_name); + pa_sink_new_data_set_sample_spec(u->sink_data, &ss); + pa_sink_new_data_set_channel_map(u->sink_data, &map); - pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "sound"); - pa_proplist_setf(sink_data.proplist, + pa_proplist_sets(u->sink_data->proplist, PA_PROP_DEVICE_CLASS, "sound"); + pa_proplist_setf(u->sink_data->proplist, PA_PROP_DEVICE_DESCRIPTION, _("Tunnel to %s/%s"), remote_server, pa_strempty(u->remote_sink_name)); - if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) { + if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_data->proplist, PA_UPDATE_REPLACE) < 0) { pa_log("Invalid properties"); - pa_sink_new_data_done(&sink_data); - goto fail; - } - if (!(u->sink = pa_sink_new(m->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) { - pa_log("Failed to create sink."); - pa_sink_new_data_done(&sink_data); goto fail; } - pa_sink_new_data_done(&sink_data); - u->sink->userdata = u; - u->sink->parent.process_msg = sink_process_msg_cb; - u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; - u->sink->update_requested_latency = sink_update_requested_latency_cb; - pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC); - - /* set thread message queue */ - pa_sink_set_asyncmsgq(u->sink, 
u->thread_mq->inq); - pa_sink_set_rtpoll(u->sink, u->rtpoll); + /* Setup initial message handler */ + u->msg = pa_msgobject_new(tunnel_msg); + u->msg->parent.process_msg = tunnel_process_msg_cb; + /* start IO thread */ if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) { pa_log("Failed to create thread."); goto fail; @@ -635,6 +692,14 @@ void pa__done(pa_module *m) { if (u->remote_server) pa_xfree(u->remote_server); + if (u->msg) + pa_xfree(u->msg); + + if (u->sink_data) { + pa_sink_new_data_done(u->sink_data); + pa_xfree(u->sink_data); + } + if (u->sink) pa_sink_unref(u->sink); diff --git a/src/modules/module-tunnel-source-new.c b/src/modules/module-tunnel-source-new.c index bff2f6a1..df9c1757 100644 --- a/src/modules/module-tunnel-source-new.c +++ b/src/modules/module-tunnel-source-new.c @@ -59,15 +59,22 @@ PA_MODULE_USAGE( #define TUNNEL_THREAD_FAILED_MAINLOOP 1 -enum { - SOURCE_MESSAGE_PUT = PA_SOURCE_MESSAGE_MAX, -}; - static void stream_state_cb(pa_stream *stream, void *userdata); static void stream_read_cb(pa_stream *s, size_t length, void *userdata); static void context_state_cb(pa_context *c, void *userdata); static void source_update_requested_latency_cb(pa_source *s); +enum { + TUNNEL_MESSAGE_CREATE_SOURCE, + TUNNEL_MESSAGE_CREATE_STREAM, +}; + +typedef struct { + pa_msgobject parent; +} tunnel_msg; + +PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject); + struct userdata { pa_module *module; pa_source *source; @@ -87,8 +94,16 @@ struct userdata { char *cookie_file; char *remote_server; char *remote_source_name; + + tunnel_msg *msg; + + pa_source_new_data *source_data; }; +static int stream_create(struct userdata *u); +static int source_create(struct userdata *u); +static void read_new_samples(struct userdata *u); + static const char* const valid_modargs[] = { "source_name", "source_properties", @@ -133,63 +148,39 @@ static pa_proplist* tunnel_new_proplist(struct userdata *u) { return proplist; } -static void stream_read_cb(pa_stream *s, 
size_t length, void *userdata) { - struct userdata *u = userdata; - u->new_data = true; -} - -/* called from io context to read samples from the stream into our source */ -static void read_new_samples(struct userdata *u) { - const void *p; - size_t readable = 0; - pa_memchunk memchunk; +static int tunnel_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { + struct userdata *u = data; pa_assert(u); - u->new_data = false; - - pa_memchunk_reset(&memchunk); - - if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY)) - return; - readable = pa_stream_readable_size(u->stream); - while (readable > 0) { - size_t nbytes = 0; - if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) { - pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context))); - u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); - return; - } - - if (PA_LIKELY(p)) { - /* we have valid data */ - memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true); - memchunk.length = nbytes; - memchunk.index = 0; - - pa_source_post(u->source, &memchunk); - pa_memblock_unref_fixed(memchunk.memblock); - } else { - size_t bytes_to_generate = nbytes; - - /* we have a hole. generate silence */ - memchunk = u->source->silence; - pa_memblock_ref(memchunk.memblock); - - while (bytes_to_generate > 0) { - if (bytes_to_generate < memchunk.length) - memchunk.length = bytes_to_generate; + switch (code) { - pa_source_post(u->source, &memchunk); - bytes_to_generate -= memchunk.length; + /* Delivered from the IO thread, handled in the main thread. 
*/ + case TUNNEL_MESSAGE_CREATE_SOURCE: + pa_log_debug("Creating source."); + if (source_create(u) < 0) { + pa_module_unload_request(u->module, true); + return -1; } + pa_source_new_data_done(u->source_data); + pa_xfree(u->source_data); + u->source_data = NULL; + pa_asyncmsgq_post(u->thread_mq->inq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_STREAM, u, 0, NULL, NULL); + break; - pa_memblock_unref(memchunk.memblock); - } + /* Delivered from the main thread, handled in the IO thread. */ + case TUNNEL_MESSAGE_CREATE_STREAM: + pa_log_debug("Creating stream."); + if (stream_create(u) < 0) { + u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); + return -1; + } + u->connected = true; + break; - pa_stream_drop(u->stream); - readable -= nbytes; } + + return 0; } static void thread_func(void *userdata) { @@ -259,6 +250,34 @@ finish: pa_log_debug("Thread shutting down"); } +static void context_state_cb(pa_context *c, void *userdata) { + struct userdata *u = userdata; + pa_assert(u); + + switch (pa_context_get_state(c)) { + case PA_CONTEXT_UNCONNECTED: + case PA_CONTEXT_CONNECTING: + case PA_CONTEXT_AUTHORIZING: + case PA_CONTEXT_SETTING_NAME: + break; + case PA_CONTEXT_READY: { + pa_log_debug("Context successfully connected."); + pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SOURCE, u, 0, NULL, NULL); + break; + } + case PA_CONTEXT_FAILED: + pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context))); + u->connected = false; + u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); + break; + case PA_CONTEXT_TERMINATED: + pa_log_debug("Context terminated."); + u->connected = false; + u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); + break; + } +} + static void stream_state_cb(pa_stream *stream, void *userdata) { struct userdata *u = userdata; @@ -289,76 +308,110 @@ static void stream_state_cb(pa_stream *stream, void 
*userdata) { } } -static void context_state_cb(pa_context *c, void *userdata) { +static void stream_read_cb(pa_stream *s, size_t length, void *userdata) { struct userdata *u = userdata; + u->new_data = true; +} + +/* called from io context to read samples from the stream into our source */ +static void read_new_samples(struct userdata *u) { + const void *p; + size_t readable = 0; + pa_memchunk memchunk; + pa_assert(u); + u->new_data = false; - switch (pa_context_get_state(c)) { - case PA_CONTEXT_UNCONNECTED: - case PA_CONTEXT_CONNECTING: - case PA_CONTEXT_AUTHORIZING: - case PA_CONTEXT_SETTING_NAME: - break; - case PA_CONTEXT_READY: { - pa_proplist *proplist; - pa_buffer_attr bufferattr; - pa_usec_t requested_latency; - char *username = pa_get_user_name_malloc(); - char *hostname = pa_get_host_name_malloc(); - /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis at lazus' */ - char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname); - pa_xfree(username); - pa_xfree(hostname); - - pa_log_debug("Connection successful. 
Creating stream."); - pa_assert(!u->stream); - - proplist = tunnel_new_proplist(u); - u->stream = pa_stream_new_with_proplist(u->context, - stream_name, - &u->source->sample_spec, - &u->source->channel_map, - proplist); - pa_proplist_free(proplist); - pa_xfree(stream_name); + pa_memchunk_reset(&memchunk); - if (!u->stream) { - pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context))); - u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); - return; - } + if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY)) + return; - requested_latency = pa_source_get_requested_latency_within_thread(u->source); - if (requested_latency == (uint32_t) -1) - requested_latency = u->source->thread_info.max_latency; + readable = pa_stream_readable_size(u->stream); + while (readable > 0) { + size_t nbytes = 0; + if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) { + pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context))); + u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); + return; + } - reset_bufferattr(&bufferattr); - bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec); + if (PA_LIKELY(p)) { + /* we have valid data */ + memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true); + memchunk.length = nbytes; + memchunk.index = 0; - pa_stream_set_state_callback(u->stream, stream_state_cb, userdata); - pa_stream_set_read_callback(u->stream, stream_read_cb, userdata); - if (pa_stream_connect_record(u->stream, - u->remote_source_name, - &bufferattr, - PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED) < 0) { - pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u->context))); - u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); + pa_source_post(u->source, 
&memchunk); + pa_memblock_unref_fixed(memchunk.memblock); + } else { + size_t bytes_to_generate = nbytes; + + /* we have a hole. generate silence */ + memchunk = u->source->silence; + pa_memblock_ref(memchunk.memblock); + + while (bytes_to_generate > 0) { + if (bytes_to_generate < memchunk.length) + memchunk.length = bytes_to_generate; + + pa_source_post(u->source, &memchunk); + bytes_to_generate -= memchunk.length; } - u->connected = true; - pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_PUT, NULL, 0, NULL, NULL); - break; + + pa_memblock_unref(memchunk.memblock); } - case PA_CONTEXT_FAILED: - pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context))); - u->connected = false; - u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); - break; - case PA_CONTEXT_TERMINATED: - pa_log_debug("Context terminated."); - u->connected = false; - u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); - break; + + pa_stream_drop(u->stream); + readable -= nbytes; + } +} + +/* Handled in the IO thread. */ +static int stream_create(struct userdata *u) { + pa_proplist *proplist; + pa_buffer_attr bufferattr; + pa_usec_t requested_latency; + char *username = pa_get_user_name_malloc(); + char *hostname = pa_get_host_name_malloc(); + /* TODO: old tunnel put here the remote source_name into stream name e.g. 
'Null Output for lynxis at lazus' */ + char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname); + pa_xfree(username); + pa_xfree(hostname); + + pa_assert(!u->stream); + + proplist = tunnel_new_proplist(u); + u->stream = pa_stream_new_with_proplist(u->context, + stream_name, + &u->source->sample_spec, + &u->source->channel_map, + proplist); + pa_proplist_free(proplist); + pa_xfree(stream_name); + + if (!u->stream) { + pa_log_error("Could not create stream: %s", pa_strerror(pa_context_errno(u->context))); + return -1; } + + requested_latency = pa_source_get_requested_latency_within_thread(u->source); + if (requested_latency == (pa_usec_t) -1) + requested_latency = u->source->thread_info.max_latency; + + reset_bufferattr(&bufferattr); + bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec); + + pa_stream_set_state_callback(u->stream, stream_state_cb, u); + pa_stream_set_read_callback(u->stream, stream_read_cb, u); + if (pa_stream_connect_record(u->stream, + u->remote_source_name, + &bufferattr, + PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_START_CORKED) < 0) { + pa_log_error("Could not connect stream: %s", pa_strerror(pa_context_errno(u->context))); + return -1; + } + return 0; } static void source_update_requested_latency_cb(pa_source *s) { @@ -434,13 +487,6 @@ static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t return 0; } - - /* Delivered from the IO thread, handled in the main thread. */ - case SOURCE_MESSAGE_PUT: { - if (u->connected) - pa_source_put(u->source); - return 0; - } } return pa_source_process_msg(o, code, data, offset, chunk); } @@ -479,10 +525,30 @@ static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_ return 0; } +/* Handled in the main thread. 
*/ +static int source_create(struct userdata *u) { + if (!(u->source = pa_source_new(u->module->core, u->source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) { + pa_log("Failed to create source."); + return -1; + } + + u->source->userdata = u; + u->source->parent.process_msg = source_process_msg_cb; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; + u->source->update_requested_latency = source_update_requested_latency_cb; + + /* set thread message queue */ + pa_source_set_asyncmsgq(u->source, u->thread_mq->inq); + pa_source_set_rtpoll(u->source, u->rtpoll); + + pa_source_put(u->source); + + return 0; +} + int pa__init(pa_module *m) { struct userdata *u = NULL; pa_modargs *ma = NULL; - pa_source_new_data source_data; pa_sample_spec ss; pa_channel_map map; const char *remote_server = NULL; @@ -535,45 +601,36 @@ int pa__init(pa_module *m) { * only works because it calls pa_asyncmsq_process_one(). */ u->rtpoll = pa_rtpoll_new(); - /* Create source */ - pa_source_new_data_init(&source_data); - source_data.driver = __FILE__; - source_data.module = m; + /* Create source data */ + u->source_data = pa_xnew(pa_source_new_data, 1); + pa_source_new_data_init(u->source_data); + u->source_data->driver = __FILE__; + u->source_data->module = m; default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server); source_name = pa_modargs_get_value(ma, "source_name", default_source_name); - pa_source_new_data_set_name(&source_data, source_name); - pa_source_new_data_set_sample_spec(&source_data, &ss); - pa_source_new_data_set_channel_map(&source_data, &map); + pa_source_new_data_set_name(u->source_data, source_name); + pa_source_new_data_set_sample_spec(u->source_data, &ss); + pa_source_new_data_set_channel_map(u->source_data, &map); - pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "sound"); - pa_proplist_setf(source_data.proplist, + pa_proplist_sets(u->source_data->proplist, PA_PROP_DEVICE_CLASS, "sound"); 
+ pa_proplist_setf(u->source_data->proplist, PA_PROP_DEVICE_DESCRIPTION, _("Tunnel to %s/%s"), remote_server, pa_strempty(u->remote_source_name)); - if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) { + if (pa_modargs_get_proplist(ma, "source_properties", u->source_data->proplist, PA_UPDATE_REPLACE) < 0) { pa_log("Invalid properties"); - pa_source_new_data_done(&source_data); - goto fail; - } - if (!(u->source = pa_source_new(m->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) { - pa_log("Failed to create source."); - pa_source_new_data_done(&source_data); goto fail; } - pa_source_new_data_done(&source_data); - u->source->userdata = u; - u->source->parent.process_msg = source_process_msg_cb; - u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; - u->source->update_requested_latency = source_update_requested_latency_cb; - - pa_source_set_asyncmsgq(u->source, u->thread_mq->inq); - pa_source_set_rtpoll(u->source, u->rtpoll); + /* setup initial message handler */ + u->msg = pa_msgobject_new(tunnel_msg); + u->msg->parent.process_msg = tunnel_process_msg_cb; + /* start IO thread */ if (!(u->thread = pa_thread_new("tunnel-source", thread_func, u))) { pa_log("Failed to create thread."); goto fail; @@ -629,6 +686,14 @@ void pa__done(pa_module *m) { if (u->remote_server) pa_xfree(u->remote_server); + if (u->msg) + pa_xfree(u->msg); + + if (u->source_data) { + pa_source_new_data_done(u->source_data); + pa_xfree(u->source_data); + } + if (u->source) pa_source_unref(u->source); diff --git a/src/modules/module-tunnel.c b/src/modules/module-tunnel.c index 960f8533..9ea1ab33 100644 --- a/src/modules/module-tunnel.c +++ b/src/modules/module-tunnel.c @@ -131,7 +131,8 @@ enum { SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX, SINK_MESSAGE_REMOTE_SUSPEND, SINK_MESSAGE_UPDATE_LATENCY, - SINK_MESSAGE_POST + SINK_MESSAGE_POST, + SINK_MESSAGE_CREATED, }; #define DEFAULT_TLENGTH_MSEC 150 
@@ -142,7 +143,7 @@ enum { enum { SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX, SOURCE_MESSAGE_REMOTE_SUSPEND, - SOURCE_MESSAGE_UPDATE_LATENCY + SOURCE_MESSAGE_UPDATE_LATENCY, }; #define DEFAULT_FRAGSIZE_MSEC 25 @@ -196,10 +197,13 @@ struct userdata { char *server_name; #ifdef TUNNEL_SINK + pa_sink_new_data *sink_data; char *sink_name; pa_sink *sink; size_t requested_bytes; + bool sink_created; #else + pa_source_new_data *source_data; char *source_name; pa_source *source; pa_mcalign *mcalign; @@ -241,6 +245,11 @@ struct userdata { }; static void request_latency(struct userdata *u); +#ifdef TUNNEL_SINK +static int sink_create(struct userdata *u); +#else +static int source_create(struct userdata *u); +#endif /* Called from main context */ static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) { @@ -550,6 +559,11 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return 0; } + case SINK_MESSAGE_CREATED: + + u->sink_created = true; + return 0; + case SINK_MESSAGE_POST: /* OK, This might be a bit confusing. 
This message is @@ -717,7 +731,7 @@ static void thread_func(void *userdata) { int ret; #ifdef TUNNEL_SINK - if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) + if (PA_UNLIKELY(u->sink_created && u->sink->thread_info.rewind_requested)) pa_sink_process_rewind(u->sink, 0); #endif @@ -1599,9 +1613,17 @@ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t } #ifdef TUNNEL_SINK - pa_sink_put(u->sink); + if (sink_create(u) < 0) + goto fail; + pa_sink_new_data_done(u->sink_data); + pa_xfree(u->sink_data); + u->sink_data = NULL; #else - pa_source_put(u->source); + if (source_create(u) < 0) + goto fail; + pa_source_new_data_done(u->source_data); + pa_xfree(u->source_data); + u->source_data = NULL; #endif /* Starting with protocol version 13 the MSB of the version tag @@ -1922,6 +1944,58 @@ static void sink_set_mute(pa_sink *sink) { #endif +#ifdef TUNNEL_SINK +/* Called from main context */ +static int sink_create(struct userdata *u) { + if (!(u->sink = pa_sink_new(u->module->core, u->sink_data, PA_SINK_NETWORK|PA_SINK_LATENCY))) { + pa_log("Failed to create sink."); + return -1; + } + + u->sink->userdata = u; + u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + pa_sink_set_set_volume_callback(u->sink, sink_set_volume); + pa_sink_set_set_mute_callback(u->sink, sink_set_mute); + + u->sink->refresh_volume = u->sink->refresh_muted = false; + +/* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */ + + pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); + pa_sink_set_rtpoll(u->sink, u->rtpoll); + + pa_sink_put(u->sink); + + pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_CREATED, NULL, 0, NULL, NULL); + + return 0; +} +#else +/* Called from main context */ +static int source_create(struct userdata *u) { + if (!(u->source = pa_source_new(u->module->core, u->source_data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY))) { + pa_log("Failed to create 
source."); + return -1; + } + + u->source->userdata = u; + u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb; + +/* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */ + + pa_source_set_asyncmsgq(u->source, u->thread_mq.inq); + pa_source_set_rtpoll(u->source, u->rtpoll); + + u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec)); + + pa_source_put(u->source); + + return 0; +} +#endif + int pa__init(pa_module*m) { pa_modargs *ma = NULL; struct userdata *u = NULL; @@ -1930,11 +2004,6 @@ int pa__init(pa_module*m) { pa_sample_spec ss; pa_channel_map map; char *dn = NULL; -#ifdef TUNNEL_SINK - pa_sink_new_data data; -#else - pa_source_new_data data; -#endif bool automatic; #ifdef HAVE_X11 xcb_connection_t *xcb = NULL; @@ -2130,90 +2199,49 @@ int pa__init(pa_module*m) { pa_socket_client_set_callback(u->client, on_connection, u); #ifdef TUNNEL_SINK + u->sink_data = pa_xnew(pa_sink_new_data, 1); + pa_sink_new_data_init(u->sink_data); if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL)))) dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name); - pa_sink_new_data_init(&data); - data.driver = __FILE__; - data.module = m; - data.namereg_fail = false; - pa_sink_new_data_set_name(&data, dn); - pa_sink_new_data_set_sample_spec(&data, &ss); - pa_sink_new_data_set_channel_map(&data, &map); - pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? 
" on " : "", u->server_name); - pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name); + u->sink_data->driver = __FILE__; + u->sink_data->module = m; + u->sink_data->namereg_fail = false; + pa_sink_new_data_set_name(u->sink_data, dn); + pa_sink_new_data_set_sample_spec(u->sink_data, &ss); + pa_sink_new_data_set_channel_map(u->sink_data, &map); + pa_proplist_setf(u->sink_data->proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name); + pa_proplist_sets(u->sink_data->proplist, "tunnel.remote.server", u->server_name); if (u->sink_name) - pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name); + pa_proplist_sets(u->sink_data->proplist, "tunnel.remote.sink", u->sink_name); - if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) { + if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_data->proplist, PA_UPDATE_REPLACE) < 0) { pa_log("Invalid properties"); - pa_sink_new_data_done(&data); - goto fail; - } - - u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY); - pa_sink_new_data_done(&data); - - if (!u->sink) { - pa_log("Failed to create sink."); goto fail; } - - u->sink->parent.process_msg = sink_process_msg; - u->sink->userdata = u; - u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; - pa_sink_set_set_volume_callback(u->sink, sink_set_volume); - pa_sink_set_set_mute_callback(u->sink, sink_set_mute); - - u->sink->refresh_volume = u->sink->refresh_muted = false; - -/* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */ - - pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); - pa_sink_set_rtpoll(u->sink, u->rtpoll); - #else + u->source_data = pa_xnew(pa_source_new_data, 1); + pa_source_new_data_init(u->source_data); if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL)))) dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name); - pa_source_new_data_init(&data); - 
data.driver = __FILE__; - data.module = m; - data.namereg_fail = false; - pa_source_new_data_set_name(&data, dn); - pa_source_new_data_set_sample_spec(&data, &ss); - pa_source_new_data_set_channel_map(&data, &map); - pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name); - pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name); + u->source_data->driver = __FILE__; + u->source_data->module = m; + u->source_data->namereg_fail = false; + pa_source_new_data_set_name(u->source_data, dn); + pa_source_new_data_set_sample_spec(u->source_data, &ss); + pa_source_new_data_set_channel_map(u->source_data, &map); + pa_proplist_setf(u->source_data->proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name); + pa_proplist_sets(u->source_data->proplist, "tunnel.remote.server", u->server_name); if (u->source_name) - pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name); + pa_proplist_sets(u->source_data->proplist, "tunnel.remote.source", u->source_name); - if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) { + if (pa_modargs_get_proplist(ma, "source_properties", u->source_data->proplist, PA_UPDATE_REPLACE) < 0) { pa_log("Invalid properties"); - pa_source_new_data_done(&data); - goto fail; - } - - u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY); - pa_source_new_data_done(&data); - - if (!u->source) { - pa_log("Failed to create source."); goto fail; } - - u->source->parent.process_msg = source_process_msg; - u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb; - u->source->userdata = u; - -/* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */ - - pa_source_set_asyncmsgq(u->source, u->thread_mq.inq); - pa_source_set_rtpoll(u->source, u->rtpoll); - - u->mcalign = 
pa_mcalign_new(pa_frame_size(&u->source->sample_spec)); #endif u->time_event = NULL; @@ -2300,6 +2328,18 @@ void pa__done(pa_module*m) { pa_source_unref(u->source); #endif +#ifdef TUNNEL_SINK + if (u->sink_data) { + pa_sink_new_data_done(u->sink_data); + pa_xfree(u->sink_data); + } +#else + if (u->source_data) { + pa_source_new_data_done(u->source_data); + pa_xfree(u->source_data); + } +#endif + if (u->rtpoll) pa_rtpoll_free(u->rtpoll); -- 2.14.4