191 lines
6.7 KiB
C++
191 lines
6.7 KiB
C++
//
|
|
// Created by Kirill Zhukov on 20.04.2025.
|
|
//
|
|
|
|
#include "SharedCommandQueue.h"
|
|
|
|
namespace usub::core {
|
|
// Constructs a queue with `capacity` slots.
//
// Both ring indices start at zero, and every one of the `capacity_` slots in
// `buffer_` is default-constructed in place with placement new.
SharedCommandQueue::SharedCommandQueue(size_t capacity)
    : capacity_(capacity), head_(0), tail_(0) {
    // Compile-time guard: the offset of buffer_ inside the object must be a
    // multiple of Command's alignment.
    static_assert(offsetof(SharedCommandQueue, buffer_) % alignof(Command) == 0, "buffer_ is not properly aligned");

    size_t slot_index = 0;
    while (slot_index < this->capacity_) {
        new(&this->buffer_[slot_index]) Command();
        ++slot_index;
    }
}
|
|
|
|
// Explicitly runs Command's destructor on each of the `capacity_` slots of
// `buffer_`, mirroring the placement-new construction done in the constructor.
SharedCommandQueue::~SharedCommandQueue() {
    Command *const first_slot = this->buffer_;
    for (size_t offset = 0; offset != this->capacity_; ++offset) {
        (first_slot + offset)->~Command();
    }
}
|
|
|
|
// Copies `cmd` into the slot at the current head and publishes it by
// advancing `head_`.
//
// Copied fields: `op`, `key`, `value_size` and the first `value_size` bytes of
// `value`. The slot's `response_size` and `response_ready` are reset to 0.
// The slot's `ready` flag is stored as 0 and is NOT set to 1 here; it has to be
// raised separately (finalize() in this file does that for a given slot).
//
// @param cmd  Command to enqueue.
// @return true if the command was stored; false if the queue is full or
//         `cmd.value_size` exceeds the size of a slot's `value` storage.
bool SharedCommandQueue::try_push(const Command &cmd) {
    const size_t head = this->head_.load(std::memory_order_relaxed);
    const size_t next_head = (head + 1) % this->capacity_;

    // One slot always stays unused so that head == tail can mean "empty".
    if (next_head == this->tail_.load(std::memory_order_acquire))
        return false; // Queue is full

    Command *slot = reinterpret_cast<Command *>(
        reinterpret_cast<char *>(this) + offsetof(SharedCommandQueue, buffer_)
    ) + head;

    // Reject oversized payloads before touching the slot, instead of letting
    // memcpy run past the destination.
    // NOTE(review): assumes Command::value is a fixed-size inline array, so
    // sizeof yields its byte capacity — confirm against the Command definition.
    if (cmd.value_size > sizeof(slot->value))
        return false;

    slot->ready.store(0, std::memory_order_relaxed);
    slot->op = cmd.op;
    slot->key = cmd.key;
    slot->value_size = cmd.value_size;
    std::memcpy(slot->value, cmd.value, cmd.value_size);
    slot->response_size = 0;
    slot->response_ready.store(0, std::memory_order_relaxed);

    // Release store: the slot writes above are ordered before the new head
    // value that readers obtain with an acquire load.
    this->head_.store(next_head, std::memory_order_release);
    return true;
}
|
|
|
|
bool SharedCommandQueue::try_push_batch(const Command *cmds, size_t count) {
|
|
size_t head = this->head_.load(std::memory_order_relaxed);
|
|
size_t tail = this->tail_.load(std::memory_order_acquire);
|
|
|
|
size_t free_slots = (tail + this->capacity_ - head) % this->capacity_;
|
|
if (free_slots < count)
|
|
return false;
|
|
|
|
for (size_t i = 0; i < count; ++i) {
|
|
Command &slot = this->buffer_[(head + i) % this->capacity_];
|
|
slot = cmds[i];
|
|
}
|
|
this->head_.store((head + count) % this->capacity_, std::memory_order_release);
|
|
return true;
|
|
}
|
|
|
|
// Marks `cmd` as ready by storing 1 into its `ready` flag with release
// ordering.
//
// @param cmd  Command whose flag is raised; must not be null (it is
//             dereferenced unconditionally).
void SharedCommandQueue::finalize(Command *cmd) {
    auto &ready_flag = cmd->ready;
    ready_flag.store(1, std::memory_order_release);
}
|
|
|
|
std::optional<Command> SharedCommandQueue::try_pop() {
|
|
size_t tail = this->tail_.load(std::memory_order_relaxed);
|
|
|
|
if (tail == this->head_.load(std::memory_order_acquire)) {
|
|
return std::nullopt;
|
|
}
|
|
|
|
Command &slot = this->buffer_[tail];
|
|
Command cmd = slot;
|
|
this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release);
|
|
return cmd;
|
|
}
|
|
|
|
// Pops up to `max_count` commands into `out`, in queue order.
//
// Copying stops early at the first slot whose `ready` flag reads 0; that slot
// and everything behind it stay in the queue. `tail_` is then advanced by the
// number of commands actually copied (the store happens even when that is 0).
//
// @param out        Destination array with room for at least `max_count`
//                   commands.
// @param max_count  Upper bound on the number of commands to pop.
// @return Number of commands written to `out`.
size_t SharedCommandQueue::try_pop_batch(Command *out, size_t max_count) {
    const size_t start = this->tail_.load(std::memory_order_relaxed);
    const size_t end = this->head_.load(std::memory_order_acquire);

    // Number of published slots between tail and head.
    const size_t pending = (end + this->capacity_ - start) % this->capacity_;

    size_t limit = max_count;
    if (pending < max_count) {
        limit = pending;
    }

    size_t taken = 0;
    while (taken < limit) {
        Command &candidate = this->buffer_[(start + taken) % this->capacity_];

        if (candidate.ready.load(std::memory_order_acquire) == 0) {
            break;
        }

        out[taken] = candidate;
        ++taken;
    }

    this->tail_.store((start + taken) % this->capacity_, std::memory_order_release);
    return taken;
}
|
|
|
|
// Returns a pointer to the `index`-th pending command, counted from the tail,
// without removing it.
//
// @param index  Zero-based offset from the tail.
// @return Pointer into the queue's buffer, or nullptr when `index` is not
//         smaller than the number of pending commands.
Command *SharedCommandQueue::peek(size_t index) {
    const size_t read_pos = this->tail_.load(std::memory_order_relaxed);
    const size_t write_pos = this->head_.load(std::memory_order_acquire);

    const size_t pending = (write_pos + this->capacity_ - read_pos) % this->capacity_;
    if (index < pending) {
        // Locate the slot array through its byte offset inside this object.
        char *const buffer_bytes = reinterpret_cast<char *>(this) + offsetof(SharedCommandQueue, buffer_);
        Command *const first_slot = reinterpret_cast<Command *>(buffer_bytes);
        const size_t slot_pos = (read_pos + index) % this->capacity_;
        return first_slot + slot_pos;
    }

    return nullptr;
}
|
|
|
|
// Discards the command at the tail by advancing `tail_` one slot (modulo
// `capacity_`). Does nothing when the queue is empty.
void SharedCommandQueue::pop() {
    const size_t tail = this->tail_.load(std::memory_order_relaxed);

    // Guard against popping an empty queue: moving tail past head would make
    // the ring appear to hold capacity - 1 stale entries.
    if (tail == this->head_.load(std::memory_order_acquire))
        return;

    this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release);
}
|
|
|
|
size_t SharedCommandQueue::pending_count() const {
|
|
size_t head = this->head_.load(std::memory_order_acquire);
|
|
size_t tail = this->tail_.load(std::memory_order_relaxed);
|
|
return (head + this->capacity_ - tail) % this->capacity_;
|
|
}
|
|
|
|
// Gives direct access to the slot storage.
//
// @return Pointer to the first slot of `buffer_`.
Command *SharedCommandQueue::raw_buffer() noexcept {
    Command *const first_slot = this->buffer_;
    return first_slot;
}
|
|
|
|
size_t SharedCommandQueue::capacity() const noexcept {
|
|
return this->capacity_;
|
|
}
|
|
|
|
std::atomic<size_t> &SharedCommandQueue::head() noexcept {
|
|
return this->head_;
|
|
}
|
|
|
|
std::atomic<size_t> &SharedCommandQueue::tail() noexcept {
|
|
return this->tail_;
|
|
}
|
|
|
|
// Builds a PUT command in the slot at the current head and publishes it.
//
// The slot receives `key`, a byte copy of `value` and its length; the response
// fields are reset to 0 and the slot's `ready` flag is set to 1 before
// `head_` is advanced.
//
// @param key    Key stored in the command.
// @param value  Payload bytes copied into the slot's `value` storage.
// @return true if the command was enqueued; false if the queue is full or
//         `value` does not fit into a slot. Callers that retry on false should
//         be aware that an oversized value will never succeed.
bool SharedCommandQueue::enqueue_put(const usub::utils::Hash128 &key, const std::string &value) {
    const size_t head = this->head_.load(std::memory_order_relaxed);
    const size_t next_head = (head + 1) % this->capacity_;
    if (next_head == this->tail_.load(std::memory_order_acquire))
        return false;

    Command &slot = this->buffer_[head];

    // Refuse payloads larger than the slot instead of letting memcpy overrun
    // it (this also keeps the uint32_t cast below from truncating the length).
    // NOTE(review): assumes Command::value is a fixed-size inline array, so
    // sizeof yields its byte capacity — confirm against the Command definition.
    const size_t payload_size = value.size();
    if (payload_size > sizeof(slot.value))
        return false;

    slot.op = OperationType::PUT;
    slot.key = key;
    slot.value_size = static_cast<uint32_t>(payload_size);
    std::memcpy(slot.value, value.data(), payload_size);
    slot.response_size = 0;
    slot.response_ready.store(0, std::memory_order_relaxed);
    slot.ready.store(1, std::memory_order_release);

    // Release store: the slot writes above are ordered before the new head
    // value that readers obtain with an acquire load.
    this->head_.store(next_head, std::memory_order_release);
    return true;
}
|
|
|
|
// Builds a FIND command for `key` in the slot at the current head and
// publishes it.
//
// The slot carries no payload (`value_size` is 0); its response fields are
// reset to 0 and its `ready` flag is set to 1 before `head_` is advanced.
//
// @param key  Key to look up.
// @return true if the command was enqueued, false if the queue is full.
bool SharedCommandQueue::enqueue_find(const utils::Hash128 &key) {
    const size_t write_pos = this->head_.load(std::memory_order_relaxed);
    const size_t following = (write_pos + 1) % this->capacity_;

    const bool queue_full = (following == this->tail_.load(std::memory_order_acquire));
    if (queue_full) {
        return false;
    }

    Command *const slot = this->buffer_ + write_pos;
    slot->op = OperationType::FIND;
    slot->key = key;
    slot->value_size = 0;
    slot->response_size = 0;
    slot->response_ready.store(0, std::memory_order_relaxed);
    slot->ready.store(1, std::memory_order_release);

    // Make the filled slot visible by moving head forward.
    this->head_.store(following, std::memory_order_release);
    return true;
}
|
|
|
|
// Blocks until `cmd.response_ready` becomes non-zero, polling it once per
// millisecond of sleep. There is no timeout: the call only returns once the
// flag has been set.
//
// @param cmd  Command whose response is awaited.
// @return true if `cmd.response_size` is non-zero once the flag is set,
//         false otherwise.
bool SharedCommandQueue::await_response(Command &cmd) {
    const auto poll_interval = std::chrono::milliseconds(1);

    for (;;) {
        if (cmd.response_ready.load(std::memory_order_acquire) != 0) {
            break;
        }
        std::this_thread::sleep_for(poll_interval);
    }

    const bool has_payload = (cmd.response_size != 0);
    return has_payload;
}
|
|
|
|
size_t SharedCommandQueue::calculate_shm_size(size_t capacity) {
|
|
return (sizeof(SharedCommandQueue) + sizeof(Command) * capacity + SHM_ALIGNMENT - 1) & ~(SHM_ALIGNMENT - 1);
|
|
}
|
|
|
|
// Rewinds both ring indices to 0 using relaxed stores. The slots themselves
// are left untouched.
void SharedCommandQueue::reset() {
    constexpr auto order = std::memory_order_relaxed;
    this->head_.store(0, order);
    this->tail_.store(0, order);
}
|
|
}
|