diff --git a/CHANGELOG.md b/CHANGELOG.md index 39ba6622..10979ac6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +###0.6.8 2022-07-19 +* Update npgsql +* Add inital admin interface + ###0.6.7 2022-06-30 * Update various packages to latest versions diff --git a/README.md b/README.md index 892e9395..ddc1ea52 100644 --- a/README.md +++ b/README.md @@ -275,7 +275,6 @@ This library uses multiple 3rd party libaries, listed below. ##### Developed with: -[![Resharper](http://neventstore.org/images/logo_resharper_small.gif)](http://www.jetbrains.com/resharper/) -[![dotCover](http://neventstore.org/images/logo_dotcover_small.gif)](http://www.jetbrains.com/dotcover/) -[![dotTrace](http://neventstore.org/images/logo_dottrace_small.gif)](http://www.jetbrains.com/dottrace/) - + + + diff --git a/SharedAssemblyInfo.cs b/SharedAssemblyInfo.cs index 74dd0576..f749e9d1 100644 --- a/SharedAssemblyInfo.cs +++ b/SharedAssemblyInfo.cs @@ -4,6 +4,6 @@ [assembly: AssemblyCompany("Brian Lehnen")] [assembly: AssemblyCopyright("Copyright © Brian Lehnen 2022")] [assembly: ComVisible(false)] -[assembly: AssemblyVersion("0.6.7")] -[assembly: AssemblyFileVersion("0.6.7")] -[assembly: AssemblyInformationalVersion("0.6.7")] \ No newline at end of file +[assembly: AssemblyVersion("0.6.8")] +[assembly: AssemblyFileVersion("0.6.8")] +[assembly: AssemblyInformationalVersion("0.6.8")] \ No newline at end of file diff --git a/Source/DotNetWorkQueue.AppMetrics/DotNetWorkQueue.AppMetrics.csproj b/Source/DotNetWorkQueue.AppMetrics/DotNetWorkQueue.AppMetrics.csproj index 25df492c..8fe7cab9 100644 --- a/Source/DotNetWorkQueue.AppMetrics/DotNetWorkQueue.AppMetrics.csproj +++ b/Source/DotNetWorkQueue.AppMetrics/DotNetWorkQueue.AppMetrics.csproj @@ -6,7 +6,7 @@ An implementation of AppMetrics ( https://github.com/AppMetrics/AppMetrics) for https://github.com/blehnen/DotNetWorkQueue - 0.6.7 + 0.6.8 Brian Lehnen Copyright © Brian Lehnen 2015-2022 LGPL-2.1-or-later diff --git a/Source/DotNetWorkQueue.IntegrationTests.Shared/Admin/AdminSharedConsumer.cs b/Source/DotNetWorkQueue.IntegrationTests.Shared/Admin/AdminSharedConsumer.cs new file mode 100644 index 00000000..e473999e --- /dev/null +++ b/Source/DotNetWorkQueue.IntegrationTests.Shared/Admin/AdminSharedConsumer.cs @@ -0,0 +1,88 @@ +using System; +using System.Threading; +using DotNetWorkQueue.Configuration; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace DotNetWorkQueue.IntegrationTests.Shared.Admin +{ + public class AdminSharedConsumer + where TMessage : class + { + public void RunConsumer(QueueConnection queueConnection, bool addInterceptors, + ILogger logProvider, + int runTime, int messageCount, + int workerCount, int timeOut, + TimeSpan heartBeatTime, TimeSpan heartBeatMonitorTime, string updateTime, bool enableChaos, + ICreationScope scope) + where TTransportInit : ITransportInit, new() + { + + using (var trace = SharedSetup.CreateTrace("consumer-admin")) + { + if (enableChaos) + timeOut *= 2; + + using (var metrics = new Metrics.Metrics(queueConnection.Queue)) + { + var addInterceptorConsumer = InterceptorAdding.No; + if (addInterceptors) + { + addInterceptorConsumer = InterceptorAdding.ConfigurationOnly; + } + + var processedCount = new IncrementWrapper(); + using ( + var creator = SharedSetup.CreateCreator(addInterceptorConsumer, logProvider, + metrics, false, enableChaos, scope, trace.Source) + ) + { + using ( + var queue = + creator.CreateConsumer(queueConnection)) + { + SharedSetup.SetupDefaultConsumerQueue(queue.Configuration, workerCount, heartBeatTime, + heartBeatMonitorTime, updateTime, null); + var waitForFinish = new ManualResetEventSlim(false); + waitForFinish.Reset(); + + var admin = creator.CreateAdminFunctions(queueConnection); + var count = admin.Count(null); + Assert.Equal(messageCount, count); + + //start looking for work + queue.Start((message, notifications) => + { + MessageHandlingShared.HandleFakeMessages(message, runTime, processedCount, messageCount, + waitForFinish); + }); + + if (messageCount <= workerCount && runTime > 10) + { + Thread.Sleep(runTime / 2); + var working = admin.Count(QueueStatusAdmin.Processing); + var waiting = admin.Count(QueueStatusAdmin.Waiting); + Assert.Equal(0, waiting); + Assert.Equal(messageCount, working); + } + else if(runTime > 10) + { + Thread.Sleep(runTime / 2); + var working = admin.Count(QueueStatusAdmin.Processing); + Assert.Equal(workerCount, working); + } + + waitForFinish.Wait(timeOut * 1000); + } + + Assert.Null(processedCount.IdError); + Assert.Equal(messageCount, processedCount.ProcessedCount); + VerifyMetrics.VerifyProcessedCount(queueConnection.Queue, metrics.GetCurrentMetrics(), + messageCount); + LoggerShared.CheckForErrors(queueConnection.Queue); + } + } + } + } + } +} diff --git a/Source/DotNetWorkQueue.IntegrationTests.Shared/Admin/Implementation/SimpleConsumerAdmin.cs b/Source/DotNetWorkQueue.IntegrationTests.Shared/Admin/Implementation/SimpleConsumerAdmin.cs new file mode 100644 index 00000000..5add7166 --- /dev/null +++ b/Source/DotNetWorkQueue.IntegrationTests.Shared/Admin/Implementation/SimpleConsumerAdmin.cs @@ -0,0 +1,68 @@ +using System; +using DotNetWorkQueue.Configuration; +using DotNetWorkQueue.IntegrationTests.Shared.Consumer; +using DotNetWorkQueue.IntegrationTests.Shared.Producer; +using DotNetWorkQueue.Messages; +using Xunit; + +namespace DotNetWorkQueue.IntegrationTests.Shared.Admin.Implementation +{ + public class SimpleConsumerAdmin + { + public void Run( + QueueConnection queueConnection, + int messageCount, + int runtime, + int timeOut, + int workerCount, + bool enableChaos, + Action setOptions, + Func generateData, + Action verify, + Action verifyQueueCount) + where TTransportInit : ITransportInit, new() + where TMessage : class + where TTransportCreate : class, IQueueCreation + { + + var logProvider = LoggerShared.Create(queueConnection.Queue, GetType().Name); + using ( + var queueCreator = + new QueueCreationContainer( + serviceRegister => serviceRegister.Register(() => logProvider, LifeStyles.Singleton))) + { + ICreationScope scope = null; + var oCreation = queueCreator.GetQueueCreation(queueConnection); + try + { + setOptions(oCreation); + var result = oCreation.CreateQueue(); + Assert.True(result.Success, result.ErrorMessage); + scope = oCreation.Scope; + + var producer = new ProducerShared(); + producer.RunTest(queueConnection, false, messageCount, + logProvider, generateData, + verify, false, scope, false); + + var consumer = new AdminSharedConsumer(); + consumer.RunConsumer(queueConnection, + false, + logProvider, + runtime, messageCount, + workerCount, timeOut, + TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(35), "second(*%10)", enableChaos, scope); + + verifyQueueCount(queueConnection, oCreation.BaseTransportOptions, scope, 0, false, + false); + } + finally + { + oCreation?.RemoveQueue(); + oCreation?.Dispose(); + scope?.Dispose(); + } + } + } + } +} diff --git a/Source/DotNetWorkQueue.IntegrationTests.Shared/Producer/ProducerShared.cs b/Source/DotNetWorkQueue.IntegrationTests.Shared/Producer/ProducerShared.cs index dd289eff..cedd6929 100644 --- a/Source/DotNetWorkQueue.IntegrationTests.Shared/Producer/ProducerShared.cs +++ b/Source/DotNetWorkQueue.IntegrationTests.Shared/Producer/ProducerShared.cs @@ -64,8 +64,13 @@ public void RunTest(QueueConnection queueConnection, } if (validateMetricCounts) + { VerifyMetrics.VerifyProducedCount(queueConnection.Queue, metrics.GetCurrentMetrics(), messageCount); + + var admin = creator.CreateAdminFunctions(queueConnection); + Assert.Equal(messageCount, admin.Count(null)); + } } } } diff --git a/Source/DotNetWorkQueue.Transport.LiteDB.IntegrationTests/Admin/SimpleConsumer.cs b/Source/DotNetWorkQueue.Transport.LiteDB.IntegrationTests/Admin/SimpleConsumer.cs new file mode 100644 index 00000000..bb6796e6 --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.LiteDB.IntegrationTests/Admin/SimpleConsumer.cs @@ -0,0 +1,28 @@ +using DotNetWorkQueue.Configuration; +using DotNetWorkQueue.IntegrationTests.Shared; +using DotNetWorkQueue.Transport.LiteDb.Basic; +using Xunit; + +namespace DotNetWorkQueue.Transport.LiteDb.IntegrationTests.Admin +{ + [Collection("Consumeradmin")] + public class SimpleConsumer + { + [Theory] + [InlineData(10, 10, 120, 2, false, IntegrationConnectionInfo.ConnectionTypes.Direct), + InlineData(2, 10, 120, 2, false, IntegrationConnectionInfo.ConnectionTypes.Memory), + InlineData(15, 10, 120, 4, true, IntegrationConnectionInfo.ConnectionTypes.Shared)] + public void Run(int messageCount, int runtime, int timeOut, int workerCount, bool enableChaos, IntegrationConnectionInfo.ConnectionTypes connectionType) + { + using (var connectionInfo = new IntegrationConnectionInfo(connectionType)) + { + var queueName = GenerateQueueName.Create(); + var consumer = new DotNetWorkQueue.IntegrationTests.Shared.Admin.Implementation.SimpleConsumerAdmin(); + consumer.Run(new QueueConnection(queueName, + connectionInfo.ConnectionString), + messageCount, runtime, timeOut, workerCount, enableChaos, x => { }, + Helpers.GenerateData, Helpers.Verify, Helpers.VerifyQueueCount); + } + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.LiteDB/Basic/Admin/AdminFunctions.cs b/Source/DotNetWorkQueue.Transport.LiteDB/Basic/Admin/AdminFunctions.cs new file mode 100644 index 00000000..10fb8af4 --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.LiteDB/Basic/Admin/AdminFunctions.cs @@ -0,0 +1,43 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +using DotNetWorkQueue.Transport.LiteDb.Basic.Query; +using DotNetWorkQueue.Transport.Shared; +using DotNetWorkQueue.Validation; + +namespace DotNetWorkQueue.Transport.LiteDb.Basic.Admin +{ + /// + /// Admin function implementation + /// + internal class AdminFunctions: IAdminFunctions + { + private readonly IQueryHandler _queueCount; + public AdminFunctions( + IQueryHandler queueCount) + { + Guard.NotNull(() => queueCount, queueCount); + _queueCount = queueCount; + + } + public long? Count(QueueStatusAdmin? status) + { + return _queueCount.Handle(new GetQueueCountQuery(status)); + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.LiteDB/Basic/LiteDbConnectionManager.cs b/Source/DotNetWorkQueue.Transport.LiteDB/Basic/LiteDbConnectionManager.cs index d0c1f9ee..81c6d5e1 100644 --- a/Source/DotNetWorkQueue.Transport.LiteDB/Basic/LiteDbConnectionManager.cs +++ b/Source/DotNetWorkQueue.Transport.LiteDB/Basic/LiteDbConnectionManager.cs @@ -47,6 +47,12 @@ public LiteDbConnectionManager(IConnectionInformation connectionInformation, ICr Guard.NotNull(() => scope, scope); _connectionInformation = connectionInformation; + if (string.IsNullOrEmpty(_connectionInformation.ConnectionString) || + string.IsNullOrEmpty(_connectionInformation.QueueName)) + { + return; + } + var builder = new LiteDB.ConnectionString(_connectionInformation.ConnectionString); _shared = builder.Connection == ConnectionType.Shared; diff --git a/Source/DotNetWorkQueue.Transport.LiteDB/Basic/LiteDbMessageQueueInit.cs b/Source/DotNetWorkQueue.Transport.LiteDB/Basic/LiteDbMessageQueueInit.cs index 8e6d5db4..1cd6d986 100644 --- a/Source/DotNetWorkQueue.Transport.LiteDB/Basic/LiteDbMessageQueueInit.cs +++ b/Source/DotNetWorkQueue.Transport.LiteDB/Basic/LiteDbMessageQueueInit.cs @@ -22,6 +22,7 @@ using DotNetWorkQueue.Configuration; using DotNetWorkQueue.IoC; using DotNetWorkQueue.Queue; +using DotNetWorkQueue.Transport.LiteDb.Basic.Admin; using DotNetWorkQueue.Transport.LiteDb.Basic.CommandHandler; using DotNetWorkQueue.Transport.LiteDb.Basic.Factory; using DotNetWorkQueue.Transport.LiteDb.Basic.Message; @@ -82,6 +83,7 @@ public override void RegisterImplementations(IContainer container, RegistrationT container.Register(LifeStyles.Singleton); container.Register(LifeStyles.Singleton); container.Register(LifeStyles.Singleton); + container.Register(LifeStyles.Singleton); //**all //**send diff --git a/Source/DotNetWorkQueue.Transport.LiteDB/Basic/Query/GetQueueCountQuery.cs b/Source/DotNetWorkQueue.Transport.LiteDB/Basic/Query/GetQueueCountQuery.cs new file mode 100644 index 00000000..6b798d59 --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.LiteDB/Basic/Query/GetQueueCountQuery.cs @@ -0,0 +1,34 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +using DotNetWorkQueue.Transport.Shared; +namespace DotNetWorkQueue.Transport.LiteDb.Basic.Query +{ + internal class GetQueueCountQuery : IQuery + { + public GetQueueCountQuery(QueueStatusAdmin? status) + { + Status = status; + } + /// + /// The status to search for + /// + /// Null if all + public QueueStatusAdmin? Status { get; } + } +} diff --git a/Source/DotNetWorkQueue.Transport.LiteDB/Basic/QueryHandler/GetQueueCountQueryHandler.cs b/Source/DotNetWorkQueue.Transport.LiteDB/Basic/QueryHandler/GetQueueCountQueryHandler.cs new file mode 100644 index 00000000..5078b2dd --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.LiteDB/Basic/QueryHandler/GetQueueCountQueryHandler.cs @@ -0,0 +1,62 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- + +using System; +using DotNetWorkQueue.Transport.LiteDb.Basic.Query; +using DotNetWorkQueue.Transport.Shared; +using DotNetWorkQueue.Validation; + +namespace DotNetWorkQueue.Transport.LiteDb.Basic.QueryHandler +{ + internal class GetQueueCountQueryHandler : IQueryHandler + { + private readonly LiteDbConnectionManager _connectionInformation; + private readonly TableNameHelper _tableNameHelper; + + /// + /// Initializes a new instance of the class. + /// + /// The connection information. + /// The table name helper. + public GetQueueCountQueryHandler(LiteDbConnectionManager connectionInformation, + TableNameHelper tableNameHelper) + { + Guard.NotNull(() => connectionInformation, connectionInformation); + Guard.NotNull(() => tableNameHelper, tableNameHelper); + + _connectionInformation = connectionInformation; + _tableNameHelper = tableNameHelper; + } + + public long Handle(GetQueueCountQuery query) + { + using (var db = _connectionInformation.GetDatabase()) + { + var col = db.Database.GetCollection(_tableNameHelper.MetaDataName); + if (query.Status.HasValue) + { + return col.Query() + .Where(x => x.Status.Equals(Convert.ToInt32(query.Status.Value))) + .LongCount(); + } + return col.LongCount(); + } + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.LiteDB/DotNetWorkQueue.Transport.LiteDb.csproj b/Source/DotNetWorkQueue.Transport.LiteDB/DotNetWorkQueue.Transport.LiteDb.csproj index 65ee2143..9063c631 100644 --- a/Source/DotNetWorkQueue.Transport.LiteDB/DotNetWorkQueue.Transport.LiteDb.csproj +++ b/Source/DotNetWorkQueue.Transport.LiteDB/DotNetWorkQueue.Transport.LiteDb.csproj @@ -4,7 +4,7 @@ net6.0;net48;net5.0;netstandard2.0;net472;net461 LiteDB transport for https://github.com/blehnen/DotNetWorkQueue true - 0.6.7 + 0.6.8 Brian Lehnen Copyright © Brian Lehnen 2015-2022 LGPL-2.1-or-later diff --git a/Source/DotNetWorkQueue.Transport.Memory.Integration.Tests/Admin/SimpleConsumer.cs b/Source/DotNetWorkQueue.Transport.Memory.Integration.Tests/Admin/SimpleConsumer.cs new file mode 100644 index 00000000..0b9d37ef --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.Memory.Integration.Tests/Admin/SimpleConsumer.cs @@ -0,0 +1,33 @@ +using DotNetWorkQueue.Configuration; +using DotNetWorkQueue.IntegrationTests.Shared; +using DotNetWorkQueue.Transport.Memory.Basic; +using Xunit; + +namespace DotNetWorkQueue.Transport.Memory.Integration.Tests.Admin +{ + [Collection("consumeradmin")] + public class SimpleConsumer + { + [Theory] + [InlineData(5, 15, 60, 5), + InlineData(25, 10, 200, 10), + InlineData(10, 15, 180, 7)] + public void Run(int messageCount, int runtime, int timeOut, int workerCount) + { + using (var connectionInfo = new IntegrationConnectionInfo()) + { + var queueName = GenerateQueueName.Create(); + var producer = new DotNetWorkQueue.IntegrationTests.Shared.Admin.Implementation.SimpleConsumerAdmin(); + producer.Run(new QueueConnection(queueName, + connectionInfo.ConnectionString), + messageCount, runtime, timeOut, workerCount, false, x => { }, + Helpers.GenerateData, Helpers.Verify, VerifyQueueCount); + } + } + + private void VerifyQueueCount(QueueConnection queueConnection, IBaseTransportOptions arg3, ICreationScope arg4, int arg5, bool arg6, bool arg7) + { + new VerifyQueueRecordCount().Verify(arg4, arg5, true); + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.PostgreSQL.Integration.Tests/Admin/SimpleConsumer.cs b/Source/DotNetWorkQueue.Transport.PostgreSQL.Integration.Tests/Admin/SimpleConsumer.cs new file mode 100644 index 00000000..fabc4e19 --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.PostgreSQL.Integration.Tests/Admin/SimpleConsumer.cs @@ -0,0 +1,28 @@ +using DotNetWorkQueue.Configuration; +using DotNetWorkQueue.IntegrationTests.Shared; +using DotNetWorkQueue.Transport.PostgreSQL.Basic; +using Xunit; + +namespace DotNetWorkQueue.Transport.PostgreSQL.Integration.Tests.Admin +{ + [Collection("consumeradmin")] + public class SimpleConsumer + { + [Theory] + [InlineData(10, 10, 120, 10, false), + InlineData(50, 10, 200, 10, true), + InlineData(20, 10, 240, 15, false), + InlineData(20, 10, 240, 15, true)] + public void Run(int messageCount, int runtime, int timeOut, int workerCount, bool useTransactions) + { + var queueName = GenerateQueueName.Create(); + var consumer = new DotNetWorkQueue.IntegrationTests.Shared.Admin.Implementation.SimpleConsumerAdmin(); + consumer.Run(new QueueConnection(queueName, + ConnectionInfo.ConnectionString), + messageCount, runtime, timeOut, workerCount, false, x => Helpers.SetOptions(x, + true, !useTransactions, useTransactions, false, + false, !useTransactions, true, false), + Helpers.GenerateData, Helpers.Verify, Helpers.VerifyQueueCount); + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.PostgreSQL/Basic/PostgreSQLCommandStringCache.cs b/Source/DotNetWorkQueue.Transport.PostgreSQL/Basic/PostgreSQLCommandStringCache.cs index 77ec1fef..c7b325c0 100644 --- a/Source/DotNetWorkQueue.Transport.PostgreSQL/Basic/PostgreSQLCommandStringCache.cs +++ b/Source/DotNetWorkQueue.Transport.PostgreSQL/Basic/PostgreSQLCommandStringCache.cs @@ -132,6 +132,12 @@ protected override void BuildCommands() CommandCache.Add(CommandStringTypes.FindErrorRecordsToDelete, $"select queueid from {TableNameHelper.MetaDataErrorsName} where @CurrentDate > lastexceptiondate FOR UPDATE SKIP LOCKED"); + CommandCache.Add(CommandStringTypes.GetQueueCountStatus, + $"select count(queueid) from {TableNameHelper.MetaDataName} where status = @status"); + + CommandCache.Add(CommandStringTypes.GetQueueCountAll, + $"select count(queueid) from {TableNameHelper.MetaDataName}"); + CommandCache.Add(CommandStringTypes.FindExpiredRecordsToDelete, $"select queueid from {TableNameHelper.MetaDataName} where @CurrentDate > ExpirationTime FOR UPDATE SKIP LOCKED"); diff --git a/Source/DotNetWorkQueue.Transport.PostgreSQL/DotNetWorkQueue.Transport.PostgreSQL.csproj b/Source/DotNetWorkQueue.Transport.PostgreSQL/DotNetWorkQueue.Transport.PostgreSQL.csproj index 21c9f64e..1b5f6e0b 100644 --- a/Source/DotNetWorkQueue.Transport.PostgreSQL/DotNetWorkQueue.Transport.PostgreSQL.csproj +++ b/Source/DotNetWorkQueue.Transport.PostgreSQL/DotNetWorkQueue.Transport.PostgreSQL.csproj @@ -3,7 +3,7 @@ net6.0;net48;net5.0;netstandard2.0;net472;net461 PostgreSQL transport for https://github.com/blehnen/DotNetWorkQueue - 0.6.7 + 0.6.8 Brian Lehnen Copyright © Brian Lehnen 2015-2022 LGPL-2.1-or-later diff --git a/Source/DotNetWorkQueue.Transport.Redis.IntegrationTests/Admin/SimpleConsumer.cs b/Source/DotNetWorkQueue.Transport.Redis.IntegrationTests/Admin/SimpleConsumer.cs new file mode 100644 index 00000000..1c320605 --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.Redis.IntegrationTests/Admin/SimpleConsumer.cs @@ -0,0 +1,33 @@ +using DotNetWorkQueue.Configuration; +using DotNetWorkQueue.IntegrationTests.Shared; +using DotNetWorkQueue.Transport.Redis.Basic; +using DotNetWorkQueue.Transport.Redis.IntegrationTests; +using Xunit; + +namespace DotNetWorkQueue.Transport.Redis.Integration.Tests.Admin +{ + [Collection("Consumeradmin")] + public class SimpleConsumer + { + [Theory] + [InlineData(10, 10, 90, 5, ConnectionInfoTypes.Linux), + InlineData(10, 10, 60, 10, ConnectionInfoTypes.Linux)] + public void Run(int messageCount, int runtime, int timeOut, int workerCount, ConnectionInfoTypes type) + { + var queueName = GenerateQueueName.Create(); + var connectionString = new ConnectionInfo(type).ConnectionString; + var consumer = new DotNetWorkQueue.IntegrationTests.Shared.Admin.Implementation.SimpleConsumerAdmin(); + consumer.Run(new QueueConnection(queueName, connectionString), + messageCount, runtime, timeOut, workerCount, false, x => { }, + Helpers.GenerateData, Helpers.Verify, VerifyQueueCount); + } + + private void VerifyQueueCount(QueueConnection queueConnection, IBaseTransportOptions arg3, ICreationScope arg4, int arg5, bool arg6, bool arg7) + { + using (var count = new VerifyQueueRecordCount(queueConnection.Queue, queueConnection.Connection)) + { + count.Verify(arg5, false, -1); + } + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.Redis/Basic/Admin/AdminFunctions.cs b/Source/DotNetWorkQueue.Transport.Redis/Basic/Admin/AdminFunctions.cs new file mode 100644 index 00000000..6d5826f5 --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.Redis/Basic/Admin/AdminFunctions.cs @@ -0,0 +1,51 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +using DotNetWorkQueue.Validation; + +namespace DotNetWorkQueue.Transport.Redis.Basic.Admin +{ + internal class AdminFunctions: IAdminFunctions + { + private readonly RedisNames _names; + private readonly IRedisConnection _connection; + public AdminFunctions(IRedisConnection connection, RedisNames names) + { + Guard.NotNull(() => connection, connection); + Guard.NotNull(() => names, names); + + _connection = connection; + _names = names; + } + public long? Count(QueueStatusAdmin? status) + { + var db = _connection.Connection.GetDatabase(); + if (status.HasValue) + { + switch (status.Value) + { + case QueueStatusAdmin.Processing: + return db.HashLength(_names.Working); + case QueueStatusAdmin.Waiting: + return db.HashLength(_names.MetaData) - db.HashLength(_names.Working); + } + } + return db.HashLength(_names.MetaData); + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.Redis/Basic/Lua/BaseLua.cs b/Source/DotNetWorkQueue.Transport.Redis/Basic/Lua/BaseLua.cs index d73488ce..6abe2782 100644 --- a/Source/DotNetWorkQueue.Transport.Redis/Basic/Lua/BaseLua.cs +++ b/Source/DotNetWorkQueue.Transport.Redis/Basic/Lua/BaseLua.cs @@ -143,6 +143,8 @@ public async Task TryExecuteAsync(object parameters) /// public void LoadScript() { + if (Connection.Connection == null) return; + Guard.NotNullOrEmpty(() => Script, Script); var luaScript = LuaScript.Prepare(Script); diff --git a/Source/DotNetWorkQueue.Transport.Redis/Basic/RedisQueueInit.cs b/Source/DotNetWorkQueue.Transport.Redis/Basic/RedisQueueInit.cs index 58c4a45a..ebea6a49 100644 --- a/Source/DotNetWorkQueue.Transport.Redis/Basic/RedisQueueInit.cs +++ b/Source/DotNetWorkQueue.Transport.Redis/Basic/RedisQueueInit.cs @@ -23,6 +23,7 @@ using DotNetWorkQueue.Configuration; using DotNetWorkQueue.IoC; using DotNetWorkQueue.Queue; +using DotNetWorkQueue.Transport.Redis.Basic.Admin; using DotNetWorkQueue.Transport.Redis.Basic.Command; using DotNetWorkQueue.Transport.Redis.Basic.Factory; using DotNetWorkQueue.Transport.Redis.Basic.Lua; @@ -59,6 +60,8 @@ public override void RegisterImplementations(IContainer container, RegistrationT container.Register(LifeStyles.Singleton); container.Register(LifeStyles.Singleton); + container.Register(LifeStyles.Singleton); + container.Register(LifeStyles.Singleton); container.Register(LifeStyles.Singleton); diff --git a/Source/DotNetWorkQueue.Transport.Redis/DotNetWorkQueue.Transport.Redis.csproj b/Source/DotNetWorkQueue.Transport.Redis/DotNetWorkQueue.Transport.Redis.csproj index 50c90e78..4ea9e2e3 100644 --- a/Source/DotNetWorkQueue.Transport.Redis/DotNetWorkQueue.Transport.Redis.csproj +++ b/Source/DotNetWorkQueue.Transport.Redis/DotNetWorkQueue.Transport.Redis.csproj @@ -4,7 +4,7 @@ net6.0;net48;net5.0;netstandard2.0;net472;net461 Redis transport for https://github.com/blehnen/DotNetWorkQueue true - 0.6.7 + 0.6.8 Brian Lehnen Copyright © Brian Lehnen 2015-2022 LGPL-2.1-or-later diff --git a/Source/DotNetWorkQueue.Transport.Redis/RedisConnection.cs b/Source/DotNetWorkQueue.Transport.Redis/RedisConnection.cs index b1123048..0d3f4869 100644 --- a/Source/DotNetWorkQueue.Transport.Redis/RedisConnection.cs +++ b/Source/DotNetWorkQueue.Transport.Redis/RedisConnection.cs @@ -111,7 +111,7 @@ protected virtual void Dispose(bool disposing) /// The connection will only be opened once private void EnsureCreated() { - if (_connection != null) return; + if (_connection != null || string.IsNullOrEmpty(_connectionInformation.ConnectionString)) return; lock (_connectionLock) { if (_connection != null) return; diff --git a/Source/DotNetWorkQueue.Transport.Redis/RedisConnectionInfo.cs b/Source/DotNetWorkQueue.Transport.Redis/RedisConnectionInfo.cs index 3722cc95..b0a27780 100644 --- a/Source/DotNetWorkQueue.Transport.Redis/RedisConnectionInfo.cs +++ b/Source/DotNetWorkQueue.Transport.Redis/RedisConnectionInfo.cs @@ -34,7 +34,10 @@ internal class RedisConnectionInfo : BaseConnectionInformation /// Queue and connection information. public RedisConnectionInfo(QueueConnection queueConnection) : base(queueConnection) { - ValidateConnection(queueConnection.Connection); + if (!string.IsNullOrEmpty(queueConnection.Connection)) + { + ValidateConnection(queueConnection.Connection); + } } #endregion diff --git a/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/Admin/AdminFunctions.cs b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/Admin/AdminFunctions.cs new file mode 100644 index 00000000..3bdb1e81 --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/Admin/AdminFunctions.cs @@ -0,0 +1,46 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +using DotNetWorkQueue.Transport.RelationalDatabase.Basic.Query; +using DotNetWorkQueue.Transport.Shared; +using DotNetWorkQueue.Validation; + +namespace DotNetWorkQueue.Transport.RelationalDatabase.Basic.Admin +{ + internal class AdminFunctions: IAdminFunctions + { + private readonly IConnectionInformation _connection; + private readonly IQueryHandler _queueCount; + + public AdminFunctions( + IConnectionInformation connection, + IQueryHandler queueCount) + { + Guard.NotNull(() => connection, connection); + Guard.NotNull(() => queueCount, queueCount); + + _connection = connection; + _queueCount = queueCount; + + } + public long? Count(QueueStatusAdmin? status) + { + return _queueCount.Handle(new GetQueueCountQuery(_connection.ConnectionString, status)); + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/CommandStringCache.cs b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/CommandStringCache.cs index 2508323b..10c57221 100644 --- a/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/CommandStringCache.cs +++ b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/CommandStringCache.cs @@ -293,7 +293,15 @@ public enum CommandStringTypes /// /// Returns records that are in an error status /// - FindErrorRecordsToDelete + FindErrorRecordsToDelete, + /// + /// Returns the current queue quote + /// + GetQueueCountAll, + /// + /// Returns the current queue quote + /// + GetQueueCountStatus } /// diff --git a/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/Query/GetQueueCountQuery.cs b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/Query/GetQueueCountQuery.cs new file mode 100644 index 00000000..f6b6ed27 --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/Query/GetQueueCountQuery.cs @@ -0,0 +1,52 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +using DotNetWorkQueue.Transport.Shared; +using DotNetWorkQueue.Validation; + +namespace DotNetWorkQueue.Transport.RelationalDatabase.Basic.Query +{ + internal class GetQueueCountQuery : IQuery + { + /// + /// Initializes a new instance of the class. + /// + /// The connection string. + /// The status to filter by, or null for none + public GetQueueCountQuery(string connectionString, QueueStatusAdmin? status) + { + Guard.NotNullOrEmpty(() => connectionString, connectionString); + + ConnectionString = connectionString; + Status = status; + } + /// + /// Gets the connection string. + /// + /// + /// The connection string. + /// + public string ConnectionString { get; } + + /// + /// The status to search for + /// + /// Null if all + public QueueStatusAdmin? Status { get; } + } +} diff --git a/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/QueryHandler/GetQueueCountQueryHandler.cs b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/QueryHandler/GetQueueCountQueryHandler.cs new file mode 100644 index 00000000..a4f46aed --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/QueryHandler/GetQueueCountQueryHandler.cs @@ -0,0 +1,64 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +using DotNetWorkQueue.Transport.RelationalDatabase.Basic.Query; +using DotNetWorkQueue.Transport.Shared; +using DotNetWorkQueue.Validation; + +namespace DotNetWorkQueue.Transport.RelationalDatabase.Basic.QueryHandler +{ + internal class GetQueueCountQueryHandler : IQueryHandler + { + private readonly IPrepareQueryHandler _prepareQuery; + private readonly IDbConnectionFactory _dbConnectionFactory; + private readonly IReadColumn _readColumn; + + public GetQueueCountQueryHandler(IDbConnectionFactory dbConnectionFactory, + IPrepareQueryHandler prepareQuery, + IReadColumn readColumn) + { + Guard.NotNull(() => dbConnectionFactory, dbConnectionFactory); + Guard.NotNull(() => prepareQuery, prepareQuery); + Guard.NotNull(() => readColumn, readColumn); + + _prepareQuery = prepareQuery; + _dbConnectionFactory = dbConnectionFactory; + _readColumn = readColumn; + } + + public long Handle(GetQueueCountQuery query) + { + using (var connection = _dbConnectionFactory.Create()) + { + connection.Open(); + using (var command = connection.CreateCommand()) + { + _prepareQuery.Handle(query, command, CommandStringTypes.GetQueueCountStatus); + using (var reader = command.ExecuteReader()) + { + if (reader.Read()) + { + return _readColumn.ReadAsInt64(CommandStringTypes.GetQueueCountStatus, 0, reader); + } + } + } + } + return default; + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/QueryPrepareHandler/GetQueueCountQueryPrepareHandler.cs b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/QueryPrepareHandler/GetQueueCountQueryPrepareHandler.cs new file mode 100644 index 00000000..13cb3fc9 --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/QueryPrepareHandler/GetQueueCountQueryPrepareHandler.cs @@ -0,0 +1,62 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +using System; +using System.Data; +using DotNetWorkQueue.Transport.RelationalDatabase.Basic.Query; +using DotNetWorkQueue.Validation; + +namespace DotNetWorkQueue.Transport.RelationalDatabase.Basic.QueryPrepareHandler +{ + internal class GetQueueCountQueryPrepareHandler : IPrepareQueryHandler + { + private readonly CommandStringCache _commandCache; + private readonly Lazy _options; + + /// + /// Initializes a new instance of the class. + /// + /// The command cache. + /// The transport options. + public GetQueueCountQueryPrepareHandler(CommandStringCache commandCache, ITransportOptionsFactory options) + { + Guard.NotNull(() => commandCache, commandCache); + Guard.NotNull(() => options, options); + + _commandCache = commandCache; + _options = new Lazy(options.Create); + } + + public void Handle(GetQueueCountQuery query, IDbCommand dbCommand, CommandStringTypes commandType) + { + if (query.Status.HasValue && _options.Value.EnableStatus) //status means nothing if not enabled on the queue + { + dbCommand.CommandText = _commandCache.GetCommand(CommandStringTypes.GetQueueCountStatus); + var status = dbCommand.CreateParameter(); + status.ParameterName = "@Status"; + status.DbType = DbType.Int32; + status.Value = query.Status.Value; + dbCommand.Parameters.Add(status); + } + else + { + dbCommand.CommandText = _commandCache.GetCommand(CommandStringTypes.GetQueueCountAll); + } + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/RelationalDatabaseMessageQueueInit.cs b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/RelationalDatabaseMessageQueueInit.cs index 89931edb..ecd526e7 100644 --- a/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/RelationalDatabaseMessageQueueInit.cs +++ b/Source/DotNetWorkQueue.Transport.RelationalDatabase/Basic/RelationalDatabaseMessageQueueInit.cs @@ -21,8 +21,10 @@ using System.Reflection; using DotNetWorkQueue.Configuration; using DotNetWorkQueue.Queue; +using DotNetWorkQueue.Transport.RelationalDatabase.Basic.Admin; using DotNetWorkQueue.Transport.RelationalDatabase.Basic.CommandHandler; using DotNetWorkQueue.Transport.RelationalDatabase.Basic.CommandPrepareHandler; +using DotNetWorkQueue.Transport.RelationalDatabase.Basic.Query; using DotNetWorkQueue.Transport.RelationalDatabase.Basic.QueryHandler; using DotNetWorkQueue.Transport.RelationalDatabase.Basic.QueryPrepareHandler; using DotNetWorkQueue.Transport.Shared; @@ -67,6 +69,8 @@ public void RegisterStandardImplementations(IContainer container, params Assembl container.Register>(LifeStyles.Singleton); container.Register>(LifeStyles.Singleton); container.Register>(LifeStyles.Singleton); + + container.Register(LifeStyles.Singleton); //**all //**send @@ -156,6 +160,14 @@ public void SetupMessageExpiration(IContainer container) private void RegisterCommandsExplicit(IContainer container) { + container + .Register, + GetQueueCountQueryHandler>(LifeStyles.Singleton); + + container + .Register, + GetQueueCountQueryPrepareHandler>(LifeStyles.Singleton); + container .Register, Dictionary>, GetMessageErrorsQueryHandler>(LifeStyles.Singleton); diff --git a/Source/DotNetWorkQueue.Transport.RelationalDatabase/DotNetWorkQueue.Transport.RelationalDatabase.csproj b/Source/DotNetWorkQueue.Transport.RelationalDatabase/DotNetWorkQueue.Transport.RelationalDatabase.csproj index dfb16553..e3da835f 100644 --- a/Source/DotNetWorkQueue.Transport.RelationalDatabase/DotNetWorkQueue.Transport.RelationalDatabase.csproj +++ b/Source/DotNetWorkQueue.Transport.RelationalDatabase/DotNetWorkQueue.Transport.RelationalDatabase.csproj @@ -2,7 +2,7 @@ net6.0;net48;net5.0;netstandard2.0;net472;net461 - 0.6.7 + 0.6.8 Brian Lehnen Copyright © Brian Lehnen 2015-2022 Shared libaries for relational transports diff --git a/Source/DotNetWorkQueue.Transport.SQLite.Integration.Tests/Admin/SimpleConsumer.cs b/Source/DotNetWorkQueue.Transport.SQLite.Integration.Tests/Admin/SimpleConsumer.cs new file mode 100644 index 00000000..4fcc9a10 --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.SQLite.Integration.Tests/Admin/SimpleConsumer.cs @@ -0,0 +1,28 @@ +using DotNetWorkQueue.Configuration; +using DotNetWorkQueue.IntegrationTests.Shared; +using DotNetWorkQueue.Transport.SQLite.Basic; +using Xunit; + +namespace DotNetWorkQueue.Transport.SQLite.Integration.Tests.Admin +{ + [Collection("Consumeradmin")] + public class SimpleConsumer + { + [Theory] + [InlineData(10, 10, 60, 5, true), + InlineData(10, 10, 60, 10, false)] + public void Run(int messageCount, int runtime, int timeOut, int workerCount, bool inMemoryDb) + { + using (var connectionInfo = new IntegrationConnectionInfo(inMemoryDb)) + { + var queueName = GenerateQueueName.Create(); + var consumer = new DotNetWorkQueue.IntegrationTests.Shared.Admin.Implementation.SimpleConsumerAdmin(); + consumer.Run(new QueueConnection(queueName, connectionInfo.ConnectionString), + messageCount, runtime, timeOut, workerCount, false, x => Helpers.SetOptions(x, + true, true, false, + false, true, true, false), + Helpers.GenerateData, Helpers.Verify, Helpers.VerifyQueueCount); + } + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.SQLite/Basic/SqLiteCommandStringCache.cs b/Source/DotNetWorkQueue.Transport.SQLite/Basic/SqLiteCommandStringCache.cs index 8488c55b..03e9ce6b 100644 --- a/Source/DotNetWorkQueue.Transport.SQLite/Basic/SqLiteCommandStringCache.cs +++ b/Source/DotNetWorkQueue.Transport.SQLite/Basic/SqLiteCommandStringCache.cs @@ -132,6 +132,12 @@ protected override void BuildCommands() CommandCache.Add(CommandStringTypes.FindErrorRecordsToDelete, $"select queueid from {TableNameHelper.MetaDataErrorsName} where @CurrentDateTime > LastExceptionDate"); + CommandCache.Add(CommandStringTypes.GetQueueCountStatus, + $"select count(queueid) from {TableNameHelper.MetaDataName} where status = @status"); + + CommandCache.Add(CommandStringTypes.GetQueueCountAll, + $"select count(queueid) from {TableNameHelper.MetaDataName}"); + CommandCache.Add(CommandStringTypes.GetColumnNamesFromTable, "PRAGMA table_info('{0}')"); diff --git a/Source/DotNetWorkQueue.Transport.SQLite/DotNetWorkQueue.Transport.SQLite.csproj b/Source/DotNetWorkQueue.Transport.SQLite/DotNetWorkQueue.Transport.SQLite.csproj index f7ca54ce..ab2971bd 100644 --- a/Source/DotNetWorkQueue.Transport.SQLite/DotNetWorkQueue.Transport.SQLite.csproj +++ b/Source/DotNetWorkQueue.Transport.SQLite/DotNetWorkQueue.Transport.SQLite.csproj @@ -1,7 +1,7 @@  net6.0;net48;net5.0;netstandard2.0;net472;net461 - 0.6.7 + 0.6.8 Brian Lehnen Copyright © Brian Lehnen 2015-2022 SQLite transport for https://github.com/blehnen/DotNetWorkQueue diff --git a/Source/DotNetWorkQueue.Transport.Shared/DotNetWorkQueue.Transport.Shared.csproj b/Source/DotNetWorkQueue.Transport.Shared/DotNetWorkQueue.Transport.Shared.csproj index 0d4cb55b..112ee655 100644 --- a/Source/DotNetWorkQueue.Transport.Shared/DotNetWorkQueue.Transport.Shared.csproj +++ b/Source/DotNetWorkQueue.Transport.Shared/DotNetWorkQueue.Transport.Shared.csproj @@ -2,7 +2,7 @@ net6.0;net48;net5.0;netstandard2.0;net472;net461 - 0.6.7 + 0.6.8 Brian Lehnen Copyright © Brian Lehnen 2015-2022 Shared libaries for transports diff --git a/Source/DotNetWorkQueue.Transport.SqlServer.IntegrationTests/Admin/SimpleConsumer.cs b/Source/DotNetWorkQueue.Transport.SqlServer.IntegrationTests/Admin/SimpleConsumer.cs new file mode 100644 index 00000000..53a6a21c --- /dev/null +++ b/Source/DotNetWorkQueue.Transport.SqlServer.IntegrationTests/Admin/SimpleConsumer.cs @@ -0,0 +1,27 @@ +using DotNetWorkQueue.Configuration; +using DotNetWorkQueue.IntegrationTests.Shared; +using DotNetWorkQueue.Transport.SqlServer.Basic; +using DotNetWorkQueue.Transport.SqlServer.IntegrationTests; +using Xunit; + +namespace DotNetWorkQueue.Transport.SqlServer.Integration.Tests.Admin +{ + [Collection("Consumeradmin")] + public class SimpleConsumer + { + [Theory] + [InlineData(10, 10, 60, 5, false), + InlineData(10, 10, 30, 10, true)] + public void Run(int messageCount, int runtime, int timeOut, int workerCount, bool useTransactions) + { + var queueName = GenerateQueueName.Create(); + var consumer = new DotNetWorkQueue.IntegrationTests.Shared.Admin.Implementation.SimpleConsumerAdmin(); + consumer.Run(new QueueConnection(queueName, ConnectionInfo.ConnectionString), + messageCount, runtime, timeOut, workerCount, false, x => Helpers.SetOptions(x, + false, !useTransactions, useTransactions, + false, + false, !useTransactions, true, false), + Helpers.GenerateData, Helpers.Verify, Helpers.VerifyQueueCount); + } + } +} diff --git a/Source/DotNetWorkQueue.Transport.SqlServer/Basic/SqlServerCommandStringCache.cs b/Source/DotNetWorkQueue.Transport.SqlServer/Basic/SqlServerCommandStringCache.cs index 1c2e54ba..d2de9138 100644 --- a/Source/DotNetWorkQueue.Transport.SqlServer/Basic/SqlServerCommandStringCache.cs +++ b/Source/DotNetWorkQueue.Transport.SqlServer/Basic/SqlServerCommandStringCache.cs @@ -137,6 +137,12 @@ protected override void BuildCommands() CommandCache.Add(CommandStringTypes.FindErrorRecordsToDelete, $"select queueid from {TableNameHelper.MetaDataErrorsName} with (updlock, readpast, rowlock) where @CurrentDate > lastexceptiondate"); + CommandCache.Add(CommandStringTypes.GetQueueCountStatus, + $"select count_big(queueid) from {TableNameHelper.MetaDataName} where status = @status"); + + CommandCache.Add(CommandStringTypes.GetQueueCountAll, + $"select count_big(queueid) from {TableNameHelper.MetaDataName}"); + CommandCache.Add(CommandStringTypes.FindExpiredRecordsToDelete, $"select queueid from {TableNameHelper.MetaDataName} with (updlock, readpast, rowlock) where GETUTCDate() > ExpirationTime"); diff --git a/Source/DotNetWorkQueue.Transport.SqlServer/DotNetWorkQueue.Transport.SqlServer.csproj b/Source/DotNetWorkQueue.Transport.SqlServer/DotNetWorkQueue.Transport.SqlServer.csproj index 73df4f02..bc2cdc24 100644 --- a/Source/DotNetWorkQueue.Transport.SqlServer/DotNetWorkQueue.Transport.SqlServer.csproj +++ b/Source/DotNetWorkQueue.Transport.SqlServer/DotNetWorkQueue.Transport.SqlServer.csproj @@ -3,7 +3,7 @@ net6.0;net48;net5.0;netstandard2.0;net472;net461 true - 0.6.7 + 0.6.8 Brian Lehnen Copyright © Brian Lehnen 2015-2022 SQL server transport for https://github.com/blehnen/DotNetWorkQueue diff --git a/Source/DotNetWorkQueue/Admin/AdminApi.cs b/Source/DotNetWorkQueue/Admin/AdminApi.cs new file mode 100644 index 00000000..d513ee08 --- /dev/null +++ b/Source/DotNetWorkQueue/Admin/AdminApi.cs @@ -0,0 +1,141 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading; +using DotNetWorkQueue.Configuration; +using DotNetWorkQueue.Validation; + +namespace DotNetWorkQueue.Admin +{ + /// + /// Implementation of IAdminApi + /// + internal class AdminApi: IAdminApi + { + private readonly ConcurrentDictionary> _queueConnections; + private readonly ConcurrentDictionary _queueFunctions; + private int _disposeCount; + + public AdminApi(AdminApiConfiguration configuration) + { + Guard.NotNull(() => configuration, configuration); + Configuration = configuration; + _queueConnections = new ConcurrentDictionary>(); + _queueFunctions = new ConcurrentDictionary(); + } + + public IReadOnlyDictionary> Connections => _queueConnections; + + public AdminApiConfiguration Configuration { get; } + + public Guid AddQueueConnection(IQueueContainer container, QueueConnection connection) + { + ThrowIfDisposed(); + Guard.NotNull(() => connection, connection); + var id = Guid.NewGuid(); + var added = _queueConnections.TryAdd(id, + new Tuple(container, connection)); + + //failing makes no sense here, but check + if (!added) + { + throw new InvalidOperationException("Failed to add connection"); + } + return id; + } + + #region Implementations + public long? Count(Guid id, QueueStatusAdmin? status) + { + ThrowIfDisposed(); + var functions = ObtainFunctions(id); + return functions.Count(status); + } + #endregion + + #region Create function per connection/queue + private IAdminFunctions ObtainFunctions(Guid id) + { + if (_queueFunctions.ContainsKey(id)) + { + return _queueFunctions[id]; + } + + if (_queueConnections.TryGetValue(id, out var data)) + { + var connection = data.Item2; + var container = data.Item1; + var functions = container.CreateAdminFunctions(connection); + _queueFunctions.TryAdd(id, functions); + return functions; + } + throw new InvalidOperationException($"id {id} was not found"); + } + #endregion + + #region IDisposable, IsDisposed + /// + /// Throws an exception if this instance has been disposed. + /// + /// The name. + /// + protected void ThrowIfDisposed([CallerMemberName] string name = "") + { + if (Interlocked.CompareExchange(ref _disposeCount, 0, 0) != 0) + { + throw new ObjectDisposedException(name); + } + } + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases unmanaged and - optionally - managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual void Dispose(bool disposing) + { + if (!disposing) return; + + if (Interlocked.Increment(ref _disposeCount) != 1) return; + + _queueFunctions.Clear(); + _queueConnections.Clear(); + } + + /// + /// Gets a value indicating whether this instance is disposed. + /// + /// + /// true if this instance is disposed; otherwise, false. + /// + public bool IsDisposed => Interlocked.CompareExchange(ref _disposeCount, 0, 0) != 0; + + #endregion + } +} diff --git a/Source/DotNetWorkQueue/Configuration/AdminApiConfiguration.cs b/Source/DotNetWorkQueue/Configuration/AdminApiConfiguration.cs new file mode 100644 index 00000000..393e6f0f --- /dev/null +++ b/Source/DotNetWorkQueue/Configuration/AdminApiConfiguration.cs @@ -0,0 +1,29 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +namespace DotNetWorkQueue.Configuration +{ + /// + /// Configuration class for the admin API + /// + /// Currently empty + public class AdminApiConfiguration + { + + } +} diff --git a/Source/DotNetWorkQueue/DotNetWorkQueue.csproj b/Source/DotNetWorkQueue/DotNetWorkQueue.csproj index 7b8c515b..ad209f6a 100644 --- a/Source/DotNetWorkQueue/DotNetWorkQueue.csproj +++ b/Source/DotNetWorkQueue/DotNetWorkQueue.csproj @@ -4,7 +4,7 @@ net6.0;net48;net5.0;netstandard2.0;net472;net461 Brian Lehnen - 0.6.7 + 0.6.8 Work queue for dot net 4.6.1, 4.7.2, 4.8, dot net standard 2.0, dot net 5 and dot net 6 SQL server, SQLite, Redis and PostgreSQL transports are available. diff --git a/Source/DotNetWorkQueue/IAdminApi.cs b/Source/DotNetWorkQueue/IAdminApi.cs new file mode 100644 index 00000000..d7106812 --- /dev/null +++ b/Source/DotNetWorkQueue/IAdminApi.cs @@ -0,0 +1,56 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +using System; +using System.Collections.Generic; +using DotNetWorkQueue.Configuration; + +namespace DotNetWorkQueue +{ + /// + /// Admin API wrapper for admin functions per-queue + /// + public interface IAdminApi : IDisposable, IIsDisposed + { + /// + /// Gets the configuration. + /// + AdminApiConfiguration Configuration { get; } + + /// + /// Current configured connections + /// + IReadOnlyDictionary> Connections { get; } + + /// + /// Adds a new connection + /// + /// + /// + /// An Id for the connection, used in query calls + Guid AddQueueConnection(IQueueContainer container, QueueConnection connection); + + /// + /// returns the count of items in the queue + /// + /// + /// + /// + long? Count(Guid id, QueueStatusAdmin? status); + } +} diff --git a/Source/DotNetWorkQueue/IAdminFunctions.cs b/Source/DotNetWorkQueue/IAdminFunctions.cs new file mode 100644 index 00000000..a8da0ad6 --- /dev/null +++ b/Source/DotNetWorkQueue/IAdminFunctions.cs @@ -0,0 +1,50 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +namespace DotNetWorkQueue +{ + /// + /// Functions for modifying or listing existing data in the queue + /// + public interface IAdminFunctions + { + /// + /// Returns a count of items in the queue. + /// + /// A status to filter on, null if none + /// + long? Count(QueueStatusAdmin? status); + } + + /// + /// The possible status for items in the queue + /// + public enum QueueStatusAdmin : short + { + /// + /// Waiting for processing + /// + /// Already queue, but waiting for processing + Waiting = 0, + /// + /// Currently being processed + /// + /// In the queue and in the middle of being processed + Processing = 1, + } +} diff --git a/Source/DotNetWorkQueue/IQueueContainer.cs b/Source/DotNetWorkQueue/IQueueContainer.cs index fb3973bf..186e96fd 100644 --- a/Source/DotNetWorkQueue/IQueueContainer.cs +++ b/Source/DotNetWorkQueue/IQueueContainer.cs @@ -129,5 +129,18 @@ IProducerMethodJobQueue CreateMethodJobProducer( /// Queue and connection information. /// IJobSchedulerLastKnownEvent CreateJobSchedulerLastKnownEvent(QueueConnection queueConnection); + + /// + /// A container for accessing data for multiple queues. + /// + /// + IAdminApi CreateAdminApi(); + + /// + /// Creates the admin functions, for modifying and listing data in the queue. + /// + /// + /// + IAdminFunctions CreateAdminFunctions(QueueConnection queueConnection); } } diff --git a/Source/DotNetWorkQueue/IoC/ComponentRegistration.cs b/Source/DotNetWorkQueue/IoC/ComponentRegistration.cs index 7ec84591..07dbc366 100644 --- a/Source/DotNetWorkQueue/IoC/ComponentRegistration.cs +++ b/Source/DotNetWorkQueue/IoC/ComponentRegistration.cs @@ -19,6 +19,7 @@ using System; using System.Diagnostics; using System.Linq; +using DotNetWorkQueue.Admin; using DotNetWorkQueue.Cache; using DotNetWorkQueue.Configuration; using DotNetWorkQueue.Factory; @@ -296,6 +297,9 @@ private static void RegisterSharedDefaults(IContainer container, QueueConnection container.Register(LifeStyles.Singleton); container.Register(LifeStyles.Singleton); + container.Register(LifeStyles.Singleton); + container.Register(LifeStyles.Singleton); + container.Register(LifeStyles.Singleton); container.RegisterCollection(Enumerable.Empty()); diff --git a/Source/DotNetWorkQueue/IoC/CreateContainer.cs b/Source/DotNetWorkQueue/IoC/CreateContainer.cs index 944da55e..1f1ff3ba 100644 --- a/Source/DotNetWorkQueue/IoC/CreateContainer.cs +++ b/Source/DotNetWorkQueue/IoC/CreateContainer.cs @@ -81,7 +81,7 @@ public IContainer Create(QueueContexts queueType, var type = GetRegistrationType(register); - if (!string.IsNullOrWhiteSpace(queueConnection.Queue) && !string.IsNullOrWhiteSpace(queueConnection.Connection)) + if ((!string.IsNullOrWhiteSpace(queueConnection.Queue) && !string.IsNullOrWhiteSpace(queueConnection.Connection)) || queueType == QueueContexts.Admin) { ComponentRegistration.RegisterDefaults(containerWrapper, type, queueConnection); } diff --git a/Source/DotNetWorkQueue/QueueContainer.cs b/Source/DotNetWorkQueue/QueueContainer.cs index 8355d76c..72746fb0 100644 --- a/Source/DotNetWorkQueue/QueueContainer.cs +++ b/Source/DotNetWorkQueue/QueueContainer.cs @@ -386,6 +386,34 @@ public IGetTimeFactory CreateTimeSync(string connection) return factory; } #endregion + + #region Admin + + /// + /// Creates the admin API wrapper; this can contain multiple queues. + /// + /// + public IAdminApi CreateAdminApi() + { + var container = _createContainerInternal().Create(QueueContexts.Admin, _registerService, _transportInit, x => { }, _setOptions); + Containers.Add(container); + return container.GetInstance(); + } + + /// + /// Creates an admin function module for querying or modifying data in a queue. + /// + /// + /// + public IAdminFunctions CreateAdminFunctions(QueueConnection queueConnection) + { + Guard.NotNull(() => queueConnection, queueConnection); + + var container = _createContainerInternal().Create(QueueContexts.Admin, _registerService, queueConnection, _transportInit, ConnectionTypes.Status, x => { }, _setOptions); + Containers.Add(container); + return container.GetInstance(); + } + #endregion } #endregion } diff --git a/Source/DotNetWorkQueue/QueueContexts.cs b/Source/DotNetWorkQueue/QueueContexts.cs index 102f816e..61cd3440 100644 --- a/Source/DotNetWorkQueue/QueueContexts.cs +++ b/Source/DotNetWorkQueue/QueueContexts.cs @@ -84,8 +84,12 @@ public enum QueueContexts /// JobScheduler = 17, /// - /// Returns time from + /// Returns time from transport or local clock /// - Time = 18 + Time = 18, + /// + /// Admin functions, such as listing items already in the queue + /// + Admin = 19 } } diff --git a/Source/DotNetWorkQueue/Transport/Memory/Basic/Admin/AdminFunctions.cs b/Source/DotNetWorkQueue/Transport/Memory/Basic/Admin/AdminFunctions.cs new file mode 100644 index 00000000..6ea1a450 --- /dev/null +++ b/Source/DotNetWorkQueue/Transport/Memory/Basic/Admin/AdminFunctions.cs @@ -0,0 +1,46 @@ +// --------------------------------------------------------------------- +//This file is part of DotNetWorkQueue +//Copyright © 2015-2022 Brian Lehnen +// +//This library is free software; you can redistribute it and/or +//modify it under the terms of the GNU Lesser General Public +//License as published by the Free Software Foundation; either +//version 2.1 of the License, or (at your option) any later version. +// +//This library is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +//Lesser General Public License for more details. +// +//You should have received a copy of the GNU Lesser General Public +//License along with this library; if not, write to the Free Software +//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +// --------------------------------------------------------------------- +namespace DotNetWorkQueue.Transport.Memory.Basic.Admin +{ + internal class AdminFunctions: IAdminFunctions + { + private readonly IDataStorage _storage; + public AdminFunctions(IDataStorage storage) + { + _storage = storage; + } + public long? Count(QueueStatusAdmin? status) + { + if (!status.HasValue) + { + return _storage.RecordCount; + } + + switch (status.Value) + { + case QueueStatusAdmin.Processing: + return _storage.WorkingRecordCount; + case QueueStatusAdmin.Waiting: + return _storage.RecordCount; + } + + return null; + } + } +} diff --git a/Source/DotNetWorkQueue/Transport/Memory/Basic/DataStorage.cs b/Source/DotNetWorkQueue/Transport/Memory/Basic/DataStorage.cs index 216faa88..b6334fd0 100644 --- a/Source/DotNetWorkQueue/Transport/Memory/Basic/DataStorage.cs +++ b/Source/DotNetWorkQueue/Transport/Memory/Basic/DataStorage.cs @@ -205,7 +205,7 @@ public IReceivedMessageInternal GetNextMessage(List routes, TimeSpan tim CancellationTokenSource.CreateLinkedTokenSource(_cancelToken.CancelWorkToken, _cancelToken.StopWorkToken)) { - Guid id = Guid.Empty; + Guid id; try { if (!Queues[_connectionInformation].TryTake(out id, Convert.ToInt32(timeout.TotalMilliseconds), linkedCts.Token)) @@ -319,6 +319,9 @@ public QueueStatuses DoesJobExist(string jobName, DateTimeOffset scheduledTime) /// public long RecordCount => QueueData[_connectionInformation].Count; + /// + public long WorkingRecordCount => QueueWorking[_connectionInformation].Count; + /// public long GetErrorCount() { diff --git a/Source/DotNetWorkQueue/Transport/Memory/Basic/MemoryMessageQueueInit.cs b/Source/DotNetWorkQueue/Transport/Memory/Basic/MemoryMessageQueueInit.cs index a7b9bffb..a3e5f50c 100644 --- a/Source/DotNetWorkQueue/Transport/Memory/Basic/MemoryMessageQueueInit.cs +++ b/Source/DotNetWorkQueue/Transport/Memory/Basic/MemoryMessageQueueInit.cs @@ -20,6 +20,7 @@ using System.Collections.Generic; using DotNetWorkQueue.Configuration; using DotNetWorkQueue.IoC; +using DotNetWorkQueue.Transport.Memory.Basic.Admin; using DotNetWorkQueue.Transport.Memory.Basic.Factory; using DotNetWorkQueue.Transport.Memory.Trace.Decorator; using DotNetWorkQueue.Validation; @@ -55,6 +56,7 @@ public override void RegisterImplementations(IContainer container, RegistrationT container.Register(LifeStyles.Singleton); container.Register(LifeStyles.Singleton); container.Register(LifeStyles.Singleton); + container.Register(LifeStyles.Singleton); //**all //**send diff --git a/Source/DotNetWorkQueue/Transport/Memory/IDataStorage.cs b/Source/DotNetWorkQueue/Transport/Memory/IDataStorage.cs index 5179f2b6..d6ddaee1 100644 --- a/Source/DotNetWorkQueue/Transport/Memory/IDataStorage.cs +++ b/Source/DotNetWorkQueue/Transport/Memory/IDataStorage.cs @@ -18,9 +18,6 @@ // --------------------------------------------------------------------- using System; using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - namespace DotNetWorkQueue.Transport.Memory { /// @@ -86,6 +83,11 @@ public interface IDataStorage : IClear /// long RecordCount { get; } + /// + /// The number of records currently being processed + /// + long WorkingRecordCount { get; } + /// /// Gets the error count. /// diff --git a/Source/Samples/LiteDb/LiteDbProducer/Program.cs b/Source/Samples/LiteDb/LiteDbProducer/Program.cs index b9e32aa7..995105cd 100644 --- a/Source/Samples/LiteDb/LiteDbProducer/Program.cs +++ b/Source/Samples/LiteDb/LiteDbProducer/Program.cs @@ -58,7 +58,11 @@ static void Main(string[] args) { using (var queue = queueContainer.CreateProducer(queueConnection)) { - RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing); + using (var admin = queueContainer.CreateAdminApi()) + { + admin.AddQueueConnection(queueContainer, queueConnection); + RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing, admin); + } } } diff --git a/Source/Samples/LiteDb/LiteDbProducerConsumer/Program.cs b/Source/Samples/LiteDb/LiteDbProducerConsumer/Program.cs index 08bfbbbd..d1a5750b 100644 --- a/Source/Samples/LiteDb/LiteDbProducerConsumer/Program.cs +++ b/Source/Samples/LiteDb/LiteDbProducerConsumer/Program.cs @@ -124,7 +124,11 @@ static void Main(string[] args) using (var queue = queueContainer.CreateProducer(queueConnection)) { - RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing); + using (var admin = queueContainer.CreateAdminApi()) + { + admin.AddQueueConnection(queueContainer, queueConnection); + RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing, admin); + } } } } diff --git a/Source/Samples/LiteDb/LiteDbProducerLinq/Program.cs b/Source/Samples/LiteDb/LiteDbProducerLinq/Program.cs index cc804f28..709da0a8 100644 --- a/Source/Samples/LiteDb/LiteDbProducerLinq/Program.cs +++ b/Source/Samples/LiteDb/LiteDbProducerLinq/Program.cs @@ -63,7 +63,11 @@ static void Main(string[] args) { using (var queue = queueContainer.CreateMethodProducer(queueConnection)) { - RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture); + using (var admin = queueContainer.CreateAdminApi()) + { + admin.AddQueueConnection(queueContainer, queueConnection); + RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, admin); + } } } diff --git a/Source/Samples/PostgreSQL/PostgreSQLProducer/Program.cs b/Source/Samples/PostgreSQL/PostgreSQLProducer/Program.cs index 6fb12f7c..0dd881fe 100644 --- a/Source/Samples/PostgreSQL/PostgreSQLProducer/Program.cs +++ b/Source/Samples/PostgreSQL/PostgreSQLProducer/Program.cs @@ -57,7 +57,11 @@ static void Main(string[] args) { using (var queue = queueContainer.CreateProducer(queueConnection)) { - RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing); + using (var admin = queueContainer.CreateAdminApi()) + { + admin.AddQueueConnection(queueContainer, queueConnection); + RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing, admin); + } } } diff --git a/Source/Samples/PostgreSQL/PostgreSQLProducerLinq/Program.cs b/Source/Samples/PostgreSQL/PostgreSQLProducerLinq/Program.cs index 7c90a2f4..cee286f0 100644 --- a/Source/Samples/PostgreSQL/PostgreSQLProducerLinq/Program.cs +++ b/Source/Samples/PostgreSQL/PostgreSQLProducerLinq/Program.cs @@ -57,7 +57,11 @@ static void Main(string[] args) { using (var queue = queueContainer.CreateMethodProducer(queueConnection)) { - RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture); + using (var admin = queueContainer.CreateAdminApi()) + { + admin.AddQueueConnection(queueContainer, queueConnection); + RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, admin); + } } } diff --git a/Source/Samples/Redis/RedisProducer/Program.cs b/Source/Samples/Redis/RedisProducer/Program.cs index 6acf3c81..f8ac070f 100644 --- a/Source/Samples/Redis/RedisProducer/Program.cs +++ b/Source/Samples/Redis/RedisProducer/Program.cs @@ -33,7 +33,11 @@ static void Main(string[] args) { using (var queue = queueContainer.CreateProducer(queueConnection)) { - RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing); + using (var admin = queueContainer.CreateAdminApi()) + { + admin.AddQueueConnection(queueContainer, queueConnection); + RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing, admin); + } } } diff --git a/Source/Samples/Redis/RedisProducerLinq/Program.cs b/Source/Samples/Redis/RedisProducerLinq/Program.cs index 8003415a..99a52180 100644 --- a/Source/Samples/Redis/RedisProducerLinq/Program.cs +++ b/Source/Samples/Redis/RedisProducerLinq/Program.cs @@ -37,7 +37,11 @@ static void Main(string[] args) { using (var queue = queueContainer.CreateMethodProducer(queueConnection)) { - RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture); + using (var admin = queueContainer.CreateAdminApi()) + { + admin.AddQueueConnection(queueContainer, queueConnection); + RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, admin); + } } } diff --git a/Source/Samples/SQLServer/SQLServerProducer/Program.cs b/Source/Samples/SQLServer/SQLServerProducer/Program.cs index 5aa7cc85..e1f9c90b 100644 --- a/Source/Samples/SQLServer/SQLServerProducer/Program.cs +++ b/Source/Samples/SQLServer/SQLServerProducer/Program.cs @@ -72,7 +72,11 @@ static void Main(string[] args) { using (var queue = queueContainer.CreateProducer(queueConnection)) { - RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing); + using (var admin = queueContainer.CreateAdminApi()) + { + admin.AddQueueConnection(queueContainer, queueConnection); + RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing, admin); + } } } diff --git a/Source/Samples/SQLServer/SQLServerProducerLinq/Program.cs b/Source/Samples/SQLServer/SQLServerProducerLinq/Program.cs index 6a0b9bf3..b300abb3 100644 --- a/Source/Samples/SQLServer/SQLServerProducerLinq/Program.cs +++ b/Source/Samples/SQLServer/SQLServerProducerLinq/Program.cs @@ -63,7 +63,11 @@ static void Main(string[] args) { using (var queue = queueContainer.CreateMethodProducer(queueConnection)) { - RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture); + using (var admin = queueContainer.CreateAdminApi()) + { + admin.AddQueueConnection(queueContainer, queueConnection); + RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, admin); + } } } diff --git a/Source/Samples/SQLite/SQLiteProducer/Program.cs b/Source/Samples/SQLite/SQLiteProducer/Program.cs index 676601d5..bf4a937a 100644 --- a/Source/Samples/SQLite/SQLiteProducer/Program.cs +++ b/Source/Samples/SQLite/SQLiteProducer/Program.cs @@ -61,7 +61,11 @@ static void Main(string[] args) { using (var queue = queueContainer.CreateProducer(queueConnection)) { - RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing); + using (var admin = queueContainer.CreateAdminApi()) + { + admin.AddQueueConnection(queueContainer, queueConnection); + RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, DelayedProcessing, admin); + } } } diff --git a/Source/Samples/SQLite/SQLiteProducer/SQLiteProducer.csproj b/Source/Samples/SQLite/SQLiteProducer/SQLiteProducer.csproj index 2bf89c92..50a83eae 100644 --- a/Source/Samples/SQLite/SQLiteProducer/SQLiteProducer.csproj +++ b/Source/Samples/SQLite/SQLiteProducer/SQLiteProducer.csproj @@ -40,6 +40,7 @@ + @@ -49,6 +50,7 @@ + diff --git a/Source/Samples/SQLite/SQLiteProducerLinq/Program.cs b/Source/Samples/SQLite/SQLiteProducerLinq/Program.cs index deb1437a..3205ce4a 100644 --- a/Source/Samples/SQLite/SQLiteProducerLinq/Program.cs +++ b/Source/Samples/SQLite/SQLiteProducerLinq/Program.cs @@ -66,7 +66,11 @@ static void Main(string[] args) { using (var queue = queueContainer.CreateMethodProducer(queueConnection)) { - RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture); + using (var admin = queueContainer.CreateAdminApi()) + { + admin.AddQueueConnection(queueContainer, queueConnection); + RunProducer.RunLoop(queue, ExpiredData, ExpiredDataFuture, admin); + } } } diff --git a/Source/Samples/SampleShared/RunProducer.cs b/Source/Samples/SampleShared/RunProducer.cs index 07efbfc7..2136f2ea 100644 --- a/Source/Samples/SampleShared/RunProducer.cs +++ b/Source/Samples/SampleShared/RunProducer.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using DotNetWorkQueue; using DotNetWorkQueue.Messages; @@ -135,11 +136,12 @@ public static List RunBatch(IProducerQueue q return data; } - public static void RunLoop(IProducerMethodQueue queue, Func expiredDataInstant, Func expiredDataFuture) + public static void RunLoop(IProducerMethodQueue queue, Func expiredDataInstant, Func expiredDataFuture, IAdminApi admin) { var keepRunning = true; while (keepRunning) { + Console.WriteLine($"{admin.Count(admin.Connections.Keys.FirstOrDefault(), null)} items in queue"); Console.WriteLine(@"To test heartbeat recovery, force kill your consumer after starting to process record(s) To test rollbacks, cancel the consumer by pressing any button. Easier to test with longer running jobs. @@ -180,11 +182,12 @@ public static void RunLoop(IProducerMethodQueue queue, Func queue, Func expiredDataInstant, Func expiredDataFuture, - Func delayProcessing) + Func delayProcessing, IAdminApi admin) { var keepRunning = true; while (keepRunning) { + Console.WriteLine($"{admin.Count(admin.Connections.Keys.FirstOrDefault(), null)} items in queue"); Console.WriteLine(@"To test heartbeat recovery, force kill your consumer after starting to process record(s) To test rollbacks, cancel the consumer by pressing any button. Easier to test with longer running jobs. diff --git a/appveyor.yml b/appveyor.yml index c3ea92b2..34527e9e 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,4 +1,4 @@ -version: 0.6.4.{build} +version: 0.6.8.{build} branches: only: - master