Skip to content

Commit

Permalink
Merge pull request #40 from Belphemur/develop
Browse files Browse the repository at this point in the history
Release
  • Loading branch information
kodiakhq[bot] authored Oct 5, 2021
2 parents 3058e2f + 6515952 commit b89526b
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 24 deletions.
67 changes: 49 additions & 18 deletions Job.Scheduler.Tests/JobSchedulerTests.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Job.Scheduler.Builder;
using Job.Scheduler.Job.Action;
using Job.Scheduler.Scheduler;
using Job.Scheduler.Tests.Mocks;
using Job.Scheduler.Utils;
using NUnit.Framework;

namespace Job.Scheduler.Tests
{
[Parallelizable(ParallelScope.Children)]
public class Tests
{
private IJobScheduler _scheduler;
private IJobRunnerBuilder _builder;

[OneTimeSetUp]
Expand All @@ -22,17 +23,12 @@ public void OneTimeSetup()
_builder = new JobRunnerBuilder();
}

[SetUp]
public void Setup()
{
_scheduler = new JobScheduler(_builder);
}

[Test]
public async Task OneTimeJob()
{
IJobScheduler scheduler = new JobScheduler(_builder);
var job = new OneTimeJob();
var jobRunner = _scheduler.ScheduleJobInternal(job);
var jobRunner = scheduler.ScheduleJobInternal(job);
await jobRunner.WaitForJob();
job.HasRun.Should().BeTrue();
}
Expand All @@ -41,9 +37,10 @@ public async Task OneTimeJob()
[Test]
public async Task FailingJobShouldRetry()
{
IJobScheduler scheduler = new JobScheduler(_builder);
var maxRetries = 3;
var job = new FailingRetringJob(new RetryNTimes(maxRetries));
var jobRunner = _scheduler.ScheduleJobInternal(job);
var jobRunner = scheduler.ScheduleJobInternal(job);
await jobRunner.WaitForJob();
job.Ran.Should().Be(4);
jobRunner.Retries.Should().Be(maxRetries);
Expand All @@ -52,18 +49,20 @@ public async Task FailingJobShouldRetry()
[Test]
public async Task MaxRuntimeIsRespected()
{
IJobScheduler scheduler = new JobScheduler(_builder);
var job = new MaxRuntimeJob(new NoRetry(), TimeSpan.FromMilliseconds(50));
var jobRunner = _scheduler.ScheduleJobInternal(job);
var jobRunner = scheduler.ScheduleJobInternal(job);
await jobRunner.WaitForJob();
jobRunner.Elapsed.Should().BeCloseTo(job.MaxRuntime!.Value, TimeSpan.FromMilliseconds(20));
}

[Test]
public async Task MaxRuntimeIsRespectedAndTaskRetried()
{
IJobScheduler scheduler = new JobScheduler(_builder);
var maxRetries = 2;
var job = new MaxRuntimeJob(new RetryNTimes(maxRetries), TimeSpan.FromMilliseconds(50));
var jobRunner = _scheduler.ScheduleJobInternal(job);
var jobRunner = scheduler.ScheduleJobInternal(job);
await jobRunner.WaitForJob();
jobRunner.Elapsed.Should().BeCloseTo(job.MaxRuntime!.Value, TimeSpan.FromMilliseconds(20));
jobRunner.Retries.Should().Be(maxRetries);
Expand All @@ -73,9 +72,10 @@ public async Task MaxRuntimeIsRespectedAndTaskRetried()
[Test]
public async Task MaxRuntimeIsRespectedAndTaskRetriedWithBackoff()
{
IJobScheduler scheduler = new JobScheduler(_builder);
var maxRetries = 3;
var job = new MaxRuntimeJob(new ExponentialBackoffRetry(TimeSpan.FromMilliseconds(10), maxRetries), TimeSpan.FromMilliseconds(80));
var jobRunner = _scheduler.ScheduleJobInternal(job);
var jobRunner = scheduler.ScheduleJobInternal(job);
await jobRunner.WaitForJob();
jobRunner.Elapsed.Should().BeCloseTo(job.MaxRuntime!.Value, TimeSpan.FromMilliseconds(20));
jobRunner.Retries.Should().Be(maxRetries);
Expand All @@ -84,26 +84,57 @@ public async Task MaxRuntimeIsRespectedAndTaskRetriedWithBackoff()
[Test]
public async Task ExecuteInOwnScheduler()
{
using var scheduler = new MockTaskScheduler();
IJobScheduler scheduler = new JobScheduler(_builder);
using var taskScheduler = new MockTaskScheduler();
var job = new ThreadJob(Thread.CurrentThread);
var jobRunner = _scheduler.ScheduleJobInternal(job, scheduler);
var jobRunner = scheduler.ScheduleJobInternal(job, taskScheduler);
await jobRunner.WaitForJob();
job.HasRun.Should().BeTrue();
jobRunner.Retries.Should().Be(0);
scheduler.Count.Should().Be(1);
taskScheduler.Scheduled.Should().Be(1);
job.InitThread.Should().NotBe(job.RunThread);
job.RunThread.Should().Be(scheduler.MainThread);
job.RunThread.Should().Be(taskScheduler.MainThread);
}

[Test]
public async Task ExecuteInDefaultScheduler()
{
IJobScheduler scheduler = new JobScheduler(_builder);
var job = new ThreadJob(Thread.CurrentThread);
var jobRunner = _scheduler.ScheduleJobInternal(job);
var jobRunner = scheduler.ScheduleJobInternal(job);
await jobRunner.WaitForJob();
job.HasRun.Should().BeTrue();
jobRunner.Retries.Should().Be(0);
job.InitThread.Should().Be(job.RunThread);
}

[Test]
public async Task DebounceJobTest()
{
IJobScheduler scheduler = new JobScheduler(_builder);
var list = new List<string>();
var job = new DebounceJob(list, "Single");
var jobRunnerFirst = scheduler.ScheduleJobInternal(job);
await TaskUtils.WaitForDelayOrCancellation(TimeSpan.FromMilliseconds(10), CancellationToken.None);
var jobRunnerSecond = scheduler.ScheduleJobInternal(job);
await jobRunnerFirst.WaitForJob();
await jobRunnerSecond.WaitForJob();

list.Should().ContainSingle(job.Key);
}

[Test]
public async Task DebounceJobAlreadyFinishedTest()
{
IJobScheduler scheduler = new JobScheduler(_builder);
var list = new List<string>();
var job = new DebounceJob(list, "Multiple");
var jobRunnerFirst = scheduler.ScheduleJobInternal(job);
await jobRunnerFirst.WaitForJob();
var jobRunnerSecond = scheduler.ScheduleJobInternal(job);
await jobRunnerSecond.WaitForJob();

list.Should().OnlyContain(s => s == job.Key).And.HaveCount(2);
}
}
}
38 changes: 38 additions & 0 deletions Job.Scheduler.Tests/Mocks/DebounceJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Job.Scheduler.Job;
using Job.Scheduler.Job.Action;
using Job.Scheduler.Job.Exception;

namespace Job.Scheduler.Tests.Mocks
{
public class DebounceJob : IDebounceJob
{
public IRetryAction FailRule { get; } = new NoRetry();
public TimeSpan? MaxRuntime { get; }

private readonly List<string> _list;

public DebounceJob(List<string> list, string key)
{
_list = list;
Key = key;
}

public Task ExecuteAsync(CancellationToken cancellationToken)
{
_list.Add(Key);
return Task.CompletedTask;
}

public Task OnFailure(JobException exception)
{
return Task.CompletedTask;
}

public string Key { get; }
public TimeSpan DebounceTime { get; } = TimeSpan.FromMilliseconds(100);
}
}
4 changes: 2 additions & 2 deletions Job.Scheduler.Tests/Mocks/MockTaskScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class MockTaskScheduler : TaskScheduler, IDisposable
private readonly BlockingCollection<Task> _tasksCollection = new();
public Thread MainThread { get; }
private readonly CancellationTokenSource _cts = new();
public int Count { get; private set; }
public int Scheduled { get; private set; }

public MockTaskScheduler()
{
Expand All @@ -31,8 +31,8 @@ private void Execute()
{
foreach (var task in _tasksCollection.GetConsumingEnumerable(_cts.Token))
{
Scheduled++;
TryExecuteTask(task);
Count++;
}
}
catch (OperationCanceledException)
Expand Down
17 changes: 17 additions & 0 deletions Job.Scheduler/Job/IJob.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Job.Scheduler.Job.Action;
Expand Down Expand Up @@ -62,4 +63,20 @@ public interface IDelayedJob : IJob
/// </summary>
public TimeSpan Delay { get; }
}

/// <summary>
/// Job executed once per <see cref="DebounceTime"/> per <see cref="Key"/>
/// </summary>
public interface IDebounceJob : IJob
{
/// <summary>
/// UniqueID of the job
/// </summary>
public string Key { get; }

/// <summary>
/// Delay to wait to execute the job, to be sure there isn't any other job of the same type scheduled
/// </summary>
public TimeSpan DebounceTime { get; }
}
}
27 changes: 27 additions & 0 deletions Job.Scheduler/Job/Runner/DebounceJobRunner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Job.Scheduler.Utils;

namespace Job.Scheduler.Job.Runner
{
internal class DebounceJobRunner : JobRunner<IDebounceJob>
{
public override string Key => _job.Key;

protected override async Task StartJobAsync(IDebounceJob job, CancellationToken token)
{
await TaskUtils.WaitForDelayOrCancellation(job.DebounceTime, token);
if (token.IsCancellationRequested)
{
return;
}

InnerExecuteJob(job, token);
}

public DebounceJobRunner(IDebounceJob job, Func<IJobRunner, Task> jobDone, TaskScheduler taskScheduler) : base(job, jobDone, taskScheduler)
{
}
}
}
10 changes: 10 additions & 0 deletions Job.Scheduler/Job/Runner/IJobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ public interface IJobRunner
/// </summary>
Guid UniqueId { get; }

/// <summary>
/// Type of the job that is run by the runner
/// </summary>
Type JobType { get; }

/// <summary>
/// Is the job still running
/// </summary>
Expand All @@ -29,6 +34,11 @@ public interface IJobRunner
/// </summary>
int Retries { get; }

/// <summary>
/// Key of the job, used for deduplication
/// </summary>
string Key { get; }

/// <summary>
/// Run the job
/// </summary>
Expand Down
6 changes: 5 additions & 1 deletion Job.Scheduler/Job/Runner/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Job.Scheduler.Job.Runner
/// <typeparam name="T"></typeparam>
internal abstract class JobRunner<T> : IJobRunner where T : IJob
{
private readonly T _job;
protected readonly T _job;
private CancellationTokenSource _cancellationTokenSource;
private Task _runningTask;
private Task _runningTaskWithDone;
Expand All @@ -34,6 +34,10 @@ internal abstract class JobRunner<T> : IJobRunner where T : IJob
public TimeSpan Elapsed => _stopwatch.Elapsed;
public int Retries { get; private set; }

public Type JobType => typeof(T);
public virtual string Key => UniqueId.ToString();


protected JobRunner(T job, Func<IJobRunner, Task> jobDone, [CanBeNull] TaskScheduler taskScheduler)
{
_job = job;
Expand Down
19 changes: 16 additions & 3 deletions Job.Scheduler/Scheduler/JobScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ namespace Job.Scheduler.Scheduler
/// </summary>
public class JobScheduler : IJobScheduler
{
private readonly ConcurrentDictionary<Guid, IJobRunner> _jobs = new ConcurrentDictionary<Guid, IJobRunner>();
private readonly ConcurrentDictionary<Guid, IJobRunner> _jobs = new();
private readonly ConcurrentDictionary<string, Guid> _debouncedJobs = new();
private readonly IJobRunnerBuilder _jobRunnerBuilder;

public JobScheduler(IJobRunnerBuilder jobRunnerBuilder)
Expand All @@ -32,7 +33,7 @@ public JobScheduler(IJobRunnerBuilder jobRunnerBuilder)
/// <param name="taskScheduler">In which TaskScheduler should the job be run. Default = TaskScheduler.Default</param>
public JobId ScheduleJob(IJob job, CancellationToken token = default, TaskScheduler taskScheduler = null)
{
var runner = ((IJobScheduler) this).ScheduleJobInternal(job, taskScheduler, token);
var runner = ((IJobScheduler)this).ScheduleJobInternal(job, taskScheduler, token);
return new JobId(runner.UniqueId);
}

Expand All @@ -59,10 +60,22 @@ IJobRunner IJobScheduler.ScheduleJobInternal(IJob job, TaskScheduler taskSchedul
{
var runner = _jobRunnerBuilder.Build(job, jobRunner =>
{
_jobs.Remove(jobRunner.UniqueId, out _);
_jobs.TryRemove(jobRunner.UniqueId, out _);
return Task.CompletedTask;
}, taskScheduler);
_jobs.TryAdd(runner.UniqueId, runner);
if (job is IDebounceJob debounceJob)
{
if (_debouncedJobs.TryGetValue(debounceJob.Key, out var guid))
{
//Job could have ended and not be available to be removed anymore
_jobs.TryGetValue(guid, out var debounceRunner);
debounceRunner?.StopAsync(default);
}

_debouncedJobs.AddOrUpdate(debounceJob.Key, runner.UniqueId, (_, _) => runner.UniqueId);
}

runner.Start(token);
return runner;
}
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ By implementing the `IRecurringJob` the scheduler will run indefinitely your job

By implementing the `IDelayedJob` you tell the scheduler to wait a delay before executing your job.

### Debounce Job

By implementing the `IDebounceJob` you tell the scheduler to only run the latest encounter of the job sharing the same key.


## Usage

I advise you to use a Dependency Injection (DI) engine (like SimpleInjector) to register the `JobRunnerBuilder`and `JobScheduler` as singleton.
Expand Down

0 comments on commit b89526b

Please sign in to comment.