diff --git a/DESCRIPTION b/DESCRIPTION index 5f7adc6a1..d533efdbd 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: nanonext Type: Package Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library -Version: 1.2.1.9024 +Version: 1.2.1.9025 Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is a socket library implementing 'Scalability Protocols', a reliable, high-performance standard for common communications patterns including diff --git a/NEWS.md b/NEWS.md index c638e6a07..22ef8475b 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# nanonext 1.2.1.9024 (development) +# nanonext 1.2.1.9025 (development) #### New Features @@ -10,7 +10,7 @@ * Warning messages for unserialization or conversion failures of received data are now suppressable. * Upgrades `reply()` to always return even when there is an evaluation error. This allows it to be used safely in a loop without exiting early, for example. * Removes deprecated and defunct `next_config()`. -* Performance enhancements for `promises::as.promise()` methods. +* Internal performance enhancements. * Updates bundled 'libnng' v1.8.0 with latest patches. # nanonext 1.2.1 diff --git a/src/comms.c b/src/comms.c index cf8c95b68..e0ecbc830 100644 --- a/src/comms.c +++ b/src/comms.c @@ -112,42 +112,42 @@ SEXP rnng_dial(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) { nng_socket *sock = (nng_socket *) NANO_PTR(socket); const int start = NANO_INTEGER(autostart); const char *ur = CHAR(STRING_ELT(url, 0)); - nano_dialer *dp = R_Calloc(1, nano_dialer); - SEXP dialer, attr, newattr; + nng_dialer *dp = R_Calloc(1, nng_dialer); + SEXP dialer, attr, newattr, xp; + nng_tls_config *cfg; nng_url *up; int xc; if (sec) { - if ((xc = nng_dialer_create(&dp->dial, *sock, ur))) + if ((xc = nng_dialer_create(dp, *sock, ur)) || + (xc = nng_url_parse(&up, ur))) goto exitlevel1; - dp->tls = (nng_tls_config *) NANO_PTR(tls); - nng_tls_config_hold(dp->tls); - if ((xc = nng_url_parse(&up, ur))) + cfg = (nng_tls_config *) NANO_PTR(tls); + if ((xc = nng_tls_config_server_name(cfg, up->u_hostname)) || + (xc = nng_dialer_set_ptr(*dp, NNG_OPT_TLS_CONFIG, cfg))) goto exitlevel2; - if ((xc = nng_tls_config_server_name(dp->tls, up->u_hostname)) || - (xc = nng_dialer_set_ptr(dp->dial, NNG_OPT_TLS_CONFIG, dp->tls))) - goto exitlevel3; nng_url_free(up); - } + if (start && (xc = nng_dialer_start(*dp, start == 1 ? NNG_FLAG_NONBLOCK : 0))) + goto exitlevel1; + nng_tls_config_hold(cfg); - switch (start) { - case 0: - xc = sec ? 0 : nng_dialer_create(&dp->dial, *sock, ur); - break; - case 1: - xc = sec ? nng_dialer_start(dp->dial, NNG_FLAG_NONBLOCK) : nng_dial(*sock, ur, &dp->dial, NNG_FLAG_NONBLOCK); - break; - default: - xc = sec ? nng_dialer_start(dp->dial, 0) : nng_dial(*sock, ur, &dp->dial, 0); - } - if (xc) - goto exitlevel1; + PROTECT_INDEX pxi; + PROTECT_WITH_INDEX(xp = R_MakeExternalPtr(cfg, nano_TlsSymbol, R_NilValue), &pxi); + R_RegisterCFinalizerEx(xp, tls_finalizer, TRUE); + REPROTECT(dialer = R_MakeExternalPtr(dp, nano_DialerSymbol, xp), pxi); + + } else { - PROTECT(dialer = R_MakeExternalPtr(dp, nano_DialerSymbol, R_NilValue)); + if ((xc = start ? nng_dial(*sock, ur, dp, start == 1 ? NNG_FLAG_NONBLOCK : 0) : nng_dialer_create(dp, *sock, ur))) + goto exitlevel1; + + PROTECT(dialer = R_MakeExternalPtr(dp, nano_DialerSymbol, R_NilValue)); + + } R_RegisterCFinalizerEx(dialer, dialer_finalizer, TRUE); NANO_CLASS2(dialer, "nanoDialer", "nano"); - Rf_setAttrib(dialer, nano_IdSymbol, Rf_ScalarInteger(nng_dialer_id(dp->dial))); + Rf_setAttrib(dialer, nano_IdSymbol, Rf_ScalarInteger(nng_dialer_id(*dp))); Rf_setAttrib(dialer, nano_UrlSymbol, url); Rf_setAttrib(dialer, nano_StateSymbol, Rf_mkString(start ? "started" : "not started")); Rf_setAttrib(dialer, nano_SocketSymbol, Rf_ScalarInteger(nng_socket_id(*sock))); @@ -164,10 +164,8 @@ SEXP rnng_dial(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) { UNPROTECT(2); return nano_success; - exitlevel3: - nng_url_free(up); exitlevel2: - nng_tls_config_free(dp->tls); + nng_url_free(up); exitlevel1: R_Free(dp); if (NANO_INTEGER(error)) ERROR_OUT(xc); @@ -188,37 +186,43 @@ SEXP rnng_listen(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) { nng_socket *sock = (nng_socket *) NANO_PTR(socket); const int start = NANO_INTEGER(autostart); const char *ur = CHAR(STRING_ELT(url, 0)); - nano_listener *lp = R_Calloc(1, nano_listener); - SEXP listener, attr, newattr; + nng_listener *lp = R_Calloc(1, nng_listener); + SEXP listener, attr, newattr, xp; + nng_tls_config *cfg; nng_url *up; int xc; if (sec) { - if ((xc = nng_listener_create(&lp->list, *sock, ur))) + if ((xc = nng_listener_create(lp, *sock, ur)) || + (xc = nng_url_parse(&up, ur))) goto exitlevel1; - lp->tls = (nng_tls_config *) NANO_PTR(tls); - nng_tls_config_hold(lp->tls); - if ((xc = nng_url_parse(&up, ur))) + cfg = (nng_tls_config *) NANO_PTR(tls); + if ((xc = nng_tls_config_server_name(cfg, up->u_hostname)) || + (xc = nng_listener_set_ptr(*lp, NNG_OPT_TLS_CONFIG, cfg))) goto exitlevel2; - if ((xc = nng_tls_config_server_name(lp->tls, up->u_hostname)) || - (xc = nng_listener_set_ptr(lp->list, NNG_OPT_TLS_CONFIG, lp->tls))) - goto exitlevel3; nng_url_free(up); - } + if (start && (xc = nng_listener_start(*lp, 0))) + goto exitlevel1; + nng_tls_config_hold(cfg); + + PROTECT_INDEX pxi; + PROTECT_WITH_INDEX(xp = R_MakeExternalPtr(cfg, nano_TlsSymbol, R_NilValue), &pxi); + R_RegisterCFinalizerEx(xp, tls_finalizer, TRUE); + REPROTECT(listener = R_MakeExternalPtr(lp, nano_ListenerSymbol, xp), pxi); - if (start) { - xc = sec ? nng_listener_start(lp->list, 0) : nng_listen(*sock, ur, &lp->list, 0); } else { - xc = sec ? 0 : nng_listener_create(&lp->list, *sock, ur); + + if ((xc = start ? nng_listen(*sock, ur, lp, 0) : nng_listener_create(lp, *sock, ur))) + goto exitlevel1; + + PROTECT(listener = R_MakeExternalPtr(lp, nano_ListenerSymbol, R_NilValue)); + } - if (xc) - goto exitlevel1; - PROTECT(listener = R_MakeExternalPtr(lp, nano_ListenerSymbol, R_NilValue)); R_RegisterCFinalizerEx(listener, listener_finalizer, TRUE); NANO_CLASS2(listener, "nanoListener", "nano"); - Rf_setAttrib(listener, nano_IdSymbol, Rf_ScalarInteger(nng_listener_id(lp->list))); + Rf_setAttrib(listener, nano_IdSymbol, Rf_ScalarInteger(nng_listener_id(*lp))); Rf_setAttrib(listener, nano_UrlSymbol, url); Rf_setAttrib(listener, nano_StateSymbol, Rf_mkString(start ? "started" : "not started")); Rf_setAttrib(listener, nano_SocketSymbol, Rf_ScalarInteger(nng_socket_id(*sock))); @@ -235,10 +239,8 @@ SEXP rnng_listen(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) { UNPROTECT(2); return nano_success; - exitlevel3: - nng_url_free(up); exitlevel2: - nng_tls_config_free(lp->tls); + nng_url_free(up); exitlevel1: R_Free(lp); if (NANO_INTEGER(error)) ERROR_OUT(xc); diff --git a/src/core.c b/src/core.c index 972cff115..3e08d839b 100644 --- a/src/core.c +++ b/src/core.c @@ -149,13 +149,21 @@ void sendaio_complete(void *arg) { } +void cv_finalizer(SEXP xptr) { + + if (NANO_PTR(xptr) == NULL) return; + nano_cv *xp = (nano_cv *) NANO_PTR(xptr); + nng_cv_free(xp->cv); + nng_mtx_free(xp->mtx); + R_Free(xp); + +} + void dialer_finalizer(SEXP xptr) { if (NANO_PTR(xptr) == NULL) return; - nano_dialer *xp = (nano_dialer *) NANO_PTR(xptr); - nng_dialer_close(xp->dial); - if (xp->tls != NULL) - nng_tls_config_free(xp->tls); + nng_dialer *xp = (nng_dialer *) NANO_PTR(xptr); + nng_dialer_close(*xp); R_Free(xp); } @@ -163,10 +171,8 @@ void dialer_finalizer(SEXP xptr) { void listener_finalizer(SEXP xptr) { if (NANO_PTR(xptr) == NULL) return; - nano_listener *xp = (nano_listener *) NANO_PTR(xptr); - nng_listener_close(xp->list); - if (xp->tls != NULL) - nng_tls_config_free(xp->tls); + nng_listener *xp = (nng_listener *) NANO_PTR(xptr); + nng_listener_close(*xp); R_Free(xp); } @@ -180,13 +186,11 @@ void socket_finalizer(SEXP xptr) { } -void cv_finalizer(SEXP xptr) { +void tls_finalizer(SEXP xptr) { if (NANO_PTR(xptr) == NULL) return; - nano_cv *xp = (nano_cv *) NANO_PTR(xptr); - nng_cv_free(xp->cv); - nng_mtx_free(xp->mtx); - R_Free(xp); + nng_tls_config *xp = (nng_tls_config *) NANO_PTR(xptr); + nng_tls_config_free(xp); } diff --git a/src/nanonext.h b/src/nanonext.h index 1045169b2..8acf5791e 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -137,16 +137,6 @@ typedef union nano_opt_u { uint64_t u; } nano_opt; -typedef struct nano_listener_s { - nng_listener list; - nng_tls_config *tls; -} nano_listener; - -typedef struct nano_dialer_s { - nng_dialer dial; - nng_tls_config *tls; -} nano_dialer; - typedef struct nano_stream_s { nng_stream *stream; union { @@ -303,6 +293,7 @@ int nano_matcharg(const SEXP); int nano_matchargs(const SEXP); void pipe_cb_signal(nng_pipe, nng_pipe_ev, void *); +void tls_finalizer(SEXP); SEXP rnng_advance_rng_state(void); SEXP rnng_aio_call(SEXP); diff --git a/src/thread.c b/src/thread.c index 8cd31ca4c..fbde57ba3 100644 --- a/src/thread.c +++ b/src/thread.c @@ -115,22 +115,22 @@ SEXP rnng_messenger(SEXP url) { const char *up = CHAR(STRING_ELT(url, 0)); nng_socket *sock = R_Calloc(1, nng_socket); - nano_listener *lp; - nano_dialer *dp; + nng_listener *lp; + nng_dialer *dp; int xc, dialer = 0; SEXP socket, con; if ((xc = nng_pair0_open(sock))) goto exitlevel1; - lp = R_Calloc(1, nano_listener); - if ((xc = nng_listen(*sock, up, &lp->list, 0))) { + lp = R_Calloc(1, nng_listener); + if ((xc = nng_listen(*sock, up, lp, 0))) { if (xc != 10 && xc != 15) { R_Free(lp); goto exitlevel1; } R_Free(lp); - dp = R_Calloc(1, nano_dialer); - if ((xc = nng_dial(*sock, up, &dp->dial, 0))) { + dp = R_Calloc(1, nng_dialer); + if ((xc = nng_dial(*sock, up, dp, 0))) { R_Free(dp); goto exitlevel1; } @@ -699,7 +699,7 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) { memcpy(disp->url[i], up, slen); } nng_socket *hsock = R_Calloc(1, nng_socket); - nano_listener *hl = R_Calloc(1, nano_listener); + nng_listener *hl = R_Calloc(1, nng_listener); if (nng_url_parse(&disp->up, disp->url[0])) goto exitlevel3; @@ -708,7 +708,7 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) { goto exitlevel4; if ((xc = nng_socket_set_ms(*hsock, "req:resend-time", 0)) || - (xc = nng_listen(*hsock, disp->host, &hl->list, 0)) || + (xc = nng_listen(*hsock, disp->host, hl, 0)) || (xc = nng_thread_create(&disp->thr, rnng_dispatch_thread, disp))) goto exitlevel5; diff --git a/src/tls.c b/src/tls.c index 7c307d7d0..f97b78fb7 100644 --- a/src/tls.c +++ b/src/tls.c @@ -76,16 +76,6 @@ static int parse_serial_decimal_format(unsigned char *obuf, size_t obufmax, } #endif -// finalizers ------------------------------------------------------------------ - -static void tls_finalizer(SEXP xptr) { - - if (NANO_PTR(xptr) == NULL) return; - nng_tls_config *xp = (nng_tls_config *) NANO_PTR(xptr); - nng_tls_config_free(xp); - -} - // Mbed TLS Random Data Generator ---------------------------------------------- SEXP rnng_random(SEXP n, SEXP convert) {