diff --git a/core/Command.cpp b/core/Command.cpp index 94437b7..63e95ed 100644 --- a/core/Command.cpp +++ b/core/Command.cpp @@ -4,6 +4,9 @@ #include "Command.h" +#include +#include + namespace usub::core { Command::Command() @@ -45,4 +48,36 @@ namespace usub::core } return *this; } + + void Command::serialize(std::ostream& out) const + { + auto op_val = static_cast(op); + out.write(reinterpret_cast(&op_val), sizeof(op_val)); + out.write(reinterpret_cast(&key), sizeof(key)); + + if (op == OperationType::PUT) + { + out.write(reinterpret_cast(&value_size), sizeof(value_size)); + out.write(value, value_size); + } + } + + Command Command::deserialize(std::istream& in) + { + uint8_t op_val; + in.read(reinterpret_cast(&op_val), sizeof(op_val)); + + Command cmd; + cmd.op = static_cast(op_val); + + in.read(reinterpret_cast(&cmd.key), sizeof(cmd.key)); + + if (cmd.op == OperationType::PUT) + { + in.read(reinterpret_cast(&cmd.value_size), sizeof(cmd.value_size)); + in.read(cmd.value, cmd.value_size); + } + + return cmd; + } } diff --git a/core/Command.h b/core/Command.h index f35bf8f..6e51a77 100644 --- a/core/Command.h +++ b/core/Command.h @@ -27,6 +27,10 @@ namespace usub::core Command& operator=(const Command& other); + void serialize(std::ostream& out) const; + + static Command deserialize(std::istream& in); + std::atomic ready; OperationType op; utils::Hash128 key; diff --git a/core/UDB.cpp b/core/UDB.cpp index 0e1a77e..2e2ad05 100644 --- a/core/UDB.cpp +++ b/core/UDB.cpp @@ -14,9 +14,10 @@ namespace usub::core max_memtable_size_(settings.max_memtable_size), shm_manager_(shm_name, sizeof(SharedCommandQueue), create_new), version_manager_(db_name), - memtable_manager_(db_name + "_wal", settings.max_memtable_size, settings.estimated_element_size, this->version_manager_), + memtable_manager_(db_name + "_wal", settings.max_memtable_size, settings.estimated_element_size, + this->version_manager_), compactor_(this->version_manager_), - running_(true) + running_(true), recovery_log_(this->db_name_) { if (shm_manager_.needs_init()) { @@ -41,19 +42,28 @@ namespace usub::core void UDB::recover_from_logs() { - utils::RecoveryLog recovery_log(this->db_name_); - - recovery_log.replay([this](const utils::Hash128& key, const std::string& value, bool is_tombstone) + this->recovery_log_.replay([this](const Command& cmd) { - if (!is_tombstone) + switch (cmd.op) { - this->fast_cache_[key] = value; - this->memtable_manager_.put(key, value); - } - else - { - this->fast_cache_.erase(key); - this->memtable_manager_.remove(key); + case OperationType::PUT: + { + std::string value(cmd.value, cmd.value_size); + this->fast_cache_[cmd.key] = value; + this->memtable_manager_.put(cmd.key, value); + break; + } + case OperationType::DELETE: + { + this->fast_cache_.erase(cmd.key); + this->memtable_manager_.remove(cmd.key); + break; + } + default: + { + std::cout << "RecoveryLog: skip unknown op = " << static_cast(cmd.op) << "\n"; + break; + } } }); @@ -88,6 +98,7 @@ namespace usub::core void UDB::process_command(Command& cmd) { + this->recovery_log_.log_command(cmd); switch (cmd.op) { case OperationType::PUT: @@ -101,8 +112,8 @@ namespace usub::core case OperationType::DELETE: { std::cout << "DELETE\n"; - fast_cache_.erase(cmd.key); - memtable_manager_.remove(cmd.key); + this->fast_cache_.erase(cmd.key); + this->memtable_manager_.remove(cmd.key); break; } case OperationType::FIND: @@ -133,7 +144,7 @@ namespace usub::core { while (running_) { - memtable_manager_.flush_batch(); + this->memtable_manager_.flush_batch(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } diff --git a/core/UDB.h b/core/UDB.h index ac5a3d7..8dd2e29 100644 --- a/core/UDB.h +++ b/core/UDB.h @@ -51,6 +51,7 @@ namespace usub::core SharedMemoryManager shm_manager_; SharedCommandQueue* command_queue_; + utils::RecoveryLog recovery_log_; utils::VersionManager version_manager_; diff --git a/utils/io/RecoveryLog.cpp b/utils/io/RecoveryLog.cpp index 4f94ad1..0f13a9b 100644 --- a/utils/io/RecoveryLog.cpp +++ b/utils/io/RecoveryLog.cpp @@ -51,6 +51,12 @@ namespace usub::utils this->log_out.flush(); } + void RecoveryLog::log_command(const core::Command& cmd) + { + cmd.serialize(this->log_out); + this->log_out.flush(); + } + void RecoveryLog::ensure_metadata_dir() const { try @@ -66,39 +72,16 @@ namespace usub::utils } } - void RecoveryLog::replay( - const std::function& callback) const + void RecoveryLog::replay(const std::function& callback) const { - std::ifstream in(log_file, std::ios::binary); + std::ifstream in(this->log_file, std::ios::binary); if (!in.is_open()) return; while (in.peek() != EOF) { - uint8_t op; - in.read(reinterpret_cast(&op), sizeof(op)); - - Hash128 key; - in.read(reinterpret_cast(&key), sizeof(key)); - - if (op == 0) // PUT - { - uint32_t value_size; - in.read(reinterpret_cast(&value_size), sizeof(value_size)); - - std::string value(value_size, '\0'); - in.read(&value[0], value_size); - - callback(key, value, false); - } - else if (op == 1) // DELETE - { - callback(key, "", true); - } - else - { - throw std::runtime_error("Invalid RecoveryLog format"); - } + core::Command cmd = core::Command::deserialize(in); + callback(cmd); } } } // utils diff --git a/utils/io/RecoveryLog.h b/utils/io/RecoveryLog.h index bd6c51a..3dced6a 100644 --- a/utils/io/RecoveryLog.h +++ b/utils/io/RecoveryLog.h @@ -10,6 +10,7 @@ #include #include #include "utils/hash/Hash128.h" +#include "core/Command.h" namespace usub::utils { @@ -24,7 +25,9 @@ namespace usub::utils void log_delete(const std::string& key); - void replay(const std::function& callback) const; + void log_command(const core::Command& cmd); + + void replay(const std::function& callback) const; private: void ensure_metadata_dir() const;