Compare commits

..

No commits in common. "c99392fc160966d8c78133e8603876f9dbb7689d" and "e2ae589a0ef3220789c6f265382949bdc1b438a1" have entirely different histories.

13 changed files with 265 additions and 410 deletions

224
README.md
View File

@ -1,213 +1,21 @@
В байтах (C++ layout): ### 📦 Поля структуры `Command`:
| Смещение | Размер | Тип данных | Описание | | Поле | Назначение |
| :------- | :----- | :----------------------------------- | :-------------------------------------------------------- | | ---------------- | -------------------------------------------------- |
| 0 | 1 | `uint8_t` (atomic) | `ready` (0 = не готова, 1 = готова) | | `op` | Тип операции: `PUT`, `DELETE`, `FIND` |
| 1 | 1 | `uint8_t` | `op` — код операции (`PUT` = 0, `DELETE` = 1, `FIND` = 2) | | `key` | Хэш ключа `Hash128` |
| 2 | 16 | `uint8_t[16]` | `key` — хэш ключа (Hash128, 128 бит) | | `value` | Буфер под значение (для PUT) |
| 18 | 4 | `uint32_t` | `value_size` — длина данных `value` | | `value_size` | Размер значения в байтах |
| 22 | 1024 | `uint8_t[1024]` | `value` — содержимое для записи (PUT) | | `response` | Буфер под ответ от сервера (используется при FIND) |
| 1046 | 4 | `uint32_t` | `response_size` — длина ответа в `response` | | `response_size` | Размер ответа в байтах |
| 1050 | 1024 | `uint8_t[1024]` | `response` — содержимое ответа после FIND | | `response_ready` | Флаг: 0 (ожидание), 1 (ответ готов) |
| 2074 | 1 | `uint8_t` (atomic) | `response_ready` (0 = нет ответа, 1 = ответ готов) |
| --- | --- | Итого: **2075 байт** на одну команду | |
> ⚠️ Реально в памяти компилятор **добавит выравнивание**.\
> Обычно команда занимает **2080 байт** с учетом паддинга (например, до 8 байт).
--- ---
## 2. Структура очереди `SharedCommandQueue` ### 🔥 Пошаговый обмен:
| Смещение | Размер | Тип данных | Описание | 1. Клиент заполняет `Command` структуры (устанавливает поля, сбрасывает `response_ready = 0`).
| :------- | :------- | :---------------- | :----------------------------- | 2. Клиент `push_batch()` пачку команд в `SharedCommandQueue`.
| 0 | 8 | `size_t` (atomic) | `head` (где следующий push) | 3. Сервер `try_pop_batch()`, обрабатывает.
| 8 | 8 | `size_t` (atomic) | `tail` (где следующий pop) | 4. Сервер пишет результат обратно в поля `Command` (`response`, `response_size`, `response_ready=1`).
| 16 | 8 | `size_t` | `capacity` (количество слотов) | 5. Клиент ждет готовности (`response_ready.load() == 1`) и читает результат.
| 24 | 2080 × N | `Command[N]` | Массив команд |
> **N** = максимальное количество команд в очереди. Например: 64.
---
# 📦 Процесс записи команды (PUT, FIND, DELETE)
1. Клиент читает `head`.
2. Вычисляет `next_head = (head + 1) % capacity`.
3. Проверяет, что `next_head != tail` (иначе очередь заполнена, нужно ждать).
4. Пишет данные в слот `Command` по индексу `head`:
- ставит `ready = 0`
- заполняет `op`, `key`, `value_size`, `value`
- обнуляет `response_size`, `response_ready`
5. После полной записи устанавливает `ready = 1` (memory\_order\_release).
6. После этого обновляет `head = next_head` (memory\_order\_release).
---
# 🧩 Процесс чтения команды сервером
1. Сервер читает `tail`.
2. Если `tail == head`, значит нет команд (ждет).
3. Читает слот `Command` по индексу `tail`.
4. Проверяет `ready == 1`.
5. Выполняет команду:
- для `PUT`, `DELETE` ничего не нужно возвращать
- для `FIND` после обработки:
- заполняет `response`
- устанавливает `response_size`
- устанавливает `response_ready = 1` (memory\_order\_release)
6. После обработки увеличивает `tail = (tail + 1) % capacity` (memory\_order\_release).
---
# 📑 Коды операций
| Код | Операция |
| :-- | :------- |
| 0 | PUT |
| 1 | DELETE |
| 2 | FIND |
---
# 📈 Диаграмма
```
Client (head) Server (tail)
↓ ↑
[ empty ][ empty ][ empty ][ empty ][ empty ]
↑ write PUT -> ready=1 ↑ read -> process -> tail++
↑ write FIND -> ready=1 ↑ read -> process -> response_ready=1
```
---
# 📋 Минимальная инструкция для других языков
1. Открыть или создать POSIX shared memory (`/shm_rates1`).
2. Маппить размер (например: 24 + 2080×64 байта = \~133 120 байт).
3. Знать layout структуры `SharedCommandQueue` и `Command`.
4. Атомарно обновлять `head`/`tail` через обычные load/store (x86/arm архитектуры обеспечивают это через aligned операции на 64 бита).
---
# ⚙️ Примерная формула расчета памяти:
```text
total_size = 24 + (round_up(sizeof(Command)) × capacity)
```
- **24 байта** на `head` + `tail` + `capacity`
- **capacity** = число команд
- **sizeof(Command)** = около 2080 байт
---
# ❗ Важно
- `head` и `tail` — должны быть атомарными.
- Доступ к слотам только по индексам `(head % capacity)` или `(tail % capacity)`.
- Все указанные поля фиксированы и известны по смещению.
- Все строки (`value`, `response`) — это просто байтовые массивы без `\0`.
---
# 📦 Памятка: Структура SharedCommandQueue в памяти
### Общая структура:
| Смещение (байты) | Размер | Описание |
| :--------------- | :------- | :--------------------------------- |
| 0 | 8 | `head` (size\_t, atomic) |
| 8 | 8 | `tail` (size\_t, atomic) |
| 16 | 8 | `capacity` (size\_t, НЕ atomic) |
| 24 | 2080 × N | Команды Command\[] (N штук подряд) |
---
# 📦 Памятка: Структура одного `Command`
| Смещение | Размер | Описание |
| :-------- | :--------- | :----------------------------------- |
| 0 | 1 byte | `ready` (atomic\<uint8\_t>) |
| 1 | 1 byte | `op` (uint8\_t) |
| 2 | 16 bytes | `key` (Hash128, 128 бит) |
| 18 | 4 bytes | `value_size` (uint32\_t) |
| 22 | 1024 bytes | `value` (байтовый массив данных) |
| 1046 | 4 bytes | `response_size` (uint32\_t) |
| 1050 | 1024 bytes | `response` (ответ от сервера) |
| 2074 | 1 byte | `response_ready` (atomic\<uint8\_t>) |
| 20752079 | 5 bytes | padding (выравнивание до 8 байт) |
> ⚡ Суммарный размер одной команды с паддингом = **2080 байт**.
---
# 📋 HEX Layout всего буфера (SharedCommandQueue + команды)
```
00:00 - 00:07 -> head (size_t, LE)
00:08 - 00:0F -> tail (size_t, LE)
00:10 - 00:17 -> capacity (size_t, LE)
00:18 - 00:18+2080*N -> N команд, каждая:
00:00 -> ready (1 byte)
00:01 -> op (1 byte)
00:02 - 00:11 -> key (16 bytes)
00:12 - 00:15 -> value_size (4 bytes, LE)
00:16 - 04:15 -> value (1024 bytes)
04:16 - 04:19 -> response_size (4 bytes, LE)
04:20 - 08:1F -> response (1024 bytes)
08:20 -> response_ready (1 byte)
08:21 - 08:27 -> padding (5 bytes)
```
---
# 🧪 Мини-псевдокод для любого языка
```pseudo
ptr = mmap(...)
head = read_uint64(ptr, 0)
tail = read_uint64(ptr, 8)
capacity = read_uint64(ptr, 16)
command_base = 24
// Пример: прочитать первую команду (index = 0)
offset = command_base + index * 2080
ready = read_uint8(ptr, offset + 0)
op = read_uint8(ptr, offset + 1)
key = read_bytes(ptr, offset + 2, 16)
value_size = read_uint32(ptr, offset + 18)
value = read_bytes(ptr, offset + 22, value_size)
response_ready = read_uint8(ptr, offset + 2074)
response_size = read_uint32(ptr, offset + 1046)
response = read_bytes(ptr, offset + 1050, response_size)
```
---
# 📊 Краткая таблица памяти:
| Поле | Тип | Офсет | Размер |
| :------- | :----------- | :---- | :----- |
| head | size\_t (8B) | 0 | 8B |
| tail | size\_t (8B) | 8 | 8B |
| capacity | size\_t (8B) | 16 | 8B |
| commands | Command\[N] | 24 | 2080×N |
---
# 📢 Важные правила при использовании
- Всегда выравнивание по 8 байтам (иначе UB на ARM/M1).
- Использовать little-endian (x86, arm64 всегда LE).
- При копировании строк следить за value\_size / response\_size.
- Проверять флаг `ready` перед чтением команды.
- После обработки команды — инкрементировать `tail` мод capacity.
- Ответ через `response` + `response_size` + `response_ready` только для `FIND`.
---

View File

@ -6,43 +6,78 @@
namespace usub::core namespace usub::core
{ {
Command::Command() // Command::Command() : op(OperationType::UNKNOWN), value_size(0), response_ready(0), response_size(0)
{ // {
ready.store(0, std::memory_order_relaxed); // std::memset(value, 0, sizeof(value));
response_ready.store(0, std::memory_order_relaxed); // std::memset(response, 0, sizeof(response));
op = OperationType::PUT; // }
key = utils::Hash128{}; //
value_size = 0; // Command::Command(const Command& other)
response_size = 0; // : op(other.op),
} // key(other.key),
// value_size(0),
Command::Command(const Command& other) // response_ready(false),
{ // response_size(0)
ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); // {
op = other.op; // if (other.value_size > 0 && other.value_size <= sizeof(value))
key = other.key; // {
value_size = other.value_size; // value_size = other.value_size;
std::memcpy(value, other.value, other.value_size); // std::memcpy(value, other.value, value_size);
// }
response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); // if (other.response_size > 0 && other.response_size <= sizeof(response))
response_size = other.response_size; // {
std::memcpy(response, other.response, other.response_size); // response_size = other.response_size;
} // std::memcpy(response, other.response, response_size);
// }
Command& Command::operator=(const Command& other) // response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed);
{ // }
if (this != &other) //
{ //
ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed); // Command::Command(Command&& other) noexcept
op = other.op; // : op(other.op),
key = other.key; // key(other.key),
value_size = other.value_size; // value_size(other.value_size),
std::memcpy(value, other.value, other.value_size); // response_ready(other.response_ready.load(std::memory_order_relaxed)),
// response_size(other.response_size)
response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed); // {
response_size = other.response_size; // if (value_size > 0)
std::memcpy(response, other.response, other.response_size); // std::memcpy(value, other.value, value_size);
} // if (response_size > 0)
return *this; // std::memcpy(response, other.response, response_size);
} // }
//
// Command& Command::operator=(const Command& other)
// {
// if (this != &other)
// {
// op = other.op;
// key = other.key;
// value_size = other.value_size;
// response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed);
// response_size = other.response_size;
//
// if (value_size > 0)
// std::memcpy(value, other.value, value_size);
// if (response_size > 0)
// std::memcpy(response, other.response, response_size);
// }
// return *this;
// }
//
// Command& Command::operator=(Command&& other) noexcept
// {
// if (this != &other)
// {
// op = other.op;
// key = other.key;
// value_size = other.value_size;
// response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed);
// response_size = other.response_size;
// if (value_size > 0)
// std::memcpy(value, other.value, value_size);
// if (response_size > 0)
// std::memcpy(response, other.response, response_size);
// }
// return *this;
// }
} }

View File

@ -21,12 +21,6 @@ namespace usub::core
struct Command struct Command
{ {
Command();
Command(const Command& other);
Command& operator=(const Command& other);
std::atomic<uint8_t> ready; std::atomic<uint8_t> ready;
OperationType op; OperationType op;
utils::Hash128 key; utils::Hash128 key;
@ -36,6 +30,46 @@ namespace usub::core
std::atomic<uint8_t> response_ready; std::atomic<uint8_t> response_ready;
uint32_t response_size; uint32_t response_size;
char response[1024]; char response[1024];
Command()
{
ready.store(0, std::memory_order_relaxed);
response_ready.store(0, std::memory_order_relaxed);
op = OperationType::PUT;
key = utils::Hash128{};
value_size = 0;
response_size = 0;
}
Command(const Command& other)
{
ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed);
op = other.op;
key = other.key;
value_size = other.value_size;
std::memcpy(value, other.value, other.value_size);
response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed);
response_size = other.response_size;
std::memcpy(response, other.response, other.response_size);
}
Command& operator=(const Command& other)
{
if (this != &other)
{
ready.store(other.ready.load(std::memory_order_relaxed), std::memory_order_relaxed);
op = other.op;
key = other.key;
value_size = other.value_size;
std::memcpy(value, other.value, other.value_size);
response_ready.store(other.response_ready.load(std::memory_order_relaxed), std::memory_order_relaxed);
response_size = other.response_size;
std::memcpy(response, other.response, other.response_size);
}
return *this;
}
}; };
} }

View File

@ -36,13 +36,13 @@ namespace usub::core
} }
bool create_new = !SharedMemoryManager::exists("shm_" + db_name); bool create_new = !SharedMemoryManager::exists("shm_" + db_name);
auto udb = std::make_unique<UDB>(db_name, "shm_" + db_name, 1024, 1024 * 1024, create_new); auto udb = std::make_unique<UDB>(db_name, "shm_" + db_name, 1024, 1024 * 1024, create_new);
this->databases_.push_back(DatabaseInstance{std::move(udb), {}}); databases_.push_back(DatabaseInstance{std::move(udb), {}});
} }
} }
void DatabaseManager::run_all() void DatabaseManager::run_all()
{ {
for (auto& db : this->databases_) for (auto& db : databases_)
{ {
db.worker = std::thread([&db]() db.worker = std::thread([&db]()
{ {
@ -50,7 +50,7 @@ namespace usub::core
}); });
} }
for (auto& db : this->databases_) for (auto& db : databases_)
{ {
if (db.worker.joinable()) if (db.worker.joinable())
db.worker.join(); db.worker.join();

View File

@ -66,8 +66,8 @@ namespace usub::shared_storage
const typename SkipList::value_type& value) const typename SkipList::value_type& value)
{ {
{ {
std::lock_guard<std::mutex> lock(this->batch_mutex); std::lock_guard<std::mutex> lock(batch_mutex);
this->write_batch.emplace_back(key, value); write_batch.emplace_back(key, value);
} }
if (estimate_batch_size() >= 64) if (estimate_batch_size() >= 64)
{ {
@ -110,17 +110,17 @@ namespace usub::shared_storage
{ {
std::vector<std::pair<typename SkipList::key_type, typename SkipList::value_type>> local_batch; std::vector<std::pair<typename SkipList::key_type, typename SkipList::value_type>> local_batch;
{ {
std::lock_guard<std::mutex> lock(this->batch_mutex); std::lock_guard<std::mutex> lock(batch_mutex);
local_batch.swap(this->write_batch); local_batch.swap(write_batch);
} }
for (const auto& [key, value] : local_batch) for (const auto& [key, value] : local_batch)
{ {
this->wal.write_put(key, value); wal.write_put(key, value);
this->active_memtable.load()->insert(key, value); active_memtable.load()->insert(key, value);
} }
if (estimate_memtable_size() > this->max_memtable_size) if (estimate_memtable_size() > max_memtable_size)
{ {
flush(); flush();
} }
@ -136,7 +136,7 @@ namespace usub::shared_storage
template <typename SkipList> template <typename SkipList>
size_t MemTableManager<SkipList>::estimate_batch_size() const size_t MemTableManager<SkipList>::estimate_batch_size() const
{ {
return this->write_batch.size(); return write_batch.size();
} }
} // shared_storage } // shared_storage
// usub // usub

View File

@ -9,27 +9,27 @@ namespace usub::core
SharedCommandQueue::SharedCommandQueue(size_t capacity) SharedCommandQueue::SharedCommandQueue(size_t capacity)
: capacity_(capacity), head_(0), tail_(0) : capacity_(capacity), head_(0), tail_(0)
{ {
for (size_t i = 0; i < this->capacity_; ++i) for (size_t i = 0; i < capacity_; ++i)
new(&this->buffer_[i]) Command(); new(&buffer_[i]) Command();
} }
SharedCommandQueue::~SharedCommandQueue() SharedCommandQueue::~SharedCommandQueue()
{ {
for (size_t i = 0; i < this->capacity_; ++i) for (size_t i = 0; i < capacity_; ++i)
{ {
this->buffer_[i].~Command(); buffer_[i].~Command();
} }
} }
bool SharedCommandQueue::try_push(const Command& cmd) bool SharedCommandQueue::try_push(const Command& cmd)
{ {
size_t head = this->head_.load(std::memory_order_relaxed); size_t head = head_.load(std::memory_order_relaxed);
size_t next_head = (head + 1) % this->capacity_; size_t next_head = (head + 1) % capacity_;
if (next_head == this->tail_.load(std::memory_order_acquire)) if (next_head == tail_.load(std::memory_order_acquire))
return false; // Очередь полна return false; // Очередь полна
Command& slot = this->buffer_[head]; Command& slot = buffer_[head];
slot.ready.store(0, std::memory_order_relaxed); slot.ready.store(0, std::memory_order_relaxed);
slot.op = cmd.op; slot.op = cmd.op;
slot.key = cmd.key; slot.key = cmd.key;
@ -37,25 +37,25 @@ namespace usub::core
std::memcpy(slot.value, cmd.value, cmd.value_size); std::memcpy(slot.value, cmd.value, cmd.value_size);
slot.response_size = 0; slot.response_size = 0;
slot.response_ready.store(0, std::memory_order_relaxed); slot.response_ready.store(0, std::memory_order_relaxed);
this->head_.store(next_head, std::memory_order_release); head_.store(next_head, std::memory_order_release);
return true; return true;
} }
bool SharedCommandQueue::try_push_batch(const Command* cmds, size_t count) bool SharedCommandQueue::try_push_batch(const Command* cmds, size_t count)
{ {
size_t head = this->head_.load(std::memory_order_relaxed); size_t head = head_.load(std::memory_order_relaxed);
size_t tail = this->tail_.load(std::memory_order_acquire); size_t tail = tail_.load(std::memory_order_acquire);
size_t free_slots = (tail + this->capacity_ - head) % this->capacity_; size_t free_slots = (tail + capacity_ - head) % capacity_;
if (free_slots < count) if (free_slots < count)
return false; return false;
for (size_t i = 0; i < count; ++i) for (size_t i = 0; i < count; ++i)
{ {
Command& slot = this->buffer_[(head + i) % this->capacity_]; Command& slot = buffer_[(head + i) % capacity_];
slot = cmds[i]; slot = cmds[i];
} }
this->head_.store((head + count) % this->capacity_, std::memory_order_release); head_.store((head + count) % capacity_, std::memory_order_release);
return true; return true;
} }
@ -66,32 +66,32 @@ namespace usub::core
std::optional<Command> SharedCommandQueue::try_pop() std::optional<Command> SharedCommandQueue::try_pop()
{ {
size_t tail = this->tail_.load(std::memory_order_relaxed); size_t tail = tail_.load(std::memory_order_relaxed);
if (tail == this->head_.load(std::memory_order_acquire)) if (tail == head_.load(std::memory_order_acquire))
{ {
return std::nullopt; return std::nullopt;
} }
Command& slot = this->buffer_[tail]; Command& slot = buffer_[tail];
Command cmd = slot; Command cmd = slot;
this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release); tail_.store((tail + 1) % capacity_, std::memory_order_release);
return cmd; return cmd;
} }
size_t SharedCommandQueue::try_pop_batch(Command* out, size_t max_count) size_t SharedCommandQueue::try_pop_batch(Command* out, size_t max_count)
{ {
size_t tail = this->tail_.load(std::memory_order_relaxed); size_t tail = tail_.load(std::memory_order_relaxed);
size_t head = this->head_.load(std::memory_order_acquire); size_t head = head_.load(std::memory_order_acquire);
size_t available = (head + this->capacity_ - tail) % this->capacity_; size_t available = (head + capacity_ - tail) % capacity_;
size_t to_pop = (available < max_count) ? available : max_count; size_t to_pop = (available < max_count) ? available : max_count;
size_t copied = 0; size_t copied = 0;
for (size_t i = 0; i < to_pop; ++i) for (size_t i = 0; i < to_pop; ++i)
{ {
Command& src = this->buffer_[(tail + i) % this->capacity_]; Command& src = buffer_[(tail + i) % capacity_];
if (src.ready.load(std::memory_order_acquire) == 0) if (src.ready.load(std::memory_order_acquire) == 0)
break; break;
@ -99,97 +99,31 @@ namespace usub::core
out[copied++] = src; out[copied++] = src;
} }
this->tail_.store((tail + copied) % this->capacity_, std::memory_order_release); tail_.store((tail + copied) % capacity_, std::memory_order_release);
return copied; return copied;
} }
Command* SharedCommandQueue::peek(size_t index) Command* SharedCommandQueue::peek(size_t index)
{ {
size_t tail = this->tail_.load(std::memory_order_relaxed); size_t tail = tail_.load(std::memory_order_relaxed);
size_t head = this->head_.load(std::memory_order_acquire); size_t head = head_.load(std::memory_order_acquire);
if (tail == head) if (tail == head)
return nullptr; return nullptr;
return &this->buffer_[tail % this->capacity_]; return &buffer_[tail % capacity_];
} }
void SharedCommandQueue::pop() void SharedCommandQueue::pop()
{ {
size_t tail = this->tail_.load(std::memory_order_relaxed); size_t tail = tail_.load(std::memory_order_relaxed);
this->tail_.store((tail + 1) % this->capacity_, std::memory_order_release); tail_.store((tail + 1) % capacity_, std::memory_order_release);
} }
size_t SharedCommandQueue::pending_count() const size_t SharedCommandQueue::pending_count() const
{ {
size_t head = this->head_.load(std::memory_order_acquire); size_t head = head_.load(std::memory_order_acquire);
size_t tail = this->tail_.load(std::memory_order_relaxed); size_t tail = tail_.load(std::memory_order_relaxed);
return (head + this->capacity_ - tail) % this->capacity_; return (head + capacity_ - tail) % capacity_;
}
Command* SharedCommandQueue::raw_buffer() noexcept
{
return this->buffer_;
}
size_t SharedCommandQueue::capacity() const noexcept
{
return this->capacity_;
}
std::atomic<size_t>& SharedCommandQueue::head() noexcept
{
return this->head_;
}
std::atomic<size_t>& SharedCommandQueue::tail() noexcept
{
return this->tail_;
}
bool SharedCommandQueue::enqueue_put(const usub::utils::Hash128& key, const std::string& value)
{
size_t head = this->head_.load(std::memory_order_relaxed);
size_t next_head = (head + 1) % this->capacity_;
if (next_head == this->tail_.load(std::memory_order_acquire))
return false;
Command& slot = this->buffer_[head];
slot.op = OperationType::PUT;
slot.key = key;
slot.value_size = static_cast<uint32_t>(value.size());
std::memcpy(slot.value, value.data(), value.size());
slot.response_size = 0;
slot.response_ready.store(0, std::memory_order_relaxed);
slot.ready.store(1, std::memory_order_release);
this->head_.store(next_head, std::memory_order_release);
return true;
}
bool SharedCommandQueue::enqueue_find(const utils::Hash128& key)
{
size_t head = this->head_.load(std::memory_order_relaxed);
size_t next_head = (head + 1) % capacity_;
if (next_head == this->tail_.load(std::memory_order_acquire))
return false;
Command& slot = buffer_[head];
slot.op = OperationType::FIND;
slot.key = key;
slot.value_size = 0;
slot.response_size = 0;
slot.response_ready.store(0, std::memory_order_relaxed);
slot.ready.store(1, std::memory_order_release);
this->head_.store(next_head, std::memory_order_release);
return true;
}
bool SharedCommandQueue::await_response(Command& cmd)
{
while (cmd.response_ready.load(std::memory_order_acquire) == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return cmd.response_size != 0;
} }
} }

View File

@ -37,19 +37,56 @@ namespace usub::core
size_t pending_count() const; size_t pending_count() const;
Command* raw_buffer() noexcept; Command* raw_buffer() noexcept { return buffer_; }
size_t capacity() const noexcept { return capacity_; }
std::atomic<size_t>& head() noexcept { return head_; }
std::atomic<size_t>& tail() noexcept { return tail_; }
size_t capacity() const noexcept; bool enqueue_put(const usub::utils::Hash128& key, const std::string& value)
{
size_t head = head_.load(std::memory_order_relaxed);
size_t next_head = (head + 1) % capacity_;
if (next_head == tail_.load(std::memory_order_acquire))
return false;
std::atomic<size_t>& head() noexcept; Command& slot = buffer_[head];
slot.op = OperationType::PUT;
slot.key = key;
slot.value_size = static_cast<uint32_t>(value.size());
std::memcpy(slot.value, value.data(), value.size());
slot.response_size = 0;
slot.response_ready.store(0, std::memory_order_relaxed);
slot.ready.store(1, std::memory_order_release);
std::atomic<size_t>& tail() noexcept; head_.store(next_head, std::memory_order_release);
return true;
}
bool enqueue_put(const usub::utils::Hash128& key, const std::string& value); bool enqueue_find(const usub::utils::Hash128& key)
{
size_t head = head_.load(std::memory_order_relaxed);
size_t next_head = (head + 1) % capacity_;
if (next_head == tail_.load(std::memory_order_acquire))
return false;
bool enqueue_find(const usub::utils::Hash128& key); Command& slot = buffer_[head];
slot.op = OperationType::FIND;
slot.key = key;
slot.value_size = 0;
slot.response_size = 0;
slot.response_ready.store(0, std::memory_order_relaxed);
slot.ready.store(1, std::memory_order_release);
bool await_response(Command& cmd); head_.store(next_head, std::memory_order_release);
return true;
}
bool await_response(Command& cmd)
{
while (cmd.response_ready.load(std::memory_order_acquire) == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return cmd.response_size != 0;
}
private: private:
size_t capacity_; size_t capacity_;

View File

@ -41,7 +41,7 @@ namespace usub::core
void UDB::recover_from_logs() void UDB::recover_from_logs()
{ {
utils::RecoveryLog recovery_log(this->db_name_); utils::RecoveryLog recovery_log(db_name_);
recovery_log.replay([this](const utils::Hash128& key, const std::string& value, bool is_tombstone) recovery_log.replay([this](const utils::Hash128& key, const std::string& value, bool is_tombstone)
{ {

View File

@ -9,7 +9,6 @@
using namespace usub::utils; using namespace usub::utils;
#if 0
#include "utils/io/VersionManager.h" #include "utils/io/VersionManager.h"
void test_skiplist_basic() void test_skiplist_basic()
@ -77,10 +76,20 @@ void test_sstable_write_read()
std::cout << "SSTable write/read test passed.\n"; std::cout << "SSTable write/read test passed.\n";
} }
#endif
#include "utils/datastructures/LFCircullarBuffer.h"
#include "core/SharedMemoryManager.h"
#include "core/SharedCommandQueue.h"
#include "utils/datastructures/LFCircullarBuffer.h"
#include "core/Command.h"
#include "utils/hash/Hash128.h"
#include <unordered_map>
#include "utils/string/basic_utils.h"
#include "core/DatabaseManager.h" #include "core/DatabaseManager.h"
int main() int main()
{ {
#if 0 #if 0

View File

@ -66,13 +66,13 @@ namespace usub::utils
explicit LockFreeRingBuffer(size_t cap = 32) explicit LockFreeRingBuffer(size_t cap = 32)
: capacity(cap), mask(cap - 1), buffer(new Cell[cap]) : capacity(cap), mask(cap - 1), buffer(new Cell[cap])
{ {
if ((this->capacity & (this->capacity - 1)) != 0) if ((capacity & (capacity - 1)) != 0)
{ {
throw std::invalid_argument("Capacity must be a power of 2"); throw std::invalid_argument("Capacity must be a power of 2");
} }
for (size_t i = 0; i < this->capacity; ++i) for (size_t i = 0; i < capacity; ++i)
{ {
this->buffer[i].sequence.store(i, std::memory_order_relaxed); buffer[i].sequence.store(i, std::memory_order_relaxed);
} }
} }
@ -81,7 +81,7 @@ namespace usub::utils
while (pop()) while (pop())
{ {
} }
delete[] this->buffer; delete[] buffer;
} }
template <typename U> template <typename U>
@ -188,7 +188,7 @@ namespace usub::utils
return (h - t) & mask; return (h - t) & mask;
} }
[[nodiscard]] size_t get_capacity() const noexcept { return this->capacity; } [[nodiscard]] size_t get_capacity() const noexcept { return capacity; }
}; };
} }

View File

@ -31,7 +31,6 @@ namespace usub::utils
template <typename Key, typename Value> template <typename Key, typename Value>
class LFSkipList class LFSkipList
{ {
private:
struct Node struct Node
{ {
Key key; Key key;
@ -46,7 +45,7 @@ namespace usub::utils
: key(k), value(v), topLevel(level), is_tombstone(tombstone), version(ver) : key(k), value(v), topLevel(level), is_tombstone(tombstone), version(ver)
{ {
for (int i = 0; i < MAX_LEVEL; ++i) for (int i = 0; i < MAX_LEVEL; ++i)
this->next[i].store(nullptr, std::memory_order_relaxed); next[i].store(nullptr, std::memory_order_relaxed);
} }
}; };
@ -60,13 +59,12 @@ namespace usub::utils
LFSkipList(usub::utils::VersionManager& vm) LFSkipList(usub::utils::VersionManager& vm)
: version_manager(vm) : version_manager(vm)
{ {
this->head = new Node(std::numeric_limits<Key>::min(), Value{}, MAX_LEVEL, false, head = new Node(std::numeric_limits<Key>::min(), Value{}, MAX_LEVEL, false, version_manager.next_version());
this->version_manager.next_version());
} }
~LFSkipList() ~LFSkipList()
{ {
Node* curr = this->head; Node* curr = head;
while (curr) while (curr)
{ {
Node* next = next_node(curr->next[0].load(std::memory_order_relaxed)); Node* next = next_node(curr->next[0].load(std::memory_order_relaxed));
@ -88,7 +86,7 @@ namespace usub::utils
std::optional<Value> find(const Key& key) const std::optional<Value> find(const Key& key) const
{ {
Node* best = nullptr; Node* best = nullptr;
Node* node = this->head->next[0].load(std::memory_order_acquire); Node* node = head->next[0].load(std::memory_order_acquire);
while (node) while (node)
{ {
@ -111,7 +109,7 @@ namespace usub::utils
template <typename F> template <typename F>
void for_each(F&& func) const void for_each(F&& func) const
{ {
Node* node = this->head->next[0].load(std::memory_order_acquire); Node* node = head->next[0].load(std::memory_order_acquire);
while (node) while (node)
{ {
prefetch_for_read(node); prefetch_for_read(node);
@ -126,7 +124,7 @@ namespace usub::utils
template <typename F> template <typename F>
void for_each_raw(F&& func) const void for_each_raw(F&& func) const
{ {
Node* node = this->head->next[0].load(std::memory_order_acquire); Node* node = head->next[0].load(std::memory_order_acquire);
while (node) while (node)
{ {
prefetch_for_read(node); prefetch_for_read(node);
@ -180,7 +178,7 @@ namespace usub::utils
[[nodiscard]] size_t unsafe_size() const [[nodiscard]] size_t unsafe_size() const
{ {
size_t count = 0; size_t count = 0;
Node* node = this->head->next[0].load(std::memory_order_relaxed); Node* node = head->next[0].load(std::memory_order_relaxed);
while (node) while (node)
{ {
if (!node->marked.load(std::memory_order_relaxed) && !node->is_tombstone) if (!node->marked.load(std::memory_order_relaxed) && !node->is_tombstone)
@ -201,7 +199,7 @@ namespace usub::utils
bool found = find_internal(key, preds, succs); bool found = find_internal(key, preds, succs);
int topLevel = random_level(); int topLevel = random_level();
Node* newNode = new Node(key, value, topLevel, tombstone, this->version_manager.next_version()); Node* newNode = new Node(key, value, topLevel, tombstone, version_manager.next_version());
for (int i = 0; i < topLevel; ++i) for (int i = 0; i < topLevel; ++i)
newNode->next[i].store(succs[i], std::memory_order_relaxed); newNode->next[i].store(succs[i], std::memory_order_relaxed);
@ -233,7 +231,7 @@ namespace usub::utils
bool find_internal(const Key& key, Node** preds, Node** succs) const bool find_internal(const Key& key, Node** preds, Node** succs) const
{ {
bool found = false; bool found = false;
Node* pred = this->head; Node* pred = head;
for (int level = MAX_LEVEL - 1; level >= 0; --level) for (int level = MAX_LEVEL - 1; level >= 0; --level)
{ {

View File

@ -15,33 +15,33 @@ namespace usub::utils
Compactor::~Compactor() Compactor::~Compactor()
{ {
this->running_ = false; running_ = false;
if (this->worker_thread_.joinable()) if (worker_thread_.joinable())
this->worker_thread_.join(); worker_thread_.join();
} }
void Compactor::add_sstable_l0(const std::string& filename) void Compactor::add_sstable_l0(const std::string& filename)
{ {
this->l0_queue_.push(filename); l0_queue_.push(filename);
} }
void Compactor::run() void Compactor::run()
{ {
this->worker_thread_ = std::thread(&Compactor::background_worker, this); worker_thread_ = std::thread(&Compactor::background_worker, this);
} }
void Compactor::background_worker() void Compactor::background_worker()
{ {
while (this->running_) while (running_)
{ {
for (size_t i = 0; i < 8; ++i) for (size_t i = 0; i < 8; ++i)
{ {
auto file = this->l0_queue_.pop(); auto file = l0_queue_.pop();
if (file) if (file)
{ {
std::lock_guard<std::mutex> lock(this->levels_mutex_); std::lock_guard<std::mutex> lock(levels_mutex_);
this->level0_files_.push_back(*file); level0_files_.push_back(*file);
this->level0_size_.fetch_add(1, std::memory_order_relaxed); // увеличиваем счётчик level0_size_.fetch_add(1, std::memory_order_relaxed); // увеличиваем счётчик
} }
else else
{ {
@ -49,16 +49,16 @@ namespace usub::utils
} }
} }
if (this->level0_size_.load(std::memory_order_relaxed) >= 4) if (level0_size_.load(std::memory_order_relaxed) >= 4)
{ {
std::lock_guard<std::mutex> lock(this->levels_mutex_); std::lock_guard<std::mutex> lock(levels_mutex_);
compact_level(this->level0_files_, this->level1_files_, 0); compact_level(level0_files_, level1_files_, 0);
} }
if (level1_size_.load(std::memory_order_relaxed) >= 4) if (level1_size_.load(std::memory_order_relaxed) >= 4)
{ {
std::lock_guard<std::mutex> lock(this->levels_mutex_); std::lock_guard<std::mutex> lock(levels_mutex_);
compact_level(this->level1_files_, this->level2_files_, 1); compact_level(level1_files_, level2_files_, 1);
} }
std::this_thread::sleep_for(std::chrono::milliseconds(50)); std::this_thread::sleep_for(std::chrono::milliseconds(50));
@ -87,7 +87,7 @@ namespace usub::utils
std::filesystem::remove(file); std::filesystem::remove(file);
} }
LFSkipList<Hash128, std::string> merged(this->version_manager_); LFSkipList<Hash128, std::string> merged(version_manager_);
for (auto& table : loaded) for (auto& table : loaded)
{ {
@ -101,7 +101,7 @@ namespace usub::utils
} }
std::string new_filename = "sstable_l" + std::to_string(level + 1) + "_" + std::string new_filename = "sstable_l" + std::to_string(level + 1) + "_" +
std::to_string(this->version_manager_.next_version()) + ".dat"; std::to_string(version_manager_.next_version()) + ".dat";
write_sstable_with_index(merged, new_filename); write_sstable_with_index(merged, new_filename);
dest_files.push_back(new_filename); dest_files.push_back(new_filename);
@ -110,13 +110,13 @@ namespace usub::utils
if (level == 0) if (level == 0)
{ {
this->level0_size_.fetch_sub(batch_size, std::memory_order_relaxed); level0_size_.fetch_sub(batch_size, std::memory_order_relaxed);
this->level1_size_.fetch_add(1, std::memory_order_relaxed); level1_size_.fetch_add(1, std::memory_order_relaxed);
} }
else if (level == 1) else if (level == 1)
{ {
this->level1_size_.fetch_sub(batch_size, std::memory_order_relaxed); level1_size_.fetch_sub(batch_size, std::memory_order_relaxed);
this->level2_size_.fetch_add(1, std::memory_order_relaxed); level2_size_.fetch_add(1, std::memory_order_relaxed);
} }
} }
} // utils } // utils

View File

@ -8,8 +8,8 @@ namespace usub::utils
{ {
VersionManager::VersionManager(const std::string& dbname) VersionManager::VersionManager(const std::string& dbname)
: db_name(dbname), : db_name(dbname),
metadata_dir("metadata/" + this->db_name + "/"), metadata_dir("metadata/" + db_name + "/"),
version_file(this->metadata_dir + "version.meta") version_file(metadata_dir + "version.meta")
{ {
ensure_metadata_dir(); ensure_metadata_dir();
load_version(); load_version();