SharedStorage/core/Memtable.h
2025-04-20 18:24:28 +03:00

141 lines
3.9 KiB
C++

//
// Created by Kirill Zhukov on 20.04.2025.
//
#ifndef MEMTABLE_H
#define MEMTABLE_H
#include <atomic>
#include <optional>
#include <mutex>
#include "utils/io/Wal.h"
namespace usub::shared_storage
{
template <typename SkipList>
class MemTableManager
{
public:
MemTableManager(const std::string& wal_file, size_t max_size);
~MemTableManager();
void put(const typename SkipList::key_type& key, const typename SkipList::value_type& value);
void remove(const typename SkipList::key_type& key);
std::optional<typename SkipList::value_type> get(const typename SkipList::key_type& key);
void flush();
void flush_batch();
private:
std::atomic<SkipList*> active_memtable;
utils::WAL wal;
size_t max_memtable_size;
std::mutex batch_mutex;
std::vector<std::pair<typename SkipList::key_type, typename SkipList::value_type>> write_batch;
std::atomic<bool> flushing{false};
private:
size_t estimate_memtable_size() const;
size_t estimate_batch_size() const;
};
template <typename SkipList>
MemTableManager<SkipList>::MemTableManager(const std::string& wal_file, size_t max_size) : wal(wal_file),
max_memtable_size(max_size)
{
this->active_memtable.store(new SkipList());
}
template <typename SkipList>
MemTableManager<SkipList>::~MemTableManager()
{
delete this->active_memtable.load();
}
template <typename SkipList>
void MemTableManager<SkipList>::put(const typename SkipList::key_type& key,
const typename SkipList::value_type& value)
{
{
std::lock_guard<std::mutex> lock(batch_mutex);
write_batch.emplace_back(key, value);
}
if (estimate_batch_size() >= 64)
{
flush_batch();
}
}
template <typename SkipList>
void MemTableManager<SkipList>::remove(const typename SkipList::key_type& key)
{
this->wal.write_delete(key);
this->active_memtable.load()->erase(key);
if (estimate_memtable_size() > this->max_memtable_size)
{
flush();
}
}
template <typename SkipList>
std::optional<typename SkipList::value_type> MemTableManager<SkipList>::get(const typename SkipList::key_type& key)
{
return this->active_memtable.load()->find(key);
}
template <typename SkipList>
void MemTableManager<SkipList>::flush()
{
if (this->flushing.exchange(true)) return;
auto old_memtable = this->active_memtable.exchange(new SkipList());
std::string filename = "sstable_" + std::to_string(std::time(nullptr)) + ".dat";
write_sstable_v2(*old_memtable, filename);
delete old_memtable;
this->wal.close();
this->flushing.store(false);
}
template <typename SkipList>
void MemTableManager<SkipList>::flush_batch()
{
std::vector<std::pair<typename SkipList::key_type, typename SkipList::value_type>> local_batch;
{
std::lock_guard<std::mutex> lock(batch_mutex);
local_batch.swap(write_batch);
}
for (const auto& [key, value] : local_batch)
{
wal.write_put(key, value);
active_memtable.load()->insert(key, value);
}
if (estimate_memtable_size() > max_memtable_size)
{
flush();
}
}
template <typename SkipList>
size_t MemTableManager<SkipList>::estimate_memtable_size() const
{
// For simplicity: count the number of elements * average size
return this->active_memtable.load()->unsafe_size() * 128; // The error is acceptable
}
template <typename SkipList>
size_t MemTableManager<SkipList>::estimate_batch_size() const
{
return write_batch.size();
}
} // shared_storage
// usub
#endif //MEMTABLE_H