There are no behaviour changes, the code from almost all the SET_STATE handlers is moved with minimal changes to the newly introduced set_state_in_io_thread() callback. The only exception is module-tunnel, which has to call pa_sink_render() after pa_sink.thread_info.state has been updated. The set_state_in_io_thread() callback is called before updating that variable, so moving the SET_STATE handler code to the callback isn't possible. The purpose of this change is to make it easier to get state change handling right in modules. Hooking to the SET_STATE messages in modules required care in calling pa_sink/source_process_msg() at the right time (or not calling it at all, as was the case on resume failures), and there were a few bugs (fixed before this patch). Now the core takes care of ordering things correctly. Another motivation for this change is that there was some talk about adding a suspend_cause variable to pa_sink/source.thread_info. The variable would be updated in the core SET_STATE handler, but that would not work with the old design, because in case of resume failures modules didn't call the core message handler. --- src/modules/alsa/alsa-sink.c | 89 ++++++++------ src/modules/alsa/alsa-source.c | 89 ++++++++------ src/modules/bluetooth/module-bluez4-device.c | 172 ++++++++++++++------------ src/modules/bluetooth/module-bluez5-device.c | 174 ++++++++++++++------------- src/modules/echo-cancel/module-echo-cancel.c | 31 +++-- src/modules/module-combine-sink.c | 33 +++-- src/modules/module-equalizer-sink.c | 30 +++-- src/modules/module-esound-sink.c | 59 +++++---- src/modules/module-ladspa-sink.c | 30 +++-- src/modules/module-null-sink.c | 25 ++-- src/modules/module-null-source.c | 21 ++-- src/modules/module-pipe-sink.c | 45 ++++--- src/modules/module-remap-sink.c | 30 +++-- src/modules/module-sine-source.c | 21 ++-- src/modules/module-solaris.c | 126 ++++++++++--------- src/modules/module-tunnel-sink-new.c | 48 +++++--- src/modules/module-tunnel-source-new.c | 48 +++++--- src/modules/module-virtual-sink.c | 30 +++-- src/modules/module-virtual-surround-sink.c | 30 +++-- src/modules/oss/module-oss.c | 152 ++++++++++++----------- src/modules/raop/raop-sink.c | 121 ++++++++++--------- src/pulsecore/sink.c | 8 ++ src/pulsecore/sink.h | 30 +++-- src/pulsecore/source.c | 8 ++ src/pulsecore/source.h | 32 +++-- 25 files changed, 849 insertions(+), 633 deletions(-) diff --git a/src/modules/alsa/alsa-sink.c b/src/modules/alsa/alsa-sink.c index dca8f6939..9e335374c 100644 --- a/src/modules/alsa/alsa-sink.c +++ b/src/modules/alsa/alsa-sink.c @@ -1184,46 +1184,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return 0; } - - case PA_SINK_MESSAGE_SET_STATE: - - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SINK_SUSPENDED: { - pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); - - suspend(u); - - break; - } - - case PA_SINK_IDLE: - case PA_SINK_RUNNING: { - int r; - - if (u->sink->thread_info.state == PA_SINK_INIT) { - if (build_pollfd(u) < 0) - /* FIXME: This will cause an assertion failure in - * pa_sink_put(), because with the current design - * pa_sink_put() is not allowed to fail. */ - return -PA_ERR_IO; - } - - if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { - if ((r = unsuspend(u)) < 0) - return r; - } - - break; - } - - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - case PA_SINK_INVALID_STATE: - ; - } - - break; } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -1248,6 +1208,54 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t new_stat return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SINK_SUSPENDED: { + pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); + + suspend(u); + + break; + } + + case PA_SINK_IDLE: + case PA_SINK_RUNNING: { + int r; + + if (u->sink->thread_info.state == PA_SINK_INIT) { + if (build_pollfd(u) < 0) + /* FIXME: This will cause an assertion failure, because + * with the current design pa_sink_put() is not allowed + * to fail and pa_sink_put() has no fallback code that + * would start the sink suspended if opening the device + * fails. */ + return -PA_ERR_IO; + } + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { + if ((r = unsuspend(u)) < 0) + return r; + } + + break; + } + + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + break; + } + + return 0; +} + static int ctl_mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) { struct userdata *u = snd_mixer_elem_get_callback_private(elem); @@ -2360,6 +2368,7 @@ pa_sink *pa_alsa_sink_new(pa_module *m, pa_modargs *ma, const char*driver, pa_ca if (u->use_tsched) u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; if (u->ucm_context) u->sink->set_port = sink_set_port_ucm_cb; else diff --git a/src/modules/alsa/alsa-source.c b/src/modules/alsa/alsa-source.c index b3adc7e79..312b2596a 100644 --- a/src/modules/alsa/alsa-source.c +++ b/src/modules/alsa/alsa-source.c @@ -1039,46 +1039,6 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off return 0; } - - case PA_SOURCE_MESSAGE_SET_STATE: - - switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SOURCE_SUSPENDED: { - pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); - - suspend(u); - - break; - } - - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: { - int r; - - if (u->source->thread_info.state == PA_SOURCE_INIT) { - if (build_pollfd(u) < 0) - /* FIXME: This will cause an assertion failure in - * pa_source_put(), because with the current design - * pa_source_put() is not allowed to fail. */ - return -PA_ERR_IO; - } - - if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { - if ((r = unsuspend(u)) < 0) - return r; - } - - break; - } - - case PA_SOURCE_UNLINKED: - case PA_SOURCE_INIT: - case PA_SOURCE_INVALID_STATE: - ; - } - - break; } return pa_source_process_msg(o, code, data, offset, chunk); @@ -1103,6 +1063,54 @@ static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t ne return 0; } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SOURCE_SUSPENDED: { + pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); + + suspend(u); + + break; + } + + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: { + int r; + + if (u->source->thread_info.state == PA_SOURCE_INIT) { + if (build_pollfd(u) < 0) + /* FIXME: This will cause an assertion failure, because + * with the current design pa_source_put() is not allowed + * to fail and pa_source_put() has no fallback code that + * would start the source suspended if opening the device + * fails. */ + return -PA_ERR_IO; + } + + if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { + if ((r = unsuspend(u)) < 0) + return r; + } + + break; + } + + case PA_SOURCE_UNLINKED: + case PA_SOURCE_INIT: + case PA_SOURCE_INVALID_STATE: + ; + } + + return 0; +} + static int ctl_mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) { struct userdata *u = snd_mixer_elem_get_callback_private(elem); @@ -2036,6 +2044,7 @@ pa_source *pa_alsa_source_new(pa_module *m, pa_modargs *ma, const char*driver, p if (u->use_tsched) u->source->update_requested_latency = source_update_requested_latency_cb; u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; if (u->ucm_context) u->source->set_port = source_set_port_ucm_cb; else diff --git a/src/modules/bluetooth/module-bluez4-device.c b/src/modules/bluetooth/module-bluez4-device.c index c6baee84c..85eb7986b 100644 --- a/src/modules/bluetooth/module-bluez4-device.c +++ b/src/modules/bluetooth/module-bluez4-device.c @@ -386,45 +386,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse switch (code) { - case PA_SINK_MESSAGE_SET_STATE: - - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SINK_SUSPENDED: - /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */ - if (!PA_SINK_IS_OPENED(u->sink->thread_info.state)) - break; - - /* Stop the device if the source is suspended as well */ - if (!u->source || u->source->state == PA_SOURCE_SUSPENDED) - /* We deliberately ignore whether stopping - * actually worked. Since the stream_fd is - * closed it doesn't really matter */ - bt_transport_release(u); - - break; - - case PA_SINK_IDLE: - case PA_SINK_RUNNING: - if (u->sink->thread_info.state != PA_SINK_SUSPENDED) - break; - - /* Resume the device if the source was suspended as well */ - if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) { - if (bt_transport_acquire(u, false) < 0) - return -1; - else - setup_stream(u); - } - break; - - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - case PA_SINK_INVALID_STATE: - ; - } - break; - case PA_SINK_MESSAGE_GET_LATENCY: { if (u->read_smoother) { @@ -451,55 +412,61 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return pa_sink_process_msg(o, code, data, offset, chunk); } -/* Run from IO thread */ -static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { - struct userdata *u = PA_SOURCE(o)->userdata; - - pa_assert(u->source == PA_SOURCE(o)); - pa_assert(u->transport); - - switch (code) { +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; - case PA_SOURCE_MESSAGE_SET_STATE: + pa_assert(s); + pa_assert_se(u = s->userdata); - switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) { + switch (new_state) { - case PA_SOURCE_SUSPENDED: - /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */ - if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state)) - break; + case PA_SINK_SUSPENDED: + /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */ + if (!PA_SINK_IS_OPENED(u->sink->thread_info.state)) + break; - /* Stop the device if the sink is suspended as well */ - if (!u->sink || u->sink->state == PA_SINK_SUSPENDED) - bt_transport_release(u); + /* Stop the device if the source is suspended as well */ + if (!u->source || u->source->state == PA_SOURCE_SUSPENDED) + /* We deliberately ignore whether stopping + * actually worked. Since the stream_fd is + * closed it doesn't really matter */ + bt_transport_release(u); - if (u->read_smoother) - pa_smoother_pause(u->read_smoother, pa_rtclock_now()); - break; + break; - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: - if (u->source->thread_info.state != PA_SOURCE_SUSPENDED) - break; - - /* Resume the device if the sink was suspended as well */ - if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) { - if (bt_transport_acquire(u, false) < 0) - return -1; - else - setup_stream(u); - } - /* We don't resume the smoother here. Instead we - * wait until the first packet arrives */ - break; + case PA_SINK_IDLE: + case PA_SINK_RUNNING: + if (u->sink->thread_info.state != PA_SINK_SUSPENDED) + break; - case PA_SOURCE_UNLINKED: - case PA_SOURCE_INIT: - case PA_SOURCE_INVALID_STATE: - ; + /* Resume the device if the source was suspended as well */ + if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) { + if (bt_transport_acquire(u, false) < 0) + return -1; + else + setup_stream(u); } break; + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + break; + } + + return 0; +} + +/* Run from IO thread */ +static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { + struct userdata *u = PA_SOURCE(o)->userdata; + + pa_assert(u->source == PA_SOURCE(o)); + pa_assert(u->transport); + + switch (code) { + case PA_SOURCE_MESSAGE_GET_LATENCY: { int64_t wi, ri; @@ -519,6 +486,53 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off return pa_source_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SOURCE_SUSPENDED: + /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */ + if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state)) + break; + + /* Stop the device if the sink is suspended as well */ + if (!u->sink || u->sink->state == PA_SINK_SUSPENDED) + bt_transport_release(u); + + if (u->read_smoother) + pa_smoother_pause(u->read_smoother, pa_rtclock_now()); + break; + + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: + if (u->source->thread_info.state != PA_SOURCE_SUSPENDED) + break; + + /* Resume the device if the sink was suspended as well */ + if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) { + if (bt_transport_acquire(u, false) < 0) + return -1; + else + setup_stream(u); + } + /* We don't resume the smoother here. Instead we + * wait until the first packet arrives */ + break; + + case PA_SOURCE_UNLINKED: + case PA_SOURCE_INIT: + case PA_SOURCE_INVALID_STATE: + break; + } + + return 0; +} + /* Called from main thread context */ static int device_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct bluetooth_msg *u = BLUETOOTH_MSG(obj); @@ -1591,6 +1605,7 @@ static int add_sink(struct userdata *u) { u->sink->userdata = u; u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->set_port = sink_set_port_cb; } @@ -1663,6 +1678,7 @@ static int add_source(struct userdata *u) { u->source->userdata = u; u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; u->source->set_port = source_set_port_cb; } diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c index b83f0eafa..5e189ba24 100644 --- a/src/modules/bluetooth/module-bluez5-device.c +++ b/src/modules/bluetooth/module-bluez5-device.c @@ -891,48 +891,6 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off switch (code) { - case PA_SOURCE_MESSAGE_SET_STATE: - - switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SOURCE_SUSPENDED: - /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */ - if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state)) - break; - - /* Stop the device if the sink is suspended as well */ - if (!u->sink || u->sink->state == PA_SINK_SUSPENDED) - transport_release(u); - - if (u->read_smoother) - pa_smoother_pause(u->read_smoother, pa_rtclock_now()); - - break; - - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: - if (u->source->thread_info.state != PA_SOURCE_SUSPENDED) - break; - - /* Resume the device if the sink was suspended as well */ - if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) { - if (!setup_transport_and_stream(u)) - return -1; - } - - /* We don't resume the smoother here. Instead we - * wait until the first packet arrives */ - - break; - - case PA_SOURCE_UNLINKED: - case PA_SOURCE_INIT: - case PA_SOURCE_INVALID_STATE: - break; - } - - break; - case PA_SOURCE_MESSAGE_GET_LATENCY: { int64_t wi, ri; @@ -956,6 +914,53 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off return pa_source_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SOURCE_SUSPENDED: + /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */ + if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state)) + break; + + /* Stop the device if the sink is suspended as well */ + if (!u->sink || u->sink->state == PA_SINK_SUSPENDED) + transport_release(u); + + if (u->read_smoother) + pa_smoother_pause(u->read_smoother, pa_rtclock_now()); + + break; + + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: + if (u->source->thread_info.state != PA_SOURCE_SUSPENDED) + break; + + /* Resume the device if the sink was suspended as well */ + if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) + if (!setup_transport_and_stream(u)) + return -1; + + /* We don't resume the smoother here. Instead we + * wait until the first packet arrives */ + + break; + + case PA_SOURCE_UNLINKED: + case PA_SOURCE_INIT: + case PA_SOURCE_INVALID_STATE: + break; + } + + return 0; +} + /* Run from main thread */ static void source_set_volume_cb(pa_source *s) { uint16_t gain; @@ -1044,6 +1049,7 @@ static int add_source(struct userdata *u) { u->source->userdata = u; u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; if (u->profile == PA_BLUETOOTH_PROFILE_HEADSET_HEAD_UNIT || u->profile == PA_BLUETOOTH_PROFILE_HEADSET_AUDIO_GATEWAY) { pa_source_set_set_volume_callback(u->source, source_set_volume_cb); @@ -1061,45 +1067,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse switch (code) { - case PA_SINK_MESSAGE_SET_STATE: - - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SINK_SUSPENDED: - /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */ - if (!PA_SINK_IS_OPENED(u->sink->thread_info.state)) - break; - - /* Stop the device if the source is suspended as well */ - if (!u->source || u->source->state == PA_SOURCE_SUSPENDED) - /* We deliberately ignore whether stopping - * actually worked. Since the stream_fd is - * closed it doesn't really matter */ - transport_release(u); - - break; - - case PA_SINK_IDLE: - case PA_SINK_RUNNING: - if (u->sink->thread_info.state != PA_SINK_SUSPENDED) - break; - - /* Resume the device if the source was suspended as well */ - if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) { - if (!setup_transport_and_stream(u)) - return -1; - } - - break; - - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - case PA_SINK_INVALID_STATE: - break; - } - - break; - case PA_SINK_MESSAGE_GET_LATENCY: { int64_t wi, ri; @@ -1124,6 +1091,50 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return pa_sink_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SINK_SUSPENDED: + /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */ + if (!PA_SINK_IS_OPENED(u->sink->thread_info.state)) + break; + + /* Stop the device if the source is suspended as well */ + if (!u->source || u->source->state == PA_SOURCE_SUSPENDED) + /* We deliberately ignore whether stopping + * actually worked. Since the stream_fd is + * closed it doesn't really matter */ + transport_release(u); + + break; + + case PA_SINK_IDLE: + case PA_SINK_RUNNING: + if (u->sink->thread_info.state != PA_SINK_SUSPENDED) + break; + + /* Resume the device if the source was suspended as well */ + if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) + if (!setup_transport_and_stream(u)) + return -1; + + break; + + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + break; + } + + return 0; +} + /* Run from main thread */ static void sink_set_volume_cb(pa_sink *s) { uint16_t gain; @@ -1213,6 +1224,7 @@ static int add_sink(struct userdata *u) { u->sink->userdata = u; u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; if (u->profile == PA_BLUETOOTH_PROFILE_HEADSET_HEAD_UNIT || u->profile == PA_BLUETOOTH_PROFILE_HEADSET_AUDIO_GATEWAY) { pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb); diff --git a/src/modules/echo-cancel/module-echo-cancel.c b/src/modules/echo-cancel/module-echo-cancel.c index 7af2f4b22..893c41eeb 100644 --- a/src/modules/echo-cancel/module-echo-cancel.c +++ b/src/modules/echo-cancel/module-echo-cancel.c @@ -458,19 +458,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec); return 0; - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } - } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -526,6 +513,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from source I/O thread context */ static void source_update_requested_latency_cb(pa_source *s) { struct userdata *u; @@ -1926,6 +1930,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->request_rewind = sink_request_rewind_cb; pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c index 22800a8bb..bbd416b6f 100644 --- a/src/modules/module-combine-sink.c +++ b/src/modules/module-combine-sink.c @@ -718,6 +718,25 @@ static int sink_set_state_in_main_thread_cb(pa_sink *sink, pa_sink_state_t state return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + bool running; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + running = new_state == PA_SINK_RUNNING; + pa_atomic_store(&u->thread_info.running, running); + + if (running) + pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true); + else + pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now()); + + return 0; +} + /* Called from IO context */ static void update_max_request(struct userdata *u) { size_t max_request = 0; @@ -859,19 +878,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse switch (code) { - case PA_SINK_MESSAGE_SET_STATE: { - bool running = (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING); - - pa_atomic_store(&u->thread_info.running, running); - - if (running) - pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true); - else - pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now()); - - break; - } - case PA_SINK_MESSAGE_GET_LATENCY: { pa_usec_t x, y, c; int64_t *delay = data; @@ -1426,6 +1432,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency; u->sink->userdata = u; diff --git a/src/modules/module-equalizer-sink.c b/src/modules/module-equalizer-sink.c index efe95b3fb..36029b389 100644 --- a/src/modules/module-equalizer-sink.c +++ b/src/modules/module-equalizer-sink.c @@ -267,18 +267,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of //+ pa_bytes_to_usec(u->latency * fs, ss) return 0; } - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -299,6 +287,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from I/O thread context */ static void sink_request_rewind_cb(pa_sink *s) { struct userdata *u; @@ -1230,6 +1235,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->request_rewind = sink_request_rewind_cb; pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); diff --git a/src/modules/module-esound-sink.c b/src/modules/module-esound-sink.c index d93ad9c5b..9fea2da74 100644 --- a/src/modules/module-esound-sink.c +++ b/src/modules/module-esound-sink.c @@ -141,32 +141,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse switch (code) { - case PA_SINK_MESSAGE_SET_STATE: - - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SINK_SUSPENDED: - pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); - - pa_smoother_pause(u->smoother, pa_rtclock_now()); - break; - - case PA_SINK_IDLE: - case PA_SINK_RUNNING: - - if (u->sink->thread_info.state == PA_SINK_SUSPENDED) - pa_smoother_resume(u->smoother, pa_rtclock_now(), true); - - break; - - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - case PA_SINK_INVALID_STATE: - ; - } - - break; - case PA_SINK_MESSAGE_GET_LATENCY: { pa_usec_t w, r; @@ -194,6 +168,38 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return pa_sink_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SINK_SUSPENDED: + pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); + + pa_smoother_pause(u->smoother, pa_rtclock_now()); + break; + + case PA_SINK_IDLE: + case PA_SINK_RUNNING: + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) + pa_smoother_resume(u->smoother, pa_rtclock_now(), true); + + break; + + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + ; + } + + return 0; +} + static void thread_func(void *userdata) { struct userdata *u = userdata; int write_type = 0; @@ -611,6 +617,7 @@ int pa__init(pa_module*m) { } u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->userdata = u; pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); diff --git a/src/modules/module-ladspa-sink.c b/src/modules/module-ladspa-sink.c index a2db68e1c..de866d96a 100644 --- a/src/modules/module-ladspa-sink.c +++ b/src/modules/module-ladspa-sink.c @@ -374,18 +374,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of connect_control_ports(u); return 0; - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -406,6 +394,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from I/O thread context */ static void sink_request_rewind_cb(pa_sink *s) { struct userdata *u; @@ -1298,6 +1303,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->request_rewind = sink_request_rewind_cb; pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); diff --git a/src/modules/module-null-sink.c b/src/modules/module-null-sink.c index 3ace082df..16b0b6870 100644 --- a/src/modules/module-null-sink.c +++ b/src/modules/module-null-sink.c @@ -89,15 +89,6 @@ static int sink_process_msg( struct userdata *u = PA_SINK(o)->userdata; switch (code) { - case PA_SINK_MESSAGE_SET_STATE: - - if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) { - if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data))) - u->timestamp = pa_rtclock_now(); - } - - break; - case PA_SINK_MESSAGE_GET_LATENCY: { pa_usec_t now; @@ -111,6 +102,21 @@ static int sink_process_msg( return pa_sink_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) { + if (PA_SINK_IS_OPENED(new_state)) + u->timestamp = pa_rtclock_now(); + } + + return 0; +} + static void sink_update_requested_latency_cb(pa_sink *s) { struct userdata *u; size_t nbytes; @@ -297,6 +303,7 @@ int pa__init(pa_module*m) { } u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->userdata = u; diff --git a/src/modules/module-null-source.c b/src/modules/module-null-source.c index 41f17bd98..ae67206a3 100644 --- a/src/modules/module-null-source.c +++ b/src/modules/module-null-source.c @@ -89,13 +89,6 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off struct userdata *u = PA_SOURCE(o)->userdata; switch (code) { - case PA_SOURCE_MESSAGE_SET_STATE: - - if (PA_PTR_TO_UINT(data) == PA_SOURCE_RUNNING) - u->timestamp = pa_rtclock_now(); - - break; - case PA_SOURCE_MESSAGE_GET_LATENCY: { pa_usec_t now; @@ -109,6 +102,19 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off return pa_source_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (new_state == PA_SOURCE_RUNNING) + u->timestamp = pa_rtclock_now(); + + return 0; +} + static void source_update_requested_latency_cb(pa_source *s) { struct userdata *u; @@ -229,6 +235,7 @@ int pa__init(pa_module*m) { u->latency_time = latency_time; u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; u->source->update_requested_latency = source_update_requested_latency_cb; u->source->userdata = u; diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c index 995785e1e..b2378059b 100644 --- a/src/modules/module-pipe-sink.c +++ b/src/modules/module-pipe-sink.c @@ -110,24 +110,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse struct userdata *u = PA_SINK(o)->userdata; switch (code) { - case PA_SINK_MESSAGE_SET_STATE: - if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) { - if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data))) - u->timestamp = pa_rtclock_now(); - } else if (u->sink->thread_info.state == PA_SINK_RUNNING || u->sink->thread_info.state == PA_SINK_IDLE) { - if (PA_PTR_TO_UINT(data) == PA_SINK_SUSPENDED) { - /* Clear potential FIFO error flag */ - u->fifo_error = false; - - /* Continuously dropping data (clear counter on entering suspended state. */ - if (u->bytes_dropped != 0) { - pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped); - u->bytes_dropped = 0; - } - } - } - break; - case PA_SINK_MESSAGE_GET_LATENCY: if (u->use_system_clock_for_timing) { pa_usec_t now; @@ -153,6 +135,32 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return pa_sink_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) { + if (PA_SINK_IS_OPENED(new_state)) + u->timestamp = pa_rtclock_now(); + } else if (u->sink->thread_info.state == PA_SINK_RUNNING || u->sink->thread_info.state == PA_SINK_IDLE) { + if (new_state == PA_SINK_SUSPENDED) { + /* Clear potential FIFO error flag */ + u->fifo_error = false; + + /* Continuously dropping data (clear counter on entering suspended state. */ + if (u->bytes_dropped != 0) { + pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped); + u->bytes_dropped = 0; + } + } + } + + return 0; +} + static void sink_update_requested_latency_cb(pa_sink *s) { struct userdata *u; size_t nbytes; @@ -505,6 +513,7 @@ int pa__init(pa_module *m) { } u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; if (u->use_system_clock_for_timing) u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->userdata = u; diff --git a/src/modules/module-remap-sink.c b/src/modules/module-remap-sink.c index ec6698795..56e7a85fd 100644 --- a/src/modules/module-remap-sink.c +++ b/src/modules/module-remap-sink.c @@ -94,18 +94,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec); return 0; - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -126,6 +114,23 @@ static int sink_set_state_in_main_thread(pa_sink *s, pa_sink_state_t state, pa_s return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from I/O thread context */ static void sink_request_rewind(pa_sink *s) { struct userdata *u; @@ -411,6 +416,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency; u->sink->request_rewind = sink_request_rewind; u->sink->userdata = u; diff --git a/src/modules/module-sine-source.c b/src/modules/module-sine-source.c index f4c297384..39fb71ab3 100644 --- a/src/modules/module-sine-source.c +++ b/src/modules/module-sine-source.c @@ -87,13 +87,6 @@ static int source_process_msg( switch (code) { - case PA_SOURCE_MESSAGE_SET_STATE: - - if (PA_PTR_TO_UINT(data) == PA_SOURCE_RUNNING) - u->timestamp = pa_rtclock_now(); - - break; - case PA_SOURCE_MESSAGE_GET_LATENCY: { pa_usec_t now, left_to_fill; @@ -109,6 +102,19 @@ static int source_process_msg( return pa_source_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (new_state == PA_SOURCE_RUNNING) + u->timestamp = pa_rtclock_now(); + + return 0; +} + static void source_update_requested_latency_cb(pa_source *s) { struct userdata *u; @@ -257,6 +263,7 @@ int pa__init(pa_module*m) { } u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; u->source->update_requested_latency = source_update_requested_latency_cb; u->source->userdata = u; diff --git a/src/modules/module-solaris.c b/src/modules/module-solaris.c index a4960b8b7..e68f2a93d 100644 --- a/src/modules/module-solaris.c +++ b/src/modules/module-solaris.c @@ -390,51 +390,57 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse case PA_SINK_MESSAGE_GET_LATENCY: *((int64_t*) data) = sink_get_latency(u, &PA_SINK(o)->sample_spec); return 0; + } - case PA_SINK_MESSAGE_SET_STATE: + return pa_sink_process_msg(o, code, data, offset, chunk); +} - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; - case PA_SINK_SUSPENDED: + pa_assert(s); + pa_assert_se(u = s->userdata); - pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); + switch (new_state) { - pa_smoother_pause(u->smoother, pa_rtclock_now()); + case PA_SINK_SUSPENDED: - if (!u->source || u->source_suspended) - suspend(u); + pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); - u->sink_suspended = true; - break; + pa_smoother_pause(u->smoother, pa_rtclock_now()); - case PA_SINK_IDLE: - case PA_SINK_RUNNING: - - if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { - pa_smoother_resume(u->smoother, pa_rtclock_now(), true); - - if (!u->source || u->source_suspended) { - bool mute; - if (unsuspend(u) < 0) - return -1; - u->sink->get_volume(u->sink); - if (u->sink->get_mute(u->sink, &mute) >= 0) - pa_sink_set_mute(u->sink, mute, false); - } - u->sink_suspended = false; - } - break; + if (!u->source || u->source_suspended) + suspend(u); - case PA_SINK_INVALID_STATE: - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - ; - } + u->sink_suspended = true; + break; + + case PA_SINK_IDLE: + case PA_SINK_RUNNING: + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { + pa_smoother_resume(u->smoother, pa_rtclock_now(), true); + if (!u->source || u->source_suspended) { + bool mute; + if (unsuspend(u) < 0) + return -1; + u->sink->get_volume(u->sink); + if (u->sink->get_mute(u->sink, &mute) >= 0) + pa_sink_set_mute(u->sink, mute, false); + } + u->sink_suspended = false; + } break; + + case PA_SINK_INVALID_STATE: + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + ; } - return pa_sink_process_msg(o, code, data, offset, chunk); + return 0; } static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { @@ -445,45 +451,51 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off case PA_SOURCE_MESSAGE_GET_LATENCY: *((pa_usec_t*) data) = source_get_latency(u, &PA_SOURCE(o)->sample_spec); return 0; + } - case PA_SOURCE_MESSAGE_SET_STATE: + return pa_source_process_msg(o, code, data, offset, chunk); +} - switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) { +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; - case PA_SOURCE_SUSPENDED: + pa_assert(s); + pa_assert_se(u = s->userdata); - pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); + switch (new_state) { - if (!u->sink || u->sink_suspended) - suspend(u); + case PA_SOURCE_SUSPENDED: - u->source_suspended = true; - break; + pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: + if (!u->sink || u->sink_suspended) + suspend(u); - if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { - if (!u->sink || u->sink_suspended) { - if (unsuspend(u) < 0) - return -1; - u->source->get_volume(u->source); - } - u->source_suspended = false; - } - break; + u->source_suspended = true; + break; - case PA_SOURCE_UNLINKED: - case PA_SOURCE_INIT: - case PA_SOURCE_INVALID_STATE: - ; + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: + if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { + if (!u->sink || u->sink_suspended) { + if (unsuspend(u) < 0) + return -1; + u->source->get_volume(u->source); + } + u->source_suspended = false; } break; + case PA_SOURCE_UNLINKED: + case PA_SOURCE_INIT: + case PA_SOURCE_INVALID_STATE: + ; + } - return pa_source_process_msg(o, code, data, offset, chunk); + return 0; } static void sink_set_volume(pa_sink *s) { @@ -960,6 +972,7 @@ int pa__init(pa_module *m) { u->source->userdata = u; u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; pa_source_set_asyncmsgq(u->source, u->thread_mq.inq); pa_source_set_rtpoll(u->source, u->rtpoll); @@ -1003,6 +1016,7 @@ int pa__init(pa_module *m) { pa_assert(u->sink); u->sink->userdata = u; u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); pa_sink_set_rtpoll(u->sink, u->rtpoll); diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c index 8a67b81f1..313903372 100644 --- a/src/modules/module-tunnel-sink-new.c +++ b/src/modules/module-tunnel-sink-new.c @@ -429,28 +429,37 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of *((int64_t*) data) = remote_latency; return 0; } - case PA_SINK_MESSAGE_SET_STATE: - if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY) - break; + } + return pa_sink_process_msg(o, code, data, offset, chunk); +} - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - case PA_SINK_SUSPENDED: { - cork_stream(u, true); - break; - } - case PA_SINK_IDLE: - case PA_SINK_RUNNING: { - cork_stream(u, false); - break; - } - case PA_SINK_INVALID_STATE: - case PA_SINK_INIT: - case PA_SINK_UNLINKED: - break; - } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY) + return 0; + + switch (new_state) { + case PA_SINK_SUSPENDED: { + cork_stream(u, true); + break; + } + case PA_SINK_IDLE: + case PA_SINK_RUNNING: { + cork_stream(u, false); + break; + } + case PA_SINK_INVALID_STATE: + case PA_SINK_INIT: + case PA_SINK_UNLINKED: break; } - return pa_sink_process_msg(o, code, data, offset, chunk); + + return 0; } int pa__init(pa_module *m) { @@ -545,6 +554,7 @@ int pa__init(pa_module *m) { pa_sink_new_data_done(&sink_data); u->sink->userdata = u; u->sink->parent.process_msg = sink_process_msg_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC); diff --git a/src/modules/module-tunnel-source-new.c b/src/modules/module-tunnel-source-new.c index 7ad077111..d0a5414ad 100644 --- a/src/modules/module-tunnel-source-new.c +++ b/src/modules/module-tunnel-source-new.c @@ -428,28 +428,37 @@ static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t return 0; } - case PA_SOURCE_MESSAGE_SET_STATE: - if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY) - break; + } + return pa_source_process_msg(o, code, data, offset, chunk); +} - switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) { - case PA_SOURCE_SUSPENDED: { - cork_stream(u, true); - break; - } - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: { - cork_stream(u, false); - break; - } - case PA_SOURCE_INVALID_STATE: - case PA_SOURCE_INIT: - case PA_SOURCE_UNLINKED: - break; - } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY) + return 0; + + switch (new_state) { + case PA_SOURCE_SUSPENDED: { + cork_stream(u, true); + break; + } + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: { + cork_stream(u, false); + break; + } + case PA_SOURCE_INVALID_STATE: + case PA_SOURCE_INIT: + case PA_SOURCE_UNLINKED: break; } - return pa_source_process_msg(o, code, data, offset, chunk); + + return 0; } int pa__init(pa_module *m) { @@ -541,6 +550,7 @@ int pa__init(pa_module *m) { pa_source_new_data_done(&source_data); u->source->userdata = u; u->source->parent.process_msg = source_process_msg_cb; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; u->source->update_requested_latency = source_update_requested_latency_cb; pa_source_set_asyncmsgq(u->source, u->thread_mq->inq); diff --git a/src/modules/module-virtual-sink.c b/src/modules/module-virtual-sink.c index ca6ce5696..68ad20076 100644 --- a/src/modules/module-virtual-sink.c +++ b/src/modules/module-virtual-sink.c @@ -106,18 +106,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec); return 0; - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -138,6 +126,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from I/O thread context */ static void sink_request_rewind_cb(pa_sink *s) { struct userdata *u; @@ -556,6 +561,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->request_rewind = sink_request_rewind_cb; pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); diff --git a/src/modules/module-virtual-surround-sink.c b/src/modules/module-virtual-surround-sink.c index 00780d8bd..7c5e246cf 100644 --- a/src/modules/module-virtual-surround-sink.c +++ b/src/modules/module-virtual-surround-sink.c @@ -134,18 +134,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec); return 0; - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -166,6 +154,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from I/O thread context */ static void sink_request_rewind_cb(pa_sink *s) { struct userdata *u; @@ -730,6 +735,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->request_rewind = sink_request_rewind_cb; pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); diff --git a/src/modules/oss/module-oss.c b/src/modules/oss/module-oss.c index 7d1b9f52b..d2551bcfc 100644 --- a/src/modules/oss/module-oss.c +++ b/src/modules/oss/module-oss.c @@ -643,8 +643,6 @@ fail: /* Called from IO context */ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK(o)->userdata; - bool do_trigger = false, quick = true; - pa_sink_state_t new_state; switch (code) { @@ -662,68 +660,73 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return 0; } + } - case PA_SINK_MESSAGE_SET_STATE: - new_state = PA_PTR_TO_UINT(data); + return pa_sink_process_msg(o, code, data, offset, chunk); +} - switch (new_state) { +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + bool do_trigger = false; + bool quick = true; - case PA_SINK_SUSPENDED: - pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); + pa_assert(s); + pa_assert_se(u = s->userdata); - if (!u->source || u->source_suspended) - suspend(u); + switch (new_state) { - do_trigger = true; + case PA_SINK_SUSPENDED: + pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); - u->sink_suspended = true; - break; + if (!u->source || u->source_suspended) + suspend(u); - case PA_SINK_IDLE: - case PA_SINK_RUNNING: + do_trigger = true; - if (u->sink->thread_info.state == PA_SINK_INIT) { - do_trigger = true; - quick = u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state); - } + u->sink_suspended = true; + break; - if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { + case PA_SINK_IDLE: + case PA_SINK_RUNNING: - if (!u->source || u->source_suspended) { - if (unsuspend(u) < 0) - return -1; - quick = false; - } + if (u->sink->thread_info.state == PA_SINK_INIT) { + do_trigger = true; + quick = u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state); + } - do_trigger = true; + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { - u->out_mmap_current = 0; - u->out_mmap_saved_nfrags = 0; + if (!u->source || u->source_suspended) { + if (unsuspend(u) < 0) + return -1; + quick = false; + } - u->sink_suspended = false; - } + do_trigger = true; - break; + u->out_mmap_current = 0; + u->out_mmap_saved_nfrags = 0; - case PA_SINK_INVALID_STATE: - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - ; + u->sink_suspended = false; } break; + + case PA_SINK_INVALID_STATE: + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + ; } if (do_trigger) trigger(u, new_state, u->source ? u->source->thread_info.state : PA_SOURCE_INVALID_STATE, quick); - return pa_sink_process_msg(o, code, data, offset, chunk); + return 0; } static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SOURCE(o)->userdata; - bool do_trigger = false, quick = true; - pa_source_state_t new_state; switch (code) { @@ -740,61 +743,68 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off *((int64_t*) data) = (int64_t)r; return 0; } + } - case PA_SOURCE_MESSAGE_SET_STATE: - new_state = PA_PTR_TO_UINT(data); + return pa_source_process_msg(o, code, data, offset, chunk); +} - switch (new_state) { +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + bool do_trigger = false; + bool quick = true; - case PA_SOURCE_SUSPENDED: - pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); + pa_assert(s); + pa_assert_se(u = s->userdata); - if (!u->sink || u->sink_suspended) - suspend(u); + switch (new_state) { - do_trigger = true; + case PA_SOURCE_SUSPENDED: + pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); - u->source_suspended = true; - break; + if (!u->sink || u->sink_suspended) + suspend(u); - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: + do_trigger = true; - if (u->source->thread_info.state == PA_SOURCE_INIT) { - do_trigger = true; - quick = u->sink && PA_SINK_IS_OPENED(u->sink->thread_info.state); - } + u->source_suspended = true; + break; - if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: - if (!u->sink || u->sink_suspended) { - if (unsuspend(u) < 0) - return -1; - quick = false; - } + if (u->source->thread_info.state == PA_SOURCE_INIT) { + do_trigger = true; + quick = u->sink && PA_SINK_IS_OPENED(u->sink->thread_info.state); + } - do_trigger = true; + if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { - u->in_mmap_current = 0; - u->in_mmap_saved_nfrags = 0; + if (!u->sink || u->sink_suspended) { + if (unsuspend(u) < 0) + return -1; + quick = false; + } - u->source_suspended = false; - } - break; + do_trigger = true; - case PA_SOURCE_UNLINKED: - case PA_SOURCE_INIT: - case PA_SOURCE_INVALID_STATE: - ; + u->in_mmap_current = 0; + u->in_mmap_saved_nfrags = 0; + u->source_suspended = false; } break; + + case PA_SOURCE_UNLINKED: + case PA_SOURCE_INIT: + case PA_SOURCE_INVALID_STATE: + ; } if (do_trigger) trigger(u, u->sink ? u->sink->thread_info.state : PA_SINK_INVALID_STATE, new_state, quick); - return pa_source_process_msg(o, code, data, offset, chunk); + return 0; } static void sink_get_volume(pa_sink *s) { @@ -1334,6 +1344,7 @@ int pa__init(pa_module*m) { } u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; u->source->userdata = u; pa_source_set_asyncmsgq(u->source, u->thread_mq.inq); @@ -1403,6 +1414,7 @@ int pa__init(pa_module*m) { } u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->userdata = u; pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); diff --git a/src/modules/raop/raop-sink.c b/src/modules/raop/raop-sink.c index 936129cf5..baa346641 100644 --- a/src/modules/raop/raop-sink.c +++ b/src/modules/raop/raop-sink.c @@ -136,64 +136,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse pa_assert(u->raop); switch (code) { - case PA_SINK_MESSAGE_SET_STATE: { - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - case PA_SINK_SUSPENDED: { - pa_log_debug("RAOP: SUSPENDED"); - - pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); - - /* Issue a TEARDOWN if we are still connected */ - if (pa_raop_client_is_alive(u->raop)) { - pa_raop_client_teardown(u->raop); - } - - break; - } - - case PA_SINK_IDLE: { - pa_log_debug("RAOP: IDLE"); - - /* Issue a FLUSH if we're comming from running state */ - if (u->sink->thread_info.state == PA_SINK_RUNNING) { - pa_rtpoll_set_timer_disabled(u->rtpoll); - pa_raop_client_flush(u->raop); - } - - break; - } - - case PA_SINK_RUNNING: { - pa_usec_t now; - - pa_log_debug("RAOP: RUNNING"); - - now = pa_rtclock_now(); - pa_smoother_reset(u->smoother, now, false); - - if (!pa_raop_client_is_alive(u->raop)) { - /* Connecting will trigger a RECORD and start steaming */ - pa_raop_client_announce(u->raop); - } else if (!pa_raop_client_can_stream(u->raop)) { - /* RECORD alredy sent, simply start streaming */ - pa_raop_client_stream(u->raop); - pa_rtpoll_set_timer_absolute(u->rtpoll, now); - u->write_count = 0; - u->start = now; - } - - break; - } - - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - case PA_SINK_INVALID_STATE: - break; - } - - break; - } - case PA_SINK_MESSAGE_GET_LATENCY: { int64_t r = 0; @@ -278,6 +220,68 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return pa_sink_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + case PA_SINK_SUSPENDED: + pa_log_debug("RAOP: SUSPENDED"); + + pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); + + /* Issue a TEARDOWN if we are still connected */ + if (pa_raop_client_is_alive(u->raop)) { + pa_raop_client_teardown(u->raop); + } + + break; + + case PA_SINK_IDLE: + pa_log_debug("RAOP: IDLE"); + + /* Issue a FLUSH if we're comming from running state */ + if (u->sink->thread_info.state == PA_SINK_RUNNING) { + pa_rtpoll_set_timer_disabled(u->rtpoll); + pa_raop_client_flush(u->raop); + } + + break; + + case PA_SINK_RUNNING: { + pa_usec_t now; + + pa_log_debug("RAOP: RUNNING"); + + now = pa_rtclock_now(); + pa_smoother_reset(u->smoother, now, false); + + if (!pa_raop_client_is_alive(u->raop)) { + /* Connecting will trigger a RECORD and start steaming */ + pa_raop_client_announce(u->raop); + } else if (!pa_raop_client_can_stream(u->raop)) { + /* RECORD alredy sent, simply start streaming */ + pa_raop_client_stream(u->raop); + pa_rtpoll_set_timer_absolute(u->rtpoll, now); + u->write_count = 0; + u->start = now; + } + + break; + } + + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + break; + } + + return 0; +} + static void sink_set_volume_cb(pa_sink *s) { struct userdata *u = s->userdata; pa_cvolume hw; @@ -696,6 +700,7 @@ pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) { } u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb); pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); u->sink->userdata = u; diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 6549515b5..2c9334931 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -151,6 +151,7 @@ static void reset_callbacks(pa_sink *s) { pa_assert(s); s->set_state_in_main_thread = NULL; + s->set_state_in_io_thread = NULL; s->get_volume = NULL; s->set_volume = NULL; s->write_volume = NULL; @@ -2850,6 +2851,13 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse (s->thread_info.state == PA_SINK_SUSPENDED && PA_SINK_IS_OPENED(PA_PTR_TO_UINT(userdata))) || (PA_SINK_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SINK_SUSPENDED); + if (s->set_state_in_io_thread) { + int r; + + if ((r = s->set_state_in_io_thread(s, PA_PTR_TO_UINT(userdata))) < 0) + return r; + } + s->thread_info.state = PA_PTR_TO_UINT(userdata); if (s->thread_info.state == PA_SINK_SUSPENDED) { diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h index 0caeb550b..e1ea52495 100644 --- a/src/pulsecore/sink.h +++ b/src/pulsecore/sink.h @@ -124,19 +124,31 @@ struct pa_sink { bool set_mute_in_progress; - /* Called when the main loop requests a state change. Called from - * main loop context. If returns -1 the state change will be - * inhibited. This will also be called even if only the suspend cause + /* Callbacks for doing things when the sink state and/or suspend cause is + * changed. It's fine to set either or both of the callbacks to NULL if the + * implementation doesn't have anything to do on state or suspend cause * changes. * - * s->state and s->suspend_cause haven't been updated yet when this is - * called, so the callback can get the old state through those variables. + * set_state_in_main_thread() is called first. The callback is allowed to + * report failure if and only if the sink changes its state from + * SUSPENDED to IDLE or RUNNING. (FIXME: It would make sense to allow + * failure also when changing state from INIT to IDLE or RUNNING, but + * currently that will crash pa_sink_put().) If + * set_state_in_main_thread() fails, set_state_in_io_thread() won't be + * called. * - * If set_state_in_main_thread() is successful, the IO thread will be - * notified with the SET_STATE message. The message handler is allowed to - * fail, in which case the old state is restored, and - * set_state_in_main_thread() is called again. */ + * If set_state_in_main_thread() is successful (or not set), then + * set_state_in_io_thread() is called. Again, failure is allowed if and + * only if the sink changes state from SUSPENDED to IDLE or RUNNING. If + * set_state_in_io_thread() fails, then set_state_in_main_thread() is + * called again, this time with the state parameter set to SUSPENDED and + * the suspend_cause parameter set to 0. + * + * pa_sink.state, pa_sink.thread_info.state and pa_sink.suspend_cause + * are updated only after all the callback calls. In case of failure, the + * state is set to SUSPENDED and the suspend cause is set to 0. */ int (*set_state_in_main_thread)(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */ + int (*set_state_in_io_thread)(pa_sink *s, pa_sink_state_t state); /* may be NULL */ /* Sink drivers that support hardware volume may set this * callback. This is called when the current volume needs to be diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c index ad8e5e364..dd56eb082 100644 --- a/src/pulsecore/source.c +++ b/src/pulsecore/source.c @@ -142,6 +142,7 @@ static void reset_callbacks(pa_source *s) { pa_assert(s); s->set_state_in_main_thread = NULL; + s->set_state_in_io_thread = NULL; s->get_volume = NULL; s->set_volume = NULL; s->write_volume = NULL; @@ -2224,6 +2225,13 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_ (s->thread_info.state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(userdata))) || (PA_SOURCE_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SOURCE_SUSPENDED); + if (s->set_state_in_io_thread) { + int r; + + if ((r = s->set_state_in_io_thread(s, PA_PTR_TO_UINT(userdata))) < 0) + return r; + } + s->thread_info.state = PA_PTR_TO_UINT(userdata); if (suspend_change) { diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h index d60e8a1a8..c4fda7965 100644 --- a/src/pulsecore/source.h +++ b/src/pulsecore/source.h @@ -125,19 +125,31 @@ struct pa_source { bool set_mute_in_progress; - /* Called when the main loop requests a state change. Called from - * main loop context. If returns -1 the state change will be - * inhibited. This will also be called even if only the suspend cause + /* Callbacks for doing things when the source state and/or suspend cause is + * changed. It's fine to set either or both of the callbacks to NULL if the + * implementation doesn't have anything to do on state or suspend cause * changes. * - * s->state and s->suspend_cause haven't been updated yet when this is - * called, so the callback can get the old state through those variables. + * set_state_in_main_thread() is called first. The callback is allowed to + * report failure if and only if the source changes its state from + * SUSPENDED to IDLE or RUNNING. (FIXME: It would make sense to allow + * failure also when changing state from INIT to IDLE or RUNNING, but + * currently that will crash pa_source_put().) If + * set_state_in_main_thread() fails, set_state_in_io_thread() won't be + * called. * - * If set_state_in_main_thread() is successful, the IO thread will be - * notified with the SET_STATE message. The message handler is allowed to - * fail, in which case the old state is restored, and - * set_state_in_main_thread() is called again. */ - int (*set_state_in_main_thread)(pa_source *source, pa_source_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */ + * If set_state_in_main_thread() is successful (or not set), then + * set_state_in_io_thread() is called. Again, failure is allowed if and + * only if the source changes state from SUSPENDED to IDLE or RUNNING. If + * set_state_in_io_thread() fails, then set_state_in_main_thread() is + * called again, this time with the state parameter set to SUSPENDED and + * the suspend_cause parameter set to 0. + * + * pa_source.state, pa_source.thread_info.state and pa_source.suspend_cause + * are updated only after all the callback calls. In case of failure, the + * state is set to SUSPENDED and the suspend cause is set to 0. */ + int (*set_state_in_main_thread)(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */ + int (*set_state_in_io_thread)(pa_source *s, pa_source_state_t state); /* may be NULL */ /* Called when the volume is queried. Called from main loop * context. If this is NULL a PA_SOURCE_MESSAGE_GET_VOLUME message -- 2.16.1