[common] ringbuffer: add unbounded mode

In unbounded mode, the read and write pointers are free to move
independently of one another. This is useful when the input and output
streams progress at the same rate on average and we want to keep the
latency stable in the event that an underrun or overrun occurs.
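
The bookkeeping for this reduces to unsigned arithmetic on two free-running
32-bit positions. A minimal sketch of the idea (the helper names here are
illustrative only, not part of the API):

    #include <stdint.h>

    /* The difference of the free-running positions is the fill level, and
     * the subtraction stays correct across unsigned wrap-around. The slot
     * to access is the position reduced modulo the buffer length. */
    static uint32_t fill_level(uint32_t readPos, uint32_t writePos)
    {
      return writePos - readPos;
    }

    static uint32_t slot_index(uint32_t pos, uint32_t length)
    {
      return pos % length;
    }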

If an underrun occurs (i.e., there is not enough data in the buffer to
satisfy a read request), the missing values will be filled with zeros. When
the writer catches up, the same number of values will be skipped from the
input.

If an overrun occurs (i.e., there is not enough free space in the buffer to
satisfy a write request), excess values will be discarded. When the reader
catches up, the same number of values will be zeroed in the output.
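
For example, a hypothetical caller exercising both cases (assuming the
declarations this commit adds to common/ringbuffer.h; the buffer size and
counts are illustrative):

    #include "common/ringbuffer.h"

    static void example(void)
    {
      RingBuffer rb = ringbuffer_newUnbounded(16, sizeof(float));
      float out[8];

      /* Underrun: nothing has been written yet, so out[] is zero-filled
       * and the read position still advances by 8. */
      ringbuffer_consume(rb, out, 8);

      /* When the writer catches up, the first 8 values it appends are
       * skipped so the two streams stay aligned. */
      float in[8] = { 1, 2, 3, 4, 5, 6, 7, 8 };
      ringbuffer_append(rb, in, 8);

      /* Overrun: only 16 slots are free, so 16 of these 32 values are
       * dropped; the reader will see that many zeros once it catches up. */
      float burst[32] = { 0 };
      ringbuffer_append(rb, burst, 32);

      ringbuffer_free(&rb);
    }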

Unbounded mode is currently unused since our audio input and output
streams are not synchronised. This will be implemented in a later commit.

The buffer is also reimplemented as a lock-free queue, which is safer for
use in audio device callbacks.
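
For instance, an output callback can then drain the buffer directly on the
device thread (a hypothetical callback shape, not an API added by this
commit):

    /* Single writer (the capture/decode thread) appends, single reader
     * (the device callback) consumes; neither side takes a lock, and an
     * underrun simply plays silence until the streams realign. */
    static void on_playback(float * samples, int sampleCount, void * udata)
    {
      RingBuffer rb = (RingBuffer)udata;
      ringbuffer_consume(rb, samples, sampleCount);
    }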
Author: Chris Spencer
Date:   2022-01-23 16:44:00 +00:00
Committed by: Geoffrey McRae
Parent: b34b253814
Commit: 599fdd6ffd
3 changed files with 226 additions and 148 deletions


@@ -19,79 +19,68 @@
  */
 #include "common/ringbuffer.h"
-#include "common/locking.h"
 #include "common/debug.h"
 #include "common/util.h"
+#include <stdatomic.h>
 #include <stdlib.h>
 #include <string.h>
 struct RingBuffer
 {
-  RingBufferValueFn preOverwriteFn;
-  void * preOverwriteUdata;
-  int length;
-  size_t valueSize;
-  LG_Lock lock;
-  int start, pos, count;
-  char values[0];
+  uint32_t length;
+  uint32_t valueSize;
+  uint32_t readPos;
+  uint32_t writePos;
+  bool unbounded;
+  char values[0];
 };
-RingBuffer ringbuffer_new(int length, size_t valueSize)
+RingBuffer ringbuffer_newInternal(int length, size_t valueSize,
+    bool unbounded)
 {
+  DEBUG_ASSERT(valueSize > 0 && valueSize < UINT32_MAX);
   struct RingBuffer * rb = calloc(1, sizeof(*rb) + valueSize * length);
   rb->length = length;
   rb->valueSize = valueSize;
-  LG_LOCK_INIT(rb->lock);
+  rb->readPos = 0;
+  rb->writePos = 0;
+  rb->unbounded = unbounded;
   return rb;
 }
+RingBuffer ringbuffer_new(int length, size_t valueSize)
+{
+  return ringbuffer_newInternal(length, valueSize, false);
+}
+RingBuffer ringbuffer_newUnbounded(int length, size_t valueSize)
+{
+  return ringbuffer_newInternal(length, valueSize, true);
+}
 void ringbuffer_free(RingBuffer * rb)
 {
   if (!*rb)
     return;
-  LG_LOCK_FREE(rb->lock);
   free(*rb);
   *rb = NULL;
 }
 void ringbuffer_push(RingBuffer rb, const void * value)
 {
-  void * dst = rb->values + rb->pos * rb->valueSize;
-  if (rb->count < rb->length)
-    ++rb->count;
-  else
-  {
-    if (++rb->start == rb->length)
-      rb->start = 0;
+  if (!rb->unbounded && ringbuffer_getCount(rb) == rb->length)
+    ringbuffer_consume(rb, NULL, 1);
-    if (rb->preOverwriteFn)
-      rb->preOverwriteFn(dst, rb->preOverwriteUdata);
-  }
-  memcpy(dst, value, rb->valueSize);
-  if (++rb->pos == rb->length)
-    rb->pos = 0;
-}
-bool ringbuffer_shift(RingBuffer rb, void * dst)
-{
-  if (rb->count == 0)
-    return false;
-  memcpy(dst, rb->values + rb->start * rb->valueSize, rb->valueSize);
-  --rb->count;
-  if (++rb->start == rb->length)
-    rb->start = 0;
-  return true;
+  ringbuffer_append(rb, value, 1);
 }
 void ringbuffer_reset(RingBuffer rb)
 {
-  rb->start = 0;
-  rb->pos = 0;
-  rb->count = 0;
+  atomic_store(&rb->readPos, 0);
+  atomic_store(&rb->writePos, 0);
 }
 int ringbuffer_getLength(const RingBuffer rb)
@@ -101,12 +90,15 @@ int ringbuffer_getLength(const RingBuffer rb)
 int ringbuffer_getStart(const RingBuffer rb)
 {
-  return rb->start;
+  return atomic_load(&rb->readPos) % rb->length;
 }
 int ringbuffer_getCount(const RingBuffer rb)
 {
-  return rb->count;
+  uint32_t writePos = atomic_load(&rb->writePos);
+  uint32_t readPos = atomic_load(&rb->readPos);
+  return writePos - readPos;
 }
 void * ringbuffer_getValues(const RingBuffer rb)
@@ -114,113 +106,192 @@ void * ringbuffer_getValues(const RingBuffer rb)
   return rb->values;
 }
-void * ringBuffer_getLastValue(const RingBuffer rb)
-{
-  if (rb->count == 0)
-    return NULL;
-  int index = rb->start + rb->count - 1;
-  if (index >= rb->length)
-    index -= rb->length;
-  return rb->values + index * rb->valueSize;
-}
 int ringbuffer_append(const RingBuffer rb, const void * values, int count)
 {
   if (count == 0)
     return 0;
-  LG_LOCK(rb->lock);
-  if (count > rb->length - rb->count)
-    count = rb->length - rb->count;
+  // Seeking backwards is only supported in unbounded mode at the moment
+  if (count < 0 && !rb->unbounded)
+    return 0;
-  const char * p = (const char *)values;
-  int remain = count;
-  do
+  uint32_t readPos = atomic_load_explicit(&rb->readPos, memory_order_acquire);
+  uint32_t writePos = rb->writePos;
+  uint32_t newWritePos = writePos;
+  if (count < 0)
   {
-    int copy = rb->length - rb->pos;
-    if (copy > remain)
-      copy = remain;
-    memcpy(rb->values + rb->pos * rb->valueSize, p, copy * rb->valueSize);
-    rb->pos += copy;
-    if (rb->pos == rb->length)
-      rb->pos = 0;
-    p += copy * rb->valueSize;
-    remain -= copy;
+    // Seeking backwards; just update the write pointer
+    newWritePos += count;
   }
-  while(remain > 0);
-  rb->count += count;
-  LG_UNLOCK(rb->lock);
-  return count;
-}
-void * ringbuffer_consume(const RingBuffer rb, int * count)
-{
-  LG_LOCK(rb->lock);
-  if (rb->count == 0)
+  else
   {
-    *count = 0;
-    LG_UNLOCK(rb->lock);
-    return NULL;
+    int32_t writeOffset = writePos - readPos;
+    if (writeOffset < 0)
+    {
+      DEBUG_ASSERT(rb->unbounded);
+      // The reader is ahead of the writer; skip new values to remain in sync
+      int32_t underrun = -writeOffset;
+      int32_t skipLen = min(underrun, count);
+      if (values)
+        values += skipLen * rb->valueSize;
+      count -= skipLen;
+      newWritePos += skipLen;
+      writeOffset = newWritePos - readPos;
     }
+    if (count > 0)
+    {
+      DEBUG_ASSERT(writeOffset >= 0);
+      // We may not be able to write anything if the writer is too far ahead of
+      // the reader
+      uint32_t writeLen = 0;
+      if (writeOffset < rb->length) {
+        uint32_t writeIndex = newWritePos % rb->length;
+        uint32_t writeAvailable = rb->length - writeOffset;
+        uint32_t writeAvailableBack =
+          min(rb->length - writeIndex, writeAvailable);
+        writeLen = min(count, writeAvailable);
+        uint32_t writeLenBack = min(writeLen, writeAvailableBack);
+        uint32_t writeLenFront = writeLen - writeLenBack;
+        if (values)
+        {
+          memcpy(rb->values + writeIndex * rb->valueSize, values,
+            writeLenBack * rb->valueSize);
+          memcpy(rb->values, values + writeLenBack * rb->valueSize,
+            writeLenFront * rb->valueSize);
+        }
+        else
+        {
+          memset(rb->values + writeIndex * rb->valueSize, 0,
+            writeLenBack * rb->valueSize);
+          memset(rb->values, 0, writeLenFront * rb->valueSize);
+        }
+      }
+      if (rb->unbounded)
+        newWritePos += count;
+      else
+        newWritePos += writeLen;
+    }
+  }
-  if (*count > rb->count)
-    *count = rb->count;
+  atomic_store_explicit(&rb->writePos, newWritePos, memory_order_release);
-  if (*count > rb->length - rb->start)
-    *count = rb->length - rb->start;
-  void * values = rb->values + rb->start * rb->valueSize;
-  rb->start += *count;
-  rb->count -= *count;
-  if (rb->start == rb->length)
-    rb->start = 0;
-  LG_UNLOCK(rb->lock);
-  return values;
+  return newWritePos - writePos;
 }
-void ringbuffer_setPreOverwriteFn(const RingBuffer rb, RingBufferValueFn fn,
-    void * udata)
+int ringbuffer_consume(const RingBuffer rb, void * values, int count)
 {
-  rb->preOverwriteFn = fn;
-  rb->preOverwriteUdata = udata;
+  if (count == 0)
+    return 0;
+  // Seeking backwards is only supported in unbounded mode at the moment
+  if (count < 0 && !rb->unbounded)
+    return 0;
+  uint32_t readPos = rb->readPos;
+  uint32_t writePos = atomic_load_explicit(&rb->writePos, memory_order_acquire);
+  uint32_t newReadPos = readPos;
+  if (count < 0)
+  {
+    // Seeking backwards; just update the read pointer
+    newReadPos += count;
   }
+  else
+  {
+    int32_t writeOffset = writePos - newReadPos;
+    if (writeOffset < 0)
+    {
+      DEBUG_ASSERT(rb->unbounded);
+      // We are already in an underrun condition; just fill the buffer with
+      // zeros
+      newReadPos += count;
+      if (values)
+        memset(values, 0, count * rb->valueSize);
+    }
+    else
+    {
+      uint32_t readIndex = newReadPos % rb->length;
+      uint32_t readAvailable = min(writeOffset, rb->length);
+      uint32_t readLen = min(count, readAvailable);
+      if (values)
+      {
+        uint32_t readAvailableBack = min(rb->length - readIndex, readAvailable);
+        uint32_t readLenBack = min(readLen, readAvailableBack);
+        uint32_t readLenFront = readLen - readLenBack;
+        memcpy(values, rb->values + readIndex * rb->valueSize,
+          readLenBack * rb->valueSize);
+        memcpy(values + readLenBack * rb->valueSize, rb->values,
+          readLenFront * rb->valueSize);
+        if (rb->unbounded && readLen < count)
+        {
+          // One of two things has happened: we have caught up with the writer
+          // and are starting to underrun, or we are really far behind the
+          // writer and an overrun has occurred. Either way, the only thing left
+          // to do is to fill the rest of the buffer with zeros
+          uint32_t remaining = count - readLen;
+          memset(values + readLen * rb->valueSize, 0,
+            remaining * rb->valueSize);
+        }
+      }
+      if (rb->unbounded)
+        newReadPos += count;
+      else
+        newReadPos += readLen;
+    }
+  }
+  atomic_store_explicit(&rb->readPos, newReadPos, memory_order_release);
+  return newReadPos - readPos;
+}
-void ringbuffer_forEach(const RingBuffer rb, RingBufferIterator fn, void * udata,
-    bool reverse)
+void ringbuffer_forEach(const RingBuffer rb, RingBufferIterator fn,
+    void * udata, bool reverse)
 {
+  uint32_t readPos = rb->readPos;
+  uint32_t writePos = atomic_load_explicit(&rb->writePos, memory_order_acquire);
+  int32_t writeOffset = writePos - readPos;
+  if (writeOffset < 0)
+  {
+    DEBUG_ASSERT(rb->unbounded);
+    return;
+  }
+  uint32_t readAvailable = min(writeOffset, rb->length);
   if (reverse)
   {
-    int index = rb->start + rb->count - 1;
-    if (index >= rb->length)
-      index -= rb->length;
-    for(int i = 0; i < rb->count; ++i)
+    readPos = readPos + readAvailable - 1;
+    for (int i = 0; i < readAvailable; ++i, --readPos)
     {
-      void * value = rb->values + index * rb->valueSize;
-      if (--index == -1)
-        index = rb->length - 1;
+      uint32_t readIndex = readPos % rb->length;
+      void * value = rb->values + readIndex * rb->valueSize;
       if (!fn(i, value, udata))
        break;
     }
   }
   else
   {
-    int index = rb->start;
-    for(int i = 0; i < rb->count; ++i)
+    for (int i = 0; i < readAvailable; ++i, ++readPos)
     {
-      void * value = rb->values + index * rb->valueSize;
-      if (++index == rb->length)
-        index = 0;
+      uint32_t readIndex = readPos % rb->length;
+      void * value = rb->values + readIndex * rb->valueSize;
       if (!fn(i, value, udata))
        break;
     }