The new tunnel sink and source use libpulse to talk to the remote server,
and libpulse requires a pa_mainloop_api implementation for interacting
with the event loop. The tunnel sink and source have so far been using
pa_mainloop, but there are some modules that assume that all sinks and
sources use pa_rtpoll in their IO threads, and trying to use those
modules together with the pa_mainloop based tunnel sinks and sources
will result in crashes (see [1]).

I will switch the event loop implementation in the tunnel modules to
pa_rtpoll, but that requires a pa_mainloop_api implementation in
pa_rtpoll first - that's implemented here.

pa_rtpoll_run() is changed so that it processes all defer events first,
and then all expired time events. The rest of pa_rtpoll_run() works as
before, except the poll timeout calculation now has to also take the
time events into account. IO events use the existing pa_rtpoll_item
interface.

The time events are handled separately from the old timer
functionality, which is somewhat ugly. It might be a good idea to
remove the old timer functionality and only use the time events. I
didn't attempt to do that at this time, because I feared that adapting
the pa_rtpoll users to the new system would be difficult.

[1] https://bugs.freedesktop.org/show_bug.cgi?id=73429
---
 src/pulsecore/rtpoll.c | 504 ++++++++++++++++++++++++++++++++++++++++++++++++-
 src/pulsecore/rtpoll.h |   4 +
 2 files changed, 500 insertions(+), 8 deletions(-)

diff --git a/src/pulsecore/rtpoll.c b/src/pulsecore/rtpoll.c
index 5f3ca8b..2e1907c 100644
--- a/src/pulsecore/rtpoll.c
+++ b/src/pulsecore/rtpoll.c
@@ -32,6 +32,7 @@
 #include <pulse/xmalloc.h>
 #include <pulse/timeval.h>
 
+#include <pulsecore/dynarray.h>
 #include <pulsecore/poll.h>
 #include <pulsecore/core-error.h>
 #include <pulsecore/core-rtclock.h>
@@ -65,6 +66,18 @@ struct pa_rtpoll {
 #endif
 
     PA_LLIST_HEAD(pa_rtpoll_item, items);
+
+    pa_mainloop_api mainloop_api;
+
+    pa_dynarray *io_events;
+
+    pa_dynarray *time_events;
+    pa_dynarray *enabled_time_events;
+    pa_dynarray *expired_time_events;
+    pa_time_event *cached_next_time_event;
+
+    pa_dynarray *defer_events;
+    pa_dynarray *enabled_defer_events;
 };
 
 struct pa_rtpoll_item {
@@ -84,8 +97,321 @@ struct pa_rtpoll_item {
     PA_LLIST_FIELDS(pa_rtpoll_item);
 };
 
+struct pa_io_event {
+    pa_rtpoll *rtpoll;
+    pa_rtpoll_item *rtpoll_item;
+    pa_io_event_flags_t events;
+    pa_io_event_cb_t callback;
+    pa_io_event_destroy_cb_t destroy_callback;
+    void *userdata;
+};
+
+static void io_event_enable(pa_io_event *event, pa_io_event_flags_t events);
+
+struct pa_time_event {
+    pa_rtpoll *rtpoll;
+    pa_usec_t time;
+    bool use_rtclock;
+    bool enabled;
+    pa_time_event_cb_t callback;
+    pa_time_event_destroy_cb_t destroy_callback;
+    void *userdata;
+};
+
+static void time_event_restart(pa_time_event *event, const struct timeval *tv);
+
+struct pa_defer_event {
+    pa_rtpoll *rtpoll;
+    bool enabled;
+    pa_defer_event_cb_t callback;
+    pa_defer_event_destroy_cb_t destroy_callback;
+    void *userdata;
+};
+
+static void defer_event_enable(pa_defer_event *event, int enable);
+
 PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);
 
+static short map_flags_to_libc(pa_io_event_flags_t flags) {
+    return (short)
+        ((flags & PA_IO_EVENT_INPUT ? POLLIN : 0) |
+         (flags & PA_IO_EVENT_OUTPUT ? POLLOUT : 0) |
+         (flags & PA_IO_EVENT_ERROR ? POLLERR : 0) |
+         (flags & PA_IO_EVENT_HANGUP ? POLLHUP : 0));
+}
+
+static pa_io_event_flags_t map_flags_from_libc(short flags) {
+    return
+        (flags & POLLIN ? PA_IO_EVENT_INPUT : 0) |
+        (flags & POLLOUT ? PA_IO_EVENT_OUTPUT : 0) |
+        (flags & POLLERR ? PA_IO_EVENT_ERROR : 0) |
+        (flags & POLLHUP ? PA_IO_EVENT_HANGUP : 0);
+}
+
+static int io_event_work_cb(pa_rtpoll_item *item) {
+    pa_io_event *event;
+    struct pollfd *pollfd;
+
+    pa_assert(item);
+
+    event = pa_rtpoll_item_get_userdata(item);
+    pollfd = pa_rtpoll_item_get_pollfd(item, NULL);
+    event->callback(&event->rtpoll->mainloop_api, event, pollfd->fd, map_flags_from_libc(pollfd->revents), event->userdata);
+
+    return 0;
+}
+
+static pa_io_event* io_event_new(pa_mainloop_api *api, int fd, pa_io_event_flags_t events, pa_io_event_cb_t callback,
+                                 void *userdata) {
+    pa_rtpoll *rtpoll;
+    pa_io_event *event;
+    struct pollfd *pollfd;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+    pa_assert(fd >= 0);
+    pa_assert(callback);
+
+    rtpoll = api->userdata;
+    pa_assert(api == &rtpoll->mainloop_api);
+
+    event = pa_xnew0(pa_io_event, 1);
+    event->rtpoll = rtpoll;
+    event->rtpoll_item = pa_rtpoll_item_new(rtpoll, PA_RTPOLL_NORMAL, 1);
+    pa_rtpoll_item_set_work_callback(event->rtpoll_item, io_event_work_cb);
+    pa_rtpoll_item_set_userdata(event->rtpoll_item, event);
+    pollfd = pa_rtpoll_item_get_pollfd(event->rtpoll_item, NULL);
+    pollfd->fd = fd;
+    event->callback = callback;
+    event->userdata = userdata;
+
+    pa_dynarray_append(rtpoll->io_events, event);
+    io_event_enable(event, events);
+
+    return event;
+}
+
+static void io_event_free(pa_io_event *event) {
+    pa_assert(event);
+
+    pa_dynarray_remove_by_data(event->rtpoll->io_events, event);
+
+    if (event->destroy_callback)
+        event->destroy_callback(&event->rtpoll->mainloop_api, event, event->userdata);
+
+    if (event->rtpoll_item)
+        pa_rtpoll_item_free(event->rtpoll_item);
+
+    pa_xfree(event);
+}
+
+static void io_event_enable(pa_io_event *event, pa_io_event_flags_t events) {
+    struct pollfd *pollfd;
+
+    pa_assert(event);
+
+    if (events == event->events)
+        return;
+
+    event->events = events;
+
+    pollfd = pa_rtpoll_item_get_pollfd(event->rtpoll_item, NULL);
+    pollfd->events = map_flags_to_libc(events);
+}
+
+static void io_event_set_destroy(pa_io_event *event, pa_io_event_destroy_cb_t callback) {
+    pa_assert(event);
+
+    event->destroy_callback = callback;
+}
+
+static pa_usec_t make_rt(const struct timeval *tv, bool *use_rtclock) {
+    struct timeval ttv;
+
+    if (!tv) {
+        *use_rtclock = false;
+        return PA_USEC_INVALID;
+    }
+
+    ttv = *tv;
+    *use_rtclock = !!(ttv.tv_usec & PA_TIMEVAL_RTCLOCK);
+
+    if (*use_rtclock)
+        ttv.tv_usec &= ~PA_TIMEVAL_RTCLOCK;
+    else
+        pa_rtclock_from_wallclock(&ttv);
+
+    return pa_timeval_load(&ttv);
+}
+
+static pa_time_event* time_event_new(pa_mainloop_api *api, const struct timeval *tv, pa_time_event_cb_t callback,
+                                     void *userdata) {
+    pa_rtpoll *rtpoll;
+    pa_time_event *event;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+    pa_assert(callback);
+
+    rtpoll = api->userdata;
+    pa_assert(api == &rtpoll->mainloop_api);
+
+    event = pa_xnew0(pa_time_event, 1);
+    event->rtpoll = rtpoll;
+    event->time = PA_USEC_INVALID;
+    event->callback = callback;
+    event->userdata = userdata;
+
+    pa_dynarray_append(rtpoll->time_events, event);
+    time_event_restart(event, tv);
+
+    return event;
+}
+
+static void time_event_free(pa_time_event *event) {
+    pa_assert(event);
+
+    time_event_restart(event, NULL);
+    pa_dynarray_remove_by_data(event->rtpoll->time_events, event);
+
+    if (event->destroy_callback)
+        event->destroy_callback(&event->rtpoll->mainloop_api, event, event->userdata);
+
+    pa_xfree(event);
+}
+
+static void time_event_restart(pa_time_event *event, const struct timeval *tv) {
+    pa_usec_t t;
+    bool use_rtclock;
+    bool enabled;
+    bool old_enabled;
+
+    pa_assert(event);
+
+    t = make_rt(tv, &use_rtclock);
+    enabled = (t != PA_USEC_INVALID);
+    old_enabled = event->enabled;
+
+    /* We return early only if the event stays disabled. If the event stays
+     * enabled, we can't return early, because the event time may change. */
+    if (!enabled && !old_enabled)
+        return;
+
+    event->enabled = enabled;
+    event->time = t;
+    event->use_rtclock = use_rtclock;
+
+    if (enabled && !old_enabled)
+        pa_dynarray_append(event->rtpoll->enabled_time_events, event);
+    else if (!enabled) {
+        pa_dynarray_remove_by_data(event->rtpoll->enabled_time_events, event);
+        pa_dynarray_remove_by_data(event->rtpoll->expired_time_events, event);
+    }
+
+    if (event->rtpoll->cached_next_time_event == event)
+        event->rtpoll->cached_next_time_event = NULL;
+
+    if (event->rtpoll->cached_next_time_event && enabled) {
+        pa_assert(event->rtpoll->cached_next_time_event->enabled);
+
+        if (t < event->rtpoll->cached_next_time_event->time)
+            event->rtpoll->cached_next_time_event = event;
+    }
+}
+
+static void time_event_set_destroy(pa_time_event *event, pa_time_event_destroy_cb_t callback) {
+    pa_assert(event);
+
+    event->destroy_callback = callback;
+}
+
+static pa_defer_event* defer_event_new(pa_mainloop_api *api, pa_defer_event_cb_t callback, void *userdata) {
+    pa_rtpoll *rtpoll;
+    pa_defer_event *event;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+    pa_assert(callback);
+
+    rtpoll = api->userdata;
+    pa_assert(api == &rtpoll->mainloop_api);
+
+    event = pa_xnew0(pa_defer_event, 1);
+    event->rtpoll = rtpoll;
+    event->callback = callback;
+    event->userdata = userdata;
+
+    pa_dynarray_append(rtpoll->defer_events, event);
+    defer_event_enable(event, true);
+
+    return event;
+}
+
+static void defer_event_free(pa_defer_event *event) {
+    pa_assert(event);
+
+    defer_event_enable(event, false);
+    pa_dynarray_remove_by_data(event->rtpoll->defer_events, event);
+
+    if (event->destroy_callback)
+        event->destroy_callback(&event->rtpoll->mainloop_api, event, event->userdata);
+
+    pa_xfree(event);
+}
+
+static void defer_event_enable(pa_defer_event *event, int enable) {
+    pa_assert(event);
+
+    if (enable == event->enabled)
+        return;
+
+    event->enabled = enable;
+
+    if (enable)
+        pa_dynarray_append(event->rtpoll->enabled_defer_events, event);
+    else
+        pa_dynarray_remove_by_data(event->rtpoll->enabled_defer_events, event);
+}
+
+static void defer_event_set_destroy(pa_defer_event *event, pa_defer_event_destroy_cb_t callback) {
+    pa_assert(event);
+
+    event->destroy_callback = callback;
+}
+
+static void mainloop_api_quit(pa_mainloop_api *api, int retval) {
+    pa_rtpoll *rtpoll;
+
+    pa_assert(api);
+    pa_assert(api->userdata);
+
+    rtpoll = api->userdata;
+    pa_assert(api == &rtpoll->mainloop_api);
+
+    pa_rtpoll_quit(rtpoll);
+}
+
+static const pa_mainloop_api vtable = {
+    .userdata = NULL,
+
+    .io_new = io_event_new,
+    .io_enable = io_event_enable,
+    .io_free = io_event_free,
+    .io_set_destroy = io_event_set_destroy,
+
+    .time_new = time_event_new,
+    .time_restart = time_event_restart,
+    .time_free = time_event_free,
+    .time_set_destroy = time_event_set_destroy,
+
+    .defer_new = defer_event_new,
+    .defer_enable = defer_event_enable,
+    .defer_free = defer_event_free,
+    .defer_set_destroy = defer_event_set_destroy,
+
+    .quit = mainloop_api_quit,
+};
+
 pa_rtpoll *pa_rtpoll_new(void) {
     pa_rtpoll *p;
 
@@ -99,6 +425,15 @@ pa_rtpoll *pa_rtpoll_new(void) {
     p->timestamp = pa_rtclock_now();
 #endif
 
+    p->mainloop_api = vtable;
+    p->mainloop_api.userdata = p;
+    p->io_events = pa_dynarray_new(NULL);
+    p->time_events = pa_dynarray_new(NULL);
+    p->enabled_time_events = pa_dynarray_new(NULL);
+    p->expired_time_events = pa_dynarray_new(NULL);
+    p->defer_events = pa_dynarray_new(NULL);
+    p->enabled_defer_events = pa_dynarray_new(NULL);
+
     return p;
 }
 
@@ -167,15 +502,108 @@ static void rtpoll_item_destroy(pa_rtpoll_item *i) {
 void pa_rtpoll_free(pa_rtpoll *p) {
     pa_assert(p);
 
+    if (p->defer_events) {
+        pa_defer_event *event;
+
+        while ((event = pa_dynarray_last(p->defer_events)))
+            defer_event_free(event);
+    }
+
+    if (p->time_events) {
+        pa_time_event *event;
+
+        while ((event = pa_dynarray_last(p->time_events)))
+            time_event_free(event);
+    }
+
+    if (p->io_events) {
+        pa_io_event *event;
+
+        while ((event = pa_dynarray_last(p->io_events)))
+            io_event_free(event);
+    }
+
     while (p->items)
         rtpoll_item_destroy(p->items);
 
+    if (p->enabled_defer_events) {
+        pa_assert(pa_dynarray_size(p->enabled_defer_events) == 0);
+        pa_dynarray_free(p->enabled_defer_events);
+    }
+
+    if (p->defer_events) {
+        pa_assert(pa_dynarray_size(p->defer_events) == 0);
+        pa_dynarray_free(p->defer_events);
+    }
+
+    if (p->expired_time_events) {
+        pa_assert(pa_dynarray_size(p->expired_time_events) == 0);
+        pa_dynarray_free(p->expired_time_events);
+    }
+
+    if (p->enabled_time_events) {
+        pa_assert(pa_dynarray_size(p->enabled_time_events) == 0);
+        pa_dynarray_free(p->enabled_time_events);
+    }
+
+    if (p->time_events) {
+        pa_assert(pa_dynarray_size(p->time_events) == 0);
+        pa_dynarray_free(p->time_events);
+    }
+
+    if (p->io_events) {
+        pa_assert(pa_dynarray_size(p->io_events) == 0);
+        pa_dynarray_free(p->io_events);
+    }
+
     pa_xfree(p->pollfd);
     pa_xfree(p->pollfd2);
 
     pa_xfree(p);
 }
 
+pa_mainloop_api *pa_rtpoll_get_mainloop_api(pa_rtpoll *rtpoll) {
+    pa_assert(rtpoll);
+
+    return &rtpoll->mainloop_api;
+}
+
+static void find_expired_time_events(pa_rtpoll *rtpoll) {
+    pa_usec_t now;
+    pa_time_event *event;
+    unsigned idx;
+
+    pa_assert(rtpoll);
+    pa_assert(pa_dynarray_size(rtpoll->expired_time_events) == 0);
+
+    now = pa_rtclock_now();
+
+    PA_DYNARRAY_FOREACH(event, rtpoll->enabled_time_events, idx) {
+        if (event->time <= now)
+            pa_dynarray_append(rtpoll->expired_time_events, event);
+    }
+}
+
+static pa_time_event *find_next_time_event(pa_rtpoll *rtpoll) {
+    pa_time_event *event;
+    pa_time_event *result = NULL;
+    unsigned idx;
+
+    pa_assert(rtpoll);
+
+    if (rtpoll->cached_next_time_event)
+        return rtpoll->cached_next_time_event;
+
+    PA_DYNARRAY_FOREACH(event, rtpoll->enabled_time_events, idx) {
+        if (!result || event->time < result->time)
+            result = event;
+    }
+
+    rtpoll->cached_next_time_event = result;
+
+    return result;
+}
+
 static void reset_revents(pa_rtpoll_item *i) {
     struct pollfd *f;
     unsigned n;
@@ -204,9 +632,14 @@ static void reset_all_revents(pa_rtpoll *p) {
 }
 
 int pa_rtpoll_run(pa_rtpoll *p) {
+    pa_defer_event *defer_event;
+    pa_time_event *time_event;
     pa_rtpoll_item *i;
     int r = 0;
     struct timeval timeout;
+    pa_time_event *next_time_event;
+    struct timeval next_time_event_elapse;
+    bool timer_enabled;
 
     pa_assert(p);
     pa_assert(!p->running);
@@ -218,7 +651,28 @@ int pa_rtpoll_run(pa_rtpoll *p) {
     p->running = true;
     p->timer_elapsed = false;
 
-    /* First, let's do some work */
+    /* Dispatch all enabled defer events. */
+    while ((defer_event = pa_dynarray_last(p->enabled_defer_events))) {
+        if (p->quit)
+            break;
+
+        defer_event->callback(&p->mainloop_api, defer_event, defer_event->userdata);
+    }
+
+    /* Dispatch all expired time events. */
+    find_expired_time_events(p);
+    while ((time_event = pa_dynarray_last(p->expired_time_events))) {
+        struct timeval tv;
+
+        if (p->quit)
+            break;
+
+        time_event_restart(time_event, NULL);
+        time_event->callback(&p->mainloop_api, time_event, pa_timeval_rtstore(&tv, time_event->time, time_event->use_rtclock),
+                             time_event->userdata);
+    }
+
+    /* Let's do some work */
     for (i = p->items; i && i->priority < PA_RTPOLL_NEVER; i = i->next) {
         int k;
 
@@ -282,15 +736,40 @@ int pa_rtpoll_run(pa_rtpoll *p) {
     if (p->rebuild_needed)
         rtpoll_rebuild(p);
 
+    /* Calculate timeout */
     pa_zero(timeout);
 
-    /* Calculate timeout */
-    if (!p->quit && p->timer_enabled) {
+    next_time_event = find_next_time_event(p);
+    if (next_time_event)
+        pa_timeval_rtstore(&next_time_event_elapse, next_time_event->time, next_time_event->use_rtclock);
+
+    /* p->timer_enabled and p->next_elapse are controlled by the rtpoll owner,
+     * while the time events can be created by anyone through pa_mainloop_api.
+     * It might be a good idea to merge p->timer_enabled and p->next_elapse
+     * with the time events so that we wouldn't need to handle them separately
+     * here. The reason why they are currently separate is that the
+     * pa_mainloop_api interface was bolted on pa_rtpoll as an afterthought. */
+    timer_enabled = p->timer_enabled || next_time_event;
+
+    if (!p->quit && timer_enabled) {
+        struct timeval *next_elapse;
         struct timeval now;
+
+        if (p->timer_enabled && next_time_event) {
+            if (pa_timeval_cmp(&p->next_elapse, &next_time_event_elapse) > 0)
+                next_elapse = &next_time_event_elapse;
+            else
+                next_elapse = &p->next_elapse;
+        } else if (p->timer_enabled)
+            next_elapse = &p->next_elapse;
+        else
+            next_elapse = &next_time_event_elapse;
+
         pa_rtclock_get(&now);
 
-        if (pa_timeval_cmp(&p->next_elapse, &now) > 0)
-            pa_timeval_add(&timeout, pa_timeval_diff(&p->next_elapse, &now));
+        if (pa_timeval_cmp(next_elapse, &now) > 0)
+            pa_timeval_add(&timeout, pa_timeval_diff(next_elapse, &now));
     }
 
 #ifdef DEBUG_TIMING
@@ -298,7 +777,7 @@ int pa_rtpoll_run(pa_rtpoll *p) {
         pa_usec_t now = pa_rtclock_now();
         p->awake = now - p->timestamp;
         p->timestamp = now;
-        if (!p->quit && p->timer_enabled)
+        if (!p->quit && timer_enabled)
             pa_log("poll timeout: %d ms ",(int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)));
         else if (p->quit)
            pa_log("poll timeout is ZERO");
@@ -313,12 +792,21 @@ int pa_rtpoll_run(pa_rtpoll *p) {
         struct timespec ts;
         ts.tv_sec = timeout.tv_sec;
         ts.tv_nsec = timeout.tv_usec * 1000;
-        r = ppoll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? &ts : NULL, NULL);
+        r = ppoll(p->pollfd, p->n_pollfd_used, (p->quit || timer_enabled) ? &ts : NULL, NULL);
     }
 #else
-    r = pa_poll(p->pollfd, p->n_pollfd_used, (p->quit || p->timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
+    r = pa_poll(p->pollfd, p->n_pollfd_used, (p->quit || timer_enabled) ? (int) ((timeout.tv_sec*1000) + (timeout.tv_usec / 1000)) : -1);
 #endif
 
+    /* FIXME: We don't know whether the pa_rtpoll owner's timer elapsed or one
+     * of the time events created by others through pa_mainloop_api. The alsa
+     * sink and source use pa_rtpoll_timer_elapsed() to check whether *their*
+     * timer elapsed, so this ambiguity is a problem for them in theory.
+     * However, currently the pa_rtpoll objects of the alsa sink and source are
+     * not being used through pa_mainloop_api, so in practice there's no
+     * ambiguity. We could use pa_rtclock_now() to check whether p->next_elapse
+     * is in the past, but we don't do that currently, because pa_rtclock_now()
+     * is somewhat expensive and this ambiguity isn't currently a big issue. */
     p->timer_elapsed = r == 0;
 
 #ifdef DEBUG_TIMING
diff --git a/src/pulsecore/rtpoll.h b/src/pulsecore/rtpoll.h
index c0a4dda..e2aee73 100644
--- a/src/pulsecore/rtpoll.h
+++ b/src/pulsecore/rtpoll.h
@@ -25,7 +25,9 @@
 #include <sys/types.h>
 #include <limits.h>
 
+#include <pulse/mainloop-api.h>
 #include <pulse/sample.h>
+
 #include <pulsecore/asyncmsgq.h>
 #include <pulsecore/fdsem.h>
 #include <pulsecore/macro.h>
@@ -56,6 +58,8 @@ typedef enum pa_rtpoll_priority {
 pa_rtpoll *pa_rtpoll_new(void);
 void pa_rtpoll_free(pa_rtpoll *p);
 
+pa_mainloop_api *pa_rtpoll_get_mainloop_api(pa_rtpoll *rtpoll);
+
 /* Sleep on the rtpoll until the time event, or any of the fd events
  * is triggered. Returns negative on error, positive if the loop
  * should continue to run, 0 when the loop should be terminated
-- 
1.9.3
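
P.S. For reviewers: below is a rough sketch of how a tunnel module's IO thread
could drive libpulse through the new API once the modules are converted to
pa_rtpoll. The struct userdata layout, the thread_func name and the omitted
error handling are illustrative only; they are not part of this patch.

#include <pulse/context.h>
#include <pulsecore/rtpoll.h>

struct userdata {
    pa_rtpoll *rtpoll;         /* created with pa_rtpoll_new() before the thread starts */
    const char *remote_server; /* address of the remote PulseAudio server */
};

static void thread_func(void *userdata) {
    struct userdata *u = userdata;
    pa_context *context;

    /* libpulse only needs a pa_mainloop_api; it doesn't care whether the
     * backing event loop is pa_mainloop or pa_rtpoll. */
    context = pa_context_new(pa_rtpoll_get_mainloop_api(u->rtpoll), "tunnel");
    pa_context_connect(context, u->remote_server, PA_CONTEXT_NOAUTOSPAWN, NULL);

    for (;;) {
        int ret;

        /* Dispatches the defer, time and IO events that libpulse registered
         * through the mainloop API, then sleeps in poll(). */
        ret = pa_rtpoll_run(u->rtpoll);

        if (ret < 0)  /* error */
            break;

        if (ret == 0) /* pa_rtpoll_quit() was called, e.g. via api->quit() */
            break;
    }

    pa_context_disconnect(context);
    pa_context_unref(context);
}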