2025-04-27 16:32:10 +00:00

123 lines
4.3 KiB
C++

//
// Created by Kirill Zhukov on 20.04.2025.
//
#include "UDB.h"
namespace usub::core {
UDB::UDB(const std::string &db_name, const std::string &shm_name,
utils::DatabaseSettings settings, bool create_new)
: db_name_(db_name),
shm_name_(shm_name),
shm_queue_capacity_(settings.shm_queue_capacity),
max_memtable_size_(settings.max_memtable_size),
shm_manager_(shm_name, SharedCommandQueue::calculate_shm_size(settings.shm_queue_capacity), create_new),
version_manager_(db_name),
memtable_manager_(db_name + "_wal", settings.max_memtable_size, settings.estimated_element_size,
this->version_manager_),
compactor_(this->version_manager_),
running_(true), recovery_log_(this->db_name_) {
if (shm_manager_.needs_init())
new(this->shm_manager_.base_ptr()) SharedCommandQueue(this->shm_queue_capacity_);
this->command_queue_ = reinterpret_cast<SharedCommandQueue *>(shm_manager_.base_ptr());
recover_from_logs();
background_flush_thread_ = std::thread(&UDB::background_flush_worker, this);
compactor_.run();
}
UDB::~UDB() {
running_ = false;
if (this->background_flush_thread_.joinable())
this->background_flush_thread_.join();
}
void UDB::recover_from_logs() {
this->recovery_log_.replay([this](const Command &cmd) {
switch (cmd.op) {
case OperationType::PUT: {
std::cout << "PUT\n";
std::string value(cmd.value, cmd.value_size);
this->fast_cache_[cmd.key] = value;
this->memtable_manager_.put(cmd.key, value);
break;
}
case OperationType::DELETE: {
std::cout << "DELETE\n";
this->fast_cache_.erase(cmd.key);
this->memtable_manager_.remove(cmd.key);
break;
}
default: {
std::cout << "RecoveryLog: skip unknown op = " << static_cast<int>(cmd.op) << "\n";
break;
}
}
});
std::cout << "[UDB] Recovery complete for database: " << this->db_name_ << "\n";
}
void UDB::run() {
std::cout << "[UDB] Server for DB '" << this->db_name_ << "' started.\n";
while (this->running_) {
Command *cmd = this->command_queue_->peek(0);
if (!cmd) {
cpu_relax();
continue;
}
std::cout << "[UDB] command were provided\n";
if (cmd->ready.load(std::memory_order_acquire) == 1) {
process_command(*cmd);
this->command_queue_->pop();
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
void UDB::process_command(Command &cmd) {
this->recovery_log_.log_command(cmd);
switch (cmd.op) {
case OperationType::PUT: {
std::string value(cmd.value, cmd.value_size);
this->fast_cache_[cmd.key] = value;
this->memtable_manager_.put(cmd.key, value);
break;
}
case OperationType::DELETE: {
this->fast_cache_.erase(cmd.key);
this->memtable_manager_.remove(cmd.key);
break;
}
case OperationType::FIND: {
auto it = this->fast_cache_.find(cmd.key);
if (it != this->fast_cache_.end()) {
std::string value = it->second;
cmd.response_size = static_cast<uint32_t>(value.size());
std::memcpy(cmd.response, value.data(), value.size());
} else {
cmd.response_size = 0;
}
cmd.response_ready.store(1, std::memory_order_release);
break;
}
default:
std::cout << "[UNKNOWN COMMAND]\n";
break;
}
}
void UDB::background_flush_worker() {
while (running_) {
this->memtable_manager_.flush_batch();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
}