diff --git a/src/thread.c b/src/thread.c index b972e2ba0..8efe4fb79 100644 --- a/src/thread.c +++ b/src/thread.c @@ -303,6 +303,63 @@ static void rnng_wait_thread_single(void *args) { } +void single_wait_thread_create(SEXP x) { + + nano_aio *aiop = (nano_aio *) NANO_PTR(x); + nano_thread_aio *taio = R_Calloc(1, nano_thread_aio); + nano_cv *ncv = R_Calloc(1, nano_cv); + taio->aio = aiop->aio; + taio->cv = ncv; + nng_mtx *mtx; + nng_cv *cv; + int xc, signalled; + + if ((xc = nng_mtx_alloc(&mtx))) + goto exitlevel1; + + if ((xc = nng_cv_alloc(&cv, mtx))) + goto exitlevel2; + + ncv->mtx = mtx; + ncv->cv = cv; + + if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio))) + goto exitlevel3; + + SEXP xptr; + PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue)); + R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE); + R_MakeWeakRef(x, xptr, R_NilValue, TRUE); + UNPROTECT(1); + + nng_time time = nng_clock(); + + while (1) { + time = time + 400; + signalled = 1; + nng_mtx_lock(mtx); + while (ncv->condition == 0) { + if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) { + signalled = 0; + break; + } + } + nng_mtx_unlock(mtx); + if (signalled) break; + R_CheckUserInterrupt(); + } + + return; + + exitlevel3: + nng_cv_free(cv); + exitlevel2: + nng_mtx_free(mtx); + exitlevel1: + ERROR_OUT(xc); + +} + SEXP rnng_wait_thread_create(SEXP x) { const SEXPTYPE typ = TYPEOF(x); @@ -335,60 +392,16 @@ SEXP rnng_wait_thread_create(SEXP x) { nano_shared_aio = aiop->aio; nano_wait_condition = 1; nng_cv_wake(nano_wait_cv); - } else if (nano_shared_aio != aiop->aio) { - thread_required = 1; + } else { + thread_required = nano_shared_aio != aiop->aio; } nng_mtx_unlock(nano_wait_mtx); if (thread_required) { PROTECT(coreaio); - nano_thread_aio *taio = R_Calloc(1, nano_thread_aio); - nano_cv *ncv = R_Calloc(1, nano_cv); - taio->aio = aiop->aio; - taio->cv = ncv; - nng_mtx *mtx; - nng_cv *cv; - - if ((xc = nng_mtx_alloc(&mtx))) - ERROR_OUT(xc); - - if ((xc = nng_cv_alloc(&cv, mtx))) { - nng_mtx_free(mtx); - ERROR_OUT(xc); - } - - ncv->mtx = mtx; - ncv->cv = cv; - - if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio))) { - nng_cv_free(cv); - nng_mtx_free(mtx); - ERROR_OUT(xc); - } - - SEXP xptr; - PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue)); - R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE); - R_MakeWeakRef(coreaio, xptr, R_NilValue, TRUE); - UNPROTECT(2); - - nng_time time = nng_clock(); - - while (1) { - time = time + 400; - signalled = 1; - nng_mtx_lock(mtx); - while (ncv->condition == 0) { - if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) { - signalled = 0; - break; - } - } - nng_mtx_unlock(mtx); - if (signalled) break; - R_CheckUserInterrupt(); - } + single_wait_thread_create(coreaio); + UNPROTECT(1); } else {