fixed recovery log

This commit is contained in:
g2px1 2025-04-21 12:51:58 +03:00
parent 86dbb1f942
commit b03fd39575
6 changed files with 81 additions and 44 deletions

View File

@ -4,6 +4,9 @@
#include "Command.h"
#include <istream>
#include <ostream>
namespace usub::core
{
Command::Command()
@ -45,4 +48,36 @@ namespace usub::core
}
return *this;
}
void Command::serialize(std::ostream& out) const
{
auto op_val = static_cast<uint8_t>(op);
out.write(reinterpret_cast<const char*>(&op_val), sizeof(op_val));
out.write(reinterpret_cast<const char*>(&key), sizeof(key));
if (op == OperationType::PUT)
{
out.write(reinterpret_cast<const char*>(&value_size), sizeof(value_size));
out.write(value, value_size);
}
}
Command Command::deserialize(std::istream& in)
{
uint8_t op_val;
in.read(reinterpret_cast<char*>(&op_val), sizeof(op_val));
Command cmd;
cmd.op = static_cast<OperationType>(op_val);
in.read(reinterpret_cast<char*>(&cmd.key), sizeof(cmd.key));
if (cmd.op == OperationType::PUT)
{
in.read(reinterpret_cast<char*>(&cmd.value_size), sizeof(cmd.value_size));
in.read(cmd.value, cmd.value_size);
}
return cmd;
}
}

View File

@ -27,6 +27,10 @@ namespace usub::core
Command& operator=(const Command& other);
void serialize(std::ostream& out) const;
static Command deserialize(std::istream& in);
std::atomic<uint8_t> ready;
OperationType op;
utils::Hash128 key;

View File

@ -14,9 +14,10 @@ namespace usub::core
max_memtable_size_(settings.max_memtable_size),
shm_manager_(shm_name, sizeof(SharedCommandQueue), create_new),
version_manager_(db_name),
memtable_manager_(db_name + "_wal", settings.max_memtable_size, settings.estimated_element_size, this->version_manager_),
memtable_manager_(db_name + "_wal", settings.max_memtable_size, settings.estimated_element_size,
this->version_manager_),
compactor_(this->version_manager_),
running_(true)
running_(true), recovery_log_(this->db_name_)
{
if (shm_manager_.needs_init())
{
@ -41,19 +42,28 @@ namespace usub::core
void UDB::recover_from_logs()
{
utils::RecoveryLog recovery_log(this->db_name_);
recovery_log.replay([this](const utils::Hash128& key, const std::string& value, bool is_tombstone)
this->recovery_log_.replay([this](const Command& cmd)
{
if (!is_tombstone)
switch (cmd.op)
{
this->fast_cache_[key] = value;
this->memtable_manager_.put(key, value);
}
else
{
this->fast_cache_.erase(key);
this->memtable_manager_.remove(key);
case OperationType::PUT:
{
std::string value(cmd.value, cmd.value_size);
this->fast_cache_[cmd.key] = value;
this->memtable_manager_.put(cmd.key, value);
break;
}
case OperationType::DELETE:
{
this->fast_cache_.erase(cmd.key);
this->memtable_manager_.remove(cmd.key);
break;
}
default:
{
std::cout << "RecoveryLog: skip unknown op = " << static_cast<int>(cmd.op) << "\n";
break;
}
}
});
@ -88,6 +98,7 @@ namespace usub::core
void UDB::process_command(Command& cmd)
{
this->recovery_log_.log_command(cmd);
switch (cmd.op)
{
case OperationType::PUT:
@ -101,8 +112,8 @@ namespace usub::core
case OperationType::DELETE:
{
std::cout << "DELETE\n";
fast_cache_.erase(cmd.key);
memtable_manager_.remove(cmd.key);
this->fast_cache_.erase(cmd.key);
this->memtable_manager_.remove(cmd.key);
break;
}
case OperationType::FIND:
@ -133,7 +144,7 @@ namespace usub::core
{
while (running_)
{
memtable_manager_.flush_batch();
this->memtable_manager_.flush_batch();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

View File

@ -51,6 +51,7 @@ namespace usub::core
SharedMemoryManager shm_manager_;
SharedCommandQueue* command_queue_;
utils::RecoveryLog recovery_log_;
utils::VersionManager version_manager_;

View File

@ -51,6 +51,12 @@ namespace usub::utils
this->log_out.flush();
}
void RecoveryLog::log_command(const core::Command& cmd)
{
cmd.serialize(this->log_out);
this->log_out.flush();
}
void RecoveryLog::ensure_metadata_dir() const
{
try
@ -66,39 +72,16 @@ namespace usub::utils
}
}
void RecoveryLog::replay(
const std::function<void(const Hash128& key, const std::string& value, bool is_tombstone)>& callback) const
void RecoveryLog::replay(const std::function<void(const core::Command&)>& callback) const
{
std::ifstream in(log_file, std::ios::binary);
std::ifstream in(this->log_file, std::ios::binary);
if (!in.is_open())
return;
while (in.peek() != EOF)
{
uint8_t op;
in.read(reinterpret_cast<char*>(&op), sizeof(op));
Hash128 key;
in.read(reinterpret_cast<char*>(&key), sizeof(key));
if (op == 0) // PUT
{
uint32_t value_size;
in.read(reinterpret_cast<char*>(&value_size), sizeof(value_size));
std::string value(value_size, '\0');
in.read(&value[0], value_size);
callback(key, value, false);
}
else if (op == 1) // DELETE
{
callback(key, "", true);
}
else
{
throw std::runtime_error("Invalid RecoveryLog format");
}
core::Command cmd = core::Command::deserialize(in);
callback(cmd);
}
}
} // utils

View File

@ -10,6 +10,7 @@
#include <string>
#include <functional>
#include "utils/hash/Hash128.h"
#include "core/Command.h"
namespace usub::utils
{
@ -24,7 +25,9 @@ namespace usub::utils
void log_delete(const std::string& key);
void replay(const std::function<void(const Hash128& key, const std::string& value, bool is_tombstone)>& callback) const;
void log_command(const core::Command& cmd);
void replay(const std::function<void(const core::Command&)>& callback) const;
private:
void ensure_metadata_dir() const;