From 1a9e8bf56ab77e2344b547faea1c87a39c5a79a8 Mon Sep 17 00:00:00 2001 From: Jorge Ejarque Date: Wed, 12 Feb 2025 11:34:41 +0100 Subject: [PATCH] Improve Google Batch executor stability and error handling (#5690) Signed-off-by: jorgee Signed-off-by: Paolo Di Tommaso Signed-off-by: Ben Sherman Signed-off-by: Jorge Ejarque Co-authored-by: Paolo Di Tommaso Co-authored-by: Ben Sherman Co-authored-by: Chris Hakkaart --- .../exception/ProcessStageException.groovy | 8 +- .../groovy/nextflow/file/FilePorter.groovy | 11 ++- .../processor/TaskArrayCollector.groovy | 4 +- .../nextflow/processor/TaskHandler.groovy | 10 +++ .../nextflow/processor/TaskProcessor.groovy | 12 ++- .../processor/TaskArrayCollectorTest.groovy | 18 +---- .../nextflow/processor/TaskHandlerTest.groovy | 16 ++++ .../processor/TaskProcessorTest.groovy | 50 +++++++++++- .../batch/GoogleBatchTaskHandler.groovy | 77 ++++++++++++------ .../google/batch/client/BatchClient.groovy | 45 ++++++++++- .../batch/GoogleBatchTaskHandlerTest.groovy | 54 +++++++++++-- .../batch/client/BatchClientTest.groovy | 79 +++++++++++++++++++ 12 files changed, 319 insertions(+), 65 deletions(-) create mode 100644 plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchClientTest.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/exception/ProcessStageException.groovy b/modules/nextflow/src/main/groovy/nextflow/exception/ProcessStageException.groovy index 6e23844e61..947fc6952b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/exception/ProcessStageException.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/exception/ProcessStageException.groovy @@ -22,7 +22,13 @@ import groovy.transform.InheritConstructors * Exception thrown when an error is raised during the process file staging phase * * @author Paolo Di Tommaso + * + * Note: This exception extends ProcessUnrecoverableException to force the execution to fail instead of + * retrying the task execution because the file staging process has its own retry 
strategy, and + * therefore it's likely to be a permanent error. + * + * See also https://github.com/nextflow-io/nextflow/issues/5727 */ @InheritConstructors -class ProcessStageException extends ProcessException implements ShowOnlyExceptionMessage { +class ProcessStageException extends ProcessUnrecoverableException implements ShowOnlyExceptionMessage { } diff --git a/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy b/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy index 6b138fe8aa..89e720bc7b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy @@ -331,7 +331,7 @@ class FilePorter { // remove the target file that could be have partially downloaded cleanup(stagePath) // check if a stage/download retry is allowed - if( count++ < maxRetries && e !instanceof NoSuchFileException && e !instanceof InterruptedIOException && !Thread.currentThread().isInterrupted() ) { + if( count++ < maxRetries && recoverableError(e) && !Thread.currentThread().isInterrupted() ) { def message = "Unable to stage foreign file: ${filePath.toUriString()} (try ${count} of ${maxRetries}) -- Cause: $e.message" log.isDebugEnabled() ? 
log.warn(message, e) : log.warn(message) @@ -344,6 +344,15 @@ class FilePorter { } } + private boolean recoverableError(IOException e){ + final result = + e !instanceof NoSuchFileException + && (e instanceof SocketTimeoutException || e !instanceof InterruptedIOException) + && e !instanceof SocketException + log.debug "Stage foreign file exception: recoverable=$result; type=${e.class.name}; message=${e.message}" + return result + } + private String fmtError(Path filePath, Exception e) { def message = "Can't stage file ${FilesEx.toUriString(filePath)}" if( e instanceof NoSuchFileException ) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy index c76eafa6b5..18c15e2897 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy @@ -92,7 +92,7 @@ class TaskArrayCollector { try { // submit task directly if the collector is closed // or if the task is retried (since it might have dynamic resources) - if( closed || task.config.getAttempt() > 1 ) { + if( closed ) { executor.submit(task) return } @@ -138,7 +138,7 @@ class TaskArrayCollector { */ protected TaskArrayRun createTaskArray(List tasks) { // prepare child job launcher scripts - final handlers = tasks.collect( t -> executor.createTaskHandler(t) ) + final handlers = tasks.collect( t -> executor.createTaskHandler(t).withArrayChild(true) ) for( TaskHandler handler : handlers ) { handler.prepareLauncher() } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy index 61ad631a58..519b5e9947 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy @@ -54,6 +54,16 @@ abstract class 
TaskHandler { */ TaskRun getTask() { task } + /** + * Whether this handler references a job array task child + */ + boolean isArrayChild + + TaskHandler withArrayChild(boolean child) { + this.isArrayChild = child + return this + } + /** * Task current status */ diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 4366fbba4f..6d23b79b47 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -2183,15 +2183,13 @@ class TaskProcessor { session.filePorter.transfer(batch) } - final protected void makeTaskContextStage3( TaskRun task, HashCode hash, Path folder ) { - + protected void makeTaskContextStage3( TaskRun task, HashCode hash, Path folder ) { // set hash-code & working directory task.hash = hash task.workDir = folder task.config.workDir = folder task.config.hash = hash.toString() task.config.name = task.getName() - } final protected HashCode createTaskHashKey(TaskRun task) { @@ -2338,12 +2336,12 @@ class TaskProcessor { makeTaskContextStage3(task, hash, folder) + // when no collector is defined OR it's a task retry, then submit directly for execution + if( !arrayCollector || task.config.getAttempt() > 1 ) + executor.submit(task) // add the task to the collection of running tasks - if( arrayCollector ) - arrayCollector.collect(task) else - executor.submit(task) - + arrayCollector.collect(task) } protected boolean checkWhenGuard(TaskRun task) { diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayCollectorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayCollectorTest.groovy index 2b4c7fb7e9..9e900b0bda 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayCollectorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayCollectorTest.groovy @@ -87,23 +87,6 @@ 
class TaskArrayCollectorTest extends Specification { 1 * executor.submit(task) } - def 'should submit retried tasks directly' () { - given: - def executor = Mock(DummyExecutor) - def collector = Spy(new TaskArrayCollector(null, executor, 5)) - and: - def task = Mock(TaskRun) { - getConfig() >> Mock(TaskConfig) { - getAttempt() >> 2 - } - } - - when: - collector.collect(task) - then: - 1 * executor.submit(task) - } - def 'should create task array' () { given: def exec = Mock(DummyExecutor) { @@ -137,6 +120,7 @@ class TaskArrayCollectorTest extends Specification { when: def taskArray = collector.createTaskArray([task, task, task]) then: + 3 * handler.withArrayChild(true) >> handler 3 * exec.createTaskHandler(task) >> handler 3 * handler.prepareLauncher() 1 * collector.createArrayTaskScript([handler, handler, handler]) >> 'the-task-array-script' diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy index 6ec29faf44..fc0c6fe31a 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy @@ -263,4 +263,20 @@ class TaskHandlerTest extends Specification { [:] | "job_1" [TOWER_WORKFLOW_ID: '1234'] | "tw-1234-job_1" } + + @Unroll + def 'should set isChildArray flag'() { + given: + def handler = Spy(TaskHandler) + + expect: + !handler.isArrayChild + and: + handler.withArrayChild(VALUE).isArrayChild == VALUE + + where: + VALUE | _ + false | _ + true | _ + } } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy index 751feeb03f..a80b0397ed 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskProcessorTest.groovy @@ -22,6 +22,7 @@ import 
java.nio.file.Path import java.nio.file.Paths import java.util.concurrent.ExecutorService +import com.google.common.hash.HashCode import groovyx.gpars.agent.Agent import nextflow.Global import nextflow.ISession @@ -401,9 +402,7 @@ class TaskProcessorTest extends Specification { } - def 'should update agent state'() { - when: def state = new Agent(new StateObj()) int i = 0 @@ -1172,4 +1171,51 @@ class TaskProcessorTest extends Specification { } + def 'should submit a task' () { + given: + def exec = Mock(Executor) + def proc = Spy(new TaskProcessor(executor: exec)) + and: + def task = Mock(TaskRun) + def hash = Mock(HashCode) + def path = Mock(Path) + + when: + proc.submitTask(task, hash, path) + then: + 1 * proc.makeTaskContextStage3(task, hash, path) >> null + and: + 1 * exec.submit(task) + } + + def 'should collect a task' () { + given: + def exec = Mock(Executor) + def collector = Mock(TaskArrayCollector) + def proc = Spy(new TaskProcessor(executor: exec, arrayCollector: collector)) + and: + def task = Mock(TaskRun) + def hash = Mock(HashCode) + def path = Mock(Path) + + when: + proc.submitTask(task, hash, path) + then: + task.getConfig()>>Mock(TaskConfig) { getAttempt()>>1 } + and: + 1 * proc.makeTaskContextStage3(task, hash, path) >> null + and: + 1 * collector.collect(task) + 0 * exec.submit(task) + + when: + proc.submitTask(task, hash, path) + then: + task.getConfig()>>Mock(TaskConfig) { getAttempt()>>2 } + and: + 1 * proc.makeTaskContextStage3(task, hash, path) >> null + and: + 0 * collector.collect(task) + 1 * exec.submit(task) + } } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index ccf9a944f0..2207388b81 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -24,6 +24,7 @@ import 
com.google.cloud.batch.v1.AllocationPolicy import com.google.cloud.batch.v1.ComputeResource import com.google.cloud.batch.v1.Environment import com.google.cloud.batch.v1.Job +import com.google.cloud.batch.v1.JobStatus import com.google.cloud.batch.v1.LifecyclePolicy import com.google.cloud.batch.v1.LogsPolicy import com.google.cloud.batch.v1.Runnable @@ -452,49 +453,73 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { * @return Retrieve the submitted task state */ protected String getTaskState() { - final tasks = client.listTasks(jobId) - if( !tasks.iterator().hasNext() ) { - // if there are no tasks checks the job status - return checkJobStatus() - } + return isArrayChild + ? getStateFromTaskStatus() + : getStateFromJobStatus() + } + + protected String getStateFromTaskStatus() { final now = System.currentTimeMillis() final delta = now - timestamp; if( !taskState || delta >= 1_000) { - final status = client.getTaskStatus(jobId, taskId) - final newState = status?.state as String - if( newState ) { - log.trace "[GOOGLE BATCH] Get job=$jobId task=$taskId state=$newState" - taskState = newState - timestamp = now - } - if( newState == 'PENDING' ) { - final eventsCount = status.getStatusEventsCount() - final lastEvent = eventsCount > 0 ? 
status.getStatusEvents(eventsCount - 1) : null - if( lastEvent?.getDescription()?.contains('CODE_GCE_QUOTA_EXCEEDED') ) - log.warn1 "Batch job cannot be run: ${lastEvent.getDescription()}" + final status = client.getTaskInArrayStatus(jobId, taskId) + if( status ) { + inspectTaskStatus(status) + } else { + // If no task status retrieved check job status + final jobStatus = client.getJobStatus(jobId) + inspectJobStatus(jobStatus) } } return taskState } - protected String checkJobStatus() { - final jobStatus = client.getJobStatus(jobId) - final newState = jobStatus?.state as String + protected String getStateFromJobStatus() { + final now = System.currentTimeMillis() + final delta = now - timestamp; + if( !taskState || delta >= 1_000) { + final status = client.getJobStatus(jobId) + inspectJobStatus(status) + } + return taskState + } + + private void inspectTaskStatus(com.google.cloud.batch.v1.TaskStatus status) { + final newState = status?.state as String if (newState) { + log.trace "[GOOGLE BATCH] Get job=$jobId task=$taskId state=$newState" + taskState = newState + timestamp = System.currentTimeMillis() + } + if (newState == 'PENDING') { + final eventsCount = status.getStatusEventsCount() + final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null + if (lastEvent?.getDescription()?.contains('CODE_GCE_QUOTA_EXCEEDED')) + log.warn1 "Batch job cannot be run: ${lastEvent.getDescription()}" + } + } + + protected String inspectJobStatus(JobStatus status) { + final newState = status?.state as String + if (newState) { + log.trace "[GOOGLE BATCH] Get job=$jobId state=$newState" taskState = newState timestamp = System.currentTimeMillis() if (newState == "FAILED") { noTaskJobfailure = true } - return taskState - } else { - return "PENDING" + } + if (newState == 'SCHEDULED') { + final eventsCount = status.getStatusEventsCount() + final lastEvent = eventsCount > 0 ? 
status.getStatusEvents(eventsCount - 1) : null + if (lastEvent?.getDescription()?.contains('CODE_GCE_QUOTA_EXCEEDED')) + log.warn1 "Batch job cannot be run: ${lastEvent.getDescription()}" } } - static private final List RUNNING_OR_COMPLETED = ['RUNNING', 'SUCCEEDED', 'FAILED'] + static private final List RUNNING_OR_COMPLETED = ['RUNNING', 'SUCCEEDED', 'FAILED', 'DELETION_IN_PROGRESS'] - static private final List COMPLETED = ['SUCCEEDED', 'FAILED'] + static private final List COMPLETED = ['SUCCEEDED', 'FAILED', 'DELETION_IN_PROGRESS'] @Override boolean checkIfRunning() { @@ -526,6 +551,8 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { task.stderr = errorFile } status = TaskStatus.COMPLETED + if( isArrayChild ) + client.removeFromArrayTasks(jobId, taskId) return true } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchClient.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchClient.groovy index 8b3fd1780a..c2fcf98ea7 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchClient.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchClient.groovy @@ -17,6 +17,7 @@ package nextflow.cloud.google.batch.client import java.time.temporal.ChronoUnit +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeoutException import java.util.function.Predicate @@ -49,11 +50,12 @@ import groovy.util.logging.Slf4j @Slf4j @CompileStatic class BatchClient { - + private final static long TASK_STATE_INVALID_TIME = 1_000 protected String projectId protected String location protected BatchServiceClient batchServiceClient protected BatchConfig config + private Map arrayTaskStatus = new ConcurrentHashMap() BatchClient(BatchConfig config) { this.config = config @@ -110,7 +112,7 @@ class BatchClient { } Task describeTask(String jobId, String taskId) { - final name = TaskName.of(projectId, location, jobId, 'group0', taskId) + final name 
= generateTaskName(jobId, taskId) return apply(()-> batchServiceClient.getTask(name)) } @@ -140,6 +142,10 @@ class BatchClient { return location } + String generateTaskName(String jobId, String taskId) { + TaskName.of(projectId, location, jobId, 'group0', taskId) + } + /** * Creates a retry policy using the configuration specified by {@link BatchRetryConfig} * @@ -191,4 +197,39 @@ class BatchClient { // apply the action with return Failsafe.with(policy).get(action) } + + + TaskStatus getTaskInArrayStatus(String jobId, String taskId) { + final taskName = generateTaskName(jobId,taskId) + final now = System.currentTimeMillis() + TaskStatusRecord record = arrayTaskStatus.get(taskName) + if( !record || now - record.timestamp > TASK_STATE_INVALID_TIME ){ + log.debug("[GOOGLE BATCH] Updating tasks status for job $jobId") + updateArrayTasks(jobId, now) + record = arrayTaskStatus.get(taskName) + } + return record?.status + } + + private void updateArrayTasks(String jobId, long now){ + for( Task t: listTasks(jobId) ){ + arrayTaskStatus.put(t.name, new TaskStatusRecord(t.status, now)) + } + } + + void removeFromArrayTasks(String jobId, String taskId){ + final taskName = generateTaskName(jobId,taskId) + TaskStatusRecord record = arrayTaskStatus.remove(taskName) + } +} + +@CompileStatic +class TaskStatusRecord { + protected TaskStatus status + protected long timestamp + + TaskStatusRecord(TaskStatus status, long timestamp) { + this.status = status + this.timestamp = timestamp + } } diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy index 43150d17fd..ccb400b5a8 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy @@ -17,8 +17,11 @@ package nextflow.cloud.google.batch +import 
com.google.api.gax.grpc.GrpcStatusCode +import com.google.api.gax.rpc.NotFoundException import com.google.cloud.batch.v1.JobStatus import com.google.cloud.batch.v1.Task +import io.grpc.Status import java.nio.file.Path @@ -462,13 +465,16 @@ class GoogleBatchTaskHandlerTest extends Specification { } - TaskStatus makeTaskStatus(String desc) { - TaskStatus.newBuilder() - .addStatusEvents( + TaskStatus makeTaskStatus(TaskStatus.State state, String desc) { + def builder = TaskStatus.newBuilder() + if (state) + builder.setState(state) + if (desc) + builder.addStatusEvents( StatusEvent.newBuilder() .setDescription(desc) ) - .build() + builder.build() } def 'should detect spot failures from status event'() { @@ -483,8 +489,8 @@ class GoogleBatchTaskHandlerTest extends Specification { when: client.getTaskStatus(jobId, taskId) >>> [ - makeTaskStatus('Task failed due to Spot VM preemption with exit code 50001.'), - makeTaskStatus('Task succeeded') + makeTaskStatus(null,'Task failed due to Spot VM preemption with exit code 50001.'), + makeTaskStatus(null, 'Task succeeded') ] then: handler.getJobError().message == "Task failed due to Spot VM preemption with exit code 50001." 
@@ -596,7 +602,7 @@ class GoogleBatchTaskHandlerTest extends Specification { builder.build() } - def 'should check job status when no tasks in job '() { + def 'should check job status when no tasks in task array '() { given: def jobId = 'job-id' @@ -615,8 +621,40 @@ class GoogleBatchTaskHandlerTest extends Specification { makeJobStatus(JobStatus.State.FAILED, message) ] then: - handler.getTaskState() == "PENDING" + handler.getTaskState() == null handler.getTaskState() == "FAILED" handler.getJobError().message == message } + + def 'should manage not found when getting task state in task array'() { + given: + def jobId = '1' + def taskId = '1' + def client = Mock(BatchClient) + def task = Mock(TaskRun) { + lazyName() >> 'foo (1)' + } + def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task, isArrayChild: true)) + + when: + client.generateTaskName(jobId, taskId) >> "$jobId/group0/$taskId" + //Force errors + client.getTaskStatus(jobId, taskId) >> { throw new NotFoundException(new Exception("Error"), GrpcStatusCode.of(Status.Code.NOT_FOUND), false) } + client.getTaskInArrayStatus(jobId, taskId) >> TASK_STATUS + client.getJobStatus(jobId) >> makeJobStatus(JOB_STATUS, "") + then: + handler.getTaskState() == EXPECTED + + where: + EXPECTED | JOB_STATUS | TASK_STATUS + "FAILED" | JobStatus.State.FAILED | null // Task not in the list, get from job + "SUCCEEDED" | JobStatus.State.FAILED | makeTaskStatus(TaskStatus.State.SUCCEEDED, "") // get from task status + } + + def makeTask(String name, TaskStatus.State state){ + Task.newBuilder().setName(name) + .setStatus(TaskStatus.newBuilder().setState(state).build()) + .build() + + } } diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchClientTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchClientTest.groovy new file mode 100644 index 0000000000..0c5ce96cb2 --- /dev/null +++ 
b/plugins/nf-google/src/test/nextflow/cloud/google/batch/client/BatchClientTest.groovy @@ -0,0 +1,79 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package nextflow.cloud.google.batch.client + +import com.google.cloud.batch.v1.Task +import com.google.cloud.batch.v1.TaskName +import com.google.cloud.batch.v1.TaskStatus +import spock.lang.Specification + +/** + * + * @author Jorge Ejarque + */ +class BatchClientTest extends Specification{ + + def 'should return task status with getTaskInArray' () { + given: + def project = 'project-id' + def location = 'location-id' + def job1 = 'job1-id' + def task1 = 'task1-id' + def task1Name = TaskName.of(project, location, job1, 'group0', task1).toString() + def job2 = 'job2-id' + def task2 = 'task2-id' + def task2Name = TaskName.of(project, location, job2, 'group0', task2).toString() + def job3 = 'job3-id' + def task3 = 'task3-id' + def task3Name = TaskName.of(project, location, job3, 'group0', task3).toString() + def arrayTasks = new HashMap() + def client = Spy( new BatchClient( projectId: project, location: location, arrayTaskStatus: arrayTasks ) ) + + when: + client.listTasks(job2) >> { + def list = new LinkedList<>() + list.add(makeTask(task2Name, TaskStatus.State.FAILED)) + return list + } + client.listTasks(job3) >> { + def list = new LinkedList<>() + list.add(makeTask(task3Name, TaskStatus.State.SUCCEEDED)) + return list + } + arrayTasks.put(task1Name, 
makeTaskStatusRecord(TaskStatus.State.RUNNING, System.currentTimeMillis())) + arrayTasks.put(task2Name, makeTaskStatusRecord(TaskStatus.State.PENDING, System.currentTimeMillis() - 1_001)) + + then: + // recent cached task + client.getTaskInArrayStatus(job1, task1).state == TaskStatus.State.RUNNING + // Outdated cached task + client.getTaskInArrayStatus(job2, task2).state == TaskStatus.State.FAILED + // no cached task + client.getTaskInArrayStatus(job3, task3).state == TaskStatus.State.SUCCEEDED + } + + TaskStatusRecord makeTaskStatusRecord(TaskStatus.State state, long timestamp) { + return new TaskStatusRecord(TaskStatus.newBuilder().setState(state).build(), timestamp) + } + + def makeTask(String name, TaskStatus.State state){ + Task.newBuilder().setName(name) + .setStatus(TaskStatus.newBuilder().setState(state).build()) + .build() + } + +}