diff --git a/core/SharedCommandQueue.cpp b/core/SharedCommandQueue.cpp index a18ce6b..8004966 100644 --- a/core/SharedCommandQueue.cpp +++ b/core/SharedCommandQueue.cpp @@ -10,7 +10,7 @@ namespace usub::core : capacity_(capacity), head_(0), tail_(0) { for (size_t i = 0; i < capacity_; ++i) - new (&buffer_[i]) Command(); + new(&buffer_[i]) Command(); } SharedCommandQueue::~SharedCommandQueue() @@ -102,4 +102,28 @@ namespace usub::core tail_.store((tail + copied) % capacity_, std::memory_order_release); return copied; } + + Command* SharedCommandQueue::peek(size_t index) + { + size_t tail = tail_.load(std::memory_order_relaxed); + size_t head = head_.load(std::memory_order_acquire); + + if (tail == head) + return nullptr; + + return &buffer_[tail % capacity_]; + } + + void SharedCommandQueue::pop() + { + size_t tail = tail_.load(std::memory_order_relaxed); + tail_.store((tail + 1) % capacity_, std::memory_order_release); + } + + size_t SharedCommandQueue::pending_count() const + { + size_t head = head_.load(std::memory_order_acquire); + size_t tail = tail_.load(std::memory_order_relaxed); + return (head + capacity_ - tail) % capacity_; + } } diff --git a/core/SharedCommandQueue.h b/core/SharedCommandQueue.h index fdd54ea..252cf04 100644 --- a/core/SharedCommandQueue.h +++ b/core/SharedCommandQueue.h @@ -31,11 +31,68 @@ namespace usub::core size_t try_pop_batch(Command* out, size_t max_count); + Command* peek(size_t index); + + void pop(); + + size_t pending_count() const; + + Command* raw_buffer() noexcept { return buffer_; } + size_t capacity() const noexcept { return capacity_; } + std::atomic& head() noexcept { return head_; } + std::atomic& tail() noexcept { return tail_; } + + bool enqueue_put(const usub::utils::Hash128& key, const std::string& value) + { + size_t head = head_.load(std::memory_order_relaxed); + size_t next_head = (head + 1) % capacity_; + if (next_head == tail_.load(std::memory_order_acquire)) + return false; + + Command& slot = buffer_[head]; + slot.op = OperationType::PUT; + slot.key = key; + slot.value_size = static_cast(value.size()); + std::memcpy(slot.value, value.data(), value.size()); + slot.response_size = 0; + slot.response_ready.store(0, std::memory_order_relaxed); + slot.ready.store(1, std::memory_order_release); + + head_.store(next_head, std::memory_order_release); + return true; + } + + bool enqueue_find(const usub::utils::Hash128& key) + { + size_t head = head_.load(std::memory_order_relaxed); + size_t next_head = (head + 1) % capacity_; + if (next_head == tail_.load(std::memory_order_acquire)) + return false; + + Command& slot = buffer_[head]; + slot.op = OperationType::FIND; + slot.key = key; + slot.value_size = 0; + slot.response_size = 0; + slot.response_ready.store(0, std::memory_order_relaxed); + slot.ready.store(1, std::memory_order_release); + + head_.store(next_head, std::memory_order_release); + return true; + } + + bool await_response(Command& cmd) + { + while (cmd.response_ready.load(std::memory_order_acquire) == 0) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + return cmd.response_size != 0; + } + private: size_t capacity_; - std::atomic head_; - std::atomic tail_; Command* commands_; + alignas(64) std::atomic head_; + alignas(64) std::atomic tail_; alignas(64) Command buffer_[]; }; } diff --git a/core/SharedMemoryManager.cpp b/core/SharedMemoryManager.cpp index 493bd37..68cfbb2 100644 --- a/core/SharedMemoryManager.cpp +++ b/core/SharedMemoryManager.cpp @@ -66,6 +66,17 @@ namespace usub::core return true; } + bool SharedMemoryManager::needs_init() const noexcept + { + const uint8_t* base = reinterpret_cast(this->shm_ptr); + for (size_t i = 0; i < sizeof(uint64_t); ++i) + { + if (base[i] != 0) + return false; + } + return true; + } + void SharedMemoryManager::create() { ::shm_unlink((this->shm_name).c_str()); diff --git a/core/SharedMemoryManager.h b/core/SharedMemoryManager.h index 10c61fc..6abc1a1 100644 --- a/core/SharedMemoryManager.h +++ b/core/SharedMemoryManager.h @@ -32,6 +32,8 @@ namespace usub::core static bool exists(const std::string& name); + [[nodiscard]] bool needs_init() const noexcept; + private: void create(); diff --git a/core/UDB.cpp b/core/UDB.cpp index 383e9dc..862096a 100644 --- a/core/UDB.cpp +++ b/core/UDB.cpp @@ -18,16 +18,30 @@ namespace usub::core compactor_(this->version_manager_), running_(true) { - if (create_new) - new(this->shm_manager_.base_ptr()) SharedCommandQueue(this->shm_queue_capacity_); - this->command_queue_ = reinterpret_cast(this->shm_manager_.base_ptr()); + if (shm_manager_.needs_init()) + { + new(shm_manager_.base_ptr()) SharedCommandQueue(shm_queue_capacity_); + } + + command_queue_ = reinterpret_cast(shm_manager_.base_ptr()); recover_from_logs(); - this->background_flush_thread_ = std::thread(&UDB::background_flush_worker, this); + background_flush_thread_ = std::thread(&UDB::background_flush_worker, this); compactor_.run(); } + // { + // if (create_new) + // new(this->shm_manager_.base_ptr()) SharedCommandQueue(this->shm_queue_capacity_); + // this->command_queue_ = reinterpret_cast(this->shm_manager_.base_ptr()); + // + // recover_from_logs(); + // + // this->background_flush_thread_ = std::thread(&UDB::background_flush_worker, this); + // compactor_.run(); + // } + UDB::~UDB() { running_ = false; @@ -61,21 +75,24 @@ namespace usub::core { std::cout << "[UDB] Server for DB '" << this->db_name_ << "' started.\n"; - constexpr size_t BATCH_SIZE = 64; - Command batch[BATCH_SIZE]; - while (this->running_) { - size_t count = this->command_queue_->try_pop_batch(batch, BATCH_SIZE); - if (count == 0) + Command* cmd = this->command_queue_->peek(0); + + if (!cmd) { - std::this_thread::sleep_for(std::chrono::microseconds(500)); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); continue; } - for (size_t i = 0; i < count; ++i) + if (cmd->ready.load(std::memory_order_acquire) == 1) { - this->process_command(batch[i]); + process_command(*cmd); + this->command_queue_->pop(); + } + else + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } } @@ -86,6 +103,7 @@ namespace usub::core { case OperationType::PUT: { + std::cout << "PUT\n"; std::string value(cmd.value, cmd.value_size); this->fast_cache_[cmd.key] = value; this->memtable_manager_.put(cmd.key, value); @@ -93,6 +111,7 @@ namespace usub::core } case OperationType::DELETE: { + std::cout << "DELETE\n"; fast_cache_.erase(cmd.key); memtable_manager_.remove(cmd.key); break; @@ -102,6 +121,7 @@ namespace usub::core auto it = fast_cache_.find(cmd.key); if (it != fast_cache_.end()) { + std::cout << "FIND\n"; std::string value = it->second; cmd.response_size = static_cast(value.size()); std::memcpy(cmd.response, value.data(), value.size());