Skip to content

Commit

Permalink
Inital admin support
Browse files Browse the repository at this point in the history
  • Loading branch information
blehnen committed Jul 19, 2022
1 parent 53a031e commit 1161993
Show file tree
Hide file tree
Showing 67 changed files with 1,276 additions and 40 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
###0.6.8 2022-07-19
* Update npgsql
* Add inital admin interface

###0.6.7 2022-06-30
* Update various packages to latest versions

Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ This library uses multiple 3rd party libaries, listed below.

##### Developed with:

[![Resharper](http://neventstore.org/images/logo_resharper_small.gif)](http://www.jetbrains.com/resharper/)
[![dotCover](http://neventstore.org/images/logo_dotcover_small.gif)](http://www.jetbrains.com/dotcover/)
[![dotTrace](http://neventstore.org/images/logo_dottrace_small.gif)](http://www.jetbrains.com/dottrace/)

<img src="https://resources.jetbrains.com/storage/products/company/brand/logos/ReSharper_icon.png" width="48">
<img src="https://resources.jetbrains.com/storage/products/company/brand/logos/dotCover_icon.png" width="48">
<img src="https://resources.jetbrains.com/storage/products/company/brand/logos/dotTrace_icon.png" width="48">
6 changes: 3 additions & 3 deletions SharedAssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
[assembly: AssemblyCompany("Brian Lehnen")]
[assembly: AssemblyCopyright("Copyright © Brian Lehnen 2022")]
[assembly: ComVisible(false)]
[assembly: AssemblyVersion("0.6.7")]
[assembly: AssemblyFileVersion("0.6.7")]
[assembly: AssemblyInformationalVersion("0.6.7")]
[assembly: AssemblyVersion("0.6.8")]
[assembly: AssemblyFileVersion("0.6.8")]
[assembly: AssemblyInformationalVersion("0.6.8")]
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
An implementation of AppMetrics (
https://github.com/AppMetrics/AppMetrics) for https://github.com/blehnen/DotNetWorkQueue
</Description>
<Version>0.6.7</Version>
<Version>0.6.8</Version>
<Authors>Brian Lehnen</Authors>
<Copyright>Copyright © Brian Lehnen 2015-2022</Copyright>
<PackageLicenseExpression>LGPL-2.1-or-later</PackageLicenseExpression>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System;
using System.Threading;
using DotNetWorkQueue.Configuration;
using Microsoft.Extensions.Logging;
using Xunit;

namespace DotNetWorkQueue.IntegrationTests.Shared.Admin
{
public class AdminSharedConsumer<TMessage>
where TMessage : class
{
public void RunConsumer<TTransportInit>(QueueConnection queueConnection, bool addInterceptors,
ILogger logProvider,
int runTime, int messageCount,
int workerCount, int timeOut,
TimeSpan heartBeatTime, TimeSpan heartBeatMonitorTime, string updateTime, bool enableChaos,
ICreationScope scope)
where TTransportInit : ITransportInit, new()
{

using (var trace = SharedSetup.CreateTrace("consumer-admin"))
{
if (enableChaos)
timeOut *= 2;

using (var metrics = new Metrics.Metrics(queueConnection.Queue))
{
var addInterceptorConsumer = InterceptorAdding.No;
if (addInterceptors)
{
addInterceptorConsumer = InterceptorAdding.ConfigurationOnly;
}

var processedCount = new IncrementWrapper();
using (
var creator = SharedSetup.CreateCreator<TTransportInit>(addInterceptorConsumer, logProvider,
metrics, false, enableChaos, scope, trace.Source)
)
{
using (
var queue =
creator.CreateConsumer(queueConnection))
{
SharedSetup.SetupDefaultConsumerQueue(queue.Configuration, workerCount, heartBeatTime,
heartBeatMonitorTime, updateTime, null);
var waitForFinish = new ManualResetEventSlim(false);
waitForFinish.Reset();

var admin = creator.CreateAdminFunctions(queueConnection);
var count = admin.Count(null);
Assert.Equal(messageCount, count);

//start looking for work
queue.Start<TMessage>((message, notifications) =>
{
MessageHandlingShared.HandleFakeMessages(message, runTime, processedCount, messageCount,
waitForFinish);
});

if (messageCount <= workerCount && runTime > 10)
{
Thread.Sleep(runTime / 2);
var working = admin.Count(QueueStatusAdmin.Processing);
var waiting = admin.Count(QueueStatusAdmin.Waiting);
Assert.Equal(0, waiting);
Assert.Equal(messageCount, working);
}
else if(runTime > 10)
{
Thread.Sleep(runTime / 2);
var working = admin.Count(QueueStatusAdmin.Processing);
Assert.Equal(workerCount, working);
}

waitForFinish.Wait(timeOut * 1000);
}

Assert.Null(processedCount.IdError);
Assert.Equal(messageCount, processedCount.ProcessedCount);
VerifyMetrics.VerifyProcessedCount(queueConnection.Queue, metrics.GetCurrentMetrics(),
messageCount);
LoggerShared.CheckForErrors(queueConnection.Queue);
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System;
using DotNetWorkQueue.Configuration;
using DotNetWorkQueue.IntegrationTests.Shared.Consumer;
using DotNetWorkQueue.IntegrationTests.Shared.Producer;
using DotNetWorkQueue.Messages;
using Xunit;

namespace DotNetWorkQueue.IntegrationTests.Shared.Admin.Implementation
{
public class SimpleConsumerAdmin
{
public void Run<TTransportInit, TMessage, TTransportCreate>(
QueueConnection queueConnection,
int messageCount,
int runtime,
int timeOut,
int workerCount,
bool enableChaos,
Action<TTransportCreate> setOptions,
Func<QueueProducerConfiguration, AdditionalMessageData> generateData,
Action<QueueConnection, QueueProducerConfiguration, long, ICreationScope> verify,
Action<QueueConnection, IBaseTransportOptions, ICreationScope, int, bool, bool> verifyQueueCount)
where TTransportInit : ITransportInit, new()
where TMessage : class
where TTransportCreate : class, IQueueCreation
{

var logProvider = LoggerShared.Create(queueConnection.Queue, GetType().Name);
using (
var queueCreator =
new QueueCreationContainer<TTransportInit>(
serviceRegister => serviceRegister.Register(() => logProvider, LifeStyles.Singleton)))
{
ICreationScope scope = null;
var oCreation = queueCreator.GetQueueCreation<TTransportCreate>(queueConnection);
try
{
setOptions(oCreation);
var result = oCreation.CreateQueue();
Assert.True(result.Success, result.ErrorMessage);
scope = oCreation.Scope;

var producer = new ProducerShared();
producer.RunTest<TTransportInit, TMessage>(queueConnection, false, messageCount,
logProvider, generateData,
verify, false, scope, false);

var consumer = new AdminSharedConsumer<TMessage>();
consumer.RunConsumer<TTransportInit>(queueConnection,
false,
logProvider,
runtime, messageCount,
workerCount, timeOut,
TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(35), "second(*%10)", enableChaos, scope);

verifyQueueCount(queueConnection, oCreation.BaseTransportOptions, scope, 0, false,
false);
}
finally
{
oCreation?.RemoveQueue();
oCreation?.Dispose();
scope?.Dispose();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,13 @@ public void RunTest<TTransportInit, TMessage>(QueueConnection queueConnection,
}

if (validateMetricCounts)
{
VerifyMetrics.VerifyProducedCount(queueConnection.Queue, metrics.GetCurrentMetrics(),
messageCount);

var admin = creator.CreateAdminFunctions(queueConnection);
Assert.Equal(messageCount, admin.Count(null));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using DotNetWorkQueue.Configuration;
using DotNetWorkQueue.IntegrationTests.Shared;
using DotNetWorkQueue.Transport.LiteDb.Basic;
using Xunit;

namespace DotNetWorkQueue.Transport.LiteDb.IntegrationTests.Admin
{
[Collection("Consumeradmin")]
public class SimpleConsumer
{
[Theory]
[InlineData(10, 10, 120, 2, false, IntegrationConnectionInfo.ConnectionTypes.Direct),
InlineData(2, 10, 120, 2, false, IntegrationConnectionInfo.ConnectionTypes.Memory),
InlineData(15, 10, 120, 4, true, IntegrationConnectionInfo.ConnectionTypes.Shared)]
public void Run(int messageCount, int runtime, int timeOut, int workerCount, bool enableChaos, IntegrationConnectionInfo.ConnectionTypes connectionType)
{
using (var connectionInfo = new IntegrationConnectionInfo(connectionType))
{
var queueName = GenerateQueueName.Create();
var consumer = new DotNetWorkQueue.IntegrationTests.Shared.Admin.Implementation.SimpleConsumerAdmin();
consumer.Run<LiteDbMessageQueueInit, FakeMessage, LiteDbMessageQueueCreation>(new QueueConnection(queueName,
connectionInfo.ConnectionString),
messageCount, runtime, timeOut, workerCount, enableChaos, x => { },
Helpers.GenerateData, Helpers.Verify, Helpers.VerifyQueueCount);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// ---------------------------------------------------------------------
//This file is part of DotNetWorkQueue
//Copyright © 2015-2022 Brian Lehnen
//
//This library is free software; you can redistribute it and/or
//modify it under the terms of the GNU Lesser General Public
//License as published by the Free Software Foundation; either
//version 2.1 of the License, or (at your option) any later version.
//
//This library is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
//Lesser General Public License for more details.
//
//You should have received a copy of the GNU Lesser General Public
//License along with this library; if not, write to the Free Software
//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
// ---------------------------------------------------------------------
using DotNetWorkQueue.Transport.LiteDb.Basic.Query;
using DotNetWorkQueue.Transport.Shared;
using DotNetWorkQueue.Validation;

namespace DotNetWorkQueue.Transport.LiteDb.Basic.Admin
{
/// <summary>
/// Admin function implementation
/// </summary>
internal class AdminFunctions: IAdminFunctions
{
private readonly IQueryHandler<GetQueueCountQuery, long> _queueCount;
public AdminFunctions(
IQueryHandler<GetQueueCountQuery, long> queueCount)
{
Guard.NotNull(() => queueCount, queueCount);
_queueCount = queueCount;

}
public long? Count(QueueStatusAdmin? status)
{
return _queueCount.Handle(new GetQueueCountQuery(status));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public LiteDbConnectionManager(IConnectionInformation connectionInformation, ICr
Guard.NotNull(() => scope, scope);
_connectionInformation = connectionInformation;

if (string.IsNullOrEmpty(_connectionInformation.ConnectionString) ||
string.IsNullOrEmpty(_connectionInformation.QueueName))
{
return;
}

var builder = new LiteDB.ConnectionString(_connectionInformation.ConnectionString);
_shared = builder.Connection == ConnectionType.Shared;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using DotNetWorkQueue.Configuration;
using DotNetWorkQueue.IoC;
using DotNetWorkQueue.Queue;
using DotNetWorkQueue.Transport.LiteDb.Basic.Admin;
using DotNetWorkQueue.Transport.LiteDb.Basic.CommandHandler;
using DotNetWorkQueue.Transport.LiteDb.Basic.Factory;
using DotNetWorkQueue.Transport.LiteDb.Basic.Message;
Expand Down Expand Up @@ -82,6 +83,7 @@ public override void RegisterImplementations(IContainer container, RegistrationT
container.Register<IIncreaseQueueDelay, IncreaseQueueDelay>(LifeStyles.Singleton);
container.Register<IJobSchema, LiteDbJobSchema>(LifeStyles.Singleton);
container.Register<CreateJobMetaData>(LifeStyles.Singleton);
container.Register<IAdminFunctions, AdminFunctions>(LifeStyles.Singleton);
//**all

//**send
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// ---------------------------------------------------------------------
//This file is part of DotNetWorkQueue
//Copyright © 2015-2022 Brian Lehnen
//
//This library is free software; you can redistribute it and/or
//modify it under the terms of the GNU Lesser General Public
//License as published by the Free Software Foundation; either
//version 2.1 of the License, or (at your option) any later version.
//
//This library is distributed in the hope that it will be useful,
//but WITHOUT ANY WARRANTY; without even the implied warranty of
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
//Lesser General Public License for more details.
//
//You should have received a copy of the GNU Lesser General Public
//License along with this library; if not, write to the Free Software
//Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
// ---------------------------------------------------------------------
using DotNetWorkQueue.Transport.Shared;
namespace DotNetWorkQueue.Transport.LiteDb.Basic.Query
{
internal class GetQueueCountQuery : IQuery<long>
{
public GetQueueCountQuery(QueueStatusAdmin? status)
{
Status = status;
}
/// <summary>
/// The status to search for
/// </summary>
/// <remarks>Null if all</remarks>
public QueueStatusAdmin? Status { get; }
}
}
Loading

0 comments on commit 1161993

Please sign in to comment.