Compare commits

...

2 Commits

Author SHA1 Message Date
4bcacdc7dc added cstring 2025-04-21 15:47:35 +03:00
199fb1f34b make compactor more flexible 2025-04-21 14:08:42 +03:00
13 changed files with 122 additions and 52 deletions

View File

@ -5,6 +5,8 @@
#include <cstring> #include <cstring>
#include <thread> #include <thread>
#include <chrono> #include <chrono>
#include <sys/stat.h>
#include "core/SharedCommandQueue.h" #include "core/SharedCommandQueue.h"
#include "utils/hash/Hash128.h" #include "utils/hash/Hash128.h"
@ -76,7 +78,15 @@ int main()
return 1; return 1;
} }
void* ptr = mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); struct stat statbuf;
if (fstat(shm_fd, &statbuf) == -1)
{
perror("fstat failed");
close(shm_fd);
return 1;
}
void* ptr = mmap(nullptr, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (ptr == MAP_FAILED) if (ptr == MAP_FAILED)
{ {
perror("mmap failed"); perror("mmap failed");
@ -91,12 +101,14 @@ int main()
// PUT Command // PUT Command
// ------------------- // -------------------
{ {
std::cout << "1\n";
size_t head = queue->head().load(std::memory_order_relaxed); size_t head = queue->head().load(std::memory_order_relaxed);
size_t next_head = (head + 1) % queue->capacity(); size_t next_head = (head + 1) % queue->capacity();
while (next_head == queue->tail().load(std::memory_order_acquire)) while (next_head == queue->tail().load(std::memory_order_acquire))
{ {
std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::this_thread::sleep_for(std::chrono::milliseconds(1));
} }
std::cout << "2\n";
usub::core::Command* slot = &queue->raw_buffer()[head]; usub::core::Command* slot = &queue->raw_buffer()[head];
@ -104,7 +116,9 @@ int main()
slot->key = usub::utils::compute_hash128("test_key", 8); // исправил длину "test_key" = 8 символов slot->key = usub::utils::compute_hash128("test_key", 8); // исправил длину "test_key" = 8 символов
const char* put_value = "test_value"; const char* put_value = "test_value";
slot->value_size = std::strlen(put_value); slot->value_size = std::strlen(put_value);
std::cout << "3\n";
std::memcpy(slot->value, put_value, slot->value_size); std::memcpy(slot->value, put_value, slot->value_size);
std::cout << "4\n";
slot->response_size = 0; slot->response_size = 0;
slot->response_ready.store(0, std::memory_order_relaxed); slot->response_ready.store(0, std::memory_order_relaxed);

View File

@ -105,6 +105,7 @@ namespace usub::shared_storage
delete old_memtable; delete old_memtable;
this->wal.close(); this->wal.close();
this->flushing.store(false); this->flushing.store(false);
this->version_manager.flush();
} }
template <typename SkipList> template <typename SkipList>
@ -126,6 +127,7 @@ namespace usub::shared_storage
{ {
flush(); flush();
} }
this->version_manager.flush();
} }
template <typename SkipList> template <typename SkipList>

View File

@ -192,4 +192,5 @@ namespace usub::core
std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::this_thread::sleep_for(std::chrono::milliseconds(1));
return cmd.response_size != 0; return cmd.response_size != 0;
} }
} }

View File

@ -54,9 +54,9 @@ namespace usub::core
private: private:
size_t capacity_; size_t capacity_;
Command* commands_; Command* commands_;
alignas(64) std::atomic<size_t> head_; std::atomic<size_t> head_;
alignas(64) std::atomic<size_t> tail_; std::atomic<size_t> tail_;
alignas(64) Command buffer_[]; Command buffer_[];
}; };
} }

View File

@ -48,6 +48,7 @@ namespace usub::core
{ {
case OperationType::PUT: case OperationType::PUT:
{ {
std::cout << "PUT\n";
std::string value(cmd.value, cmd.value_size); std::string value(cmd.value, cmd.value_size);
this->fast_cache_[cmd.key] = value; this->fast_cache_[cmd.key] = value;
this->memtable_manager_.put(cmd.key, value); this->memtable_manager_.put(cmd.key, value);
@ -55,6 +56,7 @@ namespace usub::core
} }
case OperationType::DELETE: case OperationType::DELETE:
{ {
std::cout << "DELETE\n";
this->fast_cache_.erase(cmd.key); this->fast_cache_.erase(cmd.key);
this->memtable_manager_.remove(cmd.key); this->memtable_manager_.remove(cmd.key);
break; break;
@ -103,7 +105,6 @@ namespace usub::core
{ {
case OperationType::PUT: case OperationType::PUT:
{ {
std::cout << "PUT\n";
std::string value(cmd.value, cmd.value_size); std::string value(cmd.value, cmd.value_size);
this->fast_cache_[cmd.key] = value; this->fast_cache_[cmd.key] = value;
this->memtable_manager_.put(cmd.key, value); this->memtable_manager_.put(cmd.key, value);
@ -111,17 +112,15 @@ namespace usub::core
} }
case OperationType::DELETE: case OperationType::DELETE:
{ {
std::cout << "DELETE\n";
this->fast_cache_.erase(cmd.key); this->fast_cache_.erase(cmd.key);
this->memtable_manager_.remove(cmd.key); this->memtable_manager_.remove(cmd.key);
break; break;
} }
case OperationType::FIND: case OperationType::FIND:
{ {
auto it = fast_cache_.find(cmd.key); auto it = this->fast_cache_.find(cmd.key);
if (it != fast_cache_.end()) if (it != this->fast_cache_.end())
{ {
std::cout << "FIND\n";
std::string value = it->second; std::string value = it->second;
cmd.response_size = static_cast<uint32_t>(value.size()); cmd.response_size = static_cast<uint32_t>(value.size());
std::memcpy(cmd.response, value.data(), value.size()); std::memcpy(cmd.response, value.data(), value.size());

View File

@ -189,6 +189,24 @@ namespace usub::utils
} }
[[nodiscard]] size_t get_capacity() const noexcept { return this->capacity; } [[nodiscard]] size_t get_capacity() const noexcept { return this->capacity; }
bool try_push_batch(const T* items, size_t count)
{
size_t head = this->head.load(std::memory_order_relaxed);
size_t tail = this->tail.load(std::memory_order_acquire);
size_t free_slots = (tail + this->capacity - head - 1) % this->capacity;
if (free_slots < count)
return false;
for (size_t i = 0; i < count; ++i)
{
this->buffer[(head + i) % this->capacity] = items[i];
}
this->head.store((head + count) % this->capacity, std::memory_order_release);
return true;
}
}; };
} }

View File

@ -34,6 +34,7 @@ namespace usub::utils
{ {
while (this->running_) while (this->running_)
{ {
// Queue → 0
for (size_t i = 0; i < 8; ++i) for (size_t i = 0; i < 8; ++i)
{ {
auto file = this->l0_queue_.pop(); auto file = this->l0_queue_.pop();
@ -41,7 +42,7 @@ namespace usub::utils
{ {
std::lock_guard<std::mutex> lock(this->levels_mutex_); std::lock_guard<std::mutex> lock(this->levels_mutex_);
this->level0_files_.push_back(*file); this->level0_files_.push_back(*file);
this->level0_size_.fetch_add(1, std::memory_order_relaxed); // увеличиваем счётчик this->level0_size_.fetch_add(1, std::memory_order_relaxed);
} }
else else
{ {
@ -49,20 +50,36 @@ namespace usub::utils
} }
} }
bool did_compact = false;
{
std::lock_guard<std::mutex> lock(this->levels_mutex_);
if (this->level0_size_.load(std::memory_order_relaxed) >= 4) if (this->level0_size_.load(std::memory_order_relaxed) >= 4)
{ {
std::lock_guard<std::mutex> lock(this->levels_mutex_);
compact_level(this->level0_files_, this->level1_files_, 0); compact_level(this->level0_files_, this->level1_files_, 0);
did_compact = true;
}
} }
if (level1_size_.load(std::memory_order_relaxed) >= 4) // 1 → 2
{ {
std::lock_guard<std::mutex> lock(this->levels_mutex_); std::lock_guard<std::mutex> lock(this->levels_mutex_);
if (this->level1_size_.load(std::memory_order_relaxed) >= 4)
{
compact_level(this->level1_files_, this->level2_files_, 1); compact_level(this->level1_files_, this->level2_files_, 1);
did_compact = true;
}
} }
if (!did_compact)
{
std::this_thread::sleep_for(std::chrono::milliseconds(50)); std::this_thread::sleep_for(std::chrono::milliseconds(50));
} }
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
} }
void Compactor::compact_level(std::vector<std::string>& source_files, void Compactor::compact_level(std::vector<std::string>& source_files,
@ -118,6 +135,8 @@ namespace usub::utils
this->level1_size_.fetch_sub(batch_size, std::memory_order_relaxed); this->level1_size_.fetch_sub(batch_size, std::memory_order_relaxed);
this->level2_size_.fetch_add(1, std::memory_order_relaxed); this->level2_size_.fetch_add(1, std::memory_order_relaxed);
} }
this->version_manager_.flush();
} }
} // utils } // utils
// usub // usub

View File

@ -19,13 +19,16 @@ namespace usub::utils
{ {
public: public:
explicit Compactor(VersionManager& vm); explicit Compactor(VersionManager& vm);
~Compactor(); ~Compactor();
void add_sstable_l0(const std::string& filename); void add_sstable_l0(const std::string& filename);
void run(); void run();
private: private:
void background_worker(); void background_worker();
void compact_level(std::vector<std::string>& source_files, std::vector<std::string>& dest_files, int level); void compact_level(std::vector<std::string>& source_files, std::vector<std::string>& dest_files, int level);
private: private:

View File

@ -8,24 +8,19 @@
namespace usub::utils namespace usub::utils
{ {
RecoveryLog::RecoveryLog(const std::string& dbname) RecoveryLog::RecoveryLog(const std::string& dbname)
: db_name(dbname), : db_name_(dbname),
metadata_dir("metadata/" + db_name + "/"), metadata_dir_("metadata/" + dbname + "/"),
log_file(metadata_dir + "recovery.log") log_file_(metadata_dir_ + "recovery.log")
{ {
ensure_metadata_dir(); ensure_metadata_dir();
this->log_out.open(this->log_file, std::ios::binary | std::ios::app); this->log_out_.open(this->log_file_, std::ios::binary | std::ios::app);
if (!this->log_out.is_open()) if (!this->log_out_.is_open())
{ throw std::runtime_error("Failed to open recovery log for " + this->db_name_);
throw std::runtime_error("Failed to open recovery log for " + this->db_name);
}
} }
RecoveryLog::~RecoveryLog() RecoveryLog::~RecoveryLog()
{ {
if (this->log_out.is_open()) if (log_out_.is_open()) log_out_.close();
{
this->log_out.close();
}
} }
void RecoveryLog::log_put(const std::string& key, const std::string& value) void RecoveryLog::log_put(const std::string& key, const std::string& value)
@ -33,48 +28,46 @@ namespace usub::utils
uint8_t op = 0; uint8_t op = 0;
uint32_t key_len = key.size(); uint32_t key_len = key.size();
uint32_t value_len = value.size(); uint32_t value_len = value.size();
this->log_out.write(reinterpret_cast<const char*>(&op), sizeof(op)); this->log_out_.write(reinterpret_cast<const char*>(&op), sizeof(op));
this->log_out.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len)); this->log_out_.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
this->log_out.write(key.data(), key_len); this->log_out_.write(key.data(), key_len);
this->log_out.write(reinterpret_cast<const char*>(&value_len), sizeof(value_len)); this->log_out_.write(reinterpret_cast<const char*>(&value_len), sizeof(value_len));
this->log_out.write(value.data(), value_len); this->log_out_.write(value.data(), value_len);
this->log_out.flush(); this->log_out_.flush();
} }
void RecoveryLog::log_delete(const std::string& key) void RecoveryLog::log_delete(const std::string& key)
{ {
uint8_t op = 1; uint8_t op = 1;
uint32_t key_len = key.size(); uint32_t key_len = key.size();
this->log_out.write(reinterpret_cast<const char*>(&op), sizeof(op)); this->log_out_.write(reinterpret_cast<const char*>(&op), sizeof(op));
this->log_out.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len)); this->log_out_.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
this->log_out.write(key.data(), key_len); this->log_out_.write(key.data(), key_len);
this->log_out.flush(); this->log_out_.flush();
} }
void RecoveryLog::log_command(const core::Command& cmd) void RecoveryLog::log_command(const core::Command& cmd)
{ {
cmd.serialize(this->log_out); cmd.serialize(this->log_out_);
this->log_out.flush(); this->log_out_.flush();
} }
void RecoveryLog::ensure_metadata_dir() const void RecoveryLog::ensure_metadata_dir() const
{ {
try try
{ {
if (!std::filesystem::exists(this->metadata_dir)) if (!std::filesystem::exists(metadata_dir_)) std::filesystem::create_directories(metadata_dir_);
{
std::filesystem::create_directories(this->metadata_dir);
}
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
std::cerr << "Failed to create metadata dir: " << e.what() << std::endl; std::cerr << "Failed to create metadata directory: " << e.what() << std::endl;
} }
} }
void RecoveryLog::replay(const std::function<void(const core::Command&)>& callback) const void RecoveryLog::replay(const std::function<void(const core::Command&)>& callback) const
{ {
std::ifstream in(this->log_file, std::ios::binary); std::ifstream in(this->log_file_, std::ios::binary);
if (!in.is_open()) if (!in.is_open())
return; return;
@ -84,5 +77,16 @@ namespace usub::utils
callback(cmd); callback(cmd);
} }
} }
void RecoveryLog::clear()
{
if (this->log_out_.is_open())
this->log_out_.close();
std::filesystem::remove(this->log_file_);
this->log_out_.open(this->log_file_, std::ios::binary | std::ios::app);
if (!this->log_out_.is_open())
throw std::runtime_error("Failed to reopen recovery log after clear for " + this->db_name_);
}
} // utils } // utils
// usub // usub

View File

@ -29,14 +29,16 @@ namespace usub::utils
void replay(const std::function<void(const core::Command&)>& callback) const; void replay(const std::function<void(const core::Command&)>& callback) const;
void clear();
private: private:
void ensure_metadata_dir() const; void ensure_metadata_dir() const;
private: private:
std::string db_name; std::string db_name_;
std::string metadata_dir; std::string metadata_dir_;
std::string log_file; std::string log_file_;
std::ofstream log_out; std::ofstream log_out_;
}; };
} // utils } // utils
// usub // usub

View File

@ -14,6 +14,7 @@
#include <vector> #include <vector>
#include <utility> #include <utility>
#include <string> #include <string>
#include <cstring>
#include "utils/hash/Hash128.h" #include "utils/hash/Hash128.h"

View File

@ -20,6 +20,11 @@ namespace usub::utils
return this->version.fetch_add(1, std::memory_order_relaxed); return this->version.fetch_add(1, std::memory_order_relaxed);
} }
void VersionManager::flush()
{
save_version();
}
void VersionManager::ensure_metadata_dir() void VersionManager::ensure_metadata_dir()
{ {
try try

View File

@ -26,6 +26,8 @@ namespace usub::utils
uint64_t next_version(); uint64_t next_version();
void flush();
private: private:
void ensure_metadata_dir(); void ensure_metadata_dir();