SharedStorage/utils/io/Compactor.cpp

143 lines
4.1 KiB
C++

//
// Created by Kirill Zhukov on 20.04.2025.
//
#include "Compactor.h"
#include <mutex>
namespace usub::utils
{
Compactor::Compactor(VersionManager& vm)
: version_manager_(vm), running_(true)
{
}
// Stops the background worker: clears the loop flag read by background_worker(),
// then waits for the thread to finish. If run() was never called the thread is
// not joinable and there is nothing to wait for.
Compactor::~Compactor()
{
    running_ = false;

    if (worker_thread_.joinable())
    {
        worker_thread_.join();
    }
}
void Compactor::add_sstable_l0(const std::string& filename)
{
this->l0_queue_.push(filename);
}
// Starts the background compaction thread running background_worker().
// Calling run() again while a worker already exists is a no-op: assigning a new
// std::thread to a still-joinable worker_thread_ would call std::terminate.
void Compactor::run()
{
    if (this->worker_thread_.joinable())
        return;
    this->worker_thread_ = std::thread(&Compactor::background_worker, this);
}
void Compactor::background_worker()
{
while (this->running_)
{
// Queue → 0
for (size_t i = 0; i < 8; ++i)
{
auto file = this->l0_queue_.pop();
if (file)
{
std::lock_guard<std::mutex> lock(this->levels_mutex_);
this->level0_files_.push_back(*file);
this->level0_size_.fetch_add(1, std::memory_order_relaxed);
}
else
{
break;
}
}
bool did_compact = false;
{
std::lock_guard<std::mutex> lock(this->levels_mutex_);
if (this->level0_size_.load(std::memory_order_relaxed) >= 4)
{
compact_level(this->level0_files_, this->level1_files_, 0);
did_compact = true;
}
}
// 1 → 2
{
std::lock_guard<std::mutex> lock(this->levels_mutex_);
if (this->level1_size_.load(std::memory_order_relaxed) >= 4)
{
compact_level(this->level1_files_, this->level2_files_, 1);
did_compact = true;
}
}
if (!did_compact)
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
}
// Merges up to 4 files from the front of `source_files` into a single new SSTable
// appended to `dest_files`.
//
// @param source_files  file names at level `level`; the merged batch is erased from the front.
// @param dest_files    file names at level `level + 1`; receives the new file name.
// @param level         source level (0 or 1). It selects which size counters to adjust
//                      and is used in the output name "sstable_l<level+1>_<version>.dat".
//
// Caller (background_worker) holds levels_mutex_ while this runs.
// The input files are deleted from disk only after the merged table has been
// written and the level bookkeeping updated. If writing fails, the inputs remain
// on disk and in `source_files`, so nothing is lost.
void Compactor::compact_level(std::vector<std::string>& source_files,
                              std::vector<std::string>& dest_files, int level)
{
    if (source_files.empty())
        return;

    const size_t batch_size = std::min<size_t>(4, source_files.size());
    const std::vector<std::string> batch(source_files.begin(), source_files.begin() + batch_size);

    // Scratch version manager for the tables being read back. Each LFSkipList is
    // constructed from a VersionManager& (and presumably retains it), so this must
    // outlive every table in `loaded`. It is declared before `loaded`, so it is
    // destroyed after it. A loop-local instance would leave the moved tables
    // referring to a destroyed object.
    VersionManager dummy_vm("dummy");
    std::vector<LFSkipList<Hash128, std::string>> loaded;
    loaded.reserve(batch_size);
    for (const auto& file : batch)
    {
        LFSkipList<Hash128, std::string> table(dummy_vm);
        read_sstable_with_mmap(table, file);
        loaded.push_back(std::move(table));
    }

    // Replay every entry into one table, in source_files order (front first).
    // Tombstones become erase() calls.
    LFSkipList<Hash128, std::string> merged(this->version_manager_);
    for (auto& table : loaded)
    {
        table.for_each_raw([&](const auto& key, const auto& value, bool is_tombstone, uint64_t /*version*/)
        {
            if (is_tombstone)
                merged.erase(key);
            else
                merged.insert(key, value);
        });
    }

    const std::string new_filename = "sstable_l" + std::to_string(level + 1) + "_" +
        std::to_string(this->version_manager_.next_version()) + ".dat";
    write_sstable_with_index(merged, new_filename);

    // The merged table is on disk; now publish it and retire the batch.
    dest_files.push_back(new_filename);
    source_files.erase(source_files.begin(), source_files.begin() + batch_size);
    if (level == 0)
    {
        this->level0_size_.fetch_sub(batch_size, std::memory_order_relaxed);
        this->level1_size_.fetch_add(1, std::memory_order_relaxed);
    }
    else if (level == 1)
    {
        this->level1_size_.fetch_sub(batch_size, std::memory_order_relaxed);
        this->level2_size_.fetch_add(1, std::memory_order_relaxed);
    }

    // Only now is it safe to delete the inputs: their contents live in new_filename.
    for (const auto& file : batch)
        std::filesystem::remove(file);

    this->version_manager_.flush();
}
} // utils
// usub