215 lines
6.5 KiB
C++
215 lines
6.5 KiB
C++
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include "UnorderedParallelMap.h"
|
|
|
|
// Mixed-operation stress test: how many worker threads are started and how
// many random operations each of them performs.
constexpr int THREADS{8};
constexpr int OPS_PER_THREAD{10000};

// Parallel batch test: how many threads are started per phase and how many
// keys each thread passes to a single batch call.
constexpr int BATCH_THREADS{4};
constexpr int BATCH_SIZE{1024};
|
|
|
|
// Time spent in each kind of map operation, in nanoseconds, summed over all
// worker threads. The workers add to these; main() prints them.
std::atomic<uint64_t> insert_ns(0);
std::atomic<uint64_t> find_ns(0);
std::atomic<uint64_t> update_ns(0);
std::atomic<uint64_t> erase_ns(0);
|
|
|
|
/// Reads std::chrono::steady_clock and returns the reading as a count of
/// nanoseconds since that clock's epoch. Callers subtract two readings to
/// obtain an elapsed interval.
inline uint64_t now_ns() {
    using namespace std::chrono;
    const auto since_epoch = steady_clock::now().time_since_epoch();
    return duration_cast<nanoseconds>(since_epoch).count();
}
|
|
|
|
/// Stress-test body executed by each worker thread.
///
/// Performs OPS_PER_THREAD operations on `map`. Each operation is picked at
/// random among insert, find, update and erase, on a random key in
/// [0, 99999]. The time spent in each kind of operation is added to the
/// matching global counter (insert_ns, find_ns, update_ns, erase_ns).
///
/// @param map        Map shared with the other worker threads.
/// @param thread_id  Seeds this thread's random generator and is folded into
///                   the values this thread writes.
void worker(LockFreeMap<int, int>& map, int thread_id)
{
    std::mt19937 rng(thread_id);
    std::uniform_int_distribution<int> dist(0, 99999);

    // Accumulate per-thread and publish once at the end. Updating a shared
    // atomic after every single operation makes the threads contend on the
    // counters themselves, which perturbs the timings being collected. The
    // totals that end up in the global counters are unchanged.
    uint64_t local_insert_ns = 0;
    uint64_t local_find_ns = 0;
    uint64_t local_update_ns = 0;
    uint64_t local_erase_ns = 0;

    for (int i = 0; i < OPS_PER_THREAD; ++i)
    {
        // Draw order (operation first, then key) is kept so that a given
        // thread_id always produces the same sequence.
        const int op = dist(rng) % 4;
        const int key = dist(rng);
        const int value = thread_id * 1000 + i;

        const uint64_t start = now_ns();

        switch (op)
        {
            case 0:
                map.insert(key, value);
                local_insert_ns += (now_ns() - start);
                break;
            case 1:
                map.find(key);
                local_find_ns += (now_ns() - start);
                break;
            case 2:
                map.update(key, value + 1);
                local_update_ns += (now_ns() - start);
                break;
            case 3:
                map.erase(key);
                local_erase_ns += (now_ns() - start);
                break;
        }
    }

    insert_ns += local_insert_ns;
    find_ns += local_find_ns;
    update_ns += local_update_ns;
    erase_ns += local_erase_ns;
}
|
|
|
|
/// Builds BATCH_SIZE consecutive keys starting at `base_key`, pairs key
/// base_key + i with value i, and hands both arrays to map.batch_insert in a
/// single call.
void batch_worker_insert(LockFreeMap<int, int>& map, int base_key) {
    std::vector<int> keys;
    std::vector<int> values;
    keys.reserve(BATCH_SIZE);
    values.reserve(BATCH_SIZE);

    for (int offset = 0; offset != BATCH_SIZE; ++offset) {
        keys.push_back(base_key + offset);
        values.push_back(offset);
    }

    map.batch_insert(keys.data(), values.data(), BATCH_SIZE);
}
|
|
|
|
/// Builds BATCH_SIZE consecutive keys starting at `base_key`, pairs key
/// base_key + i with value 10000 + i, and hands both arrays to
/// map.batch_insert_or_update in a single call.
void batch_worker_update(LockFreeMap<int, int>& map, int base_key) {
    std::vector<int> keys;
    std::vector<int> values;
    keys.reserve(BATCH_SIZE);
    values.reserve(BATCH_SIZE);

    for (int offset = 0; offset != BATCH_SIZE; ++offset) {
        keys.push_back(base_key + offset);
        values.push_back(10000 + offset);
    }

    map.batch_insert_or_update(keys.data(), values.data(), BATCH_SIZE);
}
|
|
|
|
void batch_worker_find(const LockFreeMap<int, int>& map, int base_key) {
|
|
std::vector<int> keys(BATCH_SIZE);
|
|
for (int i = 0; i < BATCH_SIZE; ++i) {
|
|
keys[i] = base_key + i;
|
|
}
|
|
|
|
auto results = map.batch_find(keys.data(), BATCH_SIZE);
|
|
for (size_t i = 0; i < results.size(); ++i) {
|
|
assert(results[i].has_value());
|
|
}
|
|
}
|
|
|
|
/// Builds BATCH_SIZE consecutive keys starting at `base_key` and passes them
/// to map.batch_erase in a single call.
void batch_worker_erase(LockFreeMap<int, int>& map, int base_key) {
    std::vector<int> keys;
    keys.reserve(BATCH_SIZE);
    for (int offset = 0; offset != BATCH_SIZE; ++offset) {
        keys.push_back(base_key + offset);
    }

    map.batch_erase(keys.data(), BATCH_SIZE);
}
|
|
|
|
void test_batch_operations_parallel() {
|
|
LockFreeMap<int, int> map;
|
|
std::vector<std::thread> threads;
|
|
|
|
auto benchmark = [&](const std::string& label, auto&& fn) {
|
|
uint64_t start = now_ns();
|
|
fn();
|
|
uint64_t end = now_ns();
|
|
double ms = (end - start) / 1e6;
|
|
double ops = BATCH_THREADS * BATCH_SIZE;
|
|
double throughput = ops / (ms / 1000.0);
|
|
|
|
std::cout << "✓ " << label << " passed in " << ms << " ms, throughput = "
|
|
<< static_cast<size_t>(throughput) << " ops/sec\n";
|
|
};
|
|
|
|
benchmark("batch_insert (parallel)", [&]() {
|
|
for (int i = 0; i < BATCH_THREADS; ++i)
|
|
threads.emplace_back(batch_worker_insert, std::ref(map), i * BATCH_SIZE);
|
|
for (auto& t : threads) t.join();
|
|
threads.clear();
|
|
});
|
|
|
|
benchmark("batch_insert_or_update (parallel)", [&]() {
|
|
for (int i = 0; i < BATCH_THREADS; ++i)
|
|
threads.emplace_back(batch_worker_update, std::ref(map), i * BATCH_SIZE);
|
|
for (auto& t : threads) t.join();
|
|
threads.clear();
|
|
});
|
|
|
|
benchmark("batch_find (parallel)", [&]() {
|
|
for (int i = 0; i < BATCH_THREADS; ++i)
|
|
threads.emplace_back(batch_worker_find, std::cref(map), i * BATCH_SIZE);
|
|
for (auto& t : threads) t.join();
|
|
threads.clear();
|
|
});
|
|
|
|
benchmark("batch_erase (parallel)", [&]() {
|
|
for (int i = 0; i < BATCH_THREADS; ++i)
|
|
threads.emplace_back(batch_worker_erase, std::ref(map), i * BATCH_SIZE);
|
|
for (auto& t : threads) t.join();
|
|
threads.clear();
|
|
});
|
|
}
|
|
|
|
/// Prints the map's size, then the element counts reported by size(), keys()
/// and entries(), and asserts entries().size() <= keys().size() <= size().
/// The checks are asserts, so they are compiled out when NDEBUG is defined.
void verify_map(LockFreeMap<int, int>& map)
{
    std::cout << "Final size: " << map.size() << "\n";

    const auto size_count = map.size();
    const auto key_count = map.keys().size();
    const auto entry_count = map.entries().size();

    std::cout << "Verifying invariants...\n"
              << " size() = " << size_count << "\n"
              << " keys() = " << key_count << "\n"
              << " entries() = " << entry_count << "\n";

    // NOTE(review): why these are inequalities rather than equalities depends
    // on how LockFreeMap counts entries - confirm against its header.
    assert(entry_count <= key_count);
    assert(key_count <= size_count);

    std::cout << "✓ Invariants OK\n\n";
}
|
|
|
|
int main()
|
|
{
|
|
LockFreeMap<int, int> simple_test_map;
|
|
simple_test_map.insert(1, 1);
|
|
simple_test_map.insert(2, 44);
|
|
assert(simple_test_map.size() == 2);
|
|
assert(simple_test_map.find(2).value_or(-1) == 44);
|
|
simple_test_map.update(2, 55);
|
|
assert(simple_test_map.find(2).value_or(-1) == 55);
|
|
std::cout << "✓ Single-thread test finished\n" << std::endl;
|
|
verify_map(simple_test_map);
|
|
|
|
LockFreeMap<int, int> map(1024);
|
|
std::vector<std::thread> threads;
|
|
threads.reserve(THREADS);
|
|
|
|
auto start_total = std::chrono::steady_clock::now();
|
|
|
|
for (int i = 0; i < THREADS; ++i)
|
|
threads.emplace_back(worker, std::ref(map), i);
|
|
|
|
for (auto& t : threads)
|
|
t.join();
|
|
|
|
auto end_total = std::chrono::steady_clock::now();
|
|
auto total_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end_total - start_total).count();
|
|
|
|
std::cout << "✓ Multi-thread test finished\n" << std::endl;
|
|
|
|
verify_map(map);
|
|
|
|
uint64_t total_ops = THREADS * OPS_PER_THREAD;
|
|
std::cout << "--- Benchmark Results ---\n";
|
|
std::cout << "Total ops: " << total_ops << "\n";
|
|
std::cout << "Total time: " << total_ns / 1e6 << " ms\n";
|
|
std::cout << "Insert avg: " << insert_ns / 1e3 << " μs total (" << (insert_ns / (double)total_ops) << " ns/op)\n";
|
|
std::cout << "Find avg: " << find_ns / 1e3 << " μs total (" << (find_ns / (double)total_ops) << " ns/op)\n";
|
|
std::cout << "Update avg: " << update_ns / 1e3 << " μs total (" << (update_ns / (double)total_ops) << " ns/op)\n";
|
|
std::cout << "Erase avg: " << erase_ns / 1e3 << " μs total (" << (erase_ns / (double)total_ops) << " ns/op)\n";
|
|
std::cout << "--------------------------\n";
|
|
|
|
std::cout << "=== Batch Operations: Parallel Test ===\n";
|
|
test_batch_operations_parallel();
|
|
std::cout << "=== Done ===\n";
|
|
return 0;
|
|
}
|