Skip to content

Commit

Permalink
check for max_calls_worker; add test (fixes #322)
Browse files Browse the repository at this point in the history
  • Loading branch information
mschubert committed Dec 7, 2023
1 parent cb0e214 commit b8427e9
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 5 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# git head

* Fix a bug where SSH proxy would not cache data properly (#320)
* Fix a bug where `max_calls_worker` was not respected (#322)
* Local parallelism (`multicore`, `multiprocess`) again uses local IP (#321)

# clustermq 0.9.1
Expand Down
2 changes: 1 addition & 1 deletion R/Q.r
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#' @param chunk_size Number of function calls to chunk together
#' defaults to 100 chunks per worker or max. 10 kb per chunk
#' @param timeout Maximum time in seconds to wait for worker (default: Inf)
#' @param max_calls_worker Maxmimum number of function calls that will be sent to one worker
#' @param max_calls_worker Maxmimum number of chunks that will be sent to one worker
#' @param verbose Print status messages and progress bar (default: TRUE)
#' @return A list of whatever `fun` returned
#' @export
Expand Down
2 changes: 1 addition & 1 deletion R/master.r
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ master = function(pool, iter, rettype="list", fail_on_error=TRUE,
cond_msgs$errors = c(cond_msgs$errors, msg$errors)
}

if (shutdown || (!is.null(msg$n_calls) && msg$n_calls >= max_calls_worker)) {
if (shutdown || with(pool$info(), calls[current]) >= max_calls_worker) {
pool$send_shutdown()
next
}
Expand Down
2 changes: 1 addition & 1 deletion R/pool.r
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Pool = R6::R6Class("Pool",

info = function() {
info = private$master$list_workers()
times = do.call(rbind, info$time)[,1:3]
times = do.call(rbind, info$time)[,1:3,drop=FALSE]
mem = function(field) sapply(info$mem, function(m) sum(m[,field] * c(56,1)))
do.call(data.frame, c(info[c("worker", "status")],
current=list(info$worker==info$cur),
Expand Down
2 changes: 1 addition & 1 deletion man/Q.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/Q_rows.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions tests/testthat/test-5-queue.r
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,18 @@ test_that("shutdown monitor does not fire on clean disconnects", {
res = Q(Sys.sleep, time=c(0,1), workers=w, timeout=10L)
expect_equal(res, list(NULL, NULL))
})

test_that("max_calls_worker is respected", {
skip_on_cran() # not sure if CRAN-safe
skip_on_os("windows")

fx = function(x) { Sys.sleep(x==1); Sys.getpid() }

w = workers(n_jobs=2, qsys_id="multicore", reuse=FALSE)
res = table(unlist(Q(fx, x=1:4, workers=w)))
expect_true(setequal(res, c(1,3)))

w = workers(n_jobs=2, qsys_id="multicore", reuse=FALSE)
res = table(unlist(Q(fx, x=1:4, workers=w, max_calls_worker=2)))
expect_true(setequal(res, 2))
})

0 comments on commit b8427e9

Please sign in to comment.