diff --git a/core/Command.cpp b/core/Command.cpp index d793bf3..94437b7 100644 --- a/core/Command.cpp +++ b/core/Command.cpp @@ -6,78 +6,43 @@ namespace usub::core { - // Command::Command() : op(OperationType::UNKNOWN), value_size(0), response_ready(0), response_size(0) - // { - // std::memset(value, 0, sizeof(value)); - // std::memset(response, 0, sizeof(response)); - // } - // - // Command::Command(const Command& other) - // : op(other.op), - // key(other.key), - // value_size(0), - // response_ready(false), - // response_size(0) - // { - // if (other.value_size > 0 && other.value_size <= sizeof(value)) - // { - // value_size = other.value_size; - // std::memcpy(value, other.value, value_size); - // } - // if (other.response_size > 0 && other.response_size <= sizeof(response)) - // { - // response_size = other.response_size; - // std::memcpy(response, other.response, response_size); - // } - // response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); - // } - // - // - // Command::Command(Command&& other) noexcept - // : op(other.op), - // key(other.key), - // value_size(other.value_size), - // response_ready(other.response_ready.load(std::memory_order_relaxed)), - // response_size(other.response_size) - // { - // if (value_size > 0) - // std::memcpy(value, other.value, value_size); - // if (response_size > 0) - // std::memcpy(response, other.response, response_size); - // } - // - // Command& Command::operator=(const Command& other) - // { - // if (this != &other) - // { - // op = other.op; - // key = other.key; - // value_size = other.value_size; - // response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); - // response_size = other.response_size; - // - // if (value_size > 0) - // std::memcpy(value, other.value, value_size); - // if (response_size > 0) - // std::memcpy(response, other.response, response_size); - // } - // return *this; - // } - // - // Command& Command::operator=(Command&& other) noexcept - // { - // if (this != &other) - // { - // op = other.op; - // key = other.key; - // value_size = other.value_size; - // response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); - // response_size = other.response_size; - // if (value_size > 0) - // std::memcpy(value, other.value, value_size); - // if (response_size > 0) - // std::memcpy(response, other.response, response_size); - // } - // return *this; - // } + Command::Command() + { + ready.store(0, std::memory_order_relaxed); + response_ready.store(0, std::memory_order_relaxed); + op = OperationType::PUT; + key = utils::Hash128{}; + value_size = 0; + response_size = 0; + } + + Command::Command(const Command& other) + { + ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + op = other.op; + key = other.key; + value_size = other.value_size; + std::memcpy(value, other.value, other.value_size); + + response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + response_size = other.response_size; + std::memcpy(response, other.response, other.response_size); + } + + Command& Command::operator=(const Command& other) + { + if (this != &other) + { + ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + op = other.op; + key = other.key; + value_size = other.value_size; + std::memcpy(value, other.value, other.value_size); + + response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); + response_size = other.response_size; + std::memcpy(response, other.response, other.response_size); + } + return *this; + } } diff --git a/core/Command.h b/core/Command.h index 12bb774..f35bf8f 100644 --- a/core/Command.h +++ b/core/Command.h @@ -21,6 +21,12 @@ namespace usub::core struct Command { + Command(); + + Command(const Command& other); + + Command& operator=(const Command& other); + std::atomic ready; OperationType op; utils::Hash128 key; @@ -30,46 +36,6 @@ namespace usub::core std::atomic response_ready; uint32_t response_size; char response[1024]; - - Command() - { - ready.store(0, std::memory_order_relaxed); - response_ready.store(0, std::memory_order_relaxed); - op = OperationType::PUT; - key = utils::Hash128{}; - value_size = 0; - response_size = 0; - } - - Command(const Command& other) - { - ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); - op = other.op; - key = other.key; - value_size = other.value_size; - std::memcpy(value, other.value, other.value_size); - - response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); - response_size = other.response_size; - std::memcpy(response, other.response, other.response_size); - } - - Command& operator=(const Command& other) - { - if (this != &other) - { - ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); - op = other.op; - key = other.key; - value_size = other.value_size; - std::memcpy(value, other.value, other.value_size); - - response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); - response_size = other.response_size; - std::memcpy(response, other.response, other.response_size); - } - return *this; - } }; } diff --git a/core/DatabaseManager.cpp b/core/DatabaseManager.cpp index 30e5a0e..0c5b75b 100644 --- a/core/DatabaseManager.cpp +++ b/core/DatabaseManager.cpp @@ -36,13 +36,13 @@ namespace usub::core } bool create_new = !SharedMemoryManager::exists("shm_" + db_name); auto udb = std::make_unique(db_name, "shm_" + db_name, 1024, 1024 * 1024, create_new); - databases_.push_back(DatabaseInstance{std::move(udb), {}}); + this->databases_.push_back(DatabaseInstance{std::move(udb), {}}); } } void DatabaseManager::run_all() { - for (auto& db : databases_) + for (auto& db : this->databases_) { db.worker = std::thread([&db]() { @@ -50,7 +50,7 @@ namespace usub::core }); } - for (auto& db : databases_) + for (auto& db : this->databases_) { if (db.worker.joinable()) db.worker.join(); diff --git a/core/Memtable.h b/core/Memtable.h index bc68c31..c8d6a0a 100644 --- a/core/Memtable.h +++ b/core/Memtable.h @@ -66,8 +66,8 @@ namespace usub::shared_storage const typename SkipList::value_type& value) { { - std::lock_guard lock(batch_mutex); - write_batch.emplace_back(key, value); + std::lock_guard lock(this->batch_mutex); + this->write_batch.emplace_back(key, value); } if (estimate_batch_size() >= 64) { @@ -110,17 +110,17 @@ namespace usub::shared_storage { std::vector> local_batch; { - std::lock_guard lock(batch_mutex); - local_batch.swap(write_batch); + std::lock_guard lock(this->batch_mutex); + local_batch.swap(this->write_batch); } for (const auto& [key, value] : local_batch) { - wal.write_put(key, value); - active_memtable.load()->insert(key, value); + this->wal.write_put(key, value); + this->active_memtable.load()->insert(key, value); } - if (estimate_memtable_size() > max_memtable_size) + if (estimate_memtable_size() > this->max_memtable_size) { flush(); } @@ -136,7 +136,7 @@ namespace usub::shared_storage template size_t MemTableManager::estimate_batch_size() const { - return write_batch.size(); + return this->write_batch.size(); } } // shared_storage // usub diff --git a/core/SharedCommandQueue.cpp b/core/SharedCommandQueue.cpp index 8004966..44785fd 100644 --- a/core/SharedCommandQueue.cpp +++ b/core/SharedCommandQueue.cpp @@ -9,27 +9,27 @@ namespace usub::core SharedCommandQueue::SharedCommandQueue(size_t capacity) : capacity_(capacity), head_(0), tail_(0) { - for (size_t i = 0; i < capacity_; ++i) - new(&buffer_[i]) Command(); + for (size_t i = 0; i < this->capacity_; ++i) + new(&this->buffer_[i]) Command(); } SharedCommandQueue::~SharedCommandQueue() { - for (size_t i = 0; i < capacity_; ++i) + for (size_t i = 0; i < this->capacity_; ++i) { - buffer_[i].~Command(); + this->buffer_[i].~Command(); } } bool SharedCommandQueue::try_push(const Command& cmd) { - size_t head = head_.load(std::memory_order_relaxed); - size_t next_head = (head + 1) % capacity_; + size_t head = this->head_.load(std::memory_order_relaxed); + size_t next_head = (head + 1) % this->capacity_; - if (next_head == tail_.load(std::memory_order_acquire)) + if (next_head == this->tail_.load(std::memory_order_acquire)) return false; // Очередь полна - Command& slot = buffer_[head]; + Command& slot = this->buffer_[head]; slot.ready.store(0, std::memory_order_relaxed); slot.op = cmd.op; slot.key = cmd.key; @@ -37,25 +37,25 @@ namespace usub::core std::memcpy(slot.value, cmd.value, cmd.value_size); slot.response_size = 0; slot.response_ready.store(0, std::memory_order_relaxed); - head_.store(next_head, std::memory_order_release); + this->head_.store(next_head, std::memory_order_release); return true; } bool SharedCommandQueue::try_push_batch(const Command* cmds, size_t count) { - size_t head = head_.load(std::memory_order_relaxed); - size_t tail = tail_.load(std::memory_order_acquire); + size_t head = this->head_.load(std::memory_order_relaxed); + size_t tail = this->tail_.load(std::memory_order_acquire); - size_t free_slots = (tail + capacity_ - head) % capacity_; + size_t free_slots = (tail + this->capacity_ - head) % this->capacity_; if (free_slots < count) return false; for (size_t i = 0; i < count; ++i) { - Command& slot = buffer_[(head + i) % capacity_]; + Command& slot = this->buffer_[(head + i) % this->capacity_]; slot = cmds[i]; } - head_.store((head + count) % capacity_, std::memory_order_release); + this->head_.store((head + count) % this->capacity_, std::memory_order_release); return true; } @@ -66,32 +66,32 @@ namespace usub::core std::optional SharedCommandQueue::try_pop() { - size_t tail = tail_.load(std::memory_order_relaxed); + size_t tail = this->tail_.load(std::memory_order_relaxed); - if (tail == head_.load(std::memory_order_acquire)) + if (tail == this->head_.load(std::memory_order_acquire)) { return std::nullopt; } - Command& slot = buffer_[tail]; + Command& slot = this->buffer_[tail]; Command cmd = slot; - tail_.store((tail + 1) % capacity_, std::memory_order_release); + this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release); return cmd; } size_t SharedCommandQueue::try_pop_batch(Command* out, size_t max_count) { - size_t tail = tail_.load(std::memory_order_relaxed); - size_t head = head_.load(std::memory_order_acquire); + size_t tail = this->tail_.load(std::memory_order_relaxed); + size_t head = this->head_.load(std::memory_order_acquire); - size_t available = (head + capacity_ - tail) % capacity_; + size_t available = (head + this->capacity_ - tail) % this->capacity_; size_t to_pop = (available < max_count) ? available : max_count; size_t copied = 0; for (size_t i = 0; i < to_pop; ++i) { - Command& src = buffer_[(tail + i) % capacity_]; + Command& src = this->buffer_[(tail + i) % this->capacity_]; if (src.ready.load(std::memory_order_acquire) == 0) break; @@ -99,31 +99,97 @@ namespace usub::core out[copied++] = src; } - tail_.store((tail + copied) % capacity_, std::memory_order_release); + this->tail_.store((tail + copied) % this->capacity_, std::memory_order_release); return copied; } Command* SharedCommandQueue::peek(size_t index) { - size_t tail = tail_.load(std::memory_order_relaxed); - size_t head = head_.load(std::memory_order_acquire); + size_t tail = this->tail_.load(std::memory_order_relaxed); + size_t head = this->head_.load(std::memory_order_acquire); if (tail == head) return nullptr; - return &buffer_[tail % capacity_]; + return &this->buffer_[tail % this->capacity_]; } void SharedCommandQueue::pop() { - size_t tail = tail_.load(std::memory_order_relaxed); - tail_.store((tail + 1) % capacity_, std::memory_order_release); + size_t tail = this->tail_.load(std::memory_order_relaxed); + this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release); } size_t SharedCommandQueue::pending_count() const { - size_t head = head_.load(std::memory_order_acquire); - size_t tail = tail_.load(std::memory_order_relaxed); - return (head + capacity_ - tail) % capacity_; + size_t head = this->head_.load(std::memory_order_acquire); + size_t tail = this->tail_.load(std::memory_order_relaxed); + return (head + this->capacity_ - tail) % this->capacity_; + } + + Command* SharedCommandQueue::raw_buffer() noexcept + { + return this->buffer_; + } + + size_t SharedCommandQueue::capacity() const noexcept + { + return this->capacity_; + } + + std::atomic& SharedCommandQueue::head() noexcept + { + return this->head_; + } + + std::atomic& SharedCommandQueue::tail() noexcept + { + return this->tail_; + } + + bool SharedCommandQueue::enqueue_put(const usub::utils::Hash128& key, const std::string& value) + { + size_t head = this->head_.load(std::memory_order_relaxed); + size_t next_head = (head + 1) % this->capacity_; + if (next_head == this->tail_.load(std::memory_order_acquire)) + return false; + + Command& slot = this->buffer_[head]; + slot.op = OperationType::PUT; + slot.key = key; + slot.value_size = static_cast(value.size()); + std::memcpy(slot.value, value.data(), value.size()); + slot.response_size = 0; + slot.response_ready.store(0, std::memory_order_relaxed); + slot.ready.store(1, std::memory_order_release); + + this->head_.store(next_head, std::memory_order_release); + return true; + } + + bool SharedCommandQueue::enqueue_find(const utils::Hash128& key) + { + size_t head = this->head_.load(std::memory_order_relaxed); + size_t next_head = (head + 1) % capacity_; + if (next_head == this->tail_.load(std::memory_order_acquire)) + return false; + + Command& slot = buffer_[head]; + slot.op = OperationType::FIND; + slot.key = key; + slot.value_size = 0; + slot.response_size = 0; + slot.response_ready.store(0, std::memory_order_relaxed); + slot.ready.store(1, std::memory_order_release); + + this->head_.store(next_head, std::memory_order_release); + return true; + } + + bool SharedCommandQueue::await_response(Command& cmd) + { + while (cmd.response_ready.load(std::memory_order_acquire) == 0) + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + return cmd.response_size != 0; } } diff --git a/core/SharedCommandQueue.h b/core/SharedCommandQueue.h index 252cf04..ccf594b 100644 --- a/core/SharedCommandQueue.h +++ b/core/SharedCommandQueue.h @@ -37,56 +37,19 @@ namespace usub::core size_t pending_count() const; - Command* raw_buffer() noexcept { return buffer_; } - size_t capacity() const noexcept { return capacity_; } - std::atomic& head() noexcept { return head_; } - std::atomic& tail() noexcept { return tail_; } + Command* raw_buffer() noexcept; - bool enqueue_put(const usub::utils::Hash128& key, const std::string& value) - { - size_t head = head_.load(std::memory_order_relaxed); - size_t next_head = (head + 1) % capacity_; - if (next_head == tail_.load(std::memory_order_acquire)) - return false; + size_t capacity() const noexcept; - Command& slot = buffer_[head]; - slot.op = OperationType::PUT; - slot.key = key; - slot.value_size = static_cast(value.size()); - std::memcpy(slot.value, value.data(), value.size()); - slot.response_size = 0; - slot.response_ready.store(0, std::memory_order_relaxed); - slot.ready.store(1, std::memory_order_release); + std::atomic& head() noexcept; - head_.store(next_head, std::memory_order_release); - return true; - } + std::atomic& tail() noexcept; - bool enqueue_find(const usub::utils::Hash128& key) - { - size_t head = head_.load(std::memory_order_relaxed); - size_t next_head = (head + 1) % capacity_; - if (next_head == tail_.load(std::memory_order_acquire)) - return false; + bool enqueue_put(const usub::utils::Hash128& key, const std::string& value); - Command& slot = buffer_[head]; - slot.op = OperationType::FIND; - slot.key = key; - slot.value_size = 0; - slot.response_size = 0; - slot.response_ready.store(0, std::memory_order_relaxed); - slot.ready.store(1, std::memory_order_release); + bool enqueue_find(const usub::utils::Hash128& key); - head_.store(next_head, std::memory_order_release); - return true; - } - - bool await_response(Command& cmd) - { - while (cmd.response_ready.load(std::memory_order_acquire) == 0) - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - return cmd.response_size != 0; - } + bool await_response(Command& cmd); private: size_t capacity_; diff --git a/core/UDB.cpp b/core/UDB.cpp index 060a086..05cab13 100644 --- a/core/UDB.cpp +++ b/core/UDB.cpp @@ -41,7 +41,7 @@ namespace usub::core void UDB::recover_from_logs() { - utils::RecoveryLog recovery_log(db_name_); + utils::RecoveryLog recovery_log(this->db_name_); recovery_log.replay([this](const utils::Hash128& key, const std::string& value, bool is_tombstone) { diff --git a/main.cpp b/main.cpp index 009f9e2..26bcf33 100644 --- a/main.cpp +++ b/main.cpp @@ -9,6 +9,7 @@ using namespace usub::utils; +#if 0 #include "utils/io/VersionManager.h" void test_skiplist_basic() @@ -76,20 +77,10 @@ void test_sstable_write_read() std::cout << "SSTable write/read test passed.\n"; } - -#include "utils/datastructures/LFCircullarBuffer.h" - -#include "core/SharedMemoryManager.h" -#include "core/SharedCommandQueue.h" -#include "utils/datastructures/LFCircullarBuffer.h" -#include "core/Command.h" -#include "utils/hash/Hash128.h" -#include -#include "utils/string/basic_utils.h" +#endif #include "core/DatabaseManager.h" - int main() { #if 0 diff --git a/utils/datastructures/LFCircullarBuffer.h b/utils/datastructures/LFCircullarBuffer.h index 50aa783..0281d24 100644 --- a/utils/datastructures/LFCircullarBuffer.h +++ b/utils/datastructures/LFCircullarBuffer.h @@ -66,13 +66,13 @@ namespace usub::utils explicit LockFreeRingBuffer(size_t cap = 32) : capacity(cap), mask(cap - 1), buffer(new Cell[cap]) { - if ((capacity & (capacity - 1)) != 0) + if ((this->capacity & (this->capacity - 1)) != 0) { throw std::invalid_argument("Capacity must be a power of 2"); } - for (size_t i = 0; i < capacity; ++i) + for (size_t i = 0; i < this->capacity; ++i) { - buffer[i].sequence.store(i, std::memory_order_relaxed); + this->buffer[i].sequence.store(i, std::memory_order_relaxed); } } @@ -81,7 +81,7 @@ namespace usub::utils while (pop()) { } - delete[] buffer; + delete[] this->buffer; } template @@ -188,7 +188,7 @@ namespace usub::utils return (h - t) & mask; } - [[nodiscard]] size_t get_capacity() const noexcept { return capacity; } + [[nodiscard]] size_t get_capacity() const noexcept { return this->capacity; } }; } diff --git a/utils/datastructures/LFSkipList.h b/utils/datastructures/LFSkipList.h index 1de66b7..d27f9d0 100644 --- a/utils/datastructures/LFSkipList.h +++ b/utils/datastructures/LFSkipList.h @@ -31,6 +31,7 @@ namespace usub::utils template class LFSkipList { + private: struct Node { Key key; @@ -45,7 +46,7 @@ namespace usub::utils : key(k), value(v), topLevel(level), is_tombstone(tombstone), version(ver) { for (int i = 0; i < MAX_LEVEL; ++i) - next[i].store(nullptr, std::memory_order_relaxed); + this->next[i].store(nullptr, std::memory_order_relaxed); } }; @@ -59,12 +60,13 @@ namespace usub::utils LFSkipList(usub::utils::VersionManager& vm) : version_manager(vm) { - head = new Node(std::numeric_limits::min(), Value{}, MAX_LEVEL, false, version_manager.next_version()); + this->head = new Node(std::numeric_limits::min(), Value{}, MAX_LEVEL, false, + this->version_manager.next_version()); } ~LFSkipList() { - Node* curr = head; + Node* curr = this->head; while (curr) { Node* next = next_node(curr->next[0].load(std::memory_order_relaxed)); @@ -86,7 +88,7 @@ namespace usub::utils std::optional find(const Key& key) const { Node* best = nullptr; - Node* node = head->next[0].load(std::memory_order_acquire); + Node* node = this->head->next[0].load(std::memory_order_acquire); while (node) { @@ -109,7 +111,7 @@ namespace usub::utils template void for_each(F&& func) const { - Node* node = head->next[0].load(std::memory_order_acquire); + Node* node = this->head->next[0].load(std::memory_order_acquire); while (node) { prefetch_for_read(node); @@ -124,7 +126,7 @@ namespace usub::utils template void for_each_raw(F&& func) const { - Node* node = head->next[0].load(std::memory_order_acquire); + Node* node = this->head->next[0].load(std::memory_order_acquire); while (node) { prefetch_for_read(node); @@ -178,7 +180,7 @@ namespace usub::utils [[nodiscard]] size_t unsafe_size() const { size_t count = 0; - Node* node = head->next[0].load(std::memory_order_relaxed); + Node* node = this->head->next[0].load(std::memory_order_relaxed); while (node) { if (!node->marked.load(std::memory_order_relaxed) && !node->is_tombstone) @@ -199,7 +201,7 @@ namespace usub::utils bool found = find_internal(key, preds, succs); int topLevel = random_level(); - Node* newNode = new Node(key, value, topLevel, tombstone, version_manager.next_version()); + Node* newNode = new Node(key, value, topLevel, tombstone, this->version_manager.next_version()); for (int i = 0; i < topLevel; ++i) newNode->next[i].store(succs[i], std::memory_order_relaxed); @@ -231,7 +233,7 @@ namespace usub::utils bool find_internal(const Key& key, Node** preds, Node** succs) const { bool found = false; - Node* pred = head; + Node* pred = this->head; for (int level = MAX_LEVEL - 1; level >= 0; --level) { diff --git a/utils/io/Compactor.cpp b/utils/io/Compactor.cpp index f4f0714..969bcbb 100644 --- a/utils/io/Compactor.cpp +++ b/utils/io/Compactor.cpp @@ -15,33 +15,33 @@ namespace usub::utils Compactor::~Compactor() { - running_ = false; - if (worker_thread_.joinable()) - worker_thread_.join(); + this->running_ = false; + if (this->worker_thread_.joinable()) + this->worker_thread_.join(); } void Compactor::add_sstable_l0(const std::string& filename) { - l0_queue_.push(filename); + this->l0_queue_.push(filename); } void Compactor::run() { - worker_thread_ = std::thread(&Compactor::background_worker, this); + this->worker_thread_ = std::thread(&Compactor::background_worker, this); } void Compactor::background_worker() { - while (running_) + while (this->running_) { for (size_t i = 0; i < 8; ++i) { - auto file = l0_queue_.pop(); + auto file = this->l0_queue_.pop(); if (file) { - std::lock_guard lock(levels_mutex_); - level0_files_.push_back(*file); - level0_size_.fetch_add(1, std::memory_order_relaxed); // увеличиваем счётчик + std::lock_guard lock(this->levels_mutex_); + this->level0_files_.push_back(*file); + this->level0_size_.fetch_add(1, std::memory_order_relaxed); // увеличиваем счётчик } else { @@ -49,16 +49,16 @@ namespace usub::utils } } - if (level0_size_.load(std::memory_order_relaxed) >= 4) + if (this->level0_size_.load(std::memory_order_relaxed) >= 4) { - std::lock_guard lock(levels_mutex_); - compact_level(level0_files_, level1_files_, 0); + std::lock_guard lock(this->levels_mutex_); + compact_level(this->level0_files_, this->level1_files_, 0); } if (level1_size_.load(std::memory_order_relaxed) >= 4) { - std::lock_guard lock(levels_mutex_); - compact_level(level1_files_, level2_files_, 1); + std::lock_guard lock(this->levels_mutex_); + compact_level(this->level1_files_, this->level2_files_, 1); } std::this_thread::sleep_for(std::chrono::milliseconds(50)); @@ -87,7 +87,7 @@ namespace usub::utils std::filesystem::remove(file); } - LFSkipList merged(version_manager_); + LFSkipList merged(this->version_manager_); for (auto& table : loaded) { @@ -101,7 +101,7 @@ namespace usub::utils } std::string new_filename = "sstable_l" + std::to_string(level + 1) + "_" + - std::to_string(version_manager_.next_version()) + ".dat"; + std::to_string(this->version_manager_.next_version()) + ".dat"; write_sstable_with_index(merged, new_filename); dest_files.push_back(new_filename); @@ -110,13 +110,13 @@ namespace usub::utils if (level == 0) { - level0_size_.fetch_sub(batch_size, std::memory_order_relaxed); - level1_size_.fetch_add(1, std::memory_order_relaxed); + this->level0_size_.fetch_sub(batch_size, std::memory_order_relaxed); + this->level1_size_.fetch_add(1, std::memory_order_relaxed); } else if (level == 1) { - level1_size_.fetch_sub(batch_size, std::memory_order_relaxed); - level2_size_.fetch_add(1, std::memory_order_relaxed); + this->level1_size_.fetch_sub(batch_size, std::memory_order_relaxed); + this->level2_size_.fetch_add(1, std::memory_order_relaxed); } } } // utils diff --git a/utils/io/VersionManager.cpp b/utils/io/VersionManager.cpp index 12167ac..f6f2253 100644 --- a/utils/io/VersionManager.cpp +++ b/utils/io/VersionManager.cpp @@ -8,8 +8,8 @@ namespace usub::utils { VersionManager::VersionManager(const std::string& dbname) : db_name(dbname), - metadata_dir("metadata/" + db_name + "/"), - version_file(metadata_dir + "version.meta") + metadata_dir("metadata/" + this->db_name + "/"), + version_file(this->metadata_dir + "version.meta") { ensure_metadata_dir(); load_version();