// // Created by Kirill Zhukov on 20.04.2025. // #include "UDB.h" namespace usub::core { UDB::UDB(const std::string& db_name, const std::string& shm_name, utils::DatabaseSettings settings, bool create_new) : db_name_(db_name), shm_name_(shm_name), shm_queue_capacity_(settings.shm_queue_capacity), max_memtable_size_(settings.max_memtable_size), shm_manager_(shm_name, sizeof(SharedCommandQueue), create_new), version_manager_(db_name), memtable_manager_(db_name + "_wal", settings.max_memtable_size, settings.estimated_element_size, this->version_manager_), compactor_(this->version_manager_), running_(true) { if (shm_manager_.needs_init()) { new(shm_manager_.base_ptr()) SharedCommandQueue(shm_queue_capacity_); } command_queue_ = reinterpret_cast(shm_manager_.base_ptr()); recover_from_logs(); background_flush_thread_ = std::thread(&UDB::background_flush_worker, this); compactor_.run(); } UDB::~UDB() { running_ = false; if (this->background_flush_thread_.joinable()) this->background_flush_thread_.join(); } void UDB::recover_from_logs() { utils::RecoveryLog recovery_log(this->db_name_); recovery_log.replay([this](const utils::Hash128& key, const std::string& value, bool is_tombstone) { if (!is_tombstone) { this->fast_cache_[key] = value; this->memtable_manager_.put(key, value); } else { this->fast_cache_.erase(key); this->memtable_manager_.remove(key); } }); std::cout << "[UDB] Recovery complete for database: " << this->db_name_ << "\n"; } void UDB::run() { std::cout << "[UDB] Server for DB '" << this->db_name_ << "' started.\n"; while (this->running_) { Command* cmd = this->command_queue_->peek(0); if (!cmd) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); continue; } if (cmd->ready.load(std::memory_order_acquire) == 1) { process_command(*cmd); this->command_queue_->pop(); } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } } void UDB::process_command(Command& cmd) { switch (cmd.op) { case OperationType::PUT: { std::cout << "PUT\n"; std::string value(cmd.value, cmd.value_size); this->fast_cache_[cmd.key] = value; this->memtable_manager_.put(cmd.key, value); break; } case OperationType::DELETE: { std::cout << "DELETE\n"; fast_cache_.erase(cmd.key); memtable_manager_.remove(cmd.key); break; } case OperationType::FIND: { auto it = fast_cache_.find(cmd.key); if (it != fast_cache_.end()) { std::cout << "FIND\n"; std::string value = it->second; cmd.response_size = static_cast(value.size()); std::memcpy(cmd.response, value.data(), value.size()); } else { cmd.response_size = 0; } cmd.response_ready.store(1, std::memory_order_release); break; } default: std::cout << "[UNKNOWN COMMAND]\n"; break; } } void UDB::background_flush_worker() { while (running_) { memtable_manager_.flush_batch(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } }