// ============================================================================
// core/SharedMemoryManager.h
// (reconstructed: the diff lost all angle-bracket text — include targets —
//  to an extraction step; restored below)
// ============================================================================
//
// Created by Kirill Zhukov on 20.04.2025.
//

#ifndef SHAREDMEMORYMANAGER_H
#define SHAREDMEMORYMANAGER_H

#include <cstddef>
#include <cstring>
#include <stdexcept>
#include <string>
#include <fcntl.h>    // O_CREAT, O_RDWR, O_EXCL
#include <unistd.h>   // ftruncate, close
#include <sys/mman.h> // mmap, munmap, shm_open, shm_unlink

namespace usub::core
{
    /// RAII owner of a POSIX shared-memory segment.
    ///
    /// The segment is either created fresh (O_CREAT | O_EXCL, zero-filled) or
    /// an existing one is opened, then mapped read/write for `size` bytes.
    /// The destructor unmaps and closes but does NOT unlink the name; call
    /// destroy() in exactly one process to remove the segment system-wide.
    class SharedMemoryManager
    {
    public:
        /// @param name       segment name WITHOUT the leading '/'; it is added here
        /// @param size       mapping size in bytes (when opening an existing
        ///                   segment this must match the creator's size —
        ///                   TODO confirm callers guarantee that)
        /// @param create_new true: create a new zeroed segment (throws if the
        ///                   name already exists); false: open an existing one
        /// @throws std::runtime_error if shm_open, ftruncate or mmap fails
        SharedMemoryManager(const std::string& name, size_t size, bool create_new = true);

        ~SharedMemoryManager();

        // The object owns a file descriptor and a mapping; copying would lead
        // to a double munmap/close. Moves are not needed by current callers.
        SharedMemoryManager(const SharedMemoryManager&) = delete;
        SharedMemoryManager& operator=(const SharedMemoryManager&) = delete;

        /// Start of the mapped region (nullptr after destroy()).
        [[nodiscard]] void* base_ptr() const noexcept;

        /// Mapping size in bytes, as passed to the constructor.
        [[nodiscard]] size_t size() const noexcept;

        /// Full POSIX name including the leading '/'.
        [[nodiscard]] const std::string& name() const noexcept;

        /// Unmap, close and unlink the segment. Safe to call more than once.
        void destroy();

    private:
        void create();
        void open();

    private:
        std::string shm_name; // always "/" + user-supplied name
        size_t shm_size;
        int shm_fd;           // -1 when not open
        void* shm_ptr;        // nullptr when not mapped (never MAP_FAILED)
    };
} // namespace usub::core

#endif // SHAREDMEMORYMANAGER_H

// ============================================================================
// core/SharedMemoryManager.cpp
// ============================================================================
//
// Created by Kirill Zhukov on 20.04.2025.
//

// #include "SharedMemoryManager.h"  // (declaration above stands in for the header here)

namespace usub::core
{
    SharedMemoryManager::SharedMemoryManager(const std::string& name, size_t size, bool create_new) :
        shm_name("/" + name), shm_size(size), shm_fd(-1), shm_ptr(nullptr)
    {
        if (create_new)
            create();
        else
            open();
    }

    SharedMemoryManager::~SharedMemoryManager()
    {
        // destroy() may already have run; both members are reset there, so
        // these guards make the destructor a no-op in that case.
        if (this->shm_ptr != nullptr)
            ::munmap(this->shm_ptr, this->shm_size);
        if (this->shm_fd != -1)
            ::close(this->shm_fd);
    }

    void* SharedMemoryManager::base_ptr() const noexcept
    {
        return this->shm_ptr;
    }

    size_t SharedMemoryManager::size() const noexcept
    {
        return this->shm_size;
    }

    const std::string& SharedMemoryManager::name() const noexcept
    {
        return this->shm_name;
    }

    void SharedMemoryManager::destroy()
    {
        // FIX: the original unconditionally called munmap/close, so a second
        // destroy() — or destroy() after a failed constructor — invoked
        // munmap(nullptr) and close(-1). Guard each step and reset state so
        // the call is idempotent and the destructor won't double-close.
        if (this->shm_ptr != nullptr)
        {
            ::munmap(this->shm_ptr, this->shm_size);
            this->shm_ptr = nullptr;
        }
        if (this->shm_fd != -1)
        {
            ::close(this->shm_fd);
            this->shm_fd = -1;
        }
        ::shm_unlink(this->shm_name.c_str());
    }

    void SharedMemoryManager::create()
    {
        // O_EXCL: creating an already-existing segment is an error, so two
        // processes cannot silently race on creation.
        this->shm_fd = ::shm_open(this->shm_name.c_str(), O_CREAT | O_RDWR | O_EXCL, 0600);
        if (this->shm_fd == -1)
        {
            throw std::runtime_error("Failed to create shared memory: " + shm_name);
        }
        if (::ftruncate(this->shm_fd, static_cast<off_t>(this->shm_size)) == -1)
        {
            ::close(this->shm_fd);
            this->shm_fd = -1;
            ::shm_unlink(this->shm_name.c_str());
            throw std::runtime_error("Failed to set size for shared memory: " + this->shm_name);
        }
        this->shm_ptr = ::mmap(nullptr, this->shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, this->shm_fd, 0);
        if (this->shm_ptr == MAP_FAILED)
        {
            // FIX: reset members so the destructor does not munmap(MAP_FAILED)
            // or close an already-closed fd after this throw.
            ::close(this->shm_fd);
            this->shm_fd = -1;
            this->shm_ptr = nullptr;
            ::shm_unlink(this->shm_name.c_str());
            throw std::runtime_error("Failed to mmap shared memory: " + this->shm_name);
        }
        std::memset(this->shm_ptr, 0, this->shm_size); // fresh segments start zeroed
    }

    void SharedMemoryManager::open()
    {
        this->shm_fd = ::shm_open(this->shm_name.c_str(), O_RDWR, 0600);
        if (this->shm_fd == -1)
        {
            throw std::runtime_error("Failed to open shared memory: " + this->shm_name);
        }
        // NOTE(review): maps shm_size bytes as supplied by the caller; if the
        // creator used a different size this over/under-maps — confirm callers
        // always pass the creation size.
        this->shm_ptr = ::mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, this->shm_fd, 0);
        if (this->shm_ptr == MAP_FAILED)
        {
            ::close(this->shm_fd);
            this->shm_fd = -1;
            this->shm_ptr = nullptr;
            throw std::runtime_error("Failed to open shared memory: " + this->shm_name);
        }
    }
} // namespace usub::core
+// + +#include "LFCircullarBuffer.h" diff --git a/utils/datastructures/LFCircullarBuffer.h b/utils/datastructures/LFCircullarBuffer.h new file mode 100644 index 0000000..e75642f --- /dev/null +++ b/utils/datastructures/LFCircullarBuffer.h @@ -0,0 +1,187 @@ +// +// Created by kirill on 11/15/24. +// + +#ifndef UVENT_LFCIRCULARBUFFER_H +#define UVENT_LFCIRCULARBUFFER_H + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "utils/intrinsincs/optimizations.h" + +namespace usub::utils +{ + constexpr size_t CACHELINE_SIZE = 64; + + class ExponentialBackoff + { + int count = 0; + + public: + void operator()() + { + if (this->count < 6) + { + for (int i = 0; i < (1 << this->count); ++i) + cpu_relax(); + } + else if (this->count < 12) + { + std::this_thread::yield(); + } + else + { + std::this_thread::sleep_for(std::chrono::nanoseconds(1 << (this->count - 12))); + } + ++this->count; + } + + void reset() { this->count = 0; } + }; + + template + class LockFreeRingBuffer + { + static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of 2"); + + struct alignas(CACHELINE_SIZE) Cell + { + std::atomic sequence; + typename std::aligned_storage::type storage; + }; + + static constexpr size_t MASK = Capacity - 1; + alignas(CACHELINE_SIZE) Cell buffer[Capacity]; + + alignas(CACHELINE_SIZE) std::atomic head{0}; + alignas(CACHELINE_SIZE) std::atomic tail{0}; + + public: + LockFreeRingBuffer() + { + for (size_t i = 0; i < Capacity; ++i) + { + buffer[i].sequence.store(i, std::memory_order_relaxed); + } + } + + ~LockFreeRingBuffer() + { + while (pop()) + { + } + } + + bool push(const T& val) + { + ExponentialBackoff backoff; + size_t pos = this->head.load(std::memory_order_relaxed); + + while (true) + { + Cell& cell = this->buffer[pos & MASK]; + prefetch_for_write(&cell); + size_t seq = cell.sequence.load(std::memory_order_acquire); + intptr_t diff = static_cast(seq) - static_cast(pos); + + if (diff == 0) + { + if 
(this->head.compare_exchange_strong(pos, pos + 1, std::memory_order_relaxed)) + { + new(&cell.storage) T(val); + cell.sequence.store(pos + 1, std::memory_order_release); + return true; + } + } + else if (diff < 0) + { + return false; + } + else + { + backoff(); + pos = this->head.load(std::memory_order_relaxed); + } + } + } + + std::optional pop() + { + ExponentialBackoff backoff; + size_t pos = this->tail.load(std::memory_order_relaxed); + + while (true) + { + Cell& cell = this->buffer[pos & MASK]; + prefetch_for_read(&cell); + size_t seq = cell.sequence.load(std::memory_order_acquire); + intptr_t diff = static_cast(seq) - static_cast(pos + 1); + + if (diff == 0) + { + if (this->tail.compare_exchange_strong(pos, pos + 1, std::memory_order_relaxed)) + { + T* ptr = reinterpret_cast(&cell.storage); + T val = std::move(*ptr); + ptr->~T(); + cell.sequence.store(pos + Capacity, std::memory_order_release); + return val; + } + } + else if (diff < 0) + { + return std::nullopt; + } + else + { + backoff(); + pos = this->tail.load(std::memory_order_relaxed); + } + } + } + + template + size_t push_batch(const std::array& values) + { + size_t pushed = 0; + for (size_t i = 0; i < N; ++i) + { + if (!push(values[i])) + break; + ++pushed; + } + return pushed; + } + + template + size_t pop_batch(std::array& out) + { + size_t popped = 0; + for (size_t i = 0; i < N; ++i) + { + auto val = pop(); + if (!val) + break; + out[i] = std::move(*val); + ++popped; + } + return popped; + } + + [[nodiscard]] size_t unsafe_size() const + { + const size_t h = this->head.load(std::memory_order_acquire); + const size_t t = this->tail.load(std::memory_order_acquire); + return (h - t) & MASK; + } + }; +} + +#endif //UVENT_LFCIRCULARBUFFER_H diff --git a/utils/datastructures/LFSkipList.h b/utils/datastructures/LFSkipList.h index f87354b..98b587f 100644 --- a/utils/datastructures/LFSkipList.h +++ b/utils/datastructures/LFSkipList.h @@ -12,21 +12,35 @@ #include #include "utils/io/VersionManager.h" -#if 
// ----------------------------------------------------------------------------
// utils/datastructures/LFSkipList.h (changed hunk): the per-architecture
// cpu_relax()/prefetch helpers formerly defined inline in this header are
// replaced by
//     #include "utils/intrinsincs/optimizations.h"
// The old implementations had been left behind as a large commented-out
// block; that dead code is dropped here — version control already
// preserves it, and stale commented copies drift out of sync.
// ----------------------------------------------------------------------------

// ----------------------------------------------------------------------------
// utils/intrinsincs/optimizations.h
// (reconstructed: the diff lost the <immintrin.h> include target and the
//  reinterpret_cast<const char*> template argument to an extraction step)
// ----------------------------------------------------------------------------
//
// Created by Kirill Zhukov on 20.04.2025.
//

#ifndef OPTIMIZATIONS_H
#define OPTIMIZATIONS_H

// Portable CPU micro-optimization hints: a polite spin-wait pause plus read
// and write prefetches. Every function degrades to a no-op on architectures
// without a mapping, so callers may use them unconditionally.

#if defined(__x86_64__) || defined(_M_X64) || defined(__i386) || defined(_M_IX86)

#include <immintrin.h>

// Pause hint: eases spin-wait loops on hyper-threaded cores.
inline void cpu_relax() noexcept { _mm_pause(); }

// _mm_prefetch takes const char*; x86 exposes no distinct write-prefetch
// hint through this intrinsic, so both variants use _MM_HINT_T0.
inline void prefetch_for_write(const void* ptr) noexcept {
    _mm_prefetch(reinterpret_cast<const char*>(ptr), _MM_HINT_T0);
}

inline void prefetch_for_read(const void* ptr) noexcept {
    _mm_prefetch(reinterpret_cast<const char*>(ptr), _MM_HINT_T0);
}

#elif defined(__aarch64__)

// AArch64 "yield" is the spin-loop hint analogous to x86 pause.
inline void cpu_relax() noexcept { asm volatile("yield" ::: "memory"); }

// prfm pstl1strm: prefetch for store, L1, streaming.
inline void prefetch_for_write(const void* ptr) noexcept
{
    asm volatile("prfm pstl1strm, [%0]" :: "r"(ptr));
}

// prfm pldl1keep: prefetch for load, L1, keep in cache.
inline void prefetch_for_read(const void* ptr) noexcept
{
    asm volatile("prfm pldl1keep, [%0]" :: "r"(ptr));
}

#else

// Unknown target: hints become no-ops so call sites stay portable.
inline void cpu_relax() noexcept
{
}

inline void prefetch_for_write(const void*) noexcept
{
}

inline void prefetch_for_read(const void*) noexcept
{
}

#endif

#endif // OPTIMIZATIONS_H