From 5d0d64f895e56b941abc89f9875710ef8014b7d2 Mon Sep 17 00:00:00 2001 From: g2px1 Date: Sun, 27 Apr 2025 16:32:10 +0000 Subject: [PATCH] fixed operations --- client.cpp | 15 +++++++-------- core/SharedCommandQueue.cpp | 25 ++++++++++++++++--------- core/SharedCommandQueue.h | 3 +-- core/SharedMemoryManager.cpp | 2 +- core/UDB.cpp | 10 +++------- main.cpp | 15 +++++++-------- 6 files changed, 35 insertions(+), 35 deletions(-) diff --git a/client.cpp b/client.cpp index cd807c8..e99bf02 100644 --- a/client.cpp +++ b/client.cpp @@ -70,7 +70,6 @@ int main() { constexpr const char* shm_name = "/shm_rates1"; constexpr size_t capacity = 1024; // или то, что реально -// size_t shm_size = usub::core::SharedCommandQueue::calculate_shm_size(capacity); size_t shm_size = usub::core::SharedCommandQueue::calculate_shm_size(capacity); int shm_fd = shm_open(shm_name, O_RDWR, 0600); @@ -80,13 +79,13 @@ int main() return 1; } - struct stat statbuf; - if (fstat(shm_fd, &statbuf) == -1) - { - perror("fstat failed"); - close(shm_fd); - return 1; - } +// struct stat statbuf; +// if (fstat(shm_fd, &statbuf) == -1) +// { +// perror("fstat failed"); +// close(shm_fd); +// return 1; +// } // void* ptr = mmap(nullptr, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); void* ptr = mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); diff --git a/core/SharedCommandQueue.cpp b/core/SharedCommandQueue.cpp index 53bc862..3540a85 100644 --- a/core/SharedCommandQueue.cpp +++ b/core/SharedCommandQueue.cpp @@ -24,14 +24,18 @@ namespace usub::core { if (next_head == this->tail_.load(std::memory_order_acquire)) return false; // Очередь полна - Command &slot = this->buffer_[head]; - slot.ready.store(0, std::memory_order_relaxed); - slot.op = cmd.op; - slot.key = cmd.key; - slot.value_size = cmd.value_size; - std::memcpy(slot.value, cmd.value, cmd.value_size); - slot.response_size = 0; - slot.response_ready.store(0, std::memory_order_relaxed); + Command *slot = reinterpret_cast( + reinterpret_cast(this) + offsetof(SharedCommandQueue, buffer_) + ) + head; + + slot->ready.store(0, std::memory_order_relaxed); + slot->op = cmd.op; + slot->key = cmd.key; + slot->value_size = cmd.value_size; + std::memcpy(slot->value, cmd.value, cmd.value_size); + slot->response_size = 0; + slot->response_ready.store(0, std::memory_order_relaxed); + this->head_.store(next_head, std::memory_order_release); return true; } @@ -99,7 +103,10 @@ namespace usub::core { if (index >= available) return nullptr; - return &this->buffer_[(tail + index) % this->capacity_]; + size_t pos = (tail + index) % this->capacity_; + return reinterpret_cast( + reinterpret_cast(this) + offsetof(SharedCommandQueue, buffer_) + ) + pos; } void SharedCommandQueue::pop() { diff --git a/core/SharedCommandQueue.h b/core/SharedCommandQueue.h index 74c39c9..c82f277 100644 --- a/core/SharedCommandQueue.h +++ b/core/SharedCommandQueue.h @@ -53,8 +53,7 @@ namespace usub::core { static size_t calculate_shm_size(size_t capacity); -// private: - public: + private: size_t capacity_; alignas(SHM_ALIGNMENT) std::atomic head_; alignas(SHM_ALIGNMENT) std::atomic tail_; diff --git a/core/SharedMemoryManager.cpp b/core/SharedMemoryManager.cpp index 68cfbb2..bbd571c 100644 --- a/core/SharedMemoryManager.cpp +++ b/core/SharedMemoryManager.cpp @@ -68,7 +68,7 @@ namespace usub::core bool SharedMemoryManager::needs_init() const noexcept { - const uint8_t* base = reinterpret_cast(this->shm_ptr); + const auto* base = reinterpret_cast(this->shm_ptr); for (size_t i = 0; i < sizeof(uint64_t); ++i) { if (base[i] != 0) diff --git a/core/UDB.cpp b/core/UDB.cpp index 786ca39..58314e0 100644 --- a/core/UDB.cpp +++ b/core/UDB.cpp @@ -17,11 +17,8 @@ namespace usub::core { this->version_manager_), compactor_(this->version_manager_), running_(true), recovery_log_(this->db_name_) { - if (shm_manager_.needs_init()) { + if (shm_manager_.needs_init()) new(this->shm_manager_.base_ptr()) SharedCommandQueue(this->shm_queue_capacity_); - } else { - this->command_queue_ = reinterpret_cast(this->shm_manager_.base_ptr()); - } this->command_queue_ = reinterpret_cast(shm_manager_.base_ptr()); @@ -69,12 +66,11 @@ namespace usub::core { while (this->running_) { Command *cmd = this->command_queue_->peek(0); - if (!cmd) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + cpu_relax(); continue; } - + std::cout << "[UDB] command were provided\n"; if (cmd->ready.load(std::memory_order_acquire) == 1) { process_command(*cmd); this->command_queue_->pop(); diff --git a/main.cpp b/main.cpp index 2131628..f1bffe4 100644 --- a/main.cpp +++ b/main.cpp @@ -81,8 +81,7 @@ void test_sstable_write_read() #include "core/DatabaseManager.h" -int main() -{ +int main() { #if 0 test_skiplist_basic(); simulate_restart_and_check_version(); @@ -91,19 +90,19 @@ int main() q.push(1); std::cout << q.pop().value() << std::endl; #endif - try - { - using namespace usub::core; + try { + using namespace usub::core; usub::core::DatabaseManager manager("../config.toml"); manager.run_all(); + // std::cout << sizeof(usub::core::SharedCommandQueue) << ", " << SharedCommandQueue::calculate_shm_size(1024) << "\n"; // usub::core::SharedMemoryManager shm_manager{"rates_1", SharedCommandQueue::calculate_shm_size(1024)}; // new(shm_manager.base_ptr()) SharedCommandQueue(1024); // auto* queue = static_cast(shm_manager.base_ptr()); -// std::cout << queue->head_.load() << '\n'; +// auto* cmd = queue->peek(0); +// std::cout << cmd->ready.load() << '\n'; } - catch (const std::exception& ex) - { + catch (const std::exception &ex) { std::cerr << "Fatal error: " << ex.what() << std::endl; return 1; }