SharedStorage/core/SharedCommandQueue.cpp
2025-04-20 23:32:04 +03:00

196 lines
6.0 KiB
C++

//
// Created by Kirill Zhukov on 20.04.2025.
//
#include "SharedCommandQueue.h"
namespace usub::core
{
/**
 * Initialises the queue bookkeeping and default-constructs a Command in each
 * of the `capacity` slots of buffer_ using placement new.
 *
 * Both head_ and tail_ start at 0, which the other members treat as "empty".
 * NOTE(review): no storage is obtained here, so buffer_ is assumed to already
 * provide room for `capacity` Command objects — confirm against the header.
 *
 * @param capacity Number of slots in the ring.
 */
SharedCommandQueue::SharedCommandQueue(size_t capacity)
    : capacity_(capacity), head_(0), tail_(0)
{
    size_t slot_index = 0;
    while (slot_index < this->capacity_)
    {
        new (&this->buffer_[slot_index]) Command();
        ++slot_index;
    }
}
/**
 * Explicitly runs the Command destructor on every slot, from index 0 up to
 * capacity_ - 1, mirroring the placement-new construction in the constructor.
 * The memory behind buffer_ is not released here.
 */
SharedCommandQueue::~SharedCommandQueue()
{
    Command* const first = this->buffer_;
    Command* const past_last = first + this->capacity_;
    for (Command* slot = first; slot != past_last; ++slot)
        slot->~Command();
}
/**
 * Copies `cmd` into the slot at head_ and publishes it by advancing head_.
 *
 * The slot's `ready` flag is reset to 0 here and is not raised again by this
 * function; finalize() is the member that stores 1 into it. The response
 * fields of the slot are cleared.
 *
 * One slot is always kept unused: the ring is considered full when advancing
 * head_ would make it equal to tail_.
 *
 * @param cmd Command whose op, key, value_size and value bytes are copied.
 * @return false if the ring is full, or if cmd.value_size is larger than the
 *         slot's value buffer (the copy would overrun it); true once head_
 *         has been advanced.
 */
bool SharedCommandQueue::try_push(const Command& cmd)
{
    // Refuse a payload length that does not fit the destination buffer.
    // NOTE(review): assumes Command::value is an inline fixed-size array, so
    // sizeof yields its capacity in bytes — confirm against the header.
    if (cmd.value_size > sizeof(cmd.value))
        return false;

    const size_t head = this->head_.load(std::memory_order_relaxed);
    const size_t next_head = (head + 1) % this->capacity_;
    if (next_head == this->tail_.load(std::memory_order_acquire))
        return false; // Queue is full

    Command& slot = this->buffer_[head];
    slot.ready.store(0, std::memory_order_relaxed);
    slot.op = cmd.op;
    slot.key = cmd.key;
    slot.value_size = cmd.value_size;
    std::memcpy(slot.value, cmd.value, cmd.value_size);
    slot.response_size = 0;
    slot.response_ready.store(0, std::memory_order_relaxed);

    // Release pairs with the acquire load of head_ on the consuming side so
    // the slot writes above are visible before the new head is observed.
    this->head_.store(next_head, std::memory_order_release);
    return true;
}
/**
 * Pushes `count` commands as one unit: either all of them are enqueued and
 * head_ is advanced by `count`, or nothing is written.
 *
 * Each command is copied with Command's assignment operator, so whatever that
 * operator does with the `ready` flag applies here.
 *
 * @param cmds  Pointer to `count` commands to enqueue.
 * @param count Number of commands to enqueue; 0 succeeds without writing.
 * @return false if fewer than `count` slots are free, true otherwise.
 */
bool SharedCommandQueue::try_push_batch(const Command* cmds, size_t count)
{
    const size_t head = this->head_.load(std::memory_order_relaxed);
    const size_t tail = this->tail_.load(std::memory_order_acquire);

    // The ring keeps one slot permanently unused so that head_ == tail_ means
    // "empty" (see try_push / try_pop). Usable free space is therefore one
    // less than the raw distance from head back round to tail: an empty ring
    // has capacity_ - 1 free slots and a full ring has 0.
    const size_t free_slots = (tail + this->capacity_ - head - 1) % this->capacity_;
    if (free_slots < count)
        return false;

    for (size_t i = 0; i < count; ++i)
    {
        Command& slot = this->buffer_[(head + i) % this->capacity_];
        slot = cmds[i];
    }

    // Publish all slots at once; release pairs with the consumer's acquire.
    this->head_.store((head + count) % this->capacity_, std::memory_order_release);
    return true;
}
/**
 * Marks a command as ready by storing 1 into its `ready` flag with release
 * ordering, so writes made to the command beforehand are visible to a reader
 * that acquires the flag.
 *
 * @param cmd Command to mark; must not be null (it is dereferenced unchecked).
 */
void SharedCommandQueue::finalize(Command* cmd)
{
    Command& target = *cmd;
    target.ready.store(1, std::memory_order_release);
}
std::optional<Command> SharedCommandQueue::try_pop()
{
size_t tail = this->tail_.load(std::memory_order_relaxed);
if (tail == this->head_.load(std::memory_order_acquire))
{
return std::nullopt;
}
Command& slot = this->buffer_[tail];
Command cmd = slot;
this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release);
return cmd;
}
/**
 * Copies up to `max_count` pending commands into `out`, oldest first, and
 * advances tail_ past the ones that were copied.
 *
 * Copying stops early at the first slot whose `ready` flag is still 0; that
 * slot and everything after it stay in the queue.
 *
 * @param out       Destination array with room for at least `max_count` items.
 * @param max_count Upper bound on the number of commands to take.
 * @return Number of commands written to `out` (possibly 0).
 */
size_t SharedCommandQueue::try_pop_batch(Command* out, size_t max_count)
{
    const size_t read_pos = this->tail_.load(std::memory_order_relaxed);
    const size_t write_pos = this->head_.load(std::memory_order_acquire);
    const size_t pending = (write_pos + this->capacity_ - read_pos) % this->capacity_;

    size_t limit = max_count;
    if (pending < limit)
        limit = pending;

    size_t taken = 0;
    while (taken < limit)
    {
        Command& candidate = this->buffer_[(read_pos + taken) % this->capacity_];
        if (candidate.ready.load(std::memory_order_acquire) == 0)
            break;
        out[taken] = candidate;
        ++taken;
    }

    this->tail_.store((read_pos + taken) % this->capacity_, std::memory_order_release);
    return taken;
}
/**
 * Returns a pointer to a pending command without removing it.
 *
 * `index` is an offset from the oldest pending command: 0 is the command at
 * tail_, 1 the one after it, and so on. Previously the argument was ignored
 * and the command at tail_ was always returned; index 0 behaves exactly as
 * before.
 *
 * @param index Offset from the oldest pending command.
 * @return Pointer into the ring buffer, or nullptr when fewer than
 *         `index + 1` commands are pending (including the empty queue).
 */
Command* SharedCommandQueue::peek(size_t index)
{
    const size_t tail = this->tail_.load(std::memory_order_relaxed);
    const size_t head = this->head_.load(std::memory_order_acquire);

    // Number of published-but-unconsumed slots; 0 when head == tail.
    const size_t pending = (head + this->capacity_ - tail) % this->capacity_;
    if (index >= pending)
        return nullptr;

    // index < pending < capacity_, so the sum cannot overflow in practice.
    return &this->buffer_[(tail + index) % this->capacity_];
}
/**
 * Discards the oldest pending command by advancing tail_ one slot.
 *
 * Does nothing when the queue is empty. Without that guard tail_ would move
 * past head_, and the pending-count arithmetic would then treat almost the
 * whole ring as pending, exposing stale slots to the consumer.
 */
void SharedCommandQueue::pop()
{
    const size_t tail = this->tail_.load(std::memory_order_relaxed);
    if (tail == this->head_.load(std::memory_order_acquire))
        return; // empty: nothing to discard

    this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release);
}
/**
 * Reports how many commands lie between tail_ and head_.
 *
 * The two indices are read separately, so under concurrent use the value is
 * a snapshot that may already be out of date when it is returned.
 *
 * @return Distance from tail_ to head_, wrapped around capacity_.
 */
size_t SharedCommandQueue::pending_count() const
{
    const size_t write_pos = this->head_.load(std::memory_order_acquire);
    const size_t read_pos = this->tail_.load(std::memory_order_relaxed);
    // Adding capacity_ before subtracting keeps the intermediate non-negative.
    const size_t unwrapped = write_pos + this->capacity_ - read_pos;
    return unwrapped % this->capacity_;
}
/**
 * Gives direct access to the slot storage.
 *
 * @return Pointer to the first Command slot of the ring buffer.
 */
Command* SharedCommandQueue::raw_buffer() noexcept
{
    Command* const first_slot = this->buffer_;
    return first_slot;
}
/**
 * @return The number of slots the queue was constructed with. Because the
 *         push functions keep one slot unused, this is one more than the
 *         number of commands that can be pending at once.
 */
size_t SharedCommandQueue::capacity() const noexcept
{
    const size_t slot_count = this->capacity_;
    return slot_count;
}
/**
 * Exposes the write index.
 *
 * @return Mutable reference to the atomic head_ member (the index of the
 *         next slot the push functions write to).
 */
std::atomic<size_t>& SharedCommandQueue::head() noexcept
{
    std::atomic<size_t>& write_index = this->head_;
    return write_index;
}
/**
 * Exposes the read index.
 *
 * @return Mutable reference to the atomic tail_ member (the index of the
 *         oldest slot the pop functions read from).
 */
std::atomic<size_t>& SharedCommandQueue::tail() noexcept
{
    std::atomic<size_t>& read_index = this->tail_;
    return read_index;
}
/**
 * Enqueues a PUT command carrying `value` under `key`.
 *
 * The slot is filled, its response fields are cleared, its `ready` flag is
 * set to 1, and only then is head_ advanced, so the command is already
 * marked ready by the time a consumer can see it.
 *
 * @param key   Key stored in the command.
 * @param value Payload bytes copied into the slot's value buffer.
 * @return false if the ring is full, or if `value` is larger than the slot's
 *         value buffer (the copy would overrun it); true once head_ has been
 *         advanced. An oversized value will never succeed on retry.
 */
bool SharedCommandQueue::enqueue_put(const usub::utils::Hash128& key, const std::string& value)
{
    const size_t head = this->head_.load(std::memory_order_relaxed);
    const size_t next_head = (head + 1) % this->capacity_;
    if (next_head == this->tail_.load(std::memory_order_acquire))
        return false; // queue is full

    Command& slot = this->buffer_[head];

    // Reject payloads that do not fit instead of writing past the buffer.
    // NOTE(review): assumes Command::value is an inline fixed-size array, so
    // sizeof yields its capacity in bytes — confirm against the header.
    if (value.size() > sizeof(slot.value))
        return false;

    slot.op = OperationType::PUT;
    slot.key = key;
    slot.value_size = static_cast<uint32_t>(value.size());
    std::memcpy(slot.value, value.data(), value.size());
    slot.response_size = 0;
    slot.response_ready.store(0, std::memory_order_relaxed);
    slot.ready.store(1, std::memory_order_release);

    // Release pairs with the consumer's acquire load of head_.
    this->head_.store(next_head, std::memory_order_release);
    return true;
}
/**
 * Enqueues a FIND command for `key`.
 *
 * The command carries no payload (value_size is 0). Its response fields are
 * cleared and its `ready` flag is set to 1 before head_ is advanced.
 *
 * @param key Key to look up.
 * @return false if the ring is full, true once head_ has been advanced.
 */
bool SharedCommandQueue::enqueue_find(const utils::Hash128& key)
{
    const size_t write_pos = this->head_.load(std::memory_order_relaxed);
    const size_t advanced = (write_pos + 1) % this->capacity_;
    const bool ring_full = (advanced == this->tail_.load(std::memory_order_acquire));
    if (ring_full)
    {
        return false;
    }

    Command& entry = this->buffer_[write_pos];
    entry.op = OperationType::FIND;
    entry.key = key;
    entry.value_size = 0;
    entry.response_size = 0;
    entry.response_ready.store(0, std::memory_order_relaxed);
    entry.ready.store(1, std::memory_order_release);

    // Publish the filled slot to the consumer.
    this->head_.store(advanced, std::memory_order_release);
    return true;
}
/**
 * Blocks until `cmd.response_ready` becomes non-zero, polling once per
 * millisecond. There is no timeout: the call does not return until the flag
 * is set.
 *
 * @param cmd Command whose response is awaited.
 * @return true if the response has a non-zero response_size, false otherwise.
 */
bool SharedCommandQueue::await_response(Command& cmd)
{
    for (;;)
    {
        if (cmd.response_ready.load(std::memory_order_acquire) != 0)
            break;
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    const bool has_payload = (cmd.response_size != 0);
    return has_payload;
}
}