SharedStorage/core/Memtable.h

149 lines
4.4 KiB
C++

//
// Created by Kirill Zhukov on 20.04.2025.
//
#ifndef MEMTABLE_H
#define MEMTABLE_H
#include <atomic>
#include <optional>
#include <mutex>
#include "utils/io/Wal.h"
#include "utils/io/VersionManager.h"
#include "utils/config/GlobalConfig.h"
namespace usub::shared_storage
{
using namespace usub::utils;
template <typename SkipList>
class MemTableManager
{
public:
MemTableManager(const std::string& wal_file, size_t max_size, size_t estimated_element_size, utils::VersionManager& vm);
~MemTableManager();
void put(const typename SkipList::key_type& key, const typename SkipList::value_type& value);
void remove(const typename SkipList::key_type& key);
std::optional<typename SkipList::value_type> get(const typename SkipList::key_type& key);
void flush();
void flush_batch();
private:
std::atomic<SkipList*> active_memtable;
WAL wal;
size_t max_memtable_size;
std::mutex batch_mutex;
std::vector<std::pair<typename SkipList::key_type, typename SkipList::value_type>> write_batch;
std::atomic<bool> flushing{false};
VersionManager& version_manager;
size_t estimated_element_size;
private:
size_t estimate_memtable_size() const;
size_t estimate_batch_size() const;
};
template <typename SkipList>
MemTableManager<SkipList>::MemTableManager(const std::string& wal_file, size_t max_size, size_t estimated_element_size, VersionManager& vm)
: wal(wal_file), max_memtable_size(max_size), version_manager(vm)
{
this->active_memtable.store(new SkipList(this->version_manager));
}
template <typename SkipList>
MemTableManager<SkipList>::~MemTableManager()
{
delete this->active_memtable.load();
}
template <typename SkipList>
void MemTableManager<SkipList>::put(const typename SkipList::key_type& key,
const typename SkipList::value_type& value)
{
{
std::lock_guard<std::mutex> lock(this->batch_mutex);
this->write_batch.emplace_back(key, value);
}
if (estimate_batch_size() >= 64)
{
flush_batch();
}
}
template <typename SkipList>
void MemTableManager<SkipList>::remove(const typename SkipList::key_type& key)
{
this->wal.write_delete(key);
this->active_memtable.load()->erase(key);
if (estimate_memtable_size() > this->max_memtable_size)
{
flush();
}
}
template <typename SkipList>
std::optional<typename SkipList::value_type> MemTableManager<SkipList>::get(const typename SkipList::key_type& key)
{
return this->active_memtable.load()->find(key);
}
template <typename SkipList>
void MemTableManager<SkipList>::flush()
{
if (this->flushing.exchange(true)) return;
auto old_memtable = this->active_memtable.exchange(new SkipList(this->version_manager));
std::string filename = "sstable_" + std::to_string(std::time(nullptr)) + ".dat";
write_sstable_with_index(*old_memtable, filename);
delete old_memtable;
this->wal.close();
this->flushing.store(false);
this->version_manager.flush();
}
template <typename SkipList>
void MemTableManager<SkipList>::flush_batch()
{
std::vector<std::pair<typename SkipList::key_type, typename SkipList::value_type>> local_batch;
{
std::lock_guard<std::mutex> lock(this->batch_mutex);
local_batch.swap(this->write_batch);
}
for (const auto& [key, value] : local_batch)
{
this->wal.write_put(key, value);
this->active_memtable.load()->insert(key, value);
}
if (estimate_memtable_size() > this->max_memtable_size)
{
flush();
}
this->version_manager.flush();
}
template <typename SkipList>
size_t MemTableManager<SkipList>::estimate_memtable_size() const
{
// For simplicity: count the number of elements * average size
return this->active_memtable.load()->unsafe_size() * 128; // The error is acceptable
}
template <typename SkipList>
size_t MemTableManager<SkipList>::estimate_batch_size() const
{
return this->write_batch.size();
}
} // shared_storage
// usub
#endif //MEMTABLE_H