//
// Created by kirill on 11/15/24.
//

#ifndef UVENT_LFCIRCULARBUFFER_H
#define UVENT_LFCIRCULARBUFFER_H

#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <new>
#include <optional>
#include <thread>
#include <type_traits>
#include <utility>

#include "utils/intrinsincs/optimizations.h"

namespace usub::utils
{
    constexpr size_t CACHELINE_SIZE = 64;

    // Progressive backoff: spin with cpu_relax(), then yield, then sleep.
    class ExponentialBackoff
    {
        int count = 0;

    public:
        void operator()()
        {
            if (this->count < 6)
            {
                for (int i = 0; i < (1 << this->count); ++i) cpu_relax();
            }
            else if (this->count < 12)
            {
                std::this_thread::yield();
            }
            else
            {
                std::this_thread::sleep_for(std::chrono::nanoseconds(1 << (this->count - 12)));
            }
            ++this->count;
        }

        void reset() { this->count = 0; }
    };

    // Bounded MPMC ring buffer in the style of Vyukov's queue: each cell carries
    // a sequence number that tells producers and consumers whether it is ready.
    template <typename T, size_t Capacity>
    class LockFreeRingBuffer
    {
        static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of 2");

        struct alignas(CACHELINE_SIZE) Cell
        {
            std::atomic<size_t> sequence;
            typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;
        };

        static constexpr size_t MASK = Capacity - 1;

        alignas(CACHELINE_SIZE) Cell buffer[Capacity];
        alignas(CACHELINE_SIZE) std::atomic<size_t> head{0};
        alignas(CACHELINE_SIZE) std::atomic<size_t> tail{0};

    public:
        LockFreeRingBuffer()
        {
            for (size_t i = 0; i < Capacity; ++i)
            {
                buffer[i].sequence.store(i, std::memory_order_relaxed);
            }
        }

        ~LockFreeRingBuffer()
        {
            // Drain remaining elements so their destructors run.
            while (pop())
            {
            }
        }

        bool push(const T& val)
        {
            ExponentialBackoff backoff;
            size_t pos = this->head.load(std::memory_order_relaxed);
            while (true)
            {
                Cell& cell = this->buffer[pos & MASK];
                prefetch_for_write(&cell);
                size_t seq = cell.sequence.load(std::memory_order_acquire);
                intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
                if (diff == 0)
                {
                    // Cell is free for this position; claim it by advancing head.
                    if (this->head.compare_exchange_strong(pos, pos + 1, std::memory_order_relaxed))
                    {
                        new (&cell.storage) T(val);
                        cell.sequence.store(pos + 1, std::memory_order_release);
                        return true;
                    }
                }
                else if (diff < 0)
                {
                    // Cell still holds an unconsumed element: the buffer is full.
                    return false;
                }
                else
                {
                    backoff();
                    pos = this->head.load(std::memory_order_relaxed);
                }
            }
        }

        std::optional<T> pop()
        {
            ExponentialBackoff backoff;
            size_t pos = this->tail.load(std::memory_order_relaxed);
            while (true)
            {
                Cell& cell = this->buffer[pos & MASK];
                prefetch_for_read(&cell);
                size_t seq = cell.sequence.load(std::memory_order_acquire);
                intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
                if (diff == 0)
                {
                    // Cell holds a published element; claim it by advancing tail.
                    if (this->tail.compare_exchange_strong(pos, pos + 1, std::memory_order_relaxed))
                    {
                        T* ptr = reinterpret_cast<T*>(&cell.storage);
                        T val = std::move(*ptr);
                        ptr->~T();
                        // Mark the cell free for the producer one full lap ahead.
                        cell.sequence.store(pos + Capacity, std::memory_order_release);
                        return val;
                    }
                }
                else if (diff < 0)
                {
                    // No element published yet: the buffer is empty.
                    return std::nullopt;
                }
                else
                {
                    backoff();
                    pos = this->tail.load(std::memory_order_relaxed);
                }
            }
        }

        template <size_t N>
        size_t push_batch(const std::array<T, N>& values)
        {
            size_t pushed = 0;
            for (size_t i = 0; i < N; ++i)
            {
                if (!push(values[i])) break;
                ++pushed;
            }
            return pushed;
        }

        template <size_t N>
        size_t pop_batch(std::array<T, N>& out)
        {
            size_t popped = 0;
            for (size_t i = 0; i < N; ++i)
            {
                auto val = pop();
                if (!val) break;
                out[i] = std::move(*val);
                ++popped;
            }
            return popped;
        }

        // Approximate size; racy by design, hence "unsafe".
        [[nodiscard]] size_t unsafe_size() const
        {
            const size_t h = this->head.load(std::memory_order_acquire);
            const size_t t = this->tail.load(std::memory_order_acquire);
            // head and tail never drift apart by more than Capacity, so the plain
            // difference is correct; masking with MASK would report 0 for a full buffer.
            return h - t;
        }
    };
} // namespace usub::utils

#endif //UVENT_LFCIRCULARBUFFER_H
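
// A minimal usage sketch (comment only, not part of the header): one producer
// and one consumer sharing a buffer. The include path "LFCircularBuffer.h" and
// the capacity of 1024 are illustrative assumptions, not taken from the source.
//
//   #include "LFCircularBuffer.h"
//   #include <thread>
//
//   int main()
//   {
//       usub::utils::LockFreeRingBuffer<int, 1024> queue;
//
//       std::thread producer([&] {
//           for (int i = 0; i < 100; ++i)
//               while (!queue.push(i)) {} // push returns false while the buffer is full
//       });
//
//       std::thread consumer([&] {
//           for (int received = 0; received < 100;)
//               if (auto v = queue.pop()) ++received; // std::nullopt means the buffer is empty
//       });
//
//       producer.join();
//       consumer.join();
//   }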