From 199fb1f34b527250ebfe1a227abeb4c50d3058b7 Mon Sep 17 00:00:00 2001 From: g2px1 Date: Mon, 21 Apr 2025 14:08:42 +0300 Subject: [PATCH] make compactor more flexible --- client.cpp | 16 +++++- core/Memtable.h | 2 + core/SharedCommandQueue.cpp | 1 + core/SharedCommandQueue.h | 6 +-- core/UDB.cpp | 9 ++-- utils/datastructures/LFCircullarBuffer.h | 18 +++++++ utils/io/Compactor.cpp | 31 +++++++++--- utils/io/Compactor.h | 3 ++ utils/io/RecoveryLog.cpp | 64 +++++++++++++----------- utils/io/RecoveryLog.h | 10 ++-- utils/io/VersionManager.cpp | 11 ++-- utils/io/VersionManager.h | 2 + 12 files changed, 121 insertions(+), 52 deletions(-) diff --git a/client.cpp b/client.cpp index ba13283..98c300a 100644 --- a/client.cpp +++ b/client.cpp @@ -5,6 +5,8 @@ #include #include #include +#include + #include "core/SharedCommandQueue.h" #include "utils/hash/Hash128.h" @@ -76,7 +78,15 @@ int main() return 1; } - void* ptr = mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); + struct stat statbuf; + if (fstat(shm_fd, &statbuf) == -1) + { + perror("fstat failed"); + close(shm_fd); + return 1; + } + + void* ptr = mmap(nullptr, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); if (ptr == MAP_FAILED) { perror("mmap failed"); @@ -91,12 +101,14 @@ int main() // PUT Command // ------------------- { + std::cout << "1\n"; size_t head = queue->head().load(std::memory_order_relaxed); size_t next_head = (head + 1) % queue->capacity(); while (next_head == queue->tail().load(std::memory_order_acquire)) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } + std::cout << "2\n"; usub::core::Command* slot = &queue->raw_buffer()[head]; @@ -104,7 +116,9 @@ int main() slot->key = usub::utils::compute_hash128("test_key", 8); // исправил длину "test_key" = 8 символов const char* put_value = "test_value"; slot->value_size = std::strlen(put_value); + std::cout << "3\n"; std::memcpy(slot->value, put_value, slot->value_size); + std::cout << "4\n"; slot->response_size = 0; slot->response_ready.store(0, std::memory_order_relaxed); diff --git a/core/Memtable.h b/core/Memtable.h index 4367c2b..4a85729 100644 --- a/core/Memtable.h +++ b/core/Memtable.h @@ -105,6 +105,7 @@ namespace usub::shared_storage delete old_memtable; this->wal.close(); this->flushing.store(false); + this->version_manager.flush(); } template @@ -126,6 +127,7 @@ namespace usub::shared_storage { flush(); } + this->version_manager.flush(); } template diff --git a/core/SharedCommandQueue.cpp b/core/SharedCommandQueue.cpp index 44785fd..528a4e4 100644 --- a/core/SharedCommandQueue.cpp +++ b/core/SharedCommandQueue.cpp @@ -192,4 +192,5 @@ namespace usub::core std::this_thread::sleep_for(std::chrono::milliseconds(1)); return cmd.response_size != 0; } + } diff --git a/core/SharedCommandQueue.h b/core/SharedCommandQueue.h index ccf594b..7044b2b 100644 --- a/core/SharedCommandQueue.h +++ b/core/SharedCommandQueue.h @@ -54,9 +54,9 @@ namespace usub::core private: size_t capacity_; Command* commands_; - alignas(64) std::atomic head_; - alignas(64) std::atomic tail_; - alignas(64) Command buffer_[]; + std::atomic head_; + std::atomic tail_; + Command buffer_[]; }; } diff --git a/core/UDB.cpp b/core/UDB.cpp index 2e2ad05..8809b2f 100644 --- a/core/UDB.cpp +++ b/core/UDB.cpp @@ -48,6 +48,7 @@ namespace usub::core { case OperationType::PUT: { + std::cout << "PUT\n"; std::string value(cmd.value, cmd.value_size); this->fast_cache_[cmd.key] = value; this->memtable_manager_.put(cmd.key, value); @@ -55,6 +56,7 @@ namespace usub::core } case OperationType::DELETE: { + std::cout << "DELETE\n"; this->fast_cache_.erase(cmd.key); this->memtable_manager_.remove(cmd.key); break; @@ -103,7 +105,6 @@ namespace usub::core { case OperationType::PUT: { - std::cout << "PUT\n"; std::string value(cmd.value, cmd.value_size); this->fast_cache_[cmd.key] = value; this->memtable_manager_.put(cmd.key, value); @@ -111,17 +112,15 @@ namespace usub::core } case OperationType::DELETE: { - std::cout << "DELETE\n"; this->fast_cache_.erase(cmd.key); this->memtable_manager_.remove(cmd.key); break; } case OperationType::FIND: { - auto it = fast_cache_.find(cmd.key); - if (it != fast_cache_.end()) + auto it = this->fast_cache_.find(cmd.key); + if (it != this->fast_cache_.end()) { - std::cout << "FIND\n"; std::string value = it->second; cmd.response_size = static_cast(value.size()); std::memcpy(cmd.response, value.data(), value.size()); diff --git a/utils/datastructures/LFCircullarBuffer.h b/utils/datastructures/LFCircullarBuffer.h index 0281d24..78c8478 100644 --- a/utils/datastructures/LFCircullarBuffer.h +++ b/utils/datastructures/LFCircullarBuffer.h @@ -189,6 +189,24 @@ namespace usub::utils } [[nodiscard]] size_t get_capacity() const noexcept { return this->capacity; } + + bool try_push_batch(const T* items, size_t count) + { + size_t head = this->head.load(std::memory_order_relaxed); + size_t tail = this->tail.load(std::memory_order_acquire); + + size_t free_slots = (tail + this->capacity - head - 1) % this->capacity; + if (free_slots < count) + return false; + + for (size_t i = 0; i < count; ++i) + { + this->buffer[(head + i) % this->capacity] = items[i]; + } + + this->head.store((head + count) % this->capacity, std::memory_order_release); + return true; + } }; } diff --git a/utils/io/Compactor.cpp b/utils/io/Compactor.cpp index 969bcbb..b77a863 100644 --- a/utils/io/Compactor.cpp +++ b/utils/io/Compactor.cpp @@ -34,6 +34,7 @@ namespace usub::utils { while (this->running_) { + // Queue → 0 for (size_t i = 0; i < 8; ++i) { auto file = this->l0_queue_.pop(); @@ -41,7 +42,7 @@ namespace usub::utils { std::lock_guard lock(this->levels_mutex_); this->level0_files_.push_back(*file); - this->level0_size_.fetch_add(1, std::memory_order_relaxed); // увеличиваем счётчик + this->level0_size_.fetch_add(1, std::memory_order_relaxed); } else { @@ -49,19 +50,35 @@ namespace usub::utils } } - if (this->level0_size_.load(std::memory_order_relaxed) >= 4) + bool did_compact = false; + { std::lock_guard lock(this->levels_mutex_); - compact_level(this->level0_files_, this->level1_files_, 0); + if (this->level0_size_.load(std::memory_order_relaxed) >= 4) + { + compact_level(this->level0_files_, this->level1_files_, 0); + did_compact = true; + } } - if (level1_size_.load(std::memory_order_relaxed) >= 4) + // 1 → 2 { std::lock_guard lock(this->levels_mutex_); - compact_level(this->level1_files_, this->level2_files_, 1); + if (this->level1_size_.load(std::memory_order_relaxed) >= 4) + { + compact_level(this->level1_files_, this->level2_files_, 1); + did_compact = true; + } } - std::this_thread::sleep_for(std::chrono::milliseconds(50)); + if (!did_compact) + { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + else + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } } } @@ -118,6 +135,8 @@ namespace usub::utils this->level1_size_.fetch_sub(batch_size, std::memory_order_relaxed); this->level2_size_.fetch_add(1, std::memory_order_relaxed); } + + this->version_manager_.flush(); } } // utils // usub diff --git a/utils/io/Compactor.h b/utils/io/Compactor.h index 7b26af4..0138c2c 100644 --- a/utils/io/Compactor.h +++ b/utils/io/Compactor.h @@ -19,13 +19,16 @@ namespace usub::utils { public: explicit Compactor(VersionManager& vm); + ~Compactor(); void add_sstable_l0(const std::string& filename); + void run(); private: void background_worker(); + void compact_level(std::vector& source_files, std::vector& dest_files, int level); private: diff --git a/utils/io/RecoveryLog.cpp b/utils/io/RecoveryLog.cpp index 0f13a9b..8f24e7f 100644 --- a/utils/io/RecoveryLog.cpp +++ b/utils/io/RecoveryLog.cpp @@ -8,24 +8,19 @@ namespace usub::utils { RecoveryLog::RecoveryLog(const std::string& dbname) - : db_name(dbname), - metadata_dir("metadata/" + db_name + "/"), - log_file(metadata_dir + "recovery.log") + : db_name_(dbname), + metadata_dir_("metadata/" + dbname + "/"), + log_file_(metadata_dir_ + "recovery.log") { ensure_metadata_dir(); - this->log_out.open(this->log_file, std::ios::binary | std::ios::app); - if (!this->log_out.is_open()) - { - throw std::runtime_error("Failed to open recovery log for " + this->db_name); - } + this->log_out_.open(this->log_file_, std::ios::binary | std::ios::app); + if (!this->log_out_.is_open()) + throw std::runtime_error("Failed to open recovery log for " + this->db_name_); } RecoveryLog::~RecoveryLog() { - if (this->log_out.is_open()) - { - this->log_out.close(); - } + if (log_out_.is_open()) log_out_.close(); } void RecoveryLog::log_put(const std::string& key, const std::string& value) @@ -33,48 +28,46 @@ namespace usub::utils uint8_t op = 0; uint32_t key_len = key.size(); uint32_t value_len = value.size(); - this->log_out.write(reinterpret_cast(&op), sizeof(op)); - this->log_out.write(reinterpret_cast(&key_len), sizeof(key_len)); - this->log_out.write(key.data(), key_len); - this->log_out.write(reinterpret_cast(&value_len), sizeof(value_len)); - this->log_out.write(value.data(), value_len); - this->log_out.flush(); + this->log_out_.write(reinterpret_cast(&op), sizeof(op)); + this->log_out_.write(reinterpret_cast(&key_len), sizeof(key_len)); + this->log_out_.write(key.data(), key_len); + this->log_out_.write(reinterpret_cast(&value_len), sizeof(value_len)); + this->log_out_.write(value.data(), value_len); + this->log_out_.flush(); } void RecoveryLog::log_delete(const std::string& key) { uint8_t op = 1; uint32_t key_len = key.size(); - this->log_out.write(reinterpret_cast(&op), sizeof(op)); - this->log_out.write(reinterpret_cast(&key_len), sizeof(key_len)); - this->log_out.write(key.data(), key_len); - this->log_out.flush(); + this->log_out_.write(reinterpret_cast(&op), sizeof(op)); + this->log_out_.write(reinterpret_cast(&key_len), sizeof(key_len)); + this->log_out_.write(key.data(), key_len); + this->log_out_.flush(); } void RecoveryLog::log_command(const core::Command& cmd) { - cmd.serialize(this->log_out); - this->log_out.flush(); + cmd.serialize(this->log_out_); + this->log_out_.flush(); } void RecoveryLog::ensure_metadata_dir() const { try { - if (!std::filesystem::exists(this->metadata_dir)) - { - std::filesystem::create_directories(this->metadata_dir); - } + if (!std::filesystem::exists(metadata_dir_)) std::filesystem::create_directories(metadata_dir_); } catch (const std::exception& e) { - std::cerr << "Failed to create metadata dir: " << e.what() << std::endl; + std::cerr << "Failed to create metadata directory: " << e.what() << std::endl; } } + void RecoveryLog::replay(const std::function& callback) const { - std::ifstream in(this->log_file, std::ios::binary); + std::ifstream in(this->log_file_, std::ios::binary); if (!in.is_open()) return; @@ -84,5 +77,16 @@ namespace usub::utils callback(cmd); } } + void RecoveryLog::clear() + { + if (this->log_out_.is_open()) + this->log_out_.close(); + + std::filesystem::remove(this->log_file_); + + this->log_out_.open(this->log_file_, std::ios::binary | std::ios::app); + if (!this->log_out_.is_open()) + throw std::runtime_error("Failed to reopen recovery log after clear for " + this->db_name_); + } } // utils // usub diff --git a/utils/io/RecoveryLog.h b/utils/io/RecoveryLog.h index 3dced6a..30908a9 100644 --- a/utils/io/RecoveryLog.h +++ b/utils/io/RecoveryLog.h @@ -29,14 +29,16 @@ namespace usub::utils void replay(const std::function& callback) const; + void clear(); + private: void ensure_metadata_dir() const; private: - std::string db_name; - std::string metadata_dir; - std::string log_file; - std::ofstream log_out; + std::string db_name_; + std::string metadata_dir_; + std::string log_file_; + std::ofstream log_out_; }; } // utils // usub diff --git a/utils/io/VersionManager.cpp b/utils/io/VersionManager.cpp index f6f2253..d1c0280 100644 --- a/utils/io/VersionManager.cpp +++ b/utils/io/VersionManager.cpp @@ -7,9 +7,9 @@ namespace usub::utils { VersionManager::VersionManager(const std::string& dbname) - : db_name(dbname), - metadata_dir("metadata/" + this->db_name + "/"), - version_file(this->metadata_dir + "version.meta") + : db_name(dbname), + metadata_dir("metadata/" + this->db_name + "/"), + version_file(this->metadata_dir + "version.meta") { ensure_metadata_dir(); load_version(); @@ -20,6 +20,11 @@ namespace usub::utils return this->version.fetch_add(1, std::memory_order_relaxed); } + void VersionManager::flush() + { + save_version(); + } + void VersionManager::ensure_metadata_dir() { try diff --git a/utils/io/VersionManager.h b/utils/io/VersionManager.h index 1eb3dbb..bd5550b 100644 --- a/utils/io/VersionManager.h +++ b/utils/io/VersionManager.h @@ -26,6 +26,8 @@ namespace usub::utils uint64_t next_version(); + void flush(); + private: void ensure_metadata_dir();