Skip to content

Commit

Permalink
Convert pool allocator to streamstruct allocation.
Browse files Browse the repository at this point in the history
  • Loading branch information
ehpor committed Mar 7, 2025
1 parent 66a2bd2 commit 1a4b187
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 72 deletions.
12 changes: 7 additions & 5 deletions benchmarks/pool_allocator.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
#include "PoolAllocator.h"
#include "Timing.h"
#include "StructStream.h"
#include <iostream>

void benchmark_linux_scalability()
{
const size_t N = 10000000;
const size_t CAPACITY = 2 * N;
const size_t CAPACITY = 2 * N;

char *buffer = new char[PoolAllocator::CalculateMetadataBufferSize(CAPACITY)];
char *buffer = new char[PoolAllocator::GetMemorySize(CAPACITY)];
auto stream = StructStream(buffer);

auto allocator = PoolAllocator::Create((PoolAllocator::SharedState *) buffer, CAPACITY);
auto allocator = PoolAllocator::Create(stream, CAPACITY);

auto *handles = new PoolAllocator::BlockHandle[N];

Expand All @@ -30,10 +32,10 @@ void benchmark_linux_scalability()
std::cout << "Linux Scalability:" << std::endl;
std::cout << "Time: " << (end - start) / 1e9 << " sec" << std::endl;
std::cout << "Throughput: " << 2 * N / ((end - start) / 1e9) << " ops/s" << std::endl;
std::cout << "Time per operation: " << (end - start) / (2 * N) << " ns" << std::endl;
std::cout << "Time per operation: " << (end - start) / (2 * N) << " ns" << std::endl;

delete[] handles;
delete[] buffer;
delete[] buffer;
}

int main(int argc, char **argv)
Expand Down
70 changes: 33 additions & 37 deletions catkit_core/PoolAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,66 @@

#include <algorithm>

const std::uint8_t VERSION[4] = {0, 0, 0, 0};

PoolAllocator::PoolAllocator(SharedState *shared_state, std::atomic<BlockHandle> *next)
: m_Header(shared_state->header),
m_Next(next),
m_Capacity(m_Header.capacity),
m_Head(m_Header.head),
ShareableImpl<ShareableType::PoolAllocator>(shared_state, shared_state->header.capacity * sizeof(std::atomic<BlockHandle>))
const std::array<std::uint8_t, 4> VERSION = {0, 0, 0, 0};

PoolAllocator::PoolAllocator(StructStream &stream, std::uint32_t capacity)
{
m_Version = stream.Extract<std::array<std::uint8_t, 4>>();
m_Capacity = stream.Extract<std::uint32_t>();
m_Head = stream.Extract<std::atomic<BlockHandle>>();
m_Next = stream.Extract<std::atomic<BlockHandle>>(capacity);
}

void PoolAllocator::GetMemoryLayout(SharedState *shared_state, std::atomic<BlockHandle> **next)
std::size_t PoolAllocator::GetMemorySize(std::uint32_t capacity)
{
*next = reinterpret_cast<std::atomic<BlockHandle> *>(reinterpret_cast<char *>(shared_state) + sizeof(Header));
auto stream = StructStream(nullptr);
PoolAllocator(stream, capacity);

return stream.GetOffset();
}

std::unique_ptr<PoolAllocator> PoolAllocator::Create(SharedState *shared_state, std::uint32_t capacity)
std::unique_ptr<PoolAllocator> PoolAllocator::Create(StructStream &stream, std::uint32_t capacity)
{
Header *header = &shared_state->header;

std::atomic<BlockHandle> *next;
GetMemoryLayout(shared_state, &next);
auto res = std::unique_ptr<PoolAllocator>(new PoolAllocator(stream, capacity));

// Set version and capacity.
std::copy(VERSION, VERSION + sizeof(VERSION), header->version);
header->capacity = capacity;
*res->m_Version = VERSION;
*res->m_Capacity = capacity;

// Initialize the linked list.
header->head.store(0, std::memory_order_relaxed);
std::construct_at(res->m_Head);
res->m_Head->store(0, std::memory_order_relaxed);

for (std::size_t i = 0; i < capacity; ++i)
{
std::construct_at(&res->m_Next[i]);

if (i == capacity - 1)
{
next[i] = INVALID_HANDLE;
res->m_Next[i] = INVALID_HANDLE;
}
else
{
next[i] = i + 1;
res->m_Next[i] = i + 1;
}
}

return std::unique_ptr<PoolAllocator>(new PoolAllocator(shared_state, next));
return res;
}

std::unique_ptr<PoolAllocator> PoolAllocator::Open(SharedState *shared_state)
std::unique_ptr<PoolAllocator> PoolAllocator::Open(StructStream &stream)
{
std::atomic<BlockHandle> *next;
GetMemoryLayout(shared_state, &next);
StructStream stream_copy = stream;
CheckVersion(stream_copy, VERSION);

return std::unique_ptr<PoolAllocator>(new PoolAllocator(shared_state, next));
}

std::size_t PoolAllocator::CalculateMetadataBufferSize(std::uint32_t capacity)
{
std::size_t size = sizeof(Header);
size += capacity * sizeof(std::atomic<BlockHandle>);
auto capacity = *stream_copy.Extract<std::uint32_t>();

return size;
return std::unique_ptr<PoolAllocator>(new PoolAllocator(stream, capacity));
}

PoolAllocator::BlockHandle PoolAllocator::Allocate()
{
BlockHandle head = m_Head.load(std::memory_order_relaxed);
BlockHandle head = m_Head->load(std::memory_order_relaxed);
BlockHandle next;

// Pop the first element from the linked list.
Expand All @@ -78,7 +74,7 @@ PoolAllocator::BlockHandle PoolAllocator::Allocate()
}

next = m_Next[head].load(std::memory_order_relaxed);
} while (!m_Head.compare_exchange_weak(head, next));
} while (!m_Head->compare_exchange_weak(head, next));

// Return the popped element.
return head;
Expand All @@ -87,16 +83,16 @@ PoolAllocator::BlockHandle PoolAllocator::Allocate()
void PoolAllocator::Deallocate(BlockHandle index)
{
// Check if the element is within the pool bounds.
if (index >= m_Capacity)
if (index >= *m_Capacity)
{
return;
}

BlockHandle head = m_Head.load(std::memory_order_relaxed);;
BlockHandle head = m_Head->load(std::memory_order_relaxed);;

// Push the element back on the front of the linked list.
do
{
m_Next[index] = head;
} while (!m_Head.compare_exchange_weak(head, index));
} while (!m_Head->compare_exchange_weak(head, index));
}
40 changes: 10 additions & 30 deletions catkit_core/PoolAllocator.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef POOL_ALLOCATOR_H
#define POOL_ALLOCATOR_H

#include "Shareable.h"
#include "Shareable.h"

#include <atomic>
#include <cstdint>
Expand All @@ -11,49 +11,29 @@
#include <array>

// A simple lock-free pool allocator.
class PoolAllocator : public ShareableImpl<ShareableType::PoolAllocator>
class PoolAllocator
{
public:
using BlockHandle = std::uint32_t;
static const BlockHandle INVALID_HANDLE = std::numeric_limits<BlockHandle>::max();

static std::size_t CalculateMetadataBufferSize(std::uint32_t capacity);
static std::size_t GetMemorySize(std::uint32_t capacity);

static std::unique_ptr<PoolAllocator> Create(SharedState *shared_state, std::uint32_t capacity);
static std::unique_ptr<PoolAllocator> Open(SharedState *shared_state);
static std::unique_ptr<PoolAllocator> Create(StructStream &stream, std::uint32_t capacity);
static std::unique_ptr<PoolAllocator> Open(StructStream &stream);

BlockHandle Allocate();
void Deallocate(BlockHandle index);

struct Header
{
std::uint8_t version[4];
std::uint32_t capacity;
std::atomic<BlockHandle> head;
};

// Ensure a specific memory layout.
static_assert(offsetof(PoolAllocator::Header, version) == 0);
static_assert(offsetof(PoolAllocator::Header, capacity) == 4);
static_assert(offsetof(PoolAllocator::Header, head) == 8);
static_assert(sizeof(PoolAllocator::Header) == 12);

private:
PoolAllocator(SharedState *header, std::atomic<BlockHandle> *next);

static void GetMemoryLayout(SharedState *shared_state, std::atomic<BlockHandle> **next);
PoolAllocator(StructStream &stream, std::uint32_t capacity);

Header &m_Header;

std::uint32_t &m_Capacity;
std::atomic<BlockHandle> &m_Head;
std::array<std::uint8_t, 4> *m_Version;
std::uint32_t *m_Capacity;
std::atomic<BlockHandle> *m_Head;
std::atomic<BlockHandle> *m_Next;
};

template<>
struct SharedStateInternal<ShareableType::PoolAllocator>
{
PoolAllocator::Header header;
static_assert(std::atomic<BlockHandle>::is_always_lock_free);
};

#endif // POOL_ALLOCATOR_H

0 comments on commit 1a4b187

Please sign in to comment.