From: Martin Blanchard <tinram@xxxxxx> This patch adds an initial UDP protocol support for RAOP. It is based on Martin Blanchard's work (http://repo.or.cz/w/pulseaudio-raopUDP.git/shortlog/refs/heads/raop) which is inspired by Christophe Fergeau's work (https://github.com/zx2c4/pulseaudio-raop2). Matrin's modifications were edited by Hajime Fujita, so that it would support both TCP and UDP protocol in a single module. --- src/modules/raop/module-raop-sink.c | 312 ++++++++++++++- src/modules/raop/raop_client.c | 780 ++++++++++++++++++++++++++++++++++-- src/modules/raop/raop_client.h | 23 +- 3 files changed, 1073 insertions(+), 42 deletions(-) diff --git a/src/modules/raop/module-raop-sink.c b/src/modules/raop/module-raop-sink.c index 050441c..d9cbe7b 100644 --- a/src/modules/raop/module-raop-sink.c +++ b/src/modules/raop/module-raop-sink.c @@ -102,7 +102,6 @@ struct userdata { int32_t rate; pa_smoother *smoother; - int fd; int64_t offset; int64_t encoding_overhead; @@ -112,6 +111,13 @@ struct userdata { pa_raop_client *raop; size_t block_size; + + /* Members only for the TCP protocol */ + int fd; + + /* Members only for the UDP protocol */ + int control_fd; + int timing_fd; }; static const char* const valid_modargs[] = { @@ -129,7 +135,10 @@ static const char* const valid_modargs[] = { enum { SINK_MESSAGE_PASS_SOCKET = PA_SINK_MESSAGE_MAX, - SINK_MESSAGE_RIP_SOCKET + SINK_MESSAGE_RIP_SOCKET, + SINK_MESSAGE_SETUP, + SINK_MESSAGE_RECORD, + SINK_MESSAGE_DISCONNECTED, }; /* Forward declarations: */ @@ -177,7 +186,7 @@ static pa_usec_t sink_get_latency(const struct userdata *u) { return w > r ? w - r : 0; } -static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { +static int tcp_sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK(o)->userdata; switch (code) { @@ -272,6 +281,115 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return pa_sink_process_msg(o, code, data, offset, chunk); } +static int udp_sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { + struct userdata *u = PA_SINK(o)->userdata; + + switch (code) { + case PA_SINK_MESSAGE_SET_STATE: + switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { + case PA_SINK_SUSPENDED: + pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); + pa_log_debug("RAOP: SUSPENDED"); + pa_smoother_pause(u->smoother, pa_rtclock_now()); + + if (pa_raop_client_can_stream(u->raop)) { + /* Issue a TEARDOWN if we are still connected. */ + pa_raop_client_teardown(u->raop); + } + + break; + + case PA_SINK_IDLE: + pa_log_debug("RAOP: IDLE"); + /* Issue a flush if we're comming from running state. */ + if (u->sink->thread_info.state == PA_SINK_RUNNING) { + pa_rtpoll_set_timer_disabled(u->rtpoll); + pa_raop_client_flush(u->raop); + } + + break; + + case PA_SINK_RUNNING: + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) + pa_smoother_resume(u->smoother, pa_rtclock_now(), true); + pa_log_debug("RAOP: RUNNING"); + if (!pa_raop_client_can_stream(u->raop)) { + /* Connecting will trigger a RECORD */ + pa_raop_client_connect(u->raop); + } + + break; + + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + ; + } + + break; + + case PA_SINK_MESSAGE_GET_LATENCY: { + pa_usec_t r = 0; + + if (pa_raop_client_can_stream(u->raop)) + r = sink_get_latency(u); + + *((pa_usec_t*) data) = r; + + return 0; + } + + case SINK_MESSAGE_SETUP: { + struct pollfd *pollfd; + + u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 2); + pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + + pollfd->fd = u->control_fd; + pollfd->events = POLLIN | POLLPRI; + pollfd->revents = 0; + pollfd++; + pollfd->fd = u->timing_fd; + pollfd->events = POLLIN | POLLPRI; + pollfd->revents = 0; + + u->control_fd = -1; + u->timing_fd = -1; + return 0; + } + + case SINK_MESSAGE_RECORD: { + pa_rtpoll_set_timer_relative(u->rtpoll, pa_bytes_to_usec(u->block_size, &u->sink->sample_spec)); + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { + /* Our stream has been suspended so we just flush it... */ + pa_raop_client_flush(u->raop); + } + + return 0; + } + + case SINK_MESSAGE_DISCONNECTED: { + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { + pa_rtpoll_set_timer_disabled(u->rtpoll); + if (u->rtpoll_item) + pa_rtpoll_item_free(u->rtpoll_item); + u->rtpoll_item = NULL; + } else { + /* Question: is this valid here: or should we do some sort of: + * return pa_sink_process_msg(PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL); ?? */ + pa_module_unload_request(u->module, true); + } + + u->control_fd = -1; + u->timing_fd = -1; + return 0; + } + } + + return pa_sink_process_msg(o, code, data, offset, chunk); +} + static void sink_set_volume_cb(pa_sink *s) { struct userdata *u = s->userdata; pa_cvolume hw; @@ -317,8 +435,45 @@ static void sink_set_mute_cb(pa_sink *s) { } } -static void thread_func(void *userdata) { +static void raop_setup_cb(int control_fd, int timing_fd, void *userdata) { + struct userdata *u = userdata; + + pa_assert(control_fd); + pa_assert(timing_fd); + pa_assert(u); + + u->control_fd = control_fd; + u->timing_fd = timing_fd; + + pa_log_debug("Connection authenticated, syncing with server..."); + + pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_SETUP, NULL, 0, NULL, NULL); +} + +static void raop_record_cb(void *userdata) { + struct userdata *u = userdata; + + pa_assert(u); + + /* Set the initial volume. */ + sink_set_volume_cb(u->sink); + + pa_log_debug("Synchronization done, pushing job to IO thread..."); + + pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_RECORD, NULL, 0, NULL, NULL); +} + +static void raop_disconnected_cb(void *userdata) { struct userdata *u = userdata; + + pa_assert(u); + + pa_log_debug("Connection closed, informing IO thread..."); + + pa_asyncmsgq_post(u->thread_mq.inq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_DISCONNECTED, NULL, 0, NULL, NULL); +} + +static void tcp_thread_func(struct userdata *u) { int write_type = 0; pa_memchunk silence; uint32_t silence_overhead = 0; @@ -326,7 +481,7 @@ static void thread_func(void *userdata) { pa_assert(u); - pa_log_debug("Thread starting up"); + pa_log_debug("TCP thread starting up"); pa_thread_mq_install(&u->thread_mq); @@ -509,7 +664,130 @@ fail: finish: if (silence.memblock) pa_memblock_unref(silence.memblock); - pa_log_debug("Thread shutting down"); + pa_log_debug("TCP thread shutting down"); +} + +static void udp_thread_func(struct userdata *u) { + pa_assert(u); + + pa_log_debug("UDP thread starting up"); + + pa_thread_mq_install(&u->thread_mq); + pa_smoother_set_time_offset(u->smoother, pa_rtclock_now()); + + for (;;) { + pa_usec_t estimated; + int32_t overhead = 0; + ssize_t written = 0; + size_t length = 0; + int rv = 0; + + if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) { + if (u->sink->thread_info.rewind_requested) + pa_sink_process_rewind(u->sink, 0); + } + + /* Polling (audio data + control socket + timing socket). */ + if ((rv = pa_rtpoll_run(u->rtpoll, true)) < 0) + goto fail; + else if (rv == 0) + goto finish; + + if (!pa_rtpoll_timer_elapsed(u->rtpoll)) { + struct pollfd *pollfd; + uint8_t packet[32]; + ssize_t read; + + if (u->rtpoll_item) { + pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); + + /* Event on the control socket ?? */ + if (pollfd->revents & POLLIN) { + pollfd->revents = 0; + pa_log_debug("Received control packet."); + read = pa_read(pollfd->fd, packet, sizeof(packet), NULL); + pa_raop_client_handle_control_packet(u->raop, packet, read); + } + + pollfd++; + + /* Event on the timing port ?? */ + if (pollfd->revents & POLLIN) { + pollfd->revents = 0; + pa_log_debug("Received timing packet."); + read = pa_read(pollfd->fd, packet, sizeof(packet), NULL); + pa_raop_client_handle_timing_packet(u->raop, packet, read); + } + } + + continue; + } + + if (!pa_raop_client_can_stream(u->raop)) + continue; + if (u->sink->thread_info.state != PA_SINK_RUNNING) + continue; + + if (u->encoded_memchunk.length <= 0) { + if (u->encoded_memchunk.memblock != NULL) + pa_memblock_unref(u->encoded_memchunk.memblock); + + if (u->raw_memchunk.length <= 0) { + if (u->raw_memchunk.memblock) + pa_memblock_unref(u->raw_memchunk.memblock); + pa_memchunk_reset(&u->raw_memchunk); + + /* Grab unencoded audio data from PulseAudio. */ + pa_sink_render_full(u->sink, u->block_size, &u->raw_memchunk); + } + + pa_assert(u->raw_memchunk.length > 0); + + length = u->raw_memchunk.length; + pa_raop_client_encode_sample(u->raop, &u->raw_memchunk, &u->encoded_memchunk); + u->encoding_ratio = (double) u->encoded_memchunk.length / (double) (length - u->raw_memchunk.length); + overhead = u->encoded_memchunk.length - (length - u->raw_memchunk.length); + } + + pa_assert(u->encoded_memchunk.length > 0); + + pa_raop_client_send_audio_packet(u->raop, &u->encoded_memchunk, &written); + pa_rtpoll_set_timer_relative(u->rtpoll, pa_bytes_to_usec(u->block_size, &u->sink->sample_spec)); + + pa_assert(written != 0); + + if (written < 0) { + pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno)); + goto fail; + } + + u->offset += written; + u->encoding_overhead += overhead; + + estimated = pa_bytes_to_usec(u->offset - u->encoding_overhead, &u->sink->sample_spec); + pa_smoother_put(u->smoother, pa_rtclock_now(), estimated); + } + +fail: + /* If this was no regular exit, continue processing messages until PA_MESSAGE_SHUTDOWN. */ + pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); + pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); + +finish: + pa_log_debug("UDP thread shutting down"); +} + +static void thread_func(void *userdata) { + struct userdata *u = userdata; + + if (u->protocol == RAOP_TCP) + tcp_thread_func(u); + else if (u->protocol == RAOP_UDP) + udp_thread_func(u); + else + pa_assert(false); + + return; } int pa__init(pa_module *m) { @@ -590,8 +868,6 @@ int pa__init(pa_module *m) { } else if (pa_streq(protocol, "UDP")) { u->protocol = RAOP_UDP; - pa_log("UDP is not supported in this module version."); - goto fail; } else { pa_log("Unsupported protocol argument given: %s", protocol); goto fail; @@ -606,6 +882,8 @@ int pa__init(pa_module *m) { pa_proplist_sets(data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "music"); pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "RAOP sink '%s'", server); + /* RAOP discover module will eventually overwrite sink_name and others + (PA_UPDATE_REPLACE). */ if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) { pa_log("Invalid properties"); pa_sink_new_data_done(&data); @@ -621,7 +899,10 @@ int pa__init(pa_module *m) { goto fail; } - u->sink->parent.process_msg = sink_process_msg; + if (u->protocol == RAOP_TCP) + u->sink->parent.process_msg = tcp_sink_process_msg; + else + u->sink->parent.process_msg = udp_sink_process_msg; u->sink->userdata = u; pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb); pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); @@ -630,7 +911,7 @@ int pa__init(pa_module *m) { pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); pa_sink_set_rtpoll(u->sink, u->rtpoll); - if (!(u->raop = pa_raop_client_new(u->core, server, u->protocol))) { + if (!(u->raop = pa_raop_client_new(u->core, server, u->protocol, ss))) { pa_log("Failed to connect to server."); goto fail; } @@ -638,6 +919,17 @@ int pa__init(pa_module *m) { pa_raop_client_set_callback(u->raop, on_connection, u); pa_raop_client_set_closed_callback(u->raop, on_close, u); + if (u->protocol == RAOP_UDP) { + /* The number of frames per blocks is not negotiable... */ + pa_raop_client_get_blocks_size(u->raop, &u->block_size); + u->block_size *= pa_frame_size(&ss); + pa_sink_set_max_request(u->sink, u->block_size); + + pa_raop_client_set_setup_callback(u->raop, raop_setup_cb, u); + pa_raop_client_set_record_callback(u->raop, raop_record_cb, u); + pa_raop_client_set_disconnected_callback(u->raop, raop_disconnected_cb, u); + } + if (!(u->thread = pa_thread_new("raop-sink", thread_func, u))) { pa_log("Failed to create thread."); goto fail; diff --git a/src/modules/raop/raop_client.c b/src/modules/raop/raop_client.c index f0be62d..1ca8421 100644 --- a/src/modules/raop/raop_client.c +++ b/src/modules/raop/raop_client.c @@ -41,10 +41,13 @@ #include <openssl/engine.h> #include <pulse/xmalloc.h> +#include <pulse/timeval.h> #include <pulsecore/core-error.h> +#include <pulsecore/core-rtclock.h> #include <pulsecore/core-util.h> #include <pulsecore/iochannel.h> +#include <pulsecore/arpa-inet.h> #include <pulsecore/socket-util.h> #include <pulsecore/log.h> #include <pulsecore/parseaddr.h> @@ -56,6 +59,7 @@ #include "rtsp_client.h" #include "base64.h" +#define FRAMES_PER_PACKET 352 #define AES_CHUNKSIZE 16 #define JACK_STATUS_DISCONNECTED 0 @@ -69,6 +73,19 @@ #define VOLUME_MAX 0 #define RAOP_PORT 5000 +#define DEFAULT_RAOP_PORT 5000 +#define DEFAULT_AUDIO_PORT 6000 +#define DEFAULT_CONTROL_PORT 6001 +#define DEFAULT_TIMING_PORT 6002 + +typedef enum { + PAYLOAD_TIMING_REQUEST = 0x52, + PAYLOAD_TIMING_RESPONSE = 0x53, + PAYLOAD_SYNCHRONIZATION = 0x54, + PAYLOAD_RETRANSMIT_REQUEST = 0x55, + PAYLOAD_RETRANSMIT_REPLY = 0x56, + PAYLOAD_AUDIO_DATA = 0x60 +} pa_raop_payload_type; struct pa_raop_client { pa_core *core; @@ -87,16 +104,79 @@ struct pa_raop_client { uint8_t aes_nv[AES_CHUNKSIZE]; /* Next vector for aes-cbc */ uint8_t aes_key[AES_CHUNKSIZE]; /* Key for aes-cbc */ - pa_socket_client *sc; - int fd; - uint16_t seq; uint32_t rtptime; + /* Members only for the TCP protocol */ + pa_socket_client *sc; + int fd; + pa_raop_client_cb_t callback; void *userdata; pa_raop_client_closed_cb_t closed_callback; void *closed_userdata; + + /* Members only for the UDP protocol */ + uint16_t control_port; + uint16_t timing_port; + + int stream_fd; + int control_fd; + int timing_fd; + + uint32_t ssrc; + + bool first_packet; + uint32_t sync_interval; + uint32_t sync_count; + + pa_raop_client_setup_cb_t setup_callback; + void *setup_userdata; + + pa_raop_client_record_cb_t record_callback; + void *record_userdata; + + pa_raop_client_disconnected_cb_t disconnected_callback; + void *disconnected_userdata; +}; + +/* Timming packet header (8x8): + * [0] RTP v2: 0x80, + * [1] Payload type: 0x53 | marker bit: 0x80, + * [2,3] Sequence number: 0x0007, + * [4,7] Timestamp: 0x00000000 (unused). */ +static const uint8_t timming_header[8] = { + 0x80, 0xd3, 0x00, 0x07, + 0x00, 0x00, 0x00, 0x00 +}; + +/* Sync packet header (8x8): + * [0] RTP v2: 0x80, + * [1] Payload type: 0x54 | marker bit: 0x80, + * [2,3] Sequence number: 0x0007, + * [4,7] Timestamp: 0x00000000 (to be set). */ +static const uint8_t sync_header[8] = { + 0x80, 0xd4, 0x00, 0x07, + 0x00, 0x00, 0x00, 0x00 +}; + +static const uint8_t tcp_audio_header[16] = { + 0x24, 0x00, 0x00, 0x00, + 0xF0, 0xFF, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, +}; + +/* Audio packet header (12x8): + * [0] RTP v2: 0x80, + * [1] Payload type: 0x60, + * [2,3] Sequence number: 0x0000 (to be set), + * [4,7] Timestamp: 0x00000000 (to be set), + * [8,12] SSRC: 0x00000000 (to be set).*/ +static const uint8_t udp_audio_header[12] = { + 0x80, 0x60, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00 }; /** @@ -231,6 +311,212 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata c->callback(c->fd, c->userdata); } +static inline uint64_t timeval_to_ntp(struct timeval *tv) { + uint64_t ntp = 0; + + /* Converting micro seconds to a fraction. */ + ntp = (uint64_t) tv->tv_usec * UINT32_MAX / PA_USEC_PER_SEC; + /* Moving reference from 1 Jan 1970 to 1 Jan 1900 (seconds). */ + ntp |= (uint64_t) (tv->tv_sec + 0x83aa7e80) << 32; + + return ntp; +} + +static int bind_socket(pa_raop_client *c, int fd, uint16_t port) { + struct sockaddr_in sa4; +#ifdef HAVE_IPV6 + struct sockaddr_in6 sa6; +#endif + struct sockaddr *sa; + socklen_t salen; + int one = 1; + + pa_zero(sa4); +#ifdef HAVE_IPV6 + pa_zero(sa6); +#endif + if (inet_pton(AF_INET, pa_rtsp_localip(c->rtsp), &sa4.sin_addr) > 0) { + sa4.sin_family = AF_INET; + sa4.sin_port = htons(port); + sa = (struct sockaddr *) &sa4; + salen = sizeof(sa4); +#ifdef HAVE_IPV6 + } else if (inet_pton(AF_INET6, pa_rtsp_localip(c->rtsp), &sa6.sin6_addr) > 0) { + sa6.sin6_family = AF_INET6; + sa6.sin6_port = htons(port); + sa = (struct sockaddr*) &sa6; + salen = sizeof(sa6); +#endif + } else { + pa_log("Invalid destination '%s'", c->host); + goto fail; + } + + one = 1; +#ifdef SO_TIMESTAMP + if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) { + pa_log("setsockopt(SO_TIMESTAMP) failed: %s", pa_cstrerror(errno)); + goto fail; + } +#else + pa_log("SO_TIMESTAMP unsupported on this platform"); + goto fail; +#endif + + one = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) { + pa_log("setsockopt(SO_REUSEADDR) failed: %s", pa_cstrerror(errno)); + goto fail; + } + + if (bind(fd, sa, salen) < 0) { + pa_log("bind() failed: %s", pa_cstrerror(errno)); + goto fail; + } + + return fd; + +fail: + return -1; +} + +static int open_udp_socket(pa_raop_client *c, uint16_t port, uint16_t should_bind) { + struct sockaddr_in sa4; +#ifdef HAVE_IPV6 + struct sockaddr_in6 sa6; +#endif + struct sockaddr *sa; + socklen_t salen; + sa_family_t af; + int fd = -1; + + pa_zero(sa4); +#ifdef HAVE_IPV6 + pa_zero(sa6); +#endif + if (inet_pton(AF_INET, c->host, &sa4.sin_addr) > 0) { + sa4.sin_family = af = AF_INET; + sa4.sin_port = htons(port); + sa = (struct sockaddr *) &sa4; + salen = sizeof(sa4); +#ifdef HAVE_IPV6 + } else if (inet_pton(AF_INET6, c->host, &sa6.sin6_addr) > 0) { + sa6.sin6_family = af = AF_INET6; + sa6.sin6_port = htons(port); + sa = (struct sockaddr *) &sa6; + salen = sizeof(sa6); +#endif + } else { + pa_log("Invalid destination '%s'", c->host); + goto fail; + } + + if ((fd = pa_socket_cloexec(af, SOCK_DGRAM, 0)) < 0) { + pa_log("socket() failed: %s", pa_cstrerror(errno)); + goto fail; + } + + /* If the socket queue is full, let's drop packets */ + pa_make_udp_socket_low_delay(fd); + pa_make_fd_nonblock(fd); + + if (should_bind) { + if (bind_socket(c, fd, port) < 0) { + pa_log("bind_socket() failed: %s", pa_cstrerror(errno)); + goto fail; + } + } + + if (connect(fd, sa, salen) < 0) { + pa_log("connect() failed: %s", pa_cstrerror(errno)); + goto fail; + } + + pa_log_debug("Connected to %s on port %d (SOCK_DGRAM)", c->host, port); + return fd; + +fail: + if (fd >= 0) + pa_close(fd); + + return -1; +} + +static int send_timing_packet(pa_raop_client *c, const uint32_t data[6], uint64_t received) { + uint32_t packet[8]; + struct timeval tv; + ssize_t written = 0; + uint64_t trs = 0; + int rv = 1; + + memcpy(packet, timming_header, sizeof(timming_header)); + /* Copying originate timestamp from the incoming request packet. */ + packet[2] = data[4]; + packet[3] = data[5]; + /* Set the receive timestamp to reception time. */ + packet[4] = htonl(received >> 32); + packet[5] = htonl(received & 0xffffffff); + /* Set the transmit timestamp to current time. */ + trs = timeval_to_ntp(pa_rtclock_get(&tv)); + packet[6] = htonl(trs >> 32); + packet[7] = htonl(trs & 0xffffffff); + + written = pa_loop_write(c->timing_fd, packet, sizeof(packet), NULL); + if (written == sizeof(packet)) + rv = 0; + + return rv; +} + +static int send_sync_packet(pa_raop_client *c, uint32_t stamp) { + const uint32_t delay = 88200; + uint32_t packet[5]; + struct timeval tv; + ssize_t written = 0; + uint64_t trs = 0; + int rv = 1; + + memcpy(packet, sync_header, sizeof(sync_header)); + if (c->first_packet) + packet[0] |= 0x10; + stamp -= delay; + packet[1] = htonl(stamp); + /* Set the transmited timestamp to current time. */ + trs = timeval_to_ntp(pa_rtclock_get(&tv)); + packet[2] = htonl(trs >> 32); + packet[3] = htonl(trs & 0xffffffff); + stamp += delay; + packet[4] = htonl(stamp); + + written = pa_loop_write(c->control_fd, packet, sizeof(packet), NULL); + if (written == sizeof(packet)) + rv = 0; + + return rv; +} + +static int send_audio_packet(pa_raop_client *c, uint32_t *buffer, size_t size, ssize_t *written) { + ssize_t length = 0; + int rv = 1; + + memcpy(buffer, udp_audio_header, sizeof(udp_audio_header)); + if (c->first_packet) + buffer[0] |= ((uint32_t) 0x80) << 8; + buffer[0] |= htonl((uint32_t) c->seq); + buffer[1] = htonl(c->rtptime); + buffer[2] = htonl(c->ssrc); + + length = pa_loop_write(c->stream_fd, buffer, size, NULL); + if (length == ((ssize_t) size)) + rv = 0; + + if (written != NULL) + *written = length; + c->seq++; + + return rv; +} + static void do_rtsp_announce(pa_raop_client *c) { int i; uint8_t rsakey[512]; @@ -271,7 +557,7 @@ static void do_rtsp_announce(pa_raop_client *c) { "a=rsaaeskey:%s\r\n" "a=aesiv:%s\r\n", c->sid, ip, c->host, - 4096, + c->protocol == RAOP_TCP ? 4096 : FRAMES_PER_PACKET, key, iv); pa_rtsp_announce(c->rtsp, sdp); pa_xfree(key); @@ -280,8 +566,8 @@ static void do_rtsp_announce(pa_raop_client *c) { pa_xfree(sdp); } -static void rtsp_cb(pa_rtsp_client *rtsp, pa_rtsp_state state, pa_headerlist *headers, void *userdata) { - pa_raop_client *c = userdata; +static void tcp_rtsp_cb(pa_rtsp_client *rtsp, pa_rtsp_state state, pa_headerlist* headers, void *userdata) { + pa_raop_client* c = userdata; pa_assert(c); pa_assert(rtsp); pa_assert(rtsp == c->rtsp); @@ -361,7 +647,7 @@ static void rtsp_cb(pa_rtsp_client *rtsp, pa_rtsp_state state, pa_headerlist *he pa_rtsp_client_free(c->rtsp); c->rtsp = NULL; if (c->fd > 0) { - /* We do not close the fd, we leave it to the closed callback to do that. */ + /* We do not close the fd, we leave it to the closed callback to do that */ c->fd = -1; } if (c->sc) { @@ -375,14 +661,264 @@ static void rtsp_cb(pa_rtsp_client *rtsp, pa_rtsp_state state, pa_headerlist *he } } +static void udp_rtsp_cb(pa_rtsp_client *rtsp, pa_rtsp_state state, pa_headerlist *headers, void *userdata) { + pa_raop_client *c = userdata; + + pa_assert(c); + pa_assert(rtsp); + pa_assert(rtsp == c->rtsp); + + switch (state) { + case STATE_CONNECT: { + uint16_t rand; + char *sac; + + /* Set the Apple-Challenge key */ + pa_random(&rand, sizeof(rand)); + pa_base64_encode(&rand, AES_CHUNKSIZE, &sac); + rtrimchar(sac, '='); + pa_rtsp_add_header(c->rtsp, "Apple-Challenge", sac); + + pa_rtsp_options(c->rtsp); + + pa_xfree(sac); + break; + } + + case STATE_OPTIONS: { + pa_log_debug("RAOP: OPTIONS"); + + pa_rtsp_remove_header(c->rtsp, "Apple-Challenge"); + do_rtsp_announce(c); + break; + } + + case STATE_ANNOUNCE: { + char *trs; + + trs = pa_sprintf_malloc("RTP/AVP/UDP;unicast;interleaved=0-1;mode=record;control_port=%d;timing_port=%d", + c->control_port, + c->timing_port); + + pa_rtsp_setup(c->rtsp, trs); + + pa_xfree(trs); + break; + } + + case STATE_SETUP: { + uint32_t stream_port = DEFAULT_AUDIO_PORT; + char *ajs, *trs, *token, *pc; + char delimiters[] = ";"; + const char *token_state = NULL; + uint32_t port = 0; + + pa_log_debug("RAOP: SETUP"); + + ajs = pa_xstrdup(pa_headerlist_gets(headers, "Audio-Jack-Status")); + trs = pa_xstrdup(pa_headerlist_gets(headers, "Transport")); + + if (ajs) { + c->jack_type = JACK_TYPE_ANALOG; + c->jack_status = JACK_STATUS_DISCONNECTED; + + while ((token = pa_split(ajs, delimiters, &token_state))) { + if ((pc = strstr(token, "="))) { + *pc = 0; + if (pa_streq(token, "type") && pa_streq(pc + 1, "digital")) + c->jack_type = JACK_TYPE_DIGITAL; + } else { + if (pa_streq(token, "connected")) + c->jack_status = JACK_STATUS_CONNECTED; + } + pa_xfree(token); + } + + } else { + pa_log_warn("Audio-Jack-Status missing"); + } + + token_state = NULL; + + if (trs) { + /* Now parse out the server port component of the response. */ + while ((token = pa_split(trs, delimiters, &token_state))) { + if ((pc = strstr(token, "="))) { + *pc = 0; + if (pa_streq(token, "control_port")) { + port = 0; + pa_atou(pc + 1, &port); + c->control_port = port; + } + if (pa_streq(token, "timing_port")) { + port = 0; + pa_atou(pc + 1, &port); + c->timing_port = port; + } + *pc = '='; + } + pa_xfree(token); + } + } else { + pa_log_warn("Transport missing"); + } + + pa_xfree(ajs); + pa_xfree(trs); + + stream_port = pa_rtsp_serverport(c->rtsp); + if (stream_port == 0) + goto error; + if (c->control_port == 0 || c->timing_port == 0) + goto error; + + pa_log_debug("Using server_port=%d, control_port=%d & timing_port=%d", + stream_port, + c->control_port, + c->timing_port); + + c->stream_fd = open_udp_socket(c, stream_port, 0); + c->control_fd = open_udp_socket(c, c->control_port, 1); + c->timing_fd = open_udp_socket(c, c->timing_port, 1); + + if (c->stream_fd <= 0) + goto error; + if (c->control_fd <= 0 || c->timing_fd <= 0) + goto error; + + c->setup_callback(c->control_fd, c->timing_fd, c->setup_userdata); + pa_rtsp_record(c->rtsp, &c->seq, &c->rtptime); + + break; + + error: + if (c->stream_fd > 0) { + pa_close(c->stream_fd); + c->stream_fd = -1; + } + if (c->control_fd > 0) { + pa_close(c->control_fd); + c->control_fd = -1; + } + if (c->timing_fd > 0) { + pa_close(c->timing_fd); + c->timing_fd = -1; + } + + pa_rtsp_client_free(c->rtsp); + c->rtsp = NULL; + + c->control_port = DEFAULT_CONTROL_PORT; + c->timing_port = DEFAULT_TIMING_PORT; + + pa_log_error("aborting RTSP setup, failed creating required sockets"); + + break; + } + + case STATE_RECORD: { + int32_t latency = 0; + uint32_t rand; + char *alt; + + pa_log_debug("RAOP: RECORD"); + + alt = pa_xstrdup(pa_headerlist_gets(headers, "Audio-Latency")); + /* Generate a random synchronization source identifier from this session. */ + pa_random(&rand, sizeof(rand)); + c->ssrc = rand; + + if (alt) + pa_atoi(alt, &latency); + + c->first_packet = true; + c->sync_count = 0; + + c->record_callback(c->setup_userdata); + + pa_xfree(alt); + break; + } + + case STATE_SET_PARAMETER: { + pa_log_debug("RAOP: SET_PARAMETER"); + + break; + } + + case STATE_FLUSH: { + pa_log_debug("RAOP: FLUSHED"); + + break; + } + + case STATE_TEARDOWN: { + pa_log_debug("RAOP: TEARDOWN"); + + pa_rtsp_disconnect(c->rtsp); + + if (c->stream_fd > 0) { + pa_close(c->stream_fd); + c->stream_fd = -1; + } + if (c->control_fd > 0) { + pa_close(c->control_fd); + c->control_fd = -1; + } + if (c->timing_fd > 0) { + pa_close(c->timing_fd); + c->timing_fd = -1; + } + + pa_log_debug("RTSP control channel closed"); + + pa_rtsp_client_free(c->rtsp); + pa_xfree(c->sid); + c->rtsp = NULL; + c->sid = NULL; + + break; + } + + case STATE_DISCONNECTED: { + pa_assert(c->disconnected_callback); + pa_assert(c->rtsp); + + if (c->stream_fd > 0) { + pa_close(c->stream_fd); + c->stream_fd = -1; + } + if (c->control_fd > 0) { + pa_close(c->control_fd); + c->control_fd = -1; + } + if (c->timing_fd > 0) { + pa_close(c->timing_fd); + c->timing_fd = -1; + } + + pa_log_debug("RTSP control channel closed"); + + pa_rtsp_client_free(c->rtsp); + pa_xfree(c->sid); + c->rtsp = NULL; + c->sid = NULL; + + c->disconnected_callback(c->disconnected_userdata); + + break; + } + } +} + pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, - pa_raop_protocol_t protocol) { + pa_raop_protocol_t protocol, + pa_sample_spec spec) { pa_parsed_address a; pa_raop_client *c = pa_xnew0(pa_raop_client, 1); pa_assert(core); pa_assert(host); - pa_assert(protocol == RAOP_TCP); if (pa_parse_address(host, &a) < 0 || a.type == PA_PARSED_ADDRESS_UNIX) return NULL; @@ -390,16 +926,29 @@ pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, c->core = core; c->fd = -1; c->protocol = protocol; + c->stream_fd = -1; + c->control_fd = -1; + c->timing_fd = -1; + + c->control_port = DEFAULT_CONTROL_PORT; + c->timing_port = DEFAULT_TIMING_PORT; c->host = pa_xstrdup(a.path_or_host); if (a.port) c->port = a.port; else - c->port = RAOP_PORT; + c->port = DEFAULT_RAOP_PORT; - if (pa_raop_client_connect(c)) { - pa_raop_client_free(c); - return NULL; + c->first_packet = true; + /* Packet sync interval should be around 1s. */ + c->sync_interval = spec.rate / FRAMES_PER_PACKET; + c->sync_count = 0; + + if (c->protocol == RAOP_TCP) { + if (pa_raop_client_connect(c)) { + pa_raop_client_free(c); + return NULL; + } } return c; @@ -431,7 +980,10 @@ int pa_raop_client_connect(pa_raop_client *c) { return 0; } - c->rtsp = pa_rtsp_client_new(c->core->mainloop, c->host, c->port, "iTunes/4.6 (Macintosh; U; PPC Mac OS X 10.3)"); + if (c->protocol == RAOP_TCP) + c->rtsp = pa_rtsp_client_new(c->core->mainloop, c->host, c->port, "iTunes/4.6 (Macintosh; U; PPC Mac OS X 10.3)"); + else + c->rtsp = pa_rtsp_client_new(c->core->mainloop, c->host, c->port, "iTunes/7.6.2 (Windows; N;)"); /* Initialise the AES encryption system. */ pa_random(c->aes_iv, sizeof(c->aes_iv)); @@ -445,20 +997,157 @@ int pa_raop_client_connect(pa_raop_client *c) { sci = pa_sprintf_malloc("%08x%08x",rand_data.b, rand_data.c); pa_rtsp_add_header(c->rtsp, "Client-Instance", sci); pa_xfree(sci); - pa_rtsp_set_callback(c->rtsp, rtsp_cb, c); + if (c->protocol == RAOP_TCP) + pa_rtsp_set_callback(c->rtsp, tcp_rtsp_cb, c); + else + pa_rtsp_set_callback(c->rtsp, udp_rtsp_cb, c); return pa_rtsp_connect(c->rtsp); } int pa_raop_client_flush(pa_raop_client *c) { + int rv = 0; pa_assert(c); - pa_rtsp_flush(c->rtsp, c->seq, c->rtptime); - return 0; + if (c->rtsp != NULL) { + rv = pa_rtsp_flush(c->rtsp, c->seq, c->rtptime); + c->sync_count = -1; + } + + return rv; +} + +int pa_raop_client_teardown(pa_raop_client *c) { + int rv = 0; + + pa_assert(c); + + /* This should be followed by a STATE_DISCONNECTED event + * which will take care of cleaning up everything */ + if (c->rtsp != NULL) + rv = pa_rtsp_teardown(c->rtsp); + + return rv; +} + +int pa_raop_client_can_stream(pa_raop_client *c) { + int rv = 0; + + pa_assert(c); + + if (c->stream_fd > 0) + rv = 1; + + return rv; +} + +int pa_raop_client_handle_timing_packet(pa_raop_client *c, const uint8_t packet[], ssize_t size) { + const uint32_t * data = NULL; + uint8_t payload = 0; + struct timeval tv; + uint64_t rci = 0; + int rv = 0; + + pa_assert(c); + pa_assert(packet); + + /* Timing packets are 32 bytes long: 1 x 8 RTP header (no ssrc) + 3 x 8 NTP timestamps. */ + if (size != 32 || packet[0] != 0x80) + { + pa_log_debug("Received an invalid timing packet."); + return 1; + } + + data = (uint32_t *) (packet + sizeof(timming_header)); + rci = timeval_to_ntp(pa_rtclock_get(&tv)); + /* The market bit is always set (see rfc3550 for packet structure) ! */ + payload = packet[1] ^ 0x80; + switch (payload) { + case PAYLOAD_TIMING_REQUEST: + rv = send_timing_packet(c, data, rci); + break; + case PAYLOAD_TIMING_RESPONSE: + default: + pa_log_debug("Got an unexpected payload type on timing channel !"); + return 1; + } + + return rv; +} + +int pa_raop_client_handle_control_packet(pa_raop_client *c, const uint8_t packet[], ssize_t size) { + uint8_t payload = 0; + int rv = 0; + + pa_assert(c); + pa_assert(packet); + + if (size != 20 || packet[0] != 0x80) + { + pa_log_debug("Received an invalid control packet."); + return 1; + } + + /* The market bit is always set (see rfc3550 for packet structure) ! */ + payload = packet[1] ^ 0x80; + switch (payload) { + case PAYLOAD_RETRANSMIT_REQUEST: + /* Packet retransmission not implemented yet... */ + /* rv = ... */ + break; + case PAYLOAD_RETRANSMIT_REPLY: + default: + pa_log_debug("Got an unexpected payload type on control channel !"); + return 1; + } + + return rv; +} + +int pa_raop_client_get_blocks_size(pa_raop_client *c, size_t *size) { + int rv = 0; + + pa_assert(c); + pa_assert(size); + + *size = FRAMES_PER_PACKET; + + return rv; +} + +int pa_raop_client_send_audio_packet(pa_raop_client *c, pa_memchunk *block, ssize_t *written) { + uint32_t *buf = NULL; + ssize_t length = 0; + int rv = 0; + + pa_assert(c); + pa_assert(block); + + /* Sync RTP & NTP timestamp if required. */ + if (c->first_packet || c->sync_count >= c->sync_interval) { + send_sync_packet(c, c->rtptime); + c->sync_count = 0; + } else { + c->sync_count++; + } + + buf = (uint32_t *) pa_memblock_acquire(block->memblock); + if (buf != NULL && block->length > 0) + rv = send_audio_packet(c, buf + block->index, block->length, &length); + pa_memblock_release(block->memblock); + block->index += length; + block->length -= length; + if (written != NULL) + *written = length; + + if (c->first_packet) + c->first_packet = false; + + return rv; } int pa_raop_client_set_volume(pa_raop_client *c, pa_volume_t volume) { - int rv; + int rv = 0; double db; char *param; @@ -473,7 +1162,8 @@ int pa_raop_client_set_volume(pa_raop_client *c, pa_volume_t volume) { param = pa_sprintf_malloc("volume: %0.6f\r\n", db); /* We just hit and hope, cannot wait for the callback. */ - rv = pa_rtsp_setparameter(c->rtsp, param); + if (c->rtsp != NULL) + rv = pa_rtsp_setparameter(c->rtsp, param); pa_xfree(param); return rv; @@ -488,21 +1178,23 @@ int pa_raop_client_encode_sample(pa_raop_client *c, pa_memchunk *raw, pa_memchun uint8_t *b, *p; uint32_t bsize; size_t length; - static uint8_t header[] = { - 0x24, 0x00, 0x00, 0x00, - 0xF0, 0xFF, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, - }; - int header_size = sizeof(header); + const uint8_t *header; + int header_size; pa_assert(c); - pa_assert(c->fd > 0); pa_assert(raw); pa_assert(raw->memblock); pa_assert(raw->length > 0); pa_assert(encoded); + if (c->protocol == RAOP_TCP) { + header = tcp_audio_header; + header_size = sizeof(tcp_audio_header); + } else { + header = udp_audio_header; + header_size = sizeof(udp_audio_header); + } + /* We have to send 4 byte chunks */ bsize = (int)(raw->length / 4); length = bsize * 4; @@ -543,13 +1235,17 @@ int pa_raop_client_encode_sample(pa_raop_client *c, pa_memchunk *raw, pa_memchun raw->index += 4; raw->length -= 4; } + if (c->protocol == RAOP_UDP) + c->rtptime += (ibp - p) / 4; pa_memblock_release(raw->memblock); encoded->length = header_size + size; - /* Store the length (endian swapped: make this better). */ - len = size + header_size - 4; - *(b + 2) = len >> 8; - *(b + 3) = len & 0xff; + if (c->protocol == RAOP_TCP) { + /* Store the length (endian swapped: make this better). */ + len = size + header_size - 4; + *(b + 2) = len >> 8; + *(b + 3) = len & 0xff; + } /* Encrypt our data. */ aes_encrypt(c, (b + header_size), size); @@ -573,3 +1269,25 @@ void pa_raop_client_set_closed_callback(pa_raop_client *c, pa_raop_client_closed c->closed_callback = callback; c->closed_userdata = userdata; } + + +void pa_raop_client_set_setup_callback(pa_raop_client *c, pa_raop_client_setup_cb_t callback, void *userdata) { + pa_assert(c); + + c->setup_callback = callback; + c->setup_userdata = userdata; +} + +void pa_raop_client_set_record_callback(pa_raop_client *c, pa_raop_client_record_cb_t callback, void *userdata) { + pa_assert(c); + + c->record_callback = callback; + c->record_userdata = userdata; +} + +void pa_raop_client_set_disconnected_callback(pa_raop_client *c, pa_raop_client_disconnected_cb_t callback, void *userdata) { + pa_assert(c); + + c->disconnected_callback = callback; + c->disconnected_userdata = userdata; +} diff --git a/src/modules/raop/raop_client.h b/src/modules/raop/raop_client.h index 3ecf8d9..97847d9 100644 --- a/src/modules/raop/raop_client.h +++ b/src/modules/raop/raop_client.h @@ -21,8 +21,11 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. ***/ +#include <pulse/sample.h> +#include <pulse/volume.h> #include <pulsecore/core.h> +#include <pulsecore/memchunk.h> typedef enum pa_raop_protocol { RAOP_TCP, @@ -31,19 +34,37 @@ typedef enum pa_raop_protocol { typedef struct pa_raop_client pa_raop_client; -pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_protocol_t protocol); +pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_protocol_t protocol, pa_sample_spec spec); void pa_raop_client_free(pa_raop_client *c); int pa_raop_client_connect(pa_raop_client *c); int pa_raop_client_flush(pa_raop_client *c); +int pa_raop_client_teardown(pa_raop_client *c); +int pa_raop_client_can_stream(pa_raop_client *c); int pa_raop_client_set_volume(pa_raop_client *c, pa_volume_t volume); int pa_raop_client_encode_sample(pa_raop_client *c, pa_memchunk *raw, pa_memchunk *encoded); +int pa_raop_client_handle_timing_packet(pa_raop_client *c, const uint8_t packet +[], ssize_t size); +int pa_raop_client_handle_control_packet(pa_raop_client *c, const uint8_t packet[], ssize_t size); +int pa_raop_client_get_blocks_size(pa_raop_client *c, size_t *size); +int pa_raop_client_send_audio_packet(pa_raop_client *c, pa_memchunk *block, ssize_t *written); + typedef void (*pa_raop_client_cb_t)(int fd, void *userdata); void pa_raop_client_set_callback(pa_raop_client *c, pa_raop_client_cb_t callback, void *userdata); typedef void (*pa_raop_client_closed_cb_t)(void *userdata); void pa_raop_client_set_closed_callback(pa_raop_client *c, pa_raop_client_closed_cb_t callback, void *userdata); + +typedef void (*pa_raop_client_setup_cb_t)(int control_fd, int timing_fd, void *userdata); +void pa_raop_client_set_setup_callback(pa_raop_client *c, pa_raop_client_setup_cb_t callback, void *userdata); + +typedef void (*pa_raop_client_record_cb_t)(void *userdata); +void pa_raop_client_set_record_callback(pa_raop_client *c, pa_raop_client_record_cb_t callback, void *userdata); + +typedef void (*pa_raop_client_disconnected_cb_t)(void *userdata); +void pa_raop_client_set_disconnected_callback(pa_raop_client *c, pa_raop_client_disconnected_cb_t callback, void *userdata); + #endif -- 1.8.1.2