Signed-off-by: Pierre Ossman <ossman at cendio.se> --- src/modules/module-tunnel-sink-new.c | 158 +++++++++++++++++++++++++++++++++ src/modules/module-tunnel-source-new.c | 151 +++++++++++++++++++++++++++++++ 2 files changed, 309 insertions(+) diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c index 6989f73..f9e38e4 100644 --- a/src/modules/module-tunnel-sink-new.c +++ b/src/modules/module-tunnel-sink-new.c @@ -27,6 +27,7 @@ #include <pulse/stream.h> #include <pulse/mainloop.h> #include <pulse/introspect.h> +#include <pulse/subscribe.h> #include <pulse/error.h> #include <pulsecore/core.h> @@ -64,6 +65,8 @@ PA_MODULE_USAGE( static void stream_state_cb(pa_stream *stream, void *userdata); static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata); static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata); +static void sink_input_info_cb(pa_context *context, const pa_sink_input_info *sii, int eol, void *userdata); +static void subscribe_cb(pa_context *context, pa_subscription_event_type_t t, uint32_t idx, void *userdata); static void context_state_cb(pa_context *c, void *userdata); static void sink_update_requested_latency_cb(pa_sink *s); @@ -85,6 +88,10 @@ struct userdata { char *cookie_file; char *remote_server; char *remote_sink_name; + + bool has_volume; + bool mute; + pa_cvolume volume; }; static const char* const valid_modargs[] = { @@ -164,6 +171,7 @@ static void thread_func(void *userdata) { goto fail; } + pa_context_set_subscribe_callback(u->context, subscribe_cb, u); pa_context_set_state_callback(u->context, context_state_cb, u); if (pa_context_connect(u->context, u->remote_server, @@ -319,6 +327,7 @@ static void context_state_cb(pa_context *c, void *userdata) { pa_proplist *proplist; pa_buffer_attr bufferattr; pa_usec_t requested_latency; + pa_operation *operation; char *username = pa_get_user_name_malloc(); char *hostname = pa_get_host_name_malloc(); /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for lynxis at lazus' */ @@ -368,6 +377,19 @@ static void context_state_cb(pa_context *c, void *userdata) { u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); } u->connected = true; + + operation = pa_context_subscribe(u->context, + PA_SUBSCRIPTION_MASK_SINK_INPUT, + NULL, NULL); + if (operation) + pa_operation_unref(operation); + + operation = pa_context_get_sink_input_info(u->context, + pa_stream_get_index(u->stream), + sink_input_info_cb, u); + if (operation) + pa_operation_unref(operation); + break; } case PA_CONTEXT_FAILED: @@ -383,6 +405,49 @@ static void context_state_cb(pa_context *c, void *userdata) { } } +static void sink_input_info_cb(pa_context *context, const pa_sink_input_info *sii, int eol, void *userdata) { + struct userdata *u = userdata; + pa_assert(u); + + if (eol) + return; + + u->has_volume = sii->has_volume; + u->mute = sii->mute; + u->volume = sii->volume; + + if (!sii->has_volume) + return; + + if ((sii->mute == u->sink->muted) && + pa_cvolume_equal(&sii->volume, &u->sink->real_volume)) + return; + + pa_sink_update_volume_and_mute(u->sink); +} + +static void subscribe_cb(pa_context *context, pa_subscription_event_type_t t, uint32_t idx, void *userdata) { + struct userdata *u = userdata; + pa_operation *operation; + + pa_assert(u); + + if (!u->connected) + return; + + if (idx != pa_stream_get_index(u->stream)) + return; + + if ((t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) != PA_SUBSCRIPTION_EVENT_CHANGE) + return; + + operation = pa_context_get_sink_input_info(u->context, + pa_stream_get_index(u->stream), + sink_input_info_cb, u); + if (operation) + pa_operation_unref(operation); +} + static void sink_update_requested_latency_cb(pa_sink *s) { struct userdata *u; pa_operation *operation; @@ -424,6 +489,93 @@ static void sink_update_requested_latency_cb(pa_sink *s) { } } +static void sink_get_volume_cb(pa_sink *s) { + struct userdata *u; + + pa_assert(s); + u = s->userdata; + pa_assert(u); + + if (!u->has_volume) + return; + + s->real_volume = u->volume; +} + +static void sink_set_volume_cb(pa_sink *s) { + struct userdata *u; + pa_operation *operation; + + pa_assert(s); + u = s->userdata; + pa_assert(u); + + if (!u->connected) + return; + + /* We have a race here since we create the sink before we are + * connected and fully updated with the server side volume. E.g. + * streams might be moved to this sink, and have their volumes + * calculated incorrectly because s->real_volume isn't properly + * set yet. We then propagate this error back in to the sink and + * over to the other end of the tunnel. Avoid this by simply + * ignoring volume changes until we have gotten the proper volume + * from the server. */ + if (!u->has_volume) + return; + + operation = pa_context_set_sink_input_volume(u->context, + pa_stream_get_index(u->stream), + &s->real_volume, + NULL, NULL); + if (operation) + pa_operation_unref(operation); +} + +static void sink_write_volume_cb(pa_sink *s) { + /* A bit silly but this is to make sure we only access the server + * connection on the IO thread. */ + sink_set_volume_cb(s); +} + +static int sink_get_mute_cb(pa_sink *s, bool *mute) { + struct userdata *u; + + pa_assert(s); + u = s->userdata; + pa_assert(u); + + if (!u->has_volume) + return -1; + + *mute = u->mute; + + return 0; +} + +static void sink_set_mute_cb(pa_sink *s) { + struct userdata *u; + pa_operation *operation; + + pa_assert(s); + u = s->userdata; + pa_assert(u); + + if (!u->connected) + return; + + /* See sink_set_volume_cb() */ + if (!u->has_volume) + return; + + operation = pa_context_set_sink_input_mute(u->context, + pa_stream_get_index(u->stream), + s->muted, + NULL, NULL); + if (operation) + pa_operation_unref(operation); +} + static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK(o)->userdata; @@ -560,6 +712,12 @@ int pa__init(pa_module *m) { u->sink->parent.process_msg = sink_process_msg_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC); + pa_sink_set_get_volume_callback(u->sink, sink_get_volume_cb); + pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb); + pa_sink_set_write_volume_callback(u->sink, sink_write_volume_cb); + pa_sink_set_get_mute_callback(u->sink, sink_get_mute_cb); + pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); + pa_sink_enable_decibel_volume(u->sink, true); /* set thread message queue */ pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq); diff --git a/src/modules/module-tunnel-source-new.c b/src/modules/module-tunnel-source-new.c index 1fd0e69..20a1f2e 100644 --- a/src/modules/module-tunnel-source-new.c +++ b/src/modules/module-tunnel-source-new.c @@ -27,6 +27,7 @@ #include <pulse/stream.h> #include <pulse/mainloop.h> #include <pulse/introspect.h> +#include <pulse/subscribe.h> #include <pulse/error.h> #include <pulsecore/core.h> @@ -62,6 +63,8 @@ PA_MODULE_USAGE( static void stream_state_cb(pa_stream *stream, void *userdata); static void stream_read_cb(pa_stream *s, size_t length, void *userdata); +static void source_output_info_cb(pa_context *context, const pa_source_output_info *soo, int eol, void *userdata); +static void subscribe_cb(pa_context *context, pa_subscription_event_type_t t, uint32_t idx, void *userdata); static void context_state_cb(pa_context *c, void *userdata); static void source_update_requested_latency_cb(pa_source *s); @@ -83,6 +86,10 @@ struct userdata { char *cookie_file; char *remote_server; char *remote_source_name; + + bool has_volume; + bool mute; + pa_cvolume volume; }; static const char* const valid_modargs[] = { @@ -213,6 +220,7 @@ static void thread_func(void *userdata) { goto fail; } + pa_context_set_subscribe_callback(u->context, subscribe_cb, u); pa_context_set_state_callback(u->context, context_state_cb, u); if (pa_context_connect(u->context, u->remote_server, @@ -299,6 +307,7 @@ static void context_state_cb(pa_context *c, void *userdata) { pa_proplist *proplist; pa_buffer_attr bufferattr; pa_usec_t requested_latency; + pa_operation *operation; char *username = pa_get_user_name_malloc(); char *hostname = pa_get_host_name_malloc(); /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for lynxis at lazus' */ @@ -341,6 +350,19 @@ static void context_state_cb(pa_context *c, void *userdata) { u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); } u->connected = true; + + operation = pa_context_subscribe(u->context, + PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT, + NULL, NULL); + if (operation) + pa_operation_unref(operation); + + operation = pa_context_get_source_output_info(u->context, + pa_stream_get_index(u->stream), + source_output_info_cb, u); + if (operation) + pa_operation_unref(operation); + break; } case PA_CONTEXT_FAILED: @@ -356,6 +378,49 @@ static void context_state_cb(pa_context *c, void *userdata) { } } +static void source_output_info_cb(pa_context *context, const pa_source_output_info *soo, int eol, void *userdata) { + struct userdata *u = userdata; + pa_assert(u); + + if (eol) + return; + + u->has_volume = soo->has_volume; + u->mute = soo->mute; + u->volume = soo->volume; + + if (!soo->has_volume) + return; + + if ((soo->mute == u->source->muted) && + pa_cvolume_equal(&soo->volume, &u->source->real_volume)) + return; + + pa_source_update_volume_and_mute(u->source); +} + +static void subscribe_cb(pa_context *context, pa_subscription_event_type_t t, uint32_t idx, void *userdata) { + struct userdata *u = userdata; + pa_operation *operation; + + pa_assert(u); + + if (!u->connected) + return; + + if (idx != pa_stream_get_index(u->stream)) + return; + + if ((t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) != PA_SUBSCRIPTION_EVENT_CHANGE) + return; + + operation = pa_context_get_source_output_info(u->context, + pa_stream_get_index(u->stream), + source_output_info_cb, u); + if (operation) + pa_operation_unref(operation); +} + static void source_update_requested_latency_cb(pa_source *s) { struct userdata *u; pa_operation *operation; @@ -393,6 +458,86 @@ static void source_update_requested_latency_cb(pa_source *s) { } } +static void source_get_volume_cb(pa_source *s) { + struct userdata *u; + + pa_assert(s); + u = s->userdata; + pa_assert(u); + + if (!u->has_volume) + return; + + s->real_volume = u->volume; +} + +static void source_set_volume_cb(pa_source *s) { + struct userdata *u; + pa_operation *operation; + + pa_assert(s); + u = s->userdata; + pa_assert(u); + + if (!u->connected) + return; + + /* See sink_set_volume_cb() in the sink tunnel module */ + if (!u->has_volume) + return; + + operation = pa_context_set_source_output_volume(u->context, + pa_stream_get_index(u->stream), + &s->real_volume, + NULL, NULL); + if (operation) + pa_operation_unref(operation); +} + +static void source_write_volume_cb(pa_source *s) { + /* A bit silly but this is to make sure we only access the server + * connection on the IO thread. */ + source_set_volume_cb(s); +} + +static int source_get_mute_cb(pa_source *s, bool *mute) { + struct userdata *u; + + pa_assert(s); + u = s->userdata; + pa_assert(u); + + if (!u->has_volume) + return -1; + + *mute = u->mute; + + return 0; +} + +static void source_set_mute_cb(pa_source *s) { + struct userdata *u; + pa_operation *operation; + + pa_assert(s); + u = s->userdata; + pa_assert(u); + + if (!u->connected) + return; + + /* See source_set_volume_cb() */ + if (!u->has_volume) + return; + + operation = pa_context_set_source_output_mute(u->context, + pa_stream_get_index(u->stream), + s->muted, + NULL, NULL); + if (operation) + pa_operation_unref(operation); +} + static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SOURCE(o)->userdata; @@ -532,6 +677,12 @@ int pa__init(pa_module *m) { u->source->userdata = u; u->source->parent.process_msg = source_process_msg_cb; u->source->update_requested_latency = source_update_requested_latency_cb; + pa_source_set_get_volume_callback(u->source, source_get_volume_cb); + pa_source_set_set_volume_callback(u->source, source_set_volume_cb); + pa_source_set_write_volume_callback(u->source, source_write_volume_cb); + pa_source_set_get_mute_callback(u->source, source_get_mute_cb); + pa_source_set_set_mute_callback(u->source, source_set_mute_cb); + pa_source_enable_decibel_volume(u->source, true); pa_source_set_asyncmsgq(u->source, u->thread_mq->inq); -- 2.5.5