From: Martin Blanchard <tchaik@xxxxxxx> This patch switch the packet-buffer to use core memory pool instead of manually allocating the room required for storing TCP/UDP packets. Packets are now stored using pa_memchunk instead of internal struct. Quite a few malloc saved compare to previous design. --- src/modules/raop/raop-client.c | 261 ++++++++++++++++++---------------- src/modules/raop/raop-packet-buffer.c | 187 +++++++++--------------- src/modules/raop/raop-packet-buffer.h | 17 +-- src/modules/raop/raop-sink.c | 4 +- 4 files changed, 216 insertions(+), 253 deletions(-) diff --git a/src/modules/raop/raop-client.c b/src/modules/raop/raop-client.c index d5d636d..4e73d12 100644 --- a/src/modules/raop/raop-client.c +++ b/src/modules/raop/raop-client.c @@ -65,6 +65,8 @@ #define FRAMES_PER_TCP_PACKET 4096 #define FRAMES_PER_UDP_PACKET 352 +#define RTX_BUFFERING_SECONDS 4 + #define DEFAULT_TCP_AUDIO_PORT 6000 #define DEFAULT_UDP_AUDIO_PORT 6000 #define DEFAULT_UDP_CONTROL_PORT 6001 @@ -323,92 +325,102 @@ static size_t write_AAC_data(uint8_t *packet, const size_t max, uint8_t *raw, si return size; } -static size_t build_tcp_audio_packet(pa_raop_client *c, uint8_t *raw, size_t *index, size_t *length, uint32_t **packet) { - const size_t max = sizeof(tcp_audio_header) + 8 + 16384; +static size_t build_tcp_audio_packet(pa_raop_client *c, pa_memchunk *block, pa_memchunk *packet) { + const size_t head = sizeof(tcp_audio_header); uint32_t *buffer = NULL; - size_t size, head; + uint8_t *raw = NULL; + size_t length, size; - *packet = NULL; - if (!(buffer = pa_xmalloc0(max))) - return 0; + raw = pa_memblock_acquire(block->memblock); + buffer = pa_memblock_acquire(packet->memblock); + buffer += packet->index / sizeof(uint32_t); + raw += block->index; - size = head = sizeof(tcp_audio_header); + c->seq++; memcpy(buffer, tcp_audio_header, sizeof(tcp_audio_header)); buffer[1] |= htonl((uint32_t) c->seq); buffer[2] = htonl(c->rtptime); buffer[3] = htonl(c->ssrc); + length = block->length; + size = sizeof(tcp_audio_header); if (c->codec == PA_RAOP_CODEC_PCM) - size += write_PCM_data(((uint8_t *) buffer + head), max - head, raw, length); + size += write_PCM_data(((uint8_t *) buffer + head), packet->length - head, raw, &length); else if (c->codec == PA_RAOP_CODEC_ALAC) - size += write_ALAC_data(((uint8_t *) buffer + head), max - head, raw, length, false); + size += write_ALAC_data(((uint8_t *) buffer + head), packet->length - head, raw, &length, false); else - size += write_AAC_data(((uint8_t *) buffer + head), max - head, raw, length); - c->rtptime += *length / 4; + size += write_AAC_data(((uint8_t *) buffer + head), packet->length - head, raw, &length); + c->rtptime += length / 4; + + pa_memblock_release(block->memblock); buffer[0] |= htonl((uint32_t) size - 4); if (c->encryption == PA_RAOP_ENCRYPTION_RSA) pa_raop_aes_encrypt(c->secret, (uint8_t *) buffer + head, size - head); - *packet = buffer; + pa_memblock_release(packet->memblock); + packet->length = size; + return size; } static ssize_t send_tcp_audio_packet(pa_raop_client *c, pa_memchunk *block, size_t offset) { - static uint32_t * packet = NULL; - static size_t size, sent; + static int write_type = 0; + const size_t max = sizeof(tcp_audio_header) + 8 + 16384; + pa_memchunk *packet = NULL; + uint8_t *buffer = NULL; double progress = 0.0; - size_t index, length; - uint8_t *raw = NULL; - ssize_t written; + ssize_t written = -1; + size_t done = 0; + + if (!(packet = pa_raop_packet_buffer_get(c->pbuf, c->seq, max))) + return -1; - if (!packet) { - index = block->index; - length = block->length; - raw = pa_memblock_acquire(block->memblock); + if (packet->length <= 0) { + pa_assert(block->index == offset); - pa_assert(raw); - pa_assert(index == offset); - pa_assert(length > 0); + if (!(packet = pa_raop_packet_buffer_get(c->pbuf, c->seq + 1, max))) + return -1; - size = build_tcp_audio_packet(c, raw, &index, &length, &packet); - sent = 0; + packet->index = 0; + packet->length = max; + if (!build_tcp_audio_packet(c, block, packet)) + return -1; } - written = -1; - if (packet != NULL && size > 0) - written = pa_write(c->tcp_sfd, packet + sent, size - sent, NULL); - if (block->index == offset) - c->seq++; - if (sent == 0) - pa_memblock_release(block->memblock); + buffer = pa_memblock_acquire(packet->memblock); + + pa_assert(buffer); + + buffer += packet->index; + if (buffer && packet->length > 0) + written = pa_write(c->tcp_sfd, buffer, packet->length, &write_type); if (written > 0) { - sent += written; - progress = (double) sent / (double) size; - index = (block->index + block->length) * progress; - length = (block->index + block->length) - index; - block->length = length; - block->index = index; - } + progress = (double) written / (double) packet->length; + packet->length -= written; + packet->index += written; - if ((size - sent) <= 0) { - pa_xfree(packet); - packet = NULL; + done = block->length * progress; + block->length -= done; + block->index += done; } + pa_memblock_release(packet->memblock); + return written; } -static size_t build_udp_audio_packet(pa_raop_client *c, uint8_t *raw, size_t *index, size_t *length, uint32_t **packet) { - const size_t max = sizeof(udp_audio_header) + 8 + 1408; +static size_t build_udp_audio_packet(pa_raop_client *c, pa_memchunk *block, pa_memchunk *packet) { + const size_t head = sizeof(udp_audio_header); uint32_t *buffer = NULL; - size_t size, head; + uint8_t *raw = NULL; + size_t length, size; - *packet = NULL; - if (!(buffer = pa_xmalloc0(max))) - return 0; + raw = pa_memblock_acquire(block->memblock); + buffer = pa_memblock_acquire(packet->memblock); + buffer += packet->index / sizeof(uint32_t); + raw += block->index; - size = head = sizeof(udp_audio_header); memcpy(buffer, udp_audio_header, sizeof(udp_audio_header)); if (c->is_first_packet) buffer[0] |= htonl((uint32_t) 0x80 << 16); @@ -416,75 +428,79 @@ static size_t build_udp_audio_packet(pa_raop_client *c, uint8_t *raw, size_t *in buffer[1] = htonl(c->rtptime); buffer[2] = htonl(c->ssrc); + length = block->length; + size = sizeof(udp_audio_header); if (c->codec == PA_RAOP_CODEC_PCM) - size += write_PCM_data(((uint8_t *) buffer + head), max - head, raw + *index, length); + size += write_PCM_data(((uint8_t *) buffer + head), packet->length - head, raw, &length); else if (c->codec == PA_RAOP_CODEC_ALAC) - size += write_ALAC_data(((uint8_t *) buffer + head), max - head, raw + *index, length, false); + size += write_ALAC_data(((uint8_t *) buffer + head), packet->length - head, raw, &length, false); else - size += write_AAC_data(((uint8_t *) buffer + head), max - head, raw + *index, length); - c->rtptime += *length / 4; + size += write_AAC_data(((uint8_t *) buffer + head), packet->length - head, raw, &length); + c->rtptime += length / 4; + c->seq++; + + pa_memblock_release(block->memblock); if (c->encryption == PA_RAOP_ENCRYPTION_RSA) pa_raop_aes_encrypt(c->secret, (uint8_t *) buffer + head, size - head); - *index += *length; - *length = 0; - /* It is meaningless to preseve the partial data -> */ - *packet = buffer; + pa_memblock_release(packet->memblock); + packet->length = size; + return size; } static ssize_t send_udp_audio_packet(pa_raop_client *c, pa_memchunk *block, size_t offset) { - uint32_t *packet = NULL; - size_t index, length, size; - uint8_t *raw = NULL; - ssize_t written; + const size_t max = sizeof(udp_audio_retrans_header) + sizeof(udp_audio_header) + 8 + 1408; + pa_memchunk *packet = NULL; + uint8_t *buffer = NULL; + ssize_t written = -1; - index = block->index; - length = block->length; - raw = pa_memblock_acquire(block->memblock); + /* UDP packet has to be sent at once ! */ + pa_assert(block->index == offset); - pa_assert(raw); - /* <- UDP packet has to be sent at once ! */ - pa_assert(index == offset); - pa_assert(length > 0); + if (!(packet = pa_raop_packet_buffer_get(c->pbuf, c->seq, max))) + return -1; - written = -1; - size = build_udp_audio_packet(c, raw, &index, &length, &packet); - if (packet != NULL && size > 0) - written = pa_write(c->udp_sfd, packet, size, NULL); - if (written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) - pa_log_debug("Discarding UDP (audio, seq=%d) packet due to EAGAIN (%s)", c->seq, pa_cstrerror(errno)); - c->seq++; + packet->length = max; + packet->index = sizeof(udp_audio_retrans_header); + if (!build_udp_audio_packet(c, block, packet)) + return -1; - /* Store packet for resending in the packet buffer (UDP). */ - pa_raop_pb_write_packet(c->pbuf, c->seq, raw + block->index, block->length); - pa_xfree(packet); + buffer = pa_memblock_acquire(packet->memblock); - pa_memblock_release(block->memblock); - block->length = length; - block->index = index; + pa_assert(buffer); + + buffer += packet->index; + if (buffer && packet->length > 0) + written = pa_write(c->udp_sfd, buffer, packet->length, NULL); + if (written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + pa_log_debug("Discarding UDP (audio, seq=%d) packet due to EAGAIN (%s)", c->seq, pa_cstrerror(errno)); + return (ssize_t) packet->length; + } + + pa_memblock_release(packet->memblock); + /* It is meaningless to preseve the partial data */ + block->index += block->length; + block->length = 0; - if (written < 0) - return (ssize_t) size; return written; } -static size_t rebuild_udp_audio_packet(pa_raop_client *c, uint16_t seq, uint32_t **packet) { +static size_t rebuild_udp_audio_packet(pa_raop_client *c, uint16_t seq, pa_memchunk *packet) { size_t size = sizeof(udp_audio_retrans_header); uint32_t *buffer = NULL; - uint8_t *data = NULL; - size += pa_raop_pb_read_packet(c->pbuf, seq, &data); - if (size == sizeof(udp_audio_retrans_header)) - return 0; - if (!(buffer = pa_xmalloc0(size))) - return 0; + buffer = pa_memblock_acquire(packet->memblock); memcpy(buffer, udp_audio_retrans_header, sizeof(udp_audio_retrans_header)); buffer[0] |= htonl((uint32_t) seq); + size += packet->length; + + pa_memblock_release(packet->memblock); + packet->length += sizeof(udp_audio_retrans_header); + packet->index -= sizeof(udp_audio_retrans_header); - *packet = buffer; return size; } @@ -493,20 +509,32 @@ static ssize_t resend_udp_audio_packets(pa_raop_client *c, uint16_t seq, uint16_ int i = 0; for (i = 0; i < nbp; i++) { - uint32_t * packet = NULL; - ssize_t written = 0; - size_t size = 0; + pa_memchunk *packet = NULL; + uint8_t *buffer = NULL; + ssize_t written = -1; + + if (!(packet = pa_raop_packet_buffer_get(c->pbuf, seq + i, 0))) + continue; + + if (packet->index > 0) { + if (!rebuild_udp_audio_packet(c, seq + i, packet)) + continue; + } + + pa_assert(packet->index == 0); + + buffer = pa_memblock_acquire(packet->memblock); - size = rebuild_udp_audio_packet(c, seq, &packet); - if (packet != NULL && size > 0) - written = pa_write(c->udp_cfd, packet, size, NULL); + pa_assert(buffer); + + if (buffer && packet->length > 0) + written = pa_write(c->udp_cfd, buffer, packet->length, NULL); if (written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - pa_log_debug("Discarding UDP (audio-restransmitted, seq=%d) packet due to EAGAIN", c->seq); + pa_log_debug("Discarding UDP (audio-restransmitted, seq=%d) packet due to EAGAIN", seq + i); continue; } - if (written > 0) - total += written; + total += written; } return total; @@ -557,17 +585,13 @@ static size_t handle_udp_control_packet(pa_raop_client *c, const uint8_t packet[ ssize_t written = 0; /* Control packets are 8 bytes long: */ - if (size != 8 || packet[0] != 0x80) { - pa_log_debug("Received an invalid control packet..."); + if (size != 8 || packet[0] != 0x80) return 1; - } seq = ntohs((uint16_t) packet[4]); nbp = ntohs((uint16_t) packet[6]); - if (nbp <= 0) { - pa_log_debug("Received an invalid control packet..."); + if (nbp <= 0) return 1; - } /* The market bit is always set (see rfc3550 for packet structure) ! */ payload = packet[1] ^ 0x80; @@ -631,10 +655,8 @@ static size_t handle_udp_timing_packet(pa_raop_client *c, const uint8_t packet[] uint64_t rci = 0; /* Timing packets are 32 bytes long: 1 x 8 RTP header (no ssrc) + 3 x 8 NTP timestamps */ - if (size != 32 || packet[0] != 0x80) { - pa_log_debug("Received an invalid UDP timing packet..."); + if (size != 32 || packet[0] != 0x80) return 0; - } rci = timeval_to_ntp(pa_rtclock_get(&tv)); data = (uint32_t *) (packet + sizeof(udp_timming_header)); @@ -1061,6 +1083,8 @@ static void rtsp_stream_cb(pa_rtsp_client *rtsp, pa_rtsp_state_t state, pa_rtsp_ if (alt) pa_atoi(alt, &latency); + pa_raop_packet_buffer_reset(c->pbuf, c->seq); + pa_random(&ssrc, sizeof(ssrc)); c->is_first_packet = true; c->is_recording = true; @@ -1093,9 +1117,6 @@ static void rtsp_stream_cb(pa_rtsp_client *rtsp, pa_rtsp_state_t state, pa_rtsp_ c->is_recording = false; - if (c->pbuf) - pa_raop_pb_clear(c->pbuf); - if (c->tcp_sfd > 0) pa_close(c->tcp_sfd); c->tcp_sfd = -1; @@ -1124,9 +1145,6 @@ static void rtsp_stream_cb(pa_rtsp_client *rtsp, pa_rtsp_state_t state, pa_rtsp_ c->is_recording = false; - if (c->pbuf) - pa_raop_pb_clear(c->pbuf); - if (c->tcp_sfd > 0) pa_close(c->tcp_sfd); c->tcp_sfd = -1; @@ -1331,6 +1349,7 @@ pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_prot pa_parsed_address a; pa_sample_spec ss; + size_t size = 2; pa_assert(core); pa_assert(host); @@ -1369,6 +1388,8 @@ pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_prot c->secret = pa_raop_secret_new(); ss = core->default_sample_spec; + if (c->protocol == PA_RAOP_PROTOCOL_UDP) + size = RTX_BUFFERING_SECONDS * ss.rate / FRAMES_PER_UDP_PACKET; c->is_recording = false; c->is_first_packet = true; @@ -1376,7 +1397,7 @@ pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_prot c->sync_interval = ss.rate / FRAMES_PER_UDP_PACKET; c->sync_count = 0; - c->pbuf = pa_raop_pb_new(UDP_DEFAULT_PKT_BUF_SIZE); + c->pbuf = pa_raop_packet_buffer_new(c->core->mempool, size); return c; } @@ -1384,7 +1405,7 @@ pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_prot void pa_raop_client_free(pa_raop_client *c) { pa_assert(c); - pa_raop_pb_delete(c->pbuf); + pa_raop_packet_buffer_free(c->pbuf); pa_xfree(c->sid); pa_xfree(c->sci); @@ -1685,10 +1706,10 @@ void pa_raop_client_handle_oob_packet(pa_raop_client *c, const int fd, const uin if (c->protocol == PA_RAOP_PROTOCOL_UDP) { if (fd == c->udp_cfd) { - pa_log_debug("Received UDP control packet"); + pa_log_debug("Received UDP control packet..."); handle_udp_control_packet(c, packet, size); } else if (fd == c->udp_tfd) { - pa_log_debug("Received UDP timing packet"); + pa_log_debug("Received UDP timing packet..."); handle_udp_timing_packet(c, packet, size); } } diff --git a/src/modules/raop/raop-packet-buffer.c b/src/modules/raop/raop-packet-buffer.c index d9d8549..06e3125 100644 --- a/src/modules/raop/raop-packet-buffer.c +++ b/src/modules/raop/raop-packet-buffer.c @@ -20,152 +20,99 @@ USA. ***/ -#include <stdlib.h> -#include <limits.h> - #ifdef HAVE_CONFIG_H #include <config.h> #endif +#include <stdlib.h> +#include <limits.h> + +#include <pulse/xmalloc.h> + #include <pulsecore/core-error.h> -#include "raop-client.h" +#include <pulsecore/macro.h> #include "raop-packet-buffer.h" -/* FRAMES_PER_PACKET*2*2 + sizeof(udp_audio_header) + sizeof(ALAC header), unencoded */ -#define PACKET_SIZE_MAX (352*2*2 + 12 + 7) /* FIXME; hardcoded constant ! */ -/* Header room for packet retransmission header */ -#define RETRANS_HEADER_ROOM 4 - -/* Packet element */ -struct pa_raop_packet_element { - uint16_t seq_num; /* RTP sequence number (in host byte order) */ - ssize_t length; /* Actual packet length */ - /* Packet data including RTP header */ - uint8_t data[PACKET_SIZE_MAX + RETRANS_HEADER_ROOM]; -}; - -/* Buffer struct */ struct pa_raop_packet_buffer { - size_t size; /* max number of packets in buffer */ - size_t start; /* index of oldest packet */ - size_t count; /* number of packets in buffer */ - uint16_t first_seq_num; /* Sequence number of first packet in buffer */ - uint16_t latest_seq_num; /* Debug purpose */ - pa_raop_packet_element *packets; /* Packet element pointer */ + pa_memchunk *packets; + pa_mempool *mempool; + size_t size; + + uint16_t seq; + size_t pos; }; -pa_raop_packet_buffer *pa_raop_pb_new(size_t size) { - pa_raop_packet_buffer *pb = pa_xmalloc0(sizeof(*pb)); +pa_raop_packet_buffer *pa_raop_packet_buffer_new(pa_mempool *mempool, const size_t size) { + pa_raop_packet_buffer *pb = pa_xnew0(pa_raop_packet_buffer, 1); - pb->size = size; - pb->packets = (pa_raop_packet_element *) - pa_xmalloc(size * sizeof(pa_raop_packet_element)); + pa_assert(mempool); + pa_assert(size > 0); - pa_raop_pb_clear(pb); + pb->size = size; + pb->mempool = mempool; + pb->packets = pa_xnew0(pa_memchunk, size); + pb->seq = pb->pos = 0; return pb; } -void pa_raop_pb_clear(pa_raop_packet_buffer *pb) { - pb->start = 0; - pb->count = 0; - pb->first_seq_num = 0; - pb->latest_seq_num = 0; - memset(pb->packets, 0, pb->size * sizeof(pa_raop_packet_element)); -} - -void pa_raop_pb_delete(pa_raop_packet_buffer *pb) { - pa_xfree(pb->packets); - pa_xfree(pb); -} - -static int pb_is_full(pa_raop_packet_buffer *pb) { - return pb->count == pb->size; -} - -static int pb_is_empty(pa_raop_packet_buffer *pb) { - return pb->count == 0; -} - -static pa_raop_packet_element *pb_prepare_write(pa_raop_packet_buffer *pb, uint16_t seq) { - size_t end = (pb->start + pb->count) % pb->size; - pa_raop_packet_element *packet; - - /* Set first packet sequence number in buffer if buffer is empty */ - if (pb_is_empty(pb)) - pb->first_seq_num = seq; - else - pa_assert((uint16_t) (pb->latest_seq_num + 1) == seq); - - packet = &pb->packets[end]; - - if (pb_is_full(pb)) { - pb->start = (pb->start + 1) % pb->size; /* full, overwrite */ +void pa_raop_packet_buffer_free(pa_raop_packet_buffer *pb) { + size_t i; - /* Set first packet sequence number in buffer - to new start packet sequence number */ - pb->first_seq_num = pb->packets[pb->start].seq_num; - } else - ++ pb->count; + pa_assert(pb); - pb->latest_seq_num = seq; + for (i = 0; pb->packets && i < pb->size; i++) { + if (pb->packets[i].memblock) + pa_memblock_unref(pb->packets[i].memblock); + pa_memchunk_reset(&pb->packets[i]); + } - return packet; + pa_xfree(pb->packets); + pb->packets = NULL; + pa_xfree(pb); } -/* Write packet data to packet buffer */ -void pa_raop_pb_write_packet(pa_raop_packet_buffer *pb, uint16_t seq_num, const uint8_t *packet_data, ssize_t packet_length) { - pa_raop_packet_element *packet; +void pa_raop_packet_buffer_reset(pa_raop_packet_buffer *pb, uint16_t seq) { + size_t i; pa_assert(pb); - pa_assert(packet_data); - pa_assert(packet_length <= PACKET_SIZE_MAX); - - packet = pb_prepare_write(pb, seq_num); - packet->seq_num = seq_num; - packet->length = packet_length + RETRANS_HEADER_ROOM; - - /* Insert RETRANS_HEADER_ROOM bytes in front of packet data, - for retransmission header */ - memset(packet->data, 0, RETRANS_HEADER_ROOM); - memcpy(packet->data + RETRANS_HEADER_ROOM, packet_data, packet_length); -} - -/* l < r?, considers wrapping */ -static bool seq_lt(uint16_t l, uint16_t r) { - return l - r > USHRT_MAX/2; + pa_assert(pb->packets); + + pb->pos = 0; + pb->seq = seq - 1; + for (i = 0; i < pb->size; i++) { + if (pb->packets[i].memblock) + pa_memblock_unref(pb->packets[i].memblock); + pa_memchunk_reset(&pb->packets[i]); + } } -/* Random access to packet from buffer by sequence number for (re-)sending. */ -ssize_t pa_raop_pb_read_packet(pa_raop_packet_buffer *pb, uint16_t seq_num, uint8_t **packet_data) { - uint16_t index = 0; /* Index of requested packet */ - pa_raop_packet_element *packet; - - /* If the buffer is empty, there is no use in calculating indices */ - if (pb_is_empty(pb)) - return -1; - - /* If the requested packet is too old (seq_num below first seq number - in buffer) or too young (seq_num greater than current seq number), - do nothing and return */ - if (seq_lt(seq_num, pb->first_seq_num)) - return -1; +pa_memchunk *pa_raop_packet_buffer_get(pa_raop_packet_buffer *pb, uint16_t seq, const size_t size) { + pa_memchunk *packet = NULL; + size_t delta, i; - index = (uint16_t) (seq_num - pb->first_seq_num); - if (index >= pb->count) - return -1; - - /* Index of the requested packet in the buffer is calculated - using the first sequence number stored in the buffer. - The offset (seq_num - first_seq_num) is used to access the array. */ - packet = &pb->packets[(pb->start + index) % pb->size]; - - pa_assert(packet->data[RETRANS_HEADER_ROOM + 2] == (seq_num >> 8)); - pa_assert(packet->data[RETRANS_HEADER_ROOM + 3] == (seq_num & 0xff)); - pa_assert(packet_data); - - *packet_data = packet->data; + pa_assert(pb); + pa_assert(pb->packets); + pa_assert(seq > 0); + + if (seq == pb->seq) + packet = &pb->packets[pb->pos]; + else if (seq < pb->seq) { + delta = pb->seq - seq; + i = (pb->size + pb->pos - delta) % pb->size; + if (delta < pb->size) + packet = &pb->packets[i]; + } else { + i = (pb->pos + (seq - pb->seq)) % pb->size; + if (pb->packets[i].memblock) + pa_memblock_unref(pb->packets[i].memblock); + pa_memchunk_reset(&pb->packets[i]); + pb->packets[i].memblock = pa_memblock_new(pb->mempool, size); + packet = &pb->packets[i]; + pb->seq = seq; + pb->pos = i; + } - return packet->length; + return packet; } diff --git a/src/modules/raop/raop-packet-buffer.h b/src/modules/raop/raop-packet-buffer.h index 69a0ce1..b8c7617 100644 --- a/src/modules/raop/raop-packet-buffer.h +++ b/src/modules/raop/raop-packet-buffer.h @@ -23,19 +23,16 @@ USA. ***/ -struct pa_raop_packet_element; -typedef struct pa_raop_packet_element pa_raop_packet_element; +#include <pulsecore/memblock.h> +#include <pulsecore/memchunk.h> -struct pa_raop_packet_buffer; typedef struct pa_raop_packet_buffer pa_raop_packet_buffer; -/* Allocates a new circular packet buffer - size: Maximum number of packets to store */ -pa_raop_packet_buffer *pa_raop_pb_new(size_t size); -void pa_raop_pb_clear(pa_raop_packet_buffer *pb); -void pa_raop_pb_delete(pa_raop_packet_buffer *pb); +/* Allocates a new circular packet buffer, size: Maximum number of packets to store */ +pa_raop_packet_buffer *pa_raop_packet_buffer_new(pa_mempool *mempool, const size_t size); +void pa_raop_packet_buffer_free(pa_raop_packet_buffer *pb); -void pa_raop_pb_write_packet(pa_raop_packet_buffer *pb, uint16_t seq_num, const uint8_t *packet_data, ssize_t packet_length); -ssize_t pa_raop_pb_read_packet(pa_raop_packet_buffer *pb, uint16_t seq_num, uint8_t **packet_data); +void pa_raop_packet_buffer_reset(pa_raop_packet_buffer *pb, uint16_t seq); +pa_memchunk *pa_raop_packet_buffer_get(pa_raop_packet_buffer *pb, uint16_t seq, const size_t size); #endif diff --git a/src/modules/raop/raop-sink.c b/src/modules/raop/raop-sink.c index 22b1ccc..23989d1 100644 --- a/src/modules/raop/raop-sink.c +++ b/src/modules/raop/raop-sink.c @@ -333,7 +333,6 @@ static void thread_func(void *userdata) { unsigned int i, nbfds = 0; pa_usec_t now, estimated, intvl; uint64_t position; - ssize_t written; size_t index; int ret; @@ -399,8 +398,7 @@ static void thread_func(void *userdata) { pa_assert(u->memchunk.length > 0); index = u->memchunk.index; - written = pa_raop_client_send_audio_packet(u->raop, &u->memchunk, offset); - if (written < 0) { + if (pa_raop_client_send_audio_packet(u->raop, &u->memchunk, offset) < 0) { if (errno == EINTR) { /* Just try again. */ pa_log_debug("Failed to write data to FIFO (EINTR), retrying"); -- 2.5.0