// // Created by Kirill Zhukov on 20.04.2025. // #include "SharedCommandQueue.h" namespace usub::core { SharedCommandQueue::SharedCommandQueue(size_t capacity) : capacity_(capacity), head_(0), tail_(0) { for (size_t i = 0; i < this->capacity_; ++i) new(&this->buffer_[i]) Command(); } SharedCommandQueue::~SharedCommandQueue() { for (size_t i = 0; i < this->capacity_; ++i) { this->buffer_[i].~Command(); } } bool SharedCommandQueue::try_push(const Command& cmd) { size_t head = this->head_.load(std::memory_order_relaxed); size_t next_head = (head + 1) % this->capacity_; if (next_head == this->tail_.load(std::memory_order_acquire)) return false; // Очередь полна Command& slot = this->buffer_[head]; slot.ready.store(0, std::memory_order_relaxed); slot.op = cmd.op; slot.key = cmd.key; slot.value_size = cmd.value_size; std::memcpy(slot.value, cmd.value, cmd.value_size); slot.response_size = 0; slot.response_ready.store(0, std::memory_order_relaxed); this->head_.store(next_head, std::memory_order_release); return true; } bool SharedCommandQueue::try_push_batch(const Command* cmds, size_t count) { size_t head = this->head_.load(std::memory_order_relaxed); size_t tail = this->tail_.load(std::memory_order_acquire); size_t free_slots = (tail + this->capacity_ - head) % this->capacity_; if (free_slots < count) return false; for (size_t i = 0; i < count; ++i) { Command& slot = this->buffer_[(head + i) % this->capacity_]; slot = cmds[i]; } this->head_.store((head + count) % this->capacity_, std::memory_order_release); return true; } void SharedCommandQueue::finalize(Command* cmd) { cmd->ready.store(1, std::memory_order_release); } std::optional SharedCommandQueue::try_pop() { size_t tail = this->tail_.load(std::memory_order_relaxed); if (tail == this->head_.load(std::memory_order_acquire)) { return std::nullopt; } Command& slot = this->buffer_[tail]; Command cmd = slot; this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release); return cmd; } size_t SharedCommandQueue::try_pop_batch(Command* out, size_t max_count) { size_t tail = this->tail_.load(std::memory_order_relaxed); size_t head = this->head_.load(std::memory_order_acquire); size_t available = (head + this->capacity_ - tail) % this->capacity_; size_t to_pop = (available < max_count) ? available : max_count; size_t copied = 0; for (size_t i = 0; i < to_pop; ++i) { Command& src = this->buffer_[(tail + i) % this->capacity_]; if (src.ready.load(std::memory_order_acquire) == 0) break; out[copied++] = src; } this->tail_.store((tail + copied) % this->capacity_, std::memory_order_release); return copied; } Command* SharedCommandQueue::peek(size_t index) { size_t tail = this->tail_.load(std::memory_order_relaxed); size_t head = this->head_.load(std::memory_order_acquire); if (tail == head) return nullptr; return &this->buffer_[tail % this->capacity_]; } void SharedCommandQueue::pop() { size_t tail = this->tail_.load(std::memory_order_relaxed); this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release); } size_t SharedCommandQueue::pending_count() const { size_t head = this->head_.load(std::memory_order_acquire); size_t tail = this->tail_.load(std::memory_order_relaxed); return (head + this->capacity_ - tail) % this->capacity_; } Command* SharedCommandQueue::raw_buffer() noexcept { return this->buffer_; } size_t SharedCommandQueue::capacity() const noexcept { return this->capacity_; } std::atomic& SharedCommandQueue::head() noexcept { return this->head_; } std::atomic& SharedCommandQueue::tail() noexcept { return this->tail_; } bool SharedCommandQueue::enqueue_put(const usub::utils::Hash128& key, const std::string& value) { size_t head = this->head_.load(std::memory_order_relaxed); size_t next_head = (head + 1) % this->capacity_; if (next_head == this->tail_.load(std::memory_order_acquire)) return false; Command& slot = this->buffer_[head]; slot.op = OperationType::PUT; slot.key = key; slot.value_size = static_cast(value.size()); std::memcpy(slot.value, value.data(), value.size()); slot.response_size = 0; slot.response_ready.store(0, std::memory_order_relaxed); slot.ready.store(1, std::memory_order_release); this->head_.store(next_head, std::memory_order_release); return true; } bool SharedCommandQueue::enqueue_find(const utils::Hash128& key) { size_t head = this->head_.load(std::memory_order_relaxed); size_t next_head = (head + 1) % capacity_; if (next_head == this->tail_.load(std::memory_order_acquire)) return false; Command& slot = buffer_[head]; slot.op = OperationType::FIND; slot.key = key; slot.value_size = 0; slot.response_size = 0; slot.response_ready.store(0, std::memory_order_relaxed); slot.ready.store(1, std::memory_order_release); this->head_.store(next_head, std::memory_order_release); return true; } bool SharedCommandQueue::await_response(Command& cmd) { while (cmd.response_ready.load(std::memory_order_acquire) == 0) std::this_thread::sleep_for(std::chrono::milliseconds(1)); return cmd.response_size != 0; } }