parallel fixes and benchmark update

This commit is contained in:
g2px1 2025-05-02 12:30:49 +03:00
parent 63aaf9a3de
commit 0ef960e4e5
3 changed files with 336 additions and 300 deletions

View File

@@ -6,4 +6,5 @@ set(CMAKE_CXX_STANDARD 20)
add_executable(parallelUnorderedMap main.cpp
UnorderedParallelMap.h
optimization.h
UnorderedParallelMap.cpp)
UnorderedParallelMap.cpp
)

View File

@@ -1,365 +1,367 @@
#ifndef UNORDEREDPARALLELMAP_H
#define UNORDEREDPARALLELMAP_H
#include <memory>
#include <atomic>
#include <vector>
#include <optional>
#include <thread>
#include <shared_mutex>
#include <atomic>
#include <functional>
#include <cstdint>
#include "optimization.h"
template <typename K, typename V>
class LockFreeMap
{
private:
struct Bucket
static_assert(std::is_trivially_copyable_v<K>);
static_assert(std::is_trivially_copyable_v<V>);
struct Entry
{
std::atomic<bool> occupied{false};
std::atomic<bool> deleted{false};
std::atomic<K> key;
std::atomic<V> value;
std::atomic<Bucket*> next{nullptr};
std::atomic<uint64_t> version{0}; // even: stable, odd: writing
bool occupied = false;
bool deleted = false;
K key{};
V value{};
};
struct Table
{
std::vector<Entry> buckets;
mutable std::shared_mutex resize_mutex;
std::atomic<size_t> count{0};
size_t capacity;
std::vector<Bucket> buckets;
static constexpr float MAX_LOAD = 0.7;
explicit Table(size_t cap) : capacity(cap), buckets(cap)
{
}
};
std::shared_ptr<Table> table;
std::shared_ptr<Table> new_table{nullptr};
std::atomic<size_t> size_counter{0};
std::mutex resize_mutex;
static constexpr float MAX_LOAD_FACTOR = 0.75;
size_t hash(const K& key, size_t capacity) const
size_t hash(const K& key) const
{
return std::hash<K>{}(key) % capacity;
}
void migrate_entry(Bucket& src, const std::shared_ptr<Table>& dest)
size_t probe(size_t h, size_t i) const
{
if (!src.occupied.load() || src.deleted.load()) return;
const K key = src.key.load();
const V val = src.value.load();
size_t idx = hash(key, dest->capacity);
Bucket& head = dest->buckets[idx];
if (!head.occupied.load())
{
bool expected = false;
if (head.occupied.compare_exchange_strong(expected, true))
{
head.key.store(key);
head.value.store(val);
head.deleted.store(false);
return;
}
return (h + i) % capacity;
}
Bucket* current = &head;
while (true)
void resize()
{
if (!current->deleted.load() && current->key.load() == key)
return;
size_t new_capacity = capacity * 2;
std::vector<Entry> new_buckets(new_capacity);
Bucket* next = current->next.load();
if (next)
for (auto& e : buckets)
{
current = next;
prefetch_for_read(&e);
cpu_relax();
if (e.occupied && !e.deleted)
{
size_t h = std::hash<K>{}(e.key) % new_capacity;
for (size_t i = 0; i < new_capacity; ++i)
{
size_t idx = (h + i) % new_capacity;
Entry& ne = new_buckets[idx];
prefetch_for_write(&ne);
cpu_relax();
if (!ne.occupied)
{
ne.version.store(1);
ne.key = e.key;
ne.value = e.value;
ne.occupied = true;
ne.deleted = false;
ne.version.store(2);
break;
}
else
{
Bucket* new_node = new Bucket;
new_node->occupied.store(true);
new_node->key.store(key);
new_node->value.store(val);
new_node->deleted.store(false);
if (current->next.compare_exchange_strong(next, new_node))
return;
delete new_node;
}
}
}
void start_resize(size_t new_capacity)
{
std::lock_guard<std::mutex> lock(resize_mutex);
if (new_table) return;
auto old_table = table;
auto next = std::make_shared<Table>(new_capacity);
new_table = next;
std::thread([this, old_table, next]()
{
for (auto& bucket : old_table->buckets)
{
Bucket* current = &bucket;
while (current)
{
migrate_entry(*current, next);
current = current->next.load();
}
}
table = next;
new_table = nullptr;
for (auto& bucket : old_table->buckets)
{
Bucket* current = bucket.next.load();
while (current)
{
Bucket* next = current->next.load();
delete current;
current = next;
}
}
}).detach();
}
std::shared_ptr<Table> active_table() const
{
auto nt = new_table;
return nt ? nt : table;
buckets = std::move(new_buckets);
capacity = new_capacity;
}
public:
explicit LockFreeMap(size_t initial_capacity = 1024)
explicit LockFreeMap(size_t init_cap = 1024)
: buckets(init_cap), capacity(init_cap)
{
table = std::make_shared<Table>(initial_capacity);
}
~LockFreeMap()
bool insert(const K& key, const V& val)
{
auto t = table;
for (auto& bucket : t->buckets)
{
Bucket* current = bucket.next.load();
while (current)
{
Bucket* next = current->next.load();
delete current;
current = next;
}
}
}
std::unique_lock lock(resize_mutex);
bool insert(const K& key, const V& value)
{
if ((float)(size_counter.load() + 1) / table->capacity > MAX_LOAD_FACTOR)
start_resize(table->capacity * 2);
if ((float)(count.load() + 1) / capacity > MAX_LOAD)
resize();
auto t = active_table();
size_t idx = hash(key, t->capacity);
Bucket& head = t->buckets[idx];
prefetch_for_read(&head);
size_t h = hash(key);
for (size_t i = 0; i < capacity; ++i)
{
size_t idx = probe(h, i);
Entry& e = buckets[idx];
if (!head.occupied.load())
prefetch_for_write(&e);
cpu_relax();
if (!e.occupied || (e.deleted && e.key == key))
{
bool expected = false;
if (head.occupied.compare_exchange_strong(expected, true))
uint64_t v = e.version.load();
if (v % 2 != 0) continue;
if (e.version.compare_exchange_strong(v, v + 1))
{
head.key.store(key);
head.value.store(value);
head.deleted.store(false);
size_counter.fetch_add(1);
e.key = key;
e.value = val;
e.occupied = true;
e.deleted = false;
e.version.store(v + 2);
count.fetch_add(1);
return true;
}
--i;
}
Bucket* current = &head;
while (true)
else if (!e.deleted && e.key == key)
{
if (!current->deleted.load() && current->key.load() == key)
return false;
}
}
return false;
}
Bucket* next = current->next.load();
if (next)
std::optional<V> find(const K& key) const
{
current = next;
size_t h = hash(key);
for (size_t i = 0; i < capacity; ++i)
{
size_t idx = probe(h, i);
const Entry& e = buckets[idx];
prefetch_for_read(&e);
cpu_relax();
}
else
uint64_t v1 = e.version.load(std::memory_order_acquire);
if (v1 % 2 != 0) continue;
if (e.occupied && !e.deleted && e.key == key)
{
Bucket* new_node = new Bucket;
new_node->occupied.store(true);
new_node->key.store(key);
new_node->value.store(value);
new_node->deleted.store(false);
if (current->next.compare_exchange_strong(next, new_node))
{
size_counter.fetch_add(1);
return true;
}
delete new_node;
V val = e.value;
cpu_relax();
uint64_t v2 = e.version.load(std::memory_order_acquire);
if (v1 == v2 && v2 % 2 == 0)
return val;
}
}
}
std::optional<V> find(const K& key)
{
auto t = active_table();
size_t idx = hash(key, t->capacity);
Bucket* current = &t->buckets[idx];
while (current)
{
if (current->occupied.load() &&
!current->deleted.load() &&
current->key.load() == key)
{
return current->value.load();
}
current = current->next.load();
cpu_relax();
}
return std::nullopt;
}
bool erase(const K& key)
{
auto t = active_table();
size_t idx = hash(key, t->capacity);
Bucket* current = &t->buckets[idx];
std::unique_lock lock(resize_mutex);
while (current)
size_t h = hash(key);
for (size_t i = 0; i < capacity; ++i)
{
if (current->occupied.load() &&
!current->deleted.load() &&
current->key.load() == key)
size_t idx = probe(h, i);
Entry& e = buckets[idx];
prefetch_for_write(&e);
cpu_relax();
if (e.occupied && !e.deleted && e.key == key)
{
current->deleted.store(true);
size_counter.fetch_sub(1);
uint64_t v = e.version.load();
if (v % 2 != 0) continue;
if (e.version.compare_exchange_strong(v, v + 1))
{
e.deleted = true;
e.version.store(v + 2);
count.fetch_sub(1);
return true;
}
current = current->next.load();
cpu_relax();
--i;
}
}
return false;
}
bool update(const K& key, const V& new_value)
bool update(const K& key, const V& new_val)
{
auto t = active_table();
size_t idx = hash(key, t->capacity);
Bucket* current = &t->buckets[idx];
size_t h = hash(key);
for (size_t i = 0; i < capacity; ++i)
{
size_t idx = probe(h, i);
Entry& e = buckets[idx];
while (current)
prefetch_for_write(&e);
cpu_relax();
uint64_t v = e.version.load();
if (v % 2 != 0) continue;
if (e.occupied && !e.deleted && e.key == key)
{
if (current->occupied.load() &&
!current->deleted.load() &&
current->key.load() == key)
if (e.version.compare_exchange_strong(v, v + 1))
{
current->value.store(new_value);
e.value = new_val;
e.version.store(v + 2);
return true;
}
current = current->next.load();
cpu_relax();
--i;
}
}
return false;
}
void shrink()
std::vector<K> keys() const
{
size_t current_size = size_counter.load();
auto cap = table->capacity;
if (current_size < cap / 4 && cap > 1024)
start_resize(cap / 2);
std::vector<K> result;
for (const auto& e : buckets)
{
prefetch_for_read(&e);
cpu_relax();
uint64_t v1 = e.version.load(std::memory_order_acquire);
if (v1 % 2 != 0 || !e.occupied || e.deleted) continue;
K key = e.key;
cpu_relax();
uint64_t v2 = e.version.load(std::memory_order_acquire);
if (v1 == v2 && v2 % 2 == 0)
result.push_back(key);
}
return result;
}
// Snapshot of all live (occupied && !deleted) key/value pairs.
// Optimistic seqlock-style read: an entry is accepted only if its version
// is even (no writer active) and unchanged across the reads of key/value.
// NOTE(review): e.key/e.value are plain (non-atomic) fields read while a
// concurrent writer may be mutating them; the v1==v2 re-check discards torn
// reads in practice but this is formally a data race in the C++ memory
// model — confirm intended.
std::vector<std::pair<K, V>> entries() const
{
std::vector<std::pair<K, V>> result;
for (const auto& e : buckets)
{
prefetch_for_read(&e);
cpu_relax();
uint64_t v1 = e.version.load(std::memory_order_acquire);
// Odd version means a writer is mid-update; skip the slot (no retry).
if (v1 % 2 != 0 || !e.occupied || e.deleted) continue;
K key = e.key;
V val = e.value;
cpu_relax();
uint64_t v2 = e.version.load(std::memory_order_acquire);
// Keep the pair only if no writer intervened between the version reads.
if (v1 == v2 && v2 % 2 == 0)
result.emplace_back(key, val);
}
return result;
}
// Invoke cb(key, value) for every live entry, read-only.
// Same optimistic seqlock read protocol as entries(): even version before
// and after the data reads, otherwise the slot is skipped (not retried),
// so entries being concurrently written may be missed in one pass.
void for_each(const std::function<void(const K&, const V&)>& cb) const
{
for (const auto& e : buckets)
{
prefetch_for_read(&e);
cpu_relax();
uint64_t v1 = e.version.load(std::memory_order_acquire);
if (v1 % 2 != 0 || !e.occupied || e.deleted) continue;
// Copy out under the optimistic read; validated by the second load below.
K key = e.key;
V val = e.value;
cpu_relax();
uint64_t v2 = e.version.load(std::memory_order_acquire);
// Call the callback only with a consistent (untorn) snapshot.
if (v1 == v2 && v2 % 2 == 0)
cb(key, val);
}
}
// Invoke cb(key, value&) for every live entry, allowing the callback to
// mutate the value in place. Each slot is write-locked by CAS-ing its
// version from even v to odd v+1 (writer lock), then released at v+2.
// Slots whose CAS fails (another writer active) are skipped, not retried.
void for_each_mut(const std::function<void(const K&, V&)>& cb)
{
for (auto& e : buckets)
{
prefetch_for_write(&e);
cpu_relax();
// Unsynchronized pre-check; the version CAS below is the real gate.
if (!e.occupied || e.deleted) continue;
uint64_t v = e.version.load();
if (v % 2 != 0) continue;
if (e.version.compare_exchange_strong(v, v + 1))
{
cb(e.key, e.value);
// Publish: bump to the next even version so readers revalidate.
e.version.store(v + 2);
}
else
{
cpu_relax();
}
}
}
// Remove all entries. Holds resize_mutex to exclude concurrent
// resize/reserve; each slot is additionally taken through the per-entry
// version writer lock (even -> odd -> even) so seqlock readers revalidate.
// NOTE(review): slots with an odd version are skipped rather than waited
// on, yet count is unconditionally reset to 0 afterwards — if any writer
// can run without resize_mutex this could leave count inconsistent; confirm.
void clear()
{
std::unique_lock lock(resize_mutex);
for (auto& e : buckets)
{
prefetch_for_write(&e);
cpu_relax();
uint64_t v = e.version.load();
if (v % 2 != 0) continue;
if (e.version.compare_exchange_strong(v, v + 1))
{
e.occupied = false;
e.deleted = false;
e.version.store(v + 2);
}
}
count.store(0);
}
// Grow the table to at least desired_capacity (rounded up to the next
// power of two), rehashing all live entries into a fresh bucket array.
// No-op if the table is already large enough. Holds resize_mutex for the
// whole operation. Tombstoned (deleted) entries are dropped by the rehash.
void reserve(size_t desired_capacity)
{
std::unique_lock lock(resize_mutex);
if (desired_capacity <= capacity) return;
// Round up to a power of two.
size_t new_capacity = 1;
while (new_capacity < desired_capacity) new_capacity <<= 1;
std::vector<Entry> new_buckets(new_capacity);
for (auto& e : buckets)
{
prefetch_for_read(&e);
cpu_relax();
if (e.occupied && !e.deleted)
{
// Linear-probe the new array for the first free slot.
size_t h = std::hash<K>{}(e.key) % new_capacity;
for (size_t i = 0; i < new_capacity; ++i)
{
size_t idx = (h + i) % new_capacity;
Entry& ne = new_buckets[idx];
prefetch_for_write(&ne);
cpu_relax();
if (!ne.occupied)
{
// Odd version while constructing, even (2) once published.
ne.version.store(1);
ne.key = e.key;
ne.value = e.value;
ne.occupied = true;
ne.deleted = false;
ne.version.store(2);
break;
}
}
}
}
buckets = std::move(new_buckets);
capacity = new_capacity;
}
// Number of live entries; a point-in-time atomic read, so the value may
// be stale immediately under concurrent inserts/erases.
size_t size() const
{
return size_counter.load();
}
// Eagerly migrate the entry for `key` from the current table into the
// in-progress resize target. Returns false if no resize is running or the
// key is not found; true once the matching bucket has been migrated.
bool rehash_one(const K& key)
{
auto src = table;
auto dst = new_table;
// No resize in flight — nothing to migrate.
if (!dst) return false;
size_t idx = hash(key, src->capacity);
Bucket* current = &src->buckets[idx];
// Walk the per-bucket overflow chain looking for a live matching key.
while (current)
{
if (current->occupied.load() &&
!current->deleted.load() &&
current->key.load() == key)
{
migrate_entry(*current, dst);
return true;
}
current = current->next.load();
}
return false;
}
// Collect the keys of all live entries by walking every bucket chain of
// the active table. Concurrent writers may cause keys to be missed or
// observed twice; the result is a best-effort snapshot.
std::vector<K> keys()
{
std::vector<K> result;
auto t = active_table();
for (auto& bucket : t->buckets)
{
Bucket* current = &bucket;
while (current)
{
if (current->occupied.load() &&
!current->deleted.load())
{
result.push_back(current->key.load());
}
current = current->next.load();
}
}
return result;
}
// Collect (key, value) pairs of all live entries by walking every bucket
// chain of the active table. Key and value are loaded separately, so a
// concurrent update can yield a pair mixing old/new state — best effort.
std::vector<std::pair<K, V>> entries()
{
std::vector<std::pair<K, V>> result;
auto t = active_table();
for (auto& bucket : t->buckets)
{
Bucket* current = &bucket;
while (current)
{
if (current->occupied.load() && !current->deleted.load())
{
result.emplace_back(current->key.load(), current->value.load());
}
current = current->next.load();
}
}
return result;
}
void for_each(const std::function<void(const K&, const V&)>& cb)
{
auto t = active_table();
for (auto& bucket : t->buckets)
{
Bucket* current = &bucket;
while (current)
{
if (current->occupied.load() && !current->deleted.load())
{
cb(current->key.load(), current->value.load());
}
current = current->next.load();
}
}
return count.load();
}
};

View File

@@ -1,13 +1,25 @@
#include <cassert>
#include <iostream>
#include <map>
#include "UnorderedParallelMap.h"
#include <thread>
#include <random>
#include <chrono>
#include <atomic>
#include "UnorderedParallelMap.h"
// Benchmark configuration: total operations = THREADS * OPS_PER_THREAD.
constexpr int THREADS = 8;
constexpr int OPS_PER_THREAD = 10000;
// Wall time accumulated per operation kind (nanoseconds), summed across
// all worker threads.
std::atomic<uint64_t> insert_ns{0};
std::atomic<uint64_t> find_ns{0};
std::atomic<uint64_t> update_ns{0};
std::atomic<uint64_t> erase_ns{0};
// Current steady_clock reading in nanoseconds since the clock's epoch.
// Monotonic, so suitable for measuring elapsed intervals.
inline uint64_t now_ns() {
using namespace std::chrono;
const auto since_epoch = steady_clock::now().time_since_epoch();
return static_cast<uint64_t>(duration_cast<nanoseconds>(since_epoch).count());
}
void worker(LockFreeMap<int, int>& map, int thread_id)
{
std::mt19937 rng(thread_id);
@@ -19,19 +31,25 @@ void worker(LockFreeMap<int, int>& map, int thread_id)
int key = dist(rng);
int value = thread_id * 1000 + i;
uint64_t start = now_ns();
switch (op)
{
case 0:
map.insert(key, value);
insert_ns += (now_ns() - start);
break;
case 1:
map.find(key);
find_ns += (now_ns() - start);
break;
case 2:
map.update(key, value + 1);
update_ns += (now_ns() - start);
break;
case 3:
map.erase(key);
erase_ns += (now_ns() - start);
break;
}
}
@@ -61,25 +79,40 @@ int main()
simple_test_map.insert(1, 1);
simple_test_map.insert(2, 44);
assert(simple_test_map.size() == 2);
assert(simple_test_map.find(2) == 44);
assert(simple_test_map.find(2).value_or(-1) == 44);
simple_test_map.update(2, 55);
assert(simple_test_map.find(2) == 55);
assert(simple_test_map.find(2).value_or(-1) == 55);
std::cout << "✓ Single-thread test finished\n" << std::endl;
verify_map(simple_test_map);
LockFreeMap<int, int> map(1024);
std::vector<std::thread> threads;
threads.reserve(THREADS);
auto start_total = std::chrono::steady_clock::now();
for (int i = 0; i < THREADS; ++i)
threads.emplace_back(worker, std::ref(map), i);
for (auto& t : threads)
t.join();
auto end_total = std::chrono::steady_clock::now();
auto total_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end_total - start_total).count();
std::cout << "✓ Multi-thread test finished\n" << std::endl;
verify_map(map);
uint64_t total_ops = THREADS * OPS_PER_THREAD;
std::cout << "--- Benchmark Results ---\n";
std::cout << "Total ops: " << total_ops << "\n";
std::cout << "Total time: " << total_ns / 1e6 << " ms\n";
std::cout << "Insert avg: " << insert_ns / 1e3 << " μs total (" << (insert_ns / (double)total_ops) << " ns/op)\n";
std::cout << "Find avg: " << find_ns / 1e3 << " μs total (" << (find_ns / (double)total_ops) << " ns/op)\n";
std::cout << "Update avg: " << update_ns / 1e3 << " μs total (" << (update_ns / (double)total_ops) << " ns/op)\n";
std::cout << "Erase avg: " << erase_ns / 1e3 << " μs total (" << (erase_ns / (double)total_ops) << " ns/op)\n";
std::cout << "--------------------------\n";
return 0;
}