Skip to content

Commit

Permalink
fix respecting timeout; increase max for tests
Browse files (browse the repository at this point in the history)
  • Loading branch information
mschubert committed Dec 6, 2023
1 parent 58668bd commit 506286f
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 21 deletions.
7 changes: 6 additions & 1 deletion R/master.r
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ master = function(pool, iter, rettype="list", fail_on_error=TRUE,
penv = pool$env(work_chunk=work_chunk)
obj_size = structure(sum(penv$size), class="object_size")
obj_size_fmt = format(obj_size, big.mark=",", units="auto")
if (is.infinite(timeout)) {
timeout = -1L
} else {
timeout = timeout * 1000 # Rcpp API uses msec
}

#TODO: warn before serialization, create pool+env & then submit
if (obj_size/1e6 > getOption("clustermq.data.warning", 500))
Expand All @@ -57,7 +62,7 @@ master = function(pool, iter, rettype="list", fail_on_error=TRUE,

# main event loop
while((!shutdown && submit_index[1] <= n_calls) || jobs_running > 0) {
msg = pool$recv()
msg = pool$recv(timeout)
if (inherits(msg, "worker_error"))
stop("Worker Error: ", msg)

Expand Down
23 changes: 11 additions & 12 deletions tests/testthat/test-5-queue.r
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ test_that("control flow", {
skip_on_os("windows")
fx = function(x) x*2
w = workers(n_jobs=1, qsys_id="multicore", reuse=FALSE)
r = Q(fx, x=1:3, workers=w, timeout=3L)
r = Q(fx, x=1:3, workers=w, timeout=10L)
expect_equal(r, as.list(1:3*2))
})

Expand All @@ -18,23 +18,23 @@ test_that("control flow with automatic workers", {
options(clustermq.scheduler = "multicore")

fx = function(x) x*2
r = Q(fx, x=1:3, n_jobs=1, timeout=3L)
r = Q(fx, x=1:3, n_jobs=1, timeout=10L)
expect_equal(r, as.list(1:3*2))
})

test_that("common data", {
skip_on_os("windows")
fx = function(x, y) x*2 + y
w = workers(n_jobs=1, qsys_id="multicore", reuse=FALSE)
r = Q(fx, x=1:3, const=list(y=10), workers=w, timeout=3L)
r = Q(fx, x=1:3, const=list(y=10), workers=w, timeout=10L)
expect_equal(r, as.list(1:3*2+10))
})

test_that("export", {
skip_on_os("windows")
fx = function(x) x*2 + z
w = workers(n_jobs=1, qsys_id="multicore", reuse=FALSE)
r = Q(fx, x=1:3, export=list(z=20), workers=w, timeout=3L)
r = Q(fx, x=1:3, export=list(z=20), workers=w, timeout=10L)
expect_equal(r, as.list(1:3*2+20))
})

Expand All @@ -43,7 +43,7 @@ test_that("load package on worker", {
fx = function(x) splitIndices(1,1)
x = "a string"
w = workers(n_jobs=1, qsys_id="multicore", reuse=FALSE)
r = Q(fx, x=x, pkgs="parallel", workers=w, rettype="character", timeout=3L)
r = Q(fx, x=x, pkgs="parallel", workers=w, rettype="character", timeout=10L)
expect_equal(r, "1")
})

Expand All @@ -52,8 +52,8 @@ test_that("seed reproducibility", {
fx = function(x) sample(1:100, 1)
w1 = workers(n_jobs=1, qsys_id="multicore", reuse=FALSE)
w2 = workers(n_jobs=1, qsys_id="multicore", reuse=FALSE)
r1 = Q(fx, x=1:3, workers=w1, timeout=3L)
r2 = Q(fx, x=1:3, workers=w2, timeout=3L)
r1 = Q(fx, x=1:3, workers=w1, timeout=10L)
r2 = Q(fx, x=1:3, workers=w2, timeout=10L)
expect_equal(r1, r2)
})

Expand All @@ -64,20 +64,19 @@ test_that("master does not exit loop prematurely", {
x*2
}
w = workers(n_jobs=2, qsys_id="multicore", reuse=FALSE)
r = Q(fx, x=1:3, workers=w, timeout=3L)
r = Q(fx, x=1:3, workers=w, timeout=10L)
expect_equal(r, as.list(1:3*2))
})

test_that("rettype is respected", {
skip_on_os("windows")
fx = function(x) x*2
w = workers(n_jobs=1, qsys_id="multicore", reuse=FALSE)
r = Q(fx, x=1:3, rettype="numeric", workers=w, timeout=3L)
r = Q(fx, x=1:3, rettype="numeric", workers=w, timeout=10L)
expect_equal(r, 1:3*2)
})

test_that("worker timeout throws error", {
skip("FIXME")
skip_on_os("windows")
w = workers(n_jobs=1, qsys_id="multicore", reuse=FALSE)
expect_error(expect_warning(
Expand Down Expand Up @@ -111,7 +110,7 @@ test_that("Q with expired workers throws error quickly", {
w$cleanup()

times = system.time({
expect_error(Q(identity, x=1:3, rettype="numeric", workers=w, timeout=3L))
expect_error(Q(identity, x=1:3, rettype="numeric", workers=w, timeout=10L))
})
expect_true(times[["elapsed"]] < 1)
})
Expand All @@ -122,6 +121,6 @@ test_that("shutdown monitor does not fire on clean disconnects", {
# doing this via a separate call to `workers()` works
# so this seems to be a race condition of some sort
w = workers(n_jobs=2, qsys_id="multicore", reuse=FALSE)
res = Q(Sys.sleep, time=c(0,1), workers=w, timeout=5L)
res = Q(Sys.sleep, time=c(0,1), workers=w, timeout=10L)
expect_equal(res, list(NULL, NULL))
})
16 changes: 8 additions & 8 deletions tests/testthat/test-6-queue_impl.r
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,29 @@ fx = function(x) x*2

test_that("local, explicit", {
w = workers(n_jobs=4, qsys_id="local")
r = Q(fx, x=1:3, workers=w, timeout=3L)
r = Q(fx, x=1:3, workers=w, timeout=10L)
success = w$cleanup()
expect_equal(r, as.list(1:3*2))
expect_true(success)
})

test_that("local, n_jobs=0", {
fx = function(x) x*2
r = Q(fx, x=1:3, n_jobs=0, timeout=3L)
r = Q(fx, x=1:3, n_jobs=0, timeout=10L)
expect_equal(r, as.list(1:3*2))
})

test_that("qsys_multicore", {
skip_on_os("windows")
w = workers(n_jobs=4, qsys_id="multicore", reuse=FALSE)
r = Q(fx, x=1:3, workers=w, timeout=3L)
r = Q(fx, x=1:3, workers=w, timeout=10L)
expect_equal(r, as.list(1:3*2))
})

test_that("qsys_multicore with reuse=TRUE", {
skip_on_os("windows")
w = workers(n_jobs=4, qsys_id="multicore", reuse=TRUE)
r = Q(fx, x=1:3, workers=w, timeout=3L)
r = Q(fx, x=1:3, workers=w, timeout=10L)
success = w$cleanup()
expect_equal(r, as.list(1:3*2))
expect_true(success)
Expand All @@ -38,7 +38,7 @@ test_that("qsys_multiprocess (callr)", {
skip("https://github.com/r-lib/processx/issues/236")

w = workers(n_jobs=2, qsys_id="multiprocess", reuse=TRUE)
r = Q(fx, x=1:3, workers=w, timeout=3L)
r = Q(fx, x=1:3, workers=w, timeout=10L)
success = w$cleanup()
expect_equal(r, as.list(1:3*2))
expect_equal(success, TRUE)
Expand All @@ -51,7 +51,7 @@ test_that("qsys_lsf", {
skip_if_not(has_connectivity(Sys.info()["nodename"]))
skip_on_os("windows")
w = workers(n_jobs=1, qsys_id="lsf", reuse=FALSE)
r = Q(fx, x=1:3, workers=w, timeout=3L)
r = Q(fx, x=1:3, workers=w, timeout=10L)
expect_equal(r, as.list(1:3*2))
})

Expand All @@ -62,7 +62,7 @@ test_that("qsys_sge", {
skip_if_not(has_connectivity(Sys.info()["nodename"]))
skip_on_os("windows")
w = workers(n_jobs=1, qsys_id="sge", reuse=FALSE)
r = Q(fx, x=1:3, workers=w, timeout=3L)
r = Q(fx, x=1:3, workers=w, timeout=10L)
expect_equal(r, as.list(1:3*2))
})

Expand All @@ -73,6 +73,6 @@ test_that("qsys_slurm", {
skip_if_not(has_connectivity(Sys.info()["nodename"]))
skip_on_os("windows")
w = workers(n_jobs=1, qsys_id="slurm", reuse=FALSE)
r = Q(fx, x=1:3, workers=w, timeout=3L)
r = Q(fx, x=1:3, workers=w, timeout=10L)
expect_equal(r, as.list(1:3*2))
})

0 comments on commit 506286f

Please sign in to comment.