Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide info on currently running jobs #21 #24

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions src/main/java/io/bootique/job/lock/LocalLockHandler.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.bootique.job.lock;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.bootique.job.runnable.BaseRunnableJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,23 +30,30 @@ public LocalLockHandler() {

@Override
public RunnableJob lockingJob(RunnableJob executable, JobMetadata metadata) {
return new BaseRunnableJob() {
@Override
protected JobResult doRun() {
String lockName = toLockName(metadata);
Lock lock = getLock(lockName);

return () -> {
String lockName = toLockName(metadata);
Lock lock = getLock(lockName);
LOGGER.info("Attempting to lock '{}'", lockName);

LOGGER.info("Attempting to lock '{}'", lockName);
if (!lock.tryLock()) {
LOGGER.info("== Another job instance owns the lock. Skipping execution of '{}'", lockName);
return new JobResult(metadata, JobOutcome.SKIPPED, null,
"Another job instance owns the lock. Skipping execution");
}

if (!lock.tryLock()) {
LOGGER.info("== Another job instance owns the lock. Skipping execution of '{}'", lockName);
return new JobResult(metadata, JobOutcome.SKIPPED, null,
"Another job instance owns the lock. Skipping execution");
try {
return executable.run();
} finally {
lock.unlock();
}
}

try {
return executable.run();
} finally {
lock.unlock();
@Override
public Map<String, Object> getParameters() {
return executable.getParameters();
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.bootique.job.lock.zookeeper;

import io.bootique.job.JobMetadata;
import io.bootique.job.runnable.BaseRunnableJob;
import io.bootique.job.runnable.JobOutcome;
import io.bootique.job.runnable.RunnableJob;
import org.slf4j.Logger;
Expand All @@ -11,6 +12,8 @@
import io.bootique.job.lock.LockHandler;
import io.bootique.job.runnable.JobResult;

import java.util.Map;

public class ZkClusterLockHandler implements LockHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(ZkClusterLockHandler.class);
Expand All @@ -29,23 +32,30 @@ public ZkClusterLockHandler(Injector injector) {

@Override
public RunnableJob lockingJob(RunnableJob executable, JobMetadata metadata) {
return new BaseRunnableJob() {
@Override
protected JobResult doRun() {
String lockName = getLockName(metadata);

return () -> {
String lockName = getLockName(metadata);
LOGGER.info("Attempting to lock '{}'", lockName);

LOGGER.info("Attempting to lock '{}'", lockName);
ZkMutex lock = ZkMutex.acquire(injector, lockName);
if (lock == null) {
LOGGER.info("** Another job instance owns the lock. Skipping execution of '{}'", lockName);
return new JobResult(metadata, JobOutcome.SKIPPED, null,
"Another job instance owns the lock. Skipping execution");
}

ZkMutex lock = ZkMutex.acquire(injector, lockName);
if (lock == null) {
LOGGER.info("** Another job instance owns the lock. Skipping execution of '{}'", lockName);
return new JobResult(metadata, JobOutcome.SKIPPED, null,
"Another job instance owns the lock. Skipping execution");
try {
return executable.run();
} finally {
lock.release();
}
}

try {
return executable.run();
} finally {
lock.release();
@Override
public Map<String, Object> getParameters() {
return executable.getParameters();
}
};
}
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/bootique/job/runnable/BaseRunnableJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.bootique.job.runnable;

import java.util.concurrent.atomic.AtomicInteger;

public abstract class BaseRunnableJob implements RunnableJob {

private final AtomicInteger running;

public BaseRunnableJob() {
this.running = new AtomicInteger();
}

@Override
public JobResult run() {
running.incrementAndGet();
try {
return doRun();
} finally {
running.decrementAndGet();
}
}

protected abstract JobResult doRun();

@Override
public boolean isRunning() {
return running.get() > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,26 @@ public RunnableJob runnable(Job job, Map<String, Object> parameters) {

RunnableJob rj = delegate.runnable(job, parameters);

return () -> {
try {
return rj.run();
} catch (Throwable th) {
LOGGER.info("Exception while running job '{}'", job.getMetadata().getName(), th);
return JobResult.unknown(job.getMetadata(), th);
return new RunnableJob() {
@Override
public JobResult run() {
try {
return rj.run();
} catch (Throwable th) {
LOGGER.info("Exception while running job '{}'", job.getMetadata().getName(), th);
return JobResult.unknown(job.getMetadata(), th);
}
}

@Override
public Map<String, Object> getParameters() {
return rj.getParameters();
}

@Override
public boolean isRunning() {
return rj.isRunning();
}
};
}

}
67 changes: 66 additions & 1 deletion src/main/java/io/bootique/job/runnable/JobFuture.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.bootique.job.runnable;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -9,10 +11,24 @@

public class JobFuture implements ScheduledFuture<JobResult> {

/**
* @since 0.13
*/
public static Builder forJob(String job) {
return new Builder(job);
}

private String job;
private RunnableJob runnable;
private ScheduledFuture<?> delegate;
private Supplier<JobResult> resultSupplier;

public JobFuture(ScheduledFuture<?> delegate, Supplier<JobResult> resultSupplier) {
public JobFuture(String job,
RunnableJob runnable,
ScheduledFuture<?> delegate,
Supplier<JobResult> resultSupplier) {
this.job = job;
this.runnable = runnable;
this.delegate = delegate;
this.resultSupplier = resultSupplier;
}
Expand Down Expand Up @@ -42,6 +58,22 @@ public boolean isDone() {
return delegate.isDone();
}

/**
* @return Job name
* @since 0.13
*/
public String getJob() {
return job;
}

/**
* @return Runnable job implementation
* @since 0.13
*/
public RunnableJob getRunnable() {
return runnable;
}

public JobResult get() {
// wait till the job is done and then return the result
try {
Expand All @@ -65,4 +97,37 @@ public JobResult get(long timeout, TimeUnit unit) {
return resultSupplier.get();
}

public static class Builder {

private String job;
private RunnableJob runnable;
private ScheduledFuture<?> future;
private Supplier<JobResult> resultSupplier;

public Builder(String job) {
this.job = Objects.requireNonNull(job);
}

public Builder runnable(RunnableJob runnable) {
this.runnable = runnable;
return this;
}

public Builder future(ScheduledFuture<?> future) {
this.future = future;
return this;
}

public Builder resultSupplier(Supplier<JobResult> resultSupplier) {
this.resultSupplier = resultSupplier;
return this;
}

public JobFuture build() {
Objects.requireNonNull(runnable);
Objects.requireNonNull(future);
Objects.requireNonNull(resultSupplier);
return new JobFuture(job, runnable, future, resultSupplier);
}
}
}
14 changes: 14 additions & 0 deletions src/main/java/io/bootique/job/runnable/RunnableJob.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
package io.bootique.job.runnable;

import java.util.Map;

public interface RunnableJob {

JobResult run();

/**
* @return Parameters that the job was submitted with.
* @since 0.13
*/
Map<String, Object> getParameters();

/**
* @return true if the job is currently executing its' {@link #run()} method.
* @since 0.13
*/
boolean isRunning();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,21 @@ public class SimpleRunnableJobFactory implements RunnableJobFactory {

@Override
public RunnableJob runnable(Job job, Map<String, Object> parameters) {
return () -> {
return new BaseRunnableJob() {
@Override
protected JobResult doRun() {
LOGGER.info(String.format("job '%s' started with params %s", job.getMetadata().getName(), parameters));

LOGGER.info(String.format("job '%s' started with params %s", job.getMetadata().getName(), parameters));
try {
return job.run(parameters);
} finally {
LOGGER.info(String.format("job '%s' finished", job.getMetadata().getName()));
}
}

try {
return job.run(parameters);
} finally {
LOGGER.info(String.format("job '%s' finished", job.getMetadata().getName()));
@Override
public Map<String, Object> getParameters() {
return parameters;
}
};
}
Expand Down
Loading