// // Created by Kirill Zhukov on 20.04.2025. // #include "SharedCommandQueue.h" namespace usub::core { SharedCommandQueue::SharedCommandQueue(size_t capacity) : capacity_(capacity), head_(0), tail_(0) { for (size_t i = 0; i < capacity_; ++i) new (&buffer_[i]) Command(); } SharedCommandQueue::~SharedCommandQueue() { for (size_t i = 0; i < capacity_; ++i) { buffer_[i].~Command(); } } bool SharedCommandQueue::try_push(const Command& cmd) { size_t head = head_.load(std::memory_order_relaxed); size_t next_head = (head + 1) % capacity_; if (next_head == tail_.load(std::memory_order_acquire)) return false; // Очередь полна Command& slot = buffer_[head]; slot.ready.store(0, std::memory_order_relaxed); slot.op = cmd.op; slot.key = cmd.key; slot.value_size = cmd.value_size; std::memcpy(slot.value, cmd.value, cmd.value_size); slot.response_size = 0; slot.response_ready.store(0, std::memory_order_relaxed); head_.store(next_head, std::memory_order_release); return true; } bool SharedCommandQueue::try_push_batch(const Command* cmds, size_t count) { size_t head = head_.load(std::memory_order_relaxed); size_t tail = tail_.load(std::memory_order_acquire); size_t free_slots = (tail + capacity_ - head) % capacity_; if (free_slots < count) return false; for (size_t i = 0; i < count; ++i) { Command& slot = buffer_[(head + i) % capacity_]; slot = cmds[i]; } head_.store((head + count) % capacity_, std::memory_order_release); return true; } void SharedCommandQueue::finalize(Command* cmd) { cmd->ready.store(1, std::memory_order_release); } std::optional SharedCommandQueue::try_pop() { size_t tail = tail_.load(std::memory_order_relaxed); if (tail == head_.load(std::memory_order_acquire)) { return std::nullopt; } Command& slot = buffer_[tail]; Command cmd = slot; tail_.store((tail + 1) % capacity_, std::memory_order_release); return cmd; } size_t SharedCommandQueue::try_pop_batch(Command* out, size_t max_count) { size_t tail = tail_.load(std::memory_order_relaxed); size_t head = head_.load(std::memory_order_acquire); size_t available = (head + capacity_ - tail) % capacity_; size_t to_pop = (available < max_count) ? available : max_count; size_t copied = 0; for (size_t i = 0; i < to_pop; ++i) { Command& src = buffer_[(tail + i) % capacity_]; if (src.ready.load(std::memory_order_acquire) == 0) break; out[copied++] = src; } tail_.store((tail + copied) % capacity_, std::memory_order_release); return copied; } }