[client] audio: rework audiodevs to be pull model from a common buffer

Author: Geoffrey McRae
Date:   2022-01-18 09:02:44 +11:00
parent  aad65c1cab
commit  b334f22223
4 changed files with 128 additions and 104 deletions

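Before the per-file diffs, a hedged sketch of the pull model this commit introduces: the client core owns a single ring buffer, SPICE only appends to it, and each audiodev backend pulls frames from it through a callback whenever the device wants more data. The ringbuffer_* calls and the LG_AudioPullFn typedef mirror the diff below; the statics and the missing device glue are simplified stand-ins, not the project's actual sources.

/* Simplified sketch of the new data flow (assumptions noted above). */
#include <stdint.h>
#include <stddef.h>

typedef int (*LG_AudioPullFn)(uint8_t ** data, int frames);

/* declarations matching the common/ringbuffer.h usage in the diff below */
typedef struct RingBuffer * RingBuffer;
void * ringbuffer_consume(RingBuffer rb, int * frames);
void   ringbuffer_append (RingBuffer rb, const void * data, int frames);

static RingBuffer buffer; /* the one common buffer, owned by the client core */
static int        stride; /* bytes per frame = channels * sizeof(uint16_t)   */

/* handed to the backend via playback.setup(); called from the device thread */
static int pullFrames(uint8_t ** data, int frames)
{
  *data = ringbuffer_consume(buffer, &frames); /* may return fewer frames */
  return frames;
}

/* SPICE data path: append only, never touches the device directly */
static void onSpiceAudioData(uint8_t * data, size_t size)
{
  ringbuffer_append(buffer, data, (int)(size / stride));
  /* playback.start() is called separately, once the buffer is ~25% full */
}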
File 1/4: PipeWire audio device backend (LGAD_PipeWire)

@@ -26,7 +26,6 @@
 #include <math.h>
 #include "common/debug.h"
-#include "common/ringbuffer.h"
 #include "common/stringutils.h"
 #include "common/util.h"
@@ -50,11 +49,11 @@ struct PipeWire
     struct pw_stream         * stream;
     struct spa_io_rate_match * rateMatch;
     int                        channels;
     int                        sampleRate;
     int                        stride;
-    RingBuffer                 buffer;
+    LG_AudioPullFn             pullFn;
     StreamState                state;
   }
   playback;
@@ -63,10 +62,10 @@ struct PipeWire
   {
     struct pw_stream * stream;
     int                channels;
     int                sampleRate;
     int                stride;
-    void (*dataFn)(uint8_t * data, size_t size);
+    LG_AudioPushFn     pushFn;
     bool               active;
   }
@@ -90,19 +89,6 @@ static void pipewire_onPlaybackProcess(void * userdata)
 {
   struct pw_buffer * pbuf;
-  if (!ringbuffer_getCount(pw.playback.buffer))
-  {
-    if (pw.playback.state == STREAM_STATE_FLUSHING)
-    {
-      pw_thread_loop_lock(pw.thread);
-      pw_stream_flush(pw.playback.stream, true);
-      pw.playback.state = STREAM_STATE_DRAINING;
-      pw_thread_loop_unlock(pw.thread);
-    }
-    return;
-  }
   if (!(pbuf = pw_stream_dequeue_buffer(pw.playback.stream)))
   {
     DEBUG_WARN("out of buffers");
@@ -119,8 +105,24 @@ static void pipewire_onPlaybackProcess(void * userdata)
   if (pw.playback.rateMatch && pw.playback.rateMatch->size > 0)
     frames = min(frames, pw.playback.rateMatch->size);
-  void * values = ringbuffer_consume(pw.playback.buffer, &frames);
-  memcpy(dst, values, frames * pw.playback.stride);
+  uint8_t * data;
+  frames = pw.playback.pullFn(&data, frames);
+  if (!frames)
+  {
+    if (pw.playback.state == STREAM_STATE_FLUSHING)
+    {
+      pw_thread_loop_lock(pw.thread);
+      pw_stream_flush(pw.playback.stream, true);
+      pw.playback.state = STREAM_STATE_DRAINING;
+      pw_thread_loop_unlock(pw.thread);
+    }
+    sbuf->datas[0].chunk->size = 0;
+    pw_stream_queue_buffer(pw.playback.stream, pbuf);
+    return;
+  }
+  memcpy(dst, data, frames * pw.playback.stride);
   sbuf->datas[0].chunk->offset = 0;
   sbuf->datas[0].chunk->stride = pw.playback.stride;
@@ -197,11 +199,11 @@ static void pipewire_playbackStopStream(void)
   pw_stream_destroy(pw.playback.stream);
   pw.playback.stream    = NULL;
   pw.playback.rateMatch = NULL;
-  ringbuffer_free(&pw.playback.buffer);
   pw_thread_loop_unlock(pw.thread);
 }
-static void pipewire_playbackStart(int channels, int sampleRate)
+static void pipewire_playbackSetup(int channels, int sampleRate,
+    LG_AudioPullFn pullFn)
 {
   const struct spa_pod * params[1];
   uint8_t buffer[1024];
@@ -226,8 +228,7 @@ static void pipewire_playbackStart(int channels, int sampleRate)
   pw.playback.channels   = channels;
   pw.playback.sampleRate = sampleRate;
   pw.playback.stride     = sizeof(uint16_t) * channels;
-  pw.playback.buffer     = ringbuffer_new(bufferFrames,
-      channels * sizeof(uint16_t));
+  pw.playback.pullFn     = pullFn;
   int maxLatencyFrames = bufferFrames / 2;
   char maxLatency[32];
@@ -277,28 +278,21 @@ static void pipewire_playbackStart(int channels, int sampleRate)
   pw_thread_loop_unlock(pw.thread);
 }
-static void pipewire_playbackPlay(uint8_t * data, size_t size)
+static void pipewire_playbackStart(void)
 {
   if (!pw.playback.stream)
     return;
-  ringbuffer_append(pw.playback.buffer, data, size / pw.playback.stride);
   if (pw.playback.state != STREAM_STATE_ACTIVE &&
       pw.playback.state != STREAM_STATE_RESTARTING)
   {
     pw_thread_loop_lock(pw.thread);
-    switch (pw.playback.state) {
+    switch (pw.playback.state)
+    {
       case STREAM_STATE_INACTIVE:
-        // Don't start playback until the buffer is sufficiently full to avoid
-        // glitches
-        if (ringbuffer_getCount(pw.playback.buffer) >=
-            ringbuffer_getLength(pw.playback.buffer) / 4)
-        {
-          pw_stream_set_active(pw.playback.stream, true);
-          pw.playback.state = STREAM_STATE_ACTIVE;
-        }
+        pw_stream_set_active(pw.playback.stream, true);
+        pw.playback.state = STREAM_STATE_ACTIVE;
         break;
      case STREAM_STATE_FLUSHING:
@@ -370,14 +364,9 @@ static void pipewire_playbackMute(bool mute)
   pw_thread_loop_unlock(pw.thread);
 }
-static uint64_t pipewire_playbackLatency(void)
+static size_t pipewire_playbackLatency(void)
 {
-  const int frames = ringbuffer_getCount(pw.playback.buffer);
-  if (frames == 0)
-    return 0;
-  // TODO: we should really include the hw latency here too
-  return (uint64_t)pw.playback.sampleRate * 1000ULL / frames;
+  return 0;
 }
 static void pipewire_recordStopStream(void)
@@ -408,17 +397,17 @@ static void pipewire_onRecordProcess(void * userdata)
     return;
   dst += sbuf->datas[0].chunk->offset;
-  pw.record.dataFn(dst,
+  pw.record.pushFn(dst,
     min(
       sbuf->datas[0].chunk->size,
-      sbuf->datas[0].maxsize - sbuf->datas[0].chunk->offset)
+      sbuf->datas[0].maxsize - sbuf->datas[0].chunk->offset) / pw.record.stride
   );
   pw_stream_queue_buffer(pw.record.stream, pbuf);
 }
 static void pipewire_recordStart(int channels, int sampleRate,
-    void (*dataFn)(uint8_t * data, size_t size))
+    LG_AudioPushFn pushFn)
 {
   const struct spa_pod * params[1];
   uint8_t buffer[1024];
@@ -439,7 +428,7 @@ static void pipewire_recordStart(int channels, int sampleRate,
   pw.record.channels   = channels;
   pw.record.sampleRate = sampleRate;
   pw.record.stride     = sizeof(uint16_t) * channels;
-  pw.record.dataFn     = dataFn;
+  pw.record.pushFn     = pushFn;
   pw_thread_loop_lock(pw.thread);
   pw.record.stream = pw_stream_new_simple(
@@ -526,7 +515,6 @@ static void pipewire_free(void)
   pw.loop   = NULL;
   pw.thread = NULL;
-  ringbuffer_free(&pw.playback.buffer);
   pw_deinit();
 }
@@ -537,8 +525,8 @@ struct LG_AudioDevOps LGAD_PipeWire =
   .free     = pipewire_free,
   .playback =
   {
+    .setup  = pipewire_playbackSetup,
     .start  = pipewire_playbackStart,
-    .play   = pipewire_playbackPlay,
     .stop   = pipewire_playbackStop,
     .volume = pipewire_playbackVolume,
    .mute   = pipewire_playbackMute,
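In the record hunk above, the only functional change besides the rename is the new `/ pw.record.stride`: the old dataFn was handed a byte count, while LG_AudioPushFn is handed frames. A hedged illustration of that conversion follows; the helper names are mine for illustration, not part of the codebase.

/* Illustrative helpers only: the new callbacks count frames, PipeWire chunk
 * sizes count bytes, and the stride bridges the two.
 * For S16 audio, stride = channels * sizeof(uint16_t). */
#include <stddef.h>
#include <stdint.h>

static inline int bytesToFrames(size_t bytes, int stride)
{
  return (int)(bytes / (size_t)stride);    /* e.g. 480 bytes / 4 = 120 frames */
}

static inline size_t framesToBytes(int frames, int stride)
{
  return (size_t)frames * (size_t)stride;  /* 120 frames * 4 = 480 bytes */
}

For stereo S16 (stride of 4 bytes), a 480-byte chunk therefore arrives at the push callback as 120 frames.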

File 2/4: PulseAudio audio device backend (LGAD_PulseAudio)

@@ -25,7 +25,6 @@
 #include <math.h>
 #include "common/debug.h"
-#include "common/ringbuffer.h"
 struct PulseAudio
 {
@@ -42,7 +41,7 @@ struct PulseAudio
   int            sinkSampleRate;
   int            sinkChannels;
   int            sinkStride;
-  RingBuffer     sinkBuffer;
+  LG_AudioPullFn sinkPullFn;
 };
 static struct PulseAudio pa = {0};
@@ -221,14 +220,14 @@ static void pulseaudio_free(void)
 static void pulseaudio_write_cb(pa_stream * p, size_t nbytes, void * userdata)
 {
-  uint8_t * dst;
+  uint8_t * dst, * src;
   pa_stream_begin_write(p, (void **)&dst, &nbytes);
   int frames = nbytes / pa.sinkStride;
-  void * values = ringbuffer_consume(pa.sinkBuffer, &frames);
-  memcpy(dst, values, frames * pa.sinkStride);
+  frames = pa.sinkPullFn(&src, frames);
+  memcpy(dst, src, frames * pa.sinkStride);
   pa_stream_write(p, dst, frames * pa.sinkStride, NULL, 0, PA_SEEK_RELATIVE);
 }
@@ -242,7 +241,8 @@ static void pulseaudio_overflow_cb(pa_stream * p, void * userdata)
   DEBUG_WARN("Overflow");
 }
-static void pulseaudio_start(int channels, int sampleRate)
+static void pulseaudio_setup(int channels, int sampleRate,
+    LG_AudioPullFn pullFn)
 {
   if (pa.sink && pa.sinkChannels == channels && pa.sinkSampleRate == sampleRate)
     return;
@@ -281,27 +281,22 @@ static void pulseaudio_setup(int channels, int sampleRate,
       NULL, NULL);
   pa.sinkStride = channels * sizeof(uint16_t);
+  pa.sinkPullFn = pullFn;
   pa.sinkStart  = attribs.tlength / pa.sinkStride;
-  pa.sinkBuffer = ringbuffer_new(pa.sinkStart * 2, pa.sinkStride);
   pa.sinkCorked = true;
   pa_threaded_mainloop_unlock(pa.loop);
 }
-static void pulseaudio_play(uint8_t * data, size_t size)
+static void pulseaudio_start(void)
 {
   if (!pa.sink)
     return;
-  ringbuffer_append(pa.sinkBuffer, data, size / pa.sinkStride);
-
-  if (pa.sinkCorked && ringbuffer_getCount(pa.sinkBuffer) >= pa.sinkStart)
-  {
-    pa_threaded_mainloop_lock(pa.loop);
-    pa_stream_cork(pa.sink, 0, NULL, NULL);
-    pa.sinkCorked = false;
-    pa_threaded_mainloop_unlock(pa.loop);
-  }
+  pa_threaded_mainloop_lock(pa.loop);
+  pa_stream_cork(pa.sink, 0, NULL, NULL);
+  pa.sinkCorked = false;
+  pa_threaded_mainloop_unlock(pa.loop);
 }
 static void pulseaudio_stop(void)
@@ -348,8 +343,8 @@ struct LG_AudioDevOps LGAD_PulseAudio =
   .free     = pulseaudio_free,
   .playback =
   {
+    .setup  = pulseaudio_setup,
     .start  = pulseaudio_start,
-    .play   = pulseaudio_play,
     .stop   = pulseaudio_stop,
     .volume = pulseaudio_volume,
     .mute   = pulseaudio_mute

File 3/4: audio device interface header (struct LG_AudioDevOps)

@@ -25,6 +25,9 @@
 #include <stdint.h>
 #include <stddef.h>
+typedef int  (*LG_AudioPullFn)(uint8_t ** data, int frames);
+typedef void (*LG_AudioPushFn)(uint8_t * data, int frames);
 struct LG_AudioDevOps
 {
   /* internal name of the audio for debugging */
@@ -41,15 +44,13 @@ struct LG_AudioDevOps
   struct
   {
-    /* start the playback audio stream
+    /* setup the stream for playback but don't start it yet
      * Note: currently SPICE only supports S16 samples so always assume so
      */
-    void (*start)(int channels, int sampleRate);
+    void (*setup)(int channels, int sampleRate, LG_AudioPullFn pullFn);
-    /* called for each packet of output audio to play
-     * Note: size is the size of data in bytes, not frames/samples
-     */
-    void (*play)(uint8_t * data, size_t size);
+    /* called when playback is about to start */
+    void (*start)(void);
     /* called when SPICE reports the audio stream has stopped */
     void (*stop)(void);
@@ -70,8 +71,7 @@ struct LG_AudioDevOps
     /* start the record stream
      * Note: currently SPICE only supports S16 samples so always assume so
      */
-    void (*start)(int channels, int sampleRate,
-        void (*dataFn)(uint8_t * data, size_t size));
+    void (*start)(int channels, int sampleRate, LG_AudioPushFn pushFn);
     /* called when SPICE reports the audio stream has stopped */
     void (*stop)(void);
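To make the new playback contract concrete, here is a hedged skeleton of a backend written against the interface above. Only the LG_AudioPullFn typedef (repeated for self-containment) and the setup/start split come from the header; everything prefixed fake_ is an invented stand-in, not real Looking Glass code.

/* Minimal sketch of a playback backend under the new pull model. */
#include <stdint.h>
#include <string.h>

typedef int (*LG_AudioPullFn)(uint8_t ** data, int frames);

static struct
{
  int            channels;
  int            sampleRate;
  int            stride;
  LG_AudioPullFn pullFn;
}
dev;

/* .setup: remember the format and the pull callback, but do not start yet */
static void fake_playbackSetup(int channels, int sampleRate,
    LG_AudioPullFn pullFn)
{
  dev.channels   = channels;
  dev.sampleRate = sampleRate;
  dev.stride     = channels * sizeof(uint16_t); /* SPICE playback is S16 */
  dev.pullFn     = pullFn;
}

/* .start: the core calls this once its common buffer is primed */
static void fake_playbackStart(void)
{
  /* make the device active so its process callback starts firing */
}

/* the device's own render callback: ask the core for frames on demand */
static void fake_onProcess(uint8_t * dst, int wantedFrames)
{
  uint8_t * src;
  int frames = dev.pullFn(&src, wantedFrames);
  if (frames > 0)
    memcpy(dst, src, (size_t)frames * dev.stride);
  /* frames == 0 means underrun; a real backend writes silence or skips */
}

These would then be wired into the backend's LG_AudioDevOps table as .setup and .start, as the PipeWire and PulseAudio diffs above do.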

File 4/4: client audio core (the common buffer behind audio_playback*/audio_record*)

@@ -22,6 +22,7 @@
 #include "main.h"
 #include "common/array.h"
 #include "common/util.h"
+#include "common/ringbuffer.h"
 #include "dynamic/audiodev.h"
@@ -33,10 +34,14 @@ typedef struct
   struct
   {
-    bool     started;
-    int      volumeChannels;
-    uint16_t volume[8];
-    bool     mute;
+    bool       setup;
+    bool       started;
+    int        volumeChannels;
+    uint16_t   volume[8];
+    bool       mute;
+    int        sampleRate;
+    int        stride;
+    RingBuffer buffer;
     LG_Lock    lock;
     RingBuffer timings;
@@ -50,6 +55,7 @@ typedef struct
     int      volumeChannels;
     uint16_t volume[8];
     bool     mute;
+    int      stride;
     uint32_t time;
   }
   record;
@@ -101,6 +107,18 @@ static const char * audioGraphFormatFn(const char * name,
   return title;
 }
+static int playbackPullFrames(uint8_t ** data, int frames)
+{
+  LG_LOCK(audio.playback.lock);
+  if (audio.playback.buffer)
+    *data = ringbuffer_consume(audio.playback.buffer, &frames);
+  else
+    frames = 0;
+  LG_UNLOCK(audio.playback.lock);
+
+  return frames;
+}
 void audio_playbackStart(int channels, int sampleRate, PSAudioFormat format,
     uint32_t time)
 {
@@ -110,7 +128,7 @@ void audio_playbackStart(int channels, int sampleRate, PSAudioFormat format,
   static int lastChannels   = 0;
   static int lastSampleRate = 0;
-  if (audio.playback.started)
+  if (audio.playback.setup)
   {
     if (channels != lastChannels || sampleRate != lastSampleRate)
       audio.audioDev->playback.stop();
@@ -120,11 +138,18 @@ void audio_playbackStart(int channels, int sampleRate, PSAudioFormat format,
   LG_LOCK(audio.playback.lock);
+  const int bufferFrames = sampleRate / 10;
+  audio.playback.buffer = ringbuffer_new(bufferFrames,
+      channels * sizeof(uint16_t));
   lastChannels   = channels;
   lastSampleRate = sampleRate;
-  audio.playback.started = true;
-  audio.audioDev->playback.start(channels, sampleRate);
+  audio.playback.sampleRate = sampleRate;
+  audio.playback.stride     = channels * sizeof(uint16_t);
+  audio.playback.setup      = true;
+  audio.audioDev->playback.setup(channels, sampleRate, playbackPullFrames);
   // if a volume level was stored, set it before we return
   if (audio.playback.volumeChannels)
@@ -137,12 +162,9 @@ void audio_playbackStart(int channels, int sampleRate, PSAudioFormat format,
   audio.audioDev->playback.mute(audio.playback.mute);
-  // if the audio dev can report it's latency setup a timing graph
-  if (audio.audioDev->playback.latency)
-  {
-    audio.playback.timings = ringbuffer_new(1200, sizeof(float));
-    audio.playback.graph   = app_registerGraph("PLAYBACK",
-        audio.playback.timings, 0.0f, 100.0f, audioGraphFormatFn);
-  }
+  audio.playback.timings = ringbuffer_new(1200, sizeof(float));
+  audio.playback.graph   = app_registerGraph("PLAYBACK",
+      audio.playback.timings, 0.0f, 100.0f, audioGraphFormatFn);
   LG_UNLOCK(audio.playback.lock);
 }
@@ -155,7 +177,9 @@ void audio_playbackStop(void)
   LG_LOCK(audio.playback.lock);
   audio.audioDev->playback.stop();
+  audio.playback.setup   = false;
   audio.playback.started = false;
+  ringbuffer_free(&audio.playback.buffer);
   if (audio.playback.timings)
   {
@@ -176,7 +200,7 @@ void audio_playbackVolume(int channels, const uint16_t volume[])
   memcpy(audio.playback.volume, volume, sizeof(uint16_t) * channels);
   audio.playback.volumeChannels = channels;
-  if (!audio.playback.started)
+  if (!audio.playback.setup)
     return;
   audio.audioDev->playback.volume(channels, volume);
@@ -189,7 +213,7 @@ void audio_playbackMute(bool mute)
   // store the value so we can restore it if the stream is restarted
   audio.playback.mute = mute;
-  if (!audio.playback.started)
+  if (!audio.playback.setup)
     return;
   audio.audioDev->playback.mute(mute);
@@ -197,10 +221,20 @@ void audio_playbackMute(bool mute)
 void audio_playbackData(uint8_t * data, size_t size)
 {
-  if (!audio.audioDev || !audio.playback.started)
+  if (!audio.audioDev || !audio.playback.setup)
     return;
-  audio.audioDev->playback.play(data, size);
+  const int frames = size / audio.playback.stride;
+  ringbuffer_append(audio.playback.buffer, data, frames);
+
+  // don't start playback until the buffer is sufficiently full to avoid
+  // glitches
+  if (!audio.playback.started && ringbuffer_getCount(audio.playback.buffer) >=
+      ringbuffer_getLength(audio.playback.buffer) / 4)
+  {
+    audio.playback.started = true;
+    audio.audioDev->playback.start();
+  }
 }
 bool audio_supportsRecord(void)
@@ -208,9 +242,9 @@ bool audio_supportsRecord(void)
   return audio.audioDev && audio.audioDev->record.start;
 }
-static void recordData(uint8_t * data, size_t size)
+static void recordPushFrames(uint8_t * data, int frames)
 {
-  purespice_writeAudio(data, size, 0);
+  purespice_writeAudio(data, frames * audio.record.stride, 0);
 }
 void audio_recordStart(int channels, int sampleRate, PSAudioFormat format)
@@ -232,8 +266,9 @@ void audio_recordStart(int channels, int sampleRate, PSAudioFormat format)
   lastChannels   = channels;
   lastSampleRate = sampleRate;
   audio.record.started = true;
+  audio.record.stride  = channels * sizeof(uint16_t);
-  audio.audioDev->record.start(channels, sampleRate, recordData);
+  audio.audioDev->record.start(channels, sampleRate, recordPushFrames);
   // if a volume level was stored, set it before we return
   if (audio.record.volumeChannels)
@@ -287,15 +322,21 @@ void audio_recordMute(bool mute)
 void audio_tick(unsigned long long tickCount)
 {
   LG_LOCK(audio.playback.lock);
-  if (!audio.playback.timings)
+  if (!audio.playback.buffer)
   {
     LG_UNLOCK(audio.playback.lock);
     return;
   }
-  const uint64_t latency  = audio.audioDev->playback.latency();
-  const float    flatency = latency > 0 ? (float)latency / 1000.0f : 0.0f;
-  ringbuffer_push(audio.playback.timings, &flatency);
+  int frames = ringbuffer_getCount(audio.playback.buffer);
+  if (audio.audioDev->playback.latency)
+    frames += audio.audioDev->playback.latency();
+
+  const float latency = frames > 0
+    ? audio.playback.sampleRate / (float)frames
+    : 0.0f;
+  ringbuffer_push(audio.playback.timings, &latency);
   LG_UNLOCK(audio.playback.lock);
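For a feel of the sizes involved, here is a small worked example of the buffer sizing above. The 48 kHz stereo S16 format is assumed purely for illustration; the real rate and channel count come from the SPICE stream.

/* Worked example of the common-buffer sizing and start threshold. */
#include <stdio.h>
#include <stdint.h>

int main(void)
{
  const int sampleRate   = 48000;
  const int channels     = 2;
  const int stride       = channels * sizeof(uint16_t); /* 4 bytes per frame   */
  const int bufferFrames = sampleRate / 10;             /* 4800 frames ~ 100 ms */
  const int startAt      = bufferFrames / 4;            /* 1200 frames ~  25 ms */

  printf("stride       : %d bytes/frame\n", stride);
  printf("common buffer: %d frames (%d bytes, ~100 ms)\n",
         bufferFrames, bufferFrames * stride);
  printf("start after  : %d frames (~25 ms of queued audio)\n", startAt);
  return 0;
}

So playback only goes live once roughly 25 ms of audio has been queued, which is the same quarter-full heuristic that the per-backend checks removed from PipeWire and PulseAudio used to apply, now done once in the client core.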