fixed bug on x86

This commit is contained in:
g2px1 2025-04-21 19:28:00 +00:00
parent 18ff5bdaf8
commit e44279add1
7 changed files with 152 additions and 176 deletions

View File

@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 3.30)
cmake_minimum_required(VERSION 3.20..3.30)
project(sharedMemoryKeyDB)
set(CMAKE_CXX_STANDARD 23)

View File

@ -69,7 +69,9 @@
int main()
{
constexpr const char* shm_name = "/shm_rates1";
constexpr size_t shm_size = sizeof(usub::core::SharedCommandQueue);
constexpr size_t capacity = 1024; // или то, что реально
// size_t shm_size = usub::core::SharedCommandQueue::calculate_shm_size(capacity);
size_t shm_size = usub::core::SharedCommandQueue::calculate_shm_size(capacity);
int shm_fd = shm_open(shm_name, O_RDWR, 0600);
if (shm_fd == -1)
@ -86,7 +88,8 @@ int main()
return 1;
}
void* ptr = mmap(nullptr, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
// void* ptr = mmap(nullptr, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
void* ptr = mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (ptr == MAP_FAILED)
{
perror("mmap failed");
@ -102,7 +105,7 @@ int main()
// -------------------
{
std::cout << "1\n";
size_t head = queue->head().load(std::memory_order_relaxed);
auto head = queue->head().load(std::memory_order_relaxed);
size_t next_head = (head + 1) % queue->capacity();
while (next_head == queue->tail().load(std::memory_order_acquire))
{
@ -112,6 +115,13 @@ int main()
usub::core::Command* slot = &queue->raw_buffer()[head];
std::cout << "queue = " << queue << '\n';
std::cout << "queue->capacity() = " << queue->capacity() << '\n';
std::cout << "head = " << head << '\n';
std::cout << "ptr to raw_buffer = " << static_cast<void*>(queue->raw_buffer()) << '\n';
std::cout << "ptr to slot = " << static_cast<void*>(&queue->raw_buffer()[head]) << '\n';
std::cout.flush();
slot->op = usub::core::OperationType::PUT;
slot->key = usub::utils::compute_hash128("test_key", 8); // исправил длину "test_key" = 8 символов
const char* put_value = "test_value";

View File

@ -7,10 +7,8 @@
#include <istream>
#include <ostream>
namespace usub::core
{
Command::Command()
{
namespace usub::core {
Command::Command() {
ready.store(0, std::memory_order_relaxed);
response_ready.store(0, std::memory_order_relaxed);
op = OperationType::PUT;
@ -19,8 +17,7 @@ namespace usub::core
response_size = 0;
}
Command::Command(const Command& other)
{
Command::Command(const Command &other) {
ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed);
op = other.op;
key = other.key;
@ -32,10 +29,8 @@ namespace usub::core
std::memcpy(response, other.response, other.response_size);
}
Command& Command::operator=(const Command& other)
{
if (this != &other)
{
Command &Command::operator=(const Command &other) {
if (this != &other) {
ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed);
op = other.op;
key = other.key;
@ -49,34 +44,34 @@ namespace usub::core
return *this;
}
void Command::serialize(std::ostream& out) const
{
void Command::serialize(std::ostream &out) const {
auto op_val = static_cast<uint8_t>(op);
out.write(reinterpret_cast<const char*>(&op_val), sizeof(op_val));
out.write(reinterpret_cast<const char*>(&key), sizeof(key));
if (op == OperationType::PUT)
{
out.write(reinterpret_cast<const char*>(&value_size), sizeof(value_size));
out.write(value, value_size);
}
out.write(reinterpret_cast<const char *>(&op_val), sizeof(op_val));
out.write(reinterpret_cast<const char *>(&key), sizeof(key));
out.write(reinterpret_cast<const char *>(&value_size), sizeof(value_size));
out.write(value, value_size);
out.write(reinterpret_cast<const char *>(&response_size), sizeof(response_size));
out.write(response, response_size);
}
Command Command::deserialize(std::istream& in)
{
uint8_t op_val;
in.read(reinterpret_cast<char*>(&op_val), sizeof(op_val));
Command Command::deserialize(std::istream &in) {
Command cmd;
uint8_t op_val;
in.read(reinterpret_cast<char *>(&op_val), sizeof(op_val));
cmd.op = static_cast<OperationType>(op_val);
in.read(reinterpret_cast<char*>(&cmd.key), sizeof(cmd.key));
if (cmd.op == OperationType::PUT)
{
in.read(reinterpret_cast<char*>(&cmd.value_size), sizeof(cmd.value_size));
in.read(reinterpret_cast<char *>(&cmd.key), sizeof(cmd.key));
in.read(reinterpret_cast<char *>(&cmd.value_size), sizeof(cmd.value_size));
if (cmd.value_size > 0)
in.read(cmd.value, cmd.value_size);
}
in.read(reinterpret_cast<char *>(&cmd.response_size), sizeof(cmd.response_size));
if (cmd.response_size > 0)
in.read(cmd.response, cmd.response_size);
cmd.ready.store(1, std::memory_order_relaxed);
cmd.response_ready.store(1, std::memory_order_relaxed);
return cmd;
}

View File

@ -4,32 +4,27 @@
#include "SharedCommandQueue.h"
namespace usub::core
{
namespace usub::core {
SharedCommandQueue::SharedCommandQueue(size_t capacity)
: capacity_(capacity), head_(0), tail_(0)
{
: capacity_(capacity), head_(0), tail_(0) {
static_assert(offsetof(SharedCommandQueue, buffer_) % alignof(Command) == 0, "buffer_ is not properly aligned");
for (size_t i = 0; i < this->capacity_; ++i)
new(&this->buffer_[i]) Command();
}
SharedCommandQueue::~SharedCommandQueue()
{
SharedCommandQueue::~SharedCommandQueue() {
for (size_t i = 0; i < this->capacity_; ++i)
{
this->buffer_[i].~Command();
}
(this->buffer_ + i)->~Command();
}
bool SharedCommandQueue::try_push(const Command& cmd)
{
bool SharedCommandQueue::try_push(const Command &cmd) {
size_t head = this->head_.load(std::memory_order_relaxed);
size_t next_head = (head + 1) % this->capacity_;
if (next_head == this->tail_.load(std::memory_order_acquire))
return false; // Очередь полна
Command& slot = this->buffer_[head];
Command &slot = this->buffer_[head];
slot.ready.store(0, std::memory_order_relaxed);
slot.op = cmd.op;
slot.key = cmd.key;
@ -41,8 +36,7 @@ namespace usub::core
return true;
}
bool SharedCommandQueue::try_push_batch(const Command* cmds, size_t count)
{
bool SharedCommandQueue::try_push_batch(const Command *cmds, size_t count) {
size_t head = this->head_.load(std::memory_order_relaxed);
size_t tail = this->tail_.load(std::memory_order_acquire);
@ -50,37 +44,32 @@ namespace usub::core
if (free_slots < count)
return false;
for (size_t i = 0; i < count; ++i)
{
Command& slot = this->buffer_[(head + i) % this->capacity_];
for (size_t i = 0; i < count; ++i) {
Command &slot = this->buffer_[(head + i) % this->capacity_];
slot = cmds[i];
}
this->head_.store((head + count) % this->capacity_, std::memory_order_release);
return true;
}
void SharedCommandQueue::finalize(Command* cmd)
{
void SharedCommandQueue::finalize(Command *cmd) {
cmd->ready.store(1, std::memory_order_release);
}
std::optional<Command> SharedCommandQueue::try_pop()
{
std::optional<Command> SharedCommandQueue::try_pop() {
size_t tail = this->tail_.load(std::memory_order_relaxed);
if (tail == this->head_.load(std::memory_order_acquire))
{
if (tail == this->head_.load(std::memory_order_acquire)) {
return std::nullopt;
}
Command& slot = this->buffer_[tail];
Command &slot = this->buffer_[tail];
Command cmd = slot;
this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release);
return cmd;
}
size_t SharedCommandQueue::try_pop_batch(Command* out, size_t max_count)
{
size_t SharedCommandQueue::try_pop_batch(Command *out, size_t max_count) {
size_t tail = this->tail_.load(std::memory_order_relaxed);
size_t head = this->head_.load(std::memory_order_acquire);
@ -89,9 +78,8 @@ namespace usub::core
size_t copied = 0;
for (size_t i = 0; i < to_pop; ++i)
{
Command& src = this->buffer_[(tail + i) % this->capacity_];
for (size_t i = 0; i < to_pop; ++i) {
Command &src = this->buffer_[(tail + i) % this->capacity_];
if (src.ready.load(std::memory_order_acquire) == 0)
break;
@ -103,58 +91,51 @@ namespace usub::core
return copied;
}
Command* SharedCommandQueue::peek(size_t index)
{
Command *SharedCommandQueue::peek(size_t index) {
size_t tail = this->tail_.load(std::memory_order_relaxed);
size_t head = this->head_.load(std::memory_order_acquire);
if (tail == head)
size_t available = (head + this->capacity_ - tail) % this->capacity_;
if (index >= available)
return nullptr;
return &this->buffer_[tail % this->capacity_];
return &this->buffer_[(tail + index) % this->capacity_];
}
void SharedCommandQueue::pop()
{
void SharedCommandQueue::pop() {
size_t tail = this->tail_.load(std::memory_order_relaxed);
this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release);
}
size_t SharedCommandQueue::pending_count() const
{
size_t SharedCommandQueue::pending_count() const {
size_t head = this->head_.load(std::memory_order_acquire);
size_t tail = this->tail_.load(std::memory_order_relaxed);
return (head + this->capacity_ - tail) % this->capacity_;
}
Command* SharedCommandQueue::raw_buffer() noexcept
{
Command *SharedCommandQueue::raw_buffer() noexcept {
return this->buffer_;
}
size_t SharedCommandQueue::capacity() const noexcept
{
size_t SharedCommandQueue::capacity() const noexcept {
return this->capacity_;
}
std::atomic<size_t>& SharedCommandQueue::head() noexcept
{
std::atomic<size_t> &SharedCommandQueue::head() noexcept {
return this->head_;
}
std::atomic<size_t>& SharedCommandQueue::tail() noexcept
{
std::atomic<size_t> &SharedCommandQueue::tail() noexcept {
return this->tail_;
}
bool SharedCommandQueue::enqueue_put(const usub::utils::Hash128& key, const std::string& value)
{
bool SharedCommandQueue::enqueue_put(const usub::utils::Hash128 &key, const std::string &value) {
size_t head = this->head_.load(std::memory_order_relaxed);
size_t next_head = (head + 1) % this->capacity_;
if (next_head == this->tail_.load(std::memory_order_acquire))
return false;
Command& slot = this->buffer_[head];
Command &slot = this->buffer_[head];
slot.op = OperationType::PUT;
slot.key = key;
slot.value_size = static_cast<uint32_t>(value.size());
@ -167,14 +148,13 @@ namespace usub::core
return true;
}
bool SharedCommandQueue::enqueue_find(const utils::Hash128& key)
{
bool SharedCommandQueue::enqueue_find(const utils::Hash128 &key) {
size_t head = this->head_.load(std::memory_order_relaxed);
size_t next_head = (head + 1) % capacity_;
if (next_head == this->tail_.load(std::memory_order_acquire))
return false;
Command& slot = buffer_[head];
Command &slot = buffer_[head];
slot.op = OperationType::FIND;
slot.key = key;
slot.value_size = 0;
@ -186,11 +166,18 @@ namespace usub::core
return true;
}
bool SharedCommandQueue::await_response(Command& cmd)
{
bool SharedCommandQueue::await_response(Command &cmd) {
while (cmd.response_ready.load(std::memory_order_acquire) == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return cmd.response_size != 0;
}
size_t SharedCommandQueue::calculate_shm_size(size_t capacity) {
return (sizeof(SharedCommandQueue) + sizeof(Command) * capacity + SHM_ALIGNMENT - 1) & ~(SHM_ALIGNMENT - 1);
}
void SharedCommandQueue::reset() {
this->head_.store(0, std::memory_order_relaxed);
this->tail_.store(0, std::memory_order_relaxed);
}
}

View File

@ -10,53 +10,55 @@
#include <atomic>
#include <cstdint>
namespace usub::core
{
constexpr size_t SHM_QUEUE_CAPACITY = 1024;
namespace usub::core {
constexpr size_t SHM_ALIGNMENT = 64;
class SharedCommandQueue
{
class alignas(SHM_ALIGNMENT) SharedCommandQueue {
public:
explicit SharedCommandQueue(size_t capacity);
~SharedCommandQueue();
bool try_push(const Command& cmd);
bool try_push(const Command &cmd);
bool try_push_batch(const Command* cmds, size_t count);
bool try_push_batch(const Command *cmds, size_t count);
void finalize(Command* cmd);
void finalize(Command *cmd);
std::optional<Command> try_pop();
size_t try_pop_batch(Command* out, size_t max_count);
size_t try_pop_batch(Command *out, size_t max_count);
Command* peek(size_t index);
Command *peek(size_t index);
void pop();
size_t pending_count() const;
Command* raw_buffer() noexcept;
Command *raw_buffer() noexcept;
size_t capacity() const noexcept;
std::atomic<size_t>& head() noexcept;
std::atomic<size_t> &head() noexcept;
std::atomic<size_t>& tail() noexcept;
std::atomic<size_t> &tail() noexcept;
bool enqueue_put(const usub::utils::Hash128& key, const std::string& value);
bool enqueue_put(const usub::utils::Hash128 &key, const std::string &value);
bool enqueue_find(const usub::utils::Hash128& key);
bool enqueue_find(const usub::utils::Hash128 &key);
bool await_response(Command& cmd);
static bool await_response(Command &cmd);
private:
void reset();
static size_t calculate_shm_size(size_t capacity);
// private:
public:
size_t capacity_;
Command* commands_;
std::atomic<size_t> head_;
std::atomic<size_t> tail_;
Command buffer_[];
alignas(SHM_ALIGNMENT) std::atomic<size_t> head_;
alignas(SHM_ALIGNMENT) std::atomic<size_t> tail_;
alignas(SHM_ALIGNMENT) Command buffer_[];
};
}

View File

@ -4,27 +4,26 @@
#include "UDB.h"
namespace usub::core
{
UDB::UDB(const std::string& db_name, const std::string& shm_name,
namespace usub::core {
UDB::UDB(const std::string &db_name, const std::string &shm_name,
utils::DatabaseSettings settings, bool create_new)
: db_name_(db_name),
shm_name_(shm_name),
shm_queue_capacity_(settings.shm_queue_capacity),
max_memtable_size_(settings.max_memtable_size),
shm_manager_(shm_name, sizeof(SharedCommandQueue), create_new),
version_manager_(db_name),
memtable_manager_(db_name + "_wal", settings.max_memtable_size, settings.estimated_element_size,
this->version_manager_),
compactor_(this->version_manager_),
running_(true), recovery_log_(this->db_name_)
{
if (shm_manager_.needs_init())
{
new(shm_manager_.base_ptr()) SharedCommandQueue(shm_queue_capacity_);
: db_name_(db_name),
shm_name_(shm_name),
shm_queue_capacity_(settings.shm_queue_capacity),
max_memtable_size_(settings.max_memtable_size),
shm_manager_(shm_name, SharedCommandQueue::calculate_shm_size(settings.shm_queue_capacity), create_new),
version_manager_(db_name),
memtable_manager_(db_name + "_wal", settings.max_memtable_size, settings.estimated_element_size,
this->version_manager_),
compactor_(this->version_manager_),
running_(true), recovery_log_(this->db_name_) {
if (shm_manager_.needs_init()) {
new(this->shm_manager_.base_ptr()) SharedCommandQueue(this->shm_queue_capacity_);
} else {
this->command_queue_ = reinterpret_cast<SharedCommandQueue *>(this->shm_manager_.base_ptr());
}
command_queue_ = reinterpret_cast<SharedCommandQueue*>(shm_manager_.base_ptr());
this->command_queue_ = reinterpret_cast<SharedCommandQueue *>(shm_manager_.base_ptr());
recover_from_logs();
@ -32,37 +31,30 @@ namespace usub::core
compactor_.run();
}
UDB::~UDB()
{
UDB::~UDB() {
running_ = false;
if (this->background_flush_thread_.joinable())
this->background_flush_thread_.join();
}
void UDB::recover_from_logs()
{
this->recovery_log_.replay([this](const Command& cmd)
{
switch (cmd.op)
{
case OperationType::PUT:
{
void UDB::recover_from_logs() {
this->recovery_log_.replay([this](const Command &cmd) {
switch (cmd.op) {
case OperationType::PUT: {
std::cout << "PUT\n";
std::string value(cmd.value, cmd.value_size);
this->fast_cache_[cmd.key] = value;
this->memtable_manager_.put(cmd.key, value);
break;
}
case OperationType::DELETE:
{
case OperationType::DELETE: {
std::cout << "DELETE\n";
this->fast_cache_.erase(cmd.key);
this->memtable_manager_.remove(cmd.key);
break;
}
default:
{
default: {
std::cout << "RecoveryLog: skip unknown op = " << static_cast<int>(cmd.op) << "\n";
break;
}
@ -72,77 +64,61 @@ namespace usub::core
std::cout << "[UDB] Recovery complete for database: " << this->db_name_ << "\n";
}
void UDB::run()
{
void UDB::run() {
std::cout << "[UDB] Server for DB '" << this->db_name_ << "' started.\n";
while (this->running_)
{
Command* cmd = this->command_queue_->peek(0);
while (this->running_) {
Command *cmd = this->command_queue_->peek(0);
if (!cmd)
{
if (!cmd) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
continue;
}
if (cmd->ready.load(std::memory_order_acquire) == 1)
{
if (cmd->ready.load(std::memory_order_acquire) == 1) {
process_command(*cmd);
this->command_queue_->pop();
}
else
{
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
void UDB::process_command(Command& cmd)
{
void UDB::process_command(Command &cmd) {
this->recovery_log_.log_command(cmd);
switch (cmd.op)
{
case OperationType::PUT:
{
switch (cmd.op) {
case OperationType::PUT: {
std::string value(cmd.value, cmd.value_size);
this->fast_cache_[cmd.key] = value;
this->memtable_manager_.put(cmd.key, value);
break;
}
case OperationType::DELETE:
{
case OperationType::DELETE: {
this->fast_cache_.erase(cmd.key);
this->memtable_manager_.remove(cmd.key);
break;
}
case OperationType::FIND:
{
case OperationType::FIND: {
auto it = this->fast_cache_.find(cmd.key);
if (it != this->fast_cache_.end())
{
if (it != this->fast_cache_.end()) {
std::string value = it->second;
cmd.response_size = static_cast<uint32_t>(value.size());
std::memcpy(cmd.response, value.data(), value.size());
}
else
{
} else {
cmd.response_size = 0;
}
cmd.response_ready.store(1, std::memory_order_release);
break;
}
default:
std::cout << "[UNKNOWN COMMAND]\n";
break;
default:
std::cout << "[UNKNOWN COMMAND]\n";
break;
}
}
void UDB::background_flush_worker()
{
while (running_)
{
void UDB::background_flush_worker() {
while (running_) {
this->memtable_manager_.flush_batch();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

View File

@ -93,8 +93,14 @@ int main()
#endif
try
{
usub::core::DatabaseManager manager("/Users/kirillzhukov/Downloads/test.toml");
using namespace usub::core;
usub::core::DatabaseManager manager("../config.toml");
manager.run_all();
// std::cout << sizeof(usub::core::SharedCommandQueue) << ", " << SharedCommandQueue::calculate_shm_size(1024) << "\n";
// usub::core::SharedMemoryManager shm_manager{"rates_1", SharedCommandQueue::calculate_shm_size(1024)};
// new(shm_manager.base_ptr()) SharedCommandQueue(1024);
// auto* queue = static_cast<usub::core::SharedCommandQueue*>(shm_manager.base_ptr());
// std::cout << queue->head_.load() << '\n';
}
catch (const std::exception& ex)
{