124 lines
3.6 KiB
C++
124 lines
3.6 KiB
C++
//
// Created by Kirill Zhukov on 20.04.2025.
//
|
|
|
|
#include "Compactor.h"
|
|
|
|
#include <mutex>
|
|
|
|
namespace usub::utils
|
|
{
|
|
/// Constructs a compactor bound to the given version manager.
///
/// The running flag is set to true here, but no thread is started;
/// the background worker is launched by run().
///
/// @param vm Version manager used for merged tables and output file names.
Compactor::Compactor(VersionManager& vm)
    : version_manager_(vm)
    , running_(true)
{
}
|
|
|
|
/// Requests the background worker to stop and waits for its thread to finish.
Compactor::~Compactor()
{
    // The worker loop re-checks this flag on every pass and exits once it is false.
    running_ = false;

    // Nothing to wait for if run() was never called.
    if (!worker_thread_.joinable())
    {
        return;
    }

    worker_thread_.join();
}
|
|
|
|
void Compactor::add_sstable_l0(const std::string& filename)
|
|
{
|
|
l0_queue_.push(filename);
|
|
}
|
|
|
|
void Compactor::run()
|
|
{
|
|
worker_thread_ = std::thread(&Compactor::background_worker, this);
|
|
}
|
|
|
|
void Compactor::background_worker()
|
|
{
|
|
while (running_)
|
|
{
|
|
for (size_t i = 0; i < 8; ++i)
|
|
{
|
|
auto file = l0_queue_.pop();
|
|
if (file)
|
|
{
|
|
std::lock_guard<std::mutex> lock(levels_mutex_);
|
|
level0_files_.push_back(*file);
|
|
level0_size_.fetch_add(1, std::memory_order_relaxed); // увеличиваем счётчик
|
|
}
|
|
else
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (level0_size_.load(std::memory_order_relaxed) >= 4)
|
|
{
|
|
std::lock_guard<std::mutex> lock(levels_mutex_);
|
|
compact_level(level0_files_, level1_files_, 0);
|
|
}
|
|
|
|
if (level1_size_.load(std::memory_order_relaxed) >= 4)
|
|
{
|
|
std::lock_guard<std::mutex> lock(levels_mutex_);
|
|
compact_level(level1_files_, level2_files_, 1);
|
|
}
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
|
}
|
|
}
|
|
|
|
void Compactor::compact_level(std::vector<std::string>& source_files,
|
|
std::vector<std::string>& dest_files, int level)
|
|
{
|
|
if (source_files.empty())
|
|
return;
|
|
|
|
size_t batch_size = std::min<size_t>(4, source_files.size());
|
|
std::vector<std::string> batch(source_files.begin(), source_files.begin() + batch_size);
|
|
|
|
std::vector<LFSkipList<Hash128, std::string>> loaded;
|
|
loaded.reserve(batch_size);
|
|
|
|
for (const auto& file : batch)
|
|
{
|
|
VersionManager dummy_vm("dummy");
|
|
LFSkipList<Hash128, std::string> table(dummy_vm);
|
|
read_sstable_with_mmap(table, file);
|
|
loaded.push_back(std::move(table));
|
|
|
|
std::filesystem::remove(file);
|
|
}
|
|
|
|
LFSkipList<Hash128, std::string> merged(version_manager_);
|
|
|
|
for (auto& table : loaded)
|
|
{
|
|
table.for_each_raw([&](const auto& key, const auto& value, bool is_tombstone, uint64_t version)
|
|
{
|
|
if (!is_tombstone)
|
|
merged.insert(key, value);
|
|
else
|
|
merged.erase(key);
|
|
});
|
|
}
|
|
|
|
std::string new_filename = "sstable_l" + std::to_string(level + 1) + "_" +
|
|
std::to_string(version_manager_.next_version()) + ".dat";
|
|
write_sstable_with_index(merged, new_filename);
|
|
|
|
dest_files.push_back(new_filename);
|
|
|
|
source_files.erase(source_files.begin(), source_files.begin() + batch_size);
|
|
|
|
if (level == 0)
|
|
{
|
|
level0_size_.fetch_sub(batch_size, std::memory_order_relaxed);
|
|
level1_size_.fetch_add(1, std::memory_order_relaxed);
|
|
}
|
|
else if (level == 1)
|
|
{
|
|
level1_size_.fetch_sub(batch_size, std::memory_order_relaxed);
|
|
level2_size_.fetch_add(1, std::memory_order_relaxed);
|
|
}
|
|
}
|
|
} // utils
|
|
// usub
|