diff --git a/src/main/java/io/bootique/job/lock/LocalLockHandler.java b/src/main/java/io/bootique/job/lock/LocalLockHandler.java index a5e4a2b2..582dd0bb 100644 --- a/src/main/java/io/bootique/job/lock/LocalLockHandler.java +++ b/src/main/java/io/bootique/job/lock/LocalLockHandler.java @@ -1,10 +1,12 @@ package io.bootique.job.lock; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import io.bootique.job.runnable.BaseRunnableJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,23 +30,30 @@ public LocalLockHandler() { @Override public RunnableJob lockingJob(RunnableJob executable, JobMetadata metadata) { + return new BaseRunnableJob() { + @Override + protected JobResult doRun() { + String lockName = toLockName(metadata); + Lock lock = getLock(lockName); - return () -> { - String lockName = toLockName(metadata); - Lock lock = getLock(lockName); + LOGGER.info("Attempting to lock '{}'", lockName); - LOGGER.info("Attempting to lock '{}'", lockName); + if (!lock.tryLock()) { + LOGGER.info("== Another job instance owns the lock. Skipping execution of '{}'", lockName); + return new JobResult(metadata, JobOutcome.SKIPPED, null, + "Another job instance owns the lock. Skipping execution"); + } - if (!lock.tryLock()) { - LOGGER.info("== Another job instance owns the lock. Skipping execution of '{}'", lockName); - return new JobResult(metadata, JobOutcome.SKIPPED, null, - "Another job instance owns the lock. Skipping execution"); + try { + return executable.run(); + } finally { + lock.unlock(); + } } - try { - return executable.run(); - } finally { - lock.unlock(); + @Override + public Map getParameters() { + return executable.getParameters(); } }; } diff --git a/src/main/java/io/bootique/job/lock/zookeeper/ZkClusterLockHandler.java b/src/main/java/io/bootique/job/lock/zookeeper/ZkClusterLockHandler.java index 1ebaf2e9..644bf489 100644 --- a/src/main/java/io/bootique/job/lock/zookeeper/ZkClusterLockHandler.java +++ b/src/main/java/io/bootique/job/lock/zookeeper/ZkClusterLockHandler.java @@ -1,6 +1,7 @@ package io.bootique.job.lock.zookeeper; import io.bootique.job.JobMetadata; +import io.bootique.job.runnable.BaseRunnableJob; import io.bootique.job.runnable.JobOutcome; import io.bootique.job.runnable.RunnableJob; import org.slf4j.Logger; @@ -11,6 +12,8 @@ import io.bootique.job.lock.LockHandler; import io.bootique.job.runnable.JobResult; +import java.util.Map; + public class ZkClusterLockHandler implements LockHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ZkClusterLockHandler.class); @@ -29,23 +32,30 @@ public ZkClusterLockHandler(Injector injector) { @Override public RunnableJob lockingJob(RunnableJob executable, JobMetadata metadata) { + return new BaseRunnableJob() { + @Override + protected JobResult doRun() { + String lockName = getLockName(metadata); - return () -> { - String lockName = getLockName(metadata); + LOGGER.info("Attempting to lock '{}'", lockName); - LOGGER.info("Attempting to lock '{}'", lockName); + ZkMutex lock = ZkMutex.acquire(injector, lockName); + if (lock == null) { + LOGGER.info("** Another job instance owns the lock. Skipping execution of '{}'", lockName); + return new JobResult(metadata, JobOutcome.SKIPPED, null, + "Another job instance owns the lock. Skipping execution"); + } - ZkMutex lock = ZkMutex.acquire(injector, lockName); - if (lock == null) { - LOGGER.info("** Another job instance owns the lock. Skipping execution of '{}'", lockName); - return new JobResult(metadata, JobOutcome.SKIPPED, null, - "Another job instance owns the lock. Skipping execution"); + try { + return executable.run(); + } finally { + lock.release(); + } } - try { - return executable.run(); - } finally { - lock.release(); + @Override + public Map getParameters() { + return executable.getParameters(); } }; } diff --git a/src/main/java/io/bootique/job/runnable/BaseRunnableJob.java b/src/main/java/io/bootique/job/runnable/BaseRunnableJob.java new file mode 100644 index 00000000..deea3777 --- /dev/null +++ b/src/main/java/io/bootique/job/runnable/BaseRunnableJob.java @@ -0,0 +1,29 @@ +package io.bootique.job.runnable; + +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class BaseRunnableJob implements RunnableJob { + + private final AtomicInteger running; + + public BaseRunnableJob() { + this.running = new AtomicInteger(); + } + + @Override + public JobResult run() { + running.incrementAndGet(); + try { + return doRun(); + } finally { + running.decrementAndGet(); + } + } + + protected abstract JobResult doRun(); + + @Override + public boolean isRunning() { + return running.get() > 0; + } +} diff --git a/src/main/java/io/bootique/job/runnable/ErrorHandlingRunnableJobFactory.java b/src/main/java/io/bootique/job/runnable/ErrorHandlingRunnableJobFactory.java index dd63595b..86e9efb4 100644 --- a/src/main/java/io/bootique/job/runnable/ErrorHandlingRunnableJobFactory.java +++ b/src/main/java/io/bootique/job/runnable/ErrorHandlingRunnableJobFactory.java @@ -21,14 +21,26 @@ public RunnableJob runnable(Job job, Map parameters) { RunnableJob rj = delegate.runnable(job, parameters); - return () -> { - try { - return rj.run(); - } catch (Throwable th) { - LOGGER.info("Exception while running job '{}'", job.getMetadata().getName(), th); - return JobResult.unknown(job.getMetadata(), th); + return new RunnableJob() { + @Override + public JobResult run() { + try { + return rj.run(); + } catch (Throwable th) { + LOGGER.info("Exception while running job '{}'", job.getMetadata().getName(), th); + return JobResult.unknown(job.getMetadata(), th); + } + } + + @Override + public Map getParameters() { + return rj.getParameters(); + } + + @Override + public boolean isRunning() { + return rj.isRunning(); } }; } - } diff --git a/src/main/java/io/bootique/job/runnable/JobFuture.java b/src/main/java/io/bootique/job/runnable/JobFuture.java index b226933d..4bc410b8 100644 --- a/src/main/java/io/bootique/job/runnable/JobFuture.java +++ b/src/main/java/io/bootique/job/runnable/JobFuture.java @@ -1,5 +1,7 @@ package io.bootique.job.runnable; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; @@ -9,10 +11,24 @@ public class JobFuture implements ScheduledFuture { + /** + * @since 0.13 + */ + public static Builder forJob(String job) { + return new Builder(job); + } + + private String job; + private RunnableJob runnable; private ScheduledFuture delegate; private Supplier resultSupplier; - public JobFuture(ScheduledFuture delegate, Supplier resultSupplier) { + public JobFuture(String job, + RunnableJob runnable, + ScheduledFuture delegate, + Supplier resultSupplier) { + this.job = job; + this.runnable = runnable; this.delegate = delegate; this.resultSupplier = resultSupplier; } @@ -42,6 +58,22 @@ public boolean isDone() { return delegate.isDone(); } + /** + * @return Job name + * @since 0.13 + */ + public String getJob() { + return job; + } + + /** + * @return Runnable job implementation + * @since 0.13 + */ + public RunnableJob getRunnable() { + return runnable; + } + public JobResult get() { // wait till the job is done and then return the result try { @@ -65,4 +97,37 @@ public JobResult get(long timeout, TimeUnit unit) { return resultSupplier.get(); } + public static class Builder { + + private String job; + private RunnableJob runnable; + private ScheduledFuture future; + private Supplier resultSupplier; + + public Builder(String job) { + this.job = Objects.requireNonNull(job); + } + + public Builder runnable(RunnableJob runnable) { + this.runnable = runnable; + return this; + } + + public Builder future(ScheduledFuture future) { + this.future = future; + return this; + } + + public Builder resultSupplier(Supplier resultSupplier) { + this.resultSupplier = resultSupplier; + return this; + } + + public JobFuture build() { + Objects.requireNonNull(runnable); + Objects.requireNonNull(future); + Objects.requireNonNull(resultSupplier); + return new JobFuture(job, runnable, future, resultSupplier); + } + } } diff --git a/src/main/java/io/bootique/job/runnable/RunnableJob.java b/src/main/java/io/bootique/job/runnable/RunnableJob.java index 647dab0b..ca4463f4 100644 --- a/src/main/java/io/bootique/job/runnable/RunnableJob.java +++ b/src/main/java/io/bootique/job/runnable/RunnableJob.java @@ -1,6 +1,20 @@ package io.bootique.job.runnable; +import java.util.Map; + public interface RunnableJob { JobResult run(); + + /** + * @return Parameters that the job was submitted with. + * @since 0.13 + */ + Map getParameters(); + + /** + * @return true if the job is currently executing its' {@link #run()} method. + * @since 0.13 + */ + boolean isRunning(); } diff --git a/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java b/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java index e00e53ff..606a0782 100644 --- a/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java +++ b/src/main/java/io/bootique/job/runnable/SimpleRunnableJobFactory.java @@ -12,14 +12,21 @@ public class SimpleRunnableJobFactory implements RunnableJobFactory { @Override public RunnableJob runnable(Job job, Map parameters) { - return () -> { + return new BaseRunnableJob() { + @Override + protected JobResult doRun() { + LOGGER.info(String.format("job '%s' started with params %s", job.getMetadata().getName(), parameters)); - LOGGER.info(String.format("job '%s' started with params %s", job.getMetadata().getName(), parameters)); + try { + return job.run(parameters); + } finally { + LOGGER.info(String.format("job '%s' finished", job.getMetadata().getName())); + } + } - try { - return job.run(parameters); - } finally { - LOGGER.info(String.format("job '%s' finished", job.getMetadata().getName())); + @Override + public Map getParameters() { + return parameters; } }; } diff --git a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java index aa1d05ef..21461332 100644 --- a/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java +++ b/src/main/java/io/bootique/job/scheduler/DefaultScheduler.java @@ -12,13 +12,21 @@ import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; +import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiFunction; import java.util.stream.Collectors; public class DefaultScheduler implements Scheduler { @@ -29,6 +37,7 @@ public class DefaultScheduler implements Scheduler { private RunnableJobFactory runnableJobFactory; private JobRegistry jobRegistry; private Collection triggers; + private JobFutures jobFutures; public DefaultScheduler(Collection triggers, TaskScheduler taskScheduler, @@ -38,6 +47,7 @@ public DefaultScheduler(Collection triggers, this.runnableJobFactory = runnableJobFactory; this.taskScheduler = taskScheduler; this.jobRegistry = jobRegistry; + this.jobFutures = new JobFutures(); } @Override @@ -73,6 +83,11 @@ public Collection getTriggers() { return triggers; } + @Override + public Collection getSubmittedJobs() { + return jobFutures.getActiveJobs(); + } + @Override public JobFuture runOnce(String jobName) { return runOnce(jobName, Collections.emptyMap()); @@ -85,7 +100,7 @@ public JobFuture runOnce(String jobName, Map parameters) { Job job = jobOptional.get(); return runOnce(job, parameters); } else { - return invalidJobNameResult(jobName); + return invalidJobNameResult(jobName, parameters); } } @@ -94,9 +109,27 @@ private Optional findJobByName(String jobName) { return (job == null) ? Optional.empty() : Optional.of(job); } - private JobFuture invalidJobNameResult(String jobName) { - return new JobFuture(new ExpiredFuture(), - () -> JobResult.failure(JobMetadata.build(jobName), "Invalid job name: " + jobName)); + private JobFuture invalidJobNameResult(String jobName, Map parameters) { + return JobFuture.forJob(jobName) + .future(new ExpiredFuture()) + .runnable(new RunnableJob() { + @Override + public JobResult run() { + return JobResult.unknown(JobMetadata.build(jobName)); + } + + @Override + public Map getParameters() { + return parameters; + } + + @Override + public boolean isRunning() { + return false; + } + }) + .resultSupplier(() -> JobResult.failure(JobMetadata.build(jobName), "Invalid job name: " + jobName)) + .build(); } @Override @@ -106,16 +139,82 @@ public JobFuture runOnce(Job job) { @Override public JobFuture runOnce(Job job, Map parameters) { - RunnableJob rj = runnableJobFactory.runnable(job, parameters); - JobResult[] result = new JobResult[1]; - ScheduledFuture jobFuture = taskScheduler.schedule(() -> result[0] = rj.run(), new Date()); - return new JobFuture(jobFuture, () -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata())); + return submit(job, parameters, + (rj, result) -> taskScheduler.schedule(() -> result[0] = rj.run(), new Date())); } - public ScheduledFuture schedule(Job job, Map parameters, Trigger trigger) { + private ScheduledFuture schedule(Job job, Map parameters, Trigger trigger) { + return submit(job, parameters, + (rj, result) -> taskScheduler.schedule(() -> result[0] = rj.run(), trigger)); + } + + private JobFuture submit(Job job, Map parameters, + BiFunction> executor) { + RunnableJob rj = runnableJobFactory.runnable(job, parameters); JobResult[] result = new JobResult[1]; - ScheduledFuture jobFuture = taskScheduler.schedule(() -> result[0] = rj.run(), trigger); - return new JobFuture(jobFuture, () -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata())); + ScheduledFuture jobFuture = executor.apply(rj, result); + + JobFuture future = JobFuture.forJob(job.getMetadata().getName()) + .future(jobFuture) + .runnable(rj) + .resultSupplier(() -> result[0] != null ? result[0] : JobResult.unknown(job.getMetadata())) + .build(); + + jobFutures.addFuture(future); + return future; + } + + private static class JobFutures { + + private final Queue submittedJobs; + private final Collection runningJobs; + private final ReentrantLock lock; + + public JobFutures() { + this.runningJobs = new LinkedList<>(); + this.submittedJobs = new LinkedBlockingQueue<>(); + this.lock = new ReentrantLock(); + + // If getActiveJobs() is rarely called, method response time and memory usage will grow over time. + // To prevent such situation a daemon will perform periodic cleanup of submitted and active jobs. + Thread cleaner = new Thread(() -> { + getActiveJobs(); + try { + Thread.sleep(Duration.ofSeconds(60).toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, "io.bootique.job.Scheduler.JobFutureCleaner"); + cleaner.setDaemon(true); + cleaner.start(); + } + + public JobFutures addFuture(JobFuture future) { + submittedJobs.add(future); + return this; + } + + public Collection getActiveJobs() { + lock.lock(); + try { + Iterator iter = runningJobs.iterator(); + while (iter.hasNext()) { + if (iter.next().isDone()) { + iter.remove(); + } + } + + JobFuture submitted; + while ((submitted = submittedJobs.poll()) != null && !submitted.isDone()) { + runningJobs.add(submitted); + } + + // do copying prior to releasing the lock to avoid possible errors + return new ArrayList<>(runningJobs); + } finally { + lock.unlock(); + } + } } } diff --git a/src/main/java/io/bootique/job/scheduler/Scheduler.java b/src/main/java/io/bootique/job/scheduler/Scheduler.java index 355e8f08..9017df1b 100644 --- a/src/main/java/io/bootique/job/scheduler/Scheduler.java +++ b/src/main/java/io/bootique/job/scheduler/Scheduler.java @@ -5,6 +5,7 @@ import java.util.Collection; import java.util.Map; +import java.util.concurrent.Future; public interface Scheduler { @@ -54,4 +55,15 @@ public interface Scheduler { * @since 0.13 */ Collection getTriggers(); + + /** + * Returns jobs that are currently submitted for execution. + * + * Note that this method is inherently racy, and some futures in the returned collection might in fact be completed + * at the time this method returns. One may check the individual futures for completion by calling {@link Future#isDone()}. + * + * @return Collection of jobs that were submitted for execution but hadn't yet completed. + * @since 0.13 + */ + Collection getSubmittedJobs(); }