ringbuffer.RingBuffer

dareplane_utils.general.ringbuffer.RingBuffer(shape, dtype=np.float32)

A simple numpy ring buffer for data and timestamps.

This class implements a ring buffer using numpy arrays to store data and corresponding timestamps. It is designed to efficiently handle a fixed-size buffer with FIFO (First In, First Out) behavior. The buffer can be used to store and retrieve data samples along with their timestamps.

Attributes

Name Type Description
buffer np.ndarray The data buffer.
buffer_t np.ndarray The time buffer.
last_t float The latest timestamp.
curr_i int The index of the latest data point in the buffer.
logger logging.Logger The logger used for warnings and debug messages.

Parameters

Name Type Description Default
shape tuple[int, int] The shape of the buffer, which needs to be at least 2D (n_samples, n_features). Arbitrary further dimensions can be added. required
dtype type A numpy data type for the buffer. Defaults to np.float32. np.float32

Examples

>>> rb = RingBuffer(shape=(10, 3), dtype=np.float32)
>>> samples = [np.random.rand(3) for _ in range(5)]
>>> times = list(range(5))
>>> rb.add_samples(samples, times)
>>>
>>> print(rb.buffer)
[[0.6456422  0.6063579  0.46437985]
 [0.77531135 0.12675048 0.10201751]
 [0.05525873 0.5852236  0.09854987]
 [0.7515863  0.20880483 0.7236128 ]
 [0.7157382  0.55806357 0.6247236 ]
 [0.         0.         0.        ]
 [0.         0.         0.        ]
 [0.         0.         0.        ]
 [0.         0.         0.        ]
 [0.         0.         0.        ]]
>>>
>>> print(rb.buffer_t)
[0. 1. 2. 3. 4. 0. 0. 0. 0. 0.]
>>>
>>> print(rb.unfold_buffer())  # most recent sample is index=-1, second to most recent index=-2, etc.
[[0.         0.         0.        ]
 [0.         0.         0.        ]
 [0.         0.         0.        ]
 [0.         0.         0.        ]
 [0.         0.         0.        ]
 [0.6456422  0.6063579  0.46437985]
 [0.77531135 0.12675048 0.10201751]
 [0.05525873 0.5852236  0.09854987]
 [0.7515863  0.20880483 0.7236128 ]
 [0.7157382  0.55806357 0.6247236 ]]
>>>
>>> print(rb.unfold_buffer_t())
[0. 0. 0. 0. 0. 0. 1. 2. 3. 4.]
>>> samples2 = [np.random.rand(3) for _ in range(8)]
>>> times2 = list(range(8))
>>> rb.add_samples(samples2, times2)
>>> print(rb.buffer_t)
[5. 6. 7. 3. 4. 0. 1. 2. 3. 4.]
>>>
>>> print(rb.unfold_buffer_t())
[3. 4. 0. 1. 2. 3. 4. 5. 6. 7.]

Methods

Name Description
add_continuous_buffer Add samples to one continuous section of the buffer, without slicing the samples.
add_samples Add samples to the buffer and progress the index.
get_insert_slices Get slices mapping data from the samples to the buffer.
unfold_buffer Unfold the buffer to return the data in chronological order.
unfold_buffer_t Unfold the buffer_t (time stamp buffer) to return time stamps in chronological order.

add_continuous_buffer

dareplane_utils.general.ringbuffer.RingBuffer.add_continuous_buffer(
    slice_buffer,
    samples,
    times,
)

Add samples to one continuous section of the buffer. Slicing the samples should not be necessary here, since they are added as one continuous block, and slice selection from lists is slow.

add_samples

dareplane_utils.general.ringbuffer.RingBuffer.add_samples(samples, times)

Add samples to the buffer and progress the index.
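
The wrap-around write seen in the class example (adding 8 samples to a 10-row buffer that already holds 5) can be sketched with modular indexing. This is a minimal illustration, not the library's implementation: `write_wrapped` and its `next_i` argument are hypothetical names, and the sketch assumes that no more samples than the buffer has rows are written at once.

```python
import numpy as np


def write_wrapped(buffer, buffer_t, next_i, samples, times):
    """Write samples and times into fixed-size arrays, wrapping at the end.

    Hypothetical helper for illustration only. `next_i` is assumed to be the
    row that receives the first new sample. Returns the row for the next write.
    """
    samples = np.asarray(samples, dtype=buffer.dtype)
    times = np.asarray(times, dtype=buffer_t.dtype)
    n = len(samples)
    if n > len(buffer):
        raise ValueError("more samples than buffer rows")
    # Row indices for the new samples, wrapped with modulo arithmetic
    idx = (next_i + np.arange(n)) % len(buffer)
    buffer[idx] = samples
    buffer_t[idx] = times
    return (next_i + n) % len(buffer)


buffer = np.zeros((10, 3), dtype=np.float32)
buffer_t = np.zeros(10)
next_i = write_wrapped(buffer, buffer_t, 0, np.ones((5, 3)), range(5))
next_i = write_wrapped(buffer, buffer_t, next_i, np.ones((8, 3)), range(8))
print(buffer_t)  # [5. 6. 7. 3. 4. 0. 1. 2. 3. 4.]
```

The oldest three time stamps (0, 1, 2 from the first call) are overwritten, which reproduces the `buffer_t` contents shown in the class example.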

get_insert_slices

dareplane_utils.general.ringbuffer.RingBuffer.get_insert_slices(len_samples)

Get slices mapping data from the samples to the buffer.

Parameters

Name Type Description Default
len_samples int Number of samples to add. required

Returns

Name Type Description
tuple[list[slice], list[slice], int]
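
The meaning of the three returned elements is not documented here. One plausible reading, sketched below, is a list of buffer slices, a matching list of sample slices, and the index for the next write. `insert_slices`, `next_i`, and `buffer_len` are hypothetical names chosen for this illustration, and the actual method may differ.

```python
def insert_slices(next_i, buffer_len, len_samples):
    """Map `len_samples` new samples onto a ring buffer of `buffer_len` rows.

    Hypothetical helper: returns (buffer slices, sample slices, next index).
    """
    if len_samples > buffer_len:
        raise ValueError("more samples than buffer rows")
    end = next_i + len_samples
    if end <= buffer_len:
        # Everything fits before the end of the buffer: one contiguous slice
        buffer_slices = [slice(next_i, end)]
        sample_slices = [slice(0, len_samples)]
    else:
        # Split into the tail of the buffer and the wrapped-around head
        n_tail = buffer_len - next_i
        buffer_slices = [slice(next_i, buffer_len), slice(0, end - buffer_len)]
        sample_slices = [slice(0, n_tail), slice(n_tail, len_samples)]
    return buffer_slices, sample_slices, end % buffer_len


buf_slices, smp_slices, next_i = insert_slices(next_i=5, buffer_len=10, len_samples=8)
print(buf_slices)  # [slice(5, 10, None), slice(0, 3, None)]
print(smp_slices)  # [slice(0, 5, None), slice(5, 8, None)]
print(next_i)      # 3
```

Pairing the two lists lets a caller copy `samples[s]` into `buffer[b]` for each `(b, s)` pair, so a wrapped write needs at most two contiguous copies.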

unfold_buffer

dareplane_utils.general.ringbuffer.RingBuffer.unfold_buffer()

Unfold the buffer to return the data in chronological order.

This method returns the data in the buffer in chronological order, ending with the most recent sample.

Returns

Name Type Description
np.ndarray The unfolded data buffer.
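
Unfolding can be thought of as a circular shift that moves the oldest row to the front. The sketch below uses `np.roll` for this. `unfold` and `last_i` are hypothetical names, `last_i` is assumed to be the index of the most recent sample, and the library may implement the step differently.

```python
import numpy as np


def unfold(buffer, last_i):
    """Return rows in chronological order, most recent row last.

    Hypothetical helper; `last_i` is assumed to be the index of the most
    recent sample.
    """
    # Shift rows so that the row after `last_i` (the oldest) comes first
    return np.roll(buffer, -(last_i + 1), axis=0)


# Time stamps from the class example after the second add_samples call
buffer_t = np.array([5.0, 6.0, 7.0, 3.0, 4.0, 0.0, 1.0, 2.0, 3.0, 4.0])
print(unfold(buffer_t, last_i=2))  # [3. 4. 0. 1. 2. 3. 4. 5. 6. 7.]
```

The same shift along the first axis applies to a 2D data buffer, so data rows and time stamps stay aligned after unfolding.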

unfold_buffer_t

dareplane_utils.general.ringbuffer.RingBuffer.unfold_buffer_t()

Unfold the buffer_t (time stamp buffer) to return time stamps in chronological order.

This method returns the time stamps in the buffer in chronological order, ending with the most recent time stamp.

Returns

Name Type Description
np.ndarray The unfolded time stamp buffer.