fixed operations

This commit is contained in:
g2px1 2025-04-27 16:32:10 +00:00
parent e44279add1
commit 5d0d64f895
6 changed files with 35 additions and 35 deletions

View File

@ -70,7 +70,6 @@ int main()
{ {
constexpr const char* shm_name = "/shm_rates1"; constexpr const char* shm_name = "/shm_rates1";
constexpr size_t capacity = 1024; // или то, что реально constexpr size_t capacity = 1024; // или то, что реально
// size_t shm_size = usub::core::SharedCommandQueue::calculate_shm_size(capacity);
size_t shm_size = usub::core::SharedCommandQueue::calculate_shm_size(capacity); size_t shm_size = usub::core::SharedCommandQueue::calculate_shm_size(capacity);
int shm_fd = shm_open(shm_name, O_RDWR, 0600); int shm_fd = shm_open(shm_name, O_RDWR, 0600);
@ -80,13 +79,13 @@ int main()
return 1; return 1;
} }
struct stat statbuf; // struct stat statbuf;
if (fstat(shm_fd, &statbuf) == -1) // if (fstat(shm_fd, &statbuf) == -1)
{ // {
perror("fstat failed"); // perror("fstat failed");
close(shm_fd); // close(shm_fd);
return 1; // return 1;
} // }
// void* ptr = mmap(nullptr, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); // void* ptr = mmap(nullptr, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
void* ptr = mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); void* ptr = mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);

View File

@ -24,14 +24,18 @@ namespace usub::core {
if (next_head == this->tail_.load(std::memory_order_acquire)) if (next_head == this->tail_.load(std::memory_order_acquire))
return false; // Очередь полна return false; // Очередь полна
Command &slot = this->buffer_[head]; Command *slot = reinterpret_cast<Command *>(
slot.ready.store(0, std::memory_order_relaxed); reinterpret_cast<char *>(this) + offsetof(SharedCommandQueue, buffer_)
slot.op = cmd.op; ) + head;
slot.key = cmd.key;
slot.value_size = cmd.value_size; slot->ready.store(0, std::memory_order_relaxed);
std::memcpy(slot.value, cmd.value, cmd.value_size); slot->op = cmd.op;
slot.response_size = 0; slot->key = cmd.key;
slot.response_ready.store(0, std::memory_order_relaxed); slot->value_size = cmd.value_size;
std::memcpy(slot->value, cmd.value, cmd.value_size);
slot->response_size = 0;
slot->response_ready.store(0, std::memory_order_relaxed);
this->head_.store(next_head, std::memory_order_release); this->head_.store(next_head, std::memory_order_release);
return true; return true;
} }
@ -99,7 +103,10 @@ namespace usub::core {
if (index >= available) if (index >= available)
return nullptr; return nullptr;
return &this->buffer_[(tail + index) % this->capacity_]; size_t pos = (tail + index) % this->capacity_;
return reinterpret_cast<Command *>(
reinterpret_cast<char *>(this) + offsetof(SharedCommandQueue, buffer_)
) + pos;
} }
void SharedCommandQueue::pop() { void SharedCommandQueue::pop() {

View File

@ -53,8 +53,7 @@ namespace usub::core {
static size_t calculate_shm_size(size_t capacity); static size_t calculate_shm_size(size_t capacity);
// private: private:
public:
size_t capacity_; size_t capacity_;
alignas(SHM_ALIGNMENT) std::atomic<size_t> head_; alignas(SHM_ALIGNMENT) std::atomic<size_t> head_;
alignas(SHM_ALIGNMENT) std::atomic<size_t> tail_; alignas(SHM_ALIGNMENT) std::atomic<size_t> tail_;

View File

@ -68,7 +68,7 @@ namespace usub::core
bool SharedMemoryManager::needs_init() const noexcept bool SharedMemoryManager::needs_init() const noexcept
{ {
const uint8_t* base = reinterpret_cast<const uint8_t*>(this->shm_ptr); const auto* base = reinterpret_cast<const uint8_t*>(this->shm_ptr);
for (size_t i = 0; i < sizeof(uint64_t); ++i) for (size_t i = 0; i < sizeof(uint64_t); ++i)
{ {
if (base[i] != 0) if (base[i] != 0)

View File

@ -17,11 +17,8 @@ namespace usub::core {
this->version_manager_), this->version_manager_),
compactor_(this->version_manager_), compactor_(this->version_manager_),
running_(true), recovery_log_(this->db_name_) { running_(true), recovery_log_(this->db_name_) {
if (shm_manager_.needs_init()) { if (shm_manager_.needs_init())
new(this->shm_manager_.base_ptr()) SharedCommandQueue(this->shm_queue_capacity_); new(this->shm_manager_.base_ptr()) SharedCommandQueue(this->shm_queue_capacity_);
} else {
this->command_queue_ = reinterpret_cast<SharedCommandQueue *>(this->shm_manager_.base_ptr());
}
this->command_queue_ = reinterpret_cast<SharedCommandQueue *>(shm_manager_.base_ptr()); this->command_queue_ = reinterpret_cast<SharedCommandQueue *>(shm_manager_.base_ptr());
@ -69,12 +66,11 @@ namespace usub::core {
while (this->running_) { while (this->running_) {
Command *cmd = this->command_queue_->peek(0); Command *cmd = this->command_queue_->peek(0);
if (!cmd) { if (!cmd) {
std::this_thread::sleep_for(std::chrono::milliseconds(1)); cpu_relax();
continue; continue;
} }
std::cout << "[UDB] command were provided\n";
if (cmd->ready.load(std::memory_order_acquire) == 1) { if (cmd->ready.load(std::memory_order_acquire) == 1) {
process_command(*cmd); process_command(*cmd);
this->command_queue_->pop(); this->command_queue_->pop();

View File

@ -81,8 +81,7 @@ void test_sstable_write_read()
#include "core/DatabaseManager.h" #include "core/DatabaseManager.h"
int main() int main() {
{
#if 0 #if 0
test_skiplist_basic(); test_skiplist_basic();
simulate_restart_and_check_version(); simulate_restart_and_check_version();
@ -91,19 +90,19 @@ int main()
q.push(1); q.push(1);
std::cout << q.pop().value() << std::endl; std::cout << q.pop().value() << std::endl;
#endif #endif
try try {
{ using namespace usub::core;
using namespace usub::core;
usub::core::DatabaseManager manager("../config.toml"); usub::core::DatabaseManager manager("../config.toml");
manager.run_all(); manager.run_all();
// std::cout << sizeof(usub::core::SharedCommandQueue) << ", " << SharedCommandQueue::calculate_shm_size(1024) << "\n"; // std::cout << sizeof(usub::core::SharedCommandQueue) << ", " << SharedCommandQueue::calculate_shm_size(1024) << "\n";
// usub::core::SharedMemoryManager shm_manager{"rates_1", SharedCommandQueue::calculate_shm_size(1024)}; // usub::core::SharedMemoryManager shm_manager{"rates_1", SharedCommandQueue::calculate_shm_size(1024)};
// new(shm_manager.base_ptr()) SharedCommandQueue(1024); // new(shm_manager.base_ptr()) SharedCommandQueue(1024);
// auto* queue = static_cast<usub::core::SharedCommandQueue*>(shm_manager.base_ptr()); // auto* queue = static_cast<usub::core::SharedCommandQueue*>(shm_manager.base_ptr());
// std::cout << queue->head_.load() << '\n'; // auto* cmd = queue->peek(0);
// std::cout << cmd->ready.load() << '\n';
} }
catch (const std::exception& ex) catch (const std::exception &ex) {
{
std::cerr << "Fatal error: " << ex.what() << std::endl; std::cerr << "Fatal error: " << ex.what() << std::endl;
return 1; return 1;
} }