From 0ef960e4e5c53921fadd343a0287e46b024a4b96 Mon Sep 17 00:00:00 2001 From: g2px1 Date: Fri, 2 May 2025 12:30:49 +0300 Subject: [PATCH] parallel fixes and benchmark update --- CMakeLists.txt | 3 +- UnorderedParallelMap.h | 590 +++++++++++++++++++++-------------------- main.cpp | 43 ++- 3 files changed, 336 insertions(+), 300 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4560046..415d76a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,4 +6,5 @@ set(CMAKE_CXX_STANDARD 20) add_executable(parallelUnorderedMap main.cpp UnorderedParallelMap.h optimization.h - UnorderedParallelMap.cpp) + UnorderedParallelMap.cpp +) diff --git a/UnorderedParallelMap.h b/UnorderedParallelMap.h index 1bb6304..4bce081 100644 --- a/UnorderedParallelMap.h +++ b/UnorderedParallelMap.h @@ -1,365 +1,367 @@ #ifndef UNORDEREDPARALLELMAP_H #define UNORDEREDPARALLELMAP_H -#include -#include #include #include -#include +#include +#include +#include +#include #include "optimization.h" template class LockFreeMap { -private: - struct Bucket + static_assert(std::is_trivially_copyable_v); + static_assert(std::is_trivially_copyable_v); + + struct Entry { - std::atomic occupied{false}; - std::atomic deleted{false}; - std::atomic key; - std::atomic value; - std::atomic next{nullptr}; + std::atomic version{0}; // even: stable, odd: writing + bool occupied = false; + bool deleted = false; + K key{}; + V value{}; }; - struct Table - { - size_t capacity; - std::vector buckets; + std::vector buckets; + mutable std::shared_mutex resize_mutex; + std::atomic count{0}; + size_t capacity; + static constexpr float MAX_LOAD = 0.7; - explicit Table(size_t cap) : capacity(cap), buckets(cap) - { - } - }; - - std::shared_ptr table; - std::shared_ptr
new_table{nullptr}; - std::atomic size_counter{0}; - std::mutex resize_mutex; - static constexpr float MAX_LOAD_FACTOR = 0.75; - - size_t hash(const K& key, size_t capacity) const + size_t hash(const K& key) const { return std::hash{}(key) % capacity; } - void migrate_entry(Bucket& src, const std::shared_ptr
& dest) + size_t probe(size_t h, size_t i) const { - if (!src.occupied.load() || src.deleted.load()) return; + return (h + i) % capacity; + } - const K key = src.key.load(); - const V val = src.value.load(); - size_t idx = hash(key, dest->capacity); - Bucket& head = dest->buckets[idx]; + void resize() + { + size_t new_capacity = capacity * 2; + std::vector new_buckets(new_capacity); - if (!head.occupied.load()) + for (auto& e : buckets) { - bool expected = false; - if (head.occupied.compare_exchange_strong(expected, true)) + prefetch_for_read(&e); + cpu_relax(); + + if (e.occupied && !e.deleted) { - head.key.store(key); - head.value.store(val); - head.deleted.store(false); - return; + size_t h = std::hash{}(e.key) % new_capacity; + for (size_t i = 0; i < new_capacity; ++i) + { + size_t idx = (h + i) % new_capacity; + Entry& ne = new_buckets[idx]; + prefetch_for_write(&ne); + cpu_relax(); + + if (!ne.occupied) + { + ne.version.store(1); + ne.key = e.key; + ne.value = e.value; + ne.occupied = true; + ne.deleted = false; + ne.version.store(2); + break; + } + } } } - Bucket* current = &head; - while (true) - { - if (!current->deleted.load() && current->key.load() == key) - return; - - Bucket* next = current->next.load(); - if (next) - { - current = next; - } - else - { - Bucket* new_node = new Bucket; - new_node->occupied.store(true); - new_node->key.store(key); - new_node->value.store(val); - new_node->deleted.store(false); - if (current->next.compare_exchange_strong(next, new_node)) - return; - delete new_node; - } - } - } - - void start_resize(size_t new_capacity) - { - std::lock_guard lock(resize_mutex); - if (new_table) return; - - auto old_table = table; - auto next = std::make_shared
(new_capacity); - new_table = next; - - std::thread([this, old_table, next]() - { - for (auto& bucket : old_table->buckets) - { - Bucket* current = &bucket; - while (current) - { - migrate_entry(*current, next); - current = current->next.load(); - } - } - table = next; - new_table = nullptr; - - for (auto& bucket : old_table->buckets) - { - Bucket* current = bucket.next.load(); - while (current) - { - Bucket* next = current->next.load(); - delete current; - current = next; - } - } - }).detach(); - } - - std::shared_ptr
active_table() const - { - auto nt = new_table; - return nt ? nt : table; + buckets = std::move(new_buckets); + capacity = new_capacity; } public: - explicit LockFreeMap(size_t initial_capacity = 1024) + explicit LockFreeMap(size_t init_cap = 1024) + : buckets(init_cap), capacity(init_cap) { - table = std::make_shared
(initial_capacity); } - ~LockFreeMap() + bool insert(const K& key, const V& val) { - auto t = table; - for (auto& bucket : t->buckets) + std::unique_lock lock(resize_mutex); + + if ((float)(count.load() + 1) / capacity > MAX_LOAD) + resize(); + + size_t h = hash(key); + for (size_t i = 0; i < capacity; ++i) { - Bucket* current = bucket.next.load(); - while (current) + size_t idx = probe(h, i); + Entry& e = buckets[idx]; + + prefetch_for_write(&e); + cpu_relax(); + + if (!e.occupied || (e.deleted && e.key == key)) { - Bucket* next = current->next.load(); - delete current; - current = next; - } - } - } + uint64_t v = e.version.load(); + if (v % 2 != 0) continue; - bool insert(const K& key, const V& value) - { - if ((float)(size_counter.load() + 1) / table->capacity > MAX_LOAD_FACTOR) - start_resize(table->capacity * 2); - - auto t = active_table(); - size_t idx = hash(key, t->capacity); - Bucket& head = t->buckets[idx]; - prefetch_for_read(&head); - - if (!head.occupied.load()) - { - bool expected = false; - if (head.occupied.compare_exchange_strong(expected, true)) - { - head.key.store(key); - head.value.store(value); - head.deleted.store(false); - size_counter.fetch_add(1); - return true; - } - } - - Bucket* current = &head; - while (true) - { - if (!current->deleted.load() && current->key.load() == key) - return false; - - Bucket* next = current->next.load(); - if (next) - { - current = next; - cpu_relax(); - } - else - { - Bucket* new_node = new Bucket; - new_node->occupied.store(true); - new_node->key.store(key); - new_node->value.store(value); - new_node->deleted.store(false); - if (current->next.compare_exchange_strong(next, new_node)) + if (e.version.compare_exchange_strong(v, v + 1)) { - size_counter.fetch_add(1); + e.key = key; + e.value = val; + e.occupied = true; + e.deleted = false; + e.version.store(v + 2); + count.fetch_add(1); return true; } - delete new_node; - cpu_relax(); + --i; + } + else if (!e.deleted && e.key == key) + { + return false; } } + return false; } - std::optional find(const K& key) + std::optional find(const K& key) const { - auto t = active_table(); - size_t idx = hash(key, t->capacity); - Bucket* current = &t->buckets[idx]; - - while (current) + size_t h = hash(key); + for (size_t i = 0; i < capacity; ++i) { - if (current->occupied.load() && - !current->deleted.load() && - current->key.load() == key) - { - return current->value.load(); - } - current = current->next.load(); - cpu_relax(); - } + size_t idx = probe(h, i); + const Entry& e = buckets[idx]; + prefetch_for_read(&e); + cpu_relax(); + + uint64_t v1 = e.version.load(std::memory_order_acquire); + if (v1 % 2 != 0) continue; + + if (e.occupied && !e.deleted && e.key == key) + { + V val = e.value; + cpu_relax(); + uint64_t v2 = e.version.load(std::memory_order_acquire); + if (v1 == v2 && v2 % 2 == 0) + return val; + } + } return std::nullopt; } bool erase(const K& key) { - auto t = active_table(); - size_t idx = hash(key, t->capacity); - Bucket* current = &t->buckets[idx]; + std::unique_lock lock(resize_mutex); - while (current) + size_t h = hash(key); + for (size_t i = 0; i < capacity; ++i) { - if (current->occupied.load() && - !current->deleted.load() && - current->key.load() == key) - { - current->deleted.store(true); - size_counter.fetch_sub(1); - return true; - } - current = current->next.load(); - cpu_relax(); - } + size_t idx = probe(h, i); + Entry& e = buckets[idx]; + prefetch_for_write(&e); + cpu_relax(); + + if (e.occupied && !e.deleted && e.key == key) + { + uint64_t v = e.version.load(); + if (v % 2 != 0) continue; + + if (e.version.compare_exchange_strong(v, v + 1)) + { + e.deleted = true; + e.version.store(v + 2); + count.fetch_sub(1); + return true; + } + --i; + } + } return false; } - bool update(const K& key, const V& new_value) + bool update(const K& key, const V& new_val) { - auto t = active_table(); - size_t idx = hash(key, t->capacity); - Bucket* current = &t->buckets[idx]; - - while (current) + size_t h = hash(key); + for (size_t i = 0; i < capacity; ++i) { - if (current->occupied.load() && - !current->deleted.load() && - current->key.load() == key) - { - current->value.store(new_value); - return true; - } - current = current->next.load(); - cpu_relax(); - } + size_t idx = probe(h, i); + Entry& e = buckets[idx]; + prefetch_for_write(&e); + cpu_relax(); + + uint64_t v = e.version.load(); + if (v % 2 != 0) continue; + + if (e.occupied && !e.deleted && e.key == key) + { + if (e.version.compare_exchange_strong(v, v + 1)) + { + e.value = new_val; + e.version.store(v + 2); + return true; + } + --i; + } + } return false; } - void shrink() + std::vector keys() const { - size_t current_size = size_counter.load(); - auto cap = table->capacity; - if (current_size < cap / 4 && cap > 1024) - start_resize(cap / 2); + std::vector result; + for (const auto& e : buckets) + { + prefetch_for_read(&e); + cpu_relax(); + + uint64_t v1 = e.version.load(std::memory_order_acquire); + if (v1 % 2 != 0 || !e.occupied || e.deleted) continue; + + K key = e.key; + cpu_relax(); + uint64_t v2 = e.version.load(std::memory_order_acquire); + if (v1 == v2 && v2 % 2 == 0) + result.push_back(key); + } + return result; + } + + std::vector> entries() const + { + std::vector> result; + for (const auto& e : buckets) + { + prefetch_for_read(&e); + cpu_relax(); + + uint64_t v1 = e.version.load(std::memory_order_acquire); + if (v1 % 2 != 0 || !e.occupied || e.deleted) continue; + + K key = e.key; + V val = e.value; + cpu_relax(); + uint64_t v2 = e.version.load(std::memory_order_acquire); + if (v1 == v2 && v2 % 2 == 0) + result.emplace_back(key, val); + } + return result; + } + + void for_each(const std::function& cb) const + { + for (const auto& e : buckets) + { + prefetch_for_read(&e); + cpu_relax(); + + uint64_t v1 = e.version.load(std::memory_order_acquire); + if (v1 % 2 != 0 || !e.occupied || e.deleted) continue; + + K key = e.key; + V val = e.value; + cpu_relax(); + uint64_t v2 = e.version.load(std::memory_order_acquire); + if (v1 == v2 && v2 % 2 == 0) + cb(key, val); + } + } + + void for_each_mut(const std::function& cb) + { + for (auto& e : buckets) + { + prefetch_for_write(&e); + cpu_relax(); + + if (!e.occupied || e.deleted) continue; + + uint64_t v = e.version.load(); + if (v % 2 != 0) continue; + + if (e.version.compare_exchange_strong(v, v + 1)) + { + cb(e.key, e.value); + e.version.store(v + 2); + } + else + { + cpu_relax(); + } + } + } + + void clear() + { + std::unique_lock lock(resize_mutex); + + for (auto& e : buckets) + { + prefetch_for_write(&e); + cpu_relax(); + + uint64_t v = e.version.load(); + if (v % 2 != 0) continue; + + if (e.version.compare_exchange_strong(v, v + 1)) + { + e.occupied = false; + e.deleted = false; + e.version.store(v + 2); + } + } + count.store(0); + } + + void reserve(size_t desired_capacity) + { + std::unique_lock lock(resize_mutex); + if (desired_capacity <= capacity) return; + + size_t new_capacity = 1; + while (new_capacity < desired_capacity) new_capacity <<= 1; + + std::vector new_buckets(new_capacity); + + for (auto& e : buckets) + { + prefetch_for_read(&e); + cpu_relax(); + + if (e.occupied && !e.deleted) + { + size_t h = std::hash{}(e.key) % new_capacity; + for (size_t i = 0; i < new_capacity; ++i) + { + size_t idx = (h + i) % new_capacity; + Entry& ne = new_buckets[idx]; + prefetch_for_write(&ne); + cpu_relax(); + + if (!ne.occupied) + { + ne.version.store(1); + ne.key = e.key; + ne.value = e.value; + ne.occupied = true; + ne.deleted = false; + ne.version.store(2); + break; + } + } + } + } + + buckets = std::move(new_buckets); + capacity = new_capacity; } size_t size() const { - return size_counter.load(); - } - - bool rehash_one(const K& key) - { - auto src = table; - auto dst = new_table; - if (!dst) return false; - - size_t idx = hash(key, src->capacity); - Bucket* current = &src->buckets[idx]; - - while (current) - { - if (current->occupied.load() && - !current->deleted.load() && - current->key.load() == key) - { - migrate_entry(*current, dst); - return true; - } - current = current->next.load(); - } - return false; - } - - std::vector keys() - { - std::vector result; - auto t = active_table(); - for (auto& bucket : t->buckets) - { - Bucket* current = &bucket; - while (current) - { - if (current->occupied.load() && - !current->deleted.load()) - { - result.push_back(current->key.load()); - } - current = current->next.load(); - } - } - return result; - } - - std::vector> entries() - { - std::vector> result; - auto t = active_table(); - for (auto& bucket : t->buckets) - { - Bucket* current = &bucket; - while (current) - { - if (current->occupied.load() && !current->deleted.load()) - { - result.emplace_back(current->key.load(), current->value.load()); - } - current = current->next.load(); - } - } - return result; - } - - void for_each(const std::function& cb) - { - auto t = active_table(); - for (auto& bucket : t->buckets) - { - Bucket* current = &bucket; - while (current) - { - if (current->occupied.load() && !current->deleted.load()) - { - cb(current->key.load(), current->value.load()); - } - current = current->next.load(); - } - } + return count.load(); } }; diff --git a/main.cpp b/main.cpp index f603a64..4ba2893 100644 --- a/main.cpp +++ b/main.cpp @@ -1,13 +1,25 @@ #include #include #include - -#include "UnorderedParallelMap.h" +#include #include +#include +#include +#include "UnorderedParallelMap.h" constexpr int THREADS = 8; constexpr int OPS_PER_THREAD = 10000; +std::atomic insert_ns{0}; +std::atomic find_ns{0}; +std::atomic update_ns{0}; +std::atomic erase_ns{0}; + +inline uint64_t now_ns() { + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); +} + void worker(LockFreeMap& map, int thread_id) { std::mt19937 rng(thread_id); @@ -19,19 +31,25 @@ void worker(LockFreeMap& map, int thread_id) int key = dist(rng); int value = thread_id * 1000 + i; + uint64_t start = now_ns(); + switch (op) { case 0: map.insert(key, value); + insert_ns += (now_ns() - start); break; case 1: map.find(key); + find_ns += (now_ns() - start); break; case 2: map.update(key, value + 1); + update_ns += (now_ns() - start); break; case 3: map.erase(key); + erase_ns += (now_ns() - start); break; } } @@ -61,25 +79,40 @@ int main() simple_test_map.insert(1, 1); simple_test_map.insert(2, 44); assert(simple_test_map.size() == 2); - assert(simple_test_map.find(2) == 44); + assert(simple_test_map.find(2).value_or(-1) == 44); simple_test_map.update(2, 55); - assert(simple_test_map.find(2) == 55); + assert(simple_test_map.find(2).value_or(-1) == 55); std::cout << "✓ Single-thread test finished\n" << std::endl; verify_map(simple_test_map); LockFreeMap map(1024); - std::vector threads; threads.reserve(THREADS); + + auto start_total = std::chrono::steady_clock::now(); + for (int i = 0; i < THREADS; ++i) threads.emplace_back(worker, std::ref(map), i); for (auto& t : threads) t.join(); + auto end_total = std::chrono::steady_clock::now(); + auto total_ns = std::chrono::duration_cast(end_total - start_total).count(); + std::cout << "✓ Multi-thread test finished\n" << std::endl; verify_map(map); + uint64_t total_ops = THREADS * OPS_PER_THREAD; + std::cout << "--- Benchmark Results ---\n"; + std::cout << "Total ops: " << total_ops << "\n"; + std::cout << "Total time: " << total_ns / 1e6 << " ms\n"; + std::cout << "Insert avg: " << insert_ns / 1e3 << " μs total (" << (insert_ns / (double)total_ops) << " ns/op)\n"; + std::cout << "Find avg: " << find_ns / 1e3 << " μs total (" << (find_ns / (double)total_ops) << " ns/op)\n"; + std::cout << "Update avg: " << update_ns / 1e3 << " μs total (" << (update_ns / (double)total_ops) << " ns/op)\n"; + std::cout << "Erase avg: " << erase_ns / 1e3 << " μs total (" << (erase_ns / (double)total_ops) << " ns/op)\n"; + std::cout << "--------------------------\n"; + return 0; }