From 05916c49d0f5a37259200faacc6829d55c8c5938 Mon Sep 17 00:00:00 2001 From: g2px1 Date: Sun, 20 Apr 2025 20:57:13 +0300 Subject: [PATCH] db started, no answer from server --- core/Command.cpp | 5 + core/Command.h | 80 ++++++++++++ core/DatabaseManager.cpp | 60 +++++++++ core/DatabaseManager.h | 36 ++++++ core/Memtable.h | 16 ++- core/SharedCommandQueue.cpp | 29 +++++ core/SharedCommandQueue.h | 33 +++++ core/SharedMemoryManager.cpp | 4 + core/UDB.cpp | 119 +++++++++++++++++ core/UDB.h | 68 ++++++++++ server.cpp | 74 +++++++++++ utils/datastructures/LFCircullarBuffer.h | 34 +++-- utils/datastructures/LFSkipList.h | 28 ---- utils/io/Compactor.cpp | 156 +++++++++++++++-------- utils/io/Compactor.h | 28 ++-- utils/io/RecoveryLog.cpp | 38 +++++- utils/io/RecoveryLog.h | 6 +- utils/io/SSTableIO.h | 6 +- utils/string/basic_utils.h | 35 +++++ 19 files changed, 736 insertions(+), 119 deletions(-) create mode 100644 core/Command.cpp create mode 100644 core/Command.h create mode 100644 core/DatabaseManager.cpp create mode 100644 core/DatabaseManager.h create mode 100644 core/SharedCommandQueue.cpp create mode 100644 core/SharedCommandQueue.h create mode 100644 core/UDB.cpp create mode 100644 core/UDB.h create mode 100644 server.cpp create mode 100644 utils/string/basic_utils.h diff --git a/core/Command.cpp b/core/Command.cpp new file mode 100644 index 0000000..62a7000 --- /dev/null +++ b/core/Command.cpp @@ -0,0 +1,5 @@ +// +// Created by Kirill Zhukov on 20.04.2025. +// + +#include "Command.h" diff --git a/core/Command.h b/core/Command.h new file mode 100644 index 0000000..d4b72e0 --- /dev/null +++ b/core/Command.h @@ -0,0 +1,80 @@ +// +// Created by Kirill Zhukov on 20.04.2025. +// + +#ifndef COMMAND_H +#define COMMAND_H + +#include +#include +#include "utils/hash/Hash128.h" + +namespace usub::core +{ + enum class OperationType : uint8_t + { + PUT = 0, + DELETE = 1, + FIND = 2 + }; + + struct alignas(64) Command + { + Command() + : ready(0), op(OperationType::PUT), key(), value_size(0), value() + { + } + + Command(const Command& other) + : ready(other.ready.load(std::memory_order_relaxed)), // копируем ready явно + op(other.op), + key(other.key), + value_size(other.value_size), value() + { + std::memcpy(value, other.value, other.value_size); + } + + Command(Command&& other) noexcept + : ready(other.ready.load(std::memory_order_relaxed)), + op(other.op), + key(other.key), + value_size(other.value_size), value() + { + std::memcpy(value, other.value, other.value_size); + } + + Command& operator=(const Command& other) + { + if (this != &other) + { + ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + op = other.op; + key = other.key; + value_size = other.value_size; + std::memcpy(value, other.value, other.value_size); + } + return *this; + } + + Command& operator=(Command&& other) noexcept + { + if (this != &other) + { + ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + op = other.op; + key = other.key; + value_size = other.value_size; + std::memcpy(value, other.value, other.value_size); + } + return *this; + } + + std::atomic ready; + OperationType op; + utils::Hash128 key; + uint32_t value_size; + char value[1024]; + }; +} + +#endif // COMMAND_H diff --git a/core/DatabaseManager.cpp b/core/DatabaseManager.cpp new file mode 100644 index 0000000..b124355 --- /dev/null +++ b/core/DatabaseManager.cpp @@ -0,0 +1,60 @@ +// +// Created by Kirill Zhukov on 20.04.2025. +// + +#include "DatabaseManager.h" + + +namespace usub::core +{ + DatabaseManager::DatabaseManager(const std::string& config_path) + { + auto config = toml::parse_file(config_path); + + if (!config.contains("database")) + throw std::runtime_error("No 'database' section found in config"); + + auto& databases = *config["database"].as_array(); + + for (auto& db_config : databases) + { + std::string db_name; + if (auto* table = db_config.as_table()) + { + if (auto it = table->find("name"); it != table->end() && it->second.is_string()) + { + db_name = it->second.value().value_or(""); + } + else + { + throw std::runtime_error("Database name must be a string"); + } + } + else + { + throw std::runtime_error("Database entry must be a table"); + } + + auto udb = std::make_unique(db_name, "shm_" + db_name); + databases_.push_back(DatabaseInstance{std::move(udb), {}}); + } + } + + void DatabaseManager::run_all() + { + for (auto& db : databases_) + { + db.worker = std::thread([&db]() + { + db.udb->run(); + }); + } + + for (auto& db : databases_) + { + if (db.worker.joinable()) + db.worker.join(); + } + } +} // core +// usub diff --git a/core/DatabaseManager.h b/core/DatabaseManager.h new file mode 100644 index 0000000..5453b05 --- /dev/null +++ b/core/DatabaseManager.h @@ -0,0 +1,36 @@ +// +// Created by Kirill Zhukov on 20.04.2025. +// + +#ifndef DATABASEMANAGER_H +#define DATABASEMANAGER_H + +#include "UDB.h" +#include "utils/toml/toml.hpp" +#include +#include +#include +#include + +namespace usub::core +{ + class DatabaseManager + { + public: + explicit DatabaseManager(const std::string& config_path); + + void run_all(); + + private: + struct DatabaseInstance + { + std::unique_ptr udb; + std::thread worker; + }; + + std::vector databases_; + }; +} // core +// usub + +#endif //DATABASEMANAGER_H diff --git a/core/Memtable.h b/core/Memtable.h index 56b965c..11425b6 100644 --- a/core/Memtable.h +++ b/core/Memtable.h @@ -9,14 +9,17 @@ #include #include #include "utils/io/Wal.h" +#include "utils/io/VersionManager.h" namespace usub::shared_storage { + using namespace usub::utils; + template class MemTableManager { public: - MemTableManager(const std::string& wal_file, size_t max_size); + MemTableManager(const std::string& wal_file, size_t max_size, utils::VersionManager& vm); ~MemTableManager(); @@ -37,6 +40,7 @@ namespace usub::shared_storage std::mutex batch_mutex; std::vector> write_batch; std::atomic flushing{false}; + utils::VersionManager& version_manager; private: size_t estimate_memtable_size() const; @@ -45,10 +49,10 @@ namespace usub::shared_storage }; template - MemTableManager::MemTableManager(const std::string& wal_file, size_t max_size) : wal(wal_file), - max_memtable_size(max_size) + MemTableManager::MemTableManager(const std::string& wal_file, size_t max_size, utils::VersionManager& vm) + : wal(wal_file), max_memtable_size(max_size), version_manager(vm) { - this->active_memtable.store(new SkipList()); + this->active_memtable.store(new SkipList(this->version_manager)); } template @@ -93,9 +97,9 @@ namespace usub::shared_storage { if (this->flushing.exchange(true)) return; - auto old_memtable = this->active_memtable.exchange(new SkipList()); + auto old_memtable = this->active_memtable.exchange(new SkipList(this->version_manager)); std::string filename = "sstable_" + std::to_string(std::time(nullptr)) + ".dat"; - write_sstable_v2(*old_memtable, filename); + write_sstable_with_index(*old_memtable, filename); delete old_memtable; this->wal.close(); this->flushing.store(false); diff --git a/core/SharedCommandQueue.cpp b/core/SharedCommandQueue.cpp new file mode 100644 index 0000000..009c8ba --- /dev/null +++ b/core/SharedCommandQueue.cpp @@ -0,0 +1,29 @@ +// +// Created by Kirill Zhukov on 20.04.2025. +// + +#include "SharedCommandQueue.h" + +namespace usub::core +{ + SharedCommandQueue::SharedCommandQueue() + : SharedCommandQueue(1024) + { + } + + SharedCommandQueue::SharedCommandQueue(size_t cap) + : capacity(cap), + queue(std::make_unique>(cap)) + { + } + + bool SharedCommandQueue::try_push(const Command& cmd) const + { + return this->queue->push(cmd); + } + + std::optional SharedCommandQueue::try_pop() const + { + return this->queue->pop(); + } +} diff --git a/core/SharedCommandQueue.h b/core/SharedCommandQueue.h new file mode 100644 index 0000000..507f501 --- /dev/null +++ b/core/SharedCommandQueue.h @@ -0,0 +1,33 @@ +// +// Created by Kirill Zhukov on 20.04.2025. +// + +#ifndef SHAREDCOMMANDQUEUE_H +#define SHAREDCOMMANDQUEUE_H + +#include "Command.h" +#include "utils/datastructures/LFCircullarBuffer.h" +#include +#include + +namespace usub::core +{ + constexpr size_t SHM_QUEUE_CAPACITY = 1024; + + struct SharedCommandQueue + { + SharedCommandQueue(); + + explicit SharedCommandQueue(size_t cap); + + bool try_push(const Command& cmd) const; + + std::optional try_pop() const; + + size_t capacity; + std::unique_ptr> queue; + }; +} + + +#endif //SHAREDCOMMANDQUEUE_H diff --git a/core/SharedMemoryManager.cpp b/core/SharedMemoryManager.cpp index 2ccb262..8092256 100644 --- a/core/SharedMemoryManager.cpp +++ b/core/SharedMemoryManager.cpp @@ -4,6 +4,8 @@ #include "SharedMemoryManager.h" +#include + namespace usub::core { SharedMemoryManager::SharedMemoryManager(const std::string& name, size_t size, bool create_new) : @@ -84,6 +86,8 @@ namespace usub::core if (this->shm_fd == -1) { throw std::runtime_error("Failed to open shared memory: " + this->shm_name); + } else { + std::cout << "[shm_open] name: " << this->shm_name << ", fd: " << this->shm_fd << '\n'; } this->shm_ptr = ::mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, this->shm_fd, 0); if (this->shm_ptr == MAP_FAILED) diff --git a/core/UDB.cpp b/core/UDB.cpp new file mode 100644 index 0000000..ef41c0c --- /dev/null +++ b/core/UDB.cpp @@ -0,0 +1,119 @@ +// +// Created by Kirill Zhukov on 20.04.2025. +// + +#include "UDB.h" + +namespace usub::core +{ + UDB::UDB(const std::string& db_name, const std::string& shm_name, + size_t shm_queue_capacity, size_t max_memtable_size, bool create_new) + : db_name_(db_name), + shm_name_(shm_name), + shm_queue_capacity_(shm_queue_capacity), + max_memtable_size_(max_memtable_size), + shm_manager_(shm_name, sizeof(SharedCommandQueue), create_new), + version_manager_(db_name), + memtable_manager_(db_name + "_wal", max_memtable_size, this->version_manager_), + compactor_(this->version_manager_), + running_(true) + { + ::shm_unlink(("/" + shm_name).c_str()); + new(this->shm_manager_.base_ptr()) SharedCommandQueue(this->shm_queue_capacity_); + this->command_queue_ = reinterpret_cast(this->shm_manager_.base_ptr()); + + recover_from_logs(); + + this->background_flush_thread_ = std::thread(&UDB::background_flush_worker, this); + compactor_.run(); + } + + UDB::~UDB() + { + running_ = false; + + if (this->background_flush_thread_.joinable()) + this->background_flush_thread_.join(); + + } + + void UDB::recover_from_logs() + { + utils::RecoveryLog recovery_log(db_name_); + + recovery_log.replay([this](const utils::Hash128& key, const std::string& value, bool is_tombstone) + { + if (!is_tombstone) + { + this->fast_cache_[key] = value; + this->memtable_manager_.put(key, value); + } + else + { + this->fast_cache_.erase(key); + this->memtable_manager_.remove(key); + } + }); + + std::cout << "[UDB] Recovery complete for database: " << this->db_name_ << "\n"; + } + + void UDB::run() + { + std::cout << "[UDB] Server for DB '" << this->db_name_ << "' started.\n"; + + while (this->running_) + { + auto opt_cmd = this->command_queue_->try_pop(); + + if (!opt_cmd) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + continue; + } + + process_command(*opt_cmd); + } + } + + void UDB::process_command(const Command& cmd) + { + switch (cmd.op) + { + case OperationType::PUT: + { + std::string value(cmd.value, cmd.value_size); + this->fast_cache_[cmd.key] = value; + this->memtable_manager_.put(cmd.key, value); + break; + } + case OperationType::DELETE: + { + fast_cache_.erase(cmd.key); + memtable_manager_.remove(cmd.key); + break; + } + case OperationType::FIND: + { + auto it = fast_cache_.find(cmd.key); + if (it != fast_cache_.end()) + std::cout << "[FIND] " << usub::utils::to_string(cmd.key) << " => " << it->second << "\n"; + else + std::cout << "[FIND] " << usub::utils::to_string(cmd.key) << " not found\n"; + break; + } + default: + std::cout << "[UNKNOWN COMMAND]\n"; + break; + } + } + + void UDB::background_flush_worker() + { + while (running_) + { + memtable_manager_.flush_batch(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } +} diff --git a/core/UDB.h b/core/UDB.h new file mode 100644 index 0000000..87022fd --- /dev/null +++ b/core/UDB.h @@ -0,0 +1,68 @@ +// +// Created by Kirill Zhukov on 20.04.2025. +// + +#ifndef UDB_H +#define UDB_H + +#include +#include +#include +#include +#include + +#include "Memtable.h" +#include "utils/datastructures/LFCircullarBuffer.h" +#include "core/SharedMemoryManager.h" +#include "core/SharedCommandQueue.h" +#include "core/Command.h" +#include "utils/hash/Hash128.h" +#include "utils/datastructures/LFSkipList.h" +#include "utils/string/basic_utils.h" +#include "utils/io/VersionManager.h" +#include "utils/io/Wal.h" +#include "utils/io/SSTableIO.h" +#include "Memtable.h" +#include "utils/io/RecoveryLog.h" +#include "utils/io/Compactor.h" + +namespace usub::core +{ + class UDB + { + public: + UDB(const std::string& db_name, const std::string& shm_name, size_t shm_queue_capacity = 1024, + size_t max_memtable_size = 1024 * 1024, bool create_new = true); + + ~UDB(); + + void run(); + + private: + void process_command(const usub::core::Command& cmd); + void background_flush_worker(); + void recover_from_logs(); + + private: + std::string db_name_; + std::string shm_name_; + size_t shm_queue_capacity_; + size_t max_memtable_size_; + + SharedMemoryManager shm_manager_; + SharedCommandQueue* command_queue_; + + utils::VersionManager version_manager_; + + shared_storage::MemTableManager> memtable_manager_; + utils::Compactor compactor_; + + std::unordered_map fast_cache_; + + std::atomic running_; + std::thread background_flush_thread_; + }; +} + +#endif // UDB_H + diff --git a/server.cpp b/server.cpp new file mode 100644 index 0000000..960d918 --- /dev/null +++ b/server.cpp @@ -0,0 +1,74 @@ +#include "core/SharedMemoryManager.h" +#include "core/SharedCommandQueue.h" +#include "core/Command.h" +#include "utils/hash/Hash128.h" +#include "utils/string/basic_utils.h" + +#include +#include +#include +#include + +using namespace usub::core; +using namespace usub::utils; + +int main() +{ + ::shm_unlink("/shm_command_queue"); + try + { + SharedMemoryManager shm("shm_command_queue", sizeof(SharedCommandQueue)); + auto* cmd_queue = new(shm.base_ptr()) SharedCommandQueue(1024); + + std::unordered_map database; + + std::cout << "Server started. Listening for commands...\n"; + + while (true) + { + std::optional opt_cmd = cmd_queue->try_pop(); + + if (!opt_cmd) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + continue; + } + + Command& cmd = *opt_cmd; + + switch (cmd.op) + { + case OperationType::PUT: + { + std::string value(cmd.value, cmd.value_size); + database[cmd.key] = value; + std::cout << "[PUT] key = " << to_string(cmd.key) << ", value = " << value << "\n"; + break; + } + case OperationType::DELETE: + { + database.erase(cmd.key); + std::cout << "[DELETE] key = " << to_string(cmd.key) << "\n"; + break; + } + case OperationType::FIND: + { + auto it = database.find(cmd.key); + if (it != database.end()) + std::cout << "[FIND] key = " << to_string(cmd.key) << " => " << it->second << "\n"; + else + std::cout << "[FIND] key = " << to_string(cmd.key) << " not found\n"; + break; + } + default: + std::cout << "[UNKNOWN COMMAND]\n"; + break; + } + } + } + catch (const std::exception& ex) + { + std::cerr << "Server exception: " << ex.what() << std::endl; + return 1; + } +} diff --git a/utils/datastructures/LFCircullarBuffer.h b/utils/datastructures/LFCircullarBuffer.h index e75642f..50aa783 100644 --- a/utils/datastructures/LFCircullarBuffer.h +++ b/utils/datastructures/LFCircullarBuffer.h @@ -46,27 +46,31 @@ namespace usub::utils void reset() { this->count = 0; } }; - template + template class LockFreeRingBuffer { - static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of 2"); - struct alignas(CACHELINE_SIZE) Cell { std::atomic sequence; typename std::aligned_storage::type storage; }; - static constexpr size_t MASK = Capacity - 1; - alignas(CACHELINE_SIZE) Cell buffer[Capacity]; + size_t capacity; + size_t mask; + Cell* buffer; alignas(CACHELINE_SIZE) std::atomic head{0}; alignas(CACHELINE_SIZE) std::atomic tail{0}; public: - LockFreeRingBuffer() + explicit LockFreeRingBuffer(size_t cap = 32) + : capacity(cap), mask(cap - 1), buffer(new Cell[cap]) { - for (size_t i = 0; i < Capacity; ++i) + if ((capacity & (capacity - 1)) != 0) + { + throw std::invalid_argument("Capacity must be a power of 2"); + } + for (size_t i = 0; i < capacity; ++i) { buffer[i].sequence.store(i, std::memory_order_relaxed); } @@ -77,16 +81,18 @@ namespace usub::utils while (pop()) { } + delete[] buffer; } - bool push(const T& val) + template + bool push(U&& val) { ExponentialBackoff backoff; size_t pos = this->head.load(std::memory_order_relaxed); while (true) { - Cell& cell = this->buffer[pos & MASK]; + Cell& cell = this->buffer[pos & mask]; prefetch_for_write(&cell); size_t seq = cell.sequence.load(std::memory_order_acquire); intptr_t diff = static_cast(seq) - static_cast(pos); @@ -95,7 +101,7 @@ namespace usub::utils { if (this->head.compare_exchange_strong(pos, pos + 1, std::memory_order_relaxed)) { - new(&cell.storage) T(val); + new(&cell.storage) T(std::forward(val)); cell.sequence.store(pos + 1, std::memory_order_release); return true; } @@ -119,7 +125,7 @@ namespace usub::utils while (true) { - Cell& cell = this->buffer[pos & MASK]; + Cell& cell = this->buffer[pos & mask]; prefetch_for_read(&cell); size_t seq = cell.sequence.load(std::memory_order_acquire); intptr_t diff = static_cast(seq) - static_cast(pos + 1); @@ -131,7 +137,7 @@ namespace usub::utils T* ptr = reinterpret_cast(&cell.storage); T val = std::move(*ptr); ptr->~T(); - cell.sequence.store(pos + Capacity, std::memory_order_release); + cell.sequence.store(pos + capacity, std::memory_order_release); return val; } } @@ -179,8 +185,10 @@ namespace usub::utils { const size_t h = this->head.load(std::memory_order_acquire); const size_t t = this->tail.load(std::memory_order_acquire); - return (h - t) & MASK; + return (h - t) & mask; } + + [[nodiscard]] size_t get_capacity() const noexcept { return capacity; } }; } diff --git a/utils/datastructures/LFSkipList.h b/utils/datastructures/LFSkipList.h index 98b587f..1de66b7 100644 --- a/utils/datastructures/LFSkipList.h +++ b/utils/datastructures/LFSkipList.h @@ -14,34 +14,6 @@ #include "utils/intrinsincs/optimizations.h" -// #if defined(__x86_64__) || defined(_M_X64) || defined(__i386) || defined(_M_IX86) -// #include -// inline void cpu_relax() noexcept { _mm_pause(); } -// inline void prefetch_for_read(const void* ptr) noexcept { -// _mm_prefetch(reinterpret_cast(ptr), _MM_HINT_T0); -// } -// #elif defined(__aarch64__) -// inline void cpu_relax() noexcept { asm volatile("yield" ::: "memory"); } -// -// inline void prefetch_for_write(const void* ptr) noexcept -// { -// asm volatile("prfm pstl1strm, [%0]" :: "r"(ptr)); -// } -// -// inline void prefetch_for_read(const void* ptr) noexcept -// { -// asm volatile("prfm pldl1keep, [%0]" :: "r"(ptr)); -// } -// #else -// inline void cpu_relax() noexcept -// { -// } -// -// inline void prefetch_for_read(const void*) noexcept -// { -// } -// #endif - namespace usub::utils { constexpr int MAX_LEVEL = 16; diff --git a/utils/io/Compactor.cpp b/utils/io/Compactor.cpp index 9c5b86d..f4f0714 100644 --- a/utils/io/Compactor.cpp +++ b/utils/io/Compactor.cpp @@ -4,73 +4,119 @@ #include "Compactor.h" +#include + namespace usub::utils { - Compactor::Compactor(VersionManager& vm) : version_manager(vm) + Compactor::Compactor(VersionManager& vm) + : version_manager_(vm), running_(true) { - this->background_thread = std::thread([this] { this->run(); }); + } + + Compactor::~Compactor() + { + running_ = false; + if (worker_thread_.joinable()) + worker_thread_.join(); } void Compactor::add_sstable_l0(const std::string& filename) - { - this->level0_files.push_back(filename); - if (this->level0_files.size() >= 4) - { - compact_level0(); - } - } - - void Compactor::compact_level0() - { - if (this->level0_files.size() < 2) return; - - std::string merged_filename = "L1_" + std::to_string(std::time(nullptr)) + ".dat"; - - LFSkipList merged(this->version_manager); - for (const auto& file : this->level0_files) - { - read_sstable_with_mmap(merged, file); - } - - write_sstable_with_index(merged, merged_filename); - - this->level1_files.push_back(merged_filename); - for (const auto& file : this->level0_files) - { - ::remove(file.c_str()); - } - this->level0_files.clear(); - } - - void Compactor::compact_level1() - { - if (this->level1_files.size() < 4) return; - - std::string merged_filename = "L2_" + std::to_string(std::time(nullptr)) + ".dat"; - - LFSkipList merged(this->version_manager); - for (const auto& file : this->level1_files) - { - read_sstable_with_mmap(merged, file); - } - - write_sstable_with_index(merged, merged_filename); - - this->level2_files.push_back(merged_filename); - for (const auto& file : this->level1_files) - { - ::remove(file.c_str()); - } - this->level1_files.clear(); + l0_queue_.push(filename); } void Compactor::run() { - while (this->running.load()) + worker_thread_ = std::thread(&Compactor::background_worker, this); + } + + void Compactor::background_worker() + { + while (running_) { - compact_level0(); - std::this_thread::sleep_for(std::chrono::seconds(1)); + for (size_t i = 0; i < 8; ++i) + { + auto file = l0_queue_.pop(); + if (file) + { + std::lock_guard lock(levels_mutex_); + level0_files_.push_back(*file); + level0_size_.fetch_add(1, std::memory_order_relaxed); // увеличиваем счётчик + } + else + { + break; + } + } + + if (level0_size_.load(std::memory_order_relaxed) >= 4) + { + std::lock_guard lock(levels_mutex_); + compact_level(level0_files_, level1_files_, 0); + } + + if (level1_size_.load(std::memory_order_relaxed) >= 4) + { + std::lock_guard lock(levels_mutex_); + compact_level(level1_files_, level2_files_, 1); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + } + + void Compactor::compact_level(std::vector& source_files, + std::vector& dest_files, int level) + { + if (source_files.empty()) + return; + + size_t batch_size = std::min(4, source_files.size()); + std::vector batch(source_files.begin(), source_files.begin() + batch_size); + + std::vector> loaded; + loaded.reserve(batch_size); + + for (const auto& file : batch) + { + VersionManager dummy_vm("dummy"); + LFSkipList table(dummy_vm); + read_sstable_with_mmap(table, file); + loaded.push_back(std::move(table)); + + std::filesystem::remove(file); + } + + LFSkipList merged(version_manager_); + + for (auto& table : loaded) + { + table.for_each_raw([&](const auto& key, const auto& value, bool is_tombstone, uint64_t version) + { + if (!is_tombstone) + merged.insert(key, value); + else + merged.erase(key); + }); + } + + std::string new_filename = "sstable_l" + std::to_string(level + 1) + "_" + + std::to_string(version_manager_.next_version()) + ".dat"; + write_sstable_with_index(merged, new_filename); + + dest_files.push_back(new_filename); + + source_files.erase(source_files.begin(), source_files.begin() + batch_size); + + if (level == 0) + { + level0_size_.fetch_sub(batch_size, std::memory_order_relaxed); + level1_size_.fetch_add(1, std::memory_order_relaxed); + } + else if (level == 1) + { + level1_size_.fetch_sub(batch_size, std::memory_order_relaxed); + level2_size_.fetch_add(1, std::memory_order_relaxed); } } } // utils diff --git a/utils/io/Compactor.h b/utils/io/Compactor.h index cf82ef5..7b26af4 100644 --- a/utils/io/Compactor.h +++ b/utils/io/Compactor.h @@ -9,6 +9,7 @@ #include #include #include "utils/datastructures/LFSkipList.h" +#include "utils/datastructures/LFCircullarBuffer.h" #include "utils/io/SSTableIO.h" #include "utils/hash/Hash128.h" @@ -18,25 +19,28 @@ namespace usub::utils { public: explicit Compactor(VersionManager& vm); + ~Compactor(); void add_sstable_l0(const std::string& filename); - - void run(); private: - void compact_level0(); - - void compact_level1(); + void background_worker(); + void compact_level(std::vector& source_files, std::vector& dest_files, int level); private: - std::vector level0_files; - std::vector level1_files; - std::vector level2_files; - std::atomic running{true}; - std::thread background_thread; - VersionManager& version_manager; - }; // utils + VersionManager& version_manager_; + std::atomic running_; + std::thread worker_thread_; + LockFreeRingBuffer l0_queue_{1024}; + std::vector level0_files_; + std::vector level1_files_; + std::vector level2_files_; + std::atomic level0_size_{0}; + std::atomic level1_size_{0}; + std::atomic level2_size_{0}; + std::mutex levels_mutex_; + }; } // usub diff --git a/utils/io/RecoveryLog.cpp b/utils/io/RecoveryLog.cpp index 94765a8..4f94ad1 100644 --- a/utils/io/RecoveryLog.cpp +++ b/utils/io/RecoveryLog.cpp @@ -51,7 +51,7 @@ namespace usub::utils this->log_out.flush(); } - void RecoveryLog::ensure_metadata_dir() + void RecoveryLog::ensure_metadata_dir() const { try { @@ -65,5 +65,41 @@ namespace usub::utils std::cerr << "Failed to create metadata dir: " << e.what() << std::endl; } } + + void RecoveryLog::replay( + const std::function& callback) const + { + std::ifstream in(log_file, std::ios::binary); + if (!in.is_open()) + return; + + while (in.peek() != EOF) + { + uint8_t op; + in.read(reinterpret_cast(&op), sizeof(op)); + + Hash128 key; + in.read(reinterpret_cast(&key), sizeof(key)); + + if (op == 0) // PUT + { + uint32_t value_size; + in.read(reinterpret_cast(&value_size), sizeof(value_size)); + + std::string value(value_size, '\0'); + in.read(&value[0], value_size); + + callback(key, value, false); + } + else if (op == 1) // DELETE + { + callback(key, "", true); + } + else + { + throw std::runtime_error("Invalid RecoveryLog format"); + } + } + } } // utils // usub diff --git a/utils/io/RecoveryLog.h b/utils/io/RecoveryLog.h index 988cc2c..bd6c51a 100644 --- a/utils/io/RecoveryLog.h +++ b/utils/io/RecoveryLog.h @@ -8,6 +8,8 @@ #include #include #include +#include +#include "utils/hash/Hash128.h" namespace usub::utils { @@ -22,8 +24,10 @@ namespace usub::utils void log_delete(const std::string& key); + void replay(const std::function& callback) const; + private: - void ensure_metadata_dir(); + void ensure_metadata_dir() const; private: std::string db_name; diff --git a/utils/io/SSTableIO.h b/utils/io/SSTableIO.h index f3aae5c..c590a6b 100644 --- a/utils/io/SSTableIO.h +++ b/utils/io/SSTableIO.h @@ -2,8 +2,8 @@ // Created by Kirill Zhukov on 20.04.2025. // -#ifndef MEMTABLE_H -#define MEMTABLE_H +#ifndef SSTABLE_IO +#define SSTABLE_IO #include #include @@ -453,4 +453,4 @@ namespace usub::utils } } -#endif //MEMTABLE_H +#endif //SSTABLE_IO diff --git a/utils/string/basic_utils.h b/utils/string/basic_utils.h new file mode 100644 index 0000000..fc8213f --- /dev/null +++ b/utils/string/basic_utils.h @@ -0,0 +1,35 @@ +// +// Created by Kirill Zhukov on 20.04.2025. +// + +#ifndef BASIC_UTILS_H +#define BASIC_UTILS_H + +#include +#include + +namespace usub::utils +{ + inline std::string to_string(const usub::utils::Hash128& hash) + { + std::ostringstream oss; + oss << std::hex << std::setfill('0') + << std::setw(16) << hash.high + << std::setw(16) << hash.low; + return oss.str(); + } +} + +namespace std +{ + template <> + struct hash + { + size_t operator()(const usub::utils::Hash128& h) const noexcept + { + return static_cast(h.high ^ h.low); + } + }; +} + +#endif //BASIC_UTILS_H