From e44279add109f9763860d43fb685eb6180a3e1c2 Mon Sep 17 00:00:00 2001 From: g2px1 Date: Mon, 21 Apr 2025 19:28:00 +0000 Subject: [PATCH] fixed bug on x86 --- CMakeLists.txt | 2 +- client.cpp | 16 +++++- core/Command.cpp | 57 +++++++++---------- core/SharedCommandQueue.cpp | 93 +++++++++++++------------------ core/SharedCommandQueue.h | 44 ++++++++------- core/UDB.cpp | 108 ++++++++++++++---------------------- main.cpp | 8 ++- 7 files changed, 152 insertions(+), 176 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index bce5289..c3fd057 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.30) +cmake_minimum_required(VERSION 3.20..3.30) project(sharedMemoryKeyDB) set(CMAKE_CXX_STANDARD 23) diff --git a/client.cpp b/client.cpp index 98c300a..cd807c8 100644 --- a/client.cpp +++ b/client.cpp @@ -69,7 +69,9 @@ int main() { constexpr const char* shm_name = "/shm_rates1"; - constexpr size_t shm_size = sizeof(usub::core::SharedCommandQueue); + constexpr size_t capacity = 1024; // или то, что реально +// size_t shm_size = usub::core::SharedCommandQueue::calculate_shm_size(capacity); + size_t shm_size = usub::core::SharedCommandQueue::calculate_shm_size(capacity); int shm_fd = shm_open(shm_name, O_RDWR, 0600); if (shm_fd == -1) @@ -86,7 +88,8 @@ int main() return 1; } - void* ptr = mmap(nullptr, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); +// void* ptr = mmap(nullptr, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); + void* ptr = mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); if (ptr == MAP_FAILED) { perror("mmap failed"); @@ -102,7 +105,7 @@ int main() // ------------------- { std::cout << "1\n"; - size_t head = queue->head().load(std::memory_order_relaxed); + auto head = queue->head().load(std::memory_order_relaxed); size_t next_head = (head + 1) % queue->capacity(); while (next_head == queue->tail().load(std::memory_order_acquire)) { @@ -112,6 +115,13 @@ int main() usub::core::Command* slot = &queue->raw_buffer()[head]; + std::cout << "queue = " << queue << '\n'; + std::cout << "queue->capacity() = " << queue->capacity() << '\n'; + std::cout << "head = " << head << '\n'; + std::cout << "ptr to raw_buffer = " << static_cast(queue->raw_buffer()) << '\n'; + std::cout << "ptr to slot = " << static_cast(&queue->raw_buffer()[head]) << '\n'; + std::cout.flush(); + slot->op = usub::core::OperationType::PUT; slot->key = usub::utils::compute_hash128("test_key", 8); // исправил длину "test_key" = 8 символов const char* put_value = "test_value"; diff --git a/core/Command.cpp b/core/Command.cpp index 63e95ed..334d622 100644 --- a/core/Command.cpp +++ b/core/Command.cpp @@ -7,10 +7,8 @@ #include #include -namespace usub::core -{ - Command::Command() - { +namespace usub::core { + Command::Command() { ready.store(0, std::memory_order_relaxed); response_ready.store(0, std::memory_order_relaxed); op = OperationType::PUT; @@ -19,8 +17,7 @@ namespace usub::core response_size = 0; } - Command::Command(const Command& other) - { + Command::Command(const Command &other) { ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); op = other.op; key = other.key; @@ -32,10 +29,8 @@ namespace usub::core std::memcpy(response, other.response, other.response_size); } - Command& Command::operator=(const Command& other) - { - if (this != &other) - { + Command &Command::operator=(const Command &other) { + if (this != &other) { ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); op = other.op; key = other.key; @@ -49,34 +44,34 @@ namespace usub::core return *this; } - void Command::serialize(std::ostream& out) const - { + void Command::serialize(std::ostream &out) const { auto op_val = static_cast(op); - out.write(reinterpret_cast(&op_val), sizeof(op_val)); - out.write(reinterpret_cast(&key), sizeof(key)); - - if (op == OperationType::PUT) - { - out.write(reinterpret_cast(&value_size), sizeof(value_size)); - out.write(value, value_size); - } + out.write(reinterpret_cast(&op_val), sizeof(op_val)); + out.write(reinterpret_cast(&key), sizeof(key)); + out.write(reinterpret_cast(&value_size), sizeof(value_size)); + out.write(value, value_size); + out.write(reinterpret_cast(&response_size), sizeof(response_size)); + out.write(response, response_size); } - Command Command::deserialize(std::istream& in) - { - uint8_t op_val; - in.read(reinterpret_cast(&op_val), sizeof(op_val)); - + Command Command::deserialize(std::istream &in) { Command cmd; + + uint8_t op_val; + in.read(reinterpret_cast(&op_val), sizeof(op_val)); cmd.op = static_cast(op_val); - in.read(reinterpret_cast(&cmd.key), sizeof(cmd.key)); - - if (cmd.op == OperationType::PUT) - { - in.read(reinterpret_cast(&cmd.value_size), sizeof(cmd.value_size)); + in.read(reinterpret_cast(&cmd.key), sizeof(cmd.key)); + in.read(reinterpret_cast(&cmd.value_size), sizeof(cmd.value_size)); + if (cmd.value_size > 0) in.read(cmd.value, cmd.value_size); - } + + in.read(reinterpret_cast(&cmd.response_size), sizeof(cmd.response_size)); + if (cmd.response_size > 0) + in.read(cmd.response, cmd.response_size); + + cmd.ready.store(1, std::memory_order_relaxed); + cmd.response_ready.store(1, std::memory_order_relaxed); return cmd; } diff --git a/core/SharedCommandQueue.cpp b/core/SharedCommandQueue.cpp index 528a4e4..53bc862 100644 --- a/core/SharedCommandQueue.cpp +++ b/core/SharedCommandQueue.cpp @@ -4,32 +4,27 @@ #include "SharedCommandQueue.h" -namespace usub::core -{ +namespace usub::core { SharedCommandQueue::SharedCommandQueue(size_t capacity) - : capacity_(capacity), head_(0), tail_(0) - { + : capacity_(capacity), head_(0), tail_(0) { + static_assert(offsetof(SharedCommandQueue, buffer_) % alignof(Command) == 0, "buffer_ is not properly aligned"); for (size_t i = 0; i < this->capacity_; ++i) new(&this->buffer_[i]) Command(); } - SharedCommandQueue::~SharedCommandQueue() - { + SharedCommandQueue::~SharedCommandQueue() { for (size_t i = 0; i < this->capacity_; ++i) - { - this->buffer_[i].~Command(); - } + (this->buffer_ + i)->~Command(); } - bool SharedCommandQueue::try_push(const Command& cmd) - { + bool SharedCommandQueue::try_push(const Command &cmd) { size_t head = this->head_.load(std::memory_order_relaxed); size_t next_head = (head + 1) % this->capacity_; if (next_head == this->tail_.load(std::memory_order_acquire)) return false; // Очередь полна - Command& slot = this->buffer_[head]; + Command &slot = this->buffer_[head]; slot.ready.store(0, std::memory_order_relaxed); slot.op = cmd.op; slot.key = cmd.key; @@ -41,8 +36,7 @@ namespace usub::core return true; } - bool SharedCommandQueue::try_push_batch(const Command* cmds, size_t count) - { + bool SharedCommandQueue::try_push_batch(const Command *cmds, size_t count) { size_t head = this->head_.load(std::memory_order_relaxed); size_t tail = this->tail_.load(std::memory_order_acquire); @@ -50,37 +44,32 @@ namespace usub::core if (free_slots < count) return false; - for (size_t i = 0; i < count; ++i) - { - Command& slot = this->buffer_[(head + i) % this->capacity_]; + for (size_t i = 0; i < count; ++i) { + Command &slot = this->buffer_[(head + i) % this->capacity_]; slot = cmds[i]; } this->head_.store((head + count) % this->capacity_, std::memory_order_release); return true; } - void SharedCommandQueue::finalize(Command* cmd) - { + void SharedCommandQueue::finalize(Command *cmd) { cmd->ready.store(1, std::memory_order_release); } - std::optional SharedCommandQueue::try_pop() - { + std::optional SharedCommandQueue::try_pop() { size_t tail = this->tail_.load(std::memory_order_relaxed); - if (tail == this->head_.load(std::memory_order_acquire)) - { + if (tail == this->head_.load(std::memory_order_acquire)) { return std::nullopt; } - Command& slot = this->buffer_[tail]; + Command &slot = this->buffer_[tail]; Command cmd = slot; this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release); return cmd; } - size_t SharedCommandQueue::try_pop_batch(Command* out, size_t max_count) - { + size_t SharedCommandQueue::try_pop_batch(Command *out, size_t max_count) { size_t tail = this->tail_.load(std::memory_order_relaxed); size_t head = this->head_.load(std::memory_order_acquire); @@ -89,9 +78,8 @@ namespace usub::core size_t copied = 0; - for (size_t i = 0; i < to_pop; ++i) - { - Command& src = this->buffer_[(tail + i) % this->capacity_]; + for (size_t i = 0; i < to_pop; ++i) { + Command &src = this->buffer_[(tail + i) % this->capacity_]; if (src.ready.load(std::memory_order_acquire) == 0) break; @@ -103,58 +91,51 @@ namespace usub::core return copied; } - Command* SharedCommandQueue::peek(size_t index) - { + Command *SharedCommandQueue::peek(size_t index) { size_t tail = this->tail_.load(std::memory_order_relaxed); size_t head = this->head_.load(std::memory_order_acquire); - if (tail == head) + size_t available = (head + this->capacity_ - tail) % this->capacity_; + if (index >= available) return nullptr; - return &this->buffer_[tail % this->capacity_]; + return &this->buffer_[(tail + index) % this->capacity_]; } - void SharedCommandQueue::pop() - { + void SharedCommandQueue::pop() { size_t tail = this->tail_.load(std::memory_order_relaxed); this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release); } - size_t SharedCommandQueue::pending_count() const - { + size_t SharedCommandQueue::pending_count() const { size_t head = this->head_.load(std::memory_order_acquire); size_t tail = this->tail_.load(std::memory_order_relaxed); return (head + this->capacity_ - tail) % this->capacity_; } - Command* SharedCommandQueue::raw_buffer() noexcept - { + Command *SharedCommandQueue::raw_buffer() noexcept { return this->buffer_; } - size_t SharedCommandQueue::capacity() const noexcept - { + size_t SharedCommandQueue::capacity() const noexcept { return this->capacity_; } - std::atomic& SharedCommandQueue::head() noexcept - { + std::atomic &SharedCommandQueue::head() noexcept { return this->head_; } - std::atomic& SharedCommandQueue::tail() noexcept - { + std::atomic &SharedCommandQueue::tail() noexcept { return this->tail_; } - bool SharedCommandQueue::enqueue_put(const usub::utils::Hash128& key, const std::string& value) - { + bool SharedCommandQueue::enqueue_put(const usub::utils::Hash128 &key, const std::string &value) { size_t head = this->head_.load(std::memory_order_relaxed); size_t next_head = (head + 1) % this->capacity_; if (next_head == this->tail_.load(std::memory_order_acquire)) return false; - Command& slot = this->buffer_[head]; + Command &slot = this->buffer_[head]; slot.op = OperationType::PUT; slot.key = key; slot.value_size = static_cast(value.size()); @@ -167,14 +148,13 @@ namespace usub::core return true; } - bool SharedCommandQueue::enqueue_find(const utils::Hash128& key) - { + bool SharedCommandQueue::enqueue_find(const utils::Hash128 &key) { size_t head = this->head_.load(std::memory_order_relaxed); size_t next_head = (head + 1) % capacity_; if (next_head == this->tail_.load(std::memory_order_acquire)) return false; - Command& slot = buffer_[head]; + Command &slot = buffer_[head]; slot.op = OperationType::FIND; slot.key = key; slot.value_size = 0; @@ -186,11 +166,18 @@ namespace usub::core return true; } - bool SharedCommandQueue::await_response(Command& cmd) - { + bool SharedCommandQueue::await_response(Command &cmd) { while (cmd.response_ready.load(std::memory_order_acquire) == 0) std::this_thread::sleep_for(std::chrono::milliseconds(1)); return cmd.response_size != 0; } + size_t SharedCommandQueue::calculate_shm_size(size_t capacity) { + return (sizeof(SharedCommandQueue) + sizeof(Command) * capacity + SHM_ALIGNMENT - 1) & ~(SHM_ALIGNMENT - 1); + } + + void SharedCommandQueue::reset() { + this->head_.store(0, std::memory_order_relaxed); + this->tail_.store(0, std::memory_order_relaxed); + } } diff --git a/core/SharedCommandQueue.h b/core/SharedCommandQueue.h index 7044b2b..74c39c9 100644 --- a/core/SharedCommandQueue.h +++ b/core/SharedCommandQueue.h @@ -10,53 +10,55 @@ #include #include -namespace usub::core -{ - constexpr size_t SHM_QUEUE_CAPACITY = 1024; +namespace usub::core { + constexpr size_t SHM_ALIGNMENT = 64; - class SharedCommandQueue - { + class alignas(SHM_ALIGNMENT) SharedCommandQueue { public: explicit SharedCommandQueue(size_t capacity); ~SharedCommandQueue(); - bool try_push(const Command& cmd); + bool try_push(const Command &cmd); - bool try_push_batch(const Command* cmds, size_t count); + bool try_push_batch(const Command *cmds, size_t count); - void finalize(Command* cmd); + void finalize(Command *cmd); std::optional try_pop(); - size_t try_pop_batch(Command* out, size_t max_count); + size_t try_pop_batch(Command *out, size_t max_count); - Command* peek(size_t index); + Command *peek(size_t index); void pop(); size_t pending_count() const; - Command* raw_buffer() noexcept; + Command *raw_buffer() noexcept; size_t capacity() const noexcept; - std::atomic& head() noexcept; + std::atomic &head() noexcept; - std::atomic& tail() noexcept; + std::atomic &tail() noexcept; - bool enqueue_put(const usub::utils::Hash128& key, const std::string& value); + bool enqueue_put(const usub::utils::Hash128 &key, const std::string &value); - bool enqueue_find(const usub::utils::Hash128& key); + bool enqueue_find(const usub::utils::Hash128 &key); - bool await_response(Command& cmd); + static bool await_response(Command &cmd); - private: + void reset(); + + static size_t calculate_shm_size(size_t capacity); + +// private: + public: size_t capacity_; - Command* commands_; - std::atomic head_; - std::atomic tail_; - Command buffer_[]; + alignas(SHM_ALIGNMENT) std::atomic head_; + alignas(SHM_ALIGNMENT) std::atomic tail_; + alignas(SHM_ALIGNMENT) Command buffer_[]; }; } diff --git a/core/UDB.cpp b/core/UDB.cpp index 8809b2f..786ca39 100644 --- a/core/UDB.cpp +++ b/core/UDB.cpp @@ -4,27 +4,26 @@ #include "UDB.h" -namespace usub::core -{ - UDB::UDB(const std::string& db_name, const std::string& shm_name, +namespace usub::core { + UDB::UDB(const std::string &db_name, const std::string &shm_name, utils::DatabaseSettings settings, bool create_new) - : db_name_(db_name), - shm_name_(shm_name), - shm_queue_capacity_(settings.shm_queue_capacity), - max_memtable_size_(settings.max_memtable_size), - shm_manager_(shm_name, sizeof(SharedCommandQueue), create_new), - version_manager_(db_name), - memtable_manager_(db_name + "_wal", settings.max_memtable_size, settings.estimated_element_size, - this->version_manager_), - compactor_(this->version_manager_), - running_(true), recovery_log_(this->db_name_) - { - if (shm_manager_.needs_init()) - { - new(shm_manager_.base_ptr()) SharedCommandQueue(shm_queue_capacity_); + : db_name_(db_name), + shm_name_(shm_name), + shm_queue_capacity_(settings.shm_queue_capacity), + max_memtable_size_(settings.max_memtable_size), + shm_manager_(shm_name, SharedCommandQueue::calculate_shm_size(settings.shm_queue_capacity), create_new), + version_manager_(db_name), + memtable_manager_(db_name + "_wal", settings.max_memtable_size, settings.estimated_element_size, + this->version_manager_), + compactor_(this->version_manager_), + running_(true), recovery_log_(this->db_name_) { + if (shm_manager_.needs_init()) { + new(this->shm_manager_.base_ptr()) SharedCommandQueue(this->shm_queue_capacity_); + } else { + this->command_queue_ = reinterpret_cast(this->shm_manager_.base_ptr()); } - command_queue_ = reinterpret_cast(shm_manager_.base_ptr()); + this->command_queue_ = reinterpret_cast(shm_manager_.base_ptr()); recover_from_logs(); @@ -32,37 +31,30 @@ namespace usub::core compactor_.run(); } - UDB::~UDB() - { + UDB::~UDB() { running_ = false; if (this->background_flush_thread_.joinable()) this->background_flush_thread_.join(); } - void UDB::recover_from_logs() - { - this->recovery_log_.replay([this](const Command& cmd) - { - switch (cmd.op) - { - case OperationType::PUT: - { + void UDB::recover_from_logs() { + this->recovery_log_.replay([this](const Command &cmd) { + switch (cmd.op) { + case OperationType::PUT: { std::cout << "PUT\n"; std::string value(cmd.value, cmd.value_size); this->fast_cache_[cmd.key] = value; this->memtable_manager_.put(cmd.key, value); break; } - case OperationType::DELETE: - { + case OperationType::DELETE: { std::cout << "DELETE\n"; this->fast_cache_.erase(cmd.key); this->memtable_manager_.remove(cmd.key); break; } - default: - { + default: { std::cout << "RecoveryLog: skip unknown op = " << static_cast(cmd.op) << "\n"; break; } @@ -72,77 +64,61 @@ namespace usub::core std::cout << "[UDB] Recovery complete for database: " << this->db_name_ << "\n"; } - void UDB::run() - { + void UDB::run() { std::cout << "[UDB] Server for DB '" << this->db_name_ << "' started.\n"; - while (this->running_) - { - Command* cmd = this->command_queue_->peek(0); + while (this->running_) { + Command *cmd = this->command_queue_->peek(0); - if (!cmd) - { + if (!cmd) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); continue; } - if (cmd->ready.load(std::memory_order_acquire) == 1) - { + if (cmd->ready.load(std::memory_order_acquire) == 1) { process_command(*cmd); this->command_queue_->pop(); - } - else - { + } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } } - void UDB::process_command(Command& cmd) - { + void UDB::process_command(Command &cmd) { this->recovery_log_.log_command(cmd); - switch (cmd.op) - { - case OperationType::PUT: - { + switch (cmd.op) { + case OperationType::PUT: { std::string value(cmd.value, cmd.value_size); this->fast_cache_[cmd.key] = value; this->memtable_manager_.put(cmd.key, value); break; } - case OperationType::DELETE: - { + case OperationType::DELETE: { this->fast_cache_.erase(cmd.key); this->memtable_manager_.remove(cmd.key); break; } - case OperationType::FIND: - { + case OperationType::FIND: { auto it = this->fast_cache_.find(cmd.key); - if (it != this->fast_cache_.end()) - { + if (it != this->fast_cache_.end()) { std::string value = it->second; cmd.response_size = static_cast(value.size()); std::memcpy(cmd.response, value.data(), value.size()); - } - else - { + } else { cmd.response_size = 0; } cmd.response_ready.store(1, std::memory_order_release); break; } - default: - std::cout << "[UNKNOWN COMMAND]\n"; - break; + default: + std::cout << "[UNKNOWN COMMAND]\n"; + break; } } - void UDB::background_flush_worker() - { - while (running_) - { + void UDB::background_flush_worker() { + while (running_) { this->memtable_manager_.flush_batch(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } diff --git a/main.cpp b/main.cpp index 26bcf33..2131628 100644 --- a/main.cpp +++ b/main.cpp @@ -93,8 +93,14 @@ int main() #endif try { - usub::core::DatabaseManager manager("/Users/kirillzhukov/Downloads/test.toml"); + using namespace usub::core; + usub::core::DatabaseManager manager("../config.toml"); manager.run_all(); +// std::cout << sizeof(usub::core::SharedCommandQueue) << ", " << SharedCommandQueue::calculate_shm_size(1024) << "\n"; +// usub::core::SharedMemoryManager shm_manager{"rates_1", SharedCommandQueue::calculate_shm_size(1024)}; +// new(shm_manager.base_ptr()) SharedCommandQueue(1024); +// auto* queue = static_cast(shm_manager.base_ptr()); +// std::cout << queue->head_.load() << '\n'; } catch (const std::exception& ex) {