Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use C11 threads.h for multithreading and synchronization #3

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 36 additions & 36 deletions cr_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ cr_executor_t* cr_executor_create(int worker_count);

#ifdef CR_TASK_IMPL
#include <assert.h>
#include <pthread.h>
#include <threads.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
Expand All @@ -40,8 +40,8 @@ typedef struct _cr_task_ref_t _cr_task_ref_t;
typedef struct _cr_pool_t _cr_pool_t;

struct _cr_sync_t {
pthread_mutex_t mutex;
pthread_cond_t cond;
mtx_t mutex;
cnd_t cond;
int is_done;
};

Expand All @@ -52,7 +52,7 @@ struct _cr_task_ref_t {
};

struct _cr_pool_t {
pthread_mutex_t mutex;
mtx_t mutex;
void* batches[1llu << _CR_POOL_BATCH];
uint64_t batch_count;
uint64_t item_size;
Expand All @@ -76,9 +76,9 @@ struct cr_executor_t {
_cr_pool_t task_pool;
_cr_pool_t task_ref_pool;
_Atomic uint64_t c;
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_t* workers;
mtx_t mutex;
cnd_t cond;
thrd_t* workers;
};

void _cr_pool_init(_cr_pool_t* pool, uint64_t item_size);
Expand All @@ -89,10 +89,10 @@ void _cr_task_end_wait(cr_task_t* task);
void _cr_task_begin_signal(cr_task_t* task);

void _cr_executor_schedule_task(cr_executor_t* executor, cr_task_t* task);
void* _cr_executor_worker_func(void* args);
int _cr_executor_worker_func(void* args);

void _cr_pool_init(_cr_pool_t* pool, uint64_t item_size) {
pthread_mutex_init(&pool->mutex, NULL);
mtx_init(&pool->mutex, mtx_plain);
memset(pool->batches, 0, sizeof(pool->batches));
pool->batch_count = 0;
pool->item_size = item_size;
Expand All @@ -106,7 +106,7 @@ uint64_t _cr_pool_pop(_cr_pool_t* pool) {
do {
idx = c & _CR_POOL_IDX_MASK;
if (idx == _CR_POOL_IDX_MASK) {
pthread_mutex_lock(&pool->mutex);
mtx_lock(&pool->mutex);
c = atomic_load_explicit(&pool->c, memory_order_acquire);
idx = c & _CR_POOL_IDX_MASK;
if (idx == _CR_POOL_IDX_MASK) {
Expand All @@ -122,10 +122,10 @@ uint64_t _cr_pool_pop(_cr_pool_t* pool) {
void* item = (char*)(pool->batches[batch_idx]) + ((1llu << _CR_POOL_ITEM) - 1llu) * pool->item_size;
atomic_store_explicit((_Atomic uint32_t*)item, _CR_POOL_IDX_MASK, memory_order_relaxed);
atomic_store_explicit(&pool->c, idx | 1llu, memory_order_release);
pthread_mutex_unlock(&pool->mutex);
mtx_unlock(&pool->mutex);
return idx;
}
pthread_mutex_unlock(&pool->mutex);
mtx_unlock(&pool->mutex);
}
uint64_t batch_idx = idx >> _CR_POOL_ITEM;
uint64_t item_idx = idx & ((1llu << _CR_POOL_ITEM) - 1llu);
Expand Down Expand Up @@ -221,18 +221,18 @@ void cr_task_run(cr_task_t* task) {

void cr_task_sync(cr_task_t* task) {
_cr_sync_t sync;
pthread_mutex_init(&sync.mutex, NULL);
pthread_cond_init(&sync.cond, NULL);
mtx_init(&sync.mutex, mtx_plain);
cnd_init(&sync.cond);
sync.is_done = 0;
task->sync = &sync;
uint64_t c = atomic_fetch_or_explicit(&task->signal, _CR_TASK_FLAG_SYNC, memory_order_acq_rel);
assert((c & _CR_TASK_FLAG_SYNC) == 0 && "sync flag already set");
if ((c & _CR_TASK_FLAG_S) == 0) {
pthread_mutex_lock(&sync.mutex);
mtx_lock(&sync.mutex);
while (!sync.is_done) {
pthread_cond_wait(&sync.cond, &sync.mutex);
cnd_wait(&sync.cond, &sync.mutex);
}
pthread_mutex_unlock(&sync.mutex);
mtx_unlock(&sync.mutex);
}
}

Expand All @@ -255,10 +255,10 @@ void _cr_task_begin_signal(cr_task_t* task) {
uint64_t c = atomic_fetch_or_explicit(&task->signal, _CR_TASK_FLAG_S, memory_order_acq_rel);
assert((c & _CR_TASK_FLAG_S) == 0 && "tried to finalize more than once");
if (c & _CR_TASK_FLAG_SYNC) {
pthread_mutex_lock(&task->sync->mutex);
mtx_lock(&task->sync->mutex);
task->sync->is_done = 1;
pthread_cond_signal(&task->sync->cond);
pthread_mutex_unlock(&task->sync->mutex);
cnd_signal(&task->sync->cond);
mtx_unlock(&task->sync->mutex);
}
uint64_t waiting_count = (c >> 32) & 0xffff;
uint64_t idx = c & _CR_POOL_IDX_MASK;
Expand All @@ -278,15 +278,15 @@ void _cr_task_begin_signal(cr_task_t* task) {

cr_executor_t* cr_executor_create(int worker_count) {
size_t aligned_size_executor = (sizeof(cr_executor_t) + 255) & ~(size_t)0xff;
cr_executor_t* executor = malloc(aligned_size_executor + (size_t)worker_count * sizeof(pthread_t));
cr_executor_t* executor = malloc(aligned_size_executor + (size_t)worker_count * sizeof(thrd_t));
_cr_pool_init(&executor->task_pool, sizeof(cr_task_t));
_cr_pool_init(&executor->task_ref_pool, sizeof(_cr_task_ref_t));
atomic_store_explicit(&executor->c, _CR_POOL_IDX_MASK, memory_order_release);
pthread_mutex_init(&executor->mutex, NULL);
pthread_cond_init(&executor->cond, NULL);
executor->workers = (pthread_t*)((char*)executor + aligned_size_executor);
mtx_init(&executor->mutex, mtx_plain);
cnd_init(&executor->cond);
executor->workers = (thrd_t*)((char*)executor + aligned_size_executor);
for (int i = 0; i < worker_count; i++) {
pthread_create(&executor->workers[i], NULL, _cr_executor_worker_func, executor);
thrd_create(&executor->workers[i], _cr_executor_worker_func, executor);
}
return executor;
}
Expand All @@ -297,25 +297,25 @@ void _cr_executor_schedule_task(cr_executor_t* executor, cr_task_t* task) {
do {
uint64_t idx = c & _CR_POOL_IDX_MASK;
if (idx == _CR_POOL_IDX_MASK) {
pthread_mutex_lock(&executor->mutex);
mtx_lock(&executor->mutex);
c = atomic_load_explicit(&executor->c, memory_order_acquire);
idx = c & _CR_POOL_IDX_MASK;
if (idx == _CR_POOL_IDX_MASK) {
atomic_store_explicit(&task->next_idx_list, _CR_POOL_IDX_MASK, memory_order_relaxed);
atomic_store_explicit(&executor->c, task->idx, memory_order_release);
pthread_cond_signal(&executor->cond);
pthread_mutex_unlock(&executor->mutex);
cnd_signal(&executor->cond);
mtx_unlock(&executor->mutex);
return;
}
pthread_mutex_unlock(&executor->mutex);
mtx_unlock(&executor->mutex);
}
atomic_store_explicit(&task->next_idx_list, (uint32_t)idx, memory_order_relaxed);
n = ((c + (1llu << 32)) & ~_CR_POOL_IDX_MASK) | task->idx;
} while (!atomic_compare_exchange_weak_explicit(&executor->c, &c, n, memory_order_acq_rel, memory_order_acquire));
pthread_cond_signal(&executor->cond);
cnd_signal(&executor->cond);
}

void* _cr_executor_worker_func(void* args) {
int _cr_executor_worker_func(void* args) {
cr_executor_t* executor = args;
while (1) {
cr_task_t* task;
Expand All @@ -324,15 +324,15 @@ void* _cr_executor_worker_func(void* args) {
do {
uint64_t idx = c & _CR_POOL_IDX_MASK;
if (idx == _CR_POOL_IDX_MASK) {
pthread_mutex_lock(&executor->mutex);
mtx_lock(&executor->mutex);
c = atomic_load_explicit(&executor->c, memory_order_acquire);
idx = c & _CR_POOL_IDX_MASK;
if (idx == _CR_POOL_IDX_MASK) {
pthread_cond_wait(&executor->cond, &executor->mutex);
pthread_mutex_unlock(&executor->mutex);
cnd_wait(&executor->cond, &executor->mutex);
mtx_unlock(&executor->mutex);
goto next_task;
}
pthread_mutex_unlock(&executor->mutex);
mtx_unlock(&executor->mutex);
}
uint64_t batch_idx = idx >> _CR_POOL_ITEM;
uint64_t item_idx = idx & ((1llu << _CR_POOL_ITEM) - 1llu);
Expand All @@ -343,7 +343,7 @@ void* _cr_executor_worker_func(void* args) {
_cr_task_begin_signal(task);
next_task: ;
}
return NULL;
return 0;
}
#endif
#endif
2 changes: 1 addition & 1 deletion test.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static void setup(void* args) {
m_t sr = 0;
uint32_t r = 0;
for (uint64_t i = 0; i < M; i++) {
if (i % 32 == 0) r = arc4random();
if (i % 32 == 0) r = rand();
m_t v = r & 1;
c[i * N + n] = v;
s[i * N + n] = sr;
Expand Down