From ab7c39ae8ebe509e0fe8ee5e1ef86b6f4ac728e6 Mon Sep 17 00:00:00 2001 From: g2px1 Date: Sun, 20 Apr 2025 21:58:42 +0300 Subject: [PATCH] fixed try_pop_batch --- README.md | 8 +++ core/Command.cpp | 78 ++++++++++++++++++++++++++++++ core/Command.h | 64 ++++++++++++------------ core/DatabaseManager.cpp | 4 +- core/Memtable.h | 6 +-- core/SharedCommandQueue.cpp | 94 ++++++++++++++++++++++++++++++++---- core/SharedCommandQueue.h | 25 +++++++--- core/SharedMemoryManager.cpp | 14 +++++- core/SharedMemoryManager.h | 2 + core/UDB.cpp | 34 ++++++++----- core/UDB.h | 2 +- server.cpp | 74 ---------------------------- 12 files changed, 263 insertions(+), 142 deletions(-) delete mode 100644 server.cpp diff --git a/README.md b/README.md index e69de29..70e0983 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,8 @@ +Поле | Назначение +op | Тип операции: PUT, DELETE, FIND +key | Хэш ключа Hash128 +value | Буфер под значение (для PUT) +value_size | Размер значения в байтах +response | Буфер под ответ от сервера (используется при FIND) +response_size | Размер ответа в байтах +response_ready | Флаг: 0 (ожидание), 1 (ответ готов) \ No newline at end of file diff --git a/core/Command.cpp b/core/Command.cpp index 62a7000..d793bf3 100644 --- a/core/Command.cpp +++ b/core/Command.cpp @@ -3,3 +3,81 @@ // #include "Command.h" + +namespace usub::core +{ + // Command::Command() : op(OperationType::UNKNOWN), value_size(0), response_ready(0), response_size(0) + // { + // std::memset(value, 0, sizeof(value)); + // std::memset(response, 0, sizeof(response)); + // } + // + // Command::Command(const Command& other) + // : op(other.op), + // key(other.key), + // value_size(0), + // response_ready(false), + // response_size(0) + // { + // if (other.value_size > 0 && other.value_size <= sizeof(value)) + // { + // value_size = other.value_size; + // std::memcpy(value, other.value, value_size); + // } + // if (other.response_size > 0 && other.response_size <= sizeof(response)) + // { + // response_size = other.response_size; + // std::memcpy(response, other.response, response_size); + // } + // response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + // } + // + // + // Command::Command(Command&& other) noexcept + // : op(other.op), + // key(other.key), + // value_size(other.value_size), + // response_ready(other.response_ready.load(std::memory_order_relaxed)), + // response_size(other.response_size) + // { + // if (value_size > 0) + // std::memcpy(value, other.value, value_size); + // if (response_size > 0) + // std::memcpy(response, other.response, response_size); + // } + // + // Command& Command::operator=(const Command& other) + // { + // if (this != &other) + // { + // op = other.op; + // key = other.key; + // value_size = other.value_size; + // response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + // response_size = other.response_size; + // + // if (value_size > 0) + // std::memcpy(value, other.value, value_size); + // if (response_size > 0) + // std::memcpy(response, other.response, response_size); + // } + // return *this; + // } + // + // Command& Command::operator=(Command&& other) noexcept + // { + // if (this != &other) + // { + // op = other.op; + // key = other.key; + // value_size = other.value_size; + // response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + // response_size = other.response_size; + // if (value_size > 0) + // std::memcpy(value, other.value, value_size); + // if (response_size > 0) + // std::memcpy(response, other.response, response_size); + // } + // return *this; + // } +} diff --git a/core/Command.h b/core/Command.h index d4b72e0..12bb774 100644 --- a/core/Command.h +++ b/core/Command.h @@ -15,32 +15,43 @@ namespace usub::core { PUT = 0, DELETE = 1, - FIND = 2 + FIND = 2, + UNKNOWN = 0xFF }; - struct alignas(64) Command + struct Command { + std::atomic ready; + OperationType op; + utils::Hash128 key; + uint32_t value_size; + char value[1024]; + + std::atomic response_ready; + uint32_t response_size; + char response[1024]; + Command() - : ready(0), op(OperationType::PUT), key(), value_size(0), value() { + ready.store(0, std::memory_order_relaxed); + response_ready.store(0, std::memory_order_relaxed); + op = OperationType::PUT; + key = utils::Hash128{}; + value_size = 0; + response_size = 0; } Command(const Command& other) - : ready(other.ready.load(std::memory_order_relaxed)), // копируем ready явно - op(other.op), - key(other.key), - value_size(other.value_size), value() { + ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + op = other.op; + key = other.key; + value_size = other.value_size; std::memcpy(value, other.value, other.value_size); - } - Command(Command&& other) noexcept - : ready(other.ready.load(std::memory_order_relaxed)), - op(other.op), - key(other.key), - value_size(other.value_size), value() - { - std::memcpy(value, other.value, other.value_size); + response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + response_size = other.response_size; + std::memcpy(response, other.response, other.response_size); } Command& operator=(const Command& other) @@ -52,28 +63,13 @@ namespace usub::core key = other.key; value_size = other.value_size; std::memcpy(value, other.value, other.value_size); + + response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + response_size = other.response_size; + std::memcpy(response, other.response, other.response_size); } return *this; } - - Command& operator=(Command&& other) noexcept - { - if (this != &other) - { - ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); - op = other.op; - key = other.key; - value_size = other.value_size; - std::memcpy(value, other.value, other.value_size); - } - return *this; - } - - std::atomic ready; - OperationType op; - utils::Hash128 key; - uint32_t value_size; - char value[1024]; }; } diff --git a/core/DatabaseManager.cpp b/core/DatabaseManager.cpp index b124355..30e5a0e 100644 --- a/core/DatabaseManager.cpp +++ b/core/DatabaseManager.cpp @@ -34,8 +34,8 @@ namespace usub::core { throw std::runtime_error("Database entry must be a table"); } - - auto udb = std::make_unique(db_name, "shm_" + db_name); + bool create_new = !SharedMemoryManager::exists("shm_" + db_name); + auto udb = std::make_unique(db_name, "shm_" + db_name, 1024, 1024 * 1024, create_new); databases_.push_back(DatabaseInstance{std::move(udb), {}}); } } diff --git a/core/Memtable.h b/core/Memtable.h index 11425b6..bc68c31 100644 --- a/core/Memtable.h +++ b/core/Memtable.h @@ -35,12 +35,12 @@ namespace usub::shared_storage private: std::atomic active_memtable; - utils::WAL wal; + WAL wal; size_t max_memtable_size; std::mutex batch_mutex; std::vector> write_batch; std::atomic flushing{false}; - utils::VersionManager& version_manager; + VersionManager& version_manager; private: size_t estimate_memtable_size() const; @@ -49,7 +49,7 @@ namespace usub::shared_storage }; template - MemTableManager::MemTableManager(const std::string& wal_file, size_t max_size, utils::VersionManager& vm) + MemTableManager::MemTableManager(const std::string& wal_file, size_t max_size, VersionManager& vm) : wal(wal_file), max_memtable_size(max_size), version_manager(vm) { this->active_memtable.store(new SkipList(this->version_manager)); diff --git a/core/SharedCommandQueue.cpp b/core/SharedCommandQueue.cpp index 009c8ba..a18ce6b 100644 --- a/core/SharedCommandQueue.cpp +++ b/core/SharedCommandQueue.cpp @@ -6,24 +6,100 @@ namespace usub::core { - SharedCommandQueue::SharedCommandQueue() - : SharedCommandQueue(1024) + SharedCommandQueue::SharedCommandQueue(size_t capacity) + : capacity_(capacity), head_(0), tail_(0) { + for (size_t i = 0; i < capacity_; ++i) + new (&buffer_[i]) Command(); } - SharedCommandQueue::SharedCommandQueue(size_t cap) - : capacity(cap), - queue(std::make_unique>(cap)) + SharedCommandQueue::~SharedCommandQueue() { + for (size_t i = 0; i < capacity_; ++i) + { + buffer_[i].~Command(); + } } - bool SharedCommandQueue::try_push(const Command& cmd) const + bool SharedCommandQueue::try_push(const Command& cmd) { - return this->queue->push(cmd); + size_t head = head_.load(std::memory_order_relaxed); + size_t next_head = (head + 1) % capacity_; + + if (next_head == tail_.load(std::memory_order_acquire)) + return false; // Очередь полна + + Command& slot = buffer_[head]; + slot.ready.store(0, std::memory_order_relaxed); + slot.op = cmd.op; + slot.key = cmd.key; + slot.value_size = cmd.value_size; + std::memcpy(slot.value, cmd.value, cmd.value_size); + slot.response_size = 0; + slot.response_ready.store(0, std::memory_order_relaxed); + head_.store(next_head, std::memory_order_release); + return true; } - std::optional SharedCommandQueue::try_pop() const + bool SharedCommandQueue::try_push_batch(const Command* cmds, size_t count) { - return this->queue->pop(); + size_t head = head_.load(std::memory_order_relaxed); + size_t tail = tail_.load(std::memory_order_acquire); + + size_t free_slots = (tail + capacity_ - head) % capacity_; + if (free_slots < count) + return false; + + for (size_t i = 0; i < count; ++i) + { + Command& slot = buffer_[(head + i) % capacity_]; + slot = cmds[i]; + } + head_.store((head + count) % capacity_, std::memory_order_release); + return true; + } + + void SharedCommandQueue::finalize(Command* cmd) + { + cmd->ready.store(1, std::memory_order_release); + } + + std::optional SharedCommandQueue::try_pop() + { + size_t tail = tail_.load(std::memory_order_relaxed); + + if (tail == head_.load(std::memory_order_acquire)) + { + return std::nullopt; + } + + Command& slot = buffer_[tail]; + Command cmd = slot; + tail_.store((tail + 1) % capacity_, std::memory_order_release); + return cmd; + } + + size_t SharedCommandQueue::try_pop_batch(Command* out, size_t max_count) + { + size_t tail = tail_.load(std::memory_order_relaxed); + size_t head = head_.load(std::memory_order_acquire); + + size_t available = (head + capacity_ - tail) % capacity_; + size_t to_pop = (available < max_count) ? available : max_count; + + size_t copied = 0; + + for (size_t i = 0; i < to_pop; ++i) + { + Command& src = buffer_[(tail + i) % capacity_]; + + if (src.ready.load(std::memory_order_acquire) == 0) + break; + + out[copied++] = src; + } + + tail_.store((tail + copied) % capacity_, std::memory_order_release); + return copied; } } diff --git a/core/SharedCommandQueue.h b/core/SharedCommandQueue.h index 507f501..fdd54ea 100644 --- a/core/SharedCommandQueue.h +++ b/core/SharedCommandQueue.h @@ -14,18 +14,29 @@ namespace usub::core { constexpr size_t SHM_QUEUE_CAPACITY = 1024; - struct SharedCommandQueue + class SharedCommandQueue { - SharedCommandQueue(); + public: + explicit SharedCommandQueue(size_t capacity); - explicit SharedCommandQueue(size_t cap); + ~SharedCommandQueue(); - bool try_push(const Command& cmd) const; + bool try_push(const Command& cmd); - std::optional try_pop() const; + bool try_push_batch(const Command* cmds, size_t count); - size_t capacity; - std::unique_ptr> queue; + void finalize(Command* cmd); + + std::optional try_pop(); + + size_t try_pop_batch(Command* out, size_t max_count); + + private: + size_t capacity_; + std::atomic head_; + std::atomic tail_; + Command* commands_; + alignas(64) Command buffer_[]; }; } diff --git a/core/SharedMemoryManager.cpp b/core/SharedMemoryManager.cpp index 8092256..493bd37 100644 --- a/core/SharedMemoryManager.cpp +++ b/core/SharedMemoryManager.cpp @@ -57,8 +57,18 @@ namespace usub::core this->shm_fd = -1; } + bool SharedMemoryManager::exists(const std::string& name) + { + int fd = shm_open(("/" + name).c_str(), O_RDWR, 0600); + if (fd == -1) + return false; + close(fd); + return true; + } + void SharedMemoryManager::create() { + ::shm_unlink((this->shm_name).c_str()); this->shm_fd = ::shm_open(this->shm_name.c_str(), O_CREAT | O_RDWR | O_EXCL, 0600); if (this->shm_fd == -1) { @@ -86,7 +96,9 @@ namespace usub::core if (this->shm_fd == -1) { throw std::runtime_error("Failed to open shared memory: " + this->shm_name); - } else { + } + else + { std::cout << "[shm_open] name: " << this->shm_name << ", fd: " << this->shm_fd << '\n'; } this->shm_ptr = ::mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, this->shm_fd, 0); diff --git a/core/SharedMemoryManager.h b/core/SharedMemoryManager.h index 0416ad3..10c61fc 100644 --- a/core/SharedMemoryManager.h +++ b/core/SharedMemoryManager.h @@ -30,6 +30,8 @@ namespace usub::core void destroy(); + static bool exists(const std::string& name); + private: void create(); diff --git a/core/UDB.cpp b/core/UDB.cpp index ef41c0c..383e9dc 100644 --- a/core/UDB.cpp +++ b/core/UDB.cpp @@ -18,8 +18,8 @@ namespace usub::core compactor_(this->version_manager_), running_(true) { - ::shm_unlink(("/" + shm_name).c_str()); - new(this->shm_manager_.base_ptr()) SharedCommandQueue(this->shm_queue_capacity_); + if (create_new) + new(this->shm_manager_.base_ptr()) SharedCommandQueue(this->shm_queue_capacity_); this->command_queue_ = reinterpret_cast(this->shm_manager_.base_ptr()); recover_from_logs(); @@ -34,7 +34,6 @@ namespace usub::core if (this->background_flush_thread_.joinable()) this->background_flush_thread_.join(); - } void UDB::recover_from_logs() @@ -62,21 +61,26 @@ namespace usub::core { std::cout << "[UDB] Server for DB '" << this->db_name_ << "' started.\n"; + constexpr size_t BATCH_SIZE = 64; + Command batch[BATCH_SIZE]; + while (this->running_) { - auto opt_cmd = this->command_queue_->try_pop(); - - if (!opt_cmd) + size_t count = this->command_queue_->try_pop_batch(batch, BATCH_SIZE); + if (count == 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::microseconds(500)); continue; } - process_command(*opt_cmd); + for (size_t i = 0; i < count; ++i) + { + this->process_command(batch[i]); + } } } - void UDB::process_command(const Command& cmd) + void UDB::process_command(Command& cmd) { switch (cmd.op) { @@ -97,9 +101,16 @@ namespace usub::core { auto it = fast_cache_.find(cmd.key); if (it != fast_cache_.end()) - std::cout << "[FIND] " << usub::utils::to_string(cmd.key) << " => " << it->second << "\n"; + { + std::string value = it->second; + cmd.response_size = static_cast(value.size()); + std::memcpy(cmd.response, value.data(), value.size()); + } else - std::cout << "[FIND] " << usub::utils::to_string(cmd.key) << " not found\n"; + { + cmd.response_size = 0; + } + cmd.response_ready.store(1, std::memory_order_release); break; } default: @@ -108,6 +119,7 @@ namespace usub::core } } + void UDB::background_flush_worker() { while (running_) diff --git a/core/UDB.h b/core/UDB.h index 87022fd..8a85fab 100644 --- a/core/UDB.h +++ b/core/UDB.h @@ -39,7 +39,7 @@ namespace usub::core void run(); private: - void process_command(const usub::core::Command& cmd); + void process_command(Command& cmd); void background_flush_worker(); void recover_from_logs(); diff --git a/server.cpp b/server.cpp deleted file mode 100644 index 960d918..0000000 --- a/server.cpp +++ /dev/null @@ -1,74 +0,0 @@ -#include "core/SharedMemoryManager.h" -#include "core/SharedCommandQueue.h" -#include "core/Command.h" -#include "utils/hash/Hash128.h" -#include "utils/string/basic_utils.h" - -#include -#include -#include -#include - -using namespace usub::core; -using namespace usub::utils; - -int main() -{ - ::shm_unlink("/shm_command_queue"); - try - { - SharedMemoryManager shm("shm_command_queue", sizeof(SharedCommandQueue)); - auto* cmd_queue = new(shm.base_ptr()) SharedCommandQueue(1024); - - std::unordered_map database; - - std::cout << "Server started. Listening for commands...\n"; - - while (true) - { - std::optional opt_cmd = cmd_queue->try_pop(); - - if (!opt_cmd) - { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - continue; - } - - Command& cmd = *opt_cmd; - - switch (cmd.op) - { - case OperationType::PUT: - { - std::string value(cmd.value, cmd.value_size); - database[cmd.key] = value; - std::cout << "[PUT] key = " << to_string(cmd.key) << ", value = " << value << "\n"; - break; - } - case OperationType::DELETE: - { - database.erase(cmd.key); - std::cout << "[DELETE] key = " << to_string(cmd.key) << "\n"; - break; - } - case OperationType::FIND: - { - auto it = database.find(cmd.key); - if (it != database.end()) - std::cout << "[FIND] key = " << to_string(cmd.key) << " => " << it->second << "\n"; - else - std::cout << "[FIND] key = " << to_string(cmd.key) << " not found\n"; - break; - } - default: - std::cout << "[UNKNOWN COMMAND]\n"; - break; - } - } - } - catch (const std::exception& ex) - { - std::cerr << "Server exception: " << ex.what() << std::endl; - return 1; - } -}