From dc9be4d1aaa7d2b3c5a32af2efbeb8fb240cac1e Mon Sep 17 00:00:00 2001 From: Jonas van Daal Date: Mon, 21 Oct 2024 10:46:36 +0200 Subject: [PATCH] feat: add snapshot reproducer --- paket.dependencies | 4 + paket.lock | 12 +- .../BuildingSnapshotReproducer.cs | 69 +++++++ .../BuildingUnitSnapshotReproducer.cs | 71 +++++++ .../Infrastructure/Modules/ApiModule.cs | 187 ++++++++++++------ .../Infrastructure/Startup.cs | 23 ++- .../SnapshotReproducer.cs | 108 ++++++++++ .../appsettings.json | 4 + .../paket.references | 7 + 9 files changed, 419 insertions(+), 66 deletions(-) create mode 100644 src/BuildingRegistry.Producer.Snapshot.Oslo/BuildingSnapshotReproducer.cs create mode 100644 src/BuildingRegistry.Producer.Snapshot.Oslo/BuildingUnitSnapshotReproducer.cs create mode 100644 src/BuildingRegistry.Producer.Snapshot.Oslo/SnapshotReproducer.cs diff --git a/paket.dependencies b/paket.dependencies index a41a4bcda..a15cfee64 100644 --- a/paket.dependencies +++ b/paket.dependencies @@ -4,6 +4,9 @@ source https://api.nuget.org/v3/index.json // PRODUCTION STUFF nuget Microsoft.Extensions.Http.Polly 8.0.3 +// AWS +nuget AWSSDK.Extensions.NETCore.Setup 3.7.300 + // For more healtchecks, look at https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks nuget AspNetCore.HealthChecks.SqlServer 8.0.0 nuget AspNetCore.HealthChecks.NpgSql 8.0.0 @@ -78,6 +81,7 @@ nuget Be.Vlaanderen.Basisregisters.GrAr.Contracts 21.14.1 nuget Be.Vlaanderen.Basisregisters.GrAr.Edit 21.14.1 nuget Be.Vlaanderen.Basisregisters.GrAr.Import 21.14.1 nuget Be.Vlaanderen.Basisregisters.GrAr.Legacy 21.14.1 +nuget Be.Vlaanderen.Basisregisters.GrAr.Notifications 21.14.1 nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance 21.14.1 nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.14.1 nuget Be.Vlaanderen.Basisregisters.GrAr.Oslo 21.14.1 diff --git a/paket.lock b/paket.lock index c0cbf1ba8..390ecaa99 100644 --- a/paket.lock +++ b/paket.lock @@ -32,11 +32,18 @@ NUGET AutoFixture.Xunit2 (4.18.1) AutoFixture (>= 4.18.1) xunit.extensibility.core (>= 2.2 < 3.0) - AWSSDK.Core (3.7.302.20) + AWSSDK.Core (3.7.400.36) AWSSDK.DynamoDBv2 (3.7.301.23) AWSSDK.Core (>= 3.7.302.20 < 4.0) + AWSSDK.Extensions.NETCore.Setup (3.7.300) + AWSSDK.Core (>= 3.7.300) + Microsoft.Extensions.Configuration.Abstractions (>= 2.0) + Microsoft.Extensions.DependencyInjection.Abstractions (>= 2.0) + Microsoft.Extensions.Logging.Abstractions (>= 2.0) AWSSDK.S3 (3.7.307) AWSSDK.Core (>= 3.7.302.20 < 4.0) + AWSSDK.SimpleNotificationService (3.7.301.5) + AWSSDK.Core (>= 3.7.302.18 < 4.0) AWSSDK.SQS (3.7.300.59) AWSSDK.Core (>= 3.7.302.20 < 4.0) Azure.Core (1.38) @@ -283,6 +290,9 @@ NUGET Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1) Be.Vlaanderen.Basisregisters.Utilities.Rfc3339DateTimeOffset (>= 4.0) Newtonsoft.Json (>= 13.0.3) + Be.Vlaanderen.Basisregisters.GrAr.Notifications (21.14.1) + AWSSDK.SimpleNotificationService (>= 3.7.301.3) + System.Text.Json (>= 8.0.3) Be.Vlaanderen.Basisregisters.GrAr.Oslo (21.14.1) Be.Vlaanderen.Basisregisters.AspNetCore.Mvc.Formatters.Json (>= 5.0) Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1) diff --git a/src/BuildingRegistry.Producer.Snapshot.Oslo/BuildingSnapshotReproducer.cs b/src/BuildingRegistry.Producer.Snapshot.Oslo/BuildingSnapshotReproducer.cs new file mode 100644 index 000000000..63ec94c9b --- /dev/null +++ b/src/BuildingRegistry.Producer.Snapshot.Oslo/BuildingSnapshotReproducer.cs @@ -0,0 +1,69 @@ +namespace BuildingRegistry.Producer.Snapshot.Oslo +{ + using System; + using System.Collections.Generic; + using System.Linq; + using Be.Vlaanderen.Basisregisters.GrAr.Notifications; + using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer; + using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer; + using Dapper; + using Microsoft.Extensions.Logging; + using NodaTime; + using Npgsql; + + public class BuildingSnapshotReproducer : SnapshotReproducer + { + private readonly string _integrationConnectionString; + + public BuildingSnapshotReproducer( + string integrationConnectionString, + IOsloProxy osloProxy, + IProducer producer, + IClock clock, + INotificationService notificationService, + int utcHourToRunWithin, + ILoggerFactory loggerFactory) + : base(osloProxy, producer, clock, notificationService, utcHourToRunWithin, loggerFactory) + { + _integrationConnectionString = integrationConnectionString; + } + + protected override List<(int PersistentLocalId, long Position)> GetIdsToProcess(DateTime utcNow) + { + using var connection = new NpgsqlConnection(_integrationConnectionString); + + var todayMidnight = utcNow.Date; + var yesterdayMidnight = todayMidnight.AddDays(-1); + + var records = connection.Query( + $""" + SELECT building_persistent_local_id, position, version_timestamp + FROM integration_building.building_versions + where version_timestamp >= '{yesterdayMidnight:yyyy-MM-dd}' and version_timestamp < '{todayMidnight:yyyy-MM-dd}' + """); + + var duplicateEvents = records + .GroupBy(x => new + { + BuildingPersistentLocalId = x.building_persistent_local_id, + TimeStamp = x.version_timestamp.ToString("yyyyMMddHHmmss") // Format the timestamp to seconds as OSLO API doesn't return the milliseconds of the timestamp + }) + .Where(x => x.Count() > 1) + .Select(x => + { + var latest = x.MaxBy(y => y.position)!; + return (latest.building_persistent_local_id, latest.position); + }) + .ToList(); + + return duplicateEvents; + } + + private sealed class BuildingPosition + { + public int building_persistent_local_id { get; init; } + public long position { get; init; } + public DateTimeOffset version_timestamp { get; init; } + } + } +} diff --git a/src/BuildingRegistry.Producer.Snapshot.Oslo/BuildingUnitSnapshotReproducer.cs b/src/BuildingRegistry.Producer.Snapshot.Oslo/BuildingUnitSnapshotReproducer.cs new file mode 100644 index 000000000..ff9021b4b --- /dev/null +++ b/src/BuildingRegistry.Producer.Snapshot.Oslo/BuildingUnitSnapshotReproducer.cs @@ -0,0 +1,71 @@ +namespace BuildingRegistry.Producer.Snapshot.Oslo +{ + using System; + using System.Collections.Generic; + using System.Linq; + using Be.Vlaanderen.Basisregisters.GrAr.Notifications; + using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer; + using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer; + using Dapper; + using Microsoft.Extensions.Logging; + using NodaTime; + using Npgsql; + + public class BuildingUnitSnapshotReproducer : SnapshotReproducer + { + private readonly string _integrationConnectionString; + + public BuildingUnitSnapshotReproducer( + string integrationConnectionString, + IOsloProxy osloProxy, + IProducer producer, + IClock clock, + INotificationService notificationService, + int utcHourToRunWithin, + ILoggerFactory loggerFactory) + : base(osloProxy, producer, clock, notificationService, utcHourToRunWithin, loggerFactory) + { + _integrationConnectionString = integrationConnectionString; + } + + protected override List<(int PersistentLocalId, long Position)> GetIdsToProcess(DateTime utcNow) + { + using var connection = new NpgsqlConnection(_integrationConnectionString); + + var todayMidnight = utcNow.Date; + var yesterdayMidnight = todayMidnight.AddDays(-1); + + var records = connection.Query( + $""" + SELECT building_unit_persistent_local_id, position, version_timestamp + FROM integration_building.building_unit_versions + where + version_timestamp >= '{yesterdayMidnight:yyyy-MM-dd}' + and version_timestamp < '{todayMidnight:yyyy-MM-dd}' + """); + + var duplicateEvents = records + .GroupBy(x => new + { + BuildingUnitPersistentLocalId = x.building_unit_persistent_local_id, + TimeStamp = x.version_timestamp.ToString("yyyyMMddHHmmss") // Format the timestamp to seconds as OSLO API doesn't return the milliseconds of the timestamp + }) + .Where(x => x.Count() > 1) + .Select(x => + { + var latest = x.MaxBy(y => y.position)!; + return (latest.building_unit_persistent_local_id, latest.position); + }) + .ToList(); + + return duplicateEvents; + } + + private sealed class BuildingUnitPosition + { + public int building_unit_persistent_local_id { get; init; } + public long position { get; init; } + public DateTimeOffset version_timestamp { get; init; } + } + } +} diff --git a/src/BuildingRegistry.Producer.Snapshot.Oslo/Infrastructure/Modules/ApiModule.cs b/src/BuildingRegistry.Producer.Snapshot.Oslo/Infrastructure/Modules/ApiModule.cs index 199fffbac..76d91ee70 100644 --- a/src/BuildingRegistry.Producer.Snapshot.Oslo/Infrastructure/Modules/ApiModule.cs +++ b/src/BuildingRegistry.Producer.Snapshot.Oslo/Infrastructure/Modules/ApiModule.cs @@ -2,11 +2,13 @@ namespace BuildingRegistry.Producer.Snapshot.Oslo.Infrastructure.Modules { using System; using System.Net.Http; + using Amazon.SimpleNotificationService; using Autofac; using Autofac.Extensions.DependencyInjection; using Be.Vlaanderen.Basisregisters.Api.Exceptions; using Be.Vlaanderen.Basisregisters.EventHandling; using Be.Vlaanderen.Basisregisters.EventHandling.Autofac; + using Be.Vlaanderen.Basisregisters.GrAr.Notifications; using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer; using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka; using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer; @@ -18,6 +20,7 @@ namespace BuildingRegistry.Producer.Snapshot.Oslo.Infrastructure.Modules using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; + using NodaTime; public class ApiModule : Module { @@ -37,6 +40,10 @@ public ApiModule( protected override void Load(ContainerBuilder builder) { + builder.Register(_ => SystemClock.Instance) + .As() + .SingleInstance(); + RegisterProjectionSetup(builder); builder @@ -53,14 +60,12 @@ private void RegisterProjectionSetup(ContainerBuilder builder) new EventHandlingModule( typeof(DomainAssemblyMarker).Assembly, EventsJsonSerializerSettingsProvider.CreateSerializerSettings())) - .RegisterModule() - .RegisterEventstreamModule(_configuration) - .RegisterModule(new ProjectorModule(_configuration)); RegisterProjections(builder); + RegisterReproducers(); } private void RegisterProjections(ContainerBuilder builder) @@ -78,35 +83,17 @@ private void RegisterProjections(ContainerBuilder builder) x.ConfigureCatchUpUpdatePositionMessageInterval(Convert.ToInt32(_configuration["CatchUpSaveInterval"])); }); - var bootstrapServers = _configuration["Kafka:BootstrapServers"]; - var saslUsername = _configuration["Kafka:SaslUserName"]; - var saslPassword = _configuration["Kafka:SaslPassword"]; - - var maxRetryWaitIntervalSeconds = _configuration["RetryPolicy:MaxRetryWaitIntervalSeconds"]; - var retryBackoffFactor = _configuration["RetryPolicy:RetryBackoffFactor"]; + var maxRetryWaitIntervalSeconds = _configuration["RetryPolicy:MaxRetryWaitIntervalSeconds"]!; + var retryBackoffFactor = _configuration["RetryPolicy:RetryBackoffFactor"]!; builder .RegisterProjectionMigrator( _configuration, _loggerFactory) .RegisterProjections(c => - { - var osloNamespace = _configuration["BuildingOsloNamespace"].TrimEnd('/'); - - var topic = $"{_configuration[ProducerBuildingProjections.TopicKey]}" ?? throw new ArgumentException($"Configuration has no value for {ProducerBuildingProjections.TopicKey}"); - var producerOptions = new ProducerOptions( - new BootstrapServers(bootstrapServers), - new Topic(topic), - true, - EventsJsonSerializerSettingsProvider.CreateSerializerSettings()) - .ConfigureEnableIdempotence(); - if (!string.IsNullOrEmpty(saslUsername) - && !string.IsNullOrEmpty(saslPassword)) - { - producerOptions.ConfigureSaslAuthentication(new SaslAuthentication( - saslUsername, - saslPassword)); - } + { + var osloNamespace = _configuration["BuildingOsloNamespace"]!.TrimEnd('/'); + var producerOptions = CreateBuildingProducerOptions(); return new ProducerBuildingProjections( new Producer(producerOptions), @@ -114,7 +101,7 @@ private void RegisterProjections(ContainerBuilder builder) c.Resolve(), new OsloProxy(new HttpClient { - BaseAddress = new Uri(_configuration["BuildingOsloApiUrl"].TrimEnd('/')), + BaseAddress = new Uri(_configuration["BuildingOsloApiUrl"]!.TrimEnd('/')), }), SnapshotManagerOptions.Create( maxRetryWaitIntervalSeconds, @@ -123,41 +110,123 @@ private void RegisterProjections(ContainerBuilder builder) }, connectedProjectionSettings) .RegisterProjections(c => - { - var osloNamespace = _configuration["BuildingUnitOsloNamespace"].TrimEnd('/'); - - var topic = $"{_configuration[ProducerBuildingUnitProjections.TopicKey]}" ?? throw new ArgumentException($"Configuration has no value for {ProducerBuildingProjections.TopicKey}"); - var producerOptions = new ProducerOptions( - new BootstrapServers(bootstrapServers), - new Topic(topic), - true, - EventsJsonSerializerSettingsProvider.CreateSerializerSettings()) - .ConfigureEnableIdempotence(); - if (!string.IsNullOrEmpty(saslUsername) - && !string.IsNullOrEmpty(saslPassword)) { - producerOptions.ConfigureSaslAuthentication(new SaslAuthentication( - saslUsername, - saslPassword)); - } + var osloNamespace = _configuration["BuildingUnitOsloNamespace"]!.TrimEnd('/'); + var producerOptions = CreateBuildingUnitProducerOptions(); - var osloProxy = new OsloProxy(new HttpClient - { - BaseAddress = new Uri(_configuration["BuildingUnitOsloApiUrl"].TrimEnd('/')), - }); - - return new ProducerBuildingUnitProjections( - new Producer(producerOptions), - new SnapshotManager( - c.Resolve(), - osloProxy, - SnapshotManagerOptions.Create( - maxRetryWaitIntervalSeconds, - retryBackoffFactor)), - osloNamespace, - osloProxy); - }, + var osloProxy = new OsloProxy(new HttpClient + { + BaseAddress = new Uri(_configuration["BuildingUnitOsloApiUrl"]!.TrimEnd('/')), + }); + + return new ProducerBuildingUnitProjections( + new Producer(producerOptions), + new SnapshotManager( + c.Resolve(), + osloProxy, + SnapshotManagerOptions.Create( + maxRetryWaitIntervalSeconds, + retryBackoffFactor)), + osloNamespace, + osloProxy); + }, connectedProjectionSettings); } + + private void RegisterReproducers() + { + _services.AddAWSService(); + _services.AddSingleton(sp => + new NotificationService(sp.GetRequiredService(), + _configuration.GetValue("NotificationTopicArn")!)); + + var connectionString = _configuration.GetConnectionString("Integration"); + var utcHourToRunWithin = _configuration.GetValue("SnapshotReproducerUtcHour"); + + _services.AddHostedService(provider => + { + var producerOptions = CreateBuildingProducerOptions(); + + return new BuildingSnapshotReproducer( + connectionString!, + new OsloProxy(new HttpClient + { + BaseAddress = new Uri(_configuration["BuildingOsloApiUrl"]!.TrimEnd('/')), + }), + new Producer(producerOptions), + provider.GetRequiredService(), + provider.GetRequiredService(), + utcHourToRunWithin, + _loggerFactory); + }); + + _services.AddHostedService(provider => + { + var producerOptions = CreateBuildingUnitProducerOptions(); + + return new BuildingUnitSnapshotReproducer( + connectionString!, + new OsloProxy(new HttpClient + { + BaseAddress = new Uri(_configuration["BuildingUnitOsloApiUrl"]!.TrimEnd('/')), + }), + new Producer(producerOptions), + provider.GetRequiredService(), + provider.GetRequiredService(), + utcHourToRunWithin, + _loggerFactory); + }); + } + + private ProducerOptions CreateBuildingProducerOptions() + { + var bootstrapServers = _configuration["Kafka:BootstrapServers"]; + var saslUsername = _configuration["Kafka:SaslUserName"]; + var saslPassword = _configuration["Kafka:SaslPassword"]; + + var topic = $"{_configuration[ProducerBuildingProjections.TopicKey]}" ?? + throw new ArgumentException($"Configuration has no value for {ProducerBuildingProjections.TopicKey}"); + var producerOptions = new ProducerOptions( + new BootstrapServers(bootstrapServers!), + new Topic(topic), + true, + EventsJsonSerializerSettingsProvider.CreateSerializerSettings()) + .ConfigureEnableIdempotence(); + + if (!string.IsNullOrEmpty(saslUsername) + && !string.IsNullOrEmpty(saslPassword)) + { + producerOptions.ConfigureSaslAuthentication(new SaslAuthentication( + saslUsername, + saslPassword)); + } + + return producerOptions; + } + + private ProducerOptions CreateBuildingUnitProducerOptions() + { + var bootstrapServers = _configuration["Kafka:BootstrapServers"]; + var saslUsername = _configuration["Kafka:SaslUserName"]; + var saslPassword = _configuration["Kafka:SaslPassword"]; + + var topic = $"{_configuration[ProducerBuildingUnitProjections.TopicKey]}" ?? + throw new ArgumentException($"Configuration has no value for {ProducerBuildingProjections.TopicKey}"); + var producerOptions = new ProducerOptions( + new BootstrapServers(bootstrapServers!), + new Topic(topic), + true, + EventsJsonSerializerSettingsProvider.CreateSerializerSettings()) + .ConfigureEnableIdempotence(); + if (!string.IsNullOrEmpty(saslUsername) + && !string.IsNullOrEmpty(saslPassword)) + { + producerOptions.ConfigureSaslAuthentication(new SaslAuthentication( + saslUsername, + saslPassword)); + } + + return producerOptions; + } } } diff --git a/src/BuildingRegistry.Producer.Snapshot.Oslo/Infrastructure/Startup.cs b/src/BuildingRegistry.Producer.Snapshot.Oslo/Infrastructure/Startup.cs index c26faf8c1..bf200ca29 100644 --- a/src/BuildingRegistry.Producer.Snapshot.Oslo/Infrastructure/Startup.cs +++ b/src/BuildingRegistry.Producer.Snapshot.Oslo/Infrastructure/Startup.cs @@ -57,7 +57,7 @@ public IServiceProvider ConfigureServices(IServiceCollection services) .GetSection("Cors") .GetChildren() .Select(c => c.Value) - .ToArray() + .ToArray()! }, Server = { @@ -77,7 +77,7 @@ public IServiceProvider ConfigureServices(IServiceCollection services) Url = new Uri("https://legacy.basisregisters.vlaanderen") } }, - XmlCommentPaths = new[] {typeof(Startup).GetTypeInfo().Assembly.GetName().Name} + XmlCommentPaths = [typeof(Startup).GetTypeInfo().Assembly.GetName().Name!] }, MiddlewareHooks = { @@ -87,14 +87,25 @@ public IServiceProvider ConfigureServices(IServiceCollection services) { var connectionStrings = _configuration .GetSection("ConnectionStrings") - .GetChildren(); + .GetChildren() + .ToArray(); - foreach (var connectionString in connectionStrings) + foreach (var connectionString in connectionStrings + .Where(x => !x.Value!.Contains("host", StringComparison.OrdinalIgnoreCase))) { health.AddSqlServer( - connectionString.Value, + connectionString.Value!, name: $"sqlserver-{connectionString.Key.ToLowerInvariant()}", - tags: new[] {DatabaseTag, "sql", "sqlserver"}); + tags: [ DatabaseTag, "sql", "sqlserver" ]); + } + + foreach (var connectionString in connectionStrings + .Where(x => x.Value!.Contains("host", StringComparison.OrdinalIgnoreCase))) + { + health.AddNpgSql( + connectionString.Value!, + name: $"npgsql-{connectionString.Key.ToLowerInvariant()}", + tags: [ DatabaseTag, "sql", "npgsql" ]); } health.AddDbContextCheck( diff --git a/src/BuildingRegistry.Producer.Snapshot.Oslo/SnapshotReproducer.cs b/src/BuildingRegistry.Producer.Snapshot.Oslo/SnapshotReproducer.cs new file mode 100644 index 000000000..7db2aee4d --- /dev/null +++ b/src/BuildingRegistry.Producer.Snapshot.Oslo/SnapshotReproducer.cs @@ -0,0 +1,108 @@ +namespace BuildingRegistry.Producer.Snapshot.Oslo +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + using Be.Vlaanderen.Basisregisters.GrAr.Notifications; + using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer; + using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka; + using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer; + using Microsoft.Extensions.Hosting; + using Microsoft.Extensions.Logging; + using NodaTime; + + public abstract class SnapshotReproducer : BackgroundService + { + private readonly IOsloProxy _osloProxy; + private readonly IProducer _producer; + private readonly IClock _clock; + private readonly INotificationService _notificationService; + private readonly int _utcHourToRunWithin; + private readonly ILogger _logger; + + public SnapshotReproducer( + IOsloProxy osloProxy, + IProducer producer, + IClock clock, + INotificationService notificationService, + int utcHourToRunWithin, + ILoggerFactory loggerFactory) + { + _osloProxy = osloProxy; + _producer = producer; + _notificationService = notificationService; + _utcHourToRunWithin = utcHourToRunWithin; + _clock = clock; + + _logger = loggerFactory.CreateLogger(); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + var now = _clock.GetCurrentInstant().ToDateTimeUtc(); + if (now.Hour == _utcHourToRunWithin) + { + _logger.LogInformation($"Starting {GetType().Name}"); + + try + { + //execute query + var idsToProcess = GetIdsToProcess(now); + + //reproduce + foreach (var id in idsToProcess) + { + await FindAndProduce(async () => + await _osloProxy.GetSnapshot(id.PersistentLocalId.ToString(), stoppingToken), + id.Position, + stoppingToken); + } + + await Task.Delay(TimeSpan.FromHours(1), stoppingToken); + } + catch (Exception ex) + { + _logger.LogError(ex, ex.Message); + + await _notificationService.PublishToTopicAsync(new NotificationMessage( + GetType().Name, + $"Reproducing snapshot failed: {ex}", + GetType().Name, + NotificationSeverity.Danger)); + } + } + + await Task.Delay(TimeSpan.FromMinutes(15), stoppingToken); + } + } + + protected async Task FindAndProduce(Func> findMatchingSnapshot, long storePosition, CancellationToken ct) + { + var result = await findMatchingSnapshot.Invoke(); + + if (result != null) + { + await Produce(result.Identificator.Id, result.Identificator.ObjectId, result.JsonContent, storePosition, ct); + } + } + + protected async Task Produce(string puri, string objectId, string jsonContent, long storePosition, CancellationToken cancellationToken = default) + { + var result = await _producer.Produce( + new MessageKey(puri), + jsonContent, + new List { new MessageHeader(MessageHeader.IdempotenceKey, $"{objectId}-{storePosition.ToString()}") }, + cancellationToken); + + if (!result.IsSuccess) + { + throw new InvalidOperationException(result.Error + Environment.NewLine + result.ErrorReason); //TODO: create custom exception + } + } + + protected abstract List<(int PersistentLocalId, long Position)> GetIdsToProcess(DateTime utcNow); + } +} diff --git a/src/BuildingRegistry.Producer.Snapshot.Oslo/appsettings.json b/src/BuildingRegistry.Producer.Snapshot.Oslo/appsettings.json index f8baefa85..8d35a5a51 100644 --- a/src/BuildingRegistry.Producer.Snapshot.Oslo/appsettings.json +++ b/src/BuildingRegistry.Producer.Snapshot.Oslo/appsettings.json @@ -1,6 +1,7 @@ { "ConnectionStrings": { "Events": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.BuildingRegistry;Trusted_Connection=True;TrustServerCertificate=True;", + "Integration": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.BuildingRegistry;Trusted_Connection=True;TrustServerCertificate=True;", "ProducerSnapshotProjections": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.BuildingRegistry;Trusted_Connection=True;TrustServerCertificate=True;", "ProducerSnapshotProjectionsAdmin": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.BuildingRegistry;Trusted_Connection=True;TrustServerCertificate=True;" }, @@ -16,6 +17,9 @@ "BuildingOsloApiUrl": "https://api.basisregisters.staging-vlaanderen.be/v2/gebouwen/", "BuildingUnitOsloApiUrl": "https://api.basisregisters.staging-vlaanderen.be/v2/gebouweenheden/", + "NotificationTopicArn": "", + "SnapshotReproducerUtcHour": 1, + "RetryPolicy": { "MaxRetryWaitIntervalSeconds": 3600, "RetryBackoffFactor": 5 diff --git a/src/BuildingRegistry.Producer.Snapshot.Oslo/paket.references b/src/BuildingRegistry.Producer.Snapshot.Oslo/paket.references index 5fea4ee76..89503a565 100644 --- a/src/BuildingRegistry.Producer.Snapshot.Oslo/paket.references +++ b/src/BuildingRegistry.Producer.Snapshot.Oslo/paket.references @@ -1,6 +1,7 @@ Be.Vlaanderen.Basisregisters.Api Be.Vlaanderen.Basisregisters.EventHandling.Autofac Be.Vlaanderen.Basisregisters.GrAr.Legacy +Be.Vlaanderen.Basisregisters.GrAr.Notifications Be.Vlaanderen.Basisregisters.GrAr.Oslo Be.Vlaanderen.BasisRegisters.MessageHandling.Kafka.Producer Be.Vlaanderen.Basisregisters.ProjectionHandling.Runner.SqlServer @@ -9,7 +10,13 @@ Be.Vlaanderen.Basisregisters.Projector Microsoft.Extensions.DependencyInjection +AWSSDK.Extensions.NETCore.Setup + AspNetCore.HealthChecks.SqlServer +AspNetCore.HealthChecks.NpgSql + +Npgsql.EntityFrameworkCore.PostgreSQL +Dapper Datadog.Trace.Bundle