diff --git a/cr_task.h b/cr_task.h
index 125dffd..1c24c36 100644
--- a/cr_task.h
+++ b/cr_task.h
@@ -19,7 +19,7 @@ cr_executor_t* cr_executor_create(int worker_count);
 
 #ifdef CR_TASK_IMPL
 #include
-#include <pthread.h>
+#include <threads.h>
 #include
 #include
 #include
@@ -40,8 +40,8 @@ typedef struct _cr_task_ref_t _cr_task_ref_t;
 typedef struct _cr_pool_t _cr_pool_t;
 
 struct _cr_sync_t {
-    pthread_mutex_t mutex;
-    pthread_cond_t cond;
+    mtx_t mutex;
+    cnd_t cond;
     int is_done;
 };
 
@@ -52,7 +52,7 @@ struct _cr_task_ref_t {
 };
 
 struct _cr_pool_t {
-    pthread_mutex_t mutex;
+    mtx_t mutex;
     void* batches[1llu << _CR_POOL_BATCH];
     uint64_t batch_count;
     uint64_t item_size;
@@ -76,9 +76,9 @@ struct cr_executor_t {
     _cr_pool_t task_pool;
     _cr_pool_t task_ref_pool;
     _Atomic uint64_t c;
-    pthread_mutex_t mutex;
-    pthread_cond_t cond;
-    pthread_t* workers;
+    mtx_t mutex;
+    cnd_t cond;
+    thrd_t* workers;
 };
 
 void _cr_pool_init(_cr_pool_t* pool, uint64_t item_size);
@@ -89,10 +89,10 @@
 void _cr_task_end_wait(cr_task_t* task);
 void _cr_task_begin_signal(cr_task_t* task);
 void _cr_executor_schedule_task(cr_executor_t* executor, cr_task_t* task);
-void* _cr_executor_worker_func(void* args);
+int _cr_executor_worker_func(void* args);
 
 void _cr_pool_init(_cr_pool_t* pool, uint64_t item_size) {
-    pthread_mutex_init(&pool->mutex, NULL);
+    mtx_init(&pool->mutex, mtx_plain);
     memset(pool->batches, 0, sizeof(pool->batches));
     pool->batch_count = 0;
     pool->item_size = item_size;
@@ -106,7 +106,7 @@ uint64_t _cr_pool_pop(_cr_pool_t* pool) {
     do {
         idx = c & _CR_POOL_IDX_MASK;
         if (idx == _CR_POOL_IDX_MASK) {
-            pthread_mutex_lock(&pool->mutex);
+            mtx_lock(&pool->mutex);
             c = atomic_load_explicit(&pool->c, memory_order_acquire);
             idx = c & _CR_POOL_IDX_MASK;
             if (idx == _CR_POOL_IDX_MASK) {
@@ -122,10 +122,10 @@ uint64_t _cr_pool_pop(_cr_pool_t* pool) {
                 void* item = (char*)(pool->batches[batch_idx]) + ((1llu << _CR_POOL_ITEM) - 1llu) * pool->item_size;
                 atomic_store_explicit((_Atomic uint32_t*)item, _CR_POOL_IDX_MASK, memory_order_relaxed);
                 atomic_store_explicit(&pool->c, idx | 1llu, memory_order_release);
-                pthread_mutex_unlock(&pool->mutex);
+                mtx_unlock(&pool->mutex);
                 return idx;
             }
-            pthread_mutex_unlock(&pool->mutex);
+            mtx_unlock(&pool->mutex);
         }
         uint64_t batch_idx = idx >> _CR_POOL_ITEM;
         uint64_t item_idx = idx & ((1llu << _CR_POOL_ITEM) - 1llu);
@@ -221,18 +221,18 @@ void cr_task_run(cr_task_t* task) {
 
 void cr_task_sync(cr_task_t* task) {
     _cr_sync_t sync;
-    pthread_mutex_init(&sync.mutex, NULL);
-    pthread_cond_init(&sync.cond, NULL);
+    mtx_init(&sync.mutex, mtx_plain);
+    cnd_init(&sync.cond);
     sync.is_done = 0;
     task->sync = &sync;
     uint64_t c = atomic_fetch_or_explicit(&task->signal, _CR_TASK_FLAG_SYNC, memory_order_acq_rel);
     assert((c & _CR_TASK_FLAG_SYNC) == 0 && "sync flag already set");
     if ((c & _CR_TASK_FLAG_S) == 0) {
-        pthread_mutex_lock(&sync.mutex);
+        mtx_lock(&sync.mutex);
         while (!sync.is_done) {
-            pthread_cond_wait(&sync.cond, &sync.mutex);
+            cnd_wait(&sync.cond, &sync.mutex);
         }
-        pthread_mutex_unlock(&sync.mutex);
+        mtx_unlock(&sync.mutex);
     }
 }
 
@@ -255,10 +255,10 @@ void _cr_task_begin_signal(cr_task_t* task) {
     uint64_t c = atomic_fetch_or_explicit(&task->signal, _CR_TASK_FLAG_S, memory_order_acq_rel);
     assert((c & _CR_TASK_FLAG_S) == 0 && "tried to finalize more than once");
     if (c & _CR_TASK_FLAG_SYNC) {
-        pthread_mutex_lock(&task->sync->mutex);
+        mtx_lock(&task->sync->mutex);
         task->sync->is_done = 1;
-        pthread_cond_signal(&task->sync->cond);
-        pthread_mutex_unlock(&task->sync->mutex);
+        cnd_signal(&task->sync->cond);
+        mtx_unlock(&task->sync->mutex);
     }
     uint64_t waiting_count = (c >> 32) & 0xffff;
     uint64_t idx = c & _CR_POOL_IDX_MASK;
@@ -278,15 +278,15 @@ void _cr_task_begin_signal(cr_task_t* task) {
 
 cr_executor_t* cr_executor_create(int worker_count) {
     size_t aligned_size_executor = (sizeof(cr_executor_t) + 255) & ~(size_t)0xff;
-    cr_executor_t* executor = malloc(aligned_size_executor + (size_t)worker_count * sizeof(pthread_t));
+    cr_executor_t* executor = malloc(aligned_size_executor + (size_t)worker_count * sizeof(thrd_t));
     _cr_pool_init(&executor->task_pool, sizeof(cr_task_t));
     _cr_pool_init(&executor->task_ref_pool, sizeof(_cr_task_ref_t));
     atomic_store_explicit(&executor->c, _CR_POOL_IDX_MASK, memory_order_release);
-    pthread_mutex_init(&executor->mutex, NULL);
-    pthread_cond_init(&executor->cond, NULL);
-    executor->workers = (pthread_t*)((char*)executor + aligned_size_executor);
+    mtx_init(&executor->mutex, mtx_plain);
+    cnd_init(&executor->cond);
+    executor->workers = (thrd_t*)((char*)executor + aligned_size_executor);
     for (int i = 0; i < worker_count; i++) {
-        pthread_create(&executor->workers[i], NULL, _cr_executor_worker_func, executor);
+        thrd_create(&executor->workers[i], _cr_executor_worker_func, executor);
     }
     return executor;
 }
@@ -297,25 +297,25 @@ void _cr_executor_schedule_task(cr_executor_t* executor, cr_task_t* task) {
     do {
         uint64_t idx = c & _CR_POOL_IDX_MASK;
         if (idx == _CR_POOL_IDX_MASK) {
-            pthread_mutex_lock(&executor->mutex);
+            mtx_lock(&executor->mutex);
             c = atomic_load_explicit(&executor->c, memory_order_acquire);
             idx = c & _CR_POOL_IDX_MASK;
             if (idx == _CR_POOL_IDX_MASK) {
                 atomic_store_explicit(&task->next_idx_list, _CR_POOL_IDX_MASK, memory_order_relaxed);
                 atomic_store_explicit(&executor->c, task->idx, memory_order_release);
-                pthread_cond_signal(&executor->cond);
-                pthread_mutex_unlock(&executor->mutex);
+                cnd_signal(&executor->cond);
+                mtx_unlock(&executor->mutex);
                 return;
             }
-            pthread_mutex_unlock(&executor->mutex);
+            mtx_unlock(&executor->mutex);
         }
         atomic_store_explicit(&task->next_idx_list, (uint32_t)idx, memory_order_relaxed);
         n = ((c + (1llu << 32)) & ~_CR_POOL_IDX_MASK) | task->idx;
     } while (!atomic_compare_exchange_weak_explicit(&executor->c, &c, n, memory_order_acq_rel, memory_order_acquire));
-    pthread_cond_signal(&executor->cond);
+    cnd_signal(&executor->cond);
 }
 
-void* _cr_executor_worker_func(void* args) {
+int _cr_executor_worker_func(void* args) {
     cr_executor_t* executor = args;
     while (1) {
         cr_task_t* task;
@@ -324,15 +324,15 @@ void* _cr_executor_worker_func(void* args) {
         do {
             uint64_t idx = c & _CR_POOL_IDX_MASK;
             if (idx == _CR_POOL_IDX_MASK) {
-                pthread_mutex_lock(&executor->mutex);
+                mtx_lock(&executor->mutex);
                 c = atomic_load_explicit(&executor->c, memory_order_acquire);
                 idx = c & _CR_POOL_IDX_MASK;
                 if (idx == _CR_POOL_IDX_MASK) {
-                    pthread_cond_wait(&executor->cond, &executor->mutex);
-                    pthread_mutex_unlock(&executor->mutex);
+                    cnd_wait(&executor->cond, &executor->mutex);
+                    mtx_unlock(&executor->mutex);
                     goto next_task;
                 }
-                pthread_mutex_unlock(&executor->mutex);
+                mtx_unlock(&executor->mutex);
             }
             uint64_t batch_idx = idx >> _CR_POOL_ITEM;
             uint64_t item_idx = idx & ((1llu << _CR_POOL_ITEM) - 1llu);
@@ -343,7 +343,7 @@ void* _cr_executor_worker_func(void* args) {
         _cr_task_begin_signal(task);
         next_task: ;
     }
-    return NULL;
+    return 0;
 }
 #endif
 #endif
diff --git a/test.c b/test.c
index 68834d8..9262491 100644
--- a/test.c
+++ b/test.c
@@ -66,7 +66,7 @@ static void setup(void* args) {
     m_t sr = 0;
     uint32_t r = 0;
     for (uint64_t i = 0; i < M; i++) {
-        if (i % 32 == 0) r = arc4random();
+        if (i % 32 == 0) r = rand();
         m_t v = r & 1;
         c[i * N + n] = v;
         s[i * N + n] = sr;
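
Side note (not part of the patch): the diff above maps each pthread primitive onto its C11 <threads.h> counterpart. The sketch below is a minimal, self-contained illustration of that mapping under the same mutex/condvar/thread pattern; the names worker, done, and the flow are illustrative only and do not appear in cr_task.h.

/* Minimal C11 <threads.h> sketch mirroring the pattern the port switches to. */
#include <stdio.h>
#include <threads.h>

static mtx_t mutex;
static cnd_t cond;
static int done = 0;

/* C11 thread entry points return int, not void* as with pthreads;
 * this is why _cr_executor_worker_func changes signature in the patch. */
static int worker(void* arg) {
    (void)arg;
    mtx_lock(&mutex);
    done = 1;
    cnd_signal(&cond);      /* stands in for pthread_cond_signal */
    mtx_unlock(&mutex);
    return 0;
}

int main(void) {
    mtx_init(&mutex, mtx_plain);  /* takes a type flag instead of an attr struct */
    cnd_init(&cond);              /* no attribute argument at all */

    thrd_t t;
    thrd_create(&t, worker, NULL);  /* no attr parameter, unlike pthread_create */

    mtx_lock(&mutex);
    while (!done) {
        cnd_wait(&cond, &mutex);    /* stands in for pthread_cond_wait */
    }
    mtx_unlock(&mutex);

    thrd_join(t, NULL);
    mtx_destroy(&mutex);
    cnd_destroy(&cond);
    puts("worker finished");
    return 0;
}

As in the patch, the wakeup flag (done here, sync->is_done in cr_task.h) is checked in a loop around cnd_wait, so a spurious wakeup simply re-enters the wait.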