finished first version

This commit is contained in:
g2px1 2025-04-20 22:45:29 +03:00
parent d23f9b11b7
commit e6ce46ec1d
3 changed files with 356 additions and 0 deletions

78
CMakeLists.txt Normal file
View File

@ -0,0 +1,78 @@
cmake_minimum_required(VERSION 3.30)
project(sharedMemoryKeyDB)
set(CMAKE_CXX_STANDARD 23)
set(CMAKE_CXX_FLAGS_DEBUG "-g -O0 -fPIC" CACHE STRING "Debug flags" FORCE)
set(CMAKE_C_FLAGS_DEBUG "-g -O0 -fPIC" CACHE STRING "Debug flags" FORCE)
include_directories(${sharedMemoryKeyDB_SOURCE_DIR})
add_executable(sharedMemoryKeyDB
main.cpp
utils/toml/toml.hpp
utils/hash/xxhash/xxh3.h
utils/hash/xxhash/xxhash.h
utils/hash/xxhash/xxhash.c
utils/datastructures/LFSkipList.cpp
utils/datastructures/LFSkipList.h
utils/io/SSTableIO.cpp
utils/io/SSTableIO.h
utils/io/Wal.cpp
utils/io/Wal.h
core/Memtable.cpp
core/Memtable.h
utils/io/Compactor.cpp
utils/io/Compactor.h
utils/io/VersionManager.cpp
utils/io/VersionManager.h
utils/io/RecoveryLog.cpp
utils/io/RecoveryLog.h
utils/hash/Hash128.h
core/SharedMemoryManager.cpp
core/SharedMemoryManager.h
utils/datastructures/LFCircullarBuffer.cpp
utils/datastructures/LFCircullarBuffer.h
utils/intrinsincs/optimizations.h
core/SharedCommandQueue.cpp
core/SharedCommandQueue.h
core/Command.cpp
core/Command.h
utils/string/basic_utils.h
core/UDB.cpp
core/UDB.h
core/DatabaseManager.cpp
core/DatabaseManager.h
)
add_executable(sharedMemoryKeyDB_client
client.cpp
utils/toml/toml.hpp
utils/hash/xxhash/xxh3.h
utils/hash/xxhash/xxhash.h
utils/hash/xxhash/xxhash.c
utils/datastructures/LFSkipList.cpp
utils/datastructures/LFSkipList.h
utils/io/SSTableIO.cpp
utils/io/SSTableIO.h
utils/io/Wal.cpp
utils/io/Wal.h
core/Memtable.cpp
core/Memtable.h
utils/io/Compactor.cpp
utils/io/Compactor.h
utils/io/VersionManager.cpp
utils/io/VersionManager.h
utils/io/RecoveryLog.cpp
utils/io/RecoveryLog.h
utils/hash/Hash128.h
core/SharedMemoryManager.cpp
core/SharedMemoryManager.h
utils/datastructures/LFCircullarBuffer.cpp
utils/datastructures/LFCircullarBuffer.h
utils/intrinsincs/optimizations.h
core/SharedCommandQueue.cpp
core/SharedCommandQueue.h
core/Command.cpp
core/Command.h
utils/string/basic_utils.h
)

164
client.cpp Normal file
View File

@ -0,0 +1,164 @@
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <iostream>
#include <cstring>
#include <thread>
#include <chrono>
#include "core/SharedCommandQueue.h"
#include "utils/hash/Hash128.h"
// int main()
// {
// constexpr const char* shm_name = "/shm_rates1";
// constexpr size_t shm_size = sizeof(usub::core::SharedCommandQueue);
//
// int shm_fd = shm_open(shm_name, O_RDWR, 0600);
// if (shm_fd == -1)
// {
// perror("shm_open failed");
// return 1;
// }
//
// void* ptr = mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
// if (ptr == MAP_FAILED)
// {
// perror("mmap failed");
// close(shm_fd);
// return 1;
// }
//
// auto* queue = static_cast<usub::core::SharedCommandQueue*>(ptr);
// std::cout << "[CLIENT] Connected to shared memory.\n";
//
// size_t head = queue->head().load(std::memory_order_relaxed);
// size_t next_head = (head + 1) % queue->capacity();
// while (next_head == queue->tail().load(std::memory_order_acquire))
// {
// std::this_thread::sleep_for(std::chrono::milliseconds(1)); // wait
// }
//
// usub::core::Command* slot = &queue->raw_buffer()[head];
//
// // заполняем slot
// slot->op = usub::core::OperationType::PUT;
// slot->key = usub::utils::compute_hash128("test_key", 9);
// const char* put_value = "test_value";
// slot->value_size = std::strlen(put_value);
// std::memcpy(slot->value, put_value, slot->value_size);
// slot->response_size = 0;
// slot->response_ready.store(0, std::memory_order_relaxed);
//
// slot->ready.store(1, std::memory_order_release);
//
// // обновляем head
// queue->head().store(next_head, std::memory_order_release);
//
// std::cout << "[CLIENT] PUT command sent.\n";
//
// // Ожидание, можно реализовать FIND аналогично.
//
// munmap(ptr, shm_size);
// close(shm_fd);
//
// return 0;
// }
int main()
{
constexpr const char* shm_name = "/shm_rates1";
constexpr size_t shm_size = sizeof(usub::core::SharedCommandQueue);
int shm_fd = shm_open(shm_name, O_RDWR, 0600);
if (shm_fd == -1)
{
perror("shm_open failed");
return 1;
}
void* ptr = mmap(nullptr, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (ptr == MAP_FAILED)
{
perror("mmap failed");
close(shm_fd);
return 1;
}
auto* queue = static_cast<usub::core::SharedCommandQueue*>(ptr);
std::cout << "[CLIENT] Connected to shared memory.\n";
// -------------------
// PUT Command
// -------------------
{
size_t head = queue->head().load(std::memory_order_relaxed);
size_t next_head = (head + 1) % queue->capacity();
while (next_head == queue->tail().load(std::memory_order_acquire))
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
usub::core::Command* slot = &queue->raw_buffer()[head];
slot->op = usub::core::OperationType::PUT;
slot->key = usub::utils::compute_hash128("test_key", 8); // исправил длину "test_key" = 8 символов
const char* put_value = "test_value";
slot->value_size = std::strlen(put_value);
std::memcpy(slot->value, put_value, slot->value_size);
slot->response_size = 0;
slot->response_ready.store(0, std::memory_order_relaxed);
slot->ready.store(1, std::memory_order_release);
queue->head().store(next_head, std::memory_order_release);
std::cout << "[CLIENT] PUT command sent.\n";
}
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // дать серверу время
// -------------------
// FIND Command
// -------------------
{
size_t head = queue->head().load(std::memory_order_relaxed);
size_t next_head = (head + 1) % queue->capacity();
while (next_head == queue->tail().load(std::memory_order_acquire))
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
usub::core::Command* slot = &queue->raw_buffer()[head];
slot->op = usub::core::OperationType::FIND;
slot->key = usub::utils::compute_hash128("test_key", 8);
slot->value_size = 0;
slot->response_size = 0;
slot->response_ready.store(0, std::memory_order_relaxed);
slot->ready.store(1, std::memory_order_release);
queue->head().store(next_head, std::memory_order_release);
std::cout << "[CLIENT] FIND command sent.\n";
// Ожидание ответа
while (slot->response_ready.load(std::memory_order_acquire) == 0)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
if (slot->response_size > 0)
{
std::string result(reinterpret_cast<char*>(slot->response), slot->response_size);
std::cout << "[CLIENT] FIND result: " << result << "\n";
}
else
{
std::cout << "[CLIENT] FIND result: not found\n";
}
}
munmap(ptr, shm_size);
close(shm_fd);
return 0;
}

114
main.cpp Normal file
View File

@ -0,0 +1,114 @@
#include <iostream>
#include "utils/toml/toml.hpp"
#include <iostream>
#include "utils/hash/xxhash/xxhash.h"
#include "utils/datastructures/LFSkipList.h"
#include "utils/io/SSTableIO.h"
using namespace usub::utils;
#include "utils/io/VersionManager.h"
void test_skiplist_basic()
{
std::string db_name = "test_db";
VersionManager version_manager(db_name);
LFSkipList<Hash128, std::string> table(version_manager);
table.insert(compute_hash128("a", 1), "apple");
table.insert(compute_hash128("b", 1), "banana");
table.insert(compute_hash128("c", 1), "cherry");
auto a = table.find(compute_hash128("a", 1));
auto b = table.find(compute_hash128("b", 1));
auto c = table.find(compute_hash128("c", 1));
auto d = table.find(compute_hash128("d", 1));
assert(a.has_value() && a.value() == "apple");
assert(b.has_value() && b.value() == "banana");
assert(c.has_value() && c.value() == "cherry");
assert(!d.has_value());
std::cout << "SkipList basic test passed.\n";
}
void simulate_restart_and_check_version()
{
std::string db_name = "test_db";
{
VersionManager vm1(db_name);
uint64_t v1 = vm1.next_version();
uint64_t v2 = vm1.next_version();
uint64_t v3 = vm1.next_version();
(void)v1;
(void)v2;
(void)v3;
}
VersionManager vm2(db_name);
uint64_t v4 = vm2.next_version();
assert(v4 > 3 && "Version did not persist correctly");
std::cout << "Version persistence test passed.\n";
}
void test_sstable_write_read()
{
std::string db_name = "test_db";
VersionManager version_manager(db_name);
LFSkipList<Hash128, std::string> table(version_manager);
table.insert(compute_hash128("dog", 3), "bark");
table.insert(compute_hash128("cat", 3), "meow");
table.insert(compute_hash128("cow", 3), "moo");
write_sstable_with_index(table, "test_sstable.dat");
LFSkipList<Hash128, std::string> loaded_table(version_manager);
read_sstable_with_mmap(loaded_table, "test_sstable.dat");
assert(loaded_table.find(compute_hash128("dog", 3)).has_value());
assert(loaded_table.find(compute_hash128("cat", 3)).has_value());
assert(loaded_table.find(compute_hash128("cow", 3)).has_value());
std::cout << "SSTable write/read test passed.\n";
}
#include "utils/datastructures/LFCircullarBuffer.h"
#include "core/SharedMemoryManager.h"
#include "core/SharedCommandQueue.h"
#include "utils/datastructures/LFCircullarBuffer.h"
#include "core/Command.h"
#include "utils/hash/Hash128.h"
#include <unordered_map>
#include "utils/string/basic_utils.h"
#include "core/DatabaseManager.h"
int main()
{
#if 0
test_skiplist_basic();
simulate_restart_and_check_version();
test_sstable_write_read();
LockFreeRingBuffer<int> q;
q.push(1);
std::cout << q.pop().value() << std::endl;
#endif
try
{
usub::core::DatabaseManager manager("/Users/kirillzhukov/Downloads/test.toml");
manager.run_all();
}
catch (const std::exception& ex)
{
std::cerr << "Fatal error: " << ex.what() << std::endl;
return 1;
}
return 0;
}