// // Created by Kirill Zhukov on 20.04.2025. // #include "Compactor.h" #include namespace usub::utils { Compactor::Compactor(VersionManager& vm) : version_manager_(vm), running_(true) { } Compactor::~Compactor() { this->running_ = false; if (this->worker_thread_.joinable()) this->worker_thread_.join(); } void Compactor::add_sstable_l0(const std::string& filename) { this->l0_queue_.push(filename); } void Compactor::run() { this->worker_thread_ = std::thread(&Compactor::background_worker, this); } void Compactor::background_worker() { while (this->running_) { // Queue → 0 for (size_t i = 0; i < 8; ++i) { auto file = this->l0_queue_.pop(); if (file) { std::lock_guard lock(this->levels_mutex_); this->level0_files_.push_back(*file); this->level0_size_.fetch_add(1, std::memory_order_relaxed); } else { break; } } bool did_compact = false; { std::lock_guard lock(this->levels_mutex_); if (this->level0_size_.load(std::memory_order_relaxed) >= 4) { compact_level(this->level0_files_, this->level1_files_, 0); did_compact = true; } } // 1 → 2 { std::lock_guard lock(this->levels_mutex_); if (this->level1_size_.load(std::memory_order_relaxed) >= 4) { compact_level(this->level1_files_, this->level2_files_, 1); did_compact = true; } } if (!did_compact) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); } else { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } } void Compactor::compact_level(std::vector& source_files, std::vector& dest_files, int level) { if (source_files.empty()) return; size_t batch_size = std::min(4, source_files.size()); std::vector batch(source_files.begin(), source_files.begin() + batch_size); std::vector> loaded; loaded.reserve(batch_size); for (const auto& file : batch) { VersionManager dummy_vm("dummy"); LFSkipList table(dummy_vm); read_sstable_with_mmap(table, file); loaded.push_back(std::move(table)); std::filesystem::remove(file); } LFSkipList merged(this->version_manager_); for (auto& table : loaded) { table.for_each_raw([&](const auto& key, const auto& value, bool is_tombstone, uint64_t version) { if (!is_tombstone) merged.insert(key, value); else merged.erase(key); }); } std::string new_filename = "sstable_l" + std::to_string(level + 1) + "_" + std::to_string(this->version_manager_.next_version()) + ".dat"; write_sstable_with_index(merged, new_filename); dest_files.push_back(new_filename); source_files.erase(source_files.begin(), source_files.begin() + batch_size); if (level == 0) { this->level0_size_.fetch_sub(batch_size, std::memory_order_relaxed); this->level1_size_.fetch_add(1, std::memory_order_relaxed); } else if (level == 1) { this->level1_size_.fetch_sub(batch_size, std::memory_order_relaxed); this->level2_size_.fetch_add(1, std::memory_order_relaxed); } this->version_manager_.flush(); } } // utils // usub