Compare commits
2 Commits
b03fd39575
...
4bcacdc7dc
Author | SHA1 | Date | |
---|---|---|---|
4bcacdc7dc | |||
199fb1f34b |
16
client.cpp
16
client.cpp
@ -5,6 +5,8 @@
|
||||
#include <cstring>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include "core/SharedCommandQueue.h"
|
||||
#include "utils/hash/Hash128.h"
|
||||
|
||||
@ -76,7 +78,15 @@ int main()
|
||||
return 1;
|
||||
}
|
||||
|
||||
void* ptr = mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
|
||||
struct stat statbuf;
|
||||
if (fstat(shm_fd, &statbuf) == -1)
|
||||
{
|
||||
perror("fstat failed");
|
||||
close(shm_fd);
|
||||
return 1;
|
||||
}
|
||||
|
||||
void* ptr = mmap(nullptr, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
|
||||
if (ptr == MAP_FAILED)
|
||||
{
|
||||
perror("mmap failed");
|
||||
@ -91,12 +101,14 @@ int main()
|
||||
// PUT Command
|
||||
// -------------------
|
||||
{
|
||||
std::cout << "1\n";
|
||||
size_t head = queue->head().load(std::memory_order_relaxed);
|
||||
size_t next_head = (head + 1) % queue->capacity();
|
||||
while (next_head == queue->tail().load(std::memory_order_acquire))
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
}
|
||||
std::cout << "2\n";
|
||||
|
||||
usub::core::Command* slot = &queue->raw_buffer()[head];
|
||||
|
||||
@ -104,7 +116,9 @@ int main()
|
||||
slot->key = usub::utils::compute_hash128("test_key", 8); // исправил длину "test_key" = 8 символов
|
||||
const char* put_value = "test_value";
|
||||
slot->value_size = std::strlen(put_value);
|
||||
std::cout << "3\n";
|
||||
std::memcpy(slot->value, put_value, slot->value_size);
|
||||
std::cout << "4\n";
|
||||
slot->response_size = 0;
|
||||
slot->response_ready.store(0, std::memory_order_relaxed);
|
||||
|
||||
|
@ -105,6 +105,7 @@ namespace usub::shared_storage
|
||||
delete old_memtable;
|
||||
this->wal.close();
|
||||
this->flushing.store(false);
|
||||
this->version_manager.flush();
|
||||
}
|
||||
|
||||
template <typename SkipList>
|
||||
@ -126,6 +127,7 @@ namespace usub::shared_storage
|
||||
{
|
||||
flush();
|
||||
}
|
||||
this->version_manager.flush();
|
||||
}
|
||||
|
||||
template <typename SkipList>
|
||||
|
@ -192,4 +192,5 @@ namespace usub::core
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
return cmd.response_size != 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -54,9 +54,9 @@ namespace usub::core
|
||||
private:
|
||||
size_t capacity_;
|
||||
Command* commands_;
|
||||
alignas(64) std::atomic<size_t> head_;
|
||||
alignas(64) std::atomic<size_t> tail_;
|
||||
alignas(64) Command buffer_[];
|
||||
std::atomic<size_t> head_;
|
||||
std::atomic<size_t> tail_;
|
||||
Command buffer_[];
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -48,6 +48,7 @@ namespace usub::core
|
||||
{
|
||||
case OperationType::PUT:
|
||||
{
|
||||
std::cout << "PUT\n";
|
||||
std::string value(cmd.value, cmd.value_size);
|
||||
this->fast_cache_[cmd.key] = value;
|
||||
this->memtable_manager_.put(cmd.key, value);
|
||||
@ -55,6 +56,7 @@ namespace usub::core
|
||||
}
|
||||
case OperationType::DELETE:
|
||||
{
|
||||
std::cout << "DELETE\n";
|
||||
this->fast_cache_.erase(cmd.key);
|
||||
this->memtable_manager_.remove(cmd.key);
|
||||
break;
|
||||
@ -103,7 +105,6 @@ namespace usub::core
|
||||
{
|
||||
case OperationType::PUT:
|
||||
{
|
||||
std::cout << "PUT\n";
|
||||
std::string value(cmd.value, cmd.value_size);
|
||||
this->fast_cache_[cmd.key] = value;
|
||||
this->memtable_manager_.put(cmd.key, value);
|
||||
@ -111,17 +112,15 @@ namespace usub::core
|
||||
}
|
||||
case OperationType::DELETE:
|
||||
{
|
||||
std::cout << "DELETE\n";
|
||||
this->fast_cache_.erase(cmd.key);
|
||||
this->memtable_manager_.remove(cmd.key);
|
||||
break;
|
||||
}
|
||||
case OperationType::FIND:
|
||||
{
|
||||
auto it = fast_cache_.find(cmd.key);
|
||||
if (it != fast_cache_.end())
|
||||
auto it = this->fast_cache_.find(cmd.key);
|
||||
if (it != this->fast_cache_.end())
|
||||
{
|
||||
std::cout << "FIND\n";
|
||||
std::string value = it->second;
|
||||
cmd.response_size = static_cast<uint32_t>(value.size());
|
||||
std::memcpy(cmd.response, value.data(), value.size());
|
||||
|
@ -189,6 +189,24 @@ namespace usub::utils
|
||||
}
|
||||
|
||||
[[nodiscard]] size_t get_capacity() const noexcept { return this->capacity; }
|
||||
|
||||
bool try_push_batch(const T* items, size_t count)
|
||||
{
|
||||
size_t head = this->head.load(std::memory_order_relaxed);
|
||||
size_t tail = this->tail.load(std::memory_order_acquire);
|
||||
|
||||
size_t free_slots = (tail + this->capacity - head - 1) % this->capacity;
|
||||
if (free_slots < count)
|
||||
return false;
|
||||
|
||||
for (size_t i = 0; i < count; ++i)
|
||||
{
|
||||
this->buffer[(head + i) % this->capacity] = items[i];
|
||||
}
|
||||
|
||||
this->head.store((head + count) % this->capacity, std::memory_order_release);
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,7 @@ namespace usub::utils
|
||||
{
|
||||
while (this->running_)
|
||||
{
|
||||
// Queue → 0
|
||||
for (size_t i = 0; i < 8; ++i)
|
||||
{
|
||||
auto file = this->l0_queue_.pop();
|
||||
@ -41,7 +42,7 @@ namespace usub::utils
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(this->levels_mutex_);
|
||||
this->level0_files_.push_back(*file);
|
||||
this->level0_size_.fetch_add(1, std::memory_order_relaxed); // увеличиваем счётчик
|
||||
this->level0_size_.fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -49,19 +50,35 @@ namespace usub::utils
|
||||
}
|
||||
}
|
||||
|
||||
if (this->level0_size_.load(std::memory_order_relaxed) >= 4)
|
||||
bool did_compact = false;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(this->levels_mutex_);
|
||||
compact_level(this->level0_files_, this->level1_files_, 0);
|
||||
if (this->level0_size_.load(std::memory_order_relaxed) >= 4)
|
||||
{
|
||||
compact_level(this->level0_files_, this->level1_files_, 0);
|
||||
did_compact = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (level1_size_.load(std::memory_order_relaxed) >= 4)
|
||||
// 1 → 2
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(this->levels_mutex_);
|
||||
compact_level(this->level1_files_, this->level2_files_, 1);
|
||||
if (this->level1_size_.load(std::memory_order_relaxed) >= 4)
|
||||
{
|
||||
compact_level(this->level1_files_, this->level2_files_, 1);
|
||||
did_compact = true;
|
||||
}
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
if (!did_compact)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
else
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -118,6 +135,8 @@ namespace usub::utils
|
||||
this->level1_size_.fetch_sub(batch_size, std::memory_order_relaxed);
|
||||
this->level2_size_.fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
this->version_manager_.flush();
|
||||
}
|
||||
} // utils
|
||||
// usub
|
||||
|
@ -19,13 +19,16 @@ namespace usub::utils
|
||||
{
|
||||
public:
|
||||
explicit Compactor(VersionManager& vm);
|
||||
|
||||
~Compactor();
|
||||
|
||||
void add_sstable_l0(const std::string& filename);
|
||||
|
||||
void run();
|
||||
|
||||
private:
|
||||
void background_worker();
|
||||
|
||||
void compact_level(std::vector<std::string>& source_files, std::vector<std::string>& dest_files, int level);
|
||||
|
||||
private:
|
||||
|
@ -8,24 +8,19 @@
|
||||
namespace usub::utils
|
||||
{
|
||||
RecoveryLog::RecoveryLog(const std::string& dbname)
|
||||
: db_name(dbname),
|
||||
metadata_dir("metadata/" + db_name + "/"),
|
||||
log_file(metadata_dir + "recovery.log")
|
||||
: db_name_(dbname),
|
||||
metadata_dir_("metadata/" + dbname + "/"),
|
||||
log_file_(metadata_dir_ + "recovery.log")
|
||||
{
|
||||
ensure_metadata_dir();
|
||||
this->log_out.open(this->log_file, std::ios::binary | std::ios::app);
|
||||
if (!this->log_out.is_open())
|
||||
{
|
||||
throw std::runtime_error("Failed to open recovery log for " + this->db_name);
|
||||
}
|
||||
this->log_out_.open(this->log_file_, std::ios::binary | std::ios::app);
|
||||
if (!this->log_out_.is_open())
|
||||
throw std::runtime_error("Failed to open recovery log for " + this->db_name_);
|
||||
}
|
||||
|
||||
RecoveryLog::~RecoveryLog()
|
||||
{
|
||||
if (this->log_out.is_open())
|
||||
{
|
||||
this->log_out.close();
|
||||
}
|
||||
if (log_out_.is_open()) log_out_.close();
|
||||
}
|
||||
|
||||
void RecoveryLog::log_put(const std::string& key, const std::string& value)
|
||||
@ -33,48 +28,46 @@ namespace usub::utils
|
||||
uint8_t op = 0;
|
||||
uint32_t key_len = key.size();
|
||||
uint32_t value_len = value.size();
|
||||
this->log_out.write(reinterpret_cast<const char*>(&op), sizeof(op));
|
||||
this->log_out.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
|
||||
this->log_out.write(key.data(), key_len);
|
||||
this->log_out.write(reinterpret_cast<const char*>(&value_len), sizeof(value_len));
|
||||
this->log_out.write(value.data(), value_len);
|
||||
this->log_out.flush();
|
||||
this->log_out_.write(reinterpret_cast<const char*>(&op), sizeof(op));
|
||||
this->log_out_.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
|
||||
this->log_out_.write(key.data(), key_len);
|
||||
this->log_out_.write(reinterpret_cast<const char*>(&value_len), sizeof(value_len));
|
||||
this->log_out_.write(value.data(), value_len);
|
||||
this->log_out_.flush();
|
||||
}
|
||||
|
||||
void RecoveryLog::log_delete(const std::string& key)
|
||||
{
|
||||
uint8_t op = 1;
|
||||
uint32_t key_len = key.size();
|
||||
this->log_out.write(reinterpret_cast<const char*>(&op), sizeof(op));
|
||||
this->log_out.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
|
||||
this->log_out.write(key.data(), key_len);
|
||||
this->log_out.flush();
|
||||
this->log_out_.write(reinterpret_cast<const char*>(&op), sizeof(op));
|
||||
this->log_out_.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
|
||||
this->log_out_.write(key.data(), key_len);
|
||||
this->log_out_.flush();
|
||||
}
|
||||
|
||||
void RecoveryLog::log_command(const core::Command& cmd)
|
||||
{
|
||||
cmd.serialize(this->log_out);
|
||||
this->log_out.flush();
|
||||
cmd.serialize(this->log_out_);
|
||||
this->log_out_.flush();
|
||||
}
|
||||
|
||||
void RecoveryLog::ensure_metadata_dir() const
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!std::filesystem::exists(this->metadata_dir))
|
||||
{
|
||||
std::filesystem::create_directories(this->metadata_dir);
|
||||
}
|
||||
if (!std::filesystem::exists(metadata_dir_)) std::filesystem::create_directories(metadata_dir_);
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
std::cerr << "Failed to create metadata dir: " << e.what() << std::endl;
|
||||
std::cerr << "Failed to create metadata directory: " << e.what() << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void RecoveryLog::replay(const std::function<void(const core::Command&)>& callback) const
|
||||
{
|
||||
std::ifstream in(this->log_file, std::ios::binary);
|
||||
std::ifstream in(this->log_file_, std::ios::binary);
|
||||
if (!in.is_open())
|
||||
return;
|
||||
|
||||
@ -84,5 +77,16 @@ namespace usub::utils
|
||||
callback(cmd);
|
||||
}
|
||||
}
|
||||
void RecoveryLog::clear()
|
||||
{
|
||||
if (this->log_out_.is_open())
|
||||
this->log_out_.close();
|
||||
|
||||
std::filesystem::remove(this->log_file_);
|
||||
|
||||
this->log_out_.open(this->log_file_, std::ios::binary | std::ios::app);
|
||||
if (!this->log_out_.is_open())
|
||||
throw std::runtime_error("Failed to reopen recovery log after clear for " + this->db_name_);
|
||||
}
|
||||
} // utils
|
||||
// usub
|
||||
|
@ -29,14 +29,16 @@ namespace usub::utils
|
||||
|
||||
void replay(const std::function<void(const core::Command&)>& callback) const;
|
||||
|
||||
void clear();
|
||||
|
||||
private:
|
||||
void ensure_metadata_dir() const;
|
||||
|
||||
private:
|
||||
std::string db_name;
|
||||
std::string metadata_dir;
|
||||
std::string log_file;
|
||||
std::ofstream log_out;
|
||||
std::string db_name_;
|
||||
std::string metadata_dir_;
|
||||
std::string log_file_;
|
||||
std::ofstream log_out_;
|
||||
};
|
||||
} // utils
|
||||
// usub
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include <vector>
|
||||
#include <utility>
|
||||
#include <string>
|
||||
#include <cstring>
|
||||
|
||||
#include "utils/hash/Hash128.h"
|
||||
|
||||
|
@ -7,9 +7,9 @@
|
||||
namespace usub::utils
|
||||
{
|
||||
VersionManager::VersionManager(const std::string& dbname)
|
||||
: db_name(dbname),
|
||||
metadata_dir("metadata/" + this->db_name + "/"),
|
||||
version_file(this->metadata_dir + "version.meta")
|
||||
: db_name(dbname),
|
||||
metadata_dir("metadata/" + this->db_name + "/"),
|
||||
version_file(this->metadata_dir + "version.meta")
|
||||
{
|
||||
ensure_metadata_dir();
|
||||
load_version();
|
||||
@ -20,6 +20,11 @@ namespace usub::utils
|
||||
return this->version.fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void VersionManager::flush()
|
||||
{
|
||||
save_version();
|
||||
}
|
||||
|
||||
void VersionManager::ensure_metadata_dir()
|
||||
{
|
||||
try
|
||||
|
@ -26,6 +26,8 @@ namespace usub::utils
|
||||
|
||||
uint64_t next_version();
|
||||
|
||||
void flush();
|
||||
|
||||
private:
|
||||
void ensure_metadata_dir();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user