SharedStorage/core/SharedCommandQueue.h
2025-04-20 22:29:57 +03:00

102 lines
3.0 KiB
C++

//
// Created by Kirill Zhukov on 20.04.2025.
//
#ifndef SHAREDCOMMANDQUEUE_H
#define SHAREDCOMMANDQUEUE_H
#include "Command.h"
#include "utils/datastructures/LFCircullarBuffer.h"
#include <atomic>
#include <cstdint>
namespace usub::core
{
constexpr size_t SHM_QUEUE_CAPACITY = 1024;
class SharedCommandQueue
{
public:
explicit SharedCommandQueue(size_t capacity);
~SharedCommandQueue();
bool try_push(const Command& cmd);
bool try_push_batch(const Command* cmds, size_t count);
void finalize(Command* cmd);
std::optional<Command> try_pop();
size_t try_pop_batch(Command* out, size_t max_count);
Command* peek(size_t index);
void pop();
size_t pending_count() const;
Command* raw_buffer() noexcept { return buffer_; }
size_t capacity() const noexcept { return capacity_; }
std::atomic<size_t>& head() noexcept { return head_; }
std::atomic<size_t>& tail() noexcept { return tail_; }
bool enqueue_put(const usub::utils::Hash128& key, const std::string& value)
{
size_t head = head_.load(std::memory_order_relaxed);
size_t next_head = (head + 1) % capacity_;
if (next_head == tail_.load(std::memory_order_acquire))
return false;
Command& slot = buffer_[head];
slot.op = OperationType::PUT;
slot.key = key;
slot.value_size = static_cast<uint32_t>(value.size());
std::memcpy(slot.value, value.data(), value.size());
slot.response_size = 0;
slot.response_ready.store(0, std::memory_order_relaxed);
slot.ready.store(1, std::memory_order_release);
head_.store(next_head, std::memory_order_release);
return true;
}
bool enqueue_find(const usub::utils::Hash128& key)
{
size_t head = head_.load(std::memory_order_relaxed);
size_t next_head = (head + 1) % capacity_;
if (next_head == tail_.load(std::memory_order_acquire))
return false;
Command& slot = buffer_[head];
slot.op = OperationType::FIND;
slot.key = key;
slot.value_size = 0;
slot.response_size = 0;
slot.response_ready.store(0, std::memory_order_relaxed);
slot.ready.store(1, std::memory_order_release);
head_.store(next_head, std::memory_order_release);
return true;
}
bool await_response(Command& cmd)
{
while (cmd.response_ready.load(std::memory_order_acquire) == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return cmd.response_size != 0;
}
private:
size_t capacity_;
Command* commands_;
alignas(64) std::atomic<size_t> head_;
alignas(64) std::atomic<size_t> tail_;
alignas(64) Command buffer_[];
};
}
#endif //SHAREDCOMMANDQUEUE_H