Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements event-driven (non-polling) promises #111

Merged
merged 5 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 0.13.2.9001
Version: 0.13.2.9002
Description: Lightweight parallel code execution and distributed computing.
Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
on local or network resources, resolving automatically upon completion.
Expand All @@ -23,13 +23,12 @@ Encoding: UTF-8
Depends:
R (>= 3.6)
Imports:
nanonext (>= 0.13.3)
nanonext (>= 0.13.6.9002)
Enhances:
parallel,
promises
Suggests:
knitr,
later,
markdown
VignetteBuilder: knitr
RoxygenNote: 7.3.1
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ importFrom(nanonext,reap)
importFrom(nanonext,recv)
importFrom(nanonext,recv_aio_signal)
importFrom(nanonext,request)
importFrom(nanonext,request2)
importFrom(nanonext,request_signal)
importFrom(nanonext,send)
importFrom(nanonext,socket)
Expand Down
5 changes: 3 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# mirai 0.13.2.9001 (development)
# mirai 0.13.2.9002 (development)

* `stop_mirai()` now returns a 'miraiInterrupt' in the case the asynchronous task was still ongoing (thanks @jcheng #110).
* Re-implements the promises method with completely event-driven (non-polling) promises (possible thanks to improvements in `nanonext` implemented with the help of @jcheng5)
* `stop_mirai()` now returns a 'miraiInterrupt' in the case the asynchronous task was still ongoing (thanks @jcheng5 #110).

# mirai 0.13.2

Expand Down
5 changes: 3 additions & 2 deletions R/mirai-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@
#'
#' @importFrom nanonext call_aio call_aio_ .context cv cv_value dial
#' is_error_value listen lock mclock msleep next_config opt opt<- parse_url
#' pipe_notify random reap recv recv_aio_signal request request_signal send
#' socket stat stop_aio strcat tls_config unresolved until wait write_cert
#' pipe_notify random reap recv recv_aio_signal request request2
#' request_signal send socket stat stop_aio strcat tls_config unresolved
#' until wait write_cert
#' @importFrom parallel nextRNGStream
#' @importFrom stats rexp
#' @importFrom utils .DollarNames
Expand Down
4 changes: 2 additions & 2 deletions R/mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau
envir <- ..[[.compute]]
if (is.null(envir)) {
sock <- ephemeral_daemon(local_url())
aio <- request(.context(sock), data = data, send_mode = 1L, recv_mode = 1L, timeout = .timeout)
aio <- request2(.context(sock), data = data, send_mode = 1L, recv_mode = 1L, timeout = .timeout)
`attr<-`(.subset2(aio, "aio"), "sock", sock)
} else {
aio <- request_signal(.context(envir[["sock"]]), data = data, cv = envir[["cv"]], send_mode = 3L, recv_mode = 1L, timeout = .timeout)
aio <- request2(.context(envir[["sock"]]), data = data, cv = envir[["cv"]], send_mode = 3L, recv_mode = 1L, timeout = .timeout)
}

`class<-`(aio, c("mirai", "recvAio"))
Expand Down
43 changes: 27 additions & 16 deletions R/promises.R
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,33 @@
#'
as.promise.mirai <- function(x) {

force(x)
promises::then(
promise = promises::promise(
function(resolve, reject) {
query <- function()
if (unresolved(x))
later::later(query, delay = 0.1) else
resolve(.subset2(x, "value"))
query()
}
),
onFulfilled = function(value)
if (is_error_value(value) && !is_mirai_interrupt(value))
stop(value) else
value
)
promise <- .subset2(x, "promise")

if (is.null(promise)) {

promise <- promises::then(
promises::promise(
function(resolve, reject)
attr(x, "callback") <- function() resolve(.subset2(x, "data"))
),
onFulfilled = function(value)
if (is_error_value(value) && !is_mirai_interrupt(value))
stop(value) else
value
)

if (!unresolved(x)) {
value <- .subset2(x, "value")
promise <- if (is_error_value(value) && !is_mirai_interrupt(value))
promises::promise_reject(value) else
promises::promise_resolve(value)
}

assign("promise", promise, x)

}

promise

}

Expand Down
2 changes: 1 addition & 1 deletion README.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ We would like to thank in particular:

[Will Landau](https://github.com/wlandau/), for being instrumental in shaping development of the package, from initiating the original request for persistent daemons, through to orchestrating robustness testing for the high performance computing requirements of `crew` and `targets`.

[Joe Cheng](https://github.com/jcheng5/), for optimising the `promises` method to make `mirai` work seamlessly within Shiny, and guidance for implementing error stack traces.
[Joe Cheng](https://github.com/jcheng5/), for optimising the `promises` method to make `mirai` work seamlessly within Shiny, prototyping non-polling promises and guidance on implementing error stack traces.

[Luke Tierney](https://github.com/ltierney/), R Core, for discussion on R's implementation of L'Ecuyer-CMRG streams, used to ensure statistical independence in parallel processing, and collaboration in 'providing an alternative communications backend for R'.

Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,17 @@ result.

``` r
m$data
#> [1] 1.8451625 -0.2165991 -1.2117962 0.4428216 1.3387124 0.7469864
#> [7] 2.2582459 -0.8252213 -4.6168235 0.5419577
#> [1] 2.4181337 1.8674723 6.6228207 -1.0355541 0.8055614 1.2413703
#> [7] -0.9656666 0.1509931 0.5354832 0.4135421
```

Alternatively, explicitly call and wait for the result using
`call_mirai()`.

``` r
call_mirai(m)$data
#> [1] 1.8451625 -0.2165991 -1.2117962 0.4428216 1.3387124 0.7469864
#> [7] 2.2582459 -0.8252213 -4.6168235 0.5419577
#> [1] 2.4181337 1.8674723 6.6228207 -1.0355541 0.8055614 1.2413703
#> [7] -0.9656666 0.1509931 0.5354832 0.4135421
```

### Daemons
Expand Down Expand Up @@ -184,8 +184,8 @@ for persistent daemons, through to orchestrating robustness testing for
the high performance computing requirements of `crew` and `targets`.

[Joe Cheng](https://github.com/jcheng5/), for optimising the `promises`
method to make `mirai` work seamlessly within Shiny, and guidance for
implementing error stack traces.
method to make `mirai` work seamlessly within Shiny, prototyping
non-polling promises and guidance on implementing error stack traces.

[Luke Tierney](https://github.com/ltierney/), R Core, for discussion on
R’s implementation of L’Ecuyer-CMRG streams, used to ensure statistical
Expand Down
Loading