Skip to content

Commit

Permalink
feat: add snapshot reproducer
Browse files Browse the repository at this point in the history
  • Loading branch information
jvandaal authored Oct 21, 2024
1 parent 53e6056 commit dc9be4d
Show file tree
Hide file tree
Showing 9 changed files with 419 additions and 66 deletions.
4 changes: 4 additions & 0 deletions paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ source https://api.nuget.org/v3/index.json
// PRODUCTION STUFF
nuget Microsoft.Extensions.Http.Polly 8.0.3

// AWS
nuget AWSSDK.Extensions.NETCore.Setup 3.7.300

// For more healtchecks, look at https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks
nuget AspNetCore.HealthChecks.SqlServer 8.0.0
nuget AspNetCore.HealthChecks.NpgSql 8.0.0
Expand Down Expand Up @@ -78,6 +81,7 @@ nuget Be.Vlaanderen.Basisregisters.GrAr.Contracts 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Edit 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Import 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Legacy 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Notifications 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Oslo 21.14.1
Expand Down
12 changes: 11 additions & 1 deletion paket.lock
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,18 @@ NUGET
AutoFixture.Xunit2 (4.18.1)
AutoFixture (>= 4.18.1)
xunit.extensibility.core (>= 2.2 < 3.0)
AWSSDK.Core (3.7.302.20)
AWSSDK.Core (3.7.400.36)
AWSSDK.DynamoDBv2 (3.7.301.23)
AWSSDK.Core (>= 3.7.302.20 < 4.0)
AWSSDK.Extensions.NETCore.Setup (3.7.300)
AWSSDK.Core (>= 3.7.300)
Microsoft.Extensions.Configuration.Abstractions (>= 2.0)
Microsoft.Extensions.DependencyInjection.Abstractions (>= 2.0)
Microsoft.Extensions.Logging.Abstractions (>= 2.0)
AWSSDK.S3 (3.7.307)
AWSSDK.Core (>= 3.7.302.20 < 4.0)
AWSSDK.SimpleNotificationService (3.7.301.5)
AWSSDK.Core (>= 3.7.302.18 < 4.0)
AWSSDK.SQS (3.7.300.59)
AWSSDK.Core (>= 3.7.302.20 < 4.0)
Azure.Core (1.38)
Expand Down Expand Up @@ -283,6 +290,9 @@ NUGET
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1)
Be.Vlaanderen.Basisregisters.Utilities.Rfc3339DateTimeOffset (>= 4.0)
Newtonsoft.Json (>= 13.0.3)
Be.Vlaanderen.Basisregisters.GrAr.Notifications (21.14.1)
AWSSDK.SimpleNotificationService (>= 3.7.301.3)
System.Text.Json (>= 8.0.3)
Be.Vlaanderen.Basisregisters.GrAr.Oslo (21.14.1)
Be.Vlaanderen.Basisregisters.AspNetCore.Mvc.Formatters.Json (>= 5.0)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
namespace BuildingRegistry.Producer.Snapshot.Oslo
{
using System;
using System.Collections.Generic;
using System.Linq;
using Be.Vlaanderen.Basisregisters.GrAr.Notifications;
using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer;
using Dapper;
using Microsoft.Extensions.Logging;
using NodaTime;
using Npgsql;

public class BuildingSnapshotReproducer : SnapshotReproducer
{
private readonly string _integrationConnectionString;

public BuildingSnapshotReproducer(
string integrationConnectionString,
IOsloProxy osloProxy,
IProducer producer,
IClock clock,
INotificationService notificationService,
int utcHourToRunWithin,
ILoggerFactory loggerFactory)
: base(osloProxy, producer, clock, notificationService, utcHourToRunWithin, loggerFactory)
{
_integrationConnectionString = integrationConnectionString;
}

protected override List<(int PersistentLocalId, long Position)> GetIdsToProcess(DateTime utcNow)
{
using var connection = new NpgsqlConnection(_integrationConnectionString);

var todayMidnight = utcNow.Date;
var yesterdayMidnight = todayMidnight.AddDays(-1);

var records = connection.Query<BuildingPosition>(
$"""
SELECT building_persistent_local_id, position, version_timestamp
FROM integration_building.building_versions
where version_timestamp >= '{yesterdayMidnight:yyyy-MM-dd}' and version_timestamp < '{todayMidnight:yyyy-MM-dd}'
""");

var duplicateEvents = records
.GroupBy(x => new
{
BuildingPersistentLocalId = x.building_persistent_local_id,
TimeStamp = x.version_timestamp.ToString("yyyyMMddHHmmss") // Format the timestamp to seconds as OSLO API doesn't return the milliseconds of the timestamp
})
.Where(x => x.Count() > 1)
.Select(x =>
{
var latest = x.MaxBy(y => y.position)!;
return (latest.building_persistent_local_id, latest.position);
})
.ToList();

return duplicateEvents;
}

private sealed class BuildingPosition
{
public int building_persistent_local_id { get; init; }
public long position { get; init; }
public DateTimeOffset version_timestamp { get; init; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
namespace BuildingRegistry.Producer.Snapshot.Oslo
{
using System;
using System.Collections.Generic;
using System.Linq;
using Be.Vlaanderen.Basisregisters.GrAr.Notifications;
using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer;
using Dapper;
using Microsoft.Extensions.Logging;
using NodaTime;
using Npgsql;

public class BuildingUnitSnapshotReproducer : SnapshotReproducer
{
private readonly string _integrationConnectionString;

public BuildingUnitSnapshotReproducer(
string integrationConnectionString,
IOsloProxy osloProxy,
IProducer producer,
IClock clock,
INotificationService notificationService,
int utcHourToRunWithin,
ILoggerFactory loggerFactory)
: base(osloProxy, producer, clock, notificationService, utcHourToRunWithin, loggerFactory)
{
_integrationConnectionString = integrationConnectionString;
}

protected override List<(int PersistentLocalId, long Position)> GetIdsToProcess(DateTime utcNow)
{
using var connection = new NpgsqlConnection(_integrationConnectionString);

var todayMidnight = utcNow.Date;
var yesterdayMidnight = todayMidnight.AddDays(-1);

var records = connection.Query<BuildingUnitPosition>(
$"""
SELECT building_unit_persistent_local_id, position, version_timestamp
FROM integration_building.building_unit_versions
where
version_timestamp >= '{yesterdayMidnight:yyyy-MM-dd}'
and version_timestamp < '{todayMidnight:yyyy-MM-dd}'
""");

var duplicateEvents = records
.GroupBy(x => new
{
BuildingUnitPersistentLocalId = x.building_unit_persistent_local_id,
TimeStamp = x.version_timestamp.ToString("yyyyMMddHHmmss") // Format the timestamp to seconds as OSLO API doesn't return the milliseconds of the timestamp
})
.Where(x => x.Count() > 1)
.Select(x =>
{
var latest = x.MaxBy(y => y.position)!;
return (latest.building_unit_persistent_local_id, latest.position);
})
.ToList();

return duplicateEvents;
}

private sealed class BuildingUnitPosition
{
public int building_unit_persistent_local_id { get; init; }
public long position { get; init; }
public DateTimeOffset version_timestamp { get; init; }
}
}
}
Loading

0 comments on commit dc9be4d

Please sign in to comment.