Skip to content

Commit

Permalink
Merge pull request #52 from shikokuchuo/optimize
Browse files Browse the repository at this point in the history
Enhance listener and dialer efficiency
  • Loading branch information
shikokuchuo authored Oct 3, 2024
2 parents 78d0d40 + a0e174d commit 539f6d9
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 91 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.2.1.9024
Version: 1.2.1.9025
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# nanonext 1.2.1.9024 (development)
# nanonext 1.2.1.9025 (development)

#### New Features

Expand All @@ -10,7 +10,7 @@
* Warning messages for unserialization or conversion failures of received data are now suppressable.
* Upgrades `reply()` to always return even when there is an evaluation error. This allows it to be used safely in a loop without exiting early, for example.
* Removes deprecated and defunct `next_config()`.
* Performance enhancements for `promises::as.promise()` methods.
* Internal performance enhancements.
* Updates bundled 'libnng' v1.8.0 with latest patches.

# nanonext 1.2.1
Expand Down
96 changes: 49 additions & 47 deletions src/comms.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,42 +112,42 @@ SEXP rnng_dial(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
nng_socket *sock = (nng_socket *) NANO_PTR(socket);
const int start = NANO_INTEGER(autostart);
const char *ur = CHAR(STRING_ELT(url, 0));
nano_dialer *dp = R_Calloc(1, nano_dialer);
SEXP dialer, attr, newattr;
nng_dialer *dp = R_Calloc(1, nng_dialer);
SEXP dialer, attr, newattr, xp;
nng_tls_config *cfg;
nng_url *up;
int xc;

if (sec) {
if ((xc = nng_dialer_create(&dp->dial, *sock, ur)))
if ((xc = nng_dialer_create(dp, *sock, ur)) ||
(xc = nng_url_parse(&up, ur)))
goto exitlevel1;
dp->tls = (nng_tls_config *) NANO_PTR(tls);
nng_tls_config_hold(dp->tls);
if ((xc = nng_url_parse(&up, ur)))
cfg = (nng_tls_config *) NANO_PTR(tls);
if ((xc = nng_tls_config_server_name(cfg, up->u_hostname)) ||
(xc = nng_dialer_set_ptr(*dp, NNG_OPT_TLS_CONFIG, cfg)))
goto exitlevel2;
if ((xc = nng_tls_config_server_name(dp->tls, up->u_hostname)) ||
(xc = nng_dialer_set_ptr(dp->dial, NNG_OPT_TLS_CONFIG, dp->tls)))
goto exitlevel3;
nng_url_free(up);
}
if (start && (xc = nng_dialer_start(*dp, start == 1 ? NNG_FLAG_NONBLOCK : 0)))
goto exitlevel1;
nng_tls_config_hold(cfg);

switch (start) {
case 0:
xc = sec ? 0 : nng_dialer_create(&dp->dial, *sock, ur);
break;
case 1:
xc = sec ? nng_dialer_start(dp->dial, NNG_FLAG_NONBLOCK) : nng_dial(*sock, ur, &dp->dial, NNG_FLAG_NONBLOCK);
break;
default:
xc = sec ? nng_dialer_start(dp->dial, 0) : nng_dial(*sock, ur, &dp->dial, 0);
}
if (xc)
goto exitlevel1;
PROTECT_INDEX pxi;
PROTECT_WITH_INDEX(xp = R_MakeExternalPtr(cfg, nano_TlsSymbol, R_NilValue), &pxi);
R_RegisterCFinalizerEx(xp, tls_finalizer, TRUE);
REPROTECT(dialer = R_MakeExternalPtr(dp, nano_DialerSymbol, xp), pxi);

} else {

PROTECT(dialer = R_MakeExternalPtr(dp, nano_DialerSymbol, R_NilValue));
if ((xc = start ? nng_dial(*sock, ur, dp, start == 1 ? NNG_FLAG_NONBLOCK : 0) : nng_dialer_create(dp, *sock, ur)))
goto exitlevel1;

PROTECT(dialer = R_MakeExternalPtr(dp, nano_DialerSymbol, R_NilValue));

}
R_RegisterCFinalizerEx(dialer, dialer_finalizer, TRUE);

NANO_CLASS2(dialer, "nanoDialer", "nano");
Rf_setAttrib(dialer, nano_IdSymbol, Rf_ScalarInteger(nng_dialer_id(dp->dial)));
Rf_setAttrib(dialer, nano_IdSymbol, Rf_ScalarInteger(nng_dialer_id(*dp)));
Rf_setAttrib(dialer, nano_UrlSymbol, url);
Rf_setAttrib(dialer, nano_StateSymbol, Rf_mkString(start ? "started" : "not started"));
Rf_setAttrib(dialer, nano_SocketSymbol, Rf_ScalarInteger(nng_socket_id(*sock)));
Expand All @@ -164,10 +164,8 @@ SEXP rnng_dial(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
UNPROTECT(2);
return nano_success;

exitlevel3:
nng_url_free(up);
exitlevel2:
nng_tls_config_free(dp->tls);
nng_url_free(up);
exitlevel1:
R_Free(dp);
if (NANO_INTEGER(error)) ERROR_OUT(xc);
Expand All @@ -188,37 +186,43 @@ SEXP rnng_listen(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
nng_socket *sock = (nng_socket *) NANO_PTR(socket);
const int start = NANO_INTEGER(autostart);
const char *ur = CHAR(STRING_ELT(url, 0));
nano_listener *lp = R_Calloc(1, nano_listener);
SEXP listener, attr, newattr;
nng_listener *lp = R_Calloc(1, nng_listener);
SEXP listener, attr, newattr, xp;
nng_tls_config *cfg;
nng_url *up;
int xc;

if (sec) {
if ((xc = nng_listener_create(&lp->list, *sock, ur)))
if ((xc = nng_listener_create(lp, *sock, ur)) ||
(xc = nng_url_parse(&up, ur)))
goto exitlevel1;
lp->tls = (nng_tls_config *) NANO_PTR(tls);
nng_tls_config_hold(lp->tls);
if ((xc = nng_url_parse(&up, ur)))
cfg = (nng_tls_config *) NANO_PTR(tls);
if ((xc = nng_tls_config_server_name(cfg, up->u_hostname)) ||
(xc = nng_listener_set_ptr(*lp, NNG_OPT_TLS_CONFIG, cfg)))
goto exitlevel2;
if ((xc = nng_tls_config_server_name(lp->tls, up->u_hostname)) ||
(xc = nng_listener_set_ptr(lp->list, NNG_OPT_TLS_CONFIG, lp->tls)))
goto exitlevel3;
nng_url_free(up);
}
if (start && (xc = nng_listener_start(*lp, 0)))
goto exitlevel1;
nng_tls_config_hold(cfg);

PROTECT_INDEX pxi;
PROTECT_WITH_INDEX(xp = R_MakeExternalPtr(cfg, nano_TlsSymbol, R_NilValue), &pxi);
R_RegisterCFinalizerEx(xp, tls_finalizer, TRUE);
REPROTECT(listener = R_MakeExternalPtr(lp, nano_ListenerSymbol, xp), pxi);

if (start) {
xc = sec ? nng_listener_start(lp->list, 0) : nng_listen(*sock, ur, &lp->list, 0);
} else {
xc = sec ? 0 : nng_listener_create(&lp->list, *sock, ur);

if ((xc = start ? nng_listen(*sock, ur, lp, 0) : nng_listener_create(lp, *sock, ur)))
goto exitlevel1;

PROTECT(listener = R_MakeExternalPtr(lp, nano_ListenerSymbol, R_NilValue));

}
if (xc)
goto exitlevel1;

PROTECT(listener = R_MakeExternalPtr(lp, nano_ListenerSymbol, R_NilValue));
R_RegisterCFinalizerEx(listener, listener_finalizer, TRUE);

NANO_CLASS2(listener, "nanoListener", "nano");
Rf_setAttrib(listener, nano_IdSymbol, Rf_ScalarInteger(nng_listener_id(lp->list)));
Rf_setAttrib(listener, nano_IdSymbol, Rf_ScalarInteger(nng_listener_id(*lp)));
Rf_setAttrib(listener, nano_UrlSymbol, url);
Rf_setAttrib(listener, nano_StateSymbol, Rf_mkString(start ? "started" : "not started"));
Rf_setAttrib(listener, nano_SocketSymbol, Rf_ScalarInteger(nng_socket_id(*sock)));
Expand All @@ -235,10 +239,8 @@ SEXP rnng_listen(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
UNPROTECT(2);
return nano_success;

exitlevel3:
nng_url_free(up);
exitlevel2:
nng_tls_config_free(lp->tls);
nng_url_free(up);
exitlevel1:
R_Free(lp);
if (NANO_INTEGER(error)) ERROR_OUT(xc);
Expand Down
30 changes: 17 additions & 13 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,24 +149,30 @@ void sendaio_complete(void *arg) {

}

void cv_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
nano_cv *xp = (nano_cv *) NANO_PTR(xptr);
nng_cv_free(xp->cv);
nng_mtx_free(xp->mtx);
R_Free(xp);

}

void dialer_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
nano_dialer *xp = (nano_dialer *) NANO_PTR(xptr);
nng_dialer_close(xp->dial);
if (xp->tls != NULL)
nng_tls_config_free(xp->tls);
nng_dialer *xp = (nng_dialer *) NANO_PTR(xptr);
nng_dialer_close(*xp);
R_Free(xp);

}

void listener_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
nano_listener *xp = (nano_listener *) NANO_PTR(xptr);
nng_listener_close(xp->list);
if (xp->tls != NULL)
nng_tls_config_free(xp->tls);
nng_listener *xp = (nng_listener *) NANO_PTR(xptr);
nng_listener_close(*xp);
R_Free(xp);

}
Expand All @@ -180,13 +186,11 @@ void socket_finalizer(SEXP xptr) {

}

void cv_finalizer(SEXP xptr) {
void tls_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
nano_cv *xp = (nano_cv *) NANO_PTR(xptr);
nng_cv_free(xp->cv);
nng_mtx_free(xp->mtx);
R_Free(xp);
nng_tls_config *xp = (nng_tls_config *) NANO_PTR(xptr);
nng_tls_config_free(xp);

}

Expand Down
11 changes: 1 addition & 10 deletions src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,6 @@ typedef union nano_opt_u {
uint64_t u;
} nano_opt;

typedef struct nano_listener_s {
nng_listener list;
nng_tls_config *tls;
} nano_listener;

typedef struct nano_dialer_s {
nng_dialer dial;
nng_tls_config *tls;
} nano_dialer;

typedef struct nano_stream_s {
nng_stream *stream;
union {
Expand Down Expand Up @@ -303,6 +293,7 @@ int nano_matcharg(const SEXP);
int nano_matchargs(const SEXP);

void pipe_cb_signal(nng_pipe, nng_pipe_ev, void *);
void tls_finalizer(SEXP);

SEXP rnng_advance_rng_state(void);
SEXP rnng_aio_call(SEXP);
Expand Down
16 changes: 8 additions & 8 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,22 @@ SEXP rnng_messenger(SEXP url) {

const char *up = CHAR(STRING_ELT(url, 0));
nng_socket *sock = R_Calloc(1, nng_socket);
nano_listener *lp;
nano_dialer *dp;
nng_listener *lp;
nng_dialer *dp;
int xc, dialer = 0;
SEXP socket, con;

if ((xc = nng_pair0_open(sock)))
goto exitlevel1;
lp = R_Calloc(1, nano_listener);
if ((xc = nng_listen(*sock, up, &lp->list, 0))) {
lp = R_Calloc(1, nng_listener);
if ((xc = nng_listen(*sock, up, lp, 0))) {
if (xc != 10 && xc != 15) {
R_Free(lp);
goto exitlevel1;
}
R_Free(lp);
dp = R_Calloc(1, nano_dialer);
if ((xc = nng_dial(*sock, up, &dp->dial, 0))) {
dp = R_Calloc(1, nng_dialer);
if ((xc = nng_dial(*sock, up, dp, 0))) {
R_Free(dp);
goto exitlevel1;
}
Expand Down Expand Up @@ -699,7 +699,7 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) {
memcpy(disp->url[i], up, slen);
}
nng_socket *hsock = R_Calloc(1, nng_socket);
nano_listener *hl = R_Calloc(1, nano_listener);
nng_listener *hl = R_Calloc(1, nng_listener);

if (nng_url_parse(&disp->up, disp->url[0]))
goto exitlevel3;
Expand All @@ -708,7 +708,7 @@ SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) {
goto exitlevel4;

if ((xc = nng_socket_set_ms(*hsock, "req:resend-time", 0)) ||
(xc = nng_listen(*hsock, disp->host, &hl->list, 0)) ||
(xc = nng_listen(*hsock, disp->host, hl, 0)) ||
(xc = nng_thread_create(&disp->thr, rnng_dispatch_thread, disp)))
goto exitlevel5;

Expand Down
10 changes: 0 additions & 10 deletions src/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,6 @@ static int parse_serial_decimal_format(unsigned char *obuf, size_t obufmax,
}
#endif

// finalizers ------------------------------------------------------------------

static void tls_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
nng_tls_config *xp = (nng_tls_config *) NANO_PTR(xptr);
nng_tls_config_free(xp);

}

// Mbed TLS Random Data Generator ----------------------------------------------

SEXP rnng_random(SEXP n, SEXP convert) {
Expand Down

0 comments on commit 539f6d9

Please sign in to comment.