On Wed, 2013-08-07 at 18:05 +0200, Alexander Couzens wrote: > Signed-off-by: Alexander Couzens <lynxis at fe80.eu> > --- The whole commit message, apart from the signed-off-by line, is now all on one line. The first line is supposed to be a very short summary of the changes, suitable for e.g. email subject line. Further explanation should go in a separate paragraph. > src/Makefile.am | 6 + > src/modules/module-tunnel-sink-new.c | 527 +++++++++++++++++++++++++++++++++++ > 2 files changed, 533 insertions(+) > create mode 100644 src/modules/module-tunnel-sink-new.c > > diff --git a/src/Makefile.am b/src/Makefile.am > index 6de6e96..27477e9 100644 > --- a/src/Makefile.am > +++ b/src/Makefile.am > @@ -1097,6 +1097,7 @@ modlibexec_LTLIBRARIES += \ > module-remap-sink.la \ > module-remap-source.la \ > module-ladspa-sink.la \ > + module-tunnel-sink-new.la \ > module-tunnel-sink.la \ > module-tunnel-source.la \ > module-position-event-sounds.la \ > @@ -1368,6 +1369,7 @@ SYMDEF_FILES = \ > module-ladspa-sink-symdef.h \ > module-equalizer-sink-symdef.h \ > module-match-symdef.h \ > + module-tunnel-sink-new-symdef.h \ > module-tunnel-sink-symdef.h \ > module-tunnel-source-symdef.h \ > module-null-sink-symdef.h \ > @@ -1638,6 +1640,10 @@ module_match_la_SOURCES = modules/module-match.c > module_match_la_LDFLAGS = $(MODULE_LDFLAGS) > module_match_la_LIBADD = $(MODULE_LIBADD) > > +module_tunnel_sink_new_la_SOURCES = modules/module-tunnel-sink-new.c > +module_tunnel_sink_new_la_LDFLAGS = $(MODULE_LDFLAGS) > +module_tunnel_sink_new_la_LIBADD = $(MODULE_LIBADD) > + > module_tunnel_sink_la_SOURCES = modules/module-tunnel.c > module_tunnel_sink_la_CFLAGS = -DTUNNEL_SINK=1 $(AM_CFLAGS) > module_tunnel_sink_la_LDFLAGS = $(MODULE_LDFLAGS) > diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c > new file mode 100644 > index 0000000..ccdeb6f > --- /dev/null > +++ b/src/modules/module-tunnel-sink-new.c > @@ -0,0 +1,527 @@ > +/*** > + This file is part of 
PulseAudio. > + > + Copyright 2013 Alexander Couzens > + > + PulseAudio is free software; you can redistribute it and/or modify > + it under the terms of the GNU Lesser General Public License as published > + by the Free Software Foundation; either version 2.1 of the License, > + or (at your option) any later version. > + > + PulseAudio is distributed in the hope that it will be useful, but > + WITHOUT ANY WARRANTY; without even the implied warranty of > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + General Public License for more details. > + > + You should have received a copy of the GNU Lesser General Public License > + along with PulseAudio; if not, write to the Free Software > + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 > + USA. > +***/ > + > +#ifdef HAVE_CONFIG_H > +#include <config.h> > +#endif > + > +#include <pulse/context.h> > +#include <pulse/timeval.h> > +#include <pulse/xmalloc.h> > +#include <pulse/stream.h> > +#include <pulse/mainloop.h> > +#include <pulse/subscribe.h> > +#include <pulse/introspect.h> > + > +#include <pulsecore/core.h> > +#include <pulsecore/core-util.h> > +#include <pulsecore/i18n.h> > +#include <pulsecore/sink.h> > +#include <pulsecore/modargs.h> > +#include <pulsecore/log.h> > +#include <pulsecore/thread.h> > +#include <pulsecore/thread-mq.h> > +#include <pulsecore/poll.h> > +#include <pulsecore/proplist-util.h> > + > +#include "module-tunnel-sink-new-symdef.h" > + > +PA_MODULE_AUTHOR("Alexander Couzens"); > +PA_MODULE_DESCRIPTION("Create a network sink which connects via a stream to a remote PulseAudio server"); > +PA_MODULE_VERSION(PACKAGE_VERSION); > +PA_MODULE_LOAD_ONCE(false); > +PA_MODULE_USAGE( > + "server=<address> " > + "sink=<name of the remote sink> " > + "sink_name=<name for the local sink> " > + "sink_properties=<properties for the local sink> " > + "format=<sample format> " > + "channels=<number of channels> " > + "rate=<sample rate> " > + "channel_map=<channel map>" 
> + ); > + > +#define TUNNEL_THREAD_FAILED_MAINLOOP 1 > + > +/* libpulse callbacks */ > +static void stream_state_callback(pa_stream *stream, void *userdata); Cosmetic: I'd prefer _cb suffix for all callbacks. > +static void context_state_callback(pa_context *c, void *userdata); > +static void sink_update_requested_latency_cb(pa_sink *s); > + > +struct userdata { > + pa_module *module; > + pa_sink *sink; > + pa_thread *thread; > + pa_thread_mq thread_mq; > + pa_mainloop *thread_mainloop; > + pa_mainloop_api *thread_mainloop_api; > + > + /* libpulse context */ In the previous review I said that this comment doesn't have any useful information. I meant to imply that the comment should be removed. > + pa_context *context; > + pa_stream *stream; > + > + pa_buffer_attr bufferattr; > + > + bool connected; > + > + char *remote_server; > + char *remote_sink_name; > +}; > + > +static const char* const valid_modargs[] = { > + "sink_name", > + "sink_properties", > + "server", > + "sink", > + "format", > + "channels", > + "rate", > + "channel_map", > + "cookie", /* unimplemented */ > + "reconnect", /* reconnect if server comes back again - unimplemented*/ "cookie" and "reconnect" aren't included in the usage string. I guess the reason is because the features haven't been implemented, but I think it would be good to keep PA_MODULE_USAGE and valid_modargs in sync, because when you implement the cookie support, for example, it's very easy to forget to update PA_MODULE_USAGE. What's your plan for implementing "reconnect"? I argued in my previous review that module-tunnel-sink shouldn't support reconnecting at all, and should instead leave that job for module-zeroconf-discover. 
> + NULL, > +}; > + > +static pa_proplist* tunnel_new_proplist(struct userdata *u) { > + pa_proplist *proplist = pa_proplist_new(); > + pa_proplist_sets(proplist, PA_PROP_APPLICATION_NAME, "PulseAudio"); > + pa_proplist_sets(proplist, PA_PROP_APPLICATION_ID, "org.PulseAudio.PulseAudio"); > + pa_proplist_sets(proplist, PA_PROP_APPLICATION_VERSION, PACKAGE_VERSION); > + pa_init_proplist(proplist); > + > + return proplist; > +} > + > +static void thread_func(void *userdata) { > + struct userdata *u = userdata; > + pa_proplist *proplist; > + pa_memchunk memchunk; > + > + pa_assert(u); > + > + pa_log_debug("Thread starting up"); > + pa_thread_mq_install(&u->thread_mq); > + > + pa_memchunk_reset(&memchunk); > + > + proplist = tunnel_new_proplist(u); > + /* init libpulse */ > + u->context = pa_context_new_with_proplist(pa_mainloop_get_api(u->thread_mainloop), Nitpick: using u->thread_mainloop_api instead of pa_mainloop_get_api() would be slightly simpler/shorter. > + "PulseAudio", > + proplist); > + pa_proplist_free(proplist); > + > + if (!u->context) { > + pa_log("Failed to create libpulse context"); > + goto fail; > + } > + > + pa_context_set_state_callback(u->context, context_state_callback, u); > + if (pa_context_connect(u->context, > + u->remote_server, > + PA_CONTEXT_NOFAIL | PA_CONTEXT_NOAUTOSPAWN, As I explained in the previous review, the NOFAIL flag serves no purpose here. 
> + NULL) < 0) { > + pa_log("Failed to connect libpulse context"); > + goto fail; > + } > + > + for (;;) { > + int ret; > + const void *p; > + > + size_t writable = 0; > + > + if (pa_mainloop_iterate(u->thread_mainloop, 1, &ret) < 0) { > + if (ret == 0) > + goto finish; > + else > + goto fail; > + } > + > + if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) > + pa_sink_process_rewind(u->sink, 0); > + > + if (u->connected && > + PA_STREAM_IS_GOOD(pa_stream_get_state(u->stream)) && > + PA_SINK_IS_LINKED(u->sink->thread_info.state)) { > + /* TODO: use IS_RUNNING + cork stream */ What do you mean by this comment? > + > + if (pa_stream_is_corked(u->stream)) { > + pa_stream_cork(u->stream, 0, NULL, NULL); I already pointed out in the previous review that you need to unref the pa_operation object that this function returns. > + } else { > + writable = pa_stream_writable_size(u->stream); > + if (writable > 0) { > + if (memchunk.length <= 0) I already pointed out in the previous review that the length is always zero here. That means that checking the value is pointless. > + pa_sink_render(u->sink, writable, &memchunk); I think pa_sink_render_full() would be better. It's nicer to send one big packet over the network than several smaller ones (or that's at least my gut feeling). > + > + pa_assert(memchunk.length > 0); > + > + /* we have new data to write */ > + p = (const uint8_t *) pa_memblock_acquire(memchunk.memblock); > + /* TODO: ZERO COPY! */ How do you plan to achieve zero copy? If the plan is to use pa_stream_write_begin(), that won't result in zero copy, because the data needs to be copied to the buffer that pa_stream_write_begin() gives (but it's still a good idea to use pa_stream_write_begin()). 
> + ret = pa_stream_write(u->stream, > + ((uint8_t*) p + memchunk.index), > + memchunk.length, > + NULL, /**< A cleanup routine for the data or NULL to request an internal copy */ > + 0, /** offset */ > + PA_SEEK_RELATIVE > + ); The indentation is still off by two spaces here. > + pa_memblock_release(memchunk.memblock); > + pa_memblock_unref(memchunk.memblock); > + pa_memchunk_reset(&memchunk); > + > + if (ret != 0) { > + /* TODO: we should consider a state change or is that already done ? */ > + pa_log_warn("Could not write data into the stream ... ret = %i", ret); I already answered this question in the previous review: "If pa_stream_writable_size() says that N bytes can be written, and then writing N bytes fails, we can just give up and goto fail." > + } > + } > + } > + } > + } > +fail: > + /* If this was no regular exit from the loop we have to continue > + * processing messages until we received PA_MESSAGE_SHUTDOWN > + * > + * Note: is this a race condition? When a PA_MESSAGE_SHUTDOWN already within the queue? I already answered this question in the previous review. No, it's not a race condition. This code will work just fine if PA_MESSAGE_SHUTDOWN has been sent. If PA_MESSAGE_SHUTDOWN has been sent, it means that pa__done() is being run from pa_module_unload(), and in that case the PA_CORE_MESSAGE_UNLOAD_MODULE handler won't do anything, because the module was already removed from core->modules before pa_module_unload() was called. 
> + */ > + pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->module->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); > + pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); > + > +finish: > + > + if (memchunk.memblock) > + pa_memblock_unref(memchunk.memblock); > + > + if (u->stream) { > + pa_stream_disconnect(u->stream); > + pa_stream_unref(u->stream); > + u->stream = NULL; > + } > + > + if (u->context) { > + pa_context_disconnect(u->context); > + pa_context_unref(u->context); > + u->context = NULL; > + } > + > + pa_log_debug("Thread shutting down"); > +} > + > +static void stream_state_callback(pa_stream *stream, void *userdata) { > + struct userdata *u = userdata; > + > + pa_assert(u); > + > + switch (pa_stream_get_state(stream)) { > + case PA_STREAM_FAILED: > + pa_log_error("Stream failed."); > + u->connected = false; > + u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); > + break; > + case PA_STREAM_TERMINATED: > + pa_log_debug("Stream terminated."); > + break; > + default: > + break; > + } When the stream becomes ready, you should check what the actual tlength is, and set the sink latency accordingly. Also, if the server changes the tlength at runtime, the sink latency needs to be updated also in that situation. You can use pa_stream_set_buffer_attr_callback() for getting notifications about changes in the buffer attributes. > +} > + > +static void context_state_callback(pa_context *c, void *userdata) { > + struct userdata *u = userdata; > + int c_errno; > + > + pa_assert(u); > + > + switch (pa_context_get_state(c)) { > + case PA_CONTEXT_UNCONNECTED: > + case PA_CONTEXT_CONNECTING: > + case PA_CONTEXT_AUTHORIZING: > + case PA_CONTEXT_SETTING_NAME: > + break; > + case PA_CONTEXT_READY: { > + pa_proplist *proplist; > + const char *username = pa_get_user_name_malloc(); > + const char *hostname = pa_get_host_name_malloc(); > + /* TODO: old tunnel say 'Null-Output' */ What do you mean by this comment? 
> + char *stream_name = pa_sprintf_malloc("%s for %s@%s", "Tunnel", username, hostname); Why is "Tunnel" inserted to the string via substitution instead of directly? Also, I think the string should be translatable. > + > + pa_log_debug("Connection successful. Creating stream."); > + pa_assert(!u->stream); > + > + proplist = tunnel_new_proplist(u); > + pa_proplist_sets(proplist, PA_PROP_MEDIA_ROLE, "sound"); "sound" is not a standard media role, and therefore it's not useful. It's probably best to not set any role, because we don't know what the tunnel will be used for. (The old tunnel module sets role "abstract", but I don't think that's really useful.) > + pa_assert(proplist); If you want to have this assertion, it should be before pa_proplist_sets(). > + > + u->stream = pa_stream_new_with_proplist(u->context, > + stream_name, > + &u->sink->sample_spec, > + &u->sink->channel_map, > + proplist); > + pa_proplist_free(proplist); > + pa_xfree(stream_name); > + > + if(!u->stream) { Missing space after "if". > + pa_log_error("Could not create a stream."); > + u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); > + return; > + } > + > + Extra empty line. > + pa_context_subscribe(u->context, PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL); Subscribing without providing a callback is pointless. Also, you don't unref the pa_operation object that the function returns. > + > + pa_stream_set_state_callback(u->stream, stream_state_callback, userdata); > + if (pa_stream_connect_playback(u->stream, > + u->remote_sink_name, > + &u->bufferattr, > + PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE, I asked you to add DONT_MOVE to the flags. Also, I now realized that PA_STREAM_INTERPOLATE_TIMING should be added to the flags too. > + NULL, > + NULL) < 0) { > + /* TODO fail */ Why is this not implemented? It should be a simple case of logging an error and calling quit() on the mainloop. 
> + } > + u->connected = true; > + break; > + } > + case PA_CONTEXT_FAILED: > + c_errno = pa_context_errno(u->context); > + pa_log_debug("Context failed with err %d.", c_errno); pa_strerror() is more informative than the plain error code. > + u->connected = false; > + u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); There could be a "quit" section, so that you could write just "goto quit;" instead of setting connected = false and calling quit() multiple times in this function. > + break; > + case PA_CONTEXT_TERMINATED: > + pa_log_debug("Context terminated."); > + u->connected = false; > + u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP); > + break; > + default: > + break; > + } > +} > + > +static void sink_update_requested_latency_cb(pa_sink *s) { > + struct userdata *u; > + size_t nbytes; > + pa_usec_t block_usec; > + > + pa_sink_assert_ref(s); > + pa_assert_se(u = s->userdata); > + > + block_usec = pa_sink_get_requested_latency_within_thread(s); > + > + if (block_usec == (pa_usec_t) -1) > + block_usec = s->thread_info.max_latency; > + > + nbytes = pa_usec_to_bytes(block_usec, &s->sample_spec); > + pa_sink_set_max_rewind_within_thread(s, nbytes); max_rewind should be 0, since you don't support rewinding. > + pa_sink_set_max_request_within_thread(s, nbytes); > + > + if (block_usec != (pa_usec_t) -1) { > + u->bufferattr.tlength = nbytes; block_usec is never -1, because if pa_sink_get_requested_latency_within_thread() returns -1, then you set block_usec to s->thread_info.max_latency. To be consistent with the latency all the time, I think you should set the initial tlength to s->thread_info.max_latency too, instead of initializing it to -1 like you do now. > + } Unnecessary braces. > + > + if (u->stream && PA_STREAM_IS_GOOD(pa_stream_get_state(u->stream))) { > + pa_stream_set_buffer_attr(u->stream, &u->bufferattr, NULL, NULL); You don't unref the pa_operation object that the function returns. 
It's a bit of a hairy issue what to do if the stream is not yet ready when the sink requested latency changes. You use PA_STREAM_IS_GOOD() here, but it seems that pa_stream_set_buffer_attr() fails if the state is not READY. PA_STREAM_IS_GOOD(), on the other hand, returns true also if the state is CREATING. So what to do? I think it would be best to postpone the pa_stream_set_buffer_attr() call until the stream becomes ready. When the stream becomes ready, check the requested latency, and if it's something else than max_latency or the tlength that the server assigned to the stream, call pa_stream_set_buffer_attr(). > + } Unnecessary braces. > +} > + > +static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { > + struct userdata *u = PA_SINK(o)->userdata; > + > + switch (code) { > + case PA_SINK_MESSAGE_GET_LATENCY: { > + int negative; > + pa_usec_t remote_latency; > + > + if (!PA_SINK_IS_LINKED(u->sink->thread_info.state)) { > + *((pa_usec_t*) data) = 0; > + return 0; > + } > + > + if (!u->stream) { > + *((pa_usec_t*) data) = 0; > + return 0; > + } > + > + if (!PA_STREAM_IS_GOOD(pa_stream_get_state(u->stream))) { You should check for PA_STREAM_READY instead of PA_STREAM_IS_GOOD, or alternatively not have this check at all and let pa_stream_get_latency() return an error when the stream state is something else than READY. > + *((pa_usec_t*) data) = 0; > + return 0; > + } > + > + if (pa_stream_get_latency(u->stream, &remote_latency, &negative) < 0) { > + *((pa_usec_t*) data) = 0; > + return 0; > + } > + > + *((pa_usec_t*) data) = > + /* Add the latency from libpulse */ > + remote_latency; > + /* do we have to add more latency here ? */ I already answered this question in the previous review. You don't need to add more latency. 
> + return 0; > + } > + } > + return pa_sink_process_msg(o, code, data, offset, chunk); > +} > + > +int pa__init(pa_module *m) { > + struct userdata *u = NULL; > + pa_modargs *ma = NULL; > + pa_sink_new_data sink_data; > + pa_sample_spec ss; > + pa_channel_map map; > + const char *remote_server = NULL; > + const char *sink_name = NULL; > + char *default_sink_name = NULL; > + > + pa_assert(m); > + > + if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { > + pa_log("Failed to parse module arguments."); > + goto fail; > + } > + > + ss = m->core->default_sample_spec; > + map = m->core->default_channel_map; > + if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) { > + pa_log("Invalid sample format specification or channel map"); > + goto fail; > + } > + > + remote_server = pa_modargs_get_value(ma, "server", NULL); > + if (!remote_server) { > + pa_log("No server given!"); > + goto fail; > + } > + > + u = pa_xnew0(struct userdata, 1); > + u->module = m; > + m->userdata = u; > + u->remote_server = pa_xstrdup(remote_server); > + u->thread_mainloop = pa_mainloop_new(); > + if (u->thread_mainloop == NULL) { > + pa_log("Failed to create mainloop"); > + goto fail; > + } > + u->thread_mainloop_api = pa_mainloop_get_api(u->thread_mainloop); > + > + u->remote_sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL)); > + > + u->bufferattr.maxlength = (uint32_t) -1; > + u->bufferattr.minreq = (uint32_t) -1; > + u->bufferattr.prebuf = (uint32_t) -1; > + u->bufferattr.tlength = (uint32_t) -1; > + > + pa_thread_mq_init_thread_mainloop(&u->thread_mq, m->core->mainloop, pa_mainloop_get_api(u->thread_mainloop)); > + > + /* Create sink */ > + pa_sink_new_data_init(&sink_data); > + sink_data.driver = __FILE__; > + sink_data.module = m; > + > + default_sink_name = pa_sprintf_malloc("tunnel-sink-new.%s", remote_server); I would prefer "tunnel-sink" as the prefix instead of "tunnel-sink-new". 
The reason is that the module name is supposed to be "module-tunnel-sink-new" only temporarily, until it's ready to replace the original module. It's easy to forget to change this string when doing the module renaming, and I don't think it's important to see from the sink name which version of the module is being used. > + sink_name = pa_modargs_get_value(ma, "sink_name", default_sink_name); > + > + pa_sink_new_data_set_name(&sink_data, sink_name); > + pa_sink_new_data_set_sample_spec(&sink_data, &ss); > + pa_sink_new_data_set_channel_map(&sink_data, &map); > + > + pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "sound"); > + pa_proplist_setf(sink_data.proplist, > + PA_PROP_DEVICE_DESCRIPTION, > + _("Tunnel to %s/%s"), > + remote_server, > + pa_strempty(u->remote_sink_name)); > + > + if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) { > + pa_log("Invalid properties"); > + pa_sink_new_data_done(&sink_data); > + goto fail; > + } > + /* TODO: check PA_SINK_LATENCY + PA_SINK_DYNAMIC_LATENCY */ What do you mean by this comment? -- Tanu