From b6cefc8536b936a2029a6ebfafe86334bfd9bdae Mon Sep 17 00:00:00 2001
From: g2px1
Date: Sun, 20 Apr 2025 18:24:28 +0300
Subject: [PATCH] almost done db

---
 .gitmodules                         |   6 +
 core/Memtable.cpp                   |  11 +
 core/Memtable.h                     | 140 ++++++++
 utils/datastructures/LFSkipList.cpp |   5 +
 utils/datastructures/LFSkipList.h   | 289 +++++++++++++++++
 utils/hash/Hash128.h                |  53 ++++
 utils/hash/xxhash                   |   1 +
 utils/io/Compactor.cpp              |  77 +++++
 utils/io/Compactor.h                |  42 +++
 utils/io/RecoveryLog.cpp            |  69 ++++
 utils/io/RecoveryLog.h              |  37 +++
 utils/io/SSTableIO.cpp              |   5 +
 utils/io/SSTableIO.h                | 476 ++++++++++++++++++++++++++++
 utils/io/VersionManager.cpp         |  75 +++++
 utils/io/VersionManager.h           |  44 +++
 utils/io/Wal.cpp                    |  43 +++
 utils/io/Wal.h                      |  28 ++
 utils/toml                          |   1 +
 18 files changed, 1402 insertions(+)
 create mode 100644 .gitmodules
 create mode 100644 core/Memtable.cpp
 create mode 100644 core/Memtable.h
 create mode 100644 utils/datastructures/LFSkipList.cpp
 create mode 100644 utils/datastructures/LFSkipList.h
 create mode 100644 utils/hash/Hash128.h
 create mode 160000 utils/hash/xxhash
 create mode 100644 utils/io/Compactor.cpp
 create mode 100644 utils/io/Compactor.h
 create mode 100644 utils/io/RecoveryLog.cpp
 create mode 100644 utils/io/RecoveryLog.h
 create mode 100644 utils/io/SSTableIO.cpp
 create mode 100644 utils/io/SSTableIO.h
 create mode 100644 utils/io/VersionManager.cpp
 create mode 100644 utils/io/VersionManager.h
 create mode 100644 utils/io/Wal.cpp
 create mode 100644 utils/io/Wal.h
 create mode 160000 utils/toml

diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..5d64232
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,6 @@
+[submodule "utils/toml"]
+	path = utils/toml
+	url = https://github.com/marzer/tomlplusplus.git
+[submodule "utils/hash/xxhash"]
+	path = utils/hash/xxhash
+	url = https://github.com/Cyan4973/xxHash.git
diff --git a/core/Memtable.cpp b/core/Memtable.cpp
new file mode 100644
index 0000000..2e8f3e3
--- /dev/null
+++ b/core/Memtable.cpp
@@ -0,0 +1,11 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#include "Memtable.h"
+
+
+namespace usub::shared_storage
+{
+} // shared_storage
+// usub
diff --git a/core/Memtable.h b/core/Memtable.h
new file mode 100644
index 0000000..56b965c
--- /dev/null
+++ b/core/Memtable.h
@@ -0,0 +1,140 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#ifndef MEMTABLE_H
+#define MEMTABLE_H
+
+#include <atomic>
+#include <ctime>
+#include <mutex>
+#include <optional>
+#include <string>
+#include <vector>
+#include "utils/io/Wal.h"
+
+namespace usub::shared_storage
+{
+    template<typename SkipList>
+    class MemTableManager
+    {
+    public:
+        MemTableManager(const std::string& wal_file, size_t max_size);
+
+        ~MemTableManager();
+
+        void put(const typename SkipList::key_type& key, const typename SkipList::value_type& value);
+
+        void remove(const typename SkipList::key_type& key);
+
+        std::optional<typename SkipList::value_type> get(const typename SkipList::key_type& key);
+
+        void flush();
+
+        void flush_batch();
+
+    private:
+        std::atomic<SkipList*> active_memtable;
+        utils::WAL wal;
+        size_t max_memtable_size;
+        std::mutex batch_mutex;
+        std::vector<std::pair<typename SkipList::key_type, typename SkipList::value_type>> write_batch;
+        std::atomic<bool> flushing{false};
+
+    private:
+        size_t estimate_memtable_size() const;
+
+        size_t estimate_batch_size() const;
+    };
+
+    template<typename SkipList>
+    MemTableManager<SkipList>::MemTableManager(const std::string& wal_file, size_t max_size) : wal(wal_file),
+        max_memtable_size(max_size)
+    {
+        this->active_memtable.store(new SkipList());
+    }
+
+    template<typename SkipList>
+    MemTableManager<SkipList>::~MemTableManager()
+    {
+        delete this->active_memtable.load();
+    }
+
+    template<typename SkipList>
+    void MemTableManager<SkipList>::put(const typename SkipList::key_type& key,
+                                        const typename SkipList::value_type& value)
+    {
+        {
+            std::lock_guard<std::mutex> lock(batch_mutex);
+            write_batch.emplace_back(key, value);
+        }
+        if (estimate_batch_size() >= 64)
+        {
+            flush_batch();
+        }
+    }
+
+    template<typename SkipList>
+    void MemTableManager<SkipList>::remove(const typename SkipList::key_type& key)
+    {
+        this->wal.write_delete(key);
+        this->active_memtable.load()->erase(key);
+        if (estimate_memtable_size() > this->max_memtable_size)
+        {
+            flush();
+        }
+    }
+
+    template<typename SkipList>
+    std::optional<typename SkipList::value_type> MemTableManager<SkipList>::get(const typename SkipList::key_type& key)
+    {
+        return this->active_memtable.load()->find(key);
+    }
+
+    template<typename SkipList>
+    void MemTableManager<SkipList>::flush()
+    {
+        if (this->flushing.exchange(true)) return;
+
+        auto old_memtable = this->active_memtable.exchange(new SkipList());
+        std::string filename = "sstable_" + std::to_string(std::time(nullptr)) + ".dat";
+        write_sstable_v2(*old_memtable, filename);
+        delete old_memtable;
+        this->wal.close();
+        this->flushing.store(false);
+    }
+
+    template<typename SkipList>
+    void MemTableManager<SkipList>::flush_batch()
+    {
+        std::vector<std::pair<typename SkipList::key_type, typename SkipList::value_type>> local_batch;
+        {
+            std::lock_guard<std::mutex> lock(batch_mutex);
+            local_batch.swap(write_batch);
+        }
+
+        for (const auto& [key, value] : local_batch)
+        {
+            wal.write_put(key, value);
+            active_memtable.load()->insert(key, value);
+        }
+
+        if (estimate_memtable_size() > max_memtable_size)
+        {
+            flush();
+        }
+    }
+
+    template<typename SkipList>
+    size_t MemTableManager<SkipList>::estimate_memtable_size() const
+    {
+        // For simplicity: count the number of elements * average size
+        return this->active_memtable.load()->unsafe_size() * 128; // The error is acceptable
+    }
+
+    template<typename SkipList>
+    size_t MemTableManager<SkipList>::estimate_batch_size() const
+    {
+        return write_batch.size();
+    }
+} // shared_storage
+// usub
+
+#endif //MEMTABLE_H
diff --git a/utils/datastructures/LFSkipList.cpp b/utils/datastructures/LFSkipList.cpp
new file mode 100644
index 0000000..19db018
--- /dev/null
+++ b/utils/datastructures/LFSkipList.cpp
@@ -0,0 +1,5 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#include "LFSkipList.h"
diff --git a/utils/datastructures/LFSkipList.h b/utils/datastructures/LFSkipList.h
new file mode 100644
index 0000000..f87354b
--- /dev/null
+++ b/utils/datastructures/LFSkipList.h
@@ -0,0 +1,289 @@
+#ifndef LFSKIPLIST_H
+#define LFSKIPLIST_H
+
+#include <array>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <limits>
+#include <optional>
+#include <random>
+#include <string>
+#include <utility>
+#include "utils/io/VersionManager.h"
+
+#if defined(__x86_64__) || defined(_M_X64) || defined(__i386) || defined(_M_IX86)
+#include <immintrin.h>
+inline void cpu_relax() noexcept { _mm_pause(); }
+inline void prefetch_for_read(const void* ptr) noexcept {
+    _mm_prefetch(reinterpret_cast<const char*>(ptr), _MM_HINT_T0);
+}
+#else
+inline void cpu_relax() noexcept
+{
+}
+
+inline void prefetch_for_read(const void*) noexcept
+{
+}
+#endif
+
+namespace usub::utils
+{
+    constexpr int MAX_LEVEL = 16;
+
+    inline int random_level()
+    {
+        static thread_local std::mt19937 rng(std::random_device{}());
+        static thread_local std::uniform_int_distribution<int> dist(0, 1);
+
+        int lvl = 1;
+        while (lvl < MAX_LEVEL && dist(rng)) ++lvl;
+        return lvl;
+    }
+
+    template<typename Key, typename Value>
+    class LFSkipList
+    {
+        struct Node
+        {
+            Key key;
+            Value value;
+            int topLevel;
+            bool is_tombstone;
+            uint64_t version;
+            std::array<std::atomic<Node*>, MAX_LEVEL> next;
+            std::atomic<bool> marked{false};
+
+            Node(const Key& k, const Value& v, int level, bool tombstone, uint64_t ver)
+                : key(k), value(v), topLevel(level), is_tombstone(tombstone), version(ver)
+            {
+                for (int i = 0; i < MAX_LEVEL; ++i)
+                    next[i].store(nullptr, std::memory_order_relaxed);
+            }
+        };
+
+        Node* head;
+        usub::utils::VersionManager& version_manager;
+
+    public:
+        using key_type = Key;
+        using value_type = Value;
+
+        LFSkipList(usub::utils::VersionManager& vm)
+            : version_manager(vm)
+        {
+            head = new Node(std::numeric_limits<Key>::min(), Value{}, MAX_LEVEL, false, version_manager.next_version());
+        }
+
+        ~LFSkipList()
+        {
+            Node* curr = head;
+            while (curr)
+            {
+                Node* next = next_node(curr->next[0].load(std::memory_order_relaxed));
+                delete curr;
+                curr = next;
+            }
+        }
+
+        bool insert(const Key& key, const Value& value)
+        {
+            return insert_internal(key, value, false);
+        }
+
+        bool erase(const Key& key)
+        {
+            return insert_internal(key, Value{}, true);
+        }
+
+        std::optional<Value> find(const Key& key) const
+        {
+            Node* best = nullptr;
+            Node* node = head->next[0].load(std::memory_order_acquire);
+
+            while (node)
+            {
+                prefetch_for_read(node);
+                if (node->key == key && !node->marked.load(std::memory_order_acquire))
+                {
+                    if (!best || node->version > best->version)
+                    {
+                        best = node;
+                    }
+                }
+                node = next_node(node->next[0].load(std::memory_order_acquire));
+            }
+
+            if (best && !best->is_tombstone)
+                return best->value;
+            return std::nullopt;
+        }
+
+        template<typename F>
+        void for_each(F&& func) const
+        {
+            Node* node = head->next[0].load(std::memory_order_acquire);
+            while (node)
+            {
+                prefetch_for_read(node);
+                if (!node->marked.load(std::memory_order_acquire) && !node->is_tombstone)
+                {
+                    func(node->key, node->value);
+                }
+                node = next_node(node->next[0].load(std::memory_order_acquire));
+            }
+        }
+
+        template<typename F>
+        void for_each_raw(F&& func) const
+        {
+            Node* node = head->next[0].load(std::memory_order_acquire);
+            while (node)
+            {
+                prefetch_for_read(node);
+                if (!node->marked.load(std::memory_order_acquire))
+                {
+                    func(node->key, node->value, node->is_tombstone, node->version);
+                }
+                node = next_node(node->next[0].load(std::memory_order_acquire));
+            }
+        }
+
+        bool insert_raw(const Key& key, const Value& value, bool tombstone, uint64_t version)
+        {
+            Node* preds[MAX_LEVEL]{};
+            Node* succs[MAX_LEVEL]{};
+
+            while (true)
+            {
+                bool found = find_internal(key, preds, succs);
+
+                int topLevel = random_level();
+                Node* newNode = new Node(key, value, topLevel, tombstone, version);
+
+                for (int i = 0; i < topLevel; ++i)
+                    newNode->next[i].store(succs[i], std::memory_order_relaxed);
+
+                if (!preds[0]->next[0].compare_exchange_strong(
+                    succs[0], newNode, std::memory_order_acq_rel, std::memory_order_relaxed))
+                {
+                    delete newNode;
+                    cpu_relax();
+                    continue;
+                }
+
+                for (int i = 1; i < topLevel; ++i)
+                {
+                    while (true)
+                    {
+                        if (preds[i]->next[i].compare_exchange_strong(
+                            succs[i], newNode, std::memory_order_acq_rel, std::memory_order_relaxed))
+                            break;
+                        cpu_relax();
+                        find_internal(key, preds, succs);
+                    }
+                }
+
+                return !found || tombstone;
+            }
+        }
+
+        [[nodiscard]] size_t unsafe_size() const
+        {
+            size_t count = 0;
+            Node* node = head->next[0].load(std::memory_order_relaxed);
+            while (node)
+            {
+                if (!node->marked.load(std::memory_order_relaxed) && !node->is_tombstone)
+                    ++count;
+                node = next_node(node->next[0].load(std::memory_order_relaxed));
+            }
+            return count;
+        }
+
+    private:
+        bool insert_internal(const Key& key, const Value& value, bool tombstone)
+        {
+            Node* preds[MAX_LEVEL]{};
+            Node* succs[MAX_LEVEL]{};
+
+            while (true)
+            {
+                bool found = find_internal(key, preds, succs);
+
+                int topLevel = random_level();
+                Node* newNode = new Node(key, value, topLevel, tombstone, version_manager.next_version());
+
+                for (int i = 0; i < topLevel; ++i)
+                    newNode->next[i].store(succs[i], std::memory_order_relaxed);
+
+                if (!preds[0]->next[0].compare_exchange_strong(
+                    succs[0], newNode, std::memory_order_acq_rel, std::memory_order_relaxed))
+                {
+                    delete newNode;
+                    cpu_relax();
+                    continue;
+                }
+
+                for (int i = 1; i < topLevel; ++i)
+                {
+                    while (true)
+                    {
+                        if (preds[i]->next[i].compare_exchange_strong(
+                            succs[i], newNode, std::memory_order_acq_rel, std::memory_order_relaxed))
+                            break;
+                        cpu_relax();
+                        find_internal(key, preds, succs);
+                    }
+                }
+
+                return !found || tombstone;
+            }
+        }
+
+        bool find_internal(const Key& key, Node** preds, Node** succs) const
+        {
+            bool found = false;
+            Node* pred = head;
+
+            for (int level = MAX_LEVEL - 1; level >= 0; --level)
+            {
+                Node* curr = pred->next[level].load(std::memory_order_acquire);
+                while (curr)
+                {
+                    prefetch_for_read(curr);
+                    Node* next = curr->next[level].load(std::memory_order_acquire);
+                    if (reinterpret_cast<uintptr_t>(next) & 1)
+                    {
+                        curr = next_node(next);
+                        continue;
+                    }
+                    if (curr->key < key)
+                    {
+                        pred = curr;
+                        curr = next;
+                    }
+                    else
+                    {
+                        break;
+                    }
+                }
+                preds[level] = pred;
+                succs[level] = curr;
+            }
+
+            if (succs[0] && succs[0]->key == key && !succs[0]->marked.load(std::memory_order_acquire))
+                found = true;
+
+            return found;
+        }
+
+        static Node* next_node(Node* n)
+        {
+            return reinterpret_cast<Node*>(reinterpret_cast<uintptr_t>(n) & ~uintptr_t(1));
+        }
+    };
+} // namespace usub::utils
+
+#endif //LFSKIPLIST_H
diff --git a/utils/hash/Hash128.h b/utils/hash/Hash128.h
new file mode 100644
index 0000000..62b90f9
--- /dev/null
+++ b/utils/hash/Hash128.h
@@ -0,0 +1,53 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#ifndef HASHSTRUCTURE_H
+#define HASHSTRUCTURE_H
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+
+#include "utils/hash/xxhash/xxhash.h"
+
+
+namespace usub::utils
+{
+    struct alignas(16) Hash128
+    {
+        uint64_t low;
+        uint64_t high;
+
+        Hash128() : low(0), high(0)
+        {
+        }
+
+        bool operator<(const Hash128& other) const
+        {
+            return (this->high == other.high) ? (this->low < other.low) : (this->high < other.high);
+        }
+
+        bool operator==(const Hash128& other) const
+        {
+            return this->low == other.low && this->high == other.high;
+        }
+    };
+
+    inline Hash128 compute_hash128(const void* data, size_t length)
+    {
+        Hash128 h;
+        XXH128_hash_t hash = XXH3_128bits(data, length);
+        h.low = hash.low64;
+        h.high = hash.high64;
+        return h;
+    }
+
+    inline Hash128 compute_hash128(const std::string& s)
+    {
+        return compute_hash128(s.data(), s.size());
+    }
+} // namespace usub::utils
+
+
+#endif //HASHSTRUCTURE_H
diff --git a/utils/hash/xxhash b/utils/hash/xxhash
new file mode 160000
index 0000000..953a09a
--- /dev/null
+++ b/utils/hash/xxhash
@@ -0,0 +1 @@
+Subproject commit 953a09abc39096da9e216b6eb0002c681cdc1199
diff --git a/utils/io/Compactor.cpp b/utils/io/Compactor.cpp
new file mode 100644
index 0000000..b131de0
--- /dev/null
+++ b/utils/io/Compactor.cpp
@@ -0,0 +1,77 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#include "Compactor.h"
+
+namespace usub::utils
+{
+    Compactor::Compactor(VersionManager& vm) : version_manager(vm)
+    {
+        this->background_thread = std::thread([this] { this->run(); });
+    }
+
+    void Compactor::add_sstable_l0(const std::string& filename)
+    {
+        this->level0_files.push_back(filename);
+        if (this->level0_files.size() >= 4)
+        {
+            compact_level0();
+        }
+    }
+
+    void Compactor::compact_level0()
+    {
+        if (this->level0_files.size() < 2) return;
+
+        std::string merged_filename = "L1_" + std::to_string(std::time(nullptr)) + ".dat";
+
+        LFSkipList<std::string, std::string> merged(this->version_manager);
+        for (const auto& file : this->level0_files)
+        {
+            read_sstable_with_mmap(merged, file);
+        }
+
+        write_sstable_with_index(merged, merged_filename);
+
+        this->level1_files.push_back(merged_filename);
+        for (const auto& file : this->level0_files)
+        {
+            ::remove(file.c_str());
+        }
+        this->level0_files.clear();
+    }
+
+    void Compactor::compact_level1()
+    {
+        if (this->level1_files.size() < 4) return;
+
+        std::string merged_filename = "L2_" + std::to_string(std::time(nullptr)) + ".dat";
+
+        LFSkipList<std::string, std::string> merged(this->version_manager);
+        for (const auto& file : this->level1_files)
+        {
+            read_sstable_with_mmap(merged, file);
+        }
+
+        write_sstable_with_index(merged, merged_filename);
+
+        this->level2_files.push_back(merged_filename);
+        for (const auto& file : this->level1_files)
+        {
+            ::remove(file.c_str());
+        }
+        this->level1_files.clear();
+    }
+
+    void Compactor::run()
+    {
+        while (this->running.load())
+        {
+            compact_level0();
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+        }
+    }
+} // utils
+// usub
diff --git a/utils/io/Compactor.h b/utils/io/Compactor.h
new file mode 100644
index 0000000..a0aef73
--- /dev/null
+++ b/utils/io/Compactor.h
@@ -0,0 +1,42 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#ifndef COMPACTOR_H
+#define COMPACTOR_H
+
+#include <atomic>
+#include <chrono>
+#include <ctime>
+#include <string>
+#include <thread>
+#include <vector>
+#include "utils/datastructures/LFSkipList.h"
+#include "utils/io/SSTableIO.h"
+
+namespace usub::utils
+{
+    class Compactor
+    {
+    public:
+        explicit Compactor(VersionManager& vm);
+
+        void add_sstable_l0(const std::string& filename);
+
+        void run();
+
+    private:
+        void compact_level0();
+
+        void compact_level1();
+
+    private:
+        std::vector<std::string> level0_files;
+        std::vector<std::string> level1_files;
+        std::vector<std::string> level2_files;
+        std::atomic<bool> running{true};
+        std::thread background_thread;
+        VersionManager& version_manager;
+    };
+} // utils
+// usub
+
+
+#endif //COMPACTOR_H
diff --git a/utils/io/RecoveryLog.cpp b/utils/io/RecoveryLog.cpp
new file mode 100644
index 0000000..94765a8
--- /dev/null
+++ b/utils/io/RecoveryLog.cpp
@@ -0,0 +1,69 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#include "RecoveryLog.h"
+
+
+namespace usub::utils
+{
+    RecoveryLog::RecoveryLog(const std::string& dbname)
+        : db_name(dbname),
+          metadata_dir("metadata/" + db_name + "/"),
+          log_file(metadata_dir + "recovery.log")
+    {
+        ensure_metadata_dir();
+        this->log_out.open(this->log_file, std::ios::binary | std::ios::app);
+        if (!this->log_out.is_open())
+        {
+            throw std::runtime_error("Failed to open recovery log for " + this->db_name);
+        }
+    }
+
+    RecoveryLog::~RecoveryLog()
+    {
+        if (this->log_out.is_open())
+        {
+            this->log_out.close();
+        }
+    }
+
+    void RecoveryLog::log_put(const std::string& key, const std::string& value)
+    {
+        uint8_t op = 0;
+        uint32_t key_len = key.size();
+        uint32_t value_len = value.size();
+        this->log_out.write(reinterpret_cast<const char*>(&op), sizeof(op));
+        this->log_out.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
+        this->log_out.write(key.data(), key_len);
+        this->log_out.write(reinterpret_cast<const char*>(&value_len), sizeof(value_len));
+        this->log_out.write(value.data(), value_len);
+        this->log_out.flush();
+    }
+
+    void RecoveryLog::log_delete(const std::string& key)
+    {
+        uint8_t op = 1;
+        uint32_t key_len = key.size();
+        this->log_out.write(reinterpret_cast<const char*>(&op), sizeof(op));
+        this->log_out.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
+        this->log_out.write(key.data(), key_len);
+        this->log_out.flush();
+    }
+
+    void RecoveryLog::ensure_metadata_dir()
+    {
+        try
+        {
+            if (!std::filesystem::exists(this->metadata_dir))
+            {
+                std::filesystem::create_directories(this->metadata_dir);
+            }
+        }
+        catch (const std::exception& e)
+        {
+            std::cerr << "Failed to create metadata dir: " << e.what() << std::endl;
+        }
+    }
+} // utils
+// usub
diff --git a/utils/io/RecoveryLog.h b/utils/io/RecoveryLog.h
new file mode 100644
index 0000000..988cc2c
--- /dev/null
+++ b/utils/io/RecoveryLog.h
@@ -0,0 +1,37 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#ifndef RECOVERYLOG_H
+#define RECOVERYLOG_H
+
+#include <cstdint>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <string>
+
+namespace usub::utils
+{
+    class RecoveryLog
+    {
+    public:
+        explicit RecoveryLog(const std::string& dbname);
+
+        ~RecoveryLog();
+
+        void log_put(const std::string& key, const std::string& value);
+
+        void log_delete(const std::string& key);
+
+    private:
+        void ensure_metadata_dir();
+
+    private:
+        std::string db_name;
+        std::string metadata_dir;
+        std::string log_file;
+        std::ofstream log_out;
+    };
+} // utils
+// usub
+
+#endif //RECOVERYLOG_H
diff --git a/utils/io/SSTableIO.cpp b/utils/io/SSTableIO.cpp
new file mode 100644
index 0000000..f9de0d5
--- /dev/null
+++ b/utils/io/SSTableIO.cpp
@@ -0,0 +1,5 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#include "SSTableIO.h"
diff --git a/utils/io/SSTableIO.h b/utils/io/SSTableIO.h
new file mode 100644
index 0000000..0a272eb
--- /dev/null
+++ b/utils/io/SSTableIO.h
@@ -0,0 +1,476 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#ifndef SSTABLEIO_H
+#define SSTABLEIO_H
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <fstream>
+#include <stdexcept>
+#include <string>
+#include <vector>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+
+
+namespace usub::utils
+{
+    template<typename SkipList>
+    void write_sstable(const SkipList& memtable, const std::string& filename)
+    {
+        int fd = ::open(filename.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0644);
+        if (fd < 0) throw std::runtime_error("Failed to open SSTable");
+
+        FILE* file = ::fdopen(fd, "wb");
+        if (!file)
+        {
+            ::close(fd);
+            throw std::runtime_error("Failed to fdopen SSTable");
+        }
+
+        uint64_t current_offset = 0;
+        std::vector<std::pair<typename SkipList::key_type, uint64_t>> index_entries;
+
+        memtable.for_each([&](const auto& key, const auto& value)
+        {
+            uint8_t is_tombstone = 0;
+            uint64_t version = 0;
+
+            if constexpr (requires { value.is_tombstone; })
+            {
+                is_tombstone = value.is_tombstone ? 1 : 0;
+                version = value.version;
+            }
+
+            uint32_t key_len = sizeof(key);
+            uint32_t value_len = value.size();
+
+            ::fwrite(&key_len, sizeof(key_len), 1, file);
+            ::fwrite(&key, key_len, 1, file);
+            ::fwrite(&value_len, sizeof(value_len), 1, file);
+            ::fwrite(value.data(), value_len, 1, file);
+            ::fwrite(&is_tombstone, sizeof(is_tombstone), 1, file);
+            ::fwrite(&version, sizeof(version), 1, file);
+
+            index_entries.emplace_back(key, current_offset);
+            current_offset += sizeof(key_len) + key_len + sizeof(value_len) + value_len + sizeof(is_tombstone) + sizeof(
+                version);
+        });
+
+        uint64_t index_offset = current_offset;
+
+        for (const auto& [key, offset] : index_entries)
+        {
+            uint32_t key_len = sizeof(key);
+            ::fwrite(&key_len, sizeof(key_len), 1, file);
+            ::fwrite(&key, key_len, 1, file);
+            ::fwrite(&offset, sizeof(offset), 1, file);
+        }
+
+        ::fwrite(&index_offset, sizeof(index_offset), 1, file);
+
+        ::fflush(file);
+        ::fsync(fd);
+        ::fclose(file);
+    }
+
+
+    template<typename SkipList>
+    void read_sstable(SkipList& memtable, const std::string& filename)
+    {
+        std::ifstream sstable(filename, std::ios::binary);
+        if (!sstable.is_open())
+        {
+            throw std::runtime_error("Failed to open file: " + filename);
+        }
+
+        while (sstable.peek() != EOF)
+        {
+            uint32_t key_len = 0;
+            sstable.read(reinterpret_cast<char*>(&key_len), sizeof(key_len));
+
+            if (key_len != sizeof(typename SkipList::key_type))
+            {
+                throw std::runtime_error("Key size mismatch");
+            }
+
+            typename SkipList::key_type key{};
+            sstable.read(reinterpret_cast<char*>(&key), key_len);
+
+            uint32_t value_len = 0;
+            sstable.read(reinterpret_cast<char*>(&value_len), sizeof(value_len));
+
+            std::string value(value_len, '\0');
+            sstable.read(value.data(), value_len);
+
+            memtable.insert(key, value);
+        }
+    }
+
+    template<typename SkipList>
+    void write_sstable_with_index(const SkipList& memtable, const std::string& filename)
+    {
+        int fd = ::open(filename.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0644);
+        if (fd < 0)
+        {
+            throw std::runtime_error("Failed to open file: " + filename);
+        }
+
+        FILE* file = ::fdopen(fd, "wb");
+        if (!file)
+        {
+            ::close(fd);
+            throw std::runtime_error("Failed to fdopen file: " + filename);
+        }
+
+        std::vector<std::pair<typename SkipList::key_type, uint64_t>> index_entries;
+        uint64_t current_offset = 0;
+
+        memtable.for_each_raw([&](const auto& key, const auto& value, bool is_tombstone, uint64_t version)
+        {
+            uint32_t key_len = key.size();
+            uint32_t value_len = value.size();
+            uint8_t tombstone_flag = is_tombstone ? 1 : 0;
+
+            ::fwrite(&key_len, sizeof(key_len), 1, file);
+            ::fwrite(key.data(), key_len, 1, file);
+            ::fwrite(&value_len, sizeof(value_len), 1, file);
+            ::fwrite(value.data(), value_len, 1, file);
+            ::fwrite(&tombstone_flag, sizeof(tombstone_flag), 1, file);
+            ::fwrite(&version, sizeof(version), 1, file);
+
+            index_entries.emplace_back(key, current_offset);
+
+            current_offset += sizeof(key_len) + key_len + sizeof(value_len) + value_len + sizeof(tombstone_flag) +
+                sizeof(version);
+        });
+
+        uint64_t index_start_offset = current_offset;
+
+        // Write the index
+        for (const auto& [key, offset] : index_entries)
+        {
+            uint32_t key_len = key.size();
+            ::fwrite(&key_len, sizeof(key_len), 1, file);
+            ::fwrite(key.data(), key_len, 1, file);
+            ::fwrite(&offset, sizeof(offset), 1, file);
+        }
+
+        ::fwrite(&index_start_offset, sizeof(index_start_offset), 1, file);
+
+        ::fflush(file);
+        ::fsync(fd);
+        ::fclose(file);
+    }
+
+    template<typename SkipList>
+    void read_sstable_with_index(SkipList& memtable, const std::string& filename)
+    {
+        std::ifstream sstable(filename, std::ios::binary);
+        if (!sstable.is_open())
+        {
+            throw std::runtime_error("Failed to open file: " + filename);
+        }
+
+        sstable.seekg(0, std::ios::end);
+        const std::streampos index_end = sstable.tellg() - static_cast<std::streamoff>(sizeof(uint64_t));
+
+        sstable.seekg(-static_cast<std::streamoff>(sizeof(uint64_t)), std::ios::end);
+        uint64_t index_offset = 0;
+        sstable.read(reinterpret_cast<char*>(&index_offset), sizeof(index_offset));
+
+        sstable.seekg(index_offset, std::ios::beg);
+
+        std::vector<std::pair<typename SkipList::key_type, uint64_t>> index_entries;
+
+        while (sstable.tellg() < index_end)
+        {
+            uint32_t key_len;
+            if (!sstable.read(reinterpret_cast<char*>(&key_len), sizeof(key_len)))
+                break;
+
+            typename SkipList::key_type key{};
+            sstable.read(reinterpret_cast<char*>(&key), key_len);
+
+            uint64_t offset = 0;
+            sstable.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+
+            index_entries.emplace_back(key, offset);
+        }
+
+        for (const auto& [key, offset] : index_entries)
+        {
+            sstable.seekg(offset, std::ios::beg);
+
+            uint32_t key_len = 0;
+            sstable.read(reinterpret_cast<char*>(&key_len), sizeof(key_len));
+
+            typename SkipList::key_type file_key{};
+            sstable.read(reinterpret_cast<char*>(&file_key), key_len);
+
+            uint32_t value_len = 0;
+            sstable.read(reinterpret_cast<char*>(&value_len), sizeof(value_len));
+
+            std::string value(value_len, '\0');
+            sstable.read(value.data(), value_len);
+
+            memtable.insert(file_key, value);
+        }
+    }
+
+    template<typename SkipList, typename Callback>
+    void range_query_sstable(const std::string& filename,
+                             const typename SkipList::key_type& from_key,
+                             const typename SkipList::key_type& to_key,
+                             Callback&& callback)
+    {
+        std::ifstream sstable(filename, std::ios::binary);
+        if (!sstable.is_open())
+        {
+            throw std::runtime_error("Failed to open file: " + filename);
+        }
+
+        sstable.seekg(-static_cast<std::streamoff>(sizeof(uint64_t)), std::ios::end);
+        uint64_t index_offset = 0;
+        sstable.read(reinterpret_cast<char*>(&index_offset), sizeof(index_offset));
+
+        sstable.seekg(index_offset, std::ios::beg);
+
+        std::vector<std::pair<typename SkipList::key_type, uint64_t>> index_entries;
+
+        while (sstable.peek() != EOF)
+        {
+            uint32_t key_len;
+            if (!sstable.read(reinterpret_cast<char*>(&key_len), sizeof(key_len)))
+                break;
+
+            typename SkipList::key_type key{};
+            sstable.read(reinterpret_cast<char*>(&key), key_len);
+
+            uint64_t offset = 0;
+            sstable.read(reinterpret_cast<char*>(&offset), sizeof(offset));
+
+            index_entries.emplace_back(key, offset);
+        }
+
+        auto it = std::lower_bound(index_entries.begin(), index_entries.end(), from_key,
+                                   [](const auto& pair, const auto& key) { return pair.first < key; });
+
+        for (; it != index_entries.end() && it->first <= to_key; ++it)
+        {
+            sstable.seekg(it->second, std::ios::beg);
+
+            uint32_t key_len;
+            sstable.read(reinterpret_cast<char*>(&key_len), sizeof(key_len));
+
+            typename SkipList::key_type file_key{};
+            sstable.read(reinterpret_cast<char*>(&file_key), key_len);
+
+            uint32_t value_len;
+            sstable.read(reinterpret_cast<char*>(&value_len), sizeof(value_len));
+
+            std::string value(value_len, '\0');
+            sstable.read(value.data(), value_len);
+
+            callback(file_key, value);
+        }
+    }
+
+    template<typename SkipList>
+    void replay_wal(SkipList& memtable, const std::string& wal_filename)
+    {
+        std::ifstream wal(wal_filename, std::ios::binary);
+        if (!wal.is_open())
+        {
+            throw std::runtime_error("Failed to open WAL file: " + wal_filename);
+        }
+
+        while (wal.peek() != EOF)
+        {
+            uint8_t op;
+            wal.read(reinterpret_cast<char*>(&op), sizeof(op));
+
+            uint32_t key_len;
+            wal.read(reinterpret_cast<char*>(&key_len), sizeof(key_len));
+
+            std::string key(key_len, '\0');
+            wal.read(key.data(), key_len);
+
+            if (op == 0)
+            {
+                // PUT
+                uint32_t value_len;
+                wal.read(reinterpret_cast<char*>(&value_len), sizeof(value_len));
+                std::string value(value_len, '\0');
+                wal.read(value.data(), value_len);
+                memtable.insert(key, value);
+            }
+            else if (op == 1)
+            {
+                memtable.erase(key);
+            }
+            else
+            {
+                throw std::runtime_error("Unknown WAL operation code");
+            }
+        }
+    }
+
+    template<typename SkipList>
+    void read_sstable_with_mmap(SkipList& memtable, const std::string& filename)
+    {
+        int fd = ::open(filename.c_str(), O_RDONLY);
+        if (fd < 0) throw std::runtime_error("Failed to open SSTable");
+
+        struct stat st;
+        if (fstat(fd, &st) != 0)
+        {
+            ::close(fd);
+            throw std::runtime_error("Failed to stat SSTable");
+        }
+
+        void* data = mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
+        if (data == MAP_FAILED)
+        {
+            ::close(fd);
+            throw std::runtime_error("Failed to mmap SSTable");
+        }
+
+        const char* ptr = reinterpret_cast<const char*>(data);
+        const char* end = ptr + st.st_size;
+
+        uint64_t index_offset = *reinterpret_cast<const uint64_t*>(end - sizeof(uint64_t));
+        const char* index_ptr = ptr + index_offset;
+
+        std::vector<std::pair<std::string, uint64_t>> index_entries;
+        while (index_ptr < end - sizeof(uint64_t))
+        {
+            uint32_t key_len = *reinterpret_cast<const uint32_t*>(index_ptr);
+            index_ptr += sizeof(uint32_t);
+
+            std::string key(key_len, '\0');
+            std::memcpy(key.data(), index_ptr, key_len);
+            index_ptr += key_len;
+
+            uint64_t offset = *reinterpret_cast<const uint64_t*>(index_ptr);
+            index_ptr += sizeof(uint64_t);
+
+            index_entries.emplace_back(key, offset);
+        }
+
+        for (const auto& [key, offset] : index_entries)
+        {
+            const char* record = ptr + offset;
+
+            uint32_t key_len = *reinterpret_cast<const uint32_t*>(record);
+            record += sizeof(uint32_t);
+
+            std::string file_key(key_len, '\0');
+            std::memcpy(file_key.data(), record, key_len);
+            record += key_len;
+
+            uint32_t value_len = *reinterpret_cast<const uint32_t*>(record);
+            record += sizeof(uint32_t);
+
+            std::string value(value_len, '\0');
+            std::memcpy(value.data(), record, value_len);
+            record += value_len;
+
+            uint8_t tombstone_flag = *reinterpret_cast<const uint8_t*>(record);
+            record += sizeof(uint8_t);
+
+            uint64_t version = *reinterpret_cast<const uint64_t*>(record);
+            record += sizeof(uint64_t);
+
+            memtable.insert_raw(file_key, value, tombstone_flag == 1, version);
+        }
+
+        munmap(data, st.st_size);
+        ::close(fd);
+    }
+
+    template<typename SkipList, typename Callback>
+    void optimized_range_query_sstable(const std::string& filename,
+                                       const typename SkipList::key_type& from_key,
+                                       const typename SkipList::key_type& to_key,
+                                       Callback&& callback)
+    {
+        int fd = ::open(filename.c_str(), O_RDONLY);
+        if (fd < 0) throw std::runtime_error("Failed to open file");
+
+        struct stat st;
+        if (fstat(fd, &st) != 0)
+        {
+            ::close(fd);
+            throw std::runtime_error("Failed to stat file");
+        }
+
+        void* data = mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
+        if (data == MAP_FAILED)
+        {
+            ::close(fd);
+            throw std::runtime_error("Failed to mmap file");
+        }
+
+        const char* ptr = reinterpret_cast<const char*>(data);
+        const char* end = ptr + st.st_size;
+
+        uint64_t index_offset = *reinterpret_cast<const uint64_t*>(end - sizeof(uint64_t));
+        const char* index_ptr = ptr + index_offset;
+
+        std::vector<std::pair<typename SkipList::key_type, uint64_t>> index_entries;
+        while (index_ptr < end - sizeof(uint64_t))
+        {
+            uint32_t key_len = *reinterpret_cast<const uint32_t*>(index_ptr);
+            index_ptr += sizeof(uint32_t);
+
+            typename SkipList::key_type key;
+            std::memcpy(&key, index_ptr, key_len);
+            index_ptr += key_len;
+
+            uint64_t offset = *reinterpret_cast<const uint64_t*>(index_ptr);
+            index_ptr += sizeof(uint64_t);
+
+            index_entries.emplace_back(key, offset);
+        }
+
+        // lower_bound by from_key
+        auto it = std::lower_bound(index_entries.begin(), index_entries.end(), from_key,
+                                   [](const auto& pair, const auto& key) { return pair.first < key; });
+
+        for (; it != index_entries.end() && it->first <= to_key; ++it)
+        {
+            const char* record = ptr + it->second;
+
+            uint32_t key_len = *reinterpret_cast<const uint32_t*>(record);
+            record += sizeof(uint32_t);
+
+            typename SkipList::key_type file_key;
+            std::memcpy(&file_key, record, key_len);
+            record += key_len;
+
+            uint32_t value_len = *reinterpret_cast<const uint32_t*>(record);
+            record += sizeof(uint32_t);
+
+            std::string value(value_len, '\0');
+            std::memcpy(value.data(), record, value_len);
+            record += value_len;
+
+            uint8_t is_tombstone = *reinterpret_cast<const uint8_t*>(record);
+            record += sizeof(uint8_t);
+
+            uint64_t version = *reinterpret_cast<const uint64_t*>(record);
+            record += sizeof(uint64_t);
+
+            if (!is_tombstone)
+            {
+                callback(file_key, value);
+            }
+        }
+
+        munmap(data, st.st_size);
+        ::close(fd);
+    }
+}
+
+#endif //SSTABLEIO_H
diff --git a/utils/io/VersionManager.cpp b/utils/io/VersionManager.cpp
new file mode 100644
index 0000000..12167ac
--- /dev/null
+++ b/utils/io/VersionManager.cpp
@@ -0,0 +1,75 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#include "VersionManager.h"
+
+namespace usub::utils
+{
+    VersionManager::VersionManager(const std::string& dbname)
+        : db_name(dbname),
+          metadata_dir("metadata/" + db_name + "/"),
+          version_file(metadata_dir + "version.meta")
+    {
+        ensure_metadata_dir();
+        load_version();
+    }
+
+    uint64_t VersionManager::next_version()
+    {
+        return this->version.fetch_add(1, std::memory_order_relaxed);
+    }
+
+    void VersionManager::ensure_metadata_dir()
+    {
+        try
+        {
+            if (!std::filesystem::exists(this->metadata_dir))
+            {
+                std::filesystem::create_directories(this->metadata_dir);
+            }
+        }
+        catch (const std::exception& e)
+        {
+            std::cerr << "Failed to create metadata dir: " << e.what() << std::endl;
+        }
+    }
+
+    void VersionManager::load_version()
+    {
+        std::ifstream f(this->version_file, std::ios::binary);
+        if (f.is_open())
+        {
+            uint64_t v = 0;
+            f.read(reinterpret_cast<char*>(&v), sizeof(v));
+            if (f.good() && v > 0)
+            {
+                this->version.store(v, std::memory_order_relaxed);
+            }
+            else
+            {
+                std::cerr << "Warning: empty or corrupted version file for " << this->db_name << "\n";
+            }
+        }
+        else
+        {
+            std::cerr << "Warning: no version file for " << this->db_name << ", starting fresh\n";
+        }
+    }
+
+    void VersionManager::save_version()
+    {
+        std::ofstream f(this->version_file, std::ios::binary | std::ios::trunc);
+        if (!f.is_open())
+        {
+            std::cerr << "Error: cannot save version for " << this->db_name << "\n";
+            return;
+        }
+        uint64_t v = this->version.load(std::memory_order_relaxed);
+        f.write(reinterpret_cast<const char*>(&v), sizeof(v));
+        if (!f.good())
+        {
+            std::cerr << "Error: cannot write version for " << this->db_name << "\n";
+        }
+    }
+}
diff --git a/utils/io/VersionManager.h b/utils/io/VersionManager.h
new file mode 100644
index 0000000..1eb3dbb
--- /dev/null
+++ b/utils/io/VersionManager.h
@@ -0,0 +1,44 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#ifndef VERSIONMANAGER_H
+#define VERSIONMANAGER_H
+
+#include <atomic>
+#include <cstdint>
+#include <filesystem>
+#include <fstream>
+#include <iostream>
+#include <string>
+
+namespace usub::utils
+{
+    class VersionManager
+    {
+    public:
+        explicit VersionManager(const std::string& dbname);
+
+        ~VersionManager()
+        {
+            save_version();
+        }
+
+        uint64_t next_version();
+
+    private:
+        void ensure_metadata_dir();
+
+        void load_version();
+
+        void save_version();
+
+    private:
+        std::atomic<uint64_t> version{1};
+        std::string db_name;
+        std::string metadata_dir;
+        std::string version_file;
+    };
+} // namespace usub::utils
+
+#endif //VERSIONMANAGER_H
diff --git a/utils/io/Wal.cpp b/utils/io/Wal.cpp
new file mode 100644
index 0000000..ec6fc13
--- /dev/null
+++ b/utils/io/Wal.cpp
@@ -0,0 +1,43 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#include "Wal.h"
+
+namespace usub::utils
+{
+    WAL::WAL(const std::string& filename)
+    {
+        this->out.open(filename, std::ios::binary | std::ios::app);
+        if (!this->out.is_open()) throw std::runtime_error("Failed to open WAL");
+    }
+
+    void WAL::write_put(const std::string& key, const std::string& value)
+    {
+        uint8_t op = 0;
+        uint32_t key_len = key.size();
+        uint32_t value_len = value.size();
+        this->out.write(reinterpret_cast<const char*>(&op), sizeof(op));
+        this->out.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
+        this->out.write(key.data(), key_len);
+        this->out.write(reinterpret_cast<const char*>(&value_len), sizeof(value_len));
+        this->out.write(value.data(), value_len);
+        this->out.flush();
+    }
+
+    void WAL::write_delete(const std::string& key)
+    {
+        uint8_t op = 1;
+        uint32_t key_len = key.size();
+        this->out.write(reinterpret_cast<const char*>(&op), sizeof(op));
+        this->out.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
+        this->out.write(key.data(), key_len);
+        this->out.flush();
+    }
+
+    void WAL::close()
+    {
+        this->out.close();
+    }
+} // utils
+// usub
diff --git a/utils/io/Wal.h b/utils/io/Wal.h
new file mode 100644
index 0000000..59cc5a0
--- /dev/null
+++ b/utils/io/Wal.h
@@ -0,0 +1,28 @@
+//
+// Created by Kirill Zhukov on 20.04.2025.
+//
+
+#ifndef WAL_H
+#define WAL_H
+
+#include <cstdint>
+#include <fstream>
+#include <stdexcept>
+#include <string>
+
+namespace usub::utils
+{
+    class WAL
+    {
+        std::ofstream out;
+
+    public:
+        explicit WAL(const std::string& filename);
+
+        void write_put(const std::string& key, const std::string& value);
+
+        void write_delete(const std::string& key);
+
+        void close();
+    };
+} // utils
+// usub
+
+#endif //WAL_H
diff --git a/utils/toml b/utils/toml
new file mode 160000
index 0000000..fea1d90
--- /dev/null
+++ b/utils/toml
@@ -0,0 +1 @@
+Subproject commit fea1d905f2d2a8ad830f1985fe879f4fd4601fe5