// SharedStorage/utils/datastructures/LFCircullarBuffer.h
// (viewer metadata retained as a comment: 196 lines, 5.4 KiB, C++)

//
// Created by kirill on 11/15/24.
//
#ifndef UVENT_LFCIRCULARBUFFER_H
#define UVENT_LFCIRCULARBUFFER_H
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <new>
#include <optional>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <vector>
#include "utils/intrinsincs/optimizations.h"
namespace usub::utils
{
// Assumed cache-line size used for alignment/padding below; 64 bytes on
// mainstream x86/ARM.  NOTE(review): std::hardware_destructive_interference_size
// would express intent, but toolchain support is uneven — confirm before switching.
constexpr size_t CACHELINE_SIZE = 64;
// Escalating backoff for contended lock-free retry loops:
// busy-spin -> yield -> sleep, with exponentially growing cost per call.
class ExponentialBackoff
{
    // Phase boundaries: [0, kSpinPhase) spin, [kSpinPhase, kYieldPhase) yield,
    // [kYieldPhase, ...) sleep.
    static constexpr int kSpinPhase = 6;
    static constexpr int kYieldPhase = 12;
    // Cap on the sleep exponent.  The original `1 << (count - 12)` had no
    // cap, so once `count` reached 12 + 31 the shift was undefined behavior
    // (shift >= bit width of int), and sleeps grew without bound before
    // that.  A shift of 20 caps the sleep at ~1 ms.
    static constexpr int kMaxSleepShift = 20;

    int count = 0;

public:
    // One backoff step; each call is at least as expensive as the previous:
    //  1) busy-spin 2^count cpu_relax() iterations (cheapest, lowest latency)
    //  2) yield the remainder of the time slice
    //  3) sleep for an exponentially growing, capped duration
    void operator()()
    {
        if (this->count < kSpinPhase)
        {
            for (int i = 0; i < (1 << this->count); ++i)
                cpu_relax();
        }
        else if (this->count < kYieldPhase)
        {
            std::this_thread::yield();
        }
        else
        {
            const int shift = this->count - kYieldPhase;
            std::this_thread::sleep_for(std::chrono::nanoseconds(
                1 << (shift < kMaxSleepShift ? shift : kMaxSleepShift)));
        }
        // Stop counting once the sleep cap is reached; also prevents signed
        // overflow of `count` itself under extreme contention.
        if (this->count < kYieldPhase + kMaxSleepShift)
            ++this->count;
    }

    // Return to the cheap spinning phase (call after a successful operation).
    void reset() { this->count = 0; }
};
// Bounded multi-producer/multi-consumer lock-free queue (Vyukov's
// sequence-number algorithm).  Capacity must be a non-zero power of two.
template <typename T>
class LockFreeRingBuffer
{
    // One slot per element.  `sequence` drives the MPMC protocol: for a
    // producer holding ticket `pos` the slot is free when seq == pos; for a
    // consumer holding ticket `pos` the slot is full when seq == pos + 1.
    // Any other value means another thread currently owns the slot.
    // Cache-line alignment prevents false sharing between adjacent cells.
    struct alignas(CACHELINE_SIZE) Cell
    {
        std::atomic<size_t> sequence;
        typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;
    };

    size_t capacity; // slot count; always a non-zero power of two
    size_t mask;     // capacity - 1, for cheap modular indexing
    Cell* buffer;    // owned cell array, released in the destructor
    // Producer/consumer cursors on separate cache lines so pushing threads
    // and popping threads do not invalidate each other's lines.
    alignas(CACHELINE_SIZE) std::atomic<size_t> head{0};
    alignas(CACHELINE_SIZE) std::atomic<size_t> tail{0};

    // Validate BEFORE any allocation so a bad capacity cannot leak the cell
    // array (the original allocated first, then threw, leaking the cells).
    // Also rejects 0, which previously slipped through the power-of-two test
    // and made `mask` == SIZE_MAX, i.e. out-of-bounds indexing.
    static size_t checked_capacity(size_t cap)
    {
        if (cap == 0 || (cap & (cap - 1)) != 0)
        {
            throw std::invalid_argument("Capacity must be a power of 2");
        }
        return cap;
    }

public:
    /// @param cap number of slots; must be a non-zero power of two.
    /// @throws std::invalid_argument (before any allocation) otherwise.
    explicit LockFreeRingBuffer(size_t cap = 32)
        : capacity(checked_capacity(cap)), mask(cap - 1), buffer(new Cell[cap])
    {
        // Seed each cell with its own index: "free, awaiting ticket i".
        for (size_t i = 0; i < capacity; ++i)
            buffer[i].sequence.store(i, std::memory_order_relaxed);
    }

    // The buffer owns raw storage; copying would double-delete `buffer`,
    // so copy/move are explicitly disabled.
    LockFreeRingBuffer(const LockFreeRingBuffer&) = delete;
    LockFreeRingBuffer& operator=(const LockFreeRingBuffer&) = delete;

    ~LockFreeRingBuffer()
    {
        // Drain remaining elements so their destructors run, then free the
        // cells.  Not thread-safe: no other thread may use the buffer now.
        while (pop())
        {
        }
        delete[] buffer;
    }

    /// Attempts to enqueue one element (safe for multiple producers).
    /// @return true on success, false when the buffer is full.
    template <typename U>
    bool push(U&& val)
    {
        ExponentialBackoff backoff;
        size_t pos = this->head.load(std::memory_order_relaxed);
        while (true)
        {
            Cell& cell = this->buffer[pos & mask];
            prefetch_for_write(&cell);
            size_t seq = cell.sequence.load(std::memory_order_acquire);
            intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
            if (diff == 0)
            {
                // Slot free for ticket `pos`; claim it.  A relaxed CAS is
                // sufficient here: the release store to `sequence` below is
                // what publishes the constructed element to consumers.
                if (this->head.compare_exchange_strong(pos, pos + 1, std::memory_order_relaxed))
                {
                    new(&cell.storage) T(std::forward<U>(val));
                    cell.sequence.store(pos + 1, std::memory_order_release);
                    return true;
                }
                // On CAS failure `pos` was updated to the current head; retry.
            }
            else if (diff < 0)
            {
                return false; // slot still holds an unconsumed element: full
            }
            else
            {
                // Another producer is mid-claim; back off and re-read head.
                backoff();
                pos = this->head.load(std::memory_order_relaxed);
            }
        }
    }

    /// Attempts to dequeue one element (safe for multiple consumers).
    /// @return the element, or std::nullopt when the buffer is empty.
    std::optional<T> pop()
    {
        ExponentialBackoff backoff;
        size_t pos = this->tail.load(std::memory_order_relaxed);
        while (true)
        {
            Cell& cell = this->buffer[pos & mask];
            prefetch_for_read(&cell);
            size_t seq = cell.sequence.load(std::memory_order_acquire);
            intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
            if (diff == 0)
            {
                if (this->tail.compare_exchange_strong(pos, pos + 1, std::memory_order_relaxed))
                {
                    T* ptr = reinterpret_cast<T*>(&cell.storage);
                    T val = std::move(*ptr);
                    ptr->~T();
                    // Mark the slot free for the producer one lap ahead.
                    cell.sequence.store(pos + capacity, std::memory_order_release);
                    return val;
                }
            }
            else if (diff < 0)
            {
                return std::nullopt; // nothing published at this ticket: empty
            }
            else
            {
                backoff();
                pos = this->tail.load(std::memory_order_relaxed);
            }
        }
    }

    /// Pushes up to N elements in order; stops at the first failure.
    /// @return the number actually pushed.
    template <size_t N>
    size_t push_batch(const std::array<T, N>& values)
    {
        size_t pushed = 0;
        for (size_t i = 0; i < N; ++i)
        {
            if (!push(values[i]))
                break;
            ++pushed;
        }
        return pushed;
    }

    /// Pops up to N elements into `out`; stops when the buffer empties.
    /// @return the number actually popped (out[0..popped) are valid).
    template <size_t N>
    size_t pop_batch(std::array<T, N>& out)
    {
        size_t popped = 0;
        for (size_t i = 0; i < N; ++i)
        {
            auto val = pop();
            if (!val)
                break;
            out[i] = std::move(*val);
            ++popped;
        }
        return popped;
    }

    /// Approximate element count; racy by design (hence "unsafe").
    /// Fixed: the old `(h - t) & mask` reported 0 for a FULL buffer,
    /// because then h - t == capacity and capacity & mask == 0.
    [[nodiscard]] size_t unsafe_size() const
    {
        // Read tail first: head only grows and head >= tail always holds,
        // so this ordering guarantees h >= t (no wrap-around underflow).
        const size_t t = this->tail.load(std::memory_order_acquire);
        const size_t h = this->head.load(std::memory_order_acquire);
        return h - t;
    }

    [[nodiscard]] size_t get_capacity() const noexcept { return capacity; }
};
}
#endif //UVENT_LFCIRCULARBUFFER_H