diff --git a/samples/Foundatio.SampleJob/PingQueueJob.cs b/samples/Foundatio.SampleJob/PingQueueJob.cs index 84fe0e2..1b5dc5f 100644 --- a/samples/Foundatio.SampleJob/PingQueueJob.cs +++ b/samples/Foundatio.SampleJob/PingQueueJob.cs @@ -8,7 +8,6 @@ using Foundatio.Lock; using Foundatio.Messaging; using Foundatio.Queues; -using Foundatio.Utility; using Microsoft.Extensions.Logging; namespace Foundatio.SampleJob; @@ -18,10 +17,10 @@ public class PingQueueJob : QueueJobBase private readonly ILockProvider _locker; private int _runCount; - public PingQueueJob(IQueue queue, ILoggerFactory loggerFactory, ICacheClient cacheClient, IMessageBus messageBus) : base(queue, loggerFactory) + public PingQueueJob(IQueue queue, ILoggerFactory loggerFactory, ICacheClient cacheClient, IMessageBus messageBus) : base(queue, null, loggerFactory) { AutoComplete = true; - _locker = new CacheLockProvider(cacheClient, messageBus, loggerFactory); + _locker = new CacheLockProvider(cacheClient, messageBus, null, loggerFactory); } public int RunCount => _runCount; @@ -39,7 +38,7 @@ protected override async Task ProcessQueueEntryAsync(QueueEntryContex if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation("Got {RunCount} ping. Sending pong!", RunCount.ToOrdinal()); - await SystemClock.SleepAsync(TimeSpan.FromMilliseconds(1)).AnyContext(); + await Task.Delay(TimeSpan.FromMilliseconds(1)).AnyContext(); if (RandomData.GetBool(context.QueueEntry.Value.PercentChanceOfException)) throw new ApplicationException("Boom!"); diff --git a/samples/Foundatio.SampleJob/Program.cs b/samples/Foundatio.SampleJob/Program.cs index 81a83ea..388d91e 100644 --- a/samples/Foundatio.SampleJob/Program.cs +++ b/samples/Foundatio.SampleJob/Program.cs @@ -15,10 +15,10 @@ public static int Main() _logger = loggerFactory.CreateLogger("MessageBus"); var serviceProvider = SampleServiceProvider.Create(loggerFactory); - var jobOptions = JobOptions.GetDefaults(() => serviceProvider.GetRequiredService()); + var jobOptions = JobOptions.GetDefaults(_ => serviceProvider.GetRequiredService()); var messageBus = serviceProvider.GetRequiredService(); messageBus.SubscribeAsync(m => HandleEchoMessage(m)).GetAwaiter().GetResult(); - return new JobRunner(jobOptions).RunInConsoleAsync().GetAwaiter().GetResult(); + return new JobRunner(jobOptions, serviceProvider).RunInConsoleAsync().GetAwaiter().GetResult(); } private static void HandleEchoMessage(EchoMessage m) diff --git a/samples/Foundatio.SampleJob/SampleServiceProvider.cs b/samples/Foundatio.SampleJob/SampleServiceProvider.cs index 8cf9f9b..6c9d9d6 100644 --- a/samples/Foundatio.SampleJob/SampleServiceProvider.cs +++ b/samples/Foundatio.SampleJob/SampleServiceProvider.cs @@ -26,7 +26,7 @@ public static IServiceProvider Create(ILoggerFactory loggerFactory) container.AddSingleton>(s => new RedisQueue(o => o.ConnectionMultiplexer(muxer).RetryDelay(TimeSpan.FromSeconds(1)).WorkItemTimeout(TimeSpan.FromSeconds(5)).LoggerFactory(loggerFactory))); container.AddSingleton(s => new RedisCacheClient(o => o.ConnectionMultiplexer(muxer).LoggerFactory(loggerFactory))); container.AddSingleton(s => new RedisMessageBus(o => o.Subscriber(muxer.GetSubscriber()).LoggerFactory(loggerFactory).MapMessageTypeToClassName())); - container.AddSingleton(s => new CacheLockProvider(s.GetRequiredService(), s.GetRequiredService(), loggerFactory)); + container.AddSingleton(s => new CacheLockProvider(s.GetRequiredService(), s.GetRequiredService(), null, loggerFactory)); container.AddTransient(); return container.BuildServiceProvider(); diff --git a/samples/Foundatio.SampleJobClient/Foundatio.SampleJobClient.csproj b/samples/Foundatio.SampleJobClient/Foundatio.SampleJobClient.csproj index e5e80bf..c157331 100644 --- a/samples/Foundatio.SampleJobClient/Foundatio.SampleJobClient.csproj +++ b/samples/Foundatio.SampleJobClient/Foundatio.SampleJobClient.csproj @@ -8,7 +8,7 @@ - + diff --git a/samples/Foundatio.SampleJobClient/Program.cs b/samples/Foundatio.SampleJobClient/Program.cs index b4a52b1..cbad030 100644 --- a/samples/Foundatio.SampleJobClient/Program.cs +++ b/samples/Foundatio.SampleJobClient/Program.cs @@ -4,7 +4,6 @@ using System.Threading.Tasks; using Foundatio.Messaging; using Foundatio.Queues; -using Foundatio.Utility; using Foundatio.Xunit; using Microsoft.Extensions.Logging; using StackExchange.Redis; @@ -97,7 +96,7 @@ private static void MonitorKeyPress() { while (!Console.KeyAvailable) { - SystemClock.Sleep(250); + Thread.Sleep(250); } var key = Console.ReadKey(true).Key; @@ -119,7 +118,7 @@ private static void DrawLoop() Console.SetCursorPosition(0, OPTIONS_MENU_LINE_COUNT + 1); - SystemClock.Sleep(250); + Thread.Sleep(250); } } diff --git a/src/Foundatio.Redis/Foundatio.Redis.csproj b/src/Foundatio.Redis/Foundatio.Redis.csproj index fde423e..88594e2 100644 --- a/src/Foundatio.Redis/Foundatio.Redis.csproj +++ b/src/Foundatio.Redis/Foundatio.Redis.csproj @@ -1,10 +1,10 @@ - + - + diff --git a/src/Foundatio.Redis/Metrics/RedisMetricsClient.cs b/src/Foundatio.Redis/Metrics/RedisMetricsClient.cs deleted file mode 100644 index d7a8585..0000000 --- a/src/Foundatio.Redis/Metrics/RedisMetricsClient.cs +++ /dev/null @@ -1,17 +0,0 @@ -using Foundatio.Caching; - -namespace Foundatio.Metrics; - -public class RedisMetricsClient : CacheBucketMetricsClientBase -{ - public RedisMetricsClient(RedisMetricsClientOptions options) : base(new RedisCacheClient(o => o.ConnectionMultiplexer(options.ConnectionMultiplexer).LoggerFactory(options.LoggerFactory)), options) { } - - public RedisMetricsClient(Builder config) - : this(config(new RedisMetricsClientOptionsBuilder()).Build()) { } - - public override void Dispose() - { - base.Dispose(); - _cache.Dispose(); - } -} diff --git a/src/Foundatio.Redis/Metrics/RedisMetricsClientOptions.cs b/src/Foundatio.Redis/Metrics/RedisMetricsClientOptions.cs deleted file mode 100644 index afb6f86..0000000 --- a/src/Foundatio.Redis/Metrics/RedisMetricsClientOptions.cs +++ /dev/null @@ -1,17 +0,0 @@ -using StackExchange.Redis; - -namespace Foundatio.Metrics; - -public class RedisMetricsClientOptions : SharedMetricsClientOptions -{ - public IConnectionMultiplexer ConnectionMultiplexer { get; set; } -} - -public class RedisMetricsClientOptionsBuilder : SharedMetricsClientOptionsBuilder -{ - public RedisMetricsClientOptionsBuilder ConnectionMultiplexer(IConnectionMultiplexer connectionMultiplexer) - { - Target.ConnectionMultiplexer = connectionMultiplexer; - return this; - } -} diff --git a/src/Foundatio.Redis/Queues/RedisQueue.cs b/src/Foundatio.Redis/Queues/RedisQueue.cs index f72cf76..484ea64 100644 --- a/src/Foundatio.Redis/Queues/RedisQueue.cs +++ b/src/Foundatio.Redis/Queues/RedisQueue.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -225,7 +225,7 @@ protected override async Task EnqueueImplAsync(T data, QueueEntryOptions return null; } - var now = SystemClock.UtcNow; + var now = _timeProvider.GetUtcNow().UtcDateTime; var envelope = new RedisPayloadEnvelope { Properties = options.Properties, @@ -386,7 +386,7 @@ protected override async Task> DequeueImplAsync(CancellationToken public override async Task RenewLockAsync(IQueueEntry entry) { if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Queue {Name} renew lock item: {EntryId}", _options.Name, entry.Id); - await Run.WithRetriesAsync(() => _cache.SetAsync(GetRenewedTimeKey(entry.Id), SystemClock.UtcNow.Ticks, GetWorkItemTimeoutTimeTtl()), logger: _logger).AnyContext(); + await Run.WithRetriesAsync(() => _cache.SetAsync(GetRenewedTimeKey(entry.Id), _timeProvider.GetUtcNow().Ticks, GetWorkItemTimeoutTimeTtl()), logger: _logger).AnyContext(); await OnLockRenewedAsync(entry).AnyContext(); if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Renew lock done: {EntryId}", entry.Id); } @@ -423,7 +423,7 @@ private async Task DequeueIdAsync(CancellationToken linkedCancellati return await Run.WithRetriesAsync(async () => { var timeout = GetWorkItemTimeoutTimeTtl(); - long now = SystemClock.UtcNow.Ticks; + long now = _timeProvider.GetUtcNow().Ticks; await LoadScriptsAsync().AnyContext(); var result = await Database.ScriptEvaluateAsync(_dequeueId, new @@ -435,7 +435,7 @@ private async Task DequeueIdAsync(CancellationToken linkedCancellati timeout = timeout.TotalMilliseconds }).AnyContext(); return result.ToString(); - }, 3, TimeSpan.FromMilliseconds(100), linkedCancellationToken, _logger).AnyContext(); + }, 3, TimeSpan.FromMilliseconds(100), _timeProvider, linkedCancellationToken, _logger).AnyContext(); } catch (Exception ex) { @@ -518,7 +518,7 @@ await Run.WithRetriesAsync(() => Task.WhenAll( _logger.LogInformation("Adding item to wait list for future retry: {EntryId}", entry.Id); await Run.WithRetriesAsync(() => Task.WhenAll( - _cache.SetAsync(GetWaitTimeKey(entry.Id), SystemClock.UtcNow.Add(retryDelay).Ticks, GetWaitTimeTtl()), + _cache.SetAsync(GetWaitTimeKey(entry.Id), _timeProvider.GetUtcNow().Add(retryDelay).Ticks, GetWaitTimeTtl()), _cache.IncrementAsync(attemptsCacheKey, 1, GetAttemptsTtl()) ), logger: _logger).AnyContext(); @@ -657,7 +657,7 @@ public async Task DoMaintenanceWorkAsync() return; _logger.LogTrace("Starting DoMaintenance: Name: {Name} Id: {Id}", _options.Name, QueueId); - var utcNow = SystemClock.UtcNow; + var utcNow = _timeProvider.GetUtcNow(); try { @@ -674,7 +674,7 @@ public async Task DoMaintenanceWorkAsync() continue; } - var renewedTime = new DateTime(renewedTimeTicks.Value); + var renewedTime = new DateTimeOffset(new DateTime(renewedTimeTicks.Value), TimeSpan.Zero); _logger.LogTrace("{WorkId}: Renewed time {RenewedTime:o}", workId, renewedTime); if (utcNow.Subtract(renewedTime) <= _options.WorkItemTimeout) @@ -746,7 +746,7 @@ public async Task DoMaintenanceWorkAsync() _logger.LogError(ex, "Error trimming deadletter items: {0}", ex.Message); } - _logger.LogTrace("Finished DoMaintenance: Name: {Name} Id: {Id} Duration: {Duration:g}", _options.Name, QueueId, SystemClock.UtcNow.Subtract(utcNow)); + _logger.LogTrace("Finished DoMaintenance: Name: {Name} Id: {Id} Duration: {Duration:g}", _options.Name, QueueId, _timeProvider.GetUtcNow().Subtract(utcNow)); } private async Task DoMaintenanceWorkLoopAsync() @@ -755,11 +755,11 @@ private async Task DoMaintenanceWorkLoopAsync() { _logger.LogTrace("Requesting Maintenance Lock. Name: {Name} Id: {Id}", _options.Name, QueueId); - var utcNow = SystemClock.UtcNow; + var utcNow = _timeProvider.GetUtcNow(); using var linkedCancellationToken = GetLinkedDisposableCancellationTokenSource(new CancellationTokenSource(TimeSpan.FromSeconds(30)).Token); bool gotLock = await _maintenanceLockProvider.TryUsingAsync($"{_options.Name}-maintenance", DoMaintenanceWorkAsync, cancellationToken: linkedCancellationToken.Token).AnyContext(); - _logger.LogTrace("{Status} Maintenance Lock. Name: {Name} Id: {Id} Time To Acquire: {AcquireDuration:g}", gotLock ? "Acquired" : "Failed to acquire", _options.Name, QueueId, SystemClock.UtcNow.Subtract(utcNow)); + _logger.LogTrace("{Status} Maintenance Lock. Name: {Name} Id: {Id} Time To Acquire: {AcquireDuration:g}", gotLock ? "Acquired" : "Failed to acquire", _options.Name, QueueId, _timeProvider.GetUtcNow().Subtract(utcNow)); } } diff --git a/tests/Directory.Build.props b/tests/Directory.Build.props index 4325052..e48829c 100644 --- a/tests/Directory.Build.props +++ b/tests/Directory.Build.props @@ -6,12 +6,12 @@ $(NoWarn);CS1591;NU1701 - - - - + + + + - + diff --git a/tests/Foundatio.Benchmarks/Foundatio.Benchmarks.csproj b/tests/Foundatio.Benchmarks/Foundatio.Benchmarks.csproj index 3a31ee3..9adb722 100644 --- a/tests/Foundatio.Benchmarks/Foundatio.Benchmarks.csproj +++ b/tests/Foundatio.Benchmarks/Foundatio.Benchmarks.csproj @@ -6,6 +6,6 @@ - + diff --git a/tests/Foundatio.Benchmarks/Queues/JobQueueBenchmarks.cs b/tests/Foundatio.Benchmarks/Queues/JobQueueBenchmarks.cs index e6eb386..7043df8 100644 --- a/tests/Foundatio.Benchmarks/Queues/JobQueueBenchmarks.cs +++ b/tests/Foundatio.Benchmarks/Queues/JobQueueBenchmarks.cs @@ -67,9 +67,9 @@ private Task RunJobUntilEmptyAsync(IQueue queue) public class BenchmarkJobQueue : QueueJobBase { - public BenchmarkJobQueue(Lazy> queue, ILoggerFactory loggerFactory = null) : base(queue, loggerFactory) { } + public BenchmarkJobQueue(Lazy> queue, ILoggerFactory loggerFactory = null) : base(queue, null, loggerFactory) { } - public BenchmarkJobQueue(IQueue queue, ILoggerFactory loggerFactory = null) : base(queue, loggerFactory) { } + public BenchmarkJobQueue(IQueue queue, ILoggerFactory loggerFactory = null) : base(queue, null, loggerFactory) { } protected override Task ProcessQueueEntryAsync(QueueEntryContext context) { diff --git a/tests/Foundatio.Redis.Tests/Foundatio.Redis.Tests.csproj b/tests/Foundatio.Redis.Tests/Foundatio.Redis.Tests.csproj index 9ab6a28..9c80709 100644 --- a/tests/Foundatio.Redis.Tests/Foundatio.Redis.Tests.csproj +++ b/tests/Foundatio.Redis.Tests/Foundatio.Redis.Tests.csproj @@ -7,4 +7,9 @@ Always + + + docker-compose.yml + + \ No newline at end of file diff --git a/tests/Foundatio.Redis.Tests/Locks/RedisLockTests.cs b/tests/Foundatio.Redis.Tests/Locks/RedisLockTests.cs index 678eec7..ea5711a 100644 --- a/tests/Foundatio.Redis.Tests/Locks/RedisLockTests.cs +++ b/tests/Foundatio.Redis.Tests/Locks/RedisLockTests.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Diagnostics; using System.Threading.Tasks; using Foundatio.Caching; @@ -28,12 +28,12 @@ public RedisLockTests(ITestOutputHelper output) : base(output) protected override ILockProvider GetThrottlingLockProvider(int maxHits, TimeSpan period) { - return new ThrottlingLockProvider(_cache, maxHits, period, Log); + return new ThrottlingLockProvider(_cache, maxHits, period, null, Log); } protected override ILockProvider GetLockProvider() { - return new CacheLockProvider(_cache, _messageBus, Log); + return new CacheLockProvider(_cache, _messageBus, null, Log); } [Fact] diff --git a/tests/Foundatio.Redis.Tests/Metrics/RedisMetricsTests.cs b/tests/Foundatio.Redis.Tests/Metrics/RedisMetricsTests.cs deleted file mode 100644 index 94540c0..0000000 --- a/tests/Foundatio.Redis.Tests/Metrics/RedisMetricsTests.cs +++ /dev/null @@ -1,117 +0,0 @@ -using System; -using System.Threading.Tasks; -using Foundatio.Metrics; -using Foundatio.Redis.Tests.Extensions; -using Foundatio.Tests.Metrics; -using Foundatio.Xunit; -using Xunit; -using Xunit.Abstractions; - -namespace Foundatio.Redis.Tests.Metrics; - -public class RedisMetricsTests : MetricsClientTestBase, IDisposable -{ - public RedisMetricsTests(ITestOutputHelper output) : base(output) - { - var muxer = SharedConnection.GetMuxer(Log); - muxer.FlushAllAsync().GetAwaiter().GetResult(); - } - -#pragma warning disable CS0618 // Type or member is obsolete - public override IMetricsClient GetMetricsClient(bool buffered = false) - { - return new RedisMetricsClient(o => o.ConnectionMultiplexer(SharedConnection.GetMuxer(Log)).Buffered(buffered).LoggerFactory(Log)); - } -#pragma warning restore CS0618 // Type or member is obsolete - - [Fact] - public override Task CanSetGaugesAsync() - { - return base.CanSetGaugesAsync(); - } - - [Fact] - public override Task CanIncrementCounterAsync() - { - return base.CanIncrementCounterAsync(); - } - - [RetryFact] - public override Task CanWaitForCounterAsync() - { - return base.CanWaitForCounterAsync(); - } - - [Fact] - public override Task CanGetBufferedQueueMetricsAsync() - { - return base.CanGetBufferedQueueMetricsAsync(); - } - - [Fact] - public override Task CanIncrementBufferedCounterAsync() - { - return base.CanIncrementBufferedCounterAsync(); - } - - [Fact] - public override Task CanSendBufferedMetricsAsync() - { - return base.CanSendBufferedMetricsAsync(); - } - - [Fact] - public async Task SendGaugesAsync() - { - using var metrics = GetMetricsClient(); - if (!(metrics is IMetricsClientStats stats)) - return; - - int max = 1000; - for (int index = 0; index <= max; index++) - { - metrics.Gauge("mygauge", index); - metrics.Timer("mygauge", index); - } - - Assert.Equal(max, (await stats.GetGaugeStatsAsync("mygauge")).Last); - } - - [Fact] - public async Task SendGaugesBufferedAsync() - { - using var metrics = GetMetricsClient(true); - if (!(metrics is IMetricsClientStats stats)) - return; - - int max = 1000; - for (int index = 0; index <= max; index++) - { - metrics.Gauge("mygauge", index); - metrics.Timer("mygauge", index); - } - - if (metrics is IBufferedMetricsClient bufferedMetrics) - await bufferedMetrics.FlushAsync(); - - Assert.Equal(max, (await stats.GetGaugeStatsAsync("mygauge")).Last); - } - - [Fact] - public async Task SendRedisAsync() - { - var db = SharedConnection.GetMuxer(Log).GetDatabase(); - - int max = 1000; - for (int index = 0; index <= max; index++) - { - await db.SetAddAsync("test", index); - } - } - - public void Dispose() - { - var muxer = SharedConnection.GetMuxer(Log); - muxer.FlushAllAsync().GetAwaiter().GetResult(); - } -} diff --git a/tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs b/tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs index da49eeb..8da7f80 100644 --- a/tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs +++ b/tests/Foundatio.Redis.Tests/Queues/RedisQueueTests.cs @@ -9,15 +9,14 @@ using Foundatio.Caching; using Foundatio.Lock; using Foundatio.Messaging; -using Foundatio.Metrics; using Foundatio.Queues; using Foundatio.Redis.Tests.Extensions; using Foundatio.Tests.Extensions; using Foundatio.Tests.Queue; using Foundatio.Tests.Utility; -using Foundatio.Utility; using Foundatio.Xunit; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Time.Testing; using StackExchange.Redis; using Xunit; using Xunit.Abstractions; @@ -35,14 +34,15 @@ public RedisQueueTests(ITestOutputHelper output) : base(output) muxer.FlushAllAsync().GetAwaiter().GetResult(); } - protected override IQueue GetQueue(int retries = 1, TimeSpan? workItemTimeout = null, TimeSpan? retryDelay = null, int[] retryMultipliers = null, int deadLetterMaxItems = 100, bool runQueueMaintenance = true) + protected override IQueue GetQueue(int retries = 1, TimeSpan? workItemTimeout = null, TimeSpan? retryDelay = null, int[] retryMultipliers = null, int deadLetterMaxItems = 100, bool runQueueMaintenance = true, TimeProvider timeProvider = null) { var queue = new RedisQueue(o => o .ConnectionMultiplexer(SharedConnection.GetMuxer(Log)) .Retries(retries) .RetryDelay(retryDelay.GetValueOrDefault(TimeSpan.FromMinutes(1))) - .RetryMultipliers(retryMultipliers ?? new[] { 1, 3, 5, 10 }) + .RetryMultipliers(retryMultipliers ?? [1, 3, 5, 10]) .DeadLetterMaxItems(deadLetterMaxItems) + .TimeProvider(timeProvider ?? TimeProvider.System) .WorkItemTimeout(workItemTimeout.GetValueOrDefault(TimeSpan.FromMinutes(5))) .RunMaintenanceTasks(runQueueMaintenance) .LoggerFactory(Log) @@ -184,7 +184,7 @@ public override async Task CanDequeueWithLockingAsync() var muxer = SharedConnection.GetMuxer(Log); using var cache = new RedisCacheClient(new RedisCacheClientOptions { ConnectionMultiplexer = muxer, LoggerFactory = Log }); using var messageBus = new RedisMessageBus(new RedisMessageBusOptions { Subscriber = muxer.GetSubscriber(), Topic = "test-queue", LoggerFactory = Log }); - var distributedLock = new CacheLockProvider(cache, messageBus, Log); + var distributedLock = new CacheLockProvider(cache, messageBus, null, Log); await CanDequeueWithLockingImpAsync(distributedLock); } @@ -194,7 +194,7 @@ public override async Task CanHaveMultipleQueueInstancesWithLockingAsync() var muxer = SharedConnection.GetMuxer(Log); using var cache = new RedisCacheClient(new RedisCacheClientOptions { ConnectionMultiplexer = muxer, LoggerFactory = Log }); using var messageBus = new RedisMessageBus(new RedisMessageBusOptions { Subscriber = muxer.GetSubscriber(), Topic = "test-queue", LoggerFactory = Log }); - var distributedLock = new CacheLockProvider(cache, messageBus, Log); + var distributedLock = new CacheLockProvider(cache, messageBus, null, Log); await CanHaveMultipleQueueInstancesWithLockingImplAsync(distributedLock); } @@ -255,140 +255,132 @@ public async Task VerifyCacheKeysAreCorrect() [Fact] public async Task VerifyCacheKeysAreCorrectAfterAbandon() { - var queue = GetQueue(retries: 2, workItemTimeout: TimeSpan.FromMilliseconds(100), retryDelay: TimeSpan.Zero, runQueueMaintenance: false) as RedisQueue; + var timeProvider = new FakeTimeProvider(); + var queue = GetQueue(retries: 2, workItemTimeout: TimeSpan.FromMilliseconds(100), retryDelay: TimeSpan.Zero, runQueueMaintenance: false, timeProvider: timeProvider) as RedisQueue; if (queue == null) return; - using (TestSystemClock.Install()) + using RedisQueue redisQueue = queue; + var muxer = SharedConnection.GetMuxer(Log); + var db = muxer.GetDatabase(); + string listPrefix = muxer.IsCluster() ? "{q:SimpleWorkItem}" : "q:SimpleWorkItem"; + + string id = await queue.EnqueueAsync(new SimpleWorkItem { - using (queue) - { - var muxer = SharedConnection.GetMuxer(Log); - var db = muxer.GetDatabase(); - string listPrefix = muxer.IsCluster() ? "{q:SimpleWorkItem}" : "q:SimpleWorkItem"; - - string id = await queue.EnqueueAsync(new SimpleWorkItem - { - Data = "blah", - Id = 1 - }); - _logger.LogTrace("SimpleWorkItem Id: {0}", id); - - var workItem = await queue.DequeueAsync(); - await workItem.AbandonAsync(); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); - Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:in")); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); - Assert.Equal(1, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); - Assert.Equal(4, await muxer.CountAllKeysAsync()); - - workItem = await queue.DequeueAsync(); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); - Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:work")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); - Assert.Equal(1, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); - Assert.Equal(6, await muxer.CountAllKeysAsync()); - - // let the work item timeout and become auto abandoned. - TestSystemClock.AddTime(TimeSpan.FromMilliseconds(250)); - await queue.DoMaintenanceWorkAsync(); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); - Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:in")); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); - Assert.Equal(2, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); - Assert.Equal(1, (await queue.GetQueueStatsAsync()).Timeouts); - Assert.InRange(await muxer.CountAllKeysAsync(), 3, 4); - - // should go to deadletter now - workItem = await queue.DequeueAsync(); - await workItem.AbandonAsync(); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); - Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:dead")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); - Assert.Equal(3, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); - Assert.InRange(await muxer.CountAllKeysAsync(), 4, 5); - } - } + Data = "blah", + Id = 1 + }); + _logger.LogTrace("SimpleWorkItem Id: {0}", id); + + var workItem = await queue.DequeueAsync(); + await workItem.AbandonAsync(); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); + Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:in")); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); + Assert.Equal(1, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); + Assert.Equal(4, await muxer.CountAllKeysAsync()); + + workItem = await queue.DequeueAsync(); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); + Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:work")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); + Assert.Equal(1, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); + Assert.Equal(6, await muxer.CountAllKeysAsync()); + + // let the work item timeout and become auto abandoned. + timeProvider.Advance(TimeSpan.FromMilliseconds(250)); + await queue.DoMaintenanceWorkAsync(); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); + Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:in")); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); + Assert.Equal(2, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); + Assert.Equal(1, (await queue.GetQueueStatsAsync()).Timeouts); + Assert.InRange(await muxer.CountAllKeysAsync(), 3, 4); + + // should go to deadletter now + workItem = await queue.DequeueAsync(); + await workItem.AbandonAsync(); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); + Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:dead")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); + Assert.Equal(3, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); + Assert.InRange(await muxer.CountAllKeysAsync(), 4, 5); } [Fact] public async Task VerifyCacheKeysAreCorrectAfterAbandonWithRetryDelay() { - var queue = GetQueue(retries: 2, workItemTimeout: TimeSpan.FromMilliseconds(100), retryDelay: TimeSpan.FromMilliseconds(250), runQueueMaintenance: false) as RedisQueue; + var timeProvider = new FakeTimeProvider(); + var queue = GetQueue(retries: 2, workItemTimeout: TimeSpan.FromMilliseconds(100), retryDelay: TimeSpan.FromMilliseconds(250), runQueueMaintenance: false, timeProvider: timeProvider) as RedisQueue; if (queue == null) return; - using (TestSystemClock.Install()) - { - using (queue) - { - var muxer = SharedConnection.GetMuxer(Log); - var db = muxer.GetDatabase(); - string listPrefix = muxer.IsCluster() ? "{q:SimpleWorkItem}" : "q:SimpleWorkItem"; - - string id = await queue.EnqueueAsync(new SimpleWorkItem - { - Data = "blah", - Id = 1 - }); - var workItem = await queue.DequeueAsync(); - await workItem.AbandonAsync(); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); - Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:wait")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); - Assert.Equal(1, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":wait")); - Assert.Equal(5, await muxer.CountAllKeysAsync()); - - TestSystemClock.AddTime(TimeSpan.FromSeconds(1)); - await queue.DoMaintenanceWorkAsync(); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); - Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:in")); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:wait")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); - Assert.Equal(1, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":wait")); - Assert.InRange(await muxer.CountAllKeysAsync(), 4, 5); - - workItem = await queue.DequeueAsync(); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); - Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:work")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); - Assert.Equal(1, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); - Assert.InRange(await muxer.CountAllKeysAsync(), 6, 7); + using RedisQueue redisQueue = queue; + var muxer = SharedConnection.GetMuxer(Log); + var db = muxer.GetDatabase(); + string listPrefix = muxer.IsCluster() ? "{q:SimpleWorkItem}" : "q:SimpleWorkItem"; - await workItem.CompleteAsync(); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); - Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); - Assert.InRange(await muxer.CountAllKeysAsync(), 0, 1); - } - } + string id = await queue.EnqueueAsync(new SimpleWorkItem + { + Data = "blah", + Id = 1 + }); + var workItem = await queue.DequeueAsync(); + await workItem.AbandonAsync(); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); + Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:wait")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); + Assert.Equal(1, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":wait")); + Assert.Equal(5, await muxer.CountAllKeysAsync()); + + timeProvider.Advance(TimeSpan.FromSeconds(1)); + await queue.DoMaintenanceWorkAsync(); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); + Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:in")); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:wait")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); + Assert.Equal(1, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":wait")); + Assert.InRange(await muxer.CountAllKeysAsync(), 4, 5); + + workItem = await queue.DequeueAsync(); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); + Assert.Equal(1, await db.ListLengthAsync($"{listPrefix}:work")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":renewed")); + Assert.Equal(1, await db.StringGetAsync("q:SimpleWorkItem:" + id + ":attempts")); + Assert.InRange(await muxer.CountAllKeysAsync(), 6, 7); + + await workItem.CompleteAsync(); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":enqueued")); + Assert.False(await db.KeyExistsAsync("q:SimpleWorkItem:" + id + ":dequeued")); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); + Assert.InRange(await muxer.CountAllKeysAsync(), 0, 1); } [Fact] @@ -398,42 +390,40 @@ public async Task CanTrimDeadletterItems() if (queue == null) return; - using (queue) - { - var muxer = SharedConnection.GetMuxer(Log); - var db = muxer.GetDatabase(); - string listPrefix = muxer.IsCluster() ? "{q:SimpleWorkItem}" : "q:SimpleWorkItem"; - - var workItemIds = new List(); - for (int i = 0; i < 10; i++) - { - string id = await queue.EnqueueAsync(new SimpleWorkItem { Data = "blah", Id = i }); - _logger.LogTrace(id); - workItemIds.Add(id); - } + using RedisQueue redisQueue = queue; + var muxer = SharedConnection.GetMuxer(Log); + var db = muxer.GetDatabase(); + string listPrefix = muxer.IsCluster() ? "{q:SimpleWorkItem}" : "q:SimpleWorkItem"; - for (int i = 0; i < 10; i++) - { - var workItem = await queue.DequeueAsync(); - await workItem.AbandonAsync(); - _logger.LogTrace("Abandoning: " + workItem.Id); - } + var workItemIds = new List(); + for (int i = 0; i < 10; i++) + { + string id = await queue.EnqueueAsync(new SimpleWorkItem { Data = "blah", Id = i }); + _logger.LogTrace(id); + workItemIds.Add(id); + } - workItemIds.Reverse(); - await queue.DoMaintenanceWorkAsync(); + for (int i = 0; i < 10; i++) + { + var workItem = await queue.DequeueAsync(); + await workItem.AbandonAsync(); + _logger.LogTrace("Abandoning: " + workItem.Id); + } - foreach (object id in workItemIds.Take(3)) - { - _logger.LogTrace("Checking: " + id); - Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); - } + workItemIds.Reverse(); + await queue.DoMaintenanceWorkAsync(); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); - Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:wait")); - Assert.Equal(3, await db.ListLengthAsync($"{listPrefix}:dead")); - Assert.InRange(await muxer.CountAllKeysAsync(), 10, 11); + foreach (object id in workItemIds.Take(3)) + { + _logger.LogTrace("Checking: " + id); + Assert.True(await db.KeyExistsAsync("q:SimpleWorkItem:" + id)); } + + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:in")); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:work")); + Assert.Equal(0, await db.ListLengthAsync($"{listPrefix}:wait")); + Assert.Equal(3, await db.ListLengthAsync($"{listPrefix}:dead")); + Assert.InRange(await muxer.CountAllKeysAsync(), 10, 11); } [Fact] @@ -448,36 +438,34 @@ public async Task VerifyFirstDequeueTimeout() if (queue == null) return; - using (queue) - { - await queue.DeleteQueueAsync(); + using RedisQueue redisQueue = queue; + await queue.DeleteQueueAsync(); - // Start DequeueAsync but allow it to yield. - var itemTask = queue.DequeueAsync(); + // Start DequeueAsync but allow it to yield. + var itemTask = queue.DequeueAsync(); - // Wait longer than the workItemTimeout. - // This is the period between a queue having DequeueAsync called on it and the first item being enqueued. - await SystemClock.SleepAsync(workItemTimeout.Add(TimeSpan.FromMilliseconds(1))); + // Wait longer than the workItemTimeout. + // This is the period between a queue having DequeueAsync called on it and the first item being enqueued. + await Task.Delay(workItemTimeout.Add(TimeSpan.FromMilliseconds(1))); - // Add an item. DequeueAsync can now return. - string id = await queue.EnqueueAsync(new SimpleWorkItem - { - Data = itemData, - Id = itemId - }); + // Add an item. DequeueAsync can now return. + string id = await queue.EnqueueAsync(new SimpleWorkItem + { + Data = itemData, + Id = itemId + }); - // Run DoMaintenanceWorkAsync to verify that our item will not be auto-abandoned. - await queue.DoMaintenanceWorkAsync(); + // Run DoMaintenanceWorkAsync to verify that our item will not be auto-abandoned. + await queue.DoMaintenanceWorkAsync(); - // Completing the item will throw if the item is abandoned. - var item = await itemTask; - await item.CompleteAsync(); + // Completing the item will throw if the item is abandoned. + var item = await itemTask; + await item.CompleteAsync(); - var value = item.Value; - Assert.NotNull(value); - Assert.Equal(itemData, value.Data); - Assert.Equal(itemId, value.Id); - } + var value = item.Value; + Assert.NotNull(value); + Assert.Equal(itemData, value.Data); + Assert.Equal(itemId, value.Id); } // test to reproduce issue #64 - https://github.com/FoundatioFx/Foundatio.Redis/issues/64 @@ -498,71 +486,69 @@ public async Task DatabaseTimeoutDuringDequeueHandledCorectly_Issue64() .RunMaintenanceTasks(false) ); - using (queue) - { - await queue.DeleteQueueAsync(); + using RedisQueue redisQueue = queue; + await queue.DeleteQueueAsync(); - // enqueue item to queue, no reader yet - await queue.EnqueueAsync(new SimpleWorkItem()); + // enqueue item to queue, no reader yet + await queue.EnqueueAsync(new SimpleWorkItem()); - // create database, we want to cause delay in redis to reproduce the issue - var database = muxer.GetDatabase(); + // create database, we want to cause delay in redis to reproduce the issue + var database = muxer.GetDatabase(); - // sync / async ops timeout is not working as described: https://stackexchange.github.io/StackExchange.Redis/Configuration - // it should have timed out after 100 ms but it actually takes a lot more time to time out so we have to use longer delay until this issue is resolved - // value can be up to 1,000,000 - 1 - //const int DELAY_TIME_USEC = 200000; // 200 msec - //string databaseDelayScript = $"local usecnow = tonumber(redis.call(\"time\")[2]); while ((((tonumber(redis.call(\"time\")[2]) - usecnow) + 1000000) % 1000000) < {DELAY_TIME_USEC}) do end"; + // sync / async ops timeout is not working as described: https://stackexchange.github.io/StackExchange.Redis/Configuration + // it should have timed out after 100 ms but it actually takes a lot more time to time out so we have to use longer delay until this issue is resolved + // value can be up to 1,000,000 - 1 + //const int DELAY_TIME_USEC = 200000; // 200 msec + //string databaseDelayScript = $"local usecnow = tonumber(redis.call(\"time\")[2]); while ((((tonumber(redis.call(\"time\")[2]) - usecnow) + 1000000) % 1000000) < {DELAY_TIME_USEC}) do end"; - const int DELAY_TIME_SEC = 5; - string databaseDelayScript = $@" -local now = tonumber(redis.call(""time"")[1]); + const int DELAY_TIME_SEC = 5; + string databaseDelayScript = $@" +local now = tonumber(redis.call(""time"")[1]); while ((((tonumber(redis.call(""time"")[1]) - now))) < {DELAY_TIME_SEC}) " + -"do end"; + "do end"; - // db will be busy for DELAY_TIME_USEC which will cause timeout on the dequeue to follow - database.ScriptEvaluateAsync(databaseDelayScript); + // db will be busy for DELAY_TIME_USEC which will cause timeout on the dequeue to follow + database.ScriptEvaluateAsync(databaseDelayScript); - var completion = new TaskCompletionSource(); - await queue.StartWorkingAsync(async (item) => - { - await item.CompleteAsync(); - completion.SetResult(true); - }); + var completion = new TaskCompletionSource(); + await queue.StartWorkingAsync(async (item) => + { + await item.CompleteAsync(); + completion.SetResult(true); + }); - // wait for the databaseDelayScript to finish - await Task.Delay(DELAY_TIME_SEC * 1000); + // wait for the databaseDelayScript to finish + await Task.Delay(DELAY_TIME_SEC * 1000); - // item should've either time out at some iterations and after databaseDelayScript is done be received - // or it might have moved to work, in this case we want to make sure the correct keys were created - var stopwatch = Stopwatch.StartNew(); - bool success = false; - while (stopwatch.Elapsed.TotalSeconds < 10) - { + // item should've either time out at some iterations and after databaseDelayScript is done be received + // or it might have moved to work, in this case we want to make sure the correct keys were created + var stopwatch = Stopwatch.StartNew(); + bool success = false; + while (stopwatch.Elapsed.TotalSeconds < 10) + { - string workListName = $"q:{QUEUE_NAME}:work"; - long workListLen = await database.ListLengthAsync(new RedisKey(workListName)); - var item = await database.ListLeftPopAsync(workListName); - string dequeuedItemKey = String.Concat("q:", QUEUE_NAME, ":", item, ":dequeued"); - bool dequeuedItemKeyExists = await database.KeyExistsAsync(new RedisKey(dequeuedItemKey)); - if (workListLen == 1) - { - Assert.True(dequeuedItemKeyExists); - success = true; - break; - } - - var timeoutCancellationTokenSource = new CancellationTokenSource(); - var completedTask = await Task.WhenAny(completion.Task, Task.Delay(TimeSpan.FromMilliseconds(100), timeoutCancellationTokenSource.Token)); - if (completion.Task == completedTask) - { - success = true; - break; - } + string workListName = $"q:{QUEUE_NAME}:work"; + long workListLen = await database.ListLengthAsync(new RedisKey(workListName)); + var item = await database.ListLeftPopAsync(workListName); + string dequeuedItemKey = String.Concat("q:", QUEUE_NAME, ":", item, ":dequeued"); + bool dequeuedItemKeyExists = await database.KeyExistsAsync(new RedisKey(dequeuedItemKey)); + if (workListLen == 1) + { + Assert.True(dequeuedItemKeyExists); + success = true; + break; } - Assert.True(success); + var timeoutCancellationTokenSource = new CancellationTokenSource(); + var completedTask = await Task.WhenAny(completion.Task, Task.Delay(TimeSpan.FromMilliseconds(100), timeoutCancellationTokenSource.Token)); + if (completion.Task == completedTask) + { + success = true; + break; + } } + + Assert.True(success); } // TODO: Need to write tests that verify the cache data is correct after each operation. @@ -574,43 +560,43 @@ public async Task MeasureThroughputWithRandomFailures() if (queue == null) return; - using (queue) - { - await queue.DeleteQueueAsync(); + using IQueue workQueue = queue; + await queue.DeleteQueueAsync(); - const int workItemCount = 1000; - for (int i = 0; i < workItemCount; i++) - { - await queue.EnqueueAsync(new SimpleWorkItem - { - Data = "Hello" - }); - } - Assert.Equal(workItemCount, (await queue.GetQueueStatsAsync()).Queued); - - var metrics = new InMemoryMetricsClient(new InMemoryMetricsClientOptions()); - var workItem = await queue.DequeueAsync(TimeSpan.Zero); - while (workItem != null) + const int workItemCount = 1000; + for (int i = 0; i < workItemCount; i++) + { + await queue.EnqueueAsync(new SimpleWorkItem { - Assert.Equal("Hello", workItem.Value.Data); - if (RandomData.GetBool(10)) - await workItem.AbandonAsync(); - else - await workItem.CompleteAsync(); - - metrics.Counter("work"); - workItem = await queue.DequeueAsync(TimeSpan.FromMilliseconds(100)); - } - _logger.LogTrace((await metrics.GetCounterStatsAsync("work")).ToString()); + Data = "Hello" + }); + } + Assert.Equal(workItemCount, (await queue.GetQueueStatsAsync()).Queued); - var stats = await queue.GetQueueStatsAsync(); - Assert.True(stats.Dequeued >= workItemCount); - Assert.Equal(workItemCount, stats.Completed + stats.Deadletter); - Assert.Equal(0, stats.Queued); + int work = 0; + var sw = Stopwatch.StartNew(); + var workItem = await queue.DequeueAsync(TimeSpan.Zero); + while (workItem != null) + { + Assert.Equal("Hello", workItem.Value.Data); + if (RandomData.GetBool(10)) + await workItem.AbandonAsync(); + else + await workItem.CompleteAsync(); - var muxer = SharedConnection.GetMuxer(Log); - _logger.LogTrace("# Keys: {0}", muxer.CountAllKeysAsync()); + work++; + workItem = await queue.DequeueAsync(TimeSpan.FromMilliseconds(100)); } + sw.Stop(); + _logger.LogTrace("Work Items: {0} Time: {1}", work, sw.Elapsed); + + var stats = await queue.GetQueueStatsAsync(); + Assert.True(stats.Dequeued >= workItemCount); + Assert.Equal(workItemCount, stats.Completed + stats.Deadletter); + Assert.Equal(0, stats.Queued); + + var muxer = SharedConnection.GetMuxer(Log); + _logger.LogTrace("# Keys: {0}", muxer.CountAllKeysAsync()); } [Fact(Skip = "Performance Test")] @@ -620,40 +606,40 @@ public async Task MeasureThroughput() if (queue == null) return; - using (queue) - { - await queue.DeleteQueueAsync(); + using IQueue workQueue = queue; + await queue.DeleteQueueAsync(); - const int workItemCount = 1000; - for (int i = 0; i < workItemCount; i++) + const int workItemCount = 1000; + for (int i = 0; i < workItemCount; i++) + { + await queue.EnqueueAsync(new SimpleWorkItem { - await queue.EnqueueAsync(new SimpleWorkItem - { - Data = "Hello" - }); - } - Assert.Equal(workItemCount, (await queue.GetQueueStatsAsync()).Queued); + Data = "Hello" + }); + } + Assert.Equal(workItemCount, (await queue.GetQueueStatsAsync()).Queued); - var metrics = new InMemoryMetricsClient(new InMemoryMetricsClientOptions()); - var workItem = await queue.DequeueAsync(TimeSpan.Zero); - while (workItem != null) - { - Assert.Equal("Hello", workItem.Value.Data); - await workItem.CompleteAsync(); - metrics.Counter("work"); + var workItem = await queue.DequeueAsync(TimeSpan.Zero); + int work = 0; + var sw = Stopwatch.StartNew(); + while (workItem != null) + { + Assert.Equal("Hello", workItem.Value.Data); + await workItem.CompleteAsync(); + work++; - workItem = await queue.DequeueAsync(TimeSpan.Zero); - } - _logger.LogTrace((await metrics.GetCounterStatsAsync("work")).ToString()); + workItem = await queue.DequeueAsync(TimeSpan.Zero); + } + sw.Stop(); + _logger.LogTrace("Work Items: {0} Time: {1}", work, sw.Elapsed); - var stats = await queue.GetQueueStatsAsync(); - Assert.Equal(workItemCount, stats.Dequeued); - Assert.Equal(workItemCount, stats.Completed); - Assert.Equal(0, stats.Queued); + var stats = await queue.GetQueueStatsAsync(); + Assert.Equal(workItemCount, stats.Dequeued); + Assert.Equal(workItemCount, stats.Completed); + Assert.Equal(0, stats.Queued); - var muxer = SharedConnection.GetMuxer(Log); - _logger.LogTrace("# Keys: {0}", muxer.CountAllKeysAsync()); - } + var muxer = SharedConnection.GetMuxer(Log); + _logger.LogTrace("# Keys: {0}", muxer.CountAllKeysAsync()); } [Fact(Skip = "Performance Test")] @@ -663,42 +649,42 @@ public async Task MeasureWorkerThroughput() if (queue == null) return; - using (queue) - { - await queue.DeleteQueueAsync(); + using IQueue workQueue = queue; + await queue.DeleteQueueAsync(); - const int workItemCount = 1; - for (int i = 0; i < workItemCount; i++) - { - await queue.EnqueueAsync(new SimpleWorkItem - { - Data = "Hello" - }); - } - Assert.Equal(workItemCount, (await queue.GetQueueStatsAsync()).Queued); - - var countdown = new AsyncCountdownEvent(workItemCount); - var metrics = new InMemoryMetricsClient(new InMemoryMetricsClientOptions()); - await queue.StartWorkingAsync(async workItem => + const int workItemCount = 1; + for (int i = 0; i < workItemCount; i++) + { + await queue.EnqueueAsync(new SimpleWorkItem { - Assert.Equal("Hello", workItem.Value.Data); - await workItem.CompleteAsync(); - metrics.Counter("work"); - countdown.Signal(); + Data = "Hello" }); + } + Assert.Equal(workItemCount, (await queue.GetQueueStatsAsync()).Queued); - await countdown.WaitAsync(TimeSpan.FromMinutes(1)); - Assert.Equal(0, countdown.CurrentCount); - _logger.LogTrace((await metrics.GetCounterStatsAsync("work")).ToString()); + var countdown = new AsyncCountdownEvent(workItemCount); + int work = 0; + var sw = Stopwatch.StartNew(); + await queue.StartWorkingAsync(async workItem => + { + Assert.Equal("Hello", workItem.Value.Data); + await workItem.CompleteAsync(); + work++; + countdown.Signal(); + }); - var stats = await queue.GetQueueStatsAsync(); - Assert.Equal(workItemCount, stats.Dequeued); - Assert.Equal(workItemCount, stats.Completed); - Assert.Equal(0, stats.Queued); + await countdown.WaitAsync(TimeSpan.FromMinutes(1)); + Assert.Equal(0, countdown.CurrentCount); + sw.Stop(); + _logger.LogTrace("Work Items: {0} Time: {1}", work, sw.Elapsed); - var muxer = SharedConnection.GetMuxer(Log); - _logger.LogTrace("# Keys: {0}", muxer.CountAllKeysAsync()); - } + var stats = await queue.GetQueueStatsAsync(); + Assert.Equal(workItemCount, stats.Dequeued); + Assert.Equal(workItemCount, stats.Completed); + Assert.Equal(0, stats.Queued); + + var muxer = SharedConnection.GetMuxer(Log); + _logger.LogTrace("# Keys: {0}", muxer.CountAllKeysAsync()); } [Fact] @@ -748,7 +734,7 @@ private Task HandlerCommand1Async() return q.StartWorkingAsync((entry, token) => { - _logger.LogInformation($"{SystemClock.UtcNow:O}: Handler1\t{entry.Value.GetType().Name} {entry.Value.Id}"); + _logger.LogInformation($"{DateTime.UtcNow:O}: Handler1\t{entry.Value.GetType().Name} {entry.Value.Id}"); Assert.InRange(entry.Value.Id, 100, 199); return Task.CompletedTask; }); @@ -760,7 +746,7 @@ private Task HandlerCommand2Async() return q.StartWorkingAsync((entry, token) => { - _logger.LogInformation($"{SystemClock.UtcNow:O}: Handler2\t{entry.Value.GetType().Name} {entry.Value.Id}"); + _logger.LogInformation($"{DateTime.UtcNow:O}: Handler2\t{entry.Value.GetType().Name} {entry.Value.Id}"); Assert.InRange(entry.Value.Id, 200, 299); return Task.CompletedTask; }, true);