457 lines
15 KiB
C++
457 lines
15 KiB
C++
//
|
|
// Created by Kirill Zhukov on 20.04.2025.
|
|
//
|
|
|
|
#ifndef MEMTABLE_H
|
|
#define MEMTABLE_H
|
|
|
|
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include "utils/hash/Hash128.h"
|
|
|
|
namespace usub::utils
|
|
{
|
|
template <typename SkipList>
|
|
void write_sstable(const SkipList& memtable, const std::string& filename)
|
|
{
|
|
int fd = ::open(filename.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0644);
|
|
if (fd < 0) throw std::runtime_error("Failed to open SSTable");
|
|
|
|
FILE* file = ::fdopen(fd, "wb");
|
|
if (!file)
|
|
{
|
|
::close(fd);
|
|
throw std::runtime_error("Failed to fdopen SSTable");
|
|
}
|
|
|
|
uint64_t current_offset = 0;
|
|
std::vector<std::pair<typename SkipList::key_type, uint64_t>> index_entries;
|
|
|
|
memtable.for_each([&](const auto& key, const auto& value)
|
|
{
|
|
uint8_t is_tombstone = 0;
|
|
uint64_t version = 0;
|
|
|
|
if constexpr (requires { value.is_tombstone; })
|
|
{
|
|
is_tombstone = value.is_tombstone ? 1 : 0;
|
|
version = value.version;
|
|
}
|
|
|
|
uint32_t key_len = sizeof(key);
|
|
uint32_t value_len = value.size();
|
|
|
|
::fwrite(&key_len, sizeof(key_len), 1, file);
|
|
::fwrite(&key, key_len, 1, file);
|
|
::fwrite(&value_len, sizeof(value_len), 1, file);
|
|
::fwrite(value.data(), value_len, 1, file);
|
|
::fwrite(&is_tombstone, sizeof(is_tombstone), 1, file);
|
|
::fwrite(&version, sizeof(version), 1, file);
|
|
|
|
index_entries.emplace_back(key, current_offset);
|
|
current_offset += sizeof(key_len) + key_len + sizeof(value_len) + value_len + sizeof(is_tombstone) + sizeof(
|
|
version);
|
|
});
|
|
|
|
uint64_t index_offset = current_offset;
|
|
|
|
for (const auto& [key, offset] : index_entries)
|
|
{
|
|
uint32_t key_len = sizeof(key);
|
|
::fwrite(&key_len, sizeof(key_len), 1, file);
|
|
::fwrite(&key, key_len, 1, file);
|
|
::fwrite(&offset, sizeof(offset), 1, file);
|
|
}
|
|
|
|
::fwrite(&index_offset, sizeof(index_offset), 1, file);
|
|
|
|
::fflush(file);
|
|
::fsync(fd);
|
|
::fclose(file);
|
|
}
|
|
|
|
|
|
/// Loads the data section of an SSTable sequentially into `memtable`.
///
/// Expected layout (as produced by `write_sstable`, integers in host byte order):
///   data   : repeated [key_len:u32][key bytes][value_len:u32][value bytes][tombstone:u8][version:u64]
///   index  : not used by this reader
///   footer : [index_offset:u64] -- marks where the data section ends
///
/// The scan stops at `index_offset`, so the index and footer are never misread as records, and the
/// per-record tombstone/version trailer is skipped.
/// NOTE(review): tombstone and version are not forwarded; every record is passed to `insert`.
///
/// @param memtable Destination; must provide `key_type` and `insert(key, std::string)`.
/// @param filename Path of the SSTable to read. A zero-length file is treated as an empty table.
/// @throws std::runtime_error if the file cannot be opened, a stored key length differs from
///         `sizeof(key_type)`, or the file is truncated / structurally inconsistent.
template <typename SkipList>
void read_sstable(SkipList& memtable, const std::string& filename)
{
    using key_type = typename SkipList::key_type;

    std::ifstream sstable(filename, std::ios::binary);
    if (!sstable.is_open())
    {
        throw std::runtime_error("Failed to open file: " + filename);
    }

    const auto corrupted = [&filename] { return std::runtime_error("Corrupted SSTable: " + filename); };
    const auto read_pod = [&sstable](auto& dst)
    {
        sstable.read(reinterpret_cast<char*>(&dst), sizeof(dst));
    };

    sstable.seekg(0, std::ios::end);
    const std::streamoff file_size = sstable.tellg();
    if (file_size == 0) return;

    constexpr uint64_t kFooterSize = sizeof(uint64_t);
    if (file_size < 0 || static_cast<uint64_t>(file_size) < kFooterSize) throw corrupted();
    const uint64_t footer_pos = static_cast<uint64_t>(file_size) - kFooterSize;

    uint64_t index_offset = 0;
    sstable.seekg(static_cast<std::streamoff>(footer_pos), std::ios::beg);
    read_pod(index_offset);
    if (!sstable || index_offset > footer_pos) throw corrupted();

    // Bytes of a record that do not depend on the value length.
    constexpr uint64_t kTrailerSize = sizeof(uint8_t) + sizeof(uint64_t);
    constexpr uint64_t kFixedSize = sizeof(uint32_t) + sizeof(key_type) + sizeof(uint32_t) + kTrailerSize;

    sstable.seekg(0, std::ios::beg);
    uint64_t pos = 0;
    while (pos < index_offset)
    {
        if (index_offset - pos < kFixedSize) throw corrupted();

        uint32_t key_len = 0;
        read_pod(key_len);
        if (!sstable) throw corrupted();
        if (key_len != sizeof(key_type))
        {
            throw std::runtime_error("Key size mismatch");
        }

        key_type key{};
        read_pod(key);

        uint32_t value_len = 0;
        read_pod(value_len);
        // Reject lengths that would run past the data section before allocating.
        if (!sstable || value_len > index_offset - pos - kFixedSize) throw corrupted();

        std::string value(value_len, '\0');
        sstable.read(value.data(), value_len);

        // Skip the tombstone flag and version that follow every value.
        sstable.ignore(static_cast<std::streamsize>(kTrailerSize));
        if (!sstable) throw corrupted();

        memtable.insert(key, value);
        pos += kFixedSize + value_len;
    }
}
|
|
|
|
template <typename SkipList>
|
|
void write_sstable_with_index(const SkipList& memtable, const std::string& filename)
|
|
{
|
|
std::ofstream out(filename, std::ios::binary | std::ios::trunc);
|
|
if (!out.is_open())
|
|
{
|
|
throw std::runtime_error("Failed to open SSTable file");
|
|
}
|
|
|
|
std::vector<std::pair<Hash128, uint64_t>> index_entries;
|
|
uint64_t current_offset = 0;
|
|
|
|
memtable.for_each_raw([&](const auto& key, const auto& value, bool is_tombstone, uint64_t version)
|
|
{
|
|
out.write(reinterpret_cast<const char*>(&key), sizeof(key));
|
|
|
|
uint32_t value_len = value.size();
|
|
out.write(reinterpret_cast<const char*>(&value_len), sizeof(value_len));
|
|
out.write(value.data(), value_len);
|
|
|
|
uint8_t tombstone_flag = is_tombstone ? 1 : 0;
|
|
out.write(reinterpret_cast<const char*>(&tombstone_flag), sizeof(tombstone_flag));
|
|
out.write(reinterpret_cast<const char*>(&version), sizeof(version));
|
|
|
|
index_entries.emplace_back(key, current_offset);
|
|
current_offset += sizeof(key) + sizeof(value_len) + value_len + sizeof(tombstone_flag) + sizeof(version);
|
|
});
|
|
|
|
uint64_t index_offset = current_offset;
|
|
|
|
for (const auto& [key, offset] : index_entries)
|
|
{
|
|
out.write(reinterpret_cast<const char*>(&key), sizeof(key));
|
|
out.write(reinterpret_cast<const char*>(&offset), sizeof(offset));
|
|
}
|
|
|
|
out.write(reinterpret_cast<const char*>(&index_offset), sizeof(index_offset));
|
|
}
|
|
|
|
/// Loads an SSTable into `memtable` by walking its index.
///
/// Expected layout (as produced by `write_sstable`, integers in host byte order):
///   data   : repeated [key_len:u32][key bytes][value_len:u32][value bytes][tombstone:u8][version:u64]
///   index  : repeated [key_len:u32][key bytes][record offset:u64]
///   footer : [index_offset:u64]
///
/// The index is read from `index_offset` up to the footer; each indexed record is then read back
/// and inserted in index order.
/// NOTE(review): tombstone and version are not forwarded; every indexed record is passed to `insert`.
///
/// @param memtable Destination; must provide `key_type` and `insert(key, std::string)`.
/// @param filename Path of the SSTable to read. A zero-length file is treated as an empty table.
/// @throws std::runtime_error if the file cannot be opened, a stored key length differs from
///         `sizeof(key_type)`, or the file is truncated / structurally inconsistent.
template <typename SkipList>
void read_sstable_with_index(SkipList& memtable, const std::string& filename)
{
    using key_type = typename SkipList::key_type;

    std::ifstream sstable(filename, std::ios::binary);
    if (!sstable.is_open())
    {
        throw std::runtime_error("Failed to open file: " + filename);
    }

    const auto corrupted = [&filename] { return std::runtime_error("Corrupted SSTable: " + filename); };
    const auto read_pod = [&sstable](auto& dst)
    {
        sstable.read(reinterpret_cast<char*>(&dst), sizeof(dst));
    };

    sstable.seekg(0, std::ios::end);
    const std::streamoff file_size = sstable.tellg();
    if (file_size == 0) return;

    constexpr uint64_t kFooterSize = sizeof(uint64_t);
    if (file_size < 0 || static_cast<uint64_t>(file_size) < kFooterSize) throw corrupted();
    const uint64_t footer_pos = static_cast<uint64_t>(file_size) - kFooterSize;

    uint64_t index_offset = 0;
    sstable.seekg(static_cast<std::streamoff>(footer_pos), std::ios::beg);
    read_pod(index_offset);
    if (!sstable || index_offset > footer_pos) throw corrupted();

    // --- Pass 1: read the index, which spans [index_offset, footer_pos). ---
    constexpr uint64_t kIndexEntrySize = sizeof(uint32_t) + sizeof(key_type) + sizeof(uint64_t);

    std::vector<std::pair<key_type, uint64_t>> index_entries;
    sstable.seekg(static_cast<std::streamoff>(index_offset), std::ios::beg);
    for (uint64_t at = index_offset; at < footer_pos; at += kIndexEntrySize)
    {
        if (footer_pos - at < kIndexEntrySize) throw corrupted();

        uint32_t key_len = 0;
        read_pod(key_len);
        if (!sstable) throw corrupted();
        // The key is read straight into a key_type object, so the stored length must match.
        if (key_len != sizeof(key_type)) throw std::runtime_error("Key size mismatch");

        key_type key{};
        read_pod(key);

        uint64_t offset = 0;
        read_pod(offset);
        if (!sstable) throw corrupted();

        index_entries.emplace_back(key, offset);
    }

    // --- Pass 2: read every indexed record from the data section [0, index_offset). ---
    constexpr uint64_t kRecordHeaderSize = sizeof(uint32_t) + sizeof(key_type) + sizeof(uint32_t);

    for (const auto& entry : index_entries)
    {
        const uint64_t offset = entry.second;
        if (offset > index_offset || index_offset - offset < kRecordHeaderSize) throw corrupted();

        sstable.seekg(static_cast<std::streamoff>(offset), std::ios::beg);

        uint32_t key_len = 0;
        read_pod(key_len);
        if (!sstable) throw corrupted();
        if (key_len != sizeof(key_type)) throw std::runtime_error("Key size mismatch");

        key_type file_key{};
        read_pod(file_key);

        uint32_t value_len = 0;
        read_pod(value_len);
        // Reject lengths that would run past the data section before allocating.
        if (!sstable || value_len > index_offset - offset - kRecordHeaderSize) throw corrupted();

        std::string value(value_len, '\0');
        sstable.read(value.data(), value_len);
        if (!sstable) throw corrupted();

        memtable.insert(file_key, value);
    }
}
|
|
|
|
/// Invokes `callback(key, value)` for every live record whose key lies in `[from_key, to_key]`.
///
/// Expected layout (as produced by `write_sstable`, integers in host byte order):
///   data   : repeated [key_len:u32][key bytes][value_len:u32][value bytes][tombstone:u8][version:u64]
///   index  : repeated [key_len:u32][key bytes][record offset:u64]
///   footer : [index_offset:u64]
///
/// The index is read from `index_offset` up to (not including) the footer, then binary-searched
/// for `from_key`; this assumes index entries are stored in ascending key order.
/// Records flagged as tombstones are skipped, matching `optimized_range_query_sstable`.
///
/// @tparam SkipList Only used for its `key_type`, which must support `<` and `<=`.
/// @param filename  Path of the SSTable to query. A zero-length file yields no callbacks.
/// @param from_key  Inclusive lower bound.
/// @param to_key    Inclusive upper bound.
/// @param callback  Callable as `callback(key_type&, std::string&)`.
/// @throws std::runtime_error if the file cannot be opened, a stored key length differs from
///         `sizeof(key_type)`, or the file is truncated / structurally inconsistent.
template <typename SkipList, typename Callback>
void range_query_sstable(const std::string& filename,
                         const typename SkipList::key_type& from_key,
                         const typename SkipList::key_type& to_key,
                         Callback&& callback)
{
    using key_type = typename SkipList::key_type;

    std::ifstream sstable(filename, std::ios::binary);
    if (!sstable.is_open())
    {
        throw std::runtime_error("Failed to open file: " + filename);
    }

    const auto corrupted = [&filename] { return std::runtime_error("Corrupted SSTable: " + filename); };
    const auto read_pod = [&sstable](auto& dst)
    {
        sstable.read(reinterpret_cast<char*>(&dst), sizeof(dst));
    };

    sstable.seekg(0, std::ios::end);
    const std::streamoff file_size = sstable.tellg();
    if (file_size == 0) return;

    constexpr uint64_t kFooterSize = sizeof(uint64_t);
    if (file_size < 0 || static_cast<uint64_t>(file_size) < kFooterSize) throw corrupted();
    const uint64_t footer_pos = static_cast<uint64_t>(file_size) - kFooterSize;

    uint64_t index_offset = 0;
    sstable.seekg(static_cast<std::streamoff>(footer_pos), std::ios::beg);
    read_pod(index_offset);
    if (!sstable || index_offset > footer_pos) throw corrupted();

    // --- Read the index, which spans [index_offset, footer_pos). ---
    // Stopping at the footer keeps it from being parsed as a bogus index entry and keeps the
    // stream out of the EOF/fail state before the record reads below.
    constexpr uint64_t kIndexEntrySize = sizeof(uint32_t) + sizeof(key_type) + sizeof(uint64_t);

    std::vector<std::pair<key_type, uint64_t>> index_entries;
    sstable.seekg(static_cast<std::streamoff>(index_offset), std::ios::beg);
    for (uint64_t at = index_offset; at < footer_pos; at += kIndexEntrySize)
    {
        if (footer_pos - at < kIndexEntrySize) throw corrupted();

        uint32_t key_len = 0;
        read_pod(key_len);
        if (!sstable) throw corrupted();
        // The key is read straight into a key_type object, so the stored length must match.
        if (key_len != sizeof(key_type)) throw std::runtime_error("Key size mismatch");

        key_type key{};
        read_pod(key);

        uint64_t offset = 0;
        read_pod(offset);
        if (!sstable) throw corrupted();

        index_entries.emplace_back(key, offset);
    }

    // First index entry whose key is not less than from_key.
    auto it = std::lower_bound(index_entries.begin(), index_entries.end(), from_key,
                               [](const auto& pair, const auto& key) { return pair.first < key; });

    constexpr uint64_t kTrailerSize = sizeof(uint8_t) + sizeof(uint64_t);
    constexpr uint64_t kFixedSize = sizeof(uint32_t) + sizeof(key_type) + sizeof(uint32_t) + kTrailerSize;

    for (; it != index_entries.end() && it->first <= to_key; ++it)
    {
        const uint64_t offset = it->second;
        if (offset > index_offset || index_offset - offset < kFixedSize) throw corrupted();

        sstable.seekg(static_cast<std::streamoff>(offset), std::ios::beg);

        uint32_t key_len = 0;
        read_pod(key_len);
        if (!sstable) throw corrupted();
        if (key_len != sizeof(key_type)) throw std::runtime_error("Key size mismatch");

        key_type file_key{};
        read_pod(file_key);

        uint32_t value_len = 0;
        read_pod(value_len);
        // Reject lengths that would run past the data section before allocating.
        if (!sstable || value_len > index_offset - offset - kFixedSize) throw corrupted();

        std::string value(value_len, '\0');
        sstable.read(value.data(), value_len);

        uint8_t is_tombstone = 0;
        read_pod(is_tombstone);
        if (!sstable) throw corrupted();

        if (!is_tombstone)
        {
            callback(file_key, value);
        }
    }
}
|
|
|
|
/// Replays a write-ahead log into `memtable`.
///
/// Record layout (integers in host byte order):
///   [op:u8][key_len:u32][key bytes]                          op == 1 (erase)
///   [op:u8][key_len:u32][key bytes][value_len:u32][value]    op == 0 (put)
///
/// Records are applied in file order. Every read is checked, so a record that is cut short
/// (for example by a crash during an append) raises an error instead of being applied with
/// indeterminate lengths; records that precede it have already been applied at that point.
///
/// @param memtable     Destination; must provide `insert(std::string, std::string)` and
///                     `erase(std::string)`.
/// @param wal_filename Path of the WAL file.
/// @throws std::runtime_error if the file cannot be opened, an unknown operation code is
///         encountered, or a record is truncated.
template <typename SkipList>
void replay_wal(SkipList& memtable, const std::string& wal_filename)
{
    constexpr uint8_t kOpPut = 0;
    constexpr uint8_t kOpErase = 1;

    std::ifstream wal(wal_filename, std::ios::binary);
    if (!wal.is_open())
    {
        throw std::runtime_error("Failed to open WAL file: " + wal_filename);
    }

    // Reads exactly `len` bytes or throws.
    const auto read_exact = [&wal](char* dst, std::size_t len)
    {
        wal.read(dst, static_cast<std::streamsize>(len));
        if (!wal) throw std::runtime_error("Truncated WAL record");
    };

    // Reads a [len:u32][bytes] field.
    const auto read_blob = [&read_exact]
    {
        uint32_t len = 0;
        read_exact(reinterpret_cast<char*>(&len), sizeof(len));
        std::string blob(len, '\0');
        read_exact(blob.data(), len);
        return blob;
    };

    while (wal.peek() != std::ifstream::traits_type::eof())
    {
        uint8_t op = 0;
        read_exact(reinterpret_cast<char*>(&op), sizeof(op));

        // Validate the op code before trusting the length fields that follow it.
        if (op != kOpPut && op != kOpErase)
        {
            throw std::runtime_error("Unknown WAL operation code");
        }

        const std::string key = read_blob();

        if (op == kOpPut)
        {
            const std::string value = read_blob();
            memtable.insert(key, value);
        }
        else
        {
            memtable.erase(key);
        }
    }
}
|
|
|
|
template <typename SkipList>
|
|
void read_sstable_with_mmap(SkipList& memtable, const std::string& filename)
|
|
{
|
|
int fd = ::open(filename.c_str(), O_RDONLY);
|
|
if (fd < 0) throw std::runtime_error("Failed to open SSTable");
|
|
|
|
struct stat st;
|
|
if (fstat(fd, &st) != 0)
|
|
{
|
|
::close(fd);
|
|
throw std::runtime_error("Failed to stat SSTable");
|
|
}
|
|
|
|
void* data = mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
|
|
if (data == MAP_FAILED)
|
|
{
|
|
::close(fd);
|
|
throw std::runtime_error("Failed to mmap SSTable");
|
|
}
|
|
|
|
const char* ptr = reinterpret_cast<const char*>(data);
|
|
const char* end = ptr + st.st_size;
|
|
|
|
uint64_t index_offset = *reinterpret_cast<const uint64_t*>(end - sizeof(uint64_t));
|
|
const char* index_ptr = ptr + index_offset;
|
|
|
|
std::vector<std::pair<usub::utils::Hash128, uint64_t>> index_entries;
|
|
while (index_ptr < end - sizeof(uint64_t))
|
|
{
|
|
usub::utils::Hash128 key;
|
|
std::memcpy(&key, index_ptr, sizeof(key));
|
|
index_ptr += sizeof(key);
|
|
|
|
uint64_t offset;
|
|
std::memcpy(&offset, index_ptr, sizeof(offset));
|
|
index_ptr += sizeof(offset);
|
|
|
|
index_entries.emplace_back(key, offset);
|
|
}
|
|
|
|
for (const auto& [key, offset] : index_entries)
|
|
{
|
|
const char* record = ptr + offset;
|
|
|
|
usub::utils::Hash128 file_key;
|
|
std::memcpy(&file_key, record, sizeof(file_key));
|
|
record += sizeof(file_key);
|
|
|
|
uint32_t value_len = *reinterpret_cast<const uint32_t*>(record);
|
|
record += sizeof(uint32_t);
|
|
|
|
std::string value(value_len, '\0');
|
|
std::memcpy(value.data(), record, value_len);
|
|
record += value_len;
|
|
|
|
uint8_t tombstone_flag = *reinterpret_cast<const uint8_t*>(record);
|
|
record += sizeof(uint8_t);
|
|
|
|
uint64_t version = *reinterpret_cast<const uint64_t*>(record);
|
|
record += sizeof(uint64_t);
|
|
|
|
memtable.insert_raw(file_key, value, tombstone_flag == 1, version);
|
|
}
|
|
|
|
munmap((void*)ptr, st.st_size);
|
|
::close(fd);
|
|
}
|
|
|
|
|
|
/// Memory-mapped range query: invokes `callback(key, value)` for every live record whose key
/// lies in `[from_key, to_key]`.
///
/// Expected layout (as produced by `write_sstable`, integers in host byte order):
///   data   : repeated [key_len:u32][key bytes][value_len:u32][value bytes][tombstone:u8][version:u64]
///   index  : repeated [key_len:u32][key bytes][record offset:u64]
///   footer : [index_offset:u64]
///
/// The index is binary-searched for `from_key`; this assumes index entries are stored in
/// ascending key order. Records flagged as tombstones are skipped. All fields are copied out
/// with `memcpy` (record offsets are not aligned) and every offset/length is checked against the
/// mapped size before it is used.
///
/// @tparam SkipList Only used for its `key_type`, which must support `<` and `<=`.
/// @param filename  Path of the SSTable to query.
/// @param from_key  Inclusive lower bound.
/// @param to_key    Inclusive upper bound.
/// @param callback  Callable as `callback(key_type&, std::string&)`.
/// @throws std::runtime_error if the file cannot be opened, stat'ed or mapped, a stored key
///         length differs from `sizeof(key_type)`, or the file is truncated / inconsistent.
///         The mapping and descriptor are released on every exit path, including exceptions
///         thrown by `callback`.
template <typename SkipList, typename Callback>
void optimized_range_query_sstable(const std::string& filename,
                                   const typename SkipList::key_type& from_key,
                                   const typename SkipList::key_type& to_key,
                                   Callback&& callback)
{
    using key_type = typename SkipList::key_type;

    const int fd = ::open(filename.c_str(), O_RDONLY);
    if (fd < 0) throw std::runtime_error("Failed to open file");

    struct stat st;
    if (::fstat(fd, &st) != 0)
    {
        ::close(fd);
        throw std::runtime_error("Failed to stat file");
    }

    constexpr std::size_t kFooterSize = sizeof(uint64_t);
    if (st.st_size < static_cast<off_t>(kFooterSize))
    {
        // Too small to even hold the footer (also covers empty files, which cannot be mapped).
        ::close(fd);
        throw std::runtime_error("Corrupted SSTable file");
    }
    const std::size_t file_size = static_cast<std::size_t>(st.st_size);

    void* data = ::mmap(nullptr, file_size, PROT_READ, MAP_PRIVATE, fd, 0);
    if (data == MAP_FAILED)
    {
        ::close(fd);
        throw std::runtime_error("Failed to mmap file");
    }

    // Unmaps and closes on every exit path.
    struct MappingGuard
    {
        void* addr;
        std::size_t length;
        int descriptor;
        ~MappingGuard()
        {
            ::munmap(addr, length);
            ::close(descriptor);
        }
    } guard{data, file_size, fd};

    const char* const base = static_cast<const char*>(data);
    // Copies sizeof(dst) bytes from byte offset `at`; callers bounds-check `at` first.
    const auto load = [base](auto& dst, uint64_t at) { std::memcpy(&dst, base + at, sizeof(dst)); };

    const uint64_t footer_pos = file_size - kFooterSize;
    uint64_t index_offset = 0;
    load(index_offset, footer_pos);
    if (index_offset > footer_pos) throw std::runtime_error("Corrupted SSTable file");

    constexpr uint64_t kKeyFieldSize = sizeof(uint32_t) + sizeof(key_type);
    constexpr uint64_t kIndexEntrySize = kKeyFieldSize + sizeof(uint64_t);
    constexpr uint64_t kRecordFixedSize = kKeyFieldSize + sizeof(uint32_t) + sizeof(uint8_t) + sizeof(uint64_t);

    // --- Read the index, which spans [index_offset, footer_pos). ---
    std::vector<std::pair<key_type, uint64_t>> index_entries;
    for (uint64_t at = index_offset; at < footer_pos; at += kIndexEntrySize)
    {
        if (footer_pos - at < kIndexEntrySize) throw std::runtime_error("Corrupted SSTable file");

        uint32_t key_len = 0;
        load(key_len, at);
        // The key bytes are copied straight into a key_type object, so the length must match.
        if (key_len != sizeof(key_type)) throw std::runtime_error("Key size mismatch");

        key_type key{};
        load(key, at + sizeof(uint32_t));

        uint64_t offset = 0;
        load(offset, at + kKeyFieldSize);

        index_entries.emplace_back(key, offset);
    }

    // lower_bound on from_key: first index entry whose key is not less than from_key.
    auto it = std::lower_bound(index_entries.begin(), index_entries.end(), from_key,
                               [](const auto& pair, const auto& key) { return pair.first < key; });

    for (; it != index_entries.end() && it->first <= to_key; ++it)
    {
        const uint64_t offset = it->second;
        // Records live in the data section [0, index_offset).
        if (offset > index_offset || index_offset - offset < kRecordFixedSize)
        {
            throw std::runtime_error("Corrupted SSTable file");
        }

        uint32_t key_len = 0;
        load(key_len, offset);
        if (key_len != sizeof(key_type)) throw std::runtime_error("Key size mismatch");

        key_type file_key{};
        load(file_key, offset + sizeof(uint32_t));

        uint32_t value_len = 0;
        load(value_len, offset + kKeyFieldSize);
        if (value_len > index_offset - offset - kRecordFixedSize)
        {
            throw std::runtime_error("Corrupted SSTable file");
        }

        const uint64_t value_pos = offset + kKeyFieldSize + sizeof(uint32_t);

        uint8_t is_tombstone = 0;
        load(is_tombstone, value_pos + value_len);

        if (!is_tombstone)
        {
            std::string value(base + value_pos, value_len);
            callback(file_key, value);
        }
    }
}
|
|
}
|
|
|
|
#endif //MEMTABLE_H
|