// // Created by Kirill Zhukov on 20.04.2025. // #include "SharedCommandQueue.h" namespace usub::core { SharedCommandQueue::SharedCommandQueue(size_t capacity) : capacity_(capacity), head_(0), tail_(0) { static_assert(offsetof(SharedCommandQueue, buffer_) % alignof(Command) == 0, "buffer_ is not properly aligned"); for (size_t i = 0; i < this->capacity_; ++i) new(&this->buffer_[i]) Command(); } SharedCommandQueue::~SharedCommandQueue() { for (size_t i = 0; i < this->capacity_; ++i) (this->buffer_ + i)->~Command(); } bool SharedCommandQueue::try_push(const Command &cmd) { size_t head = this->head_.load(std::memory_order_relaxed); size_t next_head = (head + 1) % this->capacity_; if (next_head == this->tail_.load(std::memory_order_acquire)) return false; // Очередь полна Command *slot = reinterpret_cast( reinterpret_cast(this) + offsetof(SharedCommandQueue, buffer_) ) + head; slot->ready.store(0, std::memory_order_relaxed); slot->op = cmd.op; slot->key = cmd.key; slot->value_size = cmd.value_size; std::memcpy(slot->value, cmd.value, cmd.value_size); slot->response_size = 0; slot->response_ready.store(0, std::memory_order_relaxed); this->head_.store(next_head, std::memory_order_release); return true; } bool SharedCommandQueue::try_push_batch(const Command *cmds, size_t count) { size_t head = this->head_.load(std::memory_order_relaxed); size_t tail = this->tail_.load(std::memory_order_acquire); size_t free_slots = (tail + this->capacity_ - head) % this->capacity_; if (free_slots < count) return false; for (size_t i = 0; i < count; ++i) { Command &slot = this->buffer_[(head + i) % this->capacity_]; slot = cmds[i]; } this->head_.store((head + count) % this->capacity_, std::memory_order_release); return true; } void SharedCommandQueue::finalize(Command *cmd) { cmd->ready.store(1, std::memory_order_release); } std::optional SharedCommandQueue::try_pop() { size_t tail = this->tail_.load(std::memory_order_relaxed); if (tail == this->head_.load(std::memory_order_acquire)) { return std::nullopt; } Command &slot = this->buffer_[tail]; Command cmd = slot; this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release); return cmd; } size_t SharedCommandQueue::try_pop_batch(Command *out, size_t max_count) { size_t tail = this->tail_.load(std::memory_order_relaxed); size_t head = this->head_.load(std::memory_order_acquire); size_t available = (head + this->capacity_ - tail) % this->capacity_; size_t to_pop = (available < max_count) ? available : max_count; size_t copied = 0; for (size_t i = 0; i < to_pop; ++i) { Command &src = this->buffer_[(tail + i) % this->capacity_]; if (src.ready.load(std::memory_order_acquire) == 0) break; out[copied++] = src; } this->tail_.store((tail + copied) % this->capacity_, std::memory_order_release); return copied; } Command *SharedCommandQueue::peek(size_t index) { size_t tail = this->tail_.load(std::memory_order_relaxed); size_t head = this->head_.load(std::memory_order_acquire); size_t available = (head + this->capacity_ - tail) % this->capacity_; if (index >= available) return nullptr; size_t pos = (tail + index) % this->capacity_; return reinterpret_cast( reinterpret_cast(this) + offsetof(SharedCommandQueue, buffer_) ) + pos; } void SharedCommandQueue::pop() { size_t tail = this->tail_.load(std::memory_order_relaxed); this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release); } size_t SharedCommandQueue::pending_count() const { size_t head = this->head_.load(std::memory_order_acquire); size_t tail = this->tail_.load(std::memory_order_relaxed); return (head + this->capacity_ - tail) % this->capacity_; } Command *SharedCommandQueue::raw_buffer() noexcept { return this->buffer_; } size_t SharedCommandQueue::capacity() const noexcept { return this->capacity_; } std::atomic &SharedCommandQueue::head() noexcept { return this->head_; } std::atomic &SharedCommandQueue::tail() noexcept { return this->tail_; } bool SharedCommandQueue::enqueue_put(const usub::utils::Hash128 &key, const std::string &value) { size_t head = this->head_.load(std::memory_order_relaxed); size_t next_head = (head + 1) % this->capacity_; if (next_head == this->tail_.load(std::memory_order_acquire)) return false; Command &slot = this->buffer_[head]; slot.op = OperationType::PUT; slot.key = key; slot.value_size = static_cast(value.size()); std::memcpy(slot.value, value.data(), value.size()); slot.response_size = 0; slot.response_ready.store(0, std::memory_order_relaxed); slot.ready.store(1, std::memory_order_release); this->head_.store(next_head, std::memory_order_release); return true; } bool SharedCommandQueue::enqueue_find(const utils::Hash128 &key) { size_t head = this->head_.load(std::memory_order_relaxed); size_t next_head = (head + 1) % capacity_; if (next_head == this->tail_.load(std::memory_order_acquire)) return false; Command &slot = buffer_[head]; slot.op = OperationType::FIND; slot.key = key; slot.value_size = 0; slot.response_size = 0; slot.response_ready.store(0, std::memory_order_relaxed); slot.ready.store(1, std::memory_order_release); this->head_.store(next_head, std::memory_order_release); return true; } bool SharedCommandQueue::await_response(Command &cmd) { while (cmd.response_ready.load(std::memory_order_acquire) == 0) std::this_thread::sleep_for(std::chrono::milliseconds(1)); return cmd.response_size != 0; } size_t SharedCommandQueue::calculate_shm_size(size_t capacity) { return (sizeof(SharedCommandQueue) + sizeof(Command) * capacity + SHM_ALIGNMENT - 1) & ~(SHM_ALIGNMENT - 1); } void SharedCommandQueue::reset() { this->head_.store(0, std::memory_order_relaxed); this->tail_.store(0, std::memory_order_relaxed); } }