Compare commits

...

2 Commits

Author SHA1 Message Date
4bcacdc7dc added cstring 2025-04-21 15:47:35 +03:00
199fb1f34b make compactor more flexible 2025-04-21 14:08:42 +03:00
13 changed files with 122 additions and 52 deletions

View File

@ -5,6 +5,8 @@
#include <cstring>
#include <thread>
#include <chrono>
#include <sys/stat.h>
#include "core/SharedCommandQueue.h"
#include "utils/hash/Hash128.h"
@ -76,7 +78,15 @@ int main()
return 1;
}
void* ptr = mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
struct stat statbuf;
if (fstat(shm_fd, &statbuf) == -1)
{
perror("fstat failed");
close(shm_fd);
return 1;
}
void* ptr = mmap(nullptr, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (ptr == MAP_FAILED)
{
perror("mmap failed");
@ -91,12 +101,14 @@ int main()
// PUT Command
// -------------------
{
std::cout << "1\n";
size_t head = queue->head().load(std::memory_order_relaxed);
size_t next_head = (head + 1) % queue->capacity();
while (next_head == queue->tail().load(std::memory_order_acquire))
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::cout << "2\n";
usub::core::Command* slot = &queue->raw_buffer()[head];
@ -104,7 +116,9 @@ int main()
slot->key = usub::utils::compute_hash128("test_key", 8); // исправил длину "test_key" = 8 символов
const char* put_value = "test_value";
slot->value_size = std::strlen(put_value);
std::cout << "3\n";
std::memcpy(slot->value, put_value, slot->value_size);
std::cout << "4\n";
slot->response_size = 0;
slot->response_ready.store(0, std::memory_order_relaxed);

View File

@ -105,6 +105,7 @@ namespace usub::shared_storage
delete old_memtable;
this->wal.close();
this->flushing.store(false);
this->version_manager.flush();
}
template <typename SkipList>
@ -126,6 +127,7 @@ namespace usub::shared_storage
{
flush();
}
this->version_manager.flush();
}
template <typename SkipList>

View File

@ -192,4 +192,5 @@ namespace usub::core
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return cmd.response_size != 0;
}
}

View File

@ -54,9 +54,9 @@ namespace usub::core
private:
size_t capacity_;
Command* commands_;
alignas(64) std::atomic<size_t> head_;
alignas(64) std::atomic<size_t> tail_;
alignas(64) Command buffer_[];
std::atomic<size_t> head_;
std::atomic<size_t> tail_;
Command buffer_[];
};
}

View File

@ -48,6 +48,7 @@ namespace usub::core
{
case OperationType::PUT:
{
std::cout << "PUT\n";
std::string value(cmd.value, cmd.value_size);
this->fast_cache_[cmd.key] = value;
this->memtable_manager_.put(cmd.key, value);
@ -55,6 +56,7 @@ namespace usub::core
}
case OperationType::DELETE:
{
std::cout << "DELETE\n";
this->fast_cache_.erase(cmd.key);
this->memtable_manager_.remove(cmd.key);
break;
@ -103,7 +105,6 @@ namespace usub::core
{
case OperationType::PUT:
{
std::cout << "PUT\n";
std::string value(cmd.value, cmd.value_size);
this->fast_cache_[cmd.key] = value;
this->memtable_manager_.put(cmd.key, value);
@ -111,17 +112,15 @@ namespace usub::core
}
case OperationType::DELETE:
{
std::cout << "DELETE\n";
this->fast_cache_.erase(cmd.key);
this->memtable_manager_.remove(cmd.key);
break;
}
case OperationType::FIND:
{
auto it = fast_cache_.find(cmd.key);
if (it != fast_cache_.end())
auto it = this->fast_cache_.find(cmd.key);
if (it != this->fast_cache_.end())
{
std::cout << "FIND\n";
std::string value = it->second;
cmd.response_size = static_cast<uint32_t>(value.size());
std::memcpy(cmd.response, value.data(), value.size());

View File

@ -189,6 +189,24 @@ namespace usub::utils
}
[[nodiscard]] size_t get_capacity() const noexcept { return this->capacity; }
bool try_push_batch(const T* items, size_t count)
{
size_t head = this->head.load(std::memory_order_relaxed);
size_t tail = this->tail.load(std::memory_order_acquire);
size_t free_slots = (tail + this->capacity - head - 1) % this->capacity;
if (free_slots < count)
return false;
for (size_t i = 0; i < count; ++i)
{
this->buffer[(head + i) % this->capacity] = items[i];
}
this->head.store((head + count) % this->capacity, std::memory_order_release);
return true;
}
};
}

View File

@ -34,6 +34,7 @@ namespace usub::utils
{
while (this->running_)
{
// Queue → 0
for (size_t i = 0; i < 8; ++i)
{
auto file = this->l0_queue_.pop();
@ -41,7 +42,7 @@ namespace usub::utils
{
std::lock_guard<std::mutex> lock(this->levels_mutex_);
this->level0_files_.push_back(*file);
this->level0_size_.fetch_add(1, std::memory_order_relaxed); // увеличиваем счётчик
this->level0_size_.fetch_add(1, std::memory_order_relaxed);
}
else
{
@ -49,19 +50,35 @@ namespace usub::utils
}
}
if (this->level0_size_.load(std::memory_order_relaxed) >= 4)
bool did_compact = false;
{
std::lock_guard<std::mutex> lock(this->levels_mutex_);
compact_level(this->level0_files_, this->level1_files_, 0);
if (this->level0_size_.load(std::memory_order_relaxed) >= 4)
{
compact_level(this->level0_files_, this->level1_files_, 0);
did_compact = true;
}
}
if (level1_size_.load(std::memory_order_relaxed) >= 4)
// 1 → 2
{
std::lock_guard<std::mutex> lock(this->levels_mutex_);
compact_level(this->level1_files_, this->level2_files_, 1);
if (this->level1_size_.load(std::memory_order_relaxed) >= 4)
{
compact_level(this->level1_files_, this->level2_files_, 1);
did_compact = true;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (!did_compact)
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
}
@ -118,6 +135,8 @@ namespace usub::utils
this->level1_size_.fetch_sub(batch_size, std::memory_order_relaxed);
this->level2_size_.fetch_add(1, std::memory_order_relaxed);
}
this->version_manager_.flush();
}
} // utils
// usub

View File

@ -19,13 +19,16 @@ namespace usub::utils
{
public:
explicit Compactor(VersionManager& vm);
~Compactor();
void add_sstable_l0(const std::string& filename);
void run();
private:
void background_worker();
void compact_level(std::vector<std::string>& source_files, std::vector<std::string>& dest_files, int level);
private:

View File

@ -8,24 +8,19 @@
namespace usub::utils
{
RecoveryLog::RecoveryLog(const std::string& dbname)
: db_name(dbname),
metadata_dir("metadata/" + db_name + "/"),
log_file(metadata_dir + "recovery.log")
: db_name_(dbname),
metadata_dir_("metadata/" + dbname + "/"),
log_file_(metadata_dir_ + "recovery.log")
{
ensure_metadata_dir();
this->log_out.open(this->log_file, std::ios::binary | std::ios::app);
if (!this->log_out.is_open())
{
throw std::runtime_error("Failed to open recovery log for " + this->db_name);
}
this->log_out_.open(this->log_file_, std::ios::binary | std::ios::app);
if (!this->log_out_.is_open())
throw std::runtime_error("Failed to open recovery log for " + this->db_name_);
}
RecoveryLog::~RecoveryLog()
{
if (this->log_out.is_open())
{
this->log_out.close();
}
if (log_out_.is_open()) log_out_.close();
}
void RecoveryLog::log_put(const std::string& key, const std::string& value)
@ -33,48 +28,46 @@ namespace usub::utils
uint8_t op = 0;
uint32_t key_len = key.size();
uint32_t value_len = value.size();
this->log_out.write(reinterpret_cast<const char*>(&op), sizeof(op));
this->log_out.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
this->log_out.write(key.data(), key_len);
this->log_out.write(reinterpret_cast<const char*>(&value_len), sizeof(value_len));
this->log_out.write(value.data(), value_len);
this->log_out.flush();
this->log_out_.write(reinterpret_cast<const char*>(&op), sizeof(op));
this->log_out_.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
this->log_out_.write(key.data(), key_len);
this->log_out_.write(reinterpret_cast<const char*>(&value_len), sizeof(value_len));
this->log_out_.write(value.data(), value_len);
this->log_out_.flush();
}
void RecoveryLog::log_delete(const std::string& key)
{
uint8_t op = 1;
uint32_t key_len = key.size();
this->log_out.write(reinterpret_cast<const char*>(&op), sizeof(op));
this->log_out.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
this->log_out.write(key.data(), key_len);
this->log_out.flush();
this->log_out_.write(reinterpret_cast<const char*>(&op), sizeof(op));
this->log_out_.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
this->log_out_.write(key.data(), key_len);
this->log_out_.flush();
}
void RecoveryLog::log_command(const core::Command& cmd)
{
cmd.serialize(this->log_out);
this->log_out.flush();
cmd.serialize(this->log_out_);
this->log_out_.flush();
}
void RecoveryLog::ensure_metadata_dir() const
{
try
{
if (!std::filesystem::exists(this->metadata_dir))
{
std::filesystem::create_directories(this->metadata_dir);
}
if (!std::filesystem::exists(metadata_dir_)) std::filesystem::create_directories(metadata_dir_);
}
catch (const std::exception& e)
{
std::cerr << "Failed to create metadata dir: " << e.what() << std::endl;
std::cerr << "Failed to create metadata directory: " << e.what() << std::endl;
}
}
void RecoveryLog::replay(const std::function<void(const core::Command&)>& callback) const
{
std::ifstream in(this->log_file, std::ios::binary);
std::ifstream in(this->log_file_, std::ios::binary);
if (!in.is_open())
return;
@ -84,5 +77,16 @@ namespace usub::utils
callback(cmd);
}
}
void RecoveryLog::clear()
{
if (this->log_out_.is_open())
this->log_out_.close();
std::filesystem::remove(this->log_file_);
this->log_out_.open(this->log_file_, std::ios::binary | std::ios::app);
if (!this->log_out_.is_open())
throw std::runtime_error("Failed to reopen recovery log after clear for " + this->db_name_);
}
} // utils
// usub

View File

@ -29,14 +29,16 @@ namespace usub::utils
void replay(const std::function<void(const core::Command&)>& callback) const;
void clear();
private:
void ensure_metadata_dir() const;
private:
std::string db_name;
std::string metadata_dir;
std::string log_file;
std::ofstream log_out;
std::string db_name_;
std::string metadata_dir_;
std::string log_file_;
std::ofstream log_out_;
};
} // utils
// usub

View File

@ -14,6 +14,7 @@
#include <vector>
#include <utility>
#include <string>
#include <cstring>
#include "utils/hash/Hash128.h"

View File

@ -7,9 +7,9 @@
namespace usub::utils
{
VersionManager::VersionManager(const std::string& dbname)
: db_name(dbname),
metadata_dir("metadata/" + this->db_name + "/"),
version_file(this->metadata_dir + "version.meta")
: db_name(dbname),
metadata_dir("metadata/" + this->db_name + "/"),
version_file(this->metadata_dir + "version.meta")
{
ensure_metadata_dir();
load_version();
@ -20,6 +20,11 @@ namespace usub::utils
return this->version.fetch_add(1, std::memory_order_relaxed);
}
void VersionManager::flush()
{
save_version();
}
void VersionManager::ensure_metadata_dir()
{
try

View File

@ -26,6 +26,8 @@ namespace usub::utils
uint64_t next_version();
void flush();
private:
void ensure_metadata_dir();