SharedStorage/utils/io/Compactor.cpp

124 lines
3.6 KiB
C++

//
// Created by Kirill Zhukov on 20.04.2025.
//
#include "Compactor.h"
#include <mutex>
namespace usub::utils
{
// Binds the compactor to the caller-supplied VersionManager and sets the
// running flag. No thread is started here; that happens in run().
//
// vm: version manager later used by compact_level() to build the merged table
//     and to obtain a version number for the output file name.
Compactor::Compactor(VersionManager& vm)
    : version_manager_(vm),
      running_(true)
{
}
// Asks the background loop to stop and waits for the worker thread to exit.
Compactor::~Compactor()
{
    // background_worker() re-checks this flag at the top of every pass.
    running_ = false;

    // The thread only exists if run() was called; joining a non-joinable
    // std::thread is an error, so bail out early in that case.
    if (!worker_thread_.joinable())
    {
        return;
    }
    worker_thread_.join();
}
void Compactor::add_sstable_l0(const std::string& filename)
{
l0_queue_.push(filename);
}
// Starts the background compaction thread, which executes background_worker().
//
// A repeated call while a worker thread is still attached is a no-op:
// move-assigning a new std::thread onto a joinable one calls std::terminate().
void Compactor::run()
{
    if (worker_thread_.joinable())
    {
        return;
    }
    worker_thread_ = std::thread(&Compactor::background_worker, this);
}
void Compactor::background_worker()
{
while (running_)
{
for (size_t i = 0; i < 8; ++i)
{
auto file = l0_queue_.pop();
if (file)
{
std::lock_guard<std::mutex> lock(levels_mutex_);
level0_files_.push_back(*file);
level0_size_.fetch_add(1, std::memory_order_relaxed); // увеличиваем счётчик
}
else
{
break;
}
}
if (level0_size_.load(std::memory_order_relaxed) >= 4)
{
std::lock_guard<std::mutex> lock(levels_mutex_);
compact_level(level0_files_, level1_files_, 0);
}
if (level1_size_.load(std::memory_order_relaxed) >= 4)
{
std::lock_guard<std::mutex> lock(levels_mutex_);
compact_level(level1_files_, level2_files_, 1);
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
void Compactor::compact_level(std::vector<std::string>& source_files,
std::vector<std::string>& dest_files, int level)
{
if (source_files.empty())
return;
size_t batch_size = std::min<size_t>(4, source_files.size());
std::vector<std::string> batch(source_files.begin(), source_files.begin() + batch_size);
std::vector<LFSkipList<Hash128, std::string>> loaded;
loaded.reserve(batch_size);
for (const auto& file : batch)
{
VersionManager dummy_vm("dummy");
LFSkipList<Hash128, std::string> table(dummy_vm);
read_sstable_with_mmap(table, file);
loaded.push_back(std::move(table));
std::filesystem::remove(file);
}
LFSkipList<Hash128, std::string> merged(version_manager_);
for (auto& table : loaded)
{
table.for_each_raw([&](const auto& key, const auto& value, bool is_tombstone, uint64_t version)
{
if (!is_tombstone)
merged.insert(key, value);
else
merged.erase(key);
});
}
std::string new_filename = "sstable_l" + std::to_string(level + 1) + "_" +
std::to_string(version_manager_.next_version()) + ".dat";
write_sstable_with_index(merged, new_filename);
dest_files.push_back(new_filename);
source_files.erase(source_files.begin(), source_files.begin() + batch_size);
if (level == 0)
{
level0_size_.fetch_sub(batch_size, std::memory_order_relaxed);
level1_size_.fetch_add(1, std::memory_order_relaxed);
}
else if (level == 1)
{
level1_size_.fetch_sub(batch_size, std::memory_order_relaxed);
level2_size_.fetch_add(1, std::memory_order_relaxed);
}
}
} // utils
// usub