diff --git a/Job.Scheduler.Tests/JobSchedulerTests.cs b/Job.Scheduler.Tests/JobSchedulerTests.cs index 5a3abd6..b975263 100644 --- a/Job.Scheduler.Tests/JobSchedulerTests.cs +++ b/Job.Scheduler.Tests/JobSchedulerTests.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using FluentAssertions; @@ -6,6 +7,7 @@ using Job.Scheduler.Job.Action; using Job.Scheduler.Scheduler; using Job.Scheduler.Tests.Mocks; +using Job.Scheduler.Utils; using NUnit.Framework; namespace Job.Scheduler.Tests @@ -13,7 +15,6 @@ namespace Job.Scheduler.Tests [Parallelizable(ParallelScope.Children)] public class Tests { - private IJobScheduler _scheduler; private IJobRunnerBuilder _builder; [OneTimeSetUp] @@ -22,17 +23,12 @@ public void OneTimeSetup() _builder = new JobRunnerBuilder(); } - [SetUp] - public void Setup() - { - _scheduler = new JobScheduler(_builder); - } - [Test] public async Task OneTimeJob() { + IJobScheduler scheduler = new JobScheduler(_builder); var job = new OneTimeJob(); - var jobRunner = _scheduler.ScheduleJobInternal(job); + var jobRunner = scheduler.ScheduleJobInternal(job); await jobRunner.WaitForJob(); job.HasRun.Should().BeTrue(); } @@ -41,9 +37,10 @@ public async Task OneTimeJob() [Test] public async Task FailingJobShouldRetry() { + IJobScheduler scheduler = new JobScheduler(_builder); var maxRetries = 3; var job = new FailingRetringJob(new RetryNTimes(maxRetries)); - var jobRunner = _scheduler.ScheduleJobInternal(job); + var jobRunner = scheduler.ScheduleJobInternal(job); await jobRunner.WaitForJob(); job.Ran.Should().Be(4); jobRunner.Retries.Should().Be(maxRetries); @@ -52,8 +49,9 @@ public async Task FailingJobShouldRetry() [Test] public async Task MaxRuntimeIsRespected() { + IJobScheduler scheduler = new JobScheduler(_builder); var job = new MaxRuntimeJob(new NoRetry(), TimeSpan.FromMilliseconds(50)); - var jobRunner = _scheduler.ScheduleJobInternal(job); + var jobRunner = scheduler.ScheduleJobInternal(job); await jobRunner.WaitForJob(); jobRunner.Elapsed.Should().BeCloseTo(job.MaxRuntime!.Value, TimeSpan.FromMilliseconds(20)); } @@ -61,9 +59,10 @@ public async Task MaxRuntimeIsRespected() [Test] public async Task MaxRuntimeIsRespectedAndTaskRetried() { + IJobScheduler scheduler = new JobScheduler(_builder); var maxRetries = 2; var job = new MaxRuntimeJob(new RetryNTimes(maxRetries), TimeSpan.FromMilliseconds(50)); - var jobRunner = _scheduler.ScheduleJobInternal(job); + var jobRunner = scheduler.ScheduleJobInternal(job); await jobRunner.WaitForJob(); jobRunner.Elapsed.Should().BeCloseTo(job.MaxRuntime!.Value, TimeSpan.FromMilliseconds(20)); jobRunner.Retries.Should().Be(maxRetries); @@ -73,9 +72,10 @@ public async Task MaxRuntimeIsRespectedAndTaskRetried() [Test] public async Task MaxRuntimeIsRespectedAndTaskRetriedWithBackoff() { + IJobScheduler scheduler = new JobScheduler(_builder); var maxRetries = 3; var job = new MaxRuntimeJob(new ExponentialBackoffRetry(TimeSpan.FromMilliseconds(10), maxRetries), TimeSpan.FromMilliseconds(80)); - var jobRunner = _scheduler.ScheduleJobInternal(job); + var jobRunner = scheduler.ScheduleJobInternal(job); await jobRunner.WaitForJob(); jobRunner.Elapsed.Should().BeCloseTo(job.MaxRuntime!.Value, TimeSpan.FromMilliseconds(20)); jobRunner.Retries.Should().Be(maxRetries); @@ -84,26 +84,57 @@ public async Task MaxRuntimeIsRespectedAndTaskRetriedWithBackoff() [Test] public async Task ExecuteInOwnScheduler() { - using var scheduler = new MockTaskScheduler(); + IJobScheduler scheduler = new JobScheduler(_builder); + using var taskScheduler = new MockTaskScheduler(); var job = new ThreadJob(Thread.CurrentThread); - var jobRunner = _scheduler.ScheduleJobInternal(job, scheduler); + var jobRunner = scheduler.ScheduleJobInternal(job, taskScheduler); await jobRunner.WaitForJob(); job.HasRun.Should().BeTrue(); jobRunner.Retries.Should().Be(0); - scheduler.Count.Should().Be(1); + taskScheduler.Scheduled.Should().Be(1); job.InitThread.Should().NotBe(job.RunThread); - job.RunThread.Should().Be(scheduler.MainThread); + job.RunThread.Should().Be(taskScheduler.MainThread); } - + [Test] public async Task ExecuteInDefaultScheduler() { + IJobScheduler scheduler = new JobScheduler(_builder); var job = new ThreadJob(Thread.CurrentThread); - var jobRunner = _scheduler.ScheduleJobInternal(job); + var jobRunner = scheduler.ScheduleJobInternal(job); await jobRunner.WaitForJob(); job.HasRun.Should().BeTrue(); jobRunner.Retries.Should().Be(0); job.InitThread.Should().Be(job.RunThread); } + + [Test] + public async Task DebounceJobTest() + { + IJobScheduler scheduler = new JobScheduler(_builder); + var list = new List(); + var job = new DebounceJob(list, "Single"); + var jobRunnerFirst = scheduler.ScheduleJobInternal(job); + await TaskUtils.WaitForDelayOrCancellation(TimeSpan.FromMilliseconds(10), CancellationToken.None); + var jobRunnerSecond = scheduler.ScheduleJobInternal(job); + await jobRunnerFirst.WaitForJob(); + await jobRunnerSecond.WaitForJob(); + + list.Should().ContainSingle(job.Key); + } + + [Test] + public async Task DebounceJobAlreadyFinishedTest() + { + IJobScheduler scheduler = new JobScheduler(_builder); + var list = new List(); + var job = new DebounceJob(list, "Multiple"); + var jobRunnerFirst = scheduler.ScheduleJobInternal(job); + await jobRunnerFirst.WaitForJob(); + var jobRunnerSecond = scheduler.ScheduleJobInternal(job); + await jobRunnerSecond.WaitForJob(); + + list.Should().OnlyContain(s => s == job.Key).And.HaveCount(2); + } } } \ No newline at end of file diff --git a/Job.Scheduler.Tests/Mocks/DebounceJob.cs b/Job.Scheduler.Tests/Mocks/DebounceJob.cs new file mode 100644 index 0000000..b02622a --- /dev/null +++ b/Job.Scheduler.Tests/Mocks/DebounceJob.cs @@ -0,0 +1,38 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Job.Scheduler.Job; +using Job.Scheduler.Job.Action; +using Job.Scheduler.Job.Exception; + +namespace Job.Scheduler.Tests.Mocks +{ + public class DebounceJob : IDebounceJob + { + public IRetryAction FailRule { get; } = new NoRetry(); + public TimeSpan? MaxRuntime { get; } + + private readonly List _list; + + public DebounceJob(List list, string key) + { + _list = list; + Key = key; + } + + public Task ExecuteAsync(CancellationToken cancellationToken) + { + _list.Add(Key); + return Task.CompletedTask; + } + + public Task OnFailure(JobException exception) + { + return Task.CompletedTask; + } + + public string Key { get; } + public TimeSpan DebounceTime { get; } = TimeSpan.FromMilliseconds(100); + } +} \ No newline at end of file diff --git a/Job.Scheduler.Tests/Mocks/MockTaskScheduler.cs b/Job.Scheduler.Tests/Mocks/MockTaskScheduler.cs index 65e2718..59a0618 100644 --- a/Job.Scheduler.Tests/Mocks/MockTaskScheduler.cs +++ b/Job.Scheduler.Tests/Mocks/MockTaskScheduler.cs @@ -11,7 +11,7 @@ public class MockTaskScheduler : TaskScheduler, IDisposable private readonly BlockingCollection _tasksCollection = new(); public Thread MainThread { get; } private readonly CancellationTokenSource _cts = new(); - public int Count { get; private set; } + public int Scheduled { get; private set; } public MockTaskScheduler() { @@ -31,8 +31,8 @@ private void Execute() { foreach (var task in _tasksCollection.GetConsumingEnumerable(_cts.Token)) { + Scheduled++; TryExecuteTask(task); - Count++; } } catch (OperationCanceledException) diff --git a/Job.Scheduler/Job/IJob.cs b/Job.Scheduler/Job/IJob.cs index 5003637..2761f37 100644 --- a/Job.Scheduler/Job/IJob.cs +++ b/Job.Scheduler/Job/IJob.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; using Job.Scheduler.Job.Action; @@ -62,4 +63,20 @@ public interface IDelayedJob : IJob /// public TimeSpan Delay { get; } } + + /// + /// Job executed once per per + /// + public interface IDebounceJob : IJob + { + /// + /// UniqueID of the job + /// + public string Key { get; } + + /// + /// Delay to wait to execute the job, to be sure there isn't any other job of the same type scheduled + /// + public TimeSpan DebounceTime { get; } + } } \ No newline at end of file diff --git a/Job.Scheduler/Job/Runner/DebounceJobRunner.cs b/Job.Scheduler/Job/Runner/DebounceJobRunner.cs new file mode 100644 index 0000000..72a85eb --- /dev/null +++ b/Job.Scheduler/Job/Runner/DebounceJobRunner.cs @@ -0,0 +1,27 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Job.Scheduler.Utils; + +namespace Job.Scheduler.Job.Runner +{ + internal class DebounceJobRunner : JobRunner + { + public override string Key => _job.Key; + + protected override async Task StartJobAsync(IDebounceJob job, CancellationToken token) + { + await TaskUtils.WaitForDelayOrCancellation(job.DebounceTime, token); + if (token.IsCancellationRequested) + { + return; + } + + InnerExecuteJob(job, token); + } + + public DebounceJobRunner(IDebounceJob job, Func jobDone, TaskScheduler taskScheduler) : base(job, jobDone, taskScheduler) + { + } + } +} \ No newline at end of file diff --git a/Job.Scheduler/Job/Runner/IJobRunner.cs b/Job.Scheduler/Job/Runner/IJobRunner.cs index 41c575e..2413f7c 100644 --- a/Job.Scheduler/Job/Runner/IJobRunner.cs +++ b/Job.Scheduler/Job/Runner/IJobRunner.cs @@ -14,6 +14,11 @@ public interface IJobRunner /// Guid UniqueId { get; } + /// + /// Type of the job that is run by the runner + /// + Type JobType { get; } + /// /// Is the job still running /// @@ -29,6 +34,11 @@ public interface IJobRunner /// int Retries { get; } + /// + /// Key of the job, used for deduplication + /// + string Key { get; } + /// /// Run the job /// diff --git a/Job.Scheduler/Job/Runner/JobRunner.cs b/Job.Scheduler/Job/Runner/JobRunner.cs index b2336ea..88360cf 100644 --- a/Job.Scheduler/Job/Runner/JobRunner.cs +++ b/Job.Scheduler/Job/Runner/JobRunner.cs @@ -16,7 +16,7 @@ namespace Job.Scheduler.Job.Runner /// internal abstract class JobRunner : IJobRunner where T : IJob { - private readonly T _job; + protected readonly T _job; private CancellationTokenSource _cancellationTokenSource; private Task _runningTask; private Task _runningTaskWithDone; @@ -34,6 +34,10 @@ internal abstract class JobRunner : IJobRunner where T : IJob public TimeSpan Elapsed => _stopwatch.Elapsed; public int Retries { get; private set; } + public Type JobType => typeof(T); + public virtual string Key => UniqueId.ToString(); + + protected JobRunner(T job, Func jobDone, [CanBeNull] TaskScheduler taskScheduler) { _job = job; diff --git a/Job.Scheduler/Scheduler/JobScheduler.cs b/Job.Scheduler/Scheduler/JobScheduler.cs index d280e8a..769f025 100644 --- a/Job.Scheduler/Scheduler/JobScheduler.cs +++ b/Job.Scheduler/Scheduler/JobScheduler.cs @@ -16,7 +16,8 @@ namespace Job.Scheduler.Scheduler /// public class JobScheduler : IJobScheduler { - private readonly ConcurrentDictionary _jobs = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _jobs = new(); + private readonly ConcurrentDictionary _debouncedJobs = new(); private readonly IJobRunnerBuilder _jobRunnerBuilder; public JobScheduler(IJobRunnerBuilder jobRunnerBuilder) @@ -32,7 +33,7 @@ public JobScheduler(IJobRunnerBuilder jobRunnerBuilder) /// In which TaskScheduler should the job be run. Default = TaskScheduler.Default public JobId ScheduleJob(IJob job, CancellationToken token = default, TaskScheduler taskScheduler = null) { - var runner = ((IJobScheduler) this).ScheduleJobInternal(job, taskScheduler, token); + var runner = ((IJobScheduler)this).ScheduleJobInternal(job, taskScheduler, token); return new JobId(runner.UniqueId); } @@ -59,10 +60,22 @@ IJobRunner IJobScheduler.ScheduleJobInternal(IJob job, TaskScheduler taskSchedul { var runner = _jobRunnerBuilder.Build(job, jobRunner => { - _jobs.Remove(jobRunner.UniqueId, out _); + _jobs.TryRemove(jobRunner.UniqueId, out _); return Task.CompletedTask; }, taskScheduler); _jobs.TryAdd(runner.UniqueId, runner); + if (job is IDebounceJob debounceJob) + { + if (_debouncedJobs.TryGetValue(debounceJob.Key, out var guid)) + { + //Job could have ended and not be available to be removed anymore + _jobs.TryGetValue(guid, out var debounceRunner); + debounceRunner?.StopAsync(default); + } + + _debouncedJobs.AddOrUpdate(debounceJob.Key, runner.UniqueId, (_, _) => runner.UniqueId); + } + runner.Start(token); return runner; } diff --git a/README.md b/README.md index d2acdb0..17df80e 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,11 @@ By implementing the `IRecurringJob` the scheduler will run indefinitely your job By implementing the `IDelayedJob` you tell the scheduler to wait a delay before executing your job. +### Debounce Job + +By implementing the `IDebounceJob` you tell the scheduler to only run the latest encounter of the job sharing the same key. + + ## Usage I advise you to use a Dependency Injection (DI) engine (like SimpleInjector) to register the `JobRunnerBuilder`and `JobScheduler` as singleton.