From abc8d2f396c50da8933930778527364857fec322 Mon Sep 17 00:00:00 2001 From: Andrei Tomashpolskiy Date: Fri, 13 Jan 2017 15:10:28 +0300 Subject: [PATCH 1/4] Provide info on currently running jobs #21 --- .../bootique/job/lock/LocalLockHandler.java | 33 +++++--- .../lock/zookeeper/ZkClusterLockHandler.java | 34 +++++--- .../job/runnable/BaseRunnableJob.java | 29 +++++++ .../ErrorHandlingRunnableJobFactory.java | 26 ++++-- .../io/bootique/job/runnable/JobFuture.java | 55 ++++++++++++- .../io/bootique/job/runnable/RunnableJob.java | 6 ++ .../runnable/SimpleRunnableJobFactory.java | 19 +++-- .../job/scheduler/DefaultScheduler.java | 81 ++++++++++++++++--- .../io/bootique/job/scheduler/Scheduler.java | 8 ++ 9 files changed, 244 insertions(+), 47 deletions(-) create mode 100644 src/main/java/io/bootique/job/runnable/BaseRunnableJob.java diff --git a/src/main/java/io/bootique/job/lock/LocalLockHandler.java b/src/main/java/io/bootique/job/lock/LocalLockHandler.java index a5e4a2b2..582dd0bb 100644 --- a/src/main/java/io/bootique/job/lock/LocalLockHandler.java +++ b/src/main/java/io/bootique/job/lock/LocalLockHandler.java @@ -1,10 +1,12 @@ package io.bootique.job.lock; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import io.bootique.job.runnable.BaseRunnableJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,23 +30,30 @@ public LocalLockHandler() { @Override public RunnableJob lockingJob(RunnableJob executable, JobMetadata metadata) { + return new BaseRunnableJob() { + @Override + protected JobResult doRun() { + String lockName = toLockName(metadata); + Lock lock = getLock(lockName); - return () -> { - String lockName = toLockName(metadata); - Lock lock = getLock(lockName); + LOGGER.info("Attempting to lock '{}'", lockName); - LOGGER.info("Attempting to lock '{}'", lockName); + if (!lock.tryLock()) { + LOGGER.info("== Another job instance owns the lock. Skipping execution of '{}'", lockName); + return new JobResult(metadata, JobOutcome.SKIPPED, null, + "Another job instance owns the lock. Skipping execution"); + } - if (!lock.tryLock()) { - LOGGER.info("== Another job instance owns the lock. Skipping execution of '{}'", lockName); - return new JobResult(metadata, JobOutcome.SKIPPED, null, - "Another job instance owns the lock. Skipping execution"); + try { + return executable.run(); + } finally { + lock.unlock(); + } } - try { - return executable.run(); - } finally { - lock.unlock(); + @Override + public Map getParameters() { + return executable.getParameters(); } }; } diff --git a/src/main/java/io/bootique/job/lock/zookeeper/ZkClusterLockHandler.java b/src/main/java/io/bootique/job/lock/zookeeper/ZkClusterLockHandler.java index 1ebaf2e9..644bf489 100644 --- a/src/main/java/io/bootique/job/lock/zookeeper/ZkClusterLockHandler.java +++ b/src/main/java/io/bootique/job/lock/zookeeper/ZkClusterLockHandler.java @@ -1,6 +1,7 @@ package io.bootique.job.lock.zookeeper; import io.bootique.job.JobMetadata; +import io.bootique.job.runnable.BaseRunnableJob; import io.bootique.job.runnable.JobOutcome; import io.bootique.job.runnable.RunnableJob; import org.slf4j.Logger; @@ -11,6 +12,8 @@ import io.bootique.job.lock.LockHandler; import io.bootique.job.runnable.JobResult; +import java.util.Map; + public class ZkClusterLockHandler implements LockHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ZkClusterLockHandler.class); @@ -29,23 +32,30 @@ public ZkClusterLockHandler(Injector injector) { @Override public RunnableJob lockingJob(RunnableJob executable, JobMetadata metadata) { + return new BaseRunnableJob() { + @Override + protected JobResult doRun() { + String lockName = getLockName(metadata); - return () -> { - String lockName = getLockName(metadata); + LOGGER.info("Attempting to lock '{}'", lockName); - LOGGER.info("Attempting to lock '{}'", lockName); + ZkMutex lock = ZkMutex.acquire(injector, lockName); + if (lock == null) { + LOGGER.info("** Another job instance owns the lock. Skipping execution of '{}'", lockName); + return new JobResult(metadata, JobOutcome.SKIPPED, null, + "Another job instance owns the lock. Skipping execution"); + } - ZkMutex lock = ZkMutex.acquire(injector, lockName); - if (lock == null) { - LOGGER.info("** Another job instance owns the lock. Skipping execution of '{}'", lockName); - return new JobResult(metadata, JobOutcome.SKIPPED, null, - "Another job instance owns the lock. Skipping execution"); + try { + return executable.run(); + } finally { + lock.release(); + } } - try { - return executable.run(); - } finally { - lock.release(); + @Override + public Map getParameters() { + return executable.getParameters(); } }; } diff --git a/src/main/java/io/bootique/job/runnable/BaseRunnableJob.java b/src/main/java/io/bootique/job/runnable/BaseRunnableJob.java new file mode 100644 index 00000000..deea3777 --- /dev/null +++ b/src/main/java/io/bootique/job/runnable/BaseRunnableJob.java @@ -0,0 +1,29 @@ +package io.bootique.job.runnable; + +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class BaseRunnableJob implements RunnableJob { + + private final AtomicInteger running; + + public BaseRunnableJob() { + this.running = new AtomicInteger(); + } + + @Override + public JobResult run() { + running.incrementAndGet(); + try { + return doRun(); + } finally { + running.decrementAndGet(); + } + } + + protected abstract JobResult doRun(); + + @Override + public boolean isRunning() { + return running.get() > 0; + } +} diff --git a/src/main/java/io/bootique/job/runnable/ErrorHandlingRunnableJobFactory.java b/src/main/java/io/bootique/job/runnable/ErrorHandlingRunnableJobFactory.java index dd63595b..86e9efb4 100644 --- a/src/main/java/io/bootique/job/runnable/ErrorHandlingRunnableJobFactory.java +++ b/src/main/java/io/bootique/job/runnable/ErrorHandlingRunnableJobFactory.java @@ -21,14 +21,26 @@ public RunnableJob runnable(Job job, Map parameters) { RunnableJob rj = delegate.runnable(job, parameters); - return () -> { - try { - return rj.run(); - } catch (Throwable th) { - LOGGER.info("Exception while running job '{}'", job.getMetadata().getName(), th); - return JobResult.unknown(job.getMetadata(), th); + return new RunnableJob() { + @Override + public JobResult run() { + try { + return rj.run(); + } catch (Throwable th) { + LOGGER.info("Exception while running job '{}'", job.getMetadata().getName(), th); + return JobResult.unknown(job.getMetadata(), th); + } + } + + @Override + public Map getParameters() { + return rj.getParameters(); + } + + @Override + public boolean isRunning() { + return rj.isRunning(); } }; } - } diff --git a/src/main/java/io/bootique/job/runnable/JobFuture.java b/src/main/java/io/bootique/job/runnable/JobFuture.java index b226933d..93b2760c 100644 --- a/src/main/java/io/bootique/job/runnable/JobFuture.java +++ b/src/main/java/io/bootique/job/runnable/JobFuture.java @@ -1,5 +1,7 @@ package io.bootique.job.runnable; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; @@ -9,10 +11,21 @@ public class JobFuture implements ScheduledFuture { + public static Builder forJob(String job) { + return new Builder(job); + } + + private String job; + private Optional runnable; private ScheduledFuture delegate; private Supplier resultSupplier; - public JobFuture(ScheduledFuture delegate, Supplier resultSupplier) { + public JobFuture(String job, + RunnableJob runnable, + ScheduledFuture delegate, + Supplier resultSupplier) { + this.job = job; + this.runnable = Optional.ofNullable(runnable); this.delegate = delegate; this.resultSupplier = resultSupplier; } @@ -42,6 +55,14 @@ public boolean isDone() { return delegate.isDone(); } + public String getJob() { + return job; + } + + public Optional getRunnable() { + return runnable; + } + public JobResult get() { // wait till the job is done and then return the result try { @@ -65,4 +86,36 @@ public JobResult get(long timeout, TimeUnit unit) { return resultSupplier.get(); } + public static class Builder { + + private String job; + private RunnableJob runnable; + private ScheduledFuture future; + private Supplier resultSupplier; + + public Builder(String job) { + this.job = Objects.requireNonNull(job); + } + + public Builder runnable(RunnableJob runnable) { + this.runnable = runnable; + return this; + } + + public Builder future(ScheduledFuture future) { + this.future = future; + return this; + } + + public Builder resultSupplier(Supplier resultSupplier) { + this.resultSupplier = resultSupplier; + return this; + } + + public JobFuture build() { + Objects.requireNonNull(future); + Objects.requireNonNull(resultSupplier); + return new JobFuture(job, runnable, future, resultSupplier); + } + } } diff --git a/src/main/java/io/bootique/job/runnable/RunnableJob.java b/src/main/java/io/bootique/job/runnable/RunnableJob.java index 647dab0b..ff27dd3f 100644 --- a/src/main/java/io/bootique/job/runnable/RunnableJob.java +++ b/src/main/java/io/bootique/job/runnable/RunnableJob.java @@ -1,6 +1,12 @@ package io.bootique.job.runnable; +import java.util.Map; + public interface RunnableJob { JobResult run(); + + Map getParameters(); + + boolean isRunning(); } diff --git a/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java b/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java index e00e53ff..606a0782 100644 --- a/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java +++ b/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java @@ -12,14 +12,21 @@ public class SimpleRunnableJobFactory implements RunnableJobFactory { @Override public RunnableJob runnable(Job job, Map parameters) { - return () -> { + return new BaseRunnableJob() { + @Override + protected JobResult doRun() { + LOGGER.info(String.format("job '%s' started with params %s", job.getMetadata().getName(), parameters)); - LOGGER.info(String.format("job '%s' started with params %s", job.getMetadata().getName(), parameters)); + try { + return job.run(parameters); + } finally { + LOGGER.info(String.format("job '%s' finished", job.getMetadata().getName())); + } + } - try { - return job.run(parameters); - } finally { - LOGGER.info(String.format("job '%s' finished", job.getMetadata().getName())); + @Override + public Map getParameters() { + return parameters; } }; } diff --git a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java index aa1d05ef..4a673e9f 100644 --- a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java +++ b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java @@ -12,13 +12,19 @@ import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; +import java.util.function.BiFunction; import java.util.stream.Collectors; public class DefaultScheduler implements Scheduler { @@ -29,6 +35,7 @@ public class DefaultScheduler implements Scheduler { private RunnableJobFactory runnableJobFactory; private JobRegistry jobRegistry; private Collection triggers; + private JobFutures jobFutures; public DefaultScheduler(Collection triggers, TaskScheduler taskScheduler, @@ -38,6 +45,7 @@ public DefaultScheduler(Collection triggers, this.runnableJobFactory = runnableJobFactory; this.taskScheduler = taskScheduler; this.jobRegistry = jobRegistry; + this.jobFutures = new JobFutures(); } @Override @@ -73,6 +81,11 @@ public Collection getTriggers() { return triggers; } + @Override + public Collection getSubmittedJobs() { + return jobFutures.getActiveJobs(); + } + @Override public JobFuture runOnce(String jobName) { return runOnce(jobName, Collections.emptyMap()); @@ -95,8 +108,10 @@ private Optional findJobByName(String jobName) { } private JobFuture invalidJobNameResult(String jobName) { - return new JobFuture(new ExpiredFuture(), - () -> JobResult.failure(JobMetadata.build(jobName), "Invalid job name: " + jobName)); + return JobFuture.forJob(jobName) + .future(new ExpiredFuture()) + .resultSupplier(() -> JobResult.failure(JobMetadata.build(jobName), "Invalid job name: " + jobName)) + .build(); } @Override @@ -106,16 +121,64 @@ public JobFuture runOnce(Job job) { @Override public JobFuture runOnce(Job job, Map parameters) { - RunnableJob rj = runnableJobFactory.runnable(job, parameters); - JobResult[] result = new JobResult[1]; - ScheduledFuture jobFuture = taskScheduler.schedule(() -> result[0] = rj.run(), new Date()); - return new JobFuture(jobFuture, () -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata())); + return submit(job, parameters, + (rj, result) -> taskScheduler.schedule(() -> result[0] = rj.run(), new Date())); } - public ScheduledFuture schedule(Job job, Map parameters, Trigger trigger) { + private ScheduledFuture schedule(Job job, Map parameters, Trigger trigger) { + return submit(job, parameters, + (rj, result) -> taskScheduler.schedule(() -> result[0] = rj.run(), trigger)); + } + + private JobFuture submit(Job job, Map parameters, + BiFunction> executor) { + RunnableJob rj = runnableJobFactory.runnable(job, parameters); JobResult[] result = new JobResult[1]; - ScheduledFuture jobFuture = taskScheduler.schedule(() -> result[0] = rj.run(), trigger); - return new JobFuture(jobFuture, () -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata())); + ScheduledFuture jobFuture = executor.apply(rj, result); + + JobFuture future = JobFuture.forJob(job.getMetadata().getName()) + .future(jobFuture) + .runnable(rj) + .resultSupplier(() -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata())) + .build(); + + jobFutures.addFuture(future); + return future; + } + + private static class JobFutures { + + private final Queue submittedJobs; + private final Collection runningJobs; + + public JobFutures() { + this.runningJobs = new LinkedList<>(); + this.submittedJobs = new LinkedBlockingQueue<>(); + } + + public JobFutures addFuture(JobFuture future) { + submittedJobs.add(future); + return this; + } + + public Collection getActiveJobs() { + synchronized (runningJobs) { + Iterator iter = runningJobs.iterator(); + while (iter.hasNext()) { + if (iter.next().isDone()) { + iter.remove(); + } + } + + JobFuture submitted; + while ((submitted = submittedJobs.poll()) != null && !submitted.isDone()) { + runningJobs.add(submitted); + } + + // do copying prior to releasing the lock to avoid possible errors + return new ArrayList<>(runningJobs); + } + } } } diff --git a/src/main/java/io/bootique/job/scheduler/Scheduler.java b/src/main/java/io/bootique/job/scheduler/Scheduler.java index 355e8f08..d46bf0a5 100644 --- a/src/main/java/io/bootique/job/scheduler/Scheduler.java +++ b/src/main/java/io/bootique/job/scheduler/Scheduler.java @@ -54,4 +54,12 @@ public interface Scheduler { * @since 0.13 */ Collection getTriggers(); + + /** + * Returns jobs that are currently submitted for execution. + * + * @return Collection of jobs that were submitted for execution but hadn't yet completed. + * @since 0.13 + */ + Collection getSubmittedJobs(); } From c27f6107fbaa8ce536a72f72c8f76ab9b0f68657 Mon Sep 17 00:00:00 2001 From: Andrei Tomashpolskiy Date: Fri, 13 Jan 2017 15:29:44 +0300 Subject: [PATCH 2/4] Provide info on currently running jobs #21 --- .../io/bootique/job/runnable/JobFuture.java | 18 ++++++++++++++--- .../io/bootique/job/runnable/RunnableJob.java | 8 ++++++++ .../job/scheduler/DefaultScheduler.java | 20 +++++++++++++++++-- .../io/bootique/job/scheduler/Scheduler.java | 4 ++++ 4 files changed, 45 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/bootique/job/runnable/JobFuture.java b/src/main/java/io/bootique/job/runnable/JobFuture.java index 93b2760c..4bc410b8 100644 --- a/src/main/java/io/bootique/job/runnable/JobFuture.java +++ b/src/main/java/io/bootique/job/runnable/JobFuture.java @@ -11,12 +11,15 @@ public class JobFuture implements ScheduledFuture { + /** + * @since 0.13 + */ public static Builder forJob(String job) { return new Builder(job); } private String job; - private Optional runnable; + private RunnableJob runnable; private ScheduledFuture delegate; private Supplier resultSupplier; @@ -25,7 +28,7 @@ public JobFuture(String job, ScheduledFuture delegate, Supplier resultSupplier) { this.job = job; - this.runnable = Optional.ofNullable(runnable); + this.runnable = runnable; this.delegate = delegate; this.resultSupplier = resultSupplier; } @@ -55,11 +58,19 @@ public boolean isDone() { return delegate.isDone(); } + /** + * @return Job name + * @since 0.13 + */ public String getJob() { return job; } - public Optional getRunnable() { + /** + * @return Runnable job implementation + * @since 0.13 + */ + public RunnableJob getRunnable() { return runnable; } @@ -113,6 +124,7 @@ public Builder resultSupplier(Supplier resultSupplier) { } public JobFuture build() { + Objects.requireNonNull(runnable); Objects.requireNonNull(future); Objects.requireNonNull(resultSupplier); return new JobFuture(job, runnable, future, resultSupplier); diff --git a/src/main/java/io/bootique/job/runnable/RunnableJob.java b/src/main/java/io/bootique/job/runnable/RunnableJob.java index ff27dd3f..ca4463f4 100644 --- a/src/main/java/io/bootique/job/runnable/RunnableJob.java +++ b/src/main/java/io/bootique/job/runnable/RunnableJob.java @@ -6,7 +6,15 @@ public interface RunnableJob { JobResult run(); + /** + * @return Parameters that the job was submitted with. + * @since 0.13 + */ Map getParameters(); + /** + * @return true if the job is currently executing its' {@link #run()} method. + * @since 0.13 + */ boolean isRunning(); } diff --git a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java index 4a673e9f..18f4c327 100644 --- a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java +++ b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java @@ -98,7 +98,7 @@ public JobFuture runOnce(String jobName, Map parameters) { Job job = jobOptional.get(); return runOnce(job, parameters); } else { - return invalidJobNameResult(jobName); + return invalidJobNameResult(jobName, parameters); } } @@ -107,9 +107,25 @@ private Optional findJobByName(String jobName) { return (job == null) ? Optional.empty() : Optional.of(job); } - private JobFuture invalidJobNameResult(String jobName) { + private JobFuture invalidJobNameResult(String jobName, Map parameters) { return JobFuture.forJob(jobName) .future(new ExpiredFuture()) + .runnable(new RunnableJob() { + @Override + public JobResult run() { + return JobResult.unknown(JobMetadata.build(jobName)); + } + + @Override + public Map getParameters() { + return parameters; + } + + @Override + public boolean isRunning() { + return false; + } + }) .resultSupplier(() -> JobResult.failure(JobMetadata.build(jobName), "Invalid job name: " + jobName)) .build(); } diff --git a/src/main/java/io/bootique/job/scheduler/Scheduler.java b/src/main/java/io/bootique/job/scheduler/Scheduler.java index d46bf0a5..9017df1b 100644 --- a/src/main/java/io/bootique/job/scheduler/Scheduler.java +++ b/src/main/java/io/bootique/job/scheduler/Scheduler.java @@ -5,6 +5,7 @@ import java.util.Collection; import java.util.Map; +import java.util.concurrent.Future; public interface Scheduler { @@ -58,6 +59,9 @@ public interface Scheduler { /** * Returns jobs that are currently submitted for execution. * + * Note that this method is inherently racy, and some futures in the returned collection might in fact be completed + * at the time this method returns. One may check the individual futures for completion by calling {@link Future#isDone()}. + * * @return Collection of jobs that were submitted for execution but hadn't yet completed. * @since 0.13 */ From a2691f26cbe46ac273ba7491e6550eeba3dfaf62 Mon Sep 17 00:00:00 2001 From: Andrei Tomashpolskiy Date: Fri, 13 Jan 2017 15:43:56 +0300 Subject: [PATCH 3/4] Provide info on currently running jobs #21 --- .../bootique/job/scheduler/DefaultScheduler.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java index 18f4c327..8c3ee3c2 100644 --- a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java +++ b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java @@ -12,6 +12,7 @@ import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -171,6 +172,19 @@ private static class JobFutures { public JobFutures() { this.runningJobs = new LinkedList<>(); this.submittedJobs = new LinkedBlockingQueue<>(); + + // If getActiveJobs() is rarely called, method response time and memory usage will grow over time. + // To prevent such situation a daemon will perform periodic cleanup of submitted and active jobs. + Thread cleaner = new Thread(() -> { + getActiveJobs(); + try { + Thread.sleep(Duration.ofSeconds(60).toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, "io.bootique.job.Scheduler.JobFutureCleaner"); + cleaner.setDaemon(true); + cleaner.start(); } public JobFutures addFuture(JobFuture future) { From cbe66c9d08385dd02f044f94396b49f965a4db5a Mon Sep 17 00:00:00 2001 From: Andrei Tomashpolskiy Date: Fri, 13 Jan 2017 15:45:46 +0300 Subject: [PATCH 4/4] Provide info on currently running jobs #21 --- .../java/io/bootique/job/scheduler/DefaultScheduler.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java index 8c3ee3c2..21461332 100644 --- a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java +++ b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java @@ -25,6 +25,7 @@ import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -168,10 +169,12 @@ private static class JobFutures { private final Queue submittedJobs; private final Collection runningJobs; + private final ReentrantLock lock; public JobFutures() { this.runningJobs = new LinkedList<>(); this.submittedJobs = new LinkedBlockingQueue<>(); + this.lock = new ReentrantLock(); // If getActiveJobs() is rarely called, method response time and memory usage will grow over time. // To prevent such situation a daemon will perform periodic cleanup of submitted and active jobs. @@ -193,7 +196,8 @@ public JobFutures addFuture(JobFuture future) { } public Collection getActiveJobs() { - synchronized (runningJobs) { + lock.lock(); + try { Iterator iter = runningJobs.iterator(); while (iter.hasNext()) { if (iter.next().isDone()) { @@ -208,6 +212,8 @@ public Collection getActiveJobs() { // do copying prior to releasing the lock to avoid possible errors return new ArrayList<>(runningJobs); + } finally { + lock.unlock(); } } }